0
点赞
收藏
分享

微信扫一扫

flink实现protobuf format(超详细)



文章目录

  • ​​01 引言​​
  • ​​02 protobuf概述​​
  • ​​2.1 protobuf优缺点​​
  • ​​2.2 protobuf使用​​
  • ​​03 Flink使用protobuf​​
  • ​​3.1 Flink如何使用protobuf​​
  • ​​3.2 Flink实现protobuf思路​​
  • ​​3.3 可能会出现的问题​​
  • ​​04 文末​​

01 引言

​Flink​​​项目目前是没有实现​​protobuf​​​格式的,目前Flink相关的开发者正在开发,详情可以参考:​​https://github.com/maosuhan/flink/tree/feature/flink-pb-format​​

本文来讲讲如何实现​​Flink​​​的​​protobuf​​。

02 protobuf概述


概念:​​protobuf (protocol buffer) ​​是谷歌内部的混合语言数据标准。通过将结构化的数据进行序列化(串行化),用于通讯协议、数据存储等领域和语言无关、平台无关、可扩展的序列化结构数据格式。


protobuf github 地址:​​https://github.com/protocolbuffers/protobuf​​

flink实现protobuf format(超详细)_java

从上图可以看到protobuf很火,因此很有必要去学习它。

2.1 protobuf优缺点

既然知道​​protbuf​​很火,那它有什么优势呢?简单概括如下:

性能方面

  • 序列化后,数据大小可缩小3倍
  • 序列化速度快
  • 传输速度快

使用方面

  • 使用简单:proto编译器自动进行序列化和反序列化
  • 维护成本低:多平台只需要维护一套对象协议文件,即.proto文件
  • 可扩展性好:不必破坏旧的数据格式,就能对数据结构进行更新
  • 加密性好:http传输内容抓包只能抓到字节数据

使用范围

  • 跨平台、跨语言、可扩展性强

既然有优点,那么是否有缺点呢?当然有的,先来看看怎么使用​​protobuf​​。

2.2 protobuf使用

首先需要下载​​protobuf​​客户端,直接在​​github​​仓库下载即可:​​https://github.com/protocolbuffers/protobuf/releases/tag/v3.20.1​​

flink实现protobuf format(超详细)_flink_02

下载完后,我们看看使用方式,示例命令如下:

./protoc  --java_out=生成java文件的目录 protobuf文件路径

可以看到使用​​protoc​​​命令,需要传两个命令,一个是生成​​java​​​文件的目录(或者​​cpp​​​文件路径),一个是​​proto​​文件的路径。

也就是说,如果使用​​protbuf​​​进行通信,发送方和接收方必须定义一个​​protobuf​​​的彼此公用认同的消息模板(类似于实体类),然后双方都要基于这个消息模板(​​proto​​模板文件)去生成自己语言的模板。即:


  • A端根据proto模板生成c++实体类文件
  • A端写入内容到这个实体类,序列化实体类(二进制),然后发送给B端
  • B端接收到二进制后,先根据proto模板生成对应的java实体类文件,
  • 然后反序列化二进制。


从上述的流程,可以看到消息是通过二进制传输的,速度肯定很快,因为共同定义了​​protobuf​​模板,使得不受平台的限制,做到跨语言通信。

那么,有什么缺陷呢?

就是太依赖protobuf程序了,只有使用这个程序才可以生成对应平台语言的实体类文件目前是无法解决的,protobuf也没有提供工具类去生成(比如:使用工具类把​​proto​​​消息模板文件转换成​​java​​实体类文件)

解决方案就只有把​​protobuf​​​执行程序顺带打进我们的​​jar​​包了。

03 Flink使用protobuf

我们可以把​​protobuf​​​打成一个​​format​​​,其主流思路与​​自定义connector​​思路大致一样,之前写过博客,可以参考:

  • ​​《Flink自定义Connector》​​
  • ​​《flink kafka connector源码解读(超详细)》​​

原理图如下:

flink实现protobuf format(超详细)_大数据_03

3.1 Flink如何使用protobuf

自定义​​format​​的流程不是难点,难点是如何合理优雅的嵌入​​protobuf​​程序到​​format​​?

我们希望连接参数传入proto消息模板,然后直接调用这个​​format​​就可以使用了,如下:

CREATE TABLE table_sink_kafka (
name STRING,
id INT
)
WITH (
'connector' = 'kafka',
'format' = 'protobuf',
'protobuf.class-name' = 'org.apache.flink.pb.proto.SimpleTest',
'protobuf.protobuf-tpl' = 'syntax = "proto2";
package org.apache.flink.pb.other;
option java_package = "org.apache.flink.pb.proto";
option java_multiple_files = false;

message SimpleTest {
optional string name = 1;
optional int32 id = 2;
optional Data data = 3;
}
message Data {
optional int32 uid = 1;
optional string username = 2;
}
',
'scan.startup.mode' = 'latest-offset'
)

其实是可以做到的,说说实现思路。

3.2 Flink实现protobuf思路

思路如下:

  • 首先​​format​​​集成​​protobuf​​​程序进​​jar​​包;
  • 根据传入的模板消息定义,当前目录生成​​proto​​文件;
  • 调用​​protobuf​​​程序去生成对应的​​java​​实体类文件到当前目录;
  • 使用类加载器去加载对应的实体类文件并编译加载进入​​jvm​​;
  • 后续就可以使用这个​​java​​实体类对象了。

主要有几个痛点:

  • ​Java​​​如何使用​​shell​​​命令生成​​java​​文件?
  • 生成的​​java​​​文件如何类加载进​​jvm​​?

下面我提供下这两个工具类:

shell工具类:

