MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

理解 ACID 之原子性在并发操作中的重要性

2022-08-051.1k 阅读

什么是 ACID 中的原子性

在数据库事务处理中,ACID 是一组保证数据库可靠性的关键属性,分别代表原子性(Atomicity)、一致性(Consistency)、隔离性(Isolation)和持久性(Durability)。原子性作为 ACID 的首要特性,它确保一个事务中的所有操作要么全部成功执行,要么全部不执行。就像原子一样,不可分割,是一个完整的整体。

从本质上讲,原子性防止了事务执行过程中部分完成的操作对数据库造成影响。如果没有原子性,在一个包含多个步骤的事务中,若其中某个步骤失败,而之前的步骤已经修改了数据库状态,就会导致数据处于不一致的状态。例如,在银行转账事务中,从账户 A 向账户 B 转账一定金额,这个事务包含两个关键操作:从账户 A 扣除相应金额,然后向账户 B 添加相同金额。原子性保证这两个操作要么都成功完成,使得转账顺利进行;要么因为某些原因(如账户 A 余额不足),两个操作都不执行,不会出现账户 A 扣了钱但账户 B 未收到的情况。

原子性在单线程环境中的体现

在单线程环境下,原子性相对容易理解和保证。由于一次只有一个操作在执行,不存在并发冲突的问题。以下是一个简单的 Java 代码示例,使用 JDBC 来模拟一个简单的银行转账事务(假设数据库表 accountsidbalance 字段):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class SingleThreadedTransfer {
    public static void main(String[] args) {
        String jdbcUrl = "jdbc:mysql://localhost:3306/yourdatabase";
        String username = "yourusername";
        String password = "yourpassword";

        int fromAccountId = 1;
        int toAccountId = 2;
        double amount = 100.0;

        try (Connection connection = DriverManager.getConnection(jdbcUrl, username, password)) {
            connection.setAutoCommit(false);

            // 从账户 A 扣除金额
            String updateFromAccountSql = "UPDATE accounts SET balance = balance -? WHERE id =?";
            try (PreparedStatement fromAccountStmt = connection.prepareStatement(updateFromAccountSql)) {
                fromAccountStmt.setDouble(1, amount);
                fromAccountStmt.setInt(2, fromAccountId);
                fromAccountStmt.executeUpdate();
            }

            // 向账户 B 添加金额
            String updateToAccountSql = "UPDATE accounts SET balance = balance +? WHERE id =?";
            try (PreparedStatement toAccountStmt = connection.prepareStatement(updateToAccountSql)) {
                toAccountStmt.setDouble(1, amount);
                toAccountStmt.setInt(2, toAccountId);
                toAccountStmt.executeUpdate();
            }

            connection.commit();
            System.out.println("转账成功");
        } catch (SQLException e) {
            // 如果出现异常,回滚事务
            try (Connection connection = DriverManager.getConnection(jdbcUrl, username, password)) {
                connection.rollback();
            } catch (SQLException ex) {
                ex.printStackTrace();
            }
            e.printStackTrace();
            System.out.println("转账失败");
        }
    }
}

在上述代码中,通过 connection.setAutoCommit(false) 关闭自动提交,将整个转账过程作为一个事务。如果在执行 UPDATE 语句过程中出现异常,通过 connection.rollback() 回滚事务,保证了要么两个 UPDATE 操作都执行,要么都不执行,从而体现了原子性。

原子性在并发操作中的挑战

然而,在分布式系统或多线程环境中,并发操作会给原子性带来巨大挑战。多个事务可能同时对相同的数据进行操作,若没有适当的控制,就可能破坏原子性。例如,假设有两个并发的转账事务,一个从账户 A 向账户 B 转账,另一个从账户 B 向账户 A 转账。如果没有同步机制,可能会出现部分操作完成,导致账户余额错误的情况。

考虑以下简化的场景,假设两个线程同时执行转账操作:

线程 1:从账户 A 向账户 B 转账 100 元 线程 2:从账户 B 向账户 A 转账 50 元

如果没有原子性保障,可能出现以下执行顺序:

  1. 线程 1 执行从账户 A 扣除 100 元的操作。
  2. 线程 2 执行从账户 B 扣除 50 元的操作。
  3. 线程 2 执行向账户 A 添加 50 元的操作。
  4. 线程 1 执行向账户 B 添加 100 元的操作。

这样最终账户 A 的余额减少了 50 元,账户 B 的余额增加了 50 元,与预期的两个独立转账结果不符,破坏了原子性。

