The out_exec_filter Buffered Output plugin (1) executes an external program with an event as its input and (2) reads a new event from the program's output.
By default, it passes tab-separated values (TSV) to the standard input and reads TSV from the standard output.
It is included in Fluentd's core.
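To make the default TSV exchange concrete, here is a minimal sketch of a filter program in Ruby. It assumes that each event arrives as one tab-separated line and that one line is written back per event. The names `filter_line` and `run_filter` and the appended `processed` column are hypothetical, chosen only for illustration.

```ruby
require 'stringio'

# Hypothetical helper: take one TSV line, append an extra column, return TSV.
def filter_line(line)
  fields = line.chomp.split("\t", -1) # -1 keeps trailing empty fields
  fields << 'processed'               # illustrative extra column
  fields.join("\t")
end

# Read events line by line from `input` and write one filtered line per event.
def run_filter(input, output)
  while (line = input.gets)
    output.puts filter_line(line)
    output.flush # flush after each event so the result is not held in a buffer
  end
end

# Demonstration with in-memory IO objects.
out = StringIO.new
run_filter(StringIO.new("a\tb\nc\td\n"), out)
puts out.string
```

A real program launched by the plugin would presumably call `run_filter($stdin, $stdout)` instead of using `StringIO`.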
Example Configuration
Please see the Configuration File article for the basic structure and syntax of the configuration file.
When the JSON format is used in the <parse> section, this plugin parses the program output with the Yajl library. Yajl buffers data internally, so the output is not always instantaneous.
If this buffering is a problem because you need an immediate response, adjust the stream_buffer_size parameter in the <parse> section.
command
The command (program) to execute. The out_exec_filter plugin passes the incoming event to the program input and receives the filtered event from the program output.
num_children
| type | default | version |
| :--- | :--- | :--- |
| integer | 1 | 0.14.0 |
The number of processes to spawn for command.
If the number is 2 or larger, Fluentd distributes events across the spawned processes in round-robin fashion.
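As an illustration of round-robin selection across several child processes, the following sketch cycles through a list of children in order. It is not Fluentd's actual implementation; the `RoundRobin` class and the `child-N` labels are hypothetical stand-ins for spawned processes.

```ruby
# Illustrative round-robin selector; not Fluentd's actual implementation.
class RoundRobin
  def initialize(children)
    raise ArgumentError, 'need at least one child' if children.empty?

    @children = children
    @index = 0
  end

  # Return the next child and advance the cursor, wrapping at the end.
  def next_child
    child = @children[@index]
    @index = (@index + 1) % @children.length
    child
  end
end

# Hypothetical labels standing in for three spawned processes.
rr = RoundRobin.new(%w[child-0 child-1 child-2])
picks = Array.new(5) { rr.next_child }
puts picks.join(', ')
# prints "child-0, child-1, child-2, child-0, child-1"
```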
child_respawn
| type | default | version |
| :--- | :--- | :--- |
| string | nil | 0.14.0 |
Respawn command when the command exits. By default, it is disabled.
If you specify a positive number, the plugin respawns the command up to that many times. If you specify inf or -1, it respawns the command indefinitely.
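The accepted values can be summarized with a small sketch that maps a child_respawn-style value to a retry limit. The helpers `respawn_limit` and `respawn_allowed?` are hypothetical, and treating `0` as "no respawn" is an assumption made here, since the text above only covers nil, positive numbers, inf, and -1.

```ruby
# Hypothetical helper: translate a child_respawn-style value into a retry limit.
# Returns 0 when respawning is disabled (nil), Float::INFINITY for "inf" or -1,
# and the integer itself for a non-negative number.
def respawn_limit(value)
  return 0 if value.nil?

  text = value.to_s.strip
  return Float::INFINITY if text == 'inf' || text == '-1'

  count = Integer(text, 10) # raises ArgumentError on non-numeric input
  raise ArgumentError, "invalid respawn value: #{value}" if count < 0

  count
end

# Whether another respawn is allowed after `attempts` respawns so far.
def respawn_allowed?(value, attempts)
  attempts < respawn_limit(value)
end

p respawn_allowed?('3', 2)      # third respawn is still within the limit
p respawn_allowed?('3', 3)      # limit reached
p respawn_allowed?('inf', 1000) # never exhausted
```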
tag
| type | default | version |
| :--- | :--- | :--- |
| string | nil | 0.14.0 |
The tag of the event.
read_block_size
| type | default | version |
| :--- | :--- | :--- |
| size | 10240 | 0.14.9 |
The default block size to read when the parser requires a partial read.
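One plausible reading of "partial read" is that output is consumed in fixed-size blocks rather than line by line, with incomplete data carried over to the next block. The sketch below shows that idea in general terms; it is an assumption about the technique, not the plugin's code, and `each_line_in_blocks` is a hypothetical name.

```ruby
require 'stringio'

# Illustrative only: read `io` in blocks of `block_size` bytes and yield each
# complete line, carrying any incomplete tail over to the next block.
def each_line_in_blocks(io, block_size)
  buffer = +''
  while (chunk = io.read(block_size))
    buffer << chunk
    while (newline = buffer.index("\n"))
      yield buffer.slice!(0..newline).chomp
    end
  end
  yield buffer unless buffer.empty? # trailing data without a final newline
end

lines = []
each_line_in_blocks(StringIO.new("first\nsecond\nthird"), 4) { |l| lines << l }
p lines
```

A small block size is used here only to force lines to span several reads.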
suppress_error_log_interval
| type | default | version |
| :--- | :--- | :--- |
| time | 0 | 0.14.0 |
Suppress error logs during this interval.
By default, all the logs are emitted.
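The effect of an interval-based suppression can be sketched as follows: once an error is logged, further errors are dropped until the interval has elapsed, and an interval of 0 lets everything through. The `ErrorLogThrottle` class is hypothetical and only illustrates the general technique; the plugin's exact behavior may differ.

```ruby
# Illustrative sketch; the class name and behavior are assumptions, not
# Fluentd's implementation.
class ErrorLogThrottle
  def initialize(interval_seconds)
    @interval = interval_seconds
    @next_allowed_at = nil
  end

  # Return true when an error logged at time `now` should be emitted.
  def emit?(now = Time.now)
    return true if @interval <= 0 # 0 means no suppression
    return false if @next_allowed_at && now < @next_allowed_at

    @next_allowed_at = now + @interval
    true
  end
end

throttle = ErrorLogThrottle.new(10)
start = Time.at(0)
p [0, 5, 10, 12].map { |offset| throttle.emit?(start + offset) }
# prints [true, false, true, false]
```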
in_format
This parameter is deprecated. Use the <format> section instead.
The format used to map the incoming event to the program input.
out_format
This parameter is deprecated. Use the <parse> section instead.
The format used to process the program output.
<format> Section
The format used to map the incoming events to the program input.
require 'json'
require 'msgpack'

begin
  # Keep reading events from stdin, one per line
  while (line = STDIN.gets)
    line.chomp!
    # The input format depends on exec_filter's in_format setting
    json = JSON.parse(line)
    # Main processing: do anything here, such as mutating the record or querying a database
    json['new_field'] = "Hey from exec_filter script!"
    # Write data to stdout. The output format depends on exec_filter's out_format setting
    STDOUT.print MessagePack.pack(json)
    # Flush so that events are not held in the output buffer
    STDOUT.flush
  end
rescue Interrupt
  # Ignore Interrupt because it is raised during exec_filter shutdown
end