首页 > C/C++, TCP/IP, 协程 > libtask协程库实现源码学习

libtask协程库实现源码学习

2014年6月29日 发表评论 阅读评论 13046次阅读    

协程的概念不多说了,轻量级线程,其最大的优势就是协程之间的切换代价非常低,理论上是单线程运行,只是在应用层进行了上下文手动切换。

其最重要的实现函数是makecontext, getcontext, swapcontext 这一组函数,具体的协程上下文切换由他们完成。具体就不多说了,这些函数在glibc里面是已汇编的形式提供的,实际上做的工作就是讲各个寄存器,堆栈指针,指令指针等全部保存起来,或者进行切换,从而达到协程切换的目的。我们知道程序在CPU上运行的时候,注意依赖2个重要的东西:

  • 1.堆栈指针ESP寄存器用来找到堆栈的顶部位置,从而实现去参数的目的。 这个告诉CPU堆栈在哪,也就是数据在哪。
  • 2.指令指针EIP寄存器用来存放下一次CPU将要执行的代码段指令的偏移量。这个是CPU将要执行的代码。

程序的运行主要依靠上面2个变量,他们代表了一个线程的上下文环境。改变他们就可以达到执行其他位置的代码的目的。

0.基本原理

这里稍微介绍一下makecontext就可以理解其含义了。代码在sysdeps/unix/sysv/linux/i386/makecontext.S, 但是是汇编代码,在libtask里面提供了一份C语言版本的简化代码,注释如下:

#ifdef NEEDX86MAKECONTEXT
void
makecontext(ucontext_t *ucp, void (*func)(void), int argc, ...)
{
    int *sp;

    sp = (int*)ucp->uc_stack.ss_sp+ucp->uc_stack.ss_size/4;//移动堆栈到末尾,然后从后面开始将参数拷贝上去
    sp -= argc;
    sp = (void*)((uintptr_t)sp - (uintptr_t)sp%16); /* 16-align for OS X */
    memmove(sp, &argc+1, argc*sizeof(int));

    *--sp = 0;      /* return address */
    ucp->uc_mcontext.mc_eip = (long)func;//设置指令指针寄存器的位置为参数的func
    ucp->uc_mcontext.mc_esp = (int)sp;//堆栈寄存器。
}
#endif

上面需要注意的是堆栈是向下生长的,makecontext需要用户提供堆栈空间,这个我们自己按实际需求设置就行,只要不会造成堆栈溢出就可以了。设置的堆栈空间会在makecontext里面进行初始化,比如设置参数,返回值等。进行swapcontext的时候就会用到mc_eip, mc_esp等。

1.使用libtask协程

下面看一下官方的httpload.c的协程例子,实现HTTP下载或者说压力测试吧。

#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <task.h>
#include <stdlib.h>

enum{   STACK = 32768 };
char *server;
char *url;
void fetchtask(void*);

void taskmain(int argc, char **argv){
        int i, n;

        if(argc != 4){
                fprintf(stderr, "usage: httpload n server url\n");
                taskexitall(1);
        }
        n = atoi(argv[1]);
        server = argv[2];
        url = argv[3];

        for(i=0; i<n; i++){
                taskcreate(fetchtask, 0, STACK);//创建协程,设置为可以运行的状态.
                //其实fetchtask不是上下文切换的第一个函数,taskstart才是,后者立即调用fetchtask
                while(taskyield() > 1)//主动释放CPU,这里循环其实是为了给其他协程足够的机会
                        ;
                sleep(1);
        }
}

void fetchtask(void *v) {
        int fd, n;
        char buf[512];

        fprintf(stderr, "starting...\n");
        for(;;){
                if((fd = netdial(TCP, server, 80)) < 0){//异步连接服务器,会造成协程切换
                        fprintf(stderr, "dial %s: %s (%s)\n", server, strerror(errno), taskgetstate());
                        continue;
                }
                snprintf(buf, sizeof buf, "GET %s HTTP/1.0\r\nHost: %s\r\n\r\n", url, server);
                fdwrite(fd, buf, strlen(buf));
                while((n = fdread(fd, buf, sizeof buf)) > 0){
                        //buf[n] = '\0';
                        //printf("buf:%s", buf);
                }
                close(fd);
                write(1, ".", 1);
        }
}

