0
点赞
收藏
分享

微信扫一扫

大数据系列教程(4)Flink 使用 DataStream API 进行欺诈检测

E_topia 2022-03-23 阅读 64

目录

使用 DataStream API 进行欺诈检测

Apache Flink 提供了一个 DataStream API,用于构建健壮的、有状态的流应用程序。它提供对状态和时间的细粒度控制,从而允许实施高级事件驱动系统。

需求:

信用卡欺诈在数字时代日益受到关注。犯罪分子通过诈骗或侵入不安全的系统来窃取信用卡号码。被盗号码通过一次或多次小额购买进行测试,通常为一美元或更少。如果这行得通,他们就会进行更重大的购买,以获得可以出售或自己保留的物品。

在本教程中,您将构建一个欺诈检测系统,用于提醒可疑的信用卡交易。使用一组简单的规则,您将看到 Flink 如何让我们实现高级业务逻辑并实时行动。

  • 采用JDK8、maven进行构建
$ mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-walkthrough-datastream-java -DarchetypeVersion=1.14.4 -DgroupId=frauddetection -DartifactId=frauddetection -Dversion=0.1 -Dpackage=spendreport -DinteractiveMode=false

用idea进行打包成jar,然后通过Flink 的Web UI来提交作业。

版本1

FraudDetectionJob.java

package spendreport;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.walkthrough.common.sink.AlertSink;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.source.TransactionSource;

public class FraudDetectionJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		
		//执行环境、创建源
        DataStream<Transaction> transactions = env
            .addSource(new TransactionSource())
            .name("transactions");
       
       //分区事件和检测欺诈
        DataStream<Alert> alerts = transactions
            .keyBy(Transaction::getAccountId)
            .process(new FraudDetector())
            .name("fraud-detector");
      
      //输出结果
        alerts
            .addSink(new AlertSink())
            .name("send-alerts");

        env.execute("Fraud Detection");
    }
}
  • 第一行设置您的StreamExecutionEnvironment. 执行环境是您为 Job 设置属性、创建源并最终触发 Job 执行的方式。
  • 源将来自外部系统(例如 Apache Kafka、Rabbit MQ 或 Apache Pulsar)的数据提取到 Flink Jobs 中。本演练使用的源可生成无限的信用卡交易流供您处理。每笔交易都包含账户 ID ( accountId)、交易发生时间的时间戳 ( timestamp) 和美元金额 ( amount)。
  • transactions流包含来自大量用户的大量交易,因此需要由多个欺诈检测任务并行处理。由于欺诈发生在每个账户的基础上,您必须确保同一账户的所有交易都由欺诈检测器操作员的同一并行任务处理。
  • 为确保同一物理任务处理特定键的所有记录,您可以使用DataStream#keyBy. 该process()调用添加了一个运算符,该运算符将函数应用于流中的每个分区元素。通常说在keyBy 之后的运算符,在这种情况下FraudDetector,是在键控上下文中执行的。
  • 接收器将DataStream写入外部系统;例如 Apache Kafka、Cassandra 和 AWS Kinesis。AlertSink使用日志级别INFO记录每条记录,而不是将其写入持久存储,因此您可以轻松查看Alert结果。

FraudDetector.java

欺诈检测器

package spendreport;

import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.entity.Transaction;

public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {

    private static final long serialVersionUID = 1L;

    private static final double SMALL_AMOUNT = 1.00;
    private static final double LARGE_AMOUNT = 500.00;
    private static final long ONE_MINUTE = 60 * 1000;

    @Override
    public void processElement(
            Transaction transaction,
            Context context,
            Collector<Alert> collector) throws Exception {

        Alert alert = new Alert();
        alert.setId(transaction.getAccountId());

        collector.collect(alert);
    }
}
  • 欺诈检测器实现为KeyedProcessFunction. KeyedProcessFunction#processElement每个事务事件都会调用它的方法。第一个版本会对每笔交易产生警报,有些人可能会说这过于保守。

  • 本教程的后续步骤将指导您使用更有意义的业务逻辑扩展欺诈检测器。

版本2

对于第一个版本,欺诈检测器应该为任何进行小额交易的账户立即输出警报,然后是大笔交易。小是不到 1.00 美元,大是超过 500 美元。想象一下,您的欺诈检测器处理特定帐户的以下交易流。

