Versions | v1.0 (td-agent3)

Writing Plugin Test Code

This article explains how to write Fluentd plugin test code using test-unit.

You can also write test code with other testing frameworks such as RSpec or minitest.

Basics of Plugin Testing

Fluentd provides a Test Driver for each plugin type, which helps keep plugin test code maintainable. For an output plugin, helper.rb can be written as follows:

$LOAD_PATH.unshift(File.expand_path("../../", __FILE__))
require "test-unit"
require "fluent/test"
require "fluent/test/driver/output"
require "fluent/test/helpers"

Test::Unit::TestCase.include(Fluent::Test::Helpers)
Test::Unit::TestCase.extend(Fluent::Test::Helpers)

Note that Fluentd provides useful Test Drivers for input, output, filter, parser, and formatter.

The recommended Fluentd plugin project structure is:

.
├── Gemfile
├── LICENSE
├── README.md
├── Rakefile
├── fluent-plugin-<your_fluentd_plugin_name>.gemspec
├── lib
│   └── fluent
│       └── plugin
│           └── <plugin_type>_<your_fluentd_plugin_name>.rb
└── test
    ├── helper.rb
    └── plugin
        └── test_<plugin_type>_<your_fluentd_plugin_name>.rb

Plugin Test Driver Overview

Fluentd ships Test Drivers for plugin testing. Test code for a plugin can be written as follows:

# Load the module that defines common initialization method (Required)
require 'fluent/test'
# Load the module that defines helper methods for testing (Required)
require 'fluent/test/helpers'
# Load the test driver (Required)
require 'fluent/test/driver/output'
# Load the plugin (Required)
require 'fluent/plugin/out_file'

class FileOutputTest < Test::Unit::TestCase
  include Fluent::Test::Helpers

  def setup
    Fluent::Test.setup   # setup test for Fluentd (Required)
    # setup test for plugin (Optional)
    # ...
  end

  def teardown
    # terminate test for plugin (Optional)
  end

  def create_driver(conf = CONFIG)
    Fluent::Test::Driver::Output.new(Fluent::Plugin::FileOutput).configure(conf)
  end

  # configuration related test group
  sub_test_case 'configuration' do
    test 'basic configuration' do
      d = create_driver(basic_configuration)
      assert_equal 'something', d.instance.parameter_name
    end
  end

  # Another test group goes here
  sub_test_case 'path' do
    test 'normal' do
      d = create_driver('...')
      d.run(default_tag: 'test') do
        d.feed(event_time, record)
      end
      events = d.events
      assert_equal(1, events.size)
    end
  end
end

Testing Utility Methods

Call the Test Driver's #instance method to get the plugin instance. If a utility method is private, call it with __send__.

# ...
class FileOutputTest < Test::Unit::TestCase
  # ...
  # Group by utility method
  sub_test_case '#compression_suffix' do
    test 'returns empty string for nil (no compression method specified)' do
      d = create_driver
      assert_equal('', d.instance.compression_suffix(nil))
    end
  end
end
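
The following plain-Ruby sketch illustrates the __send__ technique mentioned above without loading Fluentd. SuffixHelper and its method body are hypothetical stand-ins for a plugin whose utility method is private; in a real test the receiver would be d.instance.

```ruby
# Plain-Ruby illustration; SuffixHelper is a hypothetical class standing in
# for a plugin that keeps its utility method private.
class SuffixHelper
  private

  def compression_suffix(compress)
    case compress
    when :gzip then ".gz"
    when nil   then ""
    else raise ArgumentError, "unknown compression: #{compress.inspect}"
    end
  end
end

helper = SuffixHelper.new
# The method is private, so it is not part of the public interface.
publicly_visible = helper.respond_to?(:compression_suffix)
# __send__ bypasses the visibility check and invokes the method anyway.
suffix = helper.__send__(:compression_suffix, :gzip)
```

In a plugin test the same call would look like d.instance.__send__(:compression_suffix, nil).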

Test Driver Base API

The methods in this section are available in all Test Drivers.

initialize(klass, opts: {}, &block)

Initializes a Test Driver instance.

  • klass: The Fluentd plugin class to test.
  • opts: Overrides the system config. This parameter is useful for testing multiple workers.
  • block: Customizes plugin behavior. Plugin code can be overridden in this block.

Code example:

def create_driver(conf={})
  d = Fluent::Test::Driver::Output.new(Fluent::Plugin::MyOutput) do
    attr_accessor :exceptions
    # Almost the same as reopening the plugin class
    def prefer_buffered_processing
      false
    end
    def process(tag, es)
      # drop events
    end
  end
  d.configure(conf)
end

configure(conf, syntax: :v1)

Configure plugin instance managed by this Test Driver.

  • conf: Fluent::Config::Element or string
  • syntax: Available syntaxes are :v1, :v0, and :ruby. :v0 is obsolete.

Code example:

def create_driver(conf={})
  Fluent::Test::Driver::Output.new(Fluent::Plugin::MyOutput).configure(conf)
end

def test_process
  conf = %[
    path /path/to/something
    host localhost
    port 24229
  ]
  d = create_driver(conf)
end

end_if(&block)

Registers a condition for stopping the Test Driver.

All registered conditions must be true before the Test Driver stops.

break_if(&block)

Registers a breaking condition for the Test Driver.

The Test Driver stops running as soon as any of the breaking conditions is true.

broken?

Returns true when any of the breaking conditions is true; otherwise returns false.

run(timeout: nil, start: true, shutdown: true, &block)

Runs the Test Driver. If a block is given, the Test Driver stops running immediately after evaluating the block.

Otherwise, you must register conditions that stop the Test Driver.

This method may be overridden in subclasses.

  • timeout: timeout in seconds.
  • start: if true, start the plugin instance before running. Otherwise, start it yourself, for example with d.instance_start.
  • shutdown: if true, shut down the plugin instance after running.

Code example:

# Run Test Driver and feed an event (output)
d = create_driver
d.run do
  d.feed(time, record)
end

# emit multiple events (output)
d = create_driver
d.run(default_tag: 'test', expect_emits: 1, timeout: 10, start: true,  shutdown: false) { d.feed(time, { "k1" => 1 })}
d.run(default_tag: 'test', expect_emits: 1, timeout: 10, start: false, shutdown: false) { d.feed(time, { "k1" => 2 })}
d.run(default_tag: 'test', expect_emits: 1, timeout: 10, start: false, shutdown: true ) { d.feed(time, { "k1" => 3 })}

stop?

Returns true when all of the stopping conditions are true; otherwise returns false.
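
The rules described for end_if, break_if, broken?, and stop? can be summarized in a small plain-Ruby model. This is only a sketch of the stated semantics ("all stopping conditions" versus "any breaking condition"); ConditionModel is a hypothetical class, it does not load Fluentd, and the real driver may apply additional rules.

```ruby
# Plain-Ruby model of the stop/break rules; it does not load Fluentd.
# ConditionModel is a hypothetical name used only for this illustration.
class ConditionModel
  def initialize
    @end_conditions = []
    @break_conditions = []
  end

  # Register a condition that must hold before the run may stop.
  def end_if(&block)
    raise ArgumentError, "block is not given" unless block
    @end_conditions << block
  end

  # Register a condition that stops the run as soon as it holds.
  def break_if(&block)
    raise ArgumentError, "block is not given" unless block
    @break_conditions << block
  end

  # True when at least one breaking condition holds.
  def broken?
    @break_conditions.any?(&:call)
  end

  # True when every stopping condition holds.
  def stop?
    @end_conditions.all?(&:call)
  end
end

received = []
model = ConditionModel.new
model.end_if { received.size >= 2 }
model.end_if { received.include?("b") }
model.break_if { received.include?("error") }

received << "a"
first_check = [model.stop?, model.broken?]   # one end condition is still false

received << "b"
second_check = [model.stop?, model.broken?]  # both end conditions are now true

received << "error"
third_check = [model.stop?, model.broken?]   # a breaking condition has fired
```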

logs

Returns logs managed by this Test Driver.

instance

Returns the plugin instance managed by this Test Driver.

Test Driver Base Owner API

Applicable plugin types: filter, output, multi_output

run(expect_emits: nil, expect_records: nil, timeout: nil, start: true, shutdown: true, &block)

Runs the Test Driver. If a block is given, the Test Driver stops running immediately after evaluating the block.

Otherwise, you must register conditions that stop the Test Driver.

This method may be overridden in subclasses.

  • expect_emits: the number of emits to wait for.
  • expect_records: the number of records to wait for.
  • timeout: timeout in seconds.
  • start: if true, start the plugin instance before running. Otherwise, start it yourself, for example with d.instance_start.
  • shutdown: if true, shut down the plugin instance after running.

Code example:

# Run Test Driver and feed an event (owner plugin)
d = create_driver
d.run do
  d.feed(time, record)
end

# emit multiple events (owner plugin)
d = create_driver
d.run(default_tag: 'test', expect_emits: 1, timeout: 10, start: true,  shutdown: false) { d.feed(time, { "k1" => 1 })}
d.run(default_tag: 'test', expect_emits: 1, timeout: 10, start: false, shutdown: false) { d.feed(time, { "k1" => 2 })}
d.run(default_tag: 'test', expect_emits: 1, timeout: 10, start: false, shutdown: true ) { d.feed(time, { "k1" => 3 })}

events(tag: nil)

Returns the events filtered by given tag.

  • tag: filter by this tag. Returns all events when this parameter is omitted.

event_streams(tag: nil)

Returns the event streams filtered by given tag.

  • tag: filter by this tag. Returns all event streams when this parameter is omitted.

emit_count

Returns the number of times router.emit was invoked.

If you want the Test Driver to keep running until the expected number of emits has occurred, use d.run(expect_emits: n) instead.

Code example:

d.run do
  d.feed("test", event_time, record)
end
assert_equal(1, d.emit_count)

record_count

Returns the number of emitted records.

If you want the Test Driver to keep running until the expected number of records has been emitted, use d.run(expect_records: n) instead.
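
The difference between events, emit_count, and record_count can be pictured with plain arrays. The sketch below does not load Fluentd; the data layout (one entry per router.emit call, each holding a tag and a list of [time, record] pairs) and the helper name flatten_events are assumptions made for illustration.

```ruby
# Plain-Ruby model; the data layout is an assumption made for illustration.
# Each entry stands for one router.emit call: [tag, [[time, record], ...]].
event_streams = [
  ["app.access", [[1_500_000_000, { "path" => "/" }],
                  [1_500_000_001, { "path" => "/login" }]]],
  ["app.error",  [[1_500_000_002, { "message" => "boom" }]]]
]

# One emit per event stream.
emit_count = event_streams.size

# Records are counted across all event streams.
record_count = event_streams.sum { |_tag, es| es.size }

# Flatten into [tag, time, record] triples, optionally filtered by tag.
def flatten_events(event_streams, tag: nil)
  event_streams
    .select { |t, _es| tag.nil? || t == tag }
    .flat_map { |t, es| es.map { |time, record| [t, time, record] } }
end

all_events = flatten_events(event_streams)
error_only = flatten_events(event_streams, tag: "app.error")
```

In this model two emits carry three records, so an assertion on the emit count and one on the record count check different things.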

error_events(tag: nil)

Returns error events filtered by given tag.

  • tag: filter by this tag. Returns all error events when this parameter is omitted.

Test Driver Base Owned API

configure(conf, syntax: :v1)

Configures the plugin instance managed by this Test Driver.

  • conf: Fluent::Config::Element, String, or Hash
  • syntax: Available syntaxes are :v1, :v0, and :ruby. :v0 is obsolete.

Test Driver Event Feeder API

Applicable plugin types: filter, output, multi_output

run(default_tag: nil, **kwargs, &block)

Run EventFeeder.

  • default_tag: the default tag of the event

Code example:

d.run(default_tag: "test") do
  d.feed(event_time, { "message" => "Hello, Fluentd!!" })
end

feed(tag, time, record)

Feed an event to plugin instance.

  • tag: the tag of the event
  • time: event timestamp
  • record: event record

Code example:

d.run do
  d.feed("test", event_time, { "message" => "Hello, Fluentd!!" })
end

feed(tag, array_event_stream)

Feed an array event stream to plugin instance.

  • tag: the tag of the event
  • array_event_stream: array of [time, record]
    • time: event timestamp
    • record: event record

Code example:

d.run do
  d.feed("test", [
    [event_time, { "message" => "Hello, user1!!" }],
    [event_time, { "message" => "Hello, user2!!" }]
  ])
end

feed(tag, es)

Feed an event stream to plugin instance.

  • tag: the tag of the event
  • es: event stream object

Code example:

es = Fluent::OneEventStream.new(event_time, { "message" => "Hello, Fluentd!!" })
d.run do
  d.feed("test", es)
end

feed(record)

Feed an event with default tag to plugin instance.

  • record: event record

Code example:

d.run(default_tag: "test") do
  d.feed({ "message" => "Hello, Fluentd!!" })
  # The call above is equivalent to the one below
  d.feed(event_time, { "message" => "Hello, Fluentd!!" })
end

feed(time, record)

Feed an event with default tag to plugin instance.

  • time: event timestamp
  • record: event record

Code example:

d.run(default_tag: "test") do
  d.feed(event_time, { "message" => "Hello, Fluentd!!" })
end

feed(array_event_stream)

Feed an array event stream with default tag to plugin instance.

  • array_event_stream: array of [time, record]
    • time: event timestamp
    • record: event record

Code example:

d.run(default_tag: "test") do
  d.feed([
    [event_time, { "message" => "Hello, user1!!" }],
    [event_time, { "message" => "Hello, user2!!" }]
  ])
end

feed(es)

Feed an event stream with default tag to plugin instance.

  • es: event stream object

Code example:

es = Fluent::OneEventStream.new(event_time, { "message" => "Hello, Fluentd!!" })
d.run(default_tag: "test") do
  d.feed(es)
end
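
The feed overloads listed above differ only in how the arguments are interpreted. The plain-Ruby sketch below shows one way such a dispatch could work; normalize_feed is a hypothetical helper that is not part of Fluentd, Integer timestamps stand in for Fluent::EventTime objects, and event stream objects are left out for brevity.

```ruby
# Plain-Ruby model of how the feed overloads could be told apart.
# normalize_feed is a hypothetical helper; it is not part of Fluentd, and
# Integer timestamps stand in for Fluent::EventTime objects.
# It returns [tag, [[time, record], ...]].
def normalize_feed(default_tag, *args)
  case args.size
  when 1
    raise ArgumentError, "tag not specified without default_tag" unless default_tag
    arg = args.first
    case arg
    when Hash  then [default_tag, [[Time.now.to_i, arg]]]  # feed(record)
    when Array then [default_tag, arg]                     # feed(array_event_stream)
    else raise ArgumentError, "unexpected argument: #{arg.inspect}"
    end
  when 2
    if args[0].is_a?(String) && args[1].is_a?(Array)
      args                                                 # feed(tag, array_event_stream)
    else
      raise ArgumentError, "tag not specified without default_tag" unless default_tag
      time, record = args
      [default_tag, [[time, record]]]                      # feed(time, record)
    end
  when 3
    tag, time, record = args
    [tag, [[time, record]]]                                # feed(tag, time, record)
  else
    raise ArgumentError, "wrong number of arguments (#{args.size} for 1..3)"
  end
end

explicit_tag = normalize_feed(nil, "test", 100, { "k" => 1 })
default_tag  = normalize_feed("test", 100, { "k" => 1 })
array_form   = normalize_feed("test", [[100, { "k" => 1 }], [101, { "k" => 2 }]])
```

Both the explicit-tag call and the default-tag call normalize to the same tag and event list in this model, which is why the two-argument and three-argument forms are interchangeable once default_tag is set.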

Test Driver Filter API

filtered_records

Collect filtered records.

Code example:

d = create_driver(config)
d.run do
  d.feed("filter.test", event_time, {'foo' => 'bar', 'message' => msg})
end
d.filtered_records

Test Driver Output API

run(flush: true, wait_flush_completion: true, force_flush_retry: false, **kwargs, &block)

  • flush: if true, flush forcibly.
  • wait_flush_completion: if true, wait for flush completion.
  • force_flush_retry: if true, retry flush forcibly.

Runs the Test Driver. If a block is given, the Test Driver stops running immediately after evaluating the block.

Otherwise, you must register conditions that stop the Test Driver.

Code example:

d = create_driver(config)
d.run do
  d.feed("filter.test", event_time, {'foo' => 'bar', 'message' => msg})
end

formatted

Returns formatted records.

Code example:

d = create_driver(config)
d.run do
  d.feed("filter.test", event_time, {'foo' => 'bar', 'message' => msg})
end
d.formatted

flush

Flush forcibly.

Code example:

d = create_driver(config)
d.run do
  d.feed("filter.test", event_time, {'foo' => 'bar', 'message' => msg})
end
d.flush

Test helpers

assert_equal_event_time(expected, actual, message = nil)

Asserts that two EventTime instances are equal.

  • expected: expected EventTime instance
  • actual: actual EventTime instance
  • message: message displayed when the assertion fails

Code example:

parser = create_parser
parser.parse(text) do |time, record|
  assert_equal_event_time(event_time("2017-12-27 09:43:50.123456789"), time)
end

config_element(name = 'test', argument = '', params = {}, elements = [])

Create Fluent::Config::Element instance.

  • name: element name such as “match”, “filter”, “source”, “buffer”, “inject”, “format”, or “parse”
  • argument: argument for section defined by config_argument
  • params: parameters for section defined by config_param
  • elements: child elements of this element

Code example:

conf = config_element('match', '**', {
  'path' => "#{TMP_DIR}/prohibited/${tag}/file.%Y%m%d.log",
}, [
  config_element('buffer', 'time,tag', {
    'timekey' => 86400,
    'timekey_zone' => '+0000'
  })
])
d = create_driver(conf)

event_time(str = nil, format: nil)

Create Fluent::EventTime instance.

  • str: time represented as string
  • format: parse str as time according to this format. See also Time.strptime

Code example:

time = event_time
time = event_time("2016-10-03 23:58:09 UTC")
time = event_time('2016-04-11 16:40:01 +0000')
time = event_time("2016-04-17 11:15:00 -0700")
time = event_time("2011-01-02 13:14:15")
time = event_time('Sep 11 00:00:00', format: '%b %d %H:%M:%S')
time = event_time('28/Feb/2013:12:00:00 +0900', format: '%d/%b/%Y:%H:%M:%S %z')
time = event_time("2017-02-06T13:14:15.003Z", format: '%Y-%m-%dT%H:%M:%S.%L%z')
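
Because the format parameter refers to Time.strptime, the directives can be tried out in plain Ruby first. The sketch below calls Time.strptime directly and does not load Fluentd; parse_with_format is a hypothetical wrapper, and it returns a Time rather than a Fluent::EventTime.

```ruby
require "time"

# Plain-Ruby illustration of the strptime directives used above.
# parse_with_format is a hypothetical wrapper around Time.strptime; it
# returns a Time, not a Fluent::EventTime.
def parse_with_format(str, format)
  Time.strptime(str, format)
end

apache_style = parse_with_format("28/Feb/2013:12:00:00 +0900", "%d/%b/%Y:%H:%M:%S %z")

# The parsed value keeps the offset matched by %z (9 hours east of UTC).
offset_seconds = apache_style.utc_offset

# getutc returns a UTC copy, showing the absolute instant that was parsed.
utc_text = apache_style.getutc.strftime("%Y-%m-%d %H:%M:%S")
```

Here utc_text is "2013-02-28 03:00:00", nine hours earlier than the local wall-clock time in the input string.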

with_timezone(tz)

Process given block with tz. This method overwrites ENV['TZ'] while processing block.

  • tz: Timezone. This is set to ENV['TZ'].

Code example:

time = with_timezone("UTC+02") do
  parser = Fluent::TimeParser.new("%Y-%m-%d %H:%M:%S.%N", true)
  parser.parse("2016-09-02 18:42:31.123456789")
end
assert_equal_event_time(time, event_time("2016-09-02 18:42:31.123456789 -02:00", format: '%Y-%m-%d %H:%M:%S.%N %z'))
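
The overriding of ENV['TZ'] described above can be sketched in plain Ruby as follows. with_tz_sketch is a hypothetical name and not the helper shipped with Fluentd; the sketch only shows the save, override, and restore pattern.

```ruby
# Plain-Ruby sketch of a TZ-overriding helper; with_tz_sketch is a
# hypothetical name and is not the helper shipped with Fluentd.
def with_tz_sketch(tz)
  old_tz = ENV["TZ"]
  ENV["TZ"] = tz
  yield
ensure
  # Restore the previous value even if the block raises.
  # Assigning nil removes the variable again if it was unset before.
  ENV["TZ"] = old_tz
end

before = ENV["TZ"]
inside = with_tz_sketch("UTC+02") { ENV["TZ"] }
after  = ENV["TZ"]
```

Note that a POSIX-style value such as "UTC+02" denotes a zone two hours behind UTC, which is why the example above compares against an offset of -02:00.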

with_worker_config(root_dir: nil, workers: nil, worker_id: nil, &block)

Process block with given parameters. This method overwrites system config while processing block.

This is useful for testing Fluentd internal behavior related to multi workers.

  • root_dir: Root directory.
  • workers: Number of workers.
  • worker_id: ID of the worker.

Code example:

class Dummy < Fluent::Plugin::Output
end
d = Dummy.new
with_worker_config(workers: 2, worker_id: 1) do
  d.configure(conf)
end
...

time2str(time, localtime: false, format: nil)

Convert time to string.

This is useful for testing formatter.

  • time: Fluent::EventTime instance. See also Time.at
  • localtime: If true, process time as localtime. Otherwise UTC.
  • format: See also Time#strftime.

Code example:

formatter = configure_formatter(conf)
formatted = formatter.format(tag, time, record)
assert_equal("#{time2str(time)}\t#{JSON.dump(record)}\n", formatted)
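
The effect of the localtime and format parameters can be approximated with Time.at and Time#strftime in plain Ruby. time2str_sketch below is a hypothetical stand-in that takes epoch seconds instead of a Fluent::EventTime, and its ISO 8601 default output is an assumption about the real helper's behavior.

```ruby
require "time"

# Plain-Ruby sketch; time2str_sketch is a hypothetical stand-in, and the
# ISO 8601 default is an assumption about the real helper's behavior.
def time2str_sketch(epoch_seconds, localtime: false, format: nil)
  time = Time.at(epoch_seconds)
  time = time.utc unless localtime
  format ? time.strftime(format) : time.iso8601
end

iso_text    = time2str_sketch(0)
custom_text = time2str_sketch(0, format: "%Y-%m-%d %H:%M:%S")
```

With the default UTC handling, the results do not depend on the time zone of the machine that runs the tests.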

msgpack(type)

Shorthand for Fluent::MessagePackFactory.factory, Fluent::MessagePackFactory.packer and Fluent::MessagePackFactory.unpacker.

  • type: Available types are :factory, :packer, :unpacker.

Code example:

events = []
factory = msgpack(:factory)
factory.unpacker.feed_each(binary) do |obj|
  events << obj
end

capture_stdout

Capture stdout while processing given block.

This is useful for testing Fluentd utility commands.

Code example:

captured_string = capture_stdout do
  # Print something to STDOUT
  puts "Hello!"
end
assert_equal("Hello!\n", captured_string)
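
Capturing output this way typically relies on temporarily swapping $stdout. The plain-Ruby sketch below shows the idea; capture_stdout_sketch is a hypothetical name and not the helper shipped with Fluentd.

```ruby
require "stringio"

# Plain-Ruby sketch; capture_stdout_sketch is a hypothetical name and is
# not the helper shipped with Fluentd.
def capture_stdout_sketch
  original = $stdout
  buffer = StringIO.new
  $stdout = buffer
  yield
  buffer.string
ensure
  # Always put the real stream back, even if the block raises.
  $stdout = original
end

captured = capture_stdout_sketch { puts "Hello!" }
```

A swap like this only sees output written through $stdout (for example puts and print); output written directly to the STDOUT constant or by child processes is not captured.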

Testing Input Plugins

Input plugins should be tested through the events they pass to router#emit. You do not have to test this method explicitly, because the Input Test Driver encapsulates this testing pattern.

You can write a test for an input plugin as shown below.

Code example:

require 'fluent/test'
require 'fluent/test/driver/input'
require 'fluent/test/helpers'
require 'fluent/plugin/in_my'

class MyInputTest < Test::Unit::TestCase
  include Fluent::Test::Helpers

  setup do
    Fluent::Test.setup
  end

  def create_driver(conf={})
    Fluent::Test::Driver::Input.new(Fluent::Plugin::MyInput).configure(conf)
  end

  test 'emit' do
    d = create_driver(config)
    d.run(timeout: 0.5)

    d.events.each do |tag, time, record|
      assert_equal("input.test", tag)
      assert_equal({"foo"=>"bar"}, record)
      assert(time.is_a?(Fluent::EventTime))
    end
  end
end

Testing Filter Plugins

Filter plugins should be tested through their #filter method. You do not have to test this method explicitly, because the Filter Test Driver encapsulates this testing pattern.

You can write a test for a filter plugin as shown below.

Code example:

require 'fluent/test'
require 'fluent/test/driver/filter'
require 'fluent/test/helpers'
require 'fluent/plugin/filter_my'

class MyFilterTest < Test::Unit::TestCase
  include Fluent::Test::Helpers

  setup do
    Fluent::Test.setup
  end

  def create_driver(conf={})
    Fluent::Test::Driver::Filter.new(Fluent::Plugin::MyFilter).configure(conf)
  end

  test 'filter' do
    d = create_driver(config)
    time = event_time
    d.run do
      d.feed("filter.test", time, {'foo' => 'bar', 'message' => msg})
    end

    assert_equal(1, d.filtered_records.size)
  end
end

Testing Output Plugins

Output plugins should be tested through their #process, #write, or #try_write method. You do not have to test these methods explicitly, because the Output Test Driver encapsulates this testing pattern.

You can write a test for an output plugin as shown below.

Code example:

require 'fluent/test'
require 'fluent/test/driver/output'
require 'fluent/test/helpers'
require 'fluent/plugin/out_my'

class MyOutputTest < Test::Unit::TestCase
  include Fluent::Test::Helpers

  setup do
    Fluent::Test.setup
  end

  def create_driver(conf={})
    Fluent::Test::Driver::Output.new(Fluent::Plugin::MyOutput).configure(conf)
  end

  test 'emit' do
    d = create_driver(config)
    time = event_time
    d.run do
      d.feed("output.test", time, {'foo' => 'bar', 'message' => msg})
    end

    assert_equal(1, d.events.size)
  end
end

Testing Parser Plugins

You must test parser plugins' #parse method.

You can write a test for a parser plugin as shown below.

Code example:

require 'fluent/test'
require 'fluent/test/driver/parser'
require 'fluent/test/helpers'
require 'fluent/plugin/parser_my'

class MyParserTest < Test::Unit::TestCase
  include Fluent::Test::Helpers

  setup do
    Fluent::Test.setup
  end

  def create_driver(conf={})
    Fluent::Test::Driver::Parser.new(Fluent::Plugin::MyParser).configure(conf)
  end

  def create_parser(conf)
    create_driver(conf).instance
  end

  test "parse" do
    parser = create_parser(conf)
    parser.parse(text) do |time, record|
      assert_equal(event_time("2017-12-26 11:56:50.123456789"), time)
      assert_equal({ "message" => "Hello, Fluentd!!" }, record)
    end
  end
end

Testing Formatter Plugins

You must test formatter plugins' #format method.

You can write a test for a formatter plugin as shown below.

Code example:

require 'fluent/test'
require 'fluent/test/driver/formatter'
require 'fluent/test/helpers'
require 'fluent/plugin/formatter_my'

class MyFormatterTest < Test::Unit::TestCase
  include Fluent::Test::Helpers

  setup do
    Fluent::Test.setup
  end

  def create_driver(conf={})
    Fluent::Test::Driver::Formatter.new(Fluent::Plugin::MyFormatter).configure(conf)
  end

  def create_formatter(conf)
    create_driver(conf).instance
  end

  test "format" do
    formatter = create_formatter(conf)
    formatted = formatter.format(tag, time, record)
    assert_equal("message:awesome\tgreeting:hello", formatted)
  end
end

Tests for logs

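Logs emitted during a test are available through the Test Driver's #logs method, so they can be checked with ordinary assertions.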

Code example:

# d is a Test Driver instance
assert_equal(1, d.logs.size)
logs = d.logs
assert do
  logs.any? {|log| log.include?(expected_log) }
end
assert do
  logs.last.match?(/This is last log/)
end

Last updated: 2018-12-14 01:46:26 +0000

If this article is incorrect or outdated, or omits critical information, please let us know. Fluentd is an open source project under the Cloud Native Computing Foundation (CNCF). All components are available under the Apache 2 License.