Flink系列-第20讲:Flink 高级应用之海量数据高效去重
本课时我们主要讲解 Flink 中的海量数据高效去重。
消除重复数据是我们在实际业务中经常遇到的一类问题。在大数据领域,重复数据的删除有助于减少存储所需要的存储容量。而且在一些特定的业务场景中,重复数据是不可接受的,例如,精确统计网站一天的用户数量、在事实表中统计每天发出的快递包裹数量。在传统的离线计算中,我们可以直接用 SQL 通过 DISTINCT 函数,或者数据量继续增加时会用到类似 MapReduce 的思想。那么在实时计算中,去重计数是一个增量和长期的过程,并且不同的场景下因为效率和精度问题方案也需要变化。
针对上述问题,我们在这里列出几种常见的 Flink 中实时去重方案:
-
基于状态后端
-
基于 HyperLogLog
-
基于布隆过滤器(BloomFilter)
-
基于 BitMap
-
基于外部数据库
下面我们依次讲解上述几种方案的适用场景和实现原理。
基于状态后端
我们在第 09 课时“Flink 状态与容错”中曾经讲过 Flink State 状态后端的概念和原理,其中状态后端的种类之一是 RocksDBStateBackend。它会将正在运行中的状态数据保存在 RocksDB 数据库中,该数据库默认将数据存储在 TaskManager 运行节点的数据目录下。
RocksDB 是一个 K-V 数据库,我们可以利用 MapState 进行去重。这里我们模拟一个场景,计算每个商品 SKU 的访问量。
整体的实现代码如下:
public class MapStateDistinctFunction extends KeyedProcessFunction<String,Tuple2<String,Integer>,Tuple2<String,Integer>> {
private transient ValueState<Integer> counts;
@Override
public void open(Configuration parameters) throws Exception {
//我们设置 ValueState 的 TTL 的生命周期为24小时,到期自动清除状态
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(org.apache.flink.api.common.time.Time.minutes(24 * 60))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
//设置 ValueState 的默认值
ValueStateDescriptor<Integer> descriptor = new ValueStateDescriptor<Integer>("skuNum", Integer.class);
descriptor.enableTimeToLive(ttlConfig);
counts = getRuntimeContext().getState(descriptor);
super.open(parameters);
}
<span class="hljs-meta">@Override</span>
<span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">void</span> <span class="hljs-title">processElement</span><span class="hljs-params">(Tuple2<String, Integer> value, Context ctx, Collector<Tuple2<String, Integer>> out)</span> <span class="hljs-keyword">throws</span> Exception </span>{
String f0 = value.f0;
<span class="hljs-comment">//如果不存在则新增</span>
<span class="hljs-keyword">if</span>(counts.value() == <span class="hljs-keyword">null</span>){
counts.update(<span class="hljs-number">1</span>);
}<span class="hljs-keyword">else</span>{
<span class="hljs-comment">//如果存在则加1</span>
counts.update(counts.value()+<span class="hljs-number">1</span>);
}
out.collect(Tuple2.of(f0, counts.value()));
}
}
在上面的这段代码里,我们实现了这样的逻辑:定义了一个 MapStateDistinctFunction 类,该类继承了 KeyedProcessFunction。核心的处理逻辑在 processElement 方法中,当一条数据经过,我们会在 MapState 中判断这条数据是否已经存在,如果不存在那么计数为 1,如果存在,那么在原来的计数上加 1。
这里需要注意的是,我们自定义了状态的过期时间是 24 小时,在实际生产中大量的 Key 会使得状态膨胀,我们可以对存储的 Key 进行处理。例如,使用加密方法把 Key 加密成几个字节进再存储。
基于 HyperLogLog
HyperLogLog 是一种估计统计算法,被用来统计一个集合中不同数据的个数,也就是我们所说的去重统计。HyperLogLog 算法是用于基数统计的算法,每个 HyperLogLog 键只需要花费 12 KB 内存,就可以计算接近 2 的 64 方个不同元素的基数。HyperLogLog 适用于大数据量的统计,因为成本相对来说是更低的,最多也就占用 12KB 内存。
我们在不需要 100% 精确的业务场景下,可以使用这种方法进行统计。首先新增依赖:
<dependency>
<groupId>net.agkn</groupId>
<artifactId>hll</artifactId>
<version>1.6.0</version>
</dependency>
我们还是以上述的商品 SKU 访问量作为业务场景,数据格式为 <SKU, 访问的用户 id>:
public class HyperLogLogDistinct implements AggregateFunction<Tuple2<String,Long>,HLL,Long> {
@Override
public HLL createAccumulator() {
return new HLL(14, 5);
}
@Override
public HLL add(Tuple2<String, Long> value, HLL accumulator) {
//value 为访问记录 <商品sku, 用户id>
accumulator.addRaw(value.f1);
return accumulator;
}
@Override
public Long getResult(HLL accumulator) {
long cardinality = accumulator.cardinality();
return cardinality;
}
@Override
public HLL merge(HLL a, HLL b) {
a.union(b);
return a;
}
}
在上面的代码中,addRaw 方法用于向 HyperLogLog 中插入元素。如果插入的元素非数值型的,则需要 hash 过后才能插入。accumulator.cardinality() 方法用于计算 HyperLogLog 中元素的基数。
需要注意的是,HyperLogLog 并不是精准的去重,如果业务场景追求 100% 正确,那么一定不要使用这种方法。
基于布隆过滤器(BloomFilter)
BloomFilter(布隆过滤器)类似于一个 HashSet,用于快速判断某个元素是否存在于集合中,其典型的应用场景就是能够快速判断一个 key 是否存在于某容器,不存在就直接返回。
需要注意的是,和 HyperLogLog 一样,布隆过滤器不能保证 100% 精确。但是它的插入和查询效率都很高。
我们可以在非精确统计的情况下使用这种方法:
public class BloomFilterDistinct extends KeyedProcessFunction<Long, String, Long> {
private transient ValueState<BloomFilter> bloomState;
private transient ValueState<Long> countState;
<span class="hljs-meta">@Override</span>
<span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">void</span> <span class="hljs-title">processElement</span><span class="hljs-params">(String value, Context ctx, Collector<Long> out)</span> <span class="hljs-keyword">throws</span> Exception </span>{
BloomFilter bloomFilter = bloomState.value();
Long skuCount = countState.value();
<span class="hljs-keyword">if</span>(bloomFilter == <span class="hljs-keyword">null</span>){
BloomFilter.create(Funnels.unencodedCharsFunnel(), <span class="hljs-number">10000000</span>);
}
<span class="hljs-keyword">if</span>(skuCount == <span class="hljs-keyword">null</span>){
skuCount = <span class="hljs-number">0L</span>;
}
<span class="hljs-keyword">if</span>(!bloomFilter.mightContain(value)){
bloomFilter.put(value);
skuCount = skuCount + <span class="hljs-number">1</span>;
}
bloomState.update(bloomFilter);
countState.update(skuCount);
out.collect(countState.value());
}
}
我们使用 Guava 自带的 BloomFilter,每当来一条数据时,就检查 state 中的布隆过滤器中是否存在当前的 SKU,如果没有则初始化,如果有则数量加 1。
基于 BitMap
上面的 HyperLogLog 和 BloomFilter 虽然减少了存储但是丢失了精度, 这在某些业务场景下是无法被接受的。下面的这种方法不仅可以减少存储,而且还可以做到完全准确,那就是使用 BitMap。
Bit-Map 的基本思想是用一个 bit 位来标记某个元素对应的 Value,而 Key 即是该元素。由于采用了 Bit 为单位来存储数据,因此可以大大节省存储空间。
假设有这样一个需求:在 20 亿个随机整数中找出某个数 m 是否存在其中,并假设 32 位操作系统,4G 内存。在 Java 中,int 占 4 字节,1 字节 = 8 位(1 byte = 8 bit)
如果每个数字用 int 存储,那就是 20 亿个 int,因而占用的空间约为 (2000000000*4/1024/1024/1024)≈7.45G
如果按位存储就不一样了,20 亿个数就是 20 亿位,占用空间约为 (2000000000/8/1024/1024/1024)≈0.233G
在使用 BitMap 算法前,如果你需要去重的对象不是数字,那么需要先转换成数字。例如,用户可以自己创造一个映射器,将需要去重的对象和数字进行映射,最简单的办法是,可以直接使用数据库维度表中自增 ID。
首先我们新增一个依赖:
<dependency>
<groupId>org.roaringbitmap</groupId>
<artifactId>RoaringBitmap</artifactId>
<version>0.8.0</version>
</dependency>
然后,我们还以商品的 SKU 的访问记录举例:
public class BitMapDistinct implements AggregateFunction<Long, Roaring64NavigableMap,Long> {
<span class="hljs-meta">@Override</span>
<span class="hljs-function"><span class="hljs-keyword">public</span> Roaring64NavigableMap <span class="hljs-title">createAccumulator</span><span class="hljs-params">()</span> </span>{
<span class="hljs-keyword">return</span> <span class="hljs-keyword">new</span> Roaring64NavigableMap();
}
<span class="hljs-meta">@Override</span>
<span class="hljs-function"><span class="hljs-keyword">public</span> Roaring64NavigableMap <span class="hljs-title">add</span><span class="hljs-params">(Long value, Roaring64NavigableMap accumulator)</span> </span>{
accumulator.add(value);
<span class="hljs-keyword">return</span> accumulator;
}
<span class="hljs-meta">@Override</span>
<span class="hljs-function"><span class="hljs-keyword">public</span> Long <span class="hljs-title">getResult</span><span class="hljs-params">(Roaring64NavigableMap accumulator)</span> </span>{
<span class="hljs-keyword">return</span> accumulator.getLongCardinality();
}
<span class="hljs-meta">@Override</span>
<span class="hljs-function"><span class="hljs-keyword">public</span> Roaring64NavigableMap <span class="hljs-title">merge</span><span class="hljs-params">(Roaring64NavigableMap a, Roaring64NavigableMap b)</span> </span>{
<span class="hljs-keyword">return</span> <span class="hljs-keyword">null</span>;
}
}
在上述方法中,我们使用了 Roaring64NavigableMap,其是 BitMap 的一种实现,然后我们的数据是每次被访问的 SKU,把它直接添加到 Roaring64NavigableMap 中,最后通过 accumulator.getLongCardinality() 可以直接获取结果。
基于外部数据库
假如我们的业务场景非常复杂,并且数据量很大。为了防止无限制的状态膨胀,也不想维护庞大的 Flink 状态,我们可以采用外部存储的方式,比如可以选择使用 Redis 或者 HBase 存储数据,我们只需要设计好存储的 Key 即可。同时使用外部数据库进行存储,我们不需要关心 Flink 任务重启造成的状态丢失问题,但是有可能会出现因为重启恢复导致的数据多次发送,从而导致结果数据不准的问题。
总结
这一课时我们讲解了多种不同的 Flink 大数据下的去重方法,并且详细比较了它们的异同。在实际的业务场景中,精确去重和非精确去重需要灵活选择不同的方案,在准确性和效率上达到统一。
精选评论
**7162:
老师好,StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Listdata.add(new Tuple2data.add(new Tuple2data.add(new Tuple2data.add(new Tuple2DataStreamstream.keyBy(0).process(new MapStateDistinctFunction()) 这个行报错而这样下面这种重写就可以,是我用的不对吗stream.keyBy(0).process(new KeyedProcessFunction() {}
讲师回复:
keyBy算子后面的process中一定要是一个keyed stream。你的第一种写法中的MapStateDistinctFunction应该不是一个keyed stream。
**蜗牛:
您好,老师,请教个问题:位图如何解决哈希冲突问题?哪个布隆过滤器的实现不是建立在位图上解决哈希冲突的么???
讲师回复:
布隆过滤器有概率出现错误,他是根据hash函数算的。并不是严格的没有hash冲突。
**阳:
0807打卡:https://share.mubu.com/doc/3cFzd_p9p91 5种Flink海量数据高效去重的方法,面试应该可以用上
*帅:
@Overridepublic Roaring64NavigableMap merge(Roaring64NavigableMap a, Roaring64NavigableMap b) {return null;}
这里返回null 不会有问题吗?不会调用到这个方法么?Roaring64NavigableMap怎么合并?
讲师回复:
merge函数在使用session窗口的时候用到,因为需要合并窗口,这里不会被调用。
- 原文作者:知识铺
- 原文链接:https://index.zshipu.com/geek/post/bi/flink/2055-%E7%AC%AC20%E8%AE%B2Flink-%E9%AB%98%E7%BA%A7%E5%BA%94%E7%94%A8%E4%B9%8B%E6%B5%B7%E9%87%8F%E6%95%B0%E6%8D%AE%E9%AB%98%E6%95%88%E5%8E%BB%E9%87%8D/
- 版权声明:本作品采用知识共享署名-非商业性使用-禁止演绎 4.0 国际许可协议进行许可,非商业转载请注明出处(作者,原文链接),商业转载请联系作者获得授权。
- 免责声明:本页面内容均来源于站内编辑发布,部分信息来源互联网,并不意味着本站赞同其观点或者证实其内容的真实性,如涉及版权等问题,请立即联系客服进行更改或删除,保证您的合法权益。转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。也可以邮件至 sblig@126.com