Привет, Я DocuDroid!
Оценка ИИ поиска
Спасибо за оценку нашего ИИ поиска!
Мы будем признательны, если вы поделитесь своими впечатлениями, чтобы мы могли улучшить наш ИИ поиск для вас и других читателей.
GitHub

Использование кастомных форматов и протоколов для внешних таблиц

Антон Монаков

Greengage DB по умолчанию позволяет загружать и выгружать внешние данные через внешние таблицы в формате текста с разделителями (TEXT) или значений, разделенных запятыми (CSV), используя один из поддерживаемых встроенных протоколов.

Вы можете использовать другие форматы, определив собственный формат данных или протокол соединения. Кастомный формат позволяет работать с данными, хранящимися в произвольном формате. Кастомный протокол позволяет подключить Greengage DB к источнику данных, не доступному с использованием других протоколов.

Кастомные форматы и протоколы реализуются путем подключения пользовательского кода (например, общей библиотеки) в Greengage DB посредством динамической загрузки.

Предварительные требования

Для выполнения примеров из этой статьи подключитесь к мастер-хосту Greengage DB как gpadmin с помощью psql, как описано в статье Подключение к Greengage DB с использованием psql. Затем создайте тестовую базу данных customers и подключитесь к ней:

DROP DATABASE IF EXISTS customers;
CREATE DATABASE customers;
\c customers

Кастомный формат данных

Кастомный формат позволяет работать с данными, хранящимися в произвольном формате. Общие шаги работы с кастомным форматом данных следующие:

  1. Реализуйте и скомпилируйте функции ввода и вывода как общую библиотеку-форматтер.

  2. Укажите функцию общей библиотеки с помощью команды CREATE FUNCTION в Greengage DB.

  3. При определении внешней таблицы укажите собственный формат данных в выражении FORMAT команды CREATE EXTERNAL TABLE:

    FORMAT 'CUSTOM' (FORMATTER=<format_function>, key1=val1,...keyN=valN)

    где:

    • FORMATTER указывает функцию форматирования (<format_function>), применяемую к данным.

    • key1=val1,…​keyN=valN указывает разделенный запятыми список параметров функции форматирования.

В Greengage DB встроены функции форматирования данных фиксированной ширины. Для форматирования данных переменной ширины вам потребуется реализовать функции форматирования самостоятельно (см. Пример использования).

Форматирование данных фиксированной ширины

В файлах данных фиксированной ширины на каждое поле отводится определенное количество символов. Чтобы использовать данные фиксированной ширины с внешней таблицей, в выражении FORMAT команды CREATE EXTERNAL TABLE укажите одну из встроенных функций форматирования (fixedwidth_in для чтения данных или fixedwidth_out — для записи) следующим образом:

FORMAT 'CUSTOM' (
    formatter=fixedwidth_in | fixedwidth_out,
    <field_name1>=<field_length1>,
    ...,
    <field_nameN>=<field_lengthN>,
    preserve_blanks=on | off
    null='null_string_value'
    line_delim='line_ending_value'
)

Установите значения параметров функции форматирования.

Параметр Описание

fixedwidth_in

Указывает функцию форматирования, используемую для чтения данных

fixedwidth_out

Указывает функцию форматирования, используемую для записи данных

<field_name>=<field_length>

Указывает список полей и их длину.

Поля должны быть перечислены в порядке их физического следования в файле данных. Имена полей должны совпадать с именами столбцов в команде CREATE EXTERNAL TABLE

preserve_blanks

Определяет, должны ли сохраняться конечные пробелы.

По умолчанию конечные пробелы обрезаются (off)

null

Указывает значение нулевого символа.

Если параметру preserve_blanks присвоено значение on, то требуется указать и значение нулевого символа.

Если параметру preserve_blanks присвоено значение off, значение null не указано, а поле содержит только пробелы, в таблицу Greengage DB будет записано значение NULL. В противном случае, если значение null указано, в таблицу будет записана пустая строка

line_delim

Указывает символ переноса строки.

Нижеприведенные примеры покрывают большинство случаев (E обозначает открывающий символ спецпоследовательности):

  • line_delim=E'\n'

  • line_delim=E'\r'

  • line_delim=E'\r\n'

  • line_delim='<custom_string>'

Пример

В данном примере используется файл customers.txt со строками фиксированной ширины. Файл располагается в каталоге /tmp сегмент-хоста sdw1 и имеет следующее содержимое:

1     John Doe       New York
2   Jane Smith    Los Angeles
3    Alice Lee  San Francisco
  1. На мастер-хосте запустите команду CREATE EXTERNAL TABLE. В выражении FORMAT укажите CUSTOM в качестве формата данных, в качестве функции форматирования установите fixedwidth_in и перечислите поля и их длины:

    CREATE EXTERNAL TABLE customers
    (
        id INTEGER,
        name VARCHAR(20),
        city VARCHAR(20)
    )
        LOCATION ('file://sdw1/tmp/customers.txt')
        FORMAT 'CUSTOM' (FORMATTER=fixedwidth_in, id='4', name='12', city='13');
  2. Выполните запрос к созданной таблице:

    SELECT * FROM customers;

    Вывод должен выглядеть следующим образом:

     id |    name    |     city
    ----+------------+---------------
      1 |   John Doe |      New York
      2 | Jane Smith |   Los Angeles
      3 |  Alice Lee | San Francisco

Кастомный протокол

Кастомный протокол позволяет подключить Greengage DB к источнику данных, не доступному с использованием поддерживаемых встроенных протоколов. Кастомные протоколы можно использовать, когда имеющихся встроенных протоколов недостаточно для конкретной потребности. Кастомный протокол потребуется реализовать самостоятельно (см. Пример использования).

Общие шаги работы с кастомным протоколом следующие:

  1. Реализуйте на C функции отправки, приема и (опционально) валидации с указанными интерфейсами.

  2. Скомпилируйте функции в общий объект (.so).

  3. В Greengage DB используйте команду CREATE FUNCTION и объявите функции базы данных, которые указывают на файл .so и имена функций.

  4. Используйте команду CREATE TRUSTED PROTOCOL для включения протокола в базу данных. Вы должны быть суперпользователем для создания и регистрации кастомного протокола.

  5. Используйте команду GRANT, чтобы выдать пользователям соответствующие привилегии, например:

    • Разрешить пользователю создание читающих внешних таблиц с использованием доверенного протокола:

      GRANT SELECT ON PROTOCOL <protocol_name> TO <user_name>;
    • Разрешить пользователю создание пишущих внешних таблиц с использованием доверенного протокола:

      GRANT INSERT ON PROTOCOL <protocol_name> TO <user_name>;
    • Разрешить пользователю создание читающих и пишущих внешних таблиц с использованием доверенного протокола:

      GRANT ALL ON PROTOCOL <protocol_name> TO <user_name>;

      Более подробную информацию об управлении доступом в Greengage DB можно получить в статье Роли и привилегии.

Пример использования

