学习笔记flink——基于flink在线交易反欺诈检测(代码片段)

别呀 别呀     2023-03-30     151

关键词:

一、背景介绍

信用卡欺诈
信用卡欺诈是指故意使用伪造、作废的信用卡,冒用他人的信用卡骗取财物,或用本人信用卡进行恶意透支的行为。在当今数字时代,信用卡欺诈行为越来越被重视。
罪犯可以通过诈骗或者入侵安全级别较低系统来盗窃信用卡卡号。 用盗得的信用卡进行很小额度的消费进行测试。 如果测试消费成功,那么他们就会用这个信用卡进行大笔消费。

信用卡欺诈行为

交易3和交易4应该被标记为欺诈行为,因为交易3是一个100¥的小额交易,而紧随着的交易4是一个10000¥的大额交易。

另外,交易5、6和交易7就不属于欺诈交易了,因为在交易5这个500¥的小额交易之后,并没有跟随一个大额交易,而是一个金额适中的交易,这使得交易5到交易7不属于欺诈行为。


二、架构设计

架构设计

数据流设计

数据流落地实现


三、Kafka信用卡消费数据

3.1、Kafka Producer

模拟Kafka Producer定时生成消费数据

TransactionData.java:

package fraud_detection;

public class TransactionData 
    private String user;
    private double money;

    public TransactionData()
    public TransactionData(String user,double money)
        this.user=user;
        this.money=money;
    

    @Override
    public String toString()
        return this.user + "," + this.money;
    

TransactionDataGenerator.java:

package fraud_detection;

import java.util.Random;

public class TransactionDataGenerator 
    public static final  int USER_SIZE = 10;
    public static final float BIG_MONEY_PERCENT = 0.02f;
    static Random random = new Random();
    public static TransactionData getData()
        return new TransactionData(generateUser() ,  generateMoney()) ;
    
    private static String generateUser()
        return "user_"+random.nextInt(USER_SIZE);
    
    private static float generateMoney()
        float i = random.nextFloat();
        if( i > BIG_MONEY_PERCENT)
            return random.nextFloat() * 1000;
        else
            return i * 10000000;
        
    

    public static void main(String[] args)
        TransactionData data = null;
        for(int i = 10000 ;i >0 ; i--)
            data = getData();
            System.out.println(data);
        
    

TransactionDataProducer.java:

package fraud_detection;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.HashMap;
import java.util.Map;
public class TransactionDataProducer 
    public static void main(String[] args) throws InterruptedException 
        String topic = "fraud00";
        Map<String,Object> kafkaProperties = new HashMap<>();
        kafkaProperties.put("bootstrap.servers","node100:9092,node101:9092,node102:9092");
        kafkaProperties.put("acks", "all");
        kafkaProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<>(kafkaProperties);
        int size = 30*1000/10;
        long interval = 10L;
        String data = "";
        for (int i = 0; i < size; i++) 
            Thread.sleep(interval);
            data= TransactionDataGenerator.getData().toString();
            producer.send(new ProducerRecord<>(topic, data));
        
        producer.close();
        System.out.println("消息发送完成!");
    


3.2、整合Kafka Transaction数据

FraudDetection.scala:

package flink_kafka
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
object FraudDetection 
  def main(args: Array[String]): Unit = 
    val topic = "fraud00"
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "node100:9092")
    properties.setProperty("group.id", "test")
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env
      .addSource(new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), properties))
    stream.map(x => val data = x.split(","); (data(0).trim, data(1).trim.toDouble))
      .keyBy(0)
      .mapWithState[(String, Double), Double]((in: (String, Double), state: Option[Double]) =>
        state match 
          case None => (("", 0.0), Some(in._2))
          case Some(previous) => if (in._2 > 10000.0 && previous < 1000.0)
            ((in._1 + "->" + previous, in._2), Some(in._2)) else (("", 0.0), Some(in._2))
        )
      .filter(x => x._2 > 0.0)
      .print()
    env.execute("Fraud Detection")
  

测试:

① 先创建fraud00 话题

将产生的数据存到/tmp目录下(了解)

② 运行FraudDetection:

③ 运行TransactionDataProducer

结果:


学习笔记flink——基于flink在线交易反欺诈检测(代码片段)

一、背景介绍信用卡欺诈信用卡欺诈是指故意使用伪造、作废的信用卡,冒用他人的信用卡骗取财物,或用本人信用卡进行恶意透支的行为。在当今数字时代,信用卡欺诈行为越来越被重视。罪犯可以通过诈骗或者入... 查看详情

学习笔记flink——基于flink在线交易反欺诈检测(代码片段)

一、背景介绍信用卡欺诈信用卡欺诈是指故意使用伪造、作废的信用卡,冒用他人的信用卡骗取财物,或用本人信用卡进行恶意透支的行为。在当今数字时代,信用卡欺诈行为越来越被重视。罪犯可以通过诈骗或者入... 查看详情

flink:风控/反欺诈检测系统案例研究1,2,3

