Postgres to Iceberg Data Sync
Crunchy Data Warehouse can automatically replicate data from a standard Crunchy Bridge Postgres instance into a Crunchy Data Warehouse instance, converting the data to Iceberg format in the process. This capability allows you to:
- Use Postgres for your operational (OLTP) database while keeping data in sync with Crunchy Data Warehouse to enable fast analytical queries.
- Test your Postgres data inside Crunchy Data Warehouse.
- Migrate from an "accidental" Postgres data warehouse.
Support for automatic data sync into Iceberg builds on core PostgreSQL logical replication features to provide a streamlined approach to setting up your data warehouse.
Data sync concepts
Tables on the source server are grouped into a publication: essentially a list of tables, plus policies that determine which changes to replicate.
On the warehouse side, a subscription stores connection information to the publisher database, as well as which publications to replicate.
Logical replication in Postgres and Crunchy Data Warehouse operates in two main phases:
- data sync (the `copy_data` phase)
- change application (the `apply` phase)
The `copy_data` phase ensures that the target table contains the data that exists on the remote side as of the initial start, while the `apply` phase ensures that all changes made since the start of the `copy_data` phase get applied.
The `apply` worker applies the changes made on the publishing side to the Iceberg tables in batches every 30 seconds, which leads to an end-to-end lag that is typically under 60 seconds. The order of transactions is preserved, so you will always have a consistent view of the data.
Create replication from dashboard
The Crunchy Bridge dashboard has a Data Sync option in the left-side cluster options for creating replication from one of your Crunchy Bridge Postgres instances to Iceberg. This lets you select one or more database tables to send to Iceberg.
Create replication manually
Setup on the publisher
Publication setup
In order to use logical replication to Iceberg, you must define one or more publications on the source PostgreSQL server.
You can do this by defining the set of tables in the publication via an explicit list:
CREATE PUBLICATION my_pub_name FOR TABLE
table_1, table_2, table_3;
You can also define publications for entire schemas in the database like so:
CREATE PUBLICATION my_public_tables_pub FOR TABLES IN SCHEMA public;
Or even the entire database:
CREATE PUBLICATION my_all_tables_pub FOR ALL TABLES;
Note: creating `ALL TABLES` or `TABLES IN SCHEMA` publications requires a superuser account.
By default, publications will replicate `insert`, `update`, `delete`, and `truncate` against the source table. You can specify a different set of these, for instance just `insert,truncate` to define a publication as insert-only. See the Postgres documentation for more details.
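For example, a minimal sketch of an insert-only publication, reusing `table_1` from the example above:

-- insert-only publication: replicate inserts and truncates, but not updates or deletes
CREATE PUBLICATION my_insert_only_pub FOR TABLE table_1
WITH (publish = 'insert,truncate');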
Table identity
Whether `update`/`delete` commands can be replicated depends on whether the table has a primary key and whether it uses `REPLICA IDENTITY FULL`. The following table outlines what is supported:
Primary Key | REPLICA IDENTITY FULL | Supported | Storage use in warehouse |
---|---|---|---|
Yes | Yes | Yes (preferred) | Medium |
Yes | No | Yes | High |
No | Yes | Only with (publish = 'insert,truncate') | Low |
No | No | Yes (update/delete blocked on source) | Low |
If both a primary key and `REPLICA IDENTITY FULL` are set for a relation, Crunchy Data Warehouse can minimize the data needed to track changes to the table, which is why this setting is preferred. However, it may increase the amount of WAL on the source database.
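For example, a sketch of enabling full replica identity on a table from the earlier publication example:

-- send complete old row images for updates and deletes on this table
ALTER TABLE table_1 REPLICA IDENTITY FULL;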
User setup
You will also need a user account to connect as that has permission to read the tables being replicated, as well as the `REPLICATION` privilege. This can (and probably should) be a dedicated account:
-- create a new user
CREATE USER my_user WITH REPLICATION PASSWORD 'abc123';
-- or modify an existing user
ALTER USER my_user WITH REPLICATION;
-- grant appropriate permissions
GRANT SELECT ON ALL TABLES IN SCHEMA public TO my_user;
This user does not need write permissions on the tables.
Remote access settings
You will need to ensure that the replication user is allowed to connect to the source database using the `pg_hba.conf` settings or equivalent.
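For example, a sketch of a `pg_hba.conf` entry that allows the replication user created above to connect over TLS (the address range and authentication method are assumptions; on managed platforms the equivalent is typically a firewall or allow-list setting):

# allow my_user to connect to any database over TLS using SCRAM authentication
hostssl  all  my_user  0.0.0.0/0  scram-sha-256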
Setup on the subscriber
Once the publisher setup has been done, you will create a subscription on your Crunchy Data Warehouse instance.
Subscription setup
To start the replication, you use the regular CREATE SUBSCRIPTION command with some extra options.
-- Simple: create a subscription and auto-create Iceberg tables
CREATE SUBSCRIPTION my_sub_name
CONNECTION 'postgres://...'
PUBLICATION my_pub_name
WITH (create_tables_using = 'iceberg');
-- Advanced: Create a subscription for existing Iceberg tables, but replace their contents
-- Use advanced options to improve performance and reliability
CREATE SUBSCRIPTION my_sub_name
CONNECTION 'postgres://...'
PUBLICATION my_pub_name
WITH (overwrite_iceberg_data, streaming, binary, failover);
Most CREATE SUBSCRIPTION options are supported. A few options that are worth considering are:
- `failover` - if the source is Postgres 17 or above, resume replication after automatic failover
- `binary` - use binary instead of text encoding in the wire protocol, often improves performance
- `streaming` - avoids buffering transactions on the source, though requires more storage in the warehouse
Replicating to the same Iceberg table from multiple subscriptions is currently not supported.
Target table creation
To replicate data, you must have target tables. These can either be created ahead of time, or you can use the `create_tables_using = 'iceberg'` option to automatically create Iceberg tables for all tables in the publication that do not exist yet.
You can also replicate into a mixture of heap and Iceberg tables, but heap tables must be created manually.
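For example, a minimal sketch of creating an Iceberg target table ahead of time, assuming Iceberg tables are created with `USING iceberg` and that the (hypothetical) columns match the published source table:

-- pre-create the Iceberg target table; the column list must match the source table
CREATE TABLE table_1 (
    id bigint,
    payload text
) USING iceberg;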
User setup
You will need a user that has permission to create subscriptions. In Postgres, this is granted via the `pg_create_subscription` role.
For instance, you can connect to your Crunchy Data Warehouse as the superuser (`postgres`) and grant permission to the application user:
GRANT pg_create_subscription TO application;
The Iceberg table can be owned by users other than the subscription owner, but tables created via `create_tables_using = 'iceberg'` will have the same owner as the subscription.
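To check which role owns a replicated table, a sketch using the system catalogs:

-- show the owner of the Iceberg table created for table_1
SELECT relname, pg_get_userbyid(relowner) AS owner
FROM pg_class
WHERE relname = 'table_1';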
Subscription options
The following are custom subscription options that can be provided in either a `CREATE SUBSCRIPTION ... WITH ()` or `ALTER SUBSCRIPTION ... WITH ()` statement.
Option | Valid Values | Notes |
---|---|---|
create_tables_using | 'iceberg' | Creates any missing iceberg tables |
overwrite_iceberg_data | true | Overwrites existing data in an iceberg table |
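For example, a sketch of applying one of these options to an existing subscription using the `ALTER SUBSCRIPTION ... WITH ()` form, reusing the subscription name from the earlier examples:

-- create any Iceberg tables that are still missing for tables in the publication
ALTER SUBSCRIPTION my_sub_name WITH (create_tables_using = 'iceberg');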
For other options, consult the Postgres documentation.
Monitoring
A user with the `pg_monitor` privilege can monitor the status of replication into Crunchy Data Warehouse with the following views.
Subscription monitoring via crunchy_iceberg.stat_subscription
The `crunchy_iceberg.stat_subscription` view displays the following fields, reflecting the state of each individual Iceberg subscription:
Name | Type | Notes |
---|---|---|
subid | oid | The postgres subscription id |
create_time | timestamptz | Timestamp when this subscription was first created |
last_flush_time | timestamptz | Timestamp that the flush process last completed successfully (for any relation) |
last_flush_duration | interval | How long the last flush took to complete (in seconds) |
last_flush_lsn | pg_lsn | The LSN of the publisher for the last flushed changes. This is the “current LSN” view of the iceberg table, not counting changes that are queued to be added to the individual iceberg tables. |
Individual table monitoring via crunchy_iceberg.stat_subscription_rel
The `crunchy_iceberg.stat_subscription_rel` view displays monitoring information specific to each individual table in the subscription. The fields here are:
Name | Type | Notes |
---|---|---|
subid | oid | The postgres subscription id |
table_name | regclass | The iceberg table which is the replication target |
last_flush_time | timestamptz | The last time this specific table had changes flushed |
unflushed_inserts | bigint | The number of rows that are queued for insert |
unflushed_deletes | bigint | The number of rows that are queued for delete |
rows_inserted | bigint | The total (cumulative) number of rows that have been inserted into this table |
rows_deleted | bigint | The total (cumulative) number of rows that have been deleted from this table |
copy_started | timestamptz | The timestamp when the last copy on this table started |
copy_finished | timestamptz | The timestamp when the last copy on this table finished, or NULL if the copy is still in progress |
Note that for the purposes of this accounting, an `UPDATE` to a table counts as both an `INSERT` and a `DELETE`.
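For example, a sketch that uses this accounting to estimate the net number of rows replicated into each table:

-- an UPDATE counts as one insert plus one delete, so inserted minus deleted
-- approximates the net number of replicated rows per Iceberg table
SELECT table_name, rows_inserted - rows_deleted AS net_rows
FROM crunchy_iceberg.stat_subscription_rel;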
Initial data copy monitoring
You can monitor the progress of the initial copy via the standard `pg_stat_progress_copy` view, with some caveats. Although the `bytes_processed` field does reflect the ongoing copy, the `bytes_total` field is always 0. This is because the data is stored in the Iceberg table and not inside Postgres itself.
Sample monitoring queries
Monitoring the initial copy
The initial copy is a large portion of the initial setup. You can see the status of the initial copy via the normal `pg_stat_progress_copy` and the Iceberg `stat_subscription_rel` views:
SELECT table_name, bytes_processed, copy_started
FROM crunchy_iceberg.stat_subscription_rel
JOIN pg_stat_progress_copy ON relid = table_name;
While an initial copy is happening, this will show the number of upstream bytes read/processed; once the copy has finished, the table will no longer show up in this query.
You can verify that a copy has completed by looking at the `copy_finished` field for the row in `crunchy_iceberg.stat_subscription_rel`:
SELECT table_name, copy_started, copy_finished
FROM crunchy_iceberg.stat_subscription_rel
ORDER BY 1;
Monitoring flush to Iceberg
The flush frequency to Iceberg is an important metric to monitor. Finding the proper balance between write frequency and flush frequency for a workload depends on being able to measure each. If you're having issues with the flush and your write workload, reach out to support for help.
For an individual subscription, you can get a good overview of the Iceberg subscription performance via:
SELECT subname, last_flush_time, last_flush_duration, last_flush_lsn
FROM crunchy_iceberg.stat_subscription
JOIN pg_subscription ON pg_subscription.oid = subid;
This will tell you when the last flush occurred and how long it took. If you are not seeing the expected data show up in your table, this is the first place to check. If there are unsuccessful flushes, these fields will not advance.
If the `last_flush_duration` is too high relative to the flush interval, there will not be enough time to apply changes to the table. You can also get details about an individual table: `stat_subscription.last_flush_time` is updated for every subscription flush, while an individual table's `last_flush_time` in `stat_subscription_rel` is only updated when that specific table has been flushed.
You can see the buildup of rows and how they are propagated into the iceberg table via the following query:
SELECT table_name, last_flush_time, unflushed_inserts, unflushed_deletes, rows_inserted, rows_deleted
FROM crunchy_iceberg.stat_subscription_rel
WHERE table_name = 'public.my_table';
As rows are flushed to Iceberg, the counters move from `unflushed_inserts` to `rows_inserted`, which is a running total of rows that have ever been inserted into the Iceberg table. This does not include the initial data copy, only rows which have been replicated.
Considerations for automated data sync
Crunchy Data Warehouse stores Iceberg tables in remote storage, but it uses regular disk storage to stage changes and data, and for regular heap tables. We recommend that you use the same disk size on your Crunchy Data Warehouse instance as on the source PostgreSQL instance.
There are some limitations compared to regular logical replication to be aware of:
- Multiple subscriptions per table are not supported
- Generated columns are not yet replicated
- Iceberg tables must be empty or use `overwrite_iceberg_data` when a subscription is created
- You cannot perform schema changes or other writes to an Iceberg table while replication is active.
Connecting Postgres to Crunchy Data Warehouse with a foreign data wrapper
In some cases, you may want to connect from Postgres to a Warehouse instance. The core of Crunchy Data Warehouse is still pure PostgreSQL, just with extensions adding the Iceberg file handling and vectorized query engine features. You can connect your transactional Postgres instances to your warehouse cluster using the Postgres foreign data wrapper (`postgres_fdw`) without the need for any third-party integrations.
`postgres_fdw` lets you query an external Postgres resource from the active database you're connected to. From the perspective of this example, the two sides we'll look at are:
- Foreign instance: the one that has the standard Postgres data, perhaps a database in Crunchy Bridge. (It can alternatively be another warehouse instance)
- Destination instance: the warehouse database querying that foreign data to analyze it.
Starting on the Foreign instance, create a new user for the destination server to connect as:
CREATE USER fdw_user WITH PASSWORD 'pizza1';
GRANT SELECT, INSERT, UPDATE,
DELETE ON TABLE your_table_name TO fdw_user;
On the Crunchy Data Warehouse destination, first create the FDW extension to allow you to connect to other databases, then create the foreign server, which tells Postgres where to connect for the remote data:
CREATE EXTENSION IF NOT EXISTS postgres_fdw;
CREATE SERVER foreigndb_fdw
FOREIGN DATA WRAPPER postgres_fdw
OPTIONS (host 'p.vbjrfujvlw725gvi3i.db.postgresbridge.com',
         port '5432', dbname 'postgres', sslmode 'require');
Next create the user mapping. This tells Postgres which user on the foreign side to connect as. In this case, all users on the destination side will connect as the same user:
CREATE USER MAPPING for PUBLIC SERVER
foreigndb_fdw OPTIONS (user 'fdw_user', password 'pizza1');
Finally on the destination side, import the schema and limit it to the table names you want. This gives Postgres a local table definition that matches the remote table's definition and can be queried on the destination server.
IMPORT FOREIGN SCHEMA "public"
LIMIT TO(your_table_name)
FROM SERVER foreigndb_fdw INTO public;
Now that you have a foreign data wrapper configured, you can use that foreign data wrapper connection to shadow data from a standard operational database into a warehouse instance backed by optimized object storage, then start running data analytics on it with the vectorized engine:
COPY (SELECT * FROM your_table_name WHERE time = '2023-01-01')
TO 's3://cdwtestdatasets/logs_demo/log_2023_01_01.parquet'
WITH (format 'parquet');

CREATE FOREIGN TABLE analytics_demo ()
SERVER crunchy_lake_analytics OPTIONS (path 's3://cdwtestdatasets/logs_demo/*.parquet');
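From there, a sketch of a simple analytical query over the Parquet-backed foreign table:

-- count the exported rows using the vectorized query engine
SELECT count(*) FROM analytics_demo;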