首页 > Redis > Redis源码学习-AOF数据持久化原理分析(0)

Redis源码学习-AOF数据持久化原理分析(0)

2013年6月5日 发表评论 阅读评论 9633次阅读    

Redis作为一个使用广泛的KV内存存储,其也支持一定的数据持久化,这里试着介绍一下Redis在源码层面对持久化的实现机制。

总的来说,Redis支持的将其数据库里面的KV数据存储到磁盘,但可能会有短时间的丢失。官网关于持久化的介绍可以参考这里“Redis Persistence”,这篇文章介绍一下其在代码层面的实现。

其支持2中不同的持久化机制:

  1. 第一种RDB数据快照持久化。RDB持久化实际上就是对数据库内容做快照,然后将快照存储到磁盘上面,这样就要去我们进行周期性的做快照,但是这种方式无法做到实时的存储,出现故障时只能恢复上一次做快照时的状态,因此比较有限。不过redis的主从同步也是利用RDB实现的,这个我们后续文章分析;
  2. 第二种AOF日志实时持久化。AOF=Append Only File,也就是不断追加写的文件。在这种情况下,Redis首先将数据库做个快照,将数据还原为跟客户端的协议格式的文本数据,然后将其存储到一个临时文件中,然后将其覆盖成正常的aof文件,并把这个过程中新增的命令追加到aof文件后面,从此之后,后续的从客户端过来的命令都会不断根据不同的安全级别写到磁盘里面去。这样就支持了实时的持久化,只是可能会有短时间内的数据丢失,对一般系统还是可以容忍的。

下面一步步介绍其实现的原理。

0、数据初始化

当redis启动时,如果打开了aof开关,也就是配置了:"appendonly on",那么就会从"appendfilename"指令指定的文件中加载数据库数据进行初始化;其调用流程为:

main()->loadDataFromDisk(),后者会判断server.aof_state == REDIS_AOF_ON,如果是,就调用loadAppendOnlyFile函数去加载数据文件的数据,加载的方法就是把文件内容读出来当客户端请求一样调用各个命令的cmd->proc(fakeClient);还原数据,其实就是进行操作重放。
如果没有配置"appendonly on",那么redis就会从RDB文件中加载数据。

/* Function called at startup to load RDB or AOF file in memory. */
void loadDataFromDisk(void) {
    long long start = ustime();
    if (server.aof_state == REDIS_AOF_ON) {
		//如果AOF打开了,那么优先从AOF文件中加载数据,因为AOF文件的数据比RDB中的数据要新。
        if (loadAppendOnlyFile(server.aof_filename) == REDIS_OK)
            redisLog(REDIS_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000);
    } else {
    //否则就从RDB文件中加载数据。
        if (rdbLoad(server.rdb_filename) == REDIS_OK) {
            redisLog(REDIS_NOTICE,"DB loaded from disk: %.3f seconds",
                (float)(ustime()-start)/1000000);
        } else if (errno != ENOENT) {
            redisLog(REDIS_WARNING,"Fatal error loading the DB: %s. Exiting.",strerror(errno));
            exit(1);
        }
    }
}

1、新数据是如何持久化到磁盘的。

我们知道,redis的main函数最后调用aeMain进入消息处理循环,其循环已eventLoop->stop为终止条件,不断执行。循环每次都会调用beforeSleep函数,然后再调用aeProcessEvents函数进行SOCKET事件的监听和处理。beforeSleep函数跟我们之前的刷磁盘有关,这个我们后面讲,先讲讲aeProcessEvents函数里面关于AOF的逻辑。下面看一下aeMain代码,很简单:

void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {//stop ==1 停止服务
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);//调用了beforeSleep函数。

        aeProcessEvents(eventLoop, AE_ALL_EVENTS);//处理各种事件。
    }
}

监听事件相关的代码比较多,我们这里不关新处理逻辑,直奔AOF相关的代码。aeProcessEvents-》readQueryFromClient-》processInputBuffer-》processCommand-》call。最后走到call,其调用完对应的命令的处理函数后,会调用propagate的过程,看看下面的代码:

