0
点赞
收藏
分享

微信扫一扫

flink 利用 redis 去重

Sophia的玲珑阁 2023-01-11 阅读 68


flink 利用 redis 去重

对于每一条待处理的 record,根据算法计算其唯一key:

    key = getMessageKey(record);

如果 key 不存在,则将其值设为 "0" 并设置一个较短的超时时间。Redis 保证带 NX 选项的 SET 指令(相当于 SETNX 加过期时间)是原子的:

    result = cmds.set(key, "0", nxShortTimeArgs);

如果 key 已存在,result 返回 nil;如果返回 "OK",说明当前线程获得了这个 key,可以继续处理该记录:

    onCollectKey(key, value, out);

记录处理完成后,再用带 XX 选项的 SET 指令把 key 的值改为 "1",并设置一个较长的过期时间(代码中为 30 天),这样在此期间到达的重复记录都会被丢弃。把上面的过程封装起来,就是下面这个类;使用时需要继承它并覆盖(override)其中几个方法:

/**
* RedisStatefulMapFunction.java
*
* https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/functions/RichFlatMapFunction.html
* https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/typeinfo/TypeHint.html

*
*
* create: 2020-11-19
* update: 2020-12-15
*/
package com.cheungmine.flink.common;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.pepstack.guru.LogUtil;
import com.pepstack.guru.RedisClusterIO;

import java.time.Duration;
import java.util.Properties;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands;
import io.lettuce.core.SetArgs;


/**
 * A Flink {@link RichFlatMapFunction} that drops duplicate records by claiming a per-record
 * signature key in a Redis cluster.
 *
 * <p>Processing protocol for each record:
 * <ol>
 *   <li>{@link #getMessageKey(Object)} computes the record's signature key; {@code null} means the
 *       record is ignored without touching Redis.</li>
 *   <li>The key is claimed with {@code SET key "0" NX PX <redisTimeoutMillis>}. Redis executes this
 *       atomically, so only one caller receives {@code "OK"}.</li>
 *   <li>The winner processes the record in {@link #onCollectKey}; every other caller is routed to
 *       {@link #onDiscardKey}.</li>
 *   <li>After successful processing the key is rewritten with
 *       {@code SET key "1" XX EX <redisExpireSeconds>}, so duplicates are rejected for that long.</li>
 *   <li>If {@link #onCollectKey} throws, the claim is deleted before the exception propagates, so a
 *       replay of the same record is processed again instead of being discarded.</li>
 * </ol>
 *
 * <p>Optional configuration read from the constructor's {@link Properties}. A missing or blank key
 * falls back to the built-in default:
 * <ul>
 *   <li>{@code redis.cluster.uris}: comma-separated seed URIs</li>
 *   <li>{@code redis.timeout.millis}: client default timeout and claim TTL, in milliseconds</li>
 *   <li>{@code redis.expire.seconds}: how long a processed key is remembered, in seconds</li>
 * </ul>
 *
 * <p>Subclasses must override {@link #getMessageKey} and {@link #onCollectKey}.
 *
 * @param <IN>  input record type
 * @param <OUT> output record type
 */
public class RedisStatefulMapFunction<IN, OUT> extends RichFlatMapFunction<IN, OUT> {
    private static final Logger logger = LoggerFactory.getLogger(RedisStatefulMapFunction.class);

    private static final String PROP_CLUSTER_URIS = "redis.cluster.uris";
    private static final String PROP_TIMEOUT_MILLIS = "redis.timeout.millis";
    private static final String PROP_EXPIRE_SECONDS = "redis.expire.seconds";

    /** Seed nodes used when {@code redis.cluster.uris} is not configured. */
    private static final String[] DEFAULT_CLUSTER_URIS = new String[] {
        "redis://test@192.168.19.111:7001?timeout=30s",
        "redis://test@192.168.19.111:7002?timeout=30s",
        "redis://test@192.168.19.111:7003?timeout=30s",
        "redis://test@192.168.19.111:7004?timeout=30s",
        "redis://test@192.168.19.111:7005?timeout=30s",
        "redis://test@192.168.19.111:7006?timeout=30s",
        "redis://test@192.168.19.111:7007?timeout=30s",
        "redis://test@192.168.19.111:7008?timeout=30s",
        "redis://test@192.168.19.111:7009?timeout=30s"
    };

    private static final int DEFAULT_TIMEOUT_MILLIS = 30 * 1000;
    private static final long DEFAULT_EXPIRE_SECONDS = 86400L * 30;

    // Live Redis resources; created in open() and released in close().
    private transient RedisClusterClient redisClient;
    private transient StatefulRedisClusterConnection<String, String> connection;

    private final String[] redisClusterUris;
    private final int redisTimeoutMillis;
    private final long redisExpireSeconds;

    // Built in open() rather than in the constructor: Flink serializes this function instance
    // before deploying it, and Lettuce's SetArgs does not implement java.io.Serializable.
    private transient SetArgs nxShortTimeArgs;
    private transient SetArgs xxExpireTimeArgs;

