Redis 7.0 Multi Part AOF Source Code Analysis



2023-03-02 02:12

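Table of Contents
AOF
1): Write
2): Rewrite
2)1): Open a new incr file and make it the AOF file descriptor
2)2): Fork a child process to perform the rewrite
2)3): AOF rewrite
3): Appending to the AOF buffer when a command executes
4): AOF rewrite completion
4)1): Get the new base name and mark the previous base as history
4)2): Move incr files to history
4)3): Delete history files
5): Loading AOF files when Redis first starts (TODO)
Loading the base AOF
Summary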

AOF

First, enable AOF in the configuration:

# enable AOF
appendonly yes
# fsync after every write
appendfsync always

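Redis supports three appendfsync policies. The policy decides when the AOF file is written and when it is synced to disk.

always: the AOF buffer is written once per event loop iteration, and an fsync follows every write. This is the slowest policy, but a crash loses at most one event loop iteration of data.
everysec: the AOF buffer is written once per event loop iteration, and an fsync runs in the background at most once per second. If a background fsync is still running, the write is postponed for up to 2 seconds.
no: the AOF buffer is written once per event loop iteration; when the data reaches the disk is left to the operating system.

1): Write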
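The three policies and the 2-second postponement can be condensed into two small decision functions. This is a minimal sketch, not Redis code: the names `fsync_policy`, `fsync_action`, `fsync_decision` and `should_postpone_write` are hypothetical and only mirror the behaviour described above.

```c
#include <stdbool.h>
#include <time.h>

/* Hypothetical types for illustration; Redis uses its own constants. */
typedef enum { POLICY_NO, POLICY_EVERYSEC, POLICY_ALWAYS } fsync_policy;
typedef enum { ACTION_NONE, ACTION_FSYNC_NOW, ACTION_FSYNC_BACKGROUND } fsync_action;

/* Decide what to do right after the buffer was written to the file. */
fsync_action fsync_decision(fsync_policy policy, time_t now,
                            time_t last_fsync, bool fsync_in_progress) {
    if (policy == POLICY_ALWAYS) return ACTION_FSYNC_NOW;
    if (policy == POLICY_EVERYSEC && now > last_fsync && !fsync_in_progress)
        return ACTION_FSYNC_BACKGROUND;
    return ACTION_NONE; /* POLICY_NO, or nothing to do yet */
}

/* Under everysec a write is postponed while a background fsync is running,
 * but for less than 2 seconds. *postponed_start == 0 means that no
 * postponement is currently active. */
bool should_postpone_write(time_t now, time_t *postponed_start,
                           bool fsync_in_progress) {
    if (!fsync_in_progress) return false;
    if (*postponed_start == 0) {
        *postponed_start = now; /* remember when postponing began */
        return true;
    }
    return (now - *postponed_start) < 2;
}
```

The caller would reset `*postponed_start` to 0 after an actual write, which is what the real function does with `server.aof_flush_postponed_start`.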

Let's go straight to the source: aof.c#flushAppendOnlyFile

1. The AOF buffer is empty. If the policy is everysec, the fsync offset differs from the current AOF size, the current time is later than the last fsync, and no fsync is running, perform one fsync. Otherwise return.
2. The policy is everysec, the flush is not forced, and an fsync is running in the background. This step postpones the write for up to 2 seconds.
   2.1. If the postponement start time is 0, set it to the current time and return.
   2.2. For the next 2 seconds, measured from that start time, any call to this function returns immediately.
3. Write the buffer to the AOF file.
4. Handle a write that stored fewer bytes than the buffer holds.
   4.1. Rate-limit the error log.
   4.2. If the write failed, log it and record the error.
   4.3. If the write was short, try to truncate the file back to its previous length, that is, remove the partial data.
   4.4. Under the always policy, exit. Under the other policies, if the partial data could not be removed, drop the already written bytes from the AOF buffer and return. This advances the start of the pending data so that only the bytes after nwritten are retried on the next call; without it the bytes already in the file would be written a second time.
5. The write succeeded. If the previous write status was an error, reset it to OK.
6. If the AOF buffer is small, reuse it; otherwise free it and allocate a new one.
7. If no-appendfsync-on-rewrite is enabled and a child process is working, skip the fsync.
8. Under the always policy, run fsync and update the fsync offset and time.
9. Under the everysec policy, if more than one second has passed since the last fsync and no fsync is running, sync in the background and update the fsync offset and time.

```c
void flushAppendOnlyFile(int force) {
    ssize_t nwritten;
    int sync_in_progress = 0;
    mstime_t latency;

    /* 1: the AOF buffer holds no data */
    if (sdslen(server.aof_buf) == 0) {
        /* Check if we need to do fsync even the aof buffer is empty,
         * because previously in AOF_FSYNC_EVERYSEC mode, fsync is
         * called only when aof buffer is not empty, so if users
         * stop write commands before fsync called in one second,
         * the data in page cache cannot be flushed in time. */
        if (server.aof_fsync == AOF_FSYNC_EVERYSEC &&             /* everysec policy */
            server.aof_fsync_offset != server.aof_current_size && /* unsynced data remains */
            server.unixtime > server.aof_last_fsync &&            /* later than the last fsync */
            !(sync_in_progress = aofFsyncInProgress())) {         /* no fsync running */
            goto try_fsync;
        } else {
            return;
        }
    }

    /* 2: handling for the everysec policy */
    if (server.aof_fsync == AOF_FSYNC_EVERYSEC) {
        /* Is an fsync running in the background? */
        sync_in_progress = aofFsyncInProgress();
    }

    /* everysec and not forced */
    if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) {
        /* With this append fsync policy we do background fsyncing.
         * If the fsync is still in progress we can try to delay
         * the write for a couple of seconds. */
        if (sync_in_progress) {
            if (server.aof_flush_postponed_start == 0) {
                /* No previous write postponing, remember that we are
                 * postponing the flush and return. */
                server.aof_flush_postponed_start = server.unixtime;
                return;
            } else if (server.unixtime - server.aof_flush_postponed_start < 2) {
                /* Reconstructed: from "< 2" down to the usleep() condition the
                 * source text was damaged. Waiting less than two seconds is
                 * still ok, postpone again. */
                return;
            }
            /* Otherwise fall through and write, since we can't wait
             * over two seconds. */
            server.aof_delayed_fsync++;
            serverLog(LL_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.");
        }
    }

    if (server.aof_flush_sleep && sdslen(server.aof_buf)) {
        usleep(server.aof_flush_sleep);
    }

    latencyStartMonitor(latency);
    /* 3: write the data; returns the number of bytes written */
    nwritten = aofWrite(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));
    latencyEndMonitor(latency);
    /* We want to capture different events for delayed writes:
     * when the delay happens with a pending fsync, or with a saving child
     * active, and when the above two conditions are missing.
     * We also use an additional event name to save all samples which is
     * useful for graphing / monitoring purposes. */
    if (sync_in_progress) {
        latencyAddSampleIfNeeded("aof-write-pending-fsync",latency);
    } else if (hasActiveChildProcess()) { /* a child process is working */
        latencyAddSampleIfNeeded("aof-write-active-child",latency);
    } else {
        latencyAddSampleIfNeeded("aof-write-alone",latency);
    }
    latencyAddSampleIfNeeded("aof-write",latency);

    /* We performed the write so reset the postponed flush sentinel to zero. */
    server.aof_flush_postponed_start = 0;

    /* 4: fewer bytes were written than the buffer holds */
    if (nwritten != (ssize_t)sdslen(server.aof_buf)) {
        static time_t last_write_error_log = 0;
        int can_log = 0;

        /* 4.1: Limit logging rate to 1 line per AOF_WRITE_LOG_ERROR_RATE seconds. */
        if ((server.unixtime - last_write_error_log) > AOF_WRITE_LOG_ERROR_RATE) {
            can_log = 1;
            last_write_error_log = server.unixtime;
        }

        /* 4.2: Log the AOF write error and record the error code. */
        if (nwritten == -1) {
            if (can_log) {
                serverLog(LL_WARNING,"Error writing to the AOF file: %s",
                    strerror(errno));
            }
            /* record the failure */
            server.aof_last_write_errno = errno;
        } else {
            if (can_log) {
                serverLog(LL_WARNING,"Short write while writing to "
                                       "the AOF file: (nwritten=%lld, "
                                       "expected=%lld)",
                                       (long long)nwritten,
                                       (long long)sdslen(server.aof_buf));
            }

            /* 4.3: truncate back to the previous length, removing the partial data */
            if (ftruncate(server.aof_fd, server.aof_last_incr_size) == -1) {
                if (can_log) {
                    serverLog(LL_WARNING, "Could not remove short write "
                             "from the append-only file.  Redis may refuse "
                             "to load the AOF the next time it starts.  "
                             "ftruncate: %s", strerror(errno));
                }
            } else {
                /* If the ftruncate() succeeded we can set nwritten to
                 * -1 since there is no longer partial data into the AOF. */
                nwritten = -1;
            }
            /* record the short write as an out-of-space error */
            server.aof_last_write_errno = ENOSPC;
        }

        /* 4.4: Handle the AOF write error. */
        if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
            /* We can't recover when the fsync policy is ALWAYS since the reply
             * for the client is already in the output buffers (both writes and
             * reads), and the changes to the db can't be rolled back. Since we
             * have a contract with the user that acknowledged or observed
             * writes are synced on disk, we must exit. */
            serverLog(LL_WARNING,"Can't recover from AOF write error when the AOF fsync policy is 'always'. Exiting...");
            exit(1);
        } else {
            /* Recover from failed write leaving data into the buffer. However
             * set an error to stop accepting writes as long as the error
             * condition is not cleared. */
            server.aof_last_write_status = C_ERR;

            /* Trim the sds buffer if there was a partial write, and there
             * was no way to undo it with ftruncate(2). */
            if (nwritten > 0) {
                /* Part of the buffer reached the file and could not be
                 * truncated away, so account for those bytes... */
                server.aof_current_size += nwritten;
                server.aof_last_incr_size += nwritten;
                /* ...and keep only the bytes from nwritten onward in the buffer. */
                sdsrange(server.aof_buf,nwritten,-1);
            }
            return; /* We'll try again on the next call... */
        }
    } else {
        /* Successful write(2). If AOF was in error state, restore the
         * OK state and log the event. */
        /* 5: the write succeeded; clear a previous error status */
        if (server.aof_last_write_status == C_ERR) {
            serverLog(LL_WARNING,
                "AOF write error looks solved, Redis can write again.");
            server.aof_last_write_status = C_OK;
        }
    }
    /* update the current AOF size */
    server.aof_current_size += nwritten;
    /* update the size of the current incr file */
    server.aof_last_incr_size += nwritten;

    /* Re-use AOF buffer when it is small enough. The maximum comes from the
     * arena size of 4k minus some overhead (but is otherwise arbitrary). */
    /* 6: reuse the buffer if it is small (the "< 4000" branch is reconstructed;
     * the source text was damaged here) */
    if ((sdslen(server.aof_buf)+sdsavail(server.aof_buf)) < 4000) {
        sdsclear(server.aof_buf);
    } else {
        /* free the buffer */
        sdsfree(server.aof_buf);
        /* allocate a new one */
        server.aof_buf = sdsempty();
    }

try_fsync:
    /* Don't fsync if no-appendfsync-on-rewrite is set to yes and there are
     * children doing I/O in the background. */
    /* 7 */
    if (server.aof_no_fsync_on_rewrite && hasActiveChildProcess())
        return;

    /* Perform the fsync if needed. */
    /* 8: always policy */
    if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
        /* redis_fsync is defined as fdatasync() for Linux in order to avoid
         * flushing metadata. */
        latencyStartMonitor(latency);
        /* Let's try to get this data on the disk. To guarantee data safe when
         * the AOF fsync policy is 'always', we should exit if failed to fsync
         * AOF (see comment next to the exit(1) after write error above). */
        if (redis_fsync(server.aof_fd) == -1) {
            /* on failure, log and exit */
            serverLog(LL_WARNING,"Can't persist AOF for fsync error when the "
              "AOF fsync policy is 'always': %s. Exiting...", strerror(errno));
            exit(1);
        }
        latencyEndMonitor(latency);
        latencyAddSampleIfNeeded("aof-fsync-always",latency);
        /* record the offset covered by this fsync */
        server.aof_fsync_offset = server.aof_current_size;
        /* update the time of the last fsync */
        server.aof_last_fsync = server.unixtime;
    } else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
                server.unixtime > server.aof_last_fsync)) {
        /* 9: everysec policy and more than one second since the last fsync */
        if (!sync_in_progress) {
            /* run the fsync in the background */
            aof_background_fsync(server.aof_fd);
            /* record the offset covered by this fsync */
            server.aof_fsync_offset = server.aof_current_size;
        }
        /* update the time of the last fsync */
        server.aof_last_fsync = server.unixtime;
    }
}
```

2): Rewrite

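Frequent writes make the AOF file grow too large.

Solution:

Rewrite the AOF: create a new AOF file that represents the same database state as the old one but contains no redundant commands. The commands in the new AOF are produced by reading the current database state and writing it to the file, not by reading the old AOF. When a single key holds too many elements (more than 64), it is split across several commands.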
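The splitting rule can be illustrated with a small sketch that writes a list as RPUSH commands of at most 64 items each. The function `emit_rpush_batches` and the constant `ITEMS_PER_CMD` are hypothetical names chosen for this example; they only imitate the behaviour described above and are not the Redis implementation.

```c
#include <stdio.h>
#include <string.h>

#define ITEMS_PER_CMD 64 /* mirrors the limit of 64 elements per command */

/* Writes `count` list items to `out` as RESP-encoded RPUSH commands,
 * starting a new command every ITEMS_PER_CMD items.
 * Returns the number of commands emitted. */
int emit_rpush_batches(FILE *out, const char *key,
                       const char *const *items, size_t count) {
    int commands = 0;
    size_t remaining = count;

    for (size_t i = 0; i < count; i++) {
        if (i % ITEMS_PER_CMD == 0) {
            size_t batch = remaining < ITEMS_PER_CMD ? remaining : ITEMS_PER_CMD;
            /* Array header: command name + key + the items of this batch. */
            fprintf(out, "*%zu\r\n$5\r\nRPUSH\r\n$%zu\r\n%s\r\n",
                    batch + 2, strlen(key), key);
            commands++;
        }
        fprintf(out, "$%zu\r\n%s\r\n", strlen(items[i]), items[i]);
        remaining--;
    }
    return commands;
}
```

A list of 130 items therefore becomes three commands carrying 64, 64 and 2 items.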

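To keep clients able to write while the rewrite runs, Redis forks a child process to do the rewrite.

The child process introduces a new problem. fork uses copy-on-write, so the child works on a snapshot of the data taken at fork time. Commands written during the rewrite are not part of that snapshot, so the rewritten file would be inconsistent with the database once the rewrite finishes.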
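The snapshot effect of fork can be observed with a few lines of POSIX C. This is only a demonstration of the operating system behaviour, with a hypothetical function name `value_seen_by_child`; it assumes a POSIX system and is unrelated to how Redis structures its child process.

```c
#define _POSIX_C_SOURCE 200809L
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Forks a child, changes the parent's copy of `value` to `after`, and
 * returns the value the child reported through a pipe (-1 on error). */
int value_seen_by_child(int before, int after) {
    int value = before;
    int fds[2];
    if (pipe(fds) == -1) return -1;

    pid_t pid = fork();
    if (pid == -1) {
        close(fds[0]);
        close(fds[1]);
        return -1;
    }

    if (pid == 0) {
        /* Child: reads its own copy of memory as it was at fork(). */
        close(fds[0]);
        ssize_t w = write(fds[1], &value, sizeof(value));
        _exit(w == (ssize_t)sizeof(value) ? 0 : 1);
    }

    /* Parent: this assignment never reaches the child's copy. */
    value = after;
    close(fds[1]);

    int seen = -1;
    ssize_t r = read(fds[0], &seen, sizeof(seen));
    close(fds[0]);
    waitpid(pid, NULL, 0);
    return r == (ssize_t)sizeof(seen) ? seen : -1;
}
```

Calling `value_seen_by_child(1, 2)` yields the pre-fork value, which is exactly why writes accepted during a rewrite must be recorded somewhere else.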

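Solutions

The logic described in the book Redis设计与实现 (Redis Design and Implementation):

1. Fork a child process to do the rewrite; the main process returns.
2. During the rewrite, an AOF rewrite buffer records the commands appended in the meantime.
3. When the rewrite completes, the periodic timer picks up the child's completion signal, flushes the rewrite buffer into the temporary file, and renames it to atomically replace the existing AOF.

The logic in Redis before 7.0:

1. Fork a child process to do the rewrite; the main process returns.
2. During the rewrite, every modifying command is appended to the rewrite buffer. The first write to the buffer registers an event that sends the buffered data to the child process through a pipe.
3. When the child finishes rewriting, it reads data from the pipe and appends it to the temporary file. If 20 reads within 1 second return no data, it tells the main process to stop sending.
4. When the rewrite completes, the periodic timer picks up the signal, flushes the remaining rewrite buffer into the temporary file, and renames it. The pipes are closed.

The logic in 7.0 Multi Part AOF (TODO)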

aof.c#rewriteAppendOnlyFileBackground

This function is entered when the BGREWRITEAOF command is executed. The steps are:

1. If a child process is already running, return. Redis allows only one child process at a time, whether for RDB, AOF or another task, which avoids a sudden burst of I/O.
2. On the first rewrite the AOF directory does not exist yet, so it is created here.
3. Set aof_selected_db to -1 so that the next call to feedAppendOnlyFile emits a SELECT command.
4. Force a flush of the AOF buffer. This goes through the normal AOF write path; afterwards the buffer has been written to the AOF file and is empty.
5. Open a new incr file and make it the AOF file descriptor. From now on every modification that Redis appends to the AOF goes into the incr file.
6. Increment the rewrite counter.
7. Fork a child process to do the rewrite.
8. The parent records the rewrite start time and returns.

```c
int rewriteAppendOnlyFileBackground(void) {
    pid_t childpid;

    /* 1: a child process is already running */
    if (hasActiveChildProcess()) return C_ERR;

    /* 2: create the AOF directory if it is missing (first rewrite) */
    if (dirCreateIfMissing(server.aof_dirname) == -1) {
        serverLog(LL_WARNING, "Can't open or create append-only dir %s: %s",
            server.aof_dirname, strerror(errno));
        server.aof_lastbgrewrite_status = C_ERR;
        return C_ERR;
    }

    /* 3: We set aof_selected_db to -1 in order to force the next call to the
     * feedAppendOnlyFile() to issue a SELECT command. */
    server.aof_selected_db = -1;
    /* 4: force a flush of the AOF buffer */
    flushAppendOnlyFile(1);
    /* 5: open a new incr file and make it the AOF fd, so that all later
     * modifications are written to the incr file */
    if (openNewIncrAofForAppend() != C_OK) {
        server.aof_lastbgrewrite_status = C_ERR;
        return C_ERR;
    }
    /* 6: count this rewrite */
    server.stat_aof_rewrites++;
    /* 7: fork a child process to do the rewrite */
    if ((childpid = redisFork(CHILD_TYPE_AOF)) == 0) {
        char tmpfile[256];

        /* Child */
        /* set the process title */
        redisSetProcTitle("redis-aof-rewrite");
        redisSetCpuAffinity(server.aof_rewrite_cpulist);
        /* build the temporary AOF file name */
        snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
        /* run the rewrite */
        if (rewriteAppendOnlyFile(tmpfile) == C_OK) {
            serverLog(LL_NOTICE,
                "Successfully created the temporary AOF base file %s", tmpfile);
            sendChildCowInfo(CHILD_INFO_TYPE_AOF_COW_SIZE, "AOF rewrite");
            /* exit with success */
            exitFromChild(0);
        } else {
            /* exit with failure */
            exitFromChild(1);
        }
    } else {
        /* Parent */
        /* 8 */
        if (childpid == -1) {
            server.aof_lastbgrewrite_status = C_ERR;
            serverLog(LL_WARNING,
                "Can't rewrite append only file in background: fork: %s",
                strerror(errno));
            return C_ERR;
        }
        serverLog(LL_NOTICE,
            "Background append only file rewriting started by pid %ld",(long) childpid);
        server.aof_rewrite_scheduled = 0;
        /* record the rewrite start time */
        server.aof_rewrite_time_start = time(NULL);
        return C_OK;
    }
    return C_OK; /* unreached */
}
```

2)1): Open a new incr file and make it the AOF file descriptor

The logic is as follows:

1. If AOF is turned off, return immediately.
2. Pick the name of the new incr file: duplicate the manifest object and add a new incr entry to the copy.
3. Build the full path of the incr file and open it.
4. Persist the manifest to disk.
5. Sync and close the old AOF file descriptor asynchronously. The caller has already forced a flush, so closing it here is safe.
6. Switch the AOF file descriptor to the incr file.
7. Free the old aofManifest object and replace it with the temporary one.

```c
/* Open a new incr file, persist the manifest, close the old AOF file,
 * switch to the incr file and update the manifest object. */
int openNewIncrAofForAppend(void) {
    serverAssert(server.aof_manifest != NULL);
    int newfd = -1;
    aofManifest *temp_am = NULL;
    sds new_aof_name = NULL;

    /* 1: Only open new INCR AOF when AOF enabled. */
    if (server.aof_state == AOF_OFF) return C_OK;

    /* 2: Open new AOF. */
    if (server.aof_state == AOF_WAIT_REWRITE) {
        /* Use a temporary INCR AOF file to accumulate data during AOF_WAIT_REWRITE. */
        new_aof_name = getTempIncrAofName();
    } else {
        /* Dup a temp aof_manifest to modify. */
        temp_am = aofManifestDup(server.aof_manifest);
        /* create a new incr entry, append it to the incr list, return its name */
        new_aof_name = sdsdup(getNewIncrAofName(temp_am));
    }
    /* 3: build the full path of the incr file and open it */
    sds new_aof_filepath = makePath(server.aof_dirname, new_aof_name);
    newfd = open(new_aof_filepath, O_WRONLY|O_TRUNC|O_CREAT, 0644);
    /* the path string is no longer needed */
    sdsfree(new_aof_filepath);
    if (newfd == -1) {
        serverLog(LL_WARNING, "Can't open the append-only file %s: %s",
            new_aof_name, strerror(errno));
        goto cleanup;
    }

    /* 4: persist the manifest */
    if (temp_am) {
        /* Persist AOF Manifest. */
        if (persistAofManifest(temp_am) == C_ERR) {
            goto cleanup;
        }
    }

    serverLog(LL_NOTICE, "Creating AOF incr file %s on background rewrite",
            new_aof_name);
    sdsfree(new_aof_name);

    /* If reaches here, we can safely modify the `server.aof_manifest`
     * and `server.aof_fd`. */

    /* 5: fsync and close old aof_fd if needed. In fsync everysec it's ok to delay
     * the fsync as long as we guarantee it happens, and in fsync always the file
     * is already synced at this point so fsync doesn't matter. */
    if (server.aof_fd != -1) {
        aof_background_fsync_and_close(server.aof_fd);
        server.aof_fsync_offset = server.aof_current_size;
        server.aof_last_fsync = server.unixtime;
    }
    /* 6: switch the AOF fd to the incr file */
    server.aof_fd = newfd;

    /* Reset the aof_last_incr_size. */
    server.aof_last_incr_size = 0;
    /* 7: Update `server.aof_manifest`. */
    if (temp_am) aofManifestFreeAndUpdate(temp_am);
    return C_OK;

cleanup:
    /* free the new name, close the new file, free the temporary manifest */
    if (new_aof_name) sdsfree(new_aof_name);
    if (newfd != -1) close(newfd);
    if (temp_am) aofManifestFree(temp_am);
    return C_ERR;
}
```

2)2): Fork a child process to perform the rewrite

The child simply creates a temporary file, writes the data into it, flushes it to disk, and renames it to the target name.
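The write, flush, fsync and rename sequence is a general pattern for replacing a file atomically. The following is a minimal sketch under the assumption of a POSIX system; `write_file_atomically` is a hypothetical helper written for this example, not a Redis function.

```c
#define _POSIX_C_SOURCE 200809L
#include <stdio.h>
#include <unistd.h>

/* Writes `len` bytes of `data` to `tmp_path`, forces them to disk and then
 * renames the file to `final_path`. Returns 0 on success, -1 on failure.
 * On failure the temporary file is removed. */
int write_file_atomically(const char *tmp_path, const char *final_path,
                          const char *data, size_t len) {
    FILE *fp = fopen(tmp_path, "w");
    if (!fp) return -1;

    if (fwrite(data, 1, len, fp) != len) goto werr;
    if (fflush(fp) != 0) goto werr;        /* stdio buffer -> kernel */
    if (fsync(fileno(fp)) != 0) goto werr; /* kernel page cache -> disk */
    if (fclose(fp) != 0) { fp = NULL; goto werr; }
    fp = NULL;

    /* rename() replaces the destination in one step, so readers see either
     * the old file or the complete new one. */
    if (rename(tmp_path, final_path) == -1) goto werr;
    return 0;

werr:
    if (fp) fclose(fp);
    unlink(tmp_path);
    return -1;
}
```

Writing to a temporary name first means a crash in the middle of the write never leaves a half-written file under the final name.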

```c
/* Core rewrite function */
int rewriteAppendOnlyFile(char *filename) {
    rio aof;
    FILE *fp = NULL;
    char tmpfile[256];

    /* Note that we have to use a different temp name here compared to the
     * one used by rewriteAppendOnlyFileBackground() function. */
    /* 1: build the temporary file name and open it for writing */
    snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
    fp = fopen(tmpfile,"w");
    if (!fp) {
        serverLog(LL_WARNING, "Opening the temp file for AOF rewrite in rewriteAppendOnlyFile(): %s", strerror(errno));
        return C_ERR;
    }
    /* 2: initialize the rio object */
    rioInitWithFile(&aof,fp);
    /* 3: incremental fsync is enabled */
    if (server.aof_rewrite_incremental_fsync) {
        /* fsync after every REDIS_AUTOSYNC_BYTES (4 MB) written */
        rioSetAutoSync(&aof,REDIS_AUTOSYNC_BYTES);
        /* reclaim the page cache after each sync */
        rioSetReclaimCache(&aof,1);
    }

    startSaving(RDBFLAGS_AOF_PREAMBLE);
    /* 4: write the base in RDB format if the RDB preamble is enabled */
    if (server.aof_use_rdb_preamble) {
        int error;
        if (rdbSaveRio(SLAVE_REQ_NONE,&aof,&error,RDBFLAGS_AOF_PREAMBLE,NULL) == C_ERR) {
            errno = error;
            goto werr;
        }
    } else {
        /* plain AOF rewrite */
        if (rewriteAppendOnlyFileRio(&aof) == C_ERR) goto werr;
    }

    /* Make sure data will not remain on the OS's output buffers */
    /* 5: flush the stdio buffer to the kernel */
    if (fflush(fp)) goto werr;
    /* flush the kernel buffers to disk */
    if (fsync(fileno(fp))) goto werr;
    /* ask the kernel to reclaim the page cache */
    if (reclaimFilePageCache(fileno(fp), 0, 0) == -1) {
        /* A minor error. Just log to know what happens */
        serverLog(LL_NOTICE,"Unable to reclaim page cache: %s", strerror(errno));
    }
    /* close the stream */
    if (fclose(fp)) { fp = NULL; goto werr; }
    fp = NULL;

    /* Use RENAME to make sure the DB file is changed atomically only
     * if the generate DB file is ok. */
    /* 6: rename the temporary file to the requested file name */
    if (rename(tmpfile,filename) == -1) {
        serverLog(LL_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno));
        unlink(tmpfile); /* remove the temporary file on failure */
        stopSaving(0);
        return C_ERR;
    }
    stopSaving(1);

    return C_OK;

werr:
    /* failure handling */
    serverLog(LL_WARNING,"Write error writing append only file on disk: %s", strerror(errno));
    /* close the temporary file */
    if (fp) fclose(fp);
    /* delete the temporary file */
    unlink(tmpfile);
    stopSaving(0);
    return C_ERR;
}
```

2)3): AOF rewrite

```c
int rewriteAppendOnlyFileRio(rio *aof) {
    dictIterator *di = NULL;
    dictEntry *de;
    int j;
    long key_count = 0;
    long long updated_time = 0;

    /* Record timestamp at the beginning of rewriting AOF. */
    /* 1: write a timestamp annotation */
    if (server.aof_timestamp_enabled) {
        sds ts = genAofTimestampAnnotationIfNeeded(1);
        if (rioWrite(aof,ts,sdslen(ts)) == 0) { sdsfree(ts); goto werr; }
        sdsfree(ts);
    }

    if (rewriteFunctions(aof) == 0) goto werr;
    /* 2: walk every database and write its elements with RPUSH, SADD, etc.
     * (Reconstructed: from the loop header down to the inner while the
     * source text was damaged.) */
    for (j = 0; j < server.dbnum; j++) {
        char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
        redisDb *db = server.db+j;
        dict *d = db->dict;
        if (dictSize(d) == 0) continue;
        di = dictGetSafeIterator(d);

        /* SELECT the new DB */
        if (rioWrite(aof,selectcmd,sizeof(selectcmd)-1) == 0) goto werr;
        if (rioWriteBulkLongLong(aof,j) == 0) goto werr;

        /* Iterate this DB writing every entry */
        while((de = dictNext(di)) != NULL) {
            sds keystr;
            robj key, *o;
            long long expiretime;
            size_t aof_bytes_before_key = aof->processed_bytes;

            /* fetch key and value */
            keystr = dictGetKey(de);
            o = dictGetVal(de);
            initStaticStringObject(key,keystr);
            /* fetch the expire time */
            expiretime = getExpire(db,&key);

            /* Save the key and associated value */
            if (o->type == OBJ_STRING) {
                /* Emit a SET command */
                char cmd[]="*3\r\n$3\r\nSET\r\n";
                if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) goto werr;
                /* Key and value */
                if (rioWriteBulkObject(aof,&key) == 0) goto werr;
                if (rioWriteBulkObject(aof,o) == 0) goto werr;
            } else if (o->type == OBJ_LIST) {
                if (rewriteListObject(aof,&key,o) == 0) goto werr;
            } else if (o->type == OBJ_SET) {
                if (rewriteSetObject(aof,&key,o) == 0) goto werr;
            } else if (o->type == OBJ_ZSET) {
                if (rewriteSortedSetObject(aof,&key,o) == 0) goto werr;
            } else if (o->type == OBJ_HASH) {
                if (rewriteHashObject(aof,&key,o) == 0) goto werr;
            } else if (o->type == OBJ_STREAM) {
                if (rewriteStreamObject(aof,&key,o) == 0) goto werr;
            } else if (o->type == OBJ_MODULE) {
                if (rewriteModuleObject(aof,&key,o,j) == 0) goto werr;
            } else {
                serverPanic("Unknown object type");
            }

            /* In fork child process, we can try to release memory back to the
             * OS and possibly avoid or decrease COW. We give the dismiss
             * mechanism a hint about an estimated size of the object we stored. */
            size_t dump_size = aof->processed_bytes - aof_bytes_before_key;
            if (server.in_fork_child) dismissObject(o, dump_size);

            /* Save the expire time */
            if (expiretime != -1) {
                char cmd[]="*3\r\n$9\r\nPEXPIREAT\r\n";
                if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) goto werr;
                if (rioWriteBulkObject(aof,&key) == 0) goto werr;
                if (rioWriteBulkLongLong(aof,expiretime) == 0) goto werr;
            }

            /* Update info every 1 second (approximately).
             * in order to avoid calling mstime() on each iteration, we will
             * check the diff every 1024 keys */
            if ((key_count++ & 1023) == 0) {
                long long now = mstime();
                if (now - updated_time >= 1000) {
                    sendChildInfo(CHILD_INFO_TYPE_CURRENT_INFO, key_count, "AOF rewrite");
                    updated_time = now;
                }
            }

            /* Delay before next key if required (for testing) */
            if (server.rdb_key_save_delay)
                debugDelay(server.rdb_key_save_delay);
        }
        /* release the iterator */
        dictReleaseIterator(di);
        di = NULL;
    }
    return C_OK;

werr:
    /* release the iterator if one is still open */
    if (di) dictReleaseIterator(di);
    return C_ERR;
}
```

3): Appending to the AOF buffer when a command executes

When a client issues a write command, the command is converted back to protocol format and appended to the AOF buffer.
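What ends up in the buffer is the command in Redis protocol (RESP) form: an array header followed by one bulk string per argument. A minimal sketch of that encoding is shown here; `encode_resp_command` is a hypothetical helper that assumes NUL-terminated text arguments, whereas Redis itself works on binary-safe objects.

```c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Encodes argv[0..argc-1] as a RESP array of bulk strings.
 * Returns a malloc'd, NUL-terminated string that the caller must free,
 * or NULL if the allocation fails. */
char *encode_resp_command(int argc, const char **argv) {
    /* 32 bytes per part is enough for "*<n>\r\n" or "$<len>\r\n" plus
     * the trailing "\r\n", on top of the argument bytes themselves. */
    size_t cap = 32;
    for (int i = 0; i < argc; i++) cap += strlen(argv[i]) + 32;

    char *buf = malloc(cap);
    if (!buf) return NULL;

    size_t used = (size_t)snprintf(buf, cap, "*%d\r\n", argc);
    for (int i = 0; i < argc; i++) {
        used += (size_t)snprintf(buf + used, cap - used, "$%zu\r\n%s\r\n",
                                 strlen(argv[i]), argv[i]);
    }
    return buf;
}
```

For the arguments `SET`, `k`, `v` this produces `*3\r\n$3\r\nSET\r\n$1\r\nk\r\n$1\r\nv\r\n`, the same shape as the hard-coded `SET` and `SELECT` headers that appear in the Redis source.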

aof.c#feedAppendOnlyFile

```c
/* Append a command to the AOF buffer */
void feedAppendOnlyFile(int dictid, robj **argv, int argc) {
    sds buf = sdsempty();

    serverAssert(dictid == -1 || (dictid >= 0 && dictid < server.dbnum));

    /* Feed timestamp if needed.
     * (Reconstructed: from "< server.dbnum" down to the sdscatsds() call the
     * source text was damaged.) */
    if (server.aof_timestamp_enabled) {
        sds ts = genAofTimestampAnnotationIfNeeded(0);
        if (ts != NULL) {
            buf = sdscatsds(buf, ts);
            sdsfree(ts);
        }
    }

    /* The DB this command was targeting is not the same as the last command
     * we appended. To issue a SELECT command is needed. */
    /* emit an explicit SELECT so that the following commands apply to the right database */
    if (dictid != -1 && dictid != server.aof_selected_db) {
        char seldb[64];

        snprintf(seldb,sizeof(seldb),"%d",dictid);
        buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
            (unsigned long)strlen(seldb),seldb);
        server.aof_selected_db = dictid;
    }

    /* All commands should be propagated the same way in AOF as in replication.
     * No need for AOF-specific translation. */
    /* turn the command and its arguments back into protocol format */
    buf = catAppendOnlyGenericCommand(buf,argc,argv);

    /* Append to the AOF buffer. This will be flushed on disk just before
     * of re-entering the event loop, so before the client will get a
     * positive reply about the operation performed. */
    if (server.aof_state == AOF_ON || /* AOF is on, or it is waiting for a rewrite and the AOF child is running */
        (server.aof_state == AOF_WAIT_REWRITE && server.child_type == CHILD_TYPE_AOF))
    {
        /* append to the AOF buffer */
        server.aof_buf = sdscatlen(server.aof_buf, buf, sdslen(buf));
    }
    /* free the temporary buffer */
    sdsfree(buf);
}
```

4): AOF rewrite completion

When the child exits, the parent handles its result and installs the rewritten temporary file as the new base AOF.

aof.c#backgroundRewriteDoneHandler

Only the success path is covered here:

1. Build the name of the temporary rewrite file; this is the file the rewrite wrote to.
2. Duplicate the aofManifest into a temporary copy, so that any failure can be rolled back easily.
3. Get the new base name and mark the previous base as history. The file name looks like appendonly.aof.1.base.rdb.
4. Build the path of the AOF base file.
5. Rename the temporary rewrite file to the base file name.
6. Move the incr files to history.
7. Persist the manifest to disk.
8. Install the new manifest and free the old one.
9. Delete the history files.

```c
/* Called by the parent after the child has finished the AOF rewrite */
void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
    /* successful exit */
    if (!bysignal && exitcode == 0) {
        char tmpfile[256];
        long long now = ustime();
        sds new_base_filepath = NULL;
        sds new_incr_filepath = NULL;
        aofManifest *temp_am;
        mstime_t latency;

        serverLog(LL_NOTICE,
            "Background AOF rewrite terminated with success");
        /* 1: name of the temporary file the rewrite wrote to */
        snprintf(tmpfile, 256, "temp-rewriteaof-bg-%d.aof",
            (int)server.child_pid);

        serverAssert(server.aof_manifest != NULL);

        /* 2: Dup a temporary aof_manifest for subsequent modifications. */
        temp_am = aofManifestDup(server.aof_manifest);

        /* 3: Get a new BASE file name and mark the previous (if we have)
         * as the HISTORY type. */
        sds new_base_filename = getNewBaseFileNameAndMarkPreAsHistory(temp_am);
        serverAssert(new_base_filename != NULL);
        /* 4: build the path of the base file */
        new_base_filepath = makePath(server.aof_dirname, new_base_filename);

        /* 5: Rename the temporary aof file to 'new_base_filename'. */
        latencyStartMonitor(latency);
        if (rename(tmpfile, new_base_filepath) == -1) {
            serverLog(LL_WARNING,
                "Error trying to rename the temporary AOF base file %s into %s: %s",
                tmpfile,
                new_base_filepath,
                strerror(errno));
            aofManifestFree(temp_am);                  /* free the temporary manifest */
            sdsfree(new_base_filepath);                /* free the base path */
            server.aof_lastbgrewrite_status = C_ERR;   /* record the error */
            server.stat_aofrw_consecutive_failures++;  /* count the failure */
            goto cleanup;
        }
        latencyEndMonitor(latency);
        latencyAddSampleIfNeeded("aof-rename", latency);
        serverLog(LL_NOTICE,
            "Successfully renamed the temporary AOF base file %s into %s", tmpfile, new_base_filename);

        /* Rename the temporary incr aof file to 'new_incr_filename'. */
        if (server.aof_state == AOF_WAIT_REWRITE) {
            /* Get temporary incr aof name. */
            sds temp_incr_aof_name = getTempIncrAofName();
            sds temp_incr_filepath = makePath(server.aof_dirname, temp_incr_aof_name);
            /* Get next new incr aof name. */
            sds new_incr_filename = getNewIncrAofName(temp_am);
            new_incr_filepath = makePath(server.aof_dirname, new_incr_filename);
            latencyStartMonitor(latency);
            if (rename(temp_incr_filepath, new_incr_filepath) == -1) {
                serverLog(LL_WARNING,
                    "Error trying to rename the temporary AOF incr file %s into %s: %s",
                    temp_incr_filepath,
                    new_incr_filepath,
                    strerror(errno));
                bg_unlink(new_base_filepath);
                sdsfree(new_base_filepath);
                aofManifestFree(temp_am);
                sdsfree(temp_incr_filepath);
                sdsfree(new_incr_filepath);
                sdsfree(temp_incr_aof_name);
                server.aof_lastbgrewrite_status = C_ERR;
                server.stat_aofrw_consecutive_failures++;
                goto cleanup;
            }
            latencyEndMonitor(latency);
            latencyAddSampleIfNeeded("aof-rename", latency);
            serverLog(LL_NOTICE,
                "Successfully renamed the temporary AOF incr file %s into %s", temp_incr_aof_name, new_incr_filename);
            sdsfree(temp_incr_filepath);
            sdsfree(temp_incr_aof_name);
        }

        /* 6: Change the AOF file type in 'incr_aof_list' from AOF_FILE_TYPE_INCR
         * to AOF_FILE_TYPE_HIST, and move them to the 'history_aof_list'. */
        markRewrittenIncrAofAsHistory(temp_am);

        /* 7: Persist our modifications. */
        if (persistAofManifest(temp_am) == C_ERR) {
            bg_unlink(new_base_filepath);
            aofManifestFree(temp_am);
            sdsfree(new_base_filepath);
            if (new_incr_filepath) {
                bg_unlink(new_incr_filepath);
                sdsfree(new_incr_filepath);
            }
            server.aof_lastbgrewrite_status = C_ERR;
            server.stat_aofrw_consecutive_failures++;
            goto cleanup;
        }
        /* free the path strings */
        sdsfree(new_base_filepath);
        if (new_incr_filepath) sdsfree(new_incr_filepath);

        /* 8: We can safely let `server.aof_manifest` point to 'temp_am' and free the previous one. */
        aofManifestFreeAndUpdate(temp_am);

        if (server.aof_fd != -1) {
            /* AOF enabled. */
            server.aof_selected_db = -1; /* Make sure SELECT is re-issued */
            server.aof_current_size = getAppendOnlyFileSize(new_base_filename, NULL) + server.aof_last_incr_size;
            server.aof_rewrite_base_size = server.aof_current_size;
            server.aof_fsync_offset = server.aof_current_size;
            server.aof_last_fsync = server.unixtime;
        }

        /* 9: We don't care about the return value of `aofDelHistoryFiles`, because the history
         * deletion failure will not cause any problems. */
        aofDelHistoryFiles();

        server.aof_lastbgrewrite_status = C_OK;
        server.stat_aofrw_consecutive_failures = 0;

        serverLog(LL_NOTICE, "Background AOF rewrite finished successfully");
        /* Change state from WAIT_REWRITE to ON if needed */
        if (server.aof_state == AOF_WAIT_REWRITE)
            server.aof_state = AOF_ON;

        serverLog(LL_VERBOSE,
            "Background AOF rewrite signal handler took %lldus", ustime()-now);
    } else if (!bysignal && exitcode != 0) {
        server.aof_lastbgrewrite_status = C_ERR;
        server.stat_aofrw_consecutive_failures++;

        serverLog(LL_WARNING,
            "Background AOF rewrite terminated with error");
    } else {
        /* SIGUSR1 is whitelisted, so we have a way to kill a child without
         * triggering an error condition. */
        if (bysignal != SIGUSR1) {
            server.aof_lastbgrewrite_status = C_ERR;
            server.stat_aofrw_consecutive_failures++;
        }

        serverLog(LL_WARNING,
            "Background AOF rewrite terminated by signal %d", bysignal);
    }

cleanup:
    /* remove the temporary rewrite file */
    aofRemoveTempFile(server.child_pid);
    /* Clear AOF buffer and delete temp incr aof for next rewrite. */
    if (server.aof_state == AOF_WAIT_REWRITE) {
        sdsfree(server.aof_buf);
        server.aof_buf = sdsempty();
        aofDelTempIncrAofFile();
    }
    /* compute the total rewrite duration */
    server.aof_rewrite_time_last = time(NULL)-server.aof_rewrite_time_start;
    server.aof_rewrite_time_start = -1;
    /* Schedule a new rewrite if we are waiting for it to switch the AOF ON. */
    if (server.aof_state == AOF_WAIT_REWRITE)
        server.aof_rewrite_scheduled = 1;
}
```

4)1): Get the new base name and mark the previous base as history

```c
/* Mark the old base as history and create a new base entry */
sds getNewBaseFileNameAndMarkPreAsHistory(aofManifest *am) {
    serverAssert(am != NULL);
    if (am->base_aof_info) {
        serverAssert(am->base_aof_info->file_type == AOF_FILE_TYPE_BASE);
        /* change the file type to history */
        am->base_aof_info->file_type = AOF_FILE_TYPE_HIST;
        /* add it to the history list */
        listAddNodeHead(am->history_aof_list, am->base_aof_info);
    }
    /* the suffix depends on whether the base is written in RDB or AOF format */
    char *format_suffix = server.aof_use_rdb_preamble ?
        RDB_FORMAT_SUFFIX:AOF_FORMAT_SUFFIX;
    /* create a new aofInfo of type base */
    aofInfo *ai = aofInfoCreate();
    ai->file_name = sdscatprintf(sdsempty(), "%s.%lld%s%s",
                        server.aof_filename,
                        ++am->curr_base_file_seq,
                        BASE_FILE_SUFFIX,
                        format_suffix);
    ai->file_seq = am->curr_base_file_seq;
    ai->file_type = AOF_FILE_TYPE_BASE;
    am->base_aof_info = ai;
    am->dirty = 1;
    return am->base_aof_info->file_name;
}
```

4)2): Move incr files to history

```c
void markRewrittenIncrAofAsHistory(aofManifest *am) {
    serverAssert(am != NULL);
    /* 1: nothing to do if the incr list is empty */
    if (!listLength(am->incr_aof_list)) {
        return;
    }

    listNode *ln;
    listIter li;

    listRewindTail(am->incr_aof_list, &li);

    /* 2: "server.aof_fd != -1" means AOF enabled, then we must skip the
     * last AOF, because this file is our currently writing. */
    if (server.aof_fd != -1) {
        ln = listNext(&li);
        serverAssert(ln != NULL);
    }
    /* 3: for every remaining node, change its type to history and remove it
     * from the incr list. */
    /* Move aofInfo from 'incr_aof_list' to 'history_aof_list'. */
    while ((ln = listNext(&li)) != NULL) {
        aofInfo *ai = (aofInfo*)ln->value;
        serverAssert(ai->file_type == AOF_FILE_TYPE_INCR);

        aofInfo *hai = aofInfoDup(ai);
        hai->file_type = AOF_FILE_TYPE_HIST;
        listAddNodeHead(am->history_aof_list, hai);
        listDelNode(am->incr_aof_list, ln);
    }

    am->dirty = 1;
}
```

4)3): Delete history files

This function walks the history list, removes every entry from it, and deletes the file behind each entry asynchronously. Deleting in the background keeps the main thread from blocking.

```c
int aofDelHistoryFiles(void) {
    /* 1: return if there is no manifest, automatic cleanup is disabled,
     * or the history list is empty */
    if (server.aof_manifest == NULL ||
        server.aof_disable_auto_gc == 1 ||
        !listLength(server.aof_manifest->history_aof_list))
    {
        return C_OK;
    }

    listNode *ln;
    listIter li;
    /* 2: set up an iterator over the history list */
    listRewind(server.aof_manifest->history_aof_list, &li);
    /* 3: walk the list */
    while ((ln = listNext(&li)) != NULL) {
        aofInfo *ai = (aofInfo*)ln->value;
        serverAssert(ai->file_type == AOF_FILE_TYPE_HIST);
        serverLog(LL_NOTICE, "Removing the history file %s in the background", ai->file_name);
        sds aof_filepath = makePath(server.aof_dirname, ai->file_name);
        /* 3.1: delete the file in the background to avoid blocking */
        bg_unlink(aof_filepath);
        /* free the path string */
        sdsfree(aof_filepath);
        /* 3.2: remove the node from the history list */
        listDelNode(server.aof_manifest->history_aof_list, ln);
    }

    server.aof_manifest->dirty = 1;
    /* 4: persist the manifest */
    return persistAofManifest(server.aof_manifest);
}
```

5): Loading AOF files when Redis first starts (TODO)

server.c#loadDataFromDisk, which calls aof.c#loadAppendOnlyFiles

1. Handle the upgrade from an older version.
2. Get the number and total size in bytes of the base and incr files.
3. Set the server state to loading.
4. Load the base AOF file.
5. Load the incr AOF files one by one.
6. Mark loading as finished.

```c
int loadAppendOnlyFiles(aofManifest *am) {
    serverAssert(am != NULL);
    int status, ret = AOF_OK;
    long long start;
    off_t total_size = 0, base_size = 0;
    sds aof_name;
    int total_num, aof_num = 0, last_file;

    /* If the 'server.aof_filename' file exists in dir, we may be starting
     * from an old redis version. We will use enter upgrade mode in three situations.
     *
     * 1. If the 'server.aof_dirname' directory not exist
     * 2. If the 'server.aof_dirname' directory exists but the manifest file is missing
     * 3. If the 'server.aof_dirname' directory exists and the manifest file it contains
     *    has only one base AOF record, and the file name of this base AOF is 'server.aof_filename',
     *    and the 'server.aof_filename' file not exist in 'server.aof_dirname' directory
     * */
    /* 1: upgrade from an older version */
    if (fileExist(server.aof_filename)) {
        if (!dirExists(server.aof_dirname) ||                                      /* situation 1 */
            (am->base_aof_info == NULL && listLength(am->incr_aof_list) == 0) ||   /* situation 2 */
            (am->base_aof_info != NULL && listLength(am->incr_aof_list) == 0 &&    /* situation 3 */
             !strcmp(am->base_aof_info->file_name, server.aof_filename) && !aofFileExist(server.aof_filename)))
        {
            aofUpgradePrepare(am);
        }
    }
    /* the manifest is empty */
    if (am->base_aof_info == NULL && listLength(am->incr_aof_list) == 0) {
        return AOF_NOT_EXIST;
    }
    /* 2: number of base and incr files */
    total_num = getBaseAndIncrAppendOnlyFilesNum(am);
    serverAssert(total_num > 0);

    /* Here we calculate the total size of all BASE and INCR files in
     * advance, it will be set to `server.loading_total_bytes`. */
    total_size = getBaseAndIncrAppendOnlyFilesSize(am, &status);
    if (status != AOF_OK) {
        /* If an AOF exists in the manifest but not on the disk, we consider this to be a fatal error. */
        if (status == AOF_NOT_EXIST) status = AOF_FAILED;

        return status;
    } else if (total_size == 0) {
        return AOF_EMPTY;
    }
    /* 3: set the server state to loading */
    startLoading(total_size, RDBFLAGS_AOF_PREAMBLE, 0);

    /* 4: Load BASE AOF if needed. */
    if (am->base_aof_info) {
        serverAssert(am->base_aof_info->file_type == AOF_FILE_TYPE_BASE);
        aof_name = (char*)am->base_aof_info->file_name;
        updateLoadingFileName(aof_name);
        /* size of the base file in bytes */
        base_size = getAppendOnlyFileSize(aof_name, NULL);
        last_file = ++aof_num == total_num;
        start = ustime();                         /* record the start time */
        ret = loadSingleAppendOnlyFile(aof_name); /* load the base file */
        if (ret == AOF_OK || (ret == AOF_TRUNCATED && last_file)) {
            serverLog(LL_NOTICE, "DB loaded from base file %s: %.3f seconds",
                aof_name, (float)(ustime()-start)/1000000);
        }

        /* If the truncated file is not the last file, we consider this to be a fatal error. */
        if (ret == AOF_TRUNCATED && !last_file) {
            ret = AOF_FAILED;
            serverLog(LL_WARNING, "Fatal error: the truncated file is not the last file");
        }

        if (ret == AOF_OPEN_ERR || ret == AOF_FAILED) {
            goto cleanup;
        }
    }

    /* 5: Load INCR AOFs if needed. */
    if (listLength(am->incr_aof_list)) {
        listNode *ln;
        listIter li;

        listRewind(am->incr_aof_list, &li);
        while ((ln = listNext(&li)) != NULL) {
            aofInfo *ai = (aofInfo*)ln->value;
            serverAssert(ai->file_type == AOF_FILE_TYPE_INCR);
            aof_name = (char*)ai->file_name;
            updateLoadingFileName(aof_name);
            last_file = ++aof_num == total_num;
            start = ustime();
            ret = loadSingleAppendOnlyFile(aof_name);
            if (ret == AOF_OK || (ret == AOF_TRUNCATED && last_file)) {
                serverLog(LL_NOTICE, "DB loaded from incr file %s: %.3f seconds",
                    aof_name, (float)(ustime()-start)/1000000);
            }

            /* We know that (at least) one of the AOF files has data (total_size > 0),
             * so empty incr AOF file doesn't count as a AOF_EMPTY result */
            if (ret == AOF_EMPTY) ret = AOF_OK;

            /* If the truncated file is not the last file, we consider this to be a fatal error. */
            if (ret == AOF_TRUNCATED && !last_file) {
                ret = AOF_FAILED;
                serverLog(LL_WARNING, "Fatal error: the truncated file is not the last file");
            }

            if (ret == AOF_OPEN_ERR || ret == AOF_FAILED) {
                goto cleanup;
            }
        }
    }

    server.aof_current_size = total_size;
    /* Ideally, the aof_rewrite_base_size variable should hold the size of the
     * AOF when the last rewrite ended, this should include the size of the
     * incremental file that was created during the rewrite since otherwise we
     * risk the next automatic rewrite to happen too soon (or immediately if
     * auto-aof-rewrite-percentage is low). However, since we do not persist
     * aof_rewrite_base_size information anywhere, we initialize it on restart
     * to the size of BASE AOF file. This might cause the first AOFRW to be
     * executed early, but that shouldn't be a problem since everything will be
     * fine after the first AOFRW. */
    server.aof_rewrite_base_size = base_size;
    server.aof_fsync_offset = server.aof_current_size;

cleanup:
    /* 6: mark loading as finished */
    stopLoading(ret == AOF_OK || ret == AOF_TRUNCATED);
    return ret;
}
```

Loading the base AOF

Summary

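The AOF fd is switched to the new incr file before the rewrite starts, so every write and sync triggered afterwards goes to the incr file. No command issued during the rewrite is lost.

The base file is rewritten after the fd has been switched, so it holds the state of the dataset at that moment. The incr file therefore records the changes from second 0 to second N after the switch, while the base records the state at second 0. Any small gap between the two does not matter: on load, the base is loaded first and the incr files are replayed on top of it, which keeps the data consistent.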
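The loading order, together with the rule in loadAppendOnlyFiles that only the last file may be truncated, can be sketched as a small function. The names `load_result` and `combine_load_results` are hypothetical and the sketch leaves out the empty-file and open-error cases that the real function also handles.

```c
#include <stddef.h>

typedef enum { LOAD_OK, LOAD_TRUNCATED, LOAD_FAILED } load_result;

/* per_file[0] is the result of loading the base file; the remaining
 * entries are the incr files in manifest order. Returns the overall
 * result of loading all n files in that order. */
load_result combine_load_results(const load_result *per_file, size_t n) {
    load_result ret = LOAD_OK;
    for (size_t i = 0; i < n; i++) {
        int last_file = (i + 1 == n);
        ret = per_file[i];
        /* A truncated file is tolerated only at the very end of the chain. */
        if (ret == LOAD_TRUNCATED && !last_file) return LOAD_FAILED;
        if (ret == LOAD_FAILED) return LOAD_FAILED;
    }
    return ret;
}
```

A truncated tail is acceptable because only the file that was being written at the time of a crash can be incomplete; a truncated file in the middle means later files would be replayed on top of missing data.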


