协程thread
概念
Photon thread 即是我们通常所说的协程(coroutine/fiber)。之所以使用 thread 命名而不使用后两者,是因为我们希望用户察觉不到Photon程序跟传统多线程程序的差异。
thread 本质上就是一个函数,一个执行单元。
thread 必须存在于 vCPU 里。thread 可以在 vCPU 之间迁移。
命名空间
photon::
头文件
<photon/thread/thread.h>
类型
photon::thread
→ Photon协程实体
photon::join_handle
→ Join句柄,用于Join协程
photon::vcpu_base
→ vCPU实体的基类。目前不能修改 vCPU,这是不可变值。
API
thread_create
photon::thread* photon::thread_create(thread_entry start, void* arg,
uint64_t stack_size = DEFAULT_STACK_SIZE,
uint16_t reserved_space = 0);
描述
创建一个协程,把它放进 RunQueue
。由调度器决定何时开始执行这个协程函数。
该API调用过程中没有发生协程切换,这意味着调用方会继续下一条语句的执行。
参数
start
协程函数入口thread_entry
的类型是void* (*)(void*)
arg
协程函数的单参数,指针类型。stack_size
协程栈大小,默认8MBreserved_space
用于向新的协程栈上拷贝参数。默认0
请确保你传入的参数跟函数有着相同的生命周期,而不是在函数开始执行之前就析构了。这样可能造成无效的内存访问。
如果无法保证,可以使用下文的 thread_create11
,它能够将所有的参数复制到新的协程的函数栈上。
stack_size是指协程栈的大小,这里采用的Linux默认的函数栈大小,8MB。
注意不要导致栈溢出,Linux会在运行时报栈溢出错误,但Photon由于是在用户态模拟的栈,因此不会立刻检测到栈溢出错误,继续运行下去会导致内存被踩。 通常情况下,不写递归函数即是安全的。
虽然我们可以使用内核提供的mprotect等功能保护这个内存段,但是会影响性能,因此这里是让用户自己把握。
每次创建新的协程都会调用malloc申请内存,因此频繁创建建议使用协程池 thread-pool
,或者 pooled_stack_allocator
。
关于OS实际使用的内存量,即RES,并不是malloc完之后立刻增长8MB,而是根据实际写入page的映射情况动态增长。
int main() {
photon::init();
for (int i = 0; i < 2000; ++i) {
photon::thread_create11([]{
photon::thread_usleep(-1UL);
});
}
photon::thread_usleep(-1UL);
}
如上文例子,创建了2000个协程都在sleep。进程虚拟内存16G,但物理内存只有24M。
Return
新的 thread 的指针
thread_create11
template<typename F, typename... ARGUMENTS>
photon::thread* photon::thread_create11(F f, ARGUMENTS&& ...args); // (1)
template<typename FUNCTOR, typename... ARGUMENTS>
photon::thread* photon::thread_create11(FUNCTOR&& f, ARGUMENTS&& ...args); // (2)
template<typename CLASS, typename F, typename... ARGUMENTS>
photon::thread* photon::thread_create11(F f, CLASS* obj, ARGUMENTS&& ...args); // (3)
描述
这是 thread_create
在C++11语法下的封装,同时支持以拷贝的方式传入多个参数。
头文件
<photon/thread/thread11.h>
用法
thread_create11 有三种 API,对应不同场景。
- F 是一个全局函数
double func(int a, char b);
auto th = photon::thread_create11(func, 1, '2');
- FUNCTOR 是一个 function object.
int a = 1; char b = '2';
auto th = photon::thread_create11([&] {
// Access a and b
});
- CLASS 是一个类型,F 是它的成员函数
class A {
void func(int a, char b);
void run() {
photon::thread_create11(&A::func, this, 1, '2');
}
};
参数以值传递的方式传给协程函数,如果需要传引用,请使用 std::ref
或者 std::cref
处理变量
Return
新的 thread 的指针
thread_enable_join
photon::join_handle* photon::thread_enable_join(photon::thread* th, bool flag = true);
Description
- 新建的协程默认是不用Join的,退出时自动释放内存
- 但如果通过
thread_enable_join
开启了 join 却最终没有去执行thread_join
,则会产生资源泄露。 - 得到的
join_handle
指针是thread_join
的参数。
Parameters
th
Thread 指针.flag
Enable or disable. Defaults to true.
Return
join_handle
指针。
thread_join
void photon::thread_join(photon::join_handle* jh);
描述
等待目标协程结束。
你不能 join 一个已经退出的或者不存在的 thread,这将导致 core dump。
参数
jh
:join_handle
指针,从上文的thread_enable_join
获得。
返回值
无
thread_yield
void photon::thread_yield();
Description
切换到其他协程。本协程不会进入 SleepQueue
,而是仍然在 RunQueue
上。
- 协程的调度有如下的特点:除非主动让出CPU,否则该协程函数会一直继续执行
- 协程的
thread_yield
由高性能汇编实现,切换开销在10纳秒左右 thread_yield
只是若干种能够让出CPU的方法中的其中一种,除了它以外,还有下文将要介绍的thread_usleep
,以及各种协程锁和同步原语。thread_yield
只负责中断当前函数的执行,具体下一个协程是哪个它并不知道,由调度器决定。在一段时间后,调度器可能决定又返回本函数,则沿着thread_yield
的下一条语句继续执行。- 普通用户用到yield的场景不太多,它一般出现在锁和同步原语的内部,或者一些偏底层的模块内。如果正确的话,用户可以假定他们使用的所有IO函数都是非阻塞的,即会在恰当的时候进行 yield。
Parameters
None
Return
None
thread_usleep
int photon::thread_usleep(uint64_t useconds);
Description
暂停当前协程(
CURRENT
)的执行,将其插入SleepQueue
队列,睡眠指定时间(微秒)。最后 yield 让出 CPU。进入
SleepQueue
里的协程会在将来会被唤醒。除了用户主动调用睡眠,所有的文件IO(psync除外)、网络收发、锁和同步原语的底层等待都是通过这个
thread_usleep
实现的。
Parameters
useconds
睡眠时间(微秒)。-1UL 表示永久睡眠
Return
- 返回 0 表示成功,即至少已睡眠了
useconds
微秒的时间。 - 返回 -1 表示被提前唤醒了,
errno
是由调用thread_interrupt
的唤醒方设定的。
thread_interrupt
void photon::thread_interrupt(photon::thread* th, int error_number = EINTR);
描述
打断目标协程,从睡眠中将其唤醒。
在协程场景下,将任务的提交、执行、收割分为前台和后台,是常见的一种设计模式。 例如我们可以在前台协程中将任务提交到某个队列后立刻睡眠,后台协程在检查到任务执行完之后,去唤醒前台协程继续执行。
你不能 interrupt 一个已经退出的或者不存在的 thread,这将导致 core dump。
参数
th
目标 thread.error_number
设置目标 thread 看到的errno
. 默认是EINTR
.
返回值
无
备注
调用方和目标协程可能不在同一个 vCPU 上,这种情况下,StandbyQueue
就会被使用到,作为跨线程交互手段。
thread_shutdown
int photon::thread_shutdown(photon::thread* th, bool flag = true);
Description
Set the shutdown flag to disable further scheduling of the target thread, then fire an interrupt.
Parameters
th
Target thread.flag
Set the shutdown flag or not.
Return
None.
thread_migrate
int photon::thread_migrate(photon::thread* th, photon::vcpu_base* vcpu);
Description
把一个 READY
状态的 thread 迁移到另一个 vCPU.
- 虽然是M:1 模型,我们仍然可以将协程函数迁移到另一个线程上去执行。
- 优先使用run-to-completion模型,减少跨线程交互。一旦创建后,非必要不将协程调度到其他线程,除非遇到本线程的CPU瓶颈。
Parameters
th
Target thread.vcpu
Target vCPU.
Return
Returns 0 on success, returns -1 on error.
get_vcpu
vcpu_base* get_vcpu(thread* th = CURRENT);
Description
Get the vCPU of the target thread.
Parameters
th
Target thread. Defaults toCURRENT
thread.
Return
Returns a pointer of vcpu_base
.
协程池
Additional Header
<photon/thread/thread-pool.h>
Description
The thread-pool's implementation is based on identity-pool.h. It supports both static allocation (on stack) and dynamic allocation (on heap).
// on heap
auto p1 = photon::ThreadPoolBase::new_thread_pool(100);
auto th1 = p1->thread_create(&func, nullptr);
// on stack
photon::ThreadPool<400> p2;
auto th2 = p2.thread_create(&func, nullptr);
计时器
Additional Header
<photon/thread/thread-pool.h>
Description
Create a timer object with default_timedout
in usec, callback function on_timer
,
and callback argument arg
. The timer object is implemented as a special thread, so
it has a stack_size
, and the on_timer
is invoked within the thread's context.
The timer object is deleted automatically after it is finished.
一个 non-repeating 的计时器基本上等于创建一个新协程,并执行 thread_sleep。
Timer(uint64_t default_timeout, Entry on_timer,
bool repeating = true, uint64_t stack_size = 1024 * 64);
int Timer::cancel();
int Timer::reset(uint64_t new_timeout = -1);
int Timer::stop();