对信用卡欺诈sayno|百行代码实现简化版实时欺诈检测(代码片段)

woqutechteam woqutechteam     2023-02-18     769

关键词:

进入互联网时代,你的绝大部分操作都可以在网上进行,极大的方便了我们的生活。但是信用卡盗刷者也可以利用网络来诈骗,典型的做法是:诈骗者首先入侵安全级别较低系统来盗窃信用卡卡号,用盗得的信用卡进行很小额度的消费进行测试,如果测试消费成功,那么他们就会用这个信用卡进行大笔消费,来购买那些可以倒卖的财物,实现诈骗敛财的目标。

大部分银行都有针对信用卡诈骗的反欺诈检测系统,通过对诈骗模式进行识别,及时通知用户或者直接冻结账户,来避免进一步损失。flink的入门介绍文档中就展示一种信用卡诈骗检测的实现方式,但是数据来源是一个静态数组,不符合实际用户场景。本文将介绍一种方案,通过少于 100 行代码修改该实例程序,实现基于 Oracle 上账户表变更的实时欺诈检测。

1.传统实时欺诈检测方案分析

Flink示例程序会检测每一笔交易,若发现一个帐户在 1 分钟内,先出现了一笔小交易(小于 1),后面又出现了一笔大交易(大于 500),则认为出现了欺诈交易,立即输出警告。具体的代码解析可以阅读基于DataStream API 实现欺诈检测