/* Call() is the core of Redis execution of a command */
void call(redisClient *c, int flags) {
//processCommand函数调用这里进行进行命令函数的执行。
 //···········
    /* Call the command. */
    redisOpArrayInit(&server.also_propagate);//繁殖;扩大
    dirty = server.dirty;
    c->cmd->proc(c);
	//在proc函数里面,如果指令觉得这是一条修改的指令,那么久需要吧这条指令
	//写到AOF文件中,因此它会增加server.dirty的值。从而在下面的时候,flag增加了AOF,REPL的标志。
	//一般只有set接口会设置这条指令。
    dirty = server.dirty-dirty;
    duration = ustime()-start;
//······················
//关于AOF,google之,用来写入记录文件的,可以从里面恢复redis的数据。
//上面处理完指令后,这里需要处理一下AOF的事情了,这里面的数据子进程不知道的,所以得放到额外的缓冲区去,
//等子进程写完之前的数据后,将这部分改动追加到后面去。
    /* Propagate the command into the AOF and replication link */
    if (flags & REDIS_CALL_PROPAGATE) {
        int flags = REDIS_PROPAGATE_NONE;

        if (c->cmd->flags & REDIS_CMD_FORCE_REPLICATION)
            flags |= REDIS_PROPAGATE_REPL;
//如果dirty不为0,也就是说当前这条指令是set操作,所以需要追加到AOF文件中。
        if (dirty)//
            flags |= (REDIS_PROPAGATE_REPL | REDIS_PROPAGATE_AOF);
		//下面将这条指令的改动广播到AOF相关代码中。以备后续在AOF日志里面追加这条改动。
        if (flags != REDIS_PROPAGATE_NONE)
            propagate(c->cmd,c->db->id,c->argv,c->argc,flags);
    }
}
/* Propagate the specified command (in the context of the specified database id)
* to AOF and Slaves.
*
* flags are an xor between:
* + REDIS_PROPAGATE_NONE (no propagation of command at all)
* + REDIS_PROPAGATE_AOF (propagate into the AOF file if is enabled)
* + REDIS_PROPAGATE_REPL (propagate into the replication link)
*/
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
int flags)
{//call()等函数处理完指令后,调用这里将指令加入到AOF缓冲里面,以供后续追加到AOF文件
//如果aof_state开关打开了,并且flags职位了写入AOF文件的标志,那么久需要将本指令加入到AOF缓存去。
if (server.aof_state != REDIS_AOF_OFF && flags & REDIS_PROPAGATE_AOF)
feedAppendOnlyFile(cmd,dbid,argv,argc);

	//将这条指令发送给所有的slaves,放到其缓冲里面以待发送。就当它是一个服务端一样。
if (flags & REDIS_PROPAGATE_REPL && listLength(server.slaves))
replicationFeedSlaves(server.slaves,dbid,argv,argc);
}

call指令调用propagate函数,后者判断server.aof_state != REDIS_AOF_OFF的时候,就会进入去准备AOF的相关数据,也就是调用feedAppendOnlyFile,去将客户端传递的参数,数据转换为aof缓冲区,存起来。

feedAppendOnlyFile这个函数的调用者会判断是否AOF打开了,如果关闭了就不会调用的。函数首先根据客户端的参数,准备了一个buf,其就是跟客户端协议一致的数据,可以用来重放请求。具体怎么准备的我们这里不做过多介绍,具体看下面代码:

//feedAppendOnlyFile这个函数的调用者会判断是否AOF打开了,如果关闭了就不会调用的。
void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
	//将这条指令还原成字符串表示,然后将其追加到server.aof_buf 字符串后面,
	//在beforeSleep的末尾会调用flushAppendOnlyFile将server.aof_buf里面的数据写入文件的。
    sds buf = sdsempty();
    robj *tmpargv[3];

    /* The DB this command was targeting is not the same as the last command
     * we appendend. To issue a SELECT command is needed. */
    if (dictid != server.aof_selected_db) {
		//如果当前选择的库不是目标库,则在指令前面插入一个SELECT db的指令。
        char seldb[64];

        snprintf(seldb,sizeof(seldb),"%d",dictid);
        buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
            (unsigned long)strlen(seldb),seldb);
        server.aof_selected_db = dictid;//修改当前选择的db
    }

    if (cmd->proc == expireCommand || cmd->proc == pexpireCommand ||
        cmd->proc == expireatCommand) {
        /* Translate EXPIRE/PEXPIRE/EXPIREAT into PEXPIREAT */
        buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
    } else if (cmd->proc == setexCommand || cmd->proc == psetexCommand) {
        /* Translate SETEX/PSETEX to SET and PEXPIREAT */
        tmpargv[0] = createStringObject("SET",3);
        tmpargv[1] = argv[1];
        tmpargv[2] = argv[3];
        buf = catAppendOnlyGenericCommand(buf,3,tmpargv);
        decrRefCount(tmpargv[0]);
        buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
    } else {
        /* All the other commands don't need translation or need the
         * same translation already operated in the command vector
         * for the replication itself. */
 //根据argc,argv恢复客户端发送过来的指令,返回之。
        buf = catAppendOnlyGenericCommand(buf,argc,argv);
    }

