
Preface

When processing business data, we often need real-time synchronization between business modules and storage systems: for example, the reporting module performs incremental updates from order data, and the search engine keeps itself in sync with business data in real time. For these scenarios we currently use Flink-CDC.

Flink-CDC (CDC: Change Data Capture) is a set of source connectors for the Apache Flink® ecosystem with the Debezium engine embedded. Its purpose is to monitor and capture data changes so that downstream services can react to them. Other mature CDC solutions include Maxwell and Canal.


Solution Comparison

Flink-CDC (Debezium) / Maxwell / Canal

[Figure: feature comparison of Flink-CDC (Debezium), Maxwell, and Canal]

As the comparison above shows, Flink-CDC supports a richer set of data sources than Maxwell or Canal, and because it builds on the Apache Flink® ecosystem, synchronization jobs also get a rich set of operators such as Map, Filter, and Union.

Data sources and downstream systems supported by Flink-CDC:

[Figure: supported data sources and downstream sinks]

How It Works

Take MySQL as an example: under the hood, Flink-CDC reads the database binlog, and the embedded Debezium engine parses binlog entries into row-level change records. Flink-CDC currently supports two APIs for building CDC jobs, the DataStream API and the SQL API; the examples below use the DataStream API.
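For reference, the SQL API expresses the same source as a table definition. A minimal sketch, assuming the mysql-cdc SQL connector is on the classpath (connection values and columns are placeholders):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class MySqlCdcSqlTask {

  public static void main(String[] args) {
    TableEnvironment tEnv = TableEnvironment.create(
            EnvironmentSettings.newInstance().inStreamingMode().build());

    // Register a CDC-backed table; the connector reads the binlog and
    // exposes changes as a continuously updating table.
    tEnv.executeSql(
        "CREATE TABLE order_info (" +
        "  order_id STRING," +
        "  order_status STRING," +
        "  PRIMARY KEY (order_id) NOT ENFORCED" +
        ") WITH (" +
        "  'connector' = 'mysql-cdc'," +
        "  'hostname' = 'host_name'," +
        "  'port' = '3306'," +
        "  'username' = 'user_name'," +
        "  'password' = 'user_pass'," +
        "  'database-name' = 'order_infos'," +
        "  'table-name' = 'order_info_0'" +
        ")");

    // Inserts, updates, and deletes all flow into this continuous query.
    tEnv.executeSql("SELECT order_id, order_status FROM order_info").print();
  }
}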

DataStream API implementation:

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
// JsonDebeziumDeserializationSchema emits each change record as a
// Debezium-style JSON string; its package differs across connector versions.

public class MySqlBinlogTask {

  public static void main(String[] args) throws Exception {
    // Define the CDC source (connection values are placeholders)
    SourceFunction<String> sourceFunction = MySQLSource.<String>builder()
            .hostname("host_name")
            .port(3306)
            .databaseList("order_infos")
            .tableList("order_infos.order_info_0", "order_infos.order_info_2")
            .username("user_name")
            .password("user_pass")
            .startupOptions(StartupOptions.initial())
            .deserializer(new JsonDebeziumDeserializationSchema())
            .build();
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStreamSource<String> dataStreamSource = env.addSource(sourceFunction, "data_source_01");
    // Sink: standard output for this example
    dataStreamSource.print();
    env.execute();
  }
}
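The startupOptions call controls where consumption begins. Besides initial(), which snapshots existing data before streaming the binlog, the connector offers other modes; a quick sketch (exact availability varies by connector version):

// Common startup modes for the MySQL CDC source:
StartupOptions.initial();          // snapshot existing rows, then stream the binlog
StartupOptions.latest();           // skip the snapshot, read only new binlog entries
StartupOptions.timestamp(startMs); // start from the first binlog event at or after startMs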

This completes the basic Flink-CDC setup: once started, the program receives a change event whenever data in order_infos.order_info_0 changes. The example sinks to standard output; in real production scenarios, downstream services such as Kafka, RabbitMQ, or Elasticsearch are attached via dataStreamSource.addSink().

Basic Operations

A record formatted by Debezium contains five fields, before, after, source, op, and ts_ms, with the following meanings:

- before: the row image before the change (absent for INSERT)
- after: the row image after the change (absent for DELETE)
- source: metadata about the event's origin (connector version, database, table, binlog file and position, GTID, etc.)
- op: the operation type: c = create (insert), u = update, d = delete, r = read (initial snapshot)
- ts_ms: the time at which the Debezium connector processed the event (source.ts_ms is when the change was committed in the database)

INSERT example:

{
    # New Row Data
    "after":{
        "order_id":"mock_oid_01",
        "shop_id":"mock_sid_01",
        "order_status":"CANCELLED",
        "payment_method":"Wallet V2",
        "total_amount":"8881.000000",
        "create_datetime":"2020-09-03T03:03:36Z",
        "update_datetime":"2022-04-08T02:23:12Z",
        "transaction_id":"a154111f857514b0"
    },
    # Metadata
    "source":{
        "version":"1.4.1.Final",
        "connector":"mysql",
        "name":"mysql_binlog_source",
        "ts_ms":"1649384718000",
        "db":"order_infos",
        "table":"order_info_01",
        "server_id":"225",
        "gtid":"d2f4fc13-9df2-11ec-a9f6-0242ac1f0002",
        "file":"mysql-bin.000025",
        "pos":"45950793",
        "row":"0",
        "thread":"27273"
    },
    "op":"c",
    "ts_ms":"1649384718067"
}

UPDATE example:

{
    # Old Row Data
    "before":{
        "order_id":"mock_oid_01",
        "shop_id":"mock_sid_01",
        "order_status":"SHIPPING",
        "payment_method":"Wallet V2",
        "total_amount":"8881.000000",
        "create_datetime":"2020-09-03T03:03:36Z",
        "update_datetime":"2022-04-08T02:23:12Z",
        "transaction_id":"a154111f857514b0"
    },
    # New Row Data
    "after":{
        "order_id":"mock_oid_01",
        "shop_id":"mock_sid_01",
        "order_status":"CANCELLED",
        "payment_method":"Wallet V2",
        "total_amount":"8881.000000",
        "create_datetime":"2020-09-03T03:03:36Z",
        "update_datetime":"2022-04-08T02:23:12Z",
        "transaction_id":"a154111f857514b0"
    },
    # Metadata
    "source":{
        "version":"1.4.1.Final",
        "connector":"mysql",
        "name":"mysql_binlog_source",
        "ts_ms":"1649384718000",
        "db":"order_infos",
        "table":"order_info_01",
        "server_id":"225",
        "gtid":"d2f4fc13-9df2-11ec-a9f6-0242ac1f0002",
        "file":"mysql-bin.000025",
        "pos":"45950793",
        "row":"0",
        "thread":"27273"
    },
    "op":"u",
    "ts_ms":"1649384718067"
}

DELETE example:

{
    # Old Row Data
    "before":{
        "order_id":"mock_oid_01",
        "shop_id":"mock_sid_01",
        "order_status":"SHIPPING",
        "payment_method":"Wallet V2",
        "total_amount":"8881.000000",
        "create_datetime":"2020-09-03T03:03:36Z",
        "update_datetime":"2022-04-08T02:23:12Z",
        "transaction_id":"a154111f857514b0"
    },
    # Metadata
    "source":{
        "version":"1.4.1.Final",
        "connector":"mysql",
        "name":"mysql_binlog_source",
        "ts_ms":"1649384718000",
        "db":"order_infos",
        "table":"order_info_01",
        "server_id":"225",
        "gtid":"d2f4fc13-9df2-11ec-a9f6-0242ac1f0002",
        "file":"mysql-bin.000025",
        "pos":"45950793",
        "row":"0",
        "thread":"27273"
    },
    "op":"d",
    "ts_ms":"1649384718067"
}
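A downstream consumer typically branches on the op field to decide how to apply each change. A minimal sketch using Jackson (the handler bodies are placeholders):

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class ChangeEventDispatcher {

  private static final ObjectMapper MAPPER = new ObjectMapper();

  // Routes one Debezium JSON event by its op code.
  public static void dispatch(String json) throws Exception {
    JsonNode event = MAPPER.readTree(json);
    String op = event.get("op").asText();
    switch (op) {
      case "c":  // insert: only "after" is populated
      case "u":  // update: "before" and "after" are both populated
      case "r":  // snapshot read of an existing row
        handleUpsert(event.get("after"));
        break;
      case "d":  // delete: only "before" is populated
        handleDelete(event.get("before"));
        break;
      default:
        // ignore unknown op codes
    }
  }

  // Placeholder handlers; real implementations write to the target store.
  private static void handleUpsert(JsonNode row) { System.out.println("upsert: " + row); }
  private static void handleDelete(JsonNode row) { System.out.println("delete: " + row); }
}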


Practice

When several data sources must be synchronized, Flink's union operator makes it easy to merge multiple streams into one.

Topology:

[Figure: multi-source union topology]

Example code:

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class MySqlBinlogTask {

  public static void main(String[] args) throws Exception {
    // Define one CDC source per upstream table shard
    SourceFunction<String> sourceFunction01 = initMySQLSource(1);
    SourceFunction<String> sourceFunction02 = initMySQLSource(2);
    SourceFunction<String> sourceFunction03 = initMySQLSource(3);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStreamSource<String> dataStreamSource01 = env.addSource(sourceFunction01, "data_source_01");
    DataStreamSource<String> dataStreamSource02 = env.addSource(sourceFunction02, "data_source_02");
    DataStreamSource<String> dataStreamSource03 = env.addSource(sourceFunction03, "data_source_03");
    // Merge the three streams into a single stream
    DataStream<String> dataStreams = dataStreamSource01.union(dataStreamSource02, dataStreamSource03);
    // Sink: standard output for this example
    dataStreams.print();
    env.execute();
  }

  // Builds one MySQL CDC source; connection values are placeholders
  private static SourceFunction<String> initMySQLSource(int index) {
    return MySQLSource.<String>builder()
            .hostname("host_name")
            .port(3306)
            .databaseList("order_infos")
            .tableList("order_infos.order_info_" + index)
            .username("user_name")
            .password("user_pass")
            .startupOptions(StartupOptions.initial())
            .deserializer(new JsonDebeziumDeserializationSchema())
            .build();
  }
}

Data Filtering & Transformation

Add a Filter operator to drop malformed records and a Map operator to convert data formats.

Topology:

[Figure: filter & map topology]

Example code:

// Filter out malformed records (here: drop nulls)
DataStream<String> filteredStream = dataStreamSource.filter(new FilterFunction<String>() {
    @Override
    public boolean filter(String value) throws Exception {
        return value != null;
    }
});
// Transform each record (here: trim surrounding whitespace)
DataStream<String> mappedStream = filteredStream.map(new MapFunction<String, String>() {
    @Override
    public String map(String value) throws Exception {
        return value.trim();
    }
});

Add an addSink call to write the data stream into Kafka.

Topology:

[Figure: Kafka sink topology]

Example code:

// Kafka sink configuration (the bootstrap.servers value is a placeholder)
String sinkTopicName = "order_infos_topic";
Properties sinkProperties = new Properties();
sinkProperties.setProperty("bootstrap.servers", "kafka_host:9092");

// Write the stream to Kafka
dataStreamSource.addSink(new FlinkKafkaProducer<String>(
        sinkTopicName, new SimpleStringSchema(), sinkProperties))
    .name("write to kafka topic: " + sinkTopicName);
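In production, checkpointing should also be enabled so the CDC source can resume from the last committed binlog offset after a failure; a minimal sketch (the interval is an assumption to tune against your latency budget):

// Enable periodic checkpoints; source offsets and in-flight state
// are restored from the latest checkpoint on recovery.
env.enableCheckpointing(60_000);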


Summary

At this point, a basic Flink-CDC synchronization pipeline is in place. The Flink-CDC solution is now running in production and has been validated there: at tens of millions of records synchronized per day, the business search system stays in sync within seconds, and report data within minutes.

Of course, this also involved further optimizations driven by production needs, such as window-based merging of change events in the Flink job and tuning of task parallelism.
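As an illustration of the window-based merging mentioned above, a sketch that collapses bursts of updates, keeping only the latest event per key within each 5-second window (the key extractor, window size, and parallelism are assumptions):

import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// extractOrderId is a hypothetical helper that parses the order_id
// out of the Debezium JSON payload.
env.setParallelism(4);  // illustrative; tune to source/sink throughput

dataStreamSource
    .keyBy(event -> extractOrderId(event))
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .reduce((older, newer) -> newer)  // last write wins within the window
    .print();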

Going forward, data synchronization will keep facing new challenges as business data grows, and we will continue to optimize and refine the solution. We also welcome suggestions from anyone interested in data synchronization / ETL, and look forward to learning together.


References

https://github.com/ververica/flink-cdc-connectors

https://github.com/zendesk/maxwell

https://github.com/alibaba/canal

https://debezium.io/