MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

ElasticSearch并发控制类deciders的实践

2022-08-082.8k 阅读

ElasticSearch并发控制类deciders概述

在ElasticSearch的复杂生态系统中,并发控制是确保数据一致性和系统高性能运行的关键因素。并发控制类deciders在这个过程中扮演着重要角色。它们负责在多线程或多进程环境下,对数据的读写操作进行协调,避免数据冲突和不一致性。

从本质上讲,deciders是一种决策机制,它依据特定的规则和条件,决定是否允许某个操作继续执行。在ElasticSearch中,这些操作通常包括文档的索引(indexing)、更新(updating)和删除(deleting)等。例如,当多个客户端同时尝试更新同一个文档时,decider会判断如何处理这些请求,以确保最终的数据状态是正确且符合预期的。

常见的并发控制场景与问题

  1. 写 - 写冲突:这是最常见的并发问题之一。当两个或多个写入操作同时针对同一个文档时,可能会导致数据覆盖或丢失。例如,假设有两个进程A和B,A读取文档后打算将字段X的值加1,B同时读取文档后打算将字段X的值加2。如果没有合适的并发控制,可能最终只保留了一个进程的修改,导致数据不一致。
  2. 读 - 写冲突:读取操作可能会在写入操作进行到一半时发生,从而读取到不完整或错误的数据。比如,在文档更新过程中,新数据还未完全写入磁盘,此时读取操作可能获取到旧数据与新数据的混合状态。

ElasticSearch中的deciders实现原理

  1. 乐观并发控制(Optimistic Concurrency Control):ElasticSearch默认采用乐观并发控制策略。在这种策略下,每次文档更新时,版本号会自动递增。当客户端尝试更新文档时,它必须提供当前文档的版本号。如果版本号匹配,更新操作会成功执行,并再次递增版本号。如果版本号不匹配,说明在客户端读取文档后,其他操作已经修改了文档,更新操作将失败。 代码示例(使用Java High - Level REST Client):
RestHighLevelClient client = new RestHighLevelClient(
        RestClient.builder(
                new HttpHost("localhost", 9200, "http")));
UpdateRequest updateRequest = new UpdateRequest("your_index", "your_type", "your_id")
       .doc(XContentType.JSON, "field", "new_value")
       .version(1); // 假设初始版本号为1
try {
    UpdateResponse updateResponse = client.update(updateRequest, RequestOptions.DEFAULT);
    if (updateResponse.getResult().name().equals("UPDATED")) {
        System.out.println("Document updated successfully");
    } else {
        System.out.println("Version conflict, document not updated");
    }
} catch (IOException e) {
    e.printStackTrace();
}
  1. 悲观并发控制(Pessimistic Concurrency Control):虽然ElasticSearch默认不使用悲观并发控制,但在某些特定场景下,开发者可以通过插件或自定义逻辑来实现。悲观并发控制假设冲突很可能发生,因此在操作开始前就获取锁,防止其他操作同时访问同一资源。例如,在更新文档前,先获取文档的排他锁,直到更新完成后才释放锁。

自定义deciders的实践

  1. 需求分析:假设在一个电商应用中,对于商品库存的更新操作,需要更严格的并发控制。除了版本号检查外,还需要确保库存数量不会因为并发操作而出现负数。
  2. 实现步骤
    • 定义自定义decider类
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestUpdateAction;
import org.elasticsearch.transport.TransportService;

public class CustomInventoryDecider {
    private final Client client;

    @Inject
    public CustomInventoryDecider(Settings settings, TransportService transportService, Client client) {
        this.client = client;
    }

    public boolean canUpdate(String index, String type, String id, int newStock) {
        try {
            GetRequest getRequest = new GetRequest(index, type, id);
            GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT);
            if (getResponse.isExists()) {
                int currentStock = getResponse.getSourceAsMap().getOrDefault("stock", 0);
                return currentStock + newStock >= 0;
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return false;
    }
}
  • 集成到ElasticSearch更新操作中
public class CustomUpdateAction extends HandledTransportAction<UpdateRequest, UpdateResponse> {
    private final CustomInventoryDecider customInventoryDecider;

    @Inject
    public CustomUpdateAction(Settings settings, TransportService transportService, CustomInventoryDecider customInventoryDecider) {
        super(settings, transportService, UpdateAction.NAME);
        this.customInventoryDecider = customInventoryDecider;
    }

