0
点赞
收藏
分享

微信扫一扫

通过 Dremio 的一个单元测试了解基本的查询处理过程

沐之轻语 2022-12-31 阅读 107

Dremio 是一个比较复杂的系统，包含不少模块。官方代码中有一个不错的单元测试，可以帮助我们基本了解查询的处理流程：
从 session 到查询，再到 SQL 解析、关系节点转换、逻辑计划、物理计划，以及执行计划。

参考代码

 

public class Limit0LogicalToPhysicalTest extends BaseTestQuery {

 

private static String TEST_PATH = TestTools.getWorkingPath() + "/src/test/resources";

private static File tblPath = null;

 

@BeforeClass

public static void createTable() throws Exception {

tblPath = new File(getDfsTestTmpSchemaLocation(), "yelp");

FileUtils.deleteQuietly(tblPath);

FileUtils.copyFileToDirectory(new File(TEST_PATH + "/yelp_business.json"), tblPath);

FileUtils.moveFile(new File(tblPath + "/yelp_business.json"), new File(tblPath + "/1.json"));

FileUtils.copyFile(new File(tblPath + "/1.json"), new File(tblPath + "/2.json"));

}

 

@AfterClass

public static void cleanUpTable() throws Exception {

FileUtils.deleteQuietly(tblPath);

}

 

@Ignore

public void ExchangesKeepTest() throws Exception {

 

final String yelpTable = TEMP_SCHEMA + ".\"yelp\"";

final String sql = "SELECT nested_0.review_id AS review_id, nested_0.user_id AS user_id, nested_0.votes AS votes," +

" nested_0.stars AS stars, join_business.business_id AS business_id0, join_business.neighborhoods AS neighborhoods, join_business.city AS city, join_business.latitude AS latitude, join_business.review_count AS review_count, join_business.full_address AS full_address, join_business.stars AS stars0, join_business.categories AS categories, join_business.state AS state, join_business.longitude AS longitude\n" +

"FROM (\n" +

"  SELECT review_id, user_id, votes, stars, business_id\n" +

"  FROM cp.\"yelp_review.json\" where 1 = 0\n" +

") nested_0\n" +

" FULL JOIN " + yelpTable + " AS join_business ON nested_0.business_id = join_business.business_id";

// 上下文配置信息

final SabotContext context = getSabotContext();

context.getOptionManager().setOption(

OptionValue.createLong(OptionValue.OptionType.SYSTEM, "planner.slice_target", 1)

);

context.getOptionManager().setOption(

OptionValue.createLong(OptionValue.OptionType.SYSTEM, "planner.width.max_per_node", 10)

);

context.getOptionManager().setOption(

OptionValue.createBoolean(OptionValue.OptionType.SYSTEM, "planner.enable_mux_exchange", true)

);

// 查询上下文

final QueryContext queryContext = new QueryContext(session(), context, UserBitShared.QueryId.getDefaultInstance());

// AttemptObserver

final AttemptObserver observer = new PassthroughQueryObserver(ExecTest.mockUserClientConnection(null));

// SqlConverter 进行sql 转换处理的

final SqlConverter converter = new SqlConverter(

queryContext.getPlannerSettings(),

queryContext.getOperatorTable(),

queryContext,

queryContext.getMaterializationProvider(),

queryContext.getFunctionRegistry(),

queryContext.getSession(),

observer,

queryContext.getCatalog(),

queryContext.getSubstitutionProviderFactory(),

queryContext.getConfig(),

queryContext.getScanResult(),

queryContext.getRelMetadataQuerySupplier());

// 解析sqlnode

final SqlNode node = converter.parse(sql);

// sql 处理器配置

final SqlHandlerConfig config = new SqlHandlerConfig(queryContext, converter, observer, null);

// sql 校验以及转换,依赖catalog,实际上是基于了calcite 处理

final ConvertedRelNode convertedRelNode = PrelTransformer.validateAndConvert(config, node);

final RelDataType validatedRowType = convertedRelNode.getValidatedRowType();

final RelNode queryRelNode = convertedRelNode.getConvertedNode();

 

final Rel drel = PrelTransformer.convertToDrel(config, queryRelNode, validatedRowType);

// 物理节点获取

final Pair<Prel, String> convertToPrel = PrelTransformer.convertToPrel(config, drel);

final Prel prel = convertToPrel.getKey();

final String prePhysicaltextPlan = convertToPrel.getValue();

 

assertThat(prePhysicaltextPlan).contains("HashToRandomExchange");

assertThat(prePhysicaltextPlan).contains("UnorderedMuxExchange");

assertThat(prePhysicaltextPlan).contains("Empty");

assertThat(prePhysicaltextPlan).contains("EasyScan");

// 物理操作器

final PhysicalOperator pop = PrelTransformer.convertToPop(config, prel);

// 物理计划

final PhysicalPlan plan = PrelTransformer.convertToPlan(config, pop);

final String postPhysicaltextPlan = plan.unparse(config.getContext().getLpPersistence().getMapper().writer());

 

assertThat(postPhysicaltextPlan).contains("EmptyValues");

assertThat(postPhysicaltextPlan).contains("EasyGroupScan");

assertThat(postPhysicaltextPlan).contains("unordered-mux-exchange");

assertThat(postPhysicaltextPlan).contains("hash-to-random-exchange");

 

 

PhysicalPlanReader pPlanReader = new PhysicalPlanReader(

DEFAULT_SABOT_CONFIG, CLASSPATH_SCAN_RESULT, new LogicalPlanPersistence(DEFAULT_SABOT_CONFIG, CLASSPATH_SCAN_RESULT),

CoordinationProtos.NodeEndpoint.getDefaultInstance(),

DirectProvider.wrap(Mockito.mock(CatalogService.class)), context);

// 执行计划

ExecutionPlan exec = ExecutionPlanCreator

.getExecutionPlan(queryContext, pPlanReader, AbstractMaestroObserver.NOOP, plan,

QueueType.SMALL);

List<PlanFragmentFull> fragments  = exec.getFragments();

 

int scanFrags = 0;

for (PlanFragmentFull fragment : fragments) {

if (new String(fragment.getMajor().getFragmentJson().toByteArray()).contains("easy-sub-scan")) {

scanFrags++;

}

}

assertEquals(2, scanFrags);

 

}

//  用户会话创建,mock 使用

private static UserSession session() {

return UserSession.Builder.newBuilder()

.withSessionOptionManager(

new SessionOptionManagerImpl(getSabotContext().getOptionValidatorListing()),

getSabotContext().getOptionManager())

.withUserProperties(UserProtos.UserProperties.getDefaultInstance())

.withCredentials(UserBitShared.UserCredentials.newBuilder().setUserName("foo").build())

.setSupportComplexTypes(true)

.build();

}

 

}

说明

以上只是一个简单的说明。实际上 Dremio 的执行过程要复杂得多：它使用了不同的优化器，还包含物化处理、查询规则、底层数据存储读取以及元数据关联等。
以上只是一个单元测试，实际执行还包含具体命令的处理、线程调度等环节。但基于上面的代码，可以大致了解查询执行的处理过程，值得参考。

参考资料

sabot/kernel/src/test/java/com/dremio/exec/planner/sql/handlers/commands/Limit0LogicalToPhysicalTest.java

举报

相关推荐

0 条评论