Fluentd
How to Write Filter Plugin
This section shows how to write a custom filter plugin to use alongside the core ones. Plugin files whose names start with the filter_ prefix are registered as filter plugins.
See Plugin Base Class API for more details on the common APIs of all the plugins.
Here is the implementation of the most basic filter that passes through all the events as-is:
    require 'fluent/plugin/filter'

    module Fluent::Plugin
      class PassThruFilter < Filter
        # Register this filter as "passthru"
        Fluent::Plugin.register_filter('passthru', self)

        # config_param works like other plugins

        def configure(conf)
          super
          # Do the usual configuration here
        end

        # def start
        #   super
        #   # Override this method if anything is needed at startup.
        # end

        # def shutdown
        #   # Override this method to free up resources, etc.
        #   super
        # end

        def filter(tag, time, record)
          # Since our example is a pass-thru filter, it does nothing and just
          # returns the record as-is.
          # If this method returns nil, the record is discarded.
          record
        end
      end
    end

Methods

A filter plugin overrides one of the filter, filter_with_time, or filter_stream methods.

#filter(tag, time, record)

This method implements the filtering logic.
    tag is a String;
    time is a Fluent::EventTime or an Integer; and
    record is a Hash with String keys.
The return value of this method should be the modified record as a Hash, or nil. If it is nil, the event is discarded.
    # example
    def filter(tag, time, record)
      # process record
      record['fluentd_tag'] = tag
      record
    end
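To make the nil-discards-the-event rule concrete, here is a minimal stand-alone sketch that runs without Fluentd. The `level` field and the `apply_filter` helper are assumptions made for illustration: `apply_filter` only mimics how a pipeline might treat the return value and is not part of the Fluentd API.

```ruby
# Stand-alone sketch: no Fluentd required. In a real plugin this method would
# live inside a Fluent::Plugin::Filter subclass.
def filter(tag, time, record)
  # Discard the event by returning nil when the (assumed) "level" field is "debug".
  return nil if record['level'] == 'debug'

  # Otherwise return a copy of the record annotated with its tag.
  record.merge('fluentd_tag' => tag)
end

# Hypothetical helper that mimics how the return value is treated:
# nil results are dropped, Hash results are kept.
def apply_filter(tag, events)
  events.map { |time, record| filter(tag, time, record) }.compact
end

events = [
  [1_700_000_000, { 'level' => 'info',  'message' => 'started' }],
  [1_700_000_001, { 'level' => 'debug', 'message' => 'noise' }]
]

# Only the "info" record survives, with "fluentd_tag" added.
p apply_filter('app.log', events)
```

Returning a new Hash via `merge` rather than mutating `record` is a design choice of this sketch; the example in the text mutates the record in place, which also works.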

#filter_with_time(tag, time, record)

This method implements filtering logic that can also update the event time. The event time is replaced with the time given in the return value.
    tag is a String;
    time is a Fluent::EventTime or an Integer; and
    record is a Hash with String keys.
The return value of this method should be a two-element array, [new_time, new_record], or nil. If it is nil, the event is discarded.
    # example
    def filter_with_time(tag, time, record)
      new_time = get_time_from_record(record)
      new_record = update_record(tag, record)
      return new_time, new_record # same as: return [new_time, new_record]
    end
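The helpers `get_time_from_record` and `update_record` in the example above are not defined in the text. One possible way to fill them in is sketched below, assuming the record carries an ISO 8601 string in a `timestamp` field (a hypothetical field name) and that an Integer epoch time is acceptable as the new event time. The sketch uses only the Ruby standard library, so it runs without Fluentd.

```ruby
require 'time'

# Hypothetical helper: read an ISO 8601 string from the assumed "timestamp"
# field and convert it to Integer epoch seconds. Returns nil when the field
# is missing, is not a String, or cannot be parsed.
def get_time_from_record(record)
  value = record['timestamp']
  return nil unless value.is_a?(String)

  Time.iso8601(value).to_i
rescue ArgumentError
  nil
end

# Hypothetical helper: copy the record without "timestamp" and add the tag.
def update_record(tag, record)
  new_record = record.reject { |key, _value| key == 'timestamp' }
  new_record['fluentd_tag'] = tag
  new_record
end

def filter_with_time(tag, time, record)
  # Fall back to the original event time when the record carries no usable one.
  new_time = get_time_from_record(record) || time
  [new_time, update_record(tag, record)]
end

p filter_with_time('app.log', 0, { 'timestamp' => '2024-01-01T00:00:00Z', 'message' => 'hi' })
```

Falling back to the original `time` keeps the event instead of discarding it when the timestamp is unusable; returning nil at that point would drop the event.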

#filter_stream(tag, es)

This method implements filtering logic that operates on a whole event stream. Use it when the logic is hard to implement with filter, e.g. when multiple records must be handled in one pass.
The return value of this method should be a MultiEventStream. If it is nil, the events will be discarded.
    # example
    def filter_stream(tag, es)
      new_es = Fluent::MultiEventStream.new
      es.each { |time, record|
        new_time = process_time(tag, time, record)
        new_record = process_record(tag, time, record)
        # add the processed values, not the originals
        new_es.add(new_time, new_record)
      }
      new_es
    end
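As an illustration of logic that needs to see more than one record at a time, the sketch below drops a record whose `message` equals that of the record immediately before it in the same stream. So that it runs without Fluentd, it uses `StubEventStream`, a hypothetical stand-in that mimics only the `add(time, record)` and `each` calls used here. In a real plugin, Fluent::MultiEventStream would take its place. The `message` field name is also an assumption.

```ruby
# Hypothetical stand-in for Fluent::MultiEventStream so this sketch runs
# without Fluentd. It mimics only add(time, record) and each.
class StubEventStream
  def initialize
    @events = []
  end

  def add(time, record)
    @events << [time, record]
  end

  def each(&block)
    @events.each(&block)
  end

  # Convenience for inspecting the result; not part of the mimicked API.
  def to_a
    @events.dup
  end
end

# Drops a record when its "message" repeats the previous record's message.
def filter_stream(tag, es)
  new_es = StubEventStream.new
  previous = nil
  es.each do |time, record|
    duplicate = previous && previous['message'] == record['message']
    new_es.add(time, record) unless duplicate
    previous = record
  end
  new_es
end

es = StubEventStream.new
es.add(1, { 'message' => 'disk full' })
es.add(2, { 'message' => 'disk full' })
es.add(3, { 'message' => 'recovered' })

# The second event is dropped because it repeats the first.
p filter_stream('app.log', es).to_a
```

This kind of comparison across neighbouring records cannot be expressed with filter, which sees one record per call, unless the plugin keeps state between calls.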

Writing Tests

A Fluentd filter plugin has only a few points that need testing. Everything else (parsing configurations, controlling buffers, retries, flushes and many others) is handled by Fluentd core.
Fluentd also provides a test driver for plugins, so you can write tests for your own plugins easily:
    # test/plugin/test_filter_your_own.rb

    require 'test/unit'
    require 'fluent/test/driver/filter'

    # your own plugin
    require 'fluent/plugin/filter_your_own'

    class YourOwnFilterTest < Test::Unit::TestCase
      def setup
        Fluent::Test.setup # this is required to set up the router and others
      end

      # default configuration for tests
      CONFIG = %[
        param1 value1
        param2 value2
      ]

      def create_driver(conf = CONFIG)
        Fluent::Test::Driver::Filter.new(Fluent::Plugin::YourOwnFilter).configure(conf)
      end

      def filter(config, messages)
        d = create_driver(config)
        d.run(default_tag: 'input.access') do
          messages.each do |message|
            d.feed(message)
          end
        end
        d.filtered_records
      end

      sub_test_case 'configured with invalid configuration' do
        test 'empty configuration' do
          assert_raise(Fluent::ConfigError) do
            create_driver('')
          end
        end

        test 'param1 should reject too short string' do
          conf = %[
            param1 a
          ]
          assert_raise(Fluent::ConfigError) do
            create_driver(conf)
          end
        end
        # ...
      end

      sub_test_case 'plugin will add some fields' do
        test 'add hostname to record' do
          conf = CONFIG
          messages = [
            { 'message' => 'This is test message' }
          ]
          expected = [
            { 'message' => 'This is test message', 'hostname' => 'example.com' }
          ]
          filtered_records = filter(conf, messages)
          assert_equal(expected, filtered_records)
        end
        # ...
      end
      # ...
    end

Overview of Tests

Testing for the filter plugins is mainly for:
    Validation of configuration parameters (i.e. #configure)
    Validation of the filtered records
To make testing easy, the plugin test driver provides a dummy router, a logger and general functionality to override the system, parser and other relevant configurations.
The lifecycle of the plugin and its test driver is:
    1. Instantiate the test driver which then instantiates the plugin
    2. Configure plugin
    3. Register conditions to stop/break running tests
    4. Run test code (provided as a block for d.run)
    5. Assert results of tests using data provided by the driver
At the start of step 4, the test driver calls the plugin's startup methods (e.g. #start), and at the end it calls #stop, #shutdown, etc. These calls can be skipped with optional arguments to #run.
For:
    configuration tests, repeat steps 1-2
    full feature tests, repeat steps 1-5
For more details, see Testing API for Plugins.
If this article is incorrect or outdated, or omits critical information, please let us know. Fluentd is an open-source project under Cloud Native Computing Foundation (CNCF). All components are available under the Apache 2 License.