Dremio UserServer: a brief overview

Ewall_熊猫 2022-12-31

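UserServer exists to manage the lifecycle of UserRPCServer, which is built on Netty. It mainly handles non-web requests, that is, clients that connect to the service directly.
Judging from the official source, UserServer is only created on coordinator nodes (candidate nodes can of course run the RPC service as well).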
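To make the lifecycle-wrapper idea concrete, here is a minimal sketch. It does not use Dremio's classes: `SimpleService`, `FakeRpcServer` and `LifecycleWrapper` are hypothetical names, and the only thing taken from the article is the shape, an outer service whose start creates and binds the inner server and whose close releases it.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a service interface with a start/close lifecycle. */
interface SimpleService extends AutoCloseable {
  void start() throws Exception;
}

/** Hypothetical stand-in for the inner RPC server; it only records what happened. */
class FakeRpcServer implements AutoCloseable {
  private final List<String> events;

  FakeRpcServer(List<String> events) {
    this.events = events;
  }

  int bind(int port) {
    events.add("bind:" + port);
    return port;
  }

  @Override
  public void close() {
    events.add("close");
  }
}

/** The outer wrapper: owns the inner server and ties it to start()/close(). */
class LifecycleWrapper implements SimpleService {
  private final List<String> events = new ArrayList<>();
  private final int configuredPort;
  private FakeRpcServer server; // created in start(), not in the constructor
  private int port = -1;        // -1 until the server has been bound

  LifecycleWrapper(int configuredPort) {
    this.configuredPort = configuredPort;
  }

  @Override
  public void start() {
    server = new FakeRpcServer(events);
    port = server.bind(configuredPort);
  }

  @Override
  public void close() {
    if (server != null) {
      server.close();
      server = null;
    }
  }

  int getPort() {
    return port;
  }

  List<String> getEvents() {
    return events;
  }
}
```

Constructing the wrapper is cheap and has no side effects; the port is only known after `start()`, which is why the sketch exposes it through a getter instead of a constructor argument.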

How it is created

DACDaemonModule

if (isCoordinator) {
  registry.bindSelf(
    new UserServer(
      config,
      registry.provider(java.util.concurrent.ExecutorService.class),
      registry.provider(BufferAllocator.class),
      registry.provider(UserService.class),
      registry.provider(NodeEndpoint.class),
      registry.provider(UserWorker.class),
      dacConfig.autoPort,
      bootstrapRegistry.lookup(Tracer.class),
      registry.provider(OptionValidatorListing.class)
    )
  );
}

Starting UserRPCServer

Because UserServer implements the Service interface, it has a start method, and that method is where UserRPCServer is actually started.
UserServer

public void start() throws Exception {
  final SabotConfig sabotConfig = config.getSabotConfig();
  allocator = bufferAllocator.get()
    .newChildAllocator(
      "rpc:user",
      sabotConfig.getLong("dremio.exec.rpc.user.server.memory.reservation"),
      sabotConfig.getLong("dremio.exec.rpc.user.server.memory.maximum"));
  // start the Netty-based server
  final EventLoopGroup eventLoopGroup = TransportCheck
    .createEventLoopGroup(sabotConfig.getInt(ExecConstants.USER_SERVER_RPC_THREADS), "UserServer-");
  eventLoopCloseable = new EventLoopCloseable(eventLoopGroup);

  server = newUserRPCServer(eventLoopGroup);

  Metrics.newGauge("rpc.user.current", allocator::getAllocatedMemory);
  Metrics.newGauge("rpc.user.peak", allocator::getPeakMemoryAllocation);
  // the default port is 31010
  int initialPort = sabotConfig.getInt(DremioClient.INITIAL_USER_PORT);
  if (allowPortHunting) {
    initialPort += 333;
  }

  port = server.bind(initialPort, allowPortHunting);
}
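The last lines of start() read the configured port, add 333 when allowPortHunting is set, and pass both values to server.bind. What bind does with the flag is not shown in the excerpt. A common meaning of "port hunting" is to move on to the next port when the requested one is busy, and the sketch below illustrates only that assumption. It uses `java.net.ServerSocket` rather than Netty, and `PortHunter` is a hypothetical helper, not Dremio code.

```java
import java.io.IOException;
import java.net.BindException;
import java.net.InetAddress;
import java.net.ServerSocket;

/** Hypothetical helper illustrating port hunting; not Dremio's bind implementation. */
final class PortHunter {
  private PortHunter() {}

  /**
   * Tries to bind starting at initialPort. If that port is already in use and
   * allowPortHunting is true, the next port is tried; otherwise the
   * BindException is rethrown to the caller.
   */
  static ServerSocket bind(int initialPort, boolean allowPortHunting) throws IOException {
    int port = initialPort;
    while (true) {
      try {
        return new ServerSocket(port, 50, InetAddress.getLoopbackAddress());
      } catch (BindException e) {
        if (!allowPortHunting || port >= 65535) {
          throw e; // hunting disabled, or no ports left to try
        }
        port++; // port is busy: hunt for the next one
      }
    }
  }
}
```

Under this reading, a caller has to ask the returned socket for the port it actually got (`getLocalPort()`), which would match the way start() stores the return value of bind in `port` instead of reusing `initialPort`.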

 

 

UserRPCServer class diagram

[Figure: UserRPCServer class diagram]


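Inside the RPC server, the actual work is delegated to UserWorker, whose implementation is defined in ForemenWorkManager. This is in keeping with the design inherited from Drill, where a foreman is a long-running task.
Main protocol messages handled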
The WorkIngestorImpl class in UserRPCServer

// handles the different RPC request types
@Override
public void feedWork(UserClientConnectionImpl connection, int rpcType, byte[] pBody, ByteBuf dBody,
    ResponseSender responseSender) throws RpcException {
  final UserWorker worker = this.worker.get();
  final TerminationListenerRegistry registry = connection;
  switch (rpcType) {

  case RpcType.RUN_QUERY_VALUE: {
    logger.debug("Received query to run.  Returning query handle.");
    final RunQuery query = parse(pBody, RunQuery.PARSER, RunQuery.class);
    UserRequest request = new UserRequest(RpcType.RUN_QUERY, query);
    final ExternalId externalId = ExternalIdHelper.generateExternalId();
    // The UserWorker call goes straight to ForemenWorkManager.submitWork; actual execution
    // relies on CommandPool (see the author's earlier short write-up on it)
    worker.submitWork(externalId, connection.getSession(), new UserConnectionResponseHandler(connection), request, registry);
    responseSender.send(new Response(RpcType.QUERY_HANDLE, ExternalIdHelper.toQueryId(externalId)));
    break;
  }

  case RpcType.CANCEL_QUERY_VALUE: {
    final QueryId queryId = parse(pBody, QueryId.PARSER, QueryId.class);
    final Ack ack = worker.cancelQuery(ExternalIdHelper.toExternal(queryId),
        connection.getSession().getCredentials().getUserName());
    responseSender.send(new Response(RpcType.ACK, ack));
    break;
  }

  case RpcType.RESUME_PAUSED_QUERY_VALUE: {
    final QueryId queryId = parse(pBody, QueryId.PARSER, QueryId.class);
    final Ack ack = worker.resumeQuery(ExternalIdHelper.toExternal(queryId));
    responseSender.send(new Response(RpcType.ACK, ack));
    break;
  }

  case RpcType.GET_CATALOGS_VALUE: {
    final GetCatalogsReq req = parse(pBody, GetCatalogsReq.PARSER, GetCatalogsReq.class);
    UserRequest request = new UserRequest(RpcType.GET_CATALOGS, req);
    worker.submitWork(connection.getSession(), new MetadataProvider.CatalogsHandler(responseSender), request, registry);
    break;
  }

  case RpcType.GET_SCHEMAS_VALUE: {
    final GetSchemasReq req = parse(pBody, GetSchemasReq.PARSER, GetSchemasReq.class);
    UserRequest request = new UserRequest(RpcType.GET_SCHEMAS, req);
    worker.submitWork(connection.getSession(), new MetadataProvider.SchemasHandler(responseSender), request, registry);
    break;
  }

  case RpcType.GET_TABLES_VALUE: {
    final GetTablesReq req = parse(pBody, GetTablesReq.PARSER, GetTablesReq.class);
    UserRequest request = new UserRequest(RpcType.GET_TABLES, req);
    worker.submitWork(connection.getSession(), new MetadataProvider.TablesHandler(responseSender), request, registry);
    break;
  }

  case RpcType.GET_COLUMNS_VALUE: {
    final GetColumnsReq req = parse(pBody, GetColumnsReq.PARSER, GetColumnsReq.class);
    UserRequest request = new UserRequest(RpcType.GET_COLUMNS, req);
    worker.submitWork(connection.getSession(), new MetadataProvider.ColumnsHandler(responseSender), request, registry);
    break;
  }

  case RpcType.CREATE_PREPARED_STATEMENT_VALUE: {
    final CreatePreparedStatementReq req = parse(pBody, CreatePreparedStatementReq.PARSER, CreatePreparedStatementReq.class);
    UserRequest request = new UserRequest(RpcType.CREATE_PREPARED_STATEMENT, req);
    worker.submitWork(connection.getSession(), new PreparedStatementProvider.PreparedStatementHandler(responseSender), request, registry);
    break;
  }

  case RpcType.CREATE_PREPARED_STATEMENT_ARROW_VALUE: {
    final CreatePreparedStatementArrowReq req = parse(pBody, CreatePreparedStatementArrowReq.PARSER, CreatePreparedStatementArrowReq.class);
    UserRequest request = new UserRequest(RpcType.CREATE_PREPARED_STATEMENT_ARROW, req);
    worker.submitWork(connection.getSession(), new PreparedStatementProvider.PreparedStatementArrowHandler(responseSender), request, registry);
    break;
  }

  case RpcType.GET_SERVER_META_VALUE: {
    final GetServerMetaReq req = parse(pBody, GetServerMetaReq.PARSER, GetServerMetaReq.class);
    UserRequest request = new UserRequest(RpcType.GET_SERVER_META, req);
    worker.submitWork(connection.getSession(), new ServerMetaProvider.ServerMetaHandler(responseSender), request, registry);
    break;
  }

  default:
    throw new UnsupportedOperationException(String.format("UserServer received rpc of unknown type.  Type was %d.", rpcType));
  }
}
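Every case in feedWork follows the same pattern: decode the body, hand the request to the worker, and reply either immediately (query handle, ack) or later through a handler. The sketch below reduces that pattern to two message types. All names in it are made up for illustration: `MiniIngestor`, its `Worker` and `ResponseSender` interfaces, the two integer constants and the UTF-8 string bodies are assumptions, whereas the real code parses each body with the message's `PARSER` and uses Dremio's own classes.

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;

/** Hypothetical miniature of a switch-on-rpcType dispatcher; not Dremio code. */
final class MiniIngestor {
  // Made-up type codes standing in for the RpcType.*_VALUE constants.
  static final int RUN_QUERY = 1;
  static final int CANCEL_QUERY = 2;

  /** Hypothetical stand-in for the worker that actually runs queries. */
  interface Worker {
    void submitWork(String queryId, String sql);

    boolean cancelQuery(String queryId);
  }

  /** Hypothetical stand-in for the object that writes a response to the client. */
  interface ResponseSender {
    void send(String responseType, String body);
  }

  private final Worker worker;

  MiniIngestor(Worker worker) {
    this.worker = worker;
  }

  void feedWork(int rpcType, byte[] body, ResponseSender sender) {
    // In this sketch every body is just UTF-8 text.
    String text = new String(body, StandardCharsets.UTF_8);
    switch (rpcType) {
      case RUN_QUERY: {
        // Generate an id, start the work, and return the handle right away.
        String queryId = UUID.randomUUID().toString();
        worker.submitWork(queryId, text);
        sender.send("QUERY_HANDLE", queryId);
        break;
      }
      case CANCEL_QUERY: {
        // The body carries the id of the query to cancel.
        boolean ok = worker.cancelQuery(text);
        sender.send("ACK", Boolean.toString(ok));
        break;
      }
      default:
        throw new UnsupportedOperationException(
            String.format("received rpc of unknown type. Type was %d.", rpcType));
    }
  }
}
```

The point of the immediate `QUERY_HANDLE` reply is that the caller gets an id it can later use for cancellation while the query itself runs elsewhere; results travel back through a separate handler, as `UserConnectionResponseHandler` does in the excerpt.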

Call trace reference

Here is what an actual RPC execution looks like:

ts=2022-12-29 07:12:15;thread_name=USER-rpc-event-queue;id=95;is_daemon=true;priority=10;TCCL=sun.misc.Launcher$AppClassLoader@18b4aac2
    @com.dremio.sabot.rpc.user.UserRPCServer.handle()
        at com.dremio.exec.rpc.RpcBus$RequestEvent.run(RpcBus.java:475)
        at com.dremio.common.SerializedExecutor$RunnableProcessor.run(SerializedExecutor.java:96)
        at com.dremio.exec.rpc.RpcBus$SameExecutor.execute(RpcBus.java:341)
        at com.dremio.common.SerializedExecutor.execute(SerializedExecutor.java:129)
        at com.dremio.exec.rpc.RpcBus$InboundHandler.decode(RpcBus.java:375)
        at com.dremio.exec.rpc.RpcBus$InboundHandler.decode(RpcBus.java:346)
        at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:88)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at java.lang.Thread.run(Thread.java:750)

ts=2022-12-29 07:12:15;thread_name=out-of-band-observer;id=ae9;is_daemon=true;priority=10;TCCL=sun.misc.Launcher$AppClassLoader@18b4aac2
    @com.dremio.sabot.rpc.user.UserRPCServer.access$000()
        at com.dremio.sabot.rpc.user.UserRPCServer$UserClientConnectionImpl.sendResult(UserRPCServer.java:575)
        at com.dremio.exec.work.protector.UserConnectionResponseHandler.completed(UserConnectionResponseHandler.java:36)
        at com.dremio.service.jobs.LocalJobsService$QueryListener.execCompletion(LocalJobsService.java:1731)
        at com.dremio.exec.planner.observer.OutOfBandQueryObserver$1.run(OutOfBandQueryObserver.java:50)
        at com.dremio.common.SerializedExecutor$RunnableProcessor.run(SerializedExecutor.java:96)
        at com.dremio.context.RequestContext.run(RequestContext.java:96)
        at com.dremio.common.concurrent.ContextMigratingExecutorService.lambda$decorate$3(ContextMigratingExecutorService.java:199)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)

ts=2022-12-29 07:12:15;thread_name=USER-rpc-event-queue;id=95;is_daemon=true;priority=10;TCCL=sun.misc.Launcher$AppClassLoader@18b4aac2
    @com.dremio.sabot.rpc.user.UserRPCServer.getResponseDefaultInstance()
        at com.dremio.exec.rpc.RpcBus$ResponseEvent.run(RpcBus.java:518)
        at com.dremio.common.SerializedExecutor$RunnableProcessor.run(SerializedExecutor.java:96)
        at com.dremio.exec.rpc.RpcBus$SameExecutor.execute(RpcBus.java:341)
        at com.dremio.common.SerializedExecutor.execute(SerializedExecutor.java:129)
        at com.dremio.exec.rpc.RpcBus$InboundHandler.decode(RpcBus.java:380)
        at com.dremio.exec.rpc.RpcBus$InboundHandler.decode(RpcBus.java:346)
        at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:88)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at java.lang.Thread.run(Thread.java:750)
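In all three traces the work passes through `SerializedExecutor` before reaching UserRPCServer. The traces do not show how that class is implemented. As a rough mental model only, an executor with that name usually guarantees that tasks run one at a time and in submission order, whatever thread the underlying executor picks. The sketch below shows that general technique; `OrderedExecutor` is a hypothetical class written for this note and is not Dremio's implementation.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Executor;

/** Hypothetical serializing executor: one task at a time, in submission order. */
final class OrderedExecutor implements Executor {
  private final Queue<Runnable> pending = new ArrayDeque<>(); // guarded by "this"
  private final Executor underlying;
  private boolean running; // true while a drain loop is active; guarded by "this"

  OrderedExecutor(Executor underlying) {
    this.underlying = underlying;
  }

  @Override
  public void execute(Runnable task) {
    synchronized (this) {
      pending.add(task);
      if (running) {
        return; // the active drain loop will pick this task up
      }
      running = true;
    }
    underlying.execute(this::drain);
  }

  /** Runs queued tasks until the queue is empty, then marks the loop as stopped. */
  private void drain() {
    while (true) {
      Runnable next;
      synchronized (this) {
        next = pending.poll();
        if (next == null) {
          running = false;
          return;
        }
      }
      try {
        next.run(); // run outside the lock so tasks may submit more tasks
      } catch (RuntimeException e) {
        // A failing task must not stop later tasks; a real implementation would log this.
      }
    }
  }
}
```

With a same-thread underlying executor (`Runnable::run`), a task that submits another task does not run it recursively: the new task is queued and runs after the current one returns, which keeps per-connection events in order.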

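Notes

Dremio's default JDBC request handling goes through UserServer, which is a wrapper; the real work is done by UserRPCServer, which uses Netty for its network handling.
This can serve as a reference for anyone who wants to understand how client requests are actually executed. Reading further into the source is also recommended.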

References

sabot/kernel/src/main/java/com/dremio/sabot/rpc/user/UserServer.java
services/base-rpc/src/main/java/com/dremio/exec/rpc/BasicServer.java
dac/backend/src/main/java/com/dremio/dac/daemon/DACDaemonModule.java
sabot/kernel/src/test/java/com/dremio/exec/server/SabotNode.java
sabot/kernel/src/test/java/com/dremio/sabot/rpc/user/TestUserRpcServer.java
sabot/kernel/src/main/java/com/dremio/exec/work/protector/UserWorker.java
sabot/kernel/src/main/java/com/dremio/sabot/rpc/user/WorkIngestor.java
sabot/kernel/src/main/java/com/dremio/exec/work/protector/ForemenWorkManager.java
