1、应用场景分析

参考徐崴老师Flink项目

  • 数据清洗【实时ETL】
  • 数据报表

1.1、数据清洗【实时ETL】

image-20220908140816281

1.1.1、需求分析

针对算法产生的日志数据进行清洗拆分

  1. 算法产生的日志数据是嵌套大JSON格式(json嵌套json),需要拆分打平
  2. 针对算法中的国家字段进行大区转换
  3. 最后把不同类型的日志数据分别进行存储

1.1.2、架构图

image-20220908140844463

1.2、新建项目工程

创建一个大的项目,用Maven来维护

image-20220908141208109

每个flink需求,即job创建一个 ”Module”

image-20220908141242828

第一步:

image-20220908141253692

image-20220908141305419

第二步: 在工程项目pom.xml中添加依赖管理,在其中管理各个需求子项目的依赖版本

image-20220908141317769

在依赖管理中添加工程项目相关依赖:

image-20220908141335011

第三步: 在DataClean module中的pom.xml中添加依赖(不需要加入版本):

image-20220908141351196

2、DataClean代码开发

开发介绍采用的是 Java 代码实现的,完整工程代码及 Scala 代码的实现详见底部 GitHub 代码地址

2.1、MyRedisSource实现

功能: 自定义 Redis Source 由于存储的是 国家大区和编码的映射关系 类似于 k-v ,所以返回 HashMap 格式比较好

在 Redis 中保存的国家和大区的关系 Redis中进行数据的初始化,数据格式:         Hash      大区      国家         hset areas;   AREA_US    US         hset areas;   AREA_CT    TW,HK         hset areas    AREA_AR   PK,SA,KW         hset areas    AREA_IN    IN 需要把大区和国家的对应关系组装成 java 的 hashmap

代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
package henry.flink.customSource;

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.exceptions.JedisConnectionException;

import java.util.HashMap;
import java.util.Map;

/**
 * @Author: Henry
 * @Description: 自定义 Redis Source
 *              由于存储的是 国家大区和编码的映射关系
 *              类似于 k-v ,所以返回 HashMap 格式比较好
 * 需要把大区和国家的对应关系组装成 java 的 hashmap
 *
 * @Date: Create in 2019/5/25 18:12
 **/

public class MyRedisSource implements SourceFunction<HashMap<String,String>>{
    private Logger logger = LoggerFactory.getLogger(MyRedisSource.class);

    private final long SLEEP_MILLION = 60000 ;

    private boolean isrunning = true;
    private Jedis jedis = null;

    public void run(SourceContext<HashMap<String, String>> ctx) throws Exception {

        this.jedis = new Jedis("master", 6379);
        //  存储所有国家和大区的对应关系
        HashMap<String, String> keyValueMap = new HashMap<String, String>();
        while (isrunning){
            try{
                //  每次执行前先清空,去除旧数据
                keyValueMap.clear();
                //  取出数据
                Map<String, String> areas = jedis.hgetAll("areas");
                //  进行迭代
                for (Map.Entry<String, String> entry : areas.entrySet()){
                    String key = entry.getKey();      //  大区:AREA_AR
                    String value = entry.getValue();  //  国家:PK,SA,KW
                    String[] splits = value.split(",");
                    for (String split : splits){
                        //  这里 split 相当于key, key 是 value
                        keyValueMap.put(split, key); // 即 PK,AREA_AR
                    }
                }
                //  防止取到空数据
                if(keyValueMap.size() > 0){
                    ctx.collect(keyValueMap);
                }
                else {
                    logger.warn("从Redis中获取到的数据为空!");
                }
                //  一分钟提取一次
                Thread.sleep(SLEEP_MILLION);
            }
            // 捕获 Jedis 链接异常
            catch (JedisConnectionException e){
                //  重新获取链接
                jedis = new Jedis("master", 6379);
                logger.error("Redis链接异常,重新获取链接", e.getCause());
            }// 捕获其他异常处理,通过日志记录
            catch (Exception e){
                logger.error("Source数据源异常", e.getCause());
            }
        }
    }

    /**
     *  任务停止,设置 false
     * */
    public void cancel() {
        isrunning = false;
        // 这样可以只获取一次连接在while一直用
        if(jedis != null){
            jedis.close();
        }
    }
}

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384

2.2、DataClean实现

