bulk_indexing #

Description #

The bulk_indexing processor asynchronously consumes bulk requests from queues.

Configuration Example #

A simple example is as follows:

pipeline:
- name: bulk_request_ingest
  auto_start: true
  keep_running: true
  processor:
    - bulk_indexing:
        queue_selector.labels:
          type: bulk_reshuffle
          level: cluster
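
The parameter table below lists queues as an alias of queue_selector.labels, so the same selection can presumably also be written as in the following sketch. It only swaps the key name and is otherwise identical to the example above:

```yaml
pipeline:
- name: bulk_request_ingest
  auto_start: true
  keep_running: true
  processor:
    - bulk_indexing:
        # "queues" is documented as equal to queue_selector.labels
        queues:
          type: bulk_reshuffle
          level: cluster
```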

Parameter Description #

| Name | Type | Description |
| --- | --- | --- |
| elasticsearch | string | The default Elasticsearch cluster ID, used when elasticsearch is not specified in the queue labels. |
| idle_timeout_in_seconds | int | Timeout of the consumption queue, in seconds. Default: 1. |
| max_connection_per_node | int | Maximum number of connections allowed per target node. Default: 1. |
| max_worker_size | int | Maximum number of workers allowed to run at the same time. Default: 10. |
| bulk.batch_size_in_kb | int | Size of a bulk request, in KB. |
| bulk.batch_size_in_mb | int | Size of a bulk request, in MB. |
| bulk.batch_size_in_docs | int | Number of documents in a bulk request. Default: 1000. |
| bulk.compress | bool | Whether to enable request compression. |
| bulk.retry_delay_in_seconds | int | Waiting time before a request is retried, in seconds. Default: 1. |
| bulk.reject_retry_delay_in_seconds | int | Waiting time before a rejected request is retried, in seconds. Default: 1. |
| bulk.max_retry_times | int | Maximum retry count. |
| bulk.invalid_queue | string | Queue that stores requests for which 4xx is returned because the request is invalid. |
| bulk.dead_letter_queue | string | Queue that stores requests that exceeded the maximum retry count. |
| bulk.remove_duplicated_newlines | bool | Whether to remove duplicated newlines in bulk requests. |
| queue_selector.labels | map | Queues to consume, filtered by label. Alias: queues. |
| queue_selector.ids | array | UUIDs of the queues to consume; string array. |
| queue_selector.keys | array | Unique key paths of the queues to consume; string array. |
| queues | map | Queues to consume, filtered by label. Equivalent to queue_selector.labels. |
| waiting_after | array | UUIDs of queues that must finish being consumed before this processor starts consuming; string array. |
| detect_active_queue | bool | Whether to automatically detect new queues that meet the conditions. Default: true. |
| detect_interval | int | Interval for automatically detecting new queues that meet the conditions, in milliseconds. Default: 5000. |
| num_of_slices | int | Number of threads consuming a single queue in parallel, that is, the maximum number of slices at runtime. |
| slices | array | Allowed slice numbers; int array. |
| skip_info_missing | bool | Whether to skip consuming queue data when required information (for example, node, index, or shard information) does not exist, and consume it only after the information is obtained. Default: false, in which case one Elasticsearch node is selected to send the requests to. |
| skip_empty_queue | bool | Whether to skip consumption of empty queues. Default: true. |
| consumer.source | string | Consumer source. |
| consumer.id | string | Consumer UUID. |
| consumer.name | string | Consumer name. |
| consumer.group | string | Consumer group name. |
| consumer.fetch_min_bytes | int | Minimum size, in bytes, of a message pull. Default: 1. |
| consumer.fetch_max_bytes | int | Maximum size, in bytes, of a message pull. Default: 10485760 (10 MB). |
| consumer.fetch_max_messages | int | Maximum number of messages per pull. Default: 1. |
| consumer.fetch_max_wait_ms | int | Maximum waiting time of a pull, in milliseconds. Default: 10000. |
| consumer.eof_retry_delay_in_ms | int | Retry interval when EOF is hit, in milliseconds. Default: 500. |
| bulk.response_handle.save_success_results | bool | Whether to save success results. Default: false. |
| bulk.response_handle.output_bulk_stats | bool | Whether to save bulk stats. Default: false. |
| bulk.response_handle.include_index_stats | bool | Whether to include index stats. Default: true. |
| bulk.response_handle.include_action_stats | bool | Whether to include action stats. Default: true. |
| bulk.response_handle.save_error_results | bool | Whether to save error results. Default: true. |
| bulk.response_handle.include_error_details | bool | Whether to save detailed request-level error messages. Default: true. |
| bulk.response_handle.max_error_details_count | int | Maximum number of error details to save. Default: 50. |
| bulk.response_handle.save_busy_results | bool | Whether to save results for 429 (busy) responses. Default: true. |
| bulk.response_handle.bulk_result_message_queue | string | Queue in which bulk results are saved. Default: bulk_result_messages. |
| bulk.response_handle.max_request_body_size | int | Maximum size of the request body before it is truncated. Default: 10240. |
| bulk.response_handle.max_response_body_size | int | Maximum size of the response body before it is truncated. Default: 10240. |
| bulk.response_handle.retry_rules.retry_429 | bool | Whether to retry 429 requests. Default: true. |
| bulk.response_handle.retry_rules.retry_4xx | bool | Whether to retry 4xx (except 429) requests. Default: false. |
| bulk.response_handle.retry_rules.default | bool | Whether to retry other requests not covered by retry_rules. Default: true. |
| bulk.response_handle.retry_rules.permitted.status | []int | Retry requests with the specified status codes. |
| bulk.response_handle.retry_rules.permitted.keyword | []string | Retry when the response contains the specified keywords. |
| bulk.response_handle.retry_rules.denied.status | []int | Do not retry requests with the specified status codes. |
| bulk.response_handle.retry_rules.denied.keyword | []string | Do not retry when the response contains the specified keywords. |
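
To show how several of these parameters might be combined, here is a minimal sketch of a tuned pipeline. It assumes that dotted parameter names such as bulk.batch_size_in_mb can be written as nested YAML keys. The numeric values, the status code list, and the queue names bulk_invalid_requests and bulk_dead_letter are illustrative placeholders, not recommended or built-in settings:

```yaml
pipeline:
- name: bulk_request_ingest
  auto_start: true
  keep_running: true
  processor:
    - bulk_indexing:
        max_worker_size: 10            # documented default
        max_connection_per_node: 1     # documented default
        bulk:
          compress: true
          batch_size_in_mb: 10         # illustrative value
          batch_size_in_docs: 1000     # documented default
          max_retry_times: 3           # illustrative value
          retry_delay_in_seconds: 1    # documented default
          invalid_queue: bulk_invalid_requests   # hypothetical queue name
          dead_letter_queue: bulk_dead_letter    # hypothetical queue name
          response_handle:
            save_error_results: true
            max_error_details_count: 50
            retry_rules:
              retry_429: true          # keep retrying busy responses
              retry_4xx: false         # do not retry other client errors
              denied:
                status: [400, 404]     # illustrative status codes
        consumer:
          fetch_max_messages: 100      # illustrative value; the default is 1
        queue_selector.labels:
          type: bulk_reshuffle
          level: cluster
```

With a layout like this, requests that keep failing after max_retry_times would be expected to land in the dead letter queue, while requests rejected as invalid (4xx) would go to the invalid queue, as the two queue descriptions in the table indicate.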