MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

ElasticSearch加载数据集的方法

2021-10-245.1k 阅读

直接使用 Elasticsearch API 加载数据集

准备工作

在使用 Elasticsearch API 加载数据集之前,需要确保 Elasticsearch 服务已经启动并可访问。同时,要对 Elasticsearch 的基本概念如索引(Index)、类型(Type,在 Elasticsearch 7.x 之后逐渐弃用)、文档(Document)有一定的了解。

单个文档加载

通过 HTTP 的 PUT 或 POST 请求可以向 Elasticsearch 索引中添加单个文档。例如,假设我们有一个名为 employees 的索引,并且希望添加一个员工文档。

首先,构建文档的 JSON 数据:

{
    "name": "John Doe",
    "age": 30,
    "department": "Engineering"
}

然后使用 cURL 命令发送请求:

curl -X POST "localhost:9200/employees/_doc" -H 'Content-Type: application/json' -d'
{
    "name": "John Doe",
    "age": 30,
    "department": "Engineering"
}
'

在上述命令中,localhost:9200 是 Elasticsearch 的地址和端口,employees 是索引名称,_doc 表示默认的文档类型(在 7.x 及之后版本,类型的概念逐渐淡化,但仍保留此写法以保持兼容性)。

批量加载文档

对于大量文档的加载,逐个添加文档效率较低。Elasticsearch 提供了 _bulk API 来实现批量操作。_bulk API 允许在一个请求中发送多个创建、更新或删除操作。

_bulk 请求的格式较为特殊,每个操作由一个元数据行和一个文档行组成。例如,假设我们要批量添加多个员工文档:

curl -X POST "localhost:9200/employees/_bulk" -H 'Content-Type: application/json' -d'
{"index": {"_index": "employees", "_id": "1"}}
{"name": "Alice", "age": 25, "department": "Marketing"}
{"index": {"_index": "employees", "_id": "2"}}
{"name": "Bob", "age": 35, "department": "Sales"}
'

在上述示例中,{"index": {"_index": "employees", "_id": "1"}} 是元数据行,指定了要操作的索引和文档 ID,紧跟其后的 {"name": "Alice", "age": 25, "department": "Marketing"} 是实际的文档内容。

从文件加载数据

如果数据集存储在文件中(如 JSON 或 CSV 文件),可以编写脚本结合 _bulk API 进行数据加载。

JSON 文件加载

假设我们有一个 employees.json 文件,内容如下:

{"name": "Charlie", "age": 28, "department": "Engineering"}
{"name": "David", "age": 32, "department": "Finance"}

可以使用 Python 结合 requests 库编写一个简单的加载脚本:

import requests

url = "http://localhost:9200/employees/_bulk"
headers = {'Content-Type': 'application/json'}

with open('employees.json', 'r') as f:
    bulk_data = ''
    for line in f:
        doc_id = hash(line)  # 简单生成文档 ID
        bulk_data += '{"index": {"_index": "employees", "_id": "%s"}}\n' % doc_id
        bulk_data += line.strip() + '\n'

response = requests.post(url, headers=headers, data=bulk_data)
print(response.text)

上述脚本逐行读取 JSON 文件,为每个文档生成元数据行,然后将整个批量数据发送到 Elasticsearch 的 _bulk API。

CSV 文件加载

对于 CSV 文件,需要先将其转换为适合 Elasticsearch 的 JSON 格式。假设我们有一个 employees.csv 文件:

name,age,department
Eva,27,HR
Frank,33,Engineering

可以使用 Python 的 pandas 库来处理:

import pandas as pd
import requests

url = "http://localhost:9200/employees/_bulk"
headers = {'Content-Type': 'application/json'}

df = pd.read_csv('employees.csv')
bulk_data = ''
for index, row in df.iterrows():
    doc_id = index
    data = {
        "name": row['name'],
        "age": row['age'],
        "department": row['department']
    }
    bulk_data += '{"index": {"_index": "employees", "_id": "%s"}}\n' % doc_id
    bulk_data += str(data).replace("'", '"') + '\n'

response = requests.post(url, headers=headers, data=bulk_data)
print(response.text)

此脚本使用 pandas 读取 CSV 文件,将每一行数据转换为 JSON 格式,并构造 _bulk 请求所需的格式进行数据加载。

使用 Logstash 加载数据集

Logstash 简介

Logstash 是 Elastic Stack 的一部分,它是一个数据收集、处理和转发的工具。它可以从各种数据源(如文件、数据库、网络等)读取数据,对数据进行过滤、转换等处理,然后将其发送到 Elasticsearch 等目标存储。

