
Use custom formats and protocols for external tables

Anton Monakov

Out of the box, Greengage DB lets you load and unload external data through external tables by using the delimited text (TEXT) or comma-separated values (CSV) formats and one of the supported built-in protocols.

To work with other formats or data sources, you can define a custom format or a custom protocol. A custom format lets you work with data stored in an arbitrary format. A custom protocol lets you connect Greengage DB to a data source that cannot be accessed with the built-in protocols.

Custom formats and protocols are implemented by incorporating user-written code (for example, a shared library) into Greengage DB through dynamic loading.

Prerequisites

To try out the practical examples in this topic, connect to the Greengage DB master host as gpadmin using psql as described in Connect to Greengage DB via psql. Then create the customers test database and connect to it:

DROP DATABASE IF EXISTS customers;
CREATE DATABASE customers;
\c customers

Custom data format

A custom format lets you work with data stored in an arbitrary format. The general steps for working with a custom data format are as follows:

  1. Write and compile the input and output functions as a shared formatter library.

  2. Specify the shared library function with CREATE FUNCTION in Greengage DB.

  3. When defining an external table, specify the custom data format in the FORMAT clause of the CREATE EXTERNAL TABLE command:

    FORMAT 'CUSTOM' (FORMATTER=<format_function>, key1=val1, ..., keyN=valN)

    where:

    • FORMATTER specifies the formatting function (<format_function>) used to format the data.

    • key1=val1, ..., keyN=valN specifies a comma-separated list of parameters passed to the formatting function.

Greengage DB provides built-in functions for formatting fixed-width data; for variable-width data, you must write the formatter functions (see Usage example).

Format fixed-width data

A fixed-width file is a data file in which each field occupies a fixed number of character positions. To use a fixed-width data file in an external table, specify the built-in formatter function fixedwidth_in (for reading data) or fixedwidth_out (for writing data) in the FORMAT clause of the CREATE EXTERNAL TABLE command as follows:

FORMAT 'CUSTOM' (
    formatter=fixedwidth_in | fixedwidth_out,
    <field_name1>=<field_length1>,
    ...,
    <field_nameN>=<field_lengthN>,
    preserve_blanks=on | off,
    null='null_string_value',
    line_delim='line_ending_value'
)

Specify the format function parameters as follows.

Option Description

fixedwidth_in

Specifies the format function used for reading data

fixedwidth_out

Specifies the format function used for writing data

<field_name>=<field_length>

Specifies the list of fields and their lengths in characters.

List the fields in their physical order in the file. The field names must follow the same order as the columns defined in the CREATE EXTERNAL TABLE command.

preserve_blanks

Defines whether trailing blank characters are preserved (on) or trimmed (off).

By default, trailing blanks are trimmed (off).

null

Specifies the string that represents a NULL value.

If preserve_blanks is set to on, you must also define null.

If preserve_blanks is set to off, null is not defined, and a field contains only blanks, Greengage DB writes NULL to the table. If null is defined, such a field is written as an empty string instead.

line_delim

Specifies the line ending, which can consist of one or more characters.

The following examples cover most cases (E specifies an escape string constant):

  • line_delim=E'\n'

  • line_delim=E'\r'

  • line_delim=E'\r\n'

  • line_delim='<custom_string>'

Example

This example uses the customers.txt fixed-width file. The file is located in the /tmp directory on the sdw1 segment host and has the following content:

1     John Doe       New York
2   Jane Smith    Los Angeles
3    Alice Lee  San Francisco
  1. On the master host, run the CREATE EXTERNAL TABLE command. In the FORMAT clause, set CUSTOM as the data format, specify the fixedwidth_in formatting function, and provide the fields' names and lengths:

    CREATE EXTERNAL TABLE customers
    (
        id INTEGER,
        name VARCHAR(20),
        city VARCHAR(20)
    )
        LOCATION ('file://sdw1/tmp/customers.txt')
        FORMAT 'CUSTOM' (FORMATTER=fixedwidth_in, id='4', name='12', city='13');
  2. Query the created external table:

    SELECT * FROM customers;

    The output should look as follows:

     id |    name    |     city
    ----+------------+---------------
      1 |   John Doe |      New York
      2 | Jane Smith |   Los Angeles
      3 |  Alice Lee | San Francisco

Custom protocol

A custom protocol allows you to connect Greengage DB to a data source that cannot be accessed with the supported built-in protocols. You must write the protocol functions yourself (see Usage example).

The general steps for working with a custom protocol are as follows:

  1. Implement the write (export), read (import), and, optionally, validator functions in C, following the interfaces of the Greengage DB external protocol API.

  2. Compile the functions into a shared object (.so).

  3. In Greengage DB, use the CREATE FUNCTION command to declare the database functions that point to the .so file and function names.

  4. Use the CREATE TRUSTED PROTOCOL command to enable the protocol in the database. You must be a superuser to create and register a custom protocol.

  5. Use the GRANT command to grant access to your users, for example:

    • Allow a user to create readable external tables with a trusted protocol:

      GRANT SELECT ON PROTOCOL <protocol_name> TO <user_name>;
    • Allow a user to create writable external tables with a trusted protocol:

      GRANT INSERT ON PROTOCOL <protocol_name> TO <user_name>;
    • Allow a user to create readable and writable external tables with a trusted protocol:

      GRANT ALL ON PROTOCOL <protocol_name> TO <user_name>;

      To learn more about the Greengage DB access control system, see Roles and privileges.

Usage example

This example shows how to implement a custom data access protocol and a custom data format with the Greengage DB API. The complete Input/Output and External Table Formatter functions API is available in the Greengage DB source code repository.

  1. In the /tmp directory on the master host, create the demoprotocol.c file with the protocol implementation in C:

    #define _GNU_SOURCE  /* strcasestr() is a GNU extension */
    #include "postgres.h"
    #include "fmgr.h"
    #include "funcapi.h"
    
    #include "access/extprotocol.h"
    #include "catalog/pg_proc.h"
    #include "commands/defrem.h"
    #include "utils/array.h"
    #include "utils/builtins.h"
    #include "utils/memutils.h"
    
    #include "catalog/pg_exttable.h"
    
    
    typedef struct DemoUri
    {
    	char	   *protocol;
    	char	   *path;
    
    }	DemoUri;
    
    static DemoUri *ParseDemoUri(const char *uri_str);
    static void FreeDemoUri(DemoUri* uri);
    
    
    /* Do the module magic dance */
    PG_MODULE_MAGIC;
    PG_FUNCTION_INFO_V1(demoprot_export);
    PG_FUNCTION_INFO_V1(demoprot_import);
    PG_FUNCTION_INFO_V1(demoprot_validate_urls);
    
    Datum demoprot_export(PG_FUNCTION_ARGS);
    Datum demoprot_import(PG_FUNCTION_ARGS);
    Datum demoprot_validate_urls(PG_FUNCTION_ARGS);
    
    
    typedef struct {
    	char	  *url;
    	char	  *filename;
    	FILE	  *file;
    } extprotocol_t;
    
    static void check_ext_options(const FunctionCallInfo fcinfo)
    {
    	ListCell *cell;
    	Relation rel = EXTPROTOCOL_GET_RELATION(fcinfo);
    	ExtTableEntry *exttbl = GetExtTableEntry(rel->rd_id);
    	List *options = exttbl->options;
    
    	foreach(cell, options) {
    		DefElem *def = (DefElem *) lfirst(cell);
    		char *key = def->defname;
    		char *value = defGetString(def);
    
    		if (key && strcasestr(key, "database") && !strcasestr(value, "greengage")) {
    			ereport(ERROR,
    					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    					 errmsg("the value of option \"%s\" must contain \"greengage\"", key)));
    		}
    	}
    }
    
    /*
     * Import data into GPDB.
     */
    Datum
    demoprot_import(PG_FUNCTION_ARGS)
    {
    	extprotocol_t   *myData;
    	char			*data;
    	int				 datlen;
    	size_t			 nread = 0;
    
    	/* Must be called via the external table format manager */
    	if (!CALLED_AS_EXTPROTOCOL(fcinfo))
    		elog(ERROR, "demoprot_import: not called by external protocol manager");
    
    	/* Get our internal description of the protocol */
    	myData = (extprotocol_t *) EXTPROTOCOL_GET_USER_CTX(fcinfo);
    
    	if(EXTPROTOCOL_IS_LAST_CALL(fcinfo))
    	{
    		/* we're done receiving data. close our connection */
    		if(myData && myData->file)
    			if(fclose(myData->file))
    				ereport(ERROR,
    						(errcode_for_file_access(),
    						 errmsg("could not close file \"%s\": %m",
    								 myData->filename)));
    
    		PG_RETURN_INT32(0);
    	}
    
    	if (myData == NULL)
    	{
    		/* first call. do any desired init */
    
    		const char	*p_name = "demoprot";
    		DemoUri		*parsed_url;
    		char		*url = EXTPROTOCOL_GET_URL(fcinfo);
    
    		myData 			 = palloc(sizeof(extprotocol_t));
    
    		myData->url 	 = pstrdup(url);
    		parsed_url 		 = ParseDemoUri(myData->url);
    		myData->filename = pstrdup(parsed_url->path);
    
    		if(strcasecmp(parsed_url->protocol, p_name) != 0)
    			elog(ERROR, "internal error: demoprot called with a different protocol (%s)",
    						parsed_url->protocol);
    
    		/* An example of checking options */
    		check_ext_options(fcinfo);
    
    		FreeDemoUri(parsed_url);
    
    		/* open the destination file (or connect to remote server in other cases) */
    		myData->file = fopen(myData->filename, "r");
    
    		if (myData->file == NULL)
    			ereport(ERROR,
    					(errcode_for_file_access(),
    					 errmsg("demoprot_import: could not open file \"%s\" for reading: %m",
    							myData->filename)));
    
    		EXTPROTOCOL_SET_USER_CTX(fcinfo, myData);
    	}
    
    	/* =======================================================================
    	 *                            DO THE IMPORT
    	 * ======================================================================= */
    
    	data 	= EXTPROTOCOL_GET_DATABUF(fcinfo);
    	datlen 	= EXTPROTOCOL_GET_DATALEN(fcinfo);
    
    	if(datlen > 0)
    	{
    		nread = fread(data, 1, datlen, myData->file);
    		if (ferror(myData->file))
    			ereport(ERROR,
    					(errcode_for_file_access(),
    					 errmsg("demoprot_import: could not read from file \"%s\": %m",
    							 myData->filename)));
    	}
    
    
    	PG_RETURN_INT32((int)nread);
    }
    
    /*
     * Export data out of GPDB.
     */
    Datum
    demoprot_export(PG_FUNCTION_ARGS)
    {
    	extprotocol_t   *myData;
    	char			*data;
    	int				 datlen;
    	size_t			 wrote = 0;
    
    	/* Must be called via the external table format manager */
    	if (!CALLED_AS_EXTPROTOCOL(fcinfo))
    		elog(ERROR, "demoprot_export: not called by external protocol manager");
    
    	/* Get our internal description of the protocol */
    	myData = (extprotocol_t *) EXTPROTOCOL_GET_USER_CTX(fcinfo);
    
    	if(EXTPROTOCOL_IS_LAST_CALL(fcinfo))
    	{
    		/* we're done sending data. close our connection */
    		if(myData && myData->file)
    			if(fclose(myData->file))
    				ereport(ERROR,
    						(errcode_for_file_access(),
    						 errmsg("could not close file \"%s\": %m",
    								 myData->filename)));
    
    		PG_RETURN_INT32(0);
    	}
    
    	if (myData == NULL)
    	{
    		/* first call. do any desired init */
    
    		const char	*p_name = "demoprot";
    		DemoUri		*parsed_url;
    		char		*url = EXTPROTOCOL_GET_URL(fcinfo);
    
    		myData 			 = palloc(sizeof(extprotocol_t));
    
    		myData->url 	 = pstrdup(url);
    		parsed_url 		 = ParseDemoUri(myData->url);
    		myData->filename = pstrdup(parsed_url->path);
    
    		if(strcasecmp(parsed_url->protocol, p_name) != 0)
    			elog(ERROR, "internal error: demoprot called with a different protocol (%s)",
    						parsed_url->protocol);
    
    		FreeDemoUri(parsed_url);
    
    		/* open the destination file (or connect to remote server in other cases) */
    		myData->file = fopen(myData->filename, "a");
    
    		if (myData->file == NULL)
    			ereport(ERROR,
    					(errcode_for_file_access(),
    					 errmsg("demoprot_export: could not open file \"%s\" for writing: %m",
    							myData->filename)));
    
    		EXTPROTOCOL_SET_USER_CTX(fcinfo, myData);
    	}
    
    	/* =======================================================================
    	 *                            DO THE EXPORT
    	 * ======================================================================= */
    
    	data 	= EXTPROTOCOL_GET_DATABUF(fcinfo);
    	datlen 	= EXTPROTOCOL_GET_DATALEN(fcinfo);
    
    	if(datlen > 0)
    	{
    		wrote = fwrite(data, 1, datlen, myData->file);
    		if (ferror(myData->file))
    			ereport(ERROR,
    					(errcode_for_file_access(),
    					 errmsg("demoprot_export: could not write to file \"%s\": %m",
    							 myData->filename)));
    	}
    
    	PG_RETURN_INT32((int)wrote);
    }
    
    Datum
    demoprot_validate_urls(PG_FUNCTION_ARGS)
    {
    	int					nurls;
    	int					i;
    	ValidatorDirection	direction;
    
    	/* Must be called via the external table format manager */
    	if (!CALLED_AS_EXTPROTOCOL_VALIDATOR(fcinfo))
    		elog(ERROR, "demoprot_validate_urls: not called by external protocol manager");
    
    	nurls 		= EXTPROTOCOL_VALIDATOR_GET_NUM_URLS(fcinfo);
    	direction 	= EXTPROTOCOL_VALIDATOR_GET_DIRECTION(fcinfo);
    
    	/*
    	 * Dumb example 1: search each url for a substring
    	 * we don't want to be used in a url. in this example
    	 * it's 'secured_directory'.
    	 */
    	for (i = 1 ; i <= nurls ; i++)
    	{
    		char *url = EXTPROTOCOL_VALIDATOR_GET_NTH_URL(fcinfo, i);
    
    		if (strstr(url, "secured_directory") != 0)
    		{
                ereport(ERROR,
                        (errcode(ERRCODE_PROTOCOL_VIOLATION),
                         errmsg("using 'secured_directory' in a url isn't allowed ")));
    		}
    	}
    
    	/*
    	 * Dumb example 2: set a limit on the number of urls
    	 * used. In this example we limit readable external
    	 * tables that use our protocol to 2 urls max.
    	 */
    	if(direction == EXT_VALIDATE_READ && nurls > 2)
    	{
            ereport(ERROR,
                    (errcode(ERRCODE_PROTOCOL_VIOLATION),
                     errmsg("more than 2 urls aren't allowed in this protocol ")));
    	}
    
    	PG_RETURN_VOID();
    }
    
    /* --- utility functions --- */
    
    static
    DemoUri *ParseDemoUri(const char *uri_str)
    {
    	DemoUri	   *uri = (DemoUri *) palloc0(sizeof(DemoUri));
    	int			protocol_len;
    
     	uri->path = NULL;
     	uri->protocol = NULL;
    
    	/*
    	 * parse protocol
    	 */
    	char *post_protocol = strstr(uri_str, "://");
    
    	if(!post_protocol)
    	{
    		ereport(ERROR,
    				(errcode(ERRCODE_SYNTAX_ERROR),
    				 errmsg("invalid demo prot URI \'%s\'", uri_str)));
    	}
    
    	protocol_len = post_protocol - uri_str;
    	uri->protocol = (char *) palloc0 (protocol_len + 1);
    	strncpy(uri->protocol, uri_str, protocol_len);
    
    	/* make sure there is a path after "://" */
    	if (strlen(uri_str) <= protocol_len + strlen("://"))
    		ereport(ERROR,
    				(errcode(ERRCODE_SYNTAX_ERROR),
    				 errmsg("invalid demo prot URI \'%s\' : missing path", uri_str)));
    
    	/*
    	 * parse path
    	 */
    	uri->path = pstrdup(uri_str + protocol_len + strlen("://"));
    
    	return uri;
    }
    
    static
    void FreeDemoUri(DemoUri *uri)
    {
    	if (uri->path)
    		pfree(uri->path);
    	if (uri->protocol)
    		pfree(uri->protocol);
    
    	pfree(uri);
    }

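    The read and write functions expect the protocol to be named demoprot (see the p_name checks); this name is registered with CREATE TRUSTED PROTOCOL later in this example. In the LOCATION clause of the CREATE EXTERNAL TABLE command, specify the protocol name and the data file path separated by :// as follows: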

    LOCATION('demoprot://<path>')

    Three functions are implemented in the protocol:

    • demoprot_import() is a read function.

    • demoprot_export() is a write function.

    • demoprot_validate_urls() is a validation function.

  2. In the /tmp directory on the master host, create the demoformatter.c formatter implementation in C:

    #include "postgres.h"
    #include "fmgr.h"
    #include "funcapi.h"
    
    #include "access/formatter.h"
    #include "catalog/pg_proc.h"
    #include "utils/builtins.h"
    #include "utils/memutils.h"
    #include "utils/typcache.h"
    #include "utils/syscache.h"
    
    /* Do the module magic dance */
    PG_MODULE_MAGIC;
    PG_FUNCTION_INFO_V1(formatter_export);
    PG_FUNCTION_INFO_V1(formatter_import);
    
    Datum formatter_export(PG_FUNCTION_ARGS);
    Datum formatter_import(PG_FUNCTION_ARGS);
    
    
    typedef struct {
    	int        ncols;
    	Datum     *values;
    	bool      *nulls;
    	int        buflen;
    	bytea     *buffer;
    } format_t;
    
    
    /*
     * Maximum size string to support, affects allocation size of the tuple buffer.
     * Only used for variable length strings.  For strings with a declared typmod
     * we allow that size even if it is larger than this.
     */
    #define MAX_FORMAT_STRING 4096
    
    /*
     * Our format converts all NULLs to real values, for floats that value is NaN
     */
    #define NULL_FLOAT8_VALUE get_float8_nan()
    
    
    Datum
    formatter_export(PG_FUNCTION_ARGS)
    {
    	HeapTupleHeader		rec	= PG_GETARG_HEAPTUPLEHEADER(0);
    	TupleDesc           tupdesc;
    	HeapTupleData		tuple;
    	int                 ncolumns = 0;
    	format_t           *myData;
    	char               *data;
    	int                 datlen;
    	int                 i;
    
    	/* Must be called via the external table format manager */
    	if (!CALLED_AS_FORMATTER(fcinfo))
    		elog(ERROR, "formatter_export: not called by format manager");
    
    	tupdesc = FORMATTER_GET_TUPDESC(fcinfo);
    
    	/* Get our internal description of the formatter */
    	ncolumns = tupdesc->natts;
    	myData = (format_t *) FORMATTER_GET_USER_CTX(fcinfo);
    	if (myData == NULL)
    	{
    		myData          = palloc(sizeof(format_t));
    		myData->ncols   = ncolumns;
    		myData->values  = palloc(sizeof(Datum) * ncolumns);
    		myData->nulls   = palloc(sizeof(bool) * ncolumns);
    
    		/* Determine required buffer size */
    		myData->buflen = 0;
    		for (i = 0; i < ncolumns; i++)
    		{
    			Oid   type   = tupdesc->attrs[i]->atttypid;
    			int32 typmod = tupdesc->attrs[i]->atttypmod;
    
    			/* Don't know how to format dropped columns, error for now */
    			if (tupdesc->attrs[i]->attisdropped)
    				elog(ERROR, "formatter_export: dropped columns");
    
    			switch (type)
    			{
    				case FLOAT8OID:
    				{
    					myData->buflen += sizeof(double);
    					break;
    				}
    
    				case VARCHAROID:
    				case BPCHAROID:
    				case TEXTOID:
    				{
    					/* int32 length prefix plus the string bytes */
    					myData->buflen += sizeof(int32) + ((typmod > 0) ? typmod : MAX_FORMAT_STRING);
    					break;
    				}
    
    				default:
    				{
    					elog(ERROR, "formatter_export error: unsupported data type");
    					break;
    				}
    			}
    		}
    
    		myData->buflen = Max(128, myData->buflen);  /* allocate at least 128 bytes */
    		myData->buffer = palloc(myData->buflen + VARHDRSZ);
    
    		FORMATTER_SET_USER_CTX(fcinfo, myData);
    	}
    	if (myData->ncols != ncolumns)
    		elog(ERROR, "formatter_export: unexpected change of output record type");
    
    	/* break the input tuple into fields */
    	tuple.t_len = HeapTupleHeaderGetDatumLength(rec);
    	ItemPointerSetInvalid(&(tuple.t_self));
    	tuple.t_data = rec;
    	heap_deform_tuple(&tuple, tupdesc, myData->values, myData->nulls);
    
    	datlen = 0;
    	data = VARDATA(myData->buffer);
    
    
    	/* =======================================================================
    	 *                            MAIN FORMATTING CODE
    	 *
    	 * Currently this code assumes:
    	 *  - Homogeneous hardware => No need to convert data to network byte order
    	 *  - Support for TEXT/VARCHAR/BPCHAR/FLOAT8 only
    	 *  - Length Prefixed strings
    	 *  - No end of record tags, checksums, or optimizations for alignment.
    	 *  - NULL values are cast to some sensible default value (NaN, "")
    	 *
    	 * ======================================================================= */
    	for (i = 0; i < ncolumns; i++)
    	{
    		Oid	  type    = tupdesc->attrs[i]->atttypid;
    		int   typmod  = tupdesc->attrs[i]->atttypmod;
    		Datum val     = myData->values[i];
    		bool  nul     = myData->nulls[i];
    
    		switch (type)
    		{
    			case FLOAT8OID:
    			{
    				float8 value;
    
    				if (datlen + sizeof(value) > myData->buflen)
    					elog(ERROR, "formatter_export: buffer too small");
    
    				if (nul)
    					value = NULL_FLOAT8_VALUE;
    				else
    					value = DatumGetFloat8(val);
    
    				memcpy(&data[datlen], &value, sizeof(value));
    				datlen += sizeof(value);
    				break;
    			}
    
    			case TEXTOID:
    			case VARCHAROID:
    			case BPCHAROID:
    			{
    				text  *str;
    				int32  len;
    
    				if (nul)
    				{
    					str = NULL;
    					len = 0;
    				}
    				else
    				{
    					str = DatumGetTextP(val);
    					len = VARSIZE(str) - VARHDRSZ;
    					if (typmod < 0)
    						len  = Min(len, MAX_FORMAT_STRING);
    				}
    
    				if (datlen + sizeof(len) + len > myData->buflen)
    					elog(ERROR, "formatter_export: buffer too small");
    				memcpy(&data[datlen], &len, sizeof(len));
    				datlen += sizeof(len);
    
    				if (len > 0)
    				{
    					memcpy(&data[datlen], VARDATA(str), len);
    					datlen += len;
    				}
    				break;
    			}
    
    			default:
    				elog(ERROR, "formatter_export: unsupported datatype");
    				break;
    		}
    	}
    	/* ======================================================================= */
    
    	SET_VARSIZE(myData->buffer, datlen + VARHDRSZ);
    	PG_RETURN_BYTEA_P(myData->buffer);
    }
    
    Datum
    formatter_import(PG_FUNCTION_ARGS)
    {
    	HeapTuple			tuple;
    	TupleDesc           tupdesc;
    	MemoryContext 		m, oldcontext;
    	format_t           *myData;
    	char               *data_buf;
    	int                 ncolumns = 0;
    	int			  		data_cur;
    	int                 data_len;
    	int                 i;
    
    	/* Must be called via the external table format manager */
    	if (!CALLED_AS_FORMATTER(fcinfo))
    		elog(ERROR, "formatter_import: not called by format manager");
    
    	tupdesc = FORMATTER_GET_TUPDESC(fcinfo);
    
    	/* Get our internal description of the formatter */
    	ncolumns = tupdesc->natts;
    	myData = (format_t *) FORMATTER_GET_USER_CTX(fcinfo);
    
    	if (myData == NULL)
    	{
    
    		myData          = palloc(sizeof(format_t));
    		myData->ncols   = ncolumns;
    		myData->values  = palloc(sizeof(Datum) * ncolumns);
    		myData->nulls   = palloc(sizeof(bool) * ncolumns);
    
    		/* misc verification */
    		for (i = 0; i < ncolumns; i++)
    		{
    			Oid   type   = tupdesc->attrs[i]->atttypid;
    			//int32 typmod = tupdesc->attrs[i]->atttypmod;
    
    			/* Don't know how to format dropped columns, error for now */
    			if (tupdesc->attrs[i]->attisdropped)
    				elog(ERROR, "formatter_import: dropped columns");
    
    			switch (type)
    			{
    				case FLOAT8OID:
    				case VARCHAROID:
    				case BPCHAROID:
    				case TEXTOID:
    					break;
    
    				default:
    				{
    					elog(ERROR, "formatter_import error: unsupported data type");
    					break;
    				}
    			}
    		}
    
    		FORMATTER_SET_USER_CTX(fcinfo, myData);
    
    	}
    	if (myData->ncols != ncolumns)
    		elog(ERROR, "formatter_import: unexpected change of output record type");
    
    	/* get our input data buf and number of valid bytes in it */
    	data_buf = FORMATTER_GET_DATABUF(fcinfo);
    	data_len = FORMATTER_GET_DATALEN(fcinfo);
    	data_cur = FORMATTER_GET_DATACURSOR(fcinfo);
    
    	/* start clean */
    	MemSet(myData->values, 0, ncolumns * sizeof(Datum));
    	MemSet(myData->nulls, true, ncolumns * sizeof(bool));
    
    	/* =======================================================================
    	 *                            MAIN FORMATTING CODE
    	 *
    	 * Currently this code assumes:
    	 *  - Homogeneous hardware => No need to convert data to network byte order
    	 *  - Support for TEXT/VARCHAR/BPCHAR/FLOAT8 only
    	 *  - Length Prefixed strings
    	 *  - No end of record tags, checksums, or optimizations for alignment.
    	 *  - NULL values are cast to some sensible default value (NaN, "")
    	 *
    	 * ======================================================================= */
    	m = FORMATTER_GET_PER_ROW_MEM_CTX(fcinfo);
    	oldcontext = MemoryContextSwitchTo(m);
    
    	for (i = 0; i < ncolumns; i++)
    	{
    		Oid		type    	= tupdesc->attrs[i]->atttypid;
    		//int	typmod		= tupdesc->attrs[i]->atttypmod;
    		int		remaining	= 0;
    		int		attr_len 	= 0;
    
    		remaining = data_len - data_cur;
    
    		switch (type)
    		{
    			case FLOAT8OID:
    			{
    				float8 value;
    
    				attr_len = sizeof(value);
    
    				if (remaining < attr_len)
    				{
    					MemoryContextSwitchTo(oldcontext);
    					FORMATTER_RETURN_NOTIFICATION(fcinfo, FMT_NEED_MORE_DATA);
    				}
    
    				memcpy(&value, data_buf + data_cur, attr_len);
    
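    				/*
    				 * NaN encodes NULL in this format. NaN compares unequal to every
    				 * value, including itself and NULL_FLOAT8_VALUE, so a
    				 * self-comparison is false only for NaN.
    				 */
    				if (value == value)
    				{
    					myData->nulls[i] = false;
    					myData->values[i] = Float8GetDatum(value);
    				}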
    
    				break;
    			}
    
    			case TEXTOID:
    			case VARCHAROID:
    			case BPCHAROID:
    			{
    				text*	value;
    				int32	len;
    				bool	nextlen = remaining >= sizeof(len);
    
    				if (nextlen)
    				{
    					memcpy(&len, data_buf + data_cur, sizeof(len));
    
    					if (len < 0)
    						elog(ERROR, "invalid length of varlen datatype: %d",
    									len);
    				}
    
    				/* if len or data bytes don't exist in this buffer, return */
    				if (!nextlen || (nextlen && (remaining - sizeof(len) < len)))
    				{
    					MemoryContextSwitchTo(oldcontext);
    					FORMATTER_RETURN_NOTIFICATION(fcinfo, FMT_NEED_MORE_DATA);
    				}
    
    				if (len > 0)
    				{
    					value = (text *) palloc(len + VARHDRSZ);
    					SET_VARSIZE(value, len + VARHDRSZ);
    
    					memcpy(VARDATA(value), data_buf + data_cur + sizeof(len), len);
    
    					myData->nulls[i] = false;
    					myData->values[i] = PointerGetDatum(value);
    				}
    
    				attr_len = len + sizeof(len);
    
    				break;
    			}
    
    			default:
    				elog(ERROR, "formatter_import: unsupported datatype");
    				break;
    		}
    
    		/* add byte length of last attribute to the temporary cursor */
    		data_cur += attr_len;
    
    	}
    	/* ======================================================================= */
    
    	MemoryContextSwitchTo(oldcontext);
    
    	FORMATTER_SET_DATACURSOR(fcinfo, data_cur);
    
    	tuple = heap_form_tuple(tupdesc, myData->values, myData->nulls);
    
    	/* hack... pass tuple here. don't free prev tuple - the executor does it  */
    	((FormatterData*) fcinfo->context)->fmt_tuple = tuple;
    
    	FORMATTER_RETURN_TUPLE(tuple);
    }

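    The formatter implements the formatter_export (write) and formatter_import (read) functions. They use a simple binary format: each FLOAT8 value is stored as 8 bytes, and each TEXT, VARCHAR, or BPCHAR value is stored as a 4-byte length followed by the string bytes, all in the host's native byte order. Other data types are not supported. NULL floats are stored as NaN, and NULL strings are stored with a zero length, so empty strings are read back as NULL.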

  3. Use the cc C compiler to compile and link the source code to create shared objects that can be dynamically loaded by Greengage DB:

    $ cd /tmp || exit
    $ cc -fpic -c /tmp/demoprotocol.c -I $(pg_config --includedir-server) && \
    cc -shared -o /tmp/demoprotocol.so /tmp/demoprotocol.o
    $ cc -fpic -c /tmp/demoformatter.c -I $(pg_config --includedir-server) && \
    cc -shared -o /tmp/demoformatter.so /tmp/demoformatter.o

    where:

    • -fpic — specifies creating position-independent code (PIC). The object file needs to be created as PIC so that it can be loaded at any arbitrary location in memory by Greengage DB.

    • -c — compiles the source code without linking and creates an object file (demoprotocol.o).

    • -I — specifies the location of header include files. These are located in subdirectories of $GPHOME/include/postgresql/. You can view the exact location by running the following command:

      $ pg_config --includedir-server
    • -shared — specifies creating a shared object (shared library).

    • -o — specifies the shared object file name (demoprotocol.so).

    NOTE

    For more information on the cc options, refer to the GCC manual. For more information on compiling and linking dynamically-loaded functions to create a shared library on different operating systems, see the PostgreSQL documentation.

  4. Copy the compiled shared objects (demoprotocol.so and demoformatter.so) to the same location on every host in your Greengage DB cluster (master and all segments). The server must be able to locate the files: it is recommended to place shared libraries in $libdir or in a directory listed in the dynamic library path (set by the dynamic_library_path server configuration parameter). The CREATE FUNCTION commands in this example reference the files relative to $libdir. You can view the actual directory that $libdir stands for by running the following command:

    $ pg_config --pkglibdir

    To update all segments at once, you can use the gpscp Greengage DB utility:

    $ gpscp -f ~/hostfile_all_hosts /tmp/demoprotocol.so /tmp/demoformatter.so =:"$(pg_config --pkglibdir)"

    where ~/hostfile_all_hosts is a file that lists all cluster hosts, for example, the host file used to initialize the cluster.

  5. In the customers database, declare the protocol functions pointing to the demoprotocol.so file and the respective function names:

    CREATE OR REPLACE FUNCTION write_to_file() RETURNS integer AS
       '$libdir/demoprotocol.so', 'demoprot_export' LANGUAGE C STABLE;
    
    CREATE OR REPLACE FUNCTION read_from_file() RETURNS integer AS
        '$libdir/demoprotocol.so', 'demoprot_import' LANGUAGE C STABLE;
    
    CREATE OR REPLACE FUNCTION validate_urls() RETURNS void AS
        '$libdir/demoprotocol.so', 'demoprot_validate_urls' LANGUAGE C STABLE;
  6. Create a protocol that accesses these functions:

    CREATE TRUSTED PROTOCOL demoprot(
        writefunc='write_to_file',
        readfunc='read_from_file',
        validatorfunc='validate_urls'
    );
  7. Declare the formatting functions pointing to the demoformatter.so file and the respective function names:

    CREATE OR REPLACE FUNCTION formatter_export(record) RETURNS bytea
    AS '$libdir/demoformatter.so', 'formatter_export'
    LANGUAGE C STABLE;
    
    CREATE OR REPLACE FUNCTION formatter_import() RETURNS record
    AS '$libdir/demoformatter.so', 'formatter_import'
    LANGUAGE C STABLE;
  8. Create the customers_w writable external table. In the LOCATION clause, use the demoprot protocol and specify the customers.demo output file. In the FORMAT clause, set CUSTOM as the data format and specify the formatter_export formatting function:

    CREATE WRITABLE EXTERNAL TABLE customers_w
    (
        id TEXT,
        name TEXT,
        email TEXT,
        address TEXT
    )
        LOCATION('demoprot://customers.demo')
        FORMAT 'CUSTOM' (FORMATTER=formatter_export)
        DISTRIBUTED BY (id);
  9. Populate the customers_w table with sample data:

    INSERT INTO customers_w (id, name, email, address)
    VALUES (1,'John Doe','john.doe@example.com','123 Elm Street'),
           (2,'Jane Smith','jane.smith@example.com','456 Oak Avenue'),
           (3,'Bob Brown','bob.brown@example.com','789 Pine Street'),
           (4,'Rob Stuart','rob.stuart@example.com','119 Willow Street'),
           (5, 'Alice Johnson', 'alice.johnson@example.com', '221 Maple Drive'),
           (6, 'David Lee', 'david.lee@example.com', '333 Birch Lane'),
           (7, 'Emily Wilson', 'emily.wilson@example.com', '444 Cedar Court'),
           (8, 'Michael Garcia', 'michael.garcia@example.com', '555 Walnut Place');

    Running the command creates the customers.demo binary file in each segment’s data directory.

  10. Verify that the files were created. You can use the gpssh utility to list the contents of all segments' data directories:

    $ gpssh -f ~/hostfile_segment_hosts -e 'ls /data1/primary/gpseg*/customers.demo'

    where ~/hostfile_segment_hosts is a file that lists the cluster segment hosts.

    The output should look similar to the following:

    [sdw1] /data1/primary/gpseg0/customers.demo  /data1/primary/gpseg1/customers.demo
    [sdw2] /data1/primary/gpseg2/customers.demo  /data1/primary/gpseg3/customers.demo
  11. Create the customers_r readable external table. In the LOCATION clause, use the demoprot protocol and specify the customers.demo input file. In the FORMAT clause, set CUSTOM as the data format and specify the formatter_import formatting function:

    CREATE EXTERNAL TABLE customers_r
    (
        id TEXT,
        name TEXT,
        email TEXT,
        address TEXT
    )
        LOCATION('demoprot://customers.demo')
        FORMAT 'CUSTOM' (FORMATTER=formatter_import);
  12. Query the created external table:

    SELECT * FROM customers_r;

    The output should look similar to the following (row order may vary):

     id |      name      |           email            |      address
    ----+----------------+----------------------------+-------------------
      1 | John Doe       | john.doe@example.com       | 123 Elm Street
      5 | Alice Johnson  | alice.johnson@example.com  | 221 Maple Drive
      6 | David Lee      | david.lee@example.com      | 333 Birch Lane
      2 | Jane Smith     | jane.smith@example.com     | 456 Oak Avenue
      3 | Bob Brown      | bob.brown@example.com      | 789 Pine Street
      4 | Rob Stuart     | rob.stuart@example.com     | 119 Willow Street
      7 | Emily Wilson   | emily.wilson@example.com   | 444 Cedar Court
      8 | Michael Garcia | michael.garcia@example.com | 555 Walnut Place
    (8 rows)