从上面可以看出,使用libtask库后,程序的开始函数变为了taskmain, 原先的main被接管了。调用taskcreate创建协程,执行函数设置为fetchtask, 堆栈大小32K,其工作就是建立几个协程,然后发送HTTP请求。具体不多介绍啦,待会围绕这个例子来介绍下其原理。

2. 创建协程taskcreate

协程创建方法为 taskcreate(fetchtask, 0, STACK);,参数为执行函数和参数,还有堆栈大小。其为协程分配了Task数据结构,并且仅限初始化,将协程设置为可运行状态,这样就可以执行了。

Task结构比较简单,一些链表指针,加上一个ucontext_t 的成员,堆栈指针地址,大小,以及执行函数和参数,如下:

struct Task {
    char    name[256];  // offset known to acid
    char    state[256];
    Task    *next;//协程的双向链表
    Task    *prev;
    Task    *allnext;
    Task    *allprev;
    Context context;//实际上就是ucontext_t的包装,后者就是系统里面的ucontext结构
    uvlong  alarmtime;
    uint    id;
    uchar   *stk;//堆栈起始位置,使用的时候是按向下使用的方式的
    uint    stksize;//堆栈长度
    int exiting;
    int alltaskslot;//alltask[]的当前占用最高槽位数,总数能到alltaskslot+ 64 - alltaskslot%64
    int system;
    int ready;//为1代表在运行状态
    void    (*startfn)(void*);
    void    *startarg;
    void    *udata;
};
int
taskcreate(void (*fn)(void*), void *arg, uint stack)
{
    int id;
    Task *t;

    t = taskalloc(fn, arg, stack);//初始化申请Task数据结构,设置堆栈内容等
    taskcount++;//当前OK的协程数目,不包括系统级别的协程
    id = t->id;
    if(nalltask%64 == 0){//一次申请64个槽位,只能往后放
        alltask = realloc(alltask, (nalltask+64)*sizeof(alltask[0]));
        if(alltask == nil){
            fprint(2, "out of memory\n");
            abort();
        }
    }
    t->alltaskslot = nalltask;
    alltask[nalltask++] = t;
    taskready(t);//加入taskrunqueue的运行队列
    return id;
}

其中taskalloc函数进行了内存分配,数据初始化,主要通过makecontext设置了协程的上下文结构context,将协程的初始化运行函数设置为taskstart,后者会直接调用应用层的执行函数的。详见代码注释:

static Task*
taskalloc(void (*fn)(void*), void *arg, uint stack)
{//申请Task结构,初始化信号,堆栈,运行函数等,调用makecontext模拟堆栈内容,设置堆栈指针esp位置灯
    Task *t;
    sigset_t zero;
    uint x, y;
    ulong z;

    /* allocate the task and stack together */
    t = malloc(sizeof *t+stack);//存放Task结构和后面的堆栈
    if(t == nil){
        fprint(2, "taskalloc malloc: %r\n");
        abort();
    }
    memset(t, 0, sizeof *t);
    t->stk = (uchar*)(t+1);//注意这里加1,实际上加的是一个Task结构,也就是移动了sizeof(Task).
    t->stksize = stack;
    t->id = ++taskidgen;//就是个全局静态变量自增的id
    t->startfn = fn;
    t->startarg = arg;

    /* do a reasonable initialization */
    memset(&t->context.uc, 0, sizeof t->context.uc);
    sigemptyset(&zero);
    sigprocmask(SIG_BLOCK, &zero, &t->context.uc.uc_sigmask);//这里获取一下现在的信号屏蔽字到uc_sigmask存起来,切换协程的时候会设
置的

    /* must initialize with current context */
    if(getcontext(&t->context.uc) < 0){//初始化获取一下当前上下文状态,下面在修改堆栈,指令等地址
        fprint(2, "getcontext: %r\n");
        abort();
    }

    /* call makecontext to do the real work. */
    /* leave a few words open on both ends */
    t->context.uc.uc_stack.ss_sp = t->stk+8;//堆栈起始位置,使用的时候会从结束位置开始向下使用的
    t->context.uc.uc_stack.ss_size = t->stksize-64;
    /*
     * All this magic is because you have to pass makecontext a
     * function that takes some number of word-sized variables,
     * and on 64-bit machines pointers are bigger than words.
     */
//print("make %p\n", t);
    z = (ulong)t;
    y = z;
    z >>= 16;   /* hide undefined 32-bit shift from 32-bit compilers */
    x = z>>16;
    makecontext(&t->context.uc, (void(*)())taskstart, 2, y, x);
    //由于makecontext需要参数必须是4个字节的,所以这里讲t的指针转为64位8个字节,然后分y,x 2个四字节传参数
    //以备makecontext能够模拟设置堆栈传参,并设置sp等堆栈指针位置

    return t;
}