安装和配置 Logstash

  1. 下载 Logstash:从 Elastic 官方网站下载适合你操作系统的 Logstash 安装包,并解压。
  2. 配置 Logstash:Logstash 的配置文件采用特定的格式,通常由 inputfilteroutput 三个部分组成。

例如,要从一个 JSON 文件加载数据到 Elasticsearch,可以创建如下配置文件 load_data.conf

input {
    file {
        path => "/path/to/your/json/file.json"
        start_position => "beginning"
    }
}
filter {
    json {
        source => "message"
    }
}
output {
    elasticsearch {
        hosts => ["localhost:9200"]
        index => "employees"
    }
}

在上述配置中:

  • input 部分指定从文件读取数据,path 为文件路径,start_position => "beginning" 表示从文件开头读取。
  • filter 部分使用 json 过滤器将读取到的文本数据解析为 JSON 格式。
  • output 部分将处理后的数据发送到本地的 Elasticsearch 集群,索引名称为 employees

运行 Logstash

在 Logstash 安装目录下,执行以下命令启动 Logstash 并加载数据:

bin/logstash -f load_data.conf

Logstash 会按照配置文件的定义,从指定文件读取数据,处理后发送到 Elasticsearch。如果数据量较大或有复杂的处理需求,Logstash 的多线程和分布式处理能力可以显著提高数据加载效率。

数据转换和过滤

Logstash 提供了丰富的过滤器插件,可以在数据加载过程中对数据进行转换和过滤。

例如,假设 JSON 文件中的 age 字段存储的是字符串类型,需要将其转换为数字类型,可以在 filter 部分添加如下配置:

filter {
    json {
        source => "message"
    }
    mutate {
        convert => { "age" => "integer" }
    }
}

mutate 过滤器的 convert 操作将 age 字段从字符串转换为整数类型。

又如,如果只想加载 departmentEngineering 的员工数据,可以添加 if 条件过滤:

filter {
    json {
        source => "message"
    }
    if [department] == "Engineering" {
        # 保留该文档
    } else {
        drop {}
    }
}

上述配置中,drop 过滤器会丢弃不符合条件的文档,只有 departmentEngineering 的文档会被发送到 Elasticsearch。

使用 Elasticsearch-Hadoop 加载数据集

Elasticsearch-Hadoop 简介

Elasticsearch-Hadoop 是一个用于在 Hadoop 生态系统和 Elasticsearch 之间进行数据交互的库。它允许使用 MapReduce、Hive、Pig 等 Hadoop 工具将数据加载到 Elasticsearch 中。

环境准备

  1. 安装 Hadoop:确保 Hadoop 环境已经搭建并正常运行。
  2. 添加 Elasticsearch-Hadoop 依赖:将 Elasticsearch-Hadoop 的 JAR 包添加到 Hadoop 相关工具(如 MapReduce 作业的类路径)中。可以从 Maven 仓库下载合适版本的 elasticsearch-hadoop JAR 包。

使用 MapReduce 加载数据

假设我们有一个存储在 HDFS 上的文本文件,每行是一个 JSON 格式的员工数据。可以编写一个简单的 MapReduce 作业将数据加载到 Elasticsearch。

首先,创建一个 Mapper 类:

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.elasticsearch.hadoop.mr.EsOutputFormat;
import org.elasticsearch.hadoop.mr.LinkedMapWritable;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class EsMapper extends Mapper<LongWritable, Text, Object, LinkedMapWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String jsonLine = value.toString();
        Map<String, Object> document = new HashMap<>();
        // 假设 JSON 格式为 {"name":"John Doe","age":30,"department":"Engineering"}
        // 这里简单解析,实际应用中可以使用 JSON 解析库
        String[] parts = jsonLine.split(",");
        for (String part : parts) {
            String[] keyValue = part.split(":");
            String field = keyValue[0].replace("{", "").replace("\"", "");
            String valueStr = keyValue[1].replace("}", "").replace("\"", "");
            if (field.equals("age")) {
                document.put(field, Integer.parseInt(valueStr));
            } else {
                document.put(field, valueStr);
            }
        }
        LinkedMapWritable esDocument = new LinkedMapWritable(document);
        context.write(null, esDocument);
    }
}

然后,创建一个驱动类来配置和运行 MapReduce 作业:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.elasticsearch.hadoop.mr.EsOutputFormat;

public class EsLoader {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("es.nodes", "localhost");
        conf.set("es.port", "9200");
        conf.set("es.resource.write", "employees/_doc");

        Job job = Job.getInstance(conf, "Elasticsearch Data Load");
        job.setJarByClass(EsLoader.class);
        job.setMapperClass(EsMapper.class);
        job.setOutputFormatClass(EsOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));

        System.exit(job.waitForCompletion(true)? 0 : 1);
    }
}

