exec_filter

The out_exec_filter Buffered Output plugin 1) executes an external program using an event as input; and, 2) reads a new event from the program output.

By default, it passes tab-separated values (TSV) to the standard input and reads TSV from the standard output.

It is included in Fluentd's core.

Example Configuration

<match pattern>
  @type exec_filter
  command cmd arg arg
  <format>
    @type tsv
    keys k1,k2,k3
  </format>
  <parse>
    @type tsv
    keys k1,k2,k3,k4
  </parse>
  <inject>
    tag_key k1
    time_key k2
    time_format %Y-%m-%d %H:%M:%S
  </inject>
</match>

Please see the Configuration File article for the basic structure and syntax of the configuration file.

When using the JSON format in <parse> section, this plugin uses the Yajl library to parse the program output. Yajl buffers data internally so the output is not always instantaneous.

Supported Modes

Plugin Helpers

Parameters

@type

The value must be exec_filter.

command

type

default

version

string

required parameter

0.14.0

The command (program) to execute. The out_exec_filter plugin passes the incoming event to the program input and receives the filtered event from the program output.

num_children

type

default

version

integer

1

0.14.0

The number of spawned processes for command.

If the number is larger than 2, fluentd uses spawned processes by round robin fashion.

child_respawn

type

default

version

string

nil

0.14.0

Respawn command when the command exits. By default, it is disabled.

If you specify a positive number, it tries to respawn until specified times. If you specify inf or -1, it tries to respawn forever.

tag

type

default

version

string

nil

0.14.0

The tag of the event.

read_block_size

type

default

version

size

10240

0.14.9

The default block size to read if parser requires partial read.

suppress_error_log_interval

type

default

version

time

0

0.14.0

Suppress error logs during this interval.

By default, all the logs are emitted.

in_format

This parameter is deprecated. Use <format> section.

The format used to map the incoming event to the program input.

out_format

This parameter is deprecated. Use <parse> section.

The format used to process the program output.

<format> Section

The format used to map the incoming events to the program input.

See Format Section Configurations for more details.

@type

type

default

version

string

tsv

0.14.9

Overwrites the default value in this plugin.

<parse> Section

The format used to process the program output.

See Parse Section Configurations for more details.

@type

type

default

version

string

tsv

0.14.9

Overwrites the default value in this plugin.

time_key

type

default

version

string

nil

0.14.9

Overwrites the default value in this plugin.

time_format

type

default

version

string

nil

0.14.9

Overwrites the default value in this plugin.

localtime

type

default

version

bool

true

0.14.9

Overwrites the default value in this plugin.

<inject> Section

See Inject Section Configurations for more details.

time_type

type

default

version

enum

float

0.14.9

Overwrites the default value in this plugin.

<extract> Section

See Extract Section Configurations for more details.

time_type

type

default

version

enum

float

0.14.9

Overwrite default value in this plugin.

<buffer> Section

See Buffer Section Configurations for more details.

flush_mode

type

default

version

enum

interval

0.14.9

Overwrites the default value in this plugin.

flush_interval

type

default

version

integer

1

0.14.9

Overwrites the default value in this plugin.

Script Example

Here is an example written in Ruby:

require 'json'
require 'msgpack'

begin
  while line = STDIN.gets # continue to read a event from stdin
    line.chomp!

    # Input format depends on exec_filter's in_format setting
    json = JSON.parse(line)

    # main processing. You can do anything, mutate record, access to database and etc.
    json['new_field'] = "Hey from exec_filter script!"

    # Write data to stdout. Output format depends on exec_filter's out_format setting
    STDOUT.print MessagePack.pack(json)

    # Call flush to avoid buffering events
    STDOUT.flush
  end
rescue Interrupt # Ignore Interrupt exception because it happens during exec_filter shutdown
end

Corresponding configuration:

<match test.**>
  @type exec_filter
  command ruby /path/to/ruby_script.rb
  tag filtered.exec
  <format>
    @type json
  </format>
  <parse>
    @type msgpack
  </parse>
  <buffer>
    flush_interval 10s
  </buffer>
</match>

You may convert this script into your preferred language accordingly.

If this article is incorrect or outdated, or omits critical information, please let us know. Fluentd is an open-source project under Cloud Native Computing Foundation (CNCF). All components are available under the Apache 2 License.

Last updated