风控系统指标计算/特征提取分析与实现01,Redis、Zset、模版方法

个人博客:无奈何杨(wnhyang)

个人语雀:wnhyang

共享语雀:在线知识共享

Github:wnhyang - Overview


引用AI对于风控系统的介绍

风控系统是一种用于在线业务的安全管理系统,它帮助企业和平台防范潜在的欺诈、信用风险以及不合规行为。简单来说,它的核心作用就是“保安全、防欺诈、控风险”。

最近也一直在研究风控系统体系、功能等,看了一些有关的文章,并且也在实践尝试中。

其实前一篇可配置“输入参数的接口如何设计”就是实践尝试的一部分,未来还会有更多的。

而本篇文章就风控系统的指标计算,或者说是特征提取做一些探讨,以下统一称呼为“指标”。

指标不仅可以作为风控系统的一部分配合风控规则或是模型/机器学习使用,而且可以用于离线分析、事后追查、用户画像标签等方面。

参考文章

风控笔记06:一个完整的风控引擎,需要有哪些功能?

风控笔记07:最常用的风控工具-特征库

指标分类

指标是由数据流支撑的,指标是时间纬度的数据提取计算。

根据指标分类举几个例子:

  • 次数统计:最近24小时\({客户号}向\){银行卡卡号}转账笔数
  • 求和:最近2天\({客户号}向\){银行卡卡号}转账金额之和
  • 平均:最近1个月\({客户号}向\){银行卡卡号}转账金额的平均数
  • 关联次数:最近72小时\({客户号}关联\){设备mac地址}的次数
  • 等等

指标类型枚举

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* @author wnhyang
* @date 2024/3/13
**/
@AllArgsConstructor
@Getter
public enum IndicatorType {
COUNT(0, "count", "次数统计"),
SUM(1, "sum", "求和"),
AVG(2, "avg", "平均"),
MAX(3, "max", "最大值"),
MIN(4, "min", "最小值"),
ASS(5, "ass", "关联次数");

private final Integer code;

private final String name;

private final String desc;
}

指标实体类

目前filterScript条件还未确实如何做,使用Groovy脚本还是什么,之后再定吧。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
/**
* @author wnhyang
* @date 2024/3/13
**/
@Data
@EqualsAndHashCode(callSuper = true)
@Builder
@NoArgsConstructor
@AllArgsConstructor
@TableName("de_indicator")
public class IndicatorPO extends BasePO {

@Serial
private static final long serialVersionUID = 1L;

/**
* 自增编号
*/
@TableId(value = "id", type = IdType.AUTO)
private Long id;

/**
* 指标名
*/
@TableField("name")
private String name;

/**
* 状态
*/
@TableField("status")
private Boolean status;

/**
* 类型
*/
@TableField("type")
private Integer type;

/**
* 计算字段
*/
@TableField("calc_field")
private String calcField;

/**
* 窗口大小
*/
@TableField("win_size")
private String winSize;

/**
* 窗口类型
*/
@TableField("win_type")
private String winType;

/**
* 窗口数量
*/
@TableField("win_count")
private Integer winCount;

/**
* 时间片
*/
@TableField("time_slice")
private Long timeSlice;

/**
* 主字段
*/
@TableField("master_field")
private String masterField;

/**
* 从字段
*/
@TableField("slave_fields")
private String slaveFields;

/**
* 过滤脚本
*/
@TableField("filter_script")
private String filterScript;

/**
* 版本号
*/
@TableField("version")
private Integer version;

/**
* 描述
*/
@TableField("description")
private String description;
}

指标窗口枚举

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* @author wnhyang
* @date 2024/3/13
**/
@AllArgsConstructor
@Getter
public enum WinType {

LAST(0, "last", "最近"),
CUR(1, "cur", "本");

private final Integer code;

private final String name;

private final String desc;
}

指标存储

指标数据如何存储呢?下面是Redis方案。

1、使用Redis的有序集合(Sorted Set)结构,有序集合中的每个元素都有一个分数(score),这里的分数我们可以设置为时间戳。

2、添加数据:每当有新的请求到来时,将当前时间戳作为score,用一个固定的字符串(如"request")或者其他唯一标识符作为member,插入到有序集合中。

