首页 > C/C++, Memcached, 各种Server > memcached源码学习-线程框架

memcached源码学习-线程框架

2014年7月8日 发表评论 阅读评论 3817次阅读    

看了看memcached, memcached 主要的线程框架是master-slave的主线程-工作线程模式,单进程,多线程,之间通过管道和链表通信,基本就是这样。

下面具体看下代码。

worker工作线程

memcached服务器使用libevent库进行网络事件的监听等,在main函数的开头,解析完所有的配置参数后,主线程会先创建一个struct event_base *main_base 全局结构,代表这是主线程的event结构,用来进行循环等的。接下来做了一些简单的hash等初始化,然后调用了一个比较关键的函数: thread_init:


    /* initialize main thread libevent instance */
    main_base = event_init();

    /* initialize other stuff */
    stats_init();
    assoc_init(settings.hashpower_init);
    conn_init();//默认分配200个连接结构
    slabs_init(settings.maxbytes, settings.factor, preallocate);

    /*
     * ignore SIGPIPE signals; we can use errno == EPIPE if we
     * need that information
     */
    //当服务器close一个连接时,若client端接着发数据。根据TCP协议的规定,会收到一个RST响应,
    //client再往这个服务器发送数据时,系统会发出一个SIGPIPE信号给进程,告诉进程这个连接已经断开了,不要再写了。
    if (sigignore(SIGPIPE) == -1) {
        perror("failed to ignore SIGPIPE; sigaction");
        exit(EX_OSERR);
    }
    /* start up worker threads if MT mode */
    thread_init(settings.num_threads, main_base);//默认4个线程,启动其libevet结构。设置监听管道,管道用来让主线程告诉自己,有新事>
件了
    //上面工作线程已经准备好了,下面主线程终于可以设置自己的东西了,比如打开LISTEN socket,进行accepted监听。
    //memcached的模式为:主线程负责accept,完了管道告诉工作线程去处理实际的请求。2着不同的event_base

    if (start_assoc_maintenance_thread() == -1) {//启动维护线程
        exit(EXIT_FAILURE);
    }

上面thread_init 就是创建了配置的工作线程,来看看其内容。memcached线程之间对各个数据的访问都是用互斥锁,条件变量等。 然后为每个线程申请一个LIBEVENT_THREAD 结构, 来看看这个结构的内容:

typedef struct {
    pthread_t thread_id;        /* unique ID of this thread */
    struct event_base *base;    /* libevent handle this thread uses */
    struct event notify_event;  /* listen event for notify pipe */
    int notify_receive_fd;      /* receiving end of notify pipe */
    int notify_send_fd;         /* sending end of notify pipe */
    struct thread_stats stats;  /* Stats generated by this thread */
    struct conn_queue *new_conn_queue; /* queue of new connections to handle */
    cache_t *suffix_cache;      /* suffix cache */
    uint8_t item_lock_type;     /* use fine-grained or global item lock */
} LIBEVENT_THREAD;

很简单,线程ID, 属于这个线程的event_base, new_conn_queue, 这些后面再介绍。上面变量中,最重要的就是notify_receive_fd 和notify_send_fd, 他们是用来进行管道通信的。一个专门读,一个专门写。然后调用setup_thread 将notify_receive_fd 加入到event里面,设置相关的回调。

void thread_init(int nthreads, struct event_base *main_base) {
    //初始化线程之间的锁,初始化工作线程,为每个线程分配里边event结构, 并且启动event_base_loop()循环
    //工作线程监听notify_receive_fd,  可读回调为thread_libevent_process
//`````

    //为每个线程申请一个LIBEVENT_THREAD线程结构,存放event等
    threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));
    if (! threads) {
        perror("Can't allocate thread descriptors");
        exit(1);
    }
    //
    dispatcher_thread.base = main_base;
    dispatcher_thread.thread_id = pthread_self();

    for (i = 0; i < nthreads; i++) {
        int fds[2];
        if (pipe(fds)) {//新建一个双向管道
            perror("Can't create notify pipe");
            exit(1);
        }

        threads[i].notify_receive_fd = fds[0];//这是接收的,工作线程监听这个fd的可读事件即可
        threads[i].notify_send_fd = fds[1];//这是发送的

        //工作线程初始化,初始化里边event, 将notify_receive_fd加入event里面回调为thread_libevent_process
        setup_thread(&threads[i]);
        /* Reserve three fds for the libevent base, and two for the pipe */
        stats.reserved_fds += 5;
    }

    /* Create threads after we've done all the libevent setup. */
    for (i = 0; i < nthreads; i++) {//真正创建工作线程,执行函数为worker_libevent
        create_worker(worker_libevent, &threads[i]);
    }

