本文共 11374 字,大约阅读时间需要 37 分钟。
根据前面源码阅读可以知道,JobContainer将所有的task分配到TaskGroup中执行,TaskGroup启动5个线程去消费所有的task的,具体实现为
public void start() { try { /** * 状态check时间间隔,较短,可以把任务及时分发到对应channel中 */ int sleepIntervalInMillSec = this.configuration.getInt( CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_SLEEPINTERVAL, 100); /** * 状态汇报时间间隔,稍长,避免大量汇报 */ long reportIntervalInMillSec = this.configuration.getLong( CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_REPORTINTERVAL, 10000); /** * 2分钟汇报一次性能统计 */ // 获取channel数目 int channelNumber = this.configuration.getInt( CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL); int taskMaxRetryTimes = this.configuration.getInt( CoreConstant.DATAX_CORE_CONTAINER_TASK_FAILOVER_MAXRETRYTIMES, 1); long taskRetryIntervalInMsec = this.configuration.getLong( CoreConstant.DATAX_CORE_CONTAINER_TASK_FAILOVER_RETRYINTERVALINMSEC, 10000); long taskMaxWaitInMsec = this.configuration.getLong(CoreConstant.DATAX_CORE_CONTAINER_TASK_FAILOVER_MAXWAITINMSEC, 60000); ListtaskConfigs = this.configuration .getListConfiguration(CoreConstant.DATAX_JOB_CONTENT); if(LOG.isDebugEnabled()) { LOG.debug("taskGroup[{}]'s task configs[{}]", this.taskGroupId, JSON.toJSONString(taskConfigs)); } int taskCountInThisTaskGroup = taskConfigs.size(); /*LOG.info(String.format( "taskGroupId=[%d] start [%d] channels for [%d] tasks.", this.taskGroupId, channelNumber, taskCountInThisTaskGroup));*/ this.containerCommunicator.registerCommunication(taskConfigs); Map taskConfigMap = buildTaskConfigMap(taskConfigs); //taskId与task配置 List taskQueue = buildRemainTasks(taskConfigs); //待运行task列表 Map taskFailedExecutorMap = new HashMap (); //taskId与上次失败实例 List runTasks = new ArrayList (channelNumber); //正在运行task Map taskStartTimeMap = new HashMap (); //任务开始时间 long lastReportTimeStamp = 0; Communication lastTaskGroupContainerCommunication = new Communication(); while (true) { //1.判断task状态 boolean failedOrKilled = false; Map communicationMap = containerCommunicator.getCommunicationMap(); for(Map.Entry entry : communicationMap.entrySet()){ Integer taskId = entry.getKey(); Communication taskCommunication = entry.getValue(); if(!taskCommunication.isFinished()){ continue; } TaskExecutor taskExecutor = removeTask(runTasks, taskId); //上面从runTasks里移除了,因此对应在monitor里移除 taskMonitor.removeTask(taskId); //失败,看task是否支持failover,重试次数未超过最大限制 if(taskCommunication.getState() == State.FAILED){ taskFailedExecutorMap.put(taskId, taskExecutor); if(taskExecutor.supportFailOver() && taskExecutor.getAttemptCount() < taskMaxRetryTimes){ taskExecutor.shutdown(); //关闭老的executor containerCommunicator.resetCommunication(taskId); //将task的状态重置 Configuration taskConfig = taskConfigMap.get(taskId); taskQueue.add(taskConfig); //重新加入任务列表 }else{ failedOrKilled = true; break; } }else if(taskCommunication.getState() == State.KILLED){ failedOrKilled = true; break; }else if(taskCommunication.getState() == State.SUCCEEDED){ Long taskStartTime = taskStartTimeMap.get(taskId); if(taskStartTime != null){ Long usedTime = System.currentTimeMillis() - taskStartTime; /*LOG.info("taskGroup[{}] taskId[{}] is successed, used[{}]ms", this.taskGroupId, taskId, usedTime);*/ //usedTime*1000*1000 转换成PerfRecord记录的ns,这里主要是简单登记,进行最长任务的打印。因此增加特定静态方法 PerfRecord.addPerfRecord(taskGroupId, taskId, PerfRecord.PHASE.TASK_TOTAL,taskStartTime, usedTime * 1000L * 1000L); taskStartTimeMap.remove(taskId); taskConfigMap.remove(taskId); } } } // 2.发现该taskGroup下taskExecutor的总状态失败则汇报错误 if (failedOrKilled) { lastTaskGroupContainerCommunication = reportTaskGroupCommunication( lastTaskGroupContainerCommunication, taskCountInThisTaskGroup); throw DataXException.asDataXException( FrameworkErrorCode.PLUGIN_RUNTIME_ERROR, lastTaskGroupContainerCommunication.getThrowable()); } //3.有任务未执行,且正在运行的任务数小于最大通道限制 Iterator iterator = taskQueue.iterator(); while(iterator.hasNext() && runTasks.size() < channelNumber){ Configuration taskConfig = iterator.next(); Integer taskId = taskConfig.getInt(CoreConstant.TASK_ID); int attemptCount = 1; TaskExecutor lastExecutor = taskFailedExecutorMap.get(taskId); if(lastExecutor!=null){ attemptCount = lastExecutor.getAttemptCount() + 1; long now = System.currentTimeMillis(); long failedTime = lastExecutor.getTimeStamp(); if(now - failedTime < taskRetryIntervalInMsec){ //未到等待时间,继续留在队列 continue; } if(!lastExecutor.isShutdown()){ //上次失败的task仍未结束 if(now - failedTime > taskMaxWaitInMsec){ markCommunicationFailed(taskId); reportTaskGroupCommunication(lastTaskGroupContainerCommunication, taskCountInThisTaskGroup); throw DataXException.asDataXException(CommonErrorCode.WAIT_TIME_EXCEED, "task failover等待超时"); }else{ lastExecutor.shutdown(); //再次尝试关闭 continue; } }else{ /*LOG.info("taskGroup[{}] taskId[{}] attemptCount[{}] has already shutdown", this.taskGroupId, taskId, lastExecutor.getAttemptCount());*/ } } Configuration taskConfigForRun = taskMaxRetryTimes > 1 ? taskConfig.clone() : taskConfig; TaskExecutor taskExecutor = new TaskExecutor(taskConfigForRun, attemptCount); taskStartTimeMap.put(taskId, System.currentTimeMillis()); taskExecutor.doStart(); iterator.remove(); runTasks.add(taskExecutor); //上面,增加task到runTasks列表,因此在monitor里注册。 taskMonitor.registerTask(taskId, this.containerCommunicator.getCommunication(taskId)); taskFailedExecutorMap.remove(taskId); /*LOG.info("taskGroup[{}] taskId[{}] attemptCount[{}] is started", this.taskGroupId, taskId, attemptCount);*/ } //4.任务列表为空,executor已结束, 搜集状态为success--->成功 if (taskQueue.isEmpty() && isAllTaskDone(runTasks) && containerCommunicator.collectState() == State.SUCCEEDED) { // 成功的情况下,也需要汇报一次。否则在任务结束非常快的情况下,采集的信息将会不准确 lastTaskGroupContainerCommunication = reportTaskGroupCommunication( lastTaskGroupContainerCommunication, taskCountInThisTaskGroup); /*LOG.info("taskGroup[{}] completed it's tasks.", this.taskGroupId);*/ break; } // 5.如果当前时间已经超出汇报时间的interval,那么我们需要马上汇报 long now = System.currentTimeMillis(); if (now - lastReportTimeStamp > reportIntervalInMillSec) { lastTaskGroupContainerCommunication = reportTaskGroupCommunication( lastTaskGroupContainerCommunication, taskCountInThisTaskGroup); lastReportTimeStamp = now; //taskMonitor对于正在运行的task,每reportIntervalInMillSec进行检查 for(TaskExecutor taskExecutor:runTasks){ taskMonitor.report(taskExecutor.getTaskId(),this.containerCommunicator.getCommunication(taskExecutor.getTaskId())); if(DATX_LOG_ENABLE){ LOG.info("Running queue capacity is :[{}], current length is:[{}]", taskExecutor.channel.getCapacity(), taskExecutor.channel.size()); } } } Thread.sleep(sleepIntervalInMillSec); } //6.最后还要汇报一次 reportTaskGroupCommunication(lastTaskGroupContainerCommunication, taskCountInThisTaskGroup); } catch (Throwable e) { Communication nowTaskGroupContainerCommunication = this.containerCommunicator.collect(); if (nowTaskGroupContainerCommunication.getThrowable() == null) { nowTaskGroupContainerCommunication.setThrowable(e); } nowTaskGroupContainerCommunication.setState(State.FAILED); this.containerCommunicator.report(nowTaskGroupContainerCommunication); throw DataXException.asDataXException( FrameworkErrorCode.RUNTIME_ERROR, e); }finally { if(!PerfTrace.getInstance().isJob()){ //最后打印cpu的平均消耗,GC的统计 VMInfo vmInfo = VMInfo.getVmInfo(); if (vmInfo != null) { vmInfo.getDelta(false); LOG.info(vmInfo.totalString()); } LOG.info(PerfTrace.getInstance().summarizeNoException()); } }}上述实现主要分为以下几个步骤: 1、初始化task执行相关的状态信息,分别是taskId->Congifuration的map、待运行的任务队列taskQueue、运行失败任务taskFailedExecutorMap、运行中的任务runTasks、任务开始时间taskStartTimeMap 2、循环检测所有任务的执行状态 1)判断是否有失败的task,如果有则放入失败对立中,并查看当前的执行是否支持重跑和failOver,如果支持则重新放回执行队列中;如果没有失败,则标记任务执行成功,并从状态轮询map中移除 2)如果发现有失败的任务,则汇报当前TaskGroup的状态,并抛出异常 3)查看当前执行队列的长度,如果发现执行队列还有通道,则构建TaskExecutor加入执行队列,并从待运行移除 4)检查执行队列和所有的任务状态,如果所有的任务都执行成功,则汇报taskGroup的状态并从循环中退出 5)检查当前时间是否超过汇报时间检测,如果是,则汇报当前状态 6)当所有的执行完成从while中退出之后,再次全局汇报当前的任务状态
至此,taskGroup中的所有执行完成,上述taskGroup的运行队列只是将负责对task任务进行调度,具体的执行还是TaskExecutor负责实现,下面看看TaskExecutor的执行,代码实现如下
public TaskExecutor(Configuration taskConf, int attemptCount) { // 获取该taskExecutor的配置 this.taskConfig = taskConf; Validate.isTrue(null != this.taskConfig.getConfiguration(CoreConstant.JOB_READER) && null != this.taskConfig.getConfiguration(CoreConstant.JOB_WRITER), "[reader|writer]的插件参数不能为空!"); // 得到taskId this.taskId = this.taskConfig.getInt(CoreConstant.TASK_ID); this.attemptCount = attemptCount; /** * 由taskId得到该taskExecutor的Communication * 要传给readerRunner和writerRunner,同时要传给channel作统计用 */ this.taskCommunication = containerCommunicator .getCommunication(taskId); Validate.notNull(this.taskCommunication, String.format("taskId[%d]的Communication没有注册过", taskId)); this.channel = ClassUtil.instantiate(channelClazz, Channel.class, configuration); this.channel.setCommunication(this.taskCommunication); /** * 获取transformer的参数 */ ListtransformerInfoExecs = TransformerUtil.buildTransformerInfo(taskConfig); /** * 生成writerThread */ writerRunner = (WriterRunner) generateRunner(PluginType.WRITER); this.writerThread = new Thread(writerRunner, String.format("%d-%d-%d-writer", jobId, taskGroupId, this.taskId)); //通过设置thread的contextClassLoader,即可实现同步和主程序不通的加载器 this.writerThread.setContextClassLoader(LoadUtil.getJarLoader( PluginType.WRITER, this.taskConfig.getString( CoreConstant.JOB_WRITER_NAME))); /** * 生成readerThread */ readerRunner = (ReaderRunner) generateRunner(PluginType.READER,transformerInfoExecs); this.readerThread = new Thread(readerRunner, String.format("%d-%d-%d-reader", jobId, taskGroupId, this.taskId)); /** * 通过设置thread的contextClassLoader,即可实现同步和主程序不通的加载器 */ this.readerThread.setContextClassLoader(LoadUtil.getJarLoader( PluginType.READER, this.taskConfig.getString( CoreConstant.JOB_READER_NAME))); } public void doStart() { this.writerThread.start(); // reader没有起来,writer不可能结束 if (!this.writerThread.isAlive() || this.taskCommunication.getState() == State.FAILED) { throw DataXException.asDataXException( FrameworkErrorCode.RUNTIME_ERROR, this.taskCommunication.getThrowable()); } this.readerThread.start(); // 这里reader可能很快结束 if (!this.readerThread.isAlive() && this.taskCommunication.getState() == State.FAILED) { // 这里有可能出现Reader线上启动即挂情况 对于这类情况 需要立刻抛出异常 throw DataXException.asDataXException( FrameworkErrorCode.RUNTIME_ERROR, this.taskCommunication.getThrowable()); } }
TaskExecutor构建的时候,生成一个reader、channel和writer,并启动两个线程,reader生产数据写入channel,writer从channel中读数据,任务执行完毕时,通过wirter将任务状态置为成功
转载地址:http://sllxo.baihongyu.com/