MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

ElasticSearch聚合在电商数据分析中的应用

2021-05-233.9k 阅读

电商数据分析与 ElasticSearch 聚合概述

在当今数字化的商业世界中,电商平台积累了海量的数据。这些数据涵盖了商品信息、用户行为、交易记录等多个方面。通过对这些数据的深入分析,电商企业能够精准地把握市场趋势、优化商品推荐、提升用户体验,进而在激烈的市场竞争中脱颖而出。而 ElasticSearch(简称 ES)作为一款强大的分布式搜索引擎,其聚合功能为电商数据分析提供了高效且灵活的解决方案。

ElasticSearch 聚合功能简介

ElasticSearch 的聚合功能允许用户在搜索结果上执行复杂的数据分析和统计操作。它类似于 SQL 中的分组和聚合操作,但在 ES 中,聚合是基于文档集合进行的,并且支持更丰富的分析类型,如桶聚合(Bucket Aggregation)和指标聚合(Metric Aggregation)。

  • 桶聚合:桶聚合的作用是根据特定的条件将文档分组到不同的桶中。每个桶可以看作是一个满足特定条件的文档集合。例如,根据商品类别将所有商品文档分到不同的桶中,每个桶就代表一个商品类别。常见的桶聚合类型包括 Terms 聚合(用于对文本字段进行分组)、Date Histogram 聚合(用于按时间间隔对日期字段进行分组)等。

  • 指标聚合:指标聚合用于在桶内计算统计指标。比如计算某个桶内商品的平均价格、总销量等。常见的指标聚合类型有 Average 聚合(计算平均值)、Sum 聚合(计算总和)、Max 聚合(计算最大值)等。

电商数据特点及分析需求

电商数据具有数据量大、种类繁多、更新速度快等特点。其数据类型包括结构化数据(如商品价格、库存数量)、半结构化数据(如商品描述、用户评论)以及非结构化数据(如用户行为日志)。

电商企业在数据分析方面有诸多需求,例如:

  • 商品分析:了解不同类别商品的销售情况,找出畅销和滞销商品,分析商品价格分布,以便进行价格策略调整。
  • 用户行为分析:分析用户购买行为模式,如购买时间分布、购买频率,为用户提供个性化推荐和营销活动策划提供依据。
  • 市场趋势分析:通过对历史销售数据的分析,预测市场需求变化,提前做好库存管理和新品研发准备。

ElasticSearch 聚合在商品分析中的应用

商品类别销售分析

在电商平台中,了解不同商品类别的销售情况是至关重要的。通过 ElasticSearch 的聚合功能,我们可以轻松实现这一分析。

假设我们有一个电商商品索引,其中每个文档代表一个商品,包含商品名称、类别、销量等字段。我们可以使用 Terms 聚合按商品类别分组,然后使用 Sum 聚合计算每个类别商品的总销量。

以下是使用 Elasticsearch 的 Java API 实现的代码示例:

import org.apache.http.HttpHost;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.metrics.sum.Sum;
import org.elasticsearch.search.builder.SearchSourceBuilder;

public class ProductCategorySalesAnalysis {
    public static void main(String[] args) throws Exception {
        RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(
                        new HttpHost("localhost", 9200, "http")));

        SearchRequest searchRequest = new SearchRequest("products");
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();

        searchSourceBuilder.aggregation(
                AggregationBuilders.terms("category_sales")
                      .field("category.keyword")
                      .subAggregation(AggregationBuilders.sum("total_sales").field("sales"))
        );

        searchRequest.source(searchSourceBuilder);

        SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);

        Terms categorySales = searchResponse.getAggregations().get("category_sales");
        for (Terms.Bucket bucket : categorySales.getBuckets()) {
            String category = bucket.getKeyAsString();
            Sum totalSales = bucket.getAggregations().get("total_sales");
            System.out.println("Category: " + category + ", Total Sales: " + totalSales.getValue());
        }

        client.close();
    }
}

在上述代码中,我们首先创建了一个 SearchRequest 对象,并指定索引为 products。然后使用 SearchSourceBuilder 构建搜索请求,通过 AggregationBuilders.terms 创建一个名为 category_sales 的 Terms 聚合,按 category 字段(这里使用 keyword 类型,以确保精确匹配)进行分组。接着在 Terms 聚合内部,使用 AggregationBuilders.sum 创建一个名为 total_sales 的 Sum 聚合,用于计算每个类别商品的总销量。最后执行搜索请求,并解析聚合结果,输出每个商品类别的名称及其总销量。

