在介绍WebRTC的线程模型之前,先介绍Webrtc线程模型中用到的几个简单、常用的模块或函数。

一、 设置线程名

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
//platform_thread.cc

void SetCurrentThreadName(const char* name) {
#if defined(WEBRTC_WIN)
struct {
DWORD dwType;
LPCSTR szName;
DWORD dwThreadID;
DWORD dwFlags;
} threadname_info = {0x1000, name, static_cast<DWORD>(-1), 0};

__try {
::RaiseException(0x406D1388, 0, sizeof(threadname_info) / sizeof(DWORD),
reinterpret_cast<ULONG_PTR*>(&threadname_info));
} __except (EXCEPTION_EXECUTE_HANDLER) {
}
#elif defined(WEBRTC_LINUX) || defined(WEBRTC_ANDROID)
prctl(PR_SET_NAME, reinterpret_cast<unsigned long>(name));
#elif defined(WEBRTC_MAC) || defined(WEBRTC_IOS)
pthread_setname_np(name);
#endif
}

二、 原子操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
//atomicops.h
class AtomicOps {
public:
#if defined(WEBRTC_WIN)
// Assumes sizeof(int) == sizeof(LONG), which it is on Win32 and Win64.
static int Increment(volatile int* i) {
return ::InterlockedIncrement(reinterpret_cast<volatile LONG*>(i));
}
static int Decrement(volatile int* i) {
return ::InterlockedDecrement(reinterpret_cast<volatile LONG*>(i));
}
static int AcquireLoad(volatile const int* i) {
return *i;
}
static void ReleaseStore(volatile int* i, int value) {
*i = value;
}
static int CompareAndSwap(volatile int* i, int old_value, int new_value) {
return ::InterlockedCompareExchange(reinterpret_cast<volatile LONG*>(i),
new_value,
old_value);
}
// Pointer variants.
template <typename T>
static T* AcquireLoadPtr(T* volatile* ptr) {
return *ptr;
}
template <typename T>
static T* CompareAndSwapPtr(T* volatile* ptr, T* old_value, T* new_value) {
return static_cast<T*>(::InterlockedCompareExchangePointer(
reinterpret_cast<PVOID volatile*>(ptr), new_value, old_value));
}
#else
static int Increment(volatile int* i) {
return __sync_add_and_fetch(i, 1);
}
static int Decrement(volatile int* i) {
return __sync_sub_and_fetch(i, 1);
}
static int AcquireLoad(volatile const int* i) {
return __atomic_load_n(i, __ATOMIC_ACQUIRE);
}
static void ReleaseStore(volatile int* i, int value) {
__atomic_store_n(i, value, __ATOMIC_RELEASE);
}
static int CompareAndSwap(volatile int* i, int old_value, int new_value) {
return __sync_val_compare_and_swap(i, old_value, new_value);
}
// Pointer variants.
template <typename T>
static T* AcquireLoadPtr(T* volatile* ptr) {
return __atomic_load_n(ptr, __ATOMIC_ACQUIRE);
}
template <typename T>
static T* CompareAndSwapPtr(T* volatile* ptr, T* old_value, T* new_value) {
return __sync_val_compare_and_swap(ptr, old_value, new_value);
}
#endif
};

使用示例

1
2
3
bool MessageQueue::IsQuitting() {
return AtomicOps::AcquireLoad(&stop_) != 0;
}

三、线程模型原理

WebRTC的线程功能由Thread类提供。Thread继承于消息队列MessageQueue,这样WebRTC中的每个线程都有了自己的消息循环,外部可以向该线程的消息循环Post消息Message,然后该线程轮询从消息循环Get到消息后处理消息。

UML如下:

四、Message和MessageQueue

Message用于定义单个消息:

  • posted_from
    标记改条消息发送自哪个函数,一般都是直接用RTC_FROM_HERE宏来赋值;
  • message_id
    32位整数,消息ID;
  • pdata
    消息携带的数据指针,虽然定义MessageData*类型,但也可以等同于void*
  • ts_sensitive
    消息的敏感时间点。在使用MessageQueue::Post等方法发送消息时,若time_sensitive == true则设置ts_sensitive = TimeMillis() + kMaxMsgLatency 即当前时间 + 最大消息延迟时间。当消息被从队列中取出的时间大于该时间,则会打印警告日志。
  • phandler
    消息处理器,MessageHandler接口的指针。用户需要继承该类,并重写其OnMessage虚函数。