    /**
     * Creates the function.
     *
     * @param kvprops optional configuration (may be {@code null}); see the class documentation for
     *                the recognised keys
     * @throws IllegalArgumentException if a configured value is malformed or not positive
     */
    protected RedisStatefulMapFunction(Properties kvprops) throws Exception {
        final String uris = readProperty(kvprops, PROP_CLUSTER_URIS);
        redisClusterUris = (uris == null) ? DEFAULT_CLUSTER_URIS.clone() : splitUris(uris);

        final String timeout = readProperty(kvprops, PROP_TIMEOUT_MILLIS);
        redisTimeoutMillis = (timeout == null) ? DEFAULT_TIMEOUT_MILLIS : Integer.parseInt(timeout);

        final String expire = readProperty(kvprops, PROP_EXPIRE_SECONDS);
        redisExpireSeconds = (expire == null) ? DEFAULT_EXPIRE_SECONDS : Long.parseLong(expire);

        if (redisTimeoutMillis <= 0) {
            throw new IllegalArgumentException(PROP_TIMEOUT_MILLIS + " must be > 0: " + redisTimeoutMillis);
        }
        if (redisExpireSeconds <= 0) {
            throw new IllegalArgumentException(PROP_EXPIRE_SECONDS + " must be > 0: " + redisExpireSeconds);
        }
    }

    /** Returns the trimmed value of {@code name}, or {@code null} if props is null or the value is blank. */
    private static String readProperty(Properties props, String name) {
        if (props == null) {
            return null;
        }
        final String raw = props.getProperty(name);
        if (raw == null || raw.trim().isEmpty()) {
            return null;
        }
        return raw.trim();
    }

    /** Splits a comma-separated URI list, rejecting empty entries. */
    private static String[] splitUris(String csv) {
        final String[] uris = csv.split("\\s*,\\s*");
        for (String uri : uris) {
            if (uri.isEmpty()) {
                throw new IllegalArgumentException(PROP_CLUSTER_URIS + " contains an empty entry: " + csv);
            }
        }
        return uris;
    }

    /**
     * Computes the signature of a record. Records with the same signature are treated as
     * duplicates.
     *
     * <p>Subclasses must override this method. Returning {@code null} drops the record. The
     * default implementation always returns {@code null}.
     *
     * @param value the incoming record
     * @return the signature key, or {@code null} to ignore the record
     */
    public String getMessageKey(IN value) {
        return null;
    }

    /**
     * Handles a record whose key was claimed by this subtask. This is where output should be
     * emitted.
     *
     * <p>Subclasses must override this method. The default only logs the key.
     */
    public void onCollectKey(String key, IN value, Collector<OUT> out) {
        LogUtil.debug(logger, "{%s}", key);
    }

    /**
     * Handles a record whose key was already claimed or processed, i.e. a duplicate.
     *
     * <p>The default logs a warning. Subclasses usually need not override this method.
     */
    public void onDiscardKey(String key, IN value, Collector<OUT> out) {
        LogUtil.warn(logger, "{%s}", key);
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);

        nxShortTimeArgs = SetArgs.Builder.nx().px(redisTimeoutMillis);
        xxExpireTimeArgs = SetArgs.Builder.xx().ex(redisExpireSeconds);

        redisClient = RedisClusterIO.createClient(redisClusterUris, Duration.ofMillis(redisTimeoutMillis));
        connection = redisClient.connect();
    }

    /**
     * Releases the Redis connection and client.
     *
     * <p>This method is safe to call even when {@link #open} failed part-way: null resources are
     * skipped, and the client is shut down even if closing the connection throws.
     */
    @Override
    public void close() throws Exception {
        try {
            super.close();
        } finally {
            try {
                if (connection != null) {
                    connection.close();
                }
            } finally {
                connection = null;
                if (redisClient != null) {
                    RedisClusterIO.shutdownClient(redisClient);
                    redisClient = null;
                }
            }
        }
    }

    @Override
    public void flatMap(IN value, Collector<OUT> out) throws Exception {
        final String key = getMessageKey(value);
        if (key == null) {
            return;
        }

        final RedisAdvancedClusterCommands<String, String> cmds = connection.sync();

        // Atomic claim: succeeds only if the key does not exist yet; the claim expires after
        // redisTimeoutMillis.
        final String claimed = cmds.set(key, "0", nxShortTimeArgs);
        if (!"OK".equals(claimed)) {
            onDiscardKey(key, value, out);
            return;
        }

        try {
            onCollectKey(key, value, out);
        } catch (RuntimeException | Error e) {
            // Without this, a replay of the record would be discarded while the claim is alive.
            releaseClaim(cmds, key, e);
            throw e;
        }

        // Mark as processed with the long expiry. XX only succeeds while our claim still exists.
        final String marked = cmds.set(key, "1", xxExpireTimeArgs);
        if (!"OK".equals(marked)) {
            logger.warn("claim on key {} expired before it was marked as processed; "
                    + "a duplicate of this record may be processed", key);
        }
    }

    /**
     * Deletes a claimed key after processing failed. Any failure of the delete is attached to
     * {@code cause} as a suppressed exception.
     *
     * <p>NOTE(review): the delete is not conditional on the key's value. If our claim already
     * expired and another subtask re-claimed the key, its claim is removed as well.
     */
    private static void releaseClaim(RedisAdvancedClusterCommands<String, String> cmds, String key,
                                     Throwable cause) {
        try {
            cmds.del(key);
        } catch (RuntimeException suppressed) {
            cause.addSuppressed(suppressed);
        }
    }
}