但是,示例程序中的数据来源 TransactionSource 的数据来源是一个静态数组 private static List<Transaction> data = Arrays.asList(new Transaction(1L, 0L, 188.23D)...的迭代器。一般情况下,客户的余额是存储在 Oracle 的账户表中的。怎么将客户余额的变化输出到 Flink 中,来实现实时的欺诈检测列?能想到的方案列举如下:

  • 方案 1:轮询从 Oracle 账户表查询余额变更 应用程序固定时间间隔去轮询 Oracle 账户表的数据,检查到某个客户的账户余额发生了变化后,通知 Flink 进行欺诈检测。这种方案需要不断轮询 Oracle 数据库,对有数据库性能影响,并且就算轮询的间隔足够短,还是有可能漏掉了一些账户变更信息,不可取。

  • 方案 2:业务代码修改 Oracle 账户表时通知 Flink 修改交易程序,在它去更新 Oracle 账户表时,通知 Flink 进行欺诈检测。这种方案的优势在于不会丢掉任何账户变更的事件;但是需要修改交易程序,会导致业务程序耦合度提升。实现上如果采用同步模式,可能会由于 Flink 失败而导致交易失败,也会大大提高交易持续时间;而采用异步方式,需要考虑通知 Flink 和写入账户表的原子性,有可能成功通知了 Flink 但是写入账户表失败了,也有可能写入账户表成功了,却没有通知到 Flink。

  • 方案 3:利用 logminer 抽取账户表变更 Oracle 提供 logminer 来将数据库日志反解析成变更 SQL,这样就可以将 Oracle 账户表更新的信息抽取出来,通知 Flink 进行欺诈检测。这种方案的优点在于直接基于 Oracle 数据表的修改来做增量的同步(oracle 日志中记录账户表修改并提交了,说明客户修改账户是成功的,不用担心 Flink 通知了,账户表反而写失败了),降低了业务的耦合度,也不会担心丢失了账户变更事件;但是 logminer 每次只能挖掘一整个日志的变化,没法断点续传,并且挖掘的数据也只能写入 alert.log,会污染错误日志。这个方案缺陷也比较大。

上述三个方案都有一定的缺陷和问题,要么可能会漏掉部分变更数据,要么可能影响 oracle 性能。logminer 相对来说是避免漏数据,对数据库性能影响最小的方案,是否有一个类似于 logminer 而且支持断点续传,对 Flink 又比较友好的方案?

  • 方案 4:利用 OGG 抽取账户表变更 Oracle 公司的 OGG 和国内部分厂商基本能避免 logminer 的缺点,但是需要在 Oracle 服务器上安装客户端,有侵入,并且配置和使用比较复杂,价格上也不是很友好。

最后我们找到了一个轻量、免费日志解析工具QDecoder来替代 OGG,将 oracle 账户表的变更通知到 Flink,从而实现欺诈检测的方法。官方介绍:QDecoder 是沃趣科技自主研发,基于 Oracle redo 日志进行二进制解析的订阅同步工具,易集成、零侵入、高性能、全免费。目前,QDecoder 已经在多家证券和银行上线使用,稳定运行,得到诸多客户的肯定与认可。

2.Oracle 源端增量输出

期待实现的实时欺诈检测的架构:

如图:

  • QDecoder 实时获取 Oracle 的 redo log,将账户表数据变更解析出来,写入 kafka 的指定 topic 中。

  • Flink 程序(FraudDetection)使用 flink-connector-kafka 从 kafka 获取交易数据,进行流式计算,识别出可能的欺诈交易,并输出警告。

2.1 安装 QDecoder

为了从 Oracle 的日志中挖掘出 account 表的变更数据,我们需要安装 QDecoder。QDecoder 的安装也非常简单,并且它跟其他的数据库同步软件不一样,它默认是不需要到 Oracle 服务器上去部署客户端的,只需要给定 ASM 账号就可以通过网络连接到 Oracle 上取日志。

一键安装命令如下: docker run -it --name=qdecoder -p 9191:9191 -p 9092:9092 --pull always registry.cn-hangzhou.aliyuncs.com/woqutech/qdecoder

根据提示配置 QDecoder,更多信息可参考 Docker Hub

以下配置需要特别注意:

  • 配置项 1.1 中列出的 sql,请以 dba 权限在 oracle 中执行,这将配置 QDecoder 查询系统表需要的权限。

  • 配置项 2.1: 输入将要检测的表:qdecoder.account。简单起见,我们这里把 account 示例表新建在 qdecoder 账户下了。conn qdecoder/qdecoder;create table account(accountid int primary key, balance number);

  • 配置项 3.1: 选择输出到 kafka, bootstrap.servers 可以不输入,直接在容器中启动 kafka,qdecoder 会将变更数据写入“defaultapp.qdecoder.binlog.qdecoder”的 topic。生产环境下如果有 kafka 集群,请输入集群连接的地址和账号。

下面是配置示例: 

2.2 更新账户表观察 QDecoder 的输出

等 QDecoder 成功启动后,可以按照提示运行 binlogdumpK,从 kafka 读取 binlog 并打印出来。由于 account 表并没有更新,此时没有变更数据输出。

我们来更新 account 表: 

insert into account values(1,10000);insert into account values(2,20000);insert into account values(3,30000);commit;

binlogdumpK 输出的 binlog 如下图:

上图中 schemaName: "qdecoder",tableName: "account", eventType: INSERT 表示这里是对qdecoder.account表的 INSERT 操作。其实它对应的 sql 就是insert into qdecoder.account(accountid,balance) values(1,10000)

这里

  • Oracle 的日志解析出来是遵循阿里巴巴canal的 protobuf 格式,这个是关系型数据库增量输出的标准格式

  • binlogdumpK 只是为了观察一下 QDecoder 的输出,你可以随时关掉它,这并不影响 QDecoder 和 Flink 程序的运行。

至此,我们已经利用 QDecoder 从 Oracle 的日志解析出账户表的数据变更,那么,怎么将这些输出作为 Flink 现有的欺诈检测的输入源列呢?

3.实现 Kafka Consumer 对接 Oracle 源端增量数据

 Flink示例程序是利用 addSource(new TransactionSource())来将静态数组作为源加入流处理的

DataStream<Transaction> transactions = env
            .addSource(new TransactionSource())
            .name("transactions");

我们要修改为从 kafka 中取日志,可以利用 DataStream Connectors 配置一个 Kafka Consumer。按照Flink自带的示例中的DataStream<String> stream = env.addSource(new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties));添加数据源。这里

  • "topic"应该修改为"defaultapp.qdecoder.binlog.qdecoder",这是 QDecoder 插入 kafka 的 topic 名称

  • properties 修改为你连接的 kafka 的地址,我们这里使用的是 QDecoder 自带的 kafka,修改为 127.0.0.1:9092

  • SimpleStringSchema()是直接以 string 的方式输出出来,QDecoder 的输出是 protobuf 的,不能直接用 string 的方式输出,而需要解析出来转换成欺诈检测认识的 Transaction

