神刀安全网

tbschedule源码解读

tbschedule源码解读

tbschedule部署包括两部分,一个是负责配置管理的后台程序,一个是客户端接入包,这两个程序依赖zk进行信息交互。

zk数据的大致结构

factory部分

/app1/factory /app1/factory/facotoryUUID1 /app1/strategy /app1/strategy/strategy1 /app1/strategy/strategy1/factoryUUID1

可以有多个facotory,每个factory对应一个客户端启动的TBScheduleManagerFactory实例,每个JVM可以有多个factory实例,factory实例也可以存在于不同的JVM中。
strategy是在后台配置的任务策略,每个factory启动时候回去检查自己能处理哪几个strategy,如果能处理则在/app1/strategy/strategy1/路径下注册自己,注册的这个信息在tbschedule源码里叫做FactoryRunningInfo

ScheduleServer部分

/app1/baseTaskType /app1/baseTaskType/task1 /app1/baseTaskType/task1/task1 /app1/baseTaskType/task1/task1/server /app1/baseTaskType/task1/task1/server/scheduleServerUUID1 /app1/baseTaskType/task1/task1/taskItem /app1/baseTaskType/task1/task1/taskItem/taskItem1 /app1/baseTaskType/task1/task1/taskItem/taskItem1/cur_server /app1/baseTaskType/task1/task1/taskItem/taskItem1/deal_desc /app1/baseTaskType/task1/task1/taskItem/taskItem1/parameter /app1/baseTaskType/task1/task1/taskItem/taskItem1/req_server /app1/baseTaskType/task1/task1/taskItem/taskItem1/sts

task1是在后台配置的任务。
/app1/baseTaskType/task1/task1/server/scheduleServerUUID1表示可以用来处理任务的调度器,每个factory实例可以有多个ScheduleServer实例。
/app1/baseTaskType/task1/task1/taskItem表示配置任务时,每个任务可以拆分成几个小的任务项。该节点的子节点,表示这个任务项运行时的信息,例如cur_server表示这个taskItem正在被哪个ScheduleServer处理。这些在tbschedule源码里也叫作runningInfo。

核心类图

tbschedule源码解读

TBScheduleManagerFactoryfactory实例对象,管理这个factory内部所有的事情。
ZKManager负责与zk之间的连接,数据交换。
IScheduleDataManager负责/app1/baseTaskType及其子节点所有数据模型维护。
ScheduleDataManger4ZK负责/app1/factory``/app1/strategy及其字节点数据模型维护。
IStrategyTask每个实例代表一个线程组,每个strategy可对应多个IStrategyTask实例,来真正处理配置的任务。

关于这几个类的组合关系如下图:

tbschedule源码解读

一个Factory处理多个strategy,每个strategy下有多个IStrategyTask对象。
TBScheduleManager实现IStrategyTask接口,一个TBScheduleManager实例跟ScheduleServerScheduleProcessorIScheduleTaskDeal的关系都是一比一的关系。ScheduleServer是针对某一个task的的调度器。IScheduleTaskDeal是我们自己代码里需要实现的任务对象。ScheduleProcessor是处理任务的多线程任务处理器,代表一个线程组。可以包含多个线程,线程的最大数量取决于后台配置的task身上的threadNum字段。
一个Factory有多个IStrategyTask的原因是,任务需要分片处理,每个分片对应一个IStrategyTask实例。
一个ScheduleProcessor有多个Thread的原因是,一个任务分片下一次可以取出多个任务,开启多线程可以并发处理这些任务。

初始化流程

tbschedule源码解读

整个初始化过程大量使用Thread、Timer,很多工作都是异步进行的,且这些线程之间通过了状态对象、锁等方式进行了协调。
整个初始化过程粗略来看包括以下几步:

  1. 创建ZKManager对象
  2. 启动初始化线程InitalThread,然后立即返回
    接着便是InitalThread异步做的初始化工作:
  3. 准备好ZKManager、ScheduleDataManager4ZK、ScheduleStrategyDataManager4ZK对象
  4. 启动定时Timer对象ManagerFactoryTimerTask
    接着便是ManagerFactoryTimerTask定时执行的工作,主要是去扫描strategy配置,重新分配factory去处理这些strategy。分配完factory,会创建StrategyTask进行任务的处理。

    factory刷新工作详解

    整个过程源码入口在ManagerFactoryTimerTask#run()中,而主要的逻辑集中在TBScheduleManagerFactory#refresh()。这里不去关心stop factory的逆向流程,只来看正向流程,见TBScheduleManagerFactory #reRegisterManagerFactory

  5. 遍历strategy,重新计算factory实例跟strategy的匹配关系

  6. 找到当前factory实例不能处理的strategy,并停止掉正在运行的StrategyTask
  7. 遍历跟当前factory实例相关的strategy,选举出每个strategy的leader factory实例,由leader重新计算每个factory实例能够分到的reqNum,即根据strategy身上的assignNum``numOfSingleServer,将assignNum平分给每个factory实例。
  8. 调整当前factory实例分配到每个strategy的的StrategyTask的数量,确保数量等于上一步分配给自己的数量。