协程创建后,所有的协程都会放到alltask数组中,每隔64个realloc扩容一下, 其中可以被调度运行的协程放在taskrunqueue双向链表里面。新协程放在链表尾部。

3.协程的调度/运行

协程创建后,是怎么调度运行的呢?这里需要看一下真正的Main函数了,libtask的main接管函数首先自己调用taskcreate 创建了taskmainstart协程,其实就是我们应用层编写的taskmain函数了。

然后调用taskscheduler进行协程调度。

static void
taskmainstart(void *v)
{
    taskname("taskmain");
    taskmain(taskargc, taskargv);
}

int
main(int argc, char **argv)
{
    struct sigaction sa, osa;

    memset(&sa, 0, sizeof sa);
    sa.sa_handler = taskinfo;
    sa.sa_flags = SA_RESTART;
    sigaction(SIGQUIT, &sa, &osa);

#ifdef SIGINFO
    sigaction(SIGINFO, &sa, &osa);
#endif

    argv0 = argv[0];
    taskargc = argc;
    taskargv = argv;

    if(mainstacksize == 0)
        mainstacksize = 256*1024;
    taskcreate(taskmainstart, nil, mainstacksize);//创建初始协程,调用应用层的taskmain函数,此时taskmainstart协程还没有运行,等待
调度
    taskscheduler();//进行协程调度
    fprint(2, "taskscheduler returned in main!\n");
    abort();
    return 0;
}

下面重点在taskscheduler,所有要进行协程切换的地方,都会讲协程切换到这个上下文:taskschedcontext, 实际上就是我们的主线程所在的协程上下文。下面代码比较简单,重在理解:

static void
taskscheduler(void)
{//协程调度函数
    int i;
    Task *t;

    taskdebug("scheduler enter");
    for(;;){
        if(taskcount == 0)
            exit(taskexitval);
        t = taskrunqueue.head;//从头部开始唤起
        if(t == nil){
            fprint(2, "no runnable tasks! %d tasks stalled\n", taskcount);
            exit(1);
        }
        deltask(&taskrunqueue, t);//从待调度链表中移出来,调度它运行
        t->ready = 0;
        taskrunning = t;//标记正在执行的协程
        tasknswitch++;
        taskdebug("run %d (%s)", t->id, t->name);
        contextswitch(&taskschedcontext, &t->context);//真正进行上下文切换,这样就切换到了其他协程运行,比如taskmainstart
//print("back in scheduler\n");
        taskrunning = nil;
        if(t->exiting){
            if(!t->system)
                taskcount--;
            i = t->alltaskslot;
            alltask[i] = alltask[--nalltask];
            alltask[i]->alltaskslot = i;
            free(t);
        }
    }
}

上面contextswitch进行了实际的上下文切换,比如切换到了taskmainstart 协程,这样当前的上下文执行环境会保存到taskschedcontext里面, contextswitch函数不会返回,直到再次有主动切换到这里为止。

那么,什么时候能够返回来呢?答案是:对方主动放弃CPU的时候,比如最开头的taskyield函数,taskyield函数会将当前运行的taskrunning协程设置为可运行状态,挂到taskrunqueue后面, 然后将其切换到taskschedcontext运行,也就是上面的taskscheduler运行的协程,实际上是taskscheduler里面的contextswitch函数里面,随后contextswitch将返回到taskscheduler里面继续进行调度运行。

