Fluentd
Search…
Upgrade Plugin from v0.12
This guide is for plugin authors to show how to update input/output/filter plugins written for Fluentd v0.12 or earlier.
There are some things to be considered (see "Updating Plugins Overview" section for details):
    Plugins using v0.12 API will be supported in Fluentd v1. This compatibility
    guarantee will no longer be applicable with v2.
    Users may use the new features of Fluentd v1 only with the plugins using new
    API.
    Plugins using the new API will not work with Fluentd v0.12.x.
It is strongly recommended to use v1 API to write your plugins stable, consistent and easy to test.

Updating Plugins Overview

Following are the steps to update your plugins safely:
    1.
    Release a Latest Version for Fluentd v0.12.x
    2.
    Update Dependency
    3.
    Update Code and Tests
    4.
    Update CI Environments
    5.
    Release the Newer Version for Fluentd v1 and Later

1. Release a Latest Version

At first, you should make a git branch named as v0.12 (if you are using git for that plugin), and release the latest patch version from that branch without any changes, except fixing dependency of Fluentd ~> 0.12.0. This makes it possible to fix bugs and release newer versions for Fluentd v0.12 users without breaking anything.
    make a branch for Fluentd v0.12 versions
    fix dependency of Fluentd to ~> 0.12.0 (or later: ~> 0.12.26)
    bump the gem version up to next patch version (e.g. 0.4.1 to 0.4.2)
    release it to RubyGems.org

2. Update Dependency

The following updates are on the master branch. You should update the dependency in gemspec first for Fluentd v1.
    fix dependency of Fluentd to [">= 1", "< 2"]
    execute bundle install
Recommended dependency in gemspec:
1
# in gemspec
2
Gem::Specification.new do |gem|
3
gem.name = 'fluent-plugin-my_awesome'
4
# ...
5
gem.add_runtime_dependency 'fluentd', ['>= 1', "< 2']
6
end
Copied!

3. Update Code and Tests

There are many differences between plugin types on updating code and tests. See "Updating Plugin Code" section below for each type of plugin.
    update code and tests
    run bundle exec rake test

4. Update CI Environments

If you have CI configurations like .travis.yml and appvayor.yml, these should be updated to support Fluentd v1. Fluentd v1 supports Ruby 2.4 or later. CI environments should not include Ruby 2.3 or earlier.
    remove Ruby 2.3 or earlier from CI environments
    add Ruby 2.4 (or other latest version) to CI environments

5. Add Requirements Section

Add a Requirements section to README.md like this:

Requirements

fluent-plugin-may_awesome
Fluentd
Ruby
>= 1.0.0
>= v1
>= 2.4
< 1.0.0
>= v0.12.0
>= 2.1
This helps that plugin users can understand plugin requirements.

6. Release New Version

This is the last step. The new version should bump the major or minor version, not the patch version. If the current major version of your gem is >= 1, you should bump the major version (e.g. from 1 to 2). If the current major version is 0, you should bump the minor version (e.g. from 0.4.2 to 0.5.0). Then, you can publish the new release with Fluentd v1.
    bump the version up
    release it to RubyGems.org

Updating Plugin Code

For all types of plugins, take care about these things:
    require files with definitions of classes/modules referred in your plugin code
    call super in #initialize, #configure, #start and #shutdown
    use router.emit to emit events into Fluentd instead of Engine.emit
About updating tests, see the "Test Code" section for all plugin types.

Input Plugins

For input plugins, the points to be fixed are:
    require 'fluent/plugin/input' instead of 'fluent/input'
    fix superclass from Fluent::Input to Fluent::Plugin::Input
    use compat_parameters plugin helper for compatibility with v0.12 config style
    use Fluent::Engine.now or Fluent::EventTime.now to create current time
    object instead of Time.now.to_i
    update test code
Plugins will work fine once the above changes are incorporated.
Moreover, most input plugins create threads, timers, network servers and/or parsers. It is better to use plugin helpers to simplify code and to make tests stable.
For more details, see Plugin Helper Overview.

Filter Plugins

For filter plugins, the points to be fixed are:
    require 'fluent/plugin/filter' instead of 'fluent/filter'
    fix superclass from Fluent::Filter to Fluent::Plugin::Filter
    use compat_parameters plugin helper for compatibility with v0.12 config style
    update test code
Plugins will work fine once the above changes are incorporated. But if your plugin implements #filter_stream, remove it if possible. Overriding #filter_stream makes it impossible to optimize filters' performance.
Moreover, many filter plugins use parsers or formatters. It is better to use plugin helpers for them to simplify the code and make the configuration easier to understand.

Non-Buffered Output Plugins

For output plugins, the points to be fixed are:
    require 'fluent/plugin/output' instead of 'fluent/output'
    fix superclass from Fluent::Output to Fluent::Plugin::Output
    use compat_parameters plugin helper for compatibility with v0.12 config style
    remove #emit method and implement #process(tag, es) method
    update test code
If your output plugin emits events into Fluentd, follow these points too:
    use event_emitter plugin helper to introduce a router
    use Fluent::Engine.now or Fluent::EventTime.now to create current time
    object instead of Time.now.to_i
It is recommended to use plugin helpers if your plugin creates any one of thread, timer, socket, child process, and/or parsers/formatters. Using plugin helpers simplifies the code and makes the tests stable.
For more details, see Plugin Helper Overview.
Before:
1
require 'fluent/output'
2
3
module Fluent
4
class SomeOutput < Output
5
Fluent::Plugin.register_output('NAME', self)
6
7
def configure(conf)
8
super
9
# ...
10
end
11
12
def start
13
super
14
# ...
15
end
16
17
def shutdown
18
super
19
# ...
20
end
21
22
def emit(tag, es, chain)
23
chain.next
24
es.each do |time,record|
25
log.info 'OK!'
26
end
27
end
28
end
29
end
Copied!
After:
1
require 'fluent/plugin/output'
2
3
module Fluent
4
module Plugin
5
class SomeOutput < Fluent::Plugin::Output
6
Fluent::Plugin.register_output('NAME', self)
7
8
helpers :compat_parameters
9
10
def configure(conf)
11
compat_parameters_convert(conf, ...)
12
super
13
# ...
14
end
15
16
def start
17
super
18
# ...
19
end
20
21
def shutdown
22
# ...
23
super # This super must be at the end of shutdown method
24
end
25
26
def process(tag, es)
27
es.each do |time, record|
28
log.info 'OK!'
29
end
30
end
31
end
32
end
33
end
Copied!

Buffered Output Plugins

For buffered output plugins (subclass of Fluent::BufferedOutput), the points to be fixed are:
    require 'fluent/plugin/output' instead of 'fluent/output'
    fix superclass from Fluent::BufferedOutput to Fluent::Plugin::Output
    use compat_parameters plugin helper for compatibility with v0.12 config style
    implement #compat_parameters_default_chunk_key to return an empty string to
    show chunk key is not specified
    fix config_set_default and its parameter names to override parameters in
    <buffer> section
    remove #format_stream method if it is implemented in your plugin (it is not
    supported)
    update test code