    @Override
    protected void doExecute(UpdateRequest request, ActionListener<UpdateResponse> listener) {
        String index = request.index();
        String type = request.type();
        String id = request.id();
        int newStock = Integer.parseInt(request.doc().getSourceAsMap().getOrDefault("stock", 0).toString());
        if (customInventoryDecider.canUpdate(index, type, id, newStock)) {
            super.doExecute(request, listener);
        } else {
            listener.onFailure(new IllegalArgumentException("Stock cannot be negative after update"));
        }
    }
}
  • 注册自定义action
public class CustomRestPlugin extends Plugin {
    @Override
    public List<RestHandler> getRestHandlers(Settings settings, RestController restController) {
        Client client = restController.getClient();
        CustomInventoryDecider customInventoryDecider = new CustomInventoryDecider(settings, restController.getTransportService(), client);
        TransportService transportService = restController.getTransportService();
        CustomUpdateAction customUpdateAction = new CustomUpdateAction(settings, transportService, customInventoryDecider);
        restController.registerHandler(
                RestRequest.Method.POST,
                "/{index}/{type}/{id}/_update",
                new RestUpdateAction(customUpdateAction));
        return Collections.emptyList();
    }
}

通过上述步骤,实现了一个自定义的decider,它在商品库存更新时,不仅检查版本号,还确保库存数量不会出现负数。

性能与调优

  1. 乐观并发控制的性能影响:乐观并发控制在高并发环境下通常具有较好的性能,因为它不需要在每次操作前获取锁,减少了锁争用的开销。然而,如果版本冲突频繁发生,会导致更新操作失败并需要重试,这会增加额外的开销。为了减少版本冲突,可以尽量批量处理操作,或者在应用层对数据进行分区,减少不同操作对同一文档的竞争。
  2. 自定义deciders的性能考量:自定义deciders可能会引入额外的计算和查询开销。例如,上述自定义库存更新decider需要先查询当前库存数量。为了优化性能,可以采用缓存机制,在应用层缓存部分常用的库存数据,减少对ElasticSearch的查询次数。同时,对于复杂的自定义decider逻辑,可以考虑异步执行部分操作,以避免阻塞主要的请求处理线程。

多集群环境下的并发控制

  1. 数据同步与并发问题:在多集群环境中,数据同步是一个重要的环节,同时也带来了新的并发控制挑战。当数据在不同集群之间同步时,可能会出现写入冲突。例如,一个文档在集群A中被更新,同时在集群B中也进行了更新,在同步过程中需要解决这些冲突。
  2. 解决方案:可以采用主从模式,其中一个集群作为主集群,其他作为从集群。主集群处理所有的写入操作,然后将变更同步到从集群。在同步过程中,可以利用版本号机制来确保数据的一致性。另外,一些分布式协调工具如ZooKeeper可以用于协调不同集群之间的操作,避免并发冲突。

结合其他ElasticSearch特性进行并发控制

  1. 索引模板与并发控制:索引模板可以定义索引的结构和设置,包括并发相关的设置。例如,可以通过索引模板设置每个分片的副本数量。适当增加副本数量可以提高读操作的并发性能,但同时也会增加写入操作的开销,因为每次写入都需要同步到所有副本。
  2. 路由与并发控制:ElasticSearch支持通过路由(routing)将文档分配到特定的分片。在并发控制中,可以利用路由机制将相关的文档分配到同一个分片,减少跨分片的并发冲突。例如,在一个订单系统中,可以根据订单所属的用户ID进行路由,将同一用户的订单相关文档分配到同一分片,这样在处理该用户订单的并发操作时,可以更有效地进行控制。

监控与故障处理

  1. 监控并发冲突:ElasticSearch提供了一些监控工具和API,可以用于监控并发冲突的发生频率。例如,可以通过ElasticSearch的集群健康API查看索引的状态,包括版本冲突的次数。通过监控这些指标,可以及时发现并发控制策略是否有效,并进行相应的调整。
  2. 故障处理:当出现并发冲突导致操作失败时,应用程序需要有合理的故障处理机制。对于乐观并发控制下的版本冲突,可以采用重试机制,在一定次数内重试更新操作。对于自定义decider导致的失败,需要根据具体的业务逻辑进行处理,例如提示用户库存不足等信息。

在ElasticSearch的并发控制中,deciders是核心组件。通过深入理解其原理、实践自定义deciders以及结合其他特性进行优化,可以有效地解决并发问题,确保系统的高性能和数据一致性。无论是单集群还是多集群环境,合理的并发控制策略都是ElasticSearch应用成功的关键。