MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

ElasticSearch聚合功能入门与实践

2024-08-012.3k 阅读

1. ElasticSearch 聚合功能简介

ElasticSearch 作为一款强大的分布式搜索和分析引擎,聚合功能是其数据分析能力的核心体现。聚合(Aggregation)允许我们对 ElasticSearch 索引中的数据进行分组、统计和分析,就如同 SQL 中的 GROUP BY 语句,但 ElasticSearch 的聚合功能更为灵活和强大,能处理海量数据且支持分布式计算。

在 ElasticSearch 中,聚合操作通常是在搜索请求的基础上执行的,它可以与查询(Query)相结合,对符合特定查询条件的数据进行聚合分析。聚合功能的主要应用场景包括数据统计、指标计算、数据分组洞察等。例如,在电商场景中,我们可能需要统计不同品牌商品的销量,或者分析不同年龄段用户的购买行为等,这些都可以通过 ElasticSearch 的聚合功能来实现。

2. 聚合的基本概念

2.1 桶(Buckets)

桶是聚合的核心概念之一,它类似于 SQL 中的 GROUP BY 分组。每个桶都有一个关联的条件,文档通过与这些条件进行匹配,被分配到不同的桶中。例如,我们可以根据商品的品牌创建桶,所有属于同一品牌的商品文档就会被分到同一个桶里。桶可以包含子桶,形成多层次的分组结构。

2.2 指标(Metrics)

指标用于对桶内的数据进行统计计算。一旦文档被分配到桶中,我们可以对每个桶内的数据应用各种指标计算,如求和、平均值、最大值、最小值等。例如,对于按品牌分组的桶,我们可以计算每个品牌商品的平均价格,这里的“平均价格”就是一个指标。

2.3 管道(Pipelines)

管道聚合允许我们基于现有聚合的结果进行进一步的聚合操作。例如,我们可以先计算每个品牌的销售额,然后基于这些品牌销售额的结果,计算所有品牌销售额的总和。管道聚合为复杂数据分析提供了更强大的功能。

3. 常见的聚合类型

3.1 桶聚合(Bucket Aggregations)

  • Terms 聚合:这是最常用的桶聚合之一,用于根据文档中某个字段的值进行分组。例如,假设我们有一个电商商品索引,其中包含“brand”(品牌)字段,我们可以使用 Terms 聚合按品牌对商品进行分组。

以下是使用 ElasticSearch 的 Java High - Level REST Client 进行 Terms 聚合的代码示例:

SearchRequest searchRequest = new SearchRequest("products");
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();

TermsAggregationBuilder brandAggregation = AggregationBuilders.terms("brand_aggregation")
      .field("brand");

searchSourceBuilder.aggregation(brandAggregation);
searchRequest.source(searchSourceBuilder);

try {
    SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
    Aggregations aggregations = searchResponse.getAggregations();
    if (aggregations != null) {
        Terms brandTerms = aggregations.get("brand_aggregation");
        for (Terms.Bucket bucket : brandTerms.getBuckets()) {
            System.out.println("Brand: " + bucket.getKeyAsString() + ", Doc Count: " + bucket.getDocCount());
        }
    }
} catch (IOException e) {
    e.printStackTrace();
}

在上述代码中,我们创建了一个 TermsAggregationBuilder,命名为“brand_aggregation”,并指定按“brand”字段进行分组。执行搜索请求后,我们从响应中获取聚合结果,并遍历每个品牌桶,输出品牌名称和该品牌下的文档数量。

  • Date Histogram 聚合:主要用于按时间间隔对日期类型的字段进行分组。例如,在网站访问日志分析中,我们可以按天、周、月等时间粒度统计访问量。

以下是使用 Python Elasticsearch 客户端进行 Date Histogram 聚合的代码示例:

from elasticsearch import Elasticsearch

es = Elasticsearch()

body = {
    "aggs": {
        "daily_visits": {
            "date_histogram": {
                "field": "visit_date",
                "calendar_interval": "day"
            }
        }
    }
}

response = es.search(index="website_logs", body=body)