It is recommended to use plugin helpers if your plugin creates any one of thread, timer, socket, child process, and/or parsers/formatters. Using plugin helpers simplifies the code and makes the tests stable.
For more details, see Plugin Helper Overview.
Before:
1
require 'fluent/output'
2
3
module Fluent
4
class SomeOutput < BufferedOutput
5
Fluent::Plugin.register_output('NAME', self)
6
7
config_param :path, :string
8
9
def configure(conf)
10
super
11
# ...
12
end
13
14
def start
15
super
16
# ...
17
end
18
19
def shutdown
20
super
21
# ...
22
end
23
24
def format(tag, time, record)
25
# ...
26
end
27
28
def write(chunk)
29
data = chunk.read
30
print data
31
end
32
33
## Optionally, you can use chunk.msgpack_each to deserialize objects.
34
#def write(chunk)
35
# chunk.msgpack_each {|(tag,time,record)|
36
# }
37
#end
38
end
39
end
Copied!
After:
1
require 'fluent/plugin/output'
2
3
module Fluent
4
module Plugin
5
class SomeOutput < Fluent::Plugin::Output
6
Fluent::Plugin.register_output('NAME', self)
7
8
helpers :compat_parameters
9
10
config_param :path, :string
11
12
def configure(conf)
13
compat_parameters_convert(conf, ...)
14
super
15
# ...
16
end
17
18
def start
19
super
20
# ...
21
end
22
23
def shutdown
24
# ...
25
super # This super must be at the end of shutdown method
26
end
27
28
# method for synchronous buffered output mode
29
def write(chunk)
30
end
31
32
# method for asynchronous buffered output mode
33
def try_write(chunk)
34
end
35
36
def format(tag, time, record)
37
# ...
38
end
39
end
40
end
41
end
Copied!
For more details, see Writing Buffered Output Plugins.

