神刀安全网

Redis主从复制

搭建环境

为了测试Redis主从复制功能,需要在本地启动master和slave两个Redis实例。这里使用docker创建了两个容器:

CONTAINER ID        IMAGE               COMMAND                  CREATED             STATUS              PORTS                      NAMES 372cbdc31eb2        redis               "docker-entrypoint.sh"   4 days ago          Up 5 hours          127.0.0.1:7001->6379/tcp   slave ef5b6b9dce8a        redis               "docker-entrypoint.sh"   4 days ago          Up 5 hours          127.0.0.1:7000->6379/tcp   master

使用redis-cli连接上slave redis server,发送slaveof命令:

127.0.0.1:7001> slaveof 172.17.0.2 6379 => OK

在master上可以看到日志:

1:M 10 Sep 07:38:43.652 * Slave 172.17.0.3:6379 asks for synchronization 1:M 10 Sep 07:38:43.652 * Full resync requested by slave 172.17.0.3:6379 1:M 10 Sep 07:38:43.652 * Starting BGSAVE for SYNC with target: disk 1:M 10 Sep 07:38:43.655 * Background saving started by pid 21 21:C 10 Sep 07:38:43.661 * DB saved on disk 21:C 10 Sep 07:38:43.661 * RDB: 6 MB of memory used by copy-on-write 1:M 10 Sep 07:38:43.750 * Background saving terminated with success 1:M 10 Sep 07:38:43.753 * Synchronization with slave 172.17.0.3:6379 succeeded

在master上执行命令:

set hello world => OK

在slave上执行命令:

get hello => "world"

Redis主从复制配置完成。

另外一种方式是启动Redis时制定配置文件,在配置中修改:
slaveof <masterip> <masterport>

分析主从复制过程

主从复制可以看作是一次数据迁移,涉及存量数据同步和增量数据同步两步。

master日志

从master的日志中可以了解主从复制的大致过程。

1:M 10 Sep 07:38:43.652 * Slave 172.17.0.3:6379 asks for synchronization

slave 172.17.0.3:6379向master请求数据同步。

1:M 10 Sep 07:38:43.652 * Full resync requested by slave 172.17.0.3:6379

master判断需要进行一次full sync。

1:M 10 Sep 07:38:43.652 * Starting BGSAVE for SYNC with target: disk 1:M 10 Sep 07:38:43.655 * Background saving started by pid 21 21:C 10 Sep 07:38:43.661 * DB saved on disk 21:C 10 Sep 07:38:43.661 * RDB: 6 MB of memory used by copy-on-write 1:M 10 Sep 07:38:43.750 * Background saving terminated with success

master开始执行BGSAVE,为存量数据生成一份rdb文件。

1:M 10 Sep 07:38:43.753 * Synchronization with slave 172.17.0.3:6379 succeeded

master将rdb文件发送给slave,并建立增量同步。

slave端

主从复制的入口是salve服务器向master发送salveof命令。server.c文件中的redisCommandTable中可以找到slaveof命令对应的处理函数是slaveofCommand。在函数中首先会判断命令是否是slaveof no one,如果不是进入else逻辑,解析slaveof命令中的ip地址和端口号:

void slaveofCommand(client *c) {     //判断命令是否是slaveof no one     if (!strcasecmp(c->argv[1]->ptr,"no") &&         !strcasecmp(c->argv[2]->ptr,"one")) {         ...     } else {         long port;          if ((getLongFromObjectOrReply(c, c->argv[2], &port, NULL) != C_OK))             return;          //判断是否已经存在master,并且与当前命令指向的master一致         if (server.masterhost && !strcasecmp(server.masterhost,c->argv[1]->ptr)             && server.masterport == port) {             serverLog(LL_NOTICE,"SLAVE OF would result into synchronization with the master we are already connected with. No operation performed.");             addReplySds(c,sdsnew("+OK Already connected to specified master/r/n"));             return;         }         //设置master属性         replicationSetMaster(c->argv[1]->ptr, port);         ...     }     //向客户端返回ok     addReply(c,shared.ok); }

replicationSetMaster函数中会将master的ip与port保存到server的masterhost和masterport属性中,并初始化复制状态repl_state为REPL_STATE_CONNECT,偏移量信息master_repl_offset(从master复制的字节数)和repl_down_since(时间戳)。设置完成后向客户端返回OK。

在下一次事件循环中processTimeEvents会检查时间事件链表。在服务器启动时注册过时间事件serverCrond函数:

aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);

serverCron函数中,会调用复制相关的函数replicationCron

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {     ...     run_with_period(1000) replicationCron();     ... }

replicationCron函数中,会判断复制状态,如果是REPL_STATE_CONNECT那么开始创建与master之间的连接:

