Fluentd
1.0
Search
⌃K

Upgrade Plugin from v0.12

This guide is for plugin authors to show how to update input/output/filter plugins written for Fluentd v0.12 or earlier.
There are some things to be considered (see "Updating Plugins Overview" section for details):
  • Plugins using v0.12 API will be supported in Fluentd v1. This compatibility
    guarantee will no longer be applicable with v2.
  • Users may use the new features of Fluentd v1 only with the plugins using new
    API.
  • Plugins using the new API will not work with Fluentd v0.12.x.
It is strongly recommended to use v1 API to write your plugins stable, consistent and easy to test.

Updating Plugins Overview

Following are the steps to update your plugins safely:
  1. 1.
    Release a Latest Version for Fluentd v0.12.x
  2. 2.
    Update Dependency
  3. 3.
    Update Code and Tests
  4. 4.
    Update CI Environments
  5. 5.
    Release the Newer Version for Fluentd v1 and Later

1. Release a Latest Version

At first, you should make a git branch named as v0.12 (if you are using git for that plugin), and release the latest patch version from that branch without any changes, except fixing dependency of Fluentd ~> 0.12.0. This makes it possible to fix bugs and release newer versions for Fluentd v0.12 users without breaking anything.
  • make a branch for Fluentd v0.12 versions
  • fix dependency of Fluentd to ~> 0.12.0 (or later: ~> 0.12.26)
  • bump the gem version up to next patch version (e.g. 0.4.1 to 0.4.2)
  • release it to RubyGems.org

2. Update Dependency

The following updates are on the master branch. You should update the dependency in gemspec first for Fluentd v1.
  • fix dependency of Fluentd to [">= 1", "< 2"]
  • execute bundle install