准备好客户端这条请求的数据缓冲后,就需要将数据保存起来。这里有2部分需要关注,aof_buf实时增量数据缓存 和aof_rewrite_buf_blocks快照保存期间的DIFF数据缓存。

0、aof_buf实时增量数据缓存

这个是做什么用的呢?只要server.aof_state == REDIS_AOF_ON,也就是AOF是常规打开的,既没有关闭,也不是在快照过程中,那么我们将这条客户端数据放到aof_buf的后面,不断追加。如下:

//到这里已经拼接好了客户端发送过来的指令的字符串,放在buf。
//在beforeSleep的末尾会调用flushAppendOnlyFile将server.aof_buf里面的数据写入文件的。
    /* Append to the AOF buffer. This will be flushed on disk just before
     * of re-entering the event loop, so before the client will get a
     * positive reply about the operation performed. */
    if (server.aof_state == REDIS_AOF_ON)
        server.aof_buf = sdscatlen(server.aof_buf,buf,sdslen(buf));

数据追加到了server.aof_buf,那么什么时候写入磁盘呢?答案是beforeSleep,本文开头我们看了aeMain在每次循环之前,或者说aeProcessEvents处理一轮数据之后,会调用beforeSleep函数。做一些wait之前做的事情,其末尾就好调用flushAppendOnlyFile,将之前的指令的改动,写入到AOF文件中,内容在server.aof_buf里面了,feedAppendOnlyFile函数在判断如果打开了AOF标志的话会保存到上面的字符串里面的。

一旦调用beforeSleep那就说明redis已经epoll_wait处理过一批活跃的句柄了,那么server.aof_buf上面应该就包含了这些活跃的句柄的指令。下面就需要写到磁盘里面去。

这里redis为了提供不同的安全级别,支持最多每秒fsync刷新,每次写都刷新,或者不主动fsync,但是fsync会降低性能,所以看具体应用考虑,是通过配置appendfsync no/everysec/always来控制的,代码里面根据下面三个宏去吧,存放在server.aof_fsync。

#define AOF_FSYNC_NO 0
#define AOF_FSYNC_ALWAYS 1
#define AOF_FSYNC_EVERYSEC 2

函数开头先调用bioPendingJobsOfType查询一下REDIS_BIO_AOF_FSYNC类型的后台任务挂载了(比如之前的调用需要fsync,但是目前还没有完成),赋值到sync_in_progress,也就是bioInit函数创建的几个后台线程,用来做可能会block的函数调用的,这个我们待会介绍。

然后如果是EVERYSEC每秒刷新的模式,并且force参数没有设置为1强制write/fsync刷新:

如果有REDIS_BIO_AOF_FSYNC类型的后台任务没有处理完,那么判断这一次刷新时间是不是跟上次相差超过2秒,如果是就需要刷新,否则可以等会再write/fsync,因为上次的fsync还没有完成,比较慢;

从这里我们可以看到,redis提供最长2秒的数据丢失保证。我们可以看看作者关于AOF的解释“Redis persistence demystified”中可以看到:

appendfsync everysec

In this configuration data will be both written to the file using write(2) and flushed from the kernel to the disk using fsync(2) one time every second. Usually the write(2) call will actually be performed every time we return to the event loop, but this is not guaranteed.