/**
* shell工具类
*
* @author : YangLinWei
* @createTime: 2022/5/13 2:09 下午
* @version: 1.0.0
*/
@Slf4j
public class ShellUtil {


/**
* @see #runShellCommandSync(String, String[], Charset, String)
*/
public static int runShellCommandSync(String baseShellDir, String[] cmd,
Charset outputCharset) throws IOException {
return runShellCommandSync(baseShellDir, cmd, outputCharset, null);
}

/**
* 真正运行shell命令
*
* @param baseShellDir 运行命令所在目录(先切换到该目录后再运行命令)
* @param cmd 命令数组
* @param outputCharset 日志输出字符集,一般windows为GBK, linux为utf8
* @param logFilePath 日志输出文件路径, 为空则直接输出到当前应用日志中,否则写入该文件
* @return 进程退出码, 0: 成功, 其他:失败
* @throws IOException 执行异常时抛出
*/
public static int runShellCommandSync(String baseShellDir, String[] cmd,
Charset outputCharset, String logFilePath)
throws IOException {
long startTime = System.currentTimeMillis();
boolean needReadProcessOutLogStreamByHand = false;
log.info("【cli】receive new Command. baseDir: {}, cmd: {}, logFile:{}",
baseShellDir, String.join(" ", cmd), logFilePath);
ProcessBuilder pb = new ProcessBuilder(cmd);
pb.directory(new File(baseShellDir));
initErrorLogHolder(logFilePath, outputCharset);
int exitCode = 0;
try {
if (logFilePath != null) {
ensureFilePathExists(logFilePath);
// String redirectLogInfoAndErrCmd = " > " + logFilePath + " 2>&1 ";
// cmd = mergeTwoArr(cmd, redirectLogInfoAndErrCmd.split("\\s+"));
pb.redirectErrorStream(true);
pb.redirectOutput(new File(logFilePath));
needReadProcessOutLogStreamByHand = false;
}
Process p = pb.start();
if (needReadProcessOutLogStreamByHand) {
readProcessOutLogStream(p, outputCharset);
}
try {
p.waitFor();
} catch (InterruptedException e) {
log.error("进程被中断", e);
setProcessLastError("中断异常:" + e.getMessage());
} finally {
exitCode = p.exitValue();
log.info("【cli】process costTime:{}ms, exitCode:{}",
System.currentTimeMillis() - startTime, exitCode);
}
if (exitCode != 0) {
throw new RuntimeException(
"进程返回异常信息, returnCode:" + exitCode
+ ", lastError:" + getProcessLastError());
}
return exitCode;
} finally {
removeErrorLogHolder();
}
}


/**
* 使用 Runtime.exec() 运行shell
*/
public static int runShellWithRuntime(String baseShellDir,
String[] cmd,
Charset outputCharset) throws IOException {
long startTime = System.currentTimeMillis();
initErrorLogHolder(null, outputCharset);
Process p = Runtime.getRuntime().exec(cmd, null, new File(baseShellDir));
//readProcessOutLogStream(p, outputCharset);
int exitCode;
try {
p.waitFor();
} catch (InterruptedException e) {
log.error("进程被中断", e);
setProcessLastError("中断异常:" + e.getMessage());
} catch (Throwable e) {
log.error("其他异常", e);
setProcessLastError(e.getMessage());
} finally {
exitCode = p.exitValue();
log.info("【cli】process costTime:{}ms, exitCode:{}",
System.currentTimeMillis() - startTime, exitCode);
}
if (exitCode != 0) {
throw new RuntimeException("进程返回异常信息, returnCode:" + exitCode
+ ", lastError:" + getProcessLastError());
}
return exitCode;
}

/**
* 确保文件夹存在
*
* @param filePath 文件路径
* @throws IOException 创建文件夹异常抛出
*/
public static void ensureFilePathExists(String filePath) throws IOException {
File path = new File(filePath);
if (path.exists()) {
return;
}
File p = path.getParentFile();
if (p.mkdirs()) {
log.info("为文件创建目录: {} 成功", p.getPath());
return;
}
log.warn("创建目录:{} 失败", p.getPath());
}

/**
* 合并两个数组数据
*
* @param arrFirst 左边数组
* @param arrAppend 要添加的数组
* @return 合并后的数组
*/
public static String[] mergeTwoArr(String[] arrFirst, String[] arrAppend) {
String[] merged = new String[arrFirst.length + arrAppend.length];
System.arraycopy(arrFirst, 0,
merged, 0, arrFirst.length);
System.arraycopy(arrAppend, 0,
merged, arrFirst.length, arrAppend.length);
return merged;
}

/**
* 删除以某字符结尾的字符
*
* @param originalStr 原始字符
* @param toTrimChar 要检测的字
* @return 裁剪后的字符串
*/
public static String trimEndsWith(String originalStr, char toTrimChar) {
char[] value = originalStr.toCharArray();
int i = value.length - 1;
while (i > 0 && value[i] == toTrimChar) {
i--;
}
return new String(value, 0, i + 1);
}

/**
* 错误日志读取线程池(不设上限)
*/
private static final ExecutorService errReadThreadPool = Executors.newCachedThreadPool(
new NamedThreadFactory("ReadProcessErrOut"));

/**
* 最后一次异常信息
*/
private static final Map<Thread, ProcessErrorLogDescriptor>
lastErrorHolder = new ConcurrentHashMap<>();

/**
* 主动读取进程的标准输出信息日志
*
* @param process 进程实体
* @param outputCharset 日志字符集
* @throws IOException 读取异常时抛出
*/
private static void readProcessOutLogStream(Process process,
Charset outputCharset) throws IOException {
try (BufferedReader stdInput = new BufferedReader(new InputStreamReader(
process.getInputStream(), outputCharset))) {
Thread parentThread = Thread.currentThread();
// 另起一个线程读取错误消息,必须先启该线程
errReadThreadPool.submit(() -> {
try {
try (BufferedReader stdError = new BufferedReader(
new InputStreamReader(process.getErrorStream(), outputCharset))) {
String err;
while ((err = stdError.readLine()) != null) {
log.error("【cli】{}", err);
setProcessLastError(parentThread, err);
}
}
} catch (IOException e) {
log.error("读取进程错误日志输出时发生了异常", e);
setProcessLastError(parentThread, e.getMessage());
}
});
// 外部线程读取标准输出消息
String stdOut;
while ((stdOut = stdInput.readLine()) != null) {
log.info("【cli】{}", stdOut);
}
}
}

/**
* 新建一个进程错误信息容器
*
* @param logFilePath 日志文件路径,如无则为 null
*/
private static void initErrorLogHolder(String logFilePath, Charset outputCharset) {
lastErrorHolder.put(Thread.currentThread(),
new ProcessErrorLogDescriptor(logFilePath, outputCharset));
}

/**
* 移除错误日志监听
*/
private static void removeErrorLogHolder() {
lastErrorHolder.remove(Thread.currentThread());
}

/**
* 获取进程的最后错误信息
* <p>
* 注意: 该方法只会在父线程中调用
*/
private static String getProcessLastError() {
Thread thread = Thread.currentThread();
return lastErrorHolder.get(thread).getLastError();
}

/**
* 设置最后一个错误信息描述
* <p>
* 使用当前线程或自定义
*/
private static void setProcessLastError(String lastError) {
lastErrorHolder.get(Thread.currentThread()).setLastError(lastError);
}

private static void setProcessLastError(Thread thread, String lastError) {
lastErrorHolder.get(thread).setLastError(lastError);
}

/**
* 判断当前系统是否是 windows
*/
public static boolean isWinOs() {
return System.getProperty("os.name").toLowerCase()
.startsWith("win");
}

/**
* 判断当前系统是否是 Mac
*/
public static boolean isMacOs() {
return System.getProperty("os.name").toLowerCase()
.startsWith("mac");
}

public static String getSystemType() {
return System.getProperty("os.name");
}

/**
* 进程错误信息描述封装类
*/
private static class ProcessErrorLogDescriptor {

/**
* 错误信息记录文件
*/
private String logFile;

/**
* 最后一行错误信息
*/
private String lastError;
private Charset charset;

ProcessErrorLogDescriptor(String logFile, Charset outputCharset) {
this.logFile = logFile;
charset = outputCharset;
}

String getLastError() {
if (lastError != null) {
return lastError;
}
try {
if (logFile == null) {
return null;
}
List<String> lines = FileUtils.readLines(
new File(logFile), charset);
StringBuilder sb = new StringBuilder();
for (int i = lines.size() - 1; i >= 0; i--) {
sb.insert(0, lines.get(i) + "\n");
if (sb.length() > 200) {
break;
}
}
return sb.toString();
} catch (Exception e) {
log.error("【cli】读取最后一次错误信息失败", e);
}
return null;
}

void setLastError(String err) {
if (lastError == null) {
lastError = err;
return;
}
lastError = lastError + "\n" + err;
if (lastError.length() > 200) {
lastError = lastError.substring(lastError.length() - 200);
}
}
}

public static void main(String[] args) throws IOException, ClassNotFoundException {
String messageClassName = "org.apache.flink.pb.proto.SimpleTestOuterClass";
String meesagePbTpl = "syntax = \"proto2\";\n" +
"package org.apache.flink.pb.other;\n" +
"option java_package = \"org.apache.flink.pb.proto\";\n" +
"option java_multiple_files = false;\n" +
"\n" +
"message SimpleTest {\n" +
" optional int32 a = 1;\n" +
" optional int64 b = 2;\n" +
" optional bool c = 3;\n" +
" optional float d = 4;\n" +
" optional double e = 5;\n" +
" optional string f = 6;\n" +
" optional bytes g = 7;\n" +
" optional Corpus h = 8;\n" +
"\n" +
" enum Corpus {\n" +
" UNIVERSAL = 0;\n" +
" WEB = 1;\n" +
" IMAGES = 2;\n" +
" LOCAL = 3;\n" +
" NEWS = 4;\n" +
" PRODUCTS = 5;\n" +
" VIDEO = 7;\n" +
" }\n" +
"\n" +
"}\n" +
"\n" +
"\n";
String resPath = ShellUtil.class.getClassLoader().getResource("").getPath();
String baseDir = resPath + "/temp";
String protoFileName = StrUtil.humpToUnderline(StrUtil.getSplitLast(messageClassName, "\\.")) + ".proto";
String fileOutPath = String.format("%s/%s"
, baseDir
, StringUtils.join(StrUtil.splitExpectLast(messageClassName, "\\."), "/"));
String protoFilePath = String.format("%s/%s", fileOutPath, protoFileName);


FileUtils.write(new File(protoFilePath), meesagePbTpl, Charset.forName("UTF-8"));
String[] protocCmd = new String[]{
"protoc",
"--java_out=" + baseDir,
"--proto_path=" + fileOutPath,
protoFileName
};
String exePath = resPath + "/protobuf/linux/bin";
if (isMacOs()) {
exePath = resPath + "/protobuf/osx/bin";
}
//InputStream protocIS = ShellUtil.class.getClassLoader().getResourceAsStream("/protobuf/osx/bin");
int exitCode = runShellCommandSync(exePath, protocCmd, Charset.forName("UTF-8"));

Class<?> pbEntity = JarLoader.compile(messageClassName, FileUtils.readFileToString(
new File(fileOutPath + "/" + StrUtil.getSplitLast(messageClassName, "\\.") + ".java"), Charset.forName("UTF-8"))
);

System.out.println("-->" + pbEntity);

}
}

