铭毅天下Elasticsearch 出品

1、实战项目需求

需求1:有一个小需求

kafka源数据:

topicA:{"A_content":"XXX","name":"A","type":"XXX","id":1}
topicB:{"B_content":"XXX","name":"B","type":"XXX","id":1}

现在想将两个topic的数据写到同一个es索引中,但由于更新性能太慢,有啥思路可以加速写入性能呢(topicA和topicB的数据可能会有几天的延时)?

需求2:

在cluster1上有如a,b两索引,均有字段filed_a,索引a,b各自包含其它字段,建立新索引如c,要求c包含a索引全部文档,且在a和b索引关联字段 field_a 相同的文档中把b文档其它字段更新到索引c中。

2、需求分析

如上两个需求都涉及两个索引数据之间的关联。

提到 数据关联 或者 多表关联,我们都能想到的是四种多表关联核心实现:

  • 宽表,特点:空间换时间。
  • Nested 嵌套文档,特点:适合于子文档更新不频繁场景。
  • Join 父子文档,特点:适合于子文档频繁更新的场景。
  • 业务层面自己实现,特点:灵活自控。

以上四种都无法实现上述需求涉及的问题。

需求2的本质是:跨索引相同字段关联扩充字段实现。

在 7.5 版本的 ingest 预处理环节新增了enrich processer 字段丰富功能,能很好的实现上述需求。

2、enrich processor 解读

2.1 enrich processor 全局认知

全局来看: enrich processor 是 ingest 预处理管道中众多 processors中的一个。

2.2 enrich processor 最早发布版本

如前所述,Elasticsearch 7.5 版本后新增了该项功能。

2.3 enrich processor 定义

enrich:中文可以翻译成丰富,本质也可以理解:“使丰富”的意思。

借助 enrich 预处理管道,可以将已有索引中的数据添加到新写入的文档中。

官方举例如下:

  • 根据已知 IP 添加 web 服务或供应商。
  • 根据产品 ID 添加零售订单。
  • 根据电子邮件补充添加联系信息。
  • 根据用户地址添加邮政编码。

2.4 非 enrich processor 工作原理

为了对比,我们先讲一下: 非 enrich processor 的工作原理。

非 enrich 的预处理管道 都相对“简单、直白”,如下图所示:

图片来自:Elastic官方文档

新写入的文档中间经过预处理管道预处理实现了数据的 ETL 清洗后写入到目标索引中。

中间的 ETL 清洗包含但不限于:trim、drop、append、foreach等管道处理方式。

2.5 enrich processor 工作原理

区别于 非 enrich processor 的“直来直去”,enrich processor 在预处理管道中间加了“秘制配方”。

图片来自:Elastic官方文档

加了什么呢?

多了:enrich policy。

大家可以回想一下,上一次您在 Elasticsearch 中听到 policy 是在什么时候?

ILM 索引生命周期管理 里面 policy 实际是阶段 phrase 和动作 action 的综合体。

而 enrich 数据预处理环节,enrich 的组成有哪些呢?

2.5.1 enrich policy

对应上图中中间虚线框的圆圈部分,先上例子,建立下直观的认知。

PUT /_enrich/policy/data-policy
{
  "match": {
    "indices": "index_test_b",
    "match_field": "field_a",
    "enrich_fields": [
      "author",
      "publisher"
    ]
  }
}
  • indices:一个或多个源索引的列表,存储的是待 enrich 扩展的数据。
  • match:policy 类型,除了传统的match类型,还有应用于地理位置场景的:geo_match。
  • match_field:源索引中用于匹配传入文档的匹配字段。
  • enrich_field:源索引中的字段列表,用于添加到新传入的文档中。

2.5.2 source index 源索引

用于丰富新写入文档 (incoming documents)的索引。

它是目标索引中添加的待丰富数据的源头索引。没有了它, enrich 将无从谈起。

2.5.3 enrich index 丰富索引

这是一个咱们从来没有见过的新概念,有必要详细解读一下。

enrich index 是执行 enrich policy 生成的索引。

执行命令如下:

POST /_enrich/policy/data-policy/_execute

enrich index 特点如下:

  • elasticsearch 内部管理的系统级索引。
  • 目的很“单一”——仅用于 enrich processor。
  • 以 .enrich-* 开头。
  • 只读,不支持人为修改。

get 索引会有说明禁止修改

更新索引报错如上

  • 会被强制段合并,以实现快速检索。

这时候,读者可能会有疑问:直接用 source 索引不香吗?费那劲干啥?

原因:直接将传入文档与源索引中的文档进行匹配可能会很慢且需要大量资源。

为了加快速度,enrich 索引应运而生。

如果再引申的话,source 源索引可能会有大量的增删改查操作,而 enrich 一经创建,便不允许更改。

除非进行重新执行 policy。

2.6 enrich processor 适用场景

  • 日志场景
  • 其他需要预处理跨索引丰富数据的场景

2.7 enrich processor 性能问题

enrich processor 执行多项操作,可能会影响 ingest 管道的速度。

官方强烈建议在将 enrich process 部署到生产环境之前对其进行测试和基准测试。

官方不建议使用 enrich 处理器来 enrich (丰富)实时数据。enrich processor 最适合不经常更改的索引数据类型。

3、enrich processor 实战解读

针对文章开头的需求1、需求2:传统的索引之间的关联方式都不能解决问题。

核心实现步骤如下图所示:

借助 enrich processor 实现解读如下:

如下各个步骤和上图一一对应。

3.1 第一步:创建初始索引

DELETE index_test_a
PUT index_test_a
{
  "mappings":{
    "properties":{
      "field_a":{
        "type":"keyword"
      },
      "title":{
        "type":"keyword"
      },
      "publish_time":{
        "type":"date"
      }
    }
  }
}

POST index_test_a/_bulk
{"index":{"_id":1}}
{"field_a":"aaa", "title":"elasticsearch in action", "publish_time":"2017-07-01T00:00:00"}

DELETE index_test_b

PUT index_test_b
{
  "mappings": {
    "properties": {
      "field_a": {
        "type": "keyword"
      },
      "author": {
        "type": "keyword"
      },
      "publisher": {
        "type": "keyword"
      }
    }
  }
}

POST index_test_b/_bulk
{"index":{"_id":1}}
{"field_a":"aaa", "author":"jerry", "publisher":"Tsinghua"}

3.2 第二步:创建data-policy

DELETE _enrich/policy/data-policy
PUT /_enrich/policy/data-policy
{
  "match": {
    "indices": "index_test_b",
    "match_field": "field_a",
    "enrich_fields": ["author","publisher"]
  }
}

3.3 第三步:执行data-policy

POST /_enrich/policy/data-policy/_execute

3.4 第四步:创建 pipeline

DELETE /_ingest/pipeline/data_lookup
PUT /_ingest/pipeline/data_lookup
{
  "processors": [
    {
      "enrich": {
        "policy_name": "data-policy",
        "field": "field_a",
        "target_field": "field_from_bindex",
        "max_matches": "1"
      }
    },
    {
      "append": {
        "field": "author",