Flink CDC - Postgres
1)部署安装Postgres服务
jiangzhongzhou@ZBMac-C02CW08SM ~ % docker pull postgres
Using default tag: latest
latest: Pulling from library/postgres
latest: Pulling from library/postgres
24c63b8dcb66: Pull complete
2bb0b7dbd861: Pull complete
...
Digest: sha256:1bf73ccae25238fa555100080042f0b2f9be08eb757e200fe6afc1fc413a1b3c
Status: Downloaded newer image for postgres:latest
docker.io/library/postgres:latest
What's Next?
View a summary of image vulnerabilities and recommendations → docker scout quickview postgres
jiangzhongzhou@ZBMac-C02CW08SM ~ % docker run -d -p 5432:5432 --name postgresql -e POSTGRES_PASSWORD=pass123 postgres
5bad03668df7dd010079ff9499c1751f83b73eba802a4972dbf3008550fe3213
此处生成postgresql实例,账户postgres/pass123,通过IDEA创建Datasource数据源查看postgress的版本信息
> select version();
PostgreSQL 16.3 (Debian 16.3-1.pgdg120+1) on aarch64-unknown-linux-gnu, compiled by gcc (Debian 12.2.0-14) 12.2.0, 64-bit
> create database test;
> create table public.poc_cdc_src(
id serial PRIMARY KEY,
last_update bigint not null default -1,
payload varchar(200)
);
> ALTER ROLE postgres replication;
> ALTER TABLE poc_cdc_src REPLICA IDENTITY FULL;
2)开启Postgress wal日志
编辑/var/lib/postgresql/data/postgresql.conf文件,修改配置项

3)开启Flink建表映射
- 全量+增量
CREATE TABLE IF NOT EXISTS postgresql_source (
id INT,
last_update bigint,
payload string,
PRIMARY KEY (id) NOT ENFORCED
) COMMENT '表注释'
WITH (
'connector' = 'postgres-cdc',
'hostname' = 'localhost',
'port' = '5432',
'username' = 'postgres',
'password' = 'pass123',
'schema-name' = 'public',
'database-name' = 'test',
'table-name' = 'poc_cdc_src',
'debezium.snapshot.mode' = 'never',
'decoding.plugin.name' = 'pgoutput',
'slot.name' = 'cdcslot1'
)
- 增量
CREATE TABLE IF NOT EXISTS postgresql_source (
id INT,
last_update bigint,
payload string,
PRIMARY KEY (id) NOT ENFORCED
) COMMENT '表注释'
WITH (
'connector' = 'postgres-cdc',
'hostname' = 'localhost',
'port' = '5432',
'username' = 'postgres',
'password' = 'pass123',
'schema-name' = 'public',
'database-name' = 'test',
'table-name' = 'poc_cdc_src',
'debezium.snapshot.mode' = 'never',
'decoding.plugin.name' = 'pgoutput',
'slot.name' = 'cdcslot1',
'scan.incremental.snapshot.enabled' = 'true',
'scan.startup.mode' = 'latest-offset'
)