void replicationCron(void) {     ...     if (server.repl_state == REPL_STATE_CONNECT) {         serverLog(LL_NOTICE,"Connecting to MASTER %s:%d",             server.masterhost, server.masterport);         if (connectWithMaster() == C_OK) {             serverLog(LL_NOTICE,"MASTER <-> SLAVE sync started");         }     }     ... }

整个主从复制的准备过程称为replication handshake,在过程中server.repl_state会发生一系列状态变化,每个状态会执行不同操作以达到下一个状态,具体状态变化如下(其中虚线代表状态变化不是在一个事件循环中):

Redis主从复制
状态变化.png

其中最关键的操作是发送PSYNC命令:

int slaveTryPartialResynchronization(int fd, int read_reply) {     ...     if (!read_reply) {         server.repl_master_initial_offset = -1;         //如果之前有连接过master,取出cache中的runid和offset用于判断是否能部分同步         if (server.cached_master) {             psync_runid = server.cached_master->replrunid;             snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1);         } else {             //第一次连接master必然需要全量同步             psync_runid = "?";             memcpy(psync_offset,"-1",3);         }          //发送psync命令         reply = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"PSYNC",psync_runid,psync_offset,NULL);         if (reply != NULL) {             serverLog(LL_WARNING,"Unable to send PSYNC to master: %s",reply);             sdsfree(reply);             aeDeleteFileEvent(server.el,fd,AE_READABLE);             return PSYNC_WRITE_ERROR;         }         return PSYNC_WAIT_REPLY;     } }

slaveTryPartialResynchronization函数首先会判断是否存在cached_master,如果存在会在发送psync命令时带上runid和offset,让master判断是进行全量同步还是增量同步。

在下一次事件循环中slaveTryPartialResynchronization函数会读取master的响应,master的响应会有几种:PSYNC_WAIT_REPLY, PSYNC_CONTINUE, PSYNC_NOT_SUPPORTED, PSYNC_FULLRESYNC, PSYNC_WRITE_ERROR

PSYNC_WAIT_REPLY:本次未读到内容,下次事件继续。
PSYNC_CONTINUE:增量同步。
PSYNC_NOT_SUPPORTED:不支持PSYNC命令,重新发送SYNC命令。
PSYNC_FULLRESYNC:全量同步。
PSYNC_WRITE_ERROR:错误。

当master支持psync命令,且slave是第一次与master建立主从同步关系时,slaveTryPartialResynchronization函数会创建tmpfile,用于接收master发来的rdb文件。同时注册可读事件readSyncBulkPayload函数,并将server.repl_state更新为REPL_STATE_TRANSFER

slaveTryPartialResynchronization函数会在每次文件事件触发时,读取master发送过来的rdb文件。接收完成后会清空db,使用master发送来的rdb文件初始化数据库,将repl_state改为REPL_STATE_CONNECTED。至此全量数据同步完成,进入增量数据同步。

master端

在slave的connectWithMaster函数中,会创建与master的tcp连接。master会为slave创建一个client保存到客户端列表中,过程参考Redis命令处理流程分析

Redis的复制可以是master->slave->slave这种模式,下面的代码过滤了这种模式的分支处理代码,只保留master->slave模式分支代码。

在master端,slave的client也有一套状态变化:

Redis主从复制
状态变化.png

当slave向master发送psync或者sync命令时会调用syncCommand函数:

void syncCommand(client *c) {     ...     //如果当前client中还有未发送的内容,不能进行sync操作,否则会导致增量数据不一致     if (clientHasPendingReplies(c)) {         addReplyError(c,"SYNC and PSYNC are invalid with pending output");         return;     }     //master日志中的第一条     serverLog(LL_NOTICE,"Slave %s asks for synchronization",         replicationGetSlaveName(c));      //如果是psync,判断是否能进行增量同步     if (!strcasecmp(c->argv[0]->ptr,"psync")) {         if (masterTryPartialResynchronization(c) == C_OK) {             server.stat_sync_partial_ok++;             return;         } else {             char *master_runid = c->argv[1]->ptr;             if (master_runid[0] != '?') server.stat_sync_partial_err++;         }     } else {         //不是psync,设置slave的客户端flag         c->flags |= CLIENT_PRE_PSYNC;     }      //全量同步次数+1     server.stat_sync_full++;      //更新slave的client的状态:等待bgsave开始     c->replstate = SLAVE_STATE_WAIT_BGSAVE_START;     ...     c->flags |= CLIENT_SLAVE;     //添加到slaves链表中     listAddNodeTail(server.slaves,c);      //情况1:bgsave正在执行,且是落磁盘的     if (server.rdb_child_pid != -1 &&         server.rdb_child_type == RDB_CHILD_TYPE_DISK)     {         //循环slaves链表         listRewind(server.slaves,&li);         while((ln = listNext(&li))) {             slave = ln->value;             //如果存在状态是等待bgsave完成的slave,那么可以复用这次bgsave产生的rdb文件             //增量数据通过拷贝该client的out put buf实现             if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) break;         }         //如果存在         if (ln && ((c->slave_capa & slave->slave_capa) == slave->slave_capa)) {             copyClientOutputBuffer(c,slave);             //函数内会修改客户端状态到:SLAVE_STATE_WAIT_BGSAVE_END             replicationSetupSlaveForFullResync(c,slave->psync_initial_offset);         } else {             ...         }      //情况2:正在执行bgsave,但是文件是直接输出到某个socket,需要等待下一轮bgsave     } else if (server.rdb_child_pid != -1 &&                server.rdb_child_type == RDB_CHILD_TYPE_SOCKET)     {         ...      //情况3:没有正在执行的bgsave     } else {         ...         if (server.aof_child_pid == -1) {             //开始执行bgsave,修改状态为:SLAVE_STATE_WAIT_BGSAVE_END             startBgsaveForReplication(c->slave_capa);         } else {             ...         }         ...     }      if (listLength(server.slaves) == 1 && server.repl_backlog == NULL)         //创建backlog,用于增量同步         createReplicationBacklog();     return; }

当rdb文件生成完毕时,会调用updateSlavesWaitingBgsave函数,函数中会遍历server.slaves链表,找出状态是SLAVE_STATE_WAIT_BGSAVE_END的客户端,更改状态为SLAVE_STATE_SEND_BULK,并注册写事件处理器sendBulkToSlave,向salve发送rdb文件。

void updateSlavesWaitingBgsave(int bgsaveerr, int type) {     ...     listRewind(server.slaves,&li);     while((ln = listNext(&li))) {         client *slave = ln->value;         //判断状态         if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {             ...         } else if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) {             struct redis_stat buf;             //打开rdb文件             if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 ||                 ...                 continue;             }             aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);             //注册写事件处理器,发送rdb文件             if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {                 freeClient(slave);                 continue;             }         }     }     ... }

sendBulkToSlave函数中,当rdb文件发送完成,调用putSlaveOnline函数更新client状态为SLAVE_STATE_ONLINE,删除sendBulkToSlave处理器,安装新的sendReplyToClient处理器用于发送缓冲中的增量数据。

void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {     ...     //文件发送完成     if (slave->repldboff == slave->repldbsize) {         close(slave->repldbfd);         slave->repldbfd = -1;         //删除事件处理器sendBulkToSlave         aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);         //更新状态,安装新写事件处理器sendReplyToClient         putSlaveOnline(slave);     } }

在bgsave执行和传输rdb文件期间,master还是会继续处理写入请求,在server.c中会调用replicationFeedSlaves函数向salve的缓冲中写入增量数据:

void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {     ...     //遍历所有的slave     listRewind(server.slaves,&li);     while((ln = listNext(&li))) {         client *slave = ln->value;          //判断状态,因为rdb文件需要跟增量数据配对数据才正确,所以SLAVE_STATE_WAIT_BGSAVE_START状态的客户端不写入         if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;         //写入缓冲         addReplyMultiBulkLen(slave,argc);         for (j = 0; j < argc; j++)             addReplyBulk(slave,argv[j]);     } }

主从同步的配置

Redis提供了两项配置,通过修改配置可以在可用性和一致性之间做调节。

# min-slaves-to-write 3  最少有几个slave处于online状态 # min-slaves-max-lag 10  主从之间的延迟需要小于多少(seconds)

如果配置了这两个选项,在refreshGoodSlavesCount函数中会统计good的slave数量:

void refreshGoodSlavesCount(void) {     listIter li;     listNode *ln;     int good = 0;     //如果没有配置,直接返回     if (!server.repl_min_slaves_to_write ||         !server.repl_min_slaves_max_lag) return;      listRewind(server.slaves,&li);     while((ln = listNext(&li))) {         client *slave = ln->value;         time_t lag = server.unixtime - slave->repl_ack_time;          if (slave->replstate == SLAVE_STATE_ONLINE &&             lag <= server.repl_min_slaves_max_lag) good++;     }     //统计结果     server.repl_good_slaves_count = good; }

在master执行命令时,会判断server.repl_good_slaves_count值,如果小于配置会停止写命令执行:

    if (server.masterhost == NULL &&         server.repl_min_slaves_to_write &&         server.repl_min_slaves_max_lag &&         c->cmd->flags & CMD_WRITE &&         server.repl_good_slaves_count < server.repl_min_slaves_to_write)     {         flagTransaction(c);         addReply(c, shared.noreplicaserr);         return C_OK;     }

调高配置,提高数据一致性,降低可用性。相反提高可用性,降低数据一致性。

转载本站任何文章请注明:转载至神刀安全网,谢谢神刀安全网 » Redis主从复制

分享到:更多 ()

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址