ElasticSearch节点启动流程的核心操作揭秘
ElasticSearch 节点启动流程概述
ElasticSearch 是一个分布式的开源搜索和分析引擎,旨在处理大量数据并提供实时搜索功能。其节点启动流程涉及多个复杂步骤,这些步骤紧密协作,确保节点能够成功加入集群并开始提供服务。
配置加载
在 ElasticSearch 节点启动时,首要任务是加载配置文件。配置文件包含了节点运行所需的各种参数,如节点名称、集群名称、网络设置、数据存储路径等。这些配置决定了节点在集群中的角色和行为。
在 config/elasticsearch.yml
文件中,可以设置如下基本配置:
cluster.name: my_cluster
node.name: node-1
path.data: /var/lib/elasticsearch
path.logs: /var/log/elasticsearch
network.host: 0.0.0.0
http.port: 9200
通过上述配置,节点被命名为 node-1
,并加入名为 my_cluster
的集群。数据存储路径设置为 /var/lib/elasticsearch
,日志路径为 /var/log/elasticsearch
。网络绑定到所有可用地址,并监听 9200 端口用于 HTTP 通信。
配置加载过程由 SettingsModule
负责。它解析配置文件,将各种配置项转换为内部可识别的格式,并提供给后续启动流程使用。例如,Settings
类负责管理这些配置,代码如下:
Settings settings = Settings.builder()
.loadFromPath(new File("config/elasticsearch.yml"))
.build();
上述代码通过 Settings.builder()
构建器从指定路径加载配置文件,并构建 Settings
对象。这个 Settings
对象在后续启动过程中被广泛使用,为各个组件提供配置信息。
环境初始化
加载配置后,ElasticSearch 进入环境初始化阶段。此阶段主要是为节点运行准备必要的运行时环境,包括创建数据存储目录、日志目录等。
创建数据存储目录的代码如下:
Path dataPath = Paths.get(settings.get("path.data"));
if (!Files.exists(dataPath)) {
Files.createDirectories(dataPath);
}
上述代码从配置中获取数据存储路径,并使用 Files.createDirectories
方法创建目录(如果目录不存在)。
日志目录的创建也类似:
Path logsPath = Paths.get(settings.get("path.logs"));
if (!Files.exists(logsPath)) {
Files.createDirectories(logsPath);
}
除了目录创建,环境初始化还包括设置系统属性、初始化安全相关配置等。例如,为了设置 JVM 堆大小,可以在启动脚本中添加 -Xms512m -Xmx512m
等参数,这也属于环境初始化的一部分。这些设置确保节点在一个合适的运行环境中启动,为后续的功能实现提供基础。
插件加载
ElasticSearch 支持丰富的插件扩展,在节点启动过程中,会加载配置好的插件。插件可以提供额外的功能,如监控、安全认证等。
插件加载流程首先会扫描 plugins
目录,查找所有已安装的插件。然后,根据插件的 plugin-descriptor.properties
文件,加载插件的类和资源。
假设我们有一个自定义插件 my_plugin
,其目录结构如下:
my_plugin/
├── bin
├── lib
│ └── my_plugin.jar
└── plugin-descriptor.properties
在 plugin-descriptor.properties
文件中,定义了插件的基本信息,如 name=my_plugin
、description=My custom plugin
等。
加载插件的代码逻辑大致如下:
File pluginsDir = new File(settings.get("path.plugins"));
for (File pluginDir : pluginsDir.listFiles()) {
if (pluginDir.isDirectory()) {
File descriptorFile = new File(pluginDir, "plugin-descriptor.properties");
if (descriptorFile.exists()) {
// 加载插件相关类和资源
Plugin plugin = loadPlugin(pluginDir);
// 注册插件相关服务
registerPluginServices(plugin);
}
}
}
上述代码遍历 plugins
目录,找到每个插件的描述文件后,调用 loadPlugin
方法加载插件类和资源,并通过 registerPluginServices
方法注册插件提供的服务。这些插件在后续节点运行过程中发挥作用,扩展了 ElasticSearch 的功能。
节点角色确定
ElasticSearch 节点可以有多种角色,如主节点、数据节点、客户端节点等。在启动过程中,节点需要根据配置确定自己的角色。
主节点负责管理集群状态,如节点的加入和离开、索引的创建和删除等。数据节点负责存储和处理数据。客户端节点主要用于接收请求并转发给其他节点。
通过配置文件中的 node.master
和 node.data
等参数可以确定节点角色。例如:
node.master: true
node.data: true
上述配置表示该节点既是主节点候选,也存储数据。如果设置 node.master: false
和 node.data: true
,则该节点为数据节点。
节点角色确定的代码逻辑在 NodeRole
相关类中实现。它读取配置信息,根据配置设置节点的角色标识。例如:
boolean isMasterNode = settings.getAsBoolean("node.master", true);
boolean isDataNode = settings.getAsBoolean("node.data", true);
NodeRole role = new NodeRole(isMasterNode, isDataNode);
上述代码根据配置获取 node.master
和 node.data
的值,创建 NodeRole
对象,确定节点的角色。节点角色确定后,后续启动流程会根据角色加载相应的功能模块。
集群发现
集群发现是 ElasticSearch 节点启动的关键步骤之一。节点需要发现集群中的其他节点,并与之建立连接,以形成一个完整的集群。
ElasticSearch 使用基于 gossip 协议的发现机制,默认使用 Zen Discovery 模块。在配置文件中,可以设置集群发现相关参数,如:
discovery.seed_hosts: ["node-2:9300", "node-3:9300"]
cluster.initial_master_nodes: ["node-1", "node-2", "node-3"]
上述配置指定了种子节点(discovery.seed_hosts
),节点启动时会尝试连接这些种子节点来发现集群中的其他节点。cluster.initial_master_nodes
用于在集群初始化时确定主节点集合。
集群发现的核心代码在 Discovery
接口及其实现类中。以 Zen Discovery 为例,其 doStart
方法实现了发现逻辑:
@Override
public void doStart() {
List<DiscoveryNode> seeds = getSeeds();
for (DiscoveryNode seed : seeds) {
try {
TransportConnection connection = transportService.connectToNode(seed);
// 通过连接获取集群信息
ClusterInfo clusterInfo = connection.sendRequest(new DiscoveryRequest()).actionGet();
// 根据集群信息更新本地集群状态
updateClusterState(clusterInfo);
} catch (Exception e) {
logger.error("Failed to connect to seed node [{}]", seed, e);
}
}
}
上述代码从配置中获取种子节点列表,尝试连接每个种子节点。通过连接发送 DiscoveryRequest
请求获取集群信息,并根据这些信息更新本地集群状态。这个过程使得节点能够了解集群的当前状态,包括节点成员、索引信息等,为后续加入集群做好准备。
主节点选举(主节点候选)
如果节点被配置为 node.master: true
,则会参与主节点选举。主节点选举是一个分布式共识过程,确保集群中有且仅有一个主节点。
选举过程基于 Raft
算法的变种。在启动时,每个主节点候选节点会向其他节点发送投票请求。节点根据一定规则(如节点 ID、版本号等)决定是否投票给请求节点。
当一个节点获得超过半数节点的投票时,它将成为主节点。主节点选举的代码逻辑主要在 MasterService
类中。以下是简化的投票请求发送代码:
DiscoveryNode localNode = localNode();
for (DiscoveryNode node : masterNodes()) {
if (!node.equals(localNode)) {
TransportConnection connection = transportService.connectToNode(node);
connection.sendRequest(new VoteRequest(localNode)).addListener(new Listener() {
@Override
public void onResponse(VoteResponse response) {
if (response.isVoteGranted()) {
voteCount++;
if (voteCount > masterNodes().size() / 2) {
// 成为主节点
becomeMaster();
}
}
}
@Override
public void onFailure(Exception e) {
logger.error("Failed to send vote request to [{}]", node, e);
}
});
}
}
上述代码中,本地节点向其他主节点候选节点发送 VoteRequest
请求。如果收到的投票响应中 isVoteGranted
为 true
,则增加投票计数。当投票计数超过主节点候选节点总数的一半时,该节点成为主节点。成为主节点后,它将负责管理集群状态,如处理节点加入、离开事件,以及索引相关操作。
数据恢复(数据节点)
对于数据节点,在启动并加入集群后,需要进行数据恢复操作。数据恢复是指从之前的备份或其他节点同步数据,以确保数据的完整性。
数据恢复过程首先会检查本地存储的数据分片状态。如果存在未完成的分片副本,节点会向主节点请求获取最新的分片状态信息。
主节点根据集群状态,通知数据节点从哪些节点同步数据。数据节点之间通过 Transport
机制进行数据传输。
假设数据节点 A 需要恢复某个分片副本,它会向主节点发送请求:
TransportConnection masterConnection = transportService.connectToNode(masterNode());
masterConnection.sendRequest(new ShardRecoveryRequest(shardId)).addListener(new Listener() {
@Override
public void onResponse(ShardRecoveryResponse response) {
if (response.isSuccess()) {
List<DiscoveryNode> sourceNodes = response.getSourceNodes();
for (DiscoveryNode sourceNode : sourceNodes) {
TransportConnection sourceConnection = transportService.connectToNode(sourceNode);
sourceConnection.sendRequest(new DataSyncRequest(shardId)).addListener(new Listener() {
@Override
public void onResponse(DataSyncResponse dataSyncResponse) {
// 处理同步过来的数据
handleSyncedData(dataSyncResponse.getData());
}
@Override
public void onFailure(Exception e) {
logger.error("Failed to sync data from [{}]", sourceNode, e);
}
});
}
} else {
logger.error("Shard recovery request failed: [{}]", response.getFailureReason());
}
}
@Override
public void onFailure(Exception e) {
logger.error("Failed to send shard recovery request to master", e);
}
});
上述代码中,数据节点 A 向主节点发送 ShardRecoveryRequest
请求,获取用于恢复分片的源节点列表。然后,它依次向这些源节点发送 DataSyncRequest
请求,同步数据。同步完成后,调用 handleSyncedData
方法处理数据,确保数据在本地正确存储,完成数据恢复过程。
服务启动
完成上述步骤后,ElasticSearch 节点开始启动各种服务,包括 HTTP 服务、Transport 服务等。
HTTP 服务用于接收外部客户端的请求,如搜索请求、索引创建请求等。它基于 Netty
框架实现,通过配置的 http.port
监听端口。
启动 HTTP 服务的代码如下:
HttpServer httpServer = new HttpServer(settings, transportService);
httpServer.start();
上述代码创建 HttpServer
对象并启动。HttpServer
会绑定到配置的端口,等待客户端请求。
Transport 服务用于节点之间的内部通信,如集群发现、数据同步等。它同样基于 Netty
框架,在启动时会绑定到配置的 transport.tcp.port
端口。
TransportService transportService = new TransportService(settings, threadPool, networkService);
transportService.start();
上述代码创建并启动 TransportService
,使得节点能够与其他节点进行通信。
此外,还会启动其他服务,如索引服务、查询服务等。这些服务协同工作,为用户提供完整的 ElasticSearch 功能。索引服务负责管理索引的创建、删除、更新等操作,查询服务则处理用户的搜索请求,并返回结果。它们在节点启动后持续运行,确保 ElasticSearch 能够高效地处理各种数据操作和查询请求。
集群状态同步
节点启动并加入集群后,需要与主节点进行集群状态同步。集群状态包含了集群中所有节点、索引、分片等信息。
主节点会定期向所有节点广播集群状态更新。节点接收到更新后,会根据更新内容调整本地状态。
假设节点接收到一个集群状态更新,代码如下:
public void handleClusterStateUpdate(ClusterStateUpdateRequest request) {
ClusterState newState = request.getNewState();
// 更新本地集群状态
localClusterState = newState;
// 根据新状态调整本地资源,如索引状态、分片分配等
adjustLocalResources(newState);
}
上述代码中,handleClusterStateUpdate
方法处理接收到的集群状态更新请求。它获取新的集群状态,并更新本地集群状态。然后,调用 adjustLocalResources
方法根据新状态调整本地资源,如更新索引的状态、重新分配分片等操作。通过这种方式,节点始终保持与集群状态的一致性,确保其能够正确地处理各种请求和操作。
节点启动完成
当所有上述步骤顺利完成后,ElasticSearch 节点成功启动。此时,节点已经加入集群,并且各种服务已经就绪,可以开始接收和处理请求。
节点启动完成后,会记录启动日志,如在日志文件中记录:
[2024-01-01T12:00:00,000][INFO ][o.e.n.Node ] [node-1] started
这条日志表示 node-1
节点已成功启动。
从客户端角度,可以通过发送 HTTP 请求来验证节点是否正常工作,例如发送 GET http://localhost:9200
请求,如果节点正常启动,会返回节点的基本信息,如:
{
"name" : "node-1",
"cluster_name" : "my_cluster",
"cluster_uuid" : "abcdef1234567890",
"version" : {
"number" : "7.17.0",
"build_flavor" : "default",
"build_type" : "zip",
"build_hash" : "c841f763279f18d8d29c1918c5f869718f7f8c39",
"build_date" : "2022-03-09T20:37:28.644493Z",
"build_snapshot" : false,
"lucene_version" : "8.11.1",
"minimum_wire_compatibility_version" : "6.8.0",
"minimum_index_compatibility_version" : "6.0.0-beta1"
},
"tagline" : "You Know, for Search"
}
通过上述信息,可以确认节点已成功启动并加入集群,能够正常提供服务。此时,ElasticSearch 节点可以根据其角色,参与集群的各种操作,如数据存储、搜索查询、集群管理等,为用户提供高效的搜索和分析功能。