
How To Build a Pipeline?

Introduction

A pipeline is the entrypoint for logs in FlowG. Logs can be ingested via:

  • the REST API on a specific pipeline's endpoint
  • the Syslog Server endpoint (UDP, TCP, or TCP+TLS)

As such, a pipeline flow always has two root nodes:

  • DIRECT: for logs ingested via the pipeline's API endpoints
  • SYSLOG: for logs received via the Syslog endpoint

From those nodes, you can add the following types of nodes:

  • Transform nodes: Call a transformer to refine the log record and pass the result to the next nodes
  • Switch nodes: Pass the log record to the next nodes only if it matches the node's filter
  • Pipeline nodes: Pass the log record to another pipeline
  • Forward nodes: Send the log to a third-party service
  • Router nodes: Store the log record into a stream

Using those nodes, a pipeline can parse, split, refine, enrich, and route log records to the database.

Root nodes

Direct node (Structured Logs)

POST /api/v1/pipelines/default/logs/struct
Content-Type: application/json

{
  "records": [
    {
      "foo": "bar"
    }
  ]
}
Direct node

{
  "timestamp": "...",
  "fields": {
    "foo": "bar"
  }
}
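The request above can also be sent programmatically. Here is a minimal Python sketch; the base URL (`localhost:5080`) is an assumption about your deployment, and any authentication headers your instance requires are omitted:

```python
import json
from urllib import request

# Hypothetical FlowG base URL; adjust host/port (and add auth) for your deployment.
FLOWG_URL = "http://localhost:5080"

def ingest_structured(records, pipeline="default"):
    """POST a batch of structured records to the pipeline's /logs/struct endpoint."""
    body = json.dumps({"records": records}).encode()
    req = request.Request(
        f"{FLOWG_URL}/api/v1/pipelines/{pipeline}/logs/struct",
        data=body,
        headers={"Content-Type": "application/json"},
    )
    return request.urlopen(req)

# The payload for the example above:
payload = json.dumps({"records": [{"foo": "bar"}]})
```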

Direct node (Textual logs)

POST /api/v1/pipelines/default/logs/text
Content-Type: text/plain

hello
world
Direct node

{
  "timestamp": "...",
  "fields": {
    "content": "hello"
  }
}
{
  "timestamp": "...",
  "fields": {
    "content": "world"
  }
}
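The same ingestion can be scripted; each line of the plain-text body becomes one log record with a single content field. A minimal sketch (the base URL is an assumption about your deployment):

```python
from urllib import request

# Hypothetical FlowG base URL; adjust for your deployment.
FLOWG_URL = "http://localhost:5080"

def ingest_text(lines, pipeline="default"):
    """POST newline-separated lines to the pipeline's /logs/text endpoint."""
    body = "\n".join(lines).encode()
    req = request.Request(
        f"{FLOWG_URL}/api/v1/pipelines/{pipeline}/logs/text",
        data=body,
        headers={"Content-Type": "text/plain"},
    )
    return request.urlopen(req)

# The body for the example above:
body = "\n".join(["hello", "world"])
```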

Direct node (OpenTelemetry JSON logs)

POST /api/v1/pipelines/default/logs/otlp
Content-Type: application/json

{
  "resource_logs": [
    {
      "resource": {
        "attributes": [
          {
            "key": "service.name",
            "value": {"stringValue": "myapp"}
          }
        ]
      },
      "scope_logs": [
        {
          "log_records": [
            {
              "time_unix_nano": "...",
              "severity_number": 9,
              "severity_text": "INFO",
              "body": {"stringValue": "hello world"},
              "attributes": [
                {
                  "key": "foo",
                  "value": {"stringValue": "bar"}
                }
              ]
            }
          ]
        }
      ]
    }
  ]
}
Direct node

{
  "timestamp": "...",
  "fields": {
    "severity_number": "9",
    "severity_text": "INFO",
    "body": "hello world",
    "dropped_attribute_count": "0",
    "flags": "0",
    "trace_id": "...",
    "span_id": "...",
    "event_name": "...",
    "observed_time_unix_nano": "...",
    "time_unix_nano": "...",
    "attr.service.name": "myapp",
    "attr.foo": "bar"
  }
}
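Building the nested OTLP/JSON payload by hand is error-prone; the following Python sketch assembles a single-record payload with the same shape as the request shown above (a helper for illustration only, not part of FlowG):

```python
def otlp_log_payload(service, body_text, attributes, time_unix_nano="..."):
    """Build a minimal OTLP/JSON logs payload with one log record."""
    return {
        "resource_logs": [{
            "resource": {
                "attributes": [
                    {"key": "service.name", "value": {"stringValue": service}},
                ],
            },
            "scope_logs": [{
                "log_records": [{
                    "time_unix_nano": time_unix_nano,
                    "severity_number": 9,
                    "severity_text": "INFO",
                    "body": {"stringValue": body_text},
                    "attributes": [
                        {"key": k, "value": {"stringValue": v}}
                        for k, v in attributes.items()
                    ],
                }],
            }],
        }],
    }

payload = otlp_log_payload("myapp", "hello world", {"foo": "bar"})
```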

Direct node (OpenTelemetry Protobuf logs)

POST /api/v1/pipelines/default/logs/otlp
Content-Type: application/x-protobuf

...
Direct node

{
  "timestamp": "...",
  "fields": {
    "severity_number": "9",
    "severity_text": "INFO",
    "body": "hello world",
    "dropped_attribute_count": "0",
    "flags": "0",
    "trace_id": "...",
    "span_id": "...",
    "event_name": "...",
    "observed_time_unix_nano": "...",
    "time_unix_nano": "...",
    "attr.service.name": "myapp",
    "attr.foo": "bar"
  }
}

Syslog node (RFC3164)

logger -n localhost -P 5514 -d -t myapp --rfc3164 "hello world"
Syslog node

{
  "timestamp": "...",
  "fields": {
    "timestamp": "...",
    "client": "...",
    "hostname": "...",
    "tls_peer": "...",
    "tag": "myapp",
    "content": "hello world",
    "priority": "13",
    "facility": "1",
    "severity": "6"
  }
}
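Instead of logger, the same RFC3164 message can be emitted from code. A minimal UDP sketch, assuming the Syslog endpoint from the example above (localhost:5514); the priority 13 corresponds to user.notice, and the hostname is a placeholder:

```python
import socket
import time

def rfc3164_message(tag, msg, priority=13, hostname="myhost"):
    """Format a minimal RFC3164 syslog line: <PRI>TIMESTAMP HOSTNAME TAG: MSG."""
    timestamp = time.strftime("%b %d %H:%M:%S")
    return f"<{priority}>{timestamp} {hostname} {tag}: {msg}"

def send_syslog(message, host="localhost", port=5514):
    """Send one syslog message over UDP (the endpoint also supports TCP and TCP+TLS)."""
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.sendto(message.encode(), (host, port))

# The message for the example above:
message = rfc3164_message("myapp", "hello world")
```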

Syslog node (RFC5424)

logger -n localhost -P 5514 -d -t myapp --rfc5424 "hello world"
Syslog node

{
  "timestamp": "...",
  "fields": {
    "timestamp": "...",
    "client": "...",
    "hostname": "...",
    "tls_peer": "...",
    "app_name": "myapp",
    "message": "hello world",
    "priority": "13",
    "facility": "1",
    "severity": "6",
    "version": "1",
    "msg_id": "",
    "proc_id": "",
    "structured_data": "[key key=\"value\"]"
  }
}

NB: For more information about Syslog priorities, facilities, and severities, please refer to this page.

Transformer nodes

Let's assume we have a transformer named from-json with the following source code:

. = parse_json!(.content)
{
  "timestamp": "...",
  "fields": {
    "content": "{\"foo\": \"bar\"}"
  }
}

Transformer node

{
  "timestamp": "...",
  "fields": {
    "foo": "bar"
  }
}
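The effect of this transformer on a record can be emulated in Python (a sketch for illustration only; FlowG actually runs the transformer program shown above):

```python
import json

def from_json(record):
    """Emulate `. = parse_json!(.content)`: replace the record's fields
    with the object parsed from the "content" field."""
    return {
        "timestamp": record["timestamp"],
        "fields": json.loads(record["fields"]["content"]),
    }

record = {"timestamp": "...", "fields": {"content": '{"foo": "bar"}'}}
result = from_json(record)
```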

Switch nodes

A switch node passes a log record to the next nodes only if the record matches the node's filter:

{
  "timestamp": "...",
  "fields": {
    "tag": "myapp",
    "content": "hello world"
  }
}

Switch node

{
  "timestamp": "...",
  "fields": {
    "tag": "myapp",
    "content": "hello world"
  }
}
{
  "timestamp": "...",
  "fields": {
    "tag": "notmyapp",
    "content": "hello world"
  }
}

Switch node
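The filtering behavior can be illustrated in Python, assuming (hypothetically) a filter that matches records whose tag field equals "myapp" — only the first of the two records above would pass:

```python
def switch(records, predicate):
    """Pass through only the records matching the node's filter."""
    return [r for r in records if predicate(r)]

records = [
    {"timestamp": "...", "fields": {"tag": "myapp", "content": "hello world"}},
    {"timestamp": "...", "fields": {"tag": "notmyapp", "content": "hello world"}},
]

# Hypothetical filter: match records tagged "myapp".
passed = switch(records, lambda r: r["fields"]["tag"] == "myapp")
```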

Pipeline nodes

The following are equivalent:

{
  "timestamp": "...",
  "fields": {
    "content": "hello world"
  }
}

Pipeline node

OR

POST /api/v1/pipelines/default/logs/struct
Content-Type: application/json

{
  "records": [
    {
      "content": "hello world"
    }
  ]
}

Forwarder nodes

Let's assume we have configured a Webhook Forwarder named zapier with the following webhook URL:

http://example-zapier-webhook.com

The following are equivalent:

{
  "timestamp": "...",
  "fields": {
    "content": "hello world"
  }
}

Forwarder node

OR

POST http://example-zapier-webhook.com

{
  "timestamp": "...",
  "fields": {
    "content": "hello world"
  }
}

Router nodes

This will store the log record into the default stream:

{
  "timestamp": "...",
  "fields": {
    "content": "hello world"
  }
}

Router node