Comment on page
How to Write Input Plugin
Extend
Fluent::Plugin::Input
class and implement its methods.In most cases, input plugins start timers, threads, or network servers to listen on ports in
#start
method and then call router.emit
in the callbacks of timers, threads or network servers to emit events.Example:
require 'fluent/plugin/input'
module Fluent::Plugin
class SomeInput < Input
# First, register the plugin. 'NAME' is the name of this plugin
# and identifies the plugin in the configuration file.
Fluent::Plugin.register_input('NAME', self)
# `config_param` defines a parameter.
# You can refer to a parameter like an instance variable e.g. @port.
# `:default` means that the parameter is optional.
config_param :port, :integer, default: 8888
# `configure` is called before `start`.
# 'conf' is a `Hash` that includes the configuration parameters.
# If the configuration is invalid, raise `Fluent::ConfigError`.
def configure(conf)
super
# The configured 'port' is referred by `@port` or instance method `#port`.
if @port < 1024
raise Fluent::ConfigError, "well-known ports cannot be used."
end
# You can also refer to raw parameter via `conf[name]`.
@port = conf['port']
# ...
end
# `start` is called when starting and after `configure` is successfully completed.
# Open sockets or files and create threads here.
def start
super
# Startup code goes here!
end
# `shutdown` is called while closing down.
def shutdown
# Shutdown code goes here!
super
end
end
end
To submit events, use
router.emit(tag, time, record)
method, where:tag
is aString
,time
is theFluent::EventTime
orInteger
as Unix time; and,record
is aHash
object.
Example:
tag = "myapp.access"
time = Fluent::Engine.now
record = {"message"=>"body"}
router.emit(tag, time, record)
To submit multiple events, use
router.emit_stream(tag, es)
method, where:tag
is aString
; and,es
is aMultiEventStream
object.
Example:
es = MultiEventStream.new
records.each do |record|
es.add(time, record)
end
router.emit_stream(tag, es)
Fluentd plugins assume the record is in JSON format so the key should be the
String
, not Symbol
. If you emit a record with a key as Symbol
, it may cause a problem.Example:
# Good
router.emit(tag, time, {'foo' => 'bar'})
# Bad
router.emit(tag, time, {:foo => 'bar'})
There are no specific methods for the Input plugins.
Fluentd input plugin has one or more points to be tested. Others aspects (parsing configurations, controlling buffers, retries, flushes, etc.) are controlled by the Fluentd core.
Fluentd also provides the test drivers for plugins. You can write tests for your own plugins very easily:
# test/plugin/test_in_your_own.rb
require 'test/unit'
require 'fluent/test'
require 'fluent/test/driver/input'
# Your own plugin
require 'fluent/plugin/in_your_own'
class YourOwnInputTest < Test::Unit::TestCase
def setup
# This line is required to set up router, and other required components.
Fluent::Test.setup
end
# Default configuration for tests
CONFIG = %[
param1 value1
param2 value2
]
def create_driver(conf = CONFIG)
Fluent::Test::Driver::Input.new(Fluent::Plugin::YourOwnInput).configure(conf)
end
sub_test_case 'configured with invalid configurations' do
test 'param1 should reject too short string' do
assert_raise Fluent::ConfigError do
create_driver(%[
param1 a
])
end
end
test 'param2 is set correctly' do
d = create_driver
assert_equal 'value2', d.instance.param2
end
# ...
end
sub_test_case 'plugin will emit some events' do
test 'test expects plugin emits events 4 times' do
d = create_driver
# This method blocks until the input plugin emits events 4 times
# or 10 seconds lapse.
d.run(expect_emits: 4, timeout: 10)
# An array of `[tag, time, record]`
events = d.events
assert_equal 'expected_tag', events[0][0]
# ...
end
end
# ...
end
Testing for input plugins is mainly for:
- Validation of configuration (i.e.
#configure
) - Validation of the emitted events
To make testing easy, the test driver provides a dummy router, a logger and the functionality to override system and parser configurations, etc.
The lifecycle of plugin and test driver is:
- 1.Instantiate plugin driver which then instantiates the plugin
- 2.Configure plugin
- 3.Register conditions to stop/break running tests
- 4.Run test code (provided as a block for
d.run
) - 5.Assert results of tests by data provided by the driver
Test driver calls methods for plugin lifecycle at the beginning of Step # 4 (i.e.
#start
) and at the end (i.e. #stop
, #shutdown
, etc.). It can be skipped by optional arguments of #run
.For:
- configuration tests, repeat steps # 1-2
- full feature tests, repeat steps # 1-5
If this article is incorrect or outdated, or omits critical information, please let us know. Fluentd is an open-source project under Cloud Native Computing Foundation (CNCF). All components are available under the Apache 2 License.
Last modified 8mo ago