0
点赞
收藏
分享

微信扫一扫

dremio fragment 执行简单说明

Dremio 的内部执行流程实际上与 Apache Drill 比较类似,只是 Dremio 在其基础上做了不少优化处理。

一个调用流程说明

来自官方文档

dremio fragment 执行简单说明_sed

 

 

  • 参考调用链

dremio fragment 执行简单说明_ide_02

 

 

实际执行管理的类

AttemptManager

 

/**
 * Drives the start-up of one query attempt on the calling thread: reports the attempt to the
 * observer, resolves the engine and group resource information, runs planning through the command
 * pool and, for {@code ASYNC_QUERY} commands, hands the physical plan to the maestro service.
 *
 * <p>Failures raised inside the try block are mapped to a {@code CANCELED} or {@code FAILED} state
 * transition by the catch clauses below. The finally block always starts {@code stateSwitch} and
 * restores the thread's original name.
 */
@Override
public void run() {
  // rename the thread we're using for debugging purposes
  final Thread currentThread = Thread.currentThread();
  final String originalName = currentThread.getName();
  currentThread.setName(queryIdString + ":foreman");

  try {
    // NOTE(review): presumably a test-only fault-injection point; confirm against the injector implementation
    injector.injectChecked(queryContext.getExecutionControls(), INJECTOR_TRY_BEGINNING_ERROR,
      ForemanException.class);

    // report the PENDING state to the observer
    observer.beginState(AttemptObserver.toEvent(AttemptEvent.State.PENDING));

    // announce the query together with the user name taken from the session credentials
    observer.queryStarted(queryRequest, queryContext.getSession().getCredentials().getUserName());

    // resolve the engine for this query and store the matching group resource information on the query context
    String ruleSetEngine = ruleBasedEngineSelector.resolveAndUpdateEngine(queryContext);
    ResourceSchedulingProperties resourceSchedulingProperties = new ResourceSchedulingProperties();
    resourceSchedulingProperties.setRoutingEngine(queryContext.getSession().getRoutingEngine());
    resourceSchedulingProperties.setRuleSetEngine(ruleSetEngine);
    final GroupResourceInformation groupResourceInformation =
      maestroService.getGroupResourceInformation(queryContext.getOptions(), resourceSchedulingProperties);
    queryContext.setGroupResourceInformation(groupResourceInformation);

    // Checks for Run Query privileges for the selected Engine
    checkRunQueryAccessPrivilege(groupResourceInformation);

    // planning is done in the command pool
    commandPool.submit(CommandPool.Priority.LOW, attemptId.toString() + ":foreman-planning",
      (waitInMillis) -> {
        // pass the wait value supplied by the command pool on to the observer
        observer.commandPoolWait(waitInMillis);

        injector.injectPause(queryContext.getExecutionControls(), INJECTOR_PENDING_PAUSE, logger);
        injector.injectChecked(queryContext.getExecutionControls(), INJECTOR_PENDING_ERROR,
          ForemanException.class);

        plan();
        injector.injectPause(queryContext.getExecutionControls(), INJECTOR_PLAN_PAUSE, logger);
        injector.injectChecked(queryContext.getExecutionControls(), INJECTOR_PLAN_ERROR,
          ForemanException.class);
        return null;
        // get() blocks this thread until the planning task submitted above has finished
      }, runInSameThread).get();

    // only ASYNC_QUERY commands are handed to the maestro service; committer and cleaner come from their physical plan
    if (command.getCommandType() == CommandType.ASYNC_QUERY) {
      AsyncCommand asyncCommand = (AsyncCommand) command;
      committer = asyncCommand.getPhysicalPlan().getCommitter();
      queryCleaner =  asyncCommand.getPhysicalPlan().getCleaner();

      moveToState(QueryState.STARTING, null);
      // execute the query through maestroService
      maestroService.executeQuery(queryId, queryContext, asyncCommand.getPhysicalPlan(), runInSameThread,
        new MaestroObserverWrapper(observer), new CompletionListenerImpl());
      asyncCommand.executionStarted();
    }

    // reached for every command type once the steps above completed without throwing
    observer.beginState(AttemptObserver.toEvent(AttemptEvent.State.RUNNING));
    moveToState(QueryState.RUNNING, null);

    injector.injectChecked(queryContext.getExecutionControls(), INJECTOR_TRY_END_ERROR,
      ForemanException.class);
  } catch (ResourceUnavailableException e) {
    // resource allocation failure is treated as a cancellation and not a failure
    try {
      // the caller (JobEventCollatingObserver) expects metadata event before a cancel/complete event.
      observer.planCompleted(null);
    } catch (Exception ignore) {
      // deliberately ignored: the planCompleted(null) notification above is best-effort
    }
    profileTracker.setCancelReason(e.getMessage());
    moveToState(QueryState.CANCELED, null); // ENQUEUED/STARTING -> CANCELED transition
  } catch (final UserException | ForemanException e) {
    moveToState(QueryState.FAILED, e);
  } catch (final OutOfMemoryError e) {
    // a direct-memory OOM fails only this query; any other OOM goes to ProcessExit.exitHeap
    if (ErrorHelper.isDirectMemoryException(e)) {
      moveToState(QueryState.FAILED, UserException.memoryError(e).build(logger));
    } else {
      /*
       * FragmentExecutors use a NodeStatusListener to watch out for the death of their query's AttemptManager. So, if we
       * die here, they should get notified about that, and cancel themselves; we don't have to attempt to notify
       * them, which might not work under these conditions.
       */
      ProcessExit.exitHeap(e);
    }
  } catch (Throwable ex) {
    // a wrapped UserCancellationException means CANCELED; anything else is reported as FAILED
    UserCancellationException t = ErrorHelper.findWrappedCause(ex, UserCancellationException.class);
    if (t != null) {
      moveToState(QueryState.CANCELED, null);
    } else {
      moveToState(QueryState.FAILED,
        new ForemanException("Unexpected exception during fragment initialization: " + ex.getMessage(), ex));
    }

  } finally {
    /*
     * Begin accepting external events.
     *
     * Doing this here in the finally clause will guarantee that it occurs. Otherwise, if there
     * is an exception anywhere during setup, it wouldn't occur, and any events that are generated
     * as a result of any partial setup that was done (such as the FragmentSubmitListener,
     * the ResponseSendListener, or an external call to cancel()), will hang the thread that makes the
     * event delivery call.
     *
     * If we do throw an exception during setup, and have already moved to QueryState.FAILED, we just need to
     * make sure that we can't make things any worse as those events are delivered, but allow
     * any necessary remaining cleanup to proceed.
     *
     * Note that cancellations cannot be simulated before this point, i.e. pauses can be injected, because AttemptManager
     * would wait on the cancelling thread to signal a resume and the cancelling thread would wait on the AttemptManager
     * to accept events.
     */
    try {
      stateSwitch.start();
    } catch (Exception e) {
      moveToState(QueryState.FAILED, e);
    }

    // restore the thread's original name
    currentThread.setName(originalName);
  }

  /*
   * Note that despite the run() completing, the AttemptManager could continue to exist, and receives
   * events about fragment completions. It won't go away until everything is completed, failed, or cancelled.
   */
}