ObjectBuffered Output Plugins

For object buffered output plugins (subclass of Fluent::ObjectBufferedOutput), the points to be fixed are:
    require 'fluent/plugin/output' instead of 'fluent/output'
    fix superclass from Fluent::ObjectBufferedOutput to Fluent::Plugin::Output
    use compat_parameters plugin helper for compatibility with v0.12 config style
    implement #compat_parameters_default_chunk_key to return "tag" to show
    chunk key is tag (or something else if your plugin overwrites #emit to
    change key)
    fix config_set_default and its parameter names to override parameters in
    <buffer> section
    fix #write method code not to use chunk.key, to use chunk.metadata.tag
    and #extract_placeholders
    update test code
It is recommended to use plugin helpers if your plugin creates any one of thread, timer, socket, child process, and/or parsers/formatters. Using plugin helpers simplifies the code and makes the tests stable.
For more details, see Plugin Helper Overview.
Before:
1
require 'fluent/output'
2
3
module Fluent
4
class SomeOutput < ObjectBufferedOutput
5
Plugin.register_output('NAME', self)
6
# configure(conf), start, shutdown
7
# ...
8
9
def write(chunk)
10
# ...
11
end
12
end
13
end
Copied!
After: Same as the buffered output.
For more details, see Writing Buffered Output Plugins.

TimeSliced Output Plugins

For the time-sliced output plugins (sub-class of Fluent::TimeSlicedOutput), the points to be fixed are:
    require 'fluent/plugin/output' instead of 'fluent/output'
    fix superclass from Fluent::TimeSlicedOutput to Fluent::Plugin::Output
    use compat_parameters plugin helper for compatibility with v0.12 config style
    implement #compat_parameters_default_chunk_key to return "time" to show
    chunk key is time
    set default value of timekey in <buffer> section if your plugin specifies
    default time_slice_format
    fix config_set_default and its parameter names to override parameters in
    <buffer> section
    fix #write method code not to use chunk.key, to use
    chunk.metadata.timekey and #extract_placeholders
    update test code
It is recommended to use plugin helpers if your plugin creates any one of thread, timer, socket, child process and/or parsers/formatters. It is better to use plugin helpers to simplify code and to make tests stable.
For more details, see Plugin Helper Overview.
Before (code):
1
require 'fluent/output'
2
3
module Fluent
4
class SomeOutput < TimeSlicedOutput
5
Plugin.register_output('NAME', self)
6
# configure(conf), start, shutdown
7
# ...
8
9
def write(chunk)
10
day = chunk.key
11
# ...
12
end
13
end
14
end
Copied!
Before (configuration):
1
<match *>
2
@type ...
3
time_slice_format %Y%m%d%H
4
</match>
Copied!
After (code): Same as the buffered output.
For more details, see Writing Buffered Output Plugins.
After (configuration):
Use <buffer> section to customize chunking.
1
<match *>
2
@type ...
3
<buffer time>
4
timekey 1h
5
</buffer>
6
</match>
Copied!

Multi Output Plugins

For the multi-output plugins (sub-class of Fluent::MultiOutput), there are many points to be considered.
If the plugin uses <store> sections and instantiates plugins per each store section, use Fluent::Plugin::MultiOutput. See code to know how to use it: lib/fluent/plugin/multi_output.rb or some built-in plugins such asout_copy and out_roundrobin.
Otherwise, your plugin does something curious for Fluentd. Read code of lib/fluent/plugin/output.rb and lib/fluent/plugin/bare_output.rb, and consider which is better for your plugin. But, it is advised against using Fluent::Plugin::BareOutput for most use cases.

Output Plugins using Mixins

Fluent::HandleTagAndTimeMixin, Fluent::SetTagKeyMixin, Fluent::SetTimeKeyMixin

Use inject and compat_parameters plugin helper in the plugin code.
The old configuration will be converted to the new style configuration automatically if the plugin code uses the proper plugin helpers. So, plugin users will not need to rewrite the configuration immediately.
Fluentd shows the converted new style configuration in the startup log if the user provides an old-style configuration. The user may then rewrite the configuration dumped in the log.
Before:
1
<match **>
2
@type some_output
3
include_tag_key true
4
tag_key tag
5
include_time_key true
6
time_key time
7
time_format %Y-%m-%d
8
</match>
Copied!
After:
1
<match **>
2
@type some_output
3
<inject>
4
tag_key tag
5
time_key time
6
time_format %Y-%m-%d
7
</inject>
8
</match>
Copied!

