Fluentd
Search…
How to Write Tests for Plugin
This article explains how to write Fluentd plugin test code using test-unit.
You can write test code with any other testing framework such as RSpec, minitest, etc.

Basics of Plugin Testing

Fluentd provides useful Test Drivers according to plugin type. We can write maintainable test code for plugins using them. We can write helper.rb for output plugin as follows:
1
$LOAD_PATH.unshift(File.expand_path("../../", __FILE__))
2
require "test-unit"
3
require "fluent/test"
4
require "fluent/test/driver/output"
5
require "fluent/test/helpers"
6
7
Test::Unit::TestCase.include(Fluent::Test::Helpers)
8
Test::Unit::TestCase.extend(Fluent::Test::Helpers)
Copied!
Note that Fluentd provides useful Test Drivers for input, output, filter, parser and formatter.
The recommended fluentd plugin project structure is:
1
.
2
├── Gemfile
3
├── LICENSE
4
├── README.md
5
├── Rakefile
6
├── fluent-plugin-<your_fluentd_plugin_name>.gemspec
7
├── lib
8
│ └── fluent
9
│ └── plugin
10
│ └── <plugin_type>_<your_fluentd_plugin_name>.rb
11
└── test
12
├── helper.rb
13
└── plugin
14
└── test_<plugin_type>_<your_fluentd_plugin_name>.rb
Copied!

Plugin Test Driver Overview

There are some useful Test Drivers for plugin testing. We can write test code for plugins as following:
1
# Load the module that defines common initialization method (Required)
2
require 'fluent/test'
3
# Load the module that defines helper methods for testing (Required)
4
require 'fluent/test/helpers'
5
# Load the test driver (Required)
6
require 'fluent/test/driver/output'
7
# Load the plugin (Required)
8
require 'fluent/plugin/out_file'
9
10
class FileOutputTest < Test::Unit::TestCase
11
include Fluent::Test::Helpers
12
13
def setup
14
Fluent::Test.setup # Setup test for Fluentd (Required)
15
# Setup test for plugin (Optional)
16
# ...
17
end
18
19
def teardown
20
# Terminate test for plugin (Optional)
21
end
22
23
def create_driver(conf = CONFIG)
24
Fluent::Test::Driver::Output.new(Fluent::Plugin::FileOutput).configure(conf)
25
end
26
27
# Configuration related test group
28
sub_test_case 'configuration' do
29
test 'basic configuration' do
30
d = create_driver(basic_configuration)
31
assert_equal 'something', d.instance.parameter_name
32
end
33
end
34
35
# Another test group goes here
36
sub_test_case 'path' do
37
test 'normal' do
38
d = create_driver('...')
39
d.run(default_tag: 'test') do
40
d.feed(event_time, record)
41
end
42
events = d.events
43
assert_equal(1, events.size)
44
end
45
end
46
end
Copied!

Testing Utility Methods

You can get a plugin instance by calling Test Driver instance's #instance method. If utility methods are private, use __send__.
1
# ...
2
class FileOutputTest < Test::Unit::TestCase
3
# ...
4
# Group by utility method
5
sub_test_case '#compression_suffix' do
6
test 'returns empty string for nil (no compression method specified)' do
7
d = create_driver
8
assert_equal('', d.instance.compression_suffix(nil))
9
end
10
end
11
end
Copied!

Test Driver Base API

The methods in this section are available for all Test Driver.

initialize(klass, opts: {}, &block)

Initializes Test Driver instance.
    klass: A class of Fluentd plugin
    opts: Overwrite system config. This parameter is useful for testing multi
    workers.
    block: Customize plugin behavior. We can overwrite plugin code in this
    block.
Example:
1
def create_driver(conf={})
2
d = Fluent::Test::Driver::Output.new(Fluent::Plugin::MyOutput) do
3
attr_accessor :exceptions
4
# Almost same as to reopen plugin class
5
def prefer_buffered_processing
6
false
7
end
8
def process(tag, es)
9
# Drop events
10
end
11
end
12
d.configure(conf)
13
end
Copied!

configure(conf, syntax: :v1)

Configures plugin instance managed by the Test Driver.
    conf: Fluent::Config::Element or String
    syntax: { :v1, :v0, :ruby } :v0 is obsolete.