12345678910
13.01250.09510102910.023070032

交易 3 和 4 应标记为欺诈,因为这是一笔小额交易,0.09 美元,然后是一笔大笔交易510 美元。或者,交易 7、8 和 9 不是欺诈,因为 0.02 美元的小额金额没有紧跟大额交易;相反,有一个中间交易打破了这种模式。

为此,欺诈检测器必须记住跨事件的信息;只有前一笔交易规模较小时,一笔大额交易才具有欺诈性。跨事件记住信息需要状态,这就是我们决定使用KeyedProcessFunction的原因。它提供了对状态和时间的细粒度控制,这将使我们能够在整个演练中根据更复杂的要求改进我们的算法。

最直接的实现是在处理小事务时设置的布尔标志。当大笔交易通过时,您可以简单地检查是否为该帐户设置了标志。

但是,仅将标志实现为类中的成员变量是FraudDetector行不通的。Flink 处理具有相同对象实例的多个账户的交易FraudDetector,这意味着如果账户 A 和 B 路由通过相同的实例FraudDetector,账户 A 的交易可以将标志设置为 true,然后账户 B 的交易可以设置关闭虚假警报。我们当然可以使用像 Map这样的数据结构来跟踪单个键的标志,但是,一个简单的成员变量不会容错,并且在发生故障时它的所有信息都会丢失。因此,如果应用程序必须重新启动以从故障中恢复,欺诈检测器可能会错过警报。

为了应对这些挑战,Flink 提供了容错状态的原语,这些原语几乎与常规成员变量一样易于使用。

Flink 中最基本的状态类型是ValueState,这是一种数据类型,可以为它包装的任何变量添加容错能力。 ValueState键控状态的一种形式,这意味着它仅在应用于键控上下文的运算符中可用;紧随其后的任何运算符DataStream#keyBy。运算符的键控状态自动限定为当前处理的记录的键。在这个例子中,key 是当前交易的账户 ID(由 声明keyBy()),并FraudDetector为每个账户维护一个独立的状态。 ValueState是使用创建的ValueStateDescriptor其中包含有关 Flink 应如何管理变量的元数据。状态应该在函数开始处理数据之前注册。正确的钩子是open()方法。

public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {

    private static final long serialVersionUID = 1L;

    private transient ValueState<Boolean> flagState;

    @Override
    public void open(Configuration parameters) {
        ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>(
                "flag",
                Types.BOOLEAN);
        flagState = getRuntimeContext().getState(flagDescriptor);
    }
    
    @Override
    public void processElement(
            Transaction transaction,
            Context context,
            Collector<Alert> collector) throws Exception {

        // Get the current state for the current key
        Boolean lastTransactionWasSmall = flagState.value();

        // Check if the flag is set
        if (lastTransactionWasSmall != null) {
            if (transaction.getAmount() > LARGE_AMOUNT) {
                // Output an alert downstream
                Alert alert = new Alert();
                alert.setId(transaction.getAccountId());

                collector.collect(alert);            
            }

            // Clean up our state
            flagState.clear();
        }

        if (transaction.getAmount() < SMALL_AMOUNT) {
            // Set the flag to true
            flagState.update(true);
        }
    }
    
}    
  • ValueState是一个包装类,类似于AtomicReferenceAtomicLong在 Java 标准库中。它提供了三种与其内容交互的方法;update设置状态,value获取当前值,并clear删除其内容。如果特定键的状态为空,例如在应用程序开始时或调用后ValueState#clearValueState#value则将返回null。不保证对返回的对象的修改ValueState#value被系统识别,因此所有更改都必须使用ValueState#update. 否则,容错由 Flink 在后台自动管理,因此您可以像使用任何标准变量一样与之交互。

  • 对于每笔交易,欺诈检测器都会检查该帐户的标志状态。请记住,ValueState总是范围为当前键,即帐户。如果标志不为空,则该帐户的最后一笔交易很小,因此如果该交易的金额很大,则检测器会输出欺诈警报。

    在那次检查之后,标志状态被无条件地清除。要么当前交易导致欺诈警报,并且模式结束,要么当前交易没有引起警报,并且模式被破坏并且需要重新启动。

    最后,检查交易金额是否小。如果是这样,则设置标志以便下一个事件可以检查它。请注意,它ValueState<Boolean>具有三个状态,未设置 ( null) true、 和false,因为所有ValueState’ 都可以为空。该作业仅使用 unset ( null) 并true检查标志是否已设置。