3.1 binlog 转换成 Transaction 实现

为了能完全重用 Flink 示例程序中的代码,我们这里需要

  • 使用 com.alibaba.otter.canal.protocol 反序列化 QDecoder 插入到 kafka 的 binlog 日志

  • 将 binlog 日志中的账户表变更转换成 Transaction 对象。

我们实现一个BinlogTransactionSchema类反序列化 binlog, 计算 balance 的变化,生成 org.apache.flink.walkthrough.common.entity.Transaction 对象。

主要代码如下:

// 反序列化Entry
CanalEntry.Entry entry = CanalEntry.Entry.parseFrom(binlog);
// 获取表名
entry.getHeader().getTableName();
// 获取Entry type:ROWDATA|TRANSACTIONBEGIN|TRANSACTIONEND|...
entry.getEntryType();
// 获取row change
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
// 获取Event type: INSERT|UPDATE|DELTE|...
rowChange.getEventType();
// 获取执行时间
long executeTimeMs = entry.getHeader().getExecuteTime();
// 获取row data
CanalEntry.RowData rowData = rowChange.getRowDatas(0);

// 获取旧值
for (CanalEntry.Column col : rowData.getBeforeColumnsList()) 
    if (col.getName().equalsIgnoreCase("accountid"))  
       oldRow.accountId = Long.parseLong(col.getValue()); 
     else if (col.getName().equalsIgnoreCase("balance")) 
        oldRow.balance = Double.parseDouble(col.getValue());
    


// 获取新值
for (CanalEntry.Column col : rowData.getAfterColumnsList()) 
    if (col.getName().equalsIgnoreCase("accountid")) 
        newRow.accountId = Long.parseLong(col.getValue())
     else if (col.getName().equalsIgnoreCase("balance")) 
        newRow.balance = Double.parseDouble(col.getValue());
    


// 创建transactionnew
 Transaction(newRow.accountId, executeTimeMs, Math.abs(newRow.balance-oldRow.balance));

3.2 将数据源加入 Flink

QDecoder 输出的增量数据转换后,可以直接作为数据源形成 DataStream

修改后代码如下:

public static void main(String[] args)throws Exception
StreamExecutionEnvironment env=StramExecutionEnvironment.getExecutionEnvironment();

// 创建FlinkKafkaConsumer,用BinlogTransactionSchema反序列化
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
properties.setProperty("group.id", "flink.test");

// 指定"defaultapp.qdecoder.binlog.qdecoder"的topic,使用BinlogTransactionSchema来将QDecoder账户表的更新转换成Transaction
FlinkKafkaConsumer<Transaction> kafkaSource = new FlinkKafkaConsumer<Transaction>("defaultapp.qdecoder.binlog.qdecoder", new BinlogTransactionSchema(), properties);
kafkaSource.setStartFromEarliest();

DataStream<Transaction> transactions = env
    .addSource(kafkaSource)
    .name("transactions");
…

3.3 验证运行

修改好的应用程序我们已经上传到 github 上供下载,测试运行步骤如下:

3.3.1 程序下载

git clone https://github.com/woqutech/qdecoder.gitcd qdecoder/FlinkSample/frauddetection

3.3.2 程序运行

frauddetection 是一个 maven 创建的项目,有 pom.xml 项目文件,可以导入各种 IDE,进行调试和运行。这里只介绍大家常用的 intellij IDEA 运行验证方法

  • 打开项目 开始界面:open or import -> 选择 frauddetection 目录 或者 菜单: file/open -> 选择 frauddetection 目录

  • 运行程序 菜单: run -> run 'FraudDetectionJob'

注意:如果报告 slf4j 重复,且有大量的 log 输出,请在 module/dependencies 中删除 ch.qos.logback:logback-classic 和 ch.qos.logback:logback-core。

3.3.3 欺诈验证检测

QDecoder 和 frauddetection 正常运行以后,我们就可以更新 account.balance,模拟交易,来观察 frauddetection 程序是否能进行欺诈检测了。

  • 在 Oracle 上执行以下 SQL