for bucket in response['aggregations']['daily_visits']['buckets']:
    print("Date: ", bucket['key_as_string'], ", Visit Count: ", bucket['doc_count'])

在这段代码中,我们在“website_logs”索引上进行搜索,使用 date_histogram 聚合按天对“visit_date”字段进行分组,并输出每天的访问量。

3.2 指标聚合(Metric Aggregations)

  • Avg 聚合:用于计算某个数值型字段的平均值。例如,计算商品的平均价格。

以下是使用 ElasticSearch 的 REST API 进行 Avg 聚合的示例请求:

{
    "aggs": {
        "average_price": {
            "avg": {
                "field": "price"
            }
        }
    }
}

上述 JSON 请求表示在整个索引数据上计算“price”字段的平均值,聚合结果命名为“average_price”。

  • Sum 聚合:计算某个数值型字段的总和。例如,计算所有商品的总销售额。
{
    "aggs": {
        "total_sales": {
            "sum": {
                "field": "sales_amount"
            }
        }
    }
}

此请求计算“sales_amount”字段的总和,聚合结果命名为“total_sales”。

3.3 管道聚合(Pipeline Aggregations)

  • Sibling Pipeline Aggregations:这类管道聚合操作是基于同级的其他聚合结果。例如,derivative 聚合可以计算两个数值型聚合结果之间的导数。假设我们已经有了按天统计的网站访问量聚合结果,我们可以使用 derivative 聚合来计算每天访问量的变化率。

以下是使用 ElasticSearch 的 DSL(Domain - Specific Language)进行 derivative 聚合的示例:

from elasticsearch_dsl import Search, A

s = Search(index="website_logs")

daily_visits = A('date_histogram', field='visit_date', calendar_interval='day',
                 aggs={'total_visits': A('sum', field='visit_count')})
s.aggs.bucket('daily_visits', daily_visits)

derivative_agg = A('derivative', buckets_path='daily_visits>total_visits')
s.aggs.metric('visit_rate_change', derivative_agg)

response = s.execute()

for bucket in response.aggregations.daily_visits.buckets:
    print("Date: ", bucket.key_as_string, ", Total Visits: ", bucket.total_visits.value)

print("Visit Rate Change: ", response.aggregations.visit_rate_change.value)

在上述代码中,我们首先使用 date_histogram 按天统计访问量,然后基于这个聚合结果,使用 derivative 计算访问量的变化率。

  • Parent Pipeline Aggregations:这类管道聚合操作是基于父级聚合的结果。例如,cumulative_sum 聚合可以计算某个数值型聚合结果的累积和。假设我们有按月份统计的销售额聚合结果,我们可以使用 cumulative_sum 聚合计算从年初到每个月的累积销售额。
{
    "aggs": {
        "monthly_sales": {
            "date_histogram": {
                "field": "sale_date",
                "calendar_interval": "month"
            },
            "aggs": {
                "total_sales": {
                    "sum": {
                        "field": "sale_amount"
                    }
                },
                "cumulative_sales": {
                    "cumulative_sum": {
                        "buckets_path": "total_sales"
                    }
                }
            }
        }
    }
}

上述 JSON 请求在按月份统计销售额的基础上,计算每个月的累积销售额。

4. 多层聚合与嵌套聚合

4.1 多层聚合

多层聚合允许我们在桶聚合的基础上再创建桶聚合,形成多层次的分组结构。例如,在电商数据分析中,我们可以先按品牌进行分组,然后在每个品牌组内再按商品类别进行分组,最后计算每个类别商品的平均价格。

以下是使用 ElasticSearch 的 Java High - Level REST Client 实现多层聚合的代码示例:

SearchRequest searchRequest = new SearchRequest("products");
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();

TermsAggregationBuilder brandAggregation = AggregationBuilders.terms("brand_aggregation")
      .field("brand");

TermsAggregationBuilder categoryAggregation = AggregationBuilders.terms("category_aggregation")
      .field("category");

AvgAggregationBuilder avgPriceAggregation = AggregationBuilders.avg("average_price")
      .field("price");