Recommended dependency in gemspec:
# in gemspec
Gem::Specification.new do |gem|
gem.name = 'fluent-plugin-my_awesome'
# ...
gem.add_runtime_dependency 'fluentd', ['>= 1', "< 2']
end

3. Update Code and Tests

There are many differences between plugin types on updating code and tests. See "Updating Plugin Code" section below for each type of plugin.
  • update code and tests
  • run bundle exec rake test

4. Update CI Environments

If you have CI configurations like .travis.yml and appvayor.yml, these should be updated to support Fluentd v1. Fluentd v1 supports Ruby 2.4 or later. CI environments should not include Ruby 2.3 or earlier.
  • remove Ruby 2.3 or earlier from CI environments
  • add Ruby 2.4 (or other latest version) to CI environments

5. Add Requirements Section

Add a Requirements section to README.md like this:

Requirements

fluent-plugin-may_awesome
Fluentd
Ruby
>= 1.0.0
>= v1
>= 2.4
< 1.0.0
>= v0.12.0
>= 2.1
This helps that plugin users can understand plugin requirements.

6. Release New Version

This is the last step. The new version should bump the major or minor version, not the patch version. If the current major version of your gem is >= 1, you should bump the major version (e.g. from 1 to 2). If the current major version is 0, you should bump the minor version (e.g. from 0.4.2 to 0.5.0). Then, you can publish the new release with Fluentd v1.
  • bump the version up
  • release it to RubyGems.org

Updating Plugin Code

For all types of plugins, take care about these things:
  • require files with definitions of classes/modules referred in your plugin code
  • call super in #initialize, #configure, #start and #shutdown
  • use router.emit to emit events into Fluentd instead of Engine.emit
About updating tests, see the "Test Code" section for all plugin types.

Input Plugins

For input plugins, the points to be fixed are:
  • require 'fluent/plugin/input' instead of 'fluent/input'
  • fix superclass from Fluent::Input to Fluent::Plugin::Input
  • use compat_parameters plugin helper for compatibility with v0.12 config style
  • use Fluent::Engine.now or Fluent::EventTime.now to create current time
    object instead of Time.now.to_i
  • update test code
Plugins will work fine once the above changes are incorporated.
Moreover, most input plugins create threads, timers, network servers and/or parsers. It is better to use plugin helpers to simplify code and to make tests stable.
For more details, see Plugin Helper Overview.

Filter Plugins

For filter plugins, the points to be fixed are:
  • require 'fluent/plugin/filter' instead of 'fluent/filter'
  • fix superclass from Fluent::Filter to Fluent::Plugin::Filter
  • use compat_parameters plugin helper for compatibility with v0.12 config style
  • update test code
Plugins will work fine once the above changes are incorporated. But if your plugin implements #filter_stream, remove it if possible. Overriding #filter_stream makes it impossible to optimize filters' performance.
Moreover, many filter plugins use parsers or formatters. It is better to use plugin helpers for them to simplify the code and make the configuration easier to understand.

Non-Buffered Output Plugins

For output plugins, the points to be fixed are:
  • require 'fluent/plugin/output' instead of 'fluent/output'
  • fix superclass from Fluent::Output to Fluent::Plugin::Output
  • use compat_parameters plugin helper for compatibility with v0.12 config style
  • remove #emit method and implement #process(tag, es) method
  • update test code
If your output plugin emits events into Fluentd, follow these points too:
  • use event_emitter plugin helper to introduce a router
  • use Fluent::Engine.now or Fluent::EventTime.now to create current time
    object instead of Time.now.to_i
It is recommended to use plugin helpers if your plugin creates any one of thread, timer, socket, child process, and/or parsers/formatters. Using plugin helpers simplifies the code and makes the tests stable.
For more details, see Plugin Helper Overview.
Before:
require 'fluent/output'
module Fluent
class SomeOutput < Output
Fluent::Plugin.register_output('NAME', self)
def configure(conf)
super
# ...
end
def start
super
# ...
end
def shutdown
super
# ...
end
def emit(tag, es, chain)
chain.next
es.each do |time,record|
log.info 'OK!'
end
end
end
end
After:
require 'fluent/plugin/output'
module Fluent
module Plugin
class SomeOutput < Fluent::Plugin::Output
Fluent::Plugin.register_output('NAME', self)
helpers :compat_parameters
def configure(conf)
compat_parameters_convert(conf, ...)
super
# ...
end
def start
super
# ...
end
def shutdown
# ...
super # This super must be at the end of shutdown method
end
def process(tag, es)
es.each do |time, record|
log.info 'OK!'
end
end
end
end
end

Buffered Output Plugins

For buffered output plugins (subclass of Fluent::BufferedOutput), the points to be fixed are:
  • require 'fluent/plugin/output' instead of 'fluent/output'
  • fix superclass from Fluent::BufferedOutput to Fluent::Plugin::Output
  • use compat_parameters plugin helper for compatibility with v0.12 config style
  • implement #compat_parameters_default_chunk_key to return an empty string to
    show chunk key is not specified
  • fix config_set_default and its parameter names to override parameters in
    <buffer> section
  • remove #format_stream method if it is implemented in your plugin (it is not
    supported)
  • update test code
It is recommended to use plugin helpers if your plugin creates any one of thread, timer, socket, child process, and/or parsers/formatters. Using plugin helpers simplifies the code and makes the tests stable.
For more details, see Plugin Helper Overview.
Before:
require 'fluent/output'
module Fluent
class SomeOutput < BufferedOutput
Fluent::Plugin.register_output('NAME', self)
config_param :path, :string
def configure(conf)
super
# ...
end
def start
super
# ...
end
def shutdown
super
# ...
end
def format(tag, time, record)
# ...
end
def write(chunk)
data = chunk.read
print data
end
## Optionally, you can use chunk.msgpack_each to deserialize objects.
#def write(chunk)
# chunk.msgpack_each {|(tag,time,record)|
# }
#end
end
end
After:
require 'fluent/plugin/output'
module Fluent
module Plugin
class SomeOutput < Fluent::Plugin::Output
Fluent::Plugin.register_output('NAME', self)
helpers :compat_parameters
config_param :path, :string
def configure(conf)
compat_parameters_convert(conf, ...)
super
# ...
end
def start
super
# ...
end
def shutdown
# ...
super # This super must be at the end of shutdown method
end
# method for synchronous buffered output mode
def write(chunk)
end
# method for asynchronous buffered output mode
def try_write(chunk)
end
def format(tag, time, record)
# ...
end
end
end
end
For more details, see Writing Buffered Output Plugins.

