Spark Task Launching: A Source Code Analysis

Uploader: 第***  Document no.: 32819558  Uploaded: 2018-02-12  Format: DOC  Pages: 6  Size: 195.50 KB

The call flow that leads to the resourceOffer() method of the TaskSetManager class starts in CoarseGrainedSchedulerBackend. Its method override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] registers an executor together with its executorId, hostPort and cores. The backend then builds a list of WorkerOffers from the active executors and launches tasks:

    val workOffers = activeExecutors.map { case (id, executorData) =>
      new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
    }.toSeq
    launchTasks(scheduler.resourceOffers(workOffers))

    // Launch tasks returned by a set of resource offers
    private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
      for (task <- …) {
        …
      }
    }
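To make the offer-building step concrete, here is a minimal Scala sketch of turning registered executors into WorkerOffers. ExecutorData and WorkerOffer are simplified, hypothetical stand-ins defined locally (Spark's real classes carry more fields), and MakeOffersSketch and makeOffers are illustrative names, not Spark API.

```scala
object MakeOffersSketch {
  // Hypothetical, simplified stand-ins for Spark's internal ExecutorData and WorkerOffer.
  final case class ExecutorData(executorHost: String, freeCores: Int)
  final case class WorkerOffer(executorId: String, host: String, cores: Int)

  // Build one offer per active executor, advertising the cores it currently has free.
  def makeOffers(activeExecutors: Map[String, ExecutorData]): Seq[WorkerOffer] =
    activeExecutors.map { case (id, executorData) =>
      WorkerOffer(id, executorData.executorHost, executorData.freeCores)
    }.toSeq
}
```

For example, two executors with 4 and 2 free cores produce two offers totalling 6 cores; those counts are what the scheduler later draws down as it assigns tasks.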

Inside launchTasks(scheduler.resourceOffers(workOffers)), the call scheduler.resourceOffers(workOffers) first shuffles all WorkerOffers into random order and then sorts the TaskSets:

    // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
    // of locality levels so that it gets a chance to launch local tasks on all of them.
    // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY

It then calls its own resourceOfferSingleTaskSet() method repeatedly, for as long as each call keeps launching tasks:

    /**
     * Called by cluster manager to offer resources on slaves. We respond by asking our active task
     * sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so
     * that tasks are balanced across the cluster.
     */
    def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
      // Mark each slave as alive and remember its hostname
      // Also track if new executor is added
      var newExecAvail = false
      for (o <- …
      … new ArrayBuffer[TaskDescription](o.cores))
      val availableCpus = shuffledOffers.map(o => o.cores).toArray
      val sortedTaskSets = rootPool.getSortedTaskSetQueue
      for (taskSet <- …
      … > 0) {
        hasLaunchedTask = true
      }
      return tasks
    }

    private def resourceOfferSingleTaskSet(
        taskSet: TaskSetManager,
        maxLocality: TaskLocality,
        shuffledOffers: Seq[WorkerOffer],
        availableCpus: Array[Int],
        tasks: Seq[ArrayBuffer[TaskDescription]]) : Boolean = {
      var launchedTask = false
      for (i <- …
      … >= CPUS_PER_TASK) {
        try {
          for (task <- …
          … >= 0)
          launchedTask = true
        } catch {
          case e: TaskNotSerializableException =>
            logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
            // Do not offer resources for this task, but don't throw an error to allow other
            // task sets to be submitted.
            return launchedTask
        }
      }
      return launchedTask
    }

This function assigns tasks to executors (by execId). For each WorkerOffer(executorId, host, cores), if the number of available CPUs is at least the number of CPUs a single task needs, it calls TaskSetManager's resourceOffer() method, which returns a task. The number of CPUs each task needs, CPUS_PER_TASK, is read from the configuration file (default: …). That executor's availableCpus is then decreased by CPUS_PER_TASK and must remain >= 0; an executor without enough free CPUs is skipped and the loop moves on to the next one. After the whole loop, the method returns true if it launched at least one task, and false otherwise.
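The round-robin behaviour described above (shuffle the offers, then give each executor at most one task per pass until a pass launches nothing) can be modelled in a few lines. This is a simplified sketch, not Spark's implementation: it ignores locality levels, serialization failures and blacklisting, assumes one CPU per task, and SimpleTaskSet, offerSingleTaskSet and the other names are hypothetical.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.Random

object RoundRobinOfferSketch {
  final case class WorkerOffer(executorId: String, host: String, cores: Int)

  // Assumption: every task needs exactly one CPU.
  val CpusPerTask = 1

  // Hypothetical stand-in for TaskSetManager: hands out at most one pending task per call.
  final class SimpleTaskSet(taskIds: Seq[Int]) {
    private val pending = ArrayBuffer.empty[Int] ++= taskIds
    def resourceOffer(execId: String): Option[Int] =
      if (pending.isEmpty) None else Some(pending.remove(pending.size - 1))
  }

  // One round-robin pass over all offers for a single task set.
  // Returns true if at least one task was launched during this pass.
  def offerSingleTaskSet(
      taskSet: SimpleTaskSet,
      shuffledOffers: Seq[WorkerOffer],
      availableCpus: Array[Int],
      tasks: Seq[ArrayBuffer[Int]]): Boolean = {
    var launchedTask = false
    for (i <- shuffledOffers.indices if availableCpus(i) >= CpusPerTask) {
      for (task <- taskSet.resourceOffer(shuffledOffers(i).executorId)) {
        tasks(i) += task
        availableCpus(i) -= CpusPerTask
        assert(availableCpus(i) >= 0)
        launchedTask = true
      }
    }
    launchedTask
  }

  // Shuffle the offers, then keep making passes for each (already sorted) task set
  // until a pass launches nothing. Entry i holds the task ids given to shuffled offer i.
  def resourceOffers(
      offers: Seq[WorkerOffer],
      sortedTaskSets: Seq[SimpleTaskSet],
      rng: Random): Seq[Seq[Int]] = {
    val shuffledOffers = rng.shuffle(offers)
    val tasks = shuffledOffers.map(o => new ArrayBuffer[Int](o.cores))
    val availableCpus = shuffledOffers.map(o => o.cores).toArray
    for (taskSet <- sortedTaskSets) {
      while (offerSingleTaskSet(taskSet, shuffledOffers, availableCpus, tasks)) {}
    }
    tasks.map(_.toSeq)
  }
}
```

Because each pass hands every executor at most one task, tasks are spread across executors before any single executor is filled up, which is the balancing the resourceOffers doc comment describes.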


The TaskSetManager member variable pendingTasksForExecutor keeps the pending tasks for each executor:

    // Set of pending tasks for each executor. These collections are actually
    // treated as stacks, in which new tasks are added to the end of the
    // ArrayBuffer and removed from the end. This makes it faster to detect
    // tasks that repeatedly fail because whenever a task failed, it is put
    // back at the head of the stack. They are also only cleaned up lazily;
    // when a task is launched, it remains in all the pending lists except
    // the one that it was launched from, but gets removed from them later.
    private val pendingTasksForExecutor = new HashMap[String, ArrayBuffer[Int]]

The resourceOffer() method:

    /**
     * Respond to an offer of a single executor from the scheduler by finding a task
     * NOTE: this function is either called with a maxLocality which
     * would be adjusted by delay scheduling algorithm or it will be with a special
     * NO_PREF locality which will be not modified
     * @param execId the executor Id of the offered resource
     * @param host the host Id of the offered resource
     * @param maxLocality the maximum locality we want to schedule the tasks at
     */
    @throws[TaskNotSerializableException]
    def resourceOffer(…

It calls the dequeueTask method to pop a task off the stack. dequeueTask in turn calls:

    /**
     * Dequeue a pending task from the given list and return its index.
     * Return None if the list is empty.
     * This method also cleans up any tasks in the list that have already
     * been launched, since we want that to happen lazily.
     */
    private def dequeueTaskFromList(execId: String, list: ArrayBuffer[Int]): Option[Int] = {
      var indexOffset = list.size
      while (indexOffset > 0) {
        indexOffset -= 1
        val index = list(indexOffset)
        if (!executorIsBlacklisted(execId, index)) {
          // This should almost always be list.trimEnd(1) to remove tail
          list.remove(indexOffset)
          if (copiesRunning(index) == 0 && !successful(index)) {
            return Some(index)
          }
        }
      }
      None
    }

From the first three lines, one can see that each call returns …
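The stack discipline and lazy cleanup of pendingTasksForExecutor can be illustrated with a small model. PendingTasks, addPendingTask and dequeueForExecutor are hypothetical names; the sketch drops the executorIsBlacklisted check and uses plain arrays for copiesRunning and successful, so treat it as an illustration of the idea under those assumptions rather than Spark's code.

```scala
import scala.collection.mutable.{ArrayBuffer, HashMap}

// Hypothetical, simplified model of TaskSetManager's per-executor pending stacks.
// Assumption: no blacklisting, and "launched" just means copiesRunning(index) > 0.
final class PendingTasks(numTasks: Int) {
  val pendingTasksForExecutor = new HashMap[String, ArrayBuffer[Int]]
  val copiesRunning = new Array[Int](numTasks)
  val successful = new Array[Boolean](numTasks)

  // Push: new task indices are appended, so the end of the buffer is the top of the stack.
  def addPendingTask(execId: String, index: Int): Unit = {
    pendingTasksForExecutor.getOrElseUpdate(execId, new ArrayBuffer[Int]) += index
  }

  // Pop from the end; entries that are already running or finished are discarded
  // here instead of eagerly, which is the lazy cleanup the Spark comments describe.
  def dequeueTaskFromList(list: ArrayBuffer[Int]): Option[Int] = {
    var indexOffset = list.size
    while (indexOffset > 0) {
      indexOffset -= 1
      val index = list(indexOffset)
      list.remove(indexOffset)
      if (copiesRunning(index) == 0 && !successful(index)) {
        return Some(index)
      }
    }
    None
  }

  // Dequeue a task for one executor and mark it as launched.
  def dequeueForExecutor(execId: String): Option[Int] = {
    val next = pendingTasksForExecutor.get(execId).flatMap(dequeueTaskFromList)
    next.foreach(index => copiesRunning(index) += 1)
    next
  }
}
```

A task queued for two executors stays in the second list after it is launched from the first; the next dequeue on the second list removes it and moves on, because its copiesRunning count is no longer 0.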
