The second interesting aspect is how writes are organized, and a key element here is the partition specification mentioned earlier. As before, within this article we will consider only the part that interests us: writing rows into partitioned Iceberg tables, which a bidirectional connector must support.
Assume that the target table, into which we need to insert new records, has the following schema (in Apache Impala syntax):
[hdfs-node1:21050] ggdb> CREATE TABLE ggdb.events_ice (event_id INT, event_ts TIMESTAMP, event_details STRING, subsystem_id INT)
PARTITIONED BY SPEC (MONTH(event_ts), IDENTITY(subsystem_id))
STORED AS ICEBERG;
As you can see in the second line of the DDL, the partitioning spec is as follows: first comes the MONTH transform, which, as the name suggests, derives the month from event_ts; it is followed by partitioning by an integer identifier via IDENTITY (that is, by the column value itself, not by a hash of it).
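For reference, the same partitioning spec can be built programmatically with the Iceberg Java API. The sketch below is purely illustrative (field IDs and nullability are chosen arbitrarily); the builder calls month() and identity() are the standard Iceberg ones:
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

public class EventsIceSpecExample {
  public static void main(String[] args) {
    // Illustrative schema mirroring ggdb.events_ice; field IDs are arbitrary.
    Schema schema = new Schema(
        Types.NestedField.required(1, "event_id", Types.IntegerType.get()),
        Types.NestedField.required(2, "event_ts", Types.TimestampType.withoutZone()),
        Types.NestedField.optional(3, "event_details", Types.StringType.get()),
        Types.NestedField.required(4, "subsystem_id", Types.IntegerType.get()));

    // MONTH transform on event_ts plus IDENTITY on subsystem_id, as in the DDL above.
    PartitionSpec spec = PartitionSpec.builderFor(schema)
        .month("event_ts")
        .identity("subsystem_id")
        .build();

    System.out.println(spec); // prints both partition fields with their transforms
  }
}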
If we look at the directory structure for some set of rows in the table, we will see the following tree with two partition levels:
|---events_ice
|-----data
|-------event_ts_month=2025-09
|---------subsystem_id=110
|-----------bd4e94ee19f89fe5-ddfbdc8700000000_782720467_data.0.parq
|-------event_ts_month=2025-10
|---------subsystem_id=101
|-----------d640261450465339-6fc6d78b00000000_1549314526_data.0.parq
|-------event_ts_month=2025-11
|---------subsystem_id=100
|-----------a1485ebd281a0665-b003f13700000000_458826306_data.0.parq
The partitioning spec can be changed. For example, suppose we realized that the number of events had drastically increased, so monthly partitioning no longer satisfies us, and we decided to switch to daily partitioning. Let's update the partitioning spec and examine what happened to the data:
[hdfs-node1:21050] ggdb> ALTER TABLE ggdb.events_ice SET PARTITION SPEC (DAY(event_ts), IDENTITY(subsystem_id));
Query: ALTER TABLE events_ice SET PARTITION SPEC (DAY(event_ts), IDENTITY(subsystem_id))
+-------------------------+
| summary                 |
+-------------------------+
| Updated partition spec. |
+-------------------------+
Starting from 2026-01-20 (event_ts_day=2026-01-20), the folder structure changed: Iceberg now places the subsystem_id field at the top partitioning level and derives a calendar day from event_ts, putting the Parquet files into daily partition directories.
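The same spec evolution can also be performed through the Iceberg Java API. A minimal sketch, assuming the Table handle has already been loaded from the catalog (updateSpec(), Expressions.month() and Expressions.day() are standard Iceberg API; everything else is illustrative):
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expressions;

public class SwitchToDailyPartitioning {
  // "table" is assumed to be loaded beforehand, e.g. via HiveCatalog.loadTable(...).
  static void switchToDaily(Table table) {
    table.updateSpec()
        .removeField(Expressions.month("event_ts")) // drop the monthly transform
        .addField(Expressions.day("event_ts"))      // add the daily transform
        .commit();                                  // IDENTITY(subsystem_id) stays as is
  }
}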
We can draw the first obvious conclusion: from the write perspective, it is simpler to delegate organizing the physical storage structure to the Iceberg implementation (writing Parquet files directly is always an option, but much would have to be reinvented).
What means does the base library offer us?
The library implements four write strategies, represented by the following classes:
- FanoutWriter;
- PartitionedFanoutWriter;
- ClusteredDataWriter;
- PartitionedWriter.
FanoutWriter strategy
This write strategy implies a separate writer instance for each partition within each partition specification (PartitionSpec). Writing into tables with multiple specifications is supported: if the user defines several partitioning schemes over time, this is known as partition spec evolution.
For each combination of specification and partition, a separate instance of the writer class is created during the write. Writing proceeds in parallel.
As one might expect, with heterogeneous data (rows falling into many different partitions) this strategy comes with high overhead, since each instance consumes its own resources (memory for buffers and so on).
On the other hand, there is no need to pre-group data with this strategy: FanoutWriter distributes rows among the required partitions on its own.
PartitionedFanoutWriter strategy
This strategy overlaps significantly with FanoutWriter but, unlike the latter, does not support partition spec evolution. Only a fixed specification is supported; rows are routed to writer instances by a key represented by a PartitionKey instance.
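To give a feel for the contract, here is a rough sketch (not the connector's actual code) of a PartitionedFanoutWriter subclass for generic Iceberg records. It assumes the constructor signature of the base class in recent Iceberg releases; the partition-id and task-id values are arbitrary:
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.io.PartitionedFanoutWriter;

public class RecordFanoutWriter extends PartitionedFanoutWriter<Record> {

  private final PartitionKey partitionKey;

  public RecordFanoutWriter(Table table, long targetFileSizeBytes) {
    super(table.spec(),
        FileFormat.PARQUET,
        new GenericAppenderFactory(table.schema(), table.spec()),
        OutputFileFactory.builderFor(table, 1 /* partitionId */, 1 /* taskId */).build(),
        table.io(),
        targetFileSizeBytes);
    this.partitionKey = new PartitionKey(table.spec(), table.schema());
  }

  @Override
  protected PartitionKey partition(Record row) {
    // Compute the partition key for every incoming row; the base class keeps one open
    // data file per distinct key, so ungrouped input is handled transparently.
    // (For timestamp-based transforms the row would additionally need an
    // InternalRecordWrapper; for IDENTITY on an int column this direct call is enough.)
    partitionKey.partition(row);
    return partitionKey;
  }
}
Rows are then pushed one by one via write(row), and the resulting data files are collected for the commit.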
ClusteredDataWriter strategy
ClusteredDataWriter assumes that rows arrive grouped according to the partition specification, and it supports multiple specifications. Otherwise (if rows are not grouped according to the specification), the insert ends with an error.
This strategy is suitable for pre-grouped rows, while for multiple specs, inserting "mixed" data is not supported — rows must be grouped by key(s) within each spec and inserted sequentially. It is characterized by efficient resource usage and high throughput.
PartitionedWriter strategy
A precondition for using PartitionedWriter is that rows are grouped according to a single partition spec, which distinguishes it from ClusteredDataWriter.
Let’s return from write strategies to the folder structure and, more precisely, to partition transforms.
The second conclusion follows from the fact that, at the moment, Greengage has no way to model Iceberg transforms as such.
In the general case, Greengage can assist the classes responsible for writing rows into Iceberg tables, but transparently preparing data so that it could be inserted as is (without re-partitioning) is currently impossible.
This is where the above-mentioned feature of distributed insert into foreign tables helps us. For transforms such as IDENTITY, Redistribute Motion can group rows by the needed key (essentially, by column value), thereby helping the PartitionedFanoutWriter class by ensuring that the input rows arrive already grouped. As a result, this can yield a noticeably smaller number of files.
Let’s illustrate this, but first create a table with only the IDENTITY transform:
[hdfs-node1:21050] ggdb> CREATE TABLE events_ice (event_id INT, event_ts TIMESTAMP, event_details STRING, subsystem_id INT)
PARTITIONED BY SPEC (IDENTITY(subsystem_id))
STORED AS ICEBERG;
Next, we declare the tables on the Greengage side without taking the Iceberg-side partition spec into account:
postgres=# CREATE FOREIGN TABLE events_ice_ft(event_id INT, event_ts TIMESTAMP WITH TIME ZONE, event_details TEXT, subsystem_id INT)
SERVER iceberg_server OPTIONS (
catalog_impl 'org.apache.iceberg.hive.HiveCatalog',
catalog_uri 'thrift://hdfs-node1:9083',
warehouse_location '/test-warehouse',
resource 'ggdb.events_ice'
);
postgres=# CREATE TABLE events_ice_local (event_id INT, event_ts TIMESTAMP WITH TIME ZONE, event_details TEXT, subsystem_id INT) DISTRIBUTED BY (event_id);
CREATE TABLE
postgres=# EXPLAIN (COSTS OFF) INSERT INTO events_ice_ft SELECT * FROM events_ice_local;
QUERY PLAN
Insert on events_ice_ft
-> Seq Scan on events_ice_local
Optimizer: Postgres-based planner
(3 rows)
If we look at the plan, at the Insert and Seq Scan nodes, we can see that each segment inserts its rows directly, but the insert does not account for the expected distribution on the Iceberg side.
Let’s insert 1 million rows from 8 segments, where the subsystem_id column contains 4 unique values. What do we have in HDFS after the insert?
|---events_ice
|-----data
|-------subsystem_id=1
|---------00000-1-cb022581-f5f5-461f-b766-8f62e75224e7-00001.parquet
|---------00001-1-94117a7e-854a-401c-baa9-0dd17a4f7a89-00001.parquet
|---------00002-1-8647f759-d531-44d6-b937-056d6e482200-00001.parquet
|---------00003-1-f6401bed-7246-4f64-99ea-56afd6551f13-00001.parquet
|---------00004-1-1740b692-39f0-4c29-bf17-34bbec3b0a92-00001.parquet
|---------00005-1-f99657e1-165f-4eb1-8e9d-e8e3a67db0c1-00001.parquet
|---------00006-1-50444f2c-b7ef-4f38-ac30-c8ad16c36939-00001.parquet
|---------00007-1-86be9734-dd3e-4ade-9c59-0214eff6c549-00001.parquet
|-------subsystem_id=2
|---------00000-1-cb022581-f5f5-461f-b766-8f62e75224e7-00002.parquet
|---------00001-1-94117a7e-854a-401c-baa9-0dd17a4f7a89-00002.parquet
|---------00002-1-8647f759-d531-44d6-b937-056d6e482200-00002.parquet
|---------00003-1-f6401bed-7246-4f64-99ea-56afd6551f13-00002.parquet
|---------00004-1-1740b692-39f0-4c29-bf17-34bbec3b0a92-00002.parquet
|---------00005-1-f99657e1-165f-4eb1-8e9d-e8e3a67db0c1-00002.parquet
|---------00006-1-50444f2c-b7ef-4f38-ac30-c8ad16c36939-00002.parquet
|---------00007-1-86be9734-dd3e-4ade-9c59-0214eff6c549-00002.parquet
|-------subsystem_id=3
|---------00000-1-cb022581-f5f5-461f-b766-8f62e75224e7-00003.parquet
|---------00001-1-94117a7e-854a-401c-baa9-0dd17a4f7a89-00003.parquet
|---------00002-1-8647f759-d531-44d6-b937-056d6e482200-00003.parquet
|---------00003-1-f6401bed-7246-4f64-99ea-56afd6551f13-00003.parquet
|---------00004-1-1740b692-39f0-4c29-bf17-34bbec3b0a92-00003.parquet
|---------00005-1-f99657e1-165f-4eb1-8e9d-e8e3a67db0c1-00003.parquet
|---------00006-1-50444f2c-b7ef-4f38-ac30-c8ad16c36939-00003.parquet
|---------00007-1-86be9734-dd3e-4ade-9c59-0214eff6c549-00003.parquet
|-------subsystem_id=4
|---------00000-1-cb022581-f5f5-461f-b766-8f62e75224e7-00004.parquet
|---------00001-1-94117a7e-854a-401c-baa9-0dd17a4f7a89-00004.parquet
|---------00002-1-8647f759-d531-44d6-b937-056d6e482200-00004.parquet
|---------00003-1-f6401bed-7246-4f64-99ea-56afd6551f13-00004.parquet
|---------00004-1-1740b692-39f0-4c29-bf17-34bbec3b0a92-00004.parquet
|---------00005-1-f99657e1-165f-4eb1-8e9d-e8e3a67db0c1-00004.parquet
|---------00006-1-50444f2c-b7ef-4f38-ac30-c8ad16c36939-00004.parquet
|---------00007-1-86be9734-dd3e-4ade-9c59-0214eff6c549-00004.parquet
For each Parquet file, the five-digit prefix from 00000 to 00007 indicates the number of the segment (its content field) that performed the insert. You can see that for each partition all 8 segments wrote their own files, giving 32 data files in total (4 partitions × 8 segments).
Let’s truncate the target table, change the distribution settings for the foreign table and repeat the insert:
postgres=# ALTER FOREIGN TABLE events_ice_ft ALTER COLUMN subsystem_id OPTIONS (ADD insert_dist_by_key 'true');
ALTER FOREIGN TABLE
postgres=# EXPLAIN (COSTS OFF) INSERT INTO events_ice_ft SELECT * FROM events_ice_local;
QUERY PLAN
Insert on events_ice_ft
-> Redistribute Motion 8:8 (slice1; segments: 8)
Hash Key: events_ice_local.subsystem_id
-> Seq Scan on events_ice_local
Optimizer: Postgres-based planner
(5 rows)
Let's see how the situation on HDFS changed:
|---events_ice
|-----data
|-------subsystem_id=1
|---------00004-1-58954502-1c91-457a-b7bb-2c945e8229c1-00001.parquet
|-------subsystem_id=2
|---------00003-1-bf3cf4e5-7507-4f3e-ac0e-52f9932ba495-00001.parquet
|-------subsystem_id=3
|---------00007-1-5c2ba531-77a9-4240-b639-37522eaefb37-00001.parquet
|-------subsystem_id=4
|---------00000-1-dd1445b5-d5d8-4d04-9248-ff70cfa490fb-00001.parquet
So, in a read scenario fewer files will have to be read, and scans will generally perform faster. Yes, there is some overhead from the Redistribute Motion node, but the benefit of pre-grouping seems to outweigh the redistribution cost.
In the future, as the connector development progresses and load testing is performed, we will share with you the results of comparative tests.
However, "computational" transforms in Iceberg partition specs (extracting the year/month/day from a timestamp, truncating a column value, and so on) cannot be implemented as is right now. A possible way to support such transforms is to extend the list of foreign table parameters with transform functions, but that is an area for future implementation.
One of the important questions for inserts is how to implement a distributed commit.
As mentioned before, each insert generates a new data file, a new snapshot, and new metadata files. For example, three inserts will give us the following set of entities.
Result of the insert operation
If we imagine that the cluster has hundreds of segments, as large clusters do, then the number of snapshots per insert will match the number of segments (assuming each of them got its own rows to insert). The more such inserts, the more snapshots accumulate in the metadata file's snapshot list, which leads to metadata bloat and may cause problems later.
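For reference, this snapshot-per-insert behavior follows from how commits work in the Iceberg API: each commit() of an AppendFiles operation produces exactly one new snapshot, no matter how many data files it adds. A minimal sketch (not the connector's code), assuming the DataFile handles have already been collected per segment:
import java.util.List;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Table;

public class CommitExamples {
  // Committing separately for every segment: one snapshot per segment.
  static void commitPerSegment(Table table, List<List<DataFile>> filesBySegment) {
    for (List<DataFile> segmentFiles : filesBySegment) {
      AppendFiles append = table.newAppend();
      segmentFiles.forEach(append::appendFile);
      append.commit(); // a new snapshot for each segment's portion of rows
    }
  }

  // Aggregating all segments' data files into one operation: a single snapshot.
  static void commitOnce(Table table, List<List<DataFile>> filesBySegment) {
    AppendFiles append = table.newAppend();
    filesBySegment.forEach(files -> files.forEach(append::appendFile));
    append.commit(); // one snapshot for the whole distributed insert
  }
}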
For example, for the query without intermediate grouping by events_ice_local.subsystem_id, 8 new snapshots will be created during the insert (initially the table was empty):
[hdfs-node1:21050] ggdb> SELECT snapshot_id, committed_at, operation, parent_id FROM events_ice.snapshots;
+---------------------+-------------------------------+-----------+---------------------+
| snapshot_id         | committed_at                  | operation | parent_id           |
+---------------------+-------------------------------+-----------+---------------------+
| 6936299145864004987 | 2025-11-22 16:29:58.197000000 | append    | NULL                |
| 4635881949092382796 | 2025-11-22 16:29:58.788000000 | append    | 6936299145864004987 |
| 6620167082491361265 | 2025-11-22 16:29:59.370000000 | append    | 4635881949092382796 |
| 9127144169729220665 | 2025-11-22 16:29:59.793000000 | append    | 6620167082491361265 |
| 2335157739194732622 | 2025-11-22 16:30:00.214000000 | append    | 9127144169729220665 |
| 7107683723762847370 | 2025-11-22 16:30:00.554000000 | append    | 2335157739194732622 |
| 8876670218399049190 | 2025-11-22 16:30:00.933000000 | append    | 7107683723762847370 |
| 7213472037626658727 | 2025-11-22 16:30:01.198000000 | append    | 8876670218399049190 |
+---------------------+-------------------------------+-----------+---------------------+
The second unpleasant issue is the visibility of individual snapshots while some segments have finished the insert and others are still in progress. The Iceberg architecture, largely built around snapshots and their metadata, deliberately allows seeing different snapshots (time travel queries, for example), but in this case such a partial insert, with a snapshot created for each segment's load, may not be exactly what the user expects.
Let’s add an artificial delay for one of the segments during insert. The remaining seven segments will finish earlier. At some point we may see not 1 million rows, but fewer:
[hdfs-node1:21050] ggdb> SELECT snapshot_id, committed_at, operation, parent_id FROM ggdb.events_ice.snapshots;
+---------------------+-------------------------------+-----------+---------------------+
| snapshot_id         | committed_at                  | operation | parent_id           |
+---------------------+-------------------------------+-----------+---------------------+
| 7768755069134084319 | 2025-11-22 16:40:05.673000000 | append    | NULL                |
| 1280842592617033420 | 2025-11-22 16:40:06.563000000 | append    | 7768755069134084319 |
| 3036780713488633888 | 2025-11-22 16:40:07.693000000 | append    | 1280842592617033420 |
| 1477877215760239613 | 2025-11-22 16:40:07.944000000 | append    | 3036780713488633888 |
| 1377726043692182832 | 2025-11-22 16:40:08.324000000 | append    | 1477877215760239613 |
| 8464938146211865619 | 2025-11-22 16:40:08.596000000 | append    | 1377726043692182832 |
| 662259372451030286  | 2025-11-22 16:40:09.017000000 | append    | 8464938146211865619 |
+---------------------+-------------------------------+-----------+---------------------+
Fetched 7 row(s) in 0.15s
[hdfs-node1:21050] ggdb> SELECT COUNT(*) FROM ggdb.events_ice;
+----------+
| count(*) |
+----------+
| 874576   |
+----------+
Fetched 1 row(s) in 0.11s
To preserve atomicity of the insert, the connector must handle this problem as well. And possibly, not without help from PXF itself…
That wraps up today's overview of what has already shipped in the latest Greengage release and what is actively being developed for inclusion in one of the upcoming releases.
Stay tuned, we will be back soon!