Fluentd
1.0

exec_filter

The out_exec_filter Buffered Output plugin (1) executes an external program, passing it an event as input, and (2) reads a new event from the program's output.
By default, it writes tab-separated values (TSV) to the program's standard input and reads TSV from its standard output.
It is included in Fluentd's core.

Example Configuration

<match pattern>
  @type exec_filter
  command cmd arg arg
  <format>
    @type tsv
    keys k1,k2,k3
  </format>
  <parse>
    @type tsv
    keys k1,k2,k3,k4
  </parse>
  <inject>
    tag_key k1
    time_key k2
    time_format %Y-%m-%d %H:%M:%S
  </inject>
</match>
Please see the Configuration File article for the basic structure and syntax of the configuration file.
When the JSON format is used in the <parse> section, this plugin parses the program output with the Yajl library. Yajl buffers data internally, so parsed events do not always appear immediately.
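As an illustration of the TSV exchange in the example configuration above, here is a minimal sketch of a filter program that reads three tab-separated columns (k1, k2, k3) and writes four (k1 to k4). The helper names `filter_tsv_line` and `run_filter` and the way k4 is derived are assumptions made for this example only; they are not part of Fluentd.

```ruby
require 'stringio'

# Hypothetical helper: takes one TSV line (k1, k2, k3) and returns a TSV line
# with a fourth column (k4) appended. The k4 value is purely illustrative.
def filter_tsv_line(line)
  k1, k2, k3 = line.chomp.split("\t", -1)
  k4 = k3.to_s.upcase
  [k1, k2, k3, k4].join("\t")
end

# Reads TSV lines from `input` and writes filtered lines to `output`,
# flushing after each event so nothing lingers in the script's own buffer.
def run_filter(input, output)
  while (line = input.gets)
    output.puts filter_tsv_line(line)
    output.flush
  end
rescue Interrupt
  # Raised while the child process is being shut down; safe to ignore.
end

# Demonstration with in-memory IO. A real filter script would instead call:
#   run_filter($stdin, $stdout)
demo_out = StringIO.new
run_filter(StringIO.new("test.tag\t2024-01-01 00:00:00\thello\n"), demo_out)
puts demo_out.string
```

Keeping the per-line transformation in its own method makes it easy to check the script outside Fluentd before wiring it into the command parameter.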

Supported Modes

Plugin Helpers

Parameters

@type

The value must be exec_filter.

command

type    default             version
string  required parameter  0.14.0
The command (program) to execute. The out_exec_filter plugin passes the incoming event to the program input and receives the filtered event from the program output.

num_children

type     default  version
integer  1        0.14.0
The number of processes to spawn for command.
If the number is 2 or greater, Fluentd uses the spawned processes in round-robin fashion.
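The round-robin idea can be sketched as follows. This is only an illustration of the scheduling pattern, not the plugin's implementation: `RoundRobinPool` is a hypothetical class, each "child" is a plain array standing in for a spawned process, and what exactly Fluentd hands to a child on each turn is not modeled here.

```ruby
# Hypothetical stand-in for a pool of child processes; each "child" is just
# an array collecting the items it would receive.
class RoundRobinPool
  attr_reader :children

  def initialize(num_children)
    raise ArgumentError, 'num_children must be >= 1' if num_children < 1

    @children = Array.new(num_children) { [] }
    @next_index = 0
  end

  # Hands the item to the next child in turn and advances the cursor,
  # wrapping around to the first child after the last one.
  def dispatch(item)
    child = @children[@next_index]
    child << item
    @next_index = (@next_index + 1) % @children.size
    child
  end
end

pool = RoundRobinPool.new(3)
%w[e1 e2 e3 e4 e5].each { |item| pool.dispatch(item) }
pool.children.each_with_index do |items, i|
  puts "child #{i}: #{items.join(', ')}"
end
```

With a single child the cursor always wraps back to index 0, which is why round robin only matters once there are at least two processes.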

child_respawn

type    default  version
string  nil      0.14.0
Respawns command when the command exits. By default, respawning is disabled.
If you specify a positive number, the plugin tries to respawn up to that many times. If you specify inf or -1, it tries to respawn forever.
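The three cases described above (unset, a positive number, inf or -1) can be sketched as a small decision function. `respawn_limit` and `respawn_allowed?` are hypothetical names for this illustration, and the sketch handles only the values this section describes; the plugin itself may accept or reject other values differently.

```ruby
# Interprets a child_respawn-style string value (illustrative only).
# Returns 0 when respawning is disabled, Float::INFINITY for unlimited
# respawns, or the positive integer limit.
def respawn_limit(value)
  case value
  when nil then 0
  when 'inf', '-1' then Float::INFINITY
  else
    count = Integer(value, 10) # raises ArgumentError for non-numeric input
    raise ArgumentError, "invalid respawn value: #{value}" unless count.positive?

    count
  end
end

# True while another respawn attempt is still permitted.
def respawn_allowed?(value, respawns_so_far)
  respawns_so_far < respawn_limit(value)
end

[nil, '3', 'inf', '-1'].each do |value|
  puts "#{value.inspect}: limit=#{respawn_limit(value)}"
end
```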

tag

type    default  version
string  nil      0.14.0
The tag assigned to the new event read from the program output.

read_block_size

type  default  version
size  10240    0.14.9
The default block size used when reading the program output, for parsers that require partial reads.
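To illustrate what reading in fixed-size blocks involves, the sketch below reads an IO object a few bytes at a time and reassembles complete lines, carrying any partial line over to the next block. It does not reproduce the plugin's internals: `each_line_by_block` is a hypothetical helper, and the tiny block size of 4 bytes is chosen only to make the partial reads visible.

```ruby
require 'stringio'

# Reads `io` in blocks of at most `block_size` bytes and yields each complete
# line, carrying any trailing partial line over to the next block.
def each_line_by_block(io, block_size)
  pending = String.new
  while (block = io.read(block_size))
    pending << block
    while (newline_at = pending.index("\n"))
      yield pending.slice!(0..newline_at).chomp
    end
  end
  # Emit whatever is left if the stream did not end with a newline.
  yield pending unless pending.empty?
end

lines = []
each_line_by_block(StringIO.new("a\t1\nbb\t22\nccc\t333\n"), 4) do |line|
  lines << line
end
p lines
```

A larger block size means fewer read calls per event, at the cost of a larger buffer per child process.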

suppress_error_log_interval

type  default  version
time  0        0.14.0
Suppress error logs during this interval.
By default, all the logs are emitted.
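One way to picture this kind of suppression is a throttle that lets a single error message through and then stays silent until the interval has elapsed. This is a sketch under that assumption, not the plugin's code; `ErrorLogThrottle` is a hypothetical class, and the clock is passed in so the behavior can be checked without sleeping.

```ruby
# Hypothetical rate limiter: lets at most one error message through per
# `interval` seconds; an interval of 0 lets every message through.
class ErrorLogThrottle
  def initialize(interval)
    @interval = interval
    @next_log_time = 0
  end

  # `now` is injectable so the behavior can be checked without sleeping.
  def allow?(now = Time.now.to_i)
    return true if @interval.zero?
    return false if now < @next_log_time

    @next_log_time = now + @interval
    true
  end
end

throttle = ErrorLogThrottle.new(10)
[0, 3, 9, 10, 15, 20].each do |t|
  puts "t=#{t}s: #{throttle.allow?(t) ? 'logged' : 'suppressed'}"
end
```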

in_format

This parameter is deprecated. Use the <format> section instead.
The format used to map the incoming event to the program input.

out_format

This parameter is deprecated. Use the <parse> section instead.
The format used to process the program output.

<format> Section

The format used to map the incoming events to the program input.
See Format Section Configurations for more details.

@type

type    default  version
string  tsv      0.14.9
Overwrites the default value in this plugin.

<parse> Section

The format used to process the program output.
See Parse Section Configurations for more details.

@type

type    default  version
string  tsv      0.14.9
Overwrites the default value in this plugin.

time_key

type    default  version
string  nil      0.14.9
Overwrites the default value in this plugin.

time_format

type    default  version
string  nil      0.14.9
Overwrites the default value in this plugin.

localtime

type  default  version
bool  true     0.14.9
Overwrites the default value in this plugin.

<inject> Section

See Inject Section Configurations for more details.

time_type

type  default  version
enum  float    0.14.9
Overwrites the default value in this plugin.
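To make the effect of injection concrete, the sketch below adds a tag and a time to a record under the keys k1 and k2 (matching tag_key and time_key in the example configuration) and renders the time according to a time_type value. Everything here is an assumption for illustration: `render_time` and `inject_tag_and_time` are hypothetical helpers, the set of time_type values shown (float, unixtime, string) may not be exhaustive, and UTC is used only to keep the example deterministic.

```ruby
# Renders a Time according to a time_type-style setting (illustrative only).
def render_time(time, time_type, time_format = '%Y-%m-%d %H:%M:%S')
  case time_type
  when :float    then time.to_f
  when :unixtime then time.to_i
  when :string   then time.strftime(time_format)
  else raise ArgumentError, "unsupported time_type: #{time_type}"
  end
end

# Returns a copy of the record with the tag stored under k1 and the rendered
# time stored under k2.
def inject_tag_and_time(record, tag, time, time_type: :float)
  record.merge('k1' => tag, 'k2' => render_time(time, time_type))
end

# Formats the chosen keys as one TSV line, in key order.
def to_tsv(record, keys)
  keys.map { |key| record[key].to_s }.join("\t")
end

event_time = Time.utc(2024, 1, 2, 3, 4, 5)
%i[float unixtime string].each do |time_type|
  record = inject_tag_and_time({ 'k3' => 'hello' }, 'app.access', event_time,
                               time_type: time_type)
  puts to_tsv(record, %w[k1 k2 k3])
end
```

The program on the receiving end sees only the formatted line, so the chosen time_type determines whether it has to parse a number or a formatted string.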

<extract> Section

See Extract Section Configurations for more details.

time_type

type  default  version
enum  float    0.14.9
Overwrites the default value in this plugin.

<buffer> Section

See Buffer Section Configurations for more details.

flush_mode

type  default   version
enum  interval  0.14.9
Overwrites the default value in this plugin.

flush_interval

type     default  version
integer  1        0.14.9
Overwrites the default value in this plugin.

Script Example

Here is an example written in Ruby:
require 'json'
require 'msgpack'

begin
  # Keep reading events from stdin, one per line
  while (line = STDIN.gets)
    line.chomp!
    # The input format depends on exec_filter's <format> section (JSON here)
    json = JSON.parse(line)
    # Main processing: mutate the record, query a database, and so on
    json['new_field'] = "Hey from exec_filter script!"
    # Write data to stdout. The output format depends on exec_filter's <parse> section (MessagePack here)
    STDOUT.print MessagePack.pack(json)
    # Flush after each event so the output does not sit in the script's buffer
    STDOUT.flush
  end
rescue Interrupt
  # Ignore the Interrupt exception because it is raised during exec_filter shutdown
end
Corresponding configuration:
<match test.**>
  @type exec_filter
  command ruby /path/to/ruby_script.rb
  tag filtered.exec
  <format>
    @type json
  </format>
  <parse>
    @type msgpack
  </parse>
  <buffer>
    flush_interval 10s
  </buffer>
</match>
You can port this script to your preferred language.
If this article is incorrect or outdated, or omits critical information, please let us know. Fluentd is an open-source project under Cloud Native Computing Foundation (CNCF). All components are available under the Apache 2 License.