Iceberg tables
Iceberg tables are transactional, columnar tables stored in object storage, optimized for fast analytics. By default, they are stored in the managed object storage that comes with the cluster. They can also be hosted in a user-defined location as long as it is in the same region as the cluster.
Creating an Iceberg table
Iceberg tables can be created via the regular `CREATE TABLE` syntax by appending `using iceberg` (or its alias `using crunchy_iceberg`).
For instance, here's how to create an Iceberg table and insert a row:
-- create a new Iceberg table in managed storage
create table measurements (
station_name text not null,
measurement double precision not null
)
using iceberg;
insert into measurements values ('Istanbul', 18.5);
You can also create an Iceberg table in your own S3 bucket, as long as it is in the same region as your Crunchy Data Warehouse cluster. Note that you must have set your credentials to be able to access your private bucket.
-- create a table in your own S3 bucket (must be in same region)
create table measurements (
station_name text not null,
measurement double precision not null
)
using iceberg with (location = 's3://mybucket/measurements/');
-- or, configure the default_location_prefix for the session or the user (requires superuser)
set crunchy_iceberg.default_location_prefix to 's3://mybucket/iceberg';
create table measurements (
station_name text not null,
measurement double precision not null
)
using iceberg;
Creating an Iceberg table from a query result is also supported:
-- create a table in a specific bucket from a query result
create table measurements
using iceberg with (location = 's3://mybucket/measurements/')
as select md5(s::text), s from generate_series(1,1000000) s;
Finally, you can create an Iceberg table from a file using the `load_from` or `definition_from` option:
-- Convert a file directly into an Iceberg table
create table taxi_yellow ()
using iceberg
with (load_from = 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet');
-- or, inherit the columns from a file, but do not load any data (yet)
create table taxi_yellow ()
using iceberg
with (definition_from = 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet');
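If you use `definition_from`, the table starts out empty and you can load the data later, for example with the `COPY ... FROM '<url>'` command described under loading data below. A minimal sketch, reusing the same public Parquet file (depending on the file, you may need to specify the format explicitly):
-- load the Parquet file into the table created with definition_from (sketch)
copy taxi_yellow from 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet';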
Iceberg tables support the following options when creating the table:
Option | Description |
---|---|
location | URL prefix for the Iceberg table (e.g. 's3://mybucket/measurements') |
Additionally, when creating the Iceberg table from a file, the following options are supported along with the format-specific options listed in the Data lake formats section:
Option | Description |
---|---|
format | The format of the file to load and/or infer columns from |
definition_from | URL of a file to infer the columns from |
load_from | URL of a file to infer the columns from and immediately load into the table |
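For example, the `format` option is useful when the file extension does not reveal the format. A sketch, assuming a hypothetical CSV file stored at s3://mybucket/events/2024.log and that 'csv' is one of the formats listed in the Data lake formats section:
-- infer columns from a CSV file whose extension does not reveal its format (sketch)
create table events ()
using iceberg
with (definition_from = 's3://mybucket/events/2024.log', format = 'csv');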
When creating an Iceberg table, you can use various advanced PostgreSQL features, including:
- Serial types
- Identity columns
- Check and not null constraints
- Generated columns
- Custom composite types
- Custom functions in expressions
- Triggers
- Partitioning (not recommended)
- Inheritance (not recommended)
- Collations (not recommended)
- PostGIS geometry type
Consult the PostgreSQL documentation on how to use each feature.
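For instance, here is a sketch combining an identity column, a check constraint, and a generated column (the table and column names are made up for the example):
-- sketch: an Iceberg table using an identity column, a check constraint, and a generated column
create table sensor_readings (
    reading_id bigint generated always as identity,
    station_name text not null,
    celsius double precision not null check (celsius > -100),
    fahrenheit double precision generated always as (celsius * 9.0 / 5.0 + 32.0) stored
)
using iceberg;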
There are a few limitations to be aware of:
- Collations, partitioning, and inheritance work as expected, but may result in suboptimal query plans.
- Numeric types that do not specify a precision and scale are treated as numeric(38,16).
- Numeric values cannot be NaN or infinite.
- Some types are not yet supported as column types in Iceberg tables:
- Numeric with precision > 38
- Intervals
- Other tables used as column types.
- Not all geometry types are supported in Iceberg tables (only point, linestring, polygon, multipoint, multilinestring, multipolygon, geometrycollection), and geometry types cannot be nested in an array or composite type.
- Custom base types other than geometry are stored using their text representation, which may lead to suboptimal performance.
Loading data into an Iceberg table
There are a few different ways of loading data into an existing Iceberg table:
- `COPY ... FROM '<url>'` - Load data from S3 or an http(s) URL
- `COPY ... FROM STDIN` - Load data from the client (`\copy` in `psql`)
- `INSERT INTO ... SELECT` - Load data from a query result
- `INSERT INTO ... VALUES` - Insert individual row values
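A couple of sketches of these methods, using the measurements table from earlier, a hypothetical CSV file in your own bucket, and a hypothetical source table:
-- load a CSV file from a (hypothetical) bucket
copy measurements from 's3://mybucket/measurements.csv';
-- load data from a query result (measurements_raw is a hypothetical source table)
insert into measurements
select station_name, measurement from measurements_raw;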
It is recommended to load data in batches. Each statement creates one or more new Parquet files and adds them to the Iceberg table. Doing single row inserts will result in many small Parquet files, which can slow down queries.
If your application needs to do single-row inserts, it is recommended to create a staging table and periodically flush new batches into the Iceberg table:
-- create a staging table
create table measurements_staging (like measurements);
-- do fast inserts on a staging table
insert into measurements_staging values ('Haarlem', 9.3);
-- periodically move all the rows from the staging table into Iceberg using pg_cron
select cron.schedule('flush-queue', '* * * * *', $$
with new_rows as (
delete from measurements_staging returning *
)
insert into measurements select * from new_rows;
$$);
Whether you use batches or a staging table, it is important to regularly vacuum your table to optimize the file sizes.
Query pushdown with Iceberg tables
Crunchy Data Warehouse extends PostgreSQL with a vectorized query engine designed to accelerate analytics tasks. Vectorized execution improves efficiency by processing data in batches, improving computational throughput. Performance is improved by pushing down query computation to this engine when possible.
How query pushdown works
When computations are pushed down, they are processed directly within the vectorized query engine. However, if certain computations cannot be handled by the vectorized engine, they are executed normally in PostgreSQL instead.
Monitoring query pushdown
To monitor which parts of your query are pushed down, you can use the `EXPLAIN` command with the `VERBOSE` option. The output shows how the query is executed, indicating whether elements are processed by the vectorized engine or on the Postgres server. Look for `Vectorized SQL` in the output to see what is executed by the vectorized engine. If you see `Custom Scan (Query Pushdown)` in the output, you are all set: the computation is delegated to the vectorized engine.
Full pushdown example
Here is an example where the entire computation is pushed down:
EXPLAIN (VERBOSE) SELECT inv_warehouse_sk, count(*) FROM inventory GROUP BY inv_warehouse_sk ORDER BY count(*) DESC;
QUERY PLAN
-------------------------------------------------------------------------------------------------------
Custom Scan (Query Pushdown) (cost=0.00..0.00 rows=0 width=0)
Output: pushdown_query.inv_warehouse_sk, pushdown_query.count
Engine: DuckDB
Vectorized SQL: SELECT inv_warehouse_sk,
count(*) AS count
FROM public.inventory inventory(inv_date_sk, inv_item_sk, inv_warehouse_sk, inv_quantity_on_hand)
GROUP BY inv_warehouse_sk
ORDER BY (count(*)) DESC
-> PROJECTION
Estimated Cardinality: 66555000
-> ORDER_BY
Order By: count_star() DESC
-> PROJECTION
Estimated Cardinality: 66555000
-> PROJECTION
Estimated Cardinality: 66555000
-> PERFECT_HASH_GROUP_BY
Groups: #0
Aggregates: count_star()
-> PROJECTION
Projections: inv_warehouse_sk
Estimated Cardinality: 133110000
-> PROJECTION
Projections: __internal_compress_integral_utinyint(#0, 1)
Estimated Cardinality: 133110000
-> READ_PARQUET
Function: READ_PARQUET
Projections: inv_warehouse_sk
Estimated Cardinality: 133110000
Query Identifier: -4465180302191104329
(30 rows)
Partial pushdown example
In this example not all computations are pushed down:
EXPLAIN (VERBOSE) SELECT count(*) FROM inventory WHERE width_bucket(inv_item_sk, 0.024, 10.06, 5) > 19 ;
QUERY PLAN
------------------------------------------------------------------------------------------------------
Aggregate (cost=128.33..128.34 rows=1 width=8)
Output: count(*)
-> Foreign Scan on public.inventory (cost=100.00..127.50 rows=333 width=0)
Output: inv_date_sk, inv_item_sk, inv_warehouse_sk, inv_quantity_on_hand
Filter: (width_bucket((inventory.inv_item_sk)::numeric, 0.024, 10.06, 5) > 19)
Engine: DuckDB
Vectorized SQL: SELECT inv_item_sk
FROM public.inventory inventory(inv_date_sk, inv_item_sk, inv_warehouse_sk, inv_quantity_on_hand)
-> READ_PARQUET
Function: READ_PARQUET
Projections: inv_item_sk
Estimated Cardinality: 133110000
In the example above, the `width_bucket` function is not yet supported by the vectorized engine, hence it is executed on the Postgres server.
Crunchy Data Warehouse currently enables vectorized query execution for nearly all SQL, with a few exceptions. One gap in pushing computations down to the vectorized engine involves certain functions and operators, such as the `width_bucket` function.
Future versions of Data Warehouse aim to support more SQL functions and operators within the vectorized engine. Reach out to Support if there are specific functions or operators you need for your use case.
Making Iceberg the default table format
It is possible to make all `create table` statements automatically use Iceberg:
set default_table_access_method to 'iceberg';
-- automatically created as Iceberg
create table users (userid bigint, username text, email text);
It can be useful to assign `default_table_access_method` to a specific database user for tools that are unaware of Iceberg (see below). However, some PostgreSQL features such as temporary and unlogged tables stop working unless you explicitly add `using heap`.
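For example, with the default set to Iceberg, creating a temporary table requires an explicit access method (a sketch):
-- temporary tables still need heap when the default access method is iceberg (sketch)
create temporary table measurements_tmp (
    station_name text not null,
    measurement double precision not null
)
using heap;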
Copy external PostgreSQL tables to Iceberg
A big advantage of being able to change the `default_table_access_method` to Iceberg is that it can significantly simplify data migrations.
For instance, you can connect to your Crunchy Data Warehouse cluster as the postgres superuser and (temporarily) make the following configuration change to a regular user to make all `create table` statements use Iceberg by default:
alter user application set default_table_access_method to 'iceberg';
You can then replicate a PostgreSQL table from a regular database cluster on Crunchy Bridge or a third-party service (e.g. Amazon RDS, Azure Database for PostgreSQL, Google Cloud SQL) into Iceberg using `pg_dump` and `psql`:
pg_dump --table=my_table --section=pre-data --section=data --no-table-access-method --no-owner --clean \
| psql postgres://[email protected]:5432/postgres
Several important `pg_dump` options are shown in the above command:
- `--section=pre-data` and `--section=data` are required to ensure that we get the `create table` statement and a `copy` statement with data, but not other objects like indexes, which are not supported on Iceberg.
- `--no-table-access-method` is required to prevent pg_dump from setting the `default_table_access_method` to the original value (usually heap), which would override the user-level setting (Iceberg).
- `--no-owner` is not required, but useful if database users and access controls do not match between source and destination.
- `--clean` is needed only if you want to replace the original table.
With these options, you can replicate from any heap table on any PostgreSQL server to Iceberg managed by Crunchy Data Warehouse in a single step.
Viewing the Iceberg catalog
After creating a table, the Iceberg table appears in the `iceberg_tables` view:
-- query the main Iceberg catalog table
select catalog_name, table_namespace, table_name, metadata_location from iceberg_tables;
┌──────────────┬─────────────────┬──────────────┬────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│ catalog_name │ table_namespace │ table_name │ metadata_location │
├──────────────┼─────────────────┼──────────────┼────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┤
│ postgres │ public │ measurements │ s3://marco-crunchy-data/iceberg/postgres/public/measurements/metadata/00003-6403833e-0766-4496-ad47-ec9641ee965f.metadata.json │
└──────────────┴─────────────────┴──────────────┴────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
The schema of this view matches the expectations of the Iceberg catalog JDBC driver, and the SQL catalog drivers of pyiceberg and iceberg-rust. That means that Spark and other Iceberg tools can directly access the latest version of your Iceberg tables by connecting to Crunchy Data Warehouse as a catalog.
For Iceberg tables created via PostgreSQL, the `catalog_name` value will always match the database name. If the database is renamed, the `catalog_name` will change as well. External drivers cannot yet write to Iceberg tables created via PostgreSQL. They can create Iceberg tables with other catalog names, but those do not have a corresponding PostgreSQL table in the database.
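Because the catalog name follows the database name, a query like the following sketch lists only the Iceberg tables that are backed by a PostgreSQL table in the current database:
-- list Iceberg tables backed by a PostgreSQL table in the current database (sketch)
select table_namespace, table_name
from iceberg_tables
where catalog_name = current_database();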
Inspecting an Iceberg table
Iceberg tables are internally represented by foreign tables, but they can mostly be used as though they were regular tables.
If you show all the tables in `psql` using `\d+`, the Iceberg table will show up with Type “foreign table”. It will also show the total size of the data files that are currently part of the table.
postgres=> \d+
List of relations
┌────────┬─────────────────────────┬───────────────┬───────────────────┬─────────────┬───────────────┬────────────┬─────────────┐
│ Schema │ Name │ Type │ Owner │ Persistence │ Access method │ Size │ Description │
├────────┼─────────────────────────┼───────────────┼───────────────────┼─────────────┼───────────────┼────────────┼─────────────┤
│ public │ measurements │ foreign table │ application │ permanent │ │ 5238 MB │ │
│ public │ taxi_yellow │ foreign table │ application │ permanent │ │ 92 MB │ │
└────────┴─────────────────────────┴───────────────┴───────────────────┴─────────────┴───────────────┴────────────┴─────────────┘
You can also get the size from the `pg_table_size` function, though `pg_total_relation_size` is not yet implemented and returns 0.
postgres=> select pg_table_size('measurements');
┌───────────────┐
│ pg_table_size │
├───────────────┤
│ 5492615447 │
└───────────────┘
(1 row)
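For a human-readable value, you can wrap the result in the standard `pg_size_pretty` function:
-- show the table size in a human-readable format
select pg_size_pretty(pg_table_size('measurements'));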
You can also inspect the columns of the table with `\d table_name` when using `psql`.
postgres=> \d measurements
Foreign table "public.measurements"
┌──────────────┬──────────────────┬───────────┬──────────┬─────────┬─────────────┐
│ Column │ Type │ Collation │ Nullable │ Default │ FDW options │
├──────────────┼──────────────────┼───────────┼──────────┼─────────┼─────────────┤
│ station_name │ text │ │ not null │ │ │
│ measurement │ double precision │ │ not null │ │ │
└──────────────┴──────────────────┴───────────┴──────────┴─────────┴─────────────┘
Server: crunchy_iceberg
FDW options: (location 's3://marco-crunchy-data/iceberg/postgres/public/measurements')
To inspect the Iceberg table metadata, you can use the `crunchy_iceberg.metadata(url text)` function:
-- Returns metadata JSONB according to the Iceberg spec
select crunchy_iceberg.metadata(metadata_location) metadata from iceberg_tables
where table_name = 'measurements';
{ ... }
-- For instance, see the current schema in the Iceberg metadata
with ice as (select crunchy_iceberg.metadata(metadata_location) metadata from iceberg_tables where table_name = 'measurements')
select jsonb_pretty(metadata->'schemas'->(metadata->>'current-schema-id')::int) from ice;
┌─────────────────────────────────────┐
│ jsonb_pretty │
├─────────────────────────────────────┤
│ { ↵│
│ "type": "struct", ↵│
│ "fields": [ ↵│
│ { ↵│
│ "id": 1, ↵│
│ "name": "station_name",↵│
│ "type": "string", ↵│
│ "required": true ↵│
│ }, ↵│
│ { ↵│
│ "id": 2, ↵│
│ "name": "measurement", ↵│
│ "type": "double", ↵│
│ "required": true ↵│
│ } ↵│
│ ], ↵│
│ "schema-id": 2 ↵│
│ } │
└─────────────────────────────────────┘
(1 row)
To see which files are currently part of the table, you can use the `crunchy_iceberg.files(metadata_url text)` function:
-- list all the data files in an Iceberg table
select
manifest_path,
content,
file_path,
file_format,
spec_id,
record_count,
file_size_in_bytes
from
iceberg_tables,
crunchy_iceberg.files(metadata_location) f
where
table_name = 'measurements';
Update/delete on an Iceberg table
Crunchy Data Warehouse can perform advanced update/delete queries in Iceberg tables, including modifying CTEs and updates/deletes with joins or subqueries.
-- delete rows from one table and move them to another table
with deleted_rows as (
delete from user_assets where userid = 319931 returning *
)
insert into deleted_assets select * from deleted_rows;
Queries and transaction blocks involving multiple tables (Iceberg or heap) always preserve PostgreSQL's ACID properties.
An update/delete command locks the table, such that only one update/delete can run at a time.
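For example, an update that joins against another table uses the regular PostgreSQL `UPDATE ... FROM` syntax (the stations table and its calibration_offset column are hypothetical):
-- sketch: update Iceberg rows using values from another (hypothetical) table
update measurements
set measurement = measurement + stations.calibration_offset
from stations
where measurements.station_name = stations.station_name;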
Some related modification queries are not supported on Iceberg tables, namely:
- Queries with system columns (`ctid`)
- `SELECT ... FOR UPDATE`
- `MERGE`
- `INSERT ... ON CONFLICT` (a manual upsert alternative is sketched below)
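Since `MERGE` and `INSERT ... ON CONFLICT` are not available, one possible manual upsert pattern (a sketch, not an official recommendation) is an update followed by an insert of the missing row inside a single transaction:
-- sketch: manual upsert into an Iceberg table (assumes station_name identifies a row)
begin;
update measurements set measurement = 19.1 where station_name = 'Istanbul';
insert into measurements
select 'Istanbul', 19.1
where not exists (select 1 from measurements where station_name = 'Istanbul');
commit;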
Altering an Iceberg table
You can change the schema of your Iceberg tables via the ALTER TABLE command. You can add, change, rename, and drop columns. Several other commands related to triggers and ownership are also supported.
-- Add a column to an Iceberg table
alter table measurements add column measurement_tim timestamptz;
-- Set the default for new values
alter table measurements alter column measurement_tim set default now();
-- Rename a column
alter table measurements rename column measurement_tim to measurement_time;
-- Drop a column
alter table measurements drop column measurement_time;
-- Change owner to another postgres user
alter table measurements owner to oceanographer;
-- Set schema
create schema ocean;
alter table measurements set schema ocean;
When adding a column that is meant to have a default value, there is a difference between specifying it in `ADD COLUMN ... DEFAULT ...` and setting it on an existing column using `ALTER COLUMN ... SET DEFAULT ...`. When adding a column with a default, all existing rows are implicitly assigned the default value. Currently, you can only use a constant in that case, which does not require rewriting the table. However, you can still set the default for new rows to an expression.
-- allowed: add a column with all existing rows being assigned a constant
alter table measurements add column last_update_time timestamptz default '2024-01-01 00:00:00';
-- disallowed: add a column with all existing rows being assigned an expression
alter table measurements add column last_update_time timestamptz default now();
ERROR: ALTER TABLE ADD COLUMN with default expression command not supported for crunchy_iceberg tables
-- allowed: set the default expression of a column
alter table measurements alter column last_update_time set default now();
It is worth noting that `psql` does not perform auto-completion for Iceberg tables using the above syntax because they are recognized as foreign tables. You can use `ALTER FOREIGN TABLE` if you need auto-completion.
There are a few additional limitations regarding `ALTER TABLE` compared to regular PostgreSQL that will be resolved over time:
- Adding a column with a generated value, (big)serial pseudotype, or constraint
- Changing the type of a column (`ALTER COLUMN ... TYPE ...`) - see the workaround sketch after this list
- Adding or validating constraints
- Adding a column with an unsupported type (nested geometry, tables as types, numeric with precision > 38)
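Until changing a column type is supported, one possible workaround (a sketch using only the operations documented above; note that constraints such as not null cannot currently be re-added afterwards) is to add a new column, backfill it, and then drop and rename:
-- sketch: effectively change the type of measurement to real
alter table measurements add column measurement_real real;
update measurements set measurement_real = measurement::real;
alter table measurements drop column measurement;
alter table measurements rename column measurement_real to measurement;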
Some advanced table management features such as partitioning and inheritance are supported. Do be careful with those features, since the PostgreSQL planner does not always know how to efficiently perform aggregations across Iceberg (read: foreign) partitions.
Vacuuming an Iceberg table
Each insert/update/delete operation on an Iceberg table results in additional data files being written. Quite often, this can result in the table being made up of many small files. The solution to that is to vacuum the table, similar to (auto)vacuum for regular heap tables. VACUUM on Iceberg tables does the following tasks:
- Data compaction: When you INSERT rows into your Iceberg tables, the data is stored as Parquet files. Iceberg data compaction merges small data files into larger ones to improve read performance and reduce metadata overhead.
- Managing expired metadata and data files: Modifying Iceberg tables generates new data and metadata files, leaving some older files unused and inaccessible. The VACUUM operation ensures these obsolete files are cleaned up, maintaining storage efficiency and preventing unnecessary accumulation of unused files.
VACUUM is a critical part of working with Iceberg tables. Unfortunately, autovacuum does not pick up foreign tables. The following syntax will run VACUUM on all Iceberg tables:
VACUUM (ICEBERG);
The above will find all the Iceberg tables and run VACUUM on them one by one.
Automating VACUUM with pg_cron
We suggest that users regularly VACUUM Iceberg tables, taking advantage of `pg_cron` to schedule it to run at regular intervals:
SELECT cron.schedule('iceberg_vacuum_job', '*/30 * * * *', $$VACUUM (ICEBERG);$$);
The above example schedules VACUUM to run every 30 minutes. As with heap tables, the right VACUUM frequency depends on the workload. If the tables are not frequently modified, every 30 minutes can be a good default. If the tables are frequently modified, we suggest scheduling VACUUM to run more often, up to every five minutes.
Iceberg interoperability
Crunchy Data Warehouse acts as its own Iceberg catalog. When updating Iceberg tables, it also updates its catalog tables as part of the ongoing transaction. Once created, your Iceberg table will appear in the `iceberg_tables` view, which can be used by third-party tools such as Spark, pyiceberg, or iceberg-rust. These tools can discover and access your warehouse tables via their SQL/JDBC catalog implementations.
Accessing Iceberg tables through Crunchy Data Warehouse
You can use Spark to access the Iceberg tables created on Crunchy Data Warehouse. The following commands provide access to those tables; use the connection information shown in the dashboard. Note that the database name and catalog name in this example are both `postgres`.
Here's an example of the Crunchy Data Warehouse connection information needed:
export PGHOST="db host"
export PGDATABASE="db name"
export PGUSER="user name"
export PGPASSWORD="your password"
export AWS_REGION="cdw instance region"
export JDBC_CONN_STR="jdbc:postgresql://${PGHOST}/${PGDATABASE}?user=${PGUSER}&password=${PGPASSWORD}"
This is an example of Spark connection configurations:
spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.1 \
--conf spark.sql.catalog.postgres=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
--conf spark.sql.catalog.demo=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.postgres.catalog-impl=org.apache.iceberg.jdbc.JdbcCatalog \
--conf spark.sql.catalog.postgres.uri=$JDBC_CONN_STR \
--conf spark.sql.catalog.postgres.warehouse=s3:// \
--conf spark.sql.catalog.postgres.io-impl=org.apache.iceberg.aws.s3.S3FileIO \
--conf spark.sql.catalog.postgres.s3.endpoint=https://s3.${AWS_REGION}.amazonaws.com \
--conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog
For example, a sample table created on Crunchy Data Warehouse:
CREATE TABLE public.crunchy_dw_iceberg_table
USING iceberg
AS SELECT id
FROM generate_series(0,1000) id;
Once connected to Spark, you can query the table directly via Spark-sql:
spark-sql (default)> SELECT avg(id) FROM postgres.public.crunchy_dw_iceberg_table;
500.0
Time taken: 7.444 seconds, Fetched 1 row(s)
Dropping an Iceberg table
You can drop Iceberg tables using the regular `DROP TABLE` command:
drop table measurements;
When you drop an Iceberg table, the files that make up the Iceberg table will be added to the “deletion queue”, but are not immediately deleted. This allows for restoring a backup if you need to revert to a previous point in time (see Point-in-time recoveries). When you run `VACUUM (iceberg);`, any files that have been in the deletion queue for more than 10 days will be deleted.
Backups
When an Iceberg table is modified, the files that make up the previous version of the Iceberg table remain in object storage until they are explicitly deleted (by VACUUM). Our default policy is to keep files for at least 10 days to allow past versions to be restored.
Crunchy Data Warehouse does not support table-level restores yet, but you can simulate it by creating an “external Iceberg table” from an old (dereferenced) metadata.json file.
do $$
begin
execute format('create foreign table iceberg_old () server crunchy_lake_analytics options (path %L)', (
select path
from crunchy_query_engine.deletion_queue
where table_name = 'iceberg'::regclass and orphaned_at < now() - interval '3 days' and path like '%.metadata.json'
order by orphaned_at desc limit 1
));
end;
$$;
Crunchy Bridge also provides a server-level restore experience that restores your heap and Iceberg tables to the same (transactionally-consistent) point on a new server.
When the new server is restored, the Iceberg catalog tables will be at a past state, pointing to older metadata.json files. Since the metadata.json file and the files it references are kept for at least 10 days, you can still query the old version of the table. However, the original cluster might still be around and the Iceberg tables are stored in the managed storage belonging to that cluster, hence the new cluster cannot manage or make modifications to the Iceberg tables.
When you fork or restore a Crunchy Data Warehouse cluster, the Iceberg tables will be in read-only mode. When the referenced files are more than 10 days old, they may be deleted by a VACUUM on the parent. It's therefore recommended to copy data into new tables if you want to continue using the restored cluster.