MaestroServiceImpl

@Override

public void executeQuery(

QueryId queryId,

QueryContext context,

PhysicalPlan physicalPlan,

boolean runInSameThread,

MaestroObserver observer,

CompletionListener listener) throws ExecutionSetupException, ResourceAllocationException {

 

injector.injectChecked(context.getExecutionControls(), INJECTOR_EXECUTE_QUERY_BEGIN_ERROR,

ExecutionSetupException.class);

 

// Set up the active query.

QueryTracker queryTracker = new QueryTrackerImpl(queryId, context, physicalPlan, reader,

resourceAllocator.get(), executorSetService.get(), executorSelectionService.get(),

executorServiceClientFactory.get(), jobTelemetryClient.get(), observer,

listener,

() -> closeQuery(queryId), closeableSchedulerThreadPool);

Preconditions.checkState(activeQueryMap.putIfAbsent(queryId, queryTracker) == null,

"query already queued for execution " + QueryIdHelper.getQueryId(queryId));

 

// allocate execution resources on the calling thread, as this will most likely block

queryTracker.allocateResources();

 

try {

observer.beginState(AttemptObserver.toEvent(AttemptEvent.State.EXECUTION_PLANNING));

 

// do execution planning in the bound pool

commandPool.get().submit(CommandPool.Priority.MEDIUM,

QueryIdHelper.getQueryId(queryId) + ":execution-planning",

(waitInMillis) -> {

injector.injectChecked(context.getExecutionControls(),

INJECTOR_COMMAND_POOL_SUBMIT_ERROR, ExecutionSetupException.class);

 

observer.commandPoolWait(waitInMillis);

queryTracker.planExecution();

return null;

}, runInSameThread).get();

} catch (ExecutionException|InterruptedException e) {

throw new ExecutionSetupException("failure during execution planning", e);

}

 

observer.beginState(AttemptObserver.toEvent(AttemptEvent.State.STARTING));

// propagate the fragments.

queryTracker.startFragments();

 

injector.injectChecked(context.getExecutionControls(), INJECTOR_EXECUTE_QUERY_END_ERROR,

ExecutionSetupException.class);

}

