This article explains how to write Fluentd plugin test code using test-unit.
You can write test code with any other testing framework such as RSpec, minitest, etc.
Basics of Plugin Testing
Fluentd provides useful Test Drivers according to plugin type. We can write maintainable test code for plugins using them. We can write helper.rb for output plugin as follows:
type: Available types: { :factory, :packer, :unpacker }
Shorthand for:
Fluent::MessagePackFactory.factory
Fluent::MessagePackFactory.packer
Fluent::MessagePackFactory.unpacker
Example:
capture_stdout(&block)
Captures the standard output while processing the given block.
This is useful for testing Fluentd utility commands.
Example:
Testing Input Plugins
You must test the input plugins' router#emit method. But you do not have to test this method explicitly. Its testing code pattern is encapsulated in the Input Test Driver.
You can write input plugins test like this:
Testing Filter Plugins
You must test filter plugins' #filter method. But you do not have to test this method explicitly. Its testing code pattern is encapsulated in Filter Test Driver.
You can write filter plugins test like this:
Testing Output Plugins
You must test output plugins' #process or #write or #try_write method. But you do not have to test this method explicitly. Its testing code pattern is encapsulated in the Output Test Driver.
You can write output plugins test like this:
Testing Parser Plugins
You must test the parser plugins' #parse method.
You can write parser plugins test like this:
Testing Formatter Plugins
You must test the formatter plugins' #format method.
# Load the module that defines common initialization method (Required)
require 'fluent/test'
# Load the module that defines helper methods for testing (Required)
require 'fluent/test/helpers'
# Load the test driver (Required)
require 'fluent/test/driver/output'
# Load the plugin (Required)
require 'fluent/plugin/out_file'
class FileOutputTest < Test::Unit::TestCase
include Fluent::Test::Helpers
def setup
Fluent::Test.setup # Setup test for Fluentd (Required)
# Setup test for plugin (Optional)
# ...
end
def teardown
# Terminate test for plugin (Optional)
end
def create_driver(conf = CONFIG)
Fluent::Test::Driver::Output.new(Fluent::Plugin::FileOutput).configure(conf)
end
# Configuration related test group
sub_test_case 'configuration' do
test 'basic configuration' do
d = create_driver(basic_configuration)
assert_equal 'something', d.instance.parameter_name
end
end
# Another test group goes here
sub_test_case 'path' do
test 'normal' do
d = create_driver('...')
d.run(default_tag: 'test') do
d.feed(event_time, record)
end
events = d.events
assert_equal(1, events.size)
end
end
end
# ...
class FileOutputTest < Test::Unit::TestCase
# ...
# Group by utility method
sub_test_case '#compression_suffix' do
test 'returns empty string for nil (no compression method specified)' do
d = create_driver
assert_equal('', d.instance.compression_suffix(nil))
end
end
end
def create_driver(conf={})
d = Fluent::Test::Driver::Output.new(Fluent::Plugin::MyOutput) do
attr_accessor :exceptions
# Almost same as to reopen plugin class
def prefer_buffered_processing
false
end
def process(tag, es)
# Drop events
end
end
d.configure(conf)
end
def create_driver(conf={})
Fluent::Test::Driver::Output.new(Fluent::Plugin::MyOutput).configure(conf)
end
def test_process
conf = %[
path /path/to/something
host localhost
port 24229
]
d = create_driver(conf)
end
# Run Test Driver and feed an event (output)
d = create_driver
d.run do
d.feed(time, record)
end
# Emit multiple events (output)
d = create_driver
d.run(default_tag: 'test', expect_emits: 1, timeout: 10, start: true, shutdown: false) { d.feed(time, { "k1" => 1 })}
d.run(default_tag: 'test', expect_emits: 1, timeout: 10, start: false, shutdown: false) { d.feed(time, { "k1" => 2 })}
d.run(default_tag: 'test', expect_emits: 1, timeout: 10, start: true, shutdown: true ) { d.feed(time, { "k1" => 3 })}
# Run Test Driver and feed an event (owner plugin)
d = create_driver
d.run do
d.feed(time, record)
end
# Emit multiple events (owner plugin)
d = create_driver
d.run(default_tag: 'test', expect_emits: 1, timeout: 10, start: true, shutdown: false) { d.feed(time, { "k1" => 1 })}
d.run(default_tag: 'test', expect_emits: 1, timeout: 10, start: false, shutdown: false) { d.feed(time, { "k1" => 2 })}
d.run(default_tag: 'test', expect_emits: 1, timeout: 10, start: true, shutdown: true ) { d.feed(time, { "k1" => 3 })}
d.run do
d.feed('test', record)
end
assert_equal(1, d.emit_count)
d.run(default_tag: 'test') do
d.feed(event_time, { 'message' => 'Hello, Fluentd!' })
end
d.run do
d.feed('test', event_time, { 'message' => 'Hello, Fluentd!' })
end
d.run do
d.feed('test', [
[event_time, { 'message' => 'Hello, user1!' }],
[event_time, { 'message' => 'Hello, user2!' }]
])
end
es = Fluent::OneEventStream.new(event_time, { 'message' => 'Hello, Fluentd!' })
d.run do
d.feed('test', es)
end
d.run(default_tag: 'test') do
d.feed({ 'message' => 'Hello, Fluentd!' })
# Same as above ^
d.feed(event_time, { 'message' => 'Hello, Fluentd!' })
end
d.run(default_tag: 'test') do
d.feed(event_time, { 'message' => 'Hello, Fluentd!' })
end
d.run(default_tag: 'test') do
d.feed([
[event_time, { 'message' => 'Hello, user1!' }],
[event_time, { 'message' => 'Hello, user2!' }]
])
end
es = Fluent::OneEventStream.new(event_time, { 'message' => 'Hello, Fluentd!' })
d.run(default_tag: 'test') do
d.feed(es)
end
d = create_driver(config)
d.run do
d.feed('filter.test', event_time, { 'foo' => 'bar', 'message' => msg })
end
d.filtered_records
d = create_driver(config)
d.run do
d.feed('filter.test', event_time, { 'foo' => 'bar', 'message' => msg })
end
d = create_driver(config)
d.run do
d.feed('filter.test', event_time, { 'foo' => 'bar', 'message' => msg })
end
d.formatted
d = create_driver(config)
d.run do
d.feed('filter.test', event_time, { 'foo' => 'bar', 'message' => msg })
end
d.flush
parser = create_parser
parser.parse(text) do |time, record|
assert_equal_event_time(event_time('2017-12-27 09:43:50.123456789'), time)
end
time = event_time
time = event_time('2016-10-03 23:58:09 UTC')
time = event_time('2016-04-11 16:40:01 +0000')
time = event_time('2016-04-17 11:15:00 -0700')
time = event_time('2011-01-02 13:14:15')
time = event_time('Sep 11 00:00:00', format: '%b %d %H:%M:%S')
time = event_time('28/Feb/2013:12:00:00 +0900', format: '%d/%b/%Y:%H:%M:%S %z')
time = event_time('2017-02-06T13:14:15.003Z', format: '%Y-%m-%dT%H:%M:%S.%L%z')
time = with_timezone('UTC+02') do
parser = Fluent::TimeParser.new('%Y-%m-%d %H:%M:%S.%N', true)
parser.parse('2016-09-02 18:42:31.123456789')
end
assert_equal_event_time(time, event_time('2016-09-02 18:42:31.123456789 -02:00', format: '%Y-%m-%d %H:%M:%S.%N %z'))
class Dummy < Fluent::Plugin::Output
end
d = Dummy.new
with_worker_config(workers: 2, worker_id: 1) do
d.configure(conf)
end
# ...
formatter = configure_formatter(conf)
formatted = formatter.format(tag, time, record)
assert_equal("#{time2str(time)}\t#{JSON.dump(record)}\n", formatted)
events = []
factory = msgpack(:factory)
factory.unpacker.feed_each(binary) do |obj|
events << obj
end
captured_string = capture_stdout do
# Print something to STDOUT
puts 'Hello!'
end
assert_equal('Hello!\n', capture_stdout)
require 'fluent/test'
require 'fluent/test/driver/input'
require 'fluent/test/helpers'
require 'fluent/plugin/input_my'
class MyInputTest < Test::Unit::TestCase
include Fluent::Test::Helpers
setup do
Fluent::Test.setup
end
def create_driver(conf = {})
Fluent::Test::Driver::Input.new(Fluent::Plugin::MyInput).configure(conf)
end
test 'emit' do
d = create_driver(config)
d.run(timeout: 0.5)
d.events.each do |tag, time, record|
assert_equal('input.test', tag)
assert_equal({ 'foo' => 'bar' }, record)
assert(time.is_a?(Fluent::EventTime))
end
end
end
require 'fluent/test'
require 'fluent/test/driver/filter'
require 'fluent/test/helpers'
require 'fluent/plugin/filter_my'
class MyInputTest < Test::Unit::TestCase
include Fluent::Test::Helpers
setup do
Fluent::Test.setup
end
def create_driver(conf = {})
Fluent::Test::Driver::Filter.new(Fluent::Plugin::MyFilter).configure(conf)
end
test 'filter' do
d = create_driver(config)
time = event_time
d.run do
d.feed('filter.test', time, { 'foo' => 'bar', 'message' => msg })
end
assert_equal(1, d.filtered_records.size)
end
end
require 'fluent/test'
require 'fluent/test/driver/output'
require 'fluent/test/helpers'
require 'fluent/plugin/output_my'
class MyInputTest < Test::Unit::TestCase
include Fluent::Test::Helpers
setup do
Fluent::Test.setup
end
def create_driver(conf = {})
Fluent::Test::Driver::Output.new(Fluent::Plugin::MyOutput).configure(conf)
end
test 'emit' do
d = create_driver(config)
time = event_time
d.run do
d.feed('output.test', time, { 'foo' => 'bar', 'message' => msg })
end
assert_equal(1, d.events.size)
end
end
require 'fluent/test'
require 'fluent/test/driver/parser'
require 'fluent/test/helpers'
require 'fluent/plugin/parser_my'
class MyParserTest < Test::Unit::TestCase
include Fluent::Test::Helpers
setup do
Fluent::Test.setup
end
def create_driver(conf = {})
Fluent::Test::Driver::Parser.new(Fluent::Plugin::MyParser).configure(conf)
end
def create_parser(conf)
create_driver(conf).instance
end
test 'parse' do
parser = create_parser(conf)
parser.parse(text) do |time, record|
assert_equal(event_time('2017-12-26 11:56:50.1234567879'), time)
assert_equal({ 'message' => 'Hello, Fluentd!' }, record)
end
end
end
require 'fluent/test'
require 'fluent/test/driver/formatter'
require 'fluent/test/helpers'
require 'fluent/plugin/formatter_my'
class MyFormatterTest < Test::Unit::TestCase
include Fluent::Test::Helpers
setup do
Fluent::Test.setup
end
def create_driver(conf = {})
Fluent::Test::Driver::Formatter.new(Fluent::Plugin::MyFormatter).configure(conf)
end
def create_formatter(conf)
create_driver(conf).instance
end
test 'format' do
formatter = create_formatter(conf)
formatted = formatter.format(tag, time, record)
assert_equal('message:awesome\tgreeting:hello', formatted)
end
end
# d is a Test Driver instance
assert_equal(1, d.logs.size)
logs = d.logs
assert do
logs.any? { |log| log.include?(expected_log) }
end
assert do
logs.last.match?(/This is last log/)
end