ObjectBuffered Output Plugins

For object buffered output plugins (subclass of Fluent::ObjectBufferedOutput), the points to be fixed are:
  • require 'fluent/plugin/output' instead of 'fluent/output'
  • fix superclass from Fluent::ObjectBufferedOutput to Fluent::Plugin::Output
  • use compat_parameters plugin helper for compatibility with v0.12 config style
  • implement #compat_parameters_default_chunk_key to return "tag" to show
    chunk key is tag (or something else if your plugin overwrites #emit to
    change key)
  • fix config_set_default and its parameter names to override parameters in
    <buffer> section
  • fix #write method code not to use chunk.key, to use chunk.metadata.tag
    and #extract_placeholders
  • update test code
It is recommended to use plugin helpers if your plugin creates any one of thread, timer, socket, child process, and/or parsers/formatters. Using plugin helpers simplifies the code and makes the tests stable.
For more details, see Plugin Helper Overview.
Before:
require 'fluent/output'
module Fluent
class SomeOutput < ObjectBufferedOutput
Plugin.register_output('NAME', self)
# configure(conf), start, shutdown
# ...
def write(chunk)
# ...
end
end
end
After: Same as the buffered output.
For more details, see Writing Buffered Output Plugins.

TimeSliced Output Plugins

For the time-sliced output plugins (sub-class of Fluent::TimeSlicedOutput), the points to be fixed are:
  • require 'fluent/plugin/output' instead of 'fluent/output'
  • fix superclass from Fluent::TimeSlicedOutput to Fluent::Plugin::Output
  • use compat_parameters plugin helper for compatibility with v0.12 config style
  • implement #compat_parameters_default_chunk_key to return "time" to show
    chunk key is time
  • set default value of timekey in <buffer> section if your plugin specifies
    default time_slice_format
  • fix config_set_default and its parameter names to override parameters in
    <buffer> section
  • fix #write method code not to use chunk.key, to use
    chunk.metadata.timekey and #extract_placeholders
  • update test code
It is recommended to use plugin helpers if your plugin creates any one of thread, timer, socket, child process and/or parsers/formatters. It is better to use plugin helpers to simplify code and to make tests stable.
For more details, see Plugin Helper Overview.
Before (code):
require 'fluent/output'
module Fluent
class SomeOutput < TimeSlicedOutput
Plugin.register_output('NAME', self)
# configure(conf), start, shutdown
# ...
def write(chunk)
day = chunk.key
# ...
end
end
end
Before (configuration):
<match *>
@type ...
time_slice_format %Y%m%d%H
</match>
After (code): Same as the buffered output.
For more details, see Writing Buffered Output Plugins.
After (configuration):
Use <buffer> section to customize chunking.
<match *>
@type ...
<buffer time>
timekey 1h
</buffer>
</match>

Multi Output Plugins

For the multi-output plugins (sub-class of Fluent::MultiOutput), there are many points to be considered.
If the plugin uses <store> sections and instantiates plugins per each store section, use Fluent::Plugin::MultiOutput. See code to know how to use it: lib/fluent/plugin/multi_output.rb or some built-in plugins such asout_copy and out_roundrobin.
Otherwise, your plugin does something curious for Fluentd. Read code of lib/fluent/plugin/output.rb and lib/fluent/plugin/bare_output.rb, and consider which is better for your plugin. But, it is advised against using Fluent::Plugin::BareOutput for most use cases.

Output Plugins using Mixins

Fluent::HandleTagAndTimeMixin, Fluent::SetTagKeyMixin, Fluent::SetTimeKeyMixin

Use inject and compat_parameters plugin helper in the plugin code.
The old configuration will be converted to the new style configuration automatically if the plugin code uses the proper plugin helpers. So, plugin users will not need to rewrite the configuration immediately.
Fluentd shows the converted new style configuration in the startup log if the user provides an old-style configuration. The user may then rewrite the configuration dumped in the log.
Before:
<match **>
@type some_output
include_tag_key true
tag_key tag
include_time_key true
time_key time
time_format %Y-%m-%d
</match>
After:
<match **>
@type some_output
<inject>
tag_key tag
time_key time
time_format %Y-%m-%d
</inject>
</match>

