Properties Reference

This section describes the most important config properties that may be used to tune openLooKeng or alter its behavior when required.

General Properties

join-distribution-type

  • Type: string
  • Allowed values: AUTOMATIC, PARTITIONED, BROADCAST
  • Default value: AUTOMATIC

The type of distributed join to use. When set to PARTITIONED, openLooKeng will use hash distributed joins. When set to BROADCAST, it will broadcast the right table to all nodes in the cluster that have data from the left table. Partitioned joins require redistributing both tables using a hash of the join key. This can be slower (sometimes substantially) than broadcast joins, but allows much larger joins. In particular broadcast joins will be faster if the right table is much smaller than the left. However, broadcast joins require that the tables on the right side of the join after filtering fit in memory on each node, whereas distributed joins only need to fit in distributed memory across all nodes. When set to AUTOMATIC, openLooKeng will make a cost based decision as to which distribution type is optimal. It will also consider switching the left and right inputs to the join. In AUTOMATIC mode, openLooKeng will default to hash distributed joins if no cost could be computed, such as if the tables do not have statistics. This can also be specified on a per-query basis using the join_distribution_type session property.
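
For example, to override the join distribution type for a single session using the session property named above:

    SET SESSION join_distribution_type = 'BROADCAST';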

redistribute-writes

  • Type: boolean
  • Default value: true

This property enables redistribution of data before writing. This can eliminate the performance impact of data skew when writing by hashing it across nodes in the cluster. It can be disabled when it is known that the output data set is not skewed in order to avoid the overhead of hashing and redistributing all the data across the network. This can also be specified on a per-query basis using the redistribute_writes session property.

stack-trace-visible

  • Type: boolean
  • Allowed values: true, false
  • Default value: false

This property makes exception stack traces that occur in openLooKeng visible or hidden. When it is set to true, the stack trace is visible to all users. When it is left at the default or set to false, the stack trace is hidden from all users.

openlookeng.admins

  • Type: string
  • Default value: Not set

This property is used to set the admin users. An admin user has the authority to obtain all users' query history and download all users' Web UI query results. No admin user is set by default. To set multiple admin users, separate them with commas.
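
For example, a config.properties entry naming two hypothetical admin users might look like:

    openlookeng.admins=admin1,admin2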

HTTP Security Headers Properties

http-header.content-security-policy

  • Type: string
  • Default value: object-src 'none'

Property to set the HTTP security header content-security-policy.

http-header.referrer-policy

  • Type: string
  • Default value: strict-origin-when-cross-origin

Property to set the HTTP security header referrer-policy.

http-header.x-content-type-options

  • Type: string
  • Default value: nosniff

Property to set the HTTP security header x-content-type-options.

http-header.x-frame-options

  • Type: string
  • Default value: deny

Property to set the HTTP security header x-frame-options.

http-header.x-permitted-cross-domain-policies

  • Type: string
  • Default value: master-only

Property to set the HTTP security header x-permitted-cross-domain-policies.

http-header.x-xss-protection

  • Type: string
  • Default value: 1; mode=block

Property to set the HTTP security header x-xss-protection.

Memory Management Properties

query.max-memory-per-node

  • Type: data size
  • Default value: JVM max memory * 0.1

This is the max amount of user memory a query can use on a worker. User memory is allocated during execution for things that are directly attributable to or controllable by a user query. For example, memory used by the hash tables built during execution, memory used during sorting, etc. When the user memory allocation of a query on any worker hits this limit it will be killed.

query.max-total-memory-per-node

  • Type: data size
  • Default value: JVM max memory * 0.3

This is the max amount of user and system memory a query can use on a worker. System memory is allocated during execution for things that are not directly attributable to or controllable by a user query. For example, memory allocated by the readers, writers, network buffers, etc. When the sum of the user and system memory allocated by a query on any worker hits this limit it will be killed. The value of query.max-total-memory-per-node must be greater than query.max-memory-per-node.

query.max-memory

  • Type: data size
  • Default value: 20GB

This is the max amount of user memory a query can use across the entire cluster. User memory is allocated during execution for things that are directly attributable to or controllable by a user query. For example, memory used by the hash tables built during execution, memory used during sorting, etc. When the user memory allocation of a query across all workers hits this limit it will be killed.

query.max-total-memory

  • Type: data size
  • Default value: query.max-memory * 2

This is the max amount of user and system memory a query can use across the entire cluster. System memory is allocated during execution for things that are not directly attributable to or controllable by a user query. For example, memory allocated by the readers, writers, network buffers, etc. When the sum of the user and system memory allocated by a query across all workers hits this limit it will be killed. The value of query.max-total-memory must be greater than query.max-memory.
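
As an illustration only (the sizes below are placeholder assumptions, not recommendations), a config.properties excerpt that satisfies both constraints above, with each total limit greater than its user-memory counterpart, could be:

    query.max-memory=40GB
    query.max-total-memory=80GB
    query.max-memory-per-node=8GB
    query.max-total-memory-per-node=16GB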

memory.heap-headroom-per-node

  • Type: data size
  • Default value: JVM max memory * 0.3

This is the amount of memory set aside as headroom/buffer in the JVM heap for allocations that are not tracked by openLooKeng.

query.suspend-query-enabled

  • Type: boolean
  • Default value: false

Enables temporary suspension of running queries when the system is low on resources.

query.max-suspended-queries

  • Type: integer
  • Default value: 10

Maximum number of queries to attempt to suspend before starting to kill queries. This property takes effect only when query.suspend-query-enabled is set to true.

Spilling Properties

experimental.spill-enabled

  • Type: boolean
  • Default value: false

Try spilling memory to disk to avoid exceeding memory limits for the query.

Spilling works by offloading memory to disk. This process can allow a query with a large memory footprint to pass at the cost of slower execution times. Spilling is supported for aggregations, joins (inner and outer), sorting, and window functions. This property will not reduce memory usage required for other join types.

Be aware that this is an experimental feature and should be used with care.

This config property can be overridden by the spill_enabled session property.

experimental.spill-order-by

  • Type: boolean
  • Default value: true

Try spilling memory to disk to avoid exceeding memory limits for the query when running sorting operators. This property must be used in conjunction with the experimental.spill-enabled property.

This config property can be overridden by the spill_order_by session property.

experimental.spill-window-operator

  • Type: boolean
  • Default value: true

Try spilling memory to disk to avoid exceeding memory limits for the query when running window operators. This property must be used in conjunction with the experimental.spill-enabled property.

This config property can be overridden by the spill_window_operator session property.

experimental.spill-build-for-outer-join-enabled

  • Type: boolean
  • Default value: false

Enables spill feature for right-outer and full-outer join operations.

This config property can be overridden by the spill_build_for_outer_join_enabled session property.

experimental.inner-join-spill-filter-enabled

  • Type: boolean
  • Default value: false

Enables bloom filter based build-side spill matching for probe side spill decision.

This config property can be overridden by the inner_join_spill_filter_enabled session property.

experimental.spill-reuse-tablescan

  • Type: boolean
  • Default value: false

Try spilling memory to disk to avoid exceeding memory limits for the query when running Reuse Exchange. This property must be used in conjunction with the experimental.spill-enabled property.

This config property can be overridden by the spill_reuse_tablescan session property.

experimental.spiller-spill-path

  • Type: string
  • No default value. Must be set when spilling is enabled

Directory where spilled content will be written. It can be a comma separated list to spill simultaneously to multiple directories, which helps to utilize multiple drives installed in the system.

When experimental.spiller-spill-to-hdfs is set to true, experimental.spiller-spill-path must contain only a single directory.

It is not recommended to spill to system drives. Most importantly, do not spill to the drive on which the JVM logs are written, as disk overutilization might cause the JVM to pause for lengthy periods, causing queries to fail.
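
For example (the directory paths below are hypothetical), spilling to two dedicated drives might be configured as:

    experimental.spill-enabled=true
    experimental.spiller-spill-path=/mnt/spill1,/mnt/spill2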

experimental.spiller-max-used-space-threshold

  • Type: double
  • Default value: 0.9

If disk space usage ratio of a given spill path is above this threshold, this spill path will not be eligible for spilling.

experimental.spiller-threads

  • Type: integer
  • Default value: 4

Number of spiller threads. Increase this value if the default is not able to saturate the underlying spilling device (for example, when using RAID).

experimental.max-spill-per-node

  • Type: data size
  • Default value: 100 GB

Max spill space to be used by all queries on a single node.

experimental.query-max-spill-per-node

  • Type: data size
  • Default value: 100 GB

Max spill space to be used by a single query on a single node.

experimental.aggregation-operator-unspill-memory-limit

  • Type: data size
  • Default value: 4 MB

Limit for memory used for unspilling a single aggregation operator instance.

experimental.spill-threshold-reuse-tablescan

  • Type: integer
  • Default value: 10 (in MB)

Limit for memory used for caching pages in Reuse Exchange.

experimental.spill-compression-enabled

  • Type: boolean
  • Default value: false

Enables data compression for pages spilled to disk.

experimental.spill-encryption-enabled

  • Type: boolean
  • Default value: false

Enables using a randomly generated secret key (per spill file) to encrypt and decrypt data spilled to disk.

experimental.spill-direct-serde-enabled

  • Type: boolean
  • Default value: false

Enables serializing/reading pages directly to/from the stream.

experimental.spill-prefetch-read-pages

  • Type: integer
  • Default value: 1

Sets number of pages prefetched while reading from spilled files.

experimental.spill-use-kryo-serialization

  • Type: boolean
  • Default value: false

Enables Kryo-based serialization for spill to disk, instead of the default Java serializer.

experimental.revocable-memory-selection-threshold

  • Type: data size
  • Default value: 512 MB

Sets the selection threshold for an operator's revocable memory; operators above this threshold are selected directly to allocate revocable memory for the remaining bytes ready to revoke.

experimental.prioritize-larger-spilts-memory-revoke

  • Type: boolean
  • Default value: true

Enables prioritizing splits with larger revocable memory during memory revocation.

experimental.spill-non-blocking-orderby

  • Type: boolean
  • Default value: false

Enables the order by operator to spill asynchronously, i.e. it can accumulate input even while a spill is in progress, and it initiates a secondary spill when the accumulated data exceeds a threshold or when the primary spill completes. The default threshold is the minimum of 20MB and 5% of available free memory. This property must be used in conjunction with the experimental.spill-enabled property.

This config property can be overridden by the spill_non_blocking_orderby session property.

experimental.spiller-spill-to-hdfs

  • Type: boolean
  • Default value: false

Enables spilling into HDFS. When this property is set to true, experimental.spiller-spill-profile must be set, and experimental.spiller-spill-path must contain only a single path.

experimental.spiller-spill-profile

  • Type: string
  • No default value. Must be set when spilling to HDFS is enabled

This property defines the filesystem profile used to spill. The corresponding profile must exist in etc/filesystem. For example, if this property is set as experimental.spiller-spill-profile=spill-hdfs, a profile describing this filesystem, spill-hdfs.properties, must be created in etc/filesystem with the necessary information including authentication type, config, and keytabs (if applicable; refer to the filesystem documentation for details).

This property is required when experimental.spiller-spill-to-hdfs is set to true. It must be included in the configuration files for all coordinators and all workers. The specified file system must be accessible by all workers, and they must be able to read from and write to the path declared in experimental.spiller-spill-path on the specified file system.
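
Putting the three properties together, a sketch of an HDFS spill configuration (reusing the spill-hdfs profile name from the example above; the path is a placeholder) could be:

    experimental.spill-enabled=true
    experimental.spiller-spill-to-hdfs=true
    experimental.spiller-spill-path=/tmp/hetu/spill
    experimental.spiller-spill-profile=spill-hdfs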

Exchange Properties

Exchanges transfer data between openLooKeng nodes for different stages of a query. Adjusting these properties may help to resolve inter-node communication issues or improve network utilization.

exchange.client-threads

  • Type: integer
  • Minimum value: 1
  • Default value: 25

Number of threads used by exchange clients to fetch data from other openLooKeng nodes. A higher value can improve performance for large clusters or clusters with very high concurrency, but excessively high values may cause a drop in performance due to context switches and additional memory usage.

exchange.concurrent-request-multiplier

  • Type: integer
  • Minimum value: 1
  • Default value: 3

Multiplier determining the number of concurrent requests relative to available buffer memory. The maximum number of requests is determined using a heuristic of the number of clients that can fit into available buffer space based on average buffer usage per request times this multiplier. For example, with an exchange.max-buffer-size of 32 MB and 20 MB already used and average size per request being 2MB, the maximum number of clients is multiplier * ((32MB - 20MB) / 2MB) = multiplier * 6. Tuning this value adjusts the heuristic, which may increase concurrency and improve network utilization.

exchange.max-buffer-size

  • Type: data size
  • Default value: 32MB

Size of buffer in the exchange client that holds data fetched from other nodes before it is processed. A larger buffer can increase network throughput for larger clusters and thus decrease query processing time, but will reduce the amount of memory available for other usages.

exchange.max-response-size

  • Type: data size
  • Minimum value: 1MB
  • Default value: 16MB

Maximum size of a response returned from an exchange request. The response will be placed in the exchange client buffer which is shared across all concurrent requests for the exchange.

Increasing the value may improve network throughput if there is high latency. Decreasing the value may improve query performance for large clusters, as it reduces skew by letting the exchange client buffer hold responses for more tasks (rather than holding more data from fewer tasks).

sink.max-buffer-size

  • Type: data size
  • Default value: 32MB

Output buffer size for task data that is waiting to be pulled by upstream tasks. If the task output is hash partitioned, then the buffer will be shared across all of the partitioned consumers. Increasing this value may improve network throughput for data transferred between stages if the network has high latency or if there are many nodes in the cluster.

query-resource-tracking

  • Type: boolean
  • Default value: false

Enables query-level resource tracking and mitigation actions.

query-no-resource-retry-count

  • Type: integer
  • Default value: 5

A query is retried the given number of times when available resources are insufficient. This takes effect only when query-resource-tracking is set to true. If the value specified is less than 5, it automatically reverts to the default value of 5.

Failure Recovery Handling Properties

Failure Retry Policies

failure.recovery.retry.profile

  • Type: String
  • Default value: default

This property defines the failure detection profile used to determine whether a failure has happened for an HTTP client. The value <profile-name> set for this property must correspond to a <profile-name>.properties file in etc/failure-retry-policy/. If no such profile is available and this property is not set, the “default” profile is used. For example, failure.recovery.retry.profile=test requires a test.properties file to be present in etc/failure-retry-policy. The file test.properties must specify failure.recovery.retry.type.
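
For instance, a minimal etc/failure-retry-policy/test.properties using the timeout mechanism described below might contain (values illustrative):

    failure.recovery.retry.type=timeout
    max.error.duration=300s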

failure.recovery.retry.type

  • Type: String
  • Default value: timeout

The failure detection mechanism in use. Default is timeout based failure detection.

Timeout-based failure detection

Using this mechanism, HTTP client failures are retried for a specific duration before being considered a permanent failure. The additional property max.error.duration can be defined for this type of failure detection.

Max-retry based failure detection

Using this mechanism, HTTP client failures are retried a specific number of times before being considered a permanent failure. The additional properties max.retry.count and max.error.duration can be defined for this type of failure detection. When this type of failure detection is configured, up to max.retry.count retries are performed before consulting the failure detector module. If the failure detector module reports the remote node as failed, the HTTP client considers it a permanent failure. Otherwise, i.e. when the remote worker node is alive but not sending responses, retries continue for max.error.duration before the failure is considered permanent.

max.error.duration

  • Type: duration
  • Default value: 300s

The maximum amount of time the coordinator waits for inter-task related errors to be resolved before the failure is considered permanent.

max.retry.count

  • Type: integer
  • Default value: 100

The maximum number of retries for a failed task performed by the coordinator before consulting the failure detector module about the remote node status. This parameter is the minimum count before consulting the failure detection module; hence, the actual number of failures may vary slightly based on the cluster size and the load on the cluster. This property is used only for max-retry based failure detection profiles. The minimum value for this parameter is 100.

Gossip Protocol Configurations for Failure Detection

failure-detection-protocol

  • Type: String
  • Default value: heartbeat

This property defines the type of failure detector in use. Default configuration is heartbeat failure detector. Gossip protocol can be enabled by specifying this parameter in config.properties file, with the value gossip. All nodes (i.e. coordinator as well as workers) in a cluster should have this property specified in their respective etc/config.properties file.

failure-detector.heartbeat-interval

  • Type: Duration
  • Default value: 500ms (500 milliseconds)

This is the interval of gossip between two nodes in the cluster. In the gossip protocol, two workers are expected to gossip with a higher frequency than the coordinator and a worker. In the coordinator's config.properties, this property can be set to a reasonably higher value, such as 5s (5 seconds). On workers, this property can be left at the default value.

failure-detector.worker-gossip-probe-interval

  • Type: Duration
  • Default value: 5s (5 seconds)

Gossip protocol uses monitoring tasks (same as the heartbeat failure detector) to keep tabs on the other nodes. This property specifies the interval for refreshing the monitoring tasks that trigger worker-to-worker gossip. If this property needs to be configured with any value other than the default, it should be specified only for the worker nodes. This parameter should have a higher value than failure-detector.heartbeat-interval.

failure-detector.coordinator-gossip-probe-interval

  • Type: Duration
  • Default value: 5s (5 seconds)

Gossip protocol uses monitoring tasks (same as the heartbeat failure detector) to keep tabs on the other nodes. This property specifies the interval for refreshing the monitoring tasks that trigger coordinator-to-worker gossip. If this property needs to be configured with any value other than the default, it should be specified only for the coordinator. This parameter should have a higher value than failure-detector.heartbeat-interval and failure-detector.worker-gossip-probe-interval.

failure-detector.coordinator-gossip-collate-interval

  • Type: Duration
  • Default value: 2s (2 seconds)

This property specifies the interval at which the coordinator collates all the gossip it obtained from all the workers. This property has to be specified only for the coordinator. This parameter should have a higher value than failure-detector.heartbeat-interval.

failure-detector.gossip-group-size

  • Type: Integer
  • Default value: Integer.MAX_VALUE

This parameter defines how many other workers in the cluster a worker gossips with. Any value higher than the cluster size (i.e. the number of workers) implies all-to-all gossip. To keep the network overhead low, this value should be reasonably low for a big cluster (e.g. 10 for a cluster of 100 workers). On each refresh of the worker-monitoring tasks at the coordinator, the coordinator defines a list of worker URIs of size failure-detector.gossip-group-size to trigger worker-to-worker gossip.
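
As a sketch of the coordinator/worker split described above (interval values are illustrative, not recommendations), the respective etc/config.properties files might contain:

    # Coordinator etc/config.properties
    failure-detection-protocol=gossip
    failure-detector.heartbeat-interval=5s

    # Worker etc/config.properties
    failure-detection-protocol=gossip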

Task Properties

task.concurrency

  • Type: integer
  • Restrictions: must be a power of two
  • Default value: 16

Default local concurrency for parallel operators such as joins and aggregations. This value should be adjusted up or down based on the query concurrency and worker resource utilization. Lower values are better for clusters that run many queries concurrently because the cluster will already be utilized by all the running queries, so adding more concurrency will result in slow downs due to context switching and other overhead. Higher values are better for clusters that only run one or a few queries at a time. This can also be specified on a per-query basis using the task_concurrency session property.

task.http-response-threads

  • Type: integer
  • Minimum value: 1
  • Default value: 100

Maximum number of threads that may be created to handle HTTP responses. Threads are created on demand and are cleaned up when idle, thus there is no overhead to a large value if the number of requests to be handled is small. More threads may be helpful on clusters with a high number of concurrent queries, or on clusters with hundreds or thousands of workers.

task.http-timeout-threads

  • Type: integer
  • Minimum value: 1
  • Default value: 3

Number of threads used to handle timeouts when generating HTTP responses. This value should be increased if all the threads are frequently in use. This can be monitored via the io.prestosql.server:name=AsyncHttpExecutionMBean:TimeoutExecutor JMX object. If ActiveCount is always the same as PoolSize, increase the number of threads.

task.info-update-interval

  • Type: duration
  • Minimum value: 1ms
  • Maximum value: 10s
  • Default value: 3s

Controls staleness of task information, which is used in scheduling. Larger values can reduce coordinator CPU load, but may result in suboptimal split scheduling.

task.max-partial-aggregation-memory

  • Type: data size
  • Default value: 16MB

Maximum size of partial aggregation results for distributed aggregations. Increasing this value can result in less network transfer and lower CPU utilization by allowing more groups to be kept locally before being flushed, at the cost of additional memory usage.

task.max-worker-threads

  • Type: integer
  • Default value: Node CPUs * 2

Sets the number of threads used by workers to process splits. Increasing this number can improve throughput if worker CPU utilization is low and all the threads are in use, but will increase heap space usage. Setting the value too high may cause a drop in performance due to context switching. The number of active threads is available via the RunningSplits property of the io.prestosql.execution.executor:name=TaskExecutor.RunningSplits JMX object.

task.min-drivers

  • Type: integer
  • Default value: task.max-worker-threads * 2

The target number of running leaf splits on a worker. This is a minimum value because each leaf task is guaranteed at least 3 running splits. Non-leaf tasks are also guaranteed to run in order to prevent deadlocks. A lower value may improve responsiveness for new tasks, but can result in underutilized resources. A higher value can increase resource utilization, but uses additional memory.

task.writer-count

  • Type: integer
  • Restrictions: must be a power of two
  • Default value: 1

The number of concurrent writer threads per worker per query. Increasing this value may increase write speed, especially when a query is not I/O bound and can take advantage of additional CPU for parallel writes (some connectors can be bottlenecked on CPU when writing due to compression or other factors). Setting this too high may cause the cluster to become overloaded due to excessive resource utilization. This can also be specified on a per-query basis using the task_writer_count session property.

Node Scheduler Properties

node-scheduler.max-splits-per-node

  • Type: integer
  • Default value: 100

The target value for the total number of splits that can be running for each worker node.

Using a higher value is recommended if queries are submitted in large batches (e.g., running a large group of reports periodically) or for connectors that produce many splits that complete quickly. Increasing this value may improve query latency by ensuring that the workers have enough splits to keep them fully utilized.

Setting this too high will waste memory and may result in lower performance due to splits not being balanced across workers. Ideally, it should be set such that there is always at least one split waiting to be processed, but not higher.

node-scheduler.max-pending-splits-per-task

  • Type: integer
  • Default value: 10

The number of outstanding splits that can be queued for each worker node for a single stage of a query, even when the node is already at the limit for total number of splits. Allowing a minimum number of splits per stage is required to prevent starvation and deadlocks.

This value must be smaller than node-scheduler.max-splits-per-node, will usually be increased for the same reasons, and has similar drawbacks if set too high.

node-scheduler.min-candidates

  • Type: integer
  • Minimum value: 1
  • Default value: 10

The minimum number of candidate nodes that will be evaluated by the node scheduler when choosing the target node for a split. Setting this value too low may prevent splits from being properly balanced across all worker nodes. Setting it too high may increase query latency and increase CPU usage on the coordinator.

node-scheduler.network-topology

  • Type: string
  • Allowed values: legacy, flat
  • Default value: legacy

Sets the network topology to use when scheduling splits. legacy will ignore the topology when scheduling splits. flat will try to schedule splits on the host where the data is located by reserving 50% of the work queue for local splits. It is recommended to use flat for clusters where distributed storage runs on the same nodes as openLooKeng workers.

Optimizer Properties

optimizer.dictionary-aggregation

  • Type: boolean
  • Default value: false

Enables optimization for aggregations on dictionaries. This can also be specified on a per-query basis using the dictionary_aggregation session property.

optimizer.optimize-hash-generation

  • Type: boolean
  • Default value: true

Compute hash codes for distribution, joins, and aggregations early during execution, allowing result to be shared between operations later in the query. This can reduce CPU usage by avoiding computing the same hash multiple times, but at the cost of additional network transfer for the hashes. In most cases it will decrease overall query processing time. This can also be specified on a per-query basis using the optimize_hash_generation session property.

It is often helpful to disable this property when using EXPLAIN in order to make the query plan easier to read.

optimizer.optimize-metadata-queries

  • Type: boolean
  • Default value: false

Enable optimization of some aggregations by using values that are stored as metadata. This allows openLooKeng to execute some simple queries in constant time. Currently, this optimization applies to max, min and approx_distinct of partition keys and other aggregations insensitive to the cardinality of the input (including DISTINCT aggregates). Using this may speed up some queries significantly.

The main drawback is that it can produce incorrect results if the connector returns partition keys for partitions that have no rows. In particular, the Hive connector can return empty partitions if they were created by other systems (openLooKeng cannot create them).

optimizer.push-aggregation-through-join

  • Type: boolean
  • Default value: true

When an aggregation is above an outer join and all columns from the outer side of the join are in the grouping clause, the aggregation is pushed below the outer join. This optimization is particularly useful for correlated scalar subqueries, which get rewritten to an aggregation over an outer join. For example:

    SELECT * FROM item i
    WHERE i.i_current_price > (
        SELECT AVG(j.i_current_price) FROM item j
        WHERE i.i_category = j.i_category);

Enabling this optimization can substantially speed up queries by reducing the amount of data that needs to be processed by the join. However, it may slow down some queries that have very selective joins. This can also be specified on a per-query basis using the push_aggregation_through_join session property.

optimizer.push-table-write-through-union

  • Type: boolean
  • Default value: true

Parallelize writes when using UNION ALL in queries that write data. This improves the speed of writing output tables in UNION ALL queries because these writes do not require additional synchronization when collecting results. Enabling this optimization can improve UNION ALL speed when write speed is not yet saturated. However, it may slow down queries in an already heavily loaded system. This can also be specified on a per-query basis using the push_table_write_through_union session property.

optimizer.join-reordering-strategy

  • Type: string
  • Allowed values: AUTOMATIC, ELIMINATE_CROSS_JOINS, NONE
  • Default value: AUTOMATIC

The join reordering strategy to use. NONE maintains the order the tables are listed in the query. ELIMINATE_CROSS_JOINS reorders joins to eliminate cross joins where possible and otherwise maintains the original query order. When reordering joins it also strives to maintain the original table order as much as possible. AUTOMATIC enumerates possible orders and uses statistics-based cost estimation to determine the least cost order. If stats are not available or if for any reason a cost could not be computed, the ELIMINATE_CROSS_JOINS strategy is used. This can also be specified on a per-query basis using the join_reordering_strategy session property.

optimizer.max-reordered-joins

  • Type: integer
  • Default value: 9

When optimizer.join-reordering-strategy is set to AUTOMATIC (cost-based), this property determines the maximum number of joins that can be reordered at once.

Warning

The number of possible join orders scales factorially with the number of relations, so increasing this value can cause serious performance issues.

hetu.query-pushdown

  • Type: boolean
  • Default value: true

Switch for controlling the push-down feature of the JDBC connector and DC connector.

optimizer.reuse-table-scan

  • Type: boolean
  • Default value: false

Use Reuse Exchange to cache data in memory if the query contains tables or Common Table Expressions (CTE) that appear more than once with the same projections and filters on them. Enabling this feature reduces query execution time by caching data in memory and avoiding reading from disk multiple times. This can also be specified on a per-query basis using the reuse_table_scan session property.
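
For example, to enable Reuse Exchange for a single session:

    SET SESSION reuse_table_scan = true;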

Note: when cte_reuse_enabled or optimizer.cte-reuse-enabled is enabled, Reuse Exchange will be disabled.

optimizer.cte-reuse-enabled

  • Type: boolean
  • Default value: false

Enable this flag to execute a Common Table Expression (CTE) only once, irrespective of the number of times the same CTE is used in the main query. This helps improve query execution performance when the same CTE is used more than once. This can also be specified on a per-query basis using the cte_reuse_enabled session property.

optimizer.sort-based-aggregation-enabled

  • Type: boolean
  • Default value: false

Sort-based aggregation is used when the underlying source is in pre-sorted order. It is used instead of hash aggregation, which has a larger memory footprint because it must build hash tables; sort-based aggregation uses a smaller memory footprint by comparison. Conditions for sort-based aggregation in the case of Hive:

    1. Grouping columns should be the same as or a subset of the sorted columns, and in the same order.
    2. For joins, the probe-side table should be sorted, and the join criteria should be the same as or a subset of the sorted columns, and in the same order.
    3. When bucket_count is 1, bucketed_by columns should be the same as or a subset of the grouping columns, and in the same order.
    4. When bucket_count is more than 1, bucketed_by columns should be the same as the grouping columns, and in the same order.
    5. For partitioned tables, grouping columns should contain all partition columns in the same order, followed by a subset of the sorted-by columns.
    6. When distinct is used, the grouping columns followed by the distinct column should be a subset of the sorted-by columns.

This can also be specified on a per-query basis using the sort_based_aggregation_enabled session property.

Note: This is supported only for Hive connector.

optimizer.transform-self-join-to-window

  • Type: boolean
  • Default value: true

Enables optimization to replace a self join with a window function using lead/lag methods.

optimizer.transform-self-join-aggregates-to-window

  • Type: boolean
  • Default value: true

Enables optimization to replace self joins with aggregations by a window function using lead/lag methods.

optimizer.join-partitioned-build-min-row-count

  • Type: long
  • Default value: 1,000,000

Use a gathering exchange for exchanges that produce fewer than the specified number of rows; otherwise, use a partitioning exchange.

optimizer.use-exact-partitioning

  • Type: boolean
  • Default value: false

When enabled, this forces data repartitioning unless the partitioning of the upstream stage exactly matches what the downstream stage expects.

cte-materialization-enabled

  • Type: boolean
  • Default value: false

Enable CTE materialization cache feature. This can also be specified on a per-query basis using the cte_materialization_enabled session property.

cte-materialization-threshold-size

  • Type: data size
  • Default value: 128MB

Maximum allowed size to be stored as part of the CTE materialization cache, per CTE per query.

adaptive-partial-aggregation.enabled

  • Type: boolean
  • Default value: true

Enable adaptive partial aggregation feature.

adaptive-partial-aggregation.min-rows

  • Type: long
  • Default value: 100,000

Minimum number of processed rows before partial aggregation might be adaptively turned off.

adaptive-partial-aggregation.unique-rows-ratio-threshold

  • Type: double
  • Default value: 0.8

Ratio between aggregation output and input rows above which partial aggregation might be adaptively turned off.

optimizer.join-multi-clause-independence-factor

  • Type: double
  • Default value: 0.25

Scales the strength of independence assumption for selectivity estimates of multi-clause joins.

optimizer.filter-conjunction-independence-factor

  • Type: double
  • Default value: 0.75

Scales the strength of independence assumption for selectivity estimates of the conjunction of multiple filters.

Regular Expression Function Properties

The following properties allow tuning the regular expression functions.

regex-library

  • Type: string
  • Allowed values: JONI, RE2J
  • Default value: JONI

Which library to use for regular expression functions. JONI is generally faster for common usage, but can require exponential time for certain expression patterns. RE2J uses a different algorithm which guarantees linear time, but is often slower.

re2j.dfa-states-limit

  • Type: integer
  • Minimum value: 2
  • Default value: 2147483647

The maximum number of states to use when RE2J builds the fast but potentially memory intensive deterministic finite automaton (DFA) for regular expression matching. If the limit is reached, RE2J will fall back to the algorithm that uses the slower, but less memory intensive non-deterministic finite automaton (NFA). Decreasing this value decreases the maximum memory footprint of a regular expression search at the cost of speed.

re2j.dfa-retries

  • Type: integer
  • Minimum value: 0
  • Default value: 5

The number of times that RE2J will retry the DFA algorithm when it reaches a states limit before using the slower, but less memory intensive NFA algorithm for all future inputs for that search. If hitting the limit for a given input row is likely to be an outlier, you want to be able to process subsequent rows using the faster DFA algorithm. If you are likely to hit the limit on matches for subsequent rows as well, you want to use the correct algorithm from the beginning so as not to waste time and resources. The more rows you are processing, the larger this value should be.

Heuristic Index Properties

Heuristic index is an external index module that can be used to filter out rows at the connector level. Bitmap, Bloom, and MinMax indexes are provided by openLooKeng. As of now, the bitmap index supports the Hive connector for tables with ORC storage format.

hetu.heuristicindex.filter.enabled

  • Type: boolean
  • Default value: false

This property enables heuristic index. There is also a session property heuristicindex_filter_enabled which can be set per session. Note: the session property should ONLY be used to turn index filtering on and off temporarily when this global property is set to true in the config file. The session property CANNOT be used to turn on index filtering when it is not enabled globally.
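
For example, to temporarily turn off index filtering in one session (assuming the global property is enabled):

    SET SESSION heuristicindex_filter_enabled = false;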

hetu.heuristicindex.filter.cache.max-memory

  • Type: data size
  • Default value: 10GB

Caching the index files provides better performance; index files are read-only and modified very rarely, so caching saves the time spent reading them from the indexstore. This property controls the maximum memory used by the index cache. When the limit is exceeded, existing entries are removed from the cache based on LRU and new entries are added.

hetu.heuristicindex.filter.cache.soft-reference

  • Type: boolean
  • Default value: true

Caching the index files provides better performance, however it utilizes memory. Enabling this property allows the Garbage Collector to remove entries from the cache if memory is running low.

Be aware that this is an experimental feature and should be used with care.

hetu.heuristicindex.filter.cache.ttl

  • Type: Duration
  • Default value: 24h

The time period after which the index cache expires.

hetu.heuristicindex.filter.cache.loading-threads

  • Type: integer
  • Default value: 10

The number of threads used to load indices in parallel.

hetu.heuristicindex.filter.cache.loading-delay

  • Type: Duration
  • Default value: 10s

The delay to wait before the async loading task starts to load the index cache from the indexstore.

hetu.heuristicindex.filter.cache.preload-indices

  • Type: string
  • Default value: (empty)

Preload the specified indices (comma-separated) when the server starts. Set it to ALL to load all indices.

hetu.heuristicindex.indexstore.uri

  • Type: string
  • Default value: /opt/hetu/indices/

Directory under which all index files are stored. Each index will be stored in its own subdirectory.

hetu.heuristicindex.indexstore.filesystem.profile

  • Type: string

This property defines the filesystem profile used to read and write indexes. The corresponding profile must exist in etc/filesystem. For example, if this property is set as hetu.heuristicindex.indexstore.filesystem.profile=index-hdfs1, a profile describing this filesystem, index-hdfs1.properties, must be created in etc/filesystem with the necessary information including authentication type, config, and keytabs (if applicable).

LOCAL filesystem type should only be used during testing or in single node clusters.

HDFS filesystem type should be used in production in order for the index to be accessible by all nodes in the cluster. All nodes should be configured to use the same filesystem profile.
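For example (reusing the index-hdfs1 profile name from above), an illustrative config excerpt might be:

    hetu.heuristicindex.filter.enabled=true
    hetu.heuristicindex.indexstore.uri=/opt/hetu/indices/
    hetu.heuristicindex.indexstore.filesystem.profile=index-hdfs1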

Execution Plan Cache Properties

The execution plan cache feature allows the coordinator to reuse execution plans between identical queries, instead of constructing another execution plan, thus reducing the amount of query pre-processing required.

hetu.executionplan.cache.enabled

  • Type: boolean
  • Default value: false

Enable or disable execution plan cache. Disabled by default.

hetu.executionplan.cache.limit

  • Type: integer
  • Default value: 10000

Maximum number of execution plans to keep in the cache.

hetu.executionplan.cache.timeout

  • Type: integer
  • Default value: 86400000 ms

Time in milliseconds to expire cached execution plans after the last access.

hetu.execution.cte-materialization.enabled

  • Type: boolean
  • Default value: false

Enables caching the materialization of Common Table Expressions (CTE), thereby bypassing re-execution of frequently executed sub-plans.

hetu.execution.cte-materialization.max-size

  • Type: long
  • Default value: 2147483648

Maximum size of the total data stored in CTE materialization storage cache.

hetu.execution.cte-materialization.schema-name

  • Type: string
  • Default value: cache

Schema that shall contain the materialized cache tables.

hetu.execution.cte-materialization.connector-name

  • Type: string
  • Default value: hive

Catalog name that shall contain the materialized cache tables.

SplitCacheMap Properties

SplitCacheMap must be enabled to support caching row data. When enabled, the coordinator stores table, partition and split scheduling metadata that helps with cache affinity scheduling.

hetu.split-cache-map.enabled

  • Type: boolean
  • Default value: false

This property enables split caching functionality. If state store is enabled, the split cache map configuration is automatically replicated in state store as well. In case of HA setup with multiple coordinators, the state store is used to share split cache map between the coordinators.

hetu.split-cache-map.state-update-interval

  • Type: integer
  • Default value: 2 seconds

This property controls how frequently the split cache map is updated in state store. It is primarily applicable for HA deployment.

Auto-Vacuum

Auto-Vacuum enables the system to automatically manage vacuum jobs by constantly monitoring the tables that need vacuuming in order to maintain optimal performance. The engine gets the tables that are eligible for vacuum from the data sources and triggers vacuum operations for those tables.

auto-vacuum.enabled

  • Type: boolean
  • Default value: false

Enables auto-vacuum functionality.

Note: This should be configured only in coordinator.

auto-vacuum.scan.interval

  • Type: Duration
  • Default value: 10m

The scheduled interval for getting vacuum table info from the data sources and triggering vacuum for those tables. The timer starts when the server starts and keeps scheduling at the configured interval. The minimum value is 15s and the maximum value is 24h.

Note: This should be configured only in coordinator.

auto-vacuum.scan.threads

  • Type: integer
  • Default value: 3

Number of threads used for auto-vacuum functionality. The minimum value is 1 and the maximum value is 16.

Note: This should be configured only in coordinator.

CTE (Common Table Expression) Properties

cte.cte-max-queue-size

  • Type: int
  • Default value: 1024

The maximum number of pages per processing queue. The number of processing queues is equal to the number of the CTE references in the main query. This can also be specified on a per-query basis using the cte_max_queue_size session property.

cte.cte-max-prefetch-queue-size

  • Type: int
  • Default value: 512

The maximum number of pages which can be held by the prefetch queue when processing queue is full. Prefetch queues are used to eagerly fetch the data so that query execution need not wait on I/O. This can also be specified on a per-query basis using the cte_max_prefetch_queue_size session property.

Note: This should be configured in all workers.

Sort-Based Aggregation Properties

sort.prcnt-drivers-for-partial-aggr

  • Type: int
  • Default value: 5

The percentage of drivers used for unfinalized/partial values in sort-based aggregation. This can also be specified on a per-query basis using the prcnt_drivers_for_partial_aggr session property.

Note: This should be configured on all nodes.

Query Manager

query.remote-task.max-error-duration

  • Type: duration
  • Default value: 5m

The maximum time the coordinator waits for a remote-task related error to be resolved before it is considered a failure.

Note: For snapshot recovery, query.remote-task.max-error-duration should be greater than exchange.max-error-duration.

query.execution-policy

  • Type: string
  • Default value: all-at-once

Specifies the execution policy enforced by the scheduler. One of the following execution policies can be configured:

  1. all-at-once: This policy makes all stages available for the scheduler to process and start.
  2. phased: This policy follows the stage dependency based on the producer source, and schedules all independent stages together.
  3. prioritize-utilization: This policy follows the stage dependency based on the producer source; in addition, it also looks at dynamic filter producers for dependent paths.

Query Recovery

recovery_enabled

  • Type: boolean
  • Default value: false

This session property is used to enable or disable the recovery framework, which enables restarting/resuming a query in case of failure.

snapshot_enabled

  • Type: boolean
  • Default value: false

This session property enables capturing snapshots during query execution when the recovery framework is enabled. Without the recovery framework enabled, this flag has no effect.

hetu.experimental.snapshot.profile

  • Type: string

This property defines the filesystem profile used to store snapshots. The corresponding profile must exist in etc/filesystem. For example, if this property is set as hetu.experimental.snapshot.profile=snapshot-hdfs1, a profile describing this filesystem, snapshot-hdfs1.properties, must be created in etc/filesystem with the necessary information including authentication type, config, and keytabs (if applicable). Please refer to the filesystem section for details.

This property is required if any query is executed with distributed snapshot turned on. It must be included in configuration files for all coordinators and all workers. The specified file system must be accessible by all workers, and they must be able to read from and write to the /tmp/hetu/snapshot folder in the specified file system.

This is an experimental property. In the future it may be allowed to store snapshots in non-file-system locations, e.g. in a connector.
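
A sketch of such a profile, assuming snapshot profiles use the same file system client keys documented in the Exchange File System Client section below (values are placeholders), might be an etc/filesystem/snapshot-hdfs1.properties containing:

    fs.client.type=hdfs
    hdfs.config.resources=etc/filesystem/core-site.xml,etc/filesystem/hdfs-site.xml
    hdfs.authentication.type=NONE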

hetu.recovery.maxRetries

  • Type: int
  • Default value: 10

This property defines the maximum number of error recovery attempts for a query. When the limit is reached, the query fails.

This can also be specified on a per-query basis using the recovery_max_retries session property.

hetu.recovery.retryTimeout

  • Type: duration
  • Default value: 10m (10 minutes)

This property defines the maximum amount of time for the system to wait until all tasks are successfully restored. If any task is not ready within this timeout, then the recovery attempt is considered a failure, and the query will try to resume from an earlier snapshot if available.

This can also be specified on a per-query basis using the recovery_retry_timeout session property.

hetu.snapshot.useKryoSerialization

  • Type: boolean
  • Default value: false

Enables Kryo-based serialization for snapshots, instead of the default Java serializer.

experimental.eliminate-duplicate-spill-files

  • Type: boolean
  • Default value: false

Enables elimination of duplicate spill file storage as part of snapshot capture.

HTTP Client Configurations

http.client.idle-timeout

  • Type: duration
  • Default value: 30s (30 seconds)

This property defines how long a given HTTP client stays connected without any operations performed over it. After the specified time elapses with no activity, the client connection is closed and related resources are released.

(Note: this parameter should be configured with a higher value in high-load environments.)

http.client.request-timeout

  • Type: duration
  • Default value: 10s (10 seconds)

This property defines the time threshold within which a given HTTP client should receive a response. If the configured time elapses with no response received, the client considers the request submission to have failed.

(Note: this parameter should be configured with a higher value in high-load environments.)

Connector Properties Configuration

case-insensitive-name-matching

  • Type: boolean
  • Default value: false

Case-insensitive matching between database and collection names. The default is case sensitive.

Fault Tolerant Execution

exchange-manager.name

  • Type: string
  • Default value: filesystem

The name of the task snapshot manager.

exchange-filesystem-type

  • Type: string
  • Default value: exchange

The file system client name used by the task snapshot. If exchange-filesystem-type=exchange, the client named exchange will be used to store the task snapshot during query execution. The configuration file path for file system clients is etc/filesystem/exchange.properties (the properties file name needs to be consistent with exchange-filesystem-type); see Exchange File System Client.

exchange.base-directories

  • Type: URI

The base file directories used to store the snapshots of tasks. If there is more than one root directory, commas are used to separate them.
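
Putting these together, an illustrative config.properties excerpt for storing task snapshots on a file system (the base directory is a placeholder) might be:

    exchange-manager.name=filesystem
    exchange-filesystem-type=exchange
    exchange.base-directories=/tmp/hetu/exchange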

exchange.compression-enabled

  • Type: boolean
  • Default value: false

This property enables compression of task snapshots.

exchange.max-page-storage-size

  • Type: data size
  • Default value: 16MB

Max storage size of a page written to a sink, including the page itself and its size represented by an int.

exchange.sink-buffer-pool-min-size

  • Type: int
  • Default value: 10

The minimum buffer pool size for an exchange sink. The larger the buffer pool size, the larger the write parallelism and memory usage.

exchange.sink-buffers-per-partition

  • Type: int
  • Minimum value: 2
  • Default value: 2

The number of buffers per partition in the buffer pool. The larger the buffer pool size, the larger the write parallelism and memory usage.

exchange.sink-max-file-size

  • Type: data size
  • Default value: 1GB

Max size of files written by exchange sinks.

exchange.source-concurrent-readers

  • Type: int
  • Default value: 4

Number of concurrent readers to read from spooling storage. The larger the number of concurrent readers, the larger the read parallelism and memory usage.

exchange.max-output-partition-count

  • Type: int
  • Minimum value: 1
  • Default value: 50

Max number of distinct partitions to be created by the exchange.

exchange.file-listing-parallelism

  • Type: int
  • Default value: 50

Max parallelism of file listing calls when enumerating spooling files.

retry-policy

  • Type: string
  • Allowed values: NONE, TASK
  • Default value: NONE

Configures what is retried in the event of failure. Set to TASK to retry tasks individually if they fail.
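
For example, enabling task-level retries in config.properties (to be used together with an exchange manager, as described above):

    retry-policy=TASK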

task-retry-attempts-overall

  • Type: int
  • Default value: null (no limit)

Maximum number of retries across all tasks within a given query before declaring the query as failed.

task-retry-attempts-per-task

  • Type: int
  • Minimum value: 0
  • Default value: 4

Maximum number of times openLooKeng may attempt to retry a single task before declaring the query as failed.

retry-initial-delay

  • Type: duration
  • Default value: 10s

Minimum time that a failed task must wait before it is retried. May be overridden with the retry_initial_delay session property.

retry-max-delay

  • Type: duration
  • Default value: 1m

Maximum time that a failed task must wait before it is retried. Wait time is increased on each subsequent failure. May be overridden with the retry_max_delay session property.

retry-delay-scale-factor

  • Type: double
  • Default value: 2.0

Factor by which retry delay is increased on each task failure. May be overridden with the retry_delay_scale_factor session property.

max-tasks-waiting-for-node-per-stage

  • Type: int
  • Minimum value: 1
  • Default value: 5

Allow for up to configured number of tasks to wait for node allocation per stage, before pausing scheduling for other tasks from this stage.

fault-tolerant-execution-target-task-input-size

  • Type: data size
  • Default value: 4GB

Target size in bytes of all task inputs for a single fault-tolerant task. Applies to tasks that read input from spooled data written by other tasks. May be overridden for the current session with the fault_tolerant_execution_target_task_input_size session property.

fault-tolerant-execution-target-task-split-count

  • Type: int
  • Minimum value: 1
  • Default value: 64

Target number of standard splits processed by a single task that reads data from source tables. Value is interpreted with split weight taken into account. If the weight of splits produced by a catalog denotes that they are lighter or heavier than “standard” split, then the number of splits processed by single task is adjusted accordingly. May be overridden for the current session with the fault_tolerant_execution_target_task_split_count session property.

fault-tolerant-execution-preserve-input-partitions-in-write-stage

  • Type: boolean
  • Default value: true

Ensures a single task reads a single hash-partitioned input partition for stages that write table data.

fault-tolerant-execution-min-task-split-count

  • Type: int
  • Minimum value: 1
  • Default value: 16

Minimum number of splits processed by a single task. This value is not split weight-adjusted and serves as protection against situations where catalogs report an incorrect split weight. May be overridden for the current session with the fault_tolerant_execution_min_task_split_count session property.

fault-tolerant-execution-max-task-split-count

  • Type: int
  • Minimum value: 1
  • Default value: 256

Maximum number of splits processed by a single task. This value is not split weight-adjusted and serves as protection against situations where catalogs report an incorrect split weight. May be overridden for the current session with the fault_tolerant_execution_max_task_split_count session property.

fault-tolerant-execution-partition-count

  • Type: int
  • Minimum value: 1
  • Default value: 50

Number of partitions to use for distributed joins and aggregations. May be overridden for the current session with the fault_tolerant_execution_partition_count session property.

fault-tolerant-execution-task-descriptor-storage-max-memory

  • Type: data size
  • Default value: (JVM heap size * 0.15)

Maximum amount of memory to be used to store task descriptors for fault tolerant queries on coordinator. Extra memory is needed to be able to reschedule tasks in case of a failure.

node-scheduler.allowed-no-matching-node-period

  • Type: duration
  • Default value: 2m

Wait time for scheduler before failing a query for which hard task requirements cannot be satisfied.

node-scheduler.allocator-type

  • Type: string
  • Allowed values: BIN_PACKING, FIXED_COUNT
  • Default value: BIN_PACKING

Configures node allocator type.

Exchange File System Client

fs.client.type

  • Type: string
  • Allowed values: local, hdfs
  • Default value: local

This property configures the type of file system client. If fs.client.type=local, the recovery framework uses the local file system client; if fs.client.type=hdfs, it uses the HDFS file system client.

hdfs.config.resources

  • Type: string
  • Default value: etc/filesystem/core-site.xml,etc/filesystem/hdfs-site.xml

The URIs of the resource configuration files that the HDFS file system depends on.

(Note that this is a configuration property required by the HDFS file system; when there is more than one resource configuration file, they are separated by ','.)

hdfs.authentication.type

  • Type: string
  • Default value: NONE

This property specifies the authentication type for the HDFS file system.

(Note that this is a configuration property required by the HDFS file system.)