商品价格分布分析

了解商品价格分布有助于电商企业制定合理的价格策略。我们可以使用 ElasticSearch 的 Range 聚合来实现这一分析。Range 聚合可以根据设定的数值范围将文档分组到不同的桶中。

假设商品文档中有 price 字段表示商品价格,以下是使用 Elasticsearch 的 Python 客户端(Elasticsearch-py)实现商品价格分布分析的代码示例:

from elasticsearch import Elasticsearch

es = Elasticsearch([{'host': 'localhost', 'port': 9200}])

body = {
    "aggs": {
        "price_distribution": {
            "range": {
                "field": "price",
                "ranges": [
                    {"to": 50},
                    {"from": 50, "to": 100},
                    {"from": 100, "to": 200},
                    {"from": 200}
                ]
            },
            "aggs": {
                "product_count": {
                    "value_count": {
                        "field": "product_id"
                    }
                }
            }
        }
    }
}

response = es.search(index='products', body=body)

price_distribution = response['aggregations']['price_distribution']
for bucket in price_distribution['buckets']:
    if 'to' in bucket:
        print(f"Price range: 0 - {bucket['to']}, Product count: {bucket['product_count']['value']}")
    else:
        print(f"Price range: {bucket['from']} - +inf, Product count: {bucket['product_count']['value']}")

在上述代码中,我们定义了一个 body 字典,其中包含一个名为 price_distribution 的 Range 聚合。通过 field 指定使用 price 字段进行范围划分,ranges 中定义了不同的价格范围。在 Range 聚合内部,使用 value_count 指标聚合计算每个价格范围内的商品数量。最后执行搜索请求,并解析聚合结果,输出每个价格范围及其对应的商品数量。

ElasticSearch 聚合在用户行为分析中的应用

用户购买时间分布分析

分析用户购买时间分布可以帮助电商企业了解用户的活跃时间,以便在合适的时间进行营销活动推送。ElasticSearch 的 Date Histogram 聚合非常适合处理这种按时间维度的分析。

假设商品文档中有一个 purchase_date 字段记录用户购买时间,以下是使用 Elasticsearch 的 Kibana Dev Tools 控制台进行用户购买时间分布分析的 DSL(Domain Specific Language)示例:

POST /products/_search
{
    "size": 0,
    "aggs": {
        "purchase_time_distribution": {
            "date_histogram": {
                "field": "purchase_date",
                "calendar_interval": "day",
                "format": "yyyy - MM - dd"
            },
            "aggs": {
                "total_purchases": {
                    "sum": {
                        "field": "quantity"
                    }
                }
            }
        }
    }
}

在上述 DSL 中,我们使用 date_histogram 聚合按天(calendar_interval 设置为 day)对 purchase_date 字段进行分组,并指定输出格式为 yyyy - MM - dd。在 date_histogram 聚合内部,使用 sum 聚合计算每天的总购买数量(假设文档中有 quantity 字段表示购买数量)。通过执行此查询,我们可以得到每天的购买数量分布情况。

用户购买频率分析

了解用户的购买频率有助于电商企业识别忠实用户和潜在流失用户。我们可以通过计算用户购买次数,并使用 Bucket Selector 聚合筛选出不同购买频率范围的用户。

假设我们有一个用户购买记录索引,每个文档包含 user_idpurchase_date 字段。以下是使用 Elasticsearch 的 Java API 实现用户购买频率分析的代码示例:

import org.apache.http.HttpHost;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.ParsedTerms;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.metrics.cardinality.Cardinality;
import org.elasticsearch.search.aggregations.pipeline.BucketSelector;
import org.elasticsearch.search.aggregations.pipeline.ParsedBucketSelector;
import org.elasticsearch.search.builder.SearchSourceBuilder;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class UserPurchaseFrequencyAnalysis {
    public static void main(String[] args) throws Exception {
        RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(
                        new HttpHost("localhost", 9200, "http")));

        SearchRequest searchRequest = new SearchRequest("purchase_records");
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();

        searchSourceBuilder.aggregation(
                AggregationBuilders.terms("user_purchases")
                      .field("user_id.keyword")
                      .subAggregation(AggregationBuilders.cardinality("purchase_count").field("purchase_date"))
        );

        Map<String, Object> scriptParams = new HashMap<>();
        scriptParams.put("min_purchases", 5);
        scriptParams.put("max_purchases", 10);

        searchSourceBuilder.aggregation(
                AggregationBuilders.pipelineBucketSelector("active_users", "user_purchases",
                        "params.min_purchases <= _bucket.purchase_count && _bucket.purchase_count <= params.max_purchases",
                        scriptParams)
        );

        searchRequest.source(searchSourceBuilder);

        SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);

        ParsedBucketSelector activeUsers = searchResponse.getAggregations().get("active_users");
        ParsedTerms userPurchases = activeUsers.getBuckets().get(0).getAggregations().get("user_purchases");
        for (Terms.Bucket bucket : userPurchases.getBuckets()) {
            String userId = bucket.getKeyAsString();
            Cardinality purchaseCount = bucket.getAggregations().get("purchase_count");
            System.out.println("User ID: " + userId + ", Purchase Count: " + purchaseCount.getValue());
        }

        client.close();
    }
}