Example:
1
def create_driver(conf={})
2
Fluent::Test::Driver::Output.new(Fluent::Plugin::MyOutput).configure(conf)
3
end
4
5
def test_process
6
conf = %[
7
path /path/to/something
8
host localhost
9
port 24229
10
]
11
d = create_driver(conf)
12
end
Copied!

end_if(&block)

Registers the conditions to stop the running Test Driver gracefully.
All registered conditions must be true before Test Driver stops.

break_if(&block)

Registers the conditions to stop the running Test Driver.
Test Driver should stop running if some of the breaking conditions are true.

broken?

Returns true when some of the breaking conditions are true. Otherwise false.

run(timeout: nil, start: true, shutdown: true, &block)

Runs the Test Driver. This Test Driver will stop running immediately after evaluating the block if given.
Otherwise, you must register the conditions to stop the running Test Driver.
This method may be overridden in subclasses.
    timeout: timeout (seconds)
    start: if true, start the Test Driver. Otherwise, invoke instance_start
    method to start it.
    shutdown: if true, shut down the running Test Driver.
Example:
1
# Run Test Driver and feed an event (output)
2
d = create_driver
3
d.run do
4
d.feed(time, record)
5
end
6
7
# Emit multiple events (output)
8
d = create_driver
9
10
d.run(default_tag: 'test', expect_emits: 1, timeout: 10, start: true, shutdown: false) { d.feed(time, { "k1" => 1 })}
11
12
d.run(default_tag: 'test', expect_emits: 1, timeout: 10, start: false, shutdown: false) { d.feed(time, { "k1" => 2 })}
13
14
d.run(default_tag: 'test', expect_emits: 1, timeout: 10, start: true, shutdown: true ) { d.feed(time, { "k1" => 3 })}
Copied!

stop?

Returns true when all the stopping conditions are true. Otherwise false.

logs

Returns logs managed by this Test Driver.

instance

Returns the plugin instance managed by this Test Driver.

Test Driver Base Owner API

    filter
    output
    multi_output

run(expect_emits: nil, expect_records: nil, timeout: nil, start: true, shutdown: true, &block)

Run Test Driver. This Test Driver will stop running immediately after evaluating block if given.
Otherwise, you must register conditions to stop running Test Driver.
This method may be overridden in subclasses.
    expect_emits: set the number of expected emits
    expect_records: set the number of expected records
    timeout: timeout (seconds)
    start: if true, start the Test Driver. Otherwise, invoke the
    instance_start method to start it.
    shutdown: if true, shut down the running Test Driver.
Example:
1
# Run Test Driver and feed an event (owner plugin)
2
d = create_driver
3
d.run do
4
d.feed(time, record)
5
end
6
7
# Emit multiple events (owner plugin)
8
d = create_driver
9
10
d.run(default_tag: 'test', expect_emits: 1, timeout: 10, start: true, shutdown: false) { d.feed(time, { "k1" => 1 })}
11
12
d.run(default_tag: 'test', expect_emits: 1, timeout: 10, start: false, shutdown: false) { d.feed(time, { "k1" => 2 })}
13
14
d.run(default_tag: 'test', expect_emits: 1, timeout: 10, start: true, shutdown: true ) { d.feed(time, { "k1" => 3 })}
Copied!

events(tag: nil)

Returns the events filtered by the given tag.
    tag: filter by this tag. If omitted, it returns all the events.

event_streams(tag: nil)

Returns the event streams filtered by the given tag.
    tag: filter by this tag. If omitted, it returns all the event streams.

emit_count

Returns the number of invoking router.emit.
If you want to delay the stopping of the Test Driver until a certain number of records are emitted, you can use d.run(expected_records: n) instead.
Example:
1
d.run do
2
d.feed('test', record)
3
end
4
assert_equal(1, d.emit_count)
Copied!

record_count

Returns the number of records.
If you want to delay the stopping of the Test Driver until a certain number of records are emitted, you can use d.run(expected_records: n) instead.

error_events(tag: nil)

Returns error events filtered by the given tag.
    tag: filter by this tag. If omitted, it returns all error events.

Test Driver Base owned API

configure(conf, syntax: :v1)

Configures plugin instance managed by this Test Driver.
    conf: Fluent::Config::Element, String, or Hash
    syntax: Supported: { :v1, :v0, :ruby } :v0 is obsolete.

Test Driver Event Feeder API

    filter
    output
    multi_output

run(default_tag: nil, **kwargs, &block)

Runs EventFeeder.
    default_tag: the default tag of the event
