ElasticSearch聚合功能入门与实践
1. ElasticSearch 聚合功能简介
ElasticSearch 作为一款强大的分布式搜索和分析引擎,聚合功能是其数据分析能力的核心体现。聚合(Aggregation)允许我们对 ElasticSearch 索引中的数据进行分组、统计和分析,就如同 SQL 中的 GROUP BY
语句,但 ElasticSearch 的聚合功能更为灵活和强大,能处理海量数据且支持分布式计算。
在 ElasticSearch 中,聚合操作通常是在搜索请求的基础上执行的,它可以与查询(Query)相结合,对符合特定查询条件的数据进行聚合分析。聚合功能的主要应用场景包括数据统计、指标计算、数据分组洞察等。例如,在电商场景中,我们可能需要统计不同品牌商品的销量,或者分析不同年龄段用户的购买行为等,这些都可以通过 ElasticSearch 的聚合功能来实现。
2. 聚合的基本概念
2.1 桶(Buckets)
桶是聚合的核心概念之一,它类似于 SQL 中的 GROUP BY
分组。每个桶都有一个关联的条件,文档通过与这些条件进行匹配,被分配到不同的桶中。例如,我们可以根据商品的品牌创建桶,所有属于同一品牌的商品文档就会被分到同一个桶里。桶可以包含子桶,形成多层次的分组结构。
2.2 指标(Metrics)
指标用于对桶内的数据进行统计计算。一旦文档被分配到桶中,我们可以对每个桶内的数据应用各种指标计算,如求和、平均值、最大值、最小值等。例如,对于按品牌分组的桶,我们可以计算每个品牌商品的平均价格,这里的“平均价格”就是一个指标。
2.3 管道(Pipelines)
管道聚合允许我们基于现有聚合的结果进行进一步的聚合操作。例如,我们可以先计算每个品牌的销售额,然后基于这些品牌销售额的结果,计算所有品牌销售额的总和。管道聚合为复杂数据分析提供了更强大的功能。
3. 常见的聚合类型
3.1 桶聚合(Bucket Aggregations)
- Terms 聚合:这是最常用的桶聚合之一,用于根据文档中某个字段的值进行分组。例如,假设我们有一个电商商品索引,其中包含“brand”(品牌)字段,我们可以使用 Terms 聚合按品牌对商品进行分组。
以下是使用 ElasticSearch 的 Java High - Level REST Client 进行 Terms 聚合的代码示例:
SearchRequest searchRequest = new SearchRequest("products");
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
TermsAggregationBuilder brandAggregation = AggregationBuilders.terms("brand_aggregation")
.field("brand");
searchSourceBuilder.aggregation(brandAggregation);
searchRequest.source(searchSourceBuilder);
try {
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
Aggregations aggregations = searchResponse.getAggregations();
if (aggregations != null) {
Terms brandTerms = aggregations.get("brand_aggregation");
for (Terms.Bucket bucket : brandTerms.getBuckets()) {
System.out.println("Brand: " + bucket.getKeyAsString() + ", Doc Count: " + bucket.getDocCount());
}
}
} catch (IOException e) {
e.printStackTrace();
}
在上述代码中,我们创建了一个 TermsAggregationBuilder
,命名为“brand_aggregation”,并指定按“brand”字段进行分组。执行搜索请求后,我们从响应中获取聚合结果,并遍历每个品牌桶,输出品牌名称和该品牌下的文档数量。
- Date Histogram 聚合:主要用于按时间间隔对日期类型的字段进行分组。例如,在网站访问日志分析中,我们可以按天、周、月等时间粒度统计访问量。
以下是使用 Python Elasticsearch 客户端进行 Date Histogram 聚合的代码示例:
from elasticsearch import Elasticsearch
es = Elasticsearch()
body = {
"aggs": {
"daily_visits": {
"date_histogram": {
"field": "visit_date",
"calendar_interval": "day"
}
}
}
}
response = es.search(index="website_logs", body=body)
for bucket in response['aggregations']['daily_visits']['buckets']:
print("Date: ", bucket['key_as_string'], ", Visit Count: ", bucket['doc_count'])
在这段代码中,我们在“website_logs”索引上进行搜索,使用 date_histogram
聚合按天对“visit_date”字段进行分组,并输出每天的访问量。
3.2 指标聚合(Metric Aggregations)
- Avg 聚合:用于计算某个数值型字段的平均值。例如,计算商品的平均价格。
以下是使用 ElasticSearch 的 REST API 进行 Avg 聚合的示例请求:
{
"aggs": {
"average_price": {
"avg": {
"field": "price"
}
}
}
}
上述 JSON 请求表示在整个索引数据上计算“price”字段的平均值,聚合结果命名为“average_price”。
- Sum 聚合:计算某个数值型字段的总和。例如,计算所有商品的总销售额。
{
"aggs": {
"total_sales": {
"sum": {
"field": "sales_amount"
}
}
}
}
此请求计算“sales_amount”字段的总和,聚合结果命名为“total_sales”。
3.3 管道聚合(Pipeline Aggregations)
- Sibling Pipeline Aggregations:这类管道聚合操作是基于同级的其他聚合结果。例如,
derivative
聚合可以计算两个数值型聚合结果之间的导数。假设我们已经有了按天统计的网站访问量聚合结果,我们可以使用derivative
聚合来计算每天访问量的变化率。
以下是使用 ElasticSearch 的 DSL(Domain - Specific Language)进行 derivative
聚合的示例:
from elasticsearch_dsl import Search, A
s = Search(index="website_logs")
daily_visits = A('date_histogram', field='visit_date', calendar_interval='day',
aggs={'total_visits': A('sum', field='visit_count')})
s.aggs.bucket('daily_visits', daily_visits)
derivative_agg = A('derivative', buckets_path='daily_visits>total_visits')
s.aggs.metric('visit_rate_change', derivative_agg)
response = s.execute()
for bucket in response.aggregations.daily_visits.buckets:
print("Date: ", bucket.key_as_string, ", Total Visits: ", bucket.total_visits.value)
print("Visit Rate Change: ", response.aggregations.visit_rate_change.value)
在上述代码中,我们首先使用 date_histogram
按天统计访问量,然后基于这个聚合结果,使用 derivative
计算访问量的变化率。
- Parent Pipeline Aggregations:这类管道聚合操作是基于父级聚合的结果。例如,
cumulative_sum
聚合可以计算某个数值型聚合结果的累积和。假设我们有按月份统计的销售额聚合结果,我们可以使用cumulative_sum
聚合计算从年初到每个月的累积销售额。
{
"aggs": {
"monthly_sales": {
"date_histogram": {
"field": "sale_date",
"calendar_interval": "month"
},
"aggs": {
"total_sales": {
"sum": {
"field": "sale_amount"
}
},
"cumulative_sales": {
"cumulative_sum": {
"buckets_path": "total_sales"
}
}
}
}
}
}
上述 JSON 请求在按月份统计销售额的基础上,计算每个月的累积销售额。
4. 多层聚合与嵌套聚合
4.1 多层聚合
多层聚合允许我们在桶聚合的基础上再创建桶聚合,形成多层次的分组结构。例如,在电商数据分析中,我们可以先按品牌进行分组,然后在每个品牌组内再按商品类别进行分组,最后计算每个类别商品的平均价格。
以下是使用 ElasticSearch 的 Java High - Level REST Client 实现多层聚合的代码示例:
SearchRequest searchRequest = new SearchRequest("products");
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
TermsAggregationBuilder brandAggregation = AggregationBuilders.terms("brand_aggregation")
.field("brand");
TermsAggregationBuilder categoryAggregation = AggregationBuilders.terms("category_aggregation")
.field("category");
AvgAggregationBuilder avgPriceAggregation = AggregationBuilders.avg("average_price")
.field("price");
categoryAggregation.subAggregation(avgPriceAggregation);
brandAggregation.subAggregation(categoryAggregation);
searchSourceBuilder.aggregation(brandAggregation);
searchRequest.source(searchSourceBuilder);
try {
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
Aggregations aggregations = searchResponse.getAggregations();
if (aggregations != null) {
Terms brandTerms = aggregations.get("brand_aggregation");
for (Terms.Bucket brandBucket : brandTerms.getBuckets()) {
System.out.println("Brand: " + brandBucket.getKeyAsString());
Terms categoryTerms = brandBucket.getAggregations().get("category_aggregation");
for (Terms.Bucket categoryBucket : categoryTerms.getBuckets()) {
System.out.println(" Category: " + categoryBucket.getKeyAsString() + ", Average Price: " + categoryBucket.getAggregations().get("average_price").getAvg());
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
在这段代码中,我们首先创建了品牌聚合(brandAggregation
),然后在品牌聚合内创建了类别聚合(categoryAggregation
),最后在类别聚合内创建了平均价格聚合(avgPriceAggregation
)。通过嵌套的方式实现了多层聚合,并输出了每个品牌下每个类别的平均价格。
4.2 嵌套聚合
嵌套聚合主要用于处理文档中嵌套结构的数据。假设我们有一个订单索引,每个订单文档包含多个订单项,每个订单项有自己的产品信息和数量。如果我们想按产品统计订单中产品的总数量,就需要使用嵌套聚合。
以下是使用 ElasticSearch 的 REST API 进行嵌套聚合的示例:
{
"aggs": {
"orders": {
"nested": {
"path": "order_items"
},
"aggs": {
"products": {
"terms": {
"field": "order_items.product"
},
"aggs": {
"total_quantity": {
"sum": {
"field": "order_items.quantity"
}
}
}
}
}
}
}
}
在上述请求中,我们首先使用 nested
聚合指定要处理的嵌套路径为“order_items”。然后在嵌套聚合内,通过 terms
聚合按产品名称进行分组,并在每个产品组内使用 sum
聚合计算产品的总数量。
5. 聚合的性能优化
5.1 减少文档扫描范围
在进行聚合操作前,尽量通过查询条件过滤掉不需要的数据。例如,在电商数据聚合中,如果我们只关心某个时间段内的商品数据,就可以在搜索请求中添加时间范围的查询条件。这样可以减少参与聚合计算的文档数量,从而提高性能。
SearchRequest searchRequest = new SearchRequest("products");
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
// 添加时间范围查询条件
RangeQueryBuilder dateQuery = QueryBuilders.rangeQuery("create_date")
.gte("2023 - 01 - 01")
.lte("2023 - 12 - 31");
searchSourceBuilder.query(dateQuery);
TermsAggregationBuilder brandAggregation = AggregationBuilders.terms("brand_aggregation")
.field("brand");
searchSourceBuilder.aggregation(brandAggregation);
searchRequest.source(searchSourceBuilder);
5.2 合理选择聚合类型
不同的聚合类型在性能上有差异。例如,Terms
聚合在处理高基数(字段有大量不同值)的字段时,可能会消耗较多资源。在这种情况下,可以考虑使用 Cardinality
聚合来近似计算不同值的数量,它在处理高基数字段时性能更好。
{
"aggs": {
"unique_customers": {
"cardinality": {
"field": "customer_id"
}
}
}
}
5.3 利用缓存
ElasticSearch 支持对聚合结果进行缓存。可以通过设置 _cache
参数为 true
来启用缓存,这样相同的聚合请求如果再次执行,就可以直接从缓存中获取结果,提高响应速度。
{
"aggs": {
"brand_sales": {
"terms": {
"field": "brand",
"_cache": true
},
"aggs": {
"total_sales": {
"sum": {
"field": "sales_amount"
}
}
}
}
}
}
6. 聚合与搜索的结合
聚合功能通常与搜索功能紧密结合。我们可以先通过搜索条件筛选出符合特定条件的文档,然后对这些文档进行聚合分析。例如,在电商搜索中,用户可能先搜索某个关键词,然后我们对搜索结果中的商品进行聚合,如按品牌统计商品数量。
以下是使用 ElasticSearch 的 Java High - Level REST Client 实现搜索与聚合结合的代码示例:
SearchRequest searchRequest = new SearchRequest("products");
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
// 搜索关键词为“laptop”
MatchQueryBuilder matchQuery = QueryBuilders.matchQuery("product_name", "laptop");
searchSourceBuilder.query(matchQuery);
TermsAggregationBuilder brandAggregation = AggregationBuilders.terms("brand_aggregation")
.field("brand");
searchSourceBuilder.aggregation(brandAggregation);
searchRequest.source(searchSourceBuilder);
try {
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
Aggregations aggregations = searchResponse.getAggregations();
if (aggregations != null) {
Terms brandTerms = aggregations.get("brand_aggregation");
for (Terms.Bucket bucket : brandTerms.getBuckets()) {
System.out.println("Brand: " + bucket.getKeyAsString() + ", Doc Count: " + bucket.getDocCount());
}
}
} catch (IOException e) {
e.printStackTrace();
}
在上述代码中,我们先通过 MatchQuery
搜索产品名称中包含“laptop”的商品,然后对这些搜索结果按品牌进行聚合,统计每个品牌的商品数量。
7. 实际应用案例
7.1 电商数据分析
在电商平台中,我们可以利用 ElasticSearch 的聚合功能进行多种数据分析。例如,分析不同地区用户的购买偏好。假设我们的订单索引包含“customer_address”(用户地址)和“product_category”(产品类别)字段。
{
"aggs": {
"regions": {
"terms": {
"field": "customer_address.region"
},
"aggs": {
"category_purchases": {
"terms": {
"field": "product_category"
},
"aggs": {
"total_purchases": {
"sum": {
"field": "quantity"
}
}
}
}
}
}
}
}
上述请求通过多层聚合,先按地区对订单进行分组,然后在每个地区内按产品类别分组,并统计每个类别产品的购买总量。通过这种方式,电商平台可以了解不同地区用户对各类产品的购买倾向,从而优化库存管理和营销策略。
7.2 日志分析
在系统日志分析场景中,假设我们有一个服务器日志索引,包含“timestamp”(时间戳)、“log_level”(日志级别)和“service_name”(服务名称)字段。我们可以使用 ElasticSearch 聚合来分析不同服务在不同时间段内的日志级别分布情况。
from elasticsearch import Elasticsearch
es = Elasticsearch()
body = {
"aggs": {
"services": {
"terms": {
"field": "service_name"
},
"aggs": {
"time_intervals": {
"date_histogram": {
"field": "timestamp",
"calendar_interval": "hour"
},
"aggs": {
"log_level_distribution": {
"terms": {
"field": "log_level"
},
"aggs": {
"count": {
"value_count": {
"field": "log_level"
}
}
}
}
}
}
}
}
}
}
response = es.search(index="server_logs", body=body)
for service_bucket in response['aggregations']['services']['buckets']:
print("Service: ", service_bucket['key'])
for time_bucket in service_bucket['time_intervals']['buckets']:
print(" Time: ", time_bucket['key_as_string'])
for level_bucket in time_bucket['log_level_distribution']['buckets']:
print(" Log Level: ", level_bucket['key'], ", Count: ", level_bucket['count']['value'])
在这段代码中,我们通过多层聚合,先按服务名称分组,然后在每个服务内按小时对日志进行分组,最后在每个时间间隔内按日志级别统计日志数量。通过这种分析,运维人员可以快速了解不同服务在不同时间段内的日志级别情况,以便及时发现潜在的系统问题。