在上述代码中,首先使用 Terms 聚合按 user_id 分组,并在每个用户组内使用 Cardinality 聚合计算每个用户的购买次数。然后通过 BucketSelector 聚合筛选出购买次数在 5 到 10 次之间的用户。最后执行搜索请求,并解析聚合结果,输出符合条件的用户 ID 及其购买次数。

ElasticSearch 聚合在市场趋势分析中的应用

历史销售数据趋势分析

通过分析历史销售数据趋势,电商企业可以预测未来市场需求,合理安排库存和生产计划。我们可以使用 ElasticSearch 的 Date Histogram 聚合按时间周期(如月、季度)对销售数据进行分组,并结合指标聚合计算每个周期的销售总量、平均销量等指标。

假设商品文档中有 sale_date 字段记录销售日期和 quantity 字段记录销售数量,以下是使用 Elasticsearch 的 Groovy 脚本在 Kibana Dev Tools 中进行历史销售数据趋势分析的示例:

POST /products/_search
{
    "size": 0,
    "aggs": {
        "monthly_sales_trend": {
            "date_histogram": {
                "field": "sale_date",
                "calendar_interval": "month",
                "format": "yyyy - MM"
            },
            "aggs": {
                "total_sales": {
                    "sum": {
                        "field": "quantity"
                    }
                },
                "average_sales": {
                    "avg": {
                        "field": "quantity"
                    }
                }
            }
        }
    }
}

在上述 DSL 中,date_histogram 聚合按月份对 sale_date 字段进行分组,并指定输出格式为 yyyy - MM。在 date_histogram 聚合内部,分别使用 sum 聚合计算每月的总销售数量和 avg 聚合计算每月的平均销售数量。通过执行此查询,我们可以得到每月的销售总量和平均销量趋势,为市场趋势分析提供数据支持。

新品销售趋势跟踪

对于电商企业推出的新品,跟踪其销售趋势可以评估新品的市场接受度。我们可以通过在文档中添加新品标识字段(如 is_new_product),然后结合 ElasticSearch 聚合功能分析新品在不同时间段的销售情况。

以下是使用 Elasticsearch 的 Python 客户端实现新品销售趋势跟踪的代码示例:

from elasticsearch import Elasticsearch

es = Elasticsearch([{'host': 'localhost', 'port': 9200}])

body = {
    "query": {
        "bool": {
            "filter": {
                "term": {
                    "is_new_product": true
                }
            }
        }
    },
    "aggs": {
        "new_product_sales_trend": {
            "date_histogram": {
                "field": "sale_date",
                "calendar_interval": "week",
                "format": "yyyy - w"
            },
            "aggs": {
                "total_sales": {
                    "sum": {
                        "field": "quantity"
                    }
                }
            }
        }
    }
}

response = es.search(index='products', body=body)

new_product_sales_trend = response['aggregations']['new_product_sales_trend']
for bucket in new_product_sales_trend['buckets']:
    week = bucket['key_as_string']
    total_sales = bucket['total_sales']['value']
    print(f"Week: {week}, Total sales of new products: {total_sales}")

在上述代码中,首先通过 query 部分筛选出 is_new_producttrue 的文档,即新品文档。然后使用 date_histogram 聚合按周对 sale_date 字段进行分组,并在每个周分组内使用 sum 聚合计算新品的总销售数量。最后执行搜索请求,并解析聚合结果,输出每周新品的总销售数量,从而跟踪新品的销售趋势。

ElasticSearch 聚合性能优化

合理设置索引和字段映射

