Versions | v0.14 (td-agent3)

This page is for v0.14, not the latest stable version, which is v0.12.


Writing Buffered Output Plugins

This page is simply copied from v0.12 documents, and will be updated later.

Extend the Fluent::Plugin::Output class and implement the following methods. See Plugin Base Class API for details of the common API for all plugin types.

This example implements all methods for all output modes. In most cases, output plugins should implement only one or a few of these methods, not all.

require 'fluent/plugin/output'

module Fluent::Plugin
  class SomeOutput < Output
    Fluent::Plugin.register_output('some', self)

    helpers :thread  # for try_write

    config_param :path, :string

    # method for non-buffered output mode
    def process(tag, es)
      es.each do |time, record|
        # output events to ...
      end
    end

    # method for sync buffered output mode
    def write(chunk)
      real_path = extract_placeholders(@path, chunk.metadata)

      log.debug "writing data to file", chunk_id: dump_unique_id_hex(chunk.unique_id)

      # for standard chunk format (without #format method)
      chunk.each do |time, record|
        # output events to ...
      end

      ## for custom format (when #format is implemented by the plugin)
      # File.open(real_path, 'w+') do |file|
      #   file.write(chunk.read)
      # end

      ## or #write_to(io) is available
      # File.open(real_path, 'w+') do |file|
      #   chunk.write_to(file)
      # end
    end

    # method for async buffered output mode
    def try_write(chunk)
      real_path = extract_placeholders(@path, chunk.metadata)

      log.debug "sending data to server", chunk_id: dump_unique_id_hex(chunk.unique_id)

      send_data_to_server(@host, real_path, chunk.read)

      chunk_id = chunk.unique_id

      # create a thread and check whether data is successfully sent or not
      thread_create(:check_send_result) do
        while thread_current_running?
          sleep SENDDATA_CHECK_INTERVAL # == 5

          if check_data_on_server(real_path, chunk_id)
            # commit chunk - chunk will be deleted and not be retried anymore by this call
            commit_write(chunk_id)
            break
          end
        end
      end
    end

    # method for custom format
    def format(tag, time, record)
      [tag, time, record].to_json
    end
  end
end

Modes of Output Plugins

Output plugins have 3 modes for output, and 2 ways of formatting buffer chunks (for buffered output). A plugin can implement one, some, or all of these modes. Users can choose a mode by configuration if 2 or more modes are available. A plugin can declare a default (preferred) mode for when users don’t select one explicitly.

Output modes

Non-buffered output

Fluentd calls the #process method immediately after input plugins emit events into the Fluentd router. Output plugins should output these events to destinations without raising errors, because Fluentd doesn’t retry failures in this mode.

This mode is available when #process method is implemented.

Sync buffered output

Fluentd stores events in buffer chunks, then calls #write. Users can control how to split chunks and how often to write chunks. Output plugins should output these events to destinations, or raise an error if any failure occurs. Fluentd retries writing a chunk when an error is raised.

This mode is available when #write method is implemented.

Async buffered output (Delayed Commit)

Fluentd stores events in buffer chunks, then calls #try_write. Users can control how to split chunks and how often to write chunks. Output plugins should output these events to destinations, and then call #commit_write to tell Fluentd core that the write operation succeeded (this can be done from another thread). Fluentd will retry writing a chunk when any error is raised from #try_write, or when the chunk is not committed before the timeout. The timeout in seconds can be configured by the delayed_commit_timeout parameter.

This mode is available when #try_write method is implemented.

Formatting modes

Standard format

Fluentd v0.14 has a “standard” format for buffer chunks. If an output plugin doesn’t implement the #format method, Fluentd uses this standard format. Buffer chunks in this format can be iterated with the chunk.each method to extract events from the chunk body.

Some tools will be provided with Fluentd core to read/extract buffer chunks in the standard format. If your plugin iterates over events from buffer chunks, it’s best to use the standard format.

Custom format

Fluentd uses a custom format if the plugin implements the #format method. That method should return a String that represents an event. Fluentd appends it to the buffer chunk body.

Custom format is useful when output plugins write data into plain text files on local disks or distributed file systems.
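As a minimal sketch of a custom format, the class below returns one newline-terminated JSON line per event, so the chunk body is a plain text file that could be written out as-is. It runs without Fluentd: the class name JsonLineFormat, the newline terminator and the merged "tag" and "time" fields are assumptions made for illustration. In a real plugin, the #format method would be defined on the plugin class itself.

```ruby
require 'json'

# Hypothetical stand-in for a plugin class; only #format matters here.
class JsonLineFormat
  # Returns a String that represents one event, as #format must.
  def format(tag, time, record)
    JSON.generate(record.merge('tag' => tag, 'time' => time.to_i)) + "\n"
  end
end

formatter = JsonLineFormat.new

# Fluentd appends each returned String to the chunk body; a plain String
# plays the role of the chunk body in this sketch.
chunk_body = String.new
chunk_body << formatter.format('app.access', 1465555592, { 'message' => 'hello' })
chunk_body << formatter.format('app.access', 1465555593, { 'message' => 'world' })

puts chunk_body
```

Because every event ends with a newline, the chunk body can be copied to a file with chunk.write_to(io) and read back line by line.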

How Fluentd Chooses Modes

From the viewpoint of users: write a <buffer> section to enable buffering. The output plugin will use buffered mode if available, or fail to launch if unavailable.

It’s a bit more complex from the viewpoint of plugin developers. Here’s the full chart of how Fluentd decides the mode:

Simply speaking, this is the rule:

  • If users configure <buffer> section:
    • plugin tries to do buffering
  • Else if plugin implements both of methods for buffered/non-buffered
    • plugin calls #prefer_buffered_processing to decide it (true means to do buffering: default true)
  • Else plugin does as implemented

When plugin does buffering:

  • If plugin implements both of Sync/Async buffered methods
    • plugin calls #prefer_delayed_commit to decide it (true means to use delayed commit: default true)
  • Else plugin does as implemented

An important point is that the methods that decide modes are called in #start, which is called after #configure. So plugins can decide the default behavior based on configured parameter values by overriding these methods (#prefer_buffered_processing and #prefer_delayed_commit).
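The rule above can be sketched as a plain Ruby function. This is a simplified model for illustration, not Fluentd's actual code: the function name choose_output_mode, its keyword arguments and the returned symbols are all assumptions. The implemented argument lists which of #process, #write and #try_write the plugin defines.

```ruby
# Simplified model of the mode selection rule (not Fluentd's implementation).
def choose_output_mode(implemented:, buffer_section:,
                       prefer_buffered: true, prefer_delayed_commit: true)
  can_buffer = implemented.include?(:write) || implemented.include?(:try_write)
  can_unbuffer = implemented.include?(:process)

  buffered =
    if buffer_section
      # users asked for buffering: fail if the plugin cannot do it
      raise ArgumentError, 'buffering requested but not implemented' unless can_buffer
      true
    elsif can_buffer && can_unbuffer
      prefer_buffered # stands in for #prefer_buffered_processing
    else
      can_buffer
    end

  return :non_buffered unless buffered

  if implemented.include?(:write) && implemented.include?(:try_write)
    # stands in for #prefer_delayed_commit
    prefer_delayed_commit ? :async_buffered : :sync_buffered
  elsif implemented.include?(:try_write)
    :async_buffered
  else
    :sync_buffered
  end
end

p choose_output_mode(implemented: [:process, :write], buffer_section: false)
p choose_output_mode(implemented: [:process, :write], buffer_section: false,
                     prefer_buffered: false)
p choose_output_mode(implemented: [:write, :try_write], buffer_section: true,
                     prefer_delayed_commit: false)
```

In a real plugin, the two prefer_* keyword arguments correspond to the return values of the overridable methods, which may depend on configured parameters.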

How To Use Asynchronous Buffered Mode and Delayed Commit

Plugins must call the #commit_write method themselves in async buffered mode. This is done after some checks, or after waiting for completion on the destination side, so #try_write should NOT block while waiting for it. The best practice is to create a thread or timer for this when the plugin starts.

class MyAwesomeOutput < Output
  helpers :timer

  def start
    super

    @waiting_ids_mutex = Mutex.new
    @waiting_ids = []

    timer_create(:awesome_delayed_checker, 5) do
      @waiting_ids_mutex.synchronize{ @waiting_ids.dup }.each do |chunk_id|
        if check_it_succeeded(chunk_id)
          commit_write(chunk_id)
          @waiting_ids_mutex.synchronize{ @waiting_ids.delete(chunk_id) }
        end
      end
    end
  end

  def try_write(chunk)
    chunk_id = chunk.unique_id
    send_to_destination(chunk)
    @waiting_ids_mutex.synchronize{ @waiting_ids << chunk_id }
  end
end

The plugin can write data and check/commit completion in parallel, and can use CPU resources more efficiently.

If you are sure that the write succeeded right after writing/sending data, you should use sync buffered output instead of async mode. Async mode is for destinations where we cannot immediately check whether the operation succeeded.

How To Control Buffering

A buffer chunk is flushed for one of the following reasons. Here’s the complete list:

  1. its size reaches the limit of bytesize or number of records
  2. its timekey has expired and timekey_wait has passed
  3. it has lived longer than flush_interval
  4. any data is appended to it and flush_mode is immediate

The buffer configuration has a parameter named flush_mode to control which of these conditions are enabled. It has 3 modes, plus a default behavior.

  • lazy: 1 and 2 are enabled
  • interval: 1, 2 and 3 are enabled
  • immediate: 4 is enabled

The default is lazy if time is specified as a chunk key, and interval otherwise. If the user specifies flush_mode explicitly, the plugin works as specified.
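The relationship between flush_mode and the four flush conditions can be written down as a small lookup. This is a sketch of the rules stated above, not Fluentd's own code: the constant FLUSH_CONDITIONS and the function effective_flush_mode are hypothetical names.

```ruby
# Which numbered flush conditions (1-4 in the list above) each mode enables.
FLUSH_CONDITIONS = {
  lazy:      [1, 2],
  interval:  [1, 2, 3],
  immediate: [4]
}.freeze

# An explicit flush_mode wins; otherwise "time" as a chunk key selects lazy.
def effective_flush_mode(chunk_keys, flush_mode = nil)
  return flush_mode if flush_mode
  chunk_keys.include?('time') ? :lazy : :interval
end

mode = effective_flush_mode(%w[time tag])
puts "#{mode}: conditions #{FLUSH_CONDITIONS.fetch(mode).join(', ')}"

mode = effective_flush_mode(%w[tag])
puts "#{mode}: conditions #{FLUSH_CONDITIONS.fetch(mode).join(', ')}"
```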

Understanding Chunking and Metadata

TODO: move some of below to users guide

Fluentd creates buffer chunks to store events. Each buffer chunk should be written at once, without any re-chunking in the write methods. In Fluentd v0.14, users can specify chunk keys themselves using a <buffer CHUNK_KEYS> section.

<buffer>         # without any chunk keys: all events will be appended into a chunk
<buffer time>    # events in a time unit will be appended into a chunk
                 #  (using ``timekey`` parameter)
<buffer tag>     # events with same tag will be appended into a chunk
<buffer any_key> # events with same value for specified key will be appended into a chunk

When a user specifies time as a chunk key, the user must also specify the timekey parameter in the buffer section. If it is “15m” (15 minutes), events between 00:00:00 and 00:14:59 will be in one chunk, and an event at 00:15:00 will be in another.
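The grouping by timekey comes down to integer arithmetic on unix timestamps. The sketch below assumes UTC and ignores any timezone handling. The helper name timekey_for is hypothetical; it returns the first second of the range that contains an event, which is the kind of value found in metadata.timekey.

```ruby
TIMEKEY = 15 * 60 # "15m" expressed in seconds

# First second of the timekey range that contains the given unix time.
def timekey_for(unix_time, timekey = TIMEKEY)
  unix_time - (unix_time % timekey)
end

midnight = Time.utc(2016, 6, 10, 0, 0, 0).to_i

first = timekey_for(midnight)                # event at 00:00:00
last  = timekey_for(midnight + 14 * 60 + 59) # event at 00:14:59
later = timekey_for(midnight + 15 * 60)      # event at 00:15:00

puts first == last # same chunk
puts last == later # different chunk
```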

Users can specify 2 or more chunk keys as a list of items separated by comma (‘,’).

<buffer time,tag>
<buffer time,country>
<buffer tag,country,item>

Specifying too many chunk keys produces too many small chunks. This might be a problem (for example, too many files in a directory for a file buffer).

Metadata, which can be fetched by chunk.metadata, contains values for chunking. If a user specifies time, metadata contains a value in metadata.timekey. If a user doesn’t specify time, metadata.timekey is nil. See chunk.metadata for details.

In most cases, plugin authors don’t need to consider these values. If you want to use these values to generate any paths, keys, table/database names or others, you can use #extract_placeholders method.

@path = "/mydisk/mydir/${tag}/purchase.%Y-%m-%d.%H.log" # this value might come from configurations
extract_placeholders(@path, chunk.metadata) # => "/mydisk/mydir/service.cart/purchase.2016-06-10.23.log"

This method call must be made by plugin code, so plugins that support this feature should document which configuration parameters accept placeholders.
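To make the expansion concrete, here is a simplified stand-in that can run outside Fluentd. It is not Fluentd's implementation of #extract_placeholders. The Metadata struct and the expand_placeholders function are hypothetical, time placeholders are assumed to be strftime-style and filled in UTC, and only ${tag} and ${key} placeholders for chunk key variables are handled.

```ruby
# Hypothetical stand-in for the object returned by chunk.metadata.
Metadata = Struct.new(:timekey, :tag, :variables)

# Simplified sketch of placeholder expansion (UTC assumed).
def expand_placeholders(str, metadata)
  result = str
  # strftime-style placeholders are filled from the timekey
  result = Time.at(metadata.timekey).utc.strftime(result) if metadata.timekey
  # block form avoids special handling of backslashes in the replacement
  result = result.gsub('${tag}') { metadata.tag } if metadata.tag
  (metadata.variables || {}).each do |key, value|
    result = result.gsub("${#{key}}") { value.to_s }
  end
  result
end

meta = Metadata.new(Time.utc(2016, 6, 10, 23).to_i, 'service.cart', nil)
puts expand_placeholders('/mydisk/mydir/${tag}/purchase.%Y-%m-%d.%H.log', meta)
```

A plugin should call the real #extract_placeholders rather than reimplementing it; the sketch only shows which metadata field feeds which kind of placeholder.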

Changing Parameter Defaults of Buffer Plugins

There are many configuration parameters in fluent/plugin/output.rb, and some others in fluent/plugin/buffer.rb. Almost all parameters are designed to work well for many use cases, but plugins may want to override the default values of some of them.

If you want to change the default chunk keys and the limit of buffer chunk bytesize for your own plugin, you can do it like this:

class MyAwesomeOutput < Output
  config_section :buffer do
    config_set_default :chunk_keys, ['tag']
    config_set_default :chunk_limit_size, 2 * 1024 * 1024 # 2MB
  end
end

This default value overriding is valid only for your plugin, and doesn’t have any effect for others.

Methods

Some methods should be overridden / implemented by plugins. On the other hand, plugins MUST NOT override methods that are not mentioned here as overridable.

#process(tag, es)

The method for non-buffered output mode. The plugin which supports non-buffered output must implement this method.

  • tag: a String that represents the tag of the events. All events in es have the same tag.
  • es: an event stream (Fluent::EventStream)

Return values will be ignored.

#write(chunk)

The method for sync buffered output mode. The plugin which supports sync buffered output must implement this method.

  • chunk: a buffer chunk (Fluent::Plugin::Buffer::Chunk)

Return values will be ignored.

#try_write(chunk)

The method for async buffered output mode. The plugin which supports async buffered output must implement this method.

  • chunk: a buffer chunk (Fluent::Plugin::Buffer::Chunk)

Return values will be ignored.

#format(tag, time, record)

The method for custom format of buffer chunks. The plugin which uses custom format to format buffer chunks must implement this method.

  • tag: a String that represents the tag of the event
  • time: a Fluent::EventTime object, or an Integer which represents unix timestamp (seconds from Epoch)
  • record: a Hash with String keys

Return value must be a String.

#prefer_buffered_processing

The method to specify whether to use buffered or non-buffered output mode by default when both methods are implemented. True means buffered output.

Return value must be true or false.

#prefer_delayed_commit

The method to specify whether to use asynchronous or synchronous output mode when both methods are implemented. True means asynchronous buffered output.

Return value must be true or false.

#extract_placeholders(str, metadata)

This method expands placeholders in the given string, using values in metadata.

  • str: a String which contains placeholders
  • metadata: a Fluent::Plugin::Buffer::Metadata, brought by chunk.metadata

Return value is a String.

#commit_write(chunk_id)

This method is to tell Fluentd that specified chunk should be committed. That chunk will be purged after this method call.

  • chunk_id: a String, brought by chunk.unique_id

This method has some other optional arguments, but these are for internal use.

#rollback_write(chunk_id)

This method tells Fluentd that it should retry writing the buffer chunk specified in the argument. Plugins can call this method explicitly to retry writing chunks, or can simply leave the chunk uncommitted until the timeout, without calling this method.

  • chunk_id: a String, brought by chunk.unique_id

#dump_unique_id_hex(chunk_id)

This method dumps buffer chunk ids. A buffer chunk id is a String, but its value includes non-printable characters. This method formats chunk ids into printable strings, and is used for logging chunk ids.

  • chunk_id: a String, brought by chunk.unique_id

Return value is a String.
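The idea behind this method can be shown with plain Ruby: hex-encoding turns arbitrary bytes into a printable String. The helper name hex_dump_id is a hypothetical stand-in rather than the Fluentd method itself, and the 16-byte random id is an assumption used only to have some binary data to print.

```ruby
require 'securerandom'

# Stand-in for #dump_unique_id_hex: hex-encode a binary id for logging.
def hex_dump_id(unique_id)
  unique_id.unpack1('H*')
end

chunk_id = SecureRandom.random_bytes(16) # hypothetical binary chunk id
puts "chunk_id: #{hex_dump_id(chunk_id)}"
```

In plugin code, pass chunk.unique_id to #dump_unique_id_hex as in the first example on this page.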

es.size

EventStream#size returns an Integer, which represents the number of events in the event stream.

es.each(&block)

EventStream#each receives a block argument, and calls that block for each event (time and record).

es.each do |time, record|
  # ...
end
  • time: a Fluent::EventTime object, or an Integer which represents unix timestamp (seconds from Epoch)
  • record: a Hash with String keys

chunk.unique_id

Chunk#unique_id returns a String which represents unique id of buffer chunks.

Return value is a String. It includes non-printable characters, so use #dump_unique_id_hex method to print it in logs or other purposes.

chunk.metadata

Chunk#metadata returns a Fluent::Plugin::Buffer::Metadata object which contains values for chunking.

Available fields of metadata are:

  • timekey: an Integer that represents a unix timestamp, equal to the first second of the timekey range (nil if time isn’t specified)
  • tag: a String that represents the tag (nil if tag isn’t specified)
  • variables: a Hash with Symbol keys, which contains the other chunk keys and their values (nil if no other chunk keys are specified)

For example, chunk.metadata.timekey returns an Integer. chunk.metadata.variables[:name] returns an Object if name is specified as a chunk key.

chunk.size

Chunk#size returns an Integer that represents the bytesize of the chunk.

chunk.read

Chunk#read reads all content of the chunk, and returns it as a String.

This method doesn't take any arguments, unlike the read method of Ruby's IO object.

chunk.open(&block)

Chunk#open receives a block, and calls it with an IO argument which provides read operations.

chunk.open do |io|
  while data = io.read(100)
    # ...
  end
end
  • io: a readable IO object
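The read loop above relies on IO#read(length) returning nil at end of data, which ends the while loop. A sketch with StringIO standing in for the IO that Chunk#open yields (the 250-byte payload is arbitrary) shows the sizes of the pieces read:

```ruby
require 'stringio'

# StringIO stands in for the readable IO object yielded by Chunk#open.
io = StringIO.new('x' * 250)

sizes = []
while (data = io.read(100))
  # each call returns up to 100 bytes; nil signals the end of the data
  sizes << data.bytesize
end

p sizes # => [100, 100, 50]
```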

chunk.write_to(io)

Chunk#write_to writes entire data into an IO object.

  • io: a writable IO object

chunk.each(&block)

This method is available only for standard format buffer chunks, and provides iteration over the events in a buffer chunk.

chunk.each do |time, record|
  # ...
  # tag is available from chunk.metadata.tag if chunk_key is configured with tag
end
  • time: a Fluent::EventTime object, or an Integer which represents unix timestamp (seconds from Epoch)
  • record: a Hash with String keys

Writing Tests

A Fluentd output plugin has only a few points that need to be tested. Everything else (parsing configurations, controlling buffers, retries, flushes and many others) is controlled by Fluentd core.

Fluentd also provides a test driver for plugins. You can write tests for your own plugins very easily:

# test/plugin/test_out_your_own.rb

require 'test/unit'
require 'fluent/test/driver/output'
require 'fluent/test/helpers' # for event_time()

# your own plugin
require 'fluent/plugin/out_your_own'

class YourOwnOutputTest < Test::Unit::TestCase
  def setup
    Fluent::Test.setup  # this is required to setup router and others
  end

  # default configuration for tests
  CONFIG = %[
    param1 value1
    param2 value2
  ]

  def create_driver(conf = CONFIG)
    Fluent::Test::Driver::Output.new(Fluent::Plugin::YourOwnOutput).configure(conf)
  end

  sub_test_case 'configured with invalid configurations' do
    test 'param1 should reject too short string' do
      assert_raise Fluent::ConfigError do
        create_driver(%[
          param1 a
        ])
      end
    end

    test 'param2 is set correctly' do
      d = create_driver
      assert_equal "value2", d.instance.param2
    end

    # ...
  end

  sub_test_case 'tests for #write' do
    test 'to test #write' do
      d = create_driver
      t = event_time("2016-06-10 19:46:32 +0900")
      d.run do
        d.feed("tag", t, {"message" => "this is test message", "amount" => 53})
      end

      assert{ check_write_of_plugin_called_and_its_result() }
    end
  end

  sub_test_case 'tests for #format' do
    test 'to test #format' do
      d = create_driver
      t = event_time("2016-06-10 19:46:32 +0900")
      d.run do
        d.feed("tag", t, {"message" => "this is test message", "amount" => 53})
      end

      rows = d.formatted # this returns array of result of #format
      assert_equal "expected format result", rows[0]

      # of course, you can check #format and #write at once
      assert{ check_write_of_plugin_called_and_its_result() }
    end
  end

  # ...
end

Overview of Tests

Tests for output plugins mainly cover:

  • Configuration/Validation checks for invalid configurations (about #configure)
  • Write method #write is called correctly
  • What #write does is correct
  • Write method #try_write is called correctly
  • What #try_write does is correct
  • Result of #format is correct

The plugin test driver provides a dummy router, a logger, a feature to override system configurations, a configuration parser and other utilities that make it easy to test configuration errors and other behavior.

Lifecycle of plugins and test drivers is:

  1. Instantiate plugin driver (and it instantiates plugin)
  2. Configure plugin
  3. Register conditions to stop/break running tests
  4. Run test code (provided as a block for d.run)
  5. Assert results of tests by data provided from driver

The test driver calls the plugin lifecycle methods at the beginning of step 4 (#start) and at the end of step 4 (#stop, #shutdown, …). These calls can be skipped by optional arguments of #run. See Testing API for plugins for details.

For configuration tests, repeat steps 1-2. For full feature tests, repeat steps 1-5. Test drivers and helper methods support this.

Last updated: 2016-06-13 06:11:23 UTC
