首页 > C/C++, TCP/IP, UNIX/LINUX, 协程 > libtask协程库实现源码学习-异步I/O

libtask协程库实现源码学习-异步I/O

2014年6月30日 发表评论 阅读评论 3796次阅读    

上篇文章写了libtask协程库实现的基本原理,最后说道协程编程一个很大的关键点是,程序员需要知道什么时候应该进行协程切换,什么地方需要异步I/O,什么地方的代码是顺序运行的等。

这里回顾一下具体哪些地方需要做协程切换,异步I/O: 所有可能会造成阻塞的操作,都必须进行异步IO处理,及时切换协程,绝对不能在协程里面做阻塞操作,因为阻塞了大家都阻塞了,就黄了。

顺藤摸瓜,上次的http压力测试小程序里面,协程执行函数fetchtask就是协程运行的主要函数,看下其实现:

void fetchtask(void *v) {
    int fd, n;
    char buf[512];

    fprintf(stderr, "starting...\n");
    for(;;){
        if((fd = netdial(TCP, server, 80)) < 0){//异步连接服务器,会造成协程切换
            fprintf(stderr, "dial %s: %s (%s)\n", server, strerror(errno), taskgetstate());
            continue;
        }
        snprintf(buf, sizeof buf, "GET %s HTTP/1.0\r\nHost: %s\r\n\r\n", url, server);
        fdwrite(fd, buf, strlen(buf));//异步数据读写,这里可能会造成协程切换,因为一定有阻塞操作
        while((n = fdread(fd, buf, sizeof buf)) > 0){///异步读取
            //buf[n] = '\0';
            //printf("buf:%s", buf);
        }
        close(fd);
        write(1, ".", 1);
    }
}

上面的netdial用来连接服务器,其实就是socket, connect等函数的封装,需要注意的是其调用了fcntl(fd, F_SETFL, fcntl(fd, F_GETFL)|O_NONBLOCK);函数,将这个SOCK设置为非阻塞状态,所以才能异步IO了。

connect调用之后,实际上SOCK不一定连接成功了,所以就需要进行事件监听,看看是否真的连接成功了,代码里调用了fdwait(fd, 'w');

