dremio ExecutionPlanCreator: a brief overview

After dremio has finished SQL parsing, SQL validation, logical planning, and physical planning, it still has to generate the actual execution plan and then do the real data processing.
ExecutionPlanCreator is responsible for generating that execution plan. In dremio the core building block is the fragment, which comes in a major and a minor part (see the sketch below).
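
As a rough mental model (the class below is a hypothetical illustration, not a real Dremio type): a query is cut into major fragments, and each major fragment is parallelized into some number of minor fragments, one per assigned executor slot, so every running instance is addressed by (queryId, majorFragmentId, minorFragmentId), which is what the FragmentHandle built later in ExecutionPlanCreator encodes.

// Hypothetical illustration of the major/minor fragment hierarchy; not a real Dremio class.
public final class FragmentIdSketch {
  public static void main(String[] args) {
    // assume a plan with three major fragments and these parallelization widths,
    // e.g. screen = 1, aggregation = 4, scan = 8
    int[] widthPerMajor = {1, 4, 8};
    for (int majorId = 0; majorId < widthPerMajor.length; majorId++) {
      for (int minorId = 0; minorId < widthPerMajor[majorId]; minorId++) {
        // each (majorId, minorId) pair is one fragment instance scheduled on an executor
        System.out.printf("major=%d minor=%d%n", majorId, minorId);
      }
    }
  }
}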

Invocation

As the stack trace below shows, the call is triggered by MaestroService going through QueryTracker (a hedged sketch of what the call site might look like follows the trace).

 

// call chain that generates the execution plan information
@com.dremio.exec.maestro.QueryTrackerImpl.planExecution()
    at com.dremio.exec.maestro.MaestroServiceImpl.lambda$executeQuery$2(MaestroServiceImpl.java:167)
    at com.dremio.service.commandpool.ReleasableBoundCommandPool.lambda$getWrappedCommand$3(ReleasableBoundCommandPool.java:137)
    at com.dremio.service.commandpool.CommandWrapper.run(CommandWrapper.java:62)
    at com.dremio.context.RequestContext.run(RequestContext.java:96)
    at com.dremio.common.concurrent.ContextMigratingExecutorService.lambda$decorate$3(ContextMigratingExecutorService.java:199)
    at com.dremio.common.concurrent.ContextMigratingExecutorService$ComparableRunnable.run(ContextMigratingExecutorService.java:180)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
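
For orientation, here is a hedged sketch of what such a call site roughly looks like. The variable names are placeholders; only the getExecutionPlan signature is taken from the method shown below.

// Sketch only: roughly how a tracker/planner component hands the physical plan
// to ExecutionPlanCreator. All local variables are placeholders.
ExecutionPlan executionPlan = ExecutionPlanCreator.getExecutionPlan(
    queryContext,                    // per-query context (options, query id, ...)
    physicalPlanReader,              // serializes fragments to JSON for shipping
    observer,                        // MaestroObserver receiving planning callbacks
    physicalPlan,                    // output of the physical planning phase
    resourceSet,                     // per-node memory allocation from resource scheduling
    planningSet,                     // parallelization info for every major fragment
    executorSelectionService,        // chooses the executor endpoints
    resourceSchedulingDecisionInfo,  // scheduling decision metadata
    groupResourceInformation);       // cluster-wide cpu/memory information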

Structure

  • Provided methods

(Figure: list of methods provided by ExecutionPlanCreator)

 

 

Generating the execution plan information

public static ExecutionPlan getExecutionPlan(
    final QueryContext queryContext,
    final PhysicalPlanReader reader,
    MaestroObserver observer,
    final PhysicalPlan plan,
    ResourceSet allocationSet,
    PlanningSet planningSet,
    ExecutorSelectionService executorSelectionService,
    ResourceSchedulingDecisionInfo resourceSchedulingDecisionInfo,
    GroupResourceInformation groupResourceInformation) throws ExecutionSetupException {

  final Root rootOperator = plan.getRoot();
  final Fragment rootOperatorFragment = MakeFragmentsVisitor.makeFragments(rootOperator);
  // SimpleParallelizer: despite the "simple" in the name, this class does the core parallelization work
  final SimpleParallelizer parallelizer = new SimpleParallelizer(queryContext,
      observer, executorSelectionService, resourceSchedulingDecisionInfo, groupResourceInformation);
  final long queryPerNodeFromResourceAllocation = allocationSet.getPerNodeQueryMemoryLimit();
  planningSet.setMemoryAllocationPerNode(queryPerNodeFromResourceAllocation);

  // set bounded memory for all bounded memory operations
  MemoryAllocationUtilities.setupBoundedMemoryAllocations(
      plan,
      queryContext.getOptions(),
      groupResourceInformation,
      planningSet,
      queryPerNodeFromResourceAllocation);

  // pass all query, session and non-default system options to the fragments
  final OptionList fragmentOptions = filterDCSControlOptions(queryContext.getNonDefaultOptions());

  // index repetitive items to reduce rpc size.
  final PlanFragmentsIndex.Builder indexBuilder = new PlanFragmentsIndex.Builder();

  final List<PlanFragmentFull> planFragments = parallelizer.getFragments(
      fragmentOptions,
      planningSet,
      reader,
      rootOperatorFragment,
      indexBuilder);

  traceFragments(queryContext, planFragments);

  return new ExecutionPlan(queryContext.getQueryId(), plan, planFragments, indexBuilder);
}
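
The first step above, MakeFragmentsVisitor.makeFragments, walks the physical operator tree and cuts it into a tree of Fragment objects; as in Apache Drill, a new fragment boundary is introduced around exchange operators. A minimal, hypothetical sketch of that splitting idea (not the real visitor):

// Hypothetical sketch of cutting an operator tree into fragments at exchange
// boundaries; Op and FragmentSketch are simplified placeholders, not Dremio types.
import java.util.ArrayList;
import java.util.List;

class Op {
  final String name;
  final boolean isExchange;            // true if this operator is an exchange
  final List<Op> inputs = new ArrayList<>();

  Op(String name, boolean isExchange) {
    this.name = name;
    this.isExchange = isExchange;
  }
}

class FragmentSketch {
  final List<String> operators = new ArrayList<>();        // operators running in this fragment
  final List<FragmentSketch> children = new ArrayList<>(); // upstream fragments sending into it

  static FragmentSketch makeFragments(Op root) {
    FragmentSketch rootFragment = new FragmentSketch();
    visit(root, rootFragment);
    return rootFragment;
  }

  private static void visit(Op op, FragmentSketch current) {
    current.operators.add(op.name);
    for (Op input : op.inputs) {
      if (input.isExchange) {
        // everything below an exchange runs in a separate (child) fragment
        FragmentSketch child = new FragmentSketch();
        current.children.add(child);
        visit(input, child);
      } else {
        visit(input, current);
      }
    }
  }
}

Each fragment in this tree later becomes one major fragment, which the SimpleParallelizer then expands into minor fragments.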

SimpleParallelizer

  • Class diagram (for reference)

(Figure: SimpleParallelizer class diagram)

 

 

  • The fragment-generation method

public List<PlanFragmentFull> getFragments(
    OptionList options,
    PlanningSet planningSet,
    PhysicalPlanReader reader,
    Fragment rootFragment,
    PlanFragmentsIndex.Builder indexBuilder) throws ExecutionSetupException {
  Preconditions.checkNotNull(queryContext);
  final Stopwatch stopwatch = Stopwatch.createStarted();
  List<PlanFragmentFull> fragments =
      // this call is the core of the work
      generateWorkUnit(options, reader, rootFragment, planningSet, indexBuilder);
  stopwatch.stop();
  observer.planGenerationTime(stopwatch.elapsed(TimeUnit.MILLISECONDS));
  observer.plansDistributionComplete(new QueryWorkUnit(fragments));
  return fragments;
}

generateWorkUnit
This method mainly builds the PlanFragmentMajor and PlanFragmentMinor objects. As its parameters show, it depends on the PlanningSet, which is produced by getExecutionPlanningResources based on an assessment of the executors' current resources (CPU and memory). Note that the snippet below shows an overload with the full parameter list, while getFragments above calls a shorter one.

 

protected List<PlanFragmentFull> generateWorkUnit(
    OptionList options,
    NodeEndpoint foremanNode,
    QueryId queryId,
    PhysicalPlanReader reader,
    Fragment rootNode,
    PlanningSet planningSet,
    PlanFragmentsIndex.Builder indexBuilder,
    UserSession session,
    QueryContextInformation queryContextInfo,
    FunctionLookupContext functionLookupContext) throws ExecutionSetupException {

  final List<PlanFragmentFull> fragments = Lists.newArrayList();
  EndpointsIndex.Builder builder = indexBuilder.getEndpointsIndexBuilder();
  MajorFragmentAssignmentCache majorFragmentAssignmentsCache = new MajorFragmentAssignmentCache();
  // now we generate all the individual plan fragments and associated assignments. Note, we need all endpoints
  // assigned before we can materialize, so we start a new loop here rather than utilizing the previous one.
  for (Wrapper wrapper : planningSet) {
    Fragment node = wrapper.getNode();
    final PhysicalOperator physicalOperatorRoot = node.getRoot();
    boolean isRootNode = rootNode == node;

    if (isRootNode && wrapper.getWidth() != 1) {
      throw new ForemanSetupException(String.format("Failure while trying to setup fragment. " +
          "The root fragment must always have parallelization one. In the current case, the width was set to %d.",
          wrapper.getWidth()));
    }
    // a fragment is self driven if it doesn't rely on any other exchanges.
    boolean isLeafFragment = node.getReceivingExchangePairs().size() == 0;

    CoordExecRPC.QueryContextInformation queryContextInformation =
        CoordExecRPC.QueryContextInformation.newBuilder(queryContextInfo)
            .setQueryMaxAllocation(wrapper.getMemoryAllocationPerNode()).build();

    // come up with a list of minor fragments assigned for each endpoint.
    final List<FragmentAssignment> assignments = new ArrayList<>();

    if (queryContext.getOptions().getOption(VectorizedHashAggOperator.OOB_SPILL_TRIGGER_ENABLED) ||
        queryContext.getOptions().getOption(ExternalSortOperator.OOB_SORT_TRIGGER_ENABLED)) {

      // collate by node.
      ArrayListMultimap<Integer, Integer> assignMap = ArrayListMultimap.create();
      for (int minorFragmentId = 0; minorFragmentId < wrapper.getWidth(); minorFragmentId++) {
        assignMap.put(builder.addNodeEndpoint(wrapper.getAssignedEndpoint(minorFragmentId)), minorFragmentId);
      }

      // create getAssignment lists.
      for (int ep : assignMap.keySet()) {
        assignments.add(
            FragmentAssignment.newBuilder()
                .setAssignmentIndex(ep)
                .addAllMinorFragmentId(assignMap.get(ep))
                .build());
      }
    }

    // Create a minorFragment for each major fragment.
    PlanFragmentMajor major = null;
    boolean majorAdded = false;
    for (int minorFragmentId = 0; minorFragmentId < wrapper.getWidth(); minorFragmentId++) {
      IndexedFragmentNode iNode = new IndexedFragmentNode(minorFragmentId, wrapper);
      wrapper.resetAllocation();
      PhysicalOperator op = physicalOperatorRoot.accept(new Materializer(wrapper.getSplitSets(), builder), iNode);

      Preconditions.checkArgument(op instanceof FragmentRoot);
      FragmentRoot root = (FragmentRoot) op;

      FragmentHandle handle =
          FragmentHandle //
              .newBuilder() //
              .setMajorFragmentId(wrapper.getMajorFragmentId()) //
              .setMinorFragmentId(minorFragmentId)
              .setQueryId(queryId) //
              .build();

      // Build the major fragment only once.
      if (!majorAdded) {
        majorAdded = true;

        // get plan as JSON
        ByteString plan;
        ByteString optionsData;
        try {
          plan = reader.writeJsonBytes(root, fragmentCodec);
          optionsData = reader.writeJsonBytes(options, fragmentCodec);
        } catch (JsonProcessingException e) {
          throw new ForemanSetupException("Failure while trying to convert fragment into json.", e);
        }

        // If any of the operators report ext communicable fragments, fill in the assignment and node details.
        final Set<Integer> extCommunicableMajorFragments = physicalOperatorRoot.accept(new ExtCommunicableFragmentCollector(), wrapper);
        majorFragmentAssignmentsCache.populateIfAbsent(planningSet, builder, extCommunicableMajorFragments);
        final List<MajorFragmentAssignment> extFragmentAssignments =
            majorFragmentAssignmentsCache.getAssignments(planningSet, builder, extCommunicableMajorFragments);
        major =
            PlanFragmentMajor.newBuilder()
                .setForeman(foremanNode)
                .setFragmentJson(plan)
                .setHandle(handle.toBuilder().clearMinorFragmentId().build())
                .setLeafFragment(isLeafFragment)
                .setContext(queryContextInformation)
                .setMemInitial(wrapper.getInitialAllocation())
                .setOptionsJson(optionsData)
                .setCredentials(session.getCredentials())
                .setPriority(queryContextInfo.getPriority())
                .setFragmentCodec(fragmentCodec)
                .setFragmentExecWeight(wrapper.getAssignedWeight())
                .addAllAllAssignment(assignments)
                .addAllExtFragmentAssignments(extFragmentAssignments)
                .build();

        if (logger.isTraceEnabled()) {
          logger.trace(
              "Remote major fragment:\n {}", DremioStringUtils.unescapeJava(major.toString()));
        }
      }

      final NodeEndpoint assignment = wrapper.getAssignedEndpoint(minorFragmentId);
      final NodeEndpoint endpoint = builder.getMinimalEndpoint(assignment);
      List<MinorAttr> attrList = MinorDataCollector.collect(handle,
          endpoint,
          root,
          new MinorDataSerDe(reader, fragmentCodec),
          indexBuilder);

      // Build minor specific info and attributes.
      PlanFragmentMinor minor = PlanFragmentMinor.newBuilder()
          .setMajorFragmentId(wrapper.getMajorFragmentId())
          .setMinorFragmentId(minorFragmentId)
          .setAssignment(endpoint)
          .setMemMax(wrapper.getMemoryAllocationPerNode())
          .addAllCollector(CountRequiredFragments.getCollectors(root))
          .addAllAttrs(attrList)
          .build();

      if (logger.isTraceEnabled()) {
        logger.trace(
            "Remote minor fragment:\n {}", DremioStringUtils.unescapeJava(minor.toString()));
      }

      fragments.add(new PlanFragmentFull(major, minor));
    }
  }

  return fragments;
}
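
To make the structure of that loop easier to see, here is a condensed, hypothetical restatement (all types are simplified placeholders, not the real Dremio classes): the heavy major part, which carries the serialized plan JSON and options, is built only once per planning-set entry and is then paired with one lightweight minor part per minor fragment id.

// Condensed, hypothetical restatement of the assembly loop above; every type here
// is a simplified placeholder, not a real Dremio class.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class AssemblySketch {

  static class MajorPart {                 // stands in for PlanFragmentMajor
    final int majorId;
    final String serializedPlan;           // shared by all minors of this major
    MajorPart(int majorId, String serializedPlan) {
      this.majorId = majorId;
      this.serializedPlan = serializedPlan;
    }
  }

  static class MinorPart {                 // stands in for PlanFragmentMinor
    final int minorId;
    final String endpoint;                 // the executor this instance is assigned to
    MinorPart(int minorId, String endpoint) {
      this.minorId = minorId;
      this.endpoint = endpoint;
    }
  }

  static class FullFragment {              // stands in for PlanFragmentFull
    final MajorPart major;
    final MinorPart minor;
    FullFragment(MajorPart major, MinorPart minor) {
      this.major = major;
      this.minor = minor;
    }
  }

  static class WrapperSketch {             // stands in for a PlanningSet Wrapper
    final int majorId;
    final String plan;
    final List<String> endpoints;          // one endpoint per minor fragment (the width)
    WrapperSketch(int majorId, String plan, List<String> endpoints) {
      this.majorId = majorId;
      this.plan = plan;
      this.endpoints = endpoints;
    }
  }

  static List<FullFragment> assemble(List<WrapperSketch> planningSet) {
    List<FullFragment> fragments = new ArrayList<>();
    for (WrapperSketch wrapper : planningSet) {
      MajorPart major = null;
      for (int minorId = 0; minorId < wrapper.endpoints.size(); minorId++) {
        if (major == null) {
          // the expensive shared part is built only once per major fragment
          major = new MajorPart(wrapper.majorId, wrapper.plan);
        }
        // the per-instance part carries only assignment-specific details
        MinorPart minor = new MinorPart(minorId, wrapper.endpoints.get(minorId));
        fragments.add(new FullFragment(major, minor));
      }
    }
    return fragments;
  }

  public static void main(String[] args) {
    List<WrapperSketch> planningSet = Arrays.asList(
        new WrapperSketch(0, "screen-plan", Arrays.asList("coordinator")),
        new WrapperSketch(1, "scan-plan", Arrays.asList("executor-1", "executor-2")));
    for (FullFragment f : assemble(planningSet)) {
      System.out.println(f.major.majorId + ":" + f.minor.minorId + " -> " + f.minor.endpoint);
    }
  }
}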

Notes

The above is only a brief overview; Dremio's actual execution planning is considerably more complex. For the details, it is worth reading the official source code as well as the Apache Drill documentation.

References

sabot/kernel/src/main/java/com/dremio/exec/maestro/planner/ExecutionPlanCreator.java
sabot/kernel/src/main/java/com/dremio/exec/planner/fragment/SimpleParallelizer.java