update account set balance = balance - 0.1 where accountid = 1;commit;update account set balance = balance - 0.2 where accountid = 1;commit;update account set balance = balance + 100 where accountid = 2;commit;update account set balance = balance - 501 where accountid = 1;commit;update account set balance = balance - 200 where accountid = 2;commit;

这里我们模拟了两个账号 account=1/2 的交易。其中 account=2 先入账 100 元,然后扣了 200 元是正常交易,不满足欺诈检测条件。account=1 的账户余额先扣了 0.1 元,然后再扣了 0.2 元,最后直接扣了 501 元,满足欺诈检测的条件:在一分钟内(private static final long ONE_MINUTE = 60 * 1000;)先做小于 1 元(private static final double SMALL_AMOUNT = 1.00)的小额交易,然后再做 500 以上的大额交易(private static final double LARGE_AMOUNT = 500.00;)。

  • 检查欺诈交易是否被正确识别

执行完上述 SQL,frauddetection 程序会立即输出:

21:11:20,107 INFO  org.apache.flink.walkthrough.common.sink.AlertSink           [] - Alertid=1

表示 accountid=1 的帐号检测到欺诈交易。

至此,我们的 frauddetection 欺诈检测程序就修改完成了。

回顾整个过程:

  • 利用 QDecoder 实现了 Oracle 数据库的日志订阅增量数据导出

  • 利用 alibaba canal 的 protobuf 来解析 Oracle 增量变化

  • 实现了不到 100 行代码的 BinlogTransactionSchema 类,用于将解析数据转换为欺诈检测识别的数据

  • 利用 Flink 的 DataStream Connectors 从 kafka 取出增量变化数据

  • 利用了 Flink 实现增量流数据的有状态计算分布式处理,实现欺诈检测

4.总结

综上所述,我们利用 QDecoder 和 Flink 写了少于 100 行的代码,实现了一个简化版的银行信用卡欺诈检测程序。整体来说,利用 Flink+QDecoder 可以很容易将 oracle 的增量变化取出来,同步给大数据平台或者数据湖,有助于将静态数据流动起来,帮助企业盘活数据资产,提升运营决策效果。

信用卡欺诈的 Python 算法

】信用卡欺诈的Python算法【英文标题】:PythonalgorithmforCreditCardfraud【发布时间】:2020-10-1102:53:43【问题描述】:提示:根据一些简单的算法,实现一个判断卡号是否有效的函数。假设信用卡号是一个由14个字符组成的字符串,格... 查看详情

学习笔记flink——基于flink在线交易反欺诈检测(代码片段)

一、背景介绍信用卡欺诈信用卡欺诈是指故意使用伪造、作废的信用卡,冒用他人的信用卡骗取财物,或用本人信用卡进行恶意透支的行为。在当今数字时代,信用卡欺诈行为越来越被重视。罪犯可以通过诈骗或者入... 查看详情

学习笔记flink——基于flink在线交易反欺诈检测(代码片段)

一、背景介绍信用卡欺诈信用卡欺诈是指故意使用伪造、作废的信用卡,冒用他人的信用卡骗取财物,或用本人信用卡进行恶意透支的行为。在当今数字时代,信用卡欺诈行为越来越被重视。罪犯可以通过诈骗或者入... 查看详情

学习笔记flink——基于flink在线交易反欺诈检测(代码片段)

一、背景介绍信用卡欺诈信用卡欺诈是指故意使用伪造、作废的信用卡,冒用他人的信用卡骗取财物,或用本人信用卡进行恶意透支的行为。在当今数字时代,信用卡欺诈行为越来越被重视。罪犯可以通过诈骗或者入... 查看详情

学习笔记flink——基于flink在线交易反欺诈检测(代码片段)

一、背景介绍信用卡欺诈信用卡欺诈是指故意使用伪造、作废的信用卡,冒用他人的信用卡骗取财物,或用本人信用卡进行恶意透支的行为。在当今数字时代,信用卡欺诈行为越来越被重视。罪犯可以通过诈骗或者入... 查看详情

基于逻辑回归信用卡欺诈检测(代码片段)