3、清理过期数据:每次添加新数据后,通过ZRANGEBYSCORE命令获取并删除窗口范围之外的数据。

4、统计指标:要得到窗口内的请求次数,可以直接使用ZCARD命令获取有序集合中元素的数量。

5、定时任务:为了确保过期数据能够自动清除,可以结合RedisKey空间通知机制(Keyspace Notifications)或者外部定时任务定期执行上述清理操作。

计算类指标

计算类指标比较通用,需要存储的数据很容易分析。

但还是有些差别的,次数统计可以在zset中存储value为${事件id},score为时间戳,计算时为zsetsize,但是对于平均、最大值、最小值、求和不能这么做,因为这些都是有计算字段的,并不是如次数统计那样取size就行,所以对于这类指标数据存储是不一样的。

zsetzset+hash两种方案。

单zset

zset作为时间窗口,value为${事件id}+{计算字段},score为时间戳,变化就是value变为事件id与计算字段的组合,计算字段这样就存储下来了,之后计算平均、最大值、最小值、求和取出来再算就可以了,事件id是为了在zset中存储时防重,另外也方便找到原始数据。

优点:直接从zset中获取计算字段,可以独立手动过期删除。

缺点:value存和取需要设计,数据冗余大。

zset+hash

zset作为时间窗口,value为${事件id},score为时间戳,这里没有变化,另外需要hash存储对应事件下需要计算字段的数据。

优点:hash可以存储多项数据,数据冗余少。

缺点:无法确定事件过期删除时间,每次需要多步查询。

指标计算与查询

指标计算可以简单梳理如下。

以下根据此流程分析,其实编程(也不全是指编程)有趣的地方是设计和实现的过程。

模版方法

