This article explains how to write Fluentd plugin test code using test-unit.
You can also write test code with any other testing framework, such as RSpec or minitest.
Fluentd provides a Test Driver for each plugin type, which helps keep plugin test code maintainable. For an output plugin, we can write helper.rb as follows:
    $LOAD_PATH.unshift(File.expand_path("../../", __FILE__))
    require "test-unit"
    require "fluent/test"
    require "fluent/test/driver/output"
    require "fluent/test/helpers"

    Test::Unit::TestCase.include(Fluent::Test::Helpers)
    Test::Unit::TestCase.extend(Fluent::Test::Helpers)
Note that Fluentd provides Test Drivers for input, output, filter, parser, and formatter plugins.
The recommended Fluentd plugin project structure is:

    .
    ├── Gemfile
    ├── LICENSE
    ├── README.md
    ├── Rakefile
    ├── fluent-plugin-<your_fluentd_plugin_name>.gemspec
    ├── lib
    │   └── fluent
    │       └── plugin
    │           └── <plugin_type>_<your_fluentd_plugin_name>.rb
    └── test
        ├── helper.rb
        └── plugin
            └── test_<plugin_type>_<your_fluentd_plugin_name>.rb
There are some useful Test Drivers for plugin testing. We can write test code for plugins as follows:
    # Load the module that defines common initialization method (Required)
    require 'fluent/test'
    # Load the module that defines helper methods for testing (Required)
    require 'fluent/test/helpers'
    # Load the test driver (Required)
    require 'fluent/test/driver/output'
    # Load the plugin (Required)
    require 'fluent/plugin/out_file'

    class FileOutputTest < Test::Unit::TestCase
      include Fluent::Test::Helpers

      def setup
        Fluent::Test.setup # Setup test for Fluentd (Required)
        # Setup test for plugin (Optional)
        # ...
      end

      def teardown
        # Terminate test for plugin (Optional)
      end

      def create_driver(conf = CONFIG)
        Fluent::Test::Driver::Output.new(Fluent::Plugin::FileOutput).configure(conf)
      end

      # Configuration related test group
      sub_test_case 'configuration' do
        test 'basic configuration' do
          d = create_driver(basic_configuration)
          assert_equal 'something', d.instance.parameter_name
        end
      end

      # Another test group goes here
      sub_test_case 'path' do
        test 'normal' do
          d = create_driver('...')
          d.run(default_tag: 'test') do
            d.feed(event_time, record)
          end
          events = d.events
          assert_equal(1, events.size)
        end
      end
    end
You can get a plugin instance by calling the Test Driver instance's #instance method. If the utility methods are private, use __send__.
    # ...
    class FileOutputTest < Test::Unit::TestCase
      # ...

      # Group by utility method
      sub_test_case '#compression_suffix' do
        test 'returns empty string for nil (no compression method specified)' do
          d = create_driver
          assert_equal('', d.instance.compression_suffix(nil))
        end
      end
    end
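The __send__ technique for private utility methods can be tried without Fluentd. The following is a minimal sketch under assumptions: SketchPlugin is a hypothetical stand-in for the object returned by d.instance, and the body of its compression_suffix method is invented for illustration only.

```ruby
# Hypothetical stand-in for a plugin instance; not part of Fluentd.
class SketchPlugin
  private

  # Private utility method (assumed behavior, for illustration only).
  def compression_suffix(compress)
    case compress
    when :gzip then '.gz'
    when nil then ''
    else raise ArgumentError, "unknown compression type #{compress}"
    end
  end
end

plugin = SketchPlugin.new

# Calling a private method with an explicit receiver raises NoMethodError.
begin
  plugin.compression_suffix(nil)
rescue NoMethodError => e
  puts "direct call failed: #{e.class}"
end

# __send__ bypasses the visibility check, so the method can be tested.
puts plugin.__send__(:compression_suffix, :gzip)
```

In a plugin test, the same call would be made on d.instance, for example d.instance.__send__(:some_private_method, arg), where some_private_method is a placeholder name.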
The methods in this section are available in all Test Drivers.
Initializes a Test Driver instance.
klass: the Fluentd plugin class
opts: overrides the system config. This parameter is useful for testing multi workers.
block: customizes plugin behavior. We can override plugin code in this block.
Example:
    def create_driver(conf={})
      d = Fluent::Test::Driver::Output.new(Fluent::Plugin::MyOutput) do
        attr_accessor :exceptions

        # Almost the same as reopening the plugin class
        def prefer_buffered_processing
          false
        end

        def process(tag, es)
          # Drop events
        end
      end
      d.configure(conf)
    end
Configures the plugin instance managed by the Test Driver.
conf: Fluent::Config::Element or String
syntax: one of { :v1, :v0, :ruby }. :v0 is obsolete.
Example:
    def create_driver(conf={})
      Fluent::Test::Driver::Output.new(Fluent::Plugin::MyOutput).configure(conf)
    end

    def test_process
      conf = %[
        path /path/to/something
        host localhost
        port 24229
      ]
      d = create_driver(conf)
    end
Registers the conditions to stop the running Test Driver gracefully.
All registered conditions must be true before the Test Driver stops.

Registers the conditions to stop the running Test Driver.
The Test Driver stops running if any of the breaking conditions is true.

Returns true when any of the breaking conditions is true. Otherwise, returns false.
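The difference between the two kinds of conditions (every stop condition must hold, while a single breaking condition is enough) can be modeled in plain Ruby. This is only a sketch under assumptions: SketchDriver and the method names end_if, break_if, broken? and stop? are chosen here for illustration, and the code is not Fluentd's implementation.

```ruby
# Hypothetical model of stop/break condition bookkeeping; not Fluentd's driver.
class SketchDriver
  def initialize
    @stop_conditions = []
    @break_conditions = []
  end

  # Register a condition that must hold before a graceful stop.
  def end_if(&block)
    raise ArgumentError, 'block is not given' unless block
    @stop_conditions << block
  end

  # Register a condition that ends the run as soon as it holds.
  def break_if(&block)
    raise ArgumentError, 'block is not given' unless block
    @break_conditions << block
  end

  # True when at least one breaking condition holds.
  def broken?
    @break_conditions.any? { |condition| condition.call }
  end

  # True when every registered stop condition holds
  # (also true when none is registered, because all? on [] is true).
  def stop?
    @stop_conditions.all? { |condition| condition.call }
  end
end

driver = SketchDriver.new
emitted = []
driver.end_if { emitted.size >= 2 }
driver.end_if { emitted.include?('b') }
driver.break_if { emitted.size > 100 }

emitted << 'a' << 'b'
puts driver.stop?
puts driver.broken?
```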
Runs the Test Driver. If a block is given, the Test Driver stops running immediately after evaluating the block.
Otherwise, you must register the conditions to stop the running Test Driver.
This method may be overridden in subclasses.
timeout: timeout (seconds)
start: if true, start the Test Driver. Otherwise, invoke the instance_start method to start it.
shutdown: if true, shut down the running Test Driver.
Example:
    # Run Test Driver and feed an event (output)
    d = create_driver
    d.run do
      d.feed(time, record)
    end

    # Emit multiple events (output)
    d = create_driver
    d.run(default_tag: 'test', expect_emits: 1, timeout: 10, start: true, shutdown: false) { d.feed(time, { "k1" => 1 }) }
    d.run(default_tag: 'test', expect_emits: 1, timeout: 10, start: false, shutdown: false) { d.feed(time, { "k1" => 2 }) }
    d.run(default_tag: 'test', expect_emits: 1, timeout: 10, start: true, shutdown: true ) { d.feed(time, { "k1" => 3 }) }
Returns true when all the stopping conditions are true. Otherwise, returns false.
Returns logs managed by this Test Driver.
Returns the plugin instance managed by this Test Driver.
filter
output
multi_output
Runs the Test Driver. If a block is given, the Test Driver stops running immediately after evaluating the block.
Otherwise, you must register the conditions to stop the running Test Driver.
This method may be overridden in subclasses.
expect_emits: the number of expected emits
expect_records: the number of expected records
timeout: timeout (seconds)
start: if true, start the Test Driver. Otherwise, invoke the instance_start method to start it.
shutdown: if true, shut down the running Test Driver.
Example:
    # Run Test Driver and feed an event (owner plugin)
    d = create_driver
    d.run do
      d.feed(time, record)
    end

    # Emit multiple events (owner plugin)
    d = create_driver
    d.run(default_tag: 'test', expect_emits: 1, timeout: 10, start: true, shutdown: false) { d.feed(time, { "k1" => 1 }) }
    d.run(default_tag: 'test', expect_emits: 1, timeout: 10, start: false, shutdown: false) { d.feed(time, { "k1" => 2 }) }
    d.run(default_tag: 'test', expect_emits: 1, timeout: 10, start: true, shutdown: true ) { d.feed(time, { "k1" => 3 }) }
Returns the events filtered by the given tag.
tag: filter by this tag. If omitted, it returns all the events.

Returns the event streams filtered by the given tag.
tag: filter by this tag. If omitted, it returns all the event streams.
Returns the number of times router.emit has been invoked.
If you want to delay the stopping of the Test Driver until a certain number of records are emitted, you can use d.run(expect_records: n) instead.
Example:

    d.run do
      d.feed('test', event_time, record)
    end
    assert_equal(1, d.emit_count)
Returns the number of records.
If you want to delay the stopping of the Test Driver until a certain number of records are emitted, you can use d.run(expect_records: n) instead.
Returns error events filtered by the given tag.
tag: filter by this tag. If omitted, it returns all error events.
Configures the plugin instance managed by this Test Driver.
conf: Fluent::Config::Element, String, or Hash
syntax: supported values are { :v1, :v0, :ruby }. :v0 is obsolete.
filter
output
multi_output
Runs EventFeeder.
default_tag: the default tag of the event
Example:

    d.run(default_tag: 'test') do
      d.feed(event_time, { 'message' => 'Hello, Fluentd!' })
    end
Feeds an event to the plugin instance.
tag: the tag of the event
time: event timestamp
record: event record
Example:

    d.run do
      d.feed('test', event_time, { 'message' => 'Hello, Fluentd!' })
    end
Feeds an array of events to the plugin instance.
tag: the tag of the event
array_event_stream: array of [time, record] pairs
  time: event timestamp
  record: event record
Example:

    d.run do
      d.feed('test', [
        [event_time, { 'message' => 'Hello, user1!' }],
        [event_time, { 'message' => 'Hello, user2!' }]
      ])
    end
Feeds an event stream to the plugin instance.
tag: the tag of the event
es: event stream object
Example:

    es = Fluent::OneEventStream.new(event_time, { 'message' => 'Hello, Fluentd!' })
    d.run do
      d.feed('test', es)
    end
Feeds an event with the default tag to the plugin instance.
record: event record
Example:

    d.run(default_tag: 'test') do
      d.feed({ 'message' => 'Hello, Fluentd!' })
      # Same as above ^
      d.feed(event_time, { 'message' => 'Hello, Fluentd!' })
    end
Feeds an event with the default tag to the plugin instance.
time: event timestamp
record: event record
Example:

    d.run(default_tag: 'test') do
      d.feed(event_time, { 'message' => 'Hello, Fluentd!' })
    end
Feeds an array of events with the default tag to the plugin instance.
array_event_stream: array of [time, record] pairs
  time: event timestamp
  record: event record
Example:

    d.run(default_tag: 'test') do
      d.feed([
        [event_time, { 'message' => 'Hello, user1!' }],
        [event_time, { 'message' => 'Hello, user2!' }]
      ])
    end
Feeds an event stream with the default tag to the plugin instance.
es: event stream object
Example:

    es = Fluent::OneEventStream.new(event_time, { 'message' => 'Hello, Fluentd!' })
    d.run(default_tag: 'test') do
      d.feed(es)
    end
Collects the filtered records.

    d = create_driver(config)
    d.run do
      d.feed('filter.test', event_time, { 'foo' => 'bar', 'message' => msg })
    end
    d.filtered_records
flush: flush forcibly
wait_flush_completion: if true, waits for the flush to complete
force_flush_retry: if true, retries the flush forcibly
Runs the Test Driver. If a block is given, the Test Driver stops running immediately after evaluating the block.
Otherwise, you must register the conditions to stop the running Test Driver.
Example:

    d = create_driver(config)
    d.run do
      d.feed('filter.test', event_time, { 'foo' => 'bar', 'message' => msg })
    end
Returns the formatted records.
Example:

    d = create_driver(config)
    d.run do
      d.feed('filter.test', event_time, { 'foo' => 'bar', 'message' => msg })
    end
    d.formatted
Flushes forcibly.
Example:

    d = create_driver(config)
    d.run do
      d.feed('filter.test', event_time, { 'foo' => 'bar', 'message' => msg })
    end
    d.flush
Asserts the EventTime instance.
expected: expected EventTime instance
actual: actual EventTime instance
message: message to display when assertion fails
Example:

    parser = create_parser
    parser.parse(text) do |time, record|
      assert_equal_event_time(event_time('2017-12-27 09:43:50.123456789'), time)
    end
Creates a Fluent::Config::Element instance.
name: element name such as match, filter, source, buffer, inject, format, parse, etc.
argument: argument for the section, as defined by config_argument
params: parameters for the section, as defined by config_param
elements: child elements of this config element
Example:

    conf = config_element('match', '**', {
      'path' => "#{TMP_DIR}/prohibited/${tag}/file.%Y%m%d.log",
    }, [
      config_element('buffer', 'time,tag', {
        'timekey' => 86400,
        'timekey_zone' => '+0000'
      })
    ])
    d = create_driver(conf)
Creates a Fluent::EventTime instance.
str: time represented as String
format: parse str as time according to this format.
Example:

    time = event_time
    time = event_time('2016-10-03 23:58:09 UTC')
    time = event_time('2016-04-11 16:40:01 +0000')
    time = event_time('2016-04-17 11:15:00 -0700')
    time = event_time('2011-01-02 13:14:15')
    time = event_time('Sep 11 00:00:00', format: '%b %d %H:%M:%S')
    time = event_time('28/Feb/2013:12:00:00 +0900', format: '%d/%b/%Y:%H:%M:%S %z')
    time = event_time('2017-02-06T13:14:15.003Z', format: '%Y-%m-%dT%H:%M:%S.%L%z')
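To make the role of the format argument concrete, here is a rough stand-alone sketch that uses only Ruby's standard library. Everything in it is an assumption made for illustration: SketchEventTime and sketch_event_time are hypothetical names, and the choice of Time.strptime (with a format) versus Time.parse (without one) is our guess at reasonable behavior, not a statement about how the real helper is implemented.

```ruby
require 'time'

# Hypothetical value object standing in for Fluent::EventTime.
SketchEventTime = Struct.new(:sec, :nsec)

# Assumed behavior: no argument means "now", a format means strict parsing
# with Time.strptime, otherwise lenient parsing with Time.parse.
def sketch_event_time(str = nil, format: nil)
  time =
    if str.nil?
      Time.now
    elsif format
      Time.strptime(str, format)
    else
      Time.parse(str)
    end
  SketchEventTime.new(time.to_i, time.nsec)
end

with_format = sketch_event_time('28/Feb/2013:12:00:00 +0900', format: '%d/%b/%Y:%H:%M:%S %z')
# 12:00:00 at +0900 is the same instant as 03:00:00 UTC.
puts with_format.sec == Time.utc(2013, 2, 28, 3, 0, 0).to_i
```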
Processes the given block with tz. This method overrides ENV['TZ'] while processing its block.
tz: timezone. This is set to ENV['TZ'].
Example:

    time = with_timezone('UTC+02') do
      parser = Fluent::TimeParser.new('%Y-%m-%d %H:%M:%S.%N', true)
      parser.parse('2016-09-02 18:42:31.123456789')
    end
    assert_equal_event_time(time, event_time('2016-09-02 18:42:31.123456789 -02:00', format: '%Y-%m-%d %H:%M:%S.%N %z'))
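The override-and-restore pattern described above can be sketched in plain Ruby. The name sketch_with_timezone is hypothetical, and this is an illustration of the pattern rather than the helper's actual source.

```ruby
# Hypothetical re-implementation of the pattern; the real helper may differ.
def sketch_with_timezone(tz)
  original = ENV['TZ']
  ENV['TZ'] = tz
  yield
ensure
  # Restore the previous value even if the block raises.
  # Assigning nil removes the variable again if it was unset before.
  ENV['TZ'] = original
end

seen_inside = sketch_with_timezone('UTC+02') { ENV['TZ'] }
puts seen_inside
```

With POSIX-style TZ strings, UTC+02 denotes a zone two hours behind UTC, which is why the example above compares against an offset of -02:00.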
Processes the block with the given parameters. This method overrides the system configuration while processing its block.
This is useful for testing Fluentd's internal behavior related to multi workers.
root_dir: root directory
workers: the number of workers
worker_id: ID of the worker
Example:

    class Dummy < Fluent::Plugin::Output
    end

    d = Dummy.new
    with_worker_config(workers: 2, worker_id: 1) do
      d.configure(conf)
    end
    # ...
Converts time to String.
This is useful for testing the formatter.
time: Fluent::EventTime instance. See also Time.at.
localtime: if true, processes time as localtime. Otherwise, as UTC.
format: format string for the output
Example:

    formatter = configure_formatter(conf)
    formatted = formatter.format(tag, time, record)
    assert_equal("#{time2str(time)}\t#{JSON.dump(record)}\n", formatted)
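How the three parameters interact can be illustrated with Ruby's Time class alone. This is a sketch under assumptions: sketch_time2str is a hypothetical name, it takes plain epoch seconds instead of a Fluent::EventTime, and the ISO 8601 default used when no format is given is our assumption.

```ruby
require 'time'

# Hypothetical stand-in for the helper; takes epoch seconds (Integer or Float).
def sketch_time2str(time, localtime: false, format: nil)
  t = Time.at(time)
  t = t.utc unless localtime           # UTC unless local time is requested
  format ? t.strftime(format) : t.iso8601
end

puts sketch_time2str(0)
puts sketch_time2str(0, format: '%Y-%m-%d %H:%M:%S')
```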
type: available types are { :factory, :packer, :unpacker }
Shorthand for:
Fluent::MessagePackFactory.factory
Fluent::MessagePackFactory.packer
Fluent::MessagePackFactory.unpacker
Example:

    events = []
    factory = msgpack(:factory)
    factory.unpacker.feed_each(binary) do |obj|
      events << obj
    end
Captures the standard output while processing the given block.
This is useful for testing Fluentd utility commands.
Example:

    captured_string = capture_stdout do
      # Print something to STDOUT
      puts 'Hello!'
    end
    assert_equal("Hello!\n", captured_string)
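Capturing standard output around a block can be sketched with StringIO from the standard library. The name sketch_capture_stdout is hypothetical, and the code shows the general technique rather than the helper's actual source.

```ruby
require 'stringio'

# Hypothetical re-implementation of the technique; the real helper may differ.
def sketch_capture_stdout
  original = $stdout
  $stdout = StringIO.new   # Kernel#puts writes to whatever $stdout points at
  yield
  $stdout.string           # everything written while the block ran
ensure
  $stdout = original       # always restore the real stream
end

captured = sketch_capture_stdout { puts 'Hello!' }
puts captured == "Hello!\n"
```

When comparing captured text, keep in mind that a single-quoted Ruby string does not interpret \n as a newline, so the expected value needs double quotes.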
Input plugins must be tested through their router#emit calls. You do not have to test this method explicitly, because the testing code pattern is encapsulated in the Input Test Driver.
You can write a test for an input plugin like this:

    require 'fluent/test'
    require 'fluent/test/driver/input'
    require 'fluent/test/helpers'
    require 'fluent/plugin/input_my'

    class MyInputTest < Test::Unit::TestCase
      include Fluent::Test::Helpers

      setup do
        Fluent::Test.setup
      end

      def create_driver(conf = {})
        Fluent::Test::Driver::Input.new(Fluent::Plugin::MyInput).configure(conf)
      end

      test 'emit' do
        d = create_driver(config)
        d.run(timeout: 0.5)

        d.events.each do |tag, time, record|
          assert_equal('input.test', tag)
          assert_equal({ 'foo' => 'bar' }, record)
          assert(time.is_a?(Fluent::EventTime))
        end
      end
    end
Filter plugins must be tested through their #filter method. You do not have to test this method explicitly, because the testing code pattern is encapsulated in the Filter Test Driver.
You can write a test for a filter plugin like this:

    require 'fluent/test'
    require 'fluent/test/driver/filter'
    require 'fluent/test/helpers'
    require 'fluent/plugin/filter_my'

    class MyFilterTest < Test::Unit::TestCase
      include Fluent::Test::Helpers

      setup do
        Fluent::Test.setup
      end

      def create_driver(conf = {})
        Fluent::Test::Driver::Filter.new(Fluent::Plugin::MyFilter).configure(conf)
      end

      test 'filter' do
        d = create_driver(config)
        time = event_time
        d.run do
          d.feed('filter.test', time, { 'foo' => 'bar', 'message' => msg })
        end
        assert_equal(1, d.filtered_records.size)
      end
    end
Output plugins must be tested through their #process, #write, or #try_write method. You do not have to test these methods explicitly, because the testing code pattern is encapsulated in the Output Test Driver.
You can write a test for an output plugin like this:

    require 'fluent/test'
    require 'fluent/test/driver/output'
    require 'fluent/test/helpers'
    require 'fluent/plugin/output_my'

    class MyOutputTest < Test::Unit::TestCase
      include Fluent::Test::Helpers

      setup do
        Fluent::Test.setup
      end

      def create_driver(conf = {})
        Fluent::Test::Driver::Output.new(Fluent::Plugin::MyOutput).configure(conf)
      end

      test 'emit' do
        d = create_driver(config)
        time = event_time
        d.run do
          d.feed('output.test', time, { 'foo' => 'bar', 'message' => msg })
        end
        assert_equal(1, d.events.size)
      end
    end
You must test the parser plugins' #parse method.
You can write a test for a parser plugin like this:

    require 'fluent/test'
    require 'fluent/test/driver/parser'
    require 'fluent/test/helpers'
    require 'fluent/plugin/parser_my'

    class MyParserTest < Test::Unit::TestCase
      include Fluent::Test::Helpers

      setup do
        Fluent::Test.setup
      end

      def create_driver(conf = {})
        Fluent::Test::Driver::Parser.new(Fluent::Plugin::MyParser).configure(conf)
      end

      def create_parser(conf)
        create_driver(conf).instance
      end

      test 'parse' do
        parser = create_parser(conf)
        parser.parse(text) do |time, record|
          assert_equal(event_time('2017-12-26 11:56:50.123456789'), time)
          assert_equal({ 'message' => 'Hello, Fluentd!' }, record)
        end
      end
    end
You must test the formatter plugins' #format method.
You can write a test for a formatter plugin like this:

    require 'fluent/test'
    require 'fluent/test/driver/formatter'
    require 'fluent/test/helpers'
    require 'fluent/plugin/formatter_my'

    class MyFormatterTest < Test::Unit::TestCase
      include Fluent::Test::Helpers

      setup do
        Fluent::Test.setup
      end

      def create_driver(conf = {})
        Fluent::Test::Driver::Formatter.new(Fluent::Plugin::MyFormatter).configure(conf)
      end

      def create_formatter(conf)
        create_driver(conf).instance
      end

      test 'format' do
        formatter = create_formatter(conf)
        formatted = formatter.format(tag, time, record)
        assert_equal("message:awesome\tgreeting:hello", formatted)
      end
    end
Testing logs is easy.
Code example:

    # d is a Test Driver instance
    assert_equal(1, d.logs.size)
    logs = d.logs
    assert do
      logs.any? { |log| log.include?(expected_log) }
    end
    assert do
      logs.last.match?(/This is last log/)
    end
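The same checks can be tried on plain data, treating the logs as an Array of String lines. In this sketch the sample lines, their layout, and the helper sketch_logs_matching are all assumptions made for illustration; the exact text a real Test Driver records may differ.

```ruby
# Assumed sample data; only "an Array of String lines" is relied on here.
SAMPLE_LOGS = [
  "2017-12-27 09:43:50 +0900 [info]: plugin started\n",
  "2017-12-27 09:43:51 +0900 [warn]: This is last log\n"
].freeze

# Hypothetical helper: keep only the lines that contain the given text.
def sketch_logs_matching(logs, text)
  logs.select { |line| line.include?(text) }
end

warnings = sketch_logs_matching(SAMPLE_LOGS, '[warn]')
puts warnings.size
puts SAMPLE_LOGS.last.match?(/This is last log/)
```

Matching on a substring or a regular expression, rather than on the whole line, keeps such assertions independent of the timestamp prefix.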
If this article is incorrect or outdated, or omits critical information, please let us know. Fluentd is an open-source project under Cloud Native Computing Foundation (CNCF). All components are available under the Apache 2 License.