Greenplum vs Citus. Часть 1

Андрей Савицкий19 сентября 2024
GREENPLUM Citus
Этой статьей мы открываем цикл материалов, посвященных сравнительному обзору технологий распределенных СУБД. Начнем со сравнения Greenplum с Citus

Этой статьей мы открываем цикл материалов, посвященных сравнительному обзору технологий распределенных СУБД.

Для такого обзора нашей командой был выбран ряд распределенных СУБД, которые будут сравниваться с точки зрения как реализации схожих функциональных блоков с различным уровнем погружения в детали, так и высокоуровневого сопоставления по разным критериям.

Выбор этих СУБД продиктован нашими текущими интересами и потребностями в сравнении. Хотя некоторые из технологий можно отнести, скорее, к распределенным SQL-движкам (например, Apache Impala и Apache Trino).

Так как наша команда разработки занимается Greenplum, то прицел будет на сравнение с данной СУБД. Особенностью этих статей будет сравнительно глубокое погружение в детали тех аспектов, которые будут сравниваться.

Цели цикла статей

  • Дать представление пользователям Greenplum о схожих технологиях. Статья ориентирована на опытных пользователей, так как в пояснениях будут даваться детали по сравниваемым технологиям; подразумевается, что читатель знаком с деталями реализации схожей функциональности в Greenplum.

  • Дать представление, что лежит в основе сравниваемых технологий, какие есть особенности, преимущества и, возможно, недостатки.

Полнота покрытия

  • Будут подсвечены те аспекты, которые заинтересовали меня как автора, обладающего знаниями о тех или иных частях Greenplum и исследующего устройство схожих частей в рассматриваемых технологиях. Я не претендую на звание эксперта в этих технологиях, рассматривайте мои тезисы как результат исследований.

  • Где это возможно, будут даваться сравнительные пояснения, как схожие вещи реализованы в рассматриваемой технологии.

  • Перепечатывать readme из GitHub и прочих источников не имело смысла, но какие-то базовые понятия ввести требовалось. Эти понятия изложены своими словами в соответствии с моим пониманием предмета. За деталями добро пожаловать в специализированные ресурсы.

Цикл статей начнём со сравнения с таким решением, как Citus.

Почему Citus?

  1. Концептуально Citus и Greenplum довольно близки. Да, подход реализации Citus, как расширения PostgreSQL, против "пропатченного" PostgreSQL в Greenplum приносит Citus свои сильные черты, но в то же время и ограничивает его возможности. Об этом мы поговорим далее.

  2. Судя по всему, изначально создатели Citus ориентировались на реализацию задачи горизонтального масштабирования существующих решений у конечных пользователей PostgreSQL: добавь шарды, создай распределенные по кластеру/шардам таблицы, перераспредели данные между ними и получи выгоду без переноса имеющегося решения на какой-либо другой технологический стек. Эти идеи можно даже проследить по видам планировщиков Citus: от сравнительно простых, ориентированных на перенаправление запросов в определенные шарды, до более продвинутых, заточенных под аналитику. Вот как раз более продвинутые планировщики Citus, нацеленные и на аналитическую нагрузку, представляют интерес для сравнения их в общей с Greenplum нише.

  3. Greenplum и Citus довольно просто сравнивать даже исходя из близости понятий: распределенных и справочных (reference в Citus, replicated в Greenplum) таблиц, чтения планов выполнения и, в целом, нюансов работы планировщика (planner/optimizer) и непосредственного выполнения запросов (executor).

Citus с пользовательской точки зрения

Для пользователя из мира PostgreSQL данная технология представляет собой расширение (PostgreSQL extension), устанавливаемое на отдельные экземпляры сервера PostgreSQL. Расширение предоставляет возможности преобразовать отдельные экземпляры PostgreSQL в распределенный кластер, где будут размещаться части данных таблиц (шарды), которые пользователь может распределить по некоторому признаку (ключу шардирования).

Расширение также добавляет колоночный тип хранения в виде отдельного расширения. Работа по управлению всем этим хозяйством осуществляется с помощью доступных пользователям UDF-функций.

Если сравнивать Citus с Greenplum в плане пользовательского восприятия, то некоторые аналогии прослеживаются, но подходы, по сути, совершенно разные.

Citus предоставляет распределенный движок поверх изначально самостоятельных экземпляров PostgreSQL. Как и кто администрирует эти экземпляры, по большому счету вне скоупа интересов и ответственности самого Citus.

Greenplum, аналогично используя отдельные инстансы на сегментах и мастере, всё-таки считает сегментные экземпляры до известной степени закрытыми, но и несёт за них большую ответственность (хотя бы с точки зрения встроенного HA).

В Citus можно подключиться к любой ноде и начать работать с распределёнными таблицами (за счет синхронизации метаданных в рамках 2PC). Исключение составляют операции, которые могут выполняться только с координатора. Это различные манипуляции с таблицами самого Citus, например, создание distributed-таблиц, некоторые операции ALTER, операции со схемами и т.п.

В Greenplum в плане соединений мы ограничены мастером. Возможность подключения, например, в рамках специальной сессии с отдельным сегментом в данном случае не в счёт.

С точки зрения пользователя я бы назвал это сильной чертой Citus. Впрочем, у подобного подхода и его конкретной реализации в Citus есть и обратная сторона, но об этом подробнее дальше.

Несомненное преимущество с точки зрения использования — это доступность версий PostgreSQL для Citus. Ввиду его реализации как PostgreSQL-расширения, ему доступны самые свежие версии ядра. На текущий момент для Greenplum это только PostgreSQL 12-ой версии в 7X.

К особенностям использования Citus я бы отнёс и необходимость знать о наличии скрытых от пользователей служебных таблиц шардов, которые размещаются в тех же схемах, что и распределённые таблицы. По факту системный каталог pg_class содержит заведомо больше строк, чем отображается пользователям.

У наших крупных корпоративных пользователей число таблиц может исчисляться и сотнями тысяч (в основном это партицированные таблицы). Разбиение таблиц на шарды ещё больше увеличит число файлов, как и итоговый объём метаданных. Кроме того, в общем случае расширению приходится скрывать эти таблицы, в том числе в pg_class. Причём делать это довольно низкоуровнево при разборе дерева запроса, отбрасывая такие таблицы. Разработчики утверждают, что "[…​] so you might expect to see them when connecting to a worker node and running \d. While this was previously the case, it caused confusion among users and also breaks tools like pg_dump ".

Например, для исходной партицированной таблицы с двумя партициями при создании распределённой таблицы с числом шардов по умолчанию будет создано 66 таблиц шардов (распределённых по нодам кластера):

\dt
               List of relations
 Schema |       Name        | Type  |  Owner
--------+-------------------+-------+----------
 andrey | part_table_part01 | table | postgres
 andrey | part_table_part02 | table | postgres
(2 rows)
SELECT create_distributed_table('part_table', 'id');
 create_distributed_table
--------------------------

(1 row)
SELECT logicalrelid, COUNT(*) AS shard_table_count FROM pg_dist_shard GROUP BY logicalrelid;
   logicalrelid    | shard_table_count
-------------------+-------------
 part_table        |          32
 part_table_part01 |          32
 part_table_part02 |          32
(3 rows)

Архитектура Citus и основные понятия

Citus работает с несколькими видами таблиц:

  • Таблицы, управляемые Citus. Как только Citus понимает, что это управляемая им распределённая таблица (а понимает он это по наличию записи во внутренней таблице метаданных pg_dist_partition, см. ниже пример такой записи), планирование запросов к этим таблицам осуществляется самостоятельно.

  • Исходные, так называемые shell-таблицы. Пользователь может преобразовать PostgreSQL-таблицу в Citus-таблицу путём вызова функций расширения Citus (например, create_distributed_table).

Пример записи в pg_dist_partition:

SELECT logicalrelid, partmethod, partkey, colocationid FROM pg_dist_partition LIMIT 1;
 logicalrelid | partmethod |                                                         partkey                                                          | colocationid
--------------+------------+--------------------------------------------------------------------------------------------------------------------------+--------------
 dist_table   | h          | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnosyn 1 :varattnosyn 1 :location -1} |            1
(1 row)

В данном случае имя логической таблицы (бывшей "обычной" локальной таблицы PostgreSQL) содержится в поле logicalrelid. Для данной таблицы распределение осуществляется по хешу (колонка partmethod), в колонке partkey содержится служебная информация по ключу распределения.

В Greenplum такого разделения не существует. Для таблиц append optimized существуют так называемые aux-таблицы, но они никак не скрываются и видны пользователям и/или администратору базы данных. Как следствие, нет необходимости и в явном преобразовании и неявном копировании исходных данных вызовом каких-либо специальных функций.