在上述代码中:

  • EsMapper 类将输入的文本行解析为 Elasticsearch 文档格式,并通过 context.write 输出。
  • EsLoader 类配置了 Elasticsearch 的节点地址、端口以及要写入的索引和文档类型,同时设置了 MapReduce 作业的相关参数。

编译并打包上述代码,然后在 Hadoop 集群上运行作业:

hadoop jar es-loader.jar EsLoader /path/to/input/file.txt

这样就可以通过 MapReduce 将 HDFS 上的文件数据加载到 Elasticsearch 中。

使用 Hive 加载数据

通过 Elasticsearch-Hadoop 也可以在 Hive 中操作 Elasticsearch 数据,包括加载数据。

首先,创建一个 Hive 外部表关联 Elasticsearch 数据:

CREATE EXTERNAL TABLE employees (
    name STRING,
    age INT,
    department STRING
)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES (
    'es.resource' = 'employees/_doc',
    'es.nodes' = 'localhost',
    'es.port' = '9200'
);

上述语句创建了一个名为 employees 的 Hive 外部表,关联到 Elasticsearch 的 employees 索引的 _doc 文档类型。

然后,可以使用 INSERT INTO 语句将数据从 Hive 表加载到 Elasticsearch:

INSERT INTO TABLE employees
SELECT 'Eva', 27, 'HR' UNION ALL
SELECT 'Frank', 33, 'Engineering';

这样就可以利用 Hive 的数据处理能力将数据加载到 Elasticsearch 中,并且可以结合 Hive 的查询语法对数据进行预处理。

使用 Kibana 加载数据集(特定场景)

Kibana 数据导入工具

Kibana 主要是用于 Elasticsearch 数据可视化和探索的工具,但在某些情况下也可以用于简单的数据加载。Kibana 提供了一个数据导入工具,可以从 JSON 或 NDJSON(Newline Delimited JSON)文件导入数据。

操作步骤

  1. 准备数据文件:确保数据文件为 JSON 或 NDJSON 格式。例如,一个 NDJSON 文件 employees.ndjson 内容如下:
{"name": "Grace", "age": 26, "department": "Marketing"}
{"name": "Henry", "age": 31, "department": "Sales"}
  1. 打开 Kibana 数据导入界面:在 Kibana 界面中,导航到“Management” -> “Data” -> “Import Data”。
  2. 选择文件并配置导入:上传 employees.ndjson 文件,Kibana 会自动检测文件格式。在导入配置中,可以指定索引名称(如 employees),以及是否自动创建索引等选项。
  3. 执行导入:点击“Import”按钮开始导入数据。Kibana 会将文件中的数据逐行解析并发送到 Elasticsearch 创建相应的文档。

虽然 Kibana 的数据导入功能相对简单,适用于少量数据的快速导入或测试场景,但它提供了一种直观的方式在 Kibana 界面内完成数据加载操作,无需编写额外的代码或使用其他工具。

注意事项和优化建议

数据格式和映射

在加载数据之前,要确保数据格式符合 Elasticsearch 的要求。同时,合理定义索引的映射(Mapping)可以提高数据存储和查询效率。例如,如果某个字段只包含数字,应将其映射为数值类型,而不是字符串类型,这样不仅可以节省存储空间,还能提高数值相关的查询性能。

批量操作大小

在使用 _bulk API 或类似的批量加载方式时,要注意批量操作的大小。如果批量数据过大,可能会导致网络传输问题或 Elasticsearch 节点内存不足。可以通过测试不同的批量大小,找到适合当前环境和数据集的最佳值。一般来说,几百到几千条文档的批量大小比较常见。

索引设置

在加载数据之前,可以根据数据集的特点调整索引的设置。例如,对于读多写少的场景,可以适当增加副本数来提高查询性能;而对于写多读少的场景,可以减少副本数,将更多资源用于写入操作。另外,合理设置分片数也很重要,分片数过多会增加管理开销,过少则可能影响数据的并行处理能力。

性能监控和调优

在数据加载过程中,使用 Elasticsearch 的监控工具(如 Kibana 的监控面板)来实时监测节点的性能指标,如 CPU、内存、磁盘 I/O 和网络使用情况。根据监控数据,对 Elasticsearch 集群进行调优,例如调整 JVM 堆大小、优化磁盘 I/O 配置等,以确保数据加载的高效进行。

通过上述多种方法,可以根据不同的数据集特点、应用场景和技术栈选择最合适的 Elasticsearch 数据集加载方式,并通过优化措施提高加载效率和系统性能。