类加载器代码:

/**
* 类加载器
*
* @author : YangLinWei
* @createTime: 2022/5/13 3:33 下午
* @version: 1.0.0
*/
public class JarLoader extends URLClassLoader {
public JarLoader(String[] paths) {
this(paths, JarLoader.class.getClassLoader());
}

public JarLoader(String[] paths, ClassLoader parent) {
super(getURLs(paths), parent);
}

private static URL[] getURLs(String[] paths) {
Validate.isTrue(null != paths && 0 != paths.length,
"jar包路径不能为空.");

List<String> dirs = new ArrayList<String>();
for (String path : paths) {
dirs.add(path);
JarLoader.collectDirs(path, dirs);
}

List<URL> urls = new ArrayList<URL>();
for (String path : dirs) {
urls.addAll(doGetURLs(path));
}

return urls.toArray(new URL[0]);
}

private static void collectDirs(String path, List<String> collector) {
if (null == path || StringUtils.isBlank(path)) {
return;
}

File current = new File(path);
if (!current.exists() || !current.isDirectory()) {
return;
}

for (File child : current.listFiles()) {
if (!child.isDirectory()) {
continue;
}

collector.add(child.getAbsolutePath());
collectDirs(child.getAbsolutePath(), collector);
}
}

private static List<URL> doGetURLs(final String path) {
Validate.isTrue(!StringUtils.isBlank(path), "jar包路径不能为空.");

File jarPath = new File(path);

Validate.isTrue(jarPath.exists() && jarPath.isDirectory(),
"jar包路径必须存在且为目录.");

/* set filter */
FileFilter jarFilter = new FileFilter() {
@Override
public boolean accept(File pathname) {
return pathname.getName().endsWith(".jar");
}
};

/* iterate all jar */
File[] allJars = new File(path).listFiles(jarFilter);
List<URL> jarURLs = new ArrayList<URL>(allJars.length);

for (int i = 0; i < allJars.length; i++) {
try {
jarURLs.add(allJars[i].toURI().toURL());
} catch (Exception e) {
throw new RuntimeException("系统加载jar包出错", e);
}
}

return jarURLs;
}

/**
* 装载字符串成为java可执行文件
*
* @param className className
* @param javaCodes javaCodes
* @return Class
*/
public static Class<?> compile(String className, String javaCodes) {
JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
StandardJavaFileManager fileManager = compiler.getStandardFileManager(null, null, null);
StrSrcJavaObject srcObject = new StrSrcJavaObject(className, javaCodes);
Iterable<? extends JavaFileObject> fileObjects = Arrays.asList(srcObject);
String flag = "-d";
String outDir = "";
try {
File classPath = new File(Thread.currentThread().getContextClassLoader().getResource("").toURI());
outDir = classPath.getAbsolutePath() + File.separator;
} catch (URISyntaxException e1) {
e1.printStackTrace();
}
Iterable<String> options = Arrays.asList(flag, outDir);
JavaCompiler.CompilationTask task = compiler.getTask(null, fileManager, null, options, null, fileObjects);
boolean result = task.call();
if (result == true) {
try {
return Class.forName(className);
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
return null;
}

private static class StrSrcJavaObject extends SimpleJavaFileObject {
private String content;

public StrSrcJavaObject(String name, String content) {
super(URI.create("string:///" + name.replace('.', '/') + Kind.SOURCE.extension), Kind.SOURCE);
this.content = content;
}

public CharSequence getCharContent(boolean ignoreEncodingErrors) {
return content;
}
}

}

ok,以上两个工具类解决了我们以上痛点了。

3.3 可能会出现的问题

当我们正式部署上服务器可能会出现一些问题:

  • 为何读取不了jar包里面的程序,特别是​​Flink on Yarn​​的模式;
  • 为何加载后的类会丢失(​​Flink​​​也有自己的类加载器​​FlinkUserClassLoader​​)

这些问题大家可以私聊我,本人已经实现了​​Flink​​​使用​​protobuf​​ 格式的插件了,已上线使用。

04 文末

本文主要讲解了​​Protobuf​​​的一些概念,以及如何在​​Flink​​​里使用​​Protobuf​​,希望能帮助到大家,谢谢大家的阅读!



举报

相关推荐

0 条评论