在进行 ElasticSearch 聚合操作之前,合理设置索引和字段映射对于性能至关重要。例如,对于需要进行聚合的文本字段,应根据实际需求选择合适的字段类型。如果需要精确匹配和分组,可使用 keyword 类型;如果需要全文搜索和模糊匹配,则使用 text 类型,但 text 类型一般不适合直接用于聚合操作。

对于日期字段,应确保其映射类型为 date,并根据聚合的时间精度需求选择合适的日期格式。此外,合理设置索引的分片数和副本数也会影响聚合性能。分片数过多可能导致资源浪费和性能下降,而分片数过少可能无法充分利用集群资源。

减少聚合的文档数量

在执行聚合操作时,尽量减少参与聚合的文档数量可以显著提高性能。可以通过添加查询条件来过滤掉不必要的文档。例如,在进行商品销售分析时,如果只关注某几个特定类别的商品销售情况,可以在查询中添加类别过滤条件。

以下是在 Java API 中添加过滤条件减少文档数量的示例:

import org.apache.http.HttpHost;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.metrics.sum.Sum;
import org.elasticsearch.search.builder.SearchSourceBuilder;

public class FilteredProductCategorySalesAnalysis {
    public static void main(String[] args) throws Exception {
        RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(
                        new HttpHost("localhost", 9200, "http")));

        SearchRequest searchRequest = new SearchRequest("products");
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();

        searchSourceBuilder.query(QueryBuilders.termsQuery("category.keyword", "Electronics", "Clothing"));

        searchSourceBuilder.aggregation(
                AggregationBuilders.terms("category_sales")
                      .field("category.keyword")
                      .subAggregation(AggregationBuilders.sum("total_sales").field("sales"))
        );

        searchRequest.source(searchSourceBuilder);

        SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);

        Terms categorySales = searchResponse.getAggregations().get("category_sales");
        for (Terms.Bucket bucket : categorySales.getBuckets()) {
            String category = bucket.getKeyAsString();
            Sum totalSales = bucket.getAggregations().get("total_sales");
            System.out.println("Category: " + category + ", Total Sales: " + totalSales.getValue());
        }

        client.close();
    }
}

在上述代码中,通过 searchSourceBuilder.query(QueryBuilders.termsQuery("category.keyword", "Electronics", "Clothing")) 添加了类别过滤条件,只对 “Electronics” 和 “Clothing” 类别进行聚合分析,从而减少了参与聚合的文档数量,提高了性能。

缓存聚合结果

对于一些不经常变化的数据进行聚合分析时,可以考虑缓存聚合结果。ElasticSearch 本身提供了一些缓存机制,如请求缓存(Request Cache),可以缓存搜索请求和聚合结果。但需要注意的是,请求缓存仅适用于只读索引,并且缓存是基于整个搜索请求的,包括查询条件、排序和聚合等。

在 Java API 中启用请求缓存的示例如下:

import org.apache.http.HttpHost;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.metrics.sum.Sum;
import org.elasticsearch.search.builder.SearchSourceBuilder;

public class CachedProductCategorySalesAnalysis {
    public static void main(String[] args) throws Exception {
        RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(
                        new HttpHost("localhost", 9200, "http")));

        SearchRequest searchRequest = new SearchRequest("products");
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();

        searchSourceBuilder.aggregation(
                AggregationBuilders.terms("category_sales")
                      .field("category.keyword")
                      .subAggregation(AggregationBuilders.sum("total_sales").field("sales"))
        );

        searchSourceBuilder.requestCache(true);

        searchRequest.source(searchSourceBuilder);

        SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);

        Terms categorySales = searchResponse.getAggregations().get("category_sales");
        for (Terms.Bucket bucket : categorySales.getBuckets()) {
            String category = bucket.getKeyAsString();
            Sum totalSales = bucket.getAggregations().get("total_sales");
            System.out.println("Category: " + category + ", Total Sales: " + totalSales.getValue());
        }

        client.close();
    }
}

在上述代码中,通过 searchSourceBuilder.requestCache(true) 启用了请求缓存。当相同的搜索请求再次执行时,如果缓存未过期,将直接从缓存中获取结果,从而提高查询性能。

通过合理运用 ElasticSearch 的聚合功能,并进行性能优化,电商企业能够更高效地从海量数据中提取有价值的信息,为企业决策提供有力支持,在激烈的市场竞争中保持优势。无论是商品分析、用户行为分析还是市场趋势分析,ElasticSearch 聚合都为电商数据分析提供了强大而灵活的工具。同时,在实际应用中,需要根据数据特点和业务需求不断优化聚合操作,以达到最佳的分析效果和性能表现。