Использование кастомных форматов и протоколов для внешних таблиц
Greengage DB по умолчанию позволяет загружать и выгружать внешние данные через внешние таблицы в формате текста с разделителями (TEXT) или значений, разделенных запятыми (CSV), используя один из поддерживаемых встроенных протоколов.
Вы можете использовать другие форматы, определив собственный формат данных или протокол соединения. Кастомный формат позволяет работать с данными, хранящимися в произвольном формате. Кастомный протокол позволяет подключить Greengage DB к источнику данных, не доступному с использованием других протоколов.
Кастомные форматы и протоколы реализуются путем подключения пользовательского кода (например, общей библиотеки) в Greengage DB посредством динамической загрузки.
Кастомный формат данных
Кастомный формат позволяет работать с данными, хранящимися в произвольном формате. Общие шаги работы с кастомным форматом данных следующие:
-
Реализуйте и скомпилируйте функции ввода и вывода как общую библиотеку-форматтер.
-
Укажите функцию общей библиотеки с помощью команды
CREATE FUNCTIONв Greengage DB. -
При определении внешней таблицы укажите собственный формат данных в выражении
FORMATкомандыCREATE EXTERNAL TABLE:FORMAT 'CUSTOM' (FORMATTER=<format_function>, key1=val1,...keyN=valN)где:
-
FORMATTERуказывает функцию форматирования (<format_function>), применяемую к данным. -
key1=val1,…keyN=valNуказывает разделенный запятыми список параметров функции форматирования.
-
В Greengage DB встроены функции форматирования данных фиксированной ширины. Для форматирования данных переменной ширины вам потребуется реализовать функции форматирования самостоятельно (см. Пример использования).
Форматирование данных фиксированной ширины
В файлах данных фиксированной ширины на каждое поле отводится определенное количество символов.
Чтобы использовать данные фиксированной ширины с внешней таблицей, в выражении FORMAT команды CREATE EXTERNAL TABLE укажите одну из встроенных функций форматирования (fixedwidth_in для чтения данных или fixedwidth_out — для записи) следующим образом:
FORMAT 'CUSTOM' (
formatter=fixedwidth_in | fixedwidth_out,
<field_name1>=<field_length1>,
...,
<field_nameN>=<field_lengthN>,
preserve_blanks=on | off
null='null_string_value'
line_delim='line_ending_value'
)
Установите значения параметров функции форматирования.
| Параметр | Описание |
|---|---|
fixedwidth_in |
Указывает функцию форматирования, используемую для чтения данных |
fixedwidth_out |
Указывает функцию форматирования, используемую для записи данных |
<field_name>=<field_length> |
Указывает список полей и их длину. Поля должны быть перечислены в порядке их физического следования в файле данных.
Имена полей должны совпадать с именами столбцов в команде |
preserve_blanks |
Определяет, должны ли сохраняться конечные пробелы. По умолчанию конечные пробелы обрезаются ( |
null |
Указывает значение нулевого символа. Если параметру Если параметру |
line_delim |
Указывает символ переноса строки. Нижеприведенные примеры покрывают большинство случаев (
|
Пример
В данном примере используется файл customers.txt со строками фиксированной ширины.
Файл располагается в каталоге /tmp сегмент-хоста sdw1 и имеет следующее содержимое:
1 John Doe New York 2 Jane Smith Los Angeles 3 Alice Lee San Francisco
-
На мастер-хосте запустите команду
CREATE EXTERNAL TABLE. В выраженииFORMATукажитеCUSTOMв качестве формата данных, в качестве функции форматирования установитеfixedwidth_inи перечислите поля и их длины:CREATE EXTERNAL TABLE customers ( id INTEGER, name VARCHAR(20), city VARCHAR(20) ) LOCATION ('file://sdw1/tmp/customers.txt') FORMAT 'CUSTOM' (FORMATTER=fixedwidth_in, id='4', name='12', city='13'); -
Выполните запрос к созданной таблице:
SELECT * FROM customers;Вывод должен выглядеть следующим образом:
id | name | city ----+------------+--------------- 1 | John Doe | New York 2 | Jane Smith | Los Angeles 3 | Alice Lee | San Francisco
Кастомный протокол
Кастомный протокол позволяет подключить Greengage DB к источнику данных, не доступному с использованием поддерживаемых встроенных протоколов. Кастомные протоколы можно использовать, когда имеющихся встроенных протоколов недостаточно для конкретной потребности. Кастомный протокол потребуется реализовать самостоятельно (см. Пример использования).
Общие шаги работы с кастомным протоколом следующие:
-
Реализуйте на C функции отправки, приема и (опционально) валидации с указанными интерфейсами.
-
Скомпилируйте функции в общий объект (.so).
-
В Greengage DB используйте команду
CREATE FUNCTIONи объявите функции базы данных, которые указывают на файл .so и имена функций. -
Используйте команду
CREATE TRUSTED PROTOCOLдля включения протокола в базу данных. Вы должны быть суперпользователем для создания и регистрации кастомного протокола. -
Используйте команду
GRANT, чтобы выдать пользователям соответствующие привилегии, например:-
Разрешить пользователю создание читающих внешних таблиц с использованием доверенного протокола:
GRANT SELECT ON PROTOCOL <protocol_name> TO <user_name>; -
Разрешить пользователю создание пишущих внешних таблиц с использованием доверенного протокола:
GRANT INSERT ON PROTOCOL <protocol_name> TO <user_name>; -
Разрешить пользователю создание читающих и пишущих внешних таблиц с использованием доверенного протокола:
GRANT ALL ON PROTOCOL <protocol_name> TO <user_name>;Более подробную информацию об управлении доступом в Greengage DB можно получить в статье Роли и привилегии.
-
Пример использования
Этот пример демонстрирует реализацию API Greengage DB для пользовательского протокола доступа к данным и пользовательского формата данных. В репозитории исходного кода Greengage DB вы можете просмотреть полный API функций ввода/вывода и форматирования внешних таблиц.
-
В каталоге /tmp мастер-хоста создайте файл demoprotocol.c, содержащий реализацию протокола на языке C:
#include "postgres.h" #include "fmgr.h" #include "funcapi.h" #include "access/extprotocol.h" #include "catalog/pg_proc.h" #include "commands/defrem.h" #include "utils/array.h" #include "utils/builtins.h" #include "utils/memutils.h" #include "catalog/pg_exttable.h" typedef struct DemoUri { char *protocol; char *path; } DemoUri; static DemoUri *ParseDemoUri(const char *uri_str); static void FreeDemoUri(DemoUri* uri); /* Do the module magic dance */ PG_MODULE_MAGIC; PG_FUNCTION_INFO_V1(demoprot_export); PG_FUNCTION_INFO_V1(demoprot_import); PG_FUNCTION_INFO_V1(demoprot_validate_urls); Datum demoprot_export(PG_FUNCTION_ARGS); Datum demoprot_import(PG_FUNCTION_ARGS); Datum demoprot_validate_urls(PG_FUNCTION_ARGS); typedef struct { char *url; char *filename; FILE *file; } extprotocol_t; static void check_ext_options(const FunctionCallInfo fcinfo) { ListCell *cell; Relation rel = EXTPROTOCOL_GET_RELATION(fcinfo); ExtTableEntry *exttbl = GetExtTableEntry(rel->rd_id); List *options = exttbl->options; foreach(cell, options) { DefElem *def = (DefElem *) lfirst(cell); char *key = def->defname; char *value = defGetString(def); if (key && strcasestr(key, "database") && !strcasestr(value, "greengage")) { ereport(ERROR, (0, errmsg("This is greengage."))); } } } /* * Import data into GPDB. */ Datum demoprot_import(PG_FUNCTION_ARGS) { extprotocol_t *myData; char *data; int datlen; size_t nread = 0; /* Must be called via the external table format manager */ if (!CALLED_AS_EXTPROTOCOL(fcinfo)) elog(ERROR, "extprotocol_import: not called by external protocol manager"); /* Get our internal description of the protocol */ myData = (extprotocol_t *) EXTPROTOCOL_GET_USER_CTX(fcinfo); if(EXTPROTOCOL_IS_LAST_CALL(fcinfo)) { /* we're done receiving data. close our connection */ if(myData && myData->file) if(fclose(myData->file)) ereport(ERROR, (errcode_for_file_access(), errmsg("could not close file \"%s\": %m", myData->filename))); PG_RETURN_INT32(0); } if (myData == NULL) { /* first call. do any desired init */ const char *p_name = "demoprot"; DemoUri *parsed_url; char *url = EXTPROTOCOL_GET_URL(fcinfo); myData = palloc(sizeof(extprotocol_t)); myData->url = pstrdup(url); parsed_url = ParseDemoUri(myData->url); myData->filename = pstrdup(parsed_url->path); if(strcasecmp(parsed_url->protocol, p_name) != 0) elog(ERROR, "internal error: demoprot called with a different protocol (%s)", parsed_url->protocol); /* An example of checking options */ check_ext_options(fcinfo); FreeDemoUri(parsed_url); /* open the destination file (or connect to remote server in other cases) */ myData->file = fopen(myData->filename, "r"); if (myData->file == NULL) ereport(ERROR, (errcode_for_file_access(), errmsg("demoprot_import: could not open file \"%s\" for reading: %m", myData->filename))); EXTPROTOCOL_SET_USER_CTX(fcinfo, myData); } /* ======================================================================= * DO THE IMPORT * ======================================================================= */ data = EXTPROTOCOL_GET_DATABUF(fcinfo); datlen = EXTPROTOCOL_GET_DATALEN(fcinfo); if(datlen > 0) { nread = fread(data, 1, datlen, myData->file); if (ferror(myData->file)) ereport(ERROR, (errcode_for_file_access(), errmsg("demoprot_import: could not write to file \"%s\": %m", myData->filename))); } PG_RETURN_INT32((int)nread); } /* * Export data out of GPDB. */ Datum demoprot_export(PG_FUNCTION_ARGS) { extprotocol_t *myData; char *data; int datlen; size_t wrote = 0; /* Must be called via the external table format manager */ if (!CALLED_AS_EXTPROTOCOL(fcinfo)) elog(ERROR, "extprotocol_export: not called by external protocol manager"); /* Get our internal description of the protocol */ myData = (extprotocol_t *) EXTPROTOCOL_GET_USER_CTX(fcinfo); if(EXTPROTOCOL_IS_LAST_CALL(fcinfo)) { /* we're done sending data. close our connection */ if(myData && myData->file) if(fclose(myData->file)) ereport(ERROR, (errcode_for_file_access(), errmsg("could not close file \"%s\": %m", myData->filename))); PG_RETURN_INT32(0); } if (myData == NULL) { /* first call. do any desired init */ const char *p_name = "demoprot"; DemoUri *parsed_url; char *url = EXTPROTOCOL_GET_URL(fcinfo); myData = palloc(sizeof(extprotocol_t)); myData->url = pstrdup(url); parsed_url = ParseDemoUri(myData->url); myData->filename = pstrdup(parsed_url->path); if(strcasecmp(parsed_url->protocol, p_name) != 0) elog(ERROR, "internal error: demoprot called with a different protocol (%s)", parsed_url->protocol); FreeDemoUri(parsed_url); /* open the destination file (or connect to remote server in other cases) */ myData->file = fopen(myData->filename, "a"); if (myData->file == NULL) ereport(ERROR, (errcode_for_file_access(), errmsg("demoprot_export: could not open file \"%s\" for writing: %m", myData->filename))); EXTPROTOCOL_SET_USER_CTX(fcinfo, myData); } /* ======================================================================= * DO THE EXPORT * ======================================================================= */ data = EXTPROTOCOL_GET_DATABUF(fcinfo); datlen = EXTPROTOCOL_GET_DATALEN(fcinfo); if(datlen > 0) { wrote = fwrite(data, 1, datlen, myData->file); if (ferror(myData->file)) ereport(ERROR, (errcode_for_file_access(), errmsg("demoprot_import: could not read from file \"%s\": %m", myData->filename))); } PG_RETURN_INT32((int)wrote); } Datum demoprot_validate_urls(PG_FUNCTION_ARGS) { int nurls; int i; ValidatorDirection direction; /* Must be called via the external table format manager */ if (!CALLED_AS_EXTPROTOCOL_VALIDATOR(fcinfo)) elog(ERROR, "demoprot_validate_urls: not called by external protocol manager"); nurls = EXTPROTOCOL_VALIDATOR_GET_NUM_URLS(fcinfo); direction = EXTPROTOCOL_VALIDATOR_GET_DIRECTION(fcinfo); /* * Dumb example 1: search each url for a substring * we don't want to be used in a url. in this example * it's 'secured_directory'. */ for (i = 1 ; i <= nurls ; i++) { char *url = EXTPROTOCOL_VALIDATOR_GET_NTH_URL(fcinfo, i); if (strstr(url, "secured_directory") != 0) { ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg("using 'secured_directory' in a url isn't allowed "))); } } /* * Dumb example 2: set a limit on the number of urls * used. In this example we limit readable external * tables that use our protocol to 2 urls max. */ if(direction == EXT_VALIDATE_READ && nurls > 2) { ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg("more than 2 urls aren't allowed in this protocol "))); } PG_RETURN_VOID(); } /* --- utility functions --- */ static DemoUri *ParseDemoUri(const char *uri_str) { DemoUri *uri = (DemoUri *) palloc0(sizeof(DemoUri)); int protocol_len; uri->path = NULL; uri->protocol = NULL; /* * parse protocol */ char *post_protocol = strstr(uri_str, "://"); if(!post_protocol) { ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("invalid demo prot URI \'%s\'", uri_str))); } protocol_len = post_protocol - uri_str; uri->protocol = (char *) palloc0 (protocol_len + 1); strncpy(uri->protocol, uri_str, protocol_len); /* make sure there is more to the uri string */ if (strlen(uri_str) <= protocol_len) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("invalid demo prot URI \'%s\' : missing path", uri_str))); /* * parse path */ uri->path = pstrdup(uri_str + protocol_len + strlen("://")); return uri; } static void FreeDemoUri(DemoUri *uri) { if (uri->path) pfree(uri->path); if (uri->protocol) pfree(uri->protocol); pfree(uri); }Протокол определен с именем
demoprot. Он указывается в выраженииLOCATIONкомандыCREATE EXTERNAL TABLEкак имя протокола и путь к файлу, разделенные://, следующим образом:LOCATION('demoprot://<path>')В протоколе реализованы три функции:
-
demoprot_import()— функция чтения. -
demoprot_export()— функция записи. -
demoprot_validate_urls()— функция валидации.
-
-
В каталоге /tmp мастер-хоста создайте реализацию форматтера demoformatter.c на языке C:
#include "postgres.h" #include "fmgr.h" #include "funcapi.h" #include "access/formatter.h" #include "catalog/pg_proc.h" #include "utils/builtins.h" #include "utils/memutils.h" #include "utils/typcache.h" #include "utils/syscache.h" /* Do the module magic dance */ PG_MODULE_MAGIC; PG_FUNCTION_INFO_V1(formatter_export); PG_FUNCTION_INFO_V1(formatter_import); Datum formatter_export(PG_FUNCTION_ARGS); Datum formatter_import(PG_FUNCTION_ARGS); typedef struct { int ncols; Datum *values; bool *nulls; int buflen; bytea *buffer; } format_t; /* * Maximum size string to support, affects allocation size of the tuple buffer. * Only used for variable length strings. For strings with a declared typmod * we allow that size even if it is larger than this. */ #define MAX_FORMAT_STRING 4096 /* * Our format converts all NULLs to real values, for floats that value is NaN */ #define NULL_FLOAT8_VALUE get_float8_nan() Datum formatter_export(PG_FUNCTION_ARGS) { HeapTupleHeader rec = PG_GETARG_HEAPTUPLEHEADER(0); TupleDesc tupdesc; HeapTupleData tuple; int ncolumns = 0; format_t *myData; char *data; int datlen; int i; /* Must be called via the external table format manager */ if (!CALLED_AS_FORMATTER(fcinfo)) elog(ERROR, "formatter_export: not called by format manager"); tupdesc = FORMATTER_GET_TUPDESC(fcinfo); /* Get our internal description of the formatter */ ncolumns = tupdesc->natts; myData = (format_t *) FORMATTER_GET_USER_CTX(fcinfo); if (myData == NULL) { myData = palloc(sizeof(format_t)); myData->ncols = ncolumns; myData->values = palloc(sizeof(Datum) * ncolumns); myData->nulls = palloc(sizeof(bool) * ncolumns); /* Determine required buffer size */ myData->buflen = 0; for (i = 0; i < ncolumns; i++) { Oid type = tupdesc->attrs[i]->atttypid; int32 typmod = tupdesc->attrs[i]->atttypmod; /* Don't know how to format dropped columns, error for now */ if (tupdesc->attrs[i]->attisdropped) elog(ERROR, "formatter_export: dropped columns"); switch (type) { case FLOAT8OID: { myData->buflen += sizeof(double); break; } case VARCHAROID: case BPCHAROID: case TEXTOID: { myData->buflen += (typmod > 0) ? typmod : MAX_FORMAT_STRING; break; } default: { elog(ERROR, "formatter_export error: unsupported data type"); break; } } } myData->buflen = Max(128, myData->buflen); /* allocate at least 128 bytes */ myData->buffer = palloc(myData->buflen + VARHDRSZ); FORMATTER_SET_USER_CTX(fcinfo, myData); } if (myData->ncols != ncolumns) elog(ERROR, "formatter_export: unexpected change of output record type"); /* break the input tuple into fields */ tuple.t_len = HeapTupleHeaderGetDatumLength(rec); ItemPointerSetInvalid(&(tuple.t_self)); tuple.t_data = rec; heap_deform_tuple(&tuple, tupdesc, myData->values, myData->nulls); datlen = 0; data = VARDATA(myData->buffer); /* ======================================================================= * MAIN FORMATTING CODE * * Currently this code assumes: * - Homogoneos hardware => No need to convert data to network byte order * - Support for TEXT/VARCHAR/BPCHAR/FLOAT8 only * - Length Prefixed strings * - No end of record tags, checksums, or optimizations for alignment. * - NULL values are cast to some sensible default value (NaN, "") * * ======================================================================= */ for (i = 0; i < ncolumns; i++) { Oid type = tupdesc->attrs[i]->atttypid; int typmod = tupdesc->attrs[i]->atttypmod; Datum val = myData->values[i]; bool nul = myData->nulls[i]; switch (type) { case FLOAT8OID: { float8 value; if (datlen + sizeof(value) >= myData->buflen) elog(ERROR, "formatter_export: buffer too small"); if (nul) value = NULL_FLOAT8_VALUE; else value = DatumGetFloat8(val); memcpy(&data[datlen], &value, sizeof(value)); datlen += sizeof(value); break; } case TEXTOID: case VARCHAROID: case BPCHAROID: { text *str; int32 len; if (nul) { str = NULL; len = 0; } else { str = DatumGetTextP(val); len = VARSIZE(str) - VARHDRSZ; if (typmod < 0) len = Min(len, MAX_FORMAT_STRING); } if (datlen + sizeof(len) + len >= myData->buflen) elog(ERROR, "formatter_export: buffer too small"); memcpy(&data[datlen], &len, sizeof(len)); datlen += sizeof(len); if (len > 0) { memcpy(&data[datlen], VARDATA(str), len); datlen += len; } break; } default: elog(ERROR, "formatter_export: unsupported datatype"); break; } } /* ======================================================================= */ SET_VARSIZE(myData->buffer, datlen + VARHDRSZ); PG_RETURN_BYTEA_P(myData->buffer); } Datum formatter_import(PG_FUNCTION_ARGS) { HeapTuple tuple; TupleDesc tupdesc; MemoryContext m, oldcontext; format_t *myData; char *data_buf; int ncolumns = 0; int data_cur; int data_len; int i; /* Must be called via the external table format manager */ if (!CALLED_AS_FORMATTER(fcinfo)) elog(ERROR, "formatter_import: not called by format manager"); tupdesc = FORMATTER_GET_TUPDESC(fcinfo); /* Get our internal description of the formatter */ ncolumns = tupdesc->natts; myData = (format_t *) FORMATTER_GET_USER_CTX(fcinfo); if (myData == NULL) { myData = palloc(sizeof(format_t)); myData->ncols = ncolumns; myData->values = palloc(sizeof(Datum) * ncolumns); myData->nulls = palloc(sizeof(bool) * ncolumns); /* misc verification */ for (i = 0; i < ncolumns; i++) { Oid type = tupdesc->attrs[i]->atttypid; //int32 typmod = tupdesc->attrs[i]->atttypmod; /* Don't know how to format dropped columns, error for now */ if (tupdesc->attrs[i]->attisdropped) elog(ERROR, "formatter_import: dropped columns"); switch (type) { case FLOAT8OID: case VARCHAROID: case BPCHAROID: case TEXTOID: break; default: { elog(ERROR, "formatter_import error: unsupported data type"); break; } } } FORMATTER_SET_USER_CTX(fcinfo, myData); } if (myData->ncols != ncolumns) elog(ERROR, "formatter_import: unexpected change of output record type"); /* get our input data buf and number of valid bytes in it */ data_buf = FORMATTER_GET_DATABUF(fcinfo); data_len = FORMATTER_GET_DATALEN(fcinfo); data_cur = FORMATTER_GET_DATACURSOR(fcinfo); /* start clean */ MemSet(myData->values, 0, ncolumns * sizeof(Datum)); MemSet(myData->nulls, true, ncolumns * sizeof(bool)); /* ======================================================================= * MAIN FORMATTING CODE * * Currently this code assumes: * - Homogoneos hardware => No need to convert data to network byte order * - Support for TEXT/VARCHAR/BPCHAR/FLOAT8 only * - Length Prefixed strings * - No end of record tags, checksums, or optimizations for alignment. * - NULL values are cast to some sensible default value (NaN, "") * * ======================================================================= */ m = FORMATTER_GET_PER_ROW_MEM_CTX(fcinfo); oldcontext = MemoryContextSwitchTo(m); for (i = 0; i < ncolumns; i++) { Oid type = tupdesc->attrs[i]->atttypid; //int typmod = tupdesc->attrs[i]->atttypmod; int remaining = 0; int attr_len = 0; remaining = data_len - data_cur; switch (type) { case FLOAT8OID: { float8 value; attr_len = sizeof(value); if (remaining < attr_len) { MemoryContextSwitchTo(oldcontext); FORMATTER_RETURN_NOTIFICATION(fcinfo, FMT_NEED_MORE_DATA); } memcpy(&value, data_buf + data_cur, attr_len); if(value != NULL_FLOAT8_VALUE) { myData->nulls[i] = false; myData->values[i] = Float8GetDatum(value); } /* TODO: check for nan? */ break; } case TEXTOID: case VARCHAROID: case BPCHAROID: { text* value; int32 len; bool nextlen = remaining >= sizeof(len); if (nextlen) { memcpy(&len, data_buf + data_cur, sizeof(len)); if (len < 0) elog(ERROR, "invalid length of varlen datatype: %d", len); } /* if len or data bytes don't exist in this buffer, return */ if (!nextlen || (nextlen && (remaining - sizeof(len) < len))) { MemoryContextSwitchTo(oldcontext); FORMATTER_RETURN_NOTIFICATION(fcinfo, FMT_NEED_MORE_DATA); } if (len > 0) { value = (text *) palloc(len + VARHDRSZ); SET_VARSIZE(value, len + VARHDRSZ); memcpy(VARDATA(value), data_buf + data_cur + sizeof(len), len); myData->nulls[i] = false; myData->values[i] = PointerGetDatum(value); } attr_len = len + sizeof(len); break; } default: elog(ERROR, "formatter_import: unsupported datatype"); break; } /* add byte length of last attribute to the temporary cursor */ data_cur += attr_len; } /* ======================================================================= */ MemoryContextSwitchTo(oldcontext); FORMATTER_SET_DATACURSOR(fcinfo, data_cur); tuple = heap_form_tuple(tupdesc, myData->values, myData->nulls); /* hack... pass tuple here. don't free prev tuple - the executor does it */ ((FormatterData*) fcinfo->context)->fmt_tuple = tuple; FORMATTER_RETURN_TUPLE(tuple); }В форматтере реализованы функции
formatter_exportиformatter_import, а также поддерживается загрузка и выгрузка данных в двоичном формате. -
Используйте компилятор языка C
ccдля компиляции исходного кода в общие объекты, которые можно динамически загрузить в Greengage DB:$ cd /tmp || exit $ cc -fpic -c /tmp/demoprotocol.c -I $(pg_config --includedir-server) && \ cc -shared -o /tmp/demoprotocol.so /tmp/demoprotocol.o $ cc -fpic -c /tmp/demoformatter.c -I $(pg_config --includedir-server) && \ cc -shared -o /tmp/demoformatter.so /tmp/demoformatter.oгде:
-
-fpic— указывает на создание позиционно-независимого кода (Position-Independent Code, PIC). Объектный файл должен быть позиционно-независимым, чтобы его можно было загрузить в любое произвольное место в памяти Greengage DB. -
-c— компилирует исходный код без компоновки и создает объектный файл (demoprotocol.o). -
-I— указывает местоположение заголовочных файлов. Данные файлы располагаются в подкаталогах $GPHOME/include/postgresql/. Точное местоположение можно выяснить с помощью следующей команды:$ pg_config --includedir-server -
-shared— указывает на создание общего объекта (общий файл библиотеки). -
-o— указывает имя файла общего объекта (demoprotocol.so).
ПРИМЕЧАНИЕБолее подробную информацию об опциях компилятора
ccможно получить в официальной документации GCC. Более подробную информацию о компиляции и компоновке динамически подгружаемых функций для создания общих библиотек в различных операционных системах можно получить в документации PostgreSQL. -
-
Скопируйте скомпилированный код (файлы библиотек demoprotocol.so и demoformatter.so) в один и тот же каталог на каждом хосте экземпляра Greengage DB (мастер и все сегменты). Местоположение должно быть указано в
LD_LIBRARY_PATH, чтобы сервер мог найти эти файлы. Рекомендуется располагать общие библиотеки либо относительно$libdir, либо через путь к динамическим библиотекам (устанавливается параметром конфигурации сервераdynamic_library_path). Вы можете выяснить, на какой каталог указывает$libdir, выполнив следующую команду:$ pg_config --pkglibdirДля обновления всех сегментов воспользуйтесь утилитой Greengage DB
gpscp:$ gpscp -f ~/hostfile_all_hosts /tmp/demoprotocol.so /tmp/demoformatter.so =:"$(pg_config --pkglibdir)"где
<hostfile_all_hosts>— это список хостов исходного кластера, который использовался для его инициализации. -
В базе данных
customersобъявите функции протокола, ссылающиеся на файл demoprotocol.so и имена соответствующих функций:CREATE OR REPLACE FUNCTION write_to_file() RETURNS integer AS '$libdir/demoprotocol.so', 'demoprot_export' LANGUAGE C STABLE; CREATE OR REPLACE FUNCTION read_from_file() RETURNS integer AS '$libdir/demoprotocol.so', 'demoprot_import' LANGUAGE C STABLE; CREATE OR REPLACE FUNCTION validate_urls() RETURNS void AS '$libdir/demoprotocol.so', 'demoprot_validate_urls' LANGUAGE C STABLE; -
Создайте протокол, обращающийся к этим функциям:
CREATE TRUSTED PROTOCOL demoprot( writefunc='write_to_file', readfunc='read_from_file', validatorfunc='validate_urls' ); -
Объявите функции форматирования, ссылающиеся на файл demoformatter.so и имена соответствующих функций:
CREATE OR REPLACE FUNCTION formatter_export(record) RETURNS bytea AS '$libdir/demoformatter.so', 'formatter_export' LANGUAGE C STABLE; CREATE OR REPLACE FUNCTION formatter_import() RETURNS record AS '$libdir/demoformatter.so', 'formatter_import' LANGUAGE C STABLE; -
Создайте пишущую внешнюю таблицу
customers_w. В выраженииLOCATIONукажите протоколdemoprotи выходной файл customers.demo. В выраженииFORMATукажитеCUSTOMв качестве формата данных иformatter_export— в качестве функции форматирования:CREATE WRITABLE EXTERNAL TABLE customers_w ( id TEXT, name TEXT, email TEXT, address TEXT ) LOCATION('demoprot://customers.demo') FORMAT 'CUSTOM' (FORMATTER=formatter_export) DISTRIBUTED BY (id); -
Вставьте тестовые данные в таблицу
customers_w:INSERT INTO customers_w (id, name, email, address) VALUES (1,'John Doe','john.doe@example.com','123 Elm Street'), (2,'Jane Smith','jane.smith@example.com','456 Oak Avenue'), (3,'Bob Brown','bob.brown@example.com','789 Pine Street'), (4,'Rob Stuart','rob.stuart@example.com','119 Willow Street'), (5, 'Alice Johnson', 'alice.johnson@example.com', '221 Maple Drive'), (6, 'David Lee', 'david.lee@example.com', '333 Birch Lane'), (7, 'Emily Wilson', 'emily.wilson@example.com', '444 Cedar Court'), (8, 'Michael Garcia', 'michael.garcia@example.com', '555 Walnut Place');Запуск команды создает двоичный файл customers.demo в каталоге данных каждого сегмента.
-
Проверьте, что файлы созданы. Для просмотра содержимого каталогов данных всех сегментов воспользуйтесь утилитой Greengage DB
gpssh:$ gpssh -f ~/hostfile_segment_hosts -e 'ls /data1/primary/gpseg*/customers.demo'где
<hostfile_segment_hosts>— это список сегмент-хостов кластера.Вывод должен выглядеть подобным образом:
[sdw1] /data1/primary/gpseg0/customers.demo /data1/primary/gpseg1/customers.demo [sdw2] /data1/primary/gpseg2/customers.demo /data1/primary/gpseg3/customers.demo
-
Создайте читающую внешнюю таблицу
customers_r. В выраженииLOCATIONукажите протоколdemoprotи файл исходных данных customers.demo. В выраженииFORMATукажитеCUSTOMв качестве формата данных иformatter_import— в качестве функции форматирования:CREATE EXTERNAL TABLE customers_r ( id TEXT, name TEXT, email TEXT, address TEXT ) LOCATION('demoprot://customers.demo') FORMAT 'CUSTOM' (FORMATTER=formatter_import); -
Выполните запрос к таблице:
SELECT * FROM customers_r;Вывод должен выглядеть следующим образом:
id | name | email | address ----+----------------+----------------------------+------------------- 1 | John Doe | john.doe@example.com | 123 Elm Street 5 | Alice Johnson | alice.johnson@example.com | 221 Maple Drive 6 | David Lee | david.lee@example.com | 333 Birch Lane 2 | Jane Smith | jane.smith@example.com | 456 Oak Avenue 3 | Bob Brown | bob.brown@example.com | 789 Pine Street 4 | Rob Stuart | rob.stuart@example.com | 119 Willow Street 7 | Emily Wilson | emily.wilson@example.com | 444 Cedar Court 8 | Michael Garcia | michael.garcia@example.com | 555 Walnut Place (8 rows)