\dt
Этой статьей мы открываем цикл материалов, посвященных сравнительному обзору технологий распределенных СУБД.
Для такого обзора нашей командой был выбран ряд распределенных СУБД, которые будут сравниваться с точки зрения как реализации схожих функциональных блоков с различным уровнем погружения в детали, так и высокоуровневого сопоставления по разным критериям.
Выбор этих СУБД продиктован нашими текущими интересами и потребностями в сравнении. Хотя некоторые из технологий можно отнести, скорее, к распределенным SQL-движкам (например, Apache Impala и Apache Trino).
Так как наша команда разработки занимается Greenplum, то прицел будет на сравнение с данной СУБД. Особенностью этих статей будет сравнительно глубокое погружение в детали тех аспектов, которые будут сравниваться.
Дать представление пользователям Greenplum о схожих технологиях. Статья ориентирована на опытных пользователей, так как в пояснениях будут даваться детали по сравниваемым технологиям; подразумевается, что читатель знаком с деталями реализации схожей функциональности в Greenplum.
Дать представление, что лежит в основе сравниваемых технологий, какие есть особенности, преимущества и, возможно, недостатки.
Будут подсвечены те аспекты, которые заинтересовали меня как автора, обладающего знаниями о тех или иных частях Greenplum и исследующего устройство схожих частей в рассматриваемых технологиях. Я не претендую на звание эксперта в этих технологиях, рассматривайте мои тезисы как результат исследований.
Где это возможно, будут даваться сравнительные пояснения, как схожие вещи реализованы в рассматриваемой технологии.
Перепечатывать readme из GitHub и прочих источников не имело смысла, но какие-то базовые понятия ввести требовалось. Эти понятия изложены своими словами в соответствии с моим пониманием предмета. За деталями добро пожаловать в специализированные ресурсы.
Цикл статей начнём со сравнения с таким решением, как Citus.
Концептуально Citus и Greenplum довольно близки. Да, подход реализации Citus, как расширения PostgreSQL, против "пропатченного" PostgreSQL в Greenplum приносит Citus свои сильные черты, но в то же время и ограничивает его возможности. Об этом мы поговорим далее.
Судя по всему, изначально создатели Citus ориентировались на реализацию задачи горизонтального масштабирования существующих решений у конечных пользователей PostgreSQL: добавь шарды, создай распределенные по кластеру/шардам таблицы, перераспредели данные между ними и получи выгоду без переноса имеющегося решения на какой-либо другой технологический стек. Эти идеи можно даже проследить по видам планировщиков Citus: от сравнительно простых, ориентированных на перенаправление запросов в определенные шарды, до более продвинутых, заточенных под аналитику. Вот как раз более продвинутые планировщики Citus, нацеленные и на аналитическую нагрузку, представляют интерес для сравнения их в общей с Greenplum нише.
Greenplum и Citus довольно просто сравнивать даже исходя из близости понятий: распределенных и справочных (reference
в Citus, replicated
в Greenplum) таблиц, чтения планов выполнения и, в целом, нюансов работы планировщика (planner
/optimizer
) и непосредственного выполнения запросов (executor
).
Для пользователя из мира PostgreSQL данная технология представляет собой расширение (PostgreSQL extension), устанавливаемое на отдельные экземпляры сервера PostgreSQL. Расширение предоставляет возможности преобразовать отдельные экземпляры PostgreSQL в распределенный кластер, где будут размещаться части данных таблиц (шарды), которые пользователь может распределить по некоторому признаку (ключу шардирования).
Расширение также добавляет колоночный тип хранения в виде отдельного расширения. Работа по управлению всем этим хозяйством осуществляется с помощью доступных пользователям UDF-функций.
Если сравнивать Citus с Greenplum в плане пользовательского восприятия, то некоторые аналогии прослеживаются, но подходы, по сути, совершенно разные.
Citus предоставляет распределенный движок поверх изначально самостоятельных экземпляров PostgreSQL. Как и кто администрирует эти экземпляры, по большому счету вне скоупа интересов и ответственности самого Citus.
Greenplum, аналогично используя отдельные инстансы на сегментах и мастере, всё-таки считает сегментные экземпляры до известной степени закрытыми, но и несёт за них большую ответственность (хотя бы с точки зрения встроенного HA).
В Citus можно подключиться к любой ноде и начать работать с распределёнными таблицами (за счет синхронизации метаданных в рамках 2PC
). Исключение составляют операции, которые могут выполняться только с координатора. Это различные манипуляции с таблицами самого Citus, например, создание distributed
-таблиц, некоторые операции ALTER
, операции со схемами и т.п.
В Greenplum в плане соединений мы ограничены мастером. Возможность подключения, например, в рамках специальной сессии с отдельным сегментом в данном случае не в счёт.
С точки зрения пользователя я бы назвал это сильной чертой Citus. Впрочем, у подобного подхода и его конкретной реализации в Citus есть и обратная сторона, но об этом подробнее дальше.
Несомненное преимущество с точки зрения использования — это доступность версий PostgreSQL для Citus. Ввиду его реализации как PostgreSQL-расширения, ему доступны самые свежие версии ядра. На текущий момент для Greenplum это только PostgreSQL 12-ой версии в 7X.
К особенностям использования Citus я бы отнёс и необходимость знать о наличии скрытых от пользователей служебных таблиц шардов, которые размещаются в тех же схемах, что и распределённые таблицы. По факту системный каталог pg_class
содержит заведомо больше строк, чем отображается пользователям.
У наших крупных корпоративных пользователей число таблиц может исчисляться и сотнями тысяч (в основном это партицированные таблицы). Разбиение таблиц на шарды ещё больше увеличит число файлов, как и итоговый объём метаданных. Кроме того, в общем случае расширению приходится скрывать эти таблицы, в том числе в pg_class
. Причём делать это довольно низкоуровнево при разборе дерева запроса, отбрасывая такие таблицы. Разработчики утверждают, что "[…] so you might expect to see them when connecting to a worker node and running \d
. While this was previously the case, it caused confusion among users and also breaks tools like pg_dump
".
Например, для исходной партицированной таблицы с двумя партициями при создании распределённой таблицы с числом шардов по умолчанию будет создано 66 таблиц шардов (распределённых по нодам кластера):
\dt
List of relations Schema | Name | Type | Owner --------+-------------------+-------+---------- andrey | part_table_part01 | table | postgres andrey | part_table_part02 | table | postgres (2 rows)
SELECT create_distributed_table('part_table', 'id');
create_distributed_table -------------------------- (1 row)
SELECT logicalrelid, COUNT(*) AS shard_table_count FROM pg_dist_shard GROUP BY logicalrelid;
logicalrelid | shard_table_count -------------------+------------- part_table | 32 part_table_part01 | 32 part_table_part02 | 32 (3 rows)
Citus работает с несколькими видами таблиц:
Таблицы, управляемые Citus. Как только Citus понимает, что это управляемая им распределённая таблица (а понимает он это по наличию записи во внутренней таблице метаданных pg_dist_partition
, см. ниже пример такой записи), планирование запросов к этим таблицам осуществляется самостоятельно.
Исходные, так называемые shell
-таблицы. Пользователь может преобразовать PostgreSQL-таблицу в Citus-таблицу путём вызова функций расширения Citus (например, create_distributed_table
).
Пример записи в pg_dist_partition
:
SELECT logicalrelid, partmethod, partkey, colocationid FROM pg_dist_partition LIMIT 1;
logicalrelid | partmethod | partkey | colocationid --------------+------------+--------------------------------------------------------------------------------------------------------------------------+-------------- dist_table | h | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnosyn 1 :varattnosyn 1 :location -1} | 1 (1 row)
В данном случае имя логической таблицы (бывшей "обычной" локальной таблицы PostgreSQL) содержится в поле logicalrelid
. Для данной таблицы распределение осуществляется по хешу (колонка partmethod
), в колонке partkey
содержится служебная информация по ключу распределения.
В Greenplum такого разделения не существует. Для таблиц append optimized
существуют так называемые aux
-таблицы, но они никак не скрываются и видны пользователям и/или администратору базы данных. Как следствие, нет необходимости и в явном преобразовании и неявном копировании исходных данных вызовом каких-либо специальных функций.
Таблицы, управляемые Citus, имеют следующие типы:
Шелл-таблица (shell table
). Если пользователь преобразует обычную PostgreSQL-таблицу в Citus-таблицу, то эта исходная таблица становится скрытой от пользователя shell
-таблицей. Такая таблица вместе с метаданными Citus будет использоваться при планировании, чтобы преобразовать запросы к локальным таблицам в распределённый план, работающий в конечном итоге с таблицами шардов Citus.
Таблица шарда (shard table
). По сути, обычная таблица PostgreSQL, которая содержит данные из shell
-таблицы. Данные из shell
-таблицы копируются на этапе создания Citus-таблицы. По умолчанию shell
-таблица разбивается на 32 таблицы шардов, но число шардов можно поменять. Как уже было упомянуто ранее, таблицы шардов не видны пользователям (в том числе и в запросах к каталогу), но заявляется наличие параметра GUC, который управляет этой видимостью: citus.override_table_visibility
.
Распределённая по некоторому ключу шардирования таблица (distributed table
). Выбор ключа шардирования — основной (и, как мне представляется, ограничивающий) элемент эффективной работы с Citus. Ключ шардирования определяет размещение записи в определённом шарде. С шардом связан некоторый диапазон, который определяет какие ключи попадут на этот шард (эта информация содержится в служебной таблице pg_dist_shard
, см. пример далее). Есть несколько схем шардирования, но для краткости рассмотрим распределение по хешу (hash-distributed tables
). Таблицы с одинаковым диапазоном будут размещаться на одних и тех же нодах для обеспечения так называемого co-located
размещения. Шарды с одинаковым диапазоном хешей размещаются на одних и тех же узлах, чтобы обеспечить локальность операций соединений таблиц по ключу/хешу. Это условие соблюдается и в случае перераспределения данных (rebalance
).
Такое размещение шардов позволит Citus реализовать наиболее эффективную схему соединений без перераспределения данных таблиц и предоставить пользователю наиболее полный набор возможных операций с данными в рамках SQL-запросов. Что происходит в случае, когда пользователь не думает/не хочет/не может поддержать такую схему размещения на практике, мы рассмотрим в отдельном разделе, посвященном деталям планирования и выполнения запросов. Да, ключ распределения может состоять только из одной колонки not null
, так как функция create_distributed_table
принимает аргумент distribution_column
, подразумевающий одну колонку. Также, для некоторым образом связанных таблиц (логически) можно указать одну группу размещения.
Справочная таблица (reference table
). Таблица с одним шардом, который реплицируется на все узлы кластера. Как следует из названия, такие таблицы подходят для измерений в терминах хранилищ данных. Прямой аналог replicated
-таблиц в Greenplum.
Локальная таблица (local table
). Не путать с обычной таблицей PostgreSQL. Для поддержки возможности запроса пользователя с любого узла кластера, который использует PostgreSQL-таблицу на узле координатора, Citus требует наличия метаданных этой таблицы на остальных узлах. Также используется в сценариях ограничений по FK
между локальными таблицами и reference
-таблицами.
Таблица одного шарда (single shard tables
). Применяется для шардирования schema-based
, подробно рассматриваться в рамках этой статьи не будет.
В Citus реализован так называемый многоуровневый подход к планированию запросов. Citus встраивается в процесс планирования за счёт использования доступных расширениям функций обратного вызова planner_hook
, set_rel_pathlist_hook
, get_relation_info_hook
, set_join_pathlist_hook
.
Входная точка планирования с точки зрения Citus — реализация distributed_planner
. Для того, чтобы планировщик Citus включился в процесс, в запросе должна использоваться хотя бы одна таблица Citus. Является ли таблица соответствующей этому признаку определяется по внутренней системной таблице pg_dist_partition
.
Планировщик разделяется на несколько уровней, которые задействуются в зависимости от структуры (можно даже сказать, от условной "сложности") запроса:
fast path planner
router planner
recursive planner
logical planner/optimizer
Каждый следующий уровень реализует более сложные либо высокоуровневые (группировки, сортировки и т.п.) подходы к планированию запросов.
Fast path planner
Fast path planner
ограничен возможностями планирования запросов, по сути, затрагивающих один шард (либо использующих reference
-таблицы).
Также запрос:
не должен быть INSERT … SELECT
;
не должен использовать CTE, подзапросы, операции типа UNION
/UNION ALL
;
предикат выбора шарда должен фигурировать только в условии WHERE
и быть единственным условием в сравнении вида dist_key = const
, где dist_key
— ключ распределения;
для свежих версий PostgreSQL также в эти ограничения попадают MERGE
-запросы.
Этот тип планировщика ориентирован на OLTP-нагрузку, когда запрос можно перенаправить в один шард. В этом варианте планирования не происходит вызова standard_planner
. Судя по документации, тем самым разработчики хотели избежать лишних накладных расходов, связанных с планированием стандартным планировщиком PostgreSQL. Такой вариант хорошо подходит для схем с использованием шардирования по некоторому ключу для так называемых Multi-tenant приложений. Например, когда все операции с таблицами ограничены каким-то определенным шардом, который соответствует id пользователя.
Router planner
Router planner
дополняет fast path planner
возможностью запрашивать несколько таблиц, но всё ещё находясь в границах одного шарда согласно условиям фильтрации по dist_key
. Как было описано, Citus располагает реальные данные таблиц в отдельных таблицах шардов, которые в общем случае не видны пользователям. На этом этапе планировщик подменяет исходную, видимую пользователю распределённую таблицу на таблицы шардов с учётом их распределения по нодам. Данный тип планировщика уже умеет планировать подзапросы и CTE.
Например, для случая, когда таблицы a
и b
шардированы по одной колонке c1
, следующий запрос может быть спланирован так:
EXPLAIN (COSTS OFF, VERBOSE ON) SELECT a.c1,
(SELECT COUNT(*) FROM b
WHERE b.c1 = 1 AND b.c1 = a.c1)
FROM a WHERE a.c1 = 1;
QUERY PLAN
----------------------------------------------------------------
Custom Scan (Citus Adaptive)
Output: remote_scan.c1, remote_scan.count
Task Count: 1
Tasks Shown: All
-> Task
Query: SELECT c1, (SELECT count(*) AS count FROM public.b_102332 b WHERE ((b.c1 OPERATOR(pg_catalog.=) 1) AND (b.c1 OPERATOR(pg_catalog.=) a.c1))) AS count FROM public.a_102300 a WHERE (c1 OPERATOR(pg_catalog.=) 1)
Node: host=citus_host port=5432 dbname=citus_db
-> Seq Scan on public.a_102300 a
Output: a.c1, (SubPlan 1)
Filter: (a.c1 = 1)
SubPlan 1
-> Aggregate
Output: count(*)
-> Result
One-Time Filter: (a.c1 = 1)
-> Seq Scan on public.b_102332 b
Output: b.c1, b.c2, b.c3
Filter: (b.c1 = 1)
На что можно обратить внимание:
В строке, отмеченной цифрой 3, можно увидеть Seq Scan
по реальной физической таблице public.b_102332
(данные шарда). Как мы говорили ранее, эта таблица в общем случае не видна пользователям, но в плане она фигурирует как один из 32 шардов таблицы b
.
Так как данные таблицы b
шардированы по колонке c1
и в фильтре указано условие b.c1 = 1
, то планировщик может задействовать router planner
(fast path planner
задействовать нельзя, так как в запросе присутствует подзапрос) и перенаправить запрос только в один шард.
В строке с номером 2 можно увидеть, что эта часть запроса планируется в виде подплана (SubPlan 1
) и с точки зрения реального выполнения запроса может быть выполнена одним заданием (строка с номером 1 Task Count: 1
). О заданиях (Task
) подробнее поговорим в разделе про следующий тип планировщика recursive planning
.
Если проводить сравнение схожего запроса в Greenplum, то можно выделить следующие сходства и различия:
QUERY PLAN
----------------------------------------------------------------
Gather Motion 32:1 (slice1; segments: 32)
Output: a.c1, (COALESCE((count()), '0'::bigint))
-> Result
Output: a.c1, COALESCE((count()), '0'::bigint)
-> Result
Output: (count()), a.c1
-> Hash Left Join
Output: a.c1, (count())
Hash Cond: (a.c1 = b.c1)
-> Seq Scan on tpcds.a
Output: a.c1
Filter: (a.c1 = 1)
-> Hash
Output: (count()), b.c1
-> GroupAggregate
Output: count(), b.c1
Group Key: b.c1
-> Sort
Output: b.c1
Sort Key: b.c1
-> Seq Scan on tpcds.b
Output: b.c1
Filter: (b.c1 = 1)
Optimizer: Pivotal Optimizer (GPORCA)
Greenplum не выделяет этот сценарий с точки зрения перенаправления в один сегмент — в плане присутствует Gather Motion 32:1
(строка, отмеченная цифрой 1), который собирает данные с сегментов от нижележащих узлов. За счёт применения фильтра (строка с номером 6) объём данных в данном случае может быть сокращен, но формально план не будет отличаться для любого количества строк. Таблицы в Greenplum для этого запроса распределены по a.c1
и b.c1
, и это позволяет планировщику, собрав данные от узла Seq Scan
(строка 5), сразу же вычислить и агрегирующую функцию count(), b.c1
(строка 4), далее собрать по этим данным хеш-таблицу (строка 3) и осуществить локальное (в рамках сегмента) соединение Hash Left Join
(строка 2).
По факту, план Citus выглядит более оптимальным для данного сценария, так как в его случае запрос сразу же перенаправляется в нужный шард (а это по умолчанию 1/32 от общего объёма данных всей таблицы), никакого ожидания остальных нод не происходит. Разумеется, это отражается на времени выполнения:
// Citus $ time psql -d citus_db -c 'SELECT a.c1, (SELECT COUNT(*) FROM b WHERE b.c1 = 1 AND b.c1 = a.c1) FROM a WHERE a.c1 = 1;' real 0m0.577s user 0m0.003s sys 0m0.005s // Greenplum $ time psql -d greenplum_db -c 'SELECT a.c1, (SELECT COUNT(*) FROM b WHERE b.c1 = 1 AND b.c1 = a.c1) FROM a WHERE a.c1 = 1;' real 0m1.378s user 0m0.001s sys 0m0.004s
Однако картина сильно меняется в случае такого запроса (в данном случае уже используется более продвинутый планировщик recursive planner
, который мы рассмотрим в следующем разделе):
SELECT a.c1,
(SELECT COUNT(*)
FROM b
WHERE b.c1 > 100 AND b.c1 = a.c1)
FROM a WHERE a.c1 > 19990000;
QUERY PLAN
----------------------------------------------------------------
Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=100000 width=12) (actual time=18601.915..18603.189 rows=10000 loops=1)
Task Count: 32
Tuple data received from nodes: 117 kB
Tasks Shown: One of 32
-> Task
Tuple data received from node: 3852 bytes
Node: host=citus_host port=5432 dbname=citus_db
-> Seq Scan on a_102300 a (cost=0.00..3948635.00 rows=308 width=12) (actual time=187.565..18594.736 rows=321 loops=1)
Filter: (c1 > 19990000)
Rows Removed by Filter: 626133
SubPlan 1
-> Aggregate (cost=12783.81..12783.82 rows=1 width=8) (actual time=57.572..57.572 rows=1 loops=321)
-> Seq Scan on b_102332 b (cost=0.00..12783.81 rows=1 width=0) (actual time=57.549..57.567 rows=1 loops=321)
Filter: ((c1 > 100) AND (c1 = a.c1))
Rows Removed by Filter: 626453
Planning Time: 0.261 ms
Execution Time: 18595.255 ms
Planning Time: 3.022 ms
Execution Time: 18604.370 ms
Greenplum строит более продвинутый план, стоит только обратить внимание на строки, отмеченные цифрами 1 и 3:
QUERY PLAN
------------------------------------------------------------------------------
Gather Motion 32:1 (slice1; segments: 32) (actual time=116.323..406.430 rows=1010000 loops=1)
Output: a.c1, (COALESCE((count()), '0'::bigint))
-> Result (actual time=116.722..141.599 rows=31929 loops=1)
Output: a.c1, COALESCE((count()), '0'::bigint)
-> Result (actual time=116.722..137.731 rows=31929 loops=1)
Output: (count()), a.c1
-> Hash Left Join (actual time=116.721..134.325 rows=31929 loops=1)
Output: a.c1, (count())
Hash Cond: (a.c1 = b.c1)
Executor Memory: 39454kB Segments: 32 Max: 1248kB (segment 13)
work_mem: 39454kB Segments: 32 Max: 1248kB (segment 13) Workfile: (0 spilling)
Extra Text: (seg13) Hash chain length 1.1 avg, 4 max, using 30053 of 262144 buckets.Hash chain length 4.0 avg, 13 max, using 8027 of 8192 buckets; total 8 expansions.
-> Seq Scan on tpcds.a (actual time=40.173..44.500 rows=31929 loops=1)
Output: a.c1
Filter: (a.c1 > 19990000)
-> Hash (actual time=76.270..76.270 rows=31929 loops=1)
Output: (count()), b.c1
-> HashAggregate (actual time=65.918..69.498 rows=31929 loops=1)
Output: count(), b.c1
Group Key: b.c1
Executor Memory: 55335kB Segments: 32 Max: 1738kB (segment 0)
Extra Text: (seg13) Hash chain length 4.0 avg, 13 max, using 8027 of 8192 buckets; total 8 expansions.
-> Seq Scan on tpcds.b (actual time=54.383..59.257 rows=31929 loops=1)
Output: b.c1
Filter: ((b.c1 > 100) AND (b.c1 > 19990000))
Planning time: 6.827 ms
(slice0) Executor memory: 151K bytes.
(slice1) Executor memory: 7933K bytes avg x 32 workers, 7941K bytes max (seg0). Work_mem: 1248K bytes max.
Memory used: 122880kB
Optimizer: Pivotal Optimizer (GPORCA)
Execution time: 464.998 ms
Первое значимое отличие, что Greenplum рассчитал агрегат COUNT
и сгруппировал результаты по колонке b.c1
(строки с номерами 2 и 3). Второе отличие, что GPORCA
пробросила условие фильтрации (b.c1 > 19990000
) вглубь плана (строка 4), что значимо сократило выборку данных из таблицы b
. При этом для соединения был использован Hash Left Join
(строка 1) — фактически проходя по таблице а
с условием (a.c1 > 19990000
) из хеш-таблицы (строка 3), получаем искомый результат.
Что делает Citus: фактически узел последовательного чтения Seq Scan
по таблице a
для каждой строки вызывает подплан SubPlan 1
(строка c номером 1), при этом условие фильтра (c1 > 100
) чтения по таблице b
(строка 3) приводит к полному чтению таблицы b
(отброс первых 100 строк не в счёт). И так происходит 321 раз исходя из значения счетчика loops=321
(строка 2) для каждого из 32 шардов.
Нетрудно себе представить, что это приведёт к разительной разнице по времени выполнения. Даже для малого количества строк в таблицах a
и b
(по 21 млн. строк в каждой) разница во времени составила:
// Citus $ time psql -d citus_db -c 'SELECT a.c1, (SELECT COUNT(*) FROM b WHERE b.c1 >100 AND b.c1 = a.c1) FROM a WHERE a.c1 > 19990000;' >> /dev/null real 0m16.556s user 0m0.013s sys 0m0.004s // Greenplum $ time psql -d greenplum_db -c 'SELECT a.c1, (SELECT COUNT(*) FROM b WHERE b.c1 >100 AND b.c1 = a.c1) FROM a WHERE a.c1 > 19990000;' >> /dev/null real 0m0.319s user 0m0.002s sys 0m0.004s
Если таблицы не шардированы по одной и той же колонке (в моём случае я пересоздал таблицы и шардировал одну по a.c1
, вторую — по b.c2
), то попытка выполнения такого запроса завершится неудачей:
SELECT a.c1,
(SELECT COUNT(*) FROM b
WHERE b.c1 = 1 AND b.c1 = a.c1)
FROM a
WHERE a.c1 = 1;
ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
Greenplum в свою очередь не испытывает тут каких-либо трудностей, как мы говорили ранее, план будет в целом похож на исходный (для случая распределения по одной колонке), но добавляется узел Redistribute Motion
(строка, отмеченная цифрой 1):
QUERY PLAN
------------------------------------------------------------------------------------------
Gather Motion 32:1 (slice2; segments: 32)
Output: a.c1, (COALESCE((count()), '0'::bigint))
-> Result
Output: a.c1, COALESCE((count()), '0'::bigint)
-> Result
Output: (count()), a.c1
-> Hash Left Join
Output: a.c1, (count())
Hash Cond: (a.c1 = b.c1)
-> Seq Scan on public.a
Output: a.c1
Filter: (a.c1 = 1)
-> Hash
Output: (count()), b.c1
-> GroupAggregate
Output: count(), b.c1
Group Key: b.c1
-> Sort
Output: b.c1
Sort Key: b.c1
-> Redistribute Motion 32:32 (slice1; segments: 32)
Output: b.c1
Hash Key: b.c1
-> Seq Scan on public.b
Output: b.c1
Filter: (b.c1 = 1)
Optimizer: Pivotal Optimizer (GPORCA)
Подводя итоги по планировщикам fast path planner
и router planner
:
Основным ограничением этих планировщиков является возможность либо планирования запросов только в рамках определенного шарда, либо — если возможно выполнение запроса по отдельности на каждом шарде — осуществления слияния (merge
) результатов "наверху" плана. По сути, в этих запросах возможны подзапросы, которые могут быть выполнены независимо на шардах.
Сложные соединения не поддерживаются (например, outer join
), подзапросы с соединениями также не поддерживаются.
Эти ограничения снимаются в следующем, более продвинутом типе планировщика recursive planner
.
Recursive planner
Recursive planner
реализует:
Возможность делать соединения таблиц, которые определены как colocated
согласно их ключам распределения по шардам.
Возможность спланировать подзапросы/CTE отдельно (изолированно) таким образом, чтобы манипуляции с результатами выполнения этих подзапросов можно было бы свести к локальным операциям в рамках шарда.
Данный тип планировщика пытается рассмотреть каждую возможную часть запроса на предмет её изолированного выполнения в pushdown-варианте. Суть этого действия в следующем: условно, на вход планировщика попадает дерево запроса, которое анализируется вглубь. В ходе планирования все подзапросы и CTE проверяются на их возможность исполнения непосредственно на рабочих нодах (worker
). Если для данного подзапроса/CTE это невозможно, то для него осуществляется вызов обычного планировщика PostgreSQL (planner
). Результат добавляется в список планов подзапросов. Важный момент состоит в том, что эти подпланы запускаются до начала обработки распределённого плана выполнения (distributed plan
). Результаты отработки этих подпланов записываются во временные файлы на диске. Мы рассмотрим этот процесс в подробностях чуть позднее.
Таким подходом разработчики решают основную задачу — возможность поддержки выполнения большинства видов запросов SQL (да, исключения есть: среди них отсутствие поддержки GROUPING SETS
, CUBE
, ROLLUP
).
Получив и, по сути, материализовав промежуточные результаты подзапроса на каждой ноде/шарде, Citus может соединять эти данные с распределёнными таблицами по любому ключу.
Стоит отметить, что в отличие от Greenplum рекурсивные и модифицирующие CTE, если запрос не ограничен одним шардом, не поддерживаются:
WITH RECURSIVE h AS
(
SELECT a.c1, a.c3, 0 AS level FROM a
WHERE a.c2 IS NULL
UNION ALL
SELECT a.c1, a.c3, level + 1
FROM a, h
WHERE a.c1 = h.c1
)
SELECT * FROM h;
ERROR: recursive CTEs are only supported when they contain a filter on the distribution column
Предлагаю рассмотреть пример работы данного планировщика на паре запросов:
SELECT COUNT(*)
FROM
(SELECT *
FROM a
ORDER BY a.c3
LIMIT 10000000) AS foo;
QUERY PLAN
------------------------------------------------------------------------------------------
Custom Scan (Citus Adaptive) (actual time=3399.533..3399.535 rows=1 loops=1)
-> Distributed Subplan 3_1
Subplan Duration: 38294.62 ms
Intermediate Data Size: 238 MB
Result destination: Write locally
-> Limit (actual time=22744.153..24169.593 rows=10000000 loops=1)
-> Sort (actual time=22744.151..23669.158 rows=10000000 loops=1)
Sort Key: remote_scan.c3
Sort Method: external merge Disk: 430592kB
-> Custom Scan (Citus Adaptive) (actual time=15420.113..17028.325 rows=20000000 loops=1)
Task Count: 32
Tuple data received from nodes: 210 MB
Tasks Shown: One of 32
-> Task
Tuple data received from node: 6713 kB
Node: host=citus_host port=5432 dbname=citus_db
-> Limit (actual time=172.560..383.698 rows=624917 loops=1)
-> Sort (actual time=172.558..313.678 rows=624917 loops=1)
Sort Key: c3
Sort Method: external merge Disk: 13504kB
-> Seq Scan on a_102317 a (actual time=0.016..68.522 rows=624917 loops=1)
Planning Time: 0.045 ms
Execution Time: 500.327 ms
Planning Time: 0.000 ms
Execution Time: 24575.273 ms
Task Count: 1
Tuple data received from nodes: 8 bytes
Tasks Shown: All
-> Task
Tuple data received from node: 8 bytes
Node: host=citus_host port=5432 dbname=citus_db
-> Aggregate (actual time=3357.556..3357.556 rows=1 loops=1)
-> Function Scan on read_intermediate_result intermediate_result (actual time=2233.802..2988.018 rows=10000000 loops=1)
Planning Time: 0.051 ms
Execution Time: 3398.382 ms
Planning Time: 1.654 ms
Execution Time: 41694.215 ms
Recursive planner
принимает решение спланировать подзапрос (строка, отмеченная цифрой 1) через подплан (строка с номером 2), собрать результат выполнения подплана на одной ноде и подсчитать итоговую агрегирующую функцию на основе промежуточных данных (строка 7).
Предлагаю разобрать, почему планировщик принимает такие решения и как осуществляется промежуточное хранение результатов.
Согласно правилам pushdown для подзапросов наличие LIMIT
требует оформления подзапроса в виде отдельного подплана. Если рассматривать план снизу вверх, то строки от Seq Scan
(строка 6) подаются на вход узлу сортировки Sort
(строка 5), далее накладывается условие ограничение строк LIMIT
. Эти действия выполняются параллельно на всех шардах таблицы (по умолчанию равных 32). Таким образом, на выходе этих узлов плана от каждого шарда приходят LIMIT N
строк, отсортированных по a.c3
. Однако, чтобы предоставить пользователю именно LIMIT N
строк согласно ожидаемой пользователем сортировке, планировщику приходится упорядочить данные от всех шардов и только потом применить итоговое условие по LIMIT
(строки 3 и 4). В противном случае пользователь может получить совсем не то, что он ожидает. Эти результаты кешируются на ноде-координаторе запроса и становятся доступны остальной части плана через функцию pg_catalog.read_intermediate_result
. Механизм работы функции read_intermediate_result
описан в разделе Особенности выполнения запросов.
Планировщик Greenplum планирует запрос так:
QUERY PLAN
------------------------------------------------------
Aggregate (actual time=4350.773..4350.773 rows=1 loops=1)
-> Limit (actual time=254.085..3491.350 rows=10000000 loops=1)
-> Gather Motion 32:1 (slice1; segments: 32) (actual time=254.081..2457.549 rows=10000000 loops=1)
Merge Key: c3
-> Limit (actual time=240.614..352.447 rows=657684 loops=1)
-> Sort (actual time=240.609..285.729 rows=657684 loops=1)
Sort Key: c3
Sort Method: top-N heapsort Memory: 1310496kB
-> Seq Scan on a (actual time=0.078..110.649 rows=657714 loops=1)
Planning time: 27.504 ms
(slice0) Executor memory: 2105K bytes.
(slice1) Executor memory: 40994K bytes avg x 32 workers, 40994K bytes max (seg0). Work_mem: 40953K bytes max.
Memory used: 122880kB
Optimizer: Pivotal Optimizer (GPORCA)
Execution time: 4400.946 ms
Тут стоит обратить внимание на шаг Gather Motion 32:1
и его параметризацию в виде Merge Key: c3
(строки 2 и 3). Планировщик GPORCA
, зная, что результаты с каждого сегмента будут приходить упорядоченно согласно a.c3
(строка 4), имеет возможность сделать, по сути, соединение слиянием и применить LIMIT
уже на результатах этого набора строк (строка 1). Этот шаг формально соответствует узлу Limit
и Sort
под ним (строки 3 и 4 в плане Citus).
Ключевое различие планов состоит в том, что Citus для каждого шарда выполняет распределённый подплан Distributed Subplan
(строка 2), в рамках этого плана общий результат, полученный от шардов, сортируется (строка 4) и ограничивается узлом Limit
(строка 3). Результаты собираются на координаторе запроса и материализуются в виде файлов, которые на координаторе вычитываются функцией read_intermediate_result
(строка 8).
Greenplum в рамках Gather Motion делает это на лету и на практике показывает лучший результат:
// Citus $ time psql -d citus_db -c 'SELECT COUNT(*) FROM (SELECT * FROM a ORDER BY a.c3 LIMIT 10000000) AS foo;' >> /dev/null real 0m40.617s user 0m0.002s sys 0m0.003s // Greenplum $ time psql -d greenplum_db -c 'SELECT COUNT(*) FROM (SELECT * FROM a ORDER BY a.c3 LIMIT 10000000) AS foo;' >> /dev/null real 0m2.892s user 0m0.003s sys 0m0.004s
Этот же тип планировщика позволяет спланировать запросы с HAVING
в подзапросах, однако связанные (correlated
) запросы ему в отличие от Greenplum уже не по зубам:
SELECT foo.c1, foo.cnt, a.c3
FROM (SELECT b.c1, COUNT(*) AS cnt FROM b GROUP BY b.c1) foo
INNER JOIN a
ON a.c1 = foo.c1
GROUP BY foo.c1, foo.cnt, a.c3
HAVING foo.cnt >= (SELECT COUNT(*) FROM b WHERE b.c1 >= foo.cnt);
ERROR: Subqueries in HAVING cannot refer to outer query
Greenplum же справляется с этой задаче без проблем:
QUERY PLAN
-----------------------------------------------------------------------------------------------
Gather Motion 32:1 (slice2; segments: 32)
-> HashAggregate
Group Key: b.c1, (count()), a.c3
-> Hash Join
Hash Cond: (b.c1 = a.c1)
-> Result
Filter: ((count()) >= COALESCE(((SubPlan 1)), '0'::bigint))
-> Result
-> HashAggregate
Group Key: b.c1
-> Seq Scan on b
SubPlan 1 (slice2; segments: 32)
-> Aggregate
-> Result
Filter: (b_1.c1 >= (count()))
-> Materialize
-> Broadcast Motion 32:32 (slice1; segments: 32)
-> Seq Scan on b b_1
-> Hash
-> Seq Scan on a
Optimizer: Pivotal Optimizer (GPORCA)
Даже на довольно простых примерах Citus заставляет задумываться, а что мне нужно сделать со схемой и распределением данным, чтобы запрос всё-таки выполнялся. Планировщик GPORCA
на практике выглядит более продвинутым.
Например, для коррелированных подзапросов:
CREATE TABLE sales(brand TEXT, size INT, price NUMERIC(10,2), store INT);
CREATE TABLE
SELECT COUNT(*)
FROM sales s1
WHERE s1.size > 32 OR s1.price > (SELECT avg(s2.price) FROM sales s2 WHERE s2.brand = s1.brand);
count ------- 0 (1 row)
SELECT COUNT(*)
FROM sales s1
WHERE s1.seller
IN (SELECT s2.seller
FROM sales s2
WHERE s2.price = (SELECT min(s3.price) FROM sales s3 WHERE s3.brand = s1.brand));
ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
Для первого запроса большие таблицы могут быть в дополнение к шардированию партицированы по каким-либо полям.
Logical Planner & Optimizer
Подробное рассмотрение этого уровня планирования выходит за рамки статьи. Задачи этого уровня отчасти сходны с решаемыми задачами groupping_planner
в PostgreSQL: это операции GROUP BY
, ORDER BY
, LIMIT
, агрегирующие функции.
Также на этом уровне осуществляется оптимизация операций соединений. Основная цель этой оптимизации состоит в сокращении числа соединений, ведущих к перераспределению данных. Судя по всему, ввиду специфики перераспределения на основе записи в файл промежуточных результатов и COPY
-протокола (об этом далее), оценки cost-based не производится.
Citus строит свой план выполнения на основе реализации узла CustomScan
.
Предлагаю разобрать выполнение запроса с точки зрения его процесса исполнения на простом запросе, но затрагивающем некоторые интересные моменты вроде выполнения подпланов, сохранения промежуточных результатов в виде файлов и их broadcast-копирования на ноды.
Для начала сам запрос и его план:
SELECT COUNT(DISTINCT b.c2)
FROM (SELECT DISTINCT a.c2 FROM a)
AS foo, b
WHERE foo.c2 = b.c2;
QUERY PLAN
-----------------------------------------------------------------------------------------------
Aggregate
Output: count(DISTINCT remote_scan.count)
-> Custom Scan (Citus Adaptive)
Output: remote_scan.count
-> Distributed Subplan 24_1
-> HashAggregate
Output: remote_scan.c2
Group Key: remote_scan.c2
-> Custom Scan (Citus Adaptive)
Output: remote_scan.c2
Task Count: 32
Tasks Shown: One of 32
-> Task
Query: SELECT DISTINCT c2 FROM public.a_102809 a WHERE true
Node: host=citus_host port=5432 dbname=citus_db
-> HashAggregate
Output: c2
Group Key: a.c2
-> Seq Scan on public.a_102809 a
Output: c1, c2, c3
Task Count: 32
Tasks Shown: One of 32
-> Task
Query: SELECT worker_column_1 AS count FROM (SELECT b.c2 AS worker_column_1 FROM (SELECT intermediate_result.c2 FROM read_intermediate_result('24_1'::text, 'binary'::citus_copy_format) intermediate_result(c2 integer)) foo, public.b_102841 b WHERE (foo.c2 OPERATOR(pg_catalog.=) b.c2)) worker_subquery GROUP BY worker_column_1
Node: host=citus_host port=5432 dbname=citus_db
-> HashAggregate
Output: b.c2
Group Key: b.c2
-> Hash Join
Output: b.c2
Hash Cond: (intermediate_result.c2 = b.c2)
-> Function Scan on pg_catalog.read_intermediate_result intermediate_result
Output: intermediate_result.c2
Function Call: read_intermediate_result('24_1'::text, 'binary'::citus_copy_format)
-> Hash
Output: b.c2
-> Seq Scan on public.b_102841 b
Output: b.c2
Мы уже знаем, что такой запрос приводит к созданию и выполнению подплана (строка, отмеченная цифрой 1), использованию результатов этого подплана в Hash Join
(строка с номером 6) и чтению промежуточных результатов в inner
-части соединения (строка 7).
Citus встраивается в процесс выполнения запросов в таких точках расширения: ExecutorStart_hook
, ExecutorRun_hook
, ExplainOneQuery_hook
, prev_ExecutorEnd
, ExecutorEnd_hook
.
Начнём наш отсчет от реализации ExecutorStart_hook
функции обратного вызова CitusExecutorRun
. Для такого запроса как наш, ядро PostgreSQL вызывает функцию CitusExecutorRun
в ExecutorRun
.
CitusExecutorRun
помимо вспомогательных, специфичных для Citus задач вроде пропуска проверок ограничений (имеется в виду constraints check
) на координаторе для запросов ALTER TABLE
(это делегируется на ноды), решает основную задачу — поиск узлов "состояние сканирования" (CitusScanState
), которые ранее были добавлены одним из планировщиков.
CitusScanState
представляет собой структуру-"контейнер", которая включает в себя:
состояние узла CustomScan
(CustomScanState
);
указатель на распределённый план выполнения (DistributedPlan
);
хранилище строк (кортежей) под результаты выполнения распределённого плана (Tuplestorestate
);
указатель на функцию обратного вызова PreExecScan
(об этом подробнее далее) и ряд других полей.
План (если быть точнее, то узлы PlanState
) обходится вглубь, и найденные экземпляры CitusScanState
добавляются в список.
Для нашего случая в этом списке будет один узел Custom Scan (Citus Adaptive)
(строка 1) и ассоциированный с ним экземпляр CustomScanState
. Для каждого из найденных узлов и ассоциированного с ним распределённого плана выполнения для распределённых таблиц берутся блокировки всех партиций, а также выполняются все подпланы, начиная с самого верхнего. В нашем случае это будет подплан Distributed Subplan 24_1
.
Получение результатов на нодах осуществляется через экземпляр DestReceiver
. Нода-координатор создаёт так называемый RemoteFileDestReceiver
.
В его задачи входит:
установление соединений с нодами по протоколу libpq
;
отправка команды COPY
этим нодам (об этом подробнее далее);
приём кортежей;
подготовка данных кортежей под команду COPY
и пересылка этих строк на ноды (BroadcastCopyData
).
Как только RemoteFileDestReceiver
инициализирован, Citus начинает выполнение подплана (строка 2). Для этого используется механизм порталов (Portal
). В портал отправляется спланированная ранее часть запроса HashAggregate
(строка 3). Сам портал принимает в качестве одного из своих аргументов экземпляр DestReceiver
, которым и выступает RemoteFileDestReceiver
.
В ходе обработки запроса порталом подзапрос опять попадает на планирование, происходит штатный вызов ExecutorStart_hook
, который в свою очередь вызывает функцию CitusExecutorRun
. Она по аналогии с основным запросом находит и выполняет вложенный в подзапрос узел CustomScan
(строка 4).
Далее план попадает на вход функции ядра ExecutePlan
. Так как верхнеуровневым узлом плана является узел агрегатной функции HashAggregate
(строка 3, наш COUNT(DISTINCT)
), то управление передается ядру (модуль nodeAgg.c), а оно в свою очередь вызывает нижележащий узел, которым является узел CustomScan
Citus.
Для выполнения задач сканирования (то есть непосредственного получения строк) реализация CustomScan
использует так называемый AdaptiveExecutor
. Полноценное рассмотрение этого модуля займёт ещё одну такую же статью, рассмотрим только основные задачи, касающиеся нашего запроса.
Верхнеуровнево, основная задача AdaptiveExecutor
— выполнение набора заданий (list of tasks). Примером задания может служить запрос к шарду (по сути, к таблице). В нашем случае планировщик определил 32 задания (по числу шардов таблицы).
Также AdaptiveExecutor
создает TupleStore
для временного хранения полученных результатов.
Как работает функция read_intermediate_result
:
Задача read_intermediate_result
— представить промежуточный результат в формате COPY
, полученный в ходе broadcast
-выполнения подплана либо шафлинга (Repartitioning, об этом в деталях далее) в виде набора строк. Файл с промежуточным результатами парсится и преобразуется в те форматы колонок, которые вызывающая сторона передала в качестве параметров. Например, в случае нашего запроса (строка 5) read_intermediate_result('24_1'::text, 'binary'::citus_copy_format) intermediate_result(c2 integer)
первый аргумент 24_1
представляет собой идентификатор результирующего набора строк, второй аргумент задаёт формат ('binary'
). Идентификатор далее используется для получения имени файла в каталоге pgsql_job_cache.
Далее функция ReadFileIntoTupleStore
перегоняет строки из формата COPY
(используется PostgreSQL-реализация модуля copy.c) в Tuplestorestate
. Далее результаты уже представлены в виде кортежей.
Где хранятся промежуточные результаты:
Промежуточные результаты выполнения запросов хранятся в каталоге base/pgsql_job_cache на каждой ноде. Если эти результаты используются в распределённой транзакции, то это каталог base/pgsql_job_cache/<user id>_<coordinator node id>_<transaction number>/. Если распределённая транзакция не используется, то этот путь base/pgsql_job_cache/<user id>_<process id>/.
Пример размещения:
$ pwd
/var/lib/pgsql/14/data/base/pgsql_job_cache/10_0_237
$ ll
total 178660 -rw------- 1 postgres postgres 182944635 Aug 27 02:14 1_1.data
Как Citus делает соединение не по ключу шардирования в том случае, когда таблицы не распределены по этому ключу?
Необходимым условием для выполнения таких соединений является выставленный в on
GUC citus.enable_repartition_joins
. Иначе Citus ругается следующим образом: ERROR: the query contains a join that requires repartitioning
. По умолчанию enable_repartition_joins
выставлен в off
.
Рассмотрим следующий запрос для случая, когда таблица a
распределена по ключу a.c1
, а таблица b
по ключу b.c3
(для упрощения разбора механизма предварительно распределим каждую таблицу по 2 шардам):
SELECT a.c2, COUNT(*)
FROM a
INNER JOIN b ON a.c1 = b.c1
GROUP BY a.c2
LIMIT 100;
QUERY PLAN
----------------------------------------------------------------------------------------------------
Limit (actual time=88944.560..88944.563 rows=1 loops=1)
Output: remote_scan.c2, (COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint))
-> HashAggregate (actual time=88944.559..88944.560 rows=1 loops=1)
Output: remote_scan.c2, COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint)
Group Key: remote_scan.c2
Batches: 1 Memory Usage: 40kB
-> Custom Scan (Citus Adaptive) (actual time=88944.536..88944.539 rows=12 loops=1)
Output: remote_scan.c2, remote_scan.count
Task Count: 12
Tuple data received from nodes: 132 bytes
Tasks Shown: None, not supported for re-partition queries
-> MapMergeJob
Map Task Count: 2
Merge Task Count: 12
-> MapMergeJob
Map Task Count: 2
Merge Task Count: 12
Planning Time: 3.422 ms
Execution Time: 88944.605 ms
Для выполнения подобного соединения задействуется repartitioning
-механизм, основанный на разбиении на задания (Jobs
) с одноуровневой иерархией, где некоторый набор заданий зависим от основного. По сути, этот набор заданий представляет собой вариацию подпланов, результаты выполнения которых перераспределяются между нодами согласно ключам. Отличие от обычных подпланов в том, что в заданиях отсутствует этап объединения результатов (имеется в виду этап merge
), в подпланах результат в свою очередь всегда полностью перераспределяется по нодам (broadcast
). Задания включаются в распределённый план выполнения и выполняются в 2 этапа.
На первом этапе на каждый из 4 шардов обеих таблиц (для обслуживания используется отдельное libpq
-соединение) приходит запрос вида:
SELECT
partition_index,
'repartition_2198637379588_1' || '_' || partition_index::text ,
rows_written
FROM pg_catalog.worker_partition_query_result(
'repartition_2198637379588_1',
'SELECT c2 AS column1, c1 AS column2 FROM public.a_102080 a WHERE true',
1,
'hash',
'{-2147483648,-1789569707,-1431655766,-1073741825,-715827884,-357913943,-2,357913939,715827880,1073741821,1431655762,1789569703}'::text[],
'{-1789569708,-1431655767,-1073741826,-715827885,-357913944,-3,357913938,715827879,1073741820,1431655761,1789569702,2147483647}'::text[],
true,
true,
true
)
WHERE rows_written > 0
Самое важное в этих запросах:
Из строки, отмеченной цифрой 1, в дальнейшем формируется название файла, который будет содержать промежуточные результаты запроса из строки с номером 2.
Строки 3 и 4 содержат диапазоны хеш-значений, согласно которым будет производиться разбиение задачи на части (Merge Task Count
). Можно увидеть, что в массивах partition_min_values
(первый аргумент-массив) и partition_max_values
(второй) как раз по 12 элементов.
Внутри это приводит к вызову функции worker_partition_query_result
. Задача этой функции — получить результаты выполнения запроса и записать их в локальные (для ноды) файлы, согласно схеме шардирования и ключу шардирования. Для получения и записи результатов в промежуточные файлы используются экземпляры DestReceiver
. Для каждого файла с результатами создаётся свой получатель. При этом общее число файлов для одного шарда задаётся значением параметра Merge Task Count
, в нашем случае равным 12. По готовности получателей запускается портал (PortalRun
), который и исполняет запрос.
После выполнения этой операции для 1 из 4 шардов содержимое каталога pgsql_job_cache для соответствующей распределённой транзакции будет примерно таким:
$ ll
total 56 drwx------ 2 citus citus 4096 sep 12 16:38 ./ drwx------ 3 citus citus 4096 sep 12 16:04 ../ -rw------- 1 citus citus 21 sep 12 16:38 repartition_2198637379588_2_0.data -rw------- 1 citus citus 1653 sep 12 16:38 repartition_2198637379588_2_10.data -rw------- 1 citus citus 1517 sep 12 16:38 repartition_2198637379588_2_11.data -rw------- 1 citus citus 21 sep 12 16:38 repartition_2198637379588_2_1.data -rw------- 1 citus citus 21 sep 12 16:38 repartition_2198637379588_2_2.data -rw------- 1 citus citus 21 sep 12 16:38 repartition_2198637379588_2_3.data -rw------- 1 citus citus 21 sep 12 16:38 repartition_2198637379588_2_4.data -rw------- 1 citus citus 21 sep 12 16:38 repartition_2198637379588_2_5.data -rw------- 1 citus citus 1262 sep 12 16:38 repartition_2198637379588_2_6.data -rw------- 1 citus citus 1500 sep 12 16:38 repartition_2198637379588_2_7.data -rw------- 1 citus citus 1347 sep 12 16:38 repartition_2198637379588_2_8.data -rw------- 1 citus citus 1177 sep 12 16:38 repartition_2198637379588_2_9.data
Таким образом, для одной таблицы, участвующей в процессе repartitioning
, число промежуточных файлов для данного этапа будет равно Map Task Count
(Table Shard Count) * Merge Task Count
.
При этом обратите внимание, что для одной и той же таблицы можно увидеть такое содержимое каталогов для 2 узлов:
node #1
$ ll | grep 594_
-rw------- 1 citus citus 1381 sep 12 17:15 repartition_2198637379594_1_0.data -rw------- 1 citus citus 21 sep 12 17:15 repartition_2198637379594_1_10.data -rw------- 1 citus citus 21 sep 12 17:15 repartition_2198637379594_1_11.data -rw------- 1 citus citus 1432 sep 12 17:15 repartition_2198637379594_1_1.data -rw------- 1 citus citus 1449 sep 12 17:15 repartition_2198637379594_1_2.data -rw------- 1 citus citus 1364 sep 12 17:15 repartition_2198637379594_1_3.data -rw------- 1 citus citus 1619 sep 12 17:15 repartition_2198637379594_1_4.data -rw------- 1 citus citus 1551 sep 12 17:15 repartition_2198637379594_1_5.data -rw------- 1 citus citus 21 sep 12 17:15 repartition_2198637379594_1_6.data -rw------- 1 citus citus 21 sep 12 17:15 repartition_2198637379594_1_7.data -rw------- 1 citus citus 21 sep 12 17:15 repartition_2198637379594_1_8.data -rw------- 1 citus citus 21 sep 12 17:15 repartition_2198637379594_1_9.data
node #2
$ ll | grep 594_
-rw------- 1 citus citus 21 sep 12 17:16 repartition_2198637379594_2_0.data -rw------- 1 citus citus 1653 sep 12 17:16 repartition_2198637379594_2_10.data -rw------- 1 citus citus 1517 sep 12 17:16 repartition_2198637379594_2_11.data -rw------- 1 citus citus 21 sep 12 17:16 repartition_2198637379594_2_1.data -rw------- 1 citus citus 21 sep 12 17:16 repartition_2198637379594_2_2.data -rw------- 1 citus citus 21 sep 12 17:16 repartition_2198637379594_2_3.data -rw------- 1 citus citus 21 sep 12 17:16 repartition_2198637379594_2_4.data -rw------- 1 citus citus 21 sep 12 17:16 repartition_2198637379594_2_5.data -rw------- 1 citus citus 1262 sep 12 17:16 repartition_2198637379594_2_6.data -rw------- 1 citus citus 1500 sep 12 17:16 repartition_2198637379594_2_7.data -rw------- 1 citus citus 1347 sep 12 17:16 repartition_2198637379594_2_8.data -rw------- 1 citus citus 1177 sep 12 17:16 repartition_2198637379594_2_9.data
Отсюда видно, что множество данных, судя по именам файлов и их содержанию, не пересекается и каждый каталог содержит свою порцию данных, которые на следующем этапе уже перераспределяются для возможности сделать локальное соединение по ключам.
На втором этапе задействуется функция fetch_intermediate_results
, в задачи которой входит вычитка тех самых промежуточных результатов с других узлов с сохранением этих результатов уже как промежуточных локальных. Перед этим в каталог с промежуточными данными копируются данные для обеспечения локальных соединений. Например, после такого вызова:
SELECT bytes
FROM fetch_intermediate_results(ARRAY['repartition_2198637379594_2_2']::text[],'localhost',8003) bytes;
содержание промежуточного каталога для таблицы выше будет таким:
$ ll | grep 594_
-rw------- 1 citus citus 1381 sep 12 17:15 repartition_2198637379594_1_0.data -rw------- 1 citus citus 21 sep 12 17:15 repartition_2198637379594_1_10.data -rw------- 1 citus citus 21 sep 12 17:15 repartition_2198637379594_1_11.data -rw------- 1 citus citus 1432 sep 12 17:15 repartition_2198637379594_1_1.data -rw------- 1 citus citus 1449 sep 12 17:15 repartition_2198637379594_1_2.data -rw------- 1 citus citus 1364 sep 12 17:15 repartition_2198637379594_1_3.data -rw------- 1 citus citus 1619 sep 12 17:15 repartition_2198637379594_1_4.data -rw------- 1 citus citus 1551 sep 12 17:15 repartition_2198637379594_1_5.data -rw------- 1 citus citus 21 sep 12 17:15 repartition_2198637379594_1_6.data -rw------- 1 citus citus 21 sep 12 17:15 repartition_2198637379594_1_7.data -rw------- 1 citus citus 21 sep 12 17:15 repartition_2198637379594_1_8.data -rw------- 1 citus citus 21 sep 12 17:15 repartition_2198637379594_1_9.data -rw------- 1 citus citus 1500 sep 12 17:34 repartition_2198637379594_2_2.data <====== данные другого шарда
Для соединения с другими узлами используется libpq
-протокол и команда COPY
. На этом этапе, собственно, и происходит соединение по уже ставшим локальными данным.
Далее эти результаты используются уже знакомой нам функцией read_intermediate_result
.
Схематично описанный процесс выглядит следующим образом.
Мы подошли к интересной и сложной теме видимости изменений разными транзакциями, блокировкам и самим распределённым транзакциям. Тема обширная, с массой нюансов. В рамках этой статьи я хотел бы показать основные отличия на примере пары запросов.
Для начала общая архитектура Citus в этой части:
Citus для соединений между узлами использует стандартный libpq
-протокол и, соответственно, libpq
-соединения. Соединения кешируются, переиспользуются; для достижения лучшей степени параллелизации при исполнении команд, при возможности, используются несколько соединений с одним узлом.
Важный нюанс состоит в том, что при такой организации доступа к данным, например, для транзакций, затрагивающих несколько таблиц, после выполнения операции записи в шард все транзакции, работающие с шардами этой группы, должны использовать то же соединение.
В противном случае другие команды в рамках этой же транзакции не смогли бы увидеть ещё не закоммиченные данные. Схожее требование возникает и при записи в справочную (reference
) таблицу, после которой выполняются команды чтения из этой таблицы, в том числе и в операциях соединения с распределёнными таблицами (distributed
). Это не всегда возможно в принципе. В документации приведен пример такого случая, если GUC citus.multi_shard_modify_mode
не задан как sequential
:
BEGIN;
DELETE FROM dist_table;
INSERT INTO reference_table VALUES (1,2);
SELECT * FROM dist_table JOIN reference_table ON (x = a);
ERROR: cannot perform query with placements that were modified over multiple connections
Операция DELETE
(строка, отмеченная цифрой 2) в рамках общей транзакции с несколькими командами (строка 1) затрагивает несколько шардов. Если эта операция будет выполняться параллельно, то будут задействованы несколько соединений. Таким образом, с учетом требования по reference
-таблицам становится невозможным использовать одно соединение для команды join
(строка 3).
В целом, Citus вынужден аккуратно обращаться с соединениями по многим аспектам: с одной стороны — контролировать число параллельных соединений, чтобы не перегрузить ноды; с другой стороны — соблюдать правила видимости данных и не допускать взаимных блокировок. За последние два аспекта отвечает отдельный модуль Placement connection tracking
. В его задачи входит отслеживание того, какие группы шардов затрагивались операциями DML/DDL/SELECT
данной транзакцией, и принятие решения, какие соединения использовать для дальнейших операций.
Модуль руководствуется такими правилами:
допускается использование разных соединений (т.е. становится возможным параллелизация исполнения команд) для последовательных (с точки зрения порядка выражений в транзакции) SELECT
-запросов;
допускается использование разных соединений для DML
-запросов после SELECT
-запросов;
в иных случаях разрешается использование только одного и того же соединения.
В Greenplum для этих целей реализован механизм читающих и пишущих процессов в рамках группы связанных процессов (gangs
). При создании соединений между QE
и QD
последний задаёт параметр, который определяет, будет ли процесс читающим или пишущим. Допускается только один пишущий процесс на группу связанных процессов, он же и будет выполнять задачи записи в рамках распределённой транзакции. Таким образом, только этот процесс может менять состояние БД и, как правило, он же отвечает и за управление блокировками в рамках транзакции (LockHolder
). Мне такое чёткое разделение представляется более прозрачным подходом.
Касательно протоколов взаимодействия между нодами. В Greenplum протокол libpq
пропатчен, в частности, под распределённую обработку (в основном изменения касаются взаимодействия QD
и QE
). Интерфейс обработки команд, полученных по libpq
, расширен, например, такими командами как:
'M'
— команда под выполнение сегментами (QE
) спланированных на мастере (QD
) запросов (CdbDispatchPlan
);
'T'
— команда управления протоколом распределённых транзакций (DtxProtocolCommand
).
Для интерконнекта используется несколько вариантов: tcp
, ufpifc
и ic proxy
. Центральным элементом такого взаимодействия являются узлы Motion
, которые и отвечают за передачу данных между сегментами с учётом их вариантов в виде Gather Motion
, Redistribute Motion
и Broadcast Motion
.
Таким образом, Citus полагается полностью на общепринятый libpq
, в то время как Greenplum использует свой вариант протокола libpq
для отправки запросов на сегменты, а за перемещение данных отвечает уже один из вариантов интерконнекта. Своя реализация протокола означает сложности с вливанием изменений апстрим-версии, но в целом это касается общего недостатка Greenplum, как существенно "допиленного" под MPP PostgreSQL.
Если сравнивать узлы Motion
с их ближайшими концептуальными аналогами в виде описанных выше реализаций fetch_intermediate_results
, read_intermediate_result
и worker_partition_query_result
, то в Greenplum отсутствует перекладывание данных между файлами с узла на узел, использование команды COPY
и т.п. Мотивация такого решения создателями Citus понятна — максимально воспользовались имеющимся в PostgreSQL инструментарием. Насколько эффективна такая реализация — вопрос открытый. Мне представляется, что передача узлами Motion
данных напрямую "в сеть" является более эффективным подходом.
С точки зрения аналогов параллелизации исполнения запросов, ближайшим аналогом слайсов Greenplum — независимо выполняемых частей плана, "мостиком" между которыми как раз выступают разного рода узлы Motion
— как мне представляется, являются задачи (Tasks
) и задания (Jobs
).
Для пишущих транзакций, которые затрагивают шарды на разных нодах Citus использует PostgreSQL-реализацию двухфазного коммита (2PC
) с некоторыми доработками под распределённую среду (связанными с HA и авто-восстановлением 2PC
-транзакций после сбоя).
Сам подход и частности его реализации в Citus описаны в массе статей. В его основе выражения PREPARE TRANSACTION
, COMMIT PREPARED
и ROLLBACK PREPARED
, которые PostgreSQL предлагает для внешних менеджеров транзакций. Я же предлагаю сравнить и подсветить преимущества реализации схожего, но, по факту, более продвинутого механизма в Greenplum.
Основной недостаток реализации распределённых транзакций Citus, по моему мнению — отсутствие гарантий соблюдения правил видимости для закоммиченных транзакций в рамках всего кластера.
Чтобы продемонстрировать возможные последствия подобной ситуации, я добавил задержку в блок кода, который отправляет команды COMMIT PREPARED
на рабочие ноды согласно реализации протокола 2PC
. Последовательность запросов такая:
BEGIN;
INSERT INTO a VALUES (1, 'foo', 1);
INSERT INTO a VALUES (2, 'foo', 2);
COMMIT;
SELECT * FROM a;
c1 | c2 | c3 ----+-----+---- 1 | foo | 1 2 | foo | 2 (2 rows)
Далее отправляем запросы INSERT
:
BEGIN;
INSERT INTO a VALUES (3, 'foo', 3);
INSERT INTO a VALUES (4, 'foo', 4);
COMMIT;
Но, так как в рамках COMMIT
(строке, отмеченной цифрой 2) между этими двумя транзакциями у нас добавлена искусственная (но возможная в реальности) задержка, то в параллельной сессии становится возможным получение таких данных:
SELECT * FROM a;
c1 | c2 | c3 ----+-----+---- 1 | foo | 1 3 | foo | 3 2 | foo | 2 (3 rows)
Проблема в строке, отмеченной цифрой 1, где мы видим, что SELECT
-запрос в параллельной сессии выдал строку (3, 'foo', 3)
, но не выдал (4, 'foo', 4)
.
Таким образом, отсюда следует очевидный вывод — Citus в рамках распределённой СУБД согласно модели консистентности распределённых систем попадает в классификацию Read Uncommitted
.
Причина такого поведения данной СУБД в том, что в Citus отсутствует понятие единого снимка (имеется ввиду Snapshot
для реализации MVCC в PostgresSQL) для распределённых транзакций.
Ранее подготовленные транзакции на отдельных нодах применяются в разное время. Таким образом, некоторая последующая транзакция сможет прочитать данные, которые применились на одной ноде, но ещё не успели примениться на другой.
По сути, это нарушает и принцип атомарности — пользователь увидит только часть данных закоммиченной транзакции. На мой взгляд, это может являться ощутимым недостатком Citus.
В Greenplum такая ситуация не допускается. Ядро системы отслеживает информацию об активных распределённых транзакциях. Для этого в общеиспользуемой памяти процессов каждого из бэкендов хранится идентификатор распределённой транзакции (gxid
) по аналогии с обычными идентификаторами локальных транзакций PostgreSQL (xid
). Этот идентификатор присваивается, если транзакции требуется 2PC
(т.е. транзакция является распределённой). За генерацию и присваивание значения gxid
отвечает координатор запроса (QD
).
Как уже говорилось ранее, в Greenplum расширен набор команд, которые обрабатываются бэкендами. В частности, командами 'M'
и 'T'
. Обработчики этих команд в том числе принимают сериализованную информацию для распределённой обработки транзакций от QD
. Эти данные включают в себя и глобальный (распределённый) снимок.
Greenplum расширяет PostgreSQL-структуру для хранения снимков с целью хранения информации по глобальному снимку. Далее этот снимок используется для проверки видимости строк. Как говорилось ранее, только один процесс в gang
может быть пишущим (writer
) — таким образом, читающие (reader
) процессы полагаются на то, что снимок получил пишущий процесс. Это в том числе решает проблемы видимости строк, затронутых пишущей частью транзакции (т.е. то, что Citus приходится решать манипуляциями с тем, какие libpq
-соединения можно использовать).
Упрощённая модель проверки видимости строки с использованием глобальных снимков следующая.
Если идентификатор xmin
данной строки соответствует транзакции, которая является применённой (правила видимости для ещё не применённых и frozen
-транзакций оставим в данном случае за скобками), то далее Greenplum проверяет видимость с учётом снимка видимости для распределённой транзакции.
На примере запроса, который запускался ранее, Greenplum производит проверку, является ли данная транзакция распределённой и нужно ли проверять по глобальному снимку видимости. В нашем случае распределённая транзакция ещё не имела статуса завершённой, поэтому данная строка не является видимой для запроса в другой сессии.
Citus
Citus следует таким правилам исполнения и получения результирующего значения функций:
Запрет использования функций stable
и volatile
в RETURNING
для INSERT
-запросов.
Вычисление значения, в том числе и для функций volatile
, на координаторе запроса для INSERT
-запросов.
Запрет использования функций volatile
в UPDATE
и DELETE
-запросах.
Для SELECT
-запросов Citus не вычисляет значения функций на координаторе.
Последний пункт может приводить к потенциальным проблемам. Например, для следующего запроса:
SELECT *
FROM d INNER JOIN r
ON d.c1 = r.c1
WHERE r.c1 = ROUND(RANDOM());
Citus строит такой план:
QUERY PLAN
------------------------------------------------------------------------------
Custom Scan (Citus Adaptive)
Task Count: 32
Tasks Shown: One of 32
-> Task
Node: host=10.92.40.201 port=5432 dbname=postgres
-> Hash Join
Hash Cond: (d.c1 = r.c1)
-> Seq Scan on d_103166 d
-> Hash
-> Seq Scan on r_103198 r
Filter: ((c1)::double precision = round(random()))
Согласно правилам вызова функций, он вычисляет условие в WHERE
(строка, отмеченная цифрой 4) для каждого шарда (строка с номером 1). Таким образом, теоретически возможна следующая ситуация:
Shard 1 | Shard 2 | |
---|---|---|
Distributed table value ( |
{ 0 } |
{ 1 } |
Reference table value ( |
{ 0, 1 } |
{ 0, 1 } |
Допустим, в распределённой таблице d
хранится 2 строки. На первом шарде Shard 1
значение атрибута равно 0
, на втором шарде Shard 2
значение равно 1
. В справочной таблице хранятся такие же 2 строки.
Согласно плану выполнения выборка из таблицы r
производится на каждом шарде согласно совпадению вычисленного значения round(random())
, т.е. это либо 0
, либо 1
. С некоторой вероятностью возможно получение таких кортежей на выходе узла Seq Scan
(строка 3): на первом Shard 1
получена только строка с ключом 1
, на втором с ключом 0
.
Однако планировщик сделал pushdown соединения Hash Join
на каждый шард (строка 2). Очевидно, что в данной комбинации ключей пользователь с некоторой долей вероятности получит результат, который бы он никогда не получил, например, для обычного PostgreSQL!
Так как "суперпозицией" проекций r
для обоих шардов является множество { 1, 0 }
, которое бы сматчилось по ключам со строками 0
и 1
распределённой таблицы d
, в нашем случае дало пустое множество после объединения результатов с шардов:
Shard 1 | Shard 2 | |
---|---|---|
Distributed table value ( |
{ 0 } |
{ 1 } |
Reference table selected keys with random() selection |
{ 1 } |
{ 0 } |
local |
{ ∅ } |
{ ∅ } |
after custom scan |
{ ∅ } |
Greenplum
В свою очередь Greenplum планирует запрос таким образом, чтобы соответствовать критериям "идентичности результатов" нераспределённому хранилищу. Важный момент, что выборку с волатильной функцией random()
в рамках запроса к реплицируемой (аналог таблицы Citus reference
) он строит единожды и распределяет по сегментам через Redistribute Motion
(строка, отмеченная цифрой 2).
QUERY PLAN
------------------------------------------------------------------------------------
Gather Motion 3:1 (slice2; segments: 3)
-> Hash Join
Hash Cond: (d.c1 = r.c1)
-> Seq Scan on d
-> Hash
-> Redistribute Motion 1:3 (slice1; segments: 1)
Hash Key: r.c1
-> Result
-> Seq Scan on r
Filter: ((c1)::double precision = round(random()))
Optimizer: Postgres query optimizer
Таким образом, Greenplum может обеспечить корректность соединения Hash Join
(строка 1), тем самым соответствуя возможным вариантам выборки нераспределённой СУБД:
Shard 1 | Shard 2 | |
---|---|---|
Distributed table value ( |
{ 0 } |
{ 1 } |
Replicated table value ( |
{ 0, 1 } |
{ 0, 1 } |
Так как результаты выборки Seq Scan
(строка 3) будут идентичны для всех сегментов, то каждый сегмент сможет сматчить каждый по своему ключу и дать корректный результат:
Shard 1 | Shard 2 | |
---|---|---|
Distributed table value ( |
{ 0 } |
{ 1 } |
Replicated table selected keys with random() selection |
{ 0, 1 } |
|
local |
{ 0 } |
{ 1 } |
after gather motion |
{ 0, 1 } |
По итогу, в Citus в такой комбинации используемых таблиц есть некоторая вероятность получить некорректные с точки зрения соответствия нераспределённой СУБД результаты выборки.
В этой части мы затронули основные понятия Citus с пользовательской и архитектурной точек зрения. Рассмотрели планировщики и некоторые нюансы их работы и сравнили со схожими частями с Greenplum.
В следующей части нас ждёт рассмотрение процесса ребаланса шардов — довольно удобного с практической точки зрения решения проблемы горизонтального масштабирования СУБД. Также мы сравним подходы к контролю за распределением ресурсов.
И на самое вкусное — рассмотрим и проанализируем результаты проведения сравнительных TPC-DS тестов Greenplum vs Citus.
Будьте на связи!