https://flink.apache.org/news/2020/01/15/demo-fraud-detection.htmlhttps://flink.apache.org/news/2020/03/24/demo-fraud-detection-2.htmlhttps://flink.apache.org/news/2020/07/30/demo-fraud-detection-3.ht 查看详情

flink:风控/反欺诈检测系统案例研究1,2,3

https://flink.apache.org/news/2020/01/15/demo-fraud-detection.htmlhttps://flink.apache.org/news/2020/03/24/demo-fraud-detection-2.htmlhttps://flink.apache.org/news/2020/07/30/demo-fraud-detection-3.ht 查看详情

flink:风控/反欺诈检测系统案例研究1,2,3

...f0c;您可以如何利用低级进程函数API。具体来说,您将学习如何在Windows上实现 查看详情

flink学习笔记概述

...使用,例如实时数据处理、批处理、图形计算和机器学习等。Flink还具有高可用性、低延迟、高吞吐量和高扩展性等特点,是近年来非常流行的数据处理框架之一。二、flink的使用场景有哪些实时数据处理:Flink可用于... 查看详情

flink学习笔记:flink的最简安装

文章目录一、Flink概述二、Flink的最简安装(一)获取flink安装包(二)上传flink安装包(三)解压flink安装包(四)配置环境变量三、利用FlinkShell实现词频统计(一)准备数据文件(二)启动flink本地服务(三)实现词频统计1、... 查看详情

flink尚硅谷学习笔记(代码片段)

简介尚硅谷Flink(Scala版)教程丨清华硕士-武晟然老师主讲https://www.bilibili.com/video/BV1Qp4y1Y7YN官网:http://flink.apache.org/目录   P01.尚硅谷_Flink-Flink简介------------   07:56   P02.尚硅谷_Flink-Flink应用场景---- 查看详情

学习笔记flink——flink简介(介绍基本概念应用场景)

一、Flink介绍ApacheFlink是一个分布式流批一体化的开源平台。Flink的核心是一个提供数据分发、通信以及自动容错的流计算引擎。Flink在流计算之上构建批处理,并且原生的支持迭代计算,内存管理以及程序优化。对Flink而... 查看详情

学习笔记flink——flink简介(介绍基本概念应用场景)

一、Flink介绍ApacheFlink是一个分布式流批一体化的开源平台。Flink的核心是一个提供数据分发、通信以及自动容错的流计算引擎。Flink在流计算之上构建批处理,并且原生的支持迭代计算,内存管理以及程序优化。对Flink而... 查看详情

flink系统性学习笔记

1.流计算须知Streaming101:批处理之外的流式世界第一部分Streaming102:批处理之外的流式世界第二部分Exactlyonce未必严格一次2.基础Flink安装与启动 查看详情

flink系统性学习笔记

1.流计算须知Streaming101:批处理之外的流式世界第一部分Streaming102:批处理之外的流式世界第二部分Exactlyonce未必严格一次2.基础Flink安装与启动 查看详情

学习笔记flink——flink安装启动与监控(代码片段)

一、Linux环境准备Centos7,1CPU,2GMemory,20GDisk,VirtualSystemHostname:node110.centos.com、node111.centos.com、node112.centos.comIPAddress:192.168.128.110、192.168.128.111、192.168.128 查看详情

学习笔记flink——flink安装启动与监控(代码片段)

一、Linux环境准备Centos7,1CPU,2GMemory,20GDisk,VirtualSystemHostname:node110.centos.com、node111.centos.com、node112.centos.comIPAddress:192.168.128.110、192.168.128.111、192.168.128 查看详情

学习笔记flink——flink数据流模型时间窗口和核心概念

一、Flink编程数据流模型1.1、Flink–API封装Flink提供不同级别的API封装来支持流/批处理应用程序。1.2、Flink-编程数据流Source:一个不会结束的数据记录流。Transformations:使用一个或多个数据流作为输入,生成一个或多... 查看详情

学习笔记flink——flink数据流模型时间窗口和核心概念

一、Flink编程数据流模型1.1、Flink–API封装Flink提供不同级别的API封装来支持流/批处理应用程序。1.2、Flink-编程数据流Source:一个不会结束的数据记录流。Transformations:使用一个或多个数据流作为输入,生成一个或多... 查看详情

学习笔记flink——flink基础api及核心数据结构(代码片段)

一、Flink基础API-Flink编程的基本概念1.1、Flink程序Flink程序是实现了分布式集合转换(例如过滤、映射、更新状态、join、分组、定义窗口、聚合)的规范化程序。集合初始创建自source(例如读取文件、kafka主题,或... 查看详情

学习笔记flink——flink基础api及核心数据结构(代码片段)

一、Flink基础API-Flink编程的基本概念1.1、Flink程序Flink程序是实现了分布式集合转换(例如过滤、映射、更新状态、join、分组、定义窗口、聚合)的规范化程序。集合初始创建自source(例如读取文件、kafka主题,或... 查看详情