Elasticsearch习题 5.Runtime_field、update_by_query、pipeline

Elasticsearch习题 5.Runtime_field、update_by_query、pipeline

Task1

数据

执行如下语句添加索引

POST task1/_bulk?refresh=true
{"index":{}}
{"date":"2011-06-16 12:12:21","magnitude" : 1.4, "lon" : -116.0902, "lat" : 33.2253, "depth" : 9.98, "area" : " 10km NNE of Ocotillo Wells"}
{"index":{}}
{"date":"2011-06-16 12:12:21","magnitude" : 1.3, "lon" : -116.0902, "lat" : 33.2253, "depth" : 9.98, "area" : " 10km NNE of Ocotillo Wells"}
{"index":{}}
{"date":"2011-06-17 12:12:21","magnitude" : 1.5, "lon" : -116.0902, "lat" : 33.2253, "depth" : 9.98, "area" : " 10km NNE of Ocotillo Wells"}
{"index":{}}
{"date":"2011-04-18 12:12:21","magnitude" : 1.6, "lon" : -116.0902, "lat" : 33.2253, "depth" : 9.98, "area" : " 10km NNE of Ocotillo Wells"}
{"index":{}}
{"date":"2011-06-19 12:12:21","magnitude" : 1.9, "lon" : -116.0902, "lat" : 33.2253, "depth" : 9.98, "area" : " 10km NNE of Ocotillo Wells"}
{"index":{}}
{"date":"2011-06-20 12:12:21","magnitude" : 2.0, "lon" : -116.0902, "lat" : 33.2253, "depth" : 9.98, "area" : " 10km NNE of Ocotillo Wells"}
{"index":{}}
{"date":1308544245123,"magnitude" : 2.1, "lon" : -116.0902, "lat" : 33.2253, "depth" : 9.98, "area" : " 10km NNE of Ocotillo Wells"}
{"index":{}}
{"date":1308717045123,"magnitude" : 2.8, "lon" : -116.0902, "lat" : 33.2253, "depth" : 9.98, "area" : " 10km NNE of Ocotillo Wells"}
{"index":{}}
{"date":"2011-06-20 12:12:21","magnitude" : 2.9, "lon" : -116.0902, "lat" : 33.2253, "depth" : 9.98, "area" : " 10km NNE of Ocotillo Wells"}
{"index":{}}
{"date":"2011-06-20 12:12:21","magnitude" : 3.3, "lon" : -116.0902, "lat" : 33.2253, "depth" : 9.98, "area" : " 10km NNE of Ocotillo Wells"}

要求

写一个查询满足以下要求

  • 1:按星期分桶统计地震数据
  • 2:输出星期一至星期日中平均地震等级 没有数据的不显示
  • 3:返回平均地震等级最大的一个 是星期几
  • 4:进阶问题 每个星期的平均地震等级
  • 5:进阶问题 平均地震等级最大的是哪个星期

答案

(这个题比较难:之前不知道 dayOfWeekEnum 这个东西,看了大佬 `朱` 的写法才知道)

