Use PXF HDFS connector to read and write Avro-format data between Greengage DB and HDFS

Anton Monakov

Apache Avro is a data serialization framework designed for efficient storage and fast processing. It provides a compact binary format for serializing data and uses JSON to define both its data types and the schemas that describe the structure of the data. An Avro schema, together with its data, is fully self-describing.

This topic describes how to configure and use the PXF HDFS connector to read and write Avro data in HDFS through external tables, and provides practical examples.

Data type mapping

The Avro specification defines primitive, complex, and logical types. To represent Avro primitive data types and arrays of primitive types in Greengage DB, you need to map data values to Greengage DB columns of the same type.

Avro supports complex data types including arrays of non-primitive types, maps, records, enumerations, and fixed types. You need to map top-level fields of these complex data types to the Greengage DB text type. While PXF does not natively support reading complex data types, you can create Greengage DB functions or application code to extract or further process their components.

Avro supports logical data types including date, decimal, duration, time, timestamp, and uuid types.

Read mapping

When reading Avro data, PXF uses the following data type mapping.

Avro data type | PXF / Greengage DB data type
boolean | BOOLEAN
bytes | BYTEA
double | DOUBLE
float | REAL
int | INT
long | BIGINT
string | TEXT
Complex type: Array (any dimension) of type: boolean, bytes, double, float, int, long, string | Array (any dimension) of type: BOOLEAN, BYTEA, DOUBLE, REAL, BIGINT, TEXT
Complex type: Array of other types (Avro schema is provided) | Array of type TEXT
Complex type: Map, Record, or Enum | TEXT, with delimiters between collection items, mapped key/value pairs, and record data
Complex type: Fixed | BYTEA (supported for read operations only)
Union | Follows the above conventions for primitive or complex data types, depending on the union; must contain 2 elements, one of which must be NULL
Logical type: Date | DATE
Logical type: Decimal | DECIMAL or NUMERIC
Logical type: Duration | BYTEA
Logical type: Time (millisecond precision) | TIME (without time zone)
Logical type: Time (microsecond precision) | TIME (without time zone)
Logical type: Timestamp (millisecond precision) | TIMESTAMP (with or without time zone)
Logical type: Timestamp (microsecond precision) | TIMESTAMP (with or without time zone)
Logical type: Local Timestamp (millisecond precision) | TIMESTAMP (with or without time zone)
Logical type: Local Timestamp (microsecond precision) | TIMESTAMP (with or without time zone)
Logical type: UUID | UUID

Write mapping

PXF supports writing Avro primitive types and arrays of Avro primitive types as well as writing other complex types to Avro as string. When writing Avro data, PXF uses the following data type mapping.

PXF / Greengage DB data type | Avro data type
BIGINT | long
BOOLEAN | boolean
BYTEA | bytes
DOUBLE | double
CHAR (1) | string
ENUM | string
INT | int
REAL | float
SMALLINT (2) | int
TEXT | string
VARCHAR | string
NUMERIC, DATE, TIME, TIMESTAMP, TIMESTAMPTZ (no Avro schema is provided) | string
Array (any dimension) of type: BIGINT, BOOLEAN, BYTEA, DOUBLE, INT, REAL, TEXT (Avro schema is provided) | Array (any dimension) of type: long, boolean, bytes, double, int, float, string
One-dimensional array of type: BIGINT, BOOLEAN, BYTEA, DOUBLE, INT, REAL, TEXT (no Avro schema is provided) | One-dimensional array of type: long, boolean, bytes, double, int, float, string
One-dimensional array of type: NUMERIC, DATE, TIME, TIMESTAMP, TIMESTAMPTZ (Avro schema is provided) | One-dimensional array of type string
ENUM, RECORD | string

  1. If required, PXF right-pads CHAR(N) types to length N with whitespace.

  2. PXF converts Greengage DB SMALLINT types to INT before writing the Avro data. Be sure to read the field into an INT.
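The second note can be illustrated with a pair of external tables over the same HDFS directory. This is only a sketch: the table names (ratings_w, ratings_r), the column names, and the tmp/pxf_examples/ratings path are hypothetical, and the default PXF server is assumed because no SERVER option is given.

```sql
-- Hypothetical names and path, for illustration only.
-- The writable table declares a SMALLINT column, which PXF writes as an Avro int.
CREATE WRITABLE EXTERNAL TABLE ratings_w (
        id INT,
        rating SMALLINT
    )
    LOCATION ('pxf://tmp/pxf_examples/ratings?PROFILE=hdfs:avro')
    FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export');

-- The readable table over the same directory declares the column as INT,
-- matching the Avro int type that was actually written.
CREATE EXTERNAL TABLE ratings_r (
        id INT,
        rating INT
    )
    LOCATION ('pxf://tmp/pxf_examples/ratings?PROFILE=hdfs:avro')
    FORMAT 'CUSTOM' (FORMATTER='pxfwritable_import');
```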

Avro schemas and data

Avro schemas are defined using JSON and composed of the primitive and complex types listed in Data type mapping. Avro schema files typically have an .avsc suffix. Fields in an Avro schema file are defined via an array of objects, each of which is specified by a name and a type. An Avro data file contains the schema and a compact binary representation of the data and typically has the .avro suffix. For both read and write operations to HDFS, you can specify an Avro schema as either a binary *.avro file or a JSON-format *.avsc file.

External table type | Schema specified | Description
Readable | Yes | PXF uses the specified schema; this overrides the schema embedded in the Avro data file
Readable | No | PXF uses the schema embedded in the Avro data file
Writable | Yes | PXF uses the specified schema
Writable | No | PXF creates the Avro schema based on the external table definition

The provided Avro schema file must reside either in the same location on each Greengage DB host or on the Hadoop file system. PXF first searches for an absolute file path on the Greengage DB hosts. If the schema file is not found, it searches for the file relative to the PXF classpath. If the schema file is not found locally, it searches for the file on HDFS.

NOTE

The $PXF_BASE/conf directory is in the PXF classpath. PXF can locate an Avro schema file that you add to this directory on every Greengage DB host.
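As a sketch of how a schema file placed in $PXF_BASE/conf might be referenced, the following readable external table passes a relative file name in the SCHEMA option. The file name events.avsc, the table and column names, and the tmp/pxf_examples/events.avro path are hypothetical; the example assumes that the schema file describes a record with a long field named id and a string field named payload, and that the default PXF server is used.

```sql
-- Hypothetical names: events_r, events.avsc, and the HDFS path are assumptions.
-- A relative SCHEMA value is resolved against the PXF classpath,
-- which includes the $PXF_BASE/conf directory on each host.
CREATE EXTERNAL TABLE events_r (
        id BIGINT,
        payload TEXT
    )
    LOCATION ('pxf://tmp/pxf_examples/events.avro?PROFILE=hdfs:avro&SCHEMA=events.avsc')
    FORMAT 'CUSTOM' (FORMATTER='pxfwritable_import');
```

Because the schema is specified on a readable table, it takes precedence over the schema embedded in the Avro data file.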

Write Avro data

When you create a writable external table to write Avro data, you specify the name of an HDFS directory. PXF assigns the name of the external table column to the Avro field name, and each table row is written as an Avro record. If no schema file is specified, PXF generates a schema for the Avro file based on the Greengage DB external table definition.

While Avro has a null type, Greengage DB external tables do not support the NOT NULL column qualifier. For this reason, PXF wraps each data type in an Avro union of the mapped type and null. For example, for a writable external table column defined with the text data type, PXF generates the following schema element:

["string", "null"]

PXF returns an error if it encounters a NULL data field and the provided schema does not include a union of the field data type with null.

PXF supports writing only Avro primitive data types and arrays of the types listed in Write mapping. PXF does not support writing complex types to Avro:

  • When you specify a schema file in the LOCATION clause of the CREATE EXTERNAL TABLE command, the schema must include only primitive data types.

  • When PXF generates the schema, it writes any complex type that you specify in the writable external table column definition to the Avro file as a string type.

For example, if you write an array of the Greengage DB numeric type, PXF converts the array to a string, and you must read this data with a Greengage DB column of the TEXT type.

Create an external table using the PXF protocol

To create a Greengage DB external table to read or write Avro-format data in HDFS, use the following general syntax:

CREATE [READABLE | WRITABLE] EXTERNAL TABLE <table_name>
    ( <column_name> <data_type> [, ...] | LIKE <other_table> )
    LOCATION ('pxf://<path-to-hdfs>?PROFILE=hdfs:avro[&<custom-option>=<value>[...]]')
    FORMAT 'CUSTOM' (FORMATTER='pxfwritable_import'|'pxfwritable_export')
    [DISTRIBUTED BY (<column_name> [, ... ] ) | DISTRIBUTED RANDOMLY];

Keyword | Value
<path-to-hdfs> | The path to the directory or file in the HDFS data store. When the <server_name> configuration includes a pxf.fs.basePath property setting, the <path-to-hdfs> value is considered to be relative to the base path specified. Otherwise, the path is considered absolute. The value must not include the dollar sign ($) character. PXF supports reading or writing Avro files compressed with the bzip2, xz, snappy, and deflate codecs
PROFILE=hdfs:avro | The hdfs:avro profile is used to read or write Avro-format data in HDFS
FORMAT 'CUSTOM' | The custom format with the built-in custom formatter functions for read (pxfwritable_import) and write (pxfwritable_export) operations is used to work with Avro-format data in HDFS
DISTRIBUTED BY | When loading data from a Greengage DB table into a writable external table, consider specifying the same distribution policy or column name on both tables. This avoids extra motion of data between segments on the load operation. Learn more about table distribution in Distribution
<custom-option> | One of the custom options provided in the LOCATION string as described below
SERVER=<server_name> | The named server configuration that PXF uses to access the data. If not specified, the default PXF server is used
COLLECTION_DELIM | The delimiter characters placed between entries in a top-level array, map, or record field when mapping an Avro complex data type to a text column during data reading. The default is the comma character (,)
MAPKEY_DELIM | The delimiter characters placed between the key and value of a map entry when mapping an Avro complex data type to a text column during data reading. The default is the colon character (:)
RECORDKEY_DELIM | The delimiter characters placed between the field name and value of a record entry when mapping an Avro complex data type to a text column during data reading. The default is the colon character (:)
SCHEMA | The absolute path to the Avro schema file on the Greengage DB host or on HDFS, or the relative path to the schema file on the host
IGNORE_MISSING_PATH | The action to take when <path-to-hdfs> is missing or invalid. If set to false (default), an error is returned. If set to true, PXF ignores missing path errors and returns an empty fragment
COMPRESSION_CODEC | The compression codec to use when writing data: bzip2, xz, snappy, deflate, or uncompressed. If not provided (or uncompressed is provided), no data compression is performed
CODEC_LEVEL | The compression level (applicable to the deflate and xz codecs only), which provides the trade-off between speed and compression. Valid values are 1 (fastest) to 9 (most compressed). The default compression level is 6
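
As a rough illustration of how custom options combine in the LOCATION string, the sketch below defines a writable table that compresses its output and a readable table that tolerates a missing path. The table names and the tmp/pxf_examples/customers_deflate path are hypothetical, and the default PXF server is assumed.

```sql
-- Hypothetical names and path, for illustration only.
-- Write Avro files compressed with the deflate codec at the highest level (9).
CREATE WRITABLE EXTERNAL TABLE customers_deflate_w (
        id INT,
        name TEXT
    )
    LOCATION ('pxf://tmp/pxf_examples/customers_deflate?PROFILE=hdfs:avro&COMPRESSION_CODEC=deflate&CODEC_LEVEL=9')
    FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export');

-- Read the same directory. With IGNORE_MISSING_PATH=true, a missing directory
-- produces an empty result instead of an error.
CREATE EXTERNAL TABLE customers_deflate_r (
        id INT,
        name TEXT
    )
    LOCATION ('pxf://tmp/pxf_examples/customers_deflate?PROFILE=hdfs:avro&IGNORE_MISSING_PATH=true')
    FORMAT 'CUSTOM' (FORMATTER='pxfwritable_import');
```

Options are separated by the ampersand character (&), and their order after PROFILE is not significant in this sketch.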

Examples

These examples demonstrate how to configure and use the PXF HDFS connector for reading and writing HDFS Avro data by using external tables.

Prerequisites

To try out the practical examples, connect to the Greengage DB master host as gpadmin using psql as described in Connect to Greengage DB via psql. Then create the customers test database and connect to it:

DROP DATABASE IF EXISTS customers;
CREATE DATABASE customers;
\c customers

To be able to create an external table using the PXF protocol, enable the PXF extension in the database as described in Register PXF in a database in the PXF documentation:

CREATE EXTENSION pxf;

Configure the PXF HDFS connector

To have PXF connect to HDFS, you need to create a Hadoop server configuration as described in Configure PXF Hadoop connectors in the PXF documentation and then synchronize it to the Greengage DB cluster:

  1. Log in to the Greengage DB master host as gpadmin.

  2. Go to the $PXF_BASE/servers directory and create a Hadoop server configuration directory named hadoop:

    $ mkdir $PXF_BASE/servers/hadoop
    $ cd $PXF_BASE/servers/hadoop

  3. Copy the core-site.xml, hdfs-site.xml, mapred-site.xml, and yarn-site.xml Hadoop configuration files from the NameNode host of the Hadoop cluster to the current host:

    $ scp hdfsuser@namenode:/etc/hadoop/conf/core-site.xml .
    $ scp hdfsuser@namenode:/etc/hadoop/conf/hdfs-site.xml .
    $ scp hdfsuser@namenode:/etc/hadoop/conf/mapred-site.xml .
    $ scp hdfsuser@namenode:/etc/hadoop/conf/yarn-site.xml .

  4. Synchronize the PXF server configuration to the Greengage DB cluster:

    $ pxf cluster sync

Create a readable external table

  1. In the /tmp directory on the HDFS host, create a file named avro_schema.avsc containing the Avro schema:

    {
      "type": "record",
      "name": "Customer",
      "fields": [
        {"name": "id", "type": "int"},
        {"name": "name", "type": "string"},
        {"name": "email", "type": "string"},
        {"name": "preferredChannels", "type": {"type": "array", "items": "string"}},
        {
          "name": "address",
          "type": {
            "type": "record",
            "name": "Address",
            "fields": [
              {"name": "street", "type": "string"},
              {"name": "city", "type": "string"},
              {"name": "state", "type": "string"},
              {"name": "zipCode", "type": "string"}
            ]
          }
        },
        {
          "name": "preferences",
          "type": {
            "type": "map",
            "values": "string"
          }
        }
      ]
    }

    The schema uses the following field names and data types:

    • id — int;

    • name — string;

    • email — string;

    • preferredChannels — array of string (string[]);

    • address — record comprised of the fields of type string;

    • preferences — map of string.

  2. In the /tmp directory on the HDFS host, create a sample data file named customers.txt containing JSON-encoded records that conform to the above schema. In the sample data, commas (,) separate the fields of each record, and colons (:) separate map keys from values and record field names from values:

    {
      "id": 123,
      "name": "Alice Smith",
      "email": "alice.smith@example.com",
      "preferredChannels": ["email", "sms"],
      "address": {
        "street": "123 Main St",
        "city": "Anytown",
        "state": "CA",
        "zipCode": "91234"
      },
      "preferences": {
        "notificationsEnabled": "true",
        "language": "en-US",
        "theme": "dark"
      }
    }
    {
      "id": 456,
      "name": "Bob Johnson",
      "email": "bob.johnson@example.com",
      "preferredChannels": ["push", "in-app"],
      "address": {
        "street": "456 Oak Ave",
        "city": "Springfield",
        "state": "IL",
        "zipCode": "62704"
      },
      "preferences": {
        "notificationsEnabled": "false",
        "language": "fr-CA",
        "theme": "light"
      }
    }

  3. There are various ways to convert a text file to Avro format, both programmatically and via the command line. This example uses the Java Avro tools. From the /tmp directory, download the Avro tools JAR (version 1.12.1 in this example) from Maven Central and convert the customers.txt file to Avro binary format:

    $ curl -O https://repo1.maven.org/maven2/org/apache/avro/avro-tools/1.12.1/avro-tools-1.12.1.jar
    $ java -jar ./avro-tools-1.12.1.jar fromjson --schema-file /tmp/avro_schema.avsc /tmp/customers.txt > /tmp/customers.avro

  4. Create the /tmp/pxf_examples HDFS directory for storing PXF example data files and add the generated Avro file to HDFS:

    $ hdfs dfs -mkdir -p /tmp/pxf_examples
    $ hdfs dfs -put /tmp/customers.avro /tmp/pxf_examples/

  5. On the Greengage DB master host, create an external table that references the customers.avro file. Map the top-level primitive fields (id, name, and email) to the matching Greengage DB types (INT and TEXT). Map the complex fields (preferredChannels, address, and preferences) to a Greengage DB text array (TEXT[]). In the LOCATION clause, specify the PXF hdfs:avro profile and the server configuration. Set the record, map, and collection delimiters using the corresponding custom options. In the FORMAT clause, specify pxfwritable_import, which is the built-in custom formatter function for read operations:

    CREATE EXTERNAL TABLE customers_r (
            id INT,
            name TEXT,
            email TEXT,
            preferredChannels TEXT[],
            address TEXT[],
            preferences TEXT[]
        )
        LOCATION ('pxf://tmp/pxf_examples/customers.avro?PROFILE=hdfs:avro&SERVER=hadoop&COLLECTION_DELIM=,&MAPKEY_DELIM=:&RECORDKEY_DELIM=:')
        FORMAT 'CUSTOM' (FORMATTER='pxfwritable_import');

  6. Query the created external table:

    SELECT * FROM customers_r;

    The output should look as follows:

     id  |    name     |          email          | preferredchannels |                            address                             |                       preferences
    -----+-------------+-------------------------+-------------------+----------------------------------------------------------------+---------------------------------------------------------
     123 | Alice Smith | alice.smith@example.com | {email,sms}       | {"street:123 Main St",city:Anytown,state:CA,zipCode:91234}     | {theme:dark,notificationsEnabled:true,language:en-US}
     456 | Bob Johnson | bob.johnson@example.com | {push,in-app}     | {"street:456 Oak Ave",city:Springfield,state:IL,zipCode:62704} | {theme:light,notificationsEnabled:false,language:fr-CA}
    (2 rows)

  7. Optionally, query the table, displaying the customer ID and the first elements of the preferences and preferredchannels text arrays:

    SELECT id, preferences[1], preferredChannels[1] FROM customers_r;

    The output should look as follows:

     id  | preferences | preferredchannels
    -----+-------------+-------------------
     123 | theme:dark  | email
     456 | theme:light | push
    (2 rows)
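
Because the complex fields arrive as delimited text, their components can be extracted with ordinary SQL string functions. The following is a minimal sketch that expands each entry of the preferences array of the customers_r table into a separate row and splits it on the colon delimiter configured above. The column aliases (pref, pref_key, pref_value) are illustrative, and the query assumes that neither keys nor values contain the delimiter character.

```sql
-- Expand the preferences array into one row per entry,
-- then split each "key:value" entry on the colon delimiter.
SELECT id,
       split_part(pref, ':', 1) AS pref_key,
       split_part(pref, ':', 2) AS pref_value
FROM (
    SELECT id, unnest(preferences) AS pref
    FROM customers_r
) AS p
ORDER BY id, pref_key;
```

If the data can contain colons inside values, a different MAPKEY_DELIM value that does not occur in the data may be a safer choice.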

Create a writable external table

  1. On the Greengage DB master host, create a writable external table that writes data to the /tmp/pxf_examples/customers_w HDFS directory. In the LOCATION clause, specify the PXF hdfs:avro profile and the server configuration. In the FORMAT clause, specify pxfwritable_export, which is the built-in custom formatter function for write operations:

    CREATE WRITABLE EXTERNAL TABLE customers_w(
            id INT,
            name TEXT,
            preferredChannels TEXT[]
        )
        LOCATION ('pxf://tmp/pxf_examples/customers_w?PROFILE=hdfs:avro&SERVER=hadoop')
        FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export');

  2. Insert some data into the customers_w table:

    INSERT INTO customers_w
    VALUES (1, 'Alice Smith', ARRAY['email', 'sms']),
           (1, 'Bob Johnson', ARRAY['push', 'in-app']);

  3. Copy the /tmp/pxf_examples/customers_w HDFS directory to the local /tmp directory:

    $ hdfs dfs -get -f /tmp/pxf_examples/customers_w /tmp

  4. From the directory that contains the Avro tools JAR (/tmp in this example), concatenate the Avro files in the customers_w directory into a single customers_w.avro file:

    $ java -jar ./avro-tools-1.12.1.jar concat /tmp/customers_w/ /tmp/customers_w.avro

  5. View the contents of the file:

    $ java -jar ./avro-tools-1.12.1.jar tojson /tmp/customers_w.avro

    The output should look as follows:

    {"id":{"int":1},"name":{"string":"Alice Smith"},"preferredchannels":{"array":[{"string":"email"},{"string":"sms"}]}}
    {"id":{"int":1},"name":{"string":"Bob Johnson"},"preferredchannels":{"array":[{"string":"push"},{"string":"in-app"}]}}
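
As an alternative check that does not require the Avro tools, the written data can also be read back through a readable external table that points at the same HDFS directory. This is only a sketch: the table name customers_w_check is hypothetical, and the column names are written in lowercase on the assumption that they must match the field names in the generated Avro schema shown in the output above.

```sql
-- Hypothetical table name. The LOCATION refers to the directory
-- that the customers_w writable table writes to.
CREATE EXTERNAL TABLE customers_w_check (
        id INT,
        name TEXT,
        preferredchannels TEXT[]
    )
    LOCATION ('pxf://tmp/pxf_examples/customers_w?PROFILE=hdfs:avro&SERVER=hadoop')
    FORMAT 'CUSTOM' (FORMATTER='pxfwritable_import');

-- Return the rows written earlier through customers_w.
SELECT * FROM customers_w_check ORDER BY name;
```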