
Use PXF HDFS connector to read and write ORC-format data between Greengage DB and HDFS

Anton Monakov

The Optimized Row Columnar (ORC) file format is a column-oriented data storage format that offers improvements over text and RCFile formats in terms of both compression and performance. ORC is type-aware and is specifically designed for Hadoop workloads. ORC files store both the type of the data in the file and its encoding information.

In an ORC file, all columns within a single group of row data (also known as a stripe) are stored together on disk. The columnar nature of the format enables column projection on read operations, which helps avoid reading unnecessary columns during a query. ORC also supports predicate pushdown with built-in indexes at the file, stripe, and row levels, moving the filter operation to the data loading phase. The PXF HDFS connector supports ORC file versions v0 and v1.

This topic describes how to configure and use the PXF HDFS connector for reading and writing ORC data stored in HDFS by using external tables and provides practical examples.

Data type mapping

To read and write ORC primitive data types in Greengage DB, map the ORC data values to the Greengage DB columns of the same type.

NOTE

The hdfs:orc profile supports reading and writing scalar data types and lists of certain scalar types from ORC files. If the data resides in a Hive table, and you want to read complex types or the Hive table is partitioned, use the hive:orc profile. For details on working with data residing in a Hive table, see Use PXF Hive connector to read table data from Hive to Greengage DB.

Read mapping

To read ORC scalar data types in Greengage DB, map ORC data values to Greengage DB columns of the same type using the following data type mapping.

ORC physical type | ORC logical type  | PXF / Greengage DB data type
------------------+-------------------+-----------------------------
binary            | decimal           | NUMERIC
binary            | timestamp         | TIMESTAMP
byte[]            | string            | TEXT
byte[]            | char              | BPCHAR
byte[]            | varchar           | VARCHAR
byte[]            | binary            | BYTEA
Double            | float             | REAL
Double            | double            | FLOAT8
Integer           | boolean (1 bit)   | BOOLEAN
Integer           | tinyint (8 bit)   | SMALLINT
Integer           | smallint (16 bit) | SMALLINT
Integer           | int (32 bit)      | INTEGER
Integer           | bigint (64 bit)   | BIGINT
Integer           | date              | DATE

PXF supports the list ORC compound type for a subset of the ORC scalar types. The map, union, and struct compound types are not supported.

ORC compound type | PXF / Greengage DB data type
------------------+-----------------------------
array<string>     | TEXT[]
array<char>       | BPCHAR[]
array<varchar>    | VARCHAR[]
array<binary>     | BYTEA[]
array<float>      | REAL[]
array<double>     | FLOAT8[]
array<boolean>    | BOOLEAN[]
array<tinyint>    | SMALLINT[]
array<smallint>   | SMALLINT[]
array<int>        | INTEGER[]
array<bigint>     | BIGINT[]

Write mapping

PXF uses the following data type mapping when writing ORC data.

PXF / Greengage DB data type | ORC logical type              | ORC physical type
-----------------------------+-------------------------------+------------------
NUMERIC                      | decimal                       | binary
TIMESTAMP (1)                | timestamp                     | binary
TIMESTAMPTZ                  | timestamp with local timezone | timestamp
TEXT                         | string                        | byte[]
BPCHAR                       | char                          | byte[]
VARCHAR                      | varchar                       | byte[]
BYTEA                        | binary                        | byte[]
REAL                         | float                         | Double
FLOAT8                       | double                        | Double
BOOLEAN                      | boolean (1 bit)               | Integer
SMALLINT                     | tinyint (8 bit)               | Integer
SMALLINT                     | smallint (16 bit)             | Integer
INTEGER                      | int (32 bit)                  | Integer
BIGINT                       | bigint (64 bit)               | Integer
DATE                         | date                          | Integer
UUID                         | string                        | byte[]

  1. The pxf.orc.write.timezone.utc server configuration property in the pxf-site.xml file defines how PXF writes TIMESTAMP values to the external data store. By default, a TIMESTAMP type is written using the UTC timezone. To write a TIMESTAMP type using the PXF JVM local timezone, set the pxf.orc.write.timezone.utc property to false.

NOTE

Learn about configuring a PXF server in Configure a PXF server in PXF documentation.
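As a rough illustration of the footnote above, the property could be added to the pxf-site.xml file of the PXF server as a Hadoop-style property element. The snippet below only prints the fragment so that it can be reviewed and pasted by hand. The <property> layout and the file location ($PXF_BASE/servers/<server_name>/pxf-site.xml) are assumptions here and should be checked against the PXF documentation for your version.

```shell
# Assumed layout: a Hadoop-style <property> element placed inside the
# <configuration> root of the server's pxf-site.xml.
# The value false switches TIMESTAMP writes from UTC (the default)
# to the PXF JVM local timezone.
fragment=$(cat <<'EOF'
<property>
    <name>pxf.orc.write.timezone.utc</name>
    <value>false</value>
</property>
EOF
)

# Print the fragment for review; nothing is written to disk.
printf '%s\n' "$fragment"
```

After the file is edited, the change would typically be propagated to the cluster hosts with pxf cluster sync.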

PXF supports writing the list ORC compound type for one-dimensional arrays of all the ORC primitive types listed above. The map, union, and struct compound types as well as user-provided schemas are not supported.

PXF / Greengage DB data type | ORC compound type
-----------------------------+------------------
NUMERIC[]                    | array<decimal>
TIMESTAMP[]                  | array<timestamp>
TEXT[]                       | array<string>
BPCHAR[]                     | array<char>
VARCHAR[]                    | array<varchar>
BYTEA[]                      | array<binary>
REAL[]                       | array<float>
FLOAT8[]                     | array<double>
BOOLEAN[]                    | array<boolean>
SMALLINT[]                   | array<tinyint>
SMALLINT[]                   | array<smallint>
INTEGER[]                    | array<int>
BIGINT[]                     | array<bigint>
DATE[]                       | array<date>

Create an external table using the PXF protocol

To create a Greengage DB external table to read or write ORC-format data in HDFS, use the following general syntax:

CREATE [READABLE | WRITABLE] EXTERNAL TABLE <table_name>
    ( <column_name> <data_type> [, ...] | LIKE <other_table> )
    LOCATION ('pxf://<path_to_data>?PROFILE=hdfs:orc[&<custom_option>=<value>[...]]')
    FORMAT 'CUSTOM' (FORMATTER='pxfwritable_import'|'pxfwritable_export')
    [DISTRIBUTED BY (<column_name> [, ... ] ) | DISTRIBUTED RANDOMLY];

Keyword              | Value
---------------------+------------------------------------------------------------
<table_name>         | The name of the table to create.
<column_name>        | The name of the column to create.
<data_type>          | The data type of the column.
LIKE <other_table>   | Specifies a table from which the new external table automatically copies all column names, data types, and distribution policy.
<path_to_data>       | The path to the directory or file in the HDFS data store. When the <server_name> configuration includes a pxf.fs.basePath property setting, the <path_to_data> value is considered to be relative to the base path specified. Otherwise, the path is considered absolute. The value must not include the dollar sign ($) character. The PXF HDFS connector reads 1024 ORC-formatted data rows at a time.
PROFILE              | The hdfs:orc profile is used to read or write ORC-format data in HDFS.
FORMAT 'CUSTOM'      | The custom format with the built-in custom formatter functions for read (pxfwritable_import) and write (pxfwritable_export) operations is used to work with ORC-format data in HDFS.
DISTRIBUTED BY       | When loading data from a Greengage DB table into a writable external table, consider specifying the same distribution policy or column name on both tables. This will avoid extra motion of data between segments on the load operation. Learn more about table distribution in Distribution.
<custom_option>      | One of the custom options provided in the LOCATION string as described below.
SERVER=<server_name> | The named server configuration that PXF uses to access the data. If not specified, the default PXF server is used.
IGNORE_MISSING_PATH  | The action to take when <path_to_data> is missing or invalid. If set to false (default), an error is returned. If set to true, PXF ignores missing path errors and returns an empty fragment.
MAP_BY_POSITION      | Specifies whether PXF should map an ORC column to a Greengage DB column by position. The default value is false: PXF maps an ORC column to a Greengage DB column by name.
COMPRESSION_CODEC    | The compression codec to use when writing data: lz4, lzo, zstd, snappy, zlib, or none. If not specified, PXF compresses the data using Zlib compression.
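To make the LOCATION syntax more concrete, the sketch below assembles a URI from a data path and a list of custom options. The pxf_orc_location function is a hypothetical helper written for this illustration and is not part of PXF. It only joins strings in the form shown in the syntax above and rejects paths containing the dollar sign.

```shell
# Hypothetical helper (not part of PXF): builds a LOCATION URI of the form
# pxf://<path_to_data>?PROFILE=hdfs:orc[&<custom_option>=<value>[...]]
pxf_orc_location() {
    data_path=$1
    shift
    # <path_to_data> must not include the dollar sign character.
    case $data_path in
        *'$'*)
            echo "path must not contain \$: $data_path" >&2
            return 1
            ;;
    esac
    uri="pxf://${data_path}?PROFILE=hdfs:orc"
    # Append each remaining argument as <custom_option>=<value>.
    for opt in "$@"; do
        uri="${uri}&${opt}"
    done
    printf '%s\n' "$uri"
}

# Example: name the server and map ORC columns by position instead of by name.
pxf_orc_location tmp/pxf_examples/customers.orc SERVER=hadoop MAP_BY_POSITION=true
```

The printed string can be pasted between the quotes of the LOCATION clause.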

Numeric data overflow conditions

PXF uses the HiveDecimal class to write numeric ORC data, which limits both the precision and the scale of a numeric type to a maximum of 38. When you define a NUMERIC column in an external table without specifying a precision or scale, PXF internally maps the column to DECIMAL(38, 18).

PXF handles the following precision overflow conditions:

  • A NUMERIC column is defined in the external table, and the total digit count of a value exceeds the maximum supported precision of 38, for example, 1234567890123456789012345678901234567890.12345, which has a total digit count of 45.

  • A NUMERIC(<precision>) column is defined with a <precision> value greater than 38, for example NUMERIC(55).

  • A NUMERIC column is defined in the external table, and the integer digit count of a value is greater than 20 (38-18), for example, 123456789012345678901234567890.12345, which has an integer digit count of 30.

If you define a NUMERIC(<precision>, <scale>) column and the integer digit count of a value is greater than <precision>-<scale>, PXF returns an error. For example, you define a NUMERIC(20,4) column and the value is 12345678901234567.12, whose integer digit count of 17 is greater than 20-4=16.
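The integer-digit arithmetic described above can be reproduced with a small shell sketch. The check_numeric_fit function is a hypothetical helper, not PXF code. It counts the digits before the decimal point and compares the count with <precision> minus <scale>, which is the comparison used in the examples above.

```shell
# Hypothetical helper (not PXF code): compares the integer digit count of a
# value with <precision> - <scale>.
# Usage: check_numeric_fit <value> <precision> <scale>
check_numeric_fit() {
    value=${1#-}                 # ignore a leading minus sign
    limit=$(( $2 - $3 ))         # <precision> - <scale>
    int_part=${value%%.*}        # digits before the decimal point
    # Leading zeros do not count as integer digits.
    while [ "${int_part#0}" != "$int_part" ]; do
        int_part=${int_part#0}
    done
    if [ "${#int_part}" -gt "$limit" ]; then
        echo "overflow: ${#int_part} integer digits > $limit"
    else
        echo "fits: ${#int_part} integer digits <= $limit"
    fi
}

# NUMERIC(20,4): 17 integer digits against a limit of 16.
check_numeric_fit 12345678901234567.12 20 4
# Unqualified NUMERIC, mapped to DECIMAL(38, 18): 30 integer digits against 20.
check_numeric_fit 123456789012345678901234567890.12345 38 18
# NUMERIC(20,4): 16 integer digits, which is within the limit.
check_numeric_fit 1234567890123456.12 20 4
```

A check of this kind can help spot problematic values before they are inserted into a writable external table.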

PXF can perform one of the following actions when detecting a numeric data overflow: round the value (the default), return an error, or ignore the overflow. The pxf.orc.write.decimal.overflow property in the pxf-site.xml server configuration file specifies the action to take.

Value  | PXF action
-------+------------------------------------------------------------
round  | The default behavior. When PXF encounters an overflow, it attempts to round the value to meet both precision and scale requirements before writing and reports an error if rounding fails. This may potentially leave an incomplete dataset in the external system.
error  | PXF reports an error when it encounters an overflow, and the transaction fails.
ignore | PXF logs a warning and attempts to round the value to meet both precision and scale requirements; otherwise a NULL value is written.

NOTE

Learn about configuring a PXF server in Configure a PXF server in PXF documentation.
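A minimal sketch of how the overflow action might be expressed in pxf-site.xml follows. The overflow_property function is a hypothetical helper that accepts only the three values listed above and prints the matching property element for review. The Hadoop-style <property> layout is an assumption to verify against the PXF documentation.

```shell
# Hypothetical helper (not part of PXF): prints a pxf-site.xml property
# element for pxf.orc.write.decimal.overflow, accepting only the values
# round, error, and ignore.
overflow_property() {
    case $1 in
        round|error|ignore)
            printf '<property>\n'
            printf '    <name>pxf.orc.write.decimal.overflow</name>\n'
            printf '    <value>%s</value>\n' "$1"
            printf '</property>\n'
            ;;
        *)
            echo "unsupported overflow action: $1" >&2
            return 1
            ;;
    esac
}

# Example: fail the transaction instead of rounding on overflow.
overflow_property error
```

Validating the value before editing the file guards against typos in the property value.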

Examples

These examples demonstrate how to configure and use the PXF HDFS connector for reading and writing HDFS ORC data by using external tables.

Prerequisites

To try out the practical examples, connect to the Greengage DB master host as gpadmin using psql as described in Connect to Greengage DB via psql. Then create the customers test database and connect to it:

DROP DATABASE IF EXISTS customers;
CREATE DATABASE customers;
\c customers

To be able to create an external table using the PXF protocol, enable the PXF extension in the database as described in Register PXF in a database in PXF documentation:

CREATE EXTENSION pxf;

Configure the PXF HDFS connector

To have PXF connect to HDFS, you need to create a Hadoop server configuration as described in Configure PXF Hadoop connectors in PXF documentation and then synchronize it to the Greengage DB cluster:

  1. Log in to the Greengage DB master host as gpadmin.

  2. Go to the $PXF_BASE/servers directory and create a Hadoop server configuration directory named hadoop:

    $ mkdir $PXF_BASE/servers/hadoop
    $ cd $PXF_BASE/servers/hadoop
  3. Copy the core-site.xml, hdfs-site.xml, mapred-site.xml, and yarn-site.xml Hadoop configuration files from the NameNode host of the Hadoop cluster to the current host:

    $ scp hdfsuser@namenode:/etc/hadoop/conf/core-site.xml .
    $ scp hdfsuser@namenode:/etc/hadoop/conf/hdfs-site.xml .
    $ scp hdfsuser@namenode:/etc/hadoop/conf/mapred-site.xml .
    $ scp hdfsuser@namenode:/etc/hadoop/conf/yarn-site.xml .
  4. Synchronize the server configuration to the Greengage DB cluster hosts:

    $ pxf cluster sync
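Before synchronizing, it may be worth confirming that all four files from step 3 were copied. The sketch below is a hypothetical check, not a PXF command: missing_hadoop_conf prints the name of each expected file that is absent from the given directory and prints nothing when the set is complete.

```shell
# Hypothetical pre-sync check (not a PXF command): lists which of the four
# Hadoop configuration files are missing from a server configuration directory.
missing_hadoop_conf() {
    conf_dir=$1
    for conf_file in core-site.xml hdfs-site.xml mapred-site.xml yarn-site.xml; do
        [ -f "$conf_dir/$conf_file" ] || echo "$conf_file"
    done
}

# Run the check only when PXF_BASE is set in the current shell.
if [ -n "${PXF_BASE:-}" ]; then
    missing_hadoop_conf "$PXF_BASE/servers/hadoop"
fi
```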

Create a readable external table

  1. In the /tmp directory on the HDFS host, create a file named customers.json with the following content:

    {"id": 1,"name": "John Doe","ordered_items":["laptop", "monitor"]}
    {"id": 2,"name": "Jane Smith","ordered_items":["keyboard", "mouse", "pad"]}
    {"id": 3,"name": "Bob Brown","ordered_items":["headphones", "laptop"]}
    {"id": 4,"name": "Alice Green","ordered_items":["webcam", "microphone", "mouse"]}

    The following field names and data types are used in the file:

    • id — INT;

    • name — TEXT;

    • ordered_items — TEXT[].

  2. Download the ORC Java tools JAR (version 2.2.2 in this example) from Maven Central to the /tmp directory and convert customers.json to the customers.orc ORC file:

    $ curl -o /tmp/orc-tools-2.2.2-uber.jar https://repo1.maven.org/maven2/org/apache/orc/orc-tools/2.2.2/orc-tools-2.2.2-uber.jar
    $ java -jar /tmp/orc-tools-2.2.2-uber.jar convert /tmp/customers.json \
        --schema 'struct<id:int,name:string,ordered_items:array<string>>' \
        -o /tmp/customers.orc
  3. Create the /tmp/pxf_examples HDFS directory for storing PXF example data files and add the generated ORC file to HDFS:

    $ hdfs dfs -mkdir -p /tmp/pxf_examples
    $ hdfs dfs -put /tmp/customers.orc /tmp/pxf_examples/
  4. On the Greengage DB master host, create an external table that references the customers.orc file. In the LOCATION clause, specify the PXF hdfs:orc profile and the server configuration. In the FORMAT clause, specify pxfwritable_import, which is the built-in custom formatter function for read operations:

    CREATE EXTERNAL TABLE customers_r (
            id INT,
            name TEXT,
            ordered_items TEXT[]
        )
        LOCATION ('pxf://tmp/pxf_examples/customers.orc?PROFILE=hdfs:orc&SERVER=hadoop')
        FORMAT 'CUSTOM' (FORMATTER='pxfwritable_import');
  5. Query the created external table:

    SELECT * FROM customers_r;

    The output should look as follows:

     id |    name     |       ordered_items
    ----+-------------+---------------------------
      1 | John Doe    | {laptop,monitor}
      2 | Jane Smith  | {keyboard,mouse,pad}
      3 | Bob Brown   | {headphones,laptop}
      4 | Alice Green | {webcam,microphone,mouse}
    (4 rows)
  6. Run a query to view the rows where the ordered_items column includes laptop or monitor:

    SELECT * FROM customers_r WHERE ordered_items && '{"laptop", "monitor"}';

    The output should look as follows:

     id |   name    |    ordered_items
    ----+-----------+---------------------
      1 | John Doe  | {laptop,monitor}
      3 | Bob Brown | {headphones,laptop}
    (2 rows)
  7. Run a query to view the rows where the first item in the ordered_items column is headphones:

    SELECT * FROM customers_r WHERE ordered_items[1] = 'headphones';

    The output should look as follows:

     id |   name    |    ordered_items
    ----+-----------+---------------------
      3 | Bob Brown | {headphones,laptop}
    (1 row)
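The input file from step 1 of this procedure could also be created non-interactively with a here-document, which avoids copy-and-paste errors in the JSON. The write_customers_json function is a hypothetical helper that writes the same four rows to the path passed as its argument.

```shell
# Hypothetical helper: writes the sample rows from step 1 to the given file.
write_customers_json() {
    cat > "$1" <<'EOF'
{"id": 1,"name": "John Doe","ordered_items":["laptop", "monitor"]}
{"id": 2,"name": "Jane Smith","ordered_items":["keyboard", "mouse", "pad"]}
{"id": 3,"name": "Bob Brown","ordered_items":["headphones", "laptop"]}
{"id": 4,"name": "Alice Green","ordered_items":["webcam", "microphone", "mouse"]}
EOF
}

write_customers_json /tmp/customers.json
# Count the lines as a quick sanity check; the file should have one row per line.
wc -l < /tmp/customers.json
```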

Create a writable external table

  1. On the Greengage DB master host, create the writable external table that stores data into the /tmp/pxf_examples/customers_w HDFS directory. In the LOCATION clause, specify the PXF hdfs:orc profile and the server configuration and set COMPRESSION_CODEC to none to disable compression. In the FORMAT clause, specify pxfwritable_export, which is the built-in custom formatter function for write operations:

    CREATE WRITABLE EXTERNAL TABLE customers_w (
            id INT,
            name TEXT,
            ordered_items TEXT[]
        )
        LOCATION ('pxf://tmp/pxf_examples/customers_w?PROFILE=hdfs:orc&SERVER=hadoop&COMPRESSION_CODEC=none')
        FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export');
  2. Insert some data into the customers_w table:

    INSERT INTO customers_w 
    VALUES (1, 'Bob Brown', ARRAY['monitor', 'webcam']),
           (2, 'John Doe', ARRAY['keyboard', 'microphone']),
           (3, 'Alice Green', ARRAY['headphones', 'pad']),
           (4, 'Jane Smith', ARRAY['laptop', 'mouse']);
  3. Copy the /tmp/pxf_examples/customers_w HDFS directory to the local /tmp directory:

    $ hdfs dfs -get -f /tmp/pxf_examples/customers_w /tmp
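  4. View the contents of the ORC file in the /tmp/customers_w directory. The file name is generated by PXF, so substitute the name of the file that was actually created: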

    $ java -jar /tmp/orc-tools-2.2.2-uber.jar meta -d /tmp/customers_w/39-0000000016_2.orc

    The output should look as follows:

    Processing data file /tmp/customers_w/39-0000000016_2.orc [length: 675]
    {"id":1,"name":"Bob Brown","ordered_items":["monitor","webcam"]}
    {"id":2,"name":"John Doe","ordered_items":["keyboard","microphone"]}
    {"id":3,"name":"Alice Green","ordered_items":["headphones","pad"]}
    {"id":4,"name":"Jane Smith","ordered_items":["laptop","mouse"]}
    ______________________________________________________________________
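Because the names of the files that PXF writes are not known in advance, and a directory may hold more than one file (assumed here to be one per segment that received rows), it can be convenient to dump every ORC file in the directory. The dump_orc_dir function below is a hypothetical wrapper around the same orc-tools command used in the last step.

```shell
# Hypothetical helper: runs "orc-tools meta -d" on every .orc file in a
# directory instead of naming each generated file by hand.
# Usage: dump_orc_dir <path_to_orc_tools_jar> <directory>
dump_orc_dir() {
    tools_jar=$1
    orc_dir=$2
    for orc_file in "$orc_dir"/*.orc; do
        # Skip the unexpanded pattern when the directory has no .orc files.
        [ -e "$orc_file" ] || continue
        java -jar "$tools_jar" meta -d "$orc_file"
    done
}

dump_orc_dir /tmp/orc-tools-2.2.2-uber.jar /tmp/customers_w
```

Rows inserted in a single statement may be spread across several files, so the combined output is what should match the inserted data.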