categoryAggregation.subAggregation(avgPriceAggregation);
brandAggregation.subAggregation(categoryAggregation);

searchSourceBuilder.aggregation(brandAggregation);
searchRequest.source(searchSourceBuilder);

try {
    SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
    Aggregations aggregations = searchResponse.getAggregations();
    if (aggregations != null) {
        Terms brandTerms = aggregations.get("brand_aggregation");
        for (Terms.Bucket brandBucket : brandTerms.getBuckets()) {
            System.out.println("Brand: " + brandBucket.getKeyAsString());
            Terms categoryTerms = brandBucket.getAggregations().get("category_aggregation");
            for (Terms.Bucket categoryBucket : categoryTerms.getBuckets()) {
                System.out.println("  Category: " + categoryBucket.getKeyAsString() + ", Average Price: " + categoryBucket.getAggregations().get("average_price").getAvg());
            }
        }
    }
} catch (IOException e) {
    e.printStackTrace();
}

在这段代码中,我们首先创建了品牌聚合(brandAggregation),然后在品牌聚合内创建了类别聚合(categoryAggregation),最后在类别聚合内创建了平均价格聚合(avgPriceAggregation)。通过嵌套的方式实现了多层聚合,并输出了每个品牌下每个类别的平均价格。

4.2 嵌套聚合

嵌套聚合主要用于处理文档中嵌套结构的数据。假设我们有一个订单索引,每个订单文档包含多个订单项,每个订单项有自己的产品信息和数量。如果我们想按产品统计订单中产品的总数量,就需要使用嵌套聚合。

以下是使用 ElasticSearch 的 REST API 进行嵌套聚合的示例:

{
    "aggs": {
        "orders": {
            "nested": {
                "path": "order_items"
            },
            "aggs": {
                "products": {
                    "terms": {
                        "field": "order_items.product"
                    },
                    "aggs": {
                        "total_quantity": {
                            "sum": {
                                "field": "order_items.quantity"
                            }
                        }
                    }
                }
            }
        }
    }
}

在上述请求中,我们首先使用 nested 聚合指定要处理的嵌套路径为“order_items”。然后在嵌套聚合内,通过 terms 聚合按产品名称进行分组,并在每个产品组内使用 sum 聚合计算产品的总数量。

5. 聚合的性能优化

5.1 减少文档扫描范围

在进行聚合操作前,尽量通过查询条件过滤掉不需要的数据。例如,在电商数据聚合中,如果我们只关心某个时间段内的商品数据,就可以在搜索请求中添加时间范围的查询条件。这样可以减少参与聚合计算的文档数量,从而提高性能。

SearchRequest searchRequest = new SearchRequest("products");
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();

// 添加时间范围查询条件
RangeQueryBuilder dateQuery = QueryBuilders.rangeQuery("create_date")
      .gte("2023 - 01 - 01")
      .lte("2023 - 12 - 31");

searchSourceBuilder.query(dateQuery);

TermsAggregationBuilder brandAggregation = AggregationBuilders.terms("brand_aggregation")
      .field("brand");

searchSourceBuilder.aggregation(brandAggregation);
searchRequest.source(searchSourceBuilder);

5.2 合理选择聚合类型

不同的聚合类型在性能上有差异。例如,Terms 聚合在处理高基数(字段有大量不同值)的字段时,可能会消耗较多资源。在这种情况下,可以考虑使用 Cardinality 聚合来近似计算不同值的数量,它在处理高基数字段时性能更好。

{
    "aggs": {
        "unique_customers": {
            "cardinality": {
                "field": "customer_id"
            }
        }
    }
}

5.3 利用缓存

ElasticSearch 支持对聚合结果进行缓存。可以通过设置 _cache 参数为 true 来启用缓存,这样相同的聚合请求如果再次执行,就可以直接从缓存中获取结果,提高响应速度。

{
    "aggs": {
        "brand_sales": {
            "terms": {
                "field": "brand",
                "_cache": true
            },
            "aggs": {
                "total_sales": {
                    "sum": {
                        "field": "sales_amount"
                    }
                }
            }
        }
    }
}

