PostgreSQL逻辑复制发布与订阅流程详解
PostgreSQL逻辑复制概述
PostgreSQL 的逻辑复制是一种基于逻辑层面的数据复制技术,它与物理复制不同。物理复制是基于块级别的数据复制,而逻辑复制是基于数据库对象(如表、行等)的更改进行复制。这意味着逻辑复制可以更细粒度地控制复制哪些数据,对于复杂的业务场景,如只需要复制特定表或者特定行的数据,逻辑复制提供了更大的灵活性。
逻辑复制基于发布 - 订阅模型。发布者(Publisher)将数据库中的数据更改以逻辑日志的形式发布出去,订阅者(Subscriber)通过订阅这些发布来获取数据更改并应用到自己的数据库中。这种模型使得不同数据库实例之间的数据同步变得更加灵活和可定制。
逻辑复制环境准备
安装与配置 PostgreSQL
首先,确保在发布者和订阅者节点上都安装了 PostgreSQL 数据库。假设我们使用的是基于 Linux 的系统,以 Ubuntu 为例,可以通过以下步骤安装:
- 添加 PostgreSQL 官方源:
sudo sh -c 'echo "deb http://apt.postgresql.org/pub/repos/apt $(lsb_release -cs)-pgdg main" > /etc/apt/sources.list.d/pgdg.list'
wget --quiet -O - https://www.postgresql.org/media/keys/ACCC4CF8.asc | sudo apt-key add -
- 安装 PostgreSQL:
sudo apt update
sudo apt install postgresql-13
安装完成后,需要对 PostgreSQL 进行一些基本配置。编辑 postgresql.conf
文件,通常位于 /etc/postgresql/13/main/
目录下,修改以下参数:
wal_level = logical # 开启逻辑日志记录
max_replication_slots = 10 # 设置最大复制槽数量,可根据实际需求调整
max_wal_senders = 10 # 设置最大 WAL 发送者数量,可根据实际需求调整
同时,编辑 pg_hba.conf
文件,同样位于 /etc/postgresql/13/main/
目录,添加以下内容以允许订阅者连接到发布者:
host replication replica 192.168.1.0/24 md5 # 假设订阅者位于 192.168.1.0/24 网段,replica 为复制用户
重启 PostgreSQL 服务使配置生效:
sudo systemctl restart postgresql
创建发布者和订阅者所需用户
在发布者端,创建用于复制的用户:
CREATE USER replica WITH REPLICATION LOGIN CONNECTION LIMIT 10 PASSWORD 'password';
在订阅者端,也可以创建相同的用户,或者使用已有具有合适权限的用户。确保该用户具有连接到发布者和在订阅者端执行必要操作的权限。
发布者配置
创建发布
在发布者数据库中,我们可以创建一个发布来定义要复制的数据。假设我们有一个示例数据库 testdb
,其中有一张表 employees
,表结构如下:
CREATE TABLE employees (
id SERIAL PRIMARY KEY,
name VARCHAR(100),
salary DECIMAL(10, 2)
);
要创建一个发布,将 employees
表的更改发布出去,可以使用以下命令:
-- 创建一个名为 my_publication 的发布
CREATE PUBLICATION my_publication FOR TABLE employees;
如果要发布多个表,可以在 FOR TABLE
子句中列出多个表名,例如:
CREATE PUBLICATION my_publication FOR TABLE employees, departments;
此外,还可以使用 FOR ALL TABLES
选项发布数据库中的所有表:
CREATE PUBLICATION my_publication FOR ALL TABLES;
需要注意的是,CREATE PUBLICATION
语句不会立即开始复制数据,它只是定义了要发布的数据集合。
创建复制槽
复制槽用于跟踪发布者的逻辑日志位置,确保订阅者能够准确地获取到需要的更改。在发布者端,为订阅者创建一个复制槽:
-- 创建一个名为 my_slot 的物理复制槽
SELECT * FROM pg_create_physical_replication_slot('my_slot');
如果要创建逻辑复制槽,使用以下命令:
-- 创建一个名为 my_logical_slot 的逻辑复制槽
SELECT * FROM pg_create_logical_replication_slot('my_logical_slot', 'pgoutput');
这里的 pgoutput
是逻辑解码插件,它将逻辑日志转换为一种订阅者可以理解的格式。PostgreSQL 还提供了其他逻辑解码插件,如 wal2json
,可以将逻辑日志转换为 JSON 格式。
订阅者配置
创建订阅
在订阅者数据库中,创建一个订阅来连接到发布者并接收数据。假设订阅者数据库也为 testdb
,执行以下命令:
-- 创建一个名为 my_subscription 的订阅,连接到发布者的 my_publication 发布
CREATE SUBSCRIPTION my_subscription
CONNECTION 'host=192.168.1.100 port=5432 user=replica password=password dbname=testdb'
PUBLICATION my_publication;
在 CONNECTION
参数中,指定发布者的主机地址、端口、用户名、密码和数据库名。这里假设发布者的 IP 地址为 192.168.1.100
。
初始化订阅数据
创建订阅后,默认情况下,订阅者不会自动获取发布者已有的数据,只会接收创建订阅之后的更改。如果需要初始化订阅者的数据,使其与发布者的当前状态一致,可以使用 IMPORT MAPPED
选项:
CREATE SUBSCRIPTION my_subscription
CONNECTION 'host=192.168.1.100 port=5432 user=replica password=password dbname=testdb'
PUBLICATION my_publication
WITH (IMPORT MAPPED);
这将从发布者导入已有的数据,并在订阅者上创建相应的表结构和数据。
数据操作与复制验证
发布者端数据操作
在发布者端的 employees
表中插入一些数据:
INSERT INTO employees (name, salary) VALUES ('Alice', 5000.00), ('Bob', 6000.00);
更新数据:
UPDATE employees SET salary = 5500.00 WHERE name = 'Alice';
删除数据:
DELETE FROM employees WHERE name = 'Bob';
订阅者端数据验证
在订阅者端查询 employees
表,应该能够看到与发布者端相应的数据更改:
SELECT * FROM employees;
如果配置正确,订阅者将及时接收到发布者的数据更改并应用到本地表中。
高级逻辑复制配置
过滤数据
有时候,我们可能只需要复制特定条件的数据。在发布者端,可以通过创建一个基于查询的发布来实现数据过滤。例如,只发布 salary
大于 5000 的员工数据:
-- 创建一个基于查询的发布
CREATE PUBLICATION filtered_publication
FOR TABLE employees
WHERE (salary > 5000);
在订阅者端,创建订阅连接到这个过滤后的发布:
CREATE SUBSCRIPTION filtered_subscription
CONNECTION 'host=192.168.1.100 port=5432 user=replica password=password dbname=testdb'
PUBLICATION filtered_publication;
这样,订阅者只会接收到满足 salary > 5000
条件的员工数据。
多订阅者与级联复制
PostgreSQL 支持多个订阅者连接到同一个发布者。每个订阅者可以独立配置,并且可以根据自身需求选择不同的发布。例如,我们可以有一个订阅者只关注 employees
表,而另一个订阅者关注 departments
表。
此外,还可以实现级联复制,即一个订阅者作为另一个订阅者的发布者。假设我们有三个节点:节点 A 是主发布者,节点 B 是第一个订阅者同时作为第二个发布者,节点 C 是第二个订阅者。
在节点 A 上创建发布:
CREATE PUBLICATION a_publication FOR TABLE employees;
在节点 B 上创建订阅连接到节点 A 的发布,并创建自己的发布:
-- 创建订阅连接到节点 A
CREATE SUBSCRIPTION b_subscription
CONNECTION 'host=192.168.1.100 port=5432 user=replica password=password dbname=testdb'
PUBLICATION a_publication;
-- 创建节点 B 的发布
CREATE PUBLICATION b_publication FOR TABLE employees;
在节点 C 上创建订阅连接到节点 B 的发布:
CREATE SUBSCRIPTION c_subscription
CONNECTION 'host=192.168.1.101 port=5432 user=replica password=password dbname=testdb'
PUBLICATION b_publication;
这样就实现了级联复制,节点 C 通过节点 B 间接获取到节点 A 的数据更改。
处理冲突
在逻辑复制过程中,可能会出现数据冲突的情况,例如订阅者在应用数据更改时,本地已经存在相同主键但数据不同的记录。PostgreSQL 提供了一些机制来处理这些冲突。
一种常见的方法是使用 ON CONFLICT
子句。在订阅者端创建表时,可以定义冲突处理策略。例如,对于 employees
表,如果出现主键冲突,我们选择更新其他列:
CREATE TABLE employees (
id SERIAL PRIMARY KEY,
name VARCHAR(100),
salary DECIMAL(10, 2)
) ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name, salary = EXCLUDED.salary;
这样,当订阅者接收到的数据与本地数据发生主键冲突时,会按照定义的策略进行更新操作。
监控与维护
监控复制状态
在发布者端,可以使用 pg_stat_replication
视图来监控复制连接状态:
SELECT * FROM pg_stat_replication;
该视图会显示当前正在进行的复制连接信息,包括连接的订阅者、复制槽、发送和接收的字节数等。
在订阅者端,可以使用 pg_stat_subscription
视图来监控订阅状态:
SELECT * FROM pg_stat_subscription;
此视图提供了订阅的详细信息,如最后一次同步时间、延迟等。
维护复制槽
复制槽会占用发布者的资源,因此需要定期维护。如果一个订阅者不再使用,应该删除相应的复制槽。在发布者端,可以使用以下命令删除复制槽:
-- 删除名为 my_slot 的复制槽
SELECT pg_drop_replication_slot('my_slot');
同样,如果要删除逻辑复制槽:
-- 删除名为 my_logical_slot 的逻辑复制槽
SELECT pg_drop_replication_slot('my_logical_slot');
另外,如果发布者上的复制槽长时间处于非活动状态,可能是订阅者出现了问题。可以通过监控复制槽状态来及时发现并解决这些问题。
故障恢复
在逻辑复制过程中,如果发布者或订阅者出现故障,需要进行相应的恢复操作。
如果发布者出现故障,在恢复后,需要确保逻辑日志的连续性。可以通过检查 pg_wal
目录下的日志文件,并使用 pg_receivewal
工具来同步日志。假设发布者的 WAL 日志目录为 /var/lib/postgresql/13/main/pg_wal
,执行以下命令:
pg_receivewal -D /var/lib/postgresql/13/main/pg_wal -h 192.168.1.100 -U replica -S my_slot
这里 -h
指定发布者的主机地址,-U
指定复制用户,-S
指定复制槽。
如果订阅者出现故障,在恢复后,可以通过重新创建订阅或者从备份中恢复数据,然后重新同步来恢复复制。例如,删除并重新创建订阅:
-- 删除订阅
DROP SUBSCRIPTION my_subscription;
-- 重新创建订阅
CREATE SUBSCRIPTION my_subscription
CONNECTION 'host=192.168.1.100 port=5432 user=replica password=password dbname=testdb'
PUBLICATION my_publication
WITH (IMPORT MAPPED);
通过以上步骤,可以有效地配置、管理和维护 PostgreSQL 的逻辑复制,确保数据在不同数据库实例之间的准确同步。无论是简单的单发布者 - 单订阅者场景,还是复杂的多订阅者、级联复制场景,PostgreSQL 的逻辑复制功能都能提供强大而灵活的解决方案。同时,合理的监控和维护措施可以保证复制的稳定性和可靠性,及时发现并解决可能出现的问题。