Flink CDC - Postgres

1)部署安装Postgres服务

jiangzhongzhou@ZBMac-C02CW08SM ~ % docker pull postgres 
Using default tag: latest
latest: Pulling from library/postgres
latest: Pulling from library/postgres
24c63b8dcb66: Pull complete 
2bb0b7dbd861: Pull complete 
...
Digest: sha256:1bf73ccae25238fa555100080042f0b2f9be08eb757e200fe6afc1fc413a1b3c
Status: Downloaded newer image for postgres:latest
docker.io/library/postgres:latest

What's Next?
  View a summary of image vulnerabilities and recommendations → docker scout quickview postgres
  
jiangzhongzhou@ZBMac-C02CW08SM ~ % docker run -d -p 5432:5432 --name postgresql -e POSTGRES_PASSWORD=pass123 postgres

5bad03668df7dd010079ff9499c1751f83b73eba802a4972dbf3008550fe3213

此处生成postgresql实例,账户postgres/pass123,通过IDEA创建Datasource数据源查看postgress的版本信息

> select version();
PostgreSQL 16.3 (Debian 16.3-1.pgdg120+1) on aarch64-unknown-linux-gnu, compiled by gcc (Debian 12.2.0-14) 12.2.0, 64-bit
> create database test;
> create table public.poc_cdc_src(
    id serial PRIMARY KEY,
    last_update bigint not null default -1,
    payload varchar(200)
);
> ALTER ROLE postgres replication;
> ALTER TABLE poc_cdc_src REPLICA IDENTITY FULL;

2)开启Postgress wal日志
编辑/var/lib/postgresql/data/postgresql.conf文件,修改配置项
wal日志逻辑
3)开启Flink建表映射

  • 全量+增量
CREATE TABLE IF NOT EXISTS postgresql_source (
              id INT,
              last_update bigint,
              payload string,
              PRIMARY KEY (id) NOT ENFORCED
) COMMENT '表注释'
WITH (
    'connector' = 'postgres-cdc',
    'hostname' = 'localhost',
    'port' = '5432',
    'username' = 'postgres',
    'password' = 'pass123',
    'schema-name' = 'public',
    'database-name' = 'test',
    'table-name' = 'poc_cdc_src',
    'debezium.snapshot.mode' = 'never',
    'decoding.plugin.name' = 'pgoutput',
    'slot.name' = 'cdcslot1'
)
  • 增量
CREATE TABLE IF NOT EXISTS postgresql_source (
              id INT,
              last_update bigint,
              payload string,
              PRIMARY KEY (id) NOT ENFORCED
) COMMENT '表注释'
WITH (
    'connector' = 'postgres-cdc',
    'hostname' = 'localhost',
    'port' = '5432',
    'username' = 'postgres',
    'password' = 'pass123',
    'schema-name' = 'public',
    'database-name' = 'test',
    'table-name' = 'poc_cdc_src',
    'debezium.snapshot.mode' = 'never',
    'decoding.plugin.name' = 'pgoutput',
    'slot.name' = 'cdcslot1',
    'scan.incremental.snapshot.enabled' = 'true',
    'scan.startup.mode' = 'latest-offset'
)