2026/1/3 3:17:07
网站建设
项目流程
现在最流行的网站开发工具,搜索引擎有哪些类型,网站 设计公司 温州,网站建设费 账务处理一、概述PikiwiDB(pika)3.5.X版本发布了分布式集群方案#xff0c;基于codisPikiwiDB(pika)-server实现#xff0c;已经在360内部搜索团队线上使用#xff0c;稳定性和性能都非常优秀。本文主要介绍分布式集群的架构和部署方案。二、分布式架构解析pika分布式集群基于codis架…一、概述PikiwiDB(pika)3.5.X版本发布了分布式集群方案基于codisPikiwiDB(pika)-server实现已经在360内部搜索团队线上使用稳定性和性能都非常优秀。本文主要介绍分布式集群的架构和部署方案。二、分布式架构解析pika分布式集群基于codis架构进行改造设计架构图如下所示pika分布式集群主要 由以下这些组件组成Pika ServerPikiwiDB(pika)3.5.X, PikiwiDB(pika)4.0.x版本与单机版模式及架构保持一致。Codis Proxy客户端直接连接 codis-proxycodis-proxy 手动用户请求后会通过计算 hash 值将请求转发到指定的 Pika Server 去执行。对于同一个业务集群而言可以同时部署多个 codis-proxy 实例不同 codis-proxy 之间由 codis-dashboard 保证状态同步。Codis Dashboard集群管理工具支持 codis-proxy、pika-server 的添加、删除以及据迁移等操作。在集群状态发生改变时codis-dashboard 维护集群下所有 codis-proxy 的状态的一致性。对于同一个业务集群而言同一个时刻 codis-dashboard 只能有 0个或者1个所有对集群的修改都必须通过 codis-dashboard 完成。Codis FE集群管理界面。多个集群实例共享可以共享同一个前端展示页面通过配置文件管理后端 codis-dashboard 列表配置文件可自动更新。Codis Etcd:codis-etcd主要用于记录元数据信息为保证高可用建议etcd部署为3节点。三、Sentinel主从切换为了方便运维管理本次版本支持sentinel自动主从切换当集群主挂的时候会备升主提供了主节点故障自愈的能力。四、部署方式机器配置可以根据自身情况选择搜索部门节点分配如下组件节点个数(可以根据需求调整)实例规格(可以根据需求调整)pika server12主12从每个实例20核32G内存200G磁盘Codis FE1个节点1个节点 2核4GCodis Dashboard1个节点1个节点 2核4GCodis Etcd3个节点3个节点 2核4GCodis Proxy4个节点4个节点 2核4G集群创建部署顺序启动 PikiwiDB(pika)建立 PikiwiDB(pika) 主从关系启动 codis etcd启动 codis dashboard启动 codis proxy启动 codis fe绑定 PikiwiDB(pika)codis绑定 codisPikiwiDB(pika) 需要在 dashboard 中进行操作操作顺序如下1.添加 group (注意PikiwiDB(pika) 1主一从为一个 group )2.添加 PikiwiDB(pika)server3.分配 slots :至此PikiwiDB(pika) 和 codis 已经绑定完毕我们可以用 proxy 的 vip vport 进行访问。五、快速启动脚本PikiwiDB(pika)-codis 源码(路径:) 中 admin 文件夹提供了一系列脚本以便快速启动、停止各个组件提高运维效率。启动codis-dashboard使用 codis-dashboard-admin.sh 脚本启动 dashboard并查看 dashboard 日志确认启动是否有异常。./admin/codis-dashboard-admin.sh start tail -100 ./log/codis-dashboard.log.2017-04-082017/04/08 15:16:57 fsclient.go:197: [INFO] fsclient - create /codis3/codis-demo/topom OK 2017/04/08 15:16:57 main.go:140: [WARN] [0xc42025f7a0] dashboard is working ... 2017/04/08 15:16:57 topom.go:424: [WARN] admin start service on [::]:18080快速启动集群元数据存储使用 filesystem默认数据路径保存在 /tmp/codis若启动失败请检查当前用户是否对该路径拥有读写权限。启动codis-proxy使用 codis-proxy-admin.sh 脚本启动 codis-proxy并查看 proxy 日志确认启动是否有异常。./admin/codis-proxy-admin.sh start tail -100 ./log/codis-proxy.log.2017-04-082017/04/08 15:39:37 proxy.go:293: [WARN] [0xc4200df760] set sentinels [] 2017/04/08 15:39:37 main.go:320: [WARN] rpc online proxy seems OK 2017/04/08 15:39:38 main.go:210: [WARN] [0xc4200df760] proxy is working ...启动codis-server使用 codis-server-admin.sh 脚本启动 codis-server并查看 redis 日志确认启动是否有异常。./admin/codis-server-admin.sh start tail -100 /tmp/redis_6379.log5706:M 08 Apr 16:04:11.748 * DB loaded from disk: 0.000 seconds 5706:M 08 Apr 16:04:11.748 * The server is now ready to accept connections on port 6379redis.conf 配置中 pidfile、logfile 默认保存在 /tmp 目录若启动失败请检查当前用户是否有该目录的读写权限。启动codis-fe使用 codis-fe-admin.sh 脚本启动 codis-fe并查看 fe 日志确认启动是否有异常。六、Codis部分代码详细解析1. Pika Servercodis架构中pika server作为数据节点存储数据处理codis proxy的读写请求并根据slot迁移的命令进行数据迁移。1.1 数据存储在codis架构中所有的数据按照分片进行存储。因此每个pika server只持有部分分片的数据。当前的pika实现中相同类型的所有数据是写在同一个RocksDB实例中引擎中数据本身并不携带slot信息。codis在进行数据迁移时支持key粒度的迁移和slot粒度的迁移。key粒度的迁移比较好理解slot粒度的迁移就需要在存储引擎中找到对应slot的存量数据。pika是通过为每个slot创建一个set类型的key以此来记录每个slot中存储的key来实现的。由于该操作会引入额外的更新操作对性能会有影响。因此pika中设置了slotmigrate_参数来表示是否要支持slot粒度迁移。如果不支持就不需要更新slot set。pika节点在收到proxy节点发来的请求之后如果开启了slot_migrate除了将用户数据写入RocksDB以外还需要计算得出数据的slotID将key和type追加到以对应slotID为key的set集合中。该步骤在pika_command层进行处理。以mset为例写db成功之后会调用AddSlotKey将key记录到对应set中。void MsetCmd::Do(std::shared_ptrSlot slot) { storage::Status s slot-db()-MSet(kvs_); if (s.ok()) { res_.SetRes(CmdRes::kOk); std::vectorstorage::KeyValue::const_iterator it; for (it kvs_.begin(); it ! kvs_.end(); it) { AddSlotKey(k, it-key, slot); } } else { res_.SetRes(CmdRes::kErrOther, s.ToString()); } }void AddSlotKey(const std::string type, const std::string key, const std::shared_ptrSlot slot) { if (g_pika_conf-slotmigrate() ! true) { return; } int slotID GetSlotsID(key, crc, hastag); std::string slot_key GetSlotKey(slotID); std::vectorstd::string members; members.emplace_back(type key); s slot-db()-SAdd(slot_key, members, res); if (!s.ok()) { LOG(ERROR) sadd key[ key ] to slotKey[ slot_key ] failed, error: s.ToString(); return; } // codis hash tag模式 if (hastag) { std::string tag_key GetSlotsTagKey(crc); s slot-db()-SAdd(tag_key, members, res); if (!s.ok()) { LOG(ERROR) sadd key[ key ] to tagKey[ tag_key ] failed, error: s.ToString(); return; } } }1.2 Slot迁移codis的架构中slot迁移包括key粒度的迁移通过命令slotsmgrtone, slotsmgrttagone每次将一个key迁移到目的端。还有一种是分片粒度迁移。主要涉及的类包括PikaMigrate, PikaMigrateThread和PikaParseSendThread。首先介绍PikaMigrateThread类。class PikaMigrateThread : public net::Thread { public: bool ReqMigrateBatch(const std::string ip, int64_t port, int64_t time_out, int64_t slot_num, int64_t keys_num, const std::shared_ptrSlot slot); int ReqMigrateOne(const std::string key, const std::shared_ptrSlot slot); private: void NotifyRequestMigrate(void); void ReadSlotKeys(const std::string slotKey, int64_t need_read_num, int64_t real_read_num, int32_t *finish); bool CreateParseSendThreads(int32_t dispatch_num); void DestroyParseSendThreads(void); void *ThreadMain() override; int32_t workers_num_ 0; std::vectorPikaParseSendThread * workers_; std::dequestd::pairconst char, std::string mgrtone_queue_; std::dequestd::pairconst char, std::string mgrtkeys_queue_; std::mapstd::pairconst char, std::string, std::string mgrtkeys_map_;PikaMigrateThread是Pika的后台迁移线程负责收集待发送的key创建迁移线程管理迁移流程。PikaMigrateThread创建了若干个实际的迁移线程即workers_负责在后台进行数据迁移。待迁移的数据有两个来源一个接收到是codis发来的异步迁移单个key的命令将具体的key追加到mgrtone_queue_队列中再由PikaMigrateThread线程消费mgrtone_queue_将数据追加到mgrtkeys_queue_中。另一个是遍历slotkey对应的set将遍历到的key追加到mgrtkeys_queue_中。wokers_线程从mgrtkeys_queue_中消费出待迁移的key并进行发送。PikaParseSendThread是迁移的worker线程在一个while循环中消费mgrtkeys_queue_将key发送到对应的接收端。MigrateOneKey会根据key类型的不同调用不同的处理函数构造网络请求包比如string类型直接从引擎中读取value并发送hash类型则需要将整个pkey下的所有field遍历出来并发送。当前的migrateOneKey的实现中查询两次RocksDB并发送两次请求包一次是key-value本身第二次是ttl。void *PikaParseSendThread::ThreadMain() { while (!should_exit_) { //消费mgrtkeys_queue_ std::dequestd::pairconst char, std::string send_keys; { migrate_thread_-IncWorkingThreadNum(); for (int32_t i 0; i mgrtkeys_num_; i) { if (migrate_thread_-mgrtkeys_queue_.empty()) { break; } send_keys.emplace_back(migrate_thread_-mgrtkeys_queue_.front()); migrate_thread_-mgrtkeys_queue_.pop_front(); } } int64_t send_num 0; int64_t need_receive_num 0; int32_t migrate_keys_num 0; for (const auto send_key : send_keys) { // 发送单个key if (0 (send_num MigrateOneKey(cli_, send_key.second, send_key.first, false))) { LOG(WARNING) PikaParseSendThread::ThreadMain MigrateOneKey: send_key.second failed !!!; migrate_thread_-OnTaskFailed(); migrate_thread_-DecWorkingThreadNum(); return nullptr; } else { need_receive_num send_num; migrate_keys_num; } } // 阻塞等待收到need_receive_num个数据包 if (!CheckMigrateRecv(need_receive_num)) { LOG(INFO) PikaMigrateThread::ThreadMain CheckMigrateRecv failed !!!; migrate_thread_-OnTaskFailed(); migrate_thread_-DecWorkingThreadNum(); return nullptr; } else { DelKeysAndWriteBinlog(send_keys, slot_); } migrate_thread_-AddResponseNum(migrate_keys_num); migrate_thread_-DecWorkingThreadNum(); } return nullptr; } int PikaParseSendThread::MigrateOneKey(net::NetCli *cli, const std::string key, const char key_type, bool async) { int send_num; switch (key_type) { case k: if (0 (send_num MigrateKv(cli_, key, slot_))) { return -1; } break; ...... default: return -1; break; } return send_num; }相对于PikaParseSendThreadPikaMigrateThread并不执行实际的数据迁移任务而是用来进行迁移任务的管理。pika后台线程线程的执行是分批次进行每一批次执行完成之后会挂起需要被再次唤醒。PikaMigrateThread后台线程的主要代码执行流程如下LOG(INFO) PikaMigrateThread::ThreadMain Start; // Create parse_send_threads auto dispatch_num static_castint32_t(g_pika_conf-thread_migrate_keys_num()); if (!CreateParseSendThreads(dispatch_num)) { LOG(INFO) PikaMigrateThread::ThreadMain CreateParseSendThreads failed !!!; DestroyThread(true); return nullptr; } std::string slotKey GetSlotKey(static_castint32_t(slot_id_)); int32_t slot_size 0; slot_-db()-SCard(slotKey, slot_size); while (!should_exit_) { // Waiting migrate task { std::unique_lockstd::mutex lm(request_migrate_mutex_); while (!request_migrate_) { request_migrate_cond_.wait(lm); } // 每轮迁移对应一个task执行完成之后需要被再次唤醒 request_migrate_ false; } // read keys form slot and push to mgrtkeys_queue_ int64_t round_remained_keys keys_num_; int64_t real_read_num 0; int32_t is_finish 0; send_num_ 0; response_num_ 0; do { std::unique_lock lq(mgrtkeys_queue_mutex_); std::unique_lock lo(mgrtone_queue_mutex_); std::unique_lock lm(mgrtkeys_map_mutex_); // 查找待迁移的key包括单key的迁移和slot迁移 if (!mgrtone_queue_.empty()) { while (!mgrtone_queue_.empty()) { mgrtkeys_queue_.push_front(mgrtone_queue_.front()); mgrtkeys_map_[mgrtone_queue_.front()] INVALID_STR; mgrtone_queue_.pop_front(); send_num_; } } else { int64_t need_read_num (0 round_remained_keys - dispatch_num) ? dispatch_num : round_remained_keys; ReadSlotKeys(slotKey, need_read_num, real_read_num, is_finish); round_remained_keys - need_read_num; send_num_ static_castint32_t(real_read_num); } //唤醒worker线程 mgrtkeys_cond_.notify_all(); } while (0 round_remained_keys !is_finish); LOG(INFO) PikaMigrateThread:: wait ParseSenderThread finish; //阻塞等待worker线程执行完成 { std::unique_lock lw(workers_mutex_); while (!should_exit_ is_task_success_ send_num_ ! response_num_) { workers_cond_.wait(lw); } } LOG(INFO) PikaMigrateThread::ThreadMain send_num: send_num_ response_num: response_num_; if (should_exit_) { LOG(INFO) PikaMigrateThread::ThreadMain : pthread_self() exit2 !!!; DestroyThread(false); return nullptr; } // check one round migrate task success if (!is_task_success_) { LOG(ERROR) PikaMigrateThread::ThreadMain one round migrate task failed !!!; DestroyThread(true); return nullptr; } else { moved_num_ response_num_; std::unique_lock lm(mgrtkeys_map_mutex_); std::mapstd::pairconst char, std::string, std::string().swap(mgrtkeys_map_); } // check slot migrate finish int32_t slot_remained_keys 0; slot_-db()-SCard(slotKey, slot_remained_keys); if (0 slot_remained_keys) { LOG(INFO) PikaMigrateThread::ThreadMain slot_size: slot_size moved_num: moved_num_; if (slot_size ! moved_num_) { LOG(ERROR) PikaMigrateThread::ThreadMain moved_num ! slot_size !!!; } DestroyThread(true); return nullptr; } } return nullptr; }2. codis2.1 dashboarddashboard是codis架构中的中心管理节点负责proxy节点pika节点的管理发起运维操作检测节点状态以及进行failover。dashboard持有集群整体的信息需要持久化的数据保存在etcd或zookeeper或者本地磁盘文件中。2.1.1 关键类Topom是dashboard中的一个关键类记录了dashboard中所有信息。定义如下type Topom struct { mu sync.Mutex xauth string model *models.Topom //抽象出来的存储组件可以是zk/etcd/fs store *models.Store //缓存结构减少从store获取次数当成员变量值有变更时 //会调用相关接口清除cache下次获取数据时会强制从store中load cache struct { hooks list.List slots []*models.SlotMapping group map[int]*models.Group proxy map[string]*models.Proxy sentinel *models.Sentinel } exit struct { C chan struct{} } config *Config online bool closed bool ladmin net.Listener //slot迁移时使用 action struct { redisp *redis.Pool interval atomic2.Int64 disabled atomic2.Bool progress struct { status atomic.Value } executor atomic2.Int64 } stats struct { redisp *redis.Pool servers map[string]*RedisStats proxies map[string]*ProxyStats } ha struct { redisp *redis.Pool monitor *redis.CodisSentinel masters map[int]string } } context看起来是Topom的一个只读快照。 type context struct { slots []*models.SlotMapping group map[int]*models.Group proxy map[string]*models.Proxy sentinel *models.Sentinel hosts struct { sync.Mutex m map[string]net.IP } method int } const ( ActionNothing ActionPending pending ActionPreparing preparing ActionPrepared prepared ActionMigrating migrating ActionFinished finished ActionSyncing syncing )Action状态在三个地方会用到第一个是仅用于实现promot server即提升group server为master此时更新的是group.Action.State。第二个是用于实现slot的迁移第三个是用来实现group内pika节点的主从sync。2.1.2 主要函数dashboard在启动之后其主要工作通过6个goroutine来实现。分别是1. CheckMastersAndSlavesState。 2. CheckPreOffineMastersState。 3. RefreshRedisStats。 4. RefreshProxyStats。 5. ProcessSlotAction。 6. ProcessSyncAction。2.1.2.1 CheckMastersAndSlavesState主要工作周期性检测pika节点状态并根据状态统计对需要下线节点进行摘除。第一阶段是检测pika节点的状态。首先是获取所有group的server信息之后对每个server执行info replication命令获取每个group的master和slave关系。这一阶段在CodisSentinel.RefreshMastersAndSlavesClient函数中完成。第二阶段根据统计结果更新每个group中的pika节点的状态。即遍历每个server的stat统计信息如果某个pika server的error状态不为nil而且该节点是对应group的master节点需要更新该节点的状态。具体地如果该节点之前的状态是GroupServerStateNormal先将该节点标记为GroupServerStateSubjectiveOffline即主观下线。如果之前不是normal状态则累加ReCallTimes如果ReCallTimes大于等于设定的主观下线阈值将节点状态更新为GroupServerStateOffline并将该group记录到pending数据中后续将对pending中记录的group进行failover操作。其实在CheckMasterAndSlavesState中并不会走到第二个节点因为filter函数会过滤掉master状态不是normal的节点所以stat信息中不会包含非normal状态的master节点这部分逻辑将会在2.1.2.2函数中进行。接下来对每个节点更新state信息和offset信息。具体代码如下所示// It was the master node before, the master node hangs up, and it is currently the master node if state.Index 0 state.Err ! nil g.Servers[0].Addr state.Addr { if g.Servers[0].State models.GroupServerStateNormal { //主观下线 g.Servers[0].State models.GroupServerStateSubjectiveOffline } else { // update retries g.Servers[0].ReCallTimes // Retry more than config times, start election if g.Servers[0].ReCallTimes s.Config().SentinelMasterDeadCheckTimes { // Mark enters objective offline state g.Servers[0].State models.GroupServerStateOffline g.Servers[0].ReplicaGroup false } // Start the election master node if g.Servers[0].State models.GroupServerStateOffline { pending append(pending, g) } } } // Update the offset information of the state and role nodes if val, ok : serversMap[state.Addr]; ok { if state.Err ! nil { if val.State models.GroupServerStateNormal { val.State models.GroupServerStateSubjectiveOffline } continue } val.State models.GroupServerStateNormal val.ReCallTimes 0 val.Role state.Replication.Role if val.Role master { val.ReplyOffset state.Replication.MasterReplOffset } else { val.ReplyOffset state.Replication.SlaveReplOffset } }第三阶段进行failover。第二阶段中记录到pending中的group需要进行主从切换。首先从对应group中选新的master挑选的原则是状态是normal且并且replyoffset最大。接下来对新master执行slaveof no one对其他节点重新执行slaveof命令绑定到新master上。更新group统计信息交换新老master在group.servers的位置删除对应group的cache信息标记group为OutOfSync true更新store中信息。2.1.2.2 CheckPreOffineMastersState整体执行逻辑类似于CheckMasterAndSlaveState不同的是filter函数。2.1.2.1中检测状态时会忽略掉状态不是normal的master节点当前函数逻辑互补即只检测不是正常状态的master节点。猜测区分成两个函数处理的逻辑是为了使用不同的检测频率。2.1.2.3 RefreshRedisStats遍历所有的group以及每个group的server向每个pika节点发info请求获取统计信息。然后发送命令“config get maxmemory”统计maxmemory。目前返回的是max-write-buffer-size后期可以优化下。所有server的统计信息统计在map[string]*RedisStats赋值给s.stats.severs。2.1.2.4 RefreshProxyStats类似于获取redis server节点的统计信息同理向所有的proxy发请求获取统计信息记录到Topom.stats.proxies中。初次之外会扫一遍topom中记录的所有proxy如果proxy的状态不是online执行OnlineProxy对proxy进行上线操作。2.1.2.5 ProcessSlotAction功能执行slots迁移相关的工作。具体执行流程类似于ProcessSyncAction。第一步找到所有状态不是Nothing的slot说明这些slots需要执行迁移动作或者已经在执行迁移动作过程中。找到足够的slots之后更新状态到Migrating根据dashboard中设置的迁移函数开始进行迁移。每迁移完成一个slots执行一次resyncmappings操作将slots新状态同步到proxy节点。在slot迁移完成之前proxy收到业务的请求需要先将对应的key迁移到destination然后在执行读写操作。2.1.2.6 ProcessSyncAction功能遍历所有的group和所有的节点如果有group的Action成员变量不为空找到Action.Index最小的对其执行slaveof。主要函数分为SyncActionPrepare newSyncActionExecutor, SyncActionComplete。 SyncActionPrepare第一步遍历所有的group中的所有server如果server.Action.State models.ActionPending并且server.Action.Index最小记录对应的server addr。然后根据server addr找到所属的group更新group.server.Action.Index为0Action.State为ActionSyncing。newSyncActionExecutor(addr)找出对应group的master返回的lambda处理逻辑是如果master为NOONE执行slaveof no one。如果不是对addr节点执行slaveof masterSyncActionComplete扫尾函数更新cache下发mappings到proxy节点。2.1.2.7 resyncSlotMappings函数签名func (s *Topom) resyncSlotMappings(ctx *context, slots ...*models.SlotMapping) error作用是将传入的models.SlotMapping中的slot转换成models.Slot并下发给所有的proxy节点。models.Slot定义type Slot struct { Id int json:id Locked bool json:locked,omitempty BackendAddr string json:backend_addr,omitempty BackendAddrGroupId int json:backend_addr_group_id,omitempty //处于迁移状态的slot会设置这两个值. //在proxy节点如果客户端请求的key分配到了当前slot且这两个参数不为空proxy需要先迁移对应的key到target节点slotmgrtone MigrateFrom string json:migrate_from,omitempty MigrateFromGroupId int json:migrate_from_group_id,omitempty //proxy需要同步迁移一个key时使用的方法 ForwardMethod int json:forward_method,omitempty //复制组dc就近读 ReplicaGroups [][]string json:replica_groups,omitempty }主要包括了两个函数context.toSlot()和FilSlots()context完成models.Mapping到models.Slot的转换转换过程中相关key的赋值情况Locked: 如果slot的状态是prepared返回true。如果不是preapred状态检查group lock状态。BackendAddr: BackendAddrGroupId MigrateFrom MigrateFromGroupId2.1.3 主要流程slot rebalanceslot rebalance形参中有一个confirm参数如果为false表明只是生成一个slot rebalance的迁移计划并不会真正执行如果为true表明生成了迁移计划之后就更新对应slot的状态。关键的几个变量assigned : map[int]int //key: groupId, value: 不需要迁移的slots个数pendings : map[int][]int //key: groupId, value: 等待迁出的待分配的slots个数即pendings中记录的[]slot记录的slot当前属于key对应的groupId可以迁出但还没有找到目的端。moveoutmap[int]int //key: groupId, value: 对应groupId中要迁出的slots的数量。如果为正值说明对应的groupId需要往外迁移如果为负值说明需要其他的group向它迁移数据。docking 需要进行迁移的slots其中包括了offline slots还有需要进行迁移的slotsslot_rebalance整体的执行逻辑包括1. topom加锁生成一个新的context。2. 遍历ctx.slots如果某个slot的Action.State不为“”说明该slot属于已经迁移的slot那么对应的嵌入端的group的assigned值。3. 遍历ctx.slots如果某个slot的Action.State为并且groupId不为0 如果该group的slot个数小于平均值那么该group的assigned值。否则该group就需要迁出一些slots所以pendings中记录该group和slotid4. 构造一棵红黑树排序依据是groupsize将所有的groupId存到rbtree中。作用是尽量slots分片尽量均匀地分配给所有的slots所以需要按照节点持有的分片数进行排序。5. 遍历所有的slots如果某个slot的groupId为0即为offline slot(最开始集群中的slots没有分配给任何节点就是offline slot)则groupsize最小的group的moveout值--。通过rbtree找到最小size的groupmoveout值负数表示需要迁入slot。6. 从rbtree中找到groupsize最大的group再找到groupsize最小的group如果他们size相差大于1那么最大size的group的moveout最小的moveout--。其实就是在做slots均衡削峰填谷7. 根据第8步中已经计算出的moveout值遍历moveout如果值大于0表明需要迁出那么从pending中截取指定长度的slots追加到docking中。8. 遍历groupids和docking将docking中记录的slots的目的端记录为需要迁入的group生成了一个迁移plankey位slotidvalue为targetGroupId。9. 如果confirm为true更新slot的Action.State为ActionPendingAction.Index为一个单调递增的counter值targeId为plans中记录的groupId。执行storeUpdateSlotMapping更新store清除cache。processslotaction函数会执行从plan中恢复中所有需要执行迁移的slots。2.1.4 主要接口类定义type Group struct { Id int json:id Servers []*GroupServer json:servers Promoting struct { Index int json:index,omitempty State string json:state,omitempty } json:promoting OutOfSync bool json:out_of_sync }相关api创建group/api/topom/group/create/{xauth}/{gid}topom_group.go执行完成之后如果store选择的是文件系统会在prodct_name目录下创建一个group目录group目录中新建“group-{gid}”目录中初始内容{ id: 1, servers: [], promoting: {}, out_of_sync: false }添加server/api/topom/group/add/{xauth}/{gid}/{addr}/{dc} topom_group.go:GroupAddServer newcontext - getGroup() - group g如果group的Promoting.state不是ActionNothing 返回error如果ctx.sentinel.servers不为空标记sentinel.OutOfSync为true执行storeUpdateSentinel将新server追加到group的server中保存到store.group-{gid}文件内容变为{ id: 1, servers: [ { server: 10.224.129.40:9271, datacenter: , action: {}, role: , reply_offset: 0, state: 0, recall_times: 0, replica_group: false } ], promoting: {}, out_of_sync: false }主从同步/api/topom/group/action/create/{token}/10.224.129.40:9261首先是状态判断如果group.Promoting.State不是nothing返回error。如果对应server的Action.State models.ActionPending返回error,表明该server已经有action存在。设置server.Action.Index,设置Action.State models.ActionPending实际执行主从同步的步骤是通过dashboard的后台goroutine执行ProcessSyncAction完成。{ id: 1, servers: [ { server: 10.224.129.40:9271, datacenter: , action: {}, role: master, reply_offset: 0, state: 0, recall_times: 0, replica_group: false }, { server: 10.224.129.40:9261, datacenter: , action: { state: synced }, role: master, reply_offset: 0, state: 0, recall_times: 0, replica_group: false } ], promoting: {}, out_of_sync: false }rebalance/api/topom/slots/rebalance/95b62887719520f17e312eaa76d28f2b/0topom_api.go:SlotsRebalancetopom_slots.go:SlotRebalance同步完成plan的更新将plan中的每个slot保存到store中状态为pending。接下来processSlotAction协程每秒被调度执行一次首先遍历所有的slots找到Action.State不是nothing状态的slot更新状态从ActionPending - ActionPreparing - ActionPrepared - ActionMigrating。每次更新时需要同步更新store以及使cache失效。之后开始执行迁移操作根据config中的配置选择执行SLOTSMGRTTAGSLOT或者是SLOTSMGRTTAGSLOT_ASYNC. slotsmigrtagslot命令会返回对应slotkey中还剩余的key个数。dashboard收到之后就可以判断是否这个codis已经迁移完成如果已经迁移完成执行slotActionComplete,更新slotAction的状态以及resyncSlotMapings通知proxy。如果没有全部迁移完sleep指定的slot_action_interval之后重视。