Iceberg Connector

Overview

Apache Iceberg is an open table format for huge analytic datasets. The Iceberg connector allows querying data stored in files written in Iceberg format. Iceberg data files can be stored in either Parquet or ORC format, as determined by the format property in the table definition. The table format defaults to ORC.

Requirements

To use Iceberg, you need:

  • Network access from the openLooKeng coordinator and workers to the distributed object storage.
  • Access to a Hive metastore service (HMS).
  • Network access from the openLooKeng coordinator to the HMS. Hive metastore access with the Thrift protocol defaults to using port 9083.

Hive metastore catalog

The Hive metastore catalog is the default implementation. When using it, the Iceberg connector supports the same metastore configuration properties as the Hive connector. At a minimum, hive.metastore.uri must be configured.

connector.name=iceberg
hive.metastore.uri=thrift://localhost:9083

General configuration

These configuration properties are independent of which catalog implementation is used.

Iceberg general configuration properties. Create etc/catalog/iceberg.properties with the desired properties from the table below, replacing localhost:9083 with the correct IP address and port:

Attribute name | Attribute value | Required | Description
---------------|-----------------|----------|------------
connector.name | iceberg | true | The name of the connector.
hive.metastore.uri | thrift://localhost:9083 | true | The URI of the Hive metastore.
iceberg.file-format | ORC | false | Defines the data storage file format for Iceberg tables. Possible values are PARQUET and ORC.
iceberg.compression-codec | ZSTD | false | The compression codec to be used when writing files. Possible values are NONE, SNAPPY, LZ4, ZSTD, GZIP.
iceberg.use-file-size-from-metadata | true | false | Read file sizes from metadata instead of the file system. This property should only be set as a workaround for a known issue that was fixed in Iceberg version 0.11.0.
iceberg.max-partitions-per-writer | 100 | false | Maximum number of partitions handled per writer.
iceberg.unique-table-location | true | false | Use randomized, unique table locations.
iceberg.dynamic-filtering.wait-timeout | 0s | false | Maximum duration to wait for completion of dynamic filters during split generation.
iceberg.table-statistics-enabled | true | false | Enables table statistics. The equivalent catalog session property is statistics_enabled, for session-specific use. Set to false to disable statistics. Disabling statistics means that cost-based optimizations cannot make smart decisions about the query plan.
iceberg.minimum-assigned-split-weight | 0.05 | false | A decimal value in the range (0, 1] used as a minimum for weights assigned to each split. A low value may improve performance on tables with small files. A higher value may improve performance for queries with highly skewed aggregations or joins.
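
For reference, a catalog file that combines several of these properties might look like the following sketch; the metastore address, file format, and codec values are illustrative and should be adapted to your deployment:

connector.name=iceberg
hive.metastore.uri=thrift://localhost:9083
iceberg.file-format=PARQUET
iceberg.compression-codec=ZSTD
iceberg.max-partitions-per-writer=100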

SQL support

This connector provides read access and write access to data and metadata in Iceberg. In addition to the globally available and read operation statements, the connector supports the following features:

  • CREATE TABLE
CREATE TABLE ordersg (
    order_id BIGINT,
    order_date DATE,
    account_number BIGINT,
    customer VARCHAR,
    country VARCHAR)
WITH (partitioning = ARRAY['month(order_date)', 'bucket(account_number, 10)', 'country']);
  • ADD COLUMN
ALTER TABLE ordersg ADD COLUMN zip VARCHAR;
  • RENAME COLUMN
ALTER TABLE ordersg RENAME COLUMN zip TO zip_r;
  • DROP COLUMN
ALTER TABLE ordersg DROP COLUMN zip_r;
  • INSERT
INSERT INTO ordersg VALUES (1, DATE '1988-11-09', 666888, 'tim', 'US');
  • DELETE
DELETE FROM ordersg WHERE order_id = 1;
  • UPDATE
UPDATE ordersg SET customer = 'Alice' WHERE order_id = 2;
  • SELECT
SELECT * FROM ordersg;

Partitioned tables

Iceberg supports partitioning by specifying transforms over the table columns. A partition is created for each unique tuple value produced by the transforms. Identity transforms are simply the column name. Other transforms are:

Transform | Description
----------|------------
year(ts) | A partition is created for each year. The partition value is the integer difference in years between ts and January 1 1970.
month(ts) | A partition is created for each month of each year. The partition value is the integer difference in months between ts and January 1 1970.
day(ts) | A partition is created for each day of each year. The partition value is the integer difference in days between ts and January 1 1970.
hour(ts) | A partition is created for each hour of each day. The partition value is a timestamp with the minutes and seconds set to zero.
bucket(x, nbuckets) | The data is hashed into the specified number of buckets. The partition value is an integer hash of x, with a value between 0 and nbuckets - 1 inclusive.
truncate(s, nchars) | The partition value is the first nchars characters of s.

In this example, the table is partitioned by the month of order_date, a hash of account_number (with 10 buckets), and country:

CREATE TABLE ordersg (
    order_id BIGINT,
    order_date DATE,
    account_number BIGINT,
    customer VARCHAR,
    country VARCHAR)
WITH (partitioning = ARRAY['month(order_date)', 'bucket(account_number, 10)', 'country']);

Manually Modifying Partitions

The partitioning of an existing table can be changed by updating the partitioning table property:

ALTER TABLE ordersg SET PROPERTIES partitioning = ARRAY['month(order_date)'];

For partitioned tables, the Iceberg connector supports the deletion of entire partitions if the WHERE clause specifies filters only on identity-transformed partitioning columns, so that the filter can match entire partitions. Given the table definition above, the following SQL deletes all partitions for which country is US:

DELETE FROM iceberg.testdb.ordersg WHERE country = 'US';

Rolling back to a previous snapshot

Iceberg supports a “snapshot” model of data, where table snapshots are identified by snapshot IDs.

The connector provides a system snapshots table for each Iceberg table. Snapshots are identified by BIGINT snapshot IDs. You can list the snapshot IDs for table ordersg, newest first, by running the following command (the first row is the latest snapshot):

SELECT snapshot_id FROM "ordersg$snapshots" ORDER BY committed_at DESC;

 snapshot_id
---------------------
 921254093881523606
 535467754709887442
 343895437069940394
 34i302849038590348
(4 rows)

A SQL procedure system.rollback_to_snapshot allows the caller to roll back the state of the table to a previous snapshot id:

CALL iceberg.system.rollback_to_snapshot('testdb', 'ordersg', 8954597067493422955);

Metadata tables

The connector exposes several metadata tables for each Iceberg table. These metadata tables contain information about the internal structure of the Iceberg table. You can query each metadata table by appending the metadata table name to the table name:

SELECT * FROM "ordersg$data";

$data table

The $data table is an alias for the Iceberg table itself.

The statement:

SELECT * FROM "ordersg$data";

is equivalent to:

SELECT * FROM ordersg;

$properties table

The $properties table provides access to general information about Iceberg table configuration and any additional metadata key/value pairs that the table is tagged with.

You can retrieve the properties of the current snapshot of the Iceberg table by using the following query:

SELECT * FROM "ordersg$properties";

 key                  | value
----------------------+---------
 write.format.default | PARQUET

$history table

The $history table provides a log of the metadata changes performed on the Iceberg table.

You can retrieve the changelog of the Iceberg table by using the following query:

SELECT * FROM "ordersg$history";

made_current_at             | snapshot_id         | parent_id           | is_current_ancestor
----------------------------+---------------------+---------------------+--------------------
2022-08-19 05:42:37.854 UTC | 7464177163099901858 | 7924870943332311497 | true
2022-08-19 05:44:35.212 UTC | 2228056193383591891 | 7464177163099901858 | true

The output of the query has the following columns:

Name | Type | Description
-----|------|------------
made_current_at | timestamp(3) with time zone | The time when the snapshot became active
snapshot_id | bigint | The identifier of the snapshot
parent_id | bigint | The identifier of the parent snapshot
is_current_ancestor | boolean | Whether or not this snapshot is an ancestor of the current snapshot
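
For example, to list only the snapshots that are ancestors of the current table state (a sketch against the ordersg table used throughout this page), you could run:

SELECT snapshot_id, made_current_at
FROM "ordersg$history"
WHERE is_current_ancestor = true
ORDER BY made_current_at DESC;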

$snapshots table

The $snapshots table provides a detailed view of snapshots of the Iceberg table. A snapshot consists of one or more file manifests, and the complete table contents are represented by the union of all the data files in those manifests.

You can retrieve the information about the snapshots of the Iceberg table by using the following query:

SELECT * FROM "ordersg$snapshots";

committed_at | snapshot_id | parent_id | operation | manifest_list | summary
-------------+-------------+-----------+-----------+---------------+--------
2022-08-08 08:20:04.126 UTC | 7026585913702073835 |  | append | hdfs://hadoop1:9000/home/gitama/hadoop/hive/user/hive/warehouse/test_100.db/orders08/metadata/snap-7026585913702073835-1-d0b5ba3d-6363-4f32-974e-79bb68d19423.avro | {changed-partition-count=0, total-equality-deletes=0, total-position-deletes=0, total-delete-files=0, total-files-size=0, total-records=0, total-data-files=0}
2022-08-08 08:21:58.343 UTC | 629134202395791160 | 7026585913702073835 | append | hdfs://hadoop1:9000/home/gitama/hadoop/hive/user/hive/warehouse/test_100.db/orders08/metadata/snap-629134202395791160-1-b6e9c1c3-0532-4bf8-a814-a159494e272d.avro | {changed-partition-count=1, added-data-files=1, total-equality-deletes=0, added-records=1, total-position-deletes=0, added-files-size=289, total-delete-files=0, total-files-size=289, total-records=1, total-data-files=1}

The output of the query has the following columns:

Name | Type | Description
-----|------|------------
committed_at | timestamp(3) with time zone | The time when the snapshot became active
snapshot_id | bigint | The identifier for the snapshot
parent_id | bigint | The identifier for the parent snapshot
operation | varchar | The type of operation performed on the Iceberg table. The supported operation types in Iceberg are: append, when new data is appended; replace, when files are removed and replaced without changing the data in the table; overwrite, when new data is added to overwrite existing data; delete, when data is deleted from the table and no new data is added
manifest_list | varchar | The list of Avro manifest files containing the detailed information about the snapshot changes
summary | map(varchar, varchar) | A summary of the changes made from the previous snapshot to the current snapshot
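
The summary map can also be inspected directly. The following sketch reads the total record count recorded for the most recent snapshot of ordersg; total-records is one of the summary keys shown in the example output above:

SELECT snapshot_id, summary['total-records'] AS total_records
FROM "ordersg$snapshots"
ORDER BY committed_at DESC
LIMIT 1;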

$manifests table

The $manifests table provides a detailed overview of the manifests corresponding to the snapshots performed in the log of the Iceberg table. You can retrieve the information about the manifests of the Iceberg table by using the following query:

SELECT * FROM "ordersg$manifests";

path | length | partition_spec_id | added_snapshot_id | added_data_files_count | existing_data_files_count | deleted_data_files_count | partitions
-----+--------+-------------------+-------------------+------------------------+---------------------------+--------------------------+-----------
hdfs://hadoop1:9000/home/gitama/hadoop/hive/user/hive/warehouse/test_100.db/orders08/metadata/b6e9c1c3-0532-4bf8-a814-a159494e272d-m0.avro | 6534 | 0 | 629134202395791160 | 1 | 0 | 0 | [ ]

The output of the query has the following columns:

Name | Type | Description
-----|------|------------
path | varchar | The manifest file location
length | bigint | The manifest file length
partition_spec_id | integer | The identifier of the partition specification used to write the manifest file
added_snapshot_id | bigint | The identifier of the snapshot during which this manifest entry was added
added_data_files_count | integer | The number of data files with status ADDED in the manifest file
existing_data_files_count | integer | The number of data files with status EXISTING in the manifest file
deleted_data_files_count | integer | The number of data files with status DELETED in the manifest file
partitions | array(row(contains_null boolean, contains_nan boolean, lower_bound varchar, upper_bound varchar)) | Partition range metadata
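
For instance, a quick way to spot manifests that record file deletions (a sketch against the ordersg table) is:

SELECT path, added_data_files_count, deleted_data_files_count
FROM "ordersg$manifests"
WHERE deleted_data_files_count > 0;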

$partitions table

The $partitions table provides a detailed overview of the partitions of the Iceberg table. You can retrieve the information about the partitions of the Iceberg table by using the following query:

SELECT * FROM "ordersg$partitions";

record_count | file_count | total_size | data
-------------+------------+------------+-----
1            | 1          | 289        | {id={min=null, max=null, null_count=0, nan_count=null}, name={min=null, max=null, null_count=0, nan_count=null}}

The output of the query has the following columns:

Name | Type | Description
-----|------|------------
record_count | bigint | The number of records in the partition
file_count | bigint | The number of files mapped in the partition
total_size | bigint | The size of all the files in the partition
data | row(... row(min ..., max ..., null_count bigint, nan_count bigint)) | Partition range metadata
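
As a usage sketch, the partition metadata can be used to find the largest partitions of ordersg, for example when deciding which partitions to compact:

SELECT record_count, file_count, total_size
FROM "ordersg$partitions"
ORDER BY total_size DESC;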

$files table

The $files table provides a detailed overview of the data files in the current snapshot of the Iceberg table.

To retrieve the information about the data files of the Iceberg table use the following query:

SELECT * FROM "ordersg$files";

content | file_path | file_format | record_count | file_size_in_bytes | column_sizes | value_counts | null_value_counts | nan_value_counts | lower_bounds | upper_bounds | key_metadata | split_offsets | equality_ids
--------+-----------+-------------+--------------+--------------------+--------------+--------------+-------------------+------------------+--------------+--------------+--------------+---------------+-------------
0 | hdfs://192.168.31.120:9000/user/hive/warehouse/orders19/data/20220819_034313_39152_vdmku-1709db2a-dc6f-4ef9-bb77-23f4c150801f.orc | ORC | 1 | 354 |  | {1=1, 2=1, 3=1} | {1=0, 2=0, 3=0} |  |  |  |  |  |
0 | hdfs://192.168.31.120:9000/user/hive/warehouse/orders19/data/20220819_054009_11365_xq568-1803130c-6b7b-4da6-b460-dfb44f176ef4.orc | ORC | 1 | 413 |  | {1=1, 2=1, 3=1, 4=1} | {1=0, 2=0, 3=0, 4=1} |  |  |  |  |  |

The output of the query has the following columns:

Name | Type | Description
-----|------|------------
content | integer | Type of content stored in the file. The supported content types in Iceberg are: DATA (0), POSITION_DELETES (1), EQUALITY_DELETES (2)
file_path | varchar | The data file location
file_format | varchar | The format of the data file
record_count | bigint | The number of entries contained in the data file
file_size_in_bytes | bigint | The data file size
column_sizes | map(integer, bigint) | Mapping between the Iceberg column ID and its corresponding size in the file
value_counts | map(integer, bigint) | Mapping between the Iceberg column ID and its corresponding count of entries in the file
null_value_counts | map(integer, bigint) | Mapping between the Iceberg column ID and its corresponding count of NULL values in the file
nan_value_counts | map(integer, bigint) | Mapping between the Iceberg column ID and its corresponding count of NaN (non-numerical) values in the file
lower_bounds | map(integer, varchar) | Mapping between the Iceberg column ID and its corresponding lower bound in the file
upper_bounds | map(integer, varchar) | Mapping between the Iceberg column ID and its corresponding upper bound in the file
key_metadata | varbinary | Metadata about the encryption key used to encrypt this file, if applicable
split_offsets | array(bigint) | List of recommended split locations
equality_ids | array(integer) | The set of field IDs used for equality comparison in equality delete files
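
As a usage sketch, the file-level metadata makes it easy to estimate how fragmented a table is, for example by listing data files smaller than 10 MB before running the optimize command described below:

SELECT file_path, file_size_in_bytes
FROM "ordersg$files"
WHERE file_size_in_bytes < 10 * 1024 * 1024
ORDER BY file_size_in_bytes;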

ALTER TABLE EXECUTE

The connector supports the following commands for use with ALTER TABLE EXECUTE (for details, see File merging below).

File merging

The optimize command rewrites the active content of the specified table so that it is merged into fewer but larger files. If the table is partitioned, the data compaction acts separately on each partition selected for optimization. This operation improves read performance.

All files with a size below the optional file_size_threshold parameter (the default threshold is 100MB) are merged:

ALTER TABLE ordersg EXECUTE optimize;

The following statement merges the files in a table that are under 10 megabytes in size:

ALTER TABLE ordersg EXECUTE optimize(file_size_threshold => '10MB');
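
To gauge the effect of a compaction run, one approach (a sketch using the $files metadata table described earlier) is to compare the data file count and total size before and after:

SELECT count(*) AS data_files, sum(file_size_in_bytes) AS total_bytes
FROM "ordersg$files";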

ALTER TABLE SET PROPERTIES

The connector supports modifying the properties on existing tables using ALTER TABLE SET PROPERTIES.

The following table properties can be updated after a table is created:

  • format
  • partitioning

For example, to update the file format of a table:

ALTER TABLE ordersg SET PROPERTIES format = 'PARQUET';

Or to set the column country as a partition column on a table:

ALTER TABLE ordersg SET PROPERTIES partitioning = ARRAY[<existing partition columns>, 'country'];

You can use SHOW CREATE TABLE ordersg to display the current values of the table properties.

openLooKeng to Iceberg type mapping

openLooKeng type | Iceberg type
-----------------|-------------
BOOLEAN | BOOLEAN
INTEGER | INT
BIGINT | LONG
REAL | FLOAT
DOUBLE | DOUBLE
DECIMAL(p,s) | DECIMAL(p,s)
DATE | DATE
TIME | TIME
TIMESTAMP | TIMESTAMP
TIMESTAMP WITH TIME ZONE | TIMESTAMPTZ
VARCHAR | STRING
VARBINARY | BINARY
ROW(…) | STRUCT(…)
ARRAY(e) | LIST(e)
MAP(k,v) | MAP(k,v)
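
To illustrate the mapping, the following sketch creates a hypothetical table (typed_example is not part of the examples above) whose columns exercise several of the openLooKeng types listed in the table; per the mapping, Iceberg stores them as LONG, STRING, TIMESTAMPTZ, DECIMAL(10,2), and LIST(STRING) respectively:

CREATE TABLE typed_example (
    id BIGINT,                           -- stored as Iceberg LONG
    name VARCHAR,                        -- stored as Iceberg STRING
    created_at TIMESTAMP WITH TIME ZONE, -- stored as Iceberg TIMESTAMPTZ
    amount DECIMAL(10, 2),               -- stored as Iceberg DECIMAL(10,2)
    tags ARRAY(VARCHAR)                  -- stored as Iceberg LIST(STRING)
);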