版本3

诈骗者不会等待很长时间进行大量购买,以减少他们的测试交易被注意到的机会。例如,假设您想为欺诈检测器设置 1 分钟的超时时间;即,在前面的示例中,交易 3 和 4 仅在它们发生在 1 分钟内时才会被视为欺诈。FlinkKeyedProcessFunction允许您设置在未来某个时间点调用回调方法的计时器。

让我们看看如何修改我们的 Job 以符合我们的新要求:

  • 每当标志设置为 时true,还要在未来设置一个 1 分钟的计时器。
  • 当计时器触发时,通过清除其状态来重置标志。
  • 如果标志被清除,则应取消计时器。

要取消计时器,您必须记住它设置的时间,并且记住意味着状态,因此您将从创建计时器状态和标志状态开始。

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.entity.Transaction;

public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {

    private static final long serialVersionUID = 1L;

    private static final double SMALL_AMOUNT = 1.00;
    private static final double LARGE_AMOUNT = 500.00;
    private static final long ONE_MINUTE = 60 * 1000;

    private transient ValueState<Boolean> flagState;
    private transient ValueState<Long> timerState;

    @Override
    public void open(Configuration parameters) {
        ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>(
                "flag",
                Types.BOOLEAN);
        flagState = getRuntimeContext().getState(flagDescriptor);

        ValueStateDescriptor<Long> timerDescriptor = new ValueStateDescriptor<>(
                "timer-state",
                Types.LONG);
        timerState = getRuntimeContext().getState(timerDescriptor);
    }

    @Override
    public void processElement(
            Transaction transaction,
            Context context,
            Collector<Alert> collector) throws Exception {

        // Get the current state for the current key
        Boolean lastTransactionWasSmall = flagState.value();

        // Check if the flag is set
        if (lastTransactionWasSmall != null) {
            if (transaction.getAmount() > LARGE_AMOUNT) {
                //Output an alert downstream
                Alert alert = new Alert();
                alert.setId(transaction.getAccountId());

                collector.collect(alert);
            }
            // Clean up our state
            cleanUp(context);
        }

        if (transaction.getAmount() < SMALL_AMOUNT) {
            // set the flag to true
            flagState.update(true);

            long timer = context.timerService().currentProcessingTime() + ONE_MINUTE;
            context.timerService().registerProcessingTimeTimer(timer);

            timerState.update(timer);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) {
        // remove flag after 1 minute
        timerState.clear();
        flagState.clear();
    }

    private void cleanUp(Context ctx) throws Exception {
        // delete timer
        Long timer = timerState.value();
        ctx.timerService().deleteProcessingTimeTimer(timer);

        // clean up all state
        timerState.clear();
        flagState.clear();
    }
}
  • KeyedProcessFunction#processElement``Context使用包含计时器服务的a 调用。定时器服务可用于查询当前时间、注册定时器和删除定时器。有了这个,您可以在每次设置标志时将计时器设置为 1 分钟,并将时间戳存储在timerState.

  • 处理时间为挂钟时间,由运行操作员的机器的系统时钟决定。

    当计时器触发时,它会调用KeyedProcessFunction#onTimer. 覆盖此方法是您如何实现回调以重置标志。

    最后,要取消定时器,需要删除已注册的定时器,并删除定时器状态。您可以将其包装在辅助方法中并调用此方法而不是flagState.clear().

使用提供的代码运行此代码TransactionSource将为帐户 3 发出欺诈警报。您应该在任务管理器日志中看到以下输出:

2019-08-19 14:22:06,220 INFO  org.apache.flink.walkthrough.common.sink.AlertSink - Alert{id=3}
2019-08-19 14:22:11,383 INFO  org.apache.flink.walkthrough.common.sink.AlertSink - Alert{id=3}
2019-08-19 14:22:16,551 INFO  org.apache.flink.walkthrough.common.sink.AlertSink - Alert{id=3}
2019-08-19 14:22:21,723 INFO  org.apache.flink.walkthrough.common.sink.AlertSink - Alert{id=3}
2019-08-19 14:22:26,896 INFO  org.apache.flink.walkthrough.common.sink.AlertSink - Alert{id=3}
举报

相关推荐

0 条评论