factory线程组数量分配算法


见ScheduleUtil#assignTaskNumber

/**    * 分配任务数量    * @param serverNum 总的服务器数量    * @param taskItemNum 任务项数量    * @param maxNumOfOneServer 每个server最大任务项数目    * @return    */   public static int[] assignTaskNumber(int serverNum,int taskItemNum,int maxNumOfOneServer){     int[] taskNums = new int[serverNum];     int numOfSingle = taskItemNum / serverNum;     int otherNum = taskItemNum % serverNum;     //20150323 删除, 任务分片保证分配到所有的线程组数上。 开始 //    if (maxNumOfOneServer >0 && numOfSingle >= maxNumOfOneServer) { //      numOfSingle = maxNumOfOneServer; //      otherNum = 0; //    }     //20150323 删除, 任务分片保证分配到所有的线程组数上。 结束     for (int i = 0; i < taskNums.length; i++) {       if (i < otherNum) {         taskNums[i] = numOfSingle + 1;       } else {         taskNums[i] = numOfSingle;       }     }     return taskNums;   }

TBScheduleManagerStatic的初始化流程

tbschedule源码解读

  1. 找到task配置的用户实现的IScheduleTaskDeal对象
  2. 将当前ScheduleServer实例注册到/app1/baseTaskType/task1/task1/server/scheduleServerUUID1位置
  3. 启动心跳TimerHearBeatTimerTask
  4. 启动初始化线程

    心跳Timer

    这里主要做的事情就是重新将taskItem分配到每个SchedueServer,源码位置在TBScheduleManagerStatic#assignScheduleTask()。首先选举出当前ScheduleServer对应的task对应的所有ScheduleServer实例,选举出一个leader,由leader进行分配工作。

  5. 等到初始化线程完成initialRunningInfo的工作
  6. clearTaskItem,遍历所有taskItem,查看对应的cur_server是否还能找到,找不到则将cur_server置为null
  7. assignTaskItem,给每个taskItem分配合适的ScheduleServer实例。

    初始化线程

  8. initialRunningInfo,由当前task的leader ScheduleServer实例初始化这个task下所有的taskItem子节点的数据,此时还没有分配每个taskItem由哪个ScheduleServer实例执行(见心跳Timer)
  9. getCurrentScheduleTaskItemListNow,重新加载当前ScheduleServer能处理的taskItem项目
  10. computerStart,创建两个Timer,一个用来计算任务下次执行开始时间,一个用来计算任务下次终止执行时间。停止跟恢复通过TBScheduleManager身上的isPauseSchedule字段来标识。
  11. 恢复的时候去创建TBScheduleProcessorSleep``TBScheduleProcessorNotSleep对象;停止的时候,会将已经在执行的任务处理完,但是缓存在队列中待执行的任务将被丢弃。

    TBScheduleProcessorSleep多线程工作原理

    启动task配置的threadNum数量的线程去处理任务。由其中某一个线程去获取任务,将入taskList队列中,所有的线程从这个队列中获取任务执行,如果是Multi任务,可以一次取多个任务执行。在一个线程获取任务的过程中,其他线程处于休眠状态,任务获取完毕唤醒其他线程。获取任务代码在TBScheduleProcessorSleep#loadScheduleData,每次获取都是调用一次IScheduleTaskDeal对象selectTasks方法获取一批任务放到taskList中。
    两次loadScheduleData有一个休眠时间,即在task上配置的SleepTimeInterval。
    一旦TBScheduleProcessorSleep启动了,会一直循环执行,知道PauseTimer让其停止,如果你没有配置结束时间,则不会停止,而是一直运行;也可以通过后台配置将任务停止。

    总结

    tbschdule通过任务分片,将一个任务分配给多个线程组(即ScheduleServer实例)执行,这些线程组可以分布在相同或者不同的JVM上。而每个线程组支持多线程处理某一个分片的任务。
    tbschedule同时支持失效任务转移功能,并且可以通过管理后台对任务进行调度管理。
    不过官方文档实在太少。

参考:
tbschedule
关于TbSchedule任务调度管理框架的整合部署


进入博主个人站点,阅读更多文章

转载本站任何文章请注明:转载至神刀安全网,谢谢神刀安全网 » tbschedule源码解读

分享到:更多 ()

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址