Fluent::HandleTagNameMixin

Related configurations:
  • remove_tag_prefix
  • remove_tag_suffix
  • add_tag_prefix
  • add_tag_suffix
Use extract_placeholders(template, chunk) in plugin code.
Use placeholders ${tag}, ${tag[0]}, ${tag[1]} in configuration.
Before:
<match input.access>
@type some_output
remove_tag_prefix input.
tag some.${tag}
<record>
# ...
</record>
</match>
After:
<match input.access>
@type some_output
tag some.${tag[1]}
<record>
# ...
</record>
</match>

Parser Plugins

  • require 'fluent/plugin/parser' instead of 'fluent/parser'
  • fix superclass from Fluent::Parser to Fluent::Plugin::Parser
  • use compat_parameters plugin helper for compatibility with v0.12 config style
  • update test code

Formatter Plugins

  • require 'fluent/plugin/formatter' instead of 'fluent/formatter'
  • fix superclass from Fluent::Formatter to Fluent::Plugin::Formatter
  • use compat_parameters plugin helper for compatibility with v0.12 config style
  • update test code

Test Code

  • organize test_helper.rb
  • require 'fluent/test/driver/output' and 'fluent/test'
  • replace test driver from Fluent::Test::OutputTestDrive to
    Fluent::Test::Driver::Output
  • use new test driver API
For example, here is an output plugin's test code.
For more details, see Writing Plugin Test Code.
Before:
test/test_helper.rb
require 'rubygems'
require 'bundler'
begin
Bundler.setup(:default, :development)
rescue Bundler::BundlerError => e
$stderr.puts e.message
$stderr.puts 'Run `bundle install` to install missing gems'
exit e.status_code
end
require 'test/unit'
$LOAD_PATH.unshift(File.join(File.dirname(__FILE__), '..', 'lib'))
$LOAD_PATH.unshift(File.dirname(__FILE__))
require 'fluent/test'
unless ENV.has_key?('VERBOSE')
nulllogger = Object.new
nulllogger.instance_eval {|obj|
def method_missing(method, *args)
# pass
end
}
$log = nulllogger
end
class Test::Unit::TestCase
end
test/plugin/test_some_output.rb
require 'test_helper'
require 'fluent/plugin/out_some'
class SomeOutputTest < Test::Unit::TestCase
def create_driver(conf, tag = 'test')
Fluent::Test::OutputTestDriver.new(Fluent::SomeOutput, tag).configure(conf)
end
def setup
Fluent::Test.setup
end
def test_configure
# Configuration-related test cases
end
end
After:
test_helper.rb
require 'bundler/setup'
require 'test/unit'
$LOAD_PATH.unshift(File.join(__dir__, '..', 'lib'))
$LOAD_PATH.unshift(__dir__)
require 'fluent/test'
test/plugin/test_some_output.rb
require 'test_helper'
require 'fluent/test/driver/output'
require 'fluent/plugin/out_some'
class SomeOutputTest < Test::Unit::TestCase
def create_driver(conf)
Fluent::Test::Driver::Output.new(Fluent::Plugin::SomeOutput).configure(conf)
end
def setup
Fluent::Test.setup
end
sub_test_case 'configure' do
# Configuration-related tests
test 'empty' do
assert_raise(Fluent::ConfigError) do
create_driver('')
end
end
# ...
end
sub_test_case 'emit events' do
# Emit events-related tests
test 'emit 2 simple records' do
d = create_driver(conf)
d.run(default_tag: 'test') do
d.feed(time, record1)
d.feed(time, record2)
end
events = d.events
assert_equal(...)
end
end
end
If this article is incorrect or outdated, or omits critical information, please let us know. Fluentd is an open-source project under Cloud Native Computing Foundation (CNCF). All components are available under the Apache 2 License.