Таблицы, управляемые Citus, имеют следующие типы:

  • Шелл-таблица (shell table). Если пользователь преобразует обычную PostgreSQL-таблицу в Citus-таблицу, то эта исходная таблица становится скрытой от пользователя shell-таблицей. Такая таблица вместе с метаданными Citus будет использоваться при планировании, чтобы преобразовать запросы к локальным таблицам в распределённый план, работающий в конечном итоге с таблицами шардов Citus.

  • Таблица шарда (shard table). По сути, обычная таблица PostgreSQL, которая содержит данные из shell-таблицы. Данные из shell-таблицы копируются на этапе создания Citus-таблицы. По умолчанию shell-таблица разбивается на 32 таблицы шардов, но число шардов можно поменять. Как уже было упомянуто ранее, таблицы шардов не видны пользователям (в том числе и в запросах к каталогу), но заявляется наличие параметра GUC, который управляет этой видимостью: citus.override_table_visibility.

  • Распределённая по некоторому ключу шардирования таблица (distributed table). Выбор ключа шардирования — основной (и, как мне представляется, ограничивающий) элемент эффективной работы с Citus. Ключ шардирования определяет размещение записи в определённом шарде. С шардом связан некоторый диапазон, который определяет какие ключи попадут на этот шард (эта информация содержится в служебной таблице pg_dist_shard, см. пример далее). Есть несколько схем шардирования, но для краткости рассмотрим распределение по хешу (hash-distributed tables). Таблицы с одинаковым диапазоном будут размещаться на одних и тех же нодах для обеспечения так называемого co-located размещения. Шарды с одинаковым диапазоном хешей размещаются на одних и тех же узлах, чтобы обеспечить локальность операций соединений таблиц по ключу/хешу. Это условие соблюдается и в случае перераспределения данных (rebalance).

    Шардирование по хешу
    Шардирование по хешу

    Такое размещение шардов позволит Citus реализовать наиболее эффективную схему соединений без перераспределения данных таблиц и предоставить пользователю наиболее полный набор возможных операций с данными в рамках SQL-запросов. Что происходит в случае, когда пользователь не думает/не хочет/не может поддержать такую схему размещения на практике, мы рассмотрим в отдельном разделе, посвященном деталям планирования и выполнения запросов. Да, ключ распределения может состоять только из одной колонки not null, так как функция create_distributed_table принимает аргумент distribution_column, подразумевающий одну колонку. Также, для некоторым образом связанных таблиц (логически) можно указать одну группу размещения.

  • Справочная таблица (reference table). Таблица с одним шардом, который реплицируется на все узлы кластера. Как следует из названия, такие таблицы подходят для измерений в терминах хранилищ данных. Прямой аналог replicated-таблиц в Greenplum.

  • Локальная таблица (local table). Не путать с обычной таблицей PostgreSQL. Для поддержки возможности запроса пользователя с любого узла кластера, который использует PostgreSQL-таблицу на узле координатора, Citus требует наличия метаданных этой таблицы на остальных узлах. Также используется в сценариях ограничений по FK между локальными таблицами и reference-таблицами.

  • Таблица одного шарда (single shard tables). Применяется для шардирования schema-based, подробно рассматриваться в рамках этой статьи не будет.

Особенности планирования запросов

В Citus реализован так называемый многоуровневый подход к планированию запросов. Citus встраивается в процесс планирования за счёт использования доступных расширениям функций обратного вызова planner_hook, set_rel_pathlist_hook, get_relation_info_hook, set_join_pathlist_hook.

Входная точка планирования с точки зрения Citus — реализация distributed_planner. Для того, чтобы планировщик Citus включился в процесс, в запросе должна использоваться хотя бы одна таблица Citus. Является ли таблица соответствующей этому признаку определяется по внутренней системной таблице pg_dist_partition.

Планировщик разделяется на несколько уровней, которые задействуются в зависимости от структуры (можно даже сказать, от условной "сложности") запроса:

  • fast path planner

  • router planner

  • recursive planner

  • logical planner/optimizer

Каждый следующий уровень реализует более сложные либо высокоуровневые (группировки, сортировки и т.п.) подходы к планированию запросов.

Fast path planner

Fast path planner ограничен возможностями планирования запросов, по сути, затрагивающих один шард (либо использующих reference-таблицы).

Также запрос:

  • не должен быть INSERT …​ SELECT;

  • не должен использовать CTE, подзапросы, операции типа UNION/UNION ALL;

  • предикат выбора шарда должен фигурировать только в условии WHERE и быть единственным условием в сравнении вида dist_key = const, где dist_key — ключ распределения;

  • для свежих версий PostgreSQL также в эти ограничения попадают MERGE-запросы.

Этот тип планировщика ориентирован на OLTP-нагрузку, когда запрос можно перенаправить в один шард. В этом варианте планирования не происходит вызова standard_planner. Судя по документации, тем самым разработчики хотели избежать лишних накладных расходов, связанных с планированием стандартным планировщиком PostgreSQL. Такой вариант хорошо подходит для схем с использованием шардирования по некоторому ключу для так называемых Multi-tenant приложений. Например, когда все операции с таблицами ограничены каким-то определенным шардом, который соответствует id пользователя.

Router planner

Router planner дополняет fast path planner возможностью запрашивать несколько таблиц, но всё ещё находясь в границах одного шарда согласно условиям фильтрации по dist_key. Как было описано, Citus располагает реальные данные таблиц в отдельных таблицах шардов, которые в общем случае не видны пользователям. На этом этапе планировщик подменяет исходную, видимую пользователю распределённую таблицу на таблицы шардов с учётом их распределения по нодам. Данный тип планировщика уже умеет планировать подзапросы и CTE.

Например, для случая, когда таблицы a и b шардированы по одной колонке c1, следующий запрос может быть спланирован так:

EXPLAIN (COSTS OFF, VERBOSE ON) SELECT a.c1,
  (SELECT COUNT(*) FROM b
   WHERE b.c1 = 1 AND b.c1 = a.c1)
FROM a WHERE a.c1 = 1;
                           QUERY PLAN
----------------------------------------------------------------
Custom Scan (Citus Adaptive)
   Output: remote_scan.c1, remote_scan.count
   Task Count: 1 
   Tasks Shown: All
   ->  Task
         Query: SELECT c1, (SELECT count(*) AS count FROM public.b_102332 b WHERE ((b.c1 OPERATOR(pg_catalog.=) 1) AND (b.c1 OPERATOR(pg_catalog.=) a.c1))) AS count FROM public.a_102300 a WHERE (c1 OPERATOR(pg_catalog.=) 1)
         Node: host=citus_host port=5432 dbname=citus_db
         ->  Seq Scan on public.a_102300 a
               Output: a.c1, (SubPlan 1)
               Filter: (a.c1 = 1)
               SubPlan 1 
                 ->  Aggregate
                       Output: count(*)
                       ->  Result
                             One-Time Filter: (a.c1 = 1)
                             ->  Seq Scan on public.b_102332 b 
                                   Output: b.c1, b.c2, b.c3
                                   Filter: (b.c1 = 1)

На что можно обратить внимание:

В строке, отмеченной цифрой 3, можно увидеть Seq Scan по реальной физической таблице public.b_102332 (данные шарда). Как мы говорили ранее, эта таблица в общем случае не видна пользователям, но в плане она фигурирует как один из 32 шардов таблицы b.

Так как данные таблицы b шардированы по колонке c1 и в фильтре указано условие b.c1 = 1, то планировщик может задействовать router planner (fast path planner задействовать нельзя, так как в запросе присутствует подзапрос) и перенаправить запрос только в один шард.

В строке с номером 2 можно увидеть, что эта часть запроса планируется в виде подплана (SubPlan 1) и с точки зрения реального выполнения запроса может быть выполнена одним заданием (строка с номером 1 Task Count: 1). О заданиях (Task) подробнее поговорим в разделе про следующий тип планировщика recursive planning.

Если проводить сравнение схожего запроса в Greenplum, то можно выделить следующие сходства и различия:

                          QUERY PLAN
----------------------------------------------------------------
 Gather Motion 32:1  (slice1; segments: 32) 
   Output: a.c1, (COALESCE((count()), '0'::bigint))
   ->  Result
         Output: a.c1, COALESCE((count()), '0'::bigint)
         ->  Result
               Output: (count()), a.c1
               ->  Hash Left Join 
                     Output: a.c1, (count())
                     Hash Cond: (a.c1 = b.c1)
                     ->  Seq Scan on tpcds.a
                           Output: a.c1
                           Filter: (a.c1 = 1)
                     ->  Hash 
                           Output: (count()), b.c1
                           ->  GroupAggregate
                                 Output: count(), b.c1 
                                 Group Key: b.c1
                                 ->  Sort
                                       Output: b.c1
                                       Sort Key: b.c1
                                       ->  Seq Scan on tpcds.b 
                                             Output: b.c1
                                             Filter: (b.c1 = 1) 
 Optimizer: Pivotal Optimizer (GPORCA)

Greenplum не выделяет этот сценарий с точки зрения перенаправления в один сегмент — в плане присутствует Gather Motion 32:1 (строка, отмеченная цифрой 1), который собирает данные с сегментов от нижележащих узлов. За счёт применения фильтра (строка с номером 6) объём данных в данном случае может быть сокращен, но формально план не будет отличаться для любого количества строк. Таблицы в Greenplum для этого запроса распределены по a.c1 и b.c1, и это позволяет планировщику, собрав данные от узла Seq Scan (строка 5), сразу же вычислить и агрегирующую функцию count(), b.c1 (строка 4), далее собрать по этим данным хеш-таблицу (строка 3) и осуществить локальное (в рамках сегмента) соединение Hash Left Join (строка 2).