并发操作中保证原子性的方法

锁机制

  1. 悲观锁:悲观锁假定在并发操作中,数据冲突的可能性很高。因此,在对数据进行操作之前,先获取锁,确保同一时间只有一个事务能够访问和修改数据。数据库层面常见的悲观锁实现方式有 SELECT... FOR UPDATE 语句。

以下是使用 MySQL 的 SELECT... FOR UPDATE 实现悲观锁的 Java 代码示例:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

public class PessimisticLockTransfer {
    public static void main(String[] args) {
        String jdbcUrl = "jdbc:mysql://localhost:3306/yourdatabase";
        String username = "yourusername";
        String password = "yourpassword";

        int fromAccountId = 1;
        int toAccountId = 2;
        double amount = 100.0;

        try (Connection connection = DriverManager.getConnection(jdbcUrl, username, password)) {
            connection.setAutoCommit(false);

            // 使用 SELECT... FOR UPDATE 获取悲观锁
            String selectFromAccountSql = "SELECT balance FROM accounts WHERE id =? FOR UPDATE";
            try (PreparedStatement fromAccountStmt = connection.prepareStatement(selectFromAccountSql)) {
                fromAccountStmt.setInt(1, fromAccountId);
                ResultSet fromAccountRs = fromAccountStmt.executeQuery();
                if (fromAccountRs.next()) {
                    double fromBalance = fromAccountRs.getDouble("balance");
                    if (fromBalance < amount) {
                        throw new RuntimeException("余额不足");
                    }
                }
            }

            // 从账户 A 扣除金额
            String updateFromAccountSql = "UPDATE accounts SET balance = balance -? WHERE id =?";
            try (PreparedStatement fromAccountStmt = connection.prepareStatement(updateFromAccountSql)) {
                fromAccountStmt.setDouble(1, amount);
                fromAccountStmt.setInt(2, fromAccountId);
                fromAccountStmt.executeUpdate();
            }

            // 使用 SELECT... FOR UPDATE 获取悲观锁
            String selectToAccountSql = "SELECT balance FROM accounts WHERE id =? FOR UPDATE";
            try (PreparedStatement toAccountStmt = connection.prepareStatement(selectToAccountSql)) {
                toAccountStmt.setInt(1, toAccountId);
                ResultSet toAccountRs = toAccountStmt.executeQuery();
                if (toAccountRs.next()) {
                    // 可以进行一些余额检查等操作
                }
            }

            // 向账户 B 添加金额
            String updateToAccountSql = "UPDATE accounts SET balance = balance +? WHERE id =?";
            try (PreparedStatement toAccountStmt = connection.prepareStatement(updateToAccountSql)) {
                toAccountStmt.setDouble(1, amount);
                toAccountStmt.setInt(2, toAccountId);
                toAccountStmt.executeUpdate();
            }

            connection.commit();
            System.out.println("转账成功");
        } catch (SQLException e) {
            // 如果出现异常,回滚事务
            try (Connection connection = DriverManager.getConnection(jdbcUrl, username, password)) {
                connection.rollback();
            } catch (SQLException ex) {
                ex.printStackTrace();
            }
            e.printStackTrace();
            System.out.println("转账失败");
        }
    }
}

在上述代码中,通过 SELECT... FOR UPDATE 语句在读取账户余额时获取锁,其他事务在该事务提交或回滚之前无法修改这些数据,从而保证了原子性。

  1. 乐观锁:乐观锁则假定数据冲突的可能性较低。它不预先锁定数据,而是在事务提交时检查数据是否被其他事务修改。如果数据未被修改,则提交事务;否则,回滚事务。乐观锁通常通过版本号或时间戳机制实现。

以下是使用版本号实现乐观锁的 Java 代码示例(假设数据库表 accounts 新增 version 字段):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