# Task1 answer: aggregate earthquake magnitudes by day-of-week and by calendar week.
#
# runtime_mappings
#   - `date`: declared as a runtime `date` field accepting the three formats
#     "yyyy-MM-dd HH:mm:ss", "yyyy-MM-dd" and epoch_millis, which covers both the
#     string dates and the numeric timestamps present in the bulk data.
#   - `day_of_week`: keyword emitted by a script that calls
#     dayOfWeekEnum.getDisplayName(TextStyle.FULL, Locale.ROOT) on the `date` value.
#
# aggs
#   - `week_aggs`: terms buckets on `day_of_week`, each with a `stats` sub-aggregation
#     on `magnitude` (requirements 1 and 2; the `avg` member of stats is the
#     per-day average).
#   - `each_week_aggs`: date_histogram on `date` with calendar_interval "1w", each
#     bucket with an `avg` of `magnitude` (requirement 4).
#   - `avg_magnitude_by_week`: avg_bucket over `week_aggs>state_aggs.avg`.
#   - `max_avg_magnitude_by_week`: max_bucket over `week_aggs>state_aggs.avg`
#     (requirement 3: the day of week with the highest average).
#   - `max_avg_magnitude_by_each_week`: max_bucket over
#     `each_week_aggs>each_week_avg_aggs` (requirement 5: the calendar week with
#     the highest average).
GET task1/_search
{
  "size": 0, 
  "runtime_mappings": {
    "date":{
      "type": "date",
      "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
    },
    "day_of_week": {
      "type": "keyword",
      "script": {
        "source": "emit(doc['date'].value.dayOfWeekEnum.getDisplayName(TextStyle.FULL, Locale.ROOT))"
      }
    }
  },
  "aggs": {
    "week_aggs": {
      "terms": {
        "field": "day_of_week"
      },
      "aggs": {
        "state_aggs": {
          "stats": {
            "field": "magnitude"
          }
        }
      }
    },
    "each_week_aggs":{
      "date_histogram": {
        "field": "date",
        "calendar_interval": "1w"
      },
      "aggs": {
        "each_week_avg_aggs": {
          "avg": {
            "field": "magnitude"
          }
        }
      }
    },
    "avg_magnitude_by_week":{
      "avg_bucket": {
        "buckets_path": "week_aggs>state_aggs.avg"
      }
    },
    "max_avg_magnitude_by_week":{
      "max_bucket": {
        "buckets_path": "week_aggs>state_aggs.avg"
      }
    },
    "max_avg_magnitude_by_each_week":{
      "max_bucket": {
        "buckets_path": "each_week_aggs>each_week_avg_aggs"
      }
    }
  }
}

Task2

给定两个索引:earthquakes 和 magnitude_type_desc。这两个索引里有一个共同的字段 magnitude_type,它同时也是关联字段;第二个索引里的第二个字段是 desc,用来描述地震类型。

数据

POST earthquakes/_bulk
{"index":{}}
{"country":"TitleA","magnitude":"yyy","magnitude_type":"aaa"}
{"index":{}}
{"country":"TitleB","magnitude":"yyy","magnitude_type":"bbb"}
{"index":{}}
{"country":"TitleD","magnitude":"yyy","magnitude_type":"ddd"}
{"index":{}}
{"country":"TitleE","magnitude":"yyy","magnitude_type":"eee"}
{"index":{}}
{"country":"TitleE","magnitude":"yyy","magnitude_type":"hhh"}

POST magnitude_type_desc/_bulk
{"index":{}}
{"magnitude_type":"aaa","desc":"thisisaaa"}
{"index":{}}
{"magnitude_type":"bbb","desc":"thisisbbb"}
{"index":{}}
{"magnitude_type":"ccc","desc":"thisisccc"}
{"index":{}}
{"magnitude_type":"eee"}
{"index":{}}
{"magnitude_type":"hhh","desc":null}

要求

  • 创建一个新的索引earthquakes_enrich,里面包含了earthquakes里面的全部数据,

  • 每一条数据都要根据地震类型添加一个新的字段,就是desc

  • 最终的数据必须为如下所示

答案


# Define a `match` enrich policy named `earthquakes-policy`:
#   - source index:  magnitude_type_desc
#   - lookup key:    magnitude_type
#   - copied fields: desc
PUT /_enrich/policy/earthquakes-policy
{
  "match": {
    "indices": "magnitude_type_desc",
    "match_field": "magnitude_type",
    "enrich_fields": ["desc"]
  }
}

# Execute the policy. Per the Elasticsearch enrich documentation this step must
# run before an enrich processor can use the policy (and again whenever the
# source index changes).
POST /_enrich/policy/earthquakes-policy/_execute

