
Using Apache Parquet Data Files with CDH

Apache Parquet is a columnar storage format available to any component in the Hadoop ecosystem, regardless of the data processing framework, data model, or programming language. The Parquet file format incorporates several features that support data warehouse-style operations:

  • Columnar storage layout - A query can examine and perform calculations on all values for a column while reading only a small fraction of the data from a data file or table.
  • Flexible compression options - Data can be compressed with any of several codecs. Different data files can be compressed differently.
  • Innovative encoding schemes - Sequences of identical, similar, or related data values can be represented in ways that save disk space and memory. The encoding schemes provide an extra level of space savings beyond overall compression for each data file.
  • Large file size - The layout of Parquet data files is optimized for queries that process large volumes of data, with individual files in the multimegabyte or even gigabyte range.

Parquet is automatically installed when you install CDH, and the required libraries are automatically placed in the classpath for all CDH components. Copies of the libraries are in /usr/lib/parquet or /opt/cloudera/parcels/CDH/lib/parquet.

CDH lets you use the component of your choice with the Parquet file format for each phase of data processing. For example, you can read and write Parquet files using Pig and MapReduce jobs. You can convert, transform, and query Parquet tables through Hive, Impala, and Spark. And you can interchange data files between all of these components.


Compression for Parquet Files

For most CDH components, by default Parquet data files are not compressed. Cloudera recommends enabling compression to reduce disk usage and increase read and write performance.

You do not need to specify configuration to read a compressed Parquet file. However, to write a compressed Parquet file, you must specify the compression type. The supported compression types, the compression default, and how you specify compression depend on the CDH component writing the files.

Using Parquet Tables in Hive

To create a table named PARQUET_TABLE that uses the Parquet format, use a command like the following, substituting your own table name, column names, and data types:

hive> CREATE TABLE parquet_table_name (x INT, y STRING) STORED AS PARQUET;
  Note:
  • Once you create a Parquet table, you can query it or insert into it through other components such as Impala and Spark.
  • Set dfs.block.size to 256 MB in hdfs-site.xml, as shown in the sketch after this note.
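
For example, a minimal hdfs-site.xml sketch (256 MB expressed in bytes; on current Hadoop releases the non-deprecated name of this property is dfs.blocksize):

<property>
  <name>dfs.block.size</name>
  <value>268435456</value>
</property>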

If the table will be populated with data files generated outside of Impala and Hive, you can create the table as an external table pointing to the location where the files will be created:

hive> CREATE EXTERNAL TABLE parquet_table_name (x INT, y STRING)
  ROW FORMAT SERDE 'parquet.hive.serde.ParquetHiveSerDe'
  STORED AS
    INPUTFORMAT "parquet.hive.DeprecatedParquetInputFormat"
    OUTPUTFORMAT "parquet.hive.DeprecatedParquetOutputFormat"
  LOCATION '/test-warehouse/tinytable';

To populate the table with an INSERT statement, and to read the table with a SELECT statement, see Using the Parquet File Format with Impala Tables.

To set the compression type to use when writing data, configure the parquet.compression property:

set parquet.compression=GZIP;
INSERT OVERWRITE TABLE tinytable SELECT * FROM texttable;

The supported compression types are UNCOMPRESSED, GZIP, and SNAPPY.

Using Parquet Tables in Impala

Impala can create tables that use Parquet data files, insert data into those tables, convert the data into Parquet format, and query Parquet data files produced by Impala or other components. The only syntax required is the STORED AS PARQUET clause on the CREATE TABLE statement. After that, all SELECT, INSERT, and other statements recognize the Parquet format automatically. For example, a session in the impala-shell interpreter might look as follows:

[localhost:21000] > create table parquet_table (x int, y string) stored as parquet;
[localhost:21000] > insert into parquet_table select x, y from some_other_table;
Inserted 50000000 rows in 33.52s
[localhost:21000] > select y from parquet_table where x between 70 and 100;

Once you create a Parquet table this way in Impala, you can query it or insert into it through either Impala or Hive.

The Parquet format is optimized for working with large data files. In Impala 2.0 and higher, the default size of Parquet files written by Impala is 256 MB; in lower releases, 1 GB. Avoid using the INSERT ... VALUES syntax, or partitioning the table at too granular a level, if that would produce a large number of small files that cannot use Parquet optimizations for large data chunks.

Inserting data into a partitioned Impala table can be a memory-intensive operation, because each data file requires a memory buffer to hold the data before it is written. Such inserts can also exceed HDFS limits on simultaneous open files, because each node could potentially write to a separate data file for each partition, all at the same time. Make sure table and column statistics are in place for any table used as the source for an INSERT ... SELECT operation into a Parquet table. If capacity problems still occur, consider splitting insert operations into one INSERT statement per partition.
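
For example, a sketch of that workflow in impala-shell, using hypothetical table and column names (staging_table as the source, sales_pq partitioned by year):

[localhost:21000] > compute stats staging_table;
[localhost:21000] > insert into sales_pq partition (year=2016) select id, amount from staging_table where year = 2016;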

Impala can query Parquet files that use the PLAIN, PLAIN_DICTIONARY, BIT_PACKED, and RLE encodings. Currently, Impala does not support the RLE_DICTIONARY encoding. When creating files outside of Impala for use by Impala, make sure to use one of the supported encodings. In particular, for MapReduce jobs, do not define parquet.writer.version (especially not as PARQUET_2_0) in the configuration of Parquet MR jobs that write data files. Data written with version 2.0 of the Parquet writer might not be consumable by Impala, because of its use of the RLE_DICTIONARY encoding. Use the default version of the Parquet writer and refrain from overriding it by setting the parquet.writer.version property or using WriterVersion.PARQUET_2_0 in the Parquet API.

For complete instructions and examples, see Using the Parquet File Format with Impala Tables.

Using Parquet Files in MapReduce

MapReduce requires Thrift in its CLASSPATH and in libjars to access Parquet files. It also requires parquet-format in libjars. Set up the following before running MapReduce jobs that access Parquet data files:

if [ -e /opt/cloudera/parcels/CDH ] ; then
    CDH_BASE=/opt/cloudera/parcels/CDH
else
    CDH_BASE=/usr
fi
THRIFTJAR=`ls -l $CDH_BASE/lib/hive/lib/libthrift*jar | awk '{print $9}' | head -1`
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$THRIFTJAR
export LIBJARS=`echo "$CLASSPATH" | awk 'BEGIN { RS = ":" } { print }' | grep parquet-format | tail -1`
export LIBJARS=$LIBJARS,$THRIFTJAR

hadoop jar my-parquet-mr.jar -libjars $LIBJARS

Reading Parquet Files in MapReduce

A simple map-only MapReduce job that reads Parquet files can be implemented using the example helper classes of the Parquet library. The TestReadParquet.java example demonstrates how to read a Parquet file in this manner.
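
The following is a minimal sketch of such a job, not the TestReadParquet.java example itself. It assumes the example object model classes (ExampleInputFormat and Group) from the parquet-hadoop library; package names may differ between Parquet releases, and older CDH artifacts use the parquet.* packages instead of org.apache.parquet.*.

import java.io.IOException;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.example.ExampleInputFormat;

public class ParquetReadSketch extends Configured implements Tool {

  // ExampleInputFormat delivers each record as a Group; the input key is always null.
  public static class ReadMapper extends Mapper<Void, Group, NullWritable, Text> {
    @Override
    protected void map(Void key, Group value, Context context)
        throws IOException, InterruptedException {
      // Group.toString() prints one "field: value" line per column, similar to parquet-tools cat.
      context.write(NullWritable.get(), new Text(value.toString()));
    }
  }

  @Override
  public int run(String[] args) throws Exception {
    Job job = Job.getInstance(getConf(), "parquet-read-sketch");
    job.setJarByClass(ParquetReadSketch.class);
    job.setMapperClass(ReadMapper.class);
    job.setNumReduceTasks(0);                          // map-only
    job.setInputFormatClass(ExampleInputFormat.class); // reads Parquet records into Group objects
    job.setOutputFormatClass(TextOutputFormat.class);
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(Text.class);
    ExampleInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    return job.waitForCompletion(true) ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    System.exit(ToolRunner.run(new ParquetReadSketch(), args));
  }
}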

Writing Parquet Files in MapReduce

The TestWriteParquet.java example demonstrates writing Parquet files. For writing, you must provide a schema. Specify the schema in the run method of the job before submitting it. You can then write records in the mapper by composing a Group value using the example classes and no key.

To set the compression type before submitting the job, use the setCompression method. The supported compression types are CompressionCodecName.UNCOMPRESSED, CompressionCodecName.GZIP, and CompressionCodecName.SNAPPY.

If input files are in Parquet format, the schema can be extracted using the getSchema method. See TestReadWriteParquet.java for an example of this.
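
As a hedged illustration of these steps (not the TestWriteParquet.java example itself), the following sketch shows the job-setup side: parsing a schema, registering it for the example GroupWriteSupport, and choosing the compression codec. Package names may differ between Parquet releases.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.parquet.hadoop.example.ExampleOutputFormat;
import org.apache.parquet.hadoop.example.GroupWriteSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class ParquetWriteSetupSketch {

  public static Job configureJob(Configuration conf, String outputDir) throws Exception {
    // The output schema must be registered before the job is submitted.
    MessageType schema = MessageTypeParser.parseMessageType(
        "message example { required int32 x; required binary y (UTF8); }");
    GroupWriteSupport.setSchema(schema, conf);

    Job job = Job.getInstance(conf, "parquet-write-sketch");
    job.setOutputFormatClass(ExampleOutputFormat.class);
    // Leave parquet.writer.version at its default so that Impala can read the output files.
    ExampleOutputFormat.setCompression(job, CompressionCodecName.SNAPPY);
    ExampleOutputFormat.setOutputPath(job, new Path(outputDir));
    return job;
  }

  // In the mapper, records are composed as Group values with a null key, for example:
  //   Group g = new SimpleGroupFactory(schema).newGroup().append("x", 1).append("y", "a");
  //   context.write(null, g);
}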

Using Parquet Files in Pig

In CDH 5.12 and earlier, Pig can write to Hive tables stored in the Parquet format only through a two-step process that uses the ParquetStorer and ParquetLoader libraries:
  • First you write to the Parquet files.
  • Then you execute the DDL or DML statements from Hive.
This process is described below.

In CDH 5.13 and later, Pig can directly write to Parquet tables using HCatalog, the table management and storage layer for Hadoop. Both partitioned and non-partitioned tables are supported. This process is described below.


Using the ParquetLoader and ParquetStorer Libraries to Write to Parquet Hive Tables with Pig

The following process writes to Parquet-formatted Hive tables with Pig using the ParquetLoader and the ParquetStorer libraries. It does not use HCatalog. This method of writing to Parquet-formatted Hive tables with Pig is supported in CDH 5.12 and earlier, and continues to be supported in CDH 5.13 and later. See the Apache documentation for Pig Latin Basics, which is the reference for the Pig scripting language.

  1. If the external table is created and populated, read the Parquet file into a Pig relation and specify the relation schema. In the following example, the relation is named A. Replace the example directories and schema definition with your own:
    grunt> A = LOAD '/user/hive/warehouse/web_logs_par/dd=2015-11-18'
                USING ParquetLoader AS (version:int, app:chararray,
                bytes:int, city:chararray, client_ip:chararray);
              
  2. Write the Parquet file from the Pig relation A into a table partition, specifying the partition location. The partition directory is created if it does not already exist. Replace the example directory with your own:
    grunt> store A into '/user/hive/warehouse/web_logs_par/dd=2015-11-19' USING ParquetStorer;
    To set the compression type, set the parquet.compression property before the first store instruction in a Pig script. For example, the following statement sets the compression type to gzip:
    grunt> SET parquet.compression gzip;
                 

    The supported compression types are uncompressed, gzip, and snappy, which is the default.

  3. Now that the partition data is in the appropriate directory of a Hive table, create the corresponding partition in Hive using Beeline. Replace the example directory and partition specification with your own:
    ALTER TABLE web_logs_par ADD PARTITION (dd='2015-11-19')
      LOCATION '/user/hive/warehouse/web_logs_par/dd=2015-11-19';

    Now, the table contains the new partition and can be used from Hive or Impala.

Using HCatalog to Write to Parquet Hive Tables with Pig

The following example shows how to dynamically write to partitioned Hive tables in the Parquet format with Pig using HCatalog. In this example, we have a Hive table that stores foreign currency exchange rates over a period of time, which is partitioned by the year and month. The table was created with the following DDL statement:
CREATE TABLE fxbyyearmonth (price double, currency string) PARTITIONED BY (year string, month string) STORED AS PARQUET;
           
The above table is populated with the following data:
INSERT INTO fxbyyearmonth PARTITION (year='2016',month='01') VALUES (1.0,'USD');
INSERT INTO fxbyyearmonth PARTITION (year='2016',month='02') VALUES (1.2,'EUR');
INSERT INTO fxbyyearmonth PARTITION (year='2016',month='01') VALUES (261.0,'HUF');
INSERT INTO fxbyyearmonth PARTITION (year='2016',month='02') VALUES (1.3,'CHF');
INSERT INTO fxbyyearmonth PARTITION (year='2017',month='01') VALUES (1.2,'EUR');
INSERT INTO fxbyyearmonth PARTITION (year='2017',month='01') VALUES (1.0,'USD');
INSERT INTO fxbyyearmonth PARTITION (year='2017',month='03') VALUES (260.0,'HUF');
          
We want to move the data from the fxbyyearmonth table to another Hive table that is defined to store foreign currency exchange rates and is partitioned by the year, month, and the currency. This table was created with the following DDL statement and it is empty:
CREATE TABLE fxbyyearmonthccy (price double) PARTITIONED BY (year string, month string, currency
string) STORED AS PARQUET;
             
  1. Run the following command to bring up the Grunt shell while simultaneously invoking HCatalog:
    pig -useHCatalog
                
  2. Load the data into Pig using a relation named A:
    grunt> A = LOAD 'fxbyyearmonth' USING org.apache.hive.hcatalog.pig.HCatLoader();
               
    If you run the describe A command in the Grunt shell, it returns the following information:
    A: {price: double,currency: chararray,year: chararray,month: chararray}
                  
    This shows that HCatLoader places the partition columns (year and month) at the end of the relation A schema.
      Note:

    If you want to load only one partition, you must apply an additional filter operation. Filter operators push down information to HCatalog so it loads only that part of the table. For example:

    grunt> B = FILTER A BY year == '2016';
                  
  3. Dynamically write partitions. HCatStorer can dynamically write partitions when the partition columns have the positions that the schema expects. In our example, the dataset must be ordered as: price, currency, year, month. This is the same order that the Pig shell displayed when you ran the describe A command in Step 2. To dynamically write the partitions in Pig, run:
    grunt> B = FOREACH A generate price, currency, year, month;
                
  4. Store the dataset contained in B into the Hive table fxbyyearmonthccy:
    grunt> STORE B INTO 'fxbyyearmonthccy' USING org.apache.hive.hcatalog.pig.HCatStorer();
                

    This stores everything from the dataset in B into the Hive table by putting all records in their correct partitions dynamically. All partition columns must be present for this to work properly. After running the STORE command, the fxbyyearmonthccy table contains the dataset and can be used from Hive or Impala.

      Note:

    You can also statically specify the target partition. In this case, you do not need the columns to be present in the dataset. For example, you can filter for only a certain partition with Pig, and then use only the price column:

    grunt> C = FILTER B BY (year == '2016') AND (month=='01') AND (currency=='EUR');
    grunt> D = FOREACH C GENERATE price;
    grunt> STORE D INTO 'fxbyyearmonthccy' USING
           org.apache.hive.hcatalog.pig.HCatStorer('year=2016,month=01,currency=EUR');
                  

Using Parquet Files in Spark

Spark SQL supports loading and saving DataFrames from and to a variety of data sources and has native support for Parquet.

To read Parquet files in Spark SQL, use the SQLContext.read.parquet("path") method.

To write Parquet files in Spark SQL, use the DataFrame.write.parquet("path") method.

To set the compression type, configure the spark.sql.parquet.compression.codec property:
sqlContext.setConf("spark.sql.parquet.compression.codec","codec") 
The supported codec values are: uncompressed, gzip, lzo, and snappy. The default is gzip.
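
For example, a minimal sketch that sets the codec and copies a Parquet data set, following the sqlContext style shown above (the paths and column names are hypothetical):

// spark-shell
sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")
val df = sqlContext.read.parquet("/user/example/input_parquet")
df.select("x", "y").write.parquet("/user/example/output_parquet")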

Currently, Spark looks up column data from Parquet files by using the names stored within the data files. This is different than the default Parquet lookup behavior of Impala and Hive. If data files are produced with a different physical layout due to added or reordered columns, Spark still decodes the column data correctly. If the logical layout of the table is changed in the metastore database, for example through an ALTER TABLE CHANGE statement that renames a column, Spark still looks for the data using the now-nonexistent column name and returns NULLs when it cannot locate the column values. To avoid behavior differences between Spark and Impala or Hive when modifying Parquet tables, avoid renaming columns, or use Impala, Hive, or a CREATE TABLE AS SELECT statement to produce a new table and new set of Parquet files containing embedded column names that match the new layout.
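
For example, a hedged sketch of the CREATE TABLE AS SELECT workaround in Impala, using hypothetical table and column names; it rewrites the data files so that the embedded column names match the new logical layout:

CREATE TABLE sales_renamed STORED AS PARQUET
  AS SELECT id, amount AS total_amount FROM sales;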

For an example of writing Parquet files to Amazon S3, see Examples of Accessing S3 Data from Spark. For a similar example for Microsoft ADLS, see Examples of Accessing ADLS Data from Spark.

For general information and examples of Spark working with data in different file formats, see Accessing External Storage from Spark.

Importing Data into Parquet Format Using Sqoop

There are two ways to import data into Parquet format:
  • Kite Dataset API based implementation
  • Parquet Hadoop API based implementation
  Note: Both the Kite Dataset API based implementation and the Parquet Hadoop API based implementation can be used for export as well.

Kite Dataset API Based Implementation

The Kite Dataset API based implementation executes the import command on a different code path than the text import: it creates the Hive table based on an Avro schema generated by connecting to the Hive metastore. As a result, switching from text file format to Parquet file format can lead to unexpected behavioral changes. Also, because Kite checks the Hive table schema before importing the data, attempting to import data whose schema is incompatible with the Hive table's schema fails with an error.

The Kite Dataset API based implementation uses the snappy codec for compression by default. It also supports the gzip codec.

Parquet Hadoop API Based Implementation

The Parquet Hadoop API based implementation builds the Hive CREATE TABLE statement and executes the LOAD DATA INPATH command the same way the text import does. It does not check the Hive table's schema before importing the data, so it is possible to successfully import data whose schema is incompatible with the Hive table's schema. In that case, an error occurs later, during a Hive read operation.

The Parquet Hadoop API based implementation does not use any compression by default. It supports the snappy and gzip codecs.

The Parquet Hadoop API based implementation has the following advantages over the Kite Dataset API based implementation:
  • Connection to HiveServer2 is supported by the --hs2-url option. It provides better security features.
  • Import into S3 is supported.
  • The imported table name can contain characters other than alphanumeric characters and '_'. For example, it can contain a '.' character in its name.
  • Import when the /tmp directory and the target directory are in different encryption zones is supported.

Specify the Data Importing Implementation

The option --parquet-configurator-implementation is used to specify the implementation for data import into Parquet format:

  • kite: Kite Dataset API based implementation. This is the default value.
  • hadoop: Parquet Hadoop API based implementation. This is the recommended value.

If the --parquet-configurator-implementation option is absent, Sqoop checks the value of the parquetjob.configurator.implementation property. This property can be specified using -D in the Sqoop command or in the sqoop-site.xml.

If the parquetjob.configurator.implementation property is also absent, Sqoop uses the Kite Dataset API based implementation by default.

Example commands:

The option --parquet-configurator-implementation specifies that the Kite Dataset API based implementation is used for data import into Parquet:
sqoop import --connect $MYCONN --username $MYUSER --password $MYPSWD --table "employees_test" --as-parquetfile --parquet-configurator-implementation kite
The option --parquet-configurator-implementation specifies that the Parquet Hadoop API based implementation is used for data import into Parquet:
sqoop import --connect $MYCONN --username $MYUSER --password $MYPSWD --table "employees_test" --as-parquetfile --parquet-configurator-implementation hadoop
The property parquetjob.configurator.implementation specifies that the Parquet Hadoop API based implementation is used for data import into Parquet:
sqoop import -Dparquetjob.configurator.implementation=hadoop --connect $MYCONN --username $MYUSER --password $MYPSWD --table "employees_test" --as-parquetfile
The option --compression-codec specifies that snappy compression is used for data import into Parquet:
sqoop import --connect $MYCONN --username $MYUSER --password $MYPSWD --table "employees_test" --as-parquetfile --parquet-configurator-implementation hadoop --compression-codec snappy

Import Decimal Data Types into Parquet Format Using Sqoop

This section applies to users with upgraded CDH 6.2 clusters. All new CDH 6.2 clusters have decimal support enabled by default when using Parquet.

Databases vary in the way they store DECIMAL data types. Therefore, when you’re importing a table into Parquet format, it is essential to take measures to avoid any complications. Use this guide to learn how to import tables containing DECIMAL data types into Parquet format successfully.

  Note: Decimal support is turned on by default in a newly installed CDH 6.2 cluster. However, this configuration is not picked up by an Oozie Sqoop action. This is due to Oozie generating its own sqoop-site.xml instead of picking up the one generated by Cloudera Manager. Therefore, the user has to manually specify the necessary Hadoop configurations in their command.

Configurations for Sqoop Decimal Support with Parquet

This section describes the different configurations used for importing tables with DECIMAL data types into Parquet format. For code examples and usage guidance, see Code Examples.

Enable Decimals

The Enable Decimals configuration allows you to import tables that contain DECIMAL data types. Without this, DECIMAL values convert to strings.

The following property enables decimal support:

-Dsqoop.parquet.logical_types.decimal.enable=true

Configurator Implementation

The --parquet-configurator-implementation option is used to specify the implementation for importing data into Parquet format. There are two options: kite and hadoop. For more information, read Importing Data into Parquet Format Using Sqoop. For decimal support, you must use the hadoop implementation.

The following property sets the configurator implementation to hadoop:

-Dparquetjob.configurator.implementation=hadoop

Default Precision and Scale

Specify a default precision and scale to import tables containing DECIMAL data types where precision and scale are not previously specified in the table.

The following properties set the default precision and scale:

  • sqoop.avro.logical_types.decimal.default.precision=38

    Specifies the default precision to a total of 38 digits.

  • sqoop.avro.logical_types.decimal.default.scale=10

    Specifies the default scale to ten digits beyond the decimal point.

      Note: The above configuration parameters are shared with Avro. Default values do not replace any DECIMAL data types where precision and scale are already specified.

Enable Padding

Padded numbers have a defined number of digits. If there are not enough digits to fill the specified precision and scale, zeros (0s) pad out the missing digits.

Some databases, such as Oracle and Postgres, do not use padding to store DECIMAL data types. As such, the user must enable padding manually.

sqoop.avro.decimal_padding.enable=true

  Note: The above configuration parameter is shared with Avro.

Code Examples

Importing a DECIMAL Data Type that Has No Precision and Scale Specified

To import a column of DECIMAL data types without precision and scale previously defined, the user must:

  1. Enable decimals
  2. Enable padding
  3. Specify default values for precision and scale

Example: Import as a Parquet File to a Specified HDFS Directory

In the example below, decimals and padding are enabled, and default values are set for precision and scale.

The following command imports a NUMBER or NUMERIC column:

sqoop import -Dparquetjob.configurator.implementation=hadoop \
    -Dsqoop.parquet.logical_types.decimal.enable=true \
    -Dsqoop.avro.decimal_padding.enable=true \
    -Dsqoop.avro.logical_types.decimal.default.precision=38 \
    -Dsqoop.avro.logical_types.decimal.default.scale=10 \
    --connect $MYCONN --username $MYUSER --password $MYPASS \
    --as-parquetfile --delete-target-dir --table TEST_ONE -m 1

Example: Import as a Parquet File and Move Data to Hive

In the example below, the Parquet files are first imported into HDFS and then loaded into Hive, which moves the data into the Hive warehouse directory. Decimals and padding are enabled, and default values are set for precision and scale.

The following command imports a NUMBER or NUMERIC column:

sqoop import -Dparquetjob.configurator.implementation=hadoop \
    -Dsqoop.parquet.logical_types.decimal.enable=true \
    -Dsqoop.avro.decimal_padding.enable=true \
    -Dsqoop.avro.logical_types.decimal.default.precision=38 \
    -Dsqoop.avro.logical_types.decimal.default.scale=10 \
    --connect $MYCONN --username $MYUSER --password $MYPASS \
    --as-parquetfile --delete-target-dir --table TEST_ONE -m 1 \
    --hive-import --create-hive-table

Importing a DECIMAL Data Type that Has Precision and Scale Specified

To import a column of DECIMAL data types with precision and scale previously defined, the user must:

  1. Enable decimals
  2. Enable padding

Example: Import as a Parquet File to a Specified HDFS Directory

In the example below, decimals and padding are enabled, but no default values are set for precision and scale.

The following command imports a NUMBER(10, 3) or NUMERIC(10,3) column:

sqoop import -Dparquetjob.configurator.implementation=hadoop \
    -Dsqoop.parquet.logical_types.decimal.enable=true \
    -Dsqoop.avro.decimal_padding.enable=true \
    --connect $MYCONN --username $MYUSER --password $MYPASS \
    --as-parquetfile --delete-target-dir --table TEST_ONE -m 1

Example: Import as a Parquet File and Move Data to Hive

In the example below, the Parquet files are first imported into HDFS and then loaded into Hive, which moves the data into the Hive warehouse directory. Decimals and padding are enabled, but no default values are set for precision and scale.

The following command imports a NUMBER(10, 3) or NUMERIC(10,3) column:

sqoop import -Dparquetjob.configurator.implementation=hadoop \
    -Dsqoop.parquet.logical_types.decimal.enable=true \
    -Dsqoop.avro.decimal_padding.enable=true \
    --connect $MYCONN --username $MYUSER --password $MYPASS \
    --as-parquetfile --delete-target-dir --table TEST_ONE -m 1 \
    --hive-import --create-hive-table

Parquet File Interoperability

Impala has always included Parquet support, using high-performance code written in C++ to read and write Parquet files. The Parquet JARs for use with Hive, Pig, and MapReduce are available with CDH 4.5 and higher. Using the Java-based Parquet implementation on a CDH release lower than CDH 4.5 is not supported.

A Parquet table created by Hive can typically be accessed by Impala 1.1.1 and higher with no changes, and vice versa. Before Impala 1.1.1, when Hive support for Parquet was not available, Impala wrote a dummy SerDe class name into each data file. These older Impala data files require a one-time ALTER TABLE statement to update the metadata for the SerDe class name before they can be used with Hive. See Apache Impala for details.

A Parquet file written by Hive, Impala, Pig, or MapReduce can be read by any of the others. Different defaults for file and block sizes, compression and encoding settings, and so on might cause performance differences depending on which component writes or reads the data files. For example, Impala typically sets the HDFS block size to 256 MB and divides the data files into 256 MB chunks, so that each I/O request reads an entire data file.

In CDH 5.5 and higher, non-Impala components that write Parquet files include extra padding to ensure that the Parquet row groups are aligned with HDFS data blocks. The maximum amount of padding is controlled by the parquet.writer.max-padding setting, specified as a number of bytes. By default, up to 8 MB of padding can be added to the end of each row group. This alignment helps prevent remote reads during Impala queries. The setting does not apply to Parquet files written by Impala, because Impala always writes each Parquet file as a single HDFS data block.
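
For example, to lower the padding limit for Parquet files written by Hive, you could set the property in the session before the write. This is a sketch and assumes the property is passed through to the underlying Parquet writer; 4194304 bytes is 4 MB:

hive> SET parquet.writer.max-padding=4194304;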

Each release may have limitations. The following are current limitations in CDH:

  • Parquet has not been tested with HCatalog. Without HCatalog, Pig cannot correctly read dynamically partitioned tables; this is true for all file formats.
  • Impala supports table columns using nested data types or complex data types such as map, struct, or array only in Impala 2.3 (corresponding to CDH 5.5) and higher. Impala 2.2 (corresponding to CDH 5.4) can query only the scalar columns of Parquet files containing such types. Lower releases of Impala cannot query any columns from Parquet data files that include such types.
  • Cloudera supports some but not all of the object models from the upstream Parquet-MR project. Currently supported object models are:
    • parquet-avro (recommended for Cloudera users)
    • parquet-thrift
    • parquet-protobuf
    • parquet-pig
    • The Impala and Hive object models built into those components, not available in external libraries. (CDH does not include the parquet-hive module of the parquet-mr project, because recent versions of Hive have Parquet support built in.)

Parquet File Structure

To examine the internal structure and data of Parquet files, you can use the parquet-tools command that comes with CDH. Make sure this command is in your $PATH. (Typically, it is symlinked from /usr/bin; sometimes, depending on your installation setup, you might need to locate it under a CDH-specific bin directory.) The arguments to this command let you perform operations such as:
  • cat: Print a file's contents to standard out. In CDH 5.5 and higher, you can use the -j option to output JSON.
  • head: Print the first few records of a file to standard output.
  • schema: Print the Parquet schema for the file.
  • meta: Print the file footer metadata, including key-value properties (like Avro schema), compression ratios, encodings, compression used, and row group information.
  • dump: Print all data and metadata.
Use parquet-tools -h to see usage information for all the arguments. Here are some examples showing parquet-tools usage:
$ # Be careful doing this for a big file! Use parquet-tools head to be safe.
$ parquet-tools cat sample.parq
year = 1992
month = 1
day = 2
dayofweek = 4
dep_time = 748
crs_dep_time = 750
arr_time = 851
crs_arr_time = 846
carrier = US
flight_num = 53
actual_elapsed_time = 63
crs_elapsed_time = 56
arrdelay = 5
depdelay = -2
origin = CMH
dest = IND
distance = 182
cancelled = 0
diverted = 0

year = 1992
month = 1
day = 3
...

$ parquet-tools head -n 2 sample.parq
year = 1992
month = 1
day = 2
dayofweek = 4
dep_time = 748
crs_dep_time = 750
arr_time = 851
crs_arr_time = 846
carrier = US
flight_num = 53
actual_elapsed_time = 63
crs_elapsed_time = 56
arrdelay = 5
depdelay = -2
origin = CMH
dest = IND
distance = 182
cancelled = 0
diverted = 0

year = 1992
month = 1
day = 3
...

$ parquet-tools schema sample.parq
message schema {
  optional int32 year;
  optional int32 month;
  optional int32 day;
  optional int32 dayofweek;
  optional int32 dep_time;
  optional int32 crs_dep_time;
  optional int32 arr_time;
  optional int32 crs_arr_time;
  optional binary carrier;
  optional int32 flight_num;
...

$ parquet-tools meta sample.parq
creator:             impala version 2.2.0-cdh5.4.3 (build 517bb0f71cd604a00369254ac6d88394df83e0f6)

file schema:         schema
-------------------------------------------------------------------
year:                OPTIONAL INT32 R:0 D:1
month:               OPTIONAL INT32 R:0 D:1
day:                 OPTIONAL INT32 R:0 D:1
dayofweek:           OPTIONAL INT32 R:0 D:1
dep_time:            OPTIONAL INT32 R:0 D:1
crs_dep_time:        OPTIONAL INT32 R:0 D:1
arr_time:            OPTIONAL INT32 R:0 D:1
crs_arr_time:        OPTIONAL INT32 R:0 D:1
carrier:             OPTIONAL BINARY R:0 D:1
flight_num:          OPTIONAL INT32 R:0 D:1
...

row group 1:         RC:20636601 TS:265103674
-------------------------------------------------------------------
year:                 INT32 SNAPPY DO:4 FPO:35 SZ:10103/49723/4.92 VC:20636601 ENC:PLAIN_DICTIONARY,RLE,PLAIN
month:                INT32 SNAPPY DO:10147 FPO:10210 SZ:11380/35732/3.14 VC:20636601 ENC:PLAIN_DICTIONARY,RLE,PLAIN
day:                  INT32 SNAPPY DO:21572 FPO:21714 SZ:3071658/9868452/3.21 VC:20636601 ENC:PLAIN_DICTIONARY,RLE,PLAIN
dayofweek:            INT32 SNAPPY DO:3093276 FPO:3093319 SZ:2274375/5941876/2.61 VC:20636601 ENC:PLAIN_DICTIONARY,RLE,PLAIN
dep_time:             INT32 SNAPPY DO:5367705 FPO:5373967 SZ:28281281/28573175/1.01 VC:20636601 ENC:PLAIN_DICTIONARY,RLE,PLAIN
crs_dep_time:         INT32 SNAPPY DO:33649039 FPO:33654262 SZ:10220839/11574964/1.13 VC:20636601 ENC:PLAIN_DICTIONARY,RLE,PLAIN
arr_time:             INT32 SNAPPY DO:43869935 FPO:43876489 SZ:28562410/28797767/1.01 VC:20636601 ENC:PLAIN_DICTIONARY,RLE,PLAIN
crs_arr_time:         INT32 SNAPPY DO:72432398 FPO:72438151 SZ:10908972/12164626/1.12 VC:20636601 ENC:PLAIN_DICTIONARY,RLE,PLAIN
carrier:              BINARY SNAPPY DO:83341427 FPO:83341558 SZ:114916/128611/1.12 VC:20636601 ENC:PLAIN_DICTIONARY,RLE,PLAIN
flight_num:           INT32 SNAPPY DO:83456393 FPO:83488603 SZ:10216514/11474301/1.12 VC:20636601 ENC:PLAIN_DICTIONARY,RLE,PLAIN
...

parquet-tools also provides the functionality to merge Parquet files, but Cloudera strongly discourages its use. Historically, good performance has been associated with large Parquet files. For this reason, many users attempted using parquet-tools merge to achieve good performance. In practice, however, parquet-tools merge doesn't improve Hive performance, and it degrades Impala performance significantly.

Good performance comes not from large files as such, but from large row groups, up to the HDFS block size. However, parquet-tools merge does not merge row groups; it just places them one after the other. The tool was intended for Parquet files that are already arranged in row groups of the desired size. When it is used to merge many small files, the resulting file still contains small row groups, and you lose the advantage of larger files.

A common reason for wanting to combine small files in the first place is frequent ingestion: each ingestion run creates small files, so frequent ingestion creates many of them. Periodically, a compaction process is needed to combine these small files into bigger ones, and currently the only way to do that is to rewrite the data. See the Cloudera blog post titled How-to: Ingest and Query “Fast Data” with Impala (Without Kudu) for the best practice for compacting data while keeping it continuously available.

Examples of Java Programs to Read and Write Parquet Files

You can find full examples of Java code at the Cloudera Parquet examples GitHub repository.

The TestReadWriteParquet.java example demonstrates the "identity" transform. It reads any Parquet data file and writes a new file with exactly the same content.

The TestReadParquet.java example reads a Parquet data file, and produces a new text file in CSV format with the same content.

The TestWriteParquet.java example demonstrates specifying a schema for writing the first two columns of a CSV input to Parquet output.