However if the disk can't cope with the write speed, and the background fsync(2) call is taking longer than 1 second, Redis may delay the write up to an additional second (in order to avoid that the write will block the main thread because of an fsync(2) running in the background thread against the same file descriptor). If a total of two seconds elapsed without that fsync(2) was able to terminate, Redis finally performs a (likely blocking) write(2) to transfer data to the disk at any cost.

So in this mode Redis guarantees that, in the worst case, within 2 seconds everything you write is going to be committed to the operating system buffersand transfered to the disk. In the average case data will be committed every second.

从上面可以看到,如果配置的是EVERYSEC模式,那么redis会最多隔2秒write、fsync一次数据,如果当前服务器忙于fsync,那就好延迟2秒write,fsync。下面看看前面部分的代码:

/* Write the append only file buffer on disk.
 *
 * Since we are required to write the AOF before replying to the client,
 * and the only way the client socket can get a write is entering when the
 * the event loop, we accumulate all the AOF writes in a memory
 * buffer and write it on disk using this function just before entering
 * the event loop again.
 *
 * About the 'force' argument:
 *
 * When the fsync policy is set to 'everysec' we may delay the flush if there
 * is still an fsync() going on in the background thread, since for instance
 * on Linux write(2) will be blocked by the background fsync anyway.
 * When this happens we remember that there is some aof buffer to be
 * flushed ASAP, and will try to do that in the serverCron() function.
 *
 * However if force is set to 1 we'll write regardless of the background
 * fsync. */
