Stream Processing with Norikra


This article explains how to use Fluentd and Norikra to create a SQL-based real-time complex event processing platform.

Background

Fluentd is an advanced open-source log collector originally developed at Treasure Data, Inc. Fluentd is not only a log collector, but also an all-purpose stream processing platform. Plugins can be written to handle many kinds of events.

However, Fluentd is not primarily designed for stream processing: it must be restarted after any modification to its configuration or code. This makes it unsuitable for running short-span (seconds or minutes) and long-span (hours or days) calculations side by side, because restarting Fluentd to change a short-span calculation discards the internal state of every calculation, short and long. A large-scale stream processing platform must be able to add and remove code/processes without such losses.

Norikra is an open-source stream processing server based on Esper by EsperTech. It allows you to subscribe/unsubscribe to data streams at any time and to add/remove SQL queries at any time. Norikra is written by @tagomoris, a committer of the Fluentd project.

This article will show you how to integrate Fluentd, Norikra, and the Fluentd Norikra plugin to create a robust stream data processing platform.

Architecture

The figure below shows the high-level architecture:

(Figure: Fluentd + Norikra Overview)

Prerequisites

The following software/services are required to be set up correctly:

  • Fluentd
  • Fluentd Norikra Plugin (fluent-plugin-norikra)
  • Norikra

You can install Fluentd via major packaging systems; see the Installation section for details.

Installing Fluentd Norikra Plugin

If out_norikra (fluent-plugin-norikra) is not installed yet, please install it manually. See the Plugin Management section for how to install fluent-plugin-norikra in your environment.
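For a gem-based Fluentd installation, this typically comes down to a single gem install. A minimal sketch (packaged installations such as fluent-package ship their own gem wrapper, so the exact command depends on how Fluentd was installed):

# Install the Norikra output plugin into a gem-based Fluentd
fluent-gem install fluent-plugin-norikra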

Installing Norikra

Norikra requires JRuby. You can download the JRuby binary directly from the official site and add JRUBY_INSTALL_DIRECTORY/bin to your PATH. Once JRuby is set up, install Norikra:

jgem install norikra
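If the jgem command is not found, JRuby's bin directory is probably not on your PATH yet. A minimal sketch, where JRUBY_INSTALL_DIRECTORY stands for the directory you unpacked JRuby into:

# Put JRuby's executables (jruby, jgem, and later norikra) on the PATH
export PATH="$JRUBY_INSTALL_DIRECTORY/bin:$PATH"
jgem install norikra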

Verify Installation

We'll start the Norikra server after installation. The norikra start command will launch the Norikra server in your console.

....
2014-05-20 20:36:01 +0900 [INFO] : Loading UDF plugins
2014-05-20 20:36:01 +0900 [INFO] : RPC server 0.0.0.0:26571, 2 threads
2014-05-20 20:36:01 +0900 [INFO] : WebUI server 0.0.0.0:26578, 2 threads
2014-05-20 20:36:01 +0900 [INFO] : Norikra server started.

You can also check the current status of Norikra via the WebUI (http://localhost:26578/).

Fluentd Configuration

We'll now configure Fluentd. If you used the deb/rpm package, Fluentd's config file is located at /etc/fluent/fluentd.conf.
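If you installed Fluentd with Ruby gems rather than a deb/rpm package, there is no fixed config path; you pass the configuration file explicitly when starting Fluentd. A minimal sketch (the path is only an example):

# Run Fluentd in the foreground with an explicit configuration file
fluentd -c /etc/fluent/fluentd.conf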

HTTP Input

For the input source, we will set up Fluentd to accept records from HTTP. The Fluentd configuration file should look like this:

<source>
  @type http
  port 8888
</source>

Norikra Output

The output destination will be Norikra. The output configuration should look like this:

<match data.*>
  @type norikra
  norikra localhost:26571
  target_map_tag true
  remove_tag_prefix data

  <default>
    include *
    exclude time
  </default>
</match>

The <match> section specifies the glob pattern used to look for the matching tags. If the tag of a log is matched, the respective match configuration is used (i.e. the log is routed accordingly).

The norikra attribute specifies the Norikra server's RPC host and port (default port: 26571). With target_map_tag true and remove_tag_prefix data, out_norikra treats the remainder of the tag (e.g. foo for data.foo) as the target, which is the name of a set of events, comparable to a table name in an RDBMS.

The <default> section specifies which fields are sent to the Norikra server. These field sets can also be specified per target with <target NAME>...</target>. For more details, refer to fluent-plugin-norikra.
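After saving the configuration, restart Fluentd so that the new <source> and <match> sections take effect. A minimal sketch, assuming a deb/rpm (fluent-package) installation managed by a systemd unit named fluentd:

# Restart the Fluentd service to reload the configuration
sudo systemctl restart fluentd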

Test

To test the configuration, post a JSON record to Fluentd (this example uses the curl command):

$ curl -X POST -d 'json={"action":"login","user":2}' \
  http://localhost:8888/data.access

Norikra's console log will show that Fluentd has opened the target access and sent a message with the fields action and user.

2014-05-20 20:43:22 +0900 [INFO] : opening target, target:"access", fields:{}, auto_field:true
2014-05-20 20:43:23 +0900 [INFO] : opening lazy target, target:#<Norikra::Target:0x69c04611 @last_modified=nil, @fields={}, @name="access", @auto_field=true>
2014-05-20 20:43:23 +0900 [INFO] : target successfully opened (snip)

We can check its fields with the norikra-client command from a console that has JRuby on its PATH:

$ norikra-client target list
TARGET  AUTO_FIELD
access  true
1 targets found.
$ norikra-client field list access
FIELD   TYPE    OPTIONAL
action  string  false
user    integer false
2 fields found.

Registering Queries and Fetching Outputs

We can add queries to opened targets via the WebUI or the CLI. The following query (just SQL!) counts the number of events with a non-zero user per 10-second interval, grouped by action:

SELECT
  action,
  count(*) AS c
FROM access.win:time_batch(10 sec)
WHERE user != 0
GROUP BY action

To register a query, issue norikra-client query add on the CLI:

$ norikra-client query add test_query "SELECT action, count(*) AS c FROM access.win:time_batch(10 sec) WHERE user != 0 GROUP BY action"
$ norikra-client query list
NAME    GROUP   TARGETS QUERY
test_query  default access  SELECT action, count(*) AS c FROM access.win:time_batch(10 sec) WHERE user != 0 GROUP BY action
1 queries found.

Once the query has been registered, post the events that you want:

$ curl -X POST -d 'json={"action":"login","user":2}' \
  http://localhost:8888/data.access
$ curl -X POST -d 'json={"action":"login","user":0}' \
  http://localhost:8888/data.access
$ curl -X POST -d 'json={"action":"write","user":2}' \
  http://localhost:8888/data.access
$ curl -X POST -d 'json={"action":"save","user":2}' \
  http://localhost:8888/data.access
$ curl -X POST -d 'json={"action":"logout","user":2}' \
  http://localhost:8888/data.access
$ curl -X POST -d 'json={"action":"logout","user":0}' \
  http://localhost:8888/data.access
$ curl -X POST -d 'json={"action":"login","user":2}' \
  http://localhost:8888/data.access

Then fetch the output events from the test_query query:

$ norikra-client event fetch test_query
{"time":"2014/05/20 21:00:24","c":1,"action":"logout"}
{"time":"2014/05/20 21:00:24","c":1,"action":"save"}
{"time":"2014/05/20 21:00:24","c":1,"action":"write"}
{"time":"2014/05/20 21:00:24","c":2,"action":"login"}
{"time":"2014/05/20 21:00:34","c":0,"action":"logout"}
{"time":"2014/05/20 21:00:34","c":0,"action":"save"}
{"time":"2014/05/20 21:00:34","c":0,"action":"write"}
{"time":"2014/05/20 21:00:34","c":0,"action":"login"}
$

If all the posts are made within 10 seconds, the query processes every event from that first 10-second window, counts the events per action (only those with user != 0), and emits the results at "2014/05/20 21:00:24". At "2014/05/20 21:00:34", just after the next 10-second window, the query reports that no events arrived (these are teardown records, reported only once).

Conclusion

We can create a stream data processing platform without any schema definitions, using Fluentd and Norikra. This platform enables an agile stream processing environment that can handle real workloads.

Learn More

  • Fluentd Architecture
  • Fluentd Get Started
  • Norikra: Query Syntax
  • Norikra: Query Examples
  • Slides: fluent-plugin-norikra

If this article is incorrect or outdated, or omits critical information, please let us know. Fluentd is an open-source project under Cloud Native Computing Foundation (CNCF). All components are available under the Apache 2 License.
