MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

ElasticSearch节点启动流程的核心操作揭秘

2023-03-302.7k 阅读

ElasticSearch 节点启动流程概述

ElasticSearch 是一个分布式的开源搜索和分析引擎,旨在处理大量数据并提供实时搜索功能。其节点启动流程涉及多个复杂步骤,这些步骤紧密协作,确保节点能够成功加入集群并开始提供服务。

配置加载

在 ElasticSearch 节点启动时,首要任务是加载配置文件。配置文件包含了节点运行所需的各种参数,如节点名称、集群名称、网络设置、数据存储路径等。这些配置决定了节点在集群中的角色和行为。

config/elasticsearch.yml 文件中,可以设置如下基本配置:

cluster.name: my_cluster
node.name: node-1
path.data: /var/lib/elasticsearch
path.logs: /var/log/elasticsearch
network.host: 0.0.0.0
http.port: 9200

通过上述配置,节点被命名为 node-1,并加入名为 my_cluster 的集群。数据存储路径设置为 /var/lib/elasticsearch,日志路径为 /var/log/elasticsearch。网络绑定到所有可用地址,并监听 9200 端口用于 HTTP 通信。

配置加载过程由 SettingsModule 负责。它解析配置文件,将各种配置项转换为内部可识别的格式,并提供给后续启动流程使用。例如,Settings 类负责管理这些配置,代码如下:

Settings settings = Settings.builder()
       .loadFromPath(new File("config/elasticsearch.yml"))
       .build();

上述代码通过 Settings.builder() 构建器从指定路径加载配置文件,并构建 Settings 对象。这个 Settings 对象在后续启动过程中被广泛使用,为各个组件提供配置信息。

环境初始化

加载配置后,ElasticSearch 进入环境初始化阶段。此阶段主要是为节点运行准备必要的运行时环境,包括创建数据存储目录、日志目录等。

创建数据存储目录的代码如下:

Path dataPath = Paths.get(settings.get("path.data"));
if (!Files.exists(dataPath)) {
    Files.createDirectories(dataPath);
}

上述代码从配置中获取数据存储路径,并使用 Files.createDirectories 方法创建目录(如果目录不存在)。

日志目录的创建也类似:

Path logsPath = Paths.get(settings.get("path.logs"));
if (!Files.exists(logsPath)) {
    Files.createDirectories(logsPath);
}

除了目录创建,环境初始化还包括设置系统属性、初始化安全相关配置等。例如,为了设置 JVM 堆大小,可以在启动脚本中添加 -Xms512m -Xmx512m 等参数,这也属于环境初始化的一部分。这些设置确保节点在一个合适的运行环境中启动,为后续的功能实现提供基础。

插件加载

ElasticSearch 支持丰富的插件扩展,在节点启动过程中,会加载配置好的插件。插件可以提供额外的功能,如监控、安全认证等。

插件加载流程首先会扫描 plugins 目录,查找所有已安装的插件。然后,根据插件的 plugin-descriptor.properties 文件,加载插件的类和资源。

假设我们有一个自定义插件 my_plugin,其目录结构如下:

my_plugin/
├── bin
├── lib
│   └── my_plugin.jar
└── plugin-descriptor.properties

plugin-descriptor.properties 文件中,定义了插件的基本信息,如 name=my_plugindescription=My custom plugin 等。

加载插件的代码逻辑大致如下:

File pluginsDir = new File(settings.get("path.plugins"));
for (File pluginDir : pluginsDir.listFiles()) {
    if (pluginDir.isDirectory()) {
        File descriptorFile = new File(pluginDir, "plugin-descriptor.properties");
        if (descriptorFile.exists()) {
            // 加载插件相关类和资源
            Plugin plugin = loadPlugin(pluginDir);
            // 注册插件相关服务
            registerPluginServices(plugin);
        }
    }
}

上述代码遍历 plugins 目录,找到每个插件的描述文件后,调用 loadPlugin 方法加载插件类和资源,并通过 registerPluginServices 方法注册插件提供的服务。这些插件在后续节点运行过程中发挥作用,扩展了 ElasticSearch 的功能。

节点角色确定

ElasticSearch 节点可以有多种角色,如主节点、数据节点、客户端节点等。在启动过程中,节点需要根据配置确定自己的角色。

主节点负责管理集群状态,如节点的加入和离开、索引的创建和删除等。数据节点负责存储和处理数据。客户端节点主要用于接收请求并转发给其他节点。

通过配置文件中的 node.masternode.data 等参数可以确定节点角色。例如:

node.master: true
node.data: true

上述配置表示该节点既是主节点候选,也存储数据。如果设置 node.master: falsenode.data: true,则该节点为数据节点。

