Postgres to Iceberg Data Sync

Crunchy Data Warehouse supports the ability to automatically replicate data from a Crunchy Bridge standard Postgres instance into a Crunchy Data Warehouse instance–converting data in the process to Iceberg format. This capability allows you to:

  • Use Postgres for your operational (OLTP) database while keeping data in sync with Crunchy Data Warehouse to enable fast analytical queries.
  • Test your Postgres data inside Crunchy Data Warehouse.
  • Migrate from an "accidental" Postgres data warehouse.

Support for automatic data sync into Iceberg builds on top of core PostgreSQL logical replication features in order to provide an easy and streamlined approach for setting up your Data warehouse.

Data sync concepts

Tables on the source server are grouped into a publication, basically a list of tables and policies as to what changes to replicate.

On the warehouse side, a subscription stores connection information to the publisher database, as well as which publications to replicate.

Logical replication in Postgres and Crunchy Data Warehouse operate in two main phases:

  • data sync (the copy_data phase)
  • change application (the apply phase)

The copy_data phase ensures that the data in the target table exists on the remote side as of the initial start, while the apply phase ensures that all changes made since the start of the copy_data phase get applied.

The apply worker applies the changes that are made on the publishing side to the Iceberg tables in batches every 30 seconds, which leads to an end-to-end lag which is typically under 60 seconds. The order of transactions is preserved, so you will always have a consistent view of the data.

Create replication from dashboard

The Crunchy Bridge dashboard has a Data Sync option on the left side cluster options to create replication from one of your Crunchy Bridge Postgres instances to Iceberg. This lets you select one or more database tables to send to Iceberg.

Create replication manually

Setup on the publisher

Publication setup

In order to use logical replication to Iceberg, you must define one or more publications on the source PostgreSQL server.

You can do this by explicitly defining the set of tables in the publication via explicit list:

CREATE PUBLICATION my_pub_name FOR TABLE
    table_1, table_2, table_3;

You can also define publications for entire schemas in the database like so:

CREATE PUBLICATION my_public_tables_pub FOR TABLES IN SCHEMA public;

Or even the entire database:

CREATE PUBLICATION my_all_tables_pub FOR ALL TABLES;

Note: creating ALL TABLES or TABLES IN SCHEMA publications require a superuser account to create the publication.

By default, publications will replicate insert, update, delete, and truncate against the source table. You can specify a different set of these, for instance just insert,truncate to define a publication as insert-only. See the postgres docs for more details here.

Table identity

Whether update/delete commands can be replicated depends on whether the table has a primary key and whether it uses REPLICA IDENTITY FULL. The following table outlines what is supported:

Primary KeyREPLICA IDENTITY FULLSupportedStorage use in warehouse
YesYesYes (preferred)Medium
YesNoYesHigh
NoYesOnly with (publish = ‘insert,truncate’)Low
NoNoYes (update/delete blocked on source)Low

If both a Primary Key and REPLICA IDENTITY FULL are set for a relation, Crunchy Data Warehouse can minimize data needed to track changes to the table, which is why this setting is preferred. However, it may increase the amount of WAL on the source database.

User setup

You will also need a user account that you can connect to that has permissions to read the tables that are being replicated, as well as having the REPLICATION privilege. This can (and probably should) be a dedicated account:

-- create a new user
CREATE USER my_user WITH REPLICATION PASSWORD 'abc123';

-- or modify an existing user
ALTER USER my_user WITH REPLICATION;

-- grant appropriate permissions
GRANT SELECT ON ALL TABLES IN SCHEMA public TO my_user;

This user does not need write permissions on the tables.

Remote access settings

You will need to ensure that the replication user is allowed to connect to the source database using the pg_hba.conf settings or equivalent.

Setup on the subscriber

Once the publisher setup has been done, you will create a subscription on your Crunchy Data Warehouse instance.

Subscription setup

To start the replication, you use the regular CREATE SUBSCRIPTION command with some extra options.

-- Simple: create a subscription and auto-create Iceberg tables
CREATE SUBSCRIPTION my_sub_name
CONNECTION 'postgres://...'
PUBLICATION my_pub_name
WITH (create_tables_using = 'iceberg');

-- Advanced: Create a subscription for existing Iceberg tables, but replace their contents
-- Use advanced options to improve performance and reliability
CREATE SUBSCRIPTION my_sub_name
CONNECTION 'postgres://...'
PUBLICATION my_pub_name
WITH (overwrite_iceberg_data, streaming, binary, failover);