执行计划

QueryTrackerImpl

/**
 * Performs execution planning for this query in two steps: first the parallelization info is
 * computed from the physical plan, then the execution plan is built from it and reported to the
 * observer via {@code planCompleted}.
 *
 * @throws ExecutionSetupException declared by this method; {@code ExecutionSetupException.class}
 *         is also the exception type passed to the checked injection point between the two steps
 */
@Override
public void planExecution() throws ExecutionSetupException {
  // step 1: compute the parallelization info for the physical plan
  executionPlanningResources = ExecutionPlanCreator.getParallelizationInfo(context, observer,
    physicalPlan, executorSelectionService, resourceTracker.getResourceSchedulingDecisionInfo());

  // NOTE(review): presumably test-only error/pause injection points; confirm against the injector implementation
  injector.injectChecked(context.getExecutionControls(),
    INJECTOR_EXECUTION_PLANNING_ERROR, ExecutionSetupException.class);
  injector.injectPause(context.getExecutionControls(),
    INJECTOR_EXECUTION_PLANNING_PAUSE, logger);

  // step 2: build the execution plan from the planning set and group resource information obtained in step 1
  executionPlan = ExecutionPlanCreator.getExecutionPlan(context, reader, observer, physicalPlan,
    resourceTracker.getResources(),
    executionPlanningResources.getPlanningSet(), executorSelectionService,
    resourceTracker.getResourceSchedulingDecisionInfo(),
    executionPlanningResources.getGroupResourceInformation());
  observer.planCompleted(executionPlan);
  physicalPlan = null; // no longer needed
}

Fragment 执行

QueryTrackerImpl

@Override

public void startFragments() throws ExecutionSetupException {

Preconditions.checkNotNull(executionPlan, "execution plan required");

 

// Populate fragments before sending the query fragments.

fragmentTracker.populate(executionPlan.getFragments(), resourceTracker.getResourceSchedulingDecisionInfo());

 

AbstractMaestroObserver fragmentActivateObserver  = new AbstractMaestroObserver() {

@Override

public void activateFragmentFailed(Exception ex) {

fragmentTracker.sendOrActivateFragmentsFailed(ex);

}

};

 

injector.injectPause(context.getExecutionControls(),

INJECTOR_STARTING_PAUSE, logger);

try {

// FragmentStarter 此处会通过rpc 调用ExecutorService,里边包含了对于FragmentExecutors 服务的调用

FragmentStarter starter = new FragmentStarter(executorServiceClientFactory,

resourceTracker.getResourceSchedulingDecisionInfo(),

context.getExecutionControls(),context.getOptions());

starter.start(executionPlan, MaestroObservers.of(observer, fragmentActivateObserver));

executionPlan = null; // no longer needed

 

progressTracker = new ProgressTracker(queryId, jobTelemetryClient, observer);

} catch (Exception ex) {

fragmentTracker.sendOrActivateFragmentsFailed(ex);

throw ex;

}

}

说明

以上只是一个简单的执行说明,实际上只包含了具体的查询调用流程;关于 fragment 的数据聚合部分没有说明,后续会再进行详细说明。

参考资料

sabot/kernel/src/main/java/com/dremio/exec/planner/sql/handlers/query/NormalHandler.java
sabot/kernel/src/main/java/com/dremio/exec/planner/sql/handlers/commands/HandlerToPreparePlan.java
sabot/kernel/src/main/java/com/dremio/exec/planner/sql/handlers/commands/HandlerToPrepareArrowPlan.java
sabot/kernel/src/main/java/com/dremio/exec/planner/sql/handlers/commands/HandlerToExec.java
sabot/kernel/src/main/java/com/dremio/exec/work/foreman/AttemptManager.java
sabot/kernel/src/main/java/com/dremio/exec/maestro/MaestroService.java
sabot/kernel/src/main/java/com/dremio/exec/planner/fragment/SimpleParallelizer.java
https://drill.apache.org/docs/drill-query-execution/

举报

相关推荐

0 条评论