节点角色确定的代码逻辑在 NodeRole 相关类中实现。它读取配置信息,根据配置设置节点的角色标识。例如:

boolean isMasterNode = settings.getAsBoolean("node.master", true);
boolean isDataNode = settings.getAsBoolean("node.data", true);
NodeRole role = new NodeRole(isMasterNode, isDataNode);

上述代码根据配置获取 node.masternode.data 的值,创建 NodeRole 对象,确定节点的角色。节点角色确定后,后续启动流程会根据角色加载相应的功能模块。

集群发现

集群发现是 ElasticSearch 节点启动的关键步骤之一。节点需要发现集群中的其他节点,并与之建立连接,以形成一个完整的集群。

ElasticSearch 使用基于 gossip 协议的发现机制,默认使用 Zen Discovery 模块。在配置文件中,可以设置集群发现相关参数,如:

discovery.seed_hosts: ["node-2:9300", "node-3:9300"]
cluster.initial_master_nodes: ["node-1", "node-2", "node-3"]

上述配置指定了种子节点(discovery.seed_hosts),节点启动时会尝试连接这些种子节点来发现集群中的其他节点。cluster.initial_master_nodes 用于在集群初始化时确定主节点集合。

集群发现的核心代码在 Discovery 接口及其实现类中。以 Zen Discovery 为例,其 doStart 方法实现了发现逻辑:

@Override
public void doStart() {
    List<DiscoveryNode> seeds = getSeeds();
    for (DiscoveryNode seed : seeds) {
        try {
            TransportConnection connection = transportService.connectToNode(seed);
            // 通过连接获取集群信息
            ClusterInfo clusterInfo = connection.sendRequest(new DiscoveryRequest()).actionGet();
            // 根据集群信息更新本地集群状态
            updateClusterState(clusterInfo);
        } catch (Exception e) {
            logger.error("Failed to connect to seed node [{}]", seed, e);
        }
    }
}

上述代码从配置中获取种子节点列表,尝试连接每个种子节点。通过连接发送 DiscoveryRequest 请求获取集群信息,并根据这些信息更新本地集群状态。这个过程使得节点能够了解集群的当前状态,包括节点成员、索引信息等,为后续加入集群做好准备。

主节点选举(主节点候选)

如果节点被配置为 node.master: true,则会参与主节点选举。主节点选举是一个分布式共识过程,确保集群中有且仅有一个主节点。

选举过程基于 Raft 算法的变种。在启动时,每个主节点候选节点会向其他节点发送投票请求。节点根据一定规则(如节点 ID、版本号等)决定是否投票给请求节点。

当一个节点获得超过半数节点的投票时,它将成为主节点。主节点选举的代码逻辑主要在 MasterService 类中。以下是简化的投票请求发送代码:

DiscoveryNode localNode = localNode();
for (DiscoveryNode node : masterNodes()) {
    if (!node.equals(localNode)) {
        TransportConnection connection = transportService.connectToNode(node);
        connection.sendRequest(new VoteRequest(localNode)).addListener(new Listener() {
            @Override
            public void onResponse(VoteResponse response) {
                if (response.isVoteGranted()) {
                    voteCount++;
                    if (voteCount > masterNodes().size() / 2) {
                        // 成为主节点
                        becomeMaster();
                    }
                }
            }

            @Override
            public void onFailure(Exception e) {
                logger.error("Failed to send vote request to [{}]", node, e);
            }
        });
    }
}

上述代码中,本地节点向其他主节点候选节点发送 VoteRequest 请求。如果收到的投票响应中 isVoteGrantedtrue,则增加投票计数。当投票计数超过主节点候选节点总数的一半时,该节点成为主节点。成为主节点后,它将负责管理集群状态,如处理节点加入、离开事件,以及索引相关操作。

数据恢复(数据节点)

对于数据节点,在启动并加入集群后,需要进行数据恢复操作。数据恢复是指从之前的备份或其他节点同步数据,以确保数据的完整性。

数据恢复过程首先会检查本地存储的数据分片状态。如果存在未完成的分片副本,节点会向主节点请求获取最新的分片状态信息。

主节点根据集群状态,通知数据节点从哪些节点同步数据。数据节点之间通过 Transport 机制进行数据传输。

假设数据节点 A 需要恢复某个分片副本,它会向主节点发送请求:

TransportConnection masterConnection = transportService.connectToNode(masterNode());
masterConnection.sendRequest(new ShardRecoveryRequest(shardId)).addListener(new Listener() {
    @Override
    public void onResponse(ShardRecoveryResponse response) {
        if (response.isSuccess()) {
            List<DiscoveryNode> sourceNodes = response.getSourceNodes();
            for (DiscoveryNode sourceNode : sourceNodes) {
                TransportConnection sourceConnection = transportService.connectToNode(sourceNode);
                sourceConnection.sendRequest(new DataSyncRequest(shardId)).addListener(new Listener() {
                    @Override
                    public void onResponse(DataSyncResponse dataSyncResponse) {
                        // 处理同步过来的数据
                        handleSyncedData(dataSyncResponse.getData());
                    }

                    @Override
                    public void onFailure(Exception e) {
                        logger.error("Failed to sync data from [{}]", sourceNode, e);
                    }
                });
            }
        } else {
            logger.error("Shard recovery request failed: [{}]", response.getFailureReason());
        }
    }

    @Override
    public void onFailure(Exception e) {
        logger.error("Failed to send shard recovery request to master", e);
    }
});

上述代码中,数据节点 A 向主节点发送 ShardRecoveryRequest 请求,获取用于恢复分片的源节点列表。然后,它依次向这些源节点发送 DataSyncRequest 请求,同步数据。同步完成后,调用 handleSyncedData 方法处理数据,确保数据在本地正确存储,完成数据恢复过程。

服务启动

完成上述步骤后,ElasticSearch 节点开始启动各种服务,包括 HTTP 服务、Transport 服务等。

HTTP 服务用于接收外部客户端的请求,如搜索请求、索引创建请求等。它基于 Netty 框架实现,通过配置的 http.port 监听端口。

启动 HTTP 服务的代码如下:

HttpServer httpServer = new HttpServer(settings, transportService);
httpServer.start();

上述代码创建 HttpServer 对象并启动。HttpServer 会绑定到配置的端口,等待客户端请求。

Transport 服务用于节点之间的内部通信,如集群发现、数据同步等。它同样基于 Netty 框架,在启动时会绑定到配置的 transport.tcp.port 端口。

TransportService transportService = new TransportService(settings, threadPool, networkService);
transportService.start();

上述代码创建并启动 TransportService,使得节点能够与其他节点进行通信。

此外,还会启动其他服务,如索引服务、查询服务等。这些服务协同工作,为用户提供完整的 ElasticSearch 功能。索引服务负责管理索引的创建、删除、更新等操作,查询服务则处理用户的搜索请求,并返回结果。它们在节点启动后持续运行,确保 ElasticSearch 能够高效地处理各种数据操作和查询请求。

集群状态同步

节点启动并加入集群后,需要与主节点进行集群状态同步。集群状态包含了集群中所有节点、索引、分片等信息。

主节点会定期向所有节点广播集群状态更新。节点接收到更新后,会根据更新内容调整本地状态。

假设节点接收到一个集群状态更新,代码如下:

public void handleClusterStateUpdate(ClusterStateUpdateRequest request) {
    ClusterState newState = request.getNewState();
    // 更新本地集群状态
    localClusterState = newState;
    // 根据新状态调整本地资源,如索引状态、分片分配等
    adjustLocalResources(newState);
}

上述代码中,handleClusterStateUpdate 方法处理接收到的集群状态更新请求。它获取新的集群状态,并更新本地集群状态。然后,调用 adjustLocalResources 方法根据新状态调整本地资源,如更新索引的状态、重新分配分片等操作。通过这种方式,节点始终保持与集群状态的一致性,确保其能够正确地处理各种请求和操作。

节点启动完成

当所有上述步骤顺利完成后,ElasticSearch 节点成功启动。此时,节点已经加入集群,并且各种服务已经就绪,可以开始接收和处理请求。

节点启动完成后,会记录启动日志,如在日志文件中记录:

[2024-01-01T12:00:00,000][INFO ][o.e.n.Node               ] [node-1] started

这条日志表示 node-1 节点已成功启动。

从客户端角度,可以通过发送 HTTP 请求来验证节点是否正常工作,例如发送 GET http://localhost:9200 请求,如果节点正常启动,会返回节点的基本信息,如:

{
    "name" : "node-1",
    "cluster_name" : "my_cluster",
    "cluster_uuid" : "abcdef1234567890",
    "version" : {
        "number" : "7.17.0",
        "build_flavor" : "default",
        "build_type" : "zip",
        "build_hash" : "c841f763279f18d8d29c1918c5f869718f7f8c39",
        "build_date" : "2022-03-09T20:37:28.644493Z",
        "build_snapshot" : false,
        "lucene_version" : "8.11.1",
        "minimum_wire_compatibility_version" : "6.8.0",
        "minimum_index_compatibility_version" : "6.0.0-beta1"
    },
    "tagline" : "You Know, for Search"
}

通过上述信息,可以确认节点已成功启动并加入集群,能够正常提供服务。此时,ElasticSearch 节点可以根据其角色,参与集群的各种操作,如数据存储、搜索查询、集群管理等,为用户提供高效的搜索和分析功能。