issue链接: https://github.com/GreptimeTeam/greptimedb/issues/5566
一、要做的事情
需要在元数据表 information_schema.flows 中新增以下字段,以增强 Dashboard 对工作流(Flow)的管理能力:
- create_time:记录 Flow 的创建时间(必需字段)
- update_time:记录 Flow 最近一次配置更新时间(如字段修改、规则调整)
- last_execution_time(可选):记录 Flow 最近一次执行时间
- source_table_names:以可读形式展示 Flow 的源表名称(当前仅存储了目标表名称)
二、如何进行测试?
Flow的相关文档:https://docs.greptime.cn/user-guide/flow-computation/overview
1 | create database test; |
三、代码实现
- 找到对应的内部表的schema定义的代码;参考其他的列定义,新增需要的列定义;
src/catalog/src/system_schema/information_schema/flows.rs:60
- 参考其他的列定义,增加吐行的逻辑,其中table_names应该可以通过schema来获取;
- 读取时依赖了
FlowInfoValue
这个结构,需要对其增加三个time的成员变量; - 在创建flow的时候,给三个变量赋值,last_execution_time置为0,create 和 update使用now()
- 经过不懈努力终于找到了一个疑似flow处理的地方:
FlowServiceOperator::flush_inner
,这里是调用flush的处理的地方,需要更新一次last_execution_time - 不断排查,终于找到了处理insert的地方,在
src/operator/src/insert.rs
文件中有一个FlowMirrorTask
,会同步flow的数据,这里需要更新一次last_execution_time - 在flows输出数据的时候,需要通过source table ids 拿到对应的table_name即可实现上述问题;
四、代码提交
1 | cargo fmt |