How to Write Input Plugin
To write an input plugin, extend the Fluent::Plugin::Input class and implement its methods.
See Plugin Base Class API for details on the common APIs for all plugin types.
In most cases, an input plugin starts timers, threads, or network servers in its #start method, and then calls router.emit from their callbacks to emit events.
Example:
    require 'fluent/plugin/input'

    module Fluent::Plugin
      class SomeInput < Input
        # First, register the plugin. 'NAME' is the name of this plugin
        # and identifies the plugin in the configuration file.
        Fluent::Plugin.register_input('NAME', self)

        # `config_param` defines a parameter.
        # You can refer to a parameter like an instance variable e.g. @port.
        # `:default` means that the parameter is optional.
        config_param :port, :integer, default: 8888

        # `configure` is called before `start`.
        # 'conf' is a `Hash` that includes the configuration parameters.
        # If the configuration is invalid, raise `Fluent::ConfigError`.
        def configure(conf)
          super

          # The configured 'port' is referred by `@port` or instance method `#port`.
          if @port < 1024
            raise Fluent::ConfigError, "well-known ports cannot be used."
          end

          # You can also refer to raw parameter via `conf[name]`.
          # The raw value is the unconverted String from the configuration
          # (nil if unset), so do not assign it back to `@port`.
          raw_port = conf['port']
          # ...
        end

        # `start` is called when starting and after `configure` is successfully completed.
        # Open sockets or files and create threads here.
        def start
          super

          # Startup code goes here!
        end

        # `shutdown` is called while closing down.
        def shutdown
          # Shutdown code goes here!

          super
        end
      end
    end
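The start/emit/shutdown pattern described above can be tried out without Fluentd installed. The following is a minimal plain-Ruby sketch, not plugin code: `RecordingRouter` and `TickSource` are hypothetical stand-ins invented for illustration, and only the argument order of `emit(tag, time, record)` mirrors the real router.

```ruby
# Plain-Ruby sketch; no Fluentd required.
# RecordingRouter and TickSource are hypothetical stand-ins, not Fluentd classes.
class RecordingRouter
  attr_reader :events

  def initialize
    @events = []
    @mutex = Mutex.new
  end

  # Same argument order as router.emit(tag, time, record).
  def emit(tag, time, record)
    @mutex.synchronize { @events << [tag, time, record] }
  end
end

class TickSource
  def initialize(router, tag:, interval:)
    @router = router
    @tag = tag
    @interval = interval
    @running = false
    @thread = nil
  end

  # Plays the role of #start: spawn the worker thread and return immediately.
  def start
    @running = true
    @thread = Thread.new do
      count = 0
      while @running
        count += 1
        # String keys only, Integer Unix time.
        @router.emit(@tag, Time.now.to_i, { 'count' => count })
        sleep @interval
      end
    end
  end

  # Plays the role of #shutdown: signal the worker to stop, then wait for it.
  def shutdown
    @running = false
    @thread&.join
  end
end

router = RecordingRouter.new
source = TickSource.new(router, tag: 'myapp.tick', interval: 0.01)
source.start
sleep 0.05
source.shutdown
puts "emitted #{router.events.size} events"
```

In a real plugin the same shape applies: the worker is created in `start` after calling `super`, and is stopped in `shutdown` before calling `super`.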
To submit an event, use the router.emit(tag, time, record) method, where:
    tag is a String,
    time is a Fluent::EventTime or an Integer Unix timestamp; and,
    record is a Hash object.
Example:
    tag = "myapp.access"
    time = Fluent::Engine.now
    record = {"message"=>"body"}
    router.emit(tag, time, record)
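When the event time comes from the data itself rather than from the current clock, the three arguments have to be built from the input. The following sketch assumes a made-up line format of `<ISO 8601 timestamp> <message>`; `line_to_event` is a hypothetical helper, not a Fluentd API, and it produces an Integer Unix time with second precision.

```ruby
require 'time'

# Hypothetical helper (not part of Fluentd): turn one line of the assumed form
# "<ISO 8601 timestamp> <message>" into the arguments for router.emit.
def line_to_event(tag, line)
  timestamp, message = line.split(' ', 2)
  time = Time.iso8601(timestamp).to_i          # Integer Unix time
  record = { 'message' => message.to_s.chomp } # String keys only
  [tag, time, record]
end

tag, time, record = line_to_event('myapp.access', "2024-01-01T00:00:00Z GET /index.html\n")
puts [tag, time, record].inspect
# Inside a plugin, this would be followed by: router.emit(tag, time, record)
```

`Time.iso8601` raises `ArgumentError` on a malformed timestamp, so a real plugin would need to decide whether to skip or log such lines.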
To submit multiple events at once, use the router.emit_stream(tag, es) method, where:
    tag is a String; and,
    es is a Fluent::MultiEventStream object.
Example:
    # `tag`, `time`, and `records` are assumed to be defined by the caller.
    es = Fluent::MultiEventStream.new
    records.each do |record|
      es.add(time, record)
    end
    router.emit_stream(tag, es)

Record Format

Fluentd plugins assume that records are JSON-compatible, so every key should be a String, not a Symbol. Emitting a record with Symbol keys may cause problems in later processing.
Example:
    # Good
    router.emit(tag, time, {'foo' => 'bar'})

    # Bad
    router.emit(tag, time, {:foo => 'bar'})
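If the data source hands over hashes with Symbol keys, they can be converted before emitting. The following is a small sketch in plain Ruby; `stringify_keys` is a hypothetical helper written for this example, not a method provided by Fluentd. It also walks nested hashes and arrays, which a shallow `transform_keys(&:to_s)` would not.

```ruby
# Hypothetical helper (not part of Fluentd): return a copy of `value` in which
# every Hash key, at any nesting depth, has been converted to a String.
def stringify_keys(value)
  case value
  when Hash
    value.each_with_object({}) do |(key, inner), out|
      out[key.to_s] = stringify_keys(inner)
    end
  when Array
    value.map { |inner| stringify_keys(inner) }
  else
    value
  end
end

record = stringify_keys({ foo: 'bar', nested: { id: 1 }, list: [{ a: 1 }] })
puts record.inspect
# Inside a plugin, this would be followed by: router.emit(tag, time, record)
```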

Methods

There are no specific methods for the Input plugins.

Writing Tests

A Fluentd input plugin has one or more points to be tested. Other aspects (parsing configurations, controlling buffers, retries, flushes, etc.) are handled by the Fluentd core.
Fluentd also provides test drivers for plugins, so you can write tests for your own plugins easily:
    # test/plugin/test_in_your_own.rb

    require 'test/unit'
    # Defines `Fluent::Test.setup`, which is used in `setup` below.
    require 'fluent/test'
    require 'fluent/test/driver/input'

    # Your own plugin
    require 'fluent/plugin/in_your_own'

    class YourOwnInputTest < Test::Unit::TestCase
      def setup
        # This line is required to set up router, and other required components.
        Fluent::Test.setup
      end

      # Default configuration for tests
      CONFIG = %[
        param1 value1
        param2 value2
      ]

      def create_driver(conf = CONFIG)
        Fluent::Test::Driver::Input.new(Fluent::Plugin::YourOwnInput).configure(conf)
      end

      sub_test_case 'configured with invalid configurations' do
        test 'param1 should reject too short string' do
          assert_raise Fluent::ConfigError do
            create_driver(%[
              param1 a
            ])
          end
        end

        test 'param2 is set correctly' do
          d = create_driver
          assert_equal 'value2', d.instance.param2
        end
        # ...
      end

      sub_test_case 'plugin will emit some events' do
        test 'test expects plugin emits events 4 times' do
          d = create_driver

          # This method blocks until the input plugin emits events 4 times
          # or 10 seconds lapse.
          d.run(expect_emits: 4, timeout: 10)

          # An array of `[tag, time, record]`
          events = d.events

          assert_equal 'expected_tag', events[0][0]
          # ...
        end
      end
      # ...
    end

Overview of Tests

Testing for input plugins is mainly for:
    Validation of configuration (i.e. #configure)
    Validation of the emitted events
To make testing easy, the test driver provides a dummy router, a logger and the functionality to override system and parser configurations, etc.
The lifecycle of plugin and test driver is:
    1. Instantiate plugin driver which then instantiates the plugin
    2. Configure plugin
    3. Register conditions to stop/break running tests
    4. Run test code (provided as a block for d.run)
    5. Assert results of tests by data provided by the driver
The test driver calls the plugin lifecycle methods at the beginning of step 4 (i.e. #start) and at its end (i.e. #stop, #shutdown, etc.). These calls can be skipped with optional arguments of #run.
See Testing API for Plugins for details.
For:
    configuration tests, repeat steps 1-2
    full feature tests, repeat steps 1-5
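The order of the lifecycle calls described above can be made concrete with a toy model. This is a conceptual sketch only: `ToyPlugin` and `ToyDriver` are hypothetical classes written for this illustration and do not reproduce the behavior of Fluent::Test::Driver::Input (for example, there are no stop conditions or timeouts).

```ruby
# Conceptual model only; ToyPlugin and ToyDriver are hypothetical, not Fluentd classes.
class ToyPlugin
  attr_reader :calls

  def initialize
    @calls = []
  end

  def configure(conf)
    @calls << :configure
    @conf = conf
  end

  def start
    @calls << :start
  end

  def stop
    @calls << :stop
  end

  def shutdown
    @calls << :shutdown
  end
end

class ToyDriver
  attr_reader :instance

  def initialize(plugin_class)
    @instance = plugin_class.new # step 1: the driver instantiates the plugin
  end

  def configure(conf)
    @instance.configure(conf)    # step 2: configure the plugin
    self
  end

  def run
    @instance.start              # beginning of step 4
    yield if block_given?        # the test code passed as a block
  ensure
    @instance.stop               # end of step 4
    @instance.shutdown
  end
end

driver = ToyDriver.new(ToyPlugin).configure('port' => '8888')
driver.run { puts "inside run: #{driver.instance.calls.inspect}" }
puts "after run:  #{driver.instance.calls.inspect}"
```

A configuration test stops after `configure` and inspects `driver.instance`, while a full feature test goes on to call `run` and then checks what was collected.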
If this article is incorrect or outdated, or omits critical information, please let us know. Fluentd is an open-source project under Cloud Native Computing Foundation (CNCF). All components are available under the Apache 2 License.