主要代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
//  指定 Kafka Source
 String topic = "allData";
 Properties prop = new Properties();
 prop.setProperty("bootstrap.servers", "master:9092");
 prop.setProperty("group.id", "con1");
 FlinkKafkaConsumer011<String> myConsumer = new FlinkKafkaConsumer011<String>(
         topic, new SimpleStringSchema(),prop);

 //  获取 Kafka 中的数据,Kakfa 数据格式如下:
 //  {"dt":"2019-01-01 11:11:11", "countryCode":"US","data":[{"type":"s1","score":0.3},{"type":"s1","score":0.3}]}
 DataStreamSource<String> data = env.addSource(myConsumer);    // 并行度根据 kafka topic partition数设定

 //  mapData 中存储最新的国家码和大区的映射关系
 DataStream<HashMap<String,String>> mapData = env.addSource(new MyRedisSource())
         .broadcast();    //  可以把数据发送到后面算子的所有并行实际例中进行计算,否则处理数据丢失数据

 //  通过 connect 方法将两个数据流连接在一起,然后再flatMap
 DataStream<String> resData = data.connect(mapData).flatMap(
                             //参数类型代表:  data ,    mapData         , 返回结果; Json
          new CoFlatMapFunction<String, HashMap<String, String>, String>() {
             //  存储国家和大区的映射关系
             private HashMap<String, String> allMap = new HashMap<String, String>();

             //  flatMap1 处理 Kafka 中的数据
             public void flatMap1(String value, Collector<String> out)
                     throws Exception {
                 //  原数据是 Json 格式
                 JSONObject jsonObject = JSONObject.parseObject(value);
                 String dt = jsonObject.getString("dt");
                 String countryCode = jsonObject.getString("countryCode");
                 //  获取大区
                 String area = allMap.get(countryCode);
                 //  迭代取数据,jsonArray每个数据都是一个jsonobject
                 JSONArray jsonArray = jsonObject.getJSONArray("data");
                 for (int i = 0; i < jsonArray.size(); i++) {
                     JSONObject jsonObject1 = jsonArray.getJSONObject(i);
                     System.out.println("areas : -  " + area);
                     jsonObject1.put("area", area);
                     jsonObject1.put("dt", dt);
                     out.collect(jsonObject1.toJSONString());
                 }
             }

             //  flatMap2 处理 Redis 返回的 map 类型的数据
             public void flatMap2(HashMap<String, String> value, Collector<String> out)
                     throws Exception {
                 this.allMap = value;
             }
         });

 String outTopic = "allDataClean";
 Properties outprop= new Properties();
 outprop.setProperty("bootstrap.servers", "master:9092");
 //设置事务超时时间
 outprop.setProperty("transaction.timeout.ms",60000*15+"");

 FlinkKafkaProducer011<String> myproducer = new FlinkKafkaProducer011<>(outTopic,
         new KeyedSerializationSchemaWrapper<String>(
                 new SimpleStringSchema()), outprop,
         FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);
 resData.addSink(myproducer);
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061

3、实践运行

3.1、Redis

启动redis:

  1. 先从一个终端启动redis服务
1
2
./redis-server
1

image-20220908141422812   2. 先从一个终端启动redis客户端,并插入数据

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
./redis-cli
127.0.0.1:6379> hset areas   AREA_US   US
(integer) 1
127.0.0.1:6379> hset areas   AREA_CT   TW,HK
(integer) 1
127.0.0.1:6379> hset areas   AREA_AR   PK,SA,KW
(integer) 1
127.0.0.1:6379> hset areas   AREA_IN   IN
(integer) 1
127.0.0.1:6379>
12345678910

hgetall查看插入数据情况:

image-20220908141436349

3.2、Kafka

启动kafka:

1
2
./kafka-server-start.sh -daemon ../config/server.properties
1

jps查看启动进程:

image-20220908141452611

kafka创建topc:

1
2
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 5 --topic allData
1

创建topic成功:

image-20220908141509418

监控kafka topic:

1
2
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic allDataClean
1

image-20220908141527273

3.3、启动程序

先启动 DataClean 程序,再启动生产者程序,kafka生产者产生数据如下:

image-20220908141540163

最后终端观察处理输出的数据: image-20220908141703126 DataStream<HashMap<String,String» mapData = env.addSource(new MyRedisSource()).broadcast();

1

运行结果

image-20220908141717041

控制台打印结果: image-20220908141729357 3.4、Flink yarn集群启动

向yarn提交任务:

1
2
./bin/flink run -m yarn-cluster -yn 2 -yjm 1024 -ytm 1024 -c henry.flink.DataClean /root/flinkCode/DataClean-1.0-SNAPSHOT-jar-with-dependencies.jar
1

任务成功运行启动: image-20220908141742978 通过 yarn UI 查看任务,并进入Flink job: image-20220908141812108 程序中设置的并行度: image-20220908141824800 启动kafka生产者: image-20220908141841606 监控topic消费情况:

1
2
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic allDataClean
1

最终终端输出结果, 同IDEA中运行结果:

image-20220908141854648

转:https://blog.csdn.net/hongzhen91/article/details/90549079