Example:
1
d.run(default_tag: 'test') do
2
d.feed(event_time, { 'message' => 'Hello, Fluentd!' })
3
end
Copied!

feed(tag, time, record)

Feeds an event to plugin instance.
    tag: the tag of the event
    time: event timestamp
    record: event record
Example:
1
d.run do
2
d.feed('test', event_time, { 'message' => 'Hello, Fluentd!' })
3
end
Copied!

feed(tag, array_event_stream)

Feeds an array of event stream to plugin instance.
    tag: the tag of the event
    array_event_stream: array of [time, record]
      time: event timestamp
      record: event record
Example:
1
d.run do
2
d.feed('test', [
3
[event_time, { 'message' => 'Hello, user1!' }],
4
[event_time, { 'message' => 'Hello, user2!' }]
5
])
6
end
Copied!

feed(tag, es)

Feeds an event stream to plugin instance.
    tag: the tag of the event
    es: event stream object
Example:
1
es = Fluent::OneEventStream.new(event_time, { 'message' => 'Hello, Fluentd!' })
2
d.run do
3
d.feed('test', es)
4
end
Copied!

feed(record)

Feeds an event with default tag to plugin instance.
    record: event record
Example:
1
d.run(default_tag: 'test') do
2
d.feed({ 'message' => 'Hello, Fluentd!' })
3
# Same as above ^
4
d.feed(event_time, { 'message' => 'Hello, Fluentd!' })
5
end
Copied!

feed(time, record)

Feeds an event with default tag to plugin instance.
    time: event timestamp
    record: event record
Example:
1
d.run(default_tag: 'test') do
2
d.feed(event_time, { 'message' => 'Hello, Fluentd!' })
3
end
Copied!

feed(array_event_stream)

Feeds an array of event stream with default tag to plugin instance.
    array_event_stream: array of [time, record]
      time: event timestamp
      record: event record
Example:
1
d.run(default_tag: 'test') do
2
d.feed([
3
[event_time, { 'message' => 'Hello, user1!' }],
4
[event_time, { 'message' => 'Hello, user2!' }]
5
])
6
end
Copied!

feed(es)

Feeds an event stream with default tag to plugin instance.
    es: event stream object
Example:
1
es = Fluent::OneEventStream.new(event_time, { 'message' => 'Hello, Fluentd!' })
2
d.run(default_tag: 'test') do
3
d.feed(es)
4
end
Copied!

Test Driver Filter API

filtered_records

Collects the filtered records.
1
d = create_driver(config)
2
d.run do
3
d.feed('filter.test', event_time, { 'foo' => 'bar', 'message' => msg })
4
end
5
d.filtered_records
Copied!

Test Driver Output API

run(flush: true, wait_flush_completion: true, force_flush_retry: false, **kwargs, &block)

    flush: flush forcibly
    wait_flush_completion: if true, waiting for flush to complete
    force_flush_retry: if true, retrying flush forcibly
Run Test Driver. This Test Driver will be stop running immediately after evaluating the block if given.
Otherwise, you must register conditions to stop running Test Driver.
Example:
1
d = create_driver(config)
2
d.run do
3
d.feed('filter.test', event_time, { 'foo' => 'bar', 'message' => msg })
4
end
Copied!

formatted

Returns the formatted records.
Example:
1
d = create_driver(config)
2
d.run do
3
d.feed('filter.test', event_time, { 'foo' => 'bar', 'message' => msg })
4
end
5
d.formatted
Copied!

flush

Flushes forcibly.
Example:
1
d = create_driver(config)
2
d.run do
3
d.feed('filter.test', event_time, { 'foo' => 'bar', 'message' => msg })
4
end
5
d.flush
Copied!

Test Helpers

assert_equal_event_time(expected, actual, message = nil)

Asserts the EventTime instance.
    expected: expected EventTime instance
    actual: actual EventTime instance
    message: message to display when assertion fails
Example:
1
parser = create_parser
2
parser.parse(text) do |time, record|
3
assert_equal_event_time(event_time('2017-12-27 09:43:50.123456789'), time)
4
end
Copied!

config_element(name = 'test', argument = '', params = {}, elements = [])

Create Fluent::Config::Element instance.
    name: element name such as match, filter, source, buffer, inject,
    format, parse, etc.
    argument: argument for section defined by config_argument
    params: parameters for section defined by config_element
    elements: child elements of this config element