По факту, план Citus выглядит более оптимальным для данного сценария, так как в его случае запрос сразу же перенаправляется в нужный шард (а это по умолчанию 1/32 от общего объёма данных всей таблицы), никакого ожидания остальных нод не происходит. Разумеется, это отражается на времени выполнения:

// Citus

$ time psql -d citus_db -c 'SELECT a.c1, (SELECT COUNT(*) FROM b WHERE b.c1 = 1 AND b.c1 = a.c1) FROM a WHERE a.c1 = 1;'
real	0m0.577s
user	0m0.003s
sys	0m0.005s

// Greenplum

$ time psql -d greenplum_db -c 'SELECT a.c1, (SELECT COUNT(*) FROM b WHERE b.c1 = 1 AND b.c1 = a.c1) FROM a WHERE a.c1 = 1;'
real	0m1.378s
user	0m0.001s
sys	0m0.004s

Однако картина сильно меняется в случае такого запроса (в данном случае уже используется более продвинутый планировщик recursive planner, который мы рассмотрим в следующем разделе):

SELECT a.c1,
  (SELECT COUNT(*)
   FROM b
   WHERE b.c1 > 100 AND b.c1 = a.c1)
FROM a WHERE a.c1 > 19990000;
                           QUERY PLAN
----------------------------------------------------------------
Custom Scan (Citus Adaptive)  (cost=0.00..0.00 rows=100000 width=12) (actual time=18601.915..18603.189 rows=10000 loops=1)
   Task Count: 32
   Tuple data received from nodes: 117 kB
   Tasks Shown: One of 32
   ->  Task
         Tuple data received from node: 3852 bytes
         Node: host=citus_host port=5432 dbname=citus_db
         ->  Seq Scan on a_102300 a  (cost=0.00..3948635.00 rows=308 width=12) (actual time=187.565..18594.736 rows=321 loops=1)
               Filter: (c1 > 19990000)
               Rows Removed by Filter: 626133
               SubPlan 1 
                 ->  Aggregate  (cost=12783.81..12783.82 rows=1 width=8) (actual time=57.572..57.572 rows=1 loops=321) 
                       ->  Seq Scan on b_102332 b  (cost=0.00..12783.81 rows=1 width=0) (actual time=57.549..57.567 rows=1 loops=321)
                             Filter: ((c1 > 100) AND (c1 = a.c1))
                             Rows Removed by Filter: 626453 
             Planning Time: 0.261 ms
             Execution Time: 18595.255 ms
 Planning Time: 3.022 ms
 Execution Time: 18604.370 ms

Greenplum строит более продвинутый план, стоит только обратить внимание на строки, отмеченные цифрами 1 и 3:

                                  QUERY PLAN
------------------------------------------------------------------------------
Gather Motion 32:1  (slice1; segments: 32) (actual time=116.323..406.430 rows=1010000 loops=1)
   Output: a.c1, (COALESCE((count()), '0'::bigint))
   ->  Result (actual time=116.722..141.599 rows=31929 loops=1)
         Output: a.c1, COALESCE((count()), '0'::bigint)
         ->  Result (actual time=116.722..137.731 rows=31929 loops=1)
               Output: (count()), a.c1
               ->  Hash Left Join (actual time=116.721..134.325 rows=31929 loops=1) 
                     Output: a.c1, (count())
                     Hash Cond: (a.c1 = b.c1)
                     Executor Memory: 39454kB  Segments: 32  Max: 1248kB (segment 13)
                     work_mem: 39454kB  Segments: 32  Max: 1248kB (segment 13)  Workfile: (0 spilling)
                     Extra Text: (seg13)  Hash chain length 1.1 avg, 4 max, using 30053 of 262144 buckets.Hash chain length 4.0 avg, 13 max, using 8027 of 8192 buckets; total 8 expansions.
                     ->  Seq Scan on tpcds.a (actual time=40.173..44.500 rows=31929 loops=1)
                           Output: a.c1
                           Filter: (a.c1 > 19990000)
                     ->  Hash (actual time=76.270..76.270 rows=31929 loops=1)
                           Output: (count()), b.c1
                           ->  HashAggregate (actual time=65.918..69.498 rows=31929 loops=1)
                                 Output: count(), b.c1 
                                 Group Key: b.c1 
                                 Executor Memory: 55335kB  Segments: 32  Max: 1738kB (segment 0)
                                 Extra Text: (seg13)  Hash chain length 4.0 avg, 13 max, using 8027 of 8192 buckets; total 8 expansions.
                                 ->  Seq Scan on tpcds.b (actual time=54.383..59.257 rows=31929 loops=1)
                                       Output: b.c1
                                       Filter: ((b.c1 > 100) AND (b.c1 > 19990000)) 
 Planning time: 6.827 ms
   (slice0)    Executor memory: 151K bytes.
   (slice1)    Executor memory: 7933K bytes avg x 32 workers, 7941K bytes max (seg0).  Work_mem: 1248K bytes max.
 Memory used:  122880kB
 Optimizer: Pivotal Optimizer (GPORCA)
 Execution time: 464.998 ms

Первое значимое отличие, что Greenplum рассчитал агрегат COUNT и сгруппировал результаты по колонке b.c1 (строки с номерами 2 и 3). Второе отличие, что GPORCA пробросила условие фильтрации (b.c1 > 19990000) вглубь плана (строка 4), что значимо сократило выборку данных из таблицы b. При этом для соединения был использован Hash Left Join (строка 1) — фактически проходя по таблице а с условием (a.c1 > 19990000) из хеш-таблицы (строка 3), получаем искомый результат.

Что делает Citus: фактически узел последовательного чтения Seq Scan по таблице a для каждой строки вызывает подплан SubPlan 1 (строка c номером 1), при этом условие фильтра (c1 > 100) чтения по таблице b (строка 3) приводит к полному чтению таблицы b (отброс первых 100 строк не в счёт). И так происходит 321 раз исходя из значения счетчика loops=321 (строка 2) для каждого из 32 шардов.

Нетрудно себе представить, что это приведёт к разительной разнице по времени выполнения. Даже для малого количества строк в таблицах a и b (по 21 млн. строк в каждой) разница во времени составила:

// Citus

$ time psql -d citus_db -c 'SELECT a.c1, (SELECT COUNT(*) FROM b WHERE b.c1 >100 AND b.c1 = a.c1) FROM a WHERE a.c1 > 19990000;' >> /dev/null
real	0m16.556s
user	0m0.013s
sys	0m0.004s

// Greenplum

$ time psql -d greenplum_db -c 'SELECT a.c1, (SELECT COUNT(*) FROM b WHERE b.c1 >100 AND b.c1 = a.c1) FROM a WHERE a.c1 > 19990000;' >> /dev/null
real	0m0.319s
user	0m0.002s
sys	0m0.004s

Если таблицы не шардированы по одной и той же колонке (в моём случае я пересоздал таблицы и шардировал одну по a.c1, вторую — по b.c2), то попытка выполнения такого запроса завершится неудачей:

SELECT a.c1,
  (SELECT COUNT(*) FROM b
   WHERE b.c1 = 1 AND b.c1 = a.c1)
FROM a
WHERE a.c1 = 1;
ERROR:  complex joins are only supported when all distributed tables are co-located and joined on their distribution columns

Greenplum в свою очередь не испытывает тут каких-либо трудностей, как мы говорили ранее, план будет в целом похож на исходный (для случая распределения по одной колонке), но добавляется узел Redistribute Motion (строка, отмеченная цифрой 1):

                                        QUERY PLAN
------------------------------------------------------------------------------------------
 Gather Motion 32:1  (slice2; segments: 32)
   Output: a.c1, (COALESCE((count()), '0'::bigint))
   ->  Result
         Output: a.c1, COALESCE((count()), '0'::bigint)
         ->  Result
               Output: (count()), a.c1
               ->  Hash Left Join
                     Output: a.c1, (count())
                     Hash Cond: (a.c1 = b.c1)
                     ->  Seq Scan on public.a
                           Output: a.c1
                           Filter: (a.c1 = 1)
                     ->  Hash
                           Output: (count()), b.c1
                           ->  GroupAggregate
                                 Output: count(), b.c1
                                 Group Key: b.c1
                                 ->  Sort
                                       Output: b.c1
                                       Sort Key: b.c1
                                       ->  Redistribute Motion 32:32  (slice1; segments: 32) 
                                             Output: b.c1
                                             Hash Key: b.c1
                                             ->  Seq Scan on public.b
                                                   Output: b.c1
                                                   Filter: (b.c1 = 1)
 Optimizer: Pivotal Optimizer (GPORCA)

Подводя итоги по планировщикам fast path planner и router planner:

  • Основным ограничением этих планировщиков является возможность либо планирования запросов только в рамках определенного шарда, либо — если возможно выполнение запроса по отдельности на каждом шарде — осуществления слияния (merge) результатов "наверху" плана. По сути, в этих запросах возможны подзапросы, которые могут быть выполнены независимо на шардах.

  • Сложные соединения не поддерживаются (например, outer join), подзапросы с соединениями также не поддерживаются.

