How to Write Output Plugin
Extend the Fluent::Plugin::Output class and implement its methods.
The exact set of methods to be implemented is dependent on the design of the plugin. The following is a general template for writing a custom output plugin:
Three Modes of Output Plugins
Output plugins have three operational modes. Each mode has a set of interfaces to implement. An output plugin must support (at least) one of these three modes.
Non-Buffered Mode
Sync-Buffered Mode
Async-Buffered Mode
Non-Buffered Mode
This is the simplest mode. The output plugin transfers events to the destination immediately after receiving them. It does not use any buffer and never attempts to retry on errors.
For example, the built-in plugin out_stdout normally uses this mode. It just dumps events to the standard output without maintaining any state.
This mode is available when the #process method is implemented.
Sync-Buffered Mode
In this mode, the output plugin temporarily stores events in a buffer and sends them later. The exact buffering behavior can be fine-tuned through configuration parameters.
The most notable benefit of this mode is that it lets you leverage the native retry mechanism. Errors such as network failures are transparently handled by Fluentd, so you do not need to implement an error-handling mechanism yourself.
This mode is available when the #write method is implemented.
Async-Buffered Mode
In this mode, the output plugin temporarily stores events in a buffer and sends them later. The major difference from the synchronous mode is that this mode allows you to defer the acknowledgment of transferred records. For example, you can implement at-least-once semantics using this mode.
Please read "How To Use Asynchronous Buffered Mode" for details.
This mode is available when the #try_write method is implemented.
How Fluentd Chooses Modes
From the users' perspective, the <buffer> section enables buffering. An output plugin will use the buffered mode if available or fail otherwise.
However, from the plugin developer's viewpoint, it is a bit different.
See the full chart showing how Fluentd chooses a mode in the slides "Fluentd v0.14 Plugin API Details" from SATOSHI TAGOMORI.
This is the rule:
If a <buffer> section is configured: the plugin tries to do buffering.
Else if the plugin implements methods for both buffered and non-buffered modes: the plugin calls #prefer_buffered_processing to decide (true means buffering; the default is true).
Else the plugin does as implemented.
When the plugin does buffering:
If the plugin implements methods for both sync and async buffered modes: the plugin calls #prefer_delayed_commit to decide (true means delayed commit; the default is true).
Else the plugin does as implemented.
It is important to note that the methods that decide modes are called in #start, after #configure. So, plugins can decide the default behavior based on configured parameters by overriding the #prefer_buffered_processing and #prefer_delayed_commit methods.
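The selection rules above can be restated as a small, plain-Ruby function. This is a hypothetical model for illustration only: choose_mode and its keyword arguments are invented names, and the real decision is made inside Fluentd, not by code like this.

```ruby
# Hypothetical model of the mode-selection rules; NOT Fluentd's implementation.
# process/write/try_write say which methods the plugin implements.
def choose_mode(buffer_section:, process:, write:, try_write:,
                prefer_buffered_processing: true, prefer_delayed_commit: true)
  buffered_available = write || try_write

  buffered =
    if buffer_section
      # A <buffer> section requests buffering; fail if it is unavailable.
      raise ArgumentError, 'buffering is not supported' unless buffered_available
      true
    elsif process && buffered_available
      prefer_buffered_processing # both kinds implemented: ask the plugin
    else
      buffered_available # only one kind implemented: do as implemented
    end

  return :non_buffered unless buffered

  if write && try_write
    prefer_delayed_commit ? :async_buffered : :sync_buffered
  else
    try_write ? :async_buffered : :sync_buffered
  end
end

p choose_mode(buffer_section: false, process: true, write: true, try_write: false)
# => :sync_buffered (buffering is preferred by default)
p choose_mode(buffer_section: false, process: true, write: true, try_write: false,
              prefer_buffered_processing: false)
# => :non_buffered
```

Passing prefer_buffered_processing: false mimics a plugin whose override returns false, for example because of a configured parameter.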
How Buffers Work
Understanding Chunking and Metadata
For more details, see the Buffer section.
Fluentd creates buffer chunks to store events. Each buffer chunk should be written at once, without any re-chunking. In Fluentd v1.0, users can specify chunk keys themselves using the <buffer CHUNK_KEYS> section.
Example:
When time is specified as the chunk key, timekey must also be specified in the buffer section. If it is configured as 15m (15 minutes), the events between 00:00:00 and 00:14:59 will be in one chunk. An event at 00:15:00 will be in a different chunk.
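The 15-minute example can be checked with a little arithmetic: an event belongs to the chunk whose range starts at time - (time % timekey). The sketch below assumes UTC and a timekey given in seconds; the helper timekey_start is hypothetical, and the real buffer can also take a configured time zone into account.

```ruby
require 'time'

# Hypothetical illustration of timekey bucketing (assumes UTC).
def timekey_start(unix_time, timekey)
  unix_time - (unix_time % timekey)
end

timekey = 15 * 60 # "15m" expressed in seconds
last_in_first = Time.utc(2024, 1, 1, 0, 14, 59).to_i
first_in_next = Time.utc(2024, 1, 1, 0, 15, 0).to_i

puts Time.at(timekey_start(last_in_first, timekey)).utc.iso8601 # => 2024-01-01T00:00:00Z
puts Time.at(timekey_start(first_in_next, timekey)).utc.iso8601 # => 2024-01-01T00:15:00Z
```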
Users can specify two or more chunk keys as a list of items separated by commas (',').
Example:
Too many chunk keys mean too many small chunks, which may result in too many open files in a directory for a file buffer.
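To see why extra chunk keys multiply the number of chunks, here is a hypothetical grouping model. The Metadata Struct and group_into_chunks are stand-ins invented for this sketch (they are not Fluentd's classes); events are grouped by their timekey range, tag, and the values of any extra keys.

```ruby
# Hypothetical model of chunking: events sharing the same chunk-key values
# land in the same chunk. Metadata is a stand-in Struct, not
# Fluent::Plugin::Buffer::Metadata.
Metadata = Struct.new(:timekey, :tag, :variables)

# events: Array of [tag, unix_time, record]; keys: extra record keys (Symbols)
def group_into_chunks(events, timekey:, keys: [])
  events.group_by do |tag, time, record|
    variables = keys.empty? ? nil : keys.to_h { |k| [k, record[k.to_s]] }
    Metadata.new(time - (time % timekey), tag, variables)
  end
end

t0 = 1_704_067_200 # 2024-01-01 00:00:00 UTC
events = [
  ['app.access', t0,       { 'host' => 'web01' }],
  ['app.access', t0 + 10,  { 'host' => 'web02' }],
  ['app.access', t0 + 900, { 'host' => 'web01' }]
]

puts group_into_chunks(events, timekey: 900).size                # => 2
puts group_into_chunks(events, timekey: 900, keys: [:host]).size # => 3
```

Adding host as a key splits the first 15-minute range into one chunk per host, which is how many keys can lead to many small chunks.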
Metadata, fetched by chunk.metadata, contains values for chunking. If time is specified, metadata contains a value in metadata.timekey. Otherwise, metadata.timekey is nil. See chunk.metadata for details.
In most cases, plugin authors do not need to consider these values. If you want to use these values to generate paths, keys, table/database names, etc., you can use the #extract_placeholders method.
The #extract_placeholders call should be performed by the plugin itself. A plugin that supports this feature should state in its documentation which configuration parameters accept placeholders.
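The idea behind #extract_placeholders can be sketched as filling a template from chunk metadata. The code below is a simplified, hypothetical stand-in (expand_placeholders and the Metadata Struct are invented here) that assumes ${tag}, ${key} and strftime-style placeholders and UTC time; the real method handles more cases, so an actual plugin should call #extract_placeholders.

```ruby
# Simplified, hypothetical stand-in for #extract_placeholders.
Metadata = Struct.new(:timekey, :tag, :variables)

def expand_placeholders(template, metadata)
  result = template.dup
  # strftime-style time placeholders come from the chunk's timekey (UTC here)
  result = Time.at(metadata.timekey).utc.strftime(result) if metadata.timekey
  result = result.gsub('${tag}', metadata.tag) if metadata.tag
  # other chunk keys, e.g. ${host}
  (metadata.variables || {}).each do |name, value|
    result = result.gsub("${#{name}}", value.to_s)
  end
  result
end

md = Metadata.new(1_704_067_200, 'app.access', { host: 'web01' })
puts expand_placeholders('/logs/${tag}/${host}/%Y%m%d_%H%M.log', md)
# => /logs/app.access/web01/20240101_0000.log
```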
How To Control Buffering
Buffer chunks may be flushed for the following reasons:
1. its size reaches the limit of bytesize or the maximum number of records
2. its timekey has expired and timekey_wait has elapsed
3. it lives longer than flush_interval
4. data is appended to it when flush_mode is immediate
Buffer configuration provides flush_mode to control this behavior. Here are its supported values and the triggers each one enables:
lazy: 1 and 2 are enabled
interval: 1, 2 and 3 are enabled
immediate: 4 is enabled
The default is lazy if time is specified as a chunk key, and interval otherwise. If flush_mode is explicitly configured, the plugin uses the configured mode.
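The relationship between flush_mode and the four numbered triggers can be modeled as a lookup table. This is a hypothetical restatement of the rules above (ENABLED_TRIGGERS, effective_flush_mode and flush? are invented names), not how Fluentd's flush threads are implemented.

```ruby
# Hypothetical model of which numbered triggers each flush_mode enables.
ENABLED_TRIGGERS = {
  lazy: [1, 2],
  interval: [1, 2, 3],
  immediate: [4]
}.freeze

def effective_flush_mode(chunk_keys, configured = nil)
  return configured if configured # an explicit flush_mode always wins
  chunk_keys.include?('time') ? :lazy : :interval
end

def flush?(mode, full:, timekey_expired:, age:, flush_interval:, appended:)
  fired = {
    1 => full,                 # size or record-count limit reached
    2 => timekey_expired,      # timekey expired and timekey_wait elapsed
    3 => age > flush_interval, # chunk lives longer than flush_interval
    4 => appended              # data was just appended
  }
  ENABLED_TRIGGERS.fetch(mode).any? { |n| fired[n] }
end

state = { full: false, timekey_expired: false, age: 90, flush_interval: 60, appended: false }
p effective_flush_mode(%w[time tag]) # => :lazy
p flush?(:lazy, **state)             # => false
p flush?(:interval, **state)         # => true
```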
How to Change the Default Values for Parameters
Some configuration parameters are defined in fluent/plugin/output.rb and some in fluent/plugin/buffer.rb. These parameters have been designed to work for most use cases. However, a plugin may need to override the default values of some parameters.
To change the default chunk keys and the limit of buffer chunk bytesize of a custom plugin, configure like this:
This overriding is valid for this plugin only!
Development Guide
How To Use Asynchronous Buffered Mode and Delayed Commit
In async buffered mode, plugins must call #commit_write. It should be called after some checks, once completion has been confirmed on the destination side, so #try_write should NOT block waiting for that confirmation. The best practice is to create a dedicated thread or timer for these checks when the plugin starts.
Example:
The plugin can perform writing of data and checking/committing completion in parallel, and can use CPU resources more efficiently.
If you can confirm that the write succeeded right after writing/sending the data, you should use sync buffered output instead. Async mode is for destinations where you cannot check, immediately after sending the data, whether the write succeeded or not.
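The pattern of sending data in #try_write and committing from a separate checker thread can be simulated in plain Ruby. FakeDestination and AsyncWriterSketch are hypothetical classes; their try_write and commit_write methods are stand-ins named after the real Fluentd methods, and the fixed number of expected acknowledgments is a simplification that lets the sketch terminate.

```ruby
# Hypothetical destination whose acknowledgments arrive asynchronously.
class FakeDestination
  def initialize
    @acked = Queue.new
  end

  def send_async(chunk_id)
    Thread.new { sleep 0.01; @acked << chunk_id } # ack shows up later
  end

  def next_ack
    @acked.pop # blocks until an acknowledgment is available
  end
end

# Hypothetical async writer: try_write never waits; a checker thread commits.
class AsyncWriterSketch
  attr_reader :committed

  def initialize(dest)
    @dest = dest
    @committed = []
    @mutex = Mutex.new
  end

  def start(expected_acks)
    @checker = Thread.new do
      expected_acks.times { commit_write(@dest.next_ack) }
    end
  end

  def try_write(chunk_id)
    @dest.send_async(chunk_id) # hand off and return immediately
  end

  def commit_write(chunk_id)
    @mutex.synchronize { @committed << chunk_id }
  end

  def stop
    @checker.join
  end
end

dest = FakeDestination.new
writer = AsyncWriterSketch.new(dest)
writer.start(3)
%w[a b c].each { |id| writer.try_write(id) }
writer.stop
p writer.committed.sort # => ["a", "b", "c"]
```

Here try_write returns as soon as the data is handed off; only the checker thread waits for acknowledgments, which is the non-blocking behavior described above.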
How to Customize the Serialization Format for Chunks
The serialization format to store events in a buffer may be customized by overriding the #format method.
Generally, this is not needed. Fluentd has its own serialization format, and there are many benefits to just using the default one. For example, it transparently allows you to iterate through records via chunk.each.
An exceptional case is when the chunks need to be sent to the destination without any further processing. For example, out_file overrides #format so that it can produce chunk files that look exactly like the final outputs. By doing this, out_file can flush data just by moving chunk files to the destination path.
For further details, read the interface definition of the #format method below.
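As an illustration of the out_file approach, a #format that emits the final output line for each event makes the chunk bytes identical to the file content. FileLikeOutputSketch is a hypothetical class, the tab-separated time/tag/JSON layout is an assumption for the example, and time is a plain Integer rather than a Fluent::EventTime.

```ruby
require 'json'
require 'time'

# Hypothetical output whose #format produces final, file-ready lines.
# In a real plugin, #format is an instance method of a
# Fluent::Plugin::Output subclass.
class FileLikeOutputSketch
  def format(tag, time, record)
    "#{Time.at(time).utc.iso8601}\t#{tag}\t#{record.to_json}\n"
  end
end

out = FileLikeOutputSketch.new
chunk_bytes = +''
chunk_bytes << out.format('app.access', 1_704_067_200, { 'status' => 200 })
chunk_bytes << out.format('app.access', 1_704_067_201, { 'status' => 404 })
# each line: <ISO 8601 time> TAB <tag> TAB <JSON record>
print chunk_bytes
```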
List of Interface Methods
Plugins should override or implement some of the methods below. On the other hand, plugins MUST NOT override methods that are not mentioned here.
#process(tag, es)
The method for non-buffered output mode. A plugin that supports non-buffered output must implement this method.
tag: a String representing the tag of events. All events in es have the same tag.
es: an event stream (Fluent::EventStream)
Return values will be ignored.
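A non-buffered #process can be as small as a loop over the event stream. In this hypothetical sketch (StdoutLikeOutputSketch is invented and is not the real out_stdout), an Array of [time, record] pairs stands in for a Fluent::EventStream, and output goes to an injectable IO so it can be inspected.

```ruby
require 'json'
require 'stringio'

# Hypothetical non-buffered output; es is any object whose #each yields
# (time, record), here an Array of pairs.
class StdoutLikeOutputSketch
  def initialize(io = $stdout)
    @io = io
  end

  def process(tag, es)
    es.each do |time, record|
      @io.puts "#{time} #{tag}: #{record.to_json}"
    end
    nil # the return value of #process is ignored anyway
  end
end

buffer = StringIO.new
StdoutLikeOutputSketch.new(buffer).process('app.log', [[1_704_067_200, { 'msg' => 'hi' }]])
puts buffer.string # => 1704067200 app.log: {"msg":"hi"}
```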
#write(chunk)
The method for sync buffered output mode. A plugin that supports sync buffered output must implement this method.
chunk: a buffer chunk (Fluent::Plugin::Buffer::Chunk)
This method will execute in parallel when flush_thread_count is greater than 1. So, if your plugin modifies an instance variable in this method, you need to synchronize access to it using a Mutex or some other synchronization facility to avoid corrupting its state.
Return values will be ignored.
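The following hypothetical sketch shows the Mutex pattern. CountingWriterSketch is invented for illustration, and its write takes a chunk ID and a data String instead of a real Fluent::Plugin::Buffer::Chunk; several threads stand in for flush threads.

```ruby
# Hypothetical writer whose shared counters are guarded by a Mutex,
# since #write may run concurrently when flush_thread_count > 1.
class CountingWriterSketch
  attr_reader :written_bytes, :chunk_ids

  def initialize
    @mutex = Mutex.new
    @written_bytes = 0
    @chunk_ids = []
  end

  def write(chunk_id, data)
    # ... send data to the destination here (no shared state touched) ...
    @mutex.synchronize do
      @written_bytes += data.bytesize
      @chunk_ids << chunk_id
    end
  end
end

writer = CountingWriterSketch.new
threads = 4.times.map do |i|
  Thread.new { 100.times { |j| writer.write("#{i}-#{j}", 'x' * 10) } }
end
threads.each(&:join)
puts writer.written_bytes # => 4000
```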
#try_write(chunk)
The method for async buffered output mode. A plugin that supports async buffered output must implement this method.
chunk: a buffer chunk (Fluent::Plugin::Buffer::Chunk)
This method will be executed in parallel when flush_thread_count is greater than 1. So, if your plugin modifies instance variables in this method, you need to protect them with a Mutex or a similar facility to avoid corrupting their state.
Return values will be ignored.
#format(tag, time, record)
The method for custom formatting of buffer chunks. A plugin that uses a custom format to format buffer chunks must implement this method.
tag: a String representing the tag of events
time: a Fluent::EventTime object or an Integer representing a Unix timestamp (seconds from Epoch)
record: a Hash with String keys
Return value must be a String.
#prefer_buffered_processing
It specifies whether to use buffered or non-buffered output mode as the default when both methods are implemented. True means buffered output.
Return value must be true or false.
#prefer_delayed_commit
It specifies whether to use asynchronous or synchronous buffered output mode when both methods are implemented. True means asynchronous buffered output.
Return value must be true or false.
#extract_placeholders(str, chunk)
It extracts placeholders in the given string using the values in chunk.
str: a String containing placeholders
chunk: a Fluent::Plugin::Buffer::Chunk, as passed to #write or #try_write
Return value is a String.
#commit_write(chunk_id)
It tells Fluentd that the specified chunk should be committed. That chunk will be purged after this method call.
chunk_id: a String, obtained from chunk.unique_id
This method has some other optional arguments, but those are for internal use.
#rollback_write(chunk_id)
It tells Fluentd to retry writing the buffer chunk specified in the argument. Plugins can call this method explicitly to retry writing a chunk, or they can just leave that chunk uncommitted until it times out.
chunk_id: a String, obtained from chunk.unique_id
#dump_unique_id_hex(chunk_id)
It dumps buffer chunk IDs. A buffer chunk ID is a String, but the value may include non-printable characters. This method formats chunk IDs as printable strings that can be used for logging purposes.
chunk_id: a String, obtained from chunk.unique_id
Return value is a String.
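Hex-encoding each byte is one way to make a binary chunk ID printable. The helper below is a hypothetical equivalent written for illustration, not Fluentd's source; the 16-byte random ID is likewise an assumption.

```ruby
require 'securerandom'

# Hypothetical equivalent of #dump_unique_id_hex: render each byte of a
# binary chunk ID as two lowercase hex digits so it is safe to log.
def dump_unique_id_hex_sketch(unique_id)
  unique_id.unpack1('H*')
end

puts dump_unique_id_hex_sketch("\x01\xAB\xFF".b) # => 01abff

# An assumed 16-byte binary ID becomes 32 hex characters.
id = SecureRandom.random_bytes(16)
puts dump_unique_id_hex_sketch(id).length # => 32
```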
es.size
EventStream#size returns an Integer representing the number of events in the event stream.
es.each(&block)
EventStream#each receives a block argument and calls it for each event (time and record).
Example:
time: a Fluent::EventTime object or an Integer representing a Unix timestamp (seconds from Epoch)
record: a Hash with String keys
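The size/each contract can be illustrated with a tiny hypothetical event stream class. EventStreamSketch is invented for this sketch and is not Fluent::EventStream; it simply stores [time, record] pairs and yields them as two block arguments.

```ruby
# Hypothetical event stream showing the size/each contract.
class EventStreamSketch
  include Enumerable

  def initialize(events) # events: Array of [time, record]
    @events = events
  end

  def size
    @events.size
  end

  def each(&block)
    return enum_for(:each) unless block
    @events.each { |time, record| block.call(time, record) }
    self
  end
end

es = EventStreamSketch.new([[1_704_067_200, { 'a' => 1 }], [1_704_067_201, { 'a' => 2 }]])
puts es.size # => 2
es.each { |time, record| puts "#{time} #{record['a']}" }
```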
chunk.unique_id
Returns a String representing a unique ID for the buffer chunk.
The returned String may include non-printable characters, so use the #dump_unique_id_hex method to print it in logs or for other purposes.
chunk.metadata
Returns a Fluent::Plugin::Buffer::Metadata object containing values for chunking.
Available fields of metadata are:
timekey: an Integer representing a Unix timestamp, which is equal to the first second of the timekey range (nil if time is not specified)
tag: a String representing the event tag (nil if tag is not specified)
variables: a Hash with Symbol keys, containing the other chunk keys and their values (nil if no other chunk keys are specified)
For example, chunk.metadata.timekey returns an Integer. chunk.metadata.variables[:name] returns an Object if name is specified as a chunk key.
chunk.size
Returns an Integer representing the bytesize of the chunk.
chunk.read
Reads all the content of the chunk and returns it as a String.
Unlike Ruby's IO object, this method has no arguments.
chunk.open(&block)
Receives a block and calls it with an IO argument that provides read operations.
io: a readable IO object
chunk.write_to(io)
Writes the entire data into an IO object.
io: a writable IO object
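The chunk methods above (size, read, open and write_to) can be illustrated with a hypothetical in-memory stand-in built on StringIO. MemoryChunkSketch is invented for this sketch; real chunks are Fluent::Plugin::Buffer::Chunk objects backed by memory or files. chunk.each is left out because it depends on Fluentd's internal serialization format.

```ruby
require 'stringio'

# Hypothetical in-memory chunk illustrating the read-side contracts.
class MemoryChunkSketch
  def initialize(data)
    @data = data.b.freeze
  end

  def size # bytesize of the chunk
    @data.bytesize
  end

  def read # whole content as a String; takes no arguments
    @data.dup
  end

  def open # yields a readable IO (read-only, since @data is frozen)
    yield StringIO.new(@data)
  end

  def write_to(io) # copies the entire data into a writable IO
    io.write(@data)
  end
end

chunk = MemoryChunkSketch.new("line1\nline2\n")
puts chunk.size                  # => 12
chunk.open { |io| puts io.gets } # => line1
out = StringIO.new
chunk.write_to(out)
puts out.string == chunk.read    # => true
```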
chunk.each(&block)
This method is available only for buffer chunks in the standard format, and provides iteration over the events in a buffer chunk.
time: a Fluent::EventTime object or an Integer representing a Unix timestamp (seconds from Epoch)
record: a Hash with String keys
How to Write Tests
This section explains how to write a test suite for a custom output plugin.
Plugin Testing Overview
To write a test suite, you need to define a class inheriting from Test::Unit::TestCase. The basic structure is similar to any other Test::Unit-based test code.
Here is a minimal example:
Note the call to Fluent::Test.setup in the setup method. It sets up a number of dummy proxies that are convenient for most testing scenarios, so do not forget to call it!
When you write a test suite for your plugin, please try to cover the following scenarios:
What happens if the configuration is invalid?
Can the plugin transfer events to the destination?
Does #write (or #try_write) get called properly? (for a buffered plugin)
Is your #format method working as expected? (for a buffered plugin)
How to Use Test Drivers
To make testing easy, the plugin test driver provides a dummy router, a logger and the functionality to override the system and parser configurations, etc.
The lifecycle of the plugin and test driver is:
1. Instantiate the plugin driver, which then instantiates the plugin
2. Configure the plugin
3. Register conditions to stop/break running tests
4. Run the test code (provided as a block for d.run)
5. Assert the results of tests using data provided by the driver
The test driver calls the plugin lifecycle methods at the beginning of step 4 (i.e. #start) and at the end (i.e. #stop, #shutdown, etc.). These calls can be skipped via optional arguments of #run.
See Testing API for Plugins for details.
For configuration tests, repeat steps 1-2; for full feature tests, repeat steps 1-5.
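The ordering of the lifecycle steps can be illustrated with a heavily simplified, hypothetical driver. DriverSketch and RecordingPluginSketch are invented here and are not Fluentd's test driver API (see Testing API for Plugins for that); step 3 is omitted, and the start:/shutdown: options only mimic the idea that lifecycle calls can be skipped.

```ruby
# Hypothetical plugin that records which lifecycle methods were called.
class RecordingPluginSketch
  attr_reader :calls, :config

  def initialize
    @calls = []
  end

  def configure(conf)
    @config = conf
    @calls << :configure
  end

  def start;    @calls << :start;    end
  def stop;     @calls << :stop;     end
  def shutdown; @calls << :shutdown; end
end

# Hypothetical driver: instantiate -> configure -> run (start/stop around block).
class DriverSketch
  attr_reader :instance

  def initialize(plugin_class) # step 1: the driver instantiates the plugin
    @instance = plugin_class.new
  end

  def configure(conf) # step 2
    @instance.configure(conf)
    self
  end

  def run(start: true, shutdown: true) # step 4
    @instance.start if start
    yield if block_given?
  ensure
    if shutdown
      @instance.stop
      @instance.shutdown
    end
  end
end

d = DriverSketch.new(RecordingPluginSketch).configure('path' => '/tmp/out')
d.run { d.instance.calls << :test_code }
p d.instance.calls # => [:configure, :start, :test_code, :stop, :shutdown]
```

Step 5 then corresponds to asserting on what the driver collected, here the recorded calls.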
Example Test Code
The following is a more elaborate example involving test drivers. A good first step is to modify the following code according to your own specific needs.
If this article is incorrect or outdated, or omits critical information, please let us know. Fluentd is an open-source project under Cloud Native Computing Foundation (CNCF). All components are available under the Apache 2 License.