Flink DataGen: A Mock Data Source

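1. Background

Once a Flink job has been developed, a simple way to stress-test it is to let data pile up in Kafka first and then start the Flink job. Wherever backpressure appears is the processing bottleneck. It is like filling a reservoir first and then releasing the flood all at once.

The data can be synthetic data you generate yourself or a sample of production data. Tools for generating test data include DataFactory, datafaker, DBMonster, Data-Processer, Nexmark, and JMeter.

Since version 1.11, Flink has shipped a built-in DataGen connector. It mainly generates random values, which is useful for testing streaming jobs and measuring performance when no real data source is available.

2. DataGen in the DataStream API

2.1 Overview

DataGeneratorSource wraps a data generator in a source function, which makes it easy to generate data.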

/**
 * A data generator source that abstract data generator. It can be used to easy startup/test for
 * streaming job and performance testing. It is stateful, re-scalable, possibly in parallel.
 */
@Experimental
public class DataGeneratorSource<T> extends RichParallelSourceFunction<T>
        implements CheckpointedFunction {
    // ...

    public DataGeneratorSource(DataGenerator<T> generator) {
        this(generator, Long.MAX_VALUE, null);
    }

    /**
     * Creates a source that emits records by {@link DataGenerator}.
     *
     * @param generator data generator.
     * @param rowsPerSecond Control the emit rate.
     * @param numberOfRows Total number of rows to output.
     */
    public DataGeneratorSource(
            DataGenerator<T> generator, long rowsPerSecond, @Nullable Long numberOfRows) {
        this.generator = generator;
        this.rowsPerSecond = rowsPerSecond;
        this.numberOfRows = numberOfRows;
    }
    // ...
}
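  • DataGenerator is the data generator interface. Flink ships two implementations, SequenceGenerator and RandomGenerator, which produce sequential data and random data respectively.
  • rowsPerSecond is the number of rows emitted per second. It defaults to Long.MAX_VALUE.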
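
To make the three constructor parameters concrete, here is a minimal sketch that throttles the source and makes it finite. It assumes the same Flink version as the rest of this article (the 1.15-era DataGeneratorSource in the streaming source package) and uses the built-in RandomGenerator.longGenerator factory. The object name, the job name, and the values 5 and 20 are illustrative only.

```scala
import org.apache.flink.streaming.api.functions.source.datagen.{DataGeneratorSource, RandomGenerator}
import org.apache.flink.streaming.api.scala._

// Hypothetical demo object; not part of the original article.
object BoundedDataGenSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Built-in random generator for long values between 1 and 100
    val generator = RandomGenerator.longGenerator(1L, 100L)

    // rowsPerSecond = 5 throttles the emit rate;
    // numberOfRows = 20 makes the source finite instead of unbounded
    val source = new DataGeneratorSource[java.lang.Long](
      generator, 5L, java.lang.Long.valueOf(20L))

    env.addSource(source).print("bounded")
    env.execute("bounded-datagen-sketch")
  }
}
```

With a small rowsPerSecond the output is easy to follow by eye. For stress testing you would normally leave the rate high and bound the run with numberOfRows, or leave it unbounded.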

2.2 Implementation

package com.duo.app2.source

import org.apache.commons.math3.random.RandomDataGenerator
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.datagen.{DataGeneratorSource, RandomGenerator, SequenceGenerator}
import org.apache.flink.streaming.api.scala._

object DataStreamDataGenDemo {
  def main(args: Array[String]): Unit = {

    val configuration = new Configuration()

    // Local environment with the web UI (needs the flink-runtime-web dependency)
    val env = StreamExecutionEnvironment
      .createLocalEnvironmentWithWebUI(configuration)

    env.setParallelism(1)
    env.disableOperatorChaining()

    // 1. Generate random data with RandomGenerator
    val orderInfods = env.addSource(
      new DataGeneratorSource[OrderInfo](
        new RandomGenerator[OrderInfo]() {
          // `random` is the RandomDataGenerator field inherited from RandomGenerator
          override def next(): OrderInfo = {
            OrderInfo(
              random.nextInt(1, 100000),
              random.nextLong(1, 100000),
              random.nextUniform(1, 1000),
              System.currentTimeMillis()
            )
          }
        }
      ))

    // 2. Generate sequential data with SequenceGenerator
    val userInfods = env.addSource(
      new DataGeneratorSource[UserInfo](
        new SequenceGenerator[UserInfo](1, 1000000) {
          val random = new RandomDataGenerator()

          override def next(): UserInfo = {
            UserInfo(
              // poll() removes the head of the queue and returns it;
              // peek() returns it without removing it.
              // Two poll() calls mean each record consumes two consecutive
              // sequence values, so the range should hold an even number of values.
              valuesToEmit.poll().intValue(),
              valuesToEmit.poll().longValue(),
              random.nextInt(1, 100),
              random.nextInt(0, 1)
            )
          }
        }
      ))

    orderInfods.print("orderinfo>>>>>>>>>>>>>>>>")
    userInfods.print("userinfo>>>")
    env.execute()
  }
}

case class OrderInfo(
  id: Int,
  user_id: Long,
  total_amount: Double,
  create_time: Long
)

case class UserInfo(
  id: Int,
  user_id: Long,
  age: Int,
  sex: Int
)

Output:

userinfo>>>> UserInfo(771879,771880,63,0)
userinfo>>>> UserInfo(771881,771882,98,1)
userinfo>>>> UserInfo(771883,771884,93,1)
userinfo>>>> UserInfo(771885,771886,1,1)
userinfo>>>> UserInfo(771887,771888,68,0)
userinfo>>>> UserInfo(771889,771890,91,0)
orderinfo>>>>>>>>>>>>>>>>> OrderInfo(57313,75426,375.0372852993927,1652613191308)
orderinfo>>>>>>>>>>>>>>>>> OrderInfo(47265,23144,713.7214041887538,1652613191308)
orderinfo>>>>>>>>>>>>>>>>> OrderInfo(62312,21297,425.7540187946541,1652613191308)
orderinfo>>>>>>>>>>>>>>>>> OrderInfo(17968,68066,376.2214432261888,1652613191308)
orderinfo>>>>>>>>>>>>>>>>> OrderInfo(85790,51534,584.077634767886,1652613191308)
orderinfo>>>>>>>>>>>>>>>>> OrderInfo(38109,97159,519.6732785767342,1652613191308)

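3. DataGen in SQL

3.1 Overview

Official documentation: https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/datagen/

By default, DataGen creates an unbounded number of rows with a random value for each column.

For the variable-sized types char/varchar/string/array/map/multiset, the length can be specified. A total number of rows can also be specified, which makes the table bounded.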
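
As a minimal sketch of those two knobs, the following program declares a bounded table whose STRING column has a fixed length. It assumes a Flink 1.15-style Table API. The table name Users, the object name, and the option values are made up for illustration.

```scala
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

// Hypothetical demo object; not part of the original article.
object BoundedSqlDataGenSketch {
  def main(args: Array[String]): Unit = {
    val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
    val tableEnv = TableEnvironment.create(settings)

    tableEnv.executeSql(
      """
        |CREATE TABLE Users (
        |  id INT,
        |  name STRING
        |) WITH (
        |  'connector' = 'datagen',
        |  'number-of-rows' = '5',
        |  'fields.name.length' = '8'
        |)
        |""".stripMargin)

    // The table is bounded by number-of-rows, so the query ends on its own
    tableEnv.executeSql("SELECT * FROM Users").print()
  }
}
```

Because 'number-of-rows' is set, the query is expected to finish after five rows rather than run forever, which is convenient for quick smoke tests.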

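Connector Options

| Option | Required | Default | Type | Description |
|---|---|---|---|---|
| connector | required | (none) | String | The connector to use; here it must be 'datagen'. |
| rows-per-second | optional | 10000 | Long | Number of rows emitted per second, which controls the emit rate. |
| number-of-rows | optional | (none) | Long | Total number of rows to emit. By default the table is unbounded. |
| fields.#.kind | optional | random | String | Generator for the field: 'sequence' or 'random'. |
| fields.#.min | optional | (Minimum value of type) | (Type of field) | Minimum value of the random generator; applies to numeric types. |
| fields.#.max | optional | (Maximum value of type) | (Type of field) | Maximum value of the random generator; applies to numeric types. |
| fields.#.max-past | optional | 0 | Duration | Maximum past of the timestamp random generator; only works for timestamp types. |
| fields.#.length | optional | 100 | Integer | Size or length of the generated char/varchar/string/array/map/multiset values. |
| fields.#.start | optional | (none) | (Type of field) | Start value of the sequence generator. |
| fields.#.end | optional | (none) | (Type of field) | End value of the sequence generator. |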
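
The per-field options can be mixed within one table. The sketch below combines a sequence field, a random field with min/max, and a timestamp field with max-past. It assumes Flink 1.15, the version the linked documentation covers, since older releases may not accept max-past. The table name Events, the object name, and all values are illustrative.

```scala
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

// Hypothetical demo object; not part of the original article.
object FieldOptionsDataGenSketch {
  def main(args: Array[String]): Unit = {
    val tableEnv = TableEnvironment.create(
      EnvironmentSettings.newInstance().inStreamingMode().build())

    tableEnv.executeSql(
      """
        |CREATE TABLE Events (
        |  event_id BIGINT,
        |  score DOUBLE,
        |  event_time TIMESTAMP(3)
        |) WITH (
        |  'connector' = 'datagen',
        |  'rows-per-second' = '5',
        |  'fields.event_id.kind' = 'sequence',
        |  'fields.event_id.start' = '1',
        |  'fields.event_id.end' = '10',
        |  'fields.score.min' = '0',
        |  'fields.score.max' = '100',
        |  'fields.event_time.max-past' = '1 h'
        |)
        |""".stripMargin)

    // Reading ends once the event_id sequence reaches its end value
    tableEnv.executeSql("SELECT * FROM Events").print()
  }
}
```

Here event_time should fall within the last hour before generation time instead of always being the current timestamp, which helps when testing logic that has to cope with older event times.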

3.2 Implementation

package com.duo.app2.source

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object SQLDataGenDemo {
  def main(args: Array[String]): Unit = {

    val configuration = new Configuration()

    // Local environment with the web UI (needs the flink-runtime-web dependency)
    val env = StreamExecutionEnvironment
      .createLocalEnvironmentWithWebUI(configuration)

    env.setParallelism(1)
    env.disableOperatorChaining()

    val settings = EnvironmentSettings.newInstance()
      .inStreamingMode()
      .build()
    // Pass the settings explicitly so they actually take effect
    val tableEnv = StreamTableEnvironment.create(env, settings)

    /*
      fields.id.kind:  generator kind for field id ('sequence' or 'random')
      fields.id.start: start value of field id
      fields.id.end:   end value of field id
    */
    val orderSql =
      """
        |CREATE TABLE Orders (
        |  id INT,
        |  user_id BIGINT,
        |  total_amount DOUBLE,
        |  create_time TIMESTAMP(3),
        |  WATERMARK FOR create_time AS create_time
        |) WITH (
        |  'connector' = 'datagen',
        |  'rows-per-second' = '20000',
        |  'fields.id.kind' = 'sequence',
        |  'fields.id.start' = '1',
        |  'fields.id.end' = '1000000',
        |  'fields.user_id.kind' = 'random',
        |  'fields.user_id.min' = '1',
        |  'fields.user_id.max' = '1000000',
        |  'fields.total_amount.kind' = 'random',
        |  'fields.total_amount.min' = '1',
        |  'fields.total_amount.max' = '1000000'
        |)
        |""".stripMargin

    tableEnv.executeSql(orderSql)
    tableEnv.executeSql("select * from Orders").print()
  }
}

Output:

| +I | 19990 | 788587 | 295844.47814000136 | 2022-05-15 11:25:04.999 |
| +I | 19991 | 574737 | 183288.8604379419 | 2022-05-15 11:25:04.999 |
| +I | 19992 | 518123 | 977228.0087449611 | 2022-05-15 11:25:04.999 |
| +I | 19993 | 305440 | 160450.1554539004 | 2022-05-15 11:25:04.999 |
| +I | 19994 | 701281 | 646258.6008159731 | 2022-05-15 11:25:04.999 |
| +I | 19995 | 707352 | 697068.6504361494 | 2022-05-15 11:25:04.999 |