Эти ограничения снимаются в следующем, более продвинутом типе планировщика recursive planner.

Recursive planner

Recursive planner реализует:

  • Возможность делать соединения таблиц, которые определены как colocated согласно их ключам распределения по шардам.

  • Возможность спланировать подзапросы/CTE отдельно (изолированно) таким образом, чтобы манипуляции с результатами выполнения этих подзапросов можно было бы свести к локальным операциям в рамках шарда.

Данный тип планировщика пытается рассмотреть каждую возможную часть запроса на предмет её изолированного выполнения в pushdown-варианте. Суть этого действия в следующем: условно, на вход планировщика попадает дерево запроса, которое анализируется вглубь. В ходе планирования все подзапросы и CTE проверяются на их возможность исполнения непосредственно на рабочих нодах (worker). Если для данного подзапроса/CTE это невозможно, то для него осуществляется вызов обычного планировщика PostgreSQL (planner). Результат добавляется в список планов подзапросов. Важный момент состоит в том, что эти подпланы запускаются до начала обработки распределённого плана выполнения (distributed plan). Результаты отработки этих подпланов записываются во временные файлы на диске. Мы рассмотрим этот процесс в подробностях чуть позднее.

Таким подходом разработчики решают основную задачу — возможность поддержки выполнения большинства видов запросов SQL (да, исключения есть: среди них отсутствие поддержки GROUPING SETS, CUBE, ROLLUP).

Получив и, по сути, материализовав промежуточные результаты подзапроса на каждой ноде/шарде, Citus может соединять эти данные с распределёнными таблицами по любому ключу.

Стоит отметить, что в отличие от Greenplum рекурсивные и модифицирующие CTE, если запрос не ограничен одним шардом, не поддерживаются:

WITH RECURSIVE h AS
(
  SELECT a.c1, a.c3, 0 AS level FROM a
  WHERE a.c2 IS NULL
  UNION ALL
  SELECT a.c1, a.c3, level + 1
  FROM a, h
  WHERE a.c1 = h.c1
)
SELECT * FROM h;
ERROR:  recursive CTEs are only supported when they contain a filter on the distribution column

Предлагаю рассмотреть пример работы данного планировщика на паре запросов:

SELECT COUNT(*)
FROM
  (SELECT *
   FROM a
   ORDER BY a.c3
   LIMIT 10000000) AS foo;
                                        QUERY PLAN
------------------------------------------------------------------------------------------
Custom Scan (Citus Adaptive) (actual time=3399.533..3399.535 rows=1 loops=1) 
   ->  Distributed Subplan 3_1 
         Subplan Duration: 38294.62 ms
         Intermediate Data Size: 238 MB
         Result destination: Write locally
         ->  Limit (actual time=22744.153..24169.593 rows=10000000 loops=1) 
               ->  Sort (actual time=22744.151..23669.158 rows=10000000 loops=1) 
                     Sort Key: remote_scan.c3
                     Sort Method: external merge  Disk: 430592kB
                     ->  Custom Scan (Citus Adaptive) (actual time=15420.113..17028.325 rows=20000000 loops=1)
                           Task Count: 32
                           Tuple data received from nodes: 210 MB
                           Tasks Shown: One of 32
                           ->  Task
                                 Tuple data received from node: 6713 kB
                                 Node: host=citus_host port=5432 dbname=citus_db
                                 ->  Limit (actual time=172.560..383.698 rows=624917 loops=1)
                                       ->  Sort (actual time=172.558..313.678 rows=624917 loops=1) 
                                             Sort Key: c3
                                             Sort Method: external merge  Disk: 13504kB
                                             ->  Seq Scan on a_102317 a (actual time=0.016..68.522 rows=624917 loops=1) 
                                     Planning Time: 0.045 ms
                                     Execution Time: 500.327 ms
         Planning Time: 0.000 ms
         Execution Time: 24575.273 ms
   Task Count: 1
   Tuple data received from nodes: 8 bytes
   Tasks Shown: All
   ->  Task
         Tuple data received from node: 8 bytes
         Node: host=citus_host port=5432 dbname=citus_db 
         ->  Aggregate (actual time=3357.556..3357.556 rows=1 loops=1)
               ->  Function Scan on read_intermediate_result intermediate_result (actual time=2233.802..2988.018 rows=10000000 loops=1) 
             Planning Time: 0.051 ms
             Execution Time: 3398.382 ms
 Planning Time: 1.654 ms
 Execution Time: 41694.215 ms

Recursive planner принимает решение спланировать подзапрос (строка, отмеченная цифрой 1) через подплан (строка с номером 2), собрать результат выполнения подплана на одной ноде и подсчитать итоговую агрегирующую функцию на основе промежуточных данных (строка 7).

Предлагаю разобрать, почему планировщик принимает такие решения и как осуществляется промежуточное хранение результатов.

Согласно правилам pushdown для подзапросов наличие LIMIT требует оформления подзапроса в виде отдельного подплана. Если рассматривать план снизу вверх, то строки от Seq Scan (строка 6) подаются на вход узлу сортировки Sort (строка 5), далее накладывается условие ограничение строк LIMIT. Эти действия выполняются параллельно на всех шардах таблицы (по умолчанию равных 32). Таким образом, на выходе этих узлов плана от каждого шарда приходят LIMIT N строк, отсортированных по a.c3. Однако, чтобы предоставить пользователю именно LIMIT N строк согласно ожидаемой пользователем сортировке, планировщику приходится упорядочить данные от всех шардов и только потом применить итоговое условие по LIMIT (строки 3 и 4). В противном случае пользователь может получить совсем не то, что он ожидает. Эти результаты кешируются на ноде-координаторе запроса и становятся доступны остальной части плана через функцию pg_catalog.read_intermediate_result. Механизм работы функции read_intermediate_result описан в разделе Особенности выполнения запросов.

Планировщик Greenplum планирует запрос так:

                      QUERY PLAN
------------------------------------------------------
Aggregate (actual time=4350.773..4350.773 rows=1 loops=1)
   ->  Limit (actual time=254.085..3491.350 rows=10000000 loops=1) 
         ->  Gather Motion 32:1  (slice1; segments: 32) (actual time=254.081..2457.549 rows=10000000 loops=1) 
               Merge Key: c3 
               ->  Limit (actual time=240.614..352.447 rows=657684 loops=1)
                     ->  Sort (actual time=240.609..285.729 rows=657684 loops=1)
                           Sort Key: c3 
                           Sort Method:  top-N heapsort  Memory: 1310496kB
                           ->  Seq Scan on a (actual time=0.078..110.649 rows=657714 loops=1)
 Planning time: 27.504 ms
   (slice0)    Executor memory: 2105K bytes.
   (slice1)    Executor memory: 40994K bytes avg x 32 workers, 40994K bytes max (seg0).  Work_mem: 40953K bytes max.
 Memory used:  122880kB
 Optimizer: Pivotal Optimizer (GPORCA)
 Execution time: 4400.946 ms

Тут стоит обратить внимание на шаг Gather Motion 32:1 и его параметризацию в виде Merge Key: c3 (строки 2 и 3). Планировщик GPORCA, зная, что результаты с каждого сегмента будут приходить упорядоченно согласно a.c3 (строка 4), имеет возможность сделать, по сути, соединение слиянием и применить LIMIT уже на результатах этого набора строк (строка 1). Этот шаг формально соответствует узлу Limit и Sort под ним (строки 3 и 4 в плане Citus).

Ключевое различие планов состоит в том, что Citus для каждого шарда выполняет распределённый подплан Distributed Subplan (строка 2), в рамках этого плана общий результат, полученный от шардов, сортируется (строка 4) и ограничивается узлом Limit (строка 3). Результаты собираются на координаторе запроса и материализуются в виде файлов, которые на координаторе вычитываются функцией read_intermediate_result (строка 8).

Greenplum в рамках Gather Motion делает это на лету и на практике показывает лучший результат:

// Citus

$ time psql -d citus_db -c 'SELECT COUNT(*) FROM (SELECT * FROM a ORDER BY a.c3 LIMIT 10000000) AS foo;' >> /dev/null

real	0m40.617s
user	0m0.002s
sys	0m0.003s

// Greenplum

$ time psql -d greenplum_db -c 'SELECT COUNT(*) FROM (SELECT * FROM a ORDER BY a.c3 LIMIT 10000000) AS foo;' >> /dev/null

real	0m2.892s
user	0m0.003s
sys	0m0.004s

Этот же тип планировщика позволяет спланировать запросы с HAVING в подзапросах, однако связанные (correlated) запросы ему в отличие от Greenplum уже не по зубам:

SELECT foo.c1, foo.cnt, a.c3
FROM (SELECT b.c1, COUNT(*) AS cnt FROM b GROUP BY b.c1) foo
INNER JOIN a
  ON a.c1 = foo.c1
GROUP BY foo.c1, foo.cnt, a.c3
HAVING foo.cnt >= (SELECT COUNT(*) FROM b WHERE b.c1 >= foo.cnt);
ERROR:  Subqueries in HAVING cannot refer to outer query

Greenplum же справляется с этой задаче без проблем:

                                          QUERY PLAN