Example:
1
conf = config_element('match', '**', {
2
'path' => "#{TMP_DIR}/prohibited/${tag}/file.%Y%m%d.log",
3
}, [
4
config_element('buffer', 'time,tag', {
5
'time_key' => 86400,
6
'timekey_zone' => '+0000'
7
})
8
]
9
)
10
d = create_driver(conf)
Copied!

event_time(str = nil, format: nil)

Creates a Fluent::EventTime instance.
    str: time represented as String
    format: parse str as time according to this format. See also
Example:
1
time = event_time
2
time = event_time('2016-10-03 23:58:09 UTC')
3
time = event_time('2016-04-11 16:40:01 +0000')
4
time = event_time('2016-04-17 11:15:00 -0700')
5
time = event_time('2011-01-02 13:14:15')
6
time = event_time('Sep 11 00:00:00', format: '%b %d %H:%M:%S')
7
time = event_time('28/Feb/2013:12:00:00 +0900', format: '%d/%b/%Y:%H:%M:%S %z')
8
time = event_time('2017-02-06T13:14:15.003Z', format: '%Y-%m-%dT%H:%M:%S.%L%z')
Copied!

with_timezone(tz, &block)

Processes the given block with tz. This method overrides ENV['TZ'] while processing its block.
    tz: timezone. This is set to ENV['TZ'].
Example:
1
time = with_timezone('UTC+02') do
2
parser = Fluent::TimeParser.new('%Y-%m-%d %H:%M:%S.%N', true)
3
parser.parse('2016-09-02 18:42:31.123456789')
4
end
5
assert_equal_event_time(time, event_time('2016-09-02 18:42:31.123456789 -02:00', format: '%Y-%m-%d %H:%M:%S.%N %z'))
Copied!

with_worker_config(root_dir: nil, workers: nil, worker_id: nil, &block)

Processes block with the given parameters. This method overrides the system configuration while processing its block.
This is useful for testing Fluentd's internal behavior related to multi workers.
    root_dir: root directory
    workers: the number of workers
    worker_id: ID of workers
Example:
1
class Dummy < Fluent::Plugin::Output
2
end
3
4
d = Dummy.new
5
with_worker_config(workers: 2, worker_id: 1) do
6
d.configure(conf)
7
end
8
9
# ...
Copied!

time2str(time, localtime: false, format: nil)

Converts time to String.
This is useful for testing the formatter.
    time: Fluent::EventTime instance. See also
    localtime: If true, processes time as localtime. Otherwise UTC.
    format: See also
1
formatter = configure_formatter(conf)
2
formatted = formatter.format(tag, time, record)
3
assert_equal("#{time2str(time)}\t#{JSON.dump(record)}\n", formatted)
Copied!

msgpack(type)

    type: Available types: { :factory, :packer, :unpacker }
Shorthand for:
    Fluent::MessagePackFactory.factory
    Fluent::MessagePackFactory.packer
    Fluent::MessagePackFactory.unpacker
Example:
1
events = []
2
factory = msgpack(:factory)
3
factory.unpacker.feed_each(binary) do |obj|
4
events << obj
5
end
Copied!

capture_stdout(&block)

Captures the standard output while processing the given block.
This is useful for testing Fluentd utility commands.
Example:
1
captured_string = capture_stdout do
2
# Print something to STDOUT
3
puts 'Hello!'
4
end
5
6
assert_equal('Hello!\n', capture_stdout)
Copied!

Testing Input Plugins

You must test the input plugins' router#emit method. But you do not have to test this method explicitly. Its testing code pattern is encapsulated in the Input Test Driver.
You can write input plugins test like this:
1
require 'fluent/test'
2
require 'fluent/test/driver/input'
3
require 'fluent/test/helpers'
4
require 'fluent/plugin/input_my'
5
6
class MyInputTest < Test::Unit::TestCase
7
include Fluent::Test::Helpers
8
9
setup do
10
Fluent::Test.setup
11
end
12
13
def create_driver(conf = {})
14
Fluent::Test::Driver::Input.new(Fluent::Plugin::MyInput).configure(conf)
15
end
16
17
test 'emit' do
18
d = create_driver(config)
19
d.run(timeout: 0.5)
20
21
d.events.each do |tag, time, record|
22
assert_equal('input.test', tag)
23
assert_equal({ 'foo' => 'bar' }, record)
24
assert(time.is_a?(Fluent::EventTime))
25
end
26
end
27
end
Copied!