int netdial(int istcp, char *server, int port)
{//非 阻塞连接server,connect后会切换协程到IO fdtask
//``````
    taskstate("netdial");
    proto = istcp ? SOCK_STREAM : SOCK_DGRAM;
    if((fd = socket(AF_INET, proto, 0)) < 0){
        taskstate("socket failed");
        return -1;
    }
    fdnoblock(fd);
//1·······
    if(connect(fd, (struct sockaddr*)&sa, sizeof sa) < 0 && errno != EINPROGRESS){
        taskstate("connect failed");
        close(fd);
        return -1;
    }
    /* wait for finish */
    fdwait(fd, 'w');//等待处理完成,并且会释放CPU的。
//······

下面来看一下fdwait函数的实现, 这个函数类似于epoll_ctl的封装函数,只是libtask这里使用的是poll,而不是epoll。这个函数按需启动fdtask这个异步I/O控制协程,将当前FD加入到poll数组中。进行协程切换。

void
fdwait(int fd, int rw)
{//按需启动fdtask这个异步I/O控制协程,将当前FD加入到poll数组中。进行协程切换。
    int bits;

    if(!startedfdtask){
        startedfdtask = 1;
        taskcreate(fdtask, 0, 32768);//这个是IO等待poll的线程,所有阻塞IO都走这里进行监听,唤醒等
    }

    if(npollfd >= MAXFD){
        fprint(2, "too many poll file descriptors\n");
        abort();
    }

    taskstate("fdwait for %s", rw=='r' ? "read" : rw=='w' ? "write" : "error");
    bits = 0;
    switch(rw){
    case 'r':
        bits |= POLLIN;
        break;
    case 'w':
        bits |= POLLOUT;
        break;
    }

    //将这个FD挂入到pollfd里面,这里面是由fdtask协程进行等待唤醒等管理的、
    //等这个FD有事件的时候,会将本协程设置为可运行的状态,并且fdtask也会主动yeild让出CPU。
    polltask[npollfd] = taskrunning;
    pollfd[npollfd].fd = fd;
    pollfd[npollfd].events = bits;
    pollfd[npollfd].revents = 0;
    npollfd++;
    taskswitch();//注意这里并没有修改这个协程的运行状态,这样他下次还可能跑起来,而不是由IO协程唤起,这里是否是bug? ps: 见下面描述。
}

从上面的注释中可以看到,fdwait就是让协程在rw的可写或者可读事件上等待,所谓的等待,其实是协程切换,切换到其他协程运行。这里有点疑问的是taskswitch并没有修改当前协程的状态为不可执行,所以下次它还是可能会被自然唤醒,而不是由I/O协程唤醒的。这里是否会造成重复操作?知道的同学告诉我下。

不过这里多唤醒一次,当前协程也就是再次尝试I/O,基本还是会EAGAIN, 然后又调用fdwait,又睡下去。这样不会有bug,但会浪费CPU?

PS: 后来想了想,这个会的,因为当前协程在taskscheduler里面调度它运行的时候,使用了deltask(&taskrunqueue, t);//从待调度链表中移出来,调度它运行,因此现在我要再fdwait里面直接调用taskswitch();,那么当前这个协程是不会被加到taskrunqueue链表里面,也就没有机会得到执行。

那么什么时候得到执行呢?答案是:只有当有人主动将其加到taskrunqueue里面,才能执行,这个人就是fdtask I/O监听协程,这是唯一的机会。所以写代码的时候,如果要切换协程,一定得想清楚这一点,别知道怎么切换出去了,不知道什么时候该切换回来就悲剧了。

下面最后一点,来看一下taskcreate 这个后台的I/O事件通知协程的工作。执行函数为fdtask,,代码注释如下:

void
fdtask(void *v)
{
    int i, ms;
    Task *t;
    uvlong now;

    tasksystem();//把自己设置为系统级协程,不会taskexit退出
    taskname("fdtask");
    for(;;){
        /* let everyone else run */
        while(taskyield() > 0)
            ;
        /* we're the only one runnable - poll for i/o */
        errno = 0;
        taskstate("poll");
        if((t=sleeping.head) == nil){
            ms = -1;//没有人在sleep,所以就poll一直等待了,这个好危险啊,
            //如果上层不小心yeild了,并且没有dalay的,然后所有fd都没有活跃。那就完蛋了
        }else{
            /* sleep at most 5s */
            now = nsec();
            if(now >= t->alarmtime)
                ms = 0;
            else if(now+5*1000*1000*1000LL >= t->alarmtime)
                ms = (t->alarmtime - now)/1000000;
            else
                ms = 5000;
        }
        if(poll(pollfd, npollfd, ms) < 0){
            if(errno == EINTR)
                continue;
            fprint(2, "poll: %s\n", strerror(errno));
            taskexitall(0);
        }
        /* wake up the guys who deserve it */
        for(i=0; i<npollfd; i++){
            while(i < npollfd && pollfd[i].revents){
                taskready(polltask[i]);//将这些有情况的协程设置为可运行状态,这样这个for下一轮的时候就会调用taskyield主动让出CPU
                --npollfd;
                pollfd[i] = pollfd[npollfd];
                polltask[i] = polltask[npollfd];
            }
        }

        now = nsec();
        while((t=sleeping.head) && now >= t->alarmtime){
            deltask(&sleeping, t);//看看定时器有没有到时间的
            if(!t->system && --sleepingcounted == 0)
                taskcount--;
            taskready(t);
        }
    }
}

从上面可以看出,这个协程主要是做守护工作,帮助其他协程监听I/O可读可写事件,然后唤起对方;另外还有一个工作就是,处理sleep操作,在这里面叫delay,应用调用taskdelay就可以进行睡眠了。

实现用poll,因此写code时要注意,这里性能有问题的,最好改为epoll, 有空我试试吧。

上面函数唤起的方式实际上为:对有事件的协程调用taskready(polltask[i]);,设置可运行状态,然后主动调用while(taskyield() > 0) ;让其他协程得到运行机会,也就是从fdwait的taskswitch里面重新得到执行机会。

下面看下读写的时候的封装函数:


int
fdwrite(int fd, void *buf, int n)
{
    int m, tot;

    for(tot=0; tot<n; tot+=m){
        while((m=write(fd, (char*)buf+tot, n-tot)) < 0 && errno == EAGAIN)
            fdwait(fd, 'w');//关键:如果写入时返回EAGAIN说明差不多了,得过会才能写入。那么这里需要放入epoll,把本协程挂起
        if(m < 0)
            return m;
        if(m == 0)
            break;
    }
    return tot;
}

由此可以想到,实际上做I/O请求的时候,是类似epoll里面的不断读、写,直到eagain了,就放入poll里面等待事件。不同的是这里是放到fdtask专门的协程里面去等待的,而不是当前的I/O协程,这样可以换到其他协程执行CPU。

Share
  1. 本文目前尚无任何评论.
  1. 本文目前尚无任何 trackbacks 和 pingbacks.

注意: 评论者允许使用'@user空格'的方式将自己的评论通知另外评论者。例如, ABC是本文的评论者之一,则使用'@ABC '(不包括单引号)将会自动将您的评论发送给ABC。使用'@all ',将会将评论发送给之前所有其它评论者。请务必注意user必须和评论者名相匹配(大小写一致)。