-----------------------------------------------------------------------------------------------
Gather Motion 32:1  (slice2; segments: 32)
   ->  HashAggregate
         Group Key: b.c1, (count()), a.c3
         ->  Hash Join
               Hash Cond: (b.c1 = a.c1)
               ->  Result
                     Filter: ((count()) >= COALESCE(((SubPlan 1)), '0'::bigint))
                     ->  Result
                           ->  HashAggregate
                                 Group Key: b.c1
                                 ->  Seq Scan on b
                           SubPlan 1  (slice2; segments: 32)
                             ->  Aggregate
                                   ->  Result
                                         Filter: (b_1.c1 >= (count()))
                                         ->  Materialize
                                               ->  Broadcast Motion 32:32  (slice1; segments: 32)
                                                     ->  Seq Scan on b b_1
               ->  Hash
                     ->  Seq Scan on a
 Optimizer: Pivotal Optimizer (GPORCA)

Даже на довольно простых примерах Citus заставляет задумываться, а что мне нужно сделать со схемой и распределением данным, чтобы запрос всё-таки выполнялся. Планировщик GPORCA на практике выглядит более продвинутым.

Например, для коррелированных подзапросов:

CREATE TABLE sales(brand TEXT, size INT, price NUMERIC(10,2), store INT);
CREATE TABLE
SELECT COUNT(*)
FROM sales s1
WHERE  s1.size > 32 OR s1.price > (SELECT avg(s2.price) FROM sales s2 WHERE s2.brand = s1.brand);
 count
-------
     0
(1 row)
SELECT COUNT(*)
FROM sales s1
WHERE s1.seller
IN (SELECT s2.seller
        FROM sales s2
        WHERE s2.price = (SELECT min(s3.price) FROM sales s3 WHERE s3.brand = s1.brand));
ERROR:  complex joins are only supported when all distributed tables are co-located and joined on their distribution columns

Для первого запроса большие таблицы могут быть в дополнение к шардированию партицированы по каким-либо полям.

Logical Planner & Optimizer

Подробное рассмотрение этого уровня планирования выходит за рамки статьи. Задачи этого уровня отчасти сходны с решаемыми задачами groupping_planner в PostgreSQL: это операции GROUP BY, ORDER BY, LIMIT, агрегирующие функции.

Также на этом уровне осуществляется оптимизация операций соединений. Основная цель этой оптимизации состоит в сокращении числа соединений, ведущих к перераспределению данных. Судя по всему, ввиду специфики перераспределения на основе записи в файл промежуточных результатов и COPY-протокола (об этом далее), оценки cost-based не производится.

Особенности выполнения запросов

Citus строит свой план выполнения на основе реализации узла CustomScan.

Предлагаю разобрать выполнение запроса с точки зрения его процесса исполнения на простом запросе, но затрагивающем некоторые интересные моменты вроде выполнения подпланов, сохранения промежуточных результатов в виде файлов и их broadcast-копирования на ноды.

Для начала сам запрос и его план:

SELECT COUNT(DISTINCT b.c2)
FROM (SELECT DISTINCT a.c2 FROM a)
AS foo, b
WHERE foo.c2 = b.c2;
                                          QUERY PLAN
-----------------------------------------------------------------------------------------------
 Aggregate
   Output: count(DISTINCT remote_scan.count)
   ->  Custom Scan (Citus Adaptive) 
         Output: remote_scan.count
         ->  Distributed Subplan 24_1 
               ->  HashAggregate 
                     Output: remote_scan.c2
                     Group Key: remote_scan.c2
                     ->  Custom Scan (Citus Adaptive) 
                           Output: remote_scan.c2
                           Task Count: 32
                           Tasks Shown: One of 32
                           ->  Task
                                 Query: SELECT DISTINCT c2 FROM public.a_102809 a WHERE true
                                 Node: host=citus_host port=5432 dbname=citus_db
                                 ->  HashAggregate
                                       Output: c2
                                       Group Key: a.c2
                                       ->  Seq Scan on public.a_102809 a
                                             Output: c1, c2, c3
         Task Count: 32
         Tasks Shown: One of 32 
         ->  Task
               Query: SELECT worker_column_1 AS count FROM (SELECT b.c2 AS worker_column_1 FROM (SELECT intermediate_result.c2 FROM read_intermediate_result('24_1'::text, 'binary'::citus_copy_format) intermediate_result(c2 integer)) foo, public.b_102841 b WHERE (foo.c2 OPERATOR(pg_catalog.=) b.c2)) worker_subquery GROUP BY worker_column_1
               Node: host=citus_host port=5432 dbname=citus_db
               ->  HashAggregate
                     Output: b.c2 
                     Group Key: b.c2
                     ->  Hash Join
                           Output: b.c2  
                           Hash Cond: (intermediate_result.c2 = b.c2)
                           ->  Function Scan on pg_catalog.read_intermediate_result intermediate_result
                                 Output: intermediate_result.c2
                                 Function Call: read_intermediate_result('24_1'::text, 'binary'::citus_copy_format)
                           ->  Hash
                                 Output: b.c2
                                 ->  Seq Scan on public.b_102841 b
                                       Output: b.c2

Мы уже знаем, что такой запрос приводит к созданию и выполнению подплана (строка, отмеченная цифрой 1), использованию результатов этого подплана в Hash Join (строка с номером 6) и чтению промежуточных результатов в inner-части соединения (строка 7).

Citus встраивается в процесс выполнения запросов в таких точках расширения: ExecutorStart_hook, ExecutorRun_hook, ExplainOneQuery_hook, prev_ExecutorEnd, ExecutorEnd_hook.

Процесс выполнения запроса
Процесс выполнения запроса

Начнём наш отсчет от реализации ExecutorStart_hook функции обратного вызова CitusExecutorRun. Для такого запроса как наш, ядро PostgreSQL вызывает функцию CitusExecutorRun в ExecutorRun.

CitusExecutorRun помимо вспомогательных, специфичных для Citus задач вроде пропуска проверок ограничений (имеется в виду constraints check) на координаторе для запросов ALTER TABLE (это делегируется на ноды), решает основную задачу — поиск узлов "состояние сканирования" (CitusScanState), которые ранее были добавлены одним из планировщиков.

CitusScanState представляет собой структуру-"контейнер", которая включает в себя:

  • состояние узла CustomScan (CustomScanState);

  • указатель на распределённый план выполнения (DistributedPlan);

  • хранилище строк (кортежей) под результаты выполнения распределённого плана (Tuplestorestate);

  • указатель на функцию обратного вызова PreExecScan (об этом подробнее далее) и ряд других полей.

План (если быть точнее, то узлы PlanState) обходится вглубь, и найденные экземпляры CitusScanState добавляются в список.

Для нашего случая в этом списке будет один узел Custom Scan (Citus Adaptive) (строка 1) и ассоциированный с ним экземпляр CustomScanState. Для каждого из найденных узлов и ассоциированного с ним распределённого плана выполнения для распределённых таблиц берутся блокировки всех партиций, а также выполняются все подпланы, начиная с самого верхнего. В нашем случае это будет подплан Distributed Subplan 24_1.

Получение результатов на нодах осуществляется через экземпляр DestReceiver. Нода-координатор создаёт так называемый RemoteFileDestReceiver.

В его задачи входит:

  • установление соединений с нодами по протоколу libpq;

  • отправка команды COPY этим нодам (об этом подробнее далее);

  • приём кортежей;

  • подготовка данных кортежей под команду COPY и пересылка этих строк на ноды (BroadcastCopyData).

Как только RemoteFileDestReceiver инициализирован, Citus начинает выполнение подплана (строка 2). Для этого используется механизм порталов (Portal). В портал отправляется спланированная ранее часть запроса HashAggregate (строка 3). Сам портал принимает в качестве одного из своих аргументов экземпляр DestReceiver, которым и выступает RemoteFileDestReceiver.

В ходе обработки запроса порталом подзапрос опять попадает на планирование, происходит штатный вызов ExecutorStart_hook, который в свою очередь вызывает функцию CitusExecutorRun. Она по аналогии с основным запросом находит и выполняет вложенный в подзапрос узел CustomScan (строка 4).

Далее план попадает на вход функции ядра ExecutePlan. Так как верхнеуровневым узлом плана является узел агрегатной функции HashAggregate (строка 3, наш COUNT(DISTINCT)), то управление передается ядру (модуль nodeAgg.c), а оно в свою очередь вызывает нижележащий узел, которым является узел CustomScan Citus.

Для выполнения задач сканирования (то есть непосредственного получения строк) реализация CustomScan использует так называемый AdaptiveExecutor. Полноценное рассмотрение этого модуля займёт ещё одну такую же статью, рассмотрим только основные задачи, касающиеся нашего запроса.

Верхнеуровнево, основная задача AdaptiveExecutor — выполнение набора заданий (list of tasks). Примером задания может служить запрос к шарду (по сути, к таблице). В нашем случае планировщик определил 32 задания (по числу шардов таблицы).

Также AdaptiveExecutor создает TupleStore для временного хранения полученных результатов.

Как работает функция read_intermediate_result:

