Contents
- 01 Introduction
- 02 Source Code Analysis
- 2.1 API Entry Points
- 2.1.1 Validation Endpoint
- 2.1.2 Submit-Preview Endpoint
- 03 Conclusion
01 Introduction
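Official Ververica Platform documentation: https://docs.ververica.com/
Ververica Platform is a real-time stream processing platform built by Ververica (formerly data Artisans), the company founded by the creators of Apache Flink. Its official introduction reads: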
What is Ververica Platform?
Companies running the largest stream processing deployments in the world have adopted Apache Flink® because of its powerful model for stateful stream processing. Stateful stream processing enables companies to derive insight and to take action on data at the moment it’s generated ― when it’s the most valuable.
Ververica Platform is purpose-built for stateful stream processing architectures and makes operating these powerful systems easier than ever before by offering an entirely new experience for developing, deploying, and managing stream processing applications. It’s our mission at Ververica to ensure that developers invest their time on their core business objectives, not on maintenance and infrastructure.
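So what does the Ververica Platform backend actually do when you click the Run button shown in the figure below? This article reads through the source code to find out.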
02 Source Code Analysis
2.1 API Entry Points
As the figure above shows, clicking “Run” triggers two REST API calls:
Validation endpoint:
curl 'http://127.0.0.1:31158/sql/v1beta1/namespaces/default/sqlscripts:validate' \
-H 'Accept: application/json, text/plain, */*' \
-H 'Accept-Language: en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7' \
-H 'Connection: keep-alive' \
-H 'Content-Type: application/json' \
-H 'Origin: http://127.0.0.1:31158' \
-H 'Referer: http://127.0.0.1:31158/app/' \
-H 'User-Agent: Mozilla/5.0 (Linux; Android 6.0; Nexus 5 Build/MRA58N) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.0.0 Mobile Safari/537.36' \
-H 'X-Requested-With: XMLHttpRequest' \
--data-raw '{"statement":"SELECT * FROM pepole"}' \
--compressed
Submit-preview endpoint:
curl 'http://127.0.0.1:31158/sql/v1beta1/namespaces/default/sqlscripts:submit-preview' \
-H 'Accept: application/json, text/plain, */*' \
-H 'Accept-Language: en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7' \
-H 'Connection: keep-alive' \
-H 'Content-Type: application/json' \
-H 'Origin: http://127.0.0.1:31158' \
-H 'Referer: http://127.0.0.1:31158/app/' \
-H 'User-Agent: Mozilla/5.0 (Linux; Android 6.0; Nexus 5 Build/MRA58N) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.0.0 Mobile Safari/537.36' \
-H 'X-Requested-With: XMLHttpRequest' \
--data-raw '{"statement":"SELECT * FROM pepole"}' \
--compressed
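The same two calls can also be issued from code instead of the browser. Below is a minimal sketch using the JDK 11+ java.net.http client. VvpSqlRequests, jsonString and build are hypothetical helper names; the URL layout and the {"statement": ...} body are taken from the captured requests above, and the sketch assumes that the browser-only headers (User-Agent, Referer, Origin, X-Requested-With) are not required and that no authentication is configured.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper that rebuilds the POST requests the UI sends on "Run".
// Assumption: only Content-Type/Accept are needed; browser-only headers are dropped.
final class VvpSqlRequests {
    private VvpSqlRequests() {}

    // Minimal JSON string escaping for the "statement" field
    static String jsonString(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"': sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        sb.append(String.format("\\u%04x", (int) c)); // other control chars
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.append('"').toString();
    }

    // action is "validate" or "submit-preview", matching the two captured URLs
    static HttpRequest build(String baseUrl, String namespace, String action, String sql) {
        URI uri = URI.create(baseUrl + "/sql/v1beta1/namespaces/" + namespace + "/sqlscripts:" + action);
        String body = "{\"statement\":" + jsonString(sql) + "}";
        return HttpRequest.newBuilder(uri)
                .header("Content-Type", "application/json")
                .header("Accept", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }
}
```

To actually send a request, pass it to HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()); use "submit-preview" as the action for the second call.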
2.1.1 Validation Endpoint
API: /sql/v1beta1/namespaces/default/sqlscripts:validate
The controller-layer code is shown below (explanatory comments have been added inline):
/**
 * Validates a SQL statement.
 *
 * @param ns        namespace
 * @param statement SQL statement to validate
 * @param catalog   catalog (optional)
 * @param database  database (optional)
 * @return validation result
 */
@PostMapping({"sqlscripts:validate"})
@PreAuthorize("isNamespaceEditor(#ns)")
public CompletableFuture<ValidateStatementResponse> validateStatement(
        @PathVariable NamespaceName ns,
        @RequestBody Statement statement,
        @RequestParam(required = false) String catalog,
        @RequestParam(required = false) String database) {
    // Wrap the SQL with its context (catalog and database); missing values default to ""
    StatementWithContext statementWithContext = StatementWithContext.newBuilder()
            .setStatement(statement.getStatement())
            .setCatalog((catalog != null) ? catalog : "")
            .setDatabase((database != null) ? database : "")
            .build();
    // Build the validation request, including the parent namespace
    ValidateStatementRequest request = ValidateStatementRequest.newBuilder()
            .setParent(ns.getName())
            .setStatement(statementWithContext)
            .build();
    // Validate asynchronously: the result reaches the future through the stream observer
    CompletableFuture<ValidateStatementResponse> future = new CompletableFuture<>();
    SimpleUnaryStreamObserver<ValidateStatementResponse> streamObserver = new SimpleUnaryStreamObserver<>(future);
    this.sqlService.validateStatement(request, streamObserver);
    return future;
}
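The controller returns a CompletableFuture and hands SqlService a stream observer that completes it. The sketch below shows one way such a bridge can work, assuming gRPC-style semantics for a unary call (one onNext followed by onCompleted, or a single onError). StreamObserver here is a minimal stand-in with the same three callbacks as io.grpc.stub.StreamObserver, and FutureStreamObserver is a hypothetical class, not vvp's actual SimpleUnaryStreamObserver.

```java
import java.util.concurrent.CompletableFuture;

// Minimal stand-in for io.grpc.stub.StreamObserver (same three callbacks).
interface StreamObserver<V> {
    void onNext(V value);
    void onError(Throwable t);
    void onCompleted();
}

// Hypothetical unary observer: buffers the single value and completes the future on onCompleted.
final class FutureStreamObserver<V> implements StreamObserver<V> {
    private final CompletableFuture<V> future;
    private V value;
    private boolean hasValue;

    FutureStreamObserver(CompletableFuture<V> future) {
        this.future = future;
    }

    @Override
    public void onNext(V v) {
        if (hasValue) {
            // A unary call must not produce a second value
            future.completeExceptionally(new IllegalStateException("unary call produced more than one value"));
            return;
        }
        value = v;
        hasValue = true;
    }

    @Override
    public void onError(Throwable t) {
        future.completeExceptionally(t);
    }

    @Override
    public void onCompleted() {
        if (hasValue) {
            future.complete(value); // no-op if the future was already failed
        } else {
            future.completeExceptionally(new IllegalStateException("unary call completed without a value"));
        }
    }
}
```

Returning the future fits Spring MVC, which accepts CompletableFuture as a controller return type and writes the response once the future completes, so the request thread does not block while SqlService works.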
As shown above, the core validation logic is in com.ververica.platform.sql.service.SqlService#validateStatement:
/**
 * Validates a SQL statement.
 *
 * @param request          validation request
 * @param responseObserver receives the validation result (asynchronously)
 */
public void validateStatement(ValidateStatementRequest request, StreamObserver<ValidateStatementResponse> responseObserver) {
    ExecutionContext execContext;
    // Parse the namespace from its name
    NamespaceName namespace = NamespaceName.parse(request.getParent());
    try {
        // Resolve the execution context from namespace, catalog and database
        execContext = this.flinkCatalogProvider.getExecutionContext(
                namespace, request.getStatement().getCatalog(), request.getStatement().getDatabase());
    } catch (CatalogNotExistException e) {
        responseObserver.onError(Status.NOT_FOUND
                .withDescription(String.format("Catalog %s not found", request.getStatement().getCatalog()))
                .asException());
        return;
    } catch (DatabaseNotExistException e) {
        responseObserver.onError(Status.NOT_FOUND
                .withDescription(String.format("Database %s not found", request.getStatement().getDatabase()))
                .asException());
        return;
    }
    // Validate the SQL and send back exactly one response
    String statement = request.getStatement().getStatement();
    ValidateStatementResponse response = this.sqlScriptValidator.validate(statement, execContext).toValidateStatementResponse();
    responseObserver.onNext(response);
    responseObserver.onCompleted();
}
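SqlService#validateStatement follows the usual unary-call contract: a failure while resolving the context is reported once via onError and the method returns early; otherwise exactly one response is emitted, followed by onCompleted. The sketch below reproduces only that control flow. The catalog map and the exception classes are hypothetical stand-ins for flinkCatalogProvider, Flink's CatalogNotExistException/DatabaseNotExistException and the gRPC NOT_FOUND status, and the "validation" itself is a placeholder.

```java
import java.util.Map;
import java.util.Set;

// Minimal stand-in for io.grpc.stub.StreamObserver (same three callbacks).
interface StreamObserver<V> {
    void onNext(V value);
    void onError(Throwable t);
    void onCompleted();
}

// Hypothetical sketch of the control flow in SqlService#validateStatement.
final class ValidateServiceSketch {
    // Stand-ins for Flink's CatalogNotExistException / DatabaseNotExistException
    static final class CatalogMissing extends Exception {
        CatalogMissing(String name) { super("Catalog " + name + " not found"); }
    }
    static final class DatabaseMissing extends Exception {
        DatabaseMissing(String name) { super("Database " + name + " not found"); }
    }
    // Stand-in for Status.NOT_FOUND.withDescription(...).asException()
    static final class NotFound extends Exception {
        NotFound(String description) { super("NOT_FOUND: " + description); }
    }

    private final Map<String, Set<String>> catalogs; // catalog name -> database names

    ValidateServiceSketch(Map<String, Set<String>> catalogs) {
        this.catalogs = catalogs;
    }

    // Stand-in for flinkCatalogProvider.getExecutionContext(namespace, catalog, database)
    private String resolveContext(String catalog, String database) throws CatalogMissing, DatabaseMissing {
        Set<String> databases = catalogs.get(catalog);
        if (databases == null) throw new CatalogMissing(catalog);
        if (!databases.contains(database)) throw new DatabaseMissing(database);
        return catalog + "." + database;
    }

    void validate(String catalog, String database, String sql, StreamObserver<String> observer) {
        String context;
        try {
            context = resolveContext(catalog, database);
        } catch (CatalogMissing | DatabaseMissing e) {
            // Report the failure once and stop: no onNext/onCompleted after onError
            observer.onError(new NotFound(e.getMessage()));
            return;
        }
        // Placeholder check; the real service delegates to sqlScriptValidator.validate(...)
        String result = (sql.isBlank() ? "INVALID in " : "VALID in ") + context;
        observer.onNext(result);
        observer.onCompleted();
    }
}
```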
VVP wraps the validation one level further, in com.ververica.platform.sql.service.validation.SqlScriptValidationServiceImpl#validate (the implementation behind sqlScriptValidator above):
/**
 * Validates a SQL script.
 *
 * @param sqlScript   SQL script
 * @param execContext execution context
 * @return validation result
 */
public ValidationResultWithOperations validate(String sqlScript, ExecutionContext execContext) {
    // Class loader for the TableEnvironment
    TableEnvClassLoader tEnvClassLoader = TableEnvClassLoader.create();
    // Obtain a VvpTableEnvironment for the namespace, backed by that class loader
    VvpTableEnvironment tEnv = this.tableEnvProvider.getTableEnvironment(execContext.getNamespace(), tEnvClassLoader);
    // Configure the environment from the execution context
    execContext.configureEnvironment((TableEnvironment) tEnv);
    // Validate
    return validate(execContext.getNamespace(), sqlScript, tEnv, true);
}
/**
 * Validates a SQL script.
 *
 * @param namespace       namespace
 * @param sqlScript       SQL script
 * @param tEnv            VvpTableEnvironment
 * @param closeUdfLoaders whether to close the UDF class loaders afterwards
 * @return validation result
 */
private ValidationResultWithOperations validate(NamespaceName namespace, String sqlScript, VvpTableEnvironment tEnv, boolean closeUdfLoaders) {
    Objects.requireNonNull(sqlScript);
    // Run the validator with the TableEnvironment's class loader
    ValidationResultWithOperations result = (ValidationResultWithOperations) ClassLoaderWrapper.execute(
            new SqlScriptValidator(sqlScript, namespace, tEnv),
            (ClassLoader) tEnv.getTableEnvClassLoader());
    if (closeUdfLoaders) {
        tEnv.getTableEnvClassLoader().close();
    }
    return result;
}
So the actual validation call is ValidationResultWithOperations result = (ValidationResultWithOperations) ClassLoaderWrapper.execute(new SqlScriptValidator(sqlScript, namespace, tEnv), (ClassLoader) tEnv.getTableEnvClassLoader());, i.e. a SqlScriptValidator is executed with the TableEnvironment's class loader.
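The source of ClassLoaderWrapper is not shown, so the following is only a guess at what a helper with this shape typically does, assuming SqlScriptValidator is a Supplier (its entry point is #get): install the given loader as the thread's context class loader while the supplier runs, so that classes such as UDFs resolve through it, and restore the previous loader afterwards. ContextClassLoaderRunner is a hypothetical name.

```java
import java.util.function.Supplier;

// Hypothetical sketch of a ClassLoaderWrapper.execute(supplier, loader)-style helper.
// Assumption: the wrapper swaps the thread context class loader; vvp's source is not shown.
final class ContextClassLoaderRunner {
    private ContextClassLoaderRunner() {}

    static <T> T execute(Supplier<T> task, ClassLoader loader) {
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        current.setContextClassLoader(loader);
        try {
            return task.get();
        } finally {
            // Restore the original loader even if the task throws (e.g. on invalid SQL)
            current.setContextClassLoader(previous);
        }
    }
}
```

The try/finally is the key design choice: validation can throw on bad SQL, and without the restore a pooled request thread would keep the per-request class loader as its context loader.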
The validation itself is implemented in com.ververica.platform.sql.service.validation.SqlScriptValidationServiceImpl.SqlScriptValidator#get, whose code is shown in the figure below:
As the figure shows, there are two parsing approaches, and the one used here is VvpSqlParser. Its parsing code is as follows:
/**
 * Parses a SQL script into Flink operations.
 *
 * @param statement SQL script
 * @return parsed operations
 */
public List<Operation> parse(String statement) {
    ValidatorCache validatorCache = new ValidatorCache(this.validatorSupplier);
    // Phase 1: parse the whole script into a list of Calcite SqlNodes
    List<SqlNode> nodes = parseNodes(statement, validatorCache);
    // Phase 2: convert each SqlNode into a Flink Operation, in order
    return nodes.stream()
            .map(sqlNode -> SqlToOperationConverter
                    .convert(validatorCache.getValidator(), this.catalogManager, sqlNode)
                    .orElseThrow()) // fail if the node cannot be converted to an Operation
            .peek(operation -> processIntermediateOperation(validatorCache, operation))
            .collect(Collectors.toUnmodifiableList());
}
/**
 * Parses the script into a list of SqlNodes.
 *
 * @param statement      SQL script
 * @param validatorCache validator cache
 * @return parsed SqlNodes
 */
private List<SqlNode> parseNodes(String statement, ValidatorCache validatorCache) {
    SqlParser sqlParser = SqlParser.create(statement, validatorCache.getValidator().config().getParserConfig());
    try {
        // parseStmtList() parses a list of semicolon-separated statements
        return sqlParser.parseStmtList().getList();
    } catch (SqlParseException e) {
        throw new SqlParserException(
                String.format("Could not parse SQL: %s. Statement was: %s", e.getMessage(), statement), e);
    }
}
Looking at this class, it actually extends Flink's ParserImpl, and ParserImpl parses SQL with Calcite. For background on Calcite, see my earlier blog post “Getting Started with Apache Calcite”.
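To make the two phases of parse concrete (parse the script into a list of nodes, then convert each node into an Operation and fail when conversion is impossible), here is a self-contained toy. It deliberately does not use Calcite: phase 1 is a naive split on ';' outside single-quoted literals, and phase 2 classifies a statement by its leading keyword. ToyScriptParser, Kind and Operation are hypothetical, and the processIntermediateOperation step is left out because its source is not shown.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.stream.Collectors;

// Toy two-phase parser mirroring the shape of VvpSqlParser#parse (no Calcite involved).
final class ToyScriptParser {
    enum Kind { QUERY, INSERT, CREATE_TABLE }

    static final class Operation {
        final Kind kind;
        final String sql;
        Operation(Kind kind, String sql) { this.kind = kind; this.sql = sql; }
    }

    // Phase 1 (stand-in for parseStmtList): split on ';' outside single quotes
    static List<String> parseNodes(String script) {
        List<String> nodes = new ArrayList<>();
        StringBuilder cur = new StringBuilder();
        boolean inQuote = false;
        for (char c : script.toCharArray()) {
            if (c == '\'') inQuote = !inQuote;
            if (c == ';' && !inQuote) {
                addIfNotBlank(nodes, cur);
                cur.setLength(0);
            } else {
                cur.append(c);
            }
        }
        if (inQuote) {
            throw new IllegalArgumentException(String.format(
                    "Could not parse SQL: %s. Statement was: %s", "unterminated string literal", script));
        }
        addIfNotBlank(nodes, cur);
        return nodes;
    }

    private static void addIfNotBlank(List<String> nodes, StringBuilder sb) {
        String s = sb.toString().trim();
        if (!s.isEmpty()) nodes.add(s);
    }

    // Phase 2 helper: like SqlToOperationConverter.convert, returns empty when unsupported
    static Optional<Operation> convert(String node) {
        String upper = node.toUpperCase(Locale.ROOT);
        if (upper.startsWith("SELECT")) return Optional.of(new Operation(Kind.QUERY, node));
        if (upper.startsWith("INSERT")) return Optional.of(new Operation(Kind.INSERT, node));
        if (upper.startsWith("CREATE TABLE")) return Optional.of(new Operation(Kind.CREATE_TABLE, node));
        return Optional.empty();
    }

    static List<Operation> parse(String script) {
        return parseNodes(script).stream()
                .map(node -> convert(node).orElseThrow(
                        () -> new IllegalArgumentException("Unsupported statement: " + node)))
                .collect(Collectors.toUnmodifiableList());
    }
}
```

Parsing the whole script first means a syntax error anywhere fails before any statement is converted, which matches the order of parseNodes and the stream in parse.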
2.1.2 Submit-Preview Endpoint
API: /sql/v1beta1/namespaces/default/sqlscripts:submit-preview
For reasons of length, this endpoint will be covered in the next post.
03 Conclusion
This article walked through the source code behind VVP's SQL validation. I hope it is helpful. The end!