【开源】增强greptime的flows内部表的功能

issue链接: https://github.com/GreptimeTeam/greptimedb/issues/5566

一、要做的事情

需要在元数据表 information_schema.flows 中新增以下字段,以增强 Dashboard 对工作流(Flow)的管理能力:

  • create_time:记录 Flow 的创建时间(必需字段)
  • update_time:记录 Flow 最近一次配置更新时间(如字段修改、规则调整)
  • last_execution_time​(可选):记录 Flow 最近一次执行时间
  • source_table_names:以可读形式展示 Flow 的源表名称(当前仅存储了目标表名称)

二、如何进行测试?

Flow的相关文档:https://docs.greptime.cn/user-guide/flow-computation/overview

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
create database test;
use test;

-- 创建source表
CREATE TABLE ngx_access_log (
ip_address STRING,
http_method STRING,
request STRING,
status_code INT16,
body_bytes_sent INT32,
user_agent STRING,
response_size INT32,
ts TIMESTAMP TIME INDEX,
PRIMARY KEY (ip_address, http_method, user_agent, status_code)
) WITH ('append_mode'='true');

-- 创建目标的sink表
CREATE TABLE user_agent_statistics (
user_agent STRING,
total_count INT64,
update_at TIMESTAMP,
__ts_placeholder TIMESTAMP TIME INDEX,
PRIMARY KEY (user_agent)
);

-- 创建flow
CREATE FLOW user_agent_flow
SINK TO user_agent_statistics
AS
SELECT
user_agent,
COUNT(user_agent) AS total_count
FROM
ngx_access_log
GROUP BY
user_agent;

-- 插入数据
INSERT INTO ngx_access_log
VALUES
('192.168.1.1', 'GET', '/index.html', 200, 512, 'Mozilla/5.0', 1024, '2023-10-01T10:00:00Z'),
('192.168.1.2', 'POST', '/submit', 201, 256, 'curl/7.68.0', 512, '2023-10-01T10:01:00Z'),
('192.168.1.1', 'GET', '/about.html', 200, 128, 'Mozilla/5.0', 256, '2023-10-01T10:02:00Z'),
('192.168.1.3', 'GET', '/contact', 404, 64, 'curl/7.68.0', 128, '2023-10-01T10:03:00Z');

-- 查看sink表数据
SELECT * FROM user_agent_statistics;

三、代码实现

  1. 找到对应的内部表的schema定义的代码;参考其他的列定义,新增需要的列定义;
    src/catalog/src/system_schema/information_schema/flows.rs:60
  2. 参考其他的列定义,增加吐行的逻辑,其中table_names应该可以通过schema来获取;
  3. 读取时依赖了FlowInfoValue这个结构,需要对其增加三个time的成员变量;
  4. 在创建flow的时候,给三个变量赋值,last_execution_time置为0,create 和 update使用now()
  5. 经过不懈努力终于找到了一个疑似flow处理的地方:FlowServiceOperator::flush_inner,这里是调用flush的处理的地方,需要更新一次last_execution_time
  6. 不断排查,终于找到了处理insert的地方,在src/operator/src/insert.rs文件中有一个FlowMirrorTask,会同步flow的数据,这里需要更新一次last_execution_time
  7. 在flows输出数据的时候,需要通过source table ids 拿到对应的table_name即可实现上述问题;

四、代码提交

1
2
cargo fmt
taplo format