Задача read_intermediate_result — представить промежуточный результат в формате COPY, полученный в ходе broadcast-выполнения подплана либо шафлинга (Repartitioning, об этом в деталях далее) в виде набора строк. Файл с промежуточным результатами парсится и преобразуется в те форматы колонок, которые вызывающая сторона передала в качестве параметров. Например, в случае нашего запроса (строка 5) read_intermediate_result('24_1'::text, 'binary'::citus_copy_format) intermediate_result(c2 integer) первый аргумент 24_1 представляет собой идентификатор результирующего набора строк, второй аргумент задаёт формат ('binary'). Идентификатор далее используется для получения имени файла в каталоге pgsql_job_cache.

Далее функция ReadFileIntoTupleStore перегоняет строки из формата COPY (используется PostgreSQL-реализация модуля copy.c) в Tuplestorestate. Далее результаты уже представлены в виде кортежей.

Где хранятся промежуточные результаты:

Промежуточные результаты выполнения запросов хранятся в каталоге base/pgsql_job_cache на каждой ноде. Если эти результаты используются в распределённой транзакции, то это каталог base/pgsql_job_cache/<user id>_<coordinator node id>_<transaction number>/. Если распределённая транзакция не используется, то этот путь base/pgsql_job_cache/<user id>_<process id>/.

Пример размещения:

$ pwd
/var/lib/pgsql/14/data/base/pgsql_job_cache/10_0_237
$ ll
total 178660
-rw------- 1 postgres postgres 182944635 Aug 27 02:14 1_1.data

Repartitioning

Как Citus делает соединение не по ключу шардирования в том случае, когда таблицы не распределены по этому ключу?

ВАЖНО

Необходимым условием для выполнения таких соединений является выставленный в on GUC citus.enable_repartition_joins. Иначе Citus ругается следующим образом: ERROR: the query contains a join that requires repartitioning. По умолчанию enable_repartition_joins выставлен в off.

Рассмотрим следующий запрос для случая, когда таблица a распределена по ключу a.c1, а таблица b по ключу b.c3 (для упрощения разбора механизма предварительно распределим каждую таблицу по 2 шардам):

SELECT a.c2, COUNT(*)
FROM a
INNER JOIN b ON a.c1 = b.c1
GROUP BY a.c2
LIMIT 100;
                                             QUERY PLAN
----------------------------------------------------------------------------------------------------
 Limit (actual time=88944.560..88944.563 rows=1 loops=1)
   Output: remote_scan.c2, (COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint))
   ->  HashAggregate (actual time=88944.559..88944.560 rows=1 loops=1)
         Output: remote_scan.c2, COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint)
         Group Key: remote_scan.c2
         Batches: 1  Memory Usage: 40kB
         ->  Custom Scan (Citus Adaptive) (actual time=88944.536..88944.539 rows=12 loops=1)
               Output: remote_scan.c2, remote_scan.count
               Task Count: 12
               Tuple data received from nodes: 132 bytes
               Tasks Shown: None, not supported for re-partition queries
               ->  MapMergeJob
                     Map Task Count: 2
                     Merge Task Count: 12
               ->  MapMergeJob
                     Map Task Count: 2
                     Merge Task Count: 12
 Planning Time: 3.422 ms
 Execution Time: 88944.605 ms

Для выполнения подобного соединения задействуется repartitioning-механизм, основанный на разбиении на задания (Jobs) с одноуровневой иерархией, где некоторый набор заданий зависим от основного. По сути, этот набор заданий представляет собой вариацию подпланов, результаты выполнения которых перераспределяются между нодами согласно ключам. Отличие от обычных подпланов в том, что в заданиях отсутствует этап объединения результатов (имеется в виду этап merge), в подпланах результат в свою очередь всегда полностью перераспределяется по нодам (broadcast). Задания включаются в распределённый план выполнения и выполняются в 2 этапа.

На первом этапе на каждый из 4 шардов обеих таблиц (для обслуживания используется отдельное libpq-соединение) приходит запрос вида:

SELECT
	partition_index,
	'repartition_2198637379588_1' || '_' || partition_index::text , 
	rows_written
FROM pg_catalog.worker_partition_query_result(
	'repartition_2198637379588_1',
	'SELECT c2 AS column1, c1 AS column2 FROM public.a_102080 a WHERE true', 
	1,
	'hash',
	'{-2147483648,-1789569707,-1431655766,-1073741825,-715827884,-357913943,-2,357913939,715827880,1073741821,1431655762,1789569703}'::text[], 
	'{-1789569708,-1431655767,-1073741826,-715827885,-357913944,-3,357913938,715827879,1073741820,1431655761,1789569702,2147483647}'::text[], 
	true,
	true,
	true
)
WHERE rows_written > 0

Самое важное в этих запросах:

  • Из строки, отмеченной цифрой 1, в дальнейшем формируется название файла, который будет содержать промежуточные результаты запроса из строки с номером 2.

  • Строки 3 и 4 содержат диапазоны хеш-значений, согласно которым будет производиться разбиение задачи на части (Merge Task Count). Можно увидеть, что в массивах partition_min_values (первый аргумент-массив) и partition_max_values (второй) как раз по 12 элементов.

Внутри это приводит к вызову функции worker_partition_query_result. Задача этой функции — получить результаты выполнения запроса и записать их в локальные (для ноды) файлы, согласно схеме шардирования и ключу шардирования. Для получения и записи результатов в промежуточные файлы используются экземпляры DestReceiver. Для каждого файла с результатами создаётся свой получатель. При этом общее число файлов для одного шарда задаётся значением параметра Merge Task Count, в нашем случае равным 12. По готовности получателей запускается портал (PortalRun), который и исполняет запрос.

После выполнения этой операции для 1 из 4 шардов содержимое каталога pgsql_job_cache для соответствующей распределённой транзакции будет примерно таким:

$ ll
total 56
drwx------ 2 citus citus 4096 sep 12 16:38 ./
drwx------ 3 citus citus 4096 sep 12 16:04 ../
-rw------- 1 citus citus   21 sep 12 16:38 repartition_2198637379588_2_0.data
-rw------- 1 citus citus 1653 sep 12 16:38 repartition_2198637379588_2_10.data
-rw------- 1 citus citus 1517 sep 12 16:38 repartition_2198637379588_2_11.data
-rw------- 1 citus citus   21 sep 12 16:38 repartition_2198637379588_2_1.data
-rw------- 1 citus citus   21 sep 12 16:38 repartition_2198637379588_2_2.data
-rw------- 1 citus citus   21 sep 12 16:38 repartition_2198637379588_2_3.data
-rw------- 1 citus citus   21 sep 12 16:38 repartition_2198637379588_2_4.data
-rw------- 1 citus citus   21 sep 12 16:38 repartition_2198637379588_2_5.data
-rw------- 1 citus citus 1262 sep 12 16:38 repartition_2198637379588_2_6.data
-rw------- 1 citus citus 1500 sep 12 16:38 repartition_2198637379588_2_7.data
-rw------- 1 citus citus 1347 sep 12 16:38 repartition_2198637379588_2_8.data
-rw------- 1 citus citus 1177 sep 12 16:38 repartition_2198637379588_2_9.data

Таким образом, для одной таблицы, участвующей в процессе repartitioning, число промежуточных файлов для данного этапа будет равно Map Task Count (Table Shard Count) * Merge Task Count.

При этом обратите внимание, что для одной и той же таблицы можно увидеть такое содержимое каталогов для 2 узлов:

  • node #1

    $ ll | grep 594_
    -rw------- 1 citus citus 1381 sep 12 17:15 repartition_2198637379594_1_0.data
    -rw------- 1 citus citus   21 sep 12 17:15 repartition_2198637379594_1_10.data
    -rw------- 1 citus citus   21 sep 12 17:15 repartition_2198637379594_1_11.data
    -rw------- 1 citus citus 1432 sep 12 17:15 repartition_2198637379594_1_1.data
    -rw------- 1 citus citus 1449 sep 12 17:15 repartition_2198637379594_1_2.data
    -rw------- 1 citus citus 1364 sep 12 17:15 repartition_2198637379594_1_3.data
    -rw------- 1 citus citus 1619 sep 12 17:15 repartition_2198637379594_1_4.data
    -rw------- 1 citus citus 1551 sep 12 17:15 repartition_2198637379594_1_5.data
    -rw------- 1 citus citus   21 sep 12 17:15 repartition_2198637379594_1_6.data
    -rw------- 1 citus citus   21 sep 12 17:15 repartition_2198637379594_1_7.data
    -rw------- 1 citus citus   21 sep 12 17:15 repartition_2198637379594_1_8.data
    -rw------- 1 citus citus   21 sep 12 17:15 repartition_2198637379594_1_9.data
  • node #2

    $ ll | grep 594_
    -rw------- 1 citus citus   21 sep 12 17:16 repartition_2198637379594_2_0.data
    -rw------- 1 citus citus 1653 sep 12 17:16 repartition_2198637379594_2_10.data
    -rw------- 1 citus citus 1517 sep 12 17:16 repartition_2198637379594_2_11.data
    -rw------- 1 citus citus   21 sep 12 17:16 repartition_2198637379594_2_1.data
    -rw------- 1 citus citus   21 sep 12 17:16 repartition_2198637379594_2_2.data
    -rw------- 1 citus citus   21 sep 12 17:16 repartition_2198637379594_2_3.data
    -rw------- 1 citus citus   21 sep 12 17:16 repartition_2198637379594_2_4.data
    -rw------- 1 citus citus   21 sep 12 17:16 repartition_2198637379594_2_5.data
    -rw------- 1 citus citus 1262 sep 12 17:16 repartition_2198637379594_2_6.data
    -rw------- 1 citus citus 1500 sep 12 17:16 repartition_2198637379594_2_7.data
    -rw------- 1 citus citus 1347 sep 12 17:16 repartition_2198637379594_2_8.data
    -rw------- 1 citus citus 1177 sep 12 17:16 repartition_2198637379594_2_9.data

