OmniData Connector

Overview

The OmniData connector allows querying data stored in the remote Hive data warehouse. It pushes the operators of openLooKeng down to the storage node to achieve near-data calculation, thereby reducing the amount of network transmission data and improving computing performance.

For more information, please see: OmniData and OmniData connector.

Supported File Types

The following file types are supported for the OmniData connector:

  • ORC
  • Parquet
  • Text

Configuration

Create etc/catalog/omnidata.properties with the following configurations, replacing example.net:9083 with the correct host and port for your Hive metastore Thrift service:

connector.name=omnidata-openlookeng
hive.metastore.uri=thrift://example.net:9083

HDFS Configuration

For basic setups, openLooKeng configures the HDFS client automatically and does not require any configuration files. In some cases, such as when using federated HDFS or NameNode high availability, it is necessary to specify additional HDFS client options in order to access your HDFS cluster. To do so, add the hive.config.resources property to reference your HDFS config files:

hive.config.resources=/etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml

Only specify additional configuration files if necessary for your setup. We also recommend reducing the configuration files to have the minimum set of required properties, as additional properties may cause problems.

The configuration files must exist on all openLooKeng nodes. If you are referencing existing Hadoop config files, make sure to copy them to any openLooKeng nodes that are not running Hadoop.

OmniData Configuration Properties

Property NameDescriptionDefault
hive.metastoreThe type of Hive metastorethrift
hive.config.resourcesAn optional comma-separated list of HDFS configuration files. These files must exist on the machines running openLooKeng. Only specify this if absolutely necessary to access HDFS. Example: /etc/hdfs-site.xml
hive.omnidata-enabledAllows push-down operators to execute on the storage side. If disabled, all operators will not be pushed down.true
hive.min-offload-row-numberIf the number of rows in the table is less than the threshold, all operators of the table will not be pushed down.500
hive.filter-offload-enabledAllows the filter operator to be pushed down to the storage side. If disabled, the filter operator will not be pushed down.true
hive.filter-offload-factorOnly when the selection rate of the filter operator is less than the threshold, it will be pushed down.0.25
hive.aggregator-offload-enabledAllows the aggregator operator to be pushed down to the storage side. If disabled, the aggregator operator will not be pushed down.true
hive.aggregator-offload-factorOnly when the aggregation rate of the aggregator operator is less than the threshold, it will be pushed down.0.25

For more configuration, please refer to the [Hive Configuration Properties](./hive.html#Hive Configuration Properties) chapter.

Querying OmniData

The SQL query plan after some operators are pushed down:

lk:tpch_flat_orc_date_1000> explain select sum(l_extendedprice * l_discount) as revenue
				 		 -> from
				 		 -> lineitem
				 		 -> where
				 		 -> l_shipdate >= DATE '1993-01-01'
				 		 -> and l_shipdate < DATE '1994-01-01'
				 		 -> and l_discount between 0.06 - 0.01 and 0.06 + 0.01
				 		 -> and l_quantity < 25;
				 							Query Plan
------------------------------------------------------------------------------------------------------
Output[revenue]
 Layout: [sum:double]
 Estimates: {rows: 4859991664 (40.74GB), cpu: 246.43G, memory: 86.00GB, network: 45.26GB}
 revenue := sum
└─ Aggregate(FINAL)
 Layout: [sum:double]
 Estimates: {rows: 4859991664 (40.74GB), cpu: 246.43G, memory: 86.00GB, network: 45.26GB}
 sum := sum(sum_4)
└─ LocalExchange[SINGLE] ()
 Layout: [sum_4:double]
 Estimates: {rows: 5399990738 (45.26GB), cpu: 201.17G, memory: 45.26GB, network: 45.26GB}
└─ RemoteExchange[GATHER]
 Layout: [sum_4:double]
 Estimates: {rows: 5399990738 (45.26GB), cpu: 201.17G, memory: 45.26GB, network: 45.26GB}
└─ Aggregate(PARTIAL)
 Layout: [sum_4:double]
 Estimates: {rows: 5399990738 (45.26GB), cpu: 201.17G, memory: 45.26GB, network: 0B}
 sum_4 := sum(expr)
└─ ScanProject[table = hive:tpch_flat_orc_date_1000:lineitem offload={ filter=[AND(AND(BETWEEN(l_discount, 0.05, 0.07), LESS_THAN(l_quantity, 25.0)), AND(GREATER_THAN_OR_EQUAL(l_shipdate, 8401), LESS_THAN(l_shipdate, 8766)))]} ]
 Layout: [expr:double]
 Estimates: {rows: 5999989709 (50.29GB), cpu: 100.58G, memory: 0B, network: 0B}/{rows: 5999989709 (50.29GB), cpu: 150.87G, memory: 0B, network: 0B}
 expr := (l_extendedprice) * (l_discount)
 l_extendedprice := l_extendedprice:double:5:REGULAR
 l_discount := l_discount:double:6:REGULAR

OmniData Connector Limitations

  • The OmniData service needs to be deployed on the storage node.
  • Only the pushdown of Filter, Aggregator, and Limit operators are supported.