void flushAppendOnlyFile(int force) {
//beforeSleep函数每次在aeMain等待epoll之前调用,从而调用这里带0的参数,非强制的刷aof文件。
//将server.aof_buf的内容刷到磁盘AOF文件后面。如果有EVERYSEC标志,
//则在后端bio进程的队列里面挂在一个事件,后续bioProcessBackgroundJobs函数会处理这个的,进行fsync
//stopAppendOnly调用这里会force==1,强制刷新
    ssize_t nwritten;
    int sync_in_progress = 0;

    if (sdslen(server.aof_buf) == 0) return;
	//看看现在是不是还有REDIS_BIO_AOF_FSYNC类型的刷新操作在后台。
    if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
        sync_in_progress = bioPendingJobsOfType(REDIS_BIO_AOF_FSYNC) != 0;

    if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) {//如果不是强制刷新,就
        /* With this append fsync policy we do background fsyncing.
         * If the fsync is still in progress we can try to delay
         * the write for a couple of seconds. */
        if (sync_in_progress) {
			//当前服务器比较忙,上次fsync还没有完成,所以我们这次延迟一下write,fsync
            if (server.aof_flush_postponed_start == 0) {//还没开始,这是第一次进入。
                /* No previous write postponinig, remember that we are
                 * postponing the flush and return. */
                //记住这次的时间,后续再看看是不是超过了2秒,如果是就进行后面的刷新操作处理。
                server.aof_flush_postponed_start = server.unixtime;
                return;
            } else if (erver.unixtime - server.aof_flush_postponed_start < 2) {                 /* We were already waiting for fsync to finish, but for less                  * than two seconds this is still ok. Postpone again. */                 return;             }             /* Otherwise fall trough, and go write since we can't wait              * over two seconds. */             server.aof_delayed_fsync++;             redisLog(REDIS_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.");         }     } //到这里后,肯定是等的超过了2秒,或者后面没有进程在刷新AOF.     /* If you are following this code path, then we are going to write so      * set reset the postponed flush sentinel to zero. */     server.aof_flush_postponed_start = 0; 

走完上面的代码后,下面就剩下write,fsync了。由于redis是直接将所有这一批的命令放入server.aof_buf字符串的,所以一次write()函数调用就行了,如果调用写入的字节数不等于总大小,则exit(1)退出程序,够猛。 write完成后增加server.aof_current_size的大小,这个是用来做自动AOF rewrite时判断的,走个题,说下AOF rewrite,我们知道,如果一直这样AOF下去,把所有客户端命令都重放到AOF文件内,势必导致AOF文件非常大,不断增大,而且可能会有很多重复的无用命令,所以我们需要定期的将AOF文件进行覆盖,用最新的快照覆盖它,这样就能有效减少文件大小。这个操作是在serverCron定时任务里面做的。具体看下面代码:

 int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
//········
 /* Trigger an AOF rewrite if needed */
if (server.rdb_child_pid == -1 &&
server.aof_child_pid == -1 &&
server.aof_rewrite_perc &&
server.aof_current_size > server.aof_rewrite_min_size)
         {
            long long base = server.aof_rewrite_base_size ? server.aof_rewrite_base_size : 1;
            long long growth = (server.aof_current_size*100/base) - 100;
	//如果AOF文件增长超过了指定百分比,那么需要自动rewrite aof文件了
            if (growth >= server.aof_rewrite_perc) {
                redisLog(REDIS_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
                rewriteAppendOnlyFileBackground();
            }
         }
//········
}

继续上面的write,写完后,数据就放到操作系统缓存里面去了,或者说文件系统缓存。然后就是根据不同的持久化级别,判断是否要fsync。
如果是AOF_FSYNC_ALWAYS,那么写完后就直接aof_fsync(server.aof_fd);当然这样就肯定会导致服务器卡的,所以最好别这么用。

如果是AOF_FSYNC_EVERYSEC模式,那么如果当前没有未完成的fsync任务,那么就调用aof_background_fsync,继而调用bioCreateBackgroundJob(REDIS_BIO_AOF_FSYNC,(void*)(long)fd,NULL,NULL);放一个后台任务,给后台线程去fsync,这样做的原因是工作线程如果去fsync,会卡住redis的处理,这样性能很低。早期的redis确实是这么干的,不过貌似到2.0之后就把这种可能会引起block的操作放到后台线程做了,实现了个异步。看看flushAppendOnlyFile余下的代码:

    /* We want to perform a single write. This should be guaranteed atomic
     * at least if the filesystem we are writing is a real physical one.
     * While this will save us against the server being killed I don't think
     * there is much to do about the whole server stopping for power problems
     * or alike */
    nwritten = write(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));
    if (nwritten != (signed)sdslen(server.aof_buf)) {
//如果上面的写文件失败,那么直接exit进程,这个好干脆啊·····
//写失败肯定是致命错误么,不一定吧,比如缓存不够,磁盘慢了等。
//··················
        exit(1);
    }
	//统计AOF文件的大小,用来判断是否需要自动AOF rewrite文件了
    server.aof_current_size += nwritten;

    /* Don't fsync if no-appendfsync-on-rewrite is set to yes and there are
     * children doing I/O in the background. */
    if (server.aof_no_fsync_on_rewrite &&
        (server.aof_child_pid != -1 || server.rdb_child_pid != -1))
            return;
//调用fdatasync将数据刷到磁盘去,刷到磁盘缓冲区了。并没有写到磁盘介质中。
//真刷,在主进程中干这个事情太危险了,会阻塞的。
    /* Perform the fsync if needed. */
    if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
        /* aof_fsync is defined as fdatasync() for Linux in order to avoid
         * flushing metadata. */
        aof_fsync(server.aof_fd); /* Let's try to get this data on the disk */
        server.aof_last_fsync = server.unixtime;
    } else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
                server.unixtime > server.aof_last_fsync)) {
      //每隔一秒挂一个刷新磁盘操作到bio_jobs[REDIS_BIO_AOF_FSYNC]数组里面
      //供后端的定时任务进程去处理。这个进程执行bioProcessBackgroundJobs函数。
      //如果当前正有挂载的fsync任务没有完成,就不挂载了,服务器比较忙。
        if (!sync_in_progress)
			aof_background_fsync(server.aof_fd);
        server.aof_last_fsync = server.unixtime;
    }
}

到这里,介绍完了redis是如何将不断发送过来的数据写到磁盘中的。总结一句话:分批将客户端发送的命令重放写到aof文件中去,然后后台fsync。
还剩下一个问题:AOF rewrite,即重新AOF,缩短文件大小。限于篇幅在后面一篇文章介绍。
next移步:"Redis源码学习-AOF数据持久化原理分析(1)"

Share
分类: Redis 标签: , ,

注意: 评论者允许使用'@user空格'的方式将自己的评论通知另外评论者。例如, ABC是本文的评论者之一,则使用'@ABC '(不包括单引号)将会自动将您的评论发送给ABC。使用'@all ',将会将评论发送给之前所有其它评论者。请务必注意user必须和评论者名相匹配(大小写一致)。