文件读取importpandasaspdimportmatplotlib.pyplotaspltimportnumpyasnp%matplotlibinline#由于数据太多,只读取前1000行data=pd.read_csv("creditcard.csv",nrows=1000)data.head()数据预处理缺失值、异常值的处理、删除多余列#判断是否有缺失值data.isnull()#如果有 查看详情

通俗易懂--信用卡欺诈预测案例讲解(算法+案例)(代码片段)

1.信用卡欺诈预测案例这是一道kaggle上的题目。我们都知道信用卡,能够透支一大笔钱来供自己消费,正因为这一点,不法分子就利用信用卡进一特性来实施欺诈行为。银行为了能够检测出这一欺诈行为,通过机器学习模型进行... 查看详情

全面解析反欺诈(羊毛盾)api,助你识别各类欺诈风险(代码片段)

...失。在线支付在线支付时用于检测是否存在欺诈行为,如信用卡欺诈、虚假退款等。社交媒体平台在社交媒体平台上用于检测虚假账号、水军等欺诈行为。在线招聘用于识别虚假简历、造假等欺诈行为,保障招聘的公平性和效率... 查看详情

如何阻止欺诈信用卡/借记卡?

】如何阻止欺诈信用卡/借记卡?【英文标题】:Howtoblockfraudcredit/debitcards?【发布时间】:2019-02-2514:38:36【问题描述】:我不确定这是不是问这个问题的正确地方,请让我知道我可以在哪里尝试。我非常担心阻止/禁止使用欺诈行... 查看详情

实战案例|基于机器学习的python信用卡欺诈检测!(代码片段)

...具!当我们在网上购买产品时,很多人喜欢使用信用卡。但信用卡欺诈常常会在身边发生,网络安全正成为我们生活中至关重要的一部分。为了解决这个问题,我们需要利用机器学习算法构建一个异常行为的识别... 查看详情

大数据在银行业的应用与实践

...分析场所。二、客户信用评级银行可以通过手机客户申请信用卡的数据,分析客户的信用程度,从而帮助业务人员做出相应的决策。三、客户与市场洞察银行可以通过跟踪社交媒体的评论信息,利用各种非结构化数据,对客户进... 查看详情

小额贷款反欺诈及信用风控

反欺诈检验的,是客户借款用途是否为伪造,对应兑付意愿;信用风险检测的,是符合该借款用途的客户能否满足产品的价格下限,对应的是兑付能力。检验客户的兑付意愿和兑付能力,就是微观层面的风控。 兑付意愿包含... 查看详情

信用卡欺诈检测分析案例

data=pd.read_csv("creditcard.csv")data.head()count_classes=pd.value_counts(data[‘class‘],sort=True).sort_index()#value_count:计算数值的个数count_classes.plot(kind=‘bar‘)#绘制条形图plt.title("Fraudclasshiistogram" 查看详情

flinkv1.13实现金融反诈骗案例(代码片段)

...处理程序。你要搭建一个什么系统#在当今数字时代,信用卡欺诈行为越来越被 查看详情

flinkv1.13实现金融反诈骗案例(代码片段)

...处理程序。你要搭建一个什么系统#在当今数字时代,信用卡欺诈行为越来越被 查看详情

如何处理数据不均衡问题(分类问题)(代码片段)

...不平衡通常反映了数据集中类别的不均匀分布。例如,在信用卡欺诈检测数据集中,大多数信用卡交易类型都不是欺诈,仅有很少一部分类型是欺诈交易,如此以来,非欺诈交易和欺诈交易之间的比率达到50:1。本文中,我将使... 查看详情

科普文:银行业9大数据科学应用案例解析!

...(10)结论1、欺诈识别机器学习对于有效检测和防范涉及信用卡,会计,保险等的欺诈行为至关重要。银行业务中的主动欺诈检测对于为客户和员工提供安全性至关重要。银行越早检测到欺诈行为,其越快可以限制帐户活动以减... 查看详情

在线支付欺诈预防最佳实践

...题描述】:我正在寻找一些可靠的方法或算法来帮助检测信用卡在线支付中的可疑活动。例如,如果有人提出非常频繁的付款请求或付款金额超出一定限额等。从理论上讲,给定用户的付款可能会非常快。有什么想法吗?【问题... 查看详情