Flink-Sink(Kafka、Redis、ES、JDBC)

Unlike Spark, Flink has no foreach-style method that lets users iterate over the output themselves; all output to external systems has to go through a Sink. The final output step of a job is wired up roughly like this:

stream.addSink(new MySink(xxxx))

The official connectors ship sinks for a number of frameworks; for anything beyond that you implement a custom sink yourself.
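
A user-defined sink only needs to implement SinkFunction (or RichSinkFunction when open/close lifecycle hooks are required, as in the JDBC example further down). A minimal sketch, assuming the SensorReading case class used in the examples below and a hypothetical PrintlnSink name:

import com.zhen.flink.api.SensorReading
import org.apache.flink.streaming.api.functions.sink.SinkFunction

// Hypothetical custom sink: simply writes every record to stdout.
// A real sink would typically extend RichSinkFunction and open/close
// its client connection in open() and close().
class PrintlnSink extends SinkFunction[SensorReading] {
  override def invoke(value: SensorReading, context: SinkFunction.Context): Unit = {
    println(s"sink received: $value")
  }
}

// Attached exactly like the built-in sinks:
// dataStream.addSink(new PrintlnSink)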


5.0 File

package com.zhen.flink.api.sink

import com.zhen.flink.api.SensorReading
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala._

/**
 * @Author FengZhen
 * @Date 6/8/22 10:43 PM
 * @Description Write a DataStream to files with writeAsCsv and StreamingFileSink
 */
object FileSink {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.setParallelism(1)

    // 0. Read the input data
    val filePath = "/Users/FengZhen/Desktop/accumulate/0_project/flink_learn/src/main/resources/data/sensor.txt"
    val inputStream = env.readTextFile(filePath)

    // 1. Map each line to the SensorReading case class
    val dataStream: DataStream[SensorReading] = inputStream
      .map(
        data => {
          val arr = data.split(",")
          SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
        }
      )

    dataStream.print()
    val outFilePath = "/Users/FengZhen/Desktop/accumulate/0_project/flink_learn/src/main/resources/data/sensor_out.txt"
    dataStream.writeAsCsv(outFilePath)

    val outFilePath1 = "/Users/FengZhen/Desktop/accumulate/0_project/flink_learn/src/main/resources/data/sensor_out_1.txt"
    dataStream.addSink(
      StreamingFileSink.forRowFormat(
        new Path(outFilePath1),
        new SimpleStringEncoder[SensorReading]()
      ).build()
    )

    env.execute("file sink.")
  }

}
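
StreamingFileSink rolls part files according to a rolling policy; for row formats the default rolls on part size, rollover interval, and inactivity. A sketch of setting the policy explicitly, reusing outFilePath1 and SensorReading from the listing above (the thresholds are illustrative, not from the original code):

import java.util.concurrent.TimeUnit

import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy

// Illustrative values: roll a new part file every 15 minutes, after 5 minutes
// of inactivity, or once a part reaches 128 MB.
val rollingSink = StreamingFileSink
  .forRowFormat(new Path(outFilePath1), new SimpleStringEncoder[SensorReading]())
  .withRollingPolicy(
    DefaultRollingPolicy.builder()
      .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
      .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
      .withMaxPartSize(128 * 1024 * 1024)
      .build[SensorReading, String]()
  )
  .build()

// dataStream.addSink(rollingSink)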

5.1 Kafka

package com.zhen.flink.api.sink

import java.util.Properties

import com.zhen.flink.api.SensorReading
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer}

/**
 * @Author FengZhen
 * @Date 6/11/22 3:20 PM
 * @Description Read from one Kafka topic and write the transformed records to another
 */
object KafkaSink {

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.setParallelism(1)

    // 0. Consumer configuration for reading from Kafka
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "consumer-group")
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("auto.offset.reset", "latest")

    val streamKafka = env.addSource(new FlinkKafkaConsumer[String](
      "topic_sensor",
      new SimpleStringSchema(),
      properties
    ))

    // 1. Map each line to the SensorReading case class, then back to a string
    val dataStream: DataStream[String] = streamKafka
      .map(
        data => {
          val arr = data.split(",")
          SensorReading(arr(0), arr(1).toLong, arr(2).toDouble).toString
        }
      )

    // 2. Write the result to another Kafka topic
    dataStream.addSink(
      new FlinkKafkaProducer[String]("localhost:9092", "topic_flink_kafka_sink", new SimpleStringSchema())
    )

    // Produce test input:
    // ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic_sensor

    // Check the output:
    // ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic_flink_kafka_sink

    env.execute("kafka sink.")

  }

}
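
The producer above uses the simple broker-list constructor, which gives at-least-once delivery. With checkpointing enabled, FlinkKafkaProducer can also be created with transactional, exactly-once semantics; a sketch of that variant, reusing the topic name from above (the transaction timeout value is illustrative):

import java.nio.charset.StandardCharsets

import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema
import org.apache.kafka.clients.producer.ProducerRecord

// Sketch: transactional producer with EXACTLY_ONCE semantics.
// Requires env.enableCheckpointing(...) to be set on the job.
val producerProps = new Properties()
producerProps.setProperty("bootstrap.servers", "localhost:9092")
// Must not exceed the broker's transaction.max.timeout.ms
producerProps.setProperty("transaction.timeout.ms", "600000")

val exactlyOnceProducer = new FlinkKafkaProducer[String](
  "topic_flink_kafka_sink",
  new KafkaSerializationSchema[String] {
    override def serialize(element: String, timestamp: java.lang.Long): ProducerRecord[Array[Byte], Array[Byte]] =
      new ProducerRecord[Array[Byte], Array[Byte]]("topic_flink_kafka_sink", element.getBytes(StandardCharsets.UTF_8))
  },
  producerProps,
  FlinkKafkaProducer.Semantic.EXACTLY_ONCE
)

// dataStream.addSink(exactlyOnceProducer)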


5.2 Redis

package com.zhen.flink.api.sink

import com.zhen.flink.api.SensorReading
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

/**
 * @Author FengZhen
 * @Date 6/12/22 8:23 PM
 * @Description Write sensor readings into a Redis hash via the Bahir Redis connector
 */
// Named RedisSinkTest so it does not shadow the connector's RedisSink class
object RedisSinkTest {

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.setParallelism(1)

    // 0. Read the input data
    val filePath = "/Users/FengZhen/Desktop/accumulate/0_project/flink_learn/src/main/resources/data/sensor.txt"
    val inputStream = env.readTextFile(filePath)

    // 1. Map each line to the SensorReading case class
    val dataStream: DataStream[SensorReading] = inputStream
      .map(
        data => {
          val arr = data.split(",")
          SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
        }
      )

    // Define a FlinkJedisConfigBase (connection pool configuration)
    val conf = new FlinkJedisPoolConfig.Builder()
      .setHost("localhost")
      .setPort(6379)
      .setDatabase(1)
      .build()

    dataStream.addSink(new RedisSink[SensorReading](conf, new MyRedisMapper))

    env.execute("redis sink.")

  }

  // Define a RedisMapper that tells the sink how to write each record
  class MyRedisMapper extends RedisMapper[SensorReading] {

    // The Redis command used to store the data: HSET <hash name> <field> <value>
    override def getCommandDescription: RedisCommandDescription = {
      new RedisCommandDescription(RedisCommand.HSET, "sensor_temp")
    }

    // Use the sensor id as the hash field
    override def getKeyFromData(t: SensorReading): String =
      t.id

    // Use the temperature as the value
    override def getValueFromData(t: SensorReading): String =
      t.temperature.toString
  }

}
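
Because the mapper issues HSET against a fixed hash name, each sensor id becomes a field of the sensor_temp hash and every new reading overwrites the previous temperature for that id. With the pool config above (database 1), the result can be inspected from redis-cli, for example:

// redis-cli -n 1 HGETALL sensor_temp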


5.3 Elasticsearch

package com.zhen.flink.api.sink

import java.util

import com.zhen.flink.api.SensorReading
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.client.Requests

/**
 * @Author FengZhen
 * @Date 6/17/22 3:39 PM
 * @Description Index sensor readings into Elasticsearch 6
 */
object ElasticsearchSinkTest {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.setParallelism(1)

    // 0. Read the input data
    val filePath = "/Users/FengZhen/Desktop/accumulate/0_project/flink_learn/src/main/resources/data/sensor.txt"
    val inputStream = env.readTextFile(filePath)

    // 1. Map each line to the SensorReading case class
    val dataStream: DataStream[SensorReading] = inputStream
      .map(
        data => {
          val arr = data.split(",")
          SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
        }
      )

    // Define the HttpHosts of the Elasticsearch cluster
    val httpHosts = new util.ArrayList[HttpHost]()
    httpHosts.add(new HttpHost("localhost", 9200))

    // Custom ElasticsearchSinkFunction that turns each record into an index request
    val myEsSinkFunc = new ElasticsearchSinkFunction[SensorReading] {
      override def process(element: SensorReading, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {

        // Wrap the fields in a map used as the document source
        val dataSource = new util.HashMap[String, String]()
        dataSource.put("id", element.id)
        dataSource.put("temperature", element.temperature.toString)
        dataSource.put("ts", element.timestamp.toString)

        // Build the index request that will be sent over HTTP
        val indexRequest = Requests.indexRequest()
          .index("sensor")
          .`type`("reading_data")
          .source(dataSource)

        // Hand the request to the indexer
        indexer.add(indexRequest)

      }
    }

    dataStream.addSink(
      new ElasticsearchSink.Builder[SensorReading](httpHosts, myEsSinkFunc)
        .build()
    )
    env.execute("elasticsearch sink.")
  }
}
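
The builder buffers actions and writes them to Elasticsearch in bulk, so with a tiny demo dataset documents may only show up once the buffer is flushed. One commonly used knob on ElasticsearchSink.Builder is the bulk flush size; a sketch reusing httpHosts, myEsSinkFunc and dataStream from the listing above (the value 1 is only for demonstration):

val esSinkBuilder = new ElasticsearchSink.Builder[SensorReading](httpHosts, myEsSinkFunc)
// Flush after every single action so demo documents are visible immediately
// (far too small a batch size for production traffic).
esSinkBuilder.setBulkFlushMaxActions(1)

dataStream.addSink(esSinkBuilder.build())

// Check the indexed documents:
// curl "http://localhost:9200/sensor/_search?pretty"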


5.4 JDBC (custom sink)

package com.zhen.flink.api.sink

import java.sql.{Connection, DriverManager, PreparedStatement}

import com.zhen.flink.api.SensorReading
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala._

/**
 * @Author FengZhen
 * @Date 7/1/22 2:21 PM
 * @Description Upsert sensor readings into MySQL with a custom RichSinkFunction
 */
object JdbcSink {

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.setParallelism(1)

    // 0. Read the input data
    val filePath = "/Users/FengZhen/Desktop/accumulate/0_project/flink_learn/src/main/resources/data/sensor.txt"
    val inputStream = env.readTextFile(filePath)

    // 1. Map each line to the SensorReading case class
    val dataStream: DataStream[SensorReading] = inputStream
      .map(
        data => {
          val arr = data.split(",")
          SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
        }
      )

    dataStream.addSink(new MyJdbcSinkFunc())

    env.execute("jdbc sink")

  }

  class MyJdbcSinkFunc() extends RichSinkFunction[SensorReading] {

    // Connection and prepared statements, created once per parallel instance
    var conn: Connection = _
    var insertStmt: PreparedStatement = _
    var updateStmt: PreparedStatement = _

    override def open(parameters: Configuration): Unit = {
      conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "1234qwer")
      insertStmt = conn.prepareStatement("insert into sensor_temp (id, temp) values (?,?)")
      updateStmt = conn.prepareStatement("update sensor_temp set temp = ? where id = ?")
    }

    override def invoke(value: SensorReading, context: SinkFunction.Context): Unit = {

      // Try the update first; if the id already exists, its temperature is refreshed in place
      updateStmt.setDouble(1, value.temperature)
      updateStmt.setString(2, value.id)
      updateStmt.execute()

      // If the update did not touch any row, insert a new one
      if (updateStmt.getUpdateCount == 0) {
        insertStmt.setString(1, value.id)
        insertStmt.setDouble(2, value.temperature)
        insertStmt.execute()
      }
    }

    override def close(): Unit = {
      insertStmt.close()
      updateStmt.close()
      conn.close()
    }
  }

}
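
The update-then-insert logic above keeps exactly one row per sensor id. If sensor_temp has a primary key (or unique index) on id, the same behaviour could be collapsed into a single MySQL upsert statement; a sketch of that variant, under that key assumption:

// Hypothetical variant, assuming sensor_temp(id) is a primary or unique key:
// one prepared statement instead of separate update/insert statements.
var upsertStmt: PreparedStatement = _

// in open():
upsertStmt = conn.prepareStatement(
  "insert into sensor_temp (id, temp) values (?, ?) on duplicate key update temp = ?")

// in invoke():
upsertStmt.setString(1, value.id)
upsertStmt.setDouble(2, value.temperature)
upsertStmt.setDouble(3, value.temperature)
upsertStmt.execute()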


pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.zhen.flink</groupId>
    <artifactId>flink_learn</artifactId>
    <version>1.0-SNAPSHOT</version>

    <name>flink_learn Maven</name>

    <properties>
        <scala_version>2.12</scala_version>
        <flink_version>1.13.1</flink_version>
    </properties>

    <dependencies>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala_version}</artifactId>
            <version>${flink_version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala_version}</artifactId>
            <version>${flink_version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala_version}</artifactId>
            <version>${flink_version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala_version}</artifactId>
            <version>${flink_version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.bahir/flink-connector-redis -->
        <dependency>
            <groupId>org.apache.bahir</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch6_${scala_version}</artifactId>
            <version>${flink_version}</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.44</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <!-- This plugin compiles the Scala sources into class files -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.4.6</version>
                <executions>
                    <execution>
                        <!-- Bind the compile goal to Maven's compile phase -->
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>