public class OptimisticLockTransfer {
    public static void main(String[] args) {
        String jdbcUrl = "jdbc:mysql://localhost:3306/yourdatabase";
        String username = "yourusername";
        String password = "yourpassword";

        int fromAccountId = 1;
        int toAccountId = 2;
        double amount = 100.0;

        try (Connection connection = DriverManager.getConnection(jdbcUrl, username, password)) {
            connection.setAutoCommit(false);

            // 获取账户 A 的余额和版本号
            String selectFromAccountSql = "SELECT balance, version FROM accounts WHERE id =?";
            try (PreparedStatement fromAccountStmt = connection.prepareStatement(selectFromAccountSql)) {
                fromAccountStmt.setInt(1, fromAccountId);
                ResultSet fromAccountRs = fromAccountStmt.executeQuery();
                if (fromAccountRs.next()) {
                    double fromBalance = fromAccountRs.getDouble("balance");
                    int fromVersion = fromAccountRs.getInt("version");
                    if (fromBalance < amount) {
                        throw new RuntimeException("余额不足");
                    }

                    // 尝试更新账户 A 的余额和版本号
                    String updateFromAccountSql = "UPDATE accounts SET balance = balance -?, version = version + 1 WHERE id =? AND version =?";
                    try (PreparedStatement updateFromAccountStmt = connection.prepareStatement(updateFromAccountSql)) {
                        updateFromAccountStmt.setDouble(1, amount);
                        updateFromAccountStmt.setInt(2, fromAccountId);
                        updateFromAccountStmt.setInt(3, fromVersion);
                        int updatedRows = updateFromAccountStmt.executeUpdate();
                        if (updatedRows == 0) {
                            throw new RuntimeException("数据已被其他事务修改,操作失败");
                        }
                    }
                }
            }

            // 获取账户 B 的余额和版本号
            String selectToAccountSql = "SELECT balance, version FROM accounts WHERE id =?";
            try (PreparedStatement toAccountStmt = connection.prepareStatement(selectToAccountSql)) {
                toAccountStmt.setInt(1, toAccountId);
                ResultSet toAccountRs = toAccountStmt.executeQuery();
                if (toAccountRs.next()) {
                    int toVersion = toAccountRs.getInt("version");

                    // 尝试更新账户 B 的余额和版本号
                    String updateToAccountSql = "UPDATE accounts SET balance = balance +?, version = version + 1 WHERE id =? AND version =?";
                    try (PreparedStatement updateToAccountStmt = connection.prepareStatement(updateToAccountSql)) {
                        updateToAccountStmt.setDouble(1, amount);
                        updateToAccountStmt.setInt(2, toAccountId);
                        updateToAccountStmt.setInt(3, toVersion);
                        int updatedRows = updateToAccountStmt.executeUpdate();
                        if (updatedRows == 0) {
                            throw new RuntimeException("数据已被其他事务修改,操作失败");
                        }
                    }
                }
            }

            connection.commit();
            System.out.println("转账成功");
        } catch (SQLException e) {
            // 如果出现异常,回滚事务
            try (Connection connection = DriverManager.getConnection(jdbcUrl, username, password)) {
                connection.rollback();
            } catch (SQLException ex) {
                ex.printStackTrace();
            }
            e.printStackTrace();
            System.out.println("转账失败");
        }
    }
}

在上述代码中,每次更新数据时都检查版本号,如果版本号匹配则更新成功,否则说明数据已被其他事务修改,回滚事务,以此保证原子性。

事务管理器

  1. 两阶段提交(2PC):两阶段提交是一种常用的分布式事务解决方案,用于协调多个节点上的事务操作。它分为两个阶段:准备阶段(Prepare)和提交阶段(Commit)。

在准备阶段,事务管理器向所有参与事务的节点发送 PREPARE 消息,每个节点执行事务操作并记录日志,但不提交事务。节点向事务管理器回复 YES 表示准备成功,或 NO 表示准备失败。

在提交阶段,如果所有节点在准备阶段都回复 YES,事务管理器向所有节点发送 COMMIT 消息,节点收到后正式提交事务;如果有任何一个节点回复 NO,事务管理器向所有节点发送 ROLLBACK 消息,节点回滚事务。

以下是一个简化的两阶段提交示例代码,使用 Java 和 RMI(远程方法调用)来模拟分布式环境:

节点代码(Node.java)

import java.rmi.Remote;
import java.rmi.RemoteException;

public interface Node extends Remote {
    boolean prepare() throws RemoteException;
    void commit() throws RemoteException;
    void rollback() throws RemoteException;
}

节点实现代码(NodeImpl.java)

import java.rmi.RemoteException;
import java.rmi.server.UnicastRemoteObject;

public class NodeImpl extends UnicastRemoteObject implements Node {
    private boolean isPrepared = false;

    protected NodeImpl() throws RemoteException {
        super();
    }

    @Override
    public boolean prepare() throws RemoteException {
        // 执行本地事务操作,例如更新数据库等
        // 这里简单模拟成功
        isPrepared = true;
        return isPrepared;
    }

    @Override
    public void commit() throws RemoteException {
        if (isPrepared) {
            // 正式提交事务
            System.out.println("节点提交事务");
        }
    }

