# bulk_indexing

## Description
The bulk_indexing processor asynchronously consumes bulk requests from queues.
## Configuration Example
A simple example is as follows:
```yaml
pipeline:
  - name: bulk_request_ingest
    auto_start: true
    keep_running: true
    processor:
      - bulk_indexing:
          queue_selector.labels:
            type: bulk_reshuffle
            level: cluster
```
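The queue selector can be combined with the bulk and consumer tuning parameters documented below. A sketch (the values are illustrative only, and the dotted-key layout assumes the same flattened style as the example above):

```yaml
pipeline:
  - name: bulk_request_ingest
    auto_start: true
    keep_running: true
    processor:
      - bulk_indexing:
          max_worker_size: 10
          bulk.batch_size_in_mb: 10
          bulk.batch_size_in_docs: 5000
          bulk.compress: true
          consumer.fetch_max_messages: 100
          queue_selector.labels:
            type: bulk_reshuffle
            level: cluster
```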
## Parameter Description
| Name | Type | Description |
|---|---|---|
| elasticsearch | string | Default Elasticsearch cluster ID, used when `elasticsearch` is not specified in the queue labels |
| idle_timeout_in_seconds | int | Timeout of the consumption queue, in seconds. Default: `1` |
| max_connection_per_node | int | Maximum number of connections allowed per target node. Default: `1` |
| max_worker_size | int | Maximum number of workers allowed to run at the same time. Default: `10` |
| bulk.batch_size_in_kb | int | Size of a bulk request, in KB |
| bulk.batch_size_in_mb | int | Size of a bulk request, in MB |
| bulk.batch_size_in_docs | int | Number of docs in a bulk request. Default: `1000` |
| bulk.compress | bool | Whether to enable request compression |
| bulk.retry_delay_in_seconds | int | Waiting time before retrying a request. Default: `1` |
| bulk.reject_retry_delay_in_seconds | int | Waiting time before retrying a rejected request. Default: `1` |
| bulk.max_retry_times | int | Maximum retry count |
| bulk.invalid_queue | string | Queue for storing invalid requests, i.e. those that returned `4xx` |
| bulk.dead_letter_queue | string | Queue for storing requests that exceeded the maximum retry count |
| bulk.remove_duplicated_newlines | bool | Whether to remove duplicated newlines in bulk requests |
| queue_selector.labels | map | A group of queues filtered by labels, from which data is consumed. Alias: `queues` |
| queue_selector.ids | []string | UUIDs of the queues to consume |
| queue_selector.keys | []string | Unique key paths of the queues to consume |
| queues | map | A group of queues filtered by labels; equivalent to `queue_selector.labels` |
| waiting_after | []string | UUIDs of queues that must finish consumption before this processor starts consuming |
| detect_active_queue | bool | Whether to automatically detect new queues that meet the conditions. Default: `true` |
| detect_interval | int | Interval for automatically detecting new queues that meet the conditions, in milliseconds. Default: `5000` |
| num_of_slices | int | Number of threads consuming a single queue in parallel; the maximum slice size at runtime |
| slices | []int | Allowed slice numbers |
| skip_info_missing | bool | Whether to skip consumption while required information (node, index, or shard) is missing, i.e. consume only after the information is obtained. Default: `false`, in which case one Elasticsearch node is selected to send requests |
| skip_empty_queue | bool | Whether to skip consumption of empty queues. Default: `true` |
| consumer.source | string | Consumer source |
| consumer.id | string | Consumer UUID |
| consumer.name | string | Consumer name |
| consumer.group | string | Consumer group name |
| consumer.fetch_min_bytes | int | Minimum number of bytes to pull per fetch. Default: `1` |
| consumer.fetch_max_bytes | int | Maximum number of bytes to pull per fetch. Default: `10485760` (10 MB) |
| consumer.fetch_max_messages | int | Maximum number of messages to pull per fetch. Default: `1` |
| consumer.fetch_max_wait_ms | int | Maximum wait time per fetch, in milliseconds. Default: `10000` |
| consumer.eof_retry_delay_in_ms | int | Retry interval after hitting EOF, in milliseconds. Default: `500` |
| bulk.response_handle.save_success_results | bool | Whether to save success results. Default: `false` |
| bulk.response_handle.output_bulk_stats | bool | Whether to save bulk stats. Default: `false` |
| bulk.response_handle.include_index_stats | bool | Whether to include index stats. Default: `true` |
| bulk.response_handle.include_action_stats | bool | Whether to include action stats. Default: `true` |
| bulk.response_handle.save_error_results | bool | Whether to save error results. Default: `true` |
| bulk.response_handle.include_error_details | bool | Whether to save request-level error messages. Default: `true` |
| bulk.response_handle.max_error_details_count | int | Maximum count of error details. Default: `50` |
| bulk.response_handle.save_busy_results | bool | Whether to save `429` results. Default: `true` |
| bulk.response_handle.bulk_result_message_queue | string | Queue to save bulk results. Default: `bulk_result_messages` |
| bulk.response_handle.max_request_body_size | int | Maximum size of the request body before truncation. Default: `10240` |
| bulk.response_handle.max_response_body_size | int | Maximum size of the response body before truncation. Default: `10240` |
| bulk.response_handle.retry_rules.retry_429 | bool | Whether to retry `429` requests. Default: `true` |
| bulk.response_handle.retry_rules.retry_4xx | bool | Whether to retry `4xx` (except `429`) requests. Default: `false` |
| bulk.response_handle.retry_rules.default | bool | Whether to retry requests not matched by other `retry_rules`. Default: `true` |
| bulk.response_handle.retry_rules.permitted.status | []int | Retry requests with the specified status codes |
| bulk.response_handle.retry_rules.permitted.keyword | []string | Retry when the response contains the specified keywords |
| bulk.response_handle.retry_rules.denied.status | []int | Do not retry requests with the specified status codes |
| bulk.response_handle.retry_rules.denied.keyword | []string | Do not retry when the response contains the specified keywords |
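As an illustration of the retry rules, the sketch below retries transient upstream failures while refusing to retry document-level mapping errors. The status codes and keywords are examples only, not recommendations, and the nesting assumes the dotted parameter names above expand to a standard YAML hierarchy:

```yaml
bulk:
  response_handle:
    retry_rules:
      retry_429: true      # retry throttled requests
      retry_4xx: false     # other client errors are not retried
      default: true        # retry anything not matched below
      permitted:
        status: [ 502, 503 ]
        keyword: [ "timeout" ]
      denied:
        status: [ 400 ]
        keyword: [ "mapper_parsing_exception" ]
```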