Most CREATE SUBSCRIPTION options are supported. We few options that are worth considering are:

  • failover - if the source is Postgres 17 or above, resume replication after automatic failover
  • binary - use binary instead of text encoding in the wire protocol, often improves performance
  • streaming - avoids buffering transactions on the source, though requires more storage in warehouse

Replicating to the same Iceberg table from multiple subscriptions is currently not supported.

Target table creation

To replicate data, you must have target tables. These can either be created ahead of time, or you can use the create_tables_using = 'iceberg' option to automatically create Iceberg tables for all tables in the publication that do not exist yet.

You can also replicate into a mixture of heap and Iceberg tables, but heap tables must be created manually.

User setup

You will need a user that has permissions to create subscriptions. In Postgres, this is granted by the pg_create_subscription permission.

For instance, you can connect to your Crunchy Data Warehouse as the superuser (postgres) and grant permission to the application user:

GRANT pg_create_subscription TO application;

The Iceberg table can be owned by users other than the subscription owner, but tables created via create_tables_using = 'iceberg' will have the same owner as the subscription.

Subscription options

The following are custom subscription options that can be provided in either a CREATE SUBSCRIPTION .. WITH () or ALTER SUBSCRIPTION ... WITH () statements.

OptionValid ValuesNotes
create_tables_using'iceberg'Creates any missing iceberg tables
overwrite_iceberg_datatrueOverwrites existing data in an iceberg table

For other options consult the postgres documentation.

Monitoring

A user with the pg_monitor privilege can monitor the status of replication into Crunchy Data Warehouse with the following views.

Subscription monitoring via crunchy_iceberg.stat_subscription

The crunchy_iceberg.stat_subscription view displays the following fields, reflecting the state of each individual iceberg subscription:

NameTypeNotes
subidoidThe postgres subscription id
create_timetimestamptzTimestamp when this subscription was first created
last_flush_timetimestamptzTimestamp that the flush process last completed successfully (for any relation)
last_flush_durationintervalHow long the last flush took to complete (in seconds)
last_flush_lsnpg_lsnThe LSN of the publisher for the last flushed changes. This is the “current LSN” view of the iceberg table, not counting changes that are queued to be added to the individual iceberg tables.

Individual table monitoring via crunchy_iceberg.stat_subscription_rel

The crunchy_iceberg.stat_subscription_rel view displays monitoring information specific to each individual table in the subscription. The fields here are:

NameTypeNotes
subidoidThe postgres subscription id
table_nameregclassThe iceberg table which is the replication target
last_flush_timetimestamptzThe last time this specific table had changes flushed
unflushed_insertsbigintThe number of rows that are queued for insert
unflushed_deletesbigintThe number of rows that are queued for delete
rows_insertedbigintThe total (cumulative) number of rows that have been inserted into this table
rows_deletedbigintThe total (cumulative) number of rows that have been deleted from this table
copy_startedtimestamptzThe timestamp when the last copy on this table started
copy_finishedtimestamptzThe timestamp when the last copy on this table finished, or NULL if the copy is still in progress

Note that for the purposes of this accounting, an UPDATE to a table counts as both an INSERT and a DELETE.

Initial data copy monitoring

You can monitor the progress of the initial copy via the standard pg_stat_progress_copy view with some caveats. Although the bytes_processed field gets to reflect the ongoing copy, the bytes_total field is always 0. This is because the data is stored in the Iceberg table and not inside Postgres itself.

Sample monitoring queries

Monitoring the initial copy

The initial copy is a large portion of the initial setup. You can see the status of the initial copy via the normal pg_stat_progress_copy and the iceberg stat_subscription_rel views:

SELECT table_name, bytes_processed, copy_started 
FROM crunchy_iceberg.stat_subscription_rel 
JOIN pg_stat_progress_copy ON relid = table_name;

While an initial copy is happening this will show the number of upstream bytes read/processed; once the copy has finished this will no longer show up in this query.

You can verify that a copy has completed by looking at the copy_finished field for the row in the crunchy_iceberg.stat_subscription_rel:

SELECT table_name, copy_started, copy_finished 
FROM crunchy_iceberg.stat_subscription_rel 
ORDER BY 1;

Monitoring flush to Iceberg

The flush frequency to Iceberg is an important metric to monitor. Finding the proper balance for a workload between write frequency and flush frequency on being able to measure each. If you're having issues with the flush and your write workload, reach out to support for help.

