將隊列中的作業調度到多個執行緒
我有一個函式必須處理一組目錄中的所有文件(5-300 個文件之間的任何文件)。要使用的並行執行緒數由使用者指定(通常為 4)。這個想法是在 4 個單獨的執行緒中啟動函式。當一個執行緒返回時,我必須開始處理下一個(第 5 個)文件,依此類推,直到所有文件都完成。
在 Windows 上,
WaitForMultipleObjects()
這裡bWaitAll=False
可以幫助我。我有一個可以填充並填充到數組中的結構map<UINT, string>::iterator iter = m_FileList.begin(); string outputPath = GetOutputPath(); void ***threadArgs = (void***)malloc(sizeof(void**)*numThreads); HANDLE *hdl = (HANDLE*)malloc(sizeof(HANDLE)*numThreads); DWORD *thr = (DWORD*)malloc(sizeof(DWORD)*numThreads); for (int t = 0; iter != m_FileList.end() && t < numThreads; t++, iter++) { threadArgs[t] = prepThreadData(t, iter->second, opPath); printf("main: starting thread :%d %s outputPath: %s\n", t, iter->second.c_str(), threadArgs[t][2]); hdl[t] = CreateThread(NULL, 0, fileProc, (void*)threadArgs[t], 0, &thr[t]); if (hdl[t] == NULL) { err = GetLastError(); printf("main: thread failed %x %x %s %s\n", err, iter->second.c_str(), threadArgs[t][2]); } } for (;iter != m_FileList.end(); iter++) { int t = (int)WaitForMultipleObjects(numThreads, hdl, FALSE, INFINITE); if (t == WAIT_FAILED) { err = GetLastError(); printf("main: thread failed %x %x\n", t, err); } if (t - WAIT_OBJECT_0 >= 0 && t - WAIT_OBJECT_0 < numThreads) { free(threadArgs[t][1]); free(threadArgs[t][2]); free(threadArgs[t]); threadArgs[t] = prepThreadData(t, iter->second, opPath); printf("main: starting thread :%d %s outputPath: %s\n", t, iter->second.c_str(), threadArgs[t][2]); hdl[t] = CreateThread(NULL, 0, fileProc, (void*)threadArgs[t], 0, &thr[t]); if (hdl[t] == NULL) { err = GetLastError(); printf("main: thread failed %x %x %s %s\n", err, iter->second.c_str(), threadArgs[t][2]); } } } if (WAIT_FAILED == WaitForMultipleObjects(numThreads - 1, hdl, TRUE, INFINITE)) { err = GetLastError(); printf("main: thread failed %x %x\n", err); }
我現在的問題是使用 pthreads 獲得類似的功能。我能想到的最好的方法是使用信號量,當其中一個可用時,生成一個新執行緒,而不是使用 threadArgs 數組,我將只使用一個為每個執行緒生成分配記憶體的指針。此外,為了便於記憶體管理,分配給 threadArgs 的記憶體
$$ t $$然後將由生成的執行緒擁有。 有更好的解決方案嗎?或者是否有類似於
WaitForMutlipleObjects()
pthreads 的東西?更具體地說,如果我替換CreateThread()
為pthread_create()
,我應該替換WaitForMultipleObjects()
為什麼?
聽起來你想要一個工作隊列。您可以使用需要處理的文件集合填充該隊列,並使用一個函式從隊列中取出一個項目,該隊列執行必要的鎖定以防止執行緒之間的競爭。然後開始你想要多少執行緒。每個執行緒將從隊列中取出一個項目,對其進行處理,然後將下一個項目出隊。當隊列變空時,執行緒可以阻塞等待更多輸入,或者如果您知道將沒有更多輸入,則執行緒可以終止。
這是一個簡單的例子:
#include <cstdio> #include <mutex> #include <queue> #include <thread> template<typename T> class ThreadSafeQueue { public: void enqueue(const T& element) { std::lock_guard<std::mutex> lock(m_mutex); m_queue.push(element); } bool dequeue(T& value) { std::lock_guard<std::mutex> lock(m_mutex); if (m_queue.empty()) { return false; } value = m_queue.front(); m_queue.pop(); return true; } private: std::mutex m_mutex; std::queue<T> m_queue; }; static void threadEntry(const int threadNumber, ThreadSafeQueue<std::string>* const queue) { std::string filename; while (queue->dequeue(filename)) { printf("Thread %d processing file '%s'\n", threadNumber, filename.c_str()); } } int main() { ThreadSafeQueue<std::string> queue; // Populate queue for (int i = 0; i < 100000; ++i) { queue.enqueue("filename_" + std::to_string(i) + ".txt"); } const size_t NUM_THREADS = 4; // Spin up some threads std::thread threads[NUM_THREADS]; for (int i = 0; i < NUM_THREADS; ++i) { threads[i] = std::thread(threadEntry, i, &queue); } // Wait for threads to finish for (int i = 0; i < NUM_THREADS; ++i) { threads[i].join(); } return 0; }
編譯:
$ g++ example.cpp -pthread
該程序定義
ThreadSafeQueue
了一個帶有內部鎖定的隊列,以使多個執行緒能夠同時訪問它。該
main
函式首先填充隊列。然後它啟動 4 個執行緒。每個執行緒從隊列中讀取一個值並“處理”它(這裡,通過將消息列印到標準輸出)。當隊列為空時,執行緒終止。該main
函式在返回之前等待執行緒終止。請注意,此設計假定所有元素線上程開始之前都已填充到隊列中。通過一些更改,它可以擴展為支持線上程執行時處理新工作。