Вторым интересным аспектом является способ организации записи, где важным элементом является упомянутая выше спецификация партицирования. Опять-таки в рамках данной статьи рассмотрим только интересующие нас вопросы записи строк в партицированные Iceberg-таблицы, которую должен поддерживать двунаправленный коннектор.
Представим, что нужная нам таблица, в которую нужно вставить новые записи, имеет следующую схему (в синтаксисе Apache Impala):
[hdfs-node1:21050] ggdb> CREATE TABLE ggdb.events_ice (event_id INT, event_ts TIMESTAMP, event_details STRING, subsystem_id INT)
PARTITIONED BY SPEC (MONTH(event_ts), IDENTITY(subsystem_id))
STORED AS ICEBERG;
Как мы видим, в строке 2 объявлена следующая схема партицирования: сначала идет трансформация MONTH, которая, как нетрудно догадаться, извлекает порядковый номер месяца из поля event_ts. Следом идет партицирование по некоторому целочисленному идентификатору (в данном случае, не по "хешу от", а по значению).
Если посмотреть на структуру каталогов, то мы увидим следующее дерево для некоторого множества строк в таблице, распределенных по двум уровням:
|---events_ice
|-----data
|-------event_ts_month=2025-09
|---------subsystem_id=110
|-----------bd4e94ee19f89fe5-ddfbdc8700000000_782720467_data.0.parq
|-------event_ts_month=2025-10
|---------subsystem_id=101
|-----------d640261450465339-6fc6d78b00000000_1549314526_data.0.parq
|-------event_ts_month=2025-11
|---------subsystem_id=100
|-----------a1485ebd281a0665-b003f13700000000_458826306_data.0.parq
Схему партицирования можно изменить. Например, мы поняли, что с ростом числа событий гранулярность с точностью до месяца нас перестала устраивать и мы решили уточнить ее до календарного дня. Обновим параметры партицирования и рассмотрим, что же произошло с данными:
[hdfs-node1:21050] ggdb> ALTER TABLE ggdb.events_ice SET PARTITION SPEC (DAY(event_ts), IDENTITY(subsystem_id));
Query: ALTER TABLE events_ice SET PARTITION SPEC (DAY(event_ts), IDENTITY(subsystem_id))
+
| summary |
+
| Updated partition spec. |
+
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Начиная с даты 2026-01-20 схема каталогов была перестроена, и на первый уровень партицирования Iceberg вынес поле subsystem_id, а из поля event_ts начал извлекаться календарный день.
Из этого следует первый очевидный вывод: с точки зрения записи, вопросы организации структуры физического хранения проще делегировать Iceberg-реализации (хотя всегда остается вариант записи напрямую в Parquet-файлы с последующим append этих файлов, но при этом много чего придется "изобрести" заново).
Какие средства нам предлагает базовая библиотека?
Библиотека реализует 4 стратегии записи, представленные следующими классами:
-
FanoutWriter;
-
PartitionedFanoutWriter;
-
ClusteredDataWriter;
-
PartitionedWriter.
Данная стратегия записи подразумевает наличие нескольких классов-писателей для каждой спецификации партицирования (PartitionSpec). Поддерживается сценарии записи в таблицы с несколькими спецификациями. Например, в случае если пользователь со временем определил несколько схем, то это называется эволюцией спецификации партицирования.
Для каждой комбинации спецификаций при выполнении записи создается свой экземпляр класса. Запись распараллеливается.
Как можно предположить, в случае разнородных данных (строки попадают в разные партиции) данная стратегия характеризуется большими накладными расходами, так как для каждого экземпляра класса требуется своя часть ресурсов (например, память под буферы и т.п.).
С другой стороны, для данной стратегии отпадает необходимость предагрегации данных, строки на вход можно подать любые — FanoutWriter самостоятельно "раскидает" их по нужным партициям.
Стратегия PartitionedFanoutWriter
Эта стратегия во многом пересекается с FanoutWriter-стратегией, но в отличие от последней не поддерживает схему с эволюцией партиций. Поддерживается только некоторая фиксированная спецификация с перенаправлением в экземпляр класса-писателя на базе некоторого ключа, представленного экземпляром класса PartitionKey.
Стратегия ClusteredDataWriter
ClusteredWriter предполагает, что строки сгруппированы согласно спецификации партицирования, однако поддерживает множественные спецификации. В противном случае (если строки не сгруппированы согласно спецификации) вставка завершается ошибкой.
Данная стратегия подходит для сгруппированных для вставки данных, при этом для множественных спецификаций вставка записей "вразнобой" не поддерживается — то есть строки должны быть сгруппированы по ключу(ам) в рамках каждой спецификации и идти последовательно. Характеризуется экономным потреблением ресурсов и высокой пропускной способностью.
Стратегия PartitionedWriter
Предусловием использования стратегии PartitionedWriter является сгруппированные согласно одной спецификации партицирования данные, что и отличает ее от ClusteredDataWriter.
От стратегий реализации записи согласно спецификациям партицирования вернемся к структуре каталогов и, точнее, к параметрам трансформаций.
Второй вывод базируется на том, что на данный момент в Greengage нет возможности смоделировать трансформации, которые пользователи могут задействовать в Iceberg-таблицах.
Вывод такой: в общем случае, с точки зрения подготовки данных для загрузки, Greengage может частично облегчить жизнь классам, отвечающим за запись в Iceberg-таблицы, но полностью подготовить данные, которые бы "легли" в нужную партицию без каких-либо ухищрений (создание колонок, моделирующих трансформации) пока выглядит нереализуемым.
И вот тут нам в помощь выступает та самая фича распределения вставки foreign-таблиц. Redistribute Motion может в случае трансформаций наподобие IDENTITY сгруппировать по нужному ключу (по сути, по значению в колонке), тем самым помочь, например, классу PartitionedFanoutWriter тем, что ему на вход придут уже сгруппированные данные. На выходе есть возможность получить ощутимо меньшее число файлов.
Проиллюстрируем это на примере, но перед этим создадим таблицу только с IDENTITY-трансформацией:
[hdfs-node1:21050] ggdb> CREATE TABLE events_ice (event_id INT, event_ts TIMESTAMP, event_details STRING, subsystem_id INT)
PARTITIONED BY SPEC (IDENTITY(subsystem_id))
STORED AS ICEBERG;
Сначала объявим таблицы на стороне Greengage без учета спецификации партицирования на стороне Iceberg:
postgres=# CREATE FOREIGN TABLE events_ice_ft(event_id INT, event_ts TIMESTAMP WITH TIME ZONE, event_details TEXT, subsystem_id INT)
SERVER iceberg_server OPTIONS (
catalog_impl 'org.apache.iceberg.hive.HiveCatalog',
catalog_uri 'thrift://hdfs-node1:9083',
warehouse_location '/test-warehouse',
resource 'ggdb.events_ice'
);
postgres=# CREATE TABLE events_ice_local (event_id INT, event_ts TIMESTAMP WITH TIME ZONE, event_details TEXT, subsystem_id INT) DISTRIBUTED BY (event_id);
CREATE TABLE
postgres=# EXPLAIN (COSTS OFF) INSERT INTO events_ice_ft SELECT * FROM events_ice_local;
QUERY PLAN
Insert on events_ice_ft
-> Seq Scan on events_ice_local
Optimizer: Postgres-based planner
(3 rows)
Если посмотреть на план (строки, отмеченные цифрами 1 и 2), то можно увидеть вставку с каждого сегмента напрямую, но вставка не будет учитывать ожидаемое распределение на стороне Iceberg.
Сделаем вставку 1 миллиона строк из 8 сегментов, где домен атрибута subsystem_id состоит из 4 уникальных значений. Какая картина будет на HDFS после вставки?
|---events_ice
|-----data
|-------subsystem_id=1
|---------00000-1-cb022581-f5f5-461f-b766-8f62e75224e7-00001.parquet
|---------00001-1-94117a7e-854a-401c-baa9-0dd17a4f7a89-00001.parquet
|---------00002-1-8647f759-d531-44d6-b937-056d6e482200-00001.parquet
|---------00003-1-f6401bed-7246-4f64-99ea-56afd6551f13-00001.parquet
|---------00004-1-1740b692-39f0-4c29-bf17-34bbec3b0a92-00001.parquet
|---------00005-1-f99657e1-165f-4eb1-8e9d-e8e3a67db0c1-00001.parquet
|---------00006-1-50444f2c-b7ef-4f38-ac30-c8ad16c36939-00001.parquet
|---------00007-1-86be9734-dd3e-4ade-9c59-0214eff6c549-00001.parquet
|-------subsystem_id=2
|---------00000-1-cb022581-f5f5-461f-b766-8f62e75224e7-00002.parquet
|---------00001-1-94117a7e-854a-401c-baa9-0dd17a4f7a89-00002.parquet
|---------00002-1-8647f759-d531-44d6-b937-056d6e482200-00002.parquet
|---------00003-1-f6401bed-7246-4f64-99ea-56afd6551f13-00002.parquet
|---------00004-1-1740b692-39f0-4c29-bf17-34bbec3b0a92-00002.parquet
|---------00005-1-f99657e1-165f-4eb1-8e9d-e8e3a67db0c1-00002.parquet
|---------00006-1-50444f2c-b7ef-4f38-ac30-c8ad16c36939-00002.parquet
|---------00007-1-86be9734-dd3e-4ade-9c59-0214eff6c549-00002.parquet
|-------subsystem_id=3
|---------00000-1-cb022581-f5f5-461f-b766-8f62e75224e7-00003.parquet
|---------00001-1-94117a7e-854a-401c-baa9-0dd17a4f7a89-00003.parquet
|---------00002-1-8647f759-d531-44d6-b937-056d6e482200-00003.parquet
|---------00003-1-f6401bed-7246-4f64-99ea-56afd6551f13-00003.parquet
|---------00004-1-1740b692-39f0-4c29-bf17-34bbec3b0a92-00003.parquet
|---------00005-1-f99657e1-165f-4eb1-8e9d-e8e3a67db0c1-00003.parquet
|---------00006-1-50444f2c-b7ef-4f38-ac30-c8ad16c36939-00003.parquet
|---------00007-1-86be9734-dd3e-4ade-9c59-0214eff6c549-00003.parquet
|-------subsystem_id=4
|---------00000-1-cb022581-f5f5-461f-b766-8f62e75224e7-00004.parquet
|---------00001-1-94117a7e-854a-401c-baa9-0dd17a4f7a89-00004.parquet
|---------00002-1-8647f759-d531-44d6-b937-056d6e482200-00004.parquet
|---------00003-1-f6401bed-7246-4f64-99ea-56afd6551f13-00004.parquet
|---------00004-1-1740b692-39f0-4c29-bf17-34bbec3b0a92-00004.parquet
|---------00005-1-f99657e1-165f-4eb1-8e9d-e8e3a67db0c1-00004.parquet
|---------00006-1-50444f2c-b7ef-4f38-ac30-c8ad16c36939-00004.parquet
|---------00007-1-86be9734-dd3e-4ade-9c59-0214eff6c549-00004.parquet
Для каждого Parquet-файла пятизначный префикс от 00000 до 00007 означает номер сегмента (поле content), который осуществлял вставку в файл. Можно увидеть, что для каждой партиции записью "отметились" все 8 сегментов.
Очистим таблицу-приемник, поменяем настройки перераспределения для foreign-таблицы и повторим вставку:
postgres=# ALTER FOREIGN TABLE events_ice_ft ALTER COLUMN subsystem_id OPTIONS (ADD insert_dist_by_key 'true');
ALTER FOREIGN TABLE
postgres=# EXPLAIN (COSTS OFF) INSERT INTO events_ice_ft SELECT * FROM events_ice_local;
QUERY PLAN
Insert on events_ice_ft
-> Redistribute Motion 8:8 (slice1; segments: 8)
Hash Key: events_ice_local.subsystem_id
-> Seq Scan on events_ice_local
Optimizer: Postgres-based planner
(5 rows)
Как поменялась ситуация на HDFS:
|---events_ice
|-----data
|-------subsystem_id=1
|---------00004-1-58954502-1c91-457a-b7bb-2c945e8229c1-00001.parquet
|-------subsystem_id=2
|---------00003-1-bf3cf4e5-7507-4f3e-ac0e-52f9932ba495-00001.parquet
|-------subsystem_id=3
|---------00007-1-5c2ba531-77a9-4240-b639-37522eaefb37-00001.parquet
|-------subsystem_id=4
|---------00000-1-dd1445b5-d5d8-4d04-9248-ff70cfa490fb-00001.parquet
В случае выборки данных, при прочих равных, придется читать меньшее количество файлов, загрузка выполняется быстрее. Да, есть некоторые накладные расходы на узел перераспределения Redistribute Motion, но, похоже, выгода от предварительного агрегирования перевешивает потери на перераспределении.
В дальнейшем, по мере прогресса разработки коннектора и его нагрузочного тестирования, мы поделимся с вами результатами сравнительных тестов.
Однако открытым вопросом остаются "вычислительные" трансформации Iceberg-спецификаций партицирования. Возможным вариантом таких трансформаций является расширение списка опций foreign-таблиц функциями трансформации.
Последним, крайне важным вопросом реализации вставки я бы назвал вопрос организации запроса на commit и фактической фиксации вставки.
Как мы помним, каждая вставка порождает новый файл с данными, новый снимок, новые файлы метаданных. Например, три операции вставки дадут нам следующий набор сущностей.
Результат операции вставки
Если представить, что сегментов в кластере сотни, как это бывает в больших кластерах, то число снимков во-первых, будет зависеть от числа сегментов (при условии, что каждому достались его строки для вставки). Чем больше таких вставок, тем больше будет снимков в списке в файле метаданных, это приведет к разрастанию метаданных, что в будущем может создать проблемы.
Например, для запроса из примера без использования промежуточной группировки по полю events_ice_local.subsystem_id при вставке будет создано 8 новых снимков (изначально таблица была пуста):
[hdfs-node1:21050] ggdb> SELECT snapshot_id, committed_at, operation, parent_id FROM events_ice.snapshots;
+
| snapshot_id | committed_at | operation | parent_id |
+
| 6936299145864004987 | 2025-11-22 16:29:58.197000000 | append | NULL |
| 4635881949092382796 | 2025-11-22 16:29:58.788000000 | append | 6936299145864004987 |
| 6620167082491361265 | 2025-11-22 16:29:59.370000000 | append | 4635881949092382796 |
| 9127144169729220665 | 2025-11-22 16:29:59.793000000 | append | 6620167082491361265 |
| 2335157739194732622 | 2025-11-22 16:30:00.214000000 | append | 9127144169729220665 |
| 7107683723762847370 | 2025-11-22 16:30:00.554000000 | append | 2335157739194732622 |
| 8876670218399049190 | 2025-11-22 16:30:00.933000000 | append | 7107683723762847370 |
| 7213472037626658727 | 2025-11-22 16:30:01.198000000 | append | 8876670218399049190 |
+
Во-вторых, неприятным моментом является видимость отдельных снимков по мере того, как какие-то сегменты завершили вставку, а какие-то еще в процессе. Да, сама архитектура Iceberg в плане организации работы через снимки подразумевает возможность видеть разные срезы, но в данном случае подобная частичная вставка вместе с созданием снимков под каждую загрузку может быть не совсем тем, что ожидает пользователь.
Добавим искусственную задержку для одного из сегментов при вставке. Остальные семь сегментов закончат вставку раньше. Однако в какой-то момент мы может увидеть не 1 миллион строк, а меньшее количество:
[hdfs-node1:21050] ggdb> SELECT snapshot_id, committed_at, operation, parent_id FROM ggdb.events_ice.snapshots;
+
| snapshot_id | committed_at | operation | parent_id |
+
| 7768755069134084319 | 2025-11-22 16:40:05.673000000 | append | NULL |
| 1280842592617033420 | 2025-11-22 16:40:06.563000000 | append | 7768755069134084319 |
| 3036780713488633888 | 2025-11-22 16:40:07.693000000 | append | 1280842592617033420 |
| 1477877215760239613 | 2025-11-22 16:40:07.944000000 | append | 3036780713488633888 |
| 1377726043692182832 | 2025-11-22 16:40:08.324000000 | append | 1477877215760239613 |
| 8464938146211865619 | 2025-11-22 16:40:08.596000000 | append | 1377726043692182832 |
| 662259372451030286 | 2025-11-22 16:40:09.017000000 | append | 8464938146211865619 |
+
Fetched 7 row(s) in 0.15s
[hdfs-node1:21050] ggdb> SELECT COUNT(*) FROM ggdb.events_ice;
+
| count(*) |
+
| 874576 |
+
Fetched 1 row(s) in 0.11s
Коннектор, в целях поддержания свойства атомарности вставки, должен решать и эту проблему. И не исключено, что не без помощи самого PXF…
На этом на сегодня предлагаю завершить обзор того, что уже вышло в свежем релизе Greengage и находится в процессе активной разработки для выхода в одном из следующих релизов.
Оставайтесь с нами, будет интересно.