redis 集群操作类 RedisClusterIO:

/**
* RedisClusterIO.java
* 操作 Redis 集群
*
*/
package com.pepstack.guru;

import com.pepstack.guru.LogUtil;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.Iterator;
import java.util.Map.Entry;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import java.time.Duration;

import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.RedisURI;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.sync.RedisStringCommands;
import io.lettuce.core.api.async.RedisStringAsyncCommands;

// https://github.com/lettuce-io/lettuce-core/blob/6.0.1.RELEASE/src/test/java/io/lettuce/examples/ConnectToRedisCluster.java
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;


/**
 * Static helpers for creating and shutting down Lettuce {@link RedisClusterClient}s.
 */
public final class RedisClusterIO {
    private static final Logger logger = LoggerFactory.getLogger(RedisClusterIO.class);

    /** Key pattern read by {@link #createClient(Properties)}: redis-server.1, redis-server.2, ... */
    private static final String SERVER_KEY_FORMAT = "redis-server.%d";

    private RedisClusterIO() {
    }

    /**
     * Creates a cluster client seeded with the given node URIs.
     *
     * <p>URI syntax:
     * {@code redis://[password@]host[:port][/databaseNumber][?[timeout=timeout[d|h|m|s|ms|us|ns]]}.
     * Each URI is logged with its password part masked.
     *
     * @param uris           seed node URIs; must contain at least one entry
     * @param defaultTimeout default command timeout, or {@code null} to keep the client's default
     * @return the new client; connect with {@code connect()} and release with
     *         {@link #shutdownClient}
     * @throws IllegalArgumentException if {@code uris} is null or empty
     */
    public static RedisClusterClient createClient(String[] uris, Duration defaultTimeout) throws Exception {
        if (uris == null || uris.length == 0) {
            throw new IllegalArgumentException("at least one redis cluster uri is required");
        }

        final List<RedisURI> redisUris = new ArrayList<>(uris.length);
        for (int i = 0; i < uris.length; i++) {
            // Never write credentials to the log.
            LogUtil.info(logger, String.format("redis-server.%d=%s", i + 1, maskPassword(uris[i])));
            redisUris.add(RedisURI.create(uris[i]));
        }

        final RedisClusterClient client = RedisClusterClient.create(redisUris);
        if (defaultTimeout != null) {
            client.setDefaultTimeout(defaultTimeout);
        }
        return client;
    }

    /**
     * Creates a cluster client from consecutively numbered properties {@code redis-server.1},
     * {@code redis-server.2}, and so on. Reading stops at the first missing index.
     *
     * @param props properties holding the seed URIs
     * @return the new client; release with {@link #shutdownClient}
     * @throws IllegalArgumentException if no {@code redis-server.1} entry exists
     */
    public static RedisClusterClient createClient(Properties props) throws Exception {
        final List<String> uris = new ArrayList<>();
        String uri;
        while ((uri = props.getProperty(String.format(SERVER_KEY_FORMAT, uris.size() + 1))) != null) {
            uris.add(uri);
        }
        return createClient(uris.toArray(new String[0]), null);
    }

    /**
     * Shuts down a client created by {@link #createClient}. A {@code null} client is ignored.
     */
    public static void shutdownClient(RedisClusterClient client) throws Exception {
        if (client != null) {
            client.shutdown();
        }
    }

    /**
     * Replaces the user-info part of a URI authority with {@code ***}, for example
     * {@code redis://secret@host:7001?x} becomes {@code redis://***@host:7001?x}.
     * URIs without user info are returned unchanged.
     */
    private static String maskPassword(String uri) {
        if (uri == null) {
            return null;
        }
        final int schemeEnd = uri.indexOf("://");
        if (schemeEnd < 0) {
            return uri;
        }
        final int authorityStart = schemeEnd + 3;
        int authorityEnd = uri.length();
        for (int i = authorityStart; i < uri.length(); i++) {
            final char c = uri.charAt(i);
            if (c == '/' || c == '?' || c == '#') {
                authorityEnd = i;
                break;
            }
        }
        final int at = uri.lastIndexOf('@', authorityEnd - 1);
        if (at < authorityStart) {
            return uri;
        }
        return uri.substring(0, authorityStart) + "***" + uri.substring(at);
    }
}

特此记录。

举报

相关推荐

0 条评论