设计模式-模版方法可以在这应用,从上面的流程并结合指标的分类来分析,主流程中的判断指标状态、指标条件、获取当前时间戳、获取redis数据设置过期、清理过期数据都是通用的,根据指标类型变化的只有添加事件这个步骤,所以定义模版抽象类如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
@Setter
@Getter
@Slf4j
public abstract class AbstractIndicator {

/**
* 指标
*/
protected IndicatorPO indicator;

/**
* 指标类型
*/
protected final IndicatorType INDICATOR_TYPE;

/**
* redisson客户端
*/
protected final RedissonClient redissonClient;

protected AbstractIndicator(IndicatorType indicatorType, RedissonClient redissonClient) {
INDICATOR_TYPE = indicatorType;
this.redissonClient = redissonClient;
}

/**
* 获取指标类型
*
* @return 指标类型
*/
public Integer getType() {
return INDICATOR_TYPE.getCode();
}

/**
* 获取指标状态
*
* @return true/false
*/
public boolean getStatus() {
return indicator.getStatus();
}

/**
* 指标过滤
*
* @return true/false
*/
public boolean filter(Map<String, String> eventDetail) {
// 1、主属性、从属性不为空
if (indicator.getMasterField() != null && eventDetail.get(indicator.getMasterField()) != null) {
if (indicator.getSlaveFields() != null) {
String[] split = indicator.getSlaveFields().split(",");
for (String s : split) {
if (eventDetail.get(s) == null) {
return false;
}
}
// 2、过滤脚本
if (indicator.getFilterScript() == null) {
return true;
} else {
// TODO 脚本过滤
return true;
}
}
}

return false;
}

/**
* 获取redis key
*
* @param eventDetail 事件详情
* @return redis key
*/
public String getRedisKey(Map<String, String> eventDetail) {
return RedisKeys.INDICATOR + indicator.getId() + ":" + INDICATOR_TYPE.getName() + ":" + eventDetail.get(indicator.getMasterField()) + "-" + eventDetail.get(indicator.getSlaveFields());
}

/**
* 获取计算指标结果
*
* @param currentTime 当前时间戳
* @param set redis set
* @return 计算指标结果
*/
public abstract BigDecimal getResult(long currentTime, RScoredSortedSet<String> set);

/**
* 获取计算指标结果
*
* @param eventDetail 事件详情
* @return 计算指标结果
*/
public BigDecimal getResult(Map<String, String> eventDetail) {
// 1、获取当前时间戳
long currentTime = System.currentTimeMillis();
// 2、获取redis中数据
RScoredSortedSet<String> set = redissonClient.getScoredSortedSet(getRedisKey(eventDetail));
// 3、清理过期数据
if ("last".equals(indicator.getWinType())) {
set.removeRangeByScore(-1, true, currentTime - Duration.ofSeconds(indicator.getTimeSlice()).toMillis(), false);

} else {
set.removeRangeByScore(-1, true, calculateEpochMilli(LocalDateTime.now()), false);
}

return getResult(currentTime, set);
}

/**
* 计算指标
*
* @param indicator 指标
* @param eventDetail 事件详情
*/
public void compute(IndicatorPO indicator, Map<String, String> eventDetail) {
if (indicator == null) {
return;
} else {
this.indicator = indicator;
}
// 1、状态检查和过滤
if (getStatus() && filter(eventDetail)) {

// 2、获取当前时间戳
long currentTime = System.currentTimeMillis();
// 3、获取redis中数据
log.info("redisKey:{}", getRedisKey(eventDetail));
RScoredSortedSet<String> set = redissonClient.getScoredSortedSet(getRedisKey(eventDetail));
if ("last".equals(this.indicator.getWinType())) {
set.expire(Duration.ofSeconds(this.indicator.getTimeSlice() * this.indicator.getWinCount()));
} else {
set.expire(Duration.ofSeconds(this.indicator.getTimeSlice()));
}

// 4、添加事件
addEvent(currentTime, set, eventDetail);

// 5、清理过期数据
cleanExpiredDate(currentTime, set);
}

}

/**
* 添加事件
*
* @param currentTime 当前时间戳
* @param set redis set
* @param eventDetail 事件详情
*/
public abstract void addEvent(long currentTime, RScoredSortedSet<String> set, Map<String, String> eventDetail);

/**
* 清理过期数据
*
* @param currentTime 当前时间戳
* @param set redis set
*/
public void cleanExpiredDate(long currentTime, RScoredSortedSet<String> set) {
if ("last".equals(indicator.getWinType())) {
set.removeRangeByScore(-1, true, currentTime - Duration.ofSeconds(indicator.getTimeSlice()).toMillis(), false);
} else {
set.removeRangeByScore(-1, true, calculateEpochMilli(LocalDateTime.now()), false);
}
}

/**
* 计算时间戳
*
* @param now 当前时间
* @return 时间戳
*/
public long calculateEpochMilli(LocalDateTime now) {
ZoneId zoneId = ZoneId.systemDefault();
// 这个default分支仅处理WindowSize枚举中未包含的情况
return switch (indicator.getWinSize()) {
case "M" -> now.withDayOfMonth(1).with(LocalTime.MIN).atZone(zoneId).toInstant().toEpochMilli();
case "d" -> now.with(LocalTime.MIN).atZone(zoneId).toInstant().toEpochMilli();
case "H" -> now.withMinute(0).withSecond(0).withNano(0).atZone(zoneId).toInstant().toEpochMilli();
case "m" -> now.withSecond(0).withNano(0).atZone(zoneId).toInstant().toEpochMilli();
case "s" -> now.withNano(0).atZone(zoneId).toInstant().toEpochMilli();
default -> throw new IllegalArgumentException("Unsupported window size: " + indicator.getWinSize());
};
}
}

次数统计指标

因为篇幅原因,这里只贴次数统计指标实现类了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* @author wnhyang
* @date 2024/3/11
**/
@Component
public class CountIndicator extends AbstractIndicator {

public CountIndicator(RedissonClient redissonClient) {
super(IndicatorType.COUNT, redissonClient);
}

@Override
public BigDecimal getResult(long currentTime, RScoredSortedSet<String> set) {

return BigDecimal.valueOf(set.size());
}

@Override
public void addEvent(long currentTime, RScoredSortedSet<String> set, Map<String, String> eventDetail) {

set.add(currentTime, eventDetail.get("seqId"));
}

}

总结

先到这里吧,还有其他内容到下次吧。

写在最后

拙作艰辛,字句心血,望诸君垂青,多予支持,不胜感激。


个人博客:无奈何杨(wnhyang)

个人语雀:wnhyang

共享语雀:在线知识共享

Github:wnhyang - Overview