    @Override
    public void rollback() throws RemoteException {
        if (isPrepared) {
            // 回滚事务
            System.out.println("节点回滚事务");
        }
    }
}

事务管理器代码(TransactionManager.java)

import java.rmi.Naming;
import java.rmi.RemoteException;
import java.rmi.registry.LocateRegistry;
import java.rmi.registry.Registry;

public class TransactionManager {
    private Node[] nodes;

    public TransactionManager(Node[] nodes) {
        this.nodes = nodes;
    }

    public void twoPhaseCommit() {
        boolean allPrepared = true;
        try {
            // 准备阶段
            for (Node node : nodes) {
                if (!node.prepare()) {
                    allPrepared = false;
                    break;
                }
            }

            // 提交阶段
            if (allPrepared) {
                for (Node node : nodes) {
                    node.commit();
                }
            } else {
                // 回滚阶段
                for (Node node : nodes) {
                    node.rollback();
                }
            }
        } catch (RemoteException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        try {
            Registry registry = LocateRegistry.createRegistry(1099);
            Node node1 = new NodeImpl();
            Naming.rebind("rmi://localhost:1099/node1", node1);
            Node node2 = new NodeImpl();
            Naming.rebind("rmi://localhost:1099/node2", node2);

            Node[] nodes = {node1, node2};
            TransactionManager tm = new TransactionManager(nodes);
            tm.twoPhaseCommit();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

在上述代码中,通过两阶段提交机制,事务管理器协调多个节点的事务操作,保证要么所有节点都提交事务,要么都回滚事务,从而在分布式环境中实现原子性。

  1. 三阶段提交(3PC):三阶段提交是对两阶段提交的改进,它在两阶段提交的基础上增加了一个预询问阶段(CanCommit)。在预询问阶段,事务管理器向所有参与节点发送 CAN_COMMIT 消息,询问节点是否可以进行事务操作。节点回复 YESNO。如果所有节点都回复 YES,事务管理器进入准备阶段;否则,直接进入回滚阶段。

相比两阶段提交,三阶段提交减少了单点故障导致的阻塞问题,提高了系统的容错性。但由于增加了额外的通信阶段,也带来了更高的性能开销。

分布式锁服务

在分布式系统中,分布式锁服务可以用于保证原子性。常见的分布式锁服务有 ZooKeeper、Redis 等。

  1. 基于 ZooKeeper 的分布式锁:ZooKeeper 是一个分布式协调服务,它通过节点的创建和删除来实现分布式锁。当一个客户端想要获取锁时,它在 ZooKeeper 中创建一个临时顺序节点。所有客户端获取锁时创建的节点会按照顺序排列,序号最小的客户端获得锁。当持有锁的客户端释放锁时,删除其创建的节点,下一个序号最小的客户端获得锁。

以下是一个简单的基于 ZooKeeper 的分布式锁 Java 代码示例:

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class ZookeeperDistributedLock implements Watcher {
    private static final String ZK_SERVERS = "localhost:2181";
    private static final int SESSION_TIMEOUT = 5000;
    private static final String LOCK_PATH = "/lock";
    private ZooKeeper zk;
    private String lockNode;
    private CountDownLatch latch;

    public ZookeeperDistributedLock() throws IOException, InterruptedException {
        this.latch = new CountDownLatch(1);
        this.zk = new ZooKeeper(ZK_SERVERS, SESSION_TIMEOUT, this);
        this.latch.await();
    }

    @Override
    public void process(WatchedEvent event) {
        if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
            latch.countDown();
        }
    }

    public void acquireLock() throws KeeperException, InterruptedException {
        lockNode = zk.create(LOCK_PATH + "/lock-", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        List<String> children = zk.getChildren(LOCK_PATH, false);
        Collections.sort(children);
        String smallestNode = LOCK_PATH + "/" + children.get(0);
        if (lockNode.equals(smallestNode)) {
            return;
        } else {
            int index = children.indexOf(lockNode.substring(LOCK_PATH.length() + 1));
            String previousNode = LOCK_PATH + "/" + children.get(index - 1);
            Stat stat = zk.exists(previousNode, true);
            if (stat == null) {
                acquireLock();
            } else {
                synchronized (this) {
                    wait();
                }
            }
        }
    }

    public void releaseLock() throws KeeperException, InterruptedException {
        zk.delete(lockNode, -1);
    }

    public void close() throws InterruptedException {
        zk.close();
    }

    public static void main(String[] args) {
        try {
            ZookeeperDistributedLock lock = new ZookeeperDistributedLock();
            lock.acquireLock();
            System.out.println("获取到锁,执行事务操作");
            // 执行事务操作
            lock.releaseLock();
            System.out.println("释放锁");
            lock.close();
        } catch (IOException | InterruptedException | KeeperException e) {
            e.printStackTrace();
        }
    }
}

在上述代码中,通过在 ZooKeeper 中创建临时顺序节点,并根据节点顺序来获取和释放锁,保证了同一时间只有一个客户端能够执行事务操作,从而在分布式环境中保证原子性。

  1. 基于 Redis 的分布式锁:Redis 可以通过 SETNX(SET if Not eXists)命令来实现分布式锁。当一个客户端执行 SETNX key value 命令时,如果 key 不存在,则设置 keyvalue 并返回 1,表示获取锁成功;如果 key 已存在,则返回 0,表示获取锁失败。为了防止死锁,通常会给锁设置一个过期时间。

以下是一个简单的基于 Redis 的分布式锁 Java 代码示例,使用 Jedis 客户端:

import redis.clients.jedis.Jedis;

public class RedisDistributedLock {
    private static final String LOCK_KEY = "distributed_lock";
    private static final String LOCK_VALUE = "locked";
    private static final int EXPIRE_TIME = 10; // 锁过期时间,单位秒

    public static boolean acquireLock(Jedis jedis) {
        Long result = jedis.setnx(LOCK_KEY, LOCK_VALUE);
        if (result == 1) {
            jedis.expire(LOCK_KEY, EXPIRE_TIME);
            return true;
        }
        return false;
    }

    public static void releaseLock(Jedis jedis) {
        jedis.del(LOCK_KEY);
    }

    public static void main(String[] args) {
        Jedis jedis = new Jedis("localhost", 6379);
        if (acquireLock(jedis)) {
            System.out.println("获取到锁,执行事务操作");
            // 执行事务操作
            releaseLock(jedis);
            System.out.println("释放锁");
        } else {
            System.out.println("获取锁失败");
        }
        jedis.close();
    }
}

在上述代码中,通过 SETNX 命令获取锁,并设置过期时间,保证了在分布式环境中同一时间只有一个客户端能够获取锁执行事务操作,从而保证原子性。

原子性与其他 ACID 属性的关系

原子性与一致性

原子性是实现一致性的基础。一致性要求事务执行前后,数据库始终处于合法的状态,符合业务规则。如果没有原子性,事务中的部分操作成功,部分失败,就会导致数据库状态不一致。例如,在银行转账事务中,原子性保证了要么转账操作完整执行,使得账户余额的变化符合转账的逻辑,从而维持数据库的一致性;要么操作全部回滚,数据库状态保持不变,同样保证了一致性。只有原子性得到满足,才能进一步讨论一致性的其他方面,如数据约束的满足等。

原子性与隔离性

隔离性确保并发执行的事务之间相互隔离,不会相互干扰。原子性与隔离性密切相关,隔离性的实现有助于保证原子性。例如,通过锁机制实现隔离性时,悲观锁在获取锁期间,其他事务无法访问被锁定的数据,从而保证了当前事务操作的原子性。同样,乐观锁在检查数据版本时,也是基于隔离性的原则,确保在事务执行期间数据没有被其他事务修改,进而保证原子性。如果没有隔离性,并发事务可能同时修改相同的数据,破坏原子性,导致数据不一致。

原子性与持久性

持久性保证一旦事务提交,其对数据库的修改将永久保存,即使系统发生故障。原子性是持久性的前提。只有当事务中的所有操作都原子性地执行完毕,才能确保提交的事务是完整的,进而可以被持久化。如果事务没有原子性,部分操作的结果可能已经被持久化,而部分未持久化,这将导致数据库处于不一致的状态。例如,在一个包含文件写入和数据库更新的事务中,如果文件写入成功但数据库更新失败,且没有原子性保证,文件写入的结果可能已经持久化,而数据库状态未更新,这与持久性的要求相悖。只有保证原子性,才能正确地实现持久性,确保事务对数据库的修改是可靠且永久的。

综上所述,原子性在并发操作中起着至关重要的作用,它是保证数据库事务可靠性的基础。通过合理运用锁机制、事务管理器和分布式锁服务等方法,可以有效地在并发环境中保证原子性,进而与其他 ACID 属性协同工作,确保数据库系统的稳定性和数据的一致性。