Iceberg tables

Iceberg tables are transactional, columnar tables stored in object storage, optimized for fast analytics. By default, they are stored in the managed object storage that comes with the cluster. They can also be hosted in a user-defined location as long as it is in the same region as the cluster.

Creating an Iceberg table

Iceberg tables can be created via the regular CREATE TABLE syntax by appending using iceberg (or its alias using crunchy_iceberg).

For instance, here’s how to create an Iceberg table and insert a row:

-- create a new Iceberg table in managed storage
create table measurements (
  station_name text not null,
  measurement double precision not null
)
using iceberg;

insert into measurements values ('Istanbul', 18.5);

You can also create an Iceberg table in your own S3 bucket, as long as it is in the same region as your Crunchy Data Warehouse cluster. Note that you must have set your credentials to be able to access your private bucket.

-- create a table in your own S3 bucket (must be in same region)
create table measurements (
  station_name text not null,
  measurement double precision not null
)
using iceberg with (location = 's3://mybucket/measurements/');

-- or, configure the default_location_prefix for the session or the user (requires superuser)
set crunchy_iceberg.default_location_prefix to 's3://mybucket/iceberg';

create table measurements (
  station_name text not null,
  measurement double precision not null
)
using iceberg;
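
The comment above mentions that the prefix can also be configured for a user rather than a session. A minimal sketch of that variant, assuming the standard PostgreSQL ALTER USER ... SET syntax applies to this setting; the role name application and the bucket are illustrative:

```sql
-- persist the default location prefix for one user (run as superuser);
-- the role name and bucket are placeholders, not values from your cluster
alter user application set crunchy_iceberg.default_location_prefix to 's3://mybucket/iceberg';

-- in a new session of that user, check which prefix is in effect
show crunchy_iceberg.default_location_prefix;
```

A user-level setting only applies to sessions started after the change, so reconnect before creating tables.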

Creating an Iceberg table from a query result is also supported:

-- create a table in a specific bucket from a query result
create table measurements
using iceberg with (location = 's3://mybucket/measurements/')
as select md5(s::text), s from generate_series(1,1000000) s;

Finally, you can create an Iceberg table from a file using the load_from or definition_from option:

-- Convert a file directly into an Iceberg table
create table taxi_yellow ()
using iceberg
with (load_from = 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet');

-- or, inherit the columns from a file, but do not load any data (yet)
create table taxi_yellow ()
using iceberg
with (definition_from = 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet');
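
After creating a table with definition_from, the data can be loaded in a separate step. The following is a sketch that assumes COPY ... FROM '<url>' (listed in the loading section of this page) detects the Parquet format from the file extension:

```sql
-- taxi_yellow was created with definition_from, so it has columns but no rows yet
copy taxi_yellow from 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet';

-- check how many rows were loaded
select count(*) from taxi_yellow;
```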

Iceberg tables support the following options when creating the table:

Option      Description
location    URL prefix for the Iceberg table (e.g. 's3://mybucket/measurements')

Additionally, when creating the Iceberg table from a file, the following options are supported along with the format-specific options listed in the Data lake formats section:

Option             Description
format             The format of the file to load and/or infer columns from
definition_from    URL of a file to infer the columns from
load_from          URL of a file to infer the columns from and immediately load into the table
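
As a sketch of how the format option might be combined with load_from, consider a file whose URL has no recognizable extension. The bucket path and table name are hypothetical, and the value 'parquet' is an assumption based on the Parquet files used elsewhere on this page; see the Data lake formats section for the accepted values:

```sql
-- hypothetical path without a file extension, so the format is stated explicitly
create table trips ()
using iceberg
with (load_from = 's3://mybucket/exports/trips', format = 'parquet');
```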

When creating an Iceberg table, you can use various advanced PostgreSQL features. Consult the PostgreSQL documentation on how to use each feature.

There are a few limitations to be aware of:

  • Collations, partitioning, and inheritance work as expected, but may result in suboptimal query plans.
  • Numeric types that do not specify a precision and scale are treated as numeric(38,9).
  • Numeric values cannot be NaN or infinite.
  • Some types are not yet supported as column types in Iceberg tables:
    • Numeric with precision > 38
    • Intervals
    • Other tables used as column types
    • Geometry types other than point, linestring, polygon, multipoint, multilinestring, multipolygon, and geometrycollection
    • Geometry types nested in an array or composite type
  • Custom base types other than geometry are stored using their text representation, which may lead to suboptimal performance.
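
To illustrate the numeric limitation above, here is a small sketch with hypothetical table names. Declaring precision and scale explicitly avoids relying on the numeric(38,9) fallback:

```sql
-- price has no precision or scale, so it is treated as numeric(38,9)
create table prices_default (item text, price numeric) using iceberg;

-- precision and scale are stated explicitly and stay within the limit of 38
create table prices_explicit (item text, price numeric(12,2)) using iceberg;
```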

Loading data into an Iceberg table

There are a few different ways of loading data into an existing Iceberg table:

  1. COPY ... FROM '<url>' - Load data from S3 or an http(s) URL
  2. COPY ... FROM STDIN - Load data from the client (\copy in psql)
  3. INSERT INTO ... SELECT - Load data from a query result
  4. INSERT INTO ... VALUES - Insert individual row values

It is recommended to load data in batches. Each statement creates one or more new Parquet files and adds them to the Iceberg table. Doing single row inserts will result in many small Parquet files, which can slow down queries.
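
The four loading methods can be sketched as follows. The bucket path, the local file name, and the raw_measurements table are hypothetical, and the examples assume the measurements table created earlier on this page:

```sql
-- 1. load a file from object storage (hypothetical bucket and path)
copy measurements from 's3://mybucket/incoming/measurements.parquet';

-- 2. load a local CSV file from the client; \copy is a psql meta-command
\copy measurements from 'measurements.csv' with (format csv, header)

-- 3. load the result of a query in a single statement
insert into measurements
select station_name, measurement from raw_measurements;

-- 4. insert several rows in one statement instead of one statement per row
insert into measurements values ('Istanbul', 18.5), ('Haarlem', 9.3), ('Ankara', 12.1);
```

Each of these statements writes its rows as one batch, so fewer and larger statements lead to fewer and larger Parquet files.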

If your application needs to do single row inserts, it is recommended to create a staging table and periodically flush new batches into the Iceberg table:

-- create a staging table
create table measurements_staging (like measurements);

-- do fast inserts on a staging table
insert into measurements_staging values ('Haarlem', 9.3);

-- periodically move all the rows from the staging table into Iceberg using pg_cron (this schedule runs every minute)
select cron.schedule('flush-queue', '* * * * *', $$
  with new_rows as (
    delete from measurements_staging returning *
  )
  insert into measurements select * from new_rows;
$$);

Whether you load in batches or through a staging table, it is important to regularly vacuum your table to optimize the file sizes.

Query pushdown with Iceberg tables

Crunchy Data Warehouse extends PostgreSQL with a vectorized query engine designed to accelerate analytics tasks. Vectorized execution processes data in batches, which improves computational throughput. Crunchy Data Warehouse pushes query computation down to this engine whenever possible.

How query pushdown works

When computations are pushed down, they are processed directly within the vectorized query engine. However, if certain computations cannot be handled by the vectorized engine, they are executed normally in PostgreSQL instead.

Monitoring query pushdown

To monitor which parts of your query are pushed down, use the EXPLAIN command with the VERBOSE option. The output shows whether each part of the query is processed by the vectorized engine or on the Postgres server. Look for Vectorized SQL in the output to see what is executed by the vectorized engine. If you see Custom Scan (Query Pushdown) in the output, the computation is delegated to the vectorized engine.

Full pushdown example

Here is an example where the entire computation is pushed down:

EXPLAIN (VERBOSE) SELECT inv_warehouse_sk, count(*) FROM inventory GROUP BY inv_warehouse_sk ORDER BY count(*) DESC;
                                              QUERY PLAN
-------------------------------------------------------------------------------------------------------
 Custom Scan (Query Pushdown)  (cost=0.00..0.00 rows=0 width=0)
   Output: pushdown_query.inv_warehouse_sk, pushdown_query.count
   Engine: DuckDB
   Vectorized SQL:  SELECT inv_warehouse_sk,
     count(*) AS count
    FROM public.inventory inventory(inv_date_sk, inv_item_sk, inv_warehouse_sk, inv_quantity_on_hand)
   GROUP BY inv_warehouse_sk
   ORDER BY (count(*)) DESC
   ->  PROJECTION
         Estimated Cardinality: 66555000
         ->  ORDER_BY
               Order By: count_star() DESC
               ->  PROJECTION
                     Estimated Cardinality: 66555000
                     ->  PROJECTION
                           Estimated Cardinality: 66555000
                           ->  PERFECT_HASH_GROUP_BY
                                 Groups: #0
                                 Aggregates: count_star()
                                 ->  PROJECTION
                                       Projections: inv_warehouse_sk
                                       Estimated Cardinality: 133110000
                                       ->  PROJECTION
                                             Projections: __internal_compress_integral_utinyint(#0, 1)
                                             Estimated Cardinality: 133110000
                                             ->  READ_PARQUET
                                                   Function: READ_PARQUET
                                                   Projections: inv_warehouse_sk
                                                   Estimated Cardinality: 133110000
 Query Identifier: -4465180302191104329
(30 rows)

Partial pushdown example

In this example not all computations are pushed down:

EXPLAIN (VERBOSE) SELECT count(*) FROM inventory WHERE width_bucket(inv_item_sk, 0.024, 10.06, 5)  > 19 ;
                                              QUERY PLAN
------------------------------------------------------------------------------------------------------
 Aggregate  (cost=128.33..128.34 rows=1 width=8)
   Output: count(*)
   ->  Foreign Scan on public.inventory  (cost=100.00..127.50 rows=333 width=0)
         Output: inv_date_sk, inv_item_sk, inv_warehouse_sk, inv_quantity_on_hand
         Filter: (width_bucket((inventory.inv_item_sk)::numeric, 0.024, 10.06, 5) > 19)
         Engine: DuckDB
         Vectorized SQL:  SELECT inv_item_sk
    FROM public.inventory inventory(inv_date_sk, inv_item_sk, inv_warehouse_sk, inv_quantity_on_hand)
         ->  READ_PARQUET
               Function: READ_PARQUET
               Projections: inv_item_sk
               Estimated Cardinality: 133110000

In the example above, the width_bucket function is not yet supported by the vectorized engine, hence it is executed on the Postgres server.

Crunchy Data Warehouse currently enables vectorized query execution for nearly all SQL, with a few exceptions. One gap in pushing computations down to the vectorized engine involves certain functions and operators, such as the width_bucket function.

Future versions of Data Warehouse aim to support more SQL functions and operators within the vectorized engine. Reach out to Support if there are specific functions or operators you need for your use case.

Making Iceberg the default table format

It is possible to make all create table statements automatically use Iceberg:

set default_table_access_method to 'iceberg';

-- automatically created as Iceberg
create table users (userid bigint, username text, email text);

It can be useful to assign default_table_access_method to a specific database user for tools that are unaware of Iceberg (see below). However, some PostgreSQL features such as temporary and unlogged tables stop working unless you explicitly add using heap.
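
A short sketch of the using heap escape hatch mentioned above, with hypothetical table names:

```sql
set default_table_access_method to 'iceberg';

-- regular tables are now created as Iceberg
create table page_views (userid bigint, url text);

-- temporary and unlogged tables need an explicit heap access method
create temporary table scratch (userid bigint) using heap;
create unlogged table session_cache (userid bigint, payload text) using heap;
```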

Copy external PostgreSQL tables to Iceberg

A big advantage of being able to change the default_table_access_method to Iceberg is that it can significantly simplify data migrations.

For instance, you can connect to your Crunchy Data Warehouse cluster as the postgres superuser and (temporarily) make the following configuration change to a regular user to make all create table statements use Iceberg by default:

alter user application set default_table_access_method to 'iceberg';

You can then replicate a PostgreSQL table from a regular database cluster on Crunchy Bridge or a third party service (e.g. Amazon RDS, Azure Database for PostgreSQL, Google Cloud SQL) into Iceberg using pg_dump and psql:

pg_dump --table=my_table --section=pre-data --section=data --no-table-access-method --no-owner --clean \
  | psql postgres://<user>@<host>:5432/postgres

Several important pg_dump options are shown in the above command:

  • --section=pre-data and --section=data are required to ensure that we get the create table statement and a copy statement with data, but not other objects like indexes, which are not supported on Iceberg.
  • --no-table-access-method is required to prevent pg_dump from setting the default_table_access_method to the original value (usually heap), which would override the user-level setting (Iceberg).
  • --no-owner is not required, but useful if database users and access controls do not match between source and destination.
  • --clean is needed only if you want to replace the original table.

With these options, you can replicate from any heap table on any PostgreSQL server to Iceberg managed by Crunchy Data Warehouse in a single step.
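
After the migration you may want to verify the result and undo the temporary user-level setting. The following sketch assumes the table name my_table and the role application used in the commands above:

```sql
-- confirm the migrated table is registered in the Iceberg catalog
select table_namespace, table_name, metadata_location
from iceberg_tables
where table_name = 'my_table';

-- undo the temporary default once the migration is done (run as superuser)
alter user application reset default_table_access_method;
```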

Viewing the Iceberg catalog

After creating a table, the Iceberg table appears in the iceberg_tables view:

-- query the main Iceberg catalog table
select catalog_name, table_namespace, table_name, metadata_location from iceberg_tables;

┌──────────────┬─────────────────┬──────────────┬────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│ catalog_name │ table_namespace │  table_name  │                                                       metadata_location                                                        │
├──────────────┼─────────────────┼──────────────┼────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┤
│ postgres     │ public          │ measurements │ s3://marco-crunchy-data/iceberg/postgres/public/measurements/metadata/00003-6403833e-0766-4496-ad47-ec9641ee965f.metadata.json │
└──────────────┴─────────────────┴──────────────┴────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘

The schema of this view matches the expectations of the Iceberg catalog JDBC driver, and the SQL catalog drivers of pyiceberg and iceberg-rust. That means that Spark and other Iceberg tools can directly access the latest version of your Iceberg tables by connecting to Crunchy Data Warehouse as a catalog.

For Iceberg tables created via PostgreSQL, the catalog_name value will always match the database name. If the database is renamed, the catalog_name will change as well. External drivers cannot yet write to Iceberg tables created via PostgreSQL. They can create Iceberg tables with other catalog names, but those do not have a corresponding PostgreSQL table in the database.

Inspecting an Iceberg table

Iceberg tables are internally represented by foreign tables, but they can mostly be used as though they were regular tables.

If you show all the tables in psql using \d+, the Iceberg table will show up with Type “foreign table”. It will also show the total size of the data files that are currently part of the table.

postgres=> \d+
                                                        List of relations
┌────────┬─────────────────────────┬───────────────┬───────────────────┬─────────────┬───────────────┬────────────┬─────────────┐
│ Schema │          Name           │     Type      │       Owner       │ Persistence │ Access method │    Size    │ Description │
├────────┼─────────────────────────┼───────────────┼───────────────────┼─────────────┼───────────────┼────────────┼─────────────┤
│ public │ measurements            │ foreign table │ application       │ permanent   │               │ 5238 MB    │             │
│ public │ taxi_yellow             │ foreign table │ application       │ permanent   │               │ 92 MB      │             │
└────────┴─────────────────────────┴───────────────┴───────────────────┴─────────────┴───────────────┴────────────┴─────────────┘

You can also get the size from the pg_table_size function, though pg_total_relation_size is not yet implemented and returns 0.

postgres=> select pg_table_size('measurements');
┌───────────────┐
│ pg_table_size │
├───────────────┤
│    5492615447 │
└───────────────┘
(1 row)

You can also inspect the columns of the table with \d table_name when using psql.

postgres=> \d measurements
                       Foreign table "public.measurements"
┌──────────────┬──────────────────┬───────────┬──────────┬─────────┬─────────────┐
│    Column    │       Type       │ Collation │ Nullable │ Default │ FDW options │
├──────────────┼──────────────────┼───────────┼──────────┼─────────┼─────────────┤
│ station_name │ text             │           │ not null │         │             │
│ measurement  │ double precision │           │ not null │         │             │
└──────────────┴──────────────────┴───────────┴──────────┴─────────┴─────────────┘
Server: crunchy_iceberg
FDW options: (location 's3://marco-crunchy-data/iceberg/postgres/public/measurements')

To inspect the Iceberg table metadata, you can use the crunchy_iceberg.metadata(url text) function:

-- Returns metadata JSONB according to the Iceberg spec
select crunchy_iceberg.metadata(metadata_location) metadata from iceberg_tables
where table_name = 'measurements';

{ ... }

-- For instance, see the current schema in the Iceberg metadata
with ice as (select crunchy_iceberg.metadata(metadata_location) metadata from iceberg_tables where table_name = 'measurements')
select jsonb_pretty(metadata->'schemas'->(metadata->>'current-schema-id')::int) from ice;

┌─────────────────────────────────────┐
│            jsonb_pretty             │
├─────────────────────────────────────┤
│ {                                  ↵│
│     "type": "struct",              ↵│
│     "fields": [                    ↵│
│         {                          ↵│
│             "id": 1,               ↵│
│             "name": "station_name",↵│
│             "type": "string",      ↵│
│             "required": true       ↵│
│         },                         ↵│
│         {                          ↵│
│             "id": 2,               ↵│
│             "name": "measurement", ↵│
│             "type": "double",      ↵│
│             "required": true       ↵│
│         }                          ↵│
│     ],                             ↵│
│     "schema-id": 2                 ↵│
│ }                                   │
└─────────────────────────────────────┘
(1 row)

To see which files are currently part of the table, you can use the crunchy_iceberg.files(metadata_url text) function:

-- list all the data files in an Iceberg table
select
  manifest_path,
  content,
  file_path,
  file_format,
  spec_id,
  record_count,
  file_size_in_bytes
from
  iceberg_tables,
  crunchy_iceberg.files(metadata_location) f
where
  table_name = 'measurements';
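
The same function can be aggregated to get a quick impression of whether a table consists of many small files. This is a sketch that assumes record_count and file_size_in_bytes are numeric columns, as their names suggest:

```sql
-- summarize the files listed for an Iceberg table
select
  count(*) as file_count,
  sum(record_count) as total_rows,
  pg_size_pretty(sum(file_size_in_bytes)::bigint) as total_size,
  pg_size_pretty(avg(file_size_in_bytes)::bigint) as avg_file_size
from
  iceberg_tables,
  crunchy_iceberg.files(metadata_location) f
where
  table_name = 'measurements';
```

A high file count combined with a small average file size is a sign that the table would benefit from a VACUUM.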

Update/delete on an Iceberg table

Crunchy Data Warehouse can perform advanced update/delete queries in Iceberg tables, including modifying CTEs and updates/deletes with joins or subqueries.

-- delete rows from one table and move them to another table
with deleted_rows as (
   delete from user_assets where userid = 319931 returning *
)
insert into deleted_assets select * from deleted_rows;

Queries and transaction blocks involving multiple tables (Iceberg or heap) always preserve PostgreSQL's ACID properties.
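
As a sketch of a transaction block that spans an Iceberg table and a heap table, assume user_assets is the Iceberg table from the example above and asset_audit is a hypothetical heap table:

```sql
begin;

-- modify the Iceberg table
delete from user_assets where userid = 319931;

-- record the change in a regular heap table (hypothetical table and columns)
insert into asset_audit (userid, action, logged_at)
values (319931, 'deleted assets', now());

-- both changes become visible together, or neither does on rollback
commit;
```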

An update/delete command locks the table, such that only one update/delete can run at a time.

Some related modification queries are not supported on Iceberg tables, namely:

  • Queries with system columns (ctid)
  • SELECT ... FOR UPDATE
  • MERGE
  • INSERT ... ON CONFLICT
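
Since INSERT ... ON CONFLICT and MERGE are not available, one possible workaround is an update followed by an insert of the missing rows in a single transaction. This sketch is not an official recipe; it assumes station_name acts as the key, that measurements_staging (from the loading section) holds at most one row per station, and that no other session inserts the same keys concurrently:

```sql
begin;

-- update rows that already exist in the Iceberg table
update measurements m
set measurement = s.measurement
from measurements_staging s
where m.station_name = s.station_name;

-- insert rows whose key is not present yet
insert into measurements (station_name, measurement)
select s.station_name, s.measurement
from measurements_staging s
where not exists (
  select 1 from measurements m where m.station_name = s.station_name
);

commit;
```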

Altering an Iceberg table

You can change the schema of your Iceberg tables via the ALTER TABLE command. You can add, change, rename, and drop columns. Several other commands related to triggers and ownership are also supported.

-- Add a column to an Iceberg table
alter table measurements add column measurement_tim timestamptz;

-- Set the default for new values
alter table measurements alter column measurement_tim set default now();

-- Rename a column
alter table measurements rename column measurement_tim to measurement_time;

-- Drop a column
alter table measurements drop column measurement_time;

-- Change owner to another postgres user
alter table measurements owner to oceanographer;

-- Set schema
create schema ocean;
alter table measurements set schema ocean;

When adding a column that is meant to have a default value, there is a difference between specifying the default in ADD COLUMN ... DEFAULT ... and setting it on an existing column using ALTER COLUMN ... SET DEFAULT .... When adding a column with a default, all existing rows are implicitly assigned the default value. Currently, the default in ADD COLUMN must be a constant, because a constant does not require rewriting the table. However, you can still set the default for new rows to an expression using ALTER COLUMN ... SET DEFAULT ....

-- allowed: add a column with all existing rows being assigned a constant
alter table measurements add column last_update_time timestamptz default '2024-01-01 00:00:00';

-- disallowed: add a column with all existing rows being assigned an expression
alter table measurements add column last_update_time timestamptz default now();
ERROR:  ALTER TABLE ADD COLUMN with default expression command not supported for crunchy_iceberg tables

-- allowed: set the default expression of a column
alter table measurements alter column last_update_time set default now();

It is worth noting that psql does not perform auto-completion for Iceberg tables using the above syntax because they are recognized as foreign tables. You can use ALTER FOREIGN TABLE if you need auto-completion.

A few ALTER TABLE operations that work in regular PostgreSQL are not yet supported on Iceberg tables. These limitations will be resolved over time:

  • Adding a column with a generated value, (big)serial pseudotype, or constraint
  • Changing the type of a column (ALTER COLUMN ... SET TYPE ...)
  • Adding or validating constraints
  • Adding a column with an unsupported type (nested geometry, tables as types, numeric with precision > 38)
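
Until ALTER COLUMN ... SET TYPE is available, a column type can in principle be changed by combining operations that are supported: add a column, backfill it, drop the old column, and rename. The following is only a sketch; the column name measurement_numeric is hypothetical, the update rewrites every row, and the new column is nullable because constraints cannot be added:

```sql
-- 1. add a column with the desired type
alter table measurements add column measurement_numeric numeric(10,2);

-- 2. backfill it from the existing column
update measurements set measurement_numeric = measurement::numeric(10,2);

-- 3. drop the old column and give the new one its name
alter table measurements drop column measurement;
alter table measurements rename column measurement_numeric to measurement;
```

Running VACUUM afterwards helps clean up the files left behind by the update.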

Some advanced table management features such as partitioning and inheritance are supported. Do be careful with those features, since the PostgreSQL planner does not always know how to efficiently perform aggregations across Iceberg (read: foreign) partitions.

Vacuuming an Iceberg table

Each insert/update/delete operation on an Iceberg table results in additional data files being written. Quite often, this can result in the table being made up of many small files. The solution to that is to vacuum the table, similar to (auto)vacuum for regular heap tables. VACUUM on Iceberg tables does the following tasks:

  • Data compaction: When you INSERT rows into your Iceberg tables, the data is stored as Parquet files. Iceberg data compaction merges small data files into larger ones to improve read performance and reduce metadata overhead.
  • Managing expired metadata and data files: Modifying Iceberg tables generates new data and metadata files, leaving some older files unused and inaccessible. The VACUUM operation ensures these obsolete files are cleaned up, maintaining storage efficiency and preventing unnecessary accumulation of unused files.

VACUUM is a critical part of maintaining Iceberg tables. Unfortunately, autovacuum does not pick up foreign tables, so VACUUM must be run explicitly. The following syntax will run VACUUM on all Iceberg tables:

VACUUM (ICEBERG);

The above will find all the Iceberg tables and run VACUUM on them one by one.

Automating VACUUM with pg_cron

We suggest that users regularly VACUUM Iceberg tables, taking advantage of pg_cron to schedule it to run at regular intervals:

SELECT cron.schedule('iceberg_vacuum_job', '*/30 * * * *', $$VACUUM (ICEBERG);$$);

The above example schedules VACUUM to be run every 30 minutes. As with heap tables, the right frequency of VACUUM depends on the workload. If the tables are not frequently modified, every 30 minutes could be a good default. If the tables are frequently modified, we suggest scheduling VACUUM to run more frequently, up to every five minutes.
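
The scheduled job can be inspected and adjusted with pg_cron's own catalog tables and functions. This sketch assumes a pg_cron version that supports named jobs and the cron.job_run_details table:

```sql
-- list the scheduled jobs
select jobid, jobname, schedule, command from cron.job;

-- show the most recent runs of the vacuum job
select d.start_time, d.end_time, d.status, d.return_message
from cron.job_run_details d
join cron.job j using (jobid)
where j.jobname = 'iceberg_vacuum_job'
order by d.start_time desc
limit 10;

-- scheduling again under the same name replaces the schedule (here: every five minutes)
select cron.schedule('iceberg_vacuum_job', '*/5 * * * *', $$VACUUM (ICEBERG);$$);

-- remove the job
select cron.unschedule('iceberg_vacuum_job');
```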

Iceberg interoperability

Crunchy Data Warehouse acts as its own Iceberg catalog. When updating Iceberg tables, it also updates its catalog tables as part of the ongoing transaction. Once created, your Iceberg table will appear in the iceberg_tables view, which can be used by third-party tools such as Spark, pyiceberg, or iceberg-rust. These tools can discover and access your warehouse tables via their SQL/JDBC catalog implementations.

Accessing Iceberg tables through Crunchy Data Warehouse

You can use Spark to access the Iceberg tables created on Crunchy Data Warehouse. The commands below set up that access; the connection details are also available in the dashboard. Note that in this example both the database name and the catalog name are postgres.

Here's an example of the Crunchy Data Warehouse connection information needed:

export PGHOST="db host"
export PGDATABASE="db name"
export PGUSER="user name"
export PGPASSWORD="your password"
export AWS_REGION="cdw instance region"
export JDBC_CONN_STR="jdbc:postgresql://${PGHOST}/${PGDATABASE}?user=${PGUSER}&password=${PGPASSWORD}"

This is an example of Spark connection configurations:

spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.1 \
--conf spark.sql.catalog.postgres=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
--conf spark.sql.catalog.demo=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.postgres.catalog-impl=org.apache.iceberg.jdbc.JdbcCatalog \
--conf spark.sql.catalog.postgres.uri=$JDBC_CONN_STR \
--conf spark.sql.catalog.postgres.warehouse=s3:// \
--conf spark.sql.catalog.postgres.io-impl=org.apache.iceberg.aws.s3.S3FileIO \
--conf spark.sql.catalog.postgres.s3.endpoint=https://s3.${AWS_REGION}.amazonaws.com \
--conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog

For example, a sample table created on Crunchy Data Warehouse:

CREATE TABLE public.crunchy_dw_iceberg_table
USING iceberg
AS SELECT id
FROM generate_series(0,1000) id;

Once connected to Spark, you can query the table directly via spark-sql:

spark-sql (default)> SELECT avg(id) FROM postgres.public.crunchy_dw_iceberg_table;
500.0
Time taken: 7.444 seconds, Fetched 1 row(s)

Dropping an Iceberg table

You can drop Iceberg tables using the regular DROP TABLE command:

drop table measurements;

When you drop an Iceberg table, the files that make up the table are added to the “deletion queue” but are not deleted immediately. This allows for restoring a backup if you need to revert to a previous point in time (see Point-in-time recoveries). When you run VACUUM (ICEBERG), any files that have been in the deletion queue for more than 10 days are deleted.

Backups

When an Iceberg table is modified, the files that make up the previous version of the Iceberg table remain in object storage until they are explicitly deleted (by VACUUM). Our default policy is to keep files for at least 10 days to allow past versions to be restored.
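
To see which files of a table are waiting in the deletion queue, the queue can be queried directly. This sketch uses only the deletion_queue columns that appear in the restore example on this page (path, table_name, orphaned_at) and assumes the default 10 day retention:

```sql
-- list files of the measurements table that are no longer referenced
select
  path,
  orphaned_at,
  orphaned_at + interval '10 days' as eligible_for_deletion_after
from crunchy_query_engine.deletion_queue
where table_name = 'measurements'::regclass
order by orphaned_at desc;
```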

Crunchy Data Warehouse does not support table-level restores yet, but you can simulate it by creating an “external Iceberg table” from an old (dereferenced) metadata.json file.

do $$
begin
  execute format('create foreign table iceberg_old () server crunchy_lake_analytics options (path %L)',  (
    select path
    from crunchy_query_engine.deletion_queue
    where table_name = 'iceberg'::regclass and orphaned_at < now() - interval '3 days' and path like '%.metadata.json'
    order by orphaned_at desc limit 1
  ));
end;
$$;

Crunchy Bridge also provides a server-level restore experience that restores your heap and Iceberg tables to the same (transactionally-consistent) point on a new server.

When the new server is restored, the Iceberg catalog tables will be at a past state, pointing to older metadata.json files. Since the metadata.json file and the files it references are kept for at least 10 days, you can still query the old version of the table. However, the Iceberg tables remain stored in the managed storage of the original cluster, which might still be running, so the new cluster cannot manage or modify them.

When you fork or restore a Crunchy Data Warehouse cluster, the Iceberg tables will be in read-only mode. When the referenced files are more than 10 days old, they may be deleted by a VACUUM on the parent. It's therefore recommended to copy data into new tables if you want to continue using the restored cluster.
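
Copying a read-only table into a new Iceberg table on the restored or forked cluster can be sketched with CREATE TABLE ... AS; the name measurements_copy is hypothetical:

```sql
-- copy the read-only table into a new Iceberg table owned by this cluster
create table measurements_copy
using iceberg
as select * from measurements;

-- compare row counts before pointing applications at the copy
select
  (select count(*) from measurements) as original_rows,
  (select count(*) from measurements_copy) as copied_rows;
```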