Versions | v1.0 (td-agent3)

Writing Parser Plugins

Fluentd supports pluggable, customizable formats for input plugins. Plugin files whose names start with “parser_” are registered as Parser Plugins. See Plugin Base Class API for details of the API common to all plugin types.

Here is an example of a custom parser that parses the following newline-delimited log format:

<timestamp><SPACE>key1=value1<DELIMITER>key2=value2<DELIMITER>key3=value...

For example:

2014-04-01T00:00:00 name=jake age=100 action=debugging

While it is not hard to write a regular expression to match this format, it is tricky to extract and save key names.
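To illustrate the difficulty, here is a small plain-Ruby sketch that does not need Fluentd. The pattern and variable names are made up for this example. A regular expression with named captures handles the sample line, but only because the key names are hard-coded into the pattern, so a line with different keys does not match at all.

```ruby
# Illustrative only: the key names are hard-coded into the pattern.
fixed_keys_pattern = /\A(?<time>\S+) name=(?<name>\S+) age=(?<age>\S+) action=(?<action>\S+)\z/

line = "2014-04-01T00:00:00 name=jake age=100 action=debugging"
match = fixed_keys_pattern.match(line)
known = { "name" => match[:name], "age" => match[:age], "action" => match[:action] }

# A line with a different set of keys is rejected by the same pattern.
other = "2014-04-01T00:00:00 user=jake status=ok"
unknown = fixed_keys_pattern.match(other)

puts known.inspect   # the three hard-coded keys and their values
puts unknown.inspect # nil, because the pattern did not match
```

Splitting on the delimiter and on "=" avoids this problem because the key names are read from the line itself.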

Here is the code to parse this custom format (let’s call it time_key_value). It takes one optional parameter called delimiter, which is the delimiter for key-value pairs. It also takes time_format to parse the time string.

require 'fluent/plugin/parser'

module Fluent::Plugin
  class TimeKeyValueParser < Parser
    # Register this parser as "time_key_value"
    Fluent::Plugin.register_parser("time_key_value", self)

    config_param :delimiter, :string, default: " "   # delimiter is configurable with " " as default
    config_param :time_format, :string, default: nil # time_format is configurable

    def configure(conf)
      super

      if @delimiter.length != 1
        raise Fluent::ConfigError, "delimiter must be a single character. #{@delimiter} is not."
      end

      # TimeParser class is already given. It takes a single argument as the time format
      # to parse the time string with.
      @time_parser = Fluent::TimeParser.new(@time_format)
    end

    def parse(text)
      time, key_values = text.split(" ", 2)
      time = @time_parser.parse(time)
      record = {}
      key_values.split(@delimiter).each { |kv|
        k, v = kv.split("=", 2)
        record[k] = v
      }
      yield time, record
    end
  end
end

Save this code as parser_time_key_value.rb in a loadable plugin path. Then, if in_tail is configured as

# Other lines...
<source>
  @type tail
  path /path/to/input/file
  <parse>
    @type time_key_value
  </parse>
</source>

a log line like 2014-01-01T00:00:00 k=v a=b is parsed as 2014-01-01 00:00:00 +0000 test: {"k":"v","a":"b"}.
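To try the splitting logic outside Fluentd, including a non-default delimiter, the following plain-Ruby sketch may help. The helper name split_time_key_value is hypothetical and not part of Fluentd, and it assumes an explicit time format handled by Ruby's Time.strptime rather than Fluentd's TimeParser.

```ruby
require 'time'

# Hypothetical helper that mirrors the splitting logic of TimeKeyValueParser#parse.
# It is not part of Fluentd and uses Time.strptime instead of Fluentd's TimeParser.
def split_time_key_value(line, delimiter: " ", time_format: "%Y-%m-%dT%H:%M:%S")
  time_text, key_values = line.chomp.split(" ", 2)
  time = Time.strptime(time_text, time_format)
  record = {}
  # to_s guards against a line that has a timestamp but no key-value pairs
  key_values.to_s.split(delimiter).each do |kv|
    k, v = kv.split("=", 2)
    record[k] = v
  end
  [time, record]
end

# Default delimiter (a single space)
time, record = split_time_key_value("2014-01-01T00:00:00 k=v a=b")

# Comma delimiter: values may then contain spaces
_, comma_record = split_time_key_value("2014-01-01T00:00:00 k=v,a=b c", delimiter: ",")
```

With the comma delimiter, only the first space separates the timestamp from the pairs, so the value of a keeps its embedded space.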

For more details about the <parse> section, see Parse section configurations.

How To Use Parsers From Plugins

Parser plugins are designed to be used from other plugins, like Input, Filter and Output. There is a Parser plugin helper for that purpose (v0.14.1 or later):

# in class definition
helpers :parser

# in #configure
@parser = parser_create(type: 'json')

# in input loop or #filter or ...
@parser.parse(text) do |time, record|
  # ...
end

See Parser Plugin Helper API for details.

Methods

Parser plugins implement one method that parses input (text) data into a structured record (Hash) with a time.

#parse(text, &block)

Parser#parse receives input data as an argument and calls the given block to pass back the parsed results. The input text may contain two or more records, so a parser plugin may call the block two or more times for a single argument.

Parser plugins must implement this method.
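As a rough illustration of a parse method that calls its block more than once per argument, here is a plain-Ruby sketch. The method name parse_many is hypothetical and not a Fluentd API. For brevity it keeps the time as a raw string, whereas a real parser plugin would convert it into a time value.

```ruby
# Hypothetical stand-in for Parser#parse; plain Ruby, not a Fluentd method.
# Yields one (time, record) pair per non-empty line in the given text.
def parse_many(text)
  text.each_line do |line|
    line = line.chomp
    next if line.empty?
    time_text, message = line.split(" ", 2)
    yield time_text, { "message" => message }
  end
end

events = []
parse_many("t1 first\n\nt2 second\n") do |time, record|
  events << [time, record]
end

puts events.length # 2: the block ran once per non-empty line
```

Callers therefore should not assume that one call to parse yields exactly one event.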

Writing Tests

A Fluentd parser plugin has only a few points that need testing. Everything else (parsing configurations, controlling buffers, retries, flushes and many others) is handled by Fluentd core.

Fluentd also provides a test driver for plugins, so you can write tests for your own plugins easily:

# test/plugin/test_parser_your_own.rb

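require 'test/unit'
require 'fluent/test/driver/parser'
require 'fluent/test/helpers'

# your own plugin
require 'fluent/plugin/parser_your_own'

class ParserYourOwnTest < Test::Unit::TestCase
  # provides event_time, which is used in the parse test
  include Fluent::Test::Helpers

  def setup
    # common setup
  end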

  CONFIG = %[
    pattern apache
  ]

  def create_driver(conf = CONFIG)
    Fluent::Test::Driver::Parser.new(Fluent::Plugin::YourOwnParser).configure(conf)
  end

  sub_test_case 'configured with invalid configurations' do
    test 'empty' do
      assert_raise(Fluent::ConfigError) do
        create_driver('')
      end
    end
    # ...
  end

  sub_test_case 'plugin will parse text' do
    test 'record has a field' do
      d = create_driver(CONFIG)
      text = '192.168.0.1 - - [28/Feb/2013:12:00:00 +0900] "GET / HTTP/1.1" 200 777'
      expected_time = event_time('28/Feb/2013:12:00:00 +0900', format: '%d/%b/%Y:%H:%M:%S %z')
      expected_record = {
        "method" => "GET",
        # ...
      }
      d.instance.parse(text) do |time, record|
        assert_equal(expected_time, time)
        assert_equal(expected_record, record)
      end
    end
  end
end

Overview of Tests

Tests for parser plugins mainly cover:

  • Configuration/Validation checks for invalid configurations (about #configure)
  • Checks for parsed time and record by parser plugins

The plugin test driver provides a logger, a way to override system configurations, a configuration parser and other features that make it easy to test configuration errors and similar cases.

The lifecycle of plugins and test drivers is:

  1. Instantiate plugin driver (and it instantiates plugin)
  2. Configure plugin
  3. Run test code
  4. Assert results of tests by data provided from driver

Test drivers instantiate the plugin. See Testing API for plugins for details.

For configuration tests, repeat 1-2. For full feature tests, repeat 1-4. Test drivers and helper methods support both.

Last updated: 2018-04-25 10:29:32 +0000

If this article is incorrect or outdated, or omits critical information, please let us know. Fluentd is an open source project under the Cloud Native Computing Foundation (CNCF). All components are available under the Apache 2 License.