How to Write Input Plugin

Extend the Fluent::Plugin::Input class and implement the following methods. See Plugin Base Class API for details of the API common to all plugin types.

In most cases, an input plugin starts timers, threads, or network servers that listen on ports in #start, and then calls router.emit from the callbacks of those timers, threads, or servers to emit events.

require 'fluent/plugin/input'

module Fluent::Plugin
  class SomeInput < Input
    # First, register the plugin. NAME is the name of this plugin
    # and identifies the plugin in the configuration file.
    Fluent::Plugin.register_input('NAME', self)

    # config_param defines a parameter. You can refer to it via the @port instance variable.
    # :default means this parameter is optional.
    config_param :port, :integer, default: 8888

    # This method is called before starting.
    # 'conf' is a Hash that includes configuration parameters.
    # If the configuration is invalid, raise Fluent::ConfigError.
    def configure(conf)
      super

      # The configured "port" is available as `@port` or via the instance method #port.
      if @port < 1024
        raise Fluent::ConfigError, "well known ports cannot be used for this purpose."
      end

      # You can also refer to the raw parameter via conf[name].
      # The raw value is not type-converted, so keep it separate from @port.
      raw_port = conf['port']
      # ...
    end

    # This method is called when starting.
    # Open sockets or files and create a thread here.
    def start
      super
      # my own start-up code
    end

    # This method is called when shutting down.
    def shutdown
      # my own shutdown code
      super
    end
  end
end
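The start/shutdown pattern described above can be sketched in plain Ruby. This is a minimal illustration, not Fluentd code: FakeRouter and PollingSketch are hypothetical stand-ins so that the example runs without the fluentd gem. In a real plugin, router is provided by Fluent::Plugin::Input, and #start and #shutdown must also call super as shown in the skeleton.

```ruby
# Stand-in for Fluentd's router (hypothetical class, for illustration only).
# It just collects every emitted event in an array.
class FakeRouter
  attr_reader :events

  def initialize
    @events = []
    @mutex = Mutex.new
  end

  def emit(tag, time, record)
    @mutex.synchronize { @events << [tag, time, record] }
  end
end

# Hypothetical poller that mirrors the shape of an input plugin:
# a background thread is created in #start and stopped in #shutdown.
class PollingSketch
  def initialize(router, interval: 0.01)
    @router = router
    @interval = interval
    @running = false
    @thread = nil
  end

  # Corresponds to #start: spawn the worker that emits events.
  def start
    @running = true
    @thread = Thread.new do
      while @running
        # Integer Unix time and a String-keyed Hash record.
        @router.emit('myapp.poll', Time.now.to_i, { 'message' => 'tick' })
        sleep @interval
      end
    end
  end

  # Corresponds to #shutdown: stop the loop and wait for the thread to exit.
  def shutdown
    @running = false
    @thread&.join
  end
end

router = FakeRouter.new
poller = PollingSketch.new(router)
poller.start
sleep 0.01 while router.events.size < 3 # wait until a few events arrive
poller.shutdown
puts "emitted #{router.events.size} events"
```

The worker thread only touches the router inside its loop, so stopping the plugin is a matter of clearing the flag and joining the thread.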

To submit events, use the router.emit(tag, time, record) method, where tag is a String, time is a Fluent::EventTime (or an Integer Unix timestamp), and record is a Hash.

tag = "myapp.access"
time = Fluent::Engine.now
record = {"message"=>"body"}
router.emit(tag, time, record)

To submit multiple events in one call, use router.emit_stream(tag, es) with a MultiEventStream instead.

es = Fluent::MultiEventStream.new
records.each { |record|
  es.add(time, record)
}
router.emit_stream(tag, es)

Record format

Fluentd plugins assume that a record is JSON-like, so its keys must be Strings, not Symbols. Emitting a record with Symbol keys may cause problems.

router.emit(tag, time, {'foo' => 'bar'}) # OK: String key
router.emit(tag, time, {:foo => 'bar'})  # NG (no good): Symbol key
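If records are built from data that may carry Symbol keys, one option is to normalize them before emitting. The sketch below is plain Ruby; stringify_keys is a hypothetical helper name and is not part of the Fluentd API.

```ruby
# Recursively convert Hash keys to Strings (hypothetical helper, not a Fluentd method).
# Nested Hashes and Arrays are walked; all other values are returned unchanged.
def stringify_keys(value)
  case value
  when Hash
    value.each_with_object({}) { |(k, v), out| out[k.to_s] = stringify_keys(v) }
  when Array
    value.map { |v| stringify_keys(v) }
  else
    value
  end
end

record = { foo: 'bar', nested: { count: 1 }, list: [{ ok: true }] }
safe_record = stringify_keys(record)

puts safe_record.keys.all? { |k| k.is_a?(String) }
```

For a flat record, Ruby's built-in Hash#transform_keys(&:to_s) does the same job without a helper.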

Methods

There are no methods specific to Input plugins.

Writing Tests

An input plugin usually has only a few points that need testing. Everything else (parsing configuration, controlling buffers, retries, flushes, and so on) is handled by Fluentd core.

Fluentd also provides a test driver for plugins, which makes it easy to write tests for your own plugins:

# test/plugin/test_in_your_own.rb

require 'test/unit'
require 'fluent/test' # defines Fluent::Test.setup
require 'fluent/test/driver/input'

# your own plugin
require 'fluent/plugin/in_your_own'

class YourOwnInputTest < Test::Unit::TestCase
  def setup
    Fluent::Test.setup # this is required to set up the router and others
  end

  # default configuration for tests
  CONFIG = %[
    param1 value1
    param2 value2
  ]

  def create_driver(conf = CONFIG)
    Fluent::Test::Driver::Input.new(Fluent::Plugin::YourOwnInput).configure(conf)
  end

  sub_test_case 'configured with invalid configurations' do
    test 'param1 should reject too short string' do
      assert_raise Fluent::ConfigError do
        create_driver(%[
          param1 a
        ])
      end
    end

    test 'param2 is set correctly' do
      d = create_driver
      assert_equal "value2", d.instance.param2
    end
    # ...
  end

  sub_test_case 'plugin will emit some events' do
    test 'test expects plugin emits events 4 times' do
      d = create_driver

      # this method blocks until the input plugin emits events 4 times
      # or 10 seconds pass
      d.run(expect_emits: 4, timeout: 10)

      events = d.events # array of [tag, time, record]
      assert_equal "expected_tag", events[0][0]
      # ...
    end
  end
  # ...
end

Overview of Tests

Tests for input plugins mainly cover:

  • Configuration/validation checks for invalid configurations (about #configure)

  • Checks for emitted events by input plugins

The plugin test driver provides a dummy router, a logger, a way to override system configuration, a configuration parser, and other utilities that make it easy to test configuration errors and other behavior.

The lifecycle of a plugin and its test driver is:

  1. Instantiate plugin driver (and it instantiates plugin)

  2. Configure plugin

  3. Register conditions to stop/break running tests

  4. Run test code (provided as a block for d.run)

  5. Assert results of tests by data provided from driver

The test driver calls the plugin lifecycle methods at the beginning of step 4 (#start) and at the end of step 4 (#stop, #shutdown, ...). These calls can be skipped with optional arguments to #run. See Testing API for plugins for details.

For configuration tests, repeat steps 1-2. For full feature tests, repeat steps 1-5. The test drivers and helper methods support both.
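To make the ordering of the lifecycle steps concrete, here is a toy model in plain Ruby. ToyPlugin and ToyDriver are hypothetical stand-ins that only record the order of calls; they are not Fluentd's test driver and do not reflect its implementation. Step 3 (stop conditions such as expect_emits) is left out, and only the lifecycle methods named above are modeled.

```ruby
# Toy plugin (hypothetical): records which lifecycle methods were called, in order.
class ToyPlugin
  attr_reader :calls

  def initialize
    @calls = []
  end

  def configure(conf)
    @calls << :configure
    @conf = conf
  end

  def start
    @calls << :start
  end

  def stop
    @calls << :stop
  end

  def shutdown
    @calls << :shutdown
  end
end

# Toy driver (hypothetical): mimics the order in which a driver drives a plugin.
class ToyDriver
  attr_reader :instance

  # Step 1: instantiating the driver instantiates the plugin.
  def initialize(plugin_class)
    @instance = plugin_class.new
  end

  # Step 2: configure the plugin; returns self so calls can be chained.
  def configure(conf)
    @instance.configure(conf)
    self
  end

  # Step 4: start the plugin, run the test block, then always stop and shut down.
  def run
    @instance.start
    yield if block_given?
  ensure
    @instance.stop
    @instance.shutdown
  end
end

driver = ToyDriver.new(ToyPlugin).configure('port' => '8888')
driver.run { driver.instance.calls << :test_block }

# Step 5: assert on the data collected during the run.
p driver.instance.calls
# => [:configure, :start, :test_block, :stop, :shutdown]
```

A configuration-only test stops after the configure call, which is why it needs only steps 1-2.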

If this article is incorrect or outdated, or omits critical information, please let us know. Fluentd is an open source project under the Cloud Native Computing Foundation (CNCF). All components are available under the Apache 2 License.