MessageQueue实现了WebRTC中消息队列的功能,如Post方法用于添加消息;Get方法用于从队列取出消息,若队列没有消息则一直等待,具体的等待和唤醒的方式通过SocketServerWaitWakeUp来实现。

SocketServer是一个纯抽象类,NullSocketServerPhysicalSocketServer都派生自该类。NullSocketServer比较简单,没有创建SOCKET,只处理本地事件(CreateEvent系列函数);PhysicalSocketServer会创建SOCKET,并可以处理网络事件(WSACreateEvent系列函数)。在单纯的线程模型中,SocketServer只用于处理等待和唤醒操作。

五、 Thread

Thread类提供了2个静态函数(Create, CreateWithSocketServer)来构造Thread实例。

不建议使用new Thread()调用默认构造函数的方式,代码注释中已经给出了解释:

1
2
3
4
5
6
// DEPRECATED.
// The default constructor should not be used because it hides whether or
// not a socket server will be associated with the thread. Most instances
// of Thread do actually not need one, so please use either of the Create*
// methods to construct an instance of Thread.
Thread();

下面是Thread类构造的大致过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
+-----------------------------+
| |
| new Thread(SocketServer*) |
| |
+---+-------------------------+
|
|
| +-----------------------------------+
| | |
+---------> new MessageQueue(SocketServer*) |
| |
+-----------------------------------+

构造完Thread实例之后,调用Start来启动线程,框架会根据Start函数的Runnable参数是否为NULL来判断是需要运行用户自定义的Runnable->Run(), 还是运行默认ProcessMessage去循环从消息队列中获取消息来处理。

  • 若运行用户自定义的Runnable->Run(),则用户需要继承rtc::Runnable去重载Run()
  • 若运行默认的ProcessMessage,则用户则需要定义Message,向线程中PostMessage,让线程来执行。

流程如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
+---------------------------+
| |
| Thread->Start(Runnable*) |
| |
+-+-------------------------+
|
| +-------------------------+
| | |
+-------> CreateThrad(PreRun) |
| |
+-------------------------+
|Runnable* is NULL?
|
| +------------------+
| | |
+-----------> Runnable->Run() |
| | |
| +------------------+
|
|
| +------------------+
| | |
+-----------> Thread->Run() |
| |
+-+----------------+
|
|
| +--------------------------+
| | |
+----> Thread->ProcessMessage() |
| |
+--+-----------------------+
|
| +--------------------------------+
| | |
+-----> Loop call MessageQueue->Get() |
| |
+--------------------------------+

六、 示例

6.1 实现Runnbale

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#include "rtc_base/thread.h"

class MyTask : public rtc::Runnable {
public:
MyTask(const std::string &name) : name_(name) {
}
protected:
void Run(Thread* thread) {
std::cout << "task name: " << name_ << std::endl;
std::cout << "my thread id: " << thread->GetId() << std::endl;
}
private:
std::string name_;
};

int main()
{
std::unique_ptr<rtc::Thread> thread = rtc::Thread::Create();
thread->Start(new MyTask("task1"));

getchar();
return 0;
}

6.2 发送Message

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class MyTaskHandler : public rtc::MessageHandler {
public:
MyTaskHandler(const std::string &name) : name_(name) {

}
protected:
void OnMessage(Message* msg) {
std::cout << "task name: " << name_ << std::endl;
std::cout << "my thread id: " << Thread::Current()->GetId() << std::endl;
}
private:
std::string name_;
};

int main()
{
std::unique_ptr<rtc::Thread> thread = rtc::Thread::Create();
thread->Start(nullptr);

thread->Post(RTC_FROM_HERE, new MyTaskHandler("task2"), 0, nullptr, false);

getchar();
return 0;
}