Отсюда видно, что множество данных, судя по именам файлов и их содержанию, не пересекается и каждый каталог содержит свою порцию данных, которые на следующем этапе уже перераспределяются для возможности сделать локальное соединение по ключам.

На втором этапе задействуется функция fetch_intermediate_results, в задачи которой входит вычитка тех самых промежуточных результатов с других узлов с сохранением этих результатов уже как промежуточных локальных. Перед этим в каталог с промежуточными данными копируются данные для обеспечения локальных соединений. Например, после такого вызова:

SELECT bytes
FROM fetch_intermediate_results(ARRAY['repartition_2198637379594_2_2']::text[],'localhost',8003) bytes;

содержание промежуточного каталога для таблицы выше будет таким:

$ ll | grep 594_
-rw------- 1 citus citus 1381 sep 12 17:15 repartition_2198637379594_1_0.data
-rw------- 1 citus citus   21 sep 12 17:15 repartition_2198637379594_1_10.data
-rw------- 1 citus citus   21 sep 12 17:15 repartition_2198637379594_1_11.data
-rw------- 1 citus citus 1432 sep 12 17:15 repartition_2198637379594_1_1.data
-rw------- 1 citus citus 1449 sep 12 17:15 repartition_2198637379594_1_2.data
-rw------- 1 citus citus 1364 sep 12 17:15 repartition_2198637379594_1_3.data
-rw------- 1 citus citus 1619 sep 12 17:15 repartition_2198637379594_1_4.data
-rw------- 1 citus citus 1551 sep 12 17:15 repartition_2198637379594_1_5.data
-rw------- 1 citus citus   21 sep 12 17:15 repartition_2198637379594_1_6.data
-rw------- 1 citus citus   21 sep 12 17:15 repartition_2198637379594_1_7.data
-rw------- 1 citus citus   21 sep 12 17:15 repartition_2198637379594_1_8.data
-rw------- 1 citus citus   21 sep 12 17:15 repartition_2198637379594_1_9.data
-rw------- 1 citus citus 1500 sep 12 17:34 repartition_2198637379594_2_2.data   <====== данные другого шарда

Для соединения с другими узлами используется libpq-протокол и команда COPY. На этом этапе, собственно, и происходит соединение по уже ставшим локальными данным.

Далее эти результаты используются уже знакомой нам функцией read_intermediate_result.

Схематично описанный процесс выглядит следующим образом.

Repartitioning
Repartitioning

Распределённые транзакции, блокировки и изоляция

Мы подошли к интересной и сложной теме видимости изменений разными транзакциями, блокировкам и самим распределённым транзакциям. Тема обширная, с массой нюансов. В рамках этой статьи я хотел бы показать основные отличия на примере пары запросов.

Для начала общая архитектура Citus в этой части:

Citus для соединений между узлами использует стандартный libpq-протокол и, соответственно, libpq-соединения. Соединения кешируются, переиспользуются; для достижения лучшей степени параллелизации при исполнении команд, при возможности, используются несколько соединений с одним узлом.

Важный нюанс состоит в том, что при такой организации доступа к данным, например, для транзакций, затрагивающих несколько таблиц, после выполнения операции записи в шард все транзакции, работающие с шардами этой группы, должны использовать то же соединение.

В противном случае другие команды в рамках этой же транзакции не смогли бы увидеть ещё не закоммиченные данные. Схожее требование возникает и при записи в справочную (reference) таблицу, после которой выполняются команды чтения из этой таблицы, в том числе и в операциях соединения с распределёнными таблицами (distributed). Это не всегда возможно в принципе. В документации приведен пример такого случая, если GUC citus.multi_shard_modify_mode не задан как sequential:

BEGIN; 
DELETE FROM dist_table; 
INSERT INTO reference_table VALUES (1,2);
SELECT * FROM dist_table JOIN reference_table ON (x = a); 
ERROR:  cannot perform query with placements that were modified over multiple connections

Операция DELETE (строка, отмеченная цифрой 2) в рамках общей транзакции с несколькими командами (строка 1) затрагивает несколько шардов. Если эта операция будет выполняться параллельно, то будут задействованы несколько соединений. Таким образом, с учетом требования по reference-таблицам становится невозможным использовать одно соединение для команды join (строка 3).

В целом, Citus вынужден аккуратно обращаться с соединениями по многим аспектам: с одной стороны — контролировать число параллельных соединений, чтобы не перегрузить ноды; с другой стороны — соблюдать правила видимости данных и не допускать взаимных блокировок. За последние два аспекта отвечает отдельный модуль Placement connection tracking. В его задачи входит отслеживание того, какие группы шардов затрагивались операциями DML/DDL/SELECT данной транзакцией, и принятие решения, какие соединения использовать для дальнейших операций.

Модуль руководствуется такими правилами:

  • допускается использование разных соединений (т.е. становится возможным параллелизация исполнения команд) для последовательных (с точки зрения порядка выражений в транзакции) SELECT-запросов;

  • допускается использование разных соединений для DML-запросов после SELECT-запросов;

  • в иных случаях разрешается использование только одного и того же соединения.

В Greenplum для этих целей реализован механизм читающих и пишущих процессов в рамках группы связанных процессов (gangs). При создании соединений между QE и QD последний задаёт параметр, который определяет, будет ли процесс читающим или пишущим. Допускается только один пишущий процесс на группу связанных процессов, он же и будет выполнять задачи записи в рамках распределённой транзакции. Таким образом, только этот процесс может менять состояние БД и, как правило, он же отвечает и за управление блокировками в рамках транзакции (LockHolder). Мне такое чёткое разделение представляется более прозрачным подходом.

Касательно протоколов взаимодействия между нодами. В Greenplum протокол libpq пропатчен, в частности, под распределённую обработку (в основном изменения касаются взаимодействия QD и QE). Интерфейс обработки команд, полученных по libpq, расширен, например, такими командами как:

  • 'M' — команда под выполнение сегментами (QE) спланированных на мастере (QD) запросов (CdbDispatchPlan);

  • 'T' — команда управления протоколом распределённых транзакций (DtxProtocolCommand).

Для интерконнекта используется несколько вариантов: tcp, ufpifc и ic proxy. Центральным элементом такого взаимодействия являются узлы Motion, которые и отвечают за передачу данных между сегментами с учётом их вариантов в виде Gather Motion, Redistribute Motion и Broadcast Motion.

Таким образом, Citus полагается полностью на общепринятый libpq, в то время как Greenplum использует свой вариант протокола libpq для отправки запросов на сегменты, а за перемещение данных отвечает уже один из вариантов интерконнекта. Своя реализация протокола означает сложности с вливанием изменений апстрим-версии, но в целом это касается общего недостатка Greenplum, как существенно "допиленного" под MPP PostgreSQL.

Если сравнивать узлы Motion с их ближайшими концептуальными аналогами в виде описанных выше реализаций fetch_intermediate_results, read_intermediate_result и worker_partition_query_result, то в Greenplum отсутствует перекладывание данных между файлами с узла на узел, использование команды COPY и т.п. Мотивация такого решения создателями Citus понятна — максимально воспользовались имеющимся в PostgreSQL инструментарием. Насколько эффективна такая реализация — вопрос открытый. Мне представляется, что передача узлами Motion данных напрямую "в сеть" является более эффективным подходом.

С точки зрения аналогов параллелизации исполнения запросов, ближайшим аналогом слайсов Greenplum — независимо выполняемых частей плана, "мостиком" между которыми как раз выступают разного рода узлы Motion — как мне представляется, являются задачи (Tasks) и задания (Jobs).

Управление транзакциями

Для пишущих транзакций, которые затрагивают шарды на разных нодах Citus использует PostgreSQL-реализацию двухфазного коммита (2PC) с некоторыми доработками под распределённую среду (связанными с HA и авто-восстановлением 2PC-транзакций после сбоя).

Сам подход и частности его реализации в Citus описаны в массе статей. В его основе выражения PREPARE TRANSACTION, COMMIT PREPARED и ROLLBACK PREPARED, которые PostgreSQL предлагает для внешних менеджеров транзакций. Я же предлагаю сравнить и подсветить преимущества реализации схожего, но, по факту, более продвинутого механизма в Greenplum.

Основной недостаток реализации распределённых транзакций Citus, по моему мнению — отсутствие гарантий соблюдения правил видимости для закоммиченных транзакций в рамках всего кластера.

