Versions | v0.14 (td-agent3)

This page is for v0.14, not the latest stable version, which is v0.12.


Writing Input Plugins

Extend the Fluent::Plugin::Input class and implement the following methods. See Plugin Base Class API for details of the API common to all plugin types.

In most cases, an input plugin starts timers, threads, or network servers that listen on ports in #start, then calls router.emit from the callbacks of those timers, threads, or servers to emit events.

require 'fluent/plugin/input'

module Fluent::Plugin
  class SomeInput < Input
    # First, register the plugin. NAME is the name of this plugin
    # and identifies the plugin in the configuration file.
    Fluent::Plugin.register_input('NAME', self)

    # config_param defines a parameter. You can refer to it via the @port instance variable.
    # :default means this parameter is optional.
    config_param :port, :integer, default: 8888

    # This method is called before starting.
    # 'conf' is a Hash that includes configuration parameters.
    # If the configuration is invalid, raise Fluent::ConfigError.
    def configure(conf)
      super

      # The configured "port" is available as `@port` or via the instance method #port.
      if @port < 1024
        raise Fluent::ConfigError, "well known ports cannot be used for this purpose."
      end

      # You can also read the raw parameter via conf[name].
      # Keep it in a separate variable: assigning it to @port would replace
      # the Integer set by config_param with the raw, unconverted value.
      raw_port = conf['port']
      # ...
    end

    # This method is called when starting.
    # Open sockets or files and create a thread here.
    def start
      super

      # my own start-up code
    end

    # This method is called when shutting down.
    def shutdown
      # my own shutdown code

      super
    end
  end
end
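The start/shutdown pattern described above (start a background thread in #start, emit from it, stop it in #shutdown) can be sketched in plain Ruby. This is only an illustration under stated assumptions: StubRouter and PollingInputSketch are hypothetical names, the class does not inherit from Fluent::Plugin::Input, and the stand-in router merely records what is emitted so the sketch runs without Fluentd installed.

```ruby
# Hypothetical stand-in for the router; it only records emitted events.
class StubRouter
  attr_reader :events

  def initialize
    @events = []
    @mutex = Mutex.new
  end

  # Same argument order as router.emit(tag, time, record).
  def emit(tag, time, record)
    @mutex.synchronize { @events << [tag, time, record] }
  end
end

# Hypothetical plain-Ruby class that mimics the shape of an input plugin.
# A real plugin would extend Fluent::Plugin::Input and call super
# in #start and #shutdown.
class PollingInputSketch
  def initialize(router, interval: 0.01)
    @router = router
    @interval = interval
    @finished = false
  end

  # Start a background thread that emits events periodically.
  def start
    @thread = Thread.new { run }
  end

  # Ask the thread to stop, then wait for it to exit.
  def shutdown
    @finished = true
    @thread.join
  end

  private

  def run
    until @finished
      @router.emit('myapp.access', Time.now.to_i, { 'message' => 'body' })
      sleep @interval
    end
  end
end
```

Setting a flag and joining the thread in #shutdown lets the loop finish its current iteration instead of being killed in the middle of an emit.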

To submit events, use the router.emit(tag, time, record) method, where tag is a String, time is a Fluent::EventTime (or an Integer Unix timestamp), and record is a Hash.

tag = "myapp.access"
time = Fluent::Engine.now
record = {"message"=>"body"}
router.emit(tag, time, record)
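When an event carries its own timestamp, the Integer form of the time argument can be derived with Ruby's standard library. The sketch below assumes the source data provides an ISO 8601 string; the variable names are illustrative and nothing here depends on Fluentd.

```ruby
require 'time'

# Hypothetical timestamp taken from the data being ingested.
logged_at = Time.iso8601('2016-06-13T06:11:23Z')

# Integer Unix time (seconds since the epoch), usable as the time argument.
time = logged_at.to_i
```

Note that an Integer only has one-second resolution, so sub-second precision in the source timestamp is dropped by this conversion.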

To submit multiple events in one call, use router.emit_stream(tag, es) together with a MultiEventStream.

es = Fluent::MultiEventStream.new
records.each { |record|
  es.add(time, record)
}
router.emit_stream(tag, es)

Record format

Fluentd plugins assume that a record is JSON-compatible, so its keys should be Strings, not Symbols. Emitting a record with Symbol keys may cause problems.

router.emit(tag, time, {'foo' => 'bar'})  # OK!
router.emit(tag, time, {:foo => 'bar'})   # NG!
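If the data source hands you a Symbol-keyed Hash, one option is to convert the keys before emitting. The helper below is a minimal sketch, not part of Fluentd: stringify_keys is a hypothetical name and it only converts top-level keys. The JSON round trip illustrates why String keys are the expected form, since JSON object keys are always strings.

```ruby
require 'json'

# Hypothetical helper: returns a copy of the record whose top-level keys
# are all Strings. Nested hashes are left untouched.
def stringify_keys(record)
  record.each_with_object({}) { |(key, value), out| out[key.to_s] = value }
end

symbol_keyed = { foo: 'bar', count: 1 }
record = stringify_keys(symbol_keyed)

# After a JSON round trip, Symbol keys come back as Strings.
round_tripped = JSON.parse(JSON.generate(symbol_keyed))
```

Here record and round_tripped both end up with the keys 'foo' and 'count', so converting up front keeps the record consistent with what it would look like after serialization.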


Methods

There are no methods specific to input plugins.

Writing Tests

An input plugin usually has only a few points that need testing. Everything else (parsing configuration, controlling buffers, retries, flushes, and so on) is handled by Fluentd core.

Fluentd also provides a test driver for plugins, so you can write tests for your own plugins easily:

# test/plugin/test_in_your_own.rb

require 'test/unit'
require 'fluent/test'  # provides Fluent::Test.setup
require 'fluent/test/driver/input'

# your own plugin
require 'fluent/plugin/in_your_own'

class YourOwnInputTest < Test::Unit::TestCase
  def setup
    Fluent::Test.setup  # this is required to setup router and others
  end

  # default configuration for tests
  CONFIG = %[
    param1 value1
    param2 value2
  ]

  def create_driver(conf = CONFIG)
    Fluent::Test::Driver::Input.new(Fluent::Plugin::YourOwnInput).configure(conf)
  end

  sub_test_case 'configured with invalid configurations' do
    test 'param1 should reject too short string' do
      assert_raise Fluent::ConfigError do
        create_driver(%[
          param1 a
        ])
      end
    end

    test 'param2 is set correctly' do
      d = create_driver
      assert_equal "value2", d.instance.param2
    end

    # ...
  end

  sub_test_case 'plugin will emit some events' do
    test 'test expects plugin emits events 4 times' do
      d = create_driver

      d.run(expect_emits: 4, timeout: 10)
      # this method blocks until the input plugin has emitted events 4 times
      # or 10 seconds have passed

      events = d.events # array of [tag, time, record]
      assert_equal "expected_tag", events[0][0]
      # ...
    end
  end

  # ...
end

Overview of Tests

Tests for input plugins mainly cover:

  • Configuration/Validation checks for invalid configurations (about #configure)
  • Checks for emitted events by input plugins

The plugin test driver provides a dummy router, a logger, a way to override system configuration, a configuration parser, and other helpers that make it easy to test configuration errors and other behavior.

The lifecycle of plugins and test drivers is:

  1. Instantiate plugin driver (and it instantiates plugin)
  2. Configure plugin
  3. Register conditions to stop/break running tests
  4. Run test code (provided as a block for d.run)
  5. Assert results of tests by data provided from driver

Test drivers call the plugin lifecycle methods at the beginning of step 4 (#start) and at the end of step 4 (#stop, #shutdown, …). These calls can be skipped with optional arguments to #run. See Testing API for plugins for details.

For configuration tests, repeat steps 1-2. For full feature tests, repeat steps 1-5. Test drivers and helper methods support both.

Last updated: 2016-06-13 06:11:23 UTC


If this article is incorrect or outdated, or omits critical information, please let us know. Fluentd is an open source project under the Cloud Native Computing Foundation (CNCF), originally invented by Treasure Data, Inc. All components are available under the Apache 2 License.