6. 聚合与搜索的结合

聚合功能通常与搜索功能紧密结合。我们可以先通过搜索条件筛选出符合特定条件的文档,然后对这些文档进行聚合分析。例如,在电商搜索中,用户可能先搜索某个关键词,然后我们对搜索结果中的商品进行聚合,如按品牌统计商品数量。

以下是使用 ElasticSearch 的 Java High - Level REST Client 实现搜索与聚合结合的代码示例:

SearchRequest searchRequest = new SearchRequest("products");
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();

// 搜索关键词为“laptop”
MatchQueryBuilder matchQuery = QueryBuilders.matchQuery("product_name", "laptop");
searchSourceBuilder.query(matchQuery);

TermsAggregationBuilder brandAggregation = AggregationBuilders.terms("brand_aggregation")
      .field("brand");

searchSourceBuilder.aggregation(brandAggregation);
searchRequest.source(searchSourceBuilder);

try {
    SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
    Aggregations aggregations = searchResponse.getAggregations();
    if (aggregations != null) {
        Terms brandTerms = aggregations.get("brand_aggregation");
        for (Terms.Bucket bucket : brandTerms.getBuckets()) {
            System.out.println("Brand: " + bucket.getKeyAsString() + ", Doc Count: " + bucket.getDocCount());
        }
    }
} catch (IOException e) {
    e.printStackTrace();
}

在上述代码中,我们先通过 MatchQuery 搜索产品名称中包含“laptop”的商品,然后对这些搜索结果按品牌进行聚合,统计每个品牌的商品数量。

7. 实际应用案例

7.1 电商数据分析

在电商平台中,我们可以利用 ElasticSearch 的聚合功能进行多种数据分析。例如,分析不同地区用户的购买偏好。假设我们的订单索引包含“customer_address”(用户地址)和“product_category”(产品类别)字段。

{
    "aggs": {
        "regions": {
            "terms": {
                "field": "customer_address.region"
            },
            "aggs": {
                "category_purchases": {
                    "terms": {
                        "field": "product_category"
                    },
                    "aggs": {
                        "total_purchases": {
                            "sum": {
                                "field": "quantity"
                            }
                        }
                    }
                }
            }
        }
    }
}

上述请求通过多层聚合,先按地区对订单进行分组,然后在每个地区内按产品类别分组,并统计每个类别产品的购买总量。通过这种方式,电商平台可以了解不同地区用户对各类产品的购买倾向,从而优化库存管理和营销策略。

7.2 日志分析

在系统日志分析场景中,假设我们有一个服务器日志索引,包含“timestamp”(时间戳)、“log_level”(日志级别)和“service_name”(服务名称)字段。我们可以使用 ElasticSearch 聚合来分析不同服务在不同时间段内的日志级别分布情况。

from elasticsearch import Elasticsearch

es = Elasticsearch()

body = {
    "aggs": {
        "services": {
            "terms": {
                "field": "service_name"
            },
            "aggs": {
                "time_intervals": {
                    "date_histogram": {
                        "field": "timestamp",
                        "calendar_interval": "hour"
                    },
                    "aggs": {
                        "log_level_distribution": {
                            "terms": {
                                "field": "log_level"
                            },
                            "aggs": {
                                "count": {
                                    "value_count": {
                                        "field": "log_level"
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }
    }
}

response = es.search(index="server_logs", body=body)

for service_bucket in response['aggregations']['services']['buckets']:
    print("Service: ", service_bucket['key'])
    for time_bucket in service_bucket['time_intervals']['buckets']:
        print("  Time: ", time_bucket['key_as_string'])
        for level_bucket in time_bucket['log_level_distribution']['buckets']:
            print("    Log Level: ", level_bucket['key'], ", Count: ", level_bucket['count']['value'])

在这段代码中,我们通过多层聚合,先按服务名称分组,然后在每个服务内按小时对日志进行分组,最后在每个时间间隔内按日志级别统计日志数量。通过这种分析,运维人员可以快速了解不同服务在不同时间段内的日志级别情况,以便及时发现潜在的系统问题。