For individual subscription you can get a good overview of the iceberg subscription performance via:

SELECT subname, last_flush_time, last_flush_duration, last_flush_lsn 
FROM crunchy_iceberg.stat_subscription 
JOIN pg_subscription ON pg_subscription.oid = subid;

This will tell you when the last flush occurred and how long it took. If you are not getting the expected data showing up in your table this is the first place to check. If there are unsuccessful flushes these fields will not advance.

If the last_flush_duration is too high relative to the flush interval, you will not have enough time to apply changes to the table. You can get some details about an individual table. The stat_subscription.last_flush_time is updated for every subscription flush. The individual tables' last_flush_time is only updates when that specific table has been flushed.

You can see the buildup of rows and how they are propagated into the iceberg table via the following query:

SELECT table_name, last_flush_time, unflushed_inserts, unflushed_deletes, rows_inserted, rows_deleted 
FROM crunchy_iceberg.stat_subscription_rel 
WHERE table_name = 'public.my_table';

As rows are flushed to Iceberg, the counters will move from "unflushed_inserts" to "rows_inserted" which is a running total of rows that have ever been inserted into the Iceberg table. This does not include the initial data copy, only rows which have been replicated.

Considerations for automated data sync

While Crunchy Data Warehouse stores Iceberg tables in remote storage, but it uses regular disk storage to stage changes and data, and for regular heap tables. We recommend that you use the same disk size on your Crunchy Data Warehouse instance as on the source PostgreSQL instance.

There are some limitations compared to regular logical replication to be aware of:

  • Multiple subscriptions per table are not supported
  • Generated columns are not yet replicated
  • Iceberg tables must be empty or use overwrite_iceberg_data when a subscription is created
  • You cannot perform schema changes or other writes to an Iceberg table while replication is active.

Connecting Postgres to Crunchy Data Warehouse with a foreign data wrapper

In some cases, you may want to connect from Postgres to a Warehouse instance. The core of Crunchy Data Warehouse is still pure PostgreSQL, just with extensions adding the Iceberg file handling and vectorized query engine features. You can connect your transactional Postgres instances to your warehouse cluster using the Postgres foreign data wrapper (postgres_fdw) without the need for any third party integrations.

postgres_fdw lets you query an external Postgres resource from the active database you're connected to. From the perspective of this example the two sides we'll look at are:

  • Foreign instance: the one that has the standard Postgres data, perhaps a database in Crunchy Bridge. (It can alternatively be another warehouse instance)
  • Destination instance: the warehouse database querying that foreign data to analyze it.

Starting on the Foreign instance, create a new user for the destination server to connect as:

CREATE USER fdw_user WITH PASSWORD 'pizza1';
GRANT SELECT, INSERT, UPDATE,
DELETE ON TABLE your_table_name TO fdw_user;

On the Crunchy Data Warehouse destination, first create the FDW extension to allow you to connect to other databases, then create the foreign server, which tells Postgres where to connect for the remote data:

CREATE EXTENSION IF NOT EXISTS postgres_fdw;
CREATE SERVER foreigndb_fdw
FOREIGN DATA WRAPPER
postgres_fdw
OPTIONS (host 'p.vbjrfujvlw725gvi3i.db.postgresbridge.com',
port '5432', dbname 'postgres', sslmode 'require');

Next create the user mapping. This tells Postgres which user on the foreign side to connect as. In this case, all users on the destination side will connect as the same user:;

CREATE USER MAPPING for PUBLIC SERVER
foreigndb_fdw OPTIONS (user 'fdw_user', password 'pizza1');

Finally on the destination side, import the schema and limit it to the table names you want. This gives Postgres a local table definition that matches the remote table's definition and can be queried on the destination server.

IMPORT FOREIGN SCHEMA "public"
LIMIT TO(your_table_name)
FROM SERVER foreigndb_fdw INTO public;

Now that you have a foreign data wrapper configured, you can use that foreign data wrapper connection to shadow data from a standard operational database into a warehouse instance backed by optimized object storage, then start running data analytics on it with the vectorized engine:

COPY (SELECT * FROM your_table_name WHERE time = '2023-01-01')
TO 's3://cdwtestdatasets/logs_demo/log_2023_01_01.parquet'
WITH (format 'parquet');

CREATE FOREIGN TABLE analytics_demo ()
SERVER crunchy_lake_analytics options (path 's3://cdwtestdatasets/logs_demo/*.parquet');