void
taskswitch(void)
{//主动进行协程切换,其实就是切换到main协程,到那里去进行实际的切换
    needstack(0);//检测堆栈溢出
    contextswitch(&taskrunning->context, &taskschedcontext);
}

void
taskready(Task *t)
{
    t->ready = 1;//状态设置为可以运行的状态,然后加到运行 任务队列末尾里面去
    addtask(&taskrunqueue, t);
}

int
taskyield(void)
{
    int n;

    n = tasknswitch;//用来计算自愿放弃协程后,到恢复所发生的切换次数
    taskready(taskrunning);//挂到taskrunqueue后面
    taskstate("yield");
    taskswitch();
    return tasknswitch - n - 1;
}

介绍的差不多了,这里也许能想到,如果某个协程不小心执行了一行sleep(1000)的代码,那么会真的整个进程就sleep进去的,因为这里没有任何机会进行协程调度,所以在协程程序里面,需要换为taskdelay进行睡眠,其里面的实现下次写一下。

到这里差不多了,其实只要理解了协程的运行上下文概念,就差不多了。后面有时间再稍微记录一下libtask里面对于异步IO的代码。

协程主要适合于一些IO比较频繁的系统,在这样的系统中,使用协程跟多线程的优缺点比较如下:

  • 1. 单线程异步IO: 代码难度比较大,需要自己处理异步I/O, epoll等, 优点是性能高,代码执行是顺序的,不需要关心锁,竞争等情况;
  • 2.协程: 比单线程异步I/O容易编程,代码更好写,协程里面是顺序编程的,但协程之间是独立栈,共享堆内存,单线程执行环境,在一个CPU上运行。协程切换代价比线程少多了,只需要十几条汇编指令切换寄存器。每秒据说能达到上百万次切换。
  • 3.多线程同步I/O: 代码相对也好写,跟协程一样独立栈,共享堆内存。 需要处理同步禁止问题,但线程切换代价特别大,linux里面没有原生的线程,是用进程实现的。
  • 4.多进程: 这个得看具体应用了,需要共享数据的应用不适合用多进程,否则得共享数据。但代码相对容易编写;

这里只是简单介绍下协程的实现方式之一,具体使用哪一种看具体应用,灵活使用即可。

总之在使用libtask的时候,需要随时心里想着我这code是在一个协程里面运行的,数据都是共享的,什么时候放弃CPU,什么时候可能会发生切换,码农们需要知晓,代码且写且珍惜,不然容易出错造就悲剧,好东西总会有代价的。

Share
  1. kulv
    2015年3月9日22:44 | #1

    @sdbc
    恩,是的,反正线程也是一个执行上下文。linux里面更是一个进程的实现。

  2. kulv
    2015年3月9日22:43 | #2

    @克莱尔孙
    谢谢指正了!确实错别字有点多^.^,打得太快没注意check

  3. cholerae
    2015年2月26日12:35 | #3

    ucontext已经deprecated了,能不用还是最好别用

  4. sdbc
    2015年2月10日11:42 | #4

    就是说。协程,不一定就基于单线程,完全可以多线程的。
    为线程池系统提供更好的并行性。

  5. sdbc
    2015年2月10日11:38 | #5

    最近实现了线程池+epoll+ucontext的服务器,可以多个线程+AIO。

  6. 克莱尔孙
    2015年1月30日12:29 | #6

    contextswitch函数不会返回,知道再次有主动切换到这里为止。

    如果taskmain中逻辑执行完了,没有涉及切换的逻辑,也是可以返回得。楼主是不是想表达别的意思。

    另外,每篇文章的错别字都比较多,希望能改改 - -

  1. 2014年6月30日08:29 | #1

注意: 评论者允许使用'@user空格'的方式将自己的评论通知另外评论者。例如, ABC是本文的评论者之一,则使用'@ABC '(不包括单引号)将会自动将您的评论发送给ABC。使用'@all ',将会将评论发送给之前所有其它评论者。请务必注意user必须和评论者名相匹配(大小写一致)。