Этот пример демонстрирует реализацию API Greengage DB для пользовательского протокола доступа к данным и пользовательского формата данных. В репозитории исходного кода Greengage DB вы можете просмотреть полный API функций ввода/вывода и форматирования внешних таблиц.

  1. В каталоге /tmp мастер-хоста создайте файл demoprotocol.c, содержащий реализацию протокола на языке C:

    #include "postgres.h"
    #include "fmgr.h"
    #include "funcapi.h"
    
    #include "access/extprotocol.h"
    #include "catalog/pg_proc.h"
    #include "commands/defrem.h"
    #include "utils/array.h"
    #include "utils/builtins.h"
    #include "utils/memutils.h"
    
    #include "catalog/pg_exttable.h"
    
    
    typedef struct DemoUri
    {
    	char	   *protocol;
    	char	   *path;
    
    }	DemoUri;
    
    static DemoUri *ParseDemoUri(const char *uri_str);
    static void FreeDemoUri(DemoUri* uri);
    
    
    /* Do the module magic dance */
    PG_MODULE_MAGIC;
    PG_FUNCTION_INFO_V1(demoprot_export);
    PG_FUNCTION_INFO_V1(demoprot_import);
    PG_FUNCTION_INFO_V1(demoprot_validate_urls);
    
    Datum demoprot_export(PG_FUNCTION_ARGS);
    Datum demoprot_import(PG_FUNCTION_ARGS);
    Datum demoprot_validate_urls(PG_FUNCTION_ARGS);
    
    
    typedef struct {
    	char	  *url;
    	char	  *filename;
    	FILE	  *file;
    } extprotocol_t;
    
    static void check_ext_options(const FunctionCallInfo fcinfo)
    {
    	ListCell *cell;
    	Relation rel = EXTPROTOCOL_GET_RELATION(fcinfo);
    	ExtTableEntry *exttbl = GetExtTableEntry(rel->rd_id);
    	List *options = exttbl->options;
    
    	foreach(cell, options) {
    		DefElem *def = (DefElem *) lfirst(cell);
    		char *key = def->defname;
    		char *value = defGetString(def);
    
    		if (key && strcasestr(key, "database") && !strcasestr(value, "greengage")) {
    				ereport(ERROR, (0, errmsg("This is greengage.")));
    		}
    	}
    }
    
    /*
     * Import data into GPDB.
     */
    Datum
    demoprot_import(PG_FUNCTION_ARGS)
    {
    	extprotocol_t   *myData;
    	char			*data;
    	int				 datlen;
    	size_t			 nread = 0;
    
    	/* Must be called via the external table format manager */
    	if (!CALLED_AS_EXTPROTOCOL(fcinfo))
    		elog(ERROR, "extprotocol_import: not called by external protocol manager");
    
    	/* Get our internal description of the protocol */
    	myData = (extprotocol_t *) EXTPROTOCOL_GET_USER_CTX(fcinfo);
    
    	if(EXTPROTOCOL_IS_LAST_CALL(fcinfo))
    	{
    		/* we're done receiving data. close our connection */
    		if(myData && myData->file)
    			if(fclose(myData->file))
    				ereport(ERROR,
    						(errcode_for_file_access(),
    						 errmsg("could not close file \"%s\": %m",
    								 myData->filename)));
    
    		PG_RETURN_INT32(0);
    	}
    
    	if (myData == NULL)
    	{
    		/* first call. do any desired init */
    
    		const char	*p_name = "demoprot";
    		DemoUri		*parsed_url;
    		char		*url = EXTPROTOCOL_GET_URL(fcinfo);
    
    		myData 			 = palloc(sizeof(extprotocol_t));
    
    		myData->url 	 = pstrdup(url);
    		parsed_url 		 = ParseDemoUri(myData->url);
    		myData->filename = pstrdup(parsed_url->path);
    
    		if(strcasecmp(parsed_url->protocol, p_name) != 0)
    			elog(ERROR, "internal error: demoprot called with a different protocol (%s)",
    						parsed_url->protocol);
    
    		/* An example of checking options */
    		check_ext_options(fcinfo);
    
    		FreeDemoUri(parsed_url);
    
    		/* open the destination file (or connect to remote server in other cases) */
    		myData->file = fopen(myData->filename, "r");
    
    		if (myData->file == NULL)
    			ereport(ERROR,
    					(errcode_for_file_access(),
    					 errmsg("demoprot_import: could not open file \"%s\" for reading: %m",
    							myData->filename)));
    
    		EXTPROTOCOL_SET_USER_CTX(fcinfo, myData);
    	}
    
    	/* =======================================================================
    	 *                            DO THE IMPORT
    	 * ======================================================================= */
    
    	data 	= EXTPROTOCOL_GET_DATABUF(fcinfo);
    	datlen 	= EXTPROTOCOL_GET_DATALEN(fcinfo);
    
    	if(datlen > 0)
    	{
    		nread = fread(data, 1, datlen, myData->file);
    		if (ferror(myData->file))
    			ereport(ERROR,
    					(errcode_for_file_access(),
    					 errmsg("demoprot_import: could not write to file \"%s\": %m",
    							 myData->filename)));
    	}
    
    
    	PG_RETURN_INT32((int)nread);
    }
    
    /*
     * Export data out of GPDB.
     */
    Datum
    demoprot_export(PG_FUNCTION_ARGS)
    {
    	extprotocol_t   *myData;
    	char			*data;
    	int				 datlen;
    	size_t			 wrote = 0;
    
    	/* Must be called via the external table format manager */
    	if (!CALLED_AS_EXTPROTOCOL(fcinfo))
    		elog(ERROR, "extprotocol_export: not called by external protocol manager");
    
    	/* Get our internal description of the protocol */
    	myData = (extprotocol_t *) EXTPROTOCOL_GET_USER_CTX(fcinfo);
    
    	if(EXTPROTOCOL_IS_LAST_CALL(fcinfo))
    	{
    		/* we're done sending data. close our connection */
    		if(myData && myData->file)
    			if(fclose(myData->file))
    				ereport(ERROR,
    						(errcode_for_file_access(),
    						 errmsg("could not close file \"%s\": %m",
    								 myData->filename)));
    
    		PG_RETURN_INT32(0);
    	}
    
    	if (myData == NULL)
    	{
    		/* first call. do any desired init */
    
    		const char	*p_name = "demoprot";
    		DemoUri		*parsed_url;
    		char		*url = EXTPROTOCOL_GET_URL(fcinfo);
    
    		myData 			 = palloc(sizeof(extprotocol_t));
    
    		myData->url 	 = pstrdup(url);
    		parsed_url 		 = ParseDemoUri(myData->url);
    		myData->filename = pstrdup(parsed_url->path);
    
    		if(strcasecmp(parsed_url->protocol, p_name) != 0)
    			elog(ERROR, "internal error: demoprot called with a different protocol (%s)",
    						parsed_url->protocol);
    
    		FreeDemoUri(parsed_url);
    
    		/* open the destination file (or connect to remote server in other cases) */
    		myData->file = fopen(myData->filename, "a");
    
    		if (myData->file == NULL)
    			ereport(ERROR,
    					(errcode_for_file_access(),
    					 errmsg("demoprot_export: could not open file \"%s\" for writing: %m",
    							myData->filename)));
    
    		EXTPROTOCOL_SET_USER_CTX(fcinfo, myData);
    	}
    
    	/* =======================================================================
    	 *                            DO THE EXPORT
    	 * ======================================================================= */
    
    	data 	= EXTPROTOCOL_GET_DATABUF(fcinfo);
    	datlen 	= EXTPROTOCOL_GET_DATALEN(fcinfo);
    
    	if(datlen > 0)
    	{
    		wrote = fwrite(data, 1, datlen, myData->file);
    		if (ferror(myData->file))
    			ereport(ERROR,
    					(errcode_for_file_access(),
    					 errmsg("demoprot_import: could not read from file \"%s\": %m",
    							 myData->filename)));
    	}
    
    	PG_RETURN_INT32((int)wrote);
    }
    
    Datum
    demoprot_validate_urls(PG_FUNCTION_ARGS)
    {
    	int					nurls;
    	int					i;
    	ValidatorDirection	direction;
    
    	/* Must be called via the external table format manager */
    	if (!CALLED_AS_EXTPROTOCOL_VALIDATOR(fcinfo))
    		elog(ERROR, "demoprot_validate_urls: not called by external protocol manager");
    
    	nurls 		= EXTPROTOCOL_VALIDATOR_GET_NUM_URLS(fcinfo);
    	direction 	= EXTPROTOCOL_VALIDATOR_GET_DIRECTION(fcinfo);
    
    	/*
    	 * Dumb example 1: search each url for a substring
    	 * we don't want to be used in a url. in this example
    	 * it's 'secured_directory'.
    	 */
    	for (i = 1 ; i <= nurls ; i++)
    	{
    		char *url = EXTPROTOCOL_VALIDATOR_GET_NTH_URL(fcinfo, i);
    
    		if (strstr(url, "secured_directory") != 0)
    		{
                ereport(ERROR,
                        (errcode(ERRCODE_PROTOCOL_VIOLATION),
                         errmsg("using 'secured_directory' in a url isn't allowed ")));
    		}
    	}
    
    	/*
    	 * Dumb example 2: set a limit on the number of urls
    	 * used. In this example we limit readable external
    	 * tables that use our protocol to 2 urls max.
    	 */
    	if(direction == EXT_VALIDATE_READ && nurls > 2)
    	{
            ereport(ERROR,
                    (errcode(ERRCODE_PROTOCOL_VIOLATION),
                     errmsg("more than 2 urls aren't allowed in this protocol ")));
    	}
    
    	PG_RETURN_VOID();
    }
    
    /* --- utility functions --- */
    
    static
    DemoUri *ParseDemoUri(const char *uri_str)
    {
    	DemoUri	   *uri = (DemoUri *) palloc0(sizeof(DemoUri));
    	int			protocol_len;
    
     	uri->path = NULL;
     	uri->protocol = NULL;
    
    	/*
    	 * parse protocol
    	 */
    	char *post_protocol = strstr(uri_str, "://");
    
    	if(!post_protocol)
    	{
    		ereport(ERROR,
    				(errcode(ERRCODE_SYNTAX_ERROR),
    				 errmsg("invalid demo prot URI \'%s\'", uri_str)));
    	}
    
    	protocol_len = post_protocol - uri_str;
    	uri->protocol = (char *) palloc0 (protocol_len + 1);
    	strncpy(uri->protocol, uri_str, protocol_len);
    
    	/* make sure there is more to the uri string */
    	if (strlen(uri_str) <= protocol_len)
    		ereport(ERROR,
    				(errcode(ERRCODE_SYNTAX_ERROR),
    		errmsg("invalid demo prot URI \'%s\' : missing path", uri_str)));
    
    	/*
    	 * parse path
    	 */
    	uri->path = pstrdup(uri_str + protocol_len + strlen("://"));
    
    	return uri;
    }
    
    static
    void FreeDemoUri(DemoUri *uri)
    {
    	if (uri->path)
    		pfree(uri->path);
    	if (uri->protocol)
    		pfree(uri->protocol);
    
    	pfree(uri);
    }

    Протокол определен с именем demoprot. Он указывается в выражении LOCATION команды CREATE EXTERNAL TABLE как имя протокола и путь к файлу, разделенные ://, следующим образом:

    LOCATION('demoprot://<path>')

    В протоколе реализованы три функции:

    • demoprot_import() — функция чтения.

    • demoprot_export() — функция записи.

    • demoprot_validate_urls() — функция валидации.

  2. В каталоге /tmp мастер-хоста создайте реализацию форматтера demoformatter.c на языке C:

    #include "postgres.h"
    #include "fmgr.h"
    #include "funcapi.h"
    
    #include "access/formatter.h"
    #include "catalog/pg_proc.h"
    #include "utils/builtins.h"
    #include "utils/memutils.h"
    #include "utils/typcache.h"
    #include "utils/syscache.h"
    
    /* Do the module magic dance */
    PG_MODULE_MAGIC;
    PG_FUNCTION_INFO_V1(formatter_export);
    PG_FUNCTION_INFO_V1(formatter_import);
    
    Datum formatter_export(PG_FUNCTION_ARGS);
    Datum formatter_import(PG_FUNCTION_ARGS);
    
    
    typedef struct {
    	int        ncols;
    	Datum     *values;
    	bool      *nulls;
    	int        buflen;
    	bytea     *buffer;
    } format_t;
    
    
    /*
     * Maximum size string to support, affects allocation size of the tuple buffer.
     * Only used for variable length strings.  For strings with a declared typmod
     * we allow that size even if it is larger than this.
     */
    #define MAX_FORMAT_STRING 4096
    
    /*
     * Our format converts all NULLs to real values, for floats that value is NaN
     */
    #define NULL_FLOAT8_VALUE get_float8_nan()
    
    
    Datum
    formatter_export(PG_FUNCTION_ARGS)
    {
    	HeapTupleHeader		rec	= PG_GETARG_HEAPTUPLEHEADER(0);
    	TupleDesc           tupdesc;
    	HeapTupleData		tuple;
    	int                 ncolumns = 0;
    	format_t           *myData;
    	char               *data;
    	int                 datlen;
    	int                 i;
    
    	/* Must be called via the external table format manager */
    	if (!CALLED_AS_FORMATTER(fcinfo))
    		elog(ERROR, "formatter_export: not called by format manager");
    
    	tupdesc = FORMATTER_GET_TUPDESC(fcinfo);
    
    	/* Get our internal description of the formatter */
    	ncolumns = tupdesc->natts;
    	myData = (format_t *) FORMATTER_GET_USER_CTX(fcinfo);
    	if (myData == NULL)
    	{
    		myData          = palloc(sizeof(format_t));
    		myData->ncols   = ncolumns;
    		myData->values  = palloc(sizeof(Datum) * ncolumns);
    		myData->nulls   = palloc(sizeof(bool) * ncolumns);
    
    		/* Determine required buffer size */
    		myData->buflen = 0;
    		for (i = 0; i < ncolumns; i++)
    		{
    			Oid   type   = tupdesc->attrs[i]->atttypid;
    			int32 typmod = tupdesc->attrs[i]->atttypmod;
    
    			/* Don't know how to format dropped columns, error for now */
    			if (tupdesc->attrs[i]->attisdropped)
    				elog(ERROR, "formatter_export: dropped columns");
    
    			switch (type)
    			{
    				case FLOAT8OID:
    				{
    					myData->buflen += sizeof(double);
    					break;
    				}
    
    				case VARCHAROID:
    				case BPCHAROID:
    				case TEXTOID:
    				{
    					myData->buflen += (typmod > 0) ? typmod : MAX_FORMAT_STRING;
    					break;
    				}
    
    				default:
    				{
    					elog(ERROR, "formatter_export error: unsupported data type");
    					break;
    				}
    			}
    		}
    
    		myData->buflen = Max(128, myData->buflen);  /* allocate at least 128 bytes */
    		myData->buffer = palloc(myData->buflen + VARHDRSZ);
    
    		FORMATTER_SET_USER_CTX(fcinfo, myData);
    	}
    	if (myData->ncols != ncolumns)
    		elog(ERROR, "formatter_export: unexpected change of output record type");
    
    	/* break the input tuple into fields */
    	tuple.t_len = HeapTupleHeaderGetDatumLength(rec);
    	ItemPointerSetInvalid(&(tuple.t_self));
    	tuple.t_data = rec;
    	heap_deform_tuple(&tuple, tupdesc, myData->values, myData->nulls);
    
    	datlen = 0;
    	data = VARDATA(myData->buffer);
    
    
    	/* =======================================================================
    	 *                            MAIN FORMATTING CODE
    	 *
    	 * Currently this code assumes:
    	 *  - Homogoneos hardware => No need to convert data to network byte order
    	 *  - Support for TEXT/VARCHAR/BPCHAR/FLOAT8 only
    	 *  - Length Prefixed strings
    	 *  - No end of record tags, checksums, or optimizations for alignment.
    	 *  - NULL values are cast to some sensible default value (NaN, "")
    	 *
    	 * ======================================================================= */
    	for (i = 0; i < ncolumns; i++)
    	{
    		Oid	  type    = tupdesc->attrs[i]->atttypid;
    		int   typmod  = tupdesc->attrs[i]->atttypmod;
    		Datum val     = myData->values[i];
    		bool  nul     = myData->nulls[i];
    
    		switch (type)
    		{
    			case FLOAT8OID:
    			{
    				float8 value;
    
    				if (datlen + sizeof(value) >= myData->buflen)
    					elog(ERROR, "formatter_export: buffer too small");
    
    				if (nul)
    					value = NULL_FLOAT8_VALUE;
    				else
    					value = DatumGetFloat8(val);
    
    				memcpy(&data[datlen], &value, sizeof(value));
    				datlen += sizeof(value);
    				break;
    			}
    
    			case TEXTOID:
    			case VARCHAROID:
    			case BPCHAROID:
    			{
    				text  *str;
    				int32  len;
    
    				if (nul)
    				{
    					str = NULL;
    					len = 0;
    				}
    				else
    				{
    					str = DatumGetTextP(val);
    					len = VARSIZE(str) - VARHDRSZ;
    					if (typmod < 0)
    						len  = Min(len, MAX_FORMAT_STRING);
    				}
    
    				if (datlen + sizeof(len) + len >= myData->buflen)
    					elog(ERROR, "formatter_export: buffer too small");
    				memcpy(&data[datlen], &len, sizeof(len));
    				datlen += sizeof(len);
    
    				if (len > 0)
    				{
    					memcpy(&data[datlen], VARDATA(str), len);
    					datlen += len;
    				}
    				break;
    			}
    
    			default:
    				elog(ERROR, "formatter_export: unsupported datatype");
    				break;
    		}
    	}
    	/* ======================================================================= */
    
    	SET_VARSIZE(myData->buffer, datlen + VARHDRSZ);
    	PG_RETURN_BYTEA_P(myData->buffer);
    }
    
    Datum
    formatter_import(PG_FUNCTION_ARGS)
    {
    	HeapTuple			tuple;
    	TupleDesc           tupdesc;
    	MemoryContext 		m, oldcontext;
    	format_t           *myData;
    	char               *data_buf;
    	int                 ncolumns = 0;
    	int			  		data_cur;
    	int                 data_len;
    	int                 i;
    
    	/* Must be called via the external table format manager */
    	if (!CALLED_AS_FORMATTER(fcinfo))
    		elog(ERROR, "formatter_import: not called by format manager");
    
    	tupdesc = FORMATTER_GET_TUPDESC(fcinfo);
    
    	/* Get our internal description of the formatter */
    	ncolumns = tupdesc->natts;
    	myData = (format_t *) FORMATTER_GET_USER_CTX(fcinfo);
    
    	if (myData == NULL)
    	{
    
    		myData          = palloc(sizeof(format_t));
    		myData->ncols   = ncolumns;
    		myData->values  = palloc(sizeof(Datum) * ncolumns);
    		myData->nulls   = palloc(sizeof(bool) * ncolumns);
    
    		/* misc verification */
    		for (i = 0; i < ncolumns; i++)
    		{
    			Oid   type   = tupdesc->attrs[i]->atttypid;
    			//int32 typmod = tupdesc->attrs[i]->atttypmod;
    
    			/* Don't know how to format dropped columns, error for now */
    			if (tupdesc->attrs[i]->attisdropped)
    				elog(ERROR, "formatter_import: dropped columns");
    
    			switch (type)
    			{
    				case FLOAT8OID:
    				case VARCHAROID:
    				case BPCHAROID:
    				case TEXTOID:
    					break;
    
    				default:
    				{
    					elog(ERROR, "formatter_import error: unsupported data type");
    					break;
    				}
    			}
    		}
    
    		FORMATTER_SET_USER_CTX(fcinfo, myData);
    
    	}
    	if (myData->ncols != ncolumns)
    		elog(ERROR, "formatter_import: unexpected change of output record type");
    
    	/* get our input data buf and number of valid bytes in it */
    	data_buf = FORMATTER_GET_DATABUF(fcinfo);
    	data_len = FORMATTER_GET_DATALEN(fcinfo);
    	data_cur = FORMATTER_GET_DATACURSOR(fcinfo);
    
    	/* start clean */
    	MemSet(myData->values, 0, ncolumns * sizeof(Datum));
    	MemSet(myData->nulls, true, ncolumns * sizeof(bool));
    
    	/* =======================================================================
    	 *                            MAIN FORMATTING CODE
    	 *
    	 * Currently this code assumes:
    	 *  - Homogoneos hardware => No need to convert data to network byte order
    	 *  - Support for TEXT/VARCHAR/BPCHAR/FLOAT8 only
    	 *  - Length Prefixed strings
    	 *  - No end of record tags, checksums, or optimizations for alignment.
    	 *  - NULL values are cast to some sensible default value (NaN, "")
    	 *
    	 * ======================================================================= */
    	m = FORMATTER_GET_PER_ROW_MEM_CTX(fcinfo);
    	oldcontext = MemoryContextSwitchTo(m);
    
    	for (i = 0; i < ncolumns; i++)
    	{
    		Oid		type    	= tupdesc->attrs[i]->atttypid;
    		//int	typmod		= tupdesc->attrs[i]->atttypmod;
    		int		remaining	= 0;
    		int		attr_len 	= 0;
    
    		remaining = data_len - data_cur;
    
    		switch (type)
    		{
    			case FLOAT8OID:
    			{
    				float8 value;
    
    				attr_len = sizeof(value);
    
    				if (remaining < attr_len)
    				{
    					MemoryContextSwitchTo(oldcontext);
    					FORMATTER_RETURN_NOTIFICATION(fcinfo, FMT_NEED_MORE_DATA);
    				}
    
    				memcpy(&value, data_buf + data_cur, attr_len);
    
    				if(value != NULL_FLOAT8_VALUE)
    				{
    					myData->nulls[i] = false;
    					myData->values[i] = Float8GetDatum(value);
    				}
    
    				/* TODO: check for nan? */
    
    				break;
    			}
    
    			case TEXTOID:
    			case VARCHAROID:
    			case BPCHAROID:
    			{
    				text*	value;
    				int32	len;
    				bool	nextlen = remaining >= sizeof(len);
    
    				if (nextlen)
    				{
    					memcpy(&len, data_buf + data_cur, sizeof(len));
    
    					if (len < 0)
    						elog(ERROR, "invalid length of varlen datatype: %d",
    									len);
    				}
    
    				/* if len or data bytes don't exist in this buffer, return */
    				if (!nextlen || (nextlen && (remaining - sizeof(len) < len)))
    				{
    					MemoryContextSwitchTo(oldcontext);
    					FORMATTER_RETURN_NOTIFICATION(fcinfo, FMT_NEED_MORE_DATA);
    				}
    
    				if (len > 0)
    				{
    					value = (text *) palloc(len + VARHDRSZ);
    					SET_VARSIZE(value, len + VARHDRSZ);
    
    					memcpy(VARDATA(value), data_buf + data_cur + sizeof(len), len);
    
    					myData->nulls[i] = false;
    					myData->values[i] = PointerGetDatum(value);
    				}
    
    				attr_len = len + sizeof(len);
    
    				break;
    			}
    
    			default:
    				elog(ERROR, "formatter_import: unsupported datatype");
    				break;
    		}
    
    		/* add byte length of last attribute to the temporary cursor */
    		data_cur += attr_len;
    
    	}
    	/* ======================================================================= */
    
    	MemoryContextSwitchTo(oldcontext);
    
    	FORMATTER_SET_DATACURSOR(fcinfo, data_cur);
    
    	tuple = heap_form_tuple(tupdesc, myData->values, myData->nulls);
    
    	/* hack... pass tuple here. don't free prev tuple - the executor does it  */
    	((FormatterData*) fcinfo->context)->fmt_tuple = tuple;
    
    	FORMATTER_RETURN_TUPLE(tuple);
    }

    В форматтере реализованы функции formatter_export и formatter_import, а также поддерживается загрузка и выгрузка данных в двоичном формате.

  3. Используйте компилятор языка C cc для компиляции исходного кода в общие объекты, которые можно динамически загрузить в Greengage DB:

    $ cd /tmp || exit
    $ cc -fpic -c /tmp/demoprotocol.c -I $(pg_config --includedir-server) && \
    cc -shared -o /tmp/demoprotocol.so /tmp/demoprotocol.o
    $ cc -fpic -c /tmp/demoformatter.c -I $(pg_config --includedir-server) && \
    cc -shared -o /tmp/demoformatter.so /tmp/demoformatter.o

    где:

    • -fpic — указывает на создание позиционно-независимого кода (Position-Independent Code, PIC). Объектный файл должен быть позиционно-независимым, чтобы его можно было загрузить в любое произвольное место в памяти Greengage DB.

    • -c — компилирует исходный код без компоновки и создает объектный файл (demoprotocol.o).

    • -I — указывает местоположение заголовочных файлов. Данные файлы располагаются в подкаталогах $GPHOME/include/postgresql/. Точное местоположение можно выяснить с помощью следующей команды:

      $ pg_config --includedir-server
    • -shared — указывает на создание общего объекта (общий файл библиотеки).

    • -o — указывает имя файла общего объекта (demoprotocol.so).

    ПРИМЕЧАНИЕ

    Более подробную информацию об опциях компилятора cc можно получить в официальной документации GCC. Более подробную информацию о компиляции и компоновке динамически подгружаемых функций для создания общих библиотек в различных операционных системах можно получить в документации PostgreSQL.

  4. Скопируйте скомпилированный код (файлы библиотек demoprotocol.so и demoformatter.so) в один и тот же каталог на каждом хосте экземпляра Greengage DB (мастер и все сегменты). Местоположение должно быть указано в LD_LIBRARY_PATH, чтобы сервер мог найти эти файлы. Рекомендуется располагать общие библиотеки либо относительно $libdir, либо через путь к динамическим библиотекам (устанавливается параметром конфигурации сервера dynamic_library_path). Вы можете выяснить, на какой каталог указывает $libdir, выполнив следующую команду:

    $ pg_config --pkglibdir

    Для обновления всех сегментов воспользуйтесь утилитой Greengage DB gpscp:

    $ gpscp -f ~/hostfile_all_hosts /tmp/demoprotocol.so /tmp/demoformatter.so =:"$(pg_config --pkglibdir)"

    где <hostfile_all_hosts> — это список хостов исходного кластера, который использовался для его инициализации.

  5. В базе данных customers объявите функции протокола, ссылающиеся на файл demoprotocol.so и имена соответствующих функций:

    CREATE OR REPLACE FUNCTION write_to_file() RETURNS integer AS
       '$libdir/demoprotocol.so', 'demoprot_export' LANGUAGE C STABLE;
    
    CREATE OR REPLACE FUNCTION read_from_file() RETURNS integer AS
        '$libdir/demoprotocol.so', 'demoprot_import' LANGUAGE C STABLE;
    
    CREATE OR REPLACE FUNCTION validate_urls() RETURNS void AS
        '$libdir/demoprotocol.so', 'demoprot_validate_urls' LANGUAGE C STABLE;
  6. Создайте протокол, обращающийся к этим функциям:

    CREATE TRUSTED PROTOCOL demoprot(
        writefunc='write_to_file',
        readfunc='read_from_file',
        validatorfunc='validate_urls'
    );
  7. Объявите функции форматирования, ссылающиеся на файл demoformatter.so и имена соответствующих функций:

    CREATE OR REPLACE FUNCTION formatter_export(record) RETURNS bytea
    AS '$libdir/demoformatter.so', 'formatter_export'
    LANGUAGE C STABLE;
    
    CREATE OR REPLACE FUNCTION formatter_import() RETURNS record
    AS '$libdir/demoformatter.so', 'formatter_import'
    LANGUAGE C STABLE;
  8. Создайте пишущую внешнюю таблицу customers_w. В выражении LOCATION укажите протокол demoprot и выходной файл customers.demo. В выражении FORMAT укажите CUSTOM в качестве формата данных и formatter_export — в качестве функции форматирования:

    CREATE WRITABLE EXTERNAL TABLE customers_w
    (
        id TEXT,
        name TEXT,
        email TEXT,
        address TEXT
    )
        LOCATION('demoprot://customers.demo')
        FORMAT 'CUSTOM' (FORMATTER=formatter_export)
        DISTRIBUTED BY (id);
  9. Вставьте тестовые данные в таблицу customers_w:

    INSERT INTO customers_w (id, name, email, address)
    VALUES (1,'John Doe','john.doe@example.com','123 Elm Street'),
           (2,'Jane Smith','jane.smith@example.com','456 Oak Avenue'),
           (3,'Bob Brown','bob.brown@example.com','789 Pine Street'),
           (4,'Rob Stuart','rob.stuart@example.com','119 Willow Street'),
           (5, 'Alice Johnson', 'alice.johnson@example.com', '221 Maple Drive'),
           (6, 'David Lee', 'david.lee@example.com', '333 Birch Lane'),
           (7, 'Emily Wilson', 'emily.wilson@example.com', '444 Cedar Court'),
           (8, 'Michael Garcia', 'michael.garcia@example.com', '555 Walnut Place');

    Запуск команды создает двоичный файл customers.demo в каталоге данных каждого сегмента.

  10. Проверьте, что файлы созданы. Для просмотра содержимого каталогов данных всех сегментов воспользуйтесь утилитой Greengage DB gpssh:

    $ gpssh -f ~/hostfile_segment_hosts -e 'ls /data1/primary/gpseg*/customers.demo'

    где <hostfile_segment_hosts> — это список сегмент-хостов кластера.

    Вывод должен выглядеть подобным образом:

    [sdw1] /data1/primary/gpseg0/customers.demo  /data1/primary/gpseg1/customers.demo
    [sdw2] /data1/primary/gpseg2/customers.demo  /data1/primary/gpseg3/customers.demo
  11. Создайте читающую внешнюю таблицу customers_r. В выражении LOCATION укажите протокол demoprot и файл исходных данных customers.demo. В выражении FORMAT укажите CUSTOM в качестве формата данных и formatter_import — в качестве функции форматирования:

    CREATE EXTERNAL TABLE customers_r
    (
        id TEXT,
        name TEXT,
        email TEXT,
        address TEXT
    )
        LOCATION('demoprot://customers.demo')
        FORMAT 'CUSTOM' (FORMATTER=formatter_import);
  12. Выполните запрос к таблице:

    SELECT * FROM customers_r;

    Вывод должен выглядеть следующим образом:

     id |      name      |           email            |      address
    ----+----------------+----------------------------+-------------------
      1 | John Doe       | john.doe@example.com       | 123 Elm Street
      5 | Alice Johnson  | alice.johnson@example.com  | 221 Maple Drive
      6 | David Lee      | david.lee@example.com      | 333 Birch Lane
      2 | Jane Smith     | jane.smith@example.com     | 456 Oak Avenue
      3 | Bob Brown      | bob.brown@example.com      | 789 Pine Street
      4 | Rob Stuart     | rob.stuart@example.com     | 119 Willow Street
      7 | Emily Wilson   | emily.wilson@example.com   | 444 Cedar Court
      8 | Michael Garcia | michael.garcia@example.com | 555 Walnut Place
    (8 rows)