基于LiteFlow的风控系统指标版本控制

个人博客:无奈何杨(wnhyang)

个人语雀:wnhyang

共享语雀:在线知识共享

Github:wnhyang - Overview


更新日志

最近关于https://github.com/wnhyang/coolGuard此项目更新了如下内容:https://github.com/wnhyang/coolGuard/commits/main/

image

继前文GeoHash后项目又有了一些更新主要有:

1、【一般】增加了新的指标类型,历史取值。

2、【重要】更改表的关联关系,使用唯一索引而不是自增id。

这里很有必要展开一下,之前的所有表关系都是通过数据库自增id关联的,这有很大的弊端,尤其影响之后的计划(如:策略/规则/指标等导入导出,chain版本控制等),所以这个变化很有必要。

3、【一般】增加缓存删除,增加了一些场景的缓存删除,保障缓存-数据库一致性。

4、【一般】增加基础参数查询,主要包括一些常量枚举的查询,如指标类型、字段类型、逻辑操作等等。

5、【一般】完善注解校验。

6、【重要】增加指标版本控制。

如题,本篇文章将围绕指标版本控制详细展开,匆匆做完,就来分享,如有问题请指正!

版本控制

为什么要版本控制?

简单来说就几个重点:追踪变更、恢复与回滚、CICD、数据完整性。。。

常见的版本控制有哪些方法呢?

最常用的方法“主表+历史表”的设计,已经在《风控系统之事件溯源,决策流程记录与版本控制》中提到,在这次将其详细展开,通过实践检验真理。

主表+历史表的设计,就算没有了解过,听名字也大概知道怎么做了。

无非就是主表存储的是最新的、有效的数据记录。通常情况下,主表中会包含业务关键字段以及一些基本的元数据信息(如创建时间、最后修改时间等)。

历史表则用来保存所有历史版本的数据记录。每当主表中的数据发生变化时,旧的数据会被复制到历史表中,以便长期保存。

但是如果细心思考的话,其实还可以将其分为两种模式。

1、主表即运行

主表即当下运行,即running,历史表即historyversion

下图画的也不是很好(其中chain的变化没有体现出来),大概意思是:规则引擎运行的永远都是主表的数据,历史纯粹是历史,是用来回溯恢复的,历史表数据会和主表差一个版本。

这样设计完全没问题,可以满足基本的版本控制需要。

如果非要缺点可能就是主表一定要是确认的运行流程,只能存在终态。可以加上开关表示运行与停止,状态切换不记入历史表,其他任意的修改都需要生成新的版本(可以使用hash算法对比本次修改是否真的有变化)。

image

2、历史既是历史也是运行

主表表示编辑区,即edit,历史表才是真正的运行区,当然同时也是历史,即historyrunningversion

这样的设计会相对复杂一点,除了基本的新增、修改、删除,还会有发版、下线。本来也想尝试画图的,但发现好像达不到我要的效果,放弃了😂

但还是要补充说明一下,主表和历史表都需要一个状态标识,而且意义不同。主表的状态标识有两种p0p1p0标识编辑中,与运行状态不一致,发版后历史表创建新记录并运行;p1表示与运行状态一致不可发版,只能修改后再提交。历史表状态标识也有两种,0是历史,即真正历史表的作用,保留过去版本数据;1是当下运行状态。

新增、编辑、删除只是操作主表,下面直接引用项目代码了。

这里与上面最大的区别就是不用操作历史表。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
@Override
@Transactional(rollbackFor = Exception.class)
@CacheEvict(value = RedisKey.INDICATOR, allEntries = true)
public Long createIndicator(IndicatorCreateVO createVO) {
// 1、指标名重复抛异常
if (indicatorMapper.selectByName(createVO.getName()) != null) {
throw exception(INDICATOR_NAME_EXIST);
}
// 2、变换并插入主表
Indicator indicator = IndicatorConvert.INSTANCE.convert(createVO);
indicator.setCode(IdUtil.fastSimpleUUID());
indicator.setReturnType(IndicatorUtil.getReturnType(indicator.getType(), indicator.getCalcField()));
indicator.setTimeSlice(WinSize.getWinSizeValue(indicator.getWinSize()));
indicator.setCondStr(JsonUtils.toJsonString(createVO.getCond()));
indicatorMapper.insert(indicator);
return indicator.getId();
}

@Override
@Transactional(rollbackFor = Exception.class)
@CacheEvict(value = RedisKey.INDICATOR, allEntries = true)
public void updateIndicator(IndicatorUpdateVO updateVO) {
// 1、校验存在和name重复
// TODO 校验存在和name
// 2、hash确认是否真的有更改
// TODO hash确认是否真的有更改
Indicator indicator = IndicatorConvert.INSTANCE.convert(updateVO);
indicator.setReturnType(IndicatorUtil.getReturnType(indicator.getType(), indicator.getCalcField()));
indicator.setCondStr(JsonUtils.toJsonString(updateVO.getCond()));
// 注意:状态只能变为fase即与运行区不一致,可以提交,不然没有做修改就是和运行区一致,不可提交
indicator.setStatus(Boolean.FALSE);
indicatorMapper.updateById(indicator);
}

