在介绍WebRTC的线程模型之前,先介绍Webrtc线程模型中用到的几个简单、常用的模块或函数。
一、 设置线程名 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 void SetCurrentThreadName (const char * name) {#if defined(WEBRTC_WIN) struct { DWORD dwType; LPCSTR szName; DWORD dwThreadID; DWORD dwFlags; } threadname_info = {0x1000 , name, static_cast <DWORD>(-1 ), 0 }; __try { ::RaiseException (0x406D1388 , 0 , sizeof (threadname_info) / sizeof (DWORD), reinterpret_cast <ULONG_PTR*>(&threadname_info)); } __except (EXCEPTION_EXECUTE_HANDLER) { } #elif defined(WEBRTC_LINUX) || defined(WEBRTC_ANDROID) prctl (PR_SET_NAME, reinterpret_cast <unsigned long >(name)); #elif defined(WEBRTC_MAC) || defined(WEBRTC_IOS) pthread_setname_np (name); #endif }
二、 原子操作 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 class AtomicOps { public : #if defined(WEBRTC_WIN) static int Increment (volatile int * i) { return ::InterlockedIncrement (reinterpret_cast <volatile LONG*>(i)); } static int Decrement (volatile int * i) { return ::InterlockedDecrement (reinterpret_cast <volatile LONG*>(i)); } static int AcquireLoad (volatile const int * i) { return *i; } static void ReleaseStore (volatile int * i, int value) { *i = value; } static int CompareAndSwap (volatile int * i, int old_value, int new_value) { return ::InterlockedCompareExchange (reinterpret_cast <volatile LONG*>(i), new_value, old_value); } template <typename T> static T* AcquireLoadPtr (T* volatile * ptr) { return *ptr; } template <typename T> static T* CompareAndSwapPtr (T* volatile * ptr, T* old_value, T* new_value) { return static_cast <T*>(::InterlockedCompareExchangePointer ( reinterpret_cast <PVOID volatile *>(ptr), new_value, old_value)); } #else static int Increment (volatile int * i) { return __sync_add_and_fetch(i, 1 ); } static int Decrement (volatile int * i) { return __sync_sub_and_fetch(i, 1 ); } static int AcquireLoad (volatile const int * i) { return __atomic_load_n(i, __ATOMIC_ACQUIRE); } static void ReleaseStore (volatile int * i, int value) { __atomic_store_n(i, value, __ATOMIC_RELEASE); } static int CompareAndSwap (volatile int * i, int old_value, int new_value) { return __sync_val_compare_and_swap(i, old_value, new_value); } template <typename T> static T* AcquireLoadPtr (T* volatile * ptr) { return __atomic_load_n(ptr, __ATOMIC_ACQUIRE); } template <typename T> static T* CompareAndSwapPtr (T* volatile * ptr, T* old_value, T* new_value) { return __sync_val_compare_and_swap(ptr, old_value, new_value); } #endif };
使用示例
1 2 3 bool MessageQueue::IsQuitting () { return AtomicOps::AcquireLoad (&stop_) != 0 ; }
三、线程模型原理 WebRTC的线程功能由Thread
类提供。Thread继承于消息队列MessageQueue
,这样WebRTC中的每个线程都有了自己的消息循环,外部可以向该线程的消息循环Post消息Message
,然后该线程轮询从消息循环Get
到消息后处理消息。
UML如下:
四、Message和MessageQueue Message
用于定义单个消息:
posted_from 标记改条消息发送自哪个函数,一般都是直接用RTC_FROM_HERE
宏来赋值;
message_id 32位整数,消息ID;
pdata 消息携带的数据指针,虽然定义MessageData*
类型,但也可以等同于void*
。
ts_sensitive 消息的敏感时间点。在使用MessageQueue::Post
等方法发送消息时,若time_sensitive == true
则设置ts_sensitive = TimeMillis() + kMaxMsgLatency
即当前时间 + 最大消息延迟时间。当消息被从队列中取出的时间大于该时间,则会打印警告日志。
phandler 消息处理器,MessageHandler接口的指针。用户需要继承该类,并重写其OnMessage
虚函数。
MessageQueue
实现了WebRTC中消息队列的功能,如Post
方法用于添加消息;Get
方法用于从队列取出消息,若队列没有消息则一直等待,具体的等待和唤醒的方式通过SocketServer
的Wait
和WakeUp
来实现。
SocketServer
是一个纯抽象类,NullSocketServer
和PhysicalSocketServer
都派生自该类。NullSocketServer
比较简单,没有创建SOCKET,只处理本地事件(CreateEvent系列函数);PhysicalSocketServer
会创建SOCKET,并可以处理网络事件(WSACreateEvent系列函数)。在单纯的线程模型中,SocketServer
只用于处理等待和唤醒操作。
五、 Thread Thread类提供了2个静态函数(Create
, CreateWithSocketServer
)来构造Thread实例。
不建议使用new Thread()
调用默认构造函数的方式,代码注释中已经给出了解释:
下面是Thread类构造的大致过程:
1 2 3 4 5 6 7 8 9 10 11 12 13 +-----------------------------+ | | | new Thread(SocketServer*) | | | +---+-------------------------+ | | | +-----------------------------------+ | | | +---------> new MessageQueue(SocketServer*) | | | +-----------------------------------+
构造完Thread实例之后,调用Start
来启动线程,框架会根据Start函数的Runnable参数是否为NULL来判断是需要运行用户自定义的Runnable->Run()
, 还是运行默认ProcessMessage
去循环从消息队列中获取消息来处理。
若运行用户自定义的Runnable->Run()
,则用户需要继承rtc::Runnable
去重载Run()
;
若运行默认的ProcessMessage
,则用户则需要定义Message
,向线程中Post
该Message
,让线程来执行。
流程如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 +---------------------------+ | | | Thread->Start(Runnable*) | | | +-+-------------------------+ | | +-------------------------+ | | | +-------> CreateThrad(PreRun) | | | +-------------------------+ |Runnable* is NULL? | | +------------------+ | | | +-----------> Runnable->Run() | | | | | +------------------+ | | | +------------------+ | | | +-----------> Thread->Run() | | | +-+----------------+ | | | +--------------------------+ | | | +----> Thread->ProcessMessage() | | | +--+-----------------------+ | | +--------------------------------+ | | | +-----> Loop call MessageQueue->Get() | | | +--------------------------------+
六、 示例 6.1 实现Runnbale 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 #include "rtc_base/thread.h" class MyTask : public rtc::Runnable {public : MyTask (const std::string &name) : name_ (name) { } protected : void Run (Thread* thread) { std::cout << "task name: " << name_ << std::endl; std::cout << "my thread id: " << thread->GetId () << std::endl; } private : std::string name_; }; int main () { std::unique_ptr<rtc::Thread> thread = rtc::Thread::Create (); thread->Start (new MyTask ("task1" )); getchar (); return 0 ; }
6.2 发送Message 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 class MyTaskHandler : public rtc::MessageHandler {public : MyTaskHandler (const std::string &name) : name_ (name) { } protected : void OnMessage (Message* msg) { std::cout << "task name: " << name_ << std::endl; std::cout << "my thread id: " << Thread::Current ()->GetId () << std::endl; } private : std::string name_; }; int main () { std::unique_ptr<rtc::Thread> thread = rtc::Thread::Create (); thread->Start (nullptr ); thread->Post (RTC_FROM_HERE, new MyTaskHandler ("task2" ), 0 , nullptr , false ); getchar (); return 0 ; }