Writing Output Plugins
To create an output plugin, you need to define a class inheriting Fluent::Plugin::Output and implement a set of methods on it.
The exact set of methods to implement depends on the design of your plugin. The following is a general template for writing a custom output plugin.
require 'fluent/plugin/output'

module Fluent::Plugin
  class SomeOutput < Output
    # Register the name of your plugin (choose a name which
    # is not used by any other output plugin).
    Fluent::Plugin.register_output('some', self)

    # Enable threads if you are writing an async buffered plugin.
    helpers :thread

    # Define parameters for your plugin.
    config_param :path, :string

    #### Non-Buffered Output #################################
    # Implement process() if your plugin is non-buffered.
    # Read "Non-buffered Mode" for details.
    ###########################################################
    def process(tag, es)
      es.each do |time, record|
        # output events to ...
      end
    end

    #### Sync Buffered Output #################################
    # Implement write() if your plugin uses a normal buffer.
    # Read "Sync Buffered Mode" for details.
    ###########################################################
    def write(chunk)
      real_path = extract_placeholders(@path, chunk)

      log.debug "writing data to file", chunk_id: dump_unique_id_hex(chunk.unique_id)

      # For standard chunk format (without #format() method)
      chunk.each do |time, record|
        # output events to ...
      end

      # For custom format (when #format() is implemented)
      # File.open(real_path, 'w+')

      ## or #write_to(io) is available
      # File.open(real_path, 'w+') do |file|
      #   chunk.write_to(file)
      # end
    end

    #### Async Buffered Output ################################
    # Implement try_write() if you want to defer committing
    # chunks. Read "Async Buffered Mode" for how it works.
    ###########################################################
    def try_write(chunk)
      real_path = extract_placeholders(@path, chunk)

      log.debug "sending data to server", chunk_id: dump_unique_id_hex(chunk.unique_id)

      send_data_to_server(@host, real_path, chunk.read)

      chunk_id = chunk.unique_id

      # Create a thread for deferred commit.
      thread_create(:check_send_result) do
        while thread_current_running?
          sleep SENDDATA_CHECK_INTERVAL # == 5

          if check_data_on_server(real_path, chunk_id)
            # Commit the chunk - it will be deleted and not retried anymore after this call.
            commit_write(chunk_id)
            break
          end
        end
      end
    end

    # Override #format if you want to customize how Fluentd stores
    # events. Read the section "How to Customize the Serialization
    # Format for Chunks" for how it works.
    def format(tag, time, record)
      [tag, time, record].to_json
    end
  end
end
Table of Contents
- Three Modes of Output Plugins
- How Buffers Work
- Understanding Chunking and Metadata
- How To Control Buffering
- How to Change the Default Values for Parameters
- Development Guide
- How To Use Asynchronous Buffered Mode and Delayed Commit
- How to Customize the Serialization Format for Chunks
- List of Interface Methods
- #process(tag, es)
- #write(chunk)
- #try_write(chunk)
- #format(tag, time, record)
- #prefer_buffered_processing
- #prefer_delayed_commit
- #extract_placeholders(str, chunk)
- #commit_write(chunk_id)
- #rollback_write(chunk_id)
- #dump_unique_id_hex(chunk_id)
- es.size
- es.each(&block)
- chunk.unique_id
- chunk.metadata
- chunk.size
- chunk.read
- chunk.open(&block)
- chunk.write_to(io)
- chunk.each(&block)
- How to Write Tests
Three Modes of Output Plugins
Output plugins have three operation modes. Each mode has a set of interfaces to implement. Any output plugin must support (at least) one of these three modes.
Non-buffered Mode
This is the simplest operation mode. The output plugin transfers events to the destination immediately after receiving them. It does not use any buffer and never retries on errors.
For example, the built-in plugin out_stdout normally operates in this mode; this plugin just writes each event to stdout and essentially has no state.
This mode is available when the #process method is implemented.
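For illustration, here is a minimal sketch of a non-buffered plugin that only implements #process. The plugin name and the logging destination are placeholders, not part of Fluentd's bundled plugins:

require 'fluent/plugin/output'

module Fluent::Plugin
  class StdoutLikeOutput < Output
    Fluent::Plugin.register_output('stdout_like_example', self)

    # Non-buffered mode: events are handled as soon as they arrive,
    # without buffering and without automatic retries.
    def process(tag, es)
      es.each do |time, record|
        log.info "event received", tag: tag, time: time, record: record
      end
    end
  end
end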
Sync Buffered Mode
In this mode, the output plugin temporarily stores events in a buffer and sends them later. The exact behaviour of buffering is finely tunable through configuration parameters.
The most notable benefit of this mode is that it enables you to leverage the native retry mechanism. Transient errors (like network failures) are transparently handled by Fluentd, and you do not need to implement an error-handling mechanism by yourself.
This mode is available when the #write method is implemented.
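Similarly, here is a minimal sketch of a sync buffered plugin; the plugin name and the send_records helper are hypothetical placeholders for the actual delivery code:

require 'fluent/plugin/output'

module Fluent::Plugin
  class BufferedExampleOutput < Output
    Fluent::Plugin.register_output('buffered_example', self)

    # Sync buffered mode: Fluentd accumulates events into chunks and
    # calls #write once per chunk. Raising an error here makes Fluentd
    # retry the chunk according to the buffer/retry configuration.
    def write(chunk)
      records = []
      chunk.each do |time, record|
        records << record
      end
      send_records(records) # hypothetical delivery helper
    end
  end
end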
Async Buffered Mode
In this mode, the output plugin temporarily stores events in a buffer and sends them later. The major difference from the synchronous mode is that this mode allows you to defer the acknowledgement of transferred records. For example, you can implement "at-least-once" semantics using this mode. Please read "How To Use Asynchronous Buffered Mode and Delayed Commit" for details.
This mode is available when the #try_write method is implemented.
How Fluentd Chooses Modes
From the users' viewpoint it is simple: write a <buffer> section to enable buffering. The output plugin will use a buffered mode if one is available, or fail to launch if it is unavailable.
From the plugin developer's viewpoint it is a bit more complex. Simply speaking, Fluentd decides the mode by the following rules:
- If users configure a <buffer> section:
  - the plugin tries to do buffering
- Else if the plugin implements both methods for buffered and non-buffered output:
  - the plugin calls #prefer_buffered_processing to decide (true means to do buffering; default is true)
- Else:
  - the plugin does as implemented
When the plugin does buffering:
- If the plugin implements both the sync and async buffered methods:
  - the plugin calls #prefer_delayed_commit to decide (true means to use delayed commit; default is true)
- Else:
  - the plugin does as implemented
An important point is that the methods used to decide the mode are called in #start, which runs after #configure. Plugins can therefore choose their default behavior based on configured parameter values by overriding these methods (#prefer_buffered_processing and #prefer_delayed_commit).
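For example, a plugin that implements both #process and #write could let a configuration parameter decide which mode to prefer. This is a minimal sketch; the buffered parameter name is hypothetical:

class SomeOutput < Output
  # Hypothetical parameter: let users opt into buffering explicitly.
  config_param :buffered, :bool, default: false

  def prefer_buffered_processing
    # Called in #start, after #configure, so @buffered is already set.
    @buffered
  end

  def prefer_delayed_commit
    # Prefer the sync buffered mode (#write) over async (#try_write).
    false
  end
end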
How Buffers Work
Understanding Chunking and Metadata
See the Buffer section article for the basics.
Fluentd creates buffer chunks to store events. Each buffer chunk should be written at once, without any re-chunking in the write methods.
In Fluentd v1.0, users can specify chunk keys by themselves using the <buffer CHUNK_KEYS> section.
<buffer>          # Use pre-configured chunk keys by plugin
<buffer []>       # without any chunk keys: all events will be appended into a chunk
<buffer time>     # events in a time unit will be appended into a chunk
                  # (using ``timekey`` parameter)
<buffer tag>      # events with same tag will be appended into a chunk
<buffer any_key>  # events with same value for specified key will be appended into a chunk
When a user specifies time as a chunk key, the timekey parameter must also be specified in the buffer section. If it is "15m" (15 minutes), events between 00:00:00 and 00:14:59 will go into one chunk, and an event at 00:15:00 will go into another.
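For example, such a buffer section could look like this (the timekey_wait value is illustrative):

<buffer time>
  timekey 15m       # one chunk per 15-minute window
  timekey_wait 30s  # how long to wait for late events before flushing
</buffer>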
Users can specify two or more chunk keys as a comma-separated list:
<buffer time,tag>
<buffer time,country>
<buffer tag,country,item>
Specifying too many chunk keys results in too many small chunks, which can become a problem (for example, too many files in a single directory for a file buffer).
Metadata, which can be fetched by chunk.metadata
, contains values for chunking. If a user specifies time
, metadata contains a value in metadata.timekey
. If a user doesn’t specify time
, metadata.timekey
is nil. See chunk.metadata
for details.
In most cases, plugin authors don't need to consider these values. If you want to use these values to generate paths, keys, table/database names or other identifiers, you can use the #extract_placeholders method.
@path = "/mydisk/mydir/${tag}/purchase.%Y-%m-%d.%H.log" # this value might come from configuration
extract_placeholders(@path, chunk) # => "/mydisk/mydir/service.cart/purchase.2016-06-10.23.log"
This method must be called by the plugin's own code, so plugins which support this feature should document which configuration parameters accept placeholders.
How To Control Buffering
Buffer chunks are flushed for one of the following reasons. Here is the complete list:
1. its size reaches the limit on bytesize or on the number of records
2. its timekey expires and timekey_wait passes
3. it lives longer than flush_interval
4. data is appended to it and flush_mode is immediate
The buffer configuration has a parameter to control these flush conditions, named flush_mode, which takes one of three modes plus a default behavior.
- lazy: 1 and 2 are enabled
- interval: 1, 2 and 3 are enabled
- immediate: 4 is enabled
The default is "lazy" if time is specified as a chunk key, and "interval" otherwise.
If the user specifies flush_mode explicitly, the plugin works as specified.
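For example, the following buffer section (values are illustrative) selects the interval mode and flushes each chunk at most 10 seconds after it is created:

<buffer tag>
  flush_mode interval
  flush_interval 10s
</buffer>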
How to Change the Default Values for Parameters
There are many configuration parameters in fluent/plugin/output.rb
, and some others in fluent/plugin/buffer.rb
. Most of these parameters are designed to work well for many use cases, but sometimes you will want to override the default values of some of them in your plugin.
If you want to change the default chunk keys and the limit of buffer chunk bytesize for your own plugin, you can do it like this:
class MyAwesomeOutput < Output
  config_section :buffer do
    config_set_default :chunk_keys, ['tag']
    config_set_default :chunk_limit_size, 2 * 1024 * 1024 # 2MB
  end
end
This overriding of default values applies only to your plugin and has no effect on other plugins.
Development Guide
How To Use Asynchronous Buffered Mode and Delayed Commit
In async buffered mode, plugins must call the #commit_write method by themselves. This is typically done after some checks, or after waiting for completion on the destination side, so #try_write should NOT block while waiting for it. The best practice is to create a thread or timer for this purpose when the plugin starts.
class MyAwesomeOutput < Output
  helpers :timer

  def start
    super
    @waiting_ids_mutex = Mutex.new
    @waiting_ids = []

    timer_execute(:awesome_delayed_checker, 5) do
      @waiting_ids_mutex.synchronize{ @waiting_ids.dup }.each do |chunk_id|
        if check_it_succeeded(chunk_id)
          commit_write(chunk_id)
          @waiting_ids_mutex.synchronize{ @waiting_ids.delete(chunk_id) }
        end
      end
    end
  end

  def try_write(chunk)
    chunk_id = chunk.unique_id
    send_to_destination(chunk)
    @waiting_ids_mutex.synchronize{ @waiting_ids << chunk_id }
  end
end
The plugin can then write data and check/commit completion in parallel, using CPU resources more efficiently.
If you can confirm that a write succeeded immediately after writing/sending the data, use sync buffered output instead of async mode. Async mode is for destinations where you cannot immediately check whether the operation succeeded.
How to Customize the Serialization Format for Chunks
You can customize the serialization format with which Fluentd stores events into the buffer by overriding the method #format
.
Generally you do not need to do this. Fluentd is equipped with a standard formatting function, and there are many benefits to leaving the task to Fluentd (for example, it transparently allows you to iterate through records via chunk.each).
An exceptional case is when you can deliver chunks to the destination without any further processing. For example, out_file
overrides #format
so that it can produce chunk files that exactly look like final outputs. In this way, out_file
can flush data just by moving chunk files to the destination path.
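As a simplified illustration (not out_file's actual implementation), a plugin could serialize each event as one line of JSON so that the chunk content is already in its final shape:

# Sketch: newline-delimited JSON as a custom chunk format.
# With this #format, #write can deliver the chunk content as-is,
# e.g. by copying or moving the chunk file to its destination.
def format(tag, time, record)
  { "tag" => tag, "time" => time.to_i, "record" => record }.to_json + "\n"
end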
For further details, please also read the interface definition of #format
below.
List of Interface Methods
Some methods should be overridden or implemented by plugins. On the other hand, plugins MUST NOT override methods that are not mentioned here.
#process(tag, es)
The method for non-buffered output mode. The plugin which supports non-buffered output must implement this method.
- tag: a String representing the tag of the events. Events in es have the same tag.
- es: an event stream (Fluent::EventStream)
Return values will be ignored.
#write(chunk)
The method for sync buffered output mode. The plugin which supports sync buffered output must implement this method.
-
chunk
: a buffer chunk (Fluent::Plugin::Buffer::Chunk)
This method will be executed in parallel when flush_thread_count
is larger than 1.
So if your plugin modifies instance variables in this method, you need to protect them with a Mutex or similar to avoid broken state.
Return values will be ignored.
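For example (a sketch; the @bytes_written counter and the delivery helper are hypothetical), an instance variable updated in #write should be guarded:

def start
  super
  @bytes_written = 0              # shared across flush threads
  @bytes_written_mutex = Mutex.new
end

def write(chunk)
  written = send_chunk_somewhere(chunk.read)  # hypothetical delivery call
  @bytes_written_mutex.synchronize { @bytes_written += written }
end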
#try_write(chunk)
The method for async buffered output mode. The plugin which supports async buffered output must implement this method.
-
chunk
: a buffer chunk (Fluent::Plugin::Buffer::Chunk)
This method will be executed in parallel when flush_thread_count
is larger than 1.
So if your plugin modifies instance variables in this method, you need to protect them with a Mutex or similar to avoid broken state.
Return values will be ignored.
#format(tag, time, record)
The method for custom chunk formats. A plugin which uses a custom format for buffer chunks must implement this method.
- tag: a String representing the tag of the events
- time: a Fluent::EventTime object, or an Integer which represents a unix timestamp (seconds from the Epoch)
- record: a Hash with String keys
Return value must be a String.
#prefer_buffered_processing
The method to specify whether to use buffered or non-buffered output mode by default when both methods are implemented. Returning true means buffered output.
Return value must be true
or false
.
#prefer_delayed_commit
The method to specify whether to use asynchronous or synchronous buffered output mode by default when both methods are implemented. Returning true means asynchronous buffered output.
Return value must be true
or false
.
#extract_placeholders(str, chunk)
This method extracts placeholders in the given string, using values from the chunk.
-
str
: a String which contains placeholders -
chunk
: a Fluent::Plugin::Buffer::Chunk viawrite
/try_write
Return value is a String.
#commit_write(chunk_id)
This method tells Fluentd that the specified chunk should be committed. That chunk will be purged after this method call.
-
chunk_id
: a String, brought by chunk.unique_id
This method has some other optional arguments, but these are for internal use.
#rollback_write(chunk_id)
This method tells Fluentd to retry writing the buffer chunk specified by the argument. Plugins can call this method explicitly to retry writing chunks, or they can simply leave the chunk id alone until the timeout expires without calling this method. A sketch follows the argument list below.
-
chunk_id
: a String, brought by chunk.unique_id
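For illustration, a delayed-commit checker could roll a chunk back explicitly when the destination reports a failure; check_it_succeeded comes from the earlier example, and check_it_failed is a hypothetical helper:

# Inside a checker thread/timer of an async buffered plugin (sketch):
if check_it_succeeded(chunk_id)
  commit_write(chunk_id)       # chunk is purged, no further retries
elsif check_it_failed(chunk_id)
  rollback_write(chunk_id)     # chunk is pushed back and retried
end
# If neither is called, the chunk is rolled back automatically
# after the delayed commit timeout.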
#dump_unique_id_hex(chunk_id)
This method dumps buffer chunk ids. A buffer chunk id is a String, but its value includes non-printable characters. This method formats chunk ids into printable strings, and is typically used for logging chunk ids.
-
chunk_id
: a String, brought by chunk.unique_id
Return value is a String.
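For example, mirroring the logging calls in the template above:

log.debug "flushing chunk", chunk_id: dump_unique_id_hex(chunk.unique_id)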
es.size
EventStream#size
returns an Integer, which represents the number of events in the event stream.
es.each(&block)
EventStream#each
receives a block argument and calls that block for each event (time and record).
es.each do |time, record|
  # ...
end
-
time
: a Fluent::EventTime object, or an Integer which represents unix timestamp (seconds from Epoch) -
record
: a Hash with String keys
chunk.unique_id
Chunk#unique_id
returns a String which represents the unique id of the buffer chunk.
The return value is a String. It includes non-printable characters, so use the #dump_unique_id_hex method to print it in logs or for other purposes.
chunk.metadata
Chunk#metadata
returns a Fluent::Plugin::Buffer::Metadata object which contains values for chunking.
Available fields of metadata are:
- timekey: an Integer unix timestamp equal to the first second of the timekey range (nil if time isn't specified)
- tag: a String representing the tag (nil if tag isn't specified)
- variables: a Hash with Symbol keys, containing the other chunk keys and their values (nil if no other chunk keys are specified)
For example, chunk.metadata.timekey
returns an Integer. chunk.metadata.variables[:name]
returns an Object if name
is specified as a chunk key.
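For example, inside #write (a sketch; the country chunk key is illustrative):

def write(chunk)
  meta    = chunk.metadata
  timekey = meta.timekey                                      # Integer (unix time) or nil
  tag     = meta.tag                                          # String or nil
  country = meta.variables ? meta.variables[:country] : nil   # set when "country" is a chunk key
  # use these values to build paths, table names, etc.
end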
chunk.size
Chunk#size
returns an Integer which represents the bytesize of the chunk.
chunk.read
Chunk#read
reads the whole content of the chunk and returns it as a String.
Note that this method does not take any arguments, unlike Ruby's IO objects.
chunk.open(&block)
Chunk#open
receives a block and calls it with an IO argument which provides read operations.
chunk.open do |io|
  while data = io.read(100)
    # ...
  end
end
-
io
: a readable IO object
chunk.write_to(io)
Chunk#write_to
writes the entire chunk data into an IO object, as in the sketch below.
-
io
: a writable IO object
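For example (a sketch; the destination path is illustrative):

File.open("/tmp/mydata.out", 'w') do |file|
  chunk.write_to(file)
end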
chunk.each(&block)
This method is available only for buffer chunks in the standard format, and provides iteration over the events in a chunk.
chunk.each do |time, record|
  # ...
  # tag is available from chunk.metadata.tag if chunk_key is configured with tag
end
-
time
: a Fluent::EventTime object, or an Integer which represents unix timestamp (seconds from Epoch) -
record
: a Hash with String keys
How to Write Tests
This section explains how to write a test suite for your custom output plugin.
Plugin Testing Overview
To write a test suite, you need to define a class inheriting Test::Unit::TestCase
. The basic structure is mostly the same as any other Test::Unit-based test code. Here is a minimal example.
require_relative '../helper'
require 'fluent/test/driver/output'
require 'fluent/plugin/out_stdout'

class MyOutputTest < Test::Unit::TestCase
  def setup
    Fluent::Test.setup
  end

  test 'output a sample event' do
    # ...
  end
end
Please take notice of Fluent::Test.setup
in the setup function. This function sets up a number of dummy proxies which are very convenient for many testing scenarios. So do not forget to call it!
When you write a test suite for your plugin, please try to cover the following scenarios:
- What happens if invalid configuration values are passed?
- Can your plugin transfer events to the destination?
- Does
#write
(or#try_write
) get called properly? (for a buffered plugin) - Is your
#format
method working as expected? (for a buffered plugin)
How to Use Test Drivers
The plugin test driver provides a dummy router, a logger, a way to override system configurations, a configuration parser, and other features that make it easy to test configuration errors and other scenarios.
Lifecycle of plugins and test drivers is:
1. Instantiate the plugin driver (which instantiates the plugin)
2. Configure the plugin
3. Register conditions to stop/break running tests
4. Run the test code (provided as a block for d.run)
5. Assert the test results using the data provided by the driver
Test drivers call the plugin lifecycle methods at the beginning of step 4 (#start) and at the end of step 4 (#stop, #shutdown, ...). This can be skipped via optional arguments of #run. See the Testing API for plugins for details.
For configuration tests, repeat steps 1-2. For full feature tests, repeat steps 1-5. Test drivers and helper methods support both.
Example Test Code
The following is a more complete example involving test drivers. A good first step is to modify the following code according to your specific needs.
# test/plugin/test_out_your_own.rb

require 'test/unit'
require 'fluent/test/driver/output'
require 'fluent/test/helpers' # for event_time()

# your own plugin
require 'fluent/plugin/out_your_own'

class YourOwnOutputTest < Test::Unit::TestCase
  include Fluent::Test::Helpers # to use event_time()

  def setup
    Fluent::Test.setup # this is required to setup router and others
  end

  # default configuration for tests
  CONFIG = %[
    param1 value1
    param2 value2
  ]

  def create_driver(conf = CONFIG)
    Fluent::Test::Driver::Output.new(Fluent::Plugin::YourOwnOutput).configure(conf)
  end

  sub_test_case 'configured with invalid configurations' do
    test 'param1 should reject too short string' do
      assert_raise Fluent::ConfigError do
        create_driver(%[
          param1 a
        ])
      end
    end

    test 'param2 is set correctly' do
      d = create_driver
      assert_equal "value2", d.instance.param2
    end

    # ...
  end

  sub_test_case 'tests for #write' do
    test 'to test #write' do
      d = create_driver
      t = event_time("2016-06-10 19:46:32 +0900")
      d.run do
        d.feed("tag", t, {"message" => "this is test message", "amount" => 53})
      end

      assert{ check_write_of_plugin_called_and_its_result() }
    end
  end

  sub_test_case 'tests for #format' do
    test 'to test #format' do
      d = create_driver
      t = event_time("2016-06-10 19:46:32 +0900")
      d.run do
        d.feed("tag", t, {"message" => "this is test message", "amount" => 53})
      end

      rows = d.formatted # this returns an array of #format results
      assert_equal "expected format result", rows[0]

      # of course, you can check #format and #write at once
      assert{ check_write_of_plugin_called_and_its_result() }
    end
  end

  # ...
end