@Override
@Transactional(rollbackFor = Exception.class)
@CacheEvict(value = RedisKey.INDICATOR, allEntries = true)
public void deleteIndicator(Long id) {
// 1、检查存在
Indicator indicator = indicatorMapper.selectById(id);
// 2、确认此指标是否在运行,运行不可删除,
// 感觉有点没必要了
IndicatorVersion indicatorVersion = indicatorVersionMapper.selectRunningByCode(indicator.getCode());
if (indicatorVersion != null) {
throw exception(INDICATOR_IS_RUNNING);
}
indicatorMapper.deleteById(id);
}

提交版本

提交版本会复杂一些,同时操作了指标、指标历史、chain表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
@Override
@Transactional(rollbackFor = Exception.class)
public BatchVersionSubmitResultVO submit(VersionSubmitVO submitVO) {
BatchVersionSubmitResultVO result = new BatchVersionSubmitResultVO().setId(submitVO.getId());
// 提交后就同时在version表中增加一条记录,表示该指标在运行状态
// 检查指标存在
Indicator indicator = indicatorMapper.selectById(submitVO.getId());
if (ObjectUtil.isNull(indicator)) {
throw exception(INDICATOR_NOT_EXIST);
}
// 校验当前提交是不是和运行区一致,一致无需提交
if (indicator.getStatus()) {
throw exception(INDICATOR_VERSION_EXIST);
}
// 1、更新当前指标为已提交
indicatorMapper.updateById(new Indicator().setId(submitVO.getId()).setStatus(Boolean.TRUE));
// 2、查询是否有已运行的,有版本+1,没有版本1
IndicatorVersion indicatorVersion = indicatorVersionMapper.selectRunningByCode(indicator.getCode());
int version = 1;
if (indicatorVersion != null) {
version = indicatorVersion.getVersion() + 1;
// 关闭已运行的
indicatorVersionMapper.updateById(new IndicatorVersion().setId(indicatorVersion.getId()).setStatus(Boolean.FALSE));
}
// 3、插入新纪录并加入chain
IndicatorVersion convert = IndicatorVersionConvert.INSTANCE.convert(indicator);
convert.setVersion(version);
convert.setVersionDesc(submitVO.getVersionDesc());
convert.setStatus(Boolean.TRUE);
indicatorVersionMapper.insert(convert);
// 4、更新chain
String iChain = StrUtil.format(LFUtil.INDICATOR_CHAIN, indicator.getCode());
// 构造指标el
String condEl = LFUtil.buildCondEl(convert.getCondStr());
if (chainMapper.selectByChainName(iChain)) {
Chain chain = chainMapper.getByChainName(iChain);
chain.setElData(StrUtil.format(LFUtil.IF_EL, condEl,
LFUtil.INDICATOR_TRUE_COMMON_NODE,
LFUtil.INDICATOR_FALSE_COMMON_NODE));
chainMapper.updateById(chain);
} else {
chainMapper.insert(new Chain().setChainName(iChain).setElData(StrUtil.format(LFUtil.IF_EL, condEl,
LFUtil.INDICATOR_TRUE_COMMON_NODE,
LFUtil.INDICATOR_FALSE_COMMON_NODE)));
}
result.setSuccess(Boolean.TRUE);
return result;
}

下线

下线相对简单

1
2
3
4
5
6
7
@Override
@Transactional(rollbackFor = Exception.class)
public void offline(Long id) {
IndicatorVersion indicatorVersion = indicatorVersionMapper.selectById(id);
indicatorVersionMapper.updateById(new IndicatorVersion().setId(id).setStatus(Boolean.FALSE));
chainMapper.deleteByChainName(StrUtil.format(LFUtil.INDICATOR_CHAIN, indicatorVersion.getCode()));
}

查编辑区和运行区就不用展示了。

LiteFlow流程变化

我前面提到的最近更新标注了【重要】的真的很关键,没错说的就是“更改表的关联关系,使用唯一索引而不是自增id”。没有这次改造也不会有这篇指标版本控制。

因为我的LiteFlow流程设计中,所有指标运行的核心EL是这样的FOR(i_fn).parallel(true).DO(i_cn);使用的是异步次数循环,对于我只需要更改i_fn组建循环的部分就可以完成从主表表示运行到历史表表示运行的切换。

如图,只是修改了查询当前运行指标的代码。

image

关于项目进度

我是如何测试的?

如果你看过项目代码,发现极少的单元测试,确实,测试这部分很不完善,仅有的就是通过apifox同步的接口,运行几个接口保存为测试集合,改完代码后通常要做这些事情:1、Reformat Code,必须要格式化一下,强迫症受不了;2、mvn clean install,因为使用mapstruct等其他插件,为了避免依赖的一些问题;3、如果有接口更新,同步一下apifox;4、运行apifox测试集合;5、针对修改的点mock数据发一下接口;6、如果有更改表结构,或是必要数据,导出sql到项目替换;7、确认没问题后,commit-push

git代码分支,为什么都在main上?

项目还未发版,而且没有协作者,自己随意了一些。

下一步?

乘胜追击,指标版本控制做了,下一步策略、规则版本控制。

至于发版,我感觉遥遥无期🤔

写在最后

拙作艰辛,字句心血,望诸君垂青,多予支持,不胜感激。


个人博客:无奈何杨(wnhyang)

个人语雀:wnhyang

共享语雀:在线知识共享

Github:wnhyang - Overview