Testing Filter Plugins

You must test filter plugins' #filter method. But you do not have to test this method explicitly. Its testing code pattern is encapsulated in Filter Test Driver.
You can write filter plugins test like this:
1
require 'fluent/test'
2
require 'fluent/test/driver/filter'
3
require 'fluent/test/helpers'
4
require 'fluent/plugin/filter_my'
5
6
class MyInputTest < Test::Unit::TestCase
7
include Fluent::Test::Helpers
8
9
setup do
10
Fluent::Test.setup
11
end
12
13
def create_driver(conf = {})
14
Fluent::Test::Driver::Filter.new(Fluent::Plugin::MyFilter).configure(conf)
15
end
16
17
test 'filter' do
18
d = create_driver(config)
19
time = event_time
20
d.run do
21
d.feed('filter.test', time, { 'foo' => 'bar', 'message' => msg })
22
end
23
24
assert_equal(1, d.filtered_records.size)
25
end
26
end
Copied!

Testing Output Plugins

You must test output plugins' #process or #write or #try_write method. But you do not have to test this method explicitly. Its testing code pattern is encapsulated in the Output Test Driver.
You can write output plugins test like this:
1
require 'fluent/test'
2
require 'fluent/test/driver/output'
3
require 'fluent/test/helpers'
4
require 'fluent/plugin/output_my'
5
6
class MyInputTest < Test::Unit::TestCase
7
include Fluent::Test::Helpers
8
9
setup do
10
Fluent::Test.setup
11
end
12
13
def create_driver(conf = {})
14
Fluent::Test::Driver::Output.new(Fluent::Plugin::MyOutput).configure(conf)
15
end
16
17
test 'emit' do
18
d = create_driver(config)
19
time = event_time
20
d.run do
21
d.feed('output.test', time, { 'foo' => 'bar', 'message' => msg })
22
end
23
24
assert_equal(1, d.events.size)
25
end
26
end
Copied!

Testing Parser Plugins

You must test the parser plugins' #parse method.
You can write parser plugins test like this:
1
require 'fluent/test'
2
require 'fluent/test/driver/parser'
3
require 'fluent/test/helpers'
4
require 'fluent/plugin/parser_my'
5
6
class MyParserTest < Test::Unit::TestCase
7
include Fluent::Test::Helpers
8
9
setup do
10
Fluent::Test.setup
11
end
12
13
def create_driver(conf = {})
14
Fluent::Test::Driver::Parser.new(Fluent::Plugin::MyParser).configure(conf)
15
end
16
17
def create_parser(conf)
18
create_driver(conf).instance
19
end
20
21
test 'parse' do
22
parser = create_parser(conf)
23
parser.parse(text) do |time, record|
24
assert_equal(event_time('2017-12-26 11:56:50.1234567879'), time)
25
assert_equal({ 'message' => 'Hello, Fluentd!' }, record)
26
end
27
end
28
end
Copied!

Testing Formatter Plugins

You must test the formatter plugins' #format method.
You can write formatter plugins test like this:
1
require 'fluent/test'
2
require 'fluent/test/driver/formatter'
3
require 'fluent/test/helpers'
4
require 'fluent/plugin/formatter_my'
5
6
class MyFormatterTest < Test::Unit::TestCase
7
include Fluent::Test::Helpers
8
9
setup do
10
Fluent::Test.setup
11
end
12
13
def create_driver(conf = {})
14
Fluent::Test::Driver::Formatter.new(Fluent::Plugin::MyFormatter).configure(conf)
15
end
16
17
def create_formatter(conf)
18
create_driver(conf).instance
19
end
20
21
test 'format' do
22
formatter = create_formatter(conf)
23
formatted = formatter.format(tag, time, record)
24
assert_equal('message:awesome\tgreeting:hello', formatted)
25
end
26
end
Copied!

Tests for Logs

Testing logs is easy.
Code example:
1
# d is a Test Driver instance
2
assert_equal(1, d.logs.size)
3
logs = d.logs
4
5
assert do
6
logs.any? { |log| log.include?(expected_log) }
7
end
8
9
assert do
10
logs.last.match?(/This is last log/)
11
end
Copied!
If this article is incorrect or outdated, or omits critical information, please let us know. Fluentd is an open-source project under Cloud Native Computing Foundation (CNCF). All components are available under the Apache 2 License.
Last modified 4mo ago