# Plugin Development

## Installing custom plugins

To install a plugin, please put the ruby script in the `/etc/fluent/plugin` directory.

Alternatively, you can create a Ruby Gem package that includes a `lib/fluent/plugin/<TYPE>_<NAME>.rb` file. The *TYPE* is:

* `in` for input plugins
* `out` for output plugins
* `filter` for filter plugins
* `buf` for buffer plugins
* `parser` for parser plugins
* `formatter` for formatter plugins

For example, an email Output plugin would have the path: `lib/fluent/plugin/out_mail.rb`. The packaged gem can be distributed and installed using RubyGems. For further information, please see the [list of Fluentd plugins](http://www.fluentd.org/plugins) for third-party plugins.

## Overview

The following slides can help the user understand how Fluentd works before they dive into writing their own plugins.

(The slides are taken from [Naotoshi Seo's](https://github.com/fluent/fluentd-docs-gitbook/tree/507e377b7e8e78a312dc49e76bd9a302c33fd058/github.com/sonots/README.md) [RubyKaigi 2014 talk](https://github.com/fluent/fluentd-docs-gitbook/tree/507e377b7e8e78a312dc49e76bd9a302c33fd058/rubykaigi.org/2014/presentation/S-NaotoshiSeo/README.md).)

### Fluentd version and Plugin API

Fluentd now has two active versions, v0.12 and v1.0. v0.12 is old stable and many people still use this version. v1.0 is current stable version and this version has brand-new Plugin API.

The important point is plugin for v0.12 works with v0.12 and v1.0, but new v1.0 API based Plugin doesn't work with v0.12.

This document based on v0.12 Plugin API.

### Send a patch or fork?

If you have a problem with existing plugins or new feature idea, sending a patch is better. If the plugin author is non-active, try to become new plugin maintainer first. Forking a plugin and release alternative plugin, e.g. fluent-plugin-xxx-alt, is final approach.

## Plugin versioning policy

If you don't have a your policy, following semantic versioning is better.

[Semantic Versioning 2.0.0 - Semantic Versioning](http://semver.org/)

The important thing is if your plugin breaks the backward compatibiliy, update major version to avoid user troubles. For example, new v0.14 API based plugin doesn't work with fluentd v0.12. So if you release your existing plugin with new v0.14 API, please update major version, `0.5.0` -> `1.0.0`, `1.2.0` -> `2.0.0`.

## Writing Input Plugins

Extend the **Fluent::Input** class and implement the following methods. In `initialize`, `configure` and `start`, `super` should be called to call Input plugin default behaviour.

```
require 'fluent/input'

module Fluent
  class SomeInput < Input
    # First, register the plugin. NAME is the name of this plugin
    # and identifies the plugin in the configuration file.
    Fluent::Plugin.register_input('NAME', self)

    # config_param defines a parameter. You can refer a parameter via @port instance variable
    # :default means this parameter is optional
    config_param :port, :integer, :default => 8888

    # This method is called before starting.
    # 'conf' is a Hash that includes configuration parameters.
    # If the configuration is invalid, raise Fluent::ConfigError.
    def configure(conf)
      super

      # You can also refer to raw parameter via conf[name].
      @port = conf['port']
      ...
    end

    # This method is called when starting.
    # Open sockets or files and create a thread here.
    def start
      super
      ...
    end

    # This method is called when shutting down.
    # Shutdown the thread and close sockets or files here.
    def shutdown
      super
      ...
    end
  end
end
```

To submit events, use the `router.emit(tag, time, record)` method, where `tag` is the String, `time` is the UNIX time integer and `record` is a Hash object.

```
tag = "myapp.access"
time = Engine.now
record = {"message"=>"body"}
router.emit(tag, time, record)
```

To submit multiple events in one call, use the `router.emit_stream(tag, es)` and `MultiEventStream` combo instead.

```
es = MultiEventStream.new
records.each { |record|
  es.add(time, record)
}
router.emit_stream(tag, es)
```

### Record format

Fluentd plugins assume the record is a JSON so the key should be the String, not Symbol. If you emit a symbol keyed record, it may cause a problem.

```
router.emit(tag, time, {'foo' => 'bar'})  # OK!
router.emit(tag, time, {:foo => 'bar'})   # NG!
```

## Writing Buffered Output Plugins

Extend the **Fluent::BufferedOutput** class and implement the following methods. In `initialize`, `configure` and `start`, `super` should be called to call BufferedOutput plugin default behaviour.

```
require 'fluent/output'

module Fluent
  class SomeOutput < BufferedOutput
    # First, register the plugin. NAME is the name of this plugin
    # and identifies the plugin in the configuration file.
    Fluent::Plugin.register_output('NAME', self)

    # config_param defines a parameter. You can refer a parameter via @path instance variable
    # Without :default, a parameter is required.
    config_param :path, :string

    # This method is called before starting.
    # 'conf' is a Hash that includes configuration parameters.
    # If the configuration is invalid, raise Fluent::ConfigError.
    def configure(conf)
      super

      # You can also refer raw parameter via conf[name].
      @path = conf['path']
      ...
    end

    # This method is called when starting.
    # Open sockets or files here.
    def start
      super
      ...
    end

    # This method is called when shutting down.
    # Shutdown the thread and close sockets or files here.
    def shutdown
      super
      ...
    end

    # This method is called when an event reaches to Fluentd.
    # Convert the event to a raw string.
    def format(tag, time, record)
      [tag, time, record].to_json + "\n"
      ## Alternatively, use msgpack to serialize the object.
      # [tag, time, record].to_msgpack
    end

    # This method is called every flush interval. Write the buffer chunk
    # to files or databases here.
    # 'chunk' is a buffer chunk that includes multiple formatted
    # events. You can use 'data = chunk.read' to get all events and
    # 'chunk.open {|io| ... }' to get IO objects.
    #
    # NOTE! This method is called by internal thread, not Fluentd's main thread. So IO wait doesn't affect other plugins.
    def write(chunk)
      data = chunk.read
      print data
    end

    ## Optionally, you can use chunk.msgpack_each to deserialize objects.
    #def write(chunk)
    #  chunk.msgpack_each {|(tag,time,record)|
    #  }
    #end
  end
end
```

## Writing Time Sliced Output Plugins

Time Sliced Output plugins are extended versions of buffered output plugins. One example of a time sliced output is the `out_file` plugin.

Note that Time Sliced Output plugins use file buffer by default. Thus the `buffer_path` option is required.

To implement a Time Sliced Output plugin, extend the **Fluent::TimeSlicedOutput** class and implement the following methods.

```
require 'fluent/output'

module Fluent
  class SomeOutput < TimeSlicedOutput
    # configure(conf), start(), shutdown() and format(tag, time, record) are
    # the same as BufferedOutput.
    ...

    # You can use 'chunk.key' to get sliced time. The format of 'chunk.key'
    # can be configured by the 'time_format' option. The default format is %Y%m%d.
    def write(chunk)
      day = chunk.key
      ...
    end
  end
end
```

## Writing Non-buffered Output Plugins

Extend the **Fluent::Output** class and implement the following methods. **Output** plugin is often used for implementing filter like plugin. In `initialize`, `configure` and `start`, `super` should be called to call non-buffered Output plugin default behaviour.

```
require 'fluent/output'

module Fluent
  class SomeOutput < Output
    # First, register the plugin. NAME is the name of this plugin
    # and identifies the plugin in the configuration file.
    Fluent::Plugin.register_output('NAME', self)

    # This method is called before starting.
    def configure(conf)
      super
      ...
    end

    # This method is called when starting.
    def start
      super
      ...
    end

    # This method is called when shutting down.
    def shutdown
      super
      ...
    end

    # This method is called when an event reaches Fluentd.
    # 'es' is a Fluent::EventStream object that includes multiple events.
    # You can use 'es.each {|time,record| ... }' to retrieve events.
    # 'chain' is an object that manages transactions. Call 'chain.next' at
    # appropriate points and rollback if it raises an exception.
    #
    # NOTE! This method is called by Fluentd's main thread so you should not write slow routine here. It causes Fluentd's performance degression.
    def emit(tag, es, chain)
      chain.next
      es.each {|time,record|
        log.info "OK!"
      }
    end
  end
end
```

## Filter Plugins

This section shows how to write custom filters in addition to [the core filter plugins](https://docs.fluentd.org/0.12/developer/broken-reference). The plugin files whose names start with "filter\_" are registered as filter plugins.

Here is the implementation of the most basic filter that passes through all events as-is:

```
require 'fluent/filter'

module Fluent
  class PassThruFilter < Filter
    # Register this filter as "passthru"
    Fluent::Plugin.register_filter('passthru', self)

    # config_param works like other plugins

    def configure(conf)
      super
      # do the usual configuration here
    end

    def start
      super
      # This is the first method to be called when it starts running
      # Use it to allocate resources, etc.
    end

    def shutdown
      super
      # This method is called when Fluentd is shutting down.
      # Use it to free up resources, etc.
    end

    def filter(tag, time, record)
      # This method implements the filtering logic for individual filters
      # It is internal to this class and called by filter_stream unless
      # the user overrides filter_stream.
      #
      # Since our example is a pass-thru filter, it does nothing and just
      # returns the record as-is.
      # If returns nil, that records are ignored.
      record
    end
  end
end
```

In `initialize`, `configure`, `start` and `shutdown`, `super` should be called to call Filter plugin default behaviour.

See [Writing Input plugins](#writing-input-plugins) section for the details of `tag`, `time` and `record`.

### filter\_stream method

Almost plugins could be implemented by overriding `filter` method. But if you want to mutate the event stream itself, you can override `filter_stream` method.

Here is the default implementation of `filter_stream`.

```
def filter_stream(tag, es)
  new_es = MultiEventStream.new
  es.each { |time, record|
    begin
      filtered_record = filter(tag, time, record)
      new_es.add(time, filtered_record) if filtered_record
    rescue => e
      router.emit_error_event(tag, time, record, e)
    end
  }
  new_es
end
```

`filter_stream` should return [EventStream](https://github.com/fluent/fluentd/blob/master/lib/fluent/event.rb) object.

## Parser Plugins

Fluentd supports [pluggable, customizable formats for input plugins](https://github.com/fluent/fluentd-docs-gitbook/tree/507e377b7e8e78a312dc49e76bd9a302c33fd058/developer/parser-plugin-overview/README.md). The plugin files whose names start with "parser\_" are registered as Parser Plugins.

Here is an example of a custom parser that parses the following newline-delimited log format:

```
<timestamp><SPACE>key1=value1<DELIMITER>key2=value2<DELIMITER>key3=value...
```

e.g., something like this

```
2014-04-01T00:00:00 name=jake age=100 action=debugging
```

While it is not hard to write a regular expression to match this format, it is tricky to extract and save key names.

Here is the code to parse this custom format (let's call it `time_key_value`). It takes one optional parameter called `delimiter`, which is the delimiter for key-value pairs. It also takes `time_format` to parse the time string. In `initialize`, `configure` and `start`, `super` should be called to call Parser plugin default behaviour.

```
require 'fluent/parser'

module Fluent
  class TextParser
    class TimeKeyValueParser < Parser
      # Register this parser as "time_key_value"
      Plugin.register_parser("time_key_value", self)

      config_param :delimiter, :string, :default => " " # delimiter is configurable with " " as default
      config_param :time_format, :string, :default => nil # time_format is configurable

      # This method is called after config_params have read configuration parameters
      def configure(conf)
        super

        if @delimiter.length != 1
          raise ConfigError, "delimiter must be a single character. #{@delimiter} is not."
        end

        # TimeParser class is already given. It takes a single argument as the time format
        # to parse the time string with.
        @time_parser = TimeParser.new(@time_format)
      end

      # This is the main method. The input "text" is the unit of data to be parsed.
      # If this is the in_tail plugin, it would be a line. If this is for in_syslog,
      # it is a single syslog message.
      def parse(text)
        time, key_values = text.split(" ", 2)
        time = @time_parser.parse(time)
        record = {}
        key_values.split(@delimiter).each { |kv|
          k, v = kv.split("=", 2)
          record[k] = v
        }
        yield time, record
      end
    end
  end
end
```

Then, save this code in `parser_time_key_value.rb` in a loadable plugin path. Then, if in\_tail is configured as

```
# Other lines...
<source>
  @type tail
  path /path/to/input/file
  format time_key_value
</source>
```

Then, the log line like `2014-01-01T00:00:00 k=v a=b` is parsed as `2013-01-01 00:00:00 +0000 test: {"k":"v","a":"b"}`.

### Parser API

Current `Parser#parse` API is called with block. We will remove `Parser#parse` with return value API since v0.14 or later.

```
# OK
parser.parse(text) { |time, record| ... }
# NG. This API will be removed
time, record = parser.parse(text) # or parser.call(text)
```

## Text Formatter Plugins

Fluentd supports [pluggable, customizable formats for output plugins](https://github.com/fluent/fluentd-docs-gitbook/tree/507e377b7e8e78a312dc49e76bd9a302c33fd058/developer/formatter-plugin-overview/README.md). The plugin files whose names start with "formatter\_" are registered as Formatter Plugins.

Here is an example of a custom formatter that outputs events as CSVs. It takes a required parameter called "csv\_fields" and outputs the fields. It assumes that the values of the fields are already valid CSV fields. In `initialize`, `configure` and `start`, `super` should be called to call Formatter plugin default behaviour.

```
require 'fluent/formatter'

module Fluent
  module TextFormatter
    class MyCSVFormatter < Formatter
      # Register MyCSVFormatter as "my_csv".
      Plugin.register_formatter("my_csv", self)

      include HandleTagAndTimeMixin # If you wish to use tag_key, time_key, etc.          
      config_param :csv_fields, :array, value_type: :string

      # This method does further processing. Configuration parameters can be
      # accessed either via "conf" hash or member variables.
      def configure(conf)
        super
      end

      # This is the method that formats the data output.
      def format(tag, time, record)
        values = []

        # Look up each required field and collect them from the record
        @csv_fields.each do |field|
          v = record[field]
          if not v
            $log.error "#{field} is missing."
          end
          values << v
        end

        # Output by joining the fields with a comma
        values.join(",")
      end
    end        
  end
end
```

Then, save this code in `formatter_my_csv.rb` in a loadable plugin path. Then, if out\_file is configured as

```
# Other lines...
<match test>
  @type file
  path /path/to/output/file
  format my_csv
  csv_fields k1,k2
</match>
```

and if the record `{"k1": 100, "k2": 200}` is matched, the output file should look like `100,200`

## Error stream

`router` has `emit_error_event` API to rescue invalid events. Emitted events via `emit_error_event` are routed to `@ERROR` label.

There are several use cases:

* Rescue invalid event which hard to apply filter routine, e.g. don't

  have geoip target field.
* Rescue invalid event which hard to serialize in the output, e.g.

  can't convert a record into BSON.

### API

```
 :::text
 router.emit_error_event(tag, time, record, error)
```

* tag: String: recommend to use incoming event tag
* time: Integer: recommend to use incoming event time
* record: Hash: recommend to use incoming event record
* error: Exception: use a raised exception

## config\_param

`config_param` helper defines plugin parameter. You don't need to parse parameters manually. `config_param` syntax is `config_param :name, :type, options`. Here is simple example:

```
config_param :param1, :string
config_param :param2, :integer, default: 10
```

In this example, `param1` is required string parameter. If a user doesn't specify `param1`, fluentd raises an `ConfigError`. On the other hand, `param2` is optional integer parameter. If a user doesn't specify `param2`, fluentd set `10` to `param2` automatically. If a user sets "5" to `param2` parameter, fluentd converts it into integer type automatically.

### Access parameter value

`config_param` sets parsed result to `:name` instance variable after `configure` call. See example below:

```
config_param :param, :string

def configure(conf)
  super # This super is needed to parse conf by config_param

  p @param # You can access parsed result via instance variable. :param is used for variable name
end
```

### Supported types

Fluentd supports following built-in types for plugin parameter:

```
# hello, /path/to/file, etc
config_param :str_param, :string
# -1, 100, 100000, etc
config_param :int_param, :integer
# 0.1, 999.9, etc
config_param :float_param, :float
# true: yes / true, false: no / false
config_param :bool_param, :bool
# json object: {"k":"v"}, {"k", 10}
config_param :hash_param, :hash
# json array: [1, 10], ["foo", "bar"]
config_param :array_param, :array, value_type: :integer
# integer with size unit: 1k, 2m, 3g, 4t
config_param :size_param, :size
# integer with time unit: 1s, 2m, 3h, 4d
config_param :time_param, :time
# fixed list with `list` option: [:text, :gzip]
config_param :enum_param, :enum, list: [:tcp, :udp]
```

### Supported options

* `default`: Specified parameter becomes optional. `type` value or

  `nil` are available: `default: 10`, `default: nil`.
* `secret`: Specified parameter is masked when dump a configuration,

  e.g. start logs, in\_monitor\_agent result: `secret: true`
* `deprecated`: Specified parameter is showed in warning log. Need

  deprecated message for value: `deprecated: "Use xxx instead"`
* `obsoleted`: Specified parameter is showed in error log with

  configuration error. Need obsoleted message for value:

  `obsoleted: "Use xxx instead"`

These options can be combined.

```
config_param :param, :array, default: [1, 2], secret: true, deprecated: "Use new_param instead"
```

## Debugging plugins

Run `fluentd` with the `-vv` option to show debug messages:

```
$ fluentd -vv
```

The **stdout** and **copy** output plugins are useful for debugging. The **stdout** output plugin dumps matched events to the console. It can be used as follows:

```
# You want to debug this plugin.
<source>
  @type your_custom_input_plugin
</source>

# Dump all events to stdout.
<match **>
  @type stdout
</match>
```

The **copy** output plugin copies matched events to multiple output plugins. You can use it in conjunction with the stdout plugin:

```
<source>
  @type forward
</source>

# Use the forward Input plugin and the fluent-cat command to feed events:
#  $ echo '{"event":"message"}' | fluent-cat test.tag
<match test.tag>
  @type copy

  # Dump the matched events.
  <store>
    @type stdout
  </store>

  # Feed the dumped events to your plugin.
  <store>
    @type your_custom_output_plugin
  </store>
</match>
```

You can use **stdout** filter instead of **copy** and **stdout** combination. The result is same as above but more simpler.

```
<source>
  @type forward
</source>

<filter>
  @type stdout
</filter>

<match test.tag>
  @type your_custom_output_plugin
</match>
```

## Writing test cases

Fluentd provides unit test frameworks for plugins:

```
Fluent::Test::InputTestDriver
  Test driver for input plugins.

Fluent::Test::BufferedOutputTestDriver
  Test driver for buffered output plugins.

Fluent::Test::OutputTestDriver
  Test driver for non-buffered output plugins.
```

Please see Fluentd's source code for details.

### Run test

Fluentd test follows standard gem way and uses test-unit library. Use rake command.

```
$ bundle install --path vendor/bundle # Install related libraries.
$ bundle exec rake test
```

If you want to run only one file, use `TEST` environment variable:

```
$ bundle exec rake test TEST=test/plugin/test_out_foo.rb
```

## Further Reading

* [Slides: Dive into Fluentd Plugin](http://www.slideshare.net/repeatedly/dive-into-fluentd-plugin-v012)

If this article is incorrect or outdated, or omits critical information, please [let us know](https://github.com/fluent/fluentd-docs-gitbook/issues?state=open). [Fluentd](http://www.fluentd.org/) is a open source project under [Cloud Native Computing Foundation (CNCF)](https://cncf.io/). All components are available under the Apache 2 License.