上面setup_thread 用来初始化事件的。他负责将notify_receive_fd 放到线程自有的event_base, 里面。并初始化me->new_conn_queue, 前者用来通知对方,有新连接了,后者用来记录新连接的具体内容是什么。具体怎么通知后面写下。

static void setup_thread(LIBEVENT_THREAD *me) {
    //工作线程初始化,初始化里边event, 将notify_receive_fd加入event里面回调为thread_libevent_process
    me->base = event_init();//创建一个属于线程自己的event结构,这里也是一个循环哈
    if (! me->base) {
        fprintf(stderr, "Can't allocate event base\n");
        exit(1);
    }

    //下面将我自己的me->notify_receive_fd管道加入到事件循环里面,监听可读事件,回调为thread_libevent_process
    /* Listen for notifications from other threads */
    event_set(&me->notify_event, me->notify_receive_fd,
              EV_READ | EV_PERSIST, thread_libevent_process, me);
    event_base_set(me->base, &me->notify_event);///设置关系

    if (event_add(&me->notify_event, 0) == -1) {//加入
        fprintf(stderr, "Can't monitor libevent notify pipe\n");
        exit(1);
    }

    me->new_conn_queue = malloc(sizeof(struct conn_queue));//上面me->notify_receive_fd用来告诉对方,有新东西了,实际上东西放在new
_conn_queue里面的

上面me->base = event_init() 指令为当线程分配一个属于线程自己的I/O位置了。具体里边event怎么使用旧不多说了。

注意后面的event_set 函数的参数,设置可读写的执行函数为thread_libevent_process,这样等有事件的时候,线程独立的event_base就会触发返回,比如epoll_wait返回了。

thread_init 最后调用create_worker 函数,真正的创建线程,设置线程函数等。这个时候由于event_base已经设置好了各项参数,将事件加入epoll等事件监听函数,所以可以真正创建线程了,这个由create_worker 完成,其实就是个普通的pthread_create 创建线程。

master线程

master线程也就是主线程, 在创建好工作线程后,会调用server_sockets等函数,打开监听句柄等操作。 调用关系为: main-> server_sockets->server_socket; 后者负责创建一TCP连接,进行监听事件。 这里注意memcache支持TCP 和UDP ,这里只讲TCP的情况;

static int server_socket(const char *interface,
                         int port,
                         enum network_transport transport,
                         FILE *portnumber_file) {
    //创建socket,然后根据其是TCP还是UDP,设置相关的结构,到目前为止就可以监听了</pre>
//·····

        if (IS_UDP(transport)) {//如果是UDP协议,那么直接将这个监听端口分破给num_threads_per_udp个线程,让他们一起处理
            //因为是数据报文,接收的时候是一个报文一个报文接收,所以随便某个线程接收了都行,不需要accept。
        } else {//tcp模式
            //将这个TCP连接放到main_base事件循环中,设置为EV_PERSIST模式,回调为:event_handler
            if (!(listen_conn_add = conn_new(sfd, conn_listening,
                                             EV_READ | EV_PERSIST, 1,
                                             transport, main_base))) {
                fprintf(stderr, "failed to create listening connection\n");
                exit(EXIT_FAILURE);
            }
            listen_conn_add->next = listen_conn;
            listen_conn = listen_conn_add;//挂载到链表头部
        }

conn_new函数负责初始化一个连接的conn *c; 结构,这里memcache对于连接的查找方法为直接用fd大小去索引数组,这样速度快很多倍,而且这个数目不会太大的。conn_new里面大部分是数据字段初始化操作。主要就是讲参数里面的fd句柄加入到主线程的 event_base中:

conn *conn_new(const int sfd, enum conn_states init_state,
                const int event_flags,
                const int read_buffer_size, enum network_transport transport,
                struct event_base *base) {
    //thread_libevent_process等调用这里,设置一个连接的客户端结构,然后将其加入到event里面,执行函数统一为event_handler
    c = conns[sfd];//直接用fd进行数组索引

    //加入到事件循环里面。,不管是LISTEN SOCK,还是客户端的SOCK,都是这个执行函数。只有工作线程的管道不是这个
    //工作线程的管道执行函数是thread_libevent_process,也就是本函数的调用者之一
    event_set(&c->event, sfd, event_flags, event_handler, (void *)c);//用参数,填充&c->event
    event_base_set(base, &c->event);//ev->ev_base = base, 就让event记住了所属的base
    c->ev_flags = event_flags;//其实 c->event->ev_events也记录了这个事件集合的。

    if (event_add(&c->event, 0) == -1) {//加到epoll里面去
        perror("event_add");
        return NULL;
    }
}

这样,一个监听SOCKET就建立起来了,并且设置了可读事件的毁掉函数为event_handler, 这个函数及其重要,是主线程的线程循环。

主线程回到main函数后,就进入了事件循环了:


    /* Drop privileges no longer needed */
    drop_privileges();

    //进入事件循环,监听sock的回调为 event_handler,实际上等于drive_machine, 工作线程有自己的event_base_loop,各自循环自己的,由一>
个管道沟通。
    //主线程accept一个连接后,调用dispatch_conn_new分配给一个工作线程,轮训
    /* enter the event loop */
    if (event_base_loop(main_base, 0) != 0) {
        retval = EXIT_FAILURE;
    }

此时这个事件循环里面只有监听SOCK的事件的,于是如果有新连接到来的话,event_handler 就会被调用,实际上基本等于drive_machine 函数被调用。

主线程事件循环

drive_machine  函数相当于一个自动机,由连接的不同状态进行循环处理,直到完毕。

其大体结构为:

static void drive_machine(conn *c) {
//····
    while (!stop) {

        switch(c->state) {//根据这个SOCK的状态进行处理,
        case conn_listening://对于监听SOCK, 在server_socket里面设置为conn_listening状态了的
        case conn_waiting:
        case conn_read:
        case conn_write:
}
//````

这里只关注conn_listening 新连接的事件,也就是如果这个连接的状态是conn_listening,那么说明它是LISTEN socket了。这个是在conn_new的参数里面指定的。 来看看董自动机的新连接处理方法:

    while (!stop) {

        switch(c->state) {//根据这个SOCK的状态进行处理,
        case conn_listening://对于监听SOCK, 在server_socket里面设置为conn_listening状态了的
            addrlen = sizeof(addr);
#ifdef HAVE_ACCEPT4
            if (use_accept4) {
                sfd = accept4(c->sfd, (struct sockaddr *)&addr, &addrlen, SOCK_NONBLOCK);//快速设置SOCK_NONBLOCK状态
            } else {
                sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen);
            }
#else
            sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen);
#endif
            if (settings.maxconns_fast &&
//·····
            } else {//OK,将这个连接分配给线程去处理吧
                dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,
                                     DATA_BUFFER_SIZE, tcp_transport);
            }

            stop = true;
            break;

可以看出上面的新连接也还算挺简单的,就accept一个新连接,然后调用dispatch_conn_new 函数,进行分发,分发给其他地方的线程去处理真正的请求,这里只是简单的accept了连接。调用dispatch_conn_new 的时候,设置连接状态为
啊conn_new_cmd, 也就是等待命令状态.

主线程接收这个连接后,怎么分配给其他工作线程处理呢?是通过dispatch_conn_new 函数完成的。

方法为轮训选一个线程,然后将当前连接的状态, 缓存区等放到一个CQ_ITEM 结构里面,然后将其加入到线程的thread->new_conn_queue 链表里面的,注意cq_push 函数是得加锁的。 然后调用write(thread->notify_send_fd, buf, 1) 简单的将一个"c"  字符写入到该线程的管道中,这样该线程一定会被换起来,然后就能检测到这个新连接,然后就可以为其提供服务了。

void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
                       int read_buffer_size, enum network_transport transport) {
    //分配给不同的线程去处理,通过管道
    CQ_ITEM *item = cqi_new();
    char buf[1];
    if (item == NULL) {
        close(sfd);
        /* given that malloc failed this may also fail, but let's try */
        fprintf(stderr, "Failed to allocate memory for connection object\n");
        return ;
    }

    int tid = (last_thread + 1) % settings.num_threads;//轮训分配

    LIBEVENT_THREAD *thread = threads + tid;//找到那个线程,然后将这个fd加入到这个线程的连接队列里面

    last_thread = tid;

    item->sfd = sfd;//填充结构
    item->init_state = init_state;
    item->event_flags = event_flags;
    item->read_buffer_size = read_buffer_size;
    item->transport = transport;

    cq_push(thread->new_conn_queue, item);//加锁,放到队列尾部

    MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id);
    buf[0] = 'c';
    if (write(thread->notify_send_fd, buf, 1) != 1) {//写一个字节,告诉他有新东西了
        perror("Writing to thread notify pipe");
    }
}

稍微回顾一下:

  • memcached的线程框架为:单进程,多线程,master-worker,二者用管道通信,链表传递数据。
  • 使用libevent 开发框架, master-worker分别拥有不同的event_base,因此他们能各自进行事件监听, 相对还是挺清晰的;
  • master负责建立客户端连接,worker负责跟实际客户端进行数据交互,处理请求。

所以这里master只是一个分发器而已,具体请求由工作线程去处理,解析等。这部分后续记录吧。

Share

注意: 评论者允许使用'@user空格'的方式将自己的评论通知另外评论者。例如, ABC是本文的评论者之一,则使用'@ABC '(不包括单引号)将会自动将您的评论发送给ABC。使用'@all ',将会将评论发送给之前所有其它评论者。请务必注意user必须和评论者名相匹配(大小写一致)。