控制条件 #
在摄取管道中,您可以使用可选的 if 参数来控制处理器是否运行。这允许根据传入的文档内容有条件地执行处理器。条件以 Painless 脚本的形式编写,并针对文档上下文( ctx )进行评估。
基本执行条件 #
每个处理器都可以包含一个 if 子句。如果条件评估为 true ,则处理器运行;否则,它将被跳过。
示例:删除调试级别日志 #
以下管道将删除任何 log_level 字段等于 debug 的文档:
PUT _ingest/pipeline/drop_debug_logs
{
"processors": [
{
"drop": {
"if": "ctx.log_level == 'debug'"
}
}
]
}
示例:索引请求 #
POST logs/_doc/1?pipeline=drop_debug_logs
{
"message": "User logged in",
"log_level": "debug"
}
此文档被删除,因为条件评估结果为 true
{
"_index": "logs",
"_id": "1",
"_version": -3,
"result": "noop",
"_shards": {
"total": 0,
"successful": 0,
"failed": 0
}
}
字段嵌套时的空值安全检查 #
当处理嵌套字段时,避免空指针异常很重要。在 Painless 脚本中使用空值安全 ?. 运算符。
示例:根据嵌套字段删除文档 #
以下删除处理器仅在嵌套 app.env 字段存在且等于 debug 时执行:
PUT _ingest/pipeline/drop_debug_env
{
"processors": [
{
"drop": {
"if": "ctx.app?.env == 'debug'"
}
}
]
}
如果未配置 null 安全的 ?. 运算符,索引任何不包含 app.env 字段的文档将触发以下空指针异常:
{
"error": "IngestProcessorException[ScriptException[runtime error]; nested: NullPointerException[Cannot invoke \"Object.getClass()\" because \"callArgs[0]\" is null];]",
"status": 400
}
处理扁平字段 #
如果你的文档包含扁平字段,例如, "app.env": "debug" ,请使用 dot_expander 处理器将其转换为嵌套结构:
PUT _ingest/pipeline/drop_debug_env
{
"processors": [
{
"dot_expander": {
"field": "app.env"
}
},
{
"drop": {
"if": "ctx.app?.env == 'debug'"
}
}
]
}
安全的方法调用 #
避免在可能为 null 的值上调用方法。请使用常量或 null 检查代替:
{
"drop": {
"if": "ctx.app?.env != null && ctx.app.env.contains('debug')"
}
}
完整示例:多步骤管道处理 #
以下摄取管道使用三个处理器:
set: 如果在user字段中没有提供值,则将user字段设置为guest。set: 如果提供了status_code并且它高于400,则将error字段设置为true。drop: 如果app.env字段等于debug,则删除整个文档。
PUT _ingest/pipeline/logs_processing
{
"processors": [
{
"set": {
"field": "user",
"value": "guest",
"if": "ctx.user == null"
}
},
{
"set": {
"field": "error",
"value": true,
"if": "ctx.status_code != null && ctx.status_code >= 400"
}
},
{
"drop": {
"if": "ctx.app?.env == 'debug'"
}
}
]
}
模拟管道 #
以下模拟请求将条件逻辑应用于三个文档:
POST _ingest/pipeline/logs_processing/_simulate
{
"docs": [
{
"_source": {
"message": "Successful login",
"status_code": 200
}
},
{
"_source": {
"message": "Database error",
"status_code": 500,
"user": "alice"
}
},
{
"_source": {
"message": "Debug mode trace",
"app": { "env": "debug" }
}
}
]
}
返回内容展示了处理器如何根据每个条件进行处理:
{
"docs": [
{
"doc": {
"_index": "_index",
"_id": "_id",
"_source": {
"status_code": 200,
"message": "Successful login",
"user": "guest"
},
"_ingest": {
"timestamp": "2025-04-16T14:04:35.923159885Z"
}
}
},
{
"doc": {
"_index": "_index",
"_id": "_id",
"_source": {
"status_code": 500,
"message": "Database error",
"error": true,
"user": "alice"
},
"_ingest": {
"timestamp": "2025-04-16T14:04:35.923198551Z"
}
}
},
null
]
}