# Approach credited to 小艾 from the study group.
# It chains several ingest processors in one pipeline:
#   1. enrich: looks up `magnitude_type` via `earthquakes-policy` and stores at
#      most one match (max_matches: 1) in the temporary field `temp_filed`.
#   2. set:    writes the rendered template {{{temp_filed.desc}}} into `desc`.
#   3. remove: deletes the temporary field `temp_filed`.
# The remove processor can raise an error (NOTE(review): presumably when
# `temp_filed` was never created because the enrich lookup found no match —
# confirm); `ignore_failure: true` suppresses that error.
PUT /_ingest/pipeline/earthquakes_pipeline
{
  "processors" : [
    {
      "enrich" : {
        "description": "创建一个新的索引`earthquakes_enrich`,里面包含了`earthquakes`里面的全部数据",
        "policy_name": "earthquakes-policy",
        "field" : "magnitude_type",
        "target_field": "temp_filed",
        "max_matches": 1
      }
    },
    {
      "set": {
        "field": "desc",
        "value": "{{{temp_filed.desc}}}"
      }
    },
    {
      "remove": {
        "field": "temp_filed",
        "ignore_failure": true
      }
    }
  ]
}


# Copy every document from `earthquakes` into the new index `earthquakes_enrich`,
# passing each one through `earthquakes_pipeline` so that `desc` is added.
# The source must be `earthquakes`: that is the index populated by the bulk
# request in this task and the one the requirement asks to copy. The previous
# value `earthquakes_2` is not created by any step shown in this exercise.
POST _reindex
{
  "source": {
    "index": "earthquakes"
  },
  "dest": {
    "index": "earthquakes_enrich",
    "pipeline": "earthquakes_pipeline"
  }
}

Task3

索引task3的字段tags(该字段是数组)里的每个子项都包含空格,创建一个新的索引task3_new

数据

PUT task3/_bulk
 {"index":{"_id":1}}
 {"tags":[" movie "," abc "]}
 {"index":{"_id":2}}
 {"tags":[" wl ", " master "]}

要求

  • 新索引中tags字段中每个项都要去掉空格
  • 增加一个新字段,这个新字段的值是 task3 中每条文档 tags 字段各项的拼接(不包含空格;下面的答案中将该字段命名为 index_a)

答案

# Ingest pipeline `task3_new`:
#   1. foreach + trim: applies the trim processor to every element of `tags`.
#   2. script: concatenates the elements of `tags`, in order, into the new
#      field `index_a`.
PUT /_ingest/pipeline/task3_new
{
  "processors": [
    {
      "foreach": {
        "field": "tags",
        "processor": {
          "trim": { "field": "_ingest._value" }
        }
      }
    },
    {
      "script": {
        "lang": "painless",
        "source": """
          String joined = "";
          for (def tag : ctx['tags']) {
            joined += tag;
          }
          ctx['index_a'] = joined;
        """
      }
    }
  ]
}

还有一种解法是
#PUT _ingest/pipeline/handle_task3
#{
#  "processors": [
#    {
#      "foreach": {
#        "field": "tags",
#        "processor": {
#          "trim": {
#            "field": "_ingest._value"
#          }
#        }
#      }
#    },
#    {
#      "foreach": {
#        "field": "tags",
#        "processor": {
#          "set": {
#            "field": "index_a",
#            "value": "{{{index_a}}}{{{_ingest._value}}}"
#          }
#        }
#      }
#    }
#  ]
#}

# Copy all documents from `task3` into the new index `task3_new`, running each
# one through the `task3_new` ingest pipeline on the way in.
POST _reindex
{
  "source": {
    "index": "task3"
  },
  "dest": {
    "index": "task3_new",
    "pipeline": "task3_new"
  }
}

本文由 在码圈 创作,如果您觉得本文不错,请随意赞赏
采用 知识共享署名4.0 国际许可协议进行许可
您可以自由的转载和修改,但请务必注明文章来源并且不可用于商业目的。
本站部分内容收集于互联网,如果有侵权内容、不妥之处,请联系我们删除。敬请谅解!
原文链接:https://www.bedebug.com/archives/elasticsearch-exe5
最后更新于:2022-03-02 00:04:48

请博主喝咖啡 ☕.