How To Build a Pipeline?

Introduction

A pipeline is the entrypoint for logs in FlowG. Logs can be ingested via:

  • the REST API on a specific pipeline's endpoint
  • the Syslog Server endpoint (UDP, TCP, or TCP+TLS)

As such, a pipeline flow always has two root nodes:

  • DIRECT: for logs ingested via the pipeline's API endpoint
  • SYSLOG: for logs received via the Syslog endpoint

From those nodes, you can add the following types of nodes:

  • Transformer nodes: Call a transformer to refine the log record and pass the result to the next nodes
  • Switch nodes: Pass the log record to the next nodes only if it matches the node's filter
  • Pipeline nodes: Pass the log record to another pipeline
  • Forwarder nodes: Send the log record to a third-party service
  • Router nodes: Store the log record into a stream

Using those nodes, a pipeline can parse, split, refine, enrich, and route log records to the database.

Root nodes

Direct node

Sending the following request to the pipeline's API endpoint:

POST /api/v1/pipelines/default/logs
{
  "record": {
    "foo": "bar"
  }
}

The Direct node wraps the submitted record into the following log entry:

{
  "timestamp": "...",
  "fields": {
    "foo": "bar"
  }
}
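The wrapping step above can be sketched as follows. This is an illustrative model only, not FlowG's actual implementation; the function name is hypothetical:

```python
from datetime import datetime, timezone


def ingest_direct(body):
    # The request body's "record" object becomes the log entry's "fields",
    # and an ingestion timestamp is attached.
    return {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "fields": body["record"],
    }


entry = ingest_direct({"record": {"foo": "bar"}})
```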

Syslog node (RFC3164)

Sending the following message to the Syslog endpoint:

logger -n localhost -P 5514 -d -t myapp --rfc3164 "hello world"

The Syslog node produces the following log record:

{
  "timestamp": "...",
  "fields": {
    "timestamp": "...",
    "client": "...",
    "hostname": "...",
    "tls_peer": "...",
    "tag": "myapp",
    "content": "hello world",
    "priority": "13",
    "facility": "1",
    "severity": "5"
  }
}
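The priority, facility, and severity fields are related by the standard Syslog encoding, priority = facility × 8 + severity, which can be decoded as:

```python
def decode_priority(priority):
    # facility is the high bits, severity the low 3 bits of the priority value
    return priority // 8, priority % 8


# Priority 13 is "user.notice" (logger's default): facility 1, severity 5
facility, severity = decode_priority(13)
```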

Syslog node (RFC5424)

Sending the following message to the Syslog endpoint:

logger -n localhost -P 5514 -d -t myapp --rfc5424 "hello world"

The Syslog node produces the following log record:

{
  "timestamp": "...",
  "fields": {
    "timestamp": "...",
    "client": "...",
    "hostname": "...",
    "tls_peer": "...",
    "app_name": "myapp",
    "message": "hello world",
    "priority": "13",
    "facility": "1",
    "severity": "5",
    "version": "1",
    "msg_id": "",
    "proc_id": "",
    "structured_data": "[key key=\"value\"]"
  }
}
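The structured_data field holds RFC 5424 structured data elements. A minimal sketch of how such a value could be decoded into a dictionary (this only handles the simple single-element case shown above, without RFC 5424 escape sequences):

```python
import re


def parse_structured_data(sd):
    result = {}
    # Each bracketed group is one SD-ELEMENT: an SD-ID followed by key="value" params
    for element in re.findall(r"\[([^\]]+)\]", sd):
        sd_id, *params = element.split(" ")
        result[sd_id] = dict(re.findall(r'(\S+)="([^"]*)"', " ".join(params)))
    return result


parsed = parse_structured_data('[key key="value"]')
```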

NB: For more information about Syslog priorities, facilities, and severities, please refer to RFC 5424 (section 6.2.1).

Transformer nodes

Let's assume we have a transformer named from-json with the following source code:

. = parse_json!(.content)

Given the following input record:

{
  "timestamp": "...",
  "fields": {
    "content": "{\"foo\": \"bar\"}"
  }
}

The Transformer node produces:

{
  "timestamp": "...",
  "fields": {
    "foo": "bar"
  }
}
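The effect of this transformer on the record's fields can be sketched in Python. This is an illustrative equivalent of what `. = parse_json!(.content)` does, not FlowG's transformer engine:

```python
import json


def from_json(fields):
    # Replace the whole field set with the JSON parsed from the "content" field,
    # mirroring VRL's `. = parse_json!(.content)`
    return json.loads(fields["content"])


fields = from_json({"content": '{"foo": "bar"}'})
```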

Switch nodes

Assume a Switch node whose filter matches records with the tag "myapp". The following record matches the filter, so it is passed unchanged to the next nodes:

{
  "timestamp": "...",
  "fields": {
    "tag": "myapp",
    "content": "hello world"
  }
}

The following record does not match the filter, so it is dropped and does not reach the next nodes:

{
  "timestamp": "...",
  "fields": {
    "tag": "notmyapp",
    "content": "hello world"
  }
}
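A Switch node behaves like a predicate over the record. A sketch, assuming a filter equivalent to "tag is myapp" (this models the behavior only; it is not FlowG's filter syntax):

```python
def switch_myapp(fields):
    # Pass the record through only when it matches the filter; otherwise drop it
    return fields if fields.get("tag") == "myapp" else None


passed = switch_myapp({"tag": "myapp", "content": "hello world"})
dropped = switch_myapp({"tag": "notmyapp", "content": "hello world"})
```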

Pipeline nodes

The following are equivalent. Passing this record to a Pipeline node targeting the default pipeline:

{
  "timestamp": "...",
  "fields": {
    "content": "hello world"
  }
}

OR sending this request to that pipeline's API endpoint:

POST /api/v1/pipelines/default/logs
{
  "record": {
    "content": "hello world"
  }
}
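Conceptually, a Pipeline node delegates the record to another pipeline, just as if it had been submitted to that pipeline's endpoint. A sketch modeling pipelines as plain functions (the pipeline names and the "seen_by" field are hypothetical):

```python
def default_pipeline(record):
    # The target pipeline processes the record further
    record["fields"]["seen_by"] = "default"
    return record


def caller_pipeline(record):
    # Pipeline node: hand the record to the "default" pipeline
    return default_pipeline(record)


out = caller_pipeline({"timestamp": "...", "fields": {"content": "hello world"}})
```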

Forwarder nodes

Let's assume we have configured a Webhook Forwarder named zapier with the following webhook URL:

http://example-zapier-webhook.com

The following are equivalent. Passing this record to the Forwarder node:

{
  "timestamp": "...",
  "fields": {
    "content": "hello world"
  }
}

OR sending this request directly to the webhook:

POST http://example-zapier-webhook.com
{
  "timestamp": "...",
  "fields": {
    "content": "hello world"
  }
}
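As the request above shows, the webhook receives the log record serialized as a JSON body. A sketch that only builds that request body (no network call; this models the payload shape, not FlowG's forwarder code):

```python
import json

# The record is sent as-is: both the timestamp and the fields
record = {"timestamp": "...", "fields": {"content": "hello world"}}
payload = json.dumps(record)
```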

Router nodes

Passing the following log record to a Router node configured with the default stream will store it into that stream:

{
  "timestamp": "...",
  "fields": {
    "content": "hello world"
  }
}
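The routing step can be sketched with streams modeled as an in-memory dict (FlowG persists streams in its database; this is an illustrative model only):

```python
streams = {}


def route(record, stream_name):
    # Router node: append the record to the named stream
    streams.setdefault(stream_name, []).append(record)


route({"timestamp": "...", "fields": {"content": "hello world"}}, "default")
```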