Fluent::HandleTagNameMixin

Related configurations:
    remove_tag_prefix
    remove_tag_suffix
    add_tag_prefix
    add_tag_suffix
Use extract_placeholders(template, chunk) in plugin code.
Use placeholders ${tag}, ${tag[0]}, ${tag[1]} in configuration.
Before:
1
<match input.access>
2
@type some_output
3
remove_tag_prefix input.
4
tag some.${tag}
5
<record>
6
# ...
7
</record>
8
</match>
Copied!
After:
1
<match input.access>
2
@type some_output
3
tag some.${tag[1]}
4
<record>
5
# ...
6
</record>
7
</match>
Copied!

Parser Plugins

    require 'fluent/plugin/parser' instead of 'fluent/parser'
    fix superclass from Fluent::Parser to Fluent::Plugin::Parser
    use compat_parameters plugin helper for compatibility with v0.12 config style
    update test code

Formatter Plugins

    require 'fluent/plugin/formatter' instead of 'fluent/formatter'
    fix superclass from Fluent::Formatter to Fluent::Plugin::Formatter
    use compat_parameters plugin helper for compatibility with v0.12 config style
    update test code

Test Code

    organize test_helper.rb
    require 'fluent/test/driver/output' and 'fluent/test'
    replace test driver from Fluent::Test::OutputTestDrive to
    Fluent::Test::Driver::Output
    use new test driver API
For example, here is an output plugin's test code.
For more details, see Writing Plugin Test Code.
Before:
test/test_helper.rb
1
require 'rubygems'
2
require 'bundler'
3
begin
4
Bundler.setup(:default, :development)
5
rescue Bundler::BundlerError => e
6
$stderr.puts e.message
7
$stderr.puts 'Run `bundle install` to install missing gems'
8
exit e.status_code
9
end
10
11
require 'test/unit'
12
13
$LOAD_PATH.unshift(File.join(File.dirname(__FILE__), '..', 'lib'))
14
$LOAD_PATH.unshift(File.dirname(__FILE__))
15
require 'fluent/test'
16
17
unless ENV.has_key?('VERBOSE')
18
nulllogger = Object.new
19
nulllogger.instance_eval {|obj|
20
def method_missing(method, *args)
21
# pass
22
end
23
}
24
$log = nulllogger
25
end
26
27
class Test::Unit::TestCase
28
end
Copied!
test/plugin/test_some_output.rb
1
require 'test_helper'
2
require 'fluent/plugin/out_some'
3
4
class SomeOutputTest < Test::Unit::TestCase
5
def create_driver(conf, tag = 'test')
6
Fluent::Test::OutputTestDriver.new(Fluent::SomeOutput, tag).configure(conf)
7
end
8
9
def setup
10
Fluent::Test.setup
11
end
12
13
def test_configure
14
# Configuration-related test cases
15
end
16
end
Copied!
After:
test_helper.rb
1
require 'bundler/setup'
2
require 'test/unit'
3
$LOAD_PATH.unshift(File.join(__dir__, '..', 'lib'))
4
$LOAD_PATH.unshift(__dir__)
5
require 'fluent/test'
Copied!
test/plugin/test_some_output.rb
1
require 'test_helper'
2
require 'fluent/test/driver/output'
3
require 'fluent/plugin/out_some'
4
5
class SomeOutputTest < Test::Unit::TestCase
6
def create_driver(conf)
7
Fluent::Test::Driver::Output.new(Fluent::Plugin::SomeOutput).configure(conf)
8
end
9
10
def setup
11
Fluent::Test.setup
12
end
13
14
sub_test_case 'configure' do
15
# Configuration-related tests
16
test 'empty' do
17
assert_raise(Fluent::ConfigError) do
18
create_driver('')
19
end
20
end
21
# ...
22
end
23
24
sub_test_case 'emit events' do
25
# Emit events-related tests
26
test 'emit 2 simple records' do
27
d = create_driver(conf)
28
d.run(default_tag: 'test') do
29
d.feed(time, record1)
30
d.feed(time, record2)
31
end
32
events = d.events
33
assert_equal(...)
34
end
35
end
36
end
Copied!
If this article is incorrect or outdated, or omits critical information, please let us know. Fluentd is an open-source project under Cloud Native Computing Foundation (CNCF). All components are available under the Apache 2 License.
Last modified 4mo ago