Multithreading

將隊列中的作業調度到多個執行緒

  • September 16, 2020

我有一個函式必須處理一組目錄中的所有文件(5-300 個文件之間的任何文件)。要使用的並行執行緒數由使用者指定(通常為 4)。這個想法是在 4 個單獨的執行緒中啟動函式。當一個執行緒返回時,我必須開始處理下一個(第 5 個)文件,依此類推,直到所有文件都完成。

在 Windows 上,WaitForMultipleObjects()這裡bWaitAll=False可以幫助我。我有一個可以填充並填充到數組中的結構

map<UINT, string>::iterator iter = m_FileList.begin();
string outputPath = GetOutputPath();
void ***threadArgs = (void***)malloc(sizeof(void**)*numThreads);
HANDLE *hdl = (HANDLE*)malloc(sizeof(HANDLE)*numThreads);
DWORD *thr = (DWORD*)malloc(sizeof(DWORD)*numThreads);

for (int t = 0; iter != m_FileList.end() && t < numThreads; t++, iter++)
{
   threadArgs[t] = prepThreadData(t, iter->second, opPath);
   printf("main: starting thread :%d %s outputPath: %s\n", t, iter->second.c_str(), threadArgs[t][2]);
   hdl[t] = CreateThread(NULL, 0, fileProc, (void*)threadArgs[t], 0, &thr[t]);
   if (hdl[t] == NULL)
   {
       err = GetLastError();
       printf("main: thread failed %x %x %s %s\n", err, iter->second.c_str(), threadArgs[t][2]);
   }
}

for (;iter != m_FileList.end(); iter++)
{
   int t = (int)WaitForMultipleObjects(numThreads, hdl, FALSE, INFINITE);
   if (t == WAIT_FAILED)
   {
       err = GetLastError();
       printf("main: thread failed %x %x\n", t, err);
   }
   if (t - WAIT_OBJECT_0 >= 0 && t - WAIT_OBJECT_0 < numThreads)
   {
       free(threadArgs[t][1]);
       free(threadArgs[t][2]);
       free(threadArgs[t]);
       threadArgs[t] = prepThreadData(t, iter->second, opPath);
       printf("main: starting thread :%d %s outputPath: %s\n", t, iter->second.c_str(), threadArgs[t][2]);
       hdl[t] = CreateThread(NULL, 0, fileProc, (void*)threadArgs[t], 0, &thr[t]);
       if (hdl[t] == NULL)
       {
           err = GetLastError();
           printf("main: thread failed %x %x %s %s\n", err, iter->second.c_str(), threadArgs[t][2]);
       }
   }
}
if (WAIT_FAILED == WaitForMultipleObjects(numThreads - 1, hdl, TRUE, INFINITE))     
{
   err = GetLastError();
   printf("main: thread failed %x %x\n", err);
}

我現在的問題是使用 pthreads 獲得類似的功能。我能想到的最好的方法是使用信號量,當其中一個可用時,生成一個新執行緒,而不是使用 threadArgs 數組,我將只使用一個為每個執行緒生成分配記憶體的指針。此外,為了便於記憶體管理,分配給 threadArgs 的記憶體

$$ t $$然後將由生成的執行緒擁有。 有更好的解決方案嗎?或者是否有類似於WaitForMutlipleObjects()pthreads 的東西?更具體地說,如果我替換CreateThread()pthread_create(),我應該替換WaitForMultipleObjects()為什麼?

聽起來你想要一個工作隊列。您可以使用需要處理的文件集合填充該隊列,並使用一個函式從隊列中取出一個項目,該隊列執行必要的鎖定以防止執行緒之間的競爭。然後開始你想要多少執行緒。每個執行緒將從隊列中取出一個項目,對其進行處理,然後將下一個項目出隊。當隊列變空時,執行緒可以阻塞等待更多輸入,或者如果您知道將沒有更多輸入,則執行緒可以終止。

這是一個簡單的例子:

#include <cstdio>
#include <mutex>
#include <queue>
#include <thread>

template<typename T>
class ThreadSafeQueue {
public:
   void enqueue(const T& element)
   {
       std::lock_guard<std::mutex> lock(m_mutex);

       m_queue.push(element);
   }

   bool dequeue(T& value)
   {
       std::lock_guard<std::mutex> lock(m_mutex);

       if (m_queue.empty()) {
           return false;
       }

       value = m_queue.front();
       m_queue.pop();

       return true;
   }

private:
   std::mutex m_mutex;
   std::queue<T> m_queue;
};

static void threadEntry(const int threadNumber, ThreadSafeQueue<std::string>* const queue)
{
   std::string filename;

   while (queue->dequeue(filename)) {
       printf("Thread %d processing file '%s'\n", threadNumber, filename.c_str());
   }
}

int main()
{
   ThreadSafeQueue<std::string> queue;

   // Populate queue
   for (int i = 0; i < 100000; ++i) {
       queue.enqueue("filename_" + std::to_string(i) + ".txt");
   }

   const size_t NUM_THREADS = 4;

   // Spin up some threads
   std::thread threads[NUM_THREADS];
   for (int i = 0; i < NUM_THREADS; ++i) {
       threads[i] = std::thread(threadEntry, i, &queue);
   }

   // Wait for threads to finish
   for (int i = 0; i < NUM_THREADS; ++i) {
       threads[i].join();
   }

   return 0;
}

編譯:

$ g++ example.cpp -pthread

該程序定義ThreadSafeQueue了一個帶有內部鎖定的隊列,以使多個執行緒能夠同時訪問它。

main函式首先填充隊列。然後它啟動 4 個執行緒。每個執行緒從隊列中讀取一個值並“處理”它(這裡,通過將消息列印到標準輸出)。當隊列為空時,執行緒終止。該main函式在返回之前等待執行緒終止。

請注意,此設計假定所有元素線上程開始之前都已填充到隊列中。通過一些更改,它可以擴展為支持線上程執行時處理新工作。

引用自:https://unix.stackexchange.com/questions/609653