Чтобы продемонстрировать возможные последствия подобной ситуации, я добавил задержку в блок кода, который отправляет команды COMMIT PREPARED на рабочие ноды согласно реализации протокола 2PC. Последовательность запросов такая:

BEGIN;
INSERT INTO a VALUES (1, 'foo', 1);
INSERT INTO a VALUES (2, 'foo', 2);
COMMIT;
SELECT * FROM a;
 c1 | c2  | c3
----+-----+----
  1 | foo |  1
  2 | foo |  2
(2 rows)

Далее отправляем запросы INSERT:

BEGIN;
INSERT INTO a VALUES (3, 'foo', 3);
INSERT INTO a VALUES (4, 'foo', 4); 
COMMIT; 

Но, так как в рамках COMMIT (строке, отмеченной цифрой 2) между этими двумя транзакциями у нас добавлена искусственная (но возможная в реальности) задержка, то в параллельной сессии становится возможным получение таких данных:

SELECT * FROM a;
 c1 | c2  | c3
----+-----+----
  1 | foo |  1
  3 | foo |  3
  2 | foo |  2
(3 rows)

Проблема в строке, отмеченной цифрой 1, где мы видим, что SELECT-запрос в параллельной сессии выдал строку (3, 'foo', 3), но не выдал (4, 'foo', 4).

Таким образом, отсюда следует очевидный вывод — Citus в рамках распределённой СУБД согласно модели консистентности распределённых систем попадает в классификацию Read Uncommitted.

Причина такого поведения данной СУБД в том, что в Citus отсутствует понятие единого снимка (имеется ввиду Snapshot для реализации MVCC в PostgresSQL) для распределённых транзакций.

Ранее подготовленные транзакции на отдельных нодах применяются в разное время. Таким образом, некоторая последующая транзакция сможет прочитать данные, которые применились на одной ноде, но ещё не успели примениться на другой.

По сути, это нарушает и принцип атомарности — пользователь увидит только часть данных закоммиченной транзакции. На мой взгляд, это может являться ощутимым недостатком Citus.

В Greenplum такая ситуация не допускается. Ядро системы отслеживает информацию об активных распределённых транзакциях. Для этого в общеиспользуемой памяти процессов каждого из бэкендов хранится идентификатор распределённой транзакции (gxid) по аналогии с обычными идентификаторами локальных транзакций PostgreSQL (xid). Этот идентификатор присваивается, если транзакции требуется 2PC (т.е. транзакция является распределённой). За генерацию и присваивание значения gxid отвечает координатор запроса (QD).

Как уже говорилось ранее, в Greenplum расширен набор команд, которые обрабатываются бэкендами. В частности, командами 'M' и 'T'. Обработчики этих команд в том числе принимают сериализованную информацию для распределённой обработки транзакций от QD. Эти данные включают в себя и глобальный (распределённый) снимок.

Greenplum расширяет PostgreSQL-структуру для хранения снимков с целью хранения информации по глобальному снимку. Далее этот снимок используется для проверки видимости строк. Как говорилось ранее, только один процесс в gang может быть пишущим (writer) — таким образом, читающие (reader) процессы полагаются на то, что снимок получил пишущий процесс. Это в том числе решает проблемы видимости строк, затронутых пишущей частью транзакции (т.е. то, что Citus приходится решать манипуляциями с тем, какие libpq-соединения можно использовать).

Упрощённая модель проверки видимости строки с использованием глобальных снимков следующая.

Упрощённая модель проверки видимости строки с использованием глобальных снимков
Упрощённая модель проверки видимости строки с использованием глобальных снимков

Если идентификатор xmin данной строки соответствует транзакции, которая является применённой (правила видимости для ещё не применённых и frozen-транзакций оставим в данном случае за скобками), то далее Greenplum проверяет видимость с учётом снимка видимости для распределённой транзакции.

На примере запроса, который запускался ранее, Greenplum производит проверку, является ли данная транзакция распределённой и нужно ли проверять по глобальному снимку видимости. В нашем случае распределённая транзакция ещё не имела статуса завершённой, поэтому данная строка не является видимой для запроса в другой сессии.

Вычисление функций (functions evaluation)

Citus

Citus следует таким правилам исполнения и получения результирующего значения функций:

  • Запрет использования функций stable и volatile в RETURNING для INSERT-запросов.

  • Вычисление значения, в том числе и для функций volatile, на координаторе запроса для INSERT-запросов.

  • Запрет использования функций volatile в UPDATE и DELETE-запросах.

  • Для SELECT-запросов Citus не вычисляет значения функций на координаторе.

Последний пункт может приводить к потенциальным проблемам. Например, для следующего запроса:

SELECT *
FROM d INNER JOIN r
  ON d.c1 = r.c1
WHERE r.c1 = ROUND(RANDOM());

Citus строит такой план:

                                  QUERY PLAN
------------------------------------------------------------------------------
 Custom Scan (Citus Adaptive)
   Task Count: 32
   Tasks Shown: One of 32 
   ->  Task
         Node: host=10.92.40.201 port=5432 dbname=postgres
         ->  Hash Join 
               Hash Cond: (d.c1 = r.c1)
               ->  Seq Scan on d_103166 d
               ->  Hash
                     ->  Seq Scan on r_103198 r 
                           Filter: ((c1)::double precision = round(random())) 

Согласно правилам вызова функций, он вычисляет условие в WHERE (строка, отмеченная цифрой 4) для каждого шарда (строка с номером 1). Таким образом, теоретически возможна следующая ситуация:

Shard 1 Shard 2

Distributed table value (d)

{ 0 }

{ 1 }

Reference table value (r)

{ 0, 1 }

{ 0, 1 }

Допустим, в распределённой таблице d хранится 2 строки. На первом шарде Shard 1 значение атрибута равно 0, на втором шарде Shard 2 значение равно 1. В справочной таблице хранятся такие же 2 строки.

Согласно плану выполнения выборка из таблицы r производится на каждом шарде согласно совпадению вычисленного значения round(random()), т.е. это либо 0, либо 1. С некоторой вероятностью возможно получение таких кортежей на выходе узла Seq Scan (строка 3): на первом Shard 1 получена только строка с ключом 1, на втором с ключом 0.

Однако планировщик сделал pushdown соединения Hash Join на каждый шард (строка 2). Очевидно, что в данной комбинации ключей пользователь с некоторой долей вероятности получит результат, который бы он никогда не получил, например, для обычного PostgreSQL!

Так как "суперпозицией" проекций r для обоих шардов является множество { 1, 0 }, которое бы сматчилось по ключам со строками 0 и 1 распределённой таблицы d, в нашем случае дало пустое множество после объединения результатов с шардов:

Shard 1 Shard 2

Distributed table value (d)

{ 0 }

{ 1 }

Reference table selected keys with random() selection

{ 1 }

{ 0 }

local d and r join result

{ ∅ }

{ ∅ }

after custom scan

{ ∅ }

Greenplum

В свою очередь Greenplum планирует запрос таким образом, чтобы соответствовать критериям "идентичности результатов" нераспределённому хранилищу. Важный момент, что выборку с волатильной функцией random() в рамках запроса к реплицируемой (аналог таблицы Citus reference) он строит единожды и распределяет по сегментам через Redistribute Motion (строка, отмеченная цифрой 2).

                                     QUERY PLAN
------------------------------------------------------------------------------------
 Gather Motion 3:1  (slice2; segments: 3)
   ->  Hash Join 
         Hash Cond: (d.c1 = r.c1)
         ->  Seq Scan on d
         ->  Hash
               ->  Redistribute Motion 1:3  (slice1; segments: 1) 
                     Hash Key: r.c1
                     ->  Result
                           ->  Seq Scan on r 
                                 Filter: ((c1)::double precision = round(random()))
 Optimizer: Postgres query optimizer

Таким образом, Greenplum может обеспечить корректность соединения Hash Join (строка 1), тем самым соответствуя возможным вариантам выборки нераспределённой СУБД:

Shard 1 Shard 2

Distributed table value (d)

{ 0 }

{ 1 }

Replicated table value (r)

{ 0, 1 }

{ 0, 1 }

Так как результаты выборки Seq Scan (строка 3) будут идентичны для всех сегментов, то каждый сегмент сможет сматчить каждый по своему ключу и дать корректный результат:

Shard 1 Shard 2

Distributed table value (d)

{ 0 }

{ 1 }

Replicated table selected keys with random() selection

{ 0, 1 }

local d and r join result

{ 0 }

{ 1 }

after gather motion

{ 0, 1 }

По итогу, в Citus в такой комбинации используемых таблиц есть некоторая вероятность получить некорректные с точки зрения соответствия нераспределённой СУБД результаты выборки.

Заключение первой части

В этой части мы затронули основные понятия Citus с пользовательской и архитектурной точек зрения. Рассмотрели планировщики и некоторые нюансы их работы и сравнили со схожими частями с Greenplum.

В следующей части нас ждёт рассмотрение процесса ребаланса шардов — довольно удобного с практической точки зрения решения проблемы горизонтального масштабирования СУБД. Также мы сравним подходы к контролю за распределением ресурсов.

И на самое вкусное — рассмотрим и проанализируем результаты проведения сравнительных TPC-DS тестов Greenplum vs Citus.

Будьте на связи!