京东搜索排序在线学习的优化实践
摘要:本文由京东搜索算法架构团队分享,主要介绍 Apache Flink 在京东商品搜索排序在线学习中的应用实践。文章的主要大纲如下:
一、背景
在京东的商品搜索排序中,经常会遇到搜索结果多样性不足导致系统非最优解的问题。为了解决数据马太效应带来的模型商品排序多样性的不足,我们利用基于二项式汤普森采样建模,但是该算法仍存在对所有用户采用一致的策略,未有效考虑用户和商品的个性化信息。基于该现状,我们采取在线学习,使深度学习和汤普森采样融合,实现个性化多样性排序方案,实时更新模型的关参数。
在该方案中,Flink 主要应用于实时样本的生成和 online learning 的实现。在在线学习过程中,样本是模型训练的基石,在超大规模样本数据的处理上,我们对比了 Flink、Storm 和 Spark Streaming 之后,最终选择用 Flink 作为实时样本流数据的生产以及迭代 online learning 参数的框架。在线学习的整体链路特别长,涉及在线端特征日志、流式特征处理、流式特征与用户行为标签关联、异常样本处理、模型动态参数实时训练与更新等环节,online learning 对样本处理和参数状态处理的准确性和稳定性要求较高,任何一个阶段都有可能出现问题,为此我们接入京东的 observer 体系,拥有完整的全链路监控系统,保证各个阶段数据的稳定性和完整性;下面我们首先介绍一下京东搜索在线学习架构。
二、京东搜索在线学习架构
京东搜索的排序模型系统架构主要包括以下几个部分:
- Predictor 是模型预估服务,在 load 模型中分为 static 部分和 dynamic 部分,static 部分由离线数据训练得到,主要学习 user 和 doc 的稠密特征表示,dynamic 部分主要包含 doc 粒度的权重向量,这部分由实时的 online learning 任务实时更新。
- Rank 主要包括一些排序策略,在排序最终结果确定之后,会实时落特征日志,将 doc 的特征按顺序写入特征数据流,作为后续实时样本的数据源(feature)。
- Feature Collector 的任务是承接在线预估系统发出的特征数据,对下游屏蔽缓存、去重、筛选等在线系统特有逻辑,产出 Query+Doc 粒度的特征流。
- Sample join 的任务将上面的 feature 数据、曝光、点击、加购、下单等用户行为标签数据作为数据源,通过 Flink 的 union + timer 数据模型关联成为符合业务要求的样本数据,算法可根据目标需求选择不同的标签作为正负样本标记。
- Online learning 任务负责消费上游生成的实时样本做训练,负责更新 model 的 dynamic 部分。
三、实时样本生成
Online Learning 对于在线样本生成的时效性和准确性都有很高的要求,同时也对作业的稳定性有很高的要求。在海量用户日志数据实时涌入的情况下,我们不仅要保证作业的数据延时低、样本关联率高且任务稳定,而且作业的吞吐不受影响、资源使用率达到最高。
京东搜索排序在线样本的主要流程如下:
- 数据源大致有曝光流、feature 流和用户行为流等作为实时样本的数据源,统一以 JDQ 管道流的形式,由京东实时计算平台提供平台支撑。
- 接到 feature 流和曝光流、label 流后,进行数据清洗,得到任务需要的数据格式。
- 拿到各个标准流后,对各个流进行 union 操作,之后进行 keyby。
- 我们在 process function 里面添加 Flink timer 定时器,作为样本生成的实时窗口。
- 将生成的样本实时落入 jdq 和 HDFS,jdq 可以用作后面的 online learning 的 input,HDFS 持久存储样本数据,用于离线训练、增量学习和数据分析。
在线样本任务优化实践:
京东搜索样本数据吞吐量每秒达到 GB 规模,对分布式处理分片、超大状态和异常处理提出很高的优化要求。
1、数据倾斜
使用 keyby 的时候,难免会有数据倾斜的情况,这里我们假设 key 设计合理、 shuffle 方式选择正确、任务没有反压且资源足够使用,由于任务 parallelism 设置导致的数据倾斜的情况。我们先看 Flink 里面 key 是如何被分发到 subtask 上面的。
keygroup = assignToKeyGroup(key, maxParallelism)subtaskNum =
computeOperatorIndexForKeyGroup(maxParallelism, parallelism, keyGroupId)
假设我们的并发设置的是 300,那么 maxParallelism 就是 512,如此设计,必然导致有的 subtask 分布 1 个 keygroup 有的分配两个,同时也导致了数据自然倾斜。针对上述问题,有两个解决方案:
- 设置并行度为 2 的 n 次方;
- 设置最大并行度为 并行度的 n 倍。
如果使用方案 1 ,调整并发的话只能调整 2 的幂次,建议使用方案 2,且假如 parallelism 为 300,maxParallelism 设置为 1200 的情况下假如数据还是有倾斜,可以再相应的把 maxParallelism 设置大一些保证每个 keygroup 的 key 少一些,如此也可以降低数据倾斜的发生。
2、large checkpoint
在线样本用到了 Flink 的 state,我们之前默认将 state 放到了内存里面,但是随着放量的增加,state 数据量激增,发现 GC 时间特别长,之后改变策略,将 state 放入了 RocksDB,GC 问题得以解决。我们针对 checkpoint 做了如下配置:
- 开启增量 checkpoint;
- 合理设置 checkpoint 的超时时间、间隔时间和最小暂停时间。
- 让 Flink 自己管理 RocksDB 占用的内存,对 RocksDB 的 blockcache、writebuffer 等进行调优。
- 优化 state 的数据使用,将 state 数据放入多个 state object 里面使用,降低序列化/反序列化的代价。
在任务调优的时候我们发现我们的任务访问 RocksDB 的时间非常长,查看 jstack 发现,很多线程都在等待数据的序列化和反序列化,随着算法特征的逐渐增多,样本中的特征个数超过 500 个,使得每条数据的量级越来越大。但是在做样本关联的时候其实是不需要特征关联的,只需要相应的主键关联就可以了,因此,我们用 ValueState 存储主键,用 MapState/ListState 存储特征等值。当然了还可以将这些特征值存储到外部存储里面,这里就需要对网络 io 和 本地 io 之间的选择做一个取舍。
- failure recovery 的时候开启本地恢复。
由于我们的 checkpoint 数据达到了 TB 级别,一旦任务发生 failover,不管是针对 HDFS 还是针对任务本身,压力都非常大,因此,我们优先使用本地进行 recovery,这样,不仅可以降低 HDFS 的压力还可以增加 recovery 的速度。
四、Flink Online Learning
对于 online learning,我们先介绍一下伯努利汤普森采样算法,假设每个商品的 reward 概率服从 Beta 分布,因此给每个商品维护两个参数成功次数 si 及失败次数 fi,及所有商品的公共先验参数成功次数 α 和失败次数 β。
每次根据商品相应的 Beta 分布采样为最优商品的期望 reward: Q(at) = θi,并选择期望 reward 最大的商品展现给用户。最后根据环境给出真实 reward,更新模型相应的参数达到 online learning 的效果。该参数代表一个商品特征,用一个 n 维向量表示,该向量由原始特征通过 MLP 网络预测得到。原始特征经过 DNN 网络得到一个 N 维向量作为该商品的个性化�
- 原文作者:知识铺
- 原文链接:https://index.zshipu.com/geek/post/%E4%BA%92%E8%81%94%E7%BD%91/%E4%BA%AC%E4%B8%9C%E6%90%9C%E7%B4%A2%E6%8E%92%E5%BA%8F%E5%9C%A8%E7%BA%BF%E5%AD%A6%E4%B9%A0%E7%9A%84%E4%BC%98%E5%8C%96%E5%AE%9E%E8%B7%B5/
- 版权声明:本作品采用知识共享署名-非商业性使用-禁止演绎 4.0 国际许可协议进行许可,非商业转载请注明出处(作者,原文链接),商业转载请联系作者获得授权。
- 免责声明:本页面内容均来源于站内编辑发布,部分信息来源互联网,并不意味着本站赞同其观点或者证实其内容的真实性,如涉及版权等问题,请立即联系客服进行更改或删除,保证您的合法权益。转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。也可以邮件至 sblig@126.com