• <td id="ui4cm"></td>
  • <bdo id="ui4cm"><legend id="ui4cm"></legend></bdo>
  • 當前位置:首頁 > 安卓源碼 > 技術博客 >

    使用C ++ 11線程支持庫創建帶有事件循環,消息隊列和計時器的輔助線程

    時間:2020-09-14 15:31 來源:互聯網 作者:源碼搜藏 瀏覽: 收藏 挑錯 推薦 打印

    事件循環(有時稱為消息循環)是等待并調度傳入事件的線程。 線程阻塞等待請求的到達,然后將事件分派給事件處理程序函數。 循環通常使用消息隊列來保存傳入消息。 依次對每個消息進行出隊,解碼,然后執行操作。 事件循環是實現進程間通信的一種方法。 所有

    事件循環(有時稱為消息循環)是等待并調度傳入事件的線程。線程阻塞等待請求的到達,然后將事件分派給事件處理程序函數。循環通常使用消息隊列來保存傳入消息。依次對每個消息進行出隊,解碼,然后執行操作。事件循環是實現進程間通信的一種方法。

    所有操作系統都支持多線程應用程序。每個操作系統都有用于創建線程,消息隊列和計時器的唯一函數調用。隨著C ++ 11線程支持庫的出現,現在可以創建可移植的代碼并避免特定于OS的函數調用。本文提供了一個簡單的示例,說明如何僅依靠C ++標準庫來創建線程事件循環,消息隊列和計時器服務。任何支持線程庫的C ++ 11編譯器都應該能夠編譯附加的源代碼。

    背景

    通常,我需要一個線程來充當事件循環。線程將入站消息出隊,并根據唯一的消息標識符將數據調度到適當的函數處理程序。能夠調用功能的計時器支持對于低速輪詢很方便,如果在預期的時間內沒有發生任何事情,則可以生成超時。很多時候,輔助線程是在啟動時創建的,直到應用程序終止后才被銷毀。

    該實現的關鍵要求是,傳入消息必須在同一線程實例上執行。盡管std::async 可以使用池中的臨時線程,但是此類確保所有傳入消息使用同一線程。例如,可以使用不是線程安全的代碼來實現子系統。單個WorkerThread 實例用于安全地將函數調用分派到子系統中。

    乍一看,C ++線程支持似乎缺少一些關鍵功能。是的,std::thread 可以拆分一個線程,但是沒有線程安全隊列,也沒有計時器-大多數OS都提供的服務。我將展示如何使用C ++標準庫創建這些“缺失”功能,并提供許多程序員熟悉的事件處理循環。

    工作線程

    WorkerThread 類封裝了所有必要的事件循環機制。一個簡單的類接口允許線程創建,將消息發布到事件循環以及最終的線程終止。界面如下圖所示:

    class WorkerThread
    {
    public:
        /// Constructor
        WorkerThread(const char* threadName);
    
        /// Destructor
        ~WorkerThread();
    
        /// Called once to create the worker thread
        /// @return True if thread is created. False otherwise. 
        bool CreateThread();
    
        /// Called once a program exit to exit the worker thread
        void ExitThread();
    
        /// Get the ID of this thread instance
        /// @return The worker thread ID
        std::thread::id GetThreadId();
    
        /// Get the ID of the currently executing thread
        /// @return The current thread ID
        static std::thread::id GetCurrentThreadId();
    
        /// Add a message to the thread queue
        /// @param[in] data - thread specific message information
        void PostMsg(std::shared_ptr<UserData> msg);
    
    private:
        WorkerThread(const WorkerThread&) = delete;
        WorkerThread& operator=(const WorkerThread&) = delete;
    
        /// Entry point for the worker thread
        void Process();
    
        /// Entry point for timer thread
        void TimerThread();
    
        std::unique_ptr<std::thread> m_thread;
        std::queue<std::shared_ptr<ThreadMsg>> m_queue;
        std::mutex m_mutex;
        std::condition_variable m_cv;
        std::atomic<bool> m_timerExit;
        const char* THREAD_NAME;
    };

    首先要注意的是,std::thread 它用于創建主工作線程。主要工作線程功能是Process()。

    bool WorkerThread::CreateThread()
    {
        if (!m_thread)
            m_thread = new thread(&WorkerThread::Process, this);
        return true;
    }

    事件循環

    Process() 事件循環如下所示。該線程依賴于a std::queue<ThreadMsg*> 消息隊列。std::queue 不是線程安全的,因此對隊列的所有訪問都必須由互斥保護。std::condition_variable 用于暫停線程,直到收到新消息已添加到隊列的通知。

    void WorkerThread::Process()
    {
        m_timerExit = false;
        std::thread timerThread(&WorkerThread::TimerThread, this);
    
        while (1)
        {
            std::shared_ptr<ThreadMsg> msg;
            {
                // Wait for a message to be added to the queue
                std::unique_lock<std::mutex> lk(m_mutex);
                while (m_queue.empty())
                    m_cv.wait(lk);
    
                if (m_queue.empty())
                    continue;
    
                msg = m_queue.front();
                m_queue.pop();
            }
    
            switch (msg->id)
            {
                case MSG_POST_USER_DATA:
                {
                    ASSERT_TRUE(msg->msg != NULL);
    
                    auto userData = std::static_pointer_cast<UserData>(msg->msg);
                    cout << userData->msg.c_str() << " " << userData->year << " on " << THREAD_NAME << endl;
    
                    break;
                }
    
                case MSG_TIMER:
                    cout << "Timer expired on " << THREAD_NAME << endl;
                    break;
    
                case MSG_EXIT_THREAD:
                {
                    m_timerExit = true;
                    timerThread.join();
                    return;
                }
    
                default:
                    ASSERT();
            }
        }
    }

    PostMsg() ThreadMsg 在堆上創建一個新對象,將該消息添加到隊列中,然后使用條件變量通知工作線程。

    void WorkerThread::PostMsg(std::shared_ptr<UserData> data)
    {
        ASSERT_TRUE(m_thread);
    
        // Create a new ThreadMsg
        std::shared_ptr<ThreadMsg> threadMsg(new ThreadMsg(MSG_POST_USER_DATA, data));
    
        // Add user data msg to queue and notify worker thread
        std::unique_lock<std::mutex> lk(m_mutex);
        m_queue.push(threadMsg);
        m_cv.notify_one();
    }

    循環將繼續處理消息,直到MSG_EXIT_THREAD 收到并退出線程為止。

    void WorkerThread::ExitThread()
    {
        if (!m_thread)
            return;
    
        // Create a new ThreadMsg
        std::shared_ptr<ThreadMsg> threadMsg(new ThreadMsg(MSG_EXIT_THREAD, 0));
    
        // Put exit thread message into the queue
        {
            lock_guard<mutex> lock(m_mutex);
            m_queue.push(threadMsg);
            m_cv.notify_one();
        }
    
        m_thread->join();
        m_thread = nullptr;
    }

    事件循環(Win32)

    下面的代碼片段將std::thread 上面事件循環與使用Windows API的類似Win32版本進行了對比。注意GetMessage() API用于代替std::queue。使用將消息發布到OS消息隊列PostThreadMessage()。最后,timerSetEvent() 用于將WM_USER_TIMER 消息放入隊列。所有這些服務均由OS提供。std::thread WorkerThread 此處介紹實現避免了原始OS調用,但實現功能與Win32版本相同,而僅依賴于C ++標準庫。

    unsigned long WorkerThread::Process(void* parameter)
    {
        MSG msg;
        BOOL bRet;
    
        // Start periodic timer
        MMRESULT timerId = timeSetEvent(250, 10, &WorkerThread::TimerExpired, 
                           reinterpret_cast<DWORD>(this), TIME_PERIODIC);
    
        while ((bRet = GetMessage(&msg, NULL, WM_USER_BEGIN, WM_USER_END)) != 0)
        {
            switch (msg.message)
            {
                case WM_DISPATCH_DELEGATE:
                {
                    ASSERT_TRUE(msg.wParam != NULL);
    
                    // Convert the ThreadMsg void* data back to a UserData*
                    const UserData* userData = static_cast<const UserData*>(msg.wParam);
    
                    cout << userData->msg.c_str() << " " << userData->year << " on " << THREAD_NAME << endl;
    
                    // Delete dynamic data passed through message queue
                    delete userData;
                    break;
                }
    
                case WM_USER_TIMER:
                    cout << "Timer expired on " << THREAD_NAME << endl;
                    break;
    
                case WM_EXIT_THREAD:
                    timeKillEvent(timerId);
                    return 0;
    
                default:
                    ASSERT();
            }
        }
        return 0;
    }

    計時器

    使用輔助專用線程將低分辨率定期計時器消息插入隊列。計時器線程在內部創建Process()。

    void WorkerThread::Process()
    {
        m_timerExit = false;
        std::thread timerThread(&WorkerThread::TimerThread, this);
    
    ...

    計時器線程的唯一責任是MSG_TIMER 每250ms 插入一條消息。在此實現中,無法防止計時器線程將多個計時器消息注入到隊列中。如果工作線程落后并且無法足夠快地服務于消息隊列,則可能發生這種情況。根據工作線程,處理負載以及計時器消息的插入速度,可以采用其他邏輯來防止泛濫隊列。

    void WorkerThread::TimerThread()
    {
        while (!m_timerExit)
        {
            // Sleep for 250mS then put a MSG_TIMER into the message queue
            std::this_thread::sleep_for(250ms);
    
            std::shared_ptr<ThreadMsg> threadMsg (new ThreadMsg(MSG_TIMER, 0));
    
            // Add timer msg to queue and notify worker thread
            std::unique_lock<std::mutex> lk(m_mutex);
            m_queue.push(threadMsg);
            m_cv.notify_one();
        }
    }

    用法

    main()下面函數顯示了如何使用WorkerThread 該類。創建兩個工作線程,并將消息發布到每個工作線程。短暫延遲后,兩個線程均退出。

    // Worker thread instances
    WorkerThread workerThread1("WorkerThread1");
    WorkerThread workerThread2("WorkerThread2");
    
    int main(void)
    {    
        // Create worker threads
        workerThread1.CreateThread();
        workerThread2.CreateThread();
    
        // Create message to send to worker thread 1
        std::shared_ptr<UserData> userData1(new UserData());
        userData1->msg = "Hello world";
        userData1->year = 2017;
    
        // Post the message to worker thread 1
        workerThread1.PostMsg(userData1);
    
        // Create message to send to worker thread 2
        std::shared_ptr<UserData> userData2(new UserData());
        userData2->msg = "Goodbye world";
        userData2->year = 2017;
    
        // Post the message to worker thread 2
        workerThread2.PostMsg(userData2);
    
        // Give time for messages processing on worker threads
        this_thread::sleep_for(1s);
    
        workerThread1.ExitThread();
        workerThread2.ExitThread();
    
        return 0;
    }

    結論

    C ++線程支持庫提供了獨立于平臺的方式來編寫多線程應用程序代碼,而無需依賴于特定于操作系統的API。WorkerThread 這里介紹類是事件循環的基本實現,但所有基礎知識都已準備就緒,可以進行擴展。

    使用C ++ 11線程支持庫創建帶有事件循環,消息隊列和計時器的輔助線程 轉載http://www.robbiejoe.com/appboke/52393.html

    技術博客閱讀排行

    最新文章

    亚洲AV无码一区东京热
  • <td id="ui4cm"></td>
  • <bdo id="ui4cm"><legend id="ui4cm"></legend></bdo>