How To Build a Pipeline?
Introduction
A pipeline is the entrypoint for logs in FlowG. Logs can be ingested via:
- the REST API on a specific pipeline's endpoint
- the Syslog Server endpoint (UDP, TCP, or TCP+TLS)
As such, a pipeline flow will always have 2 root nodes:
DIRECT
: for logs ingested via the pipeline's API endpoints

SYSLOG
: for logs received via the Syslog endpoint
From those root nodes, you can add the following types of nodes:
- Transform nodes: Call a transformer to refine the log record and pass the result to the next nodes
- Switch nodes: Pass the log record to the next nodes only if it matches the node's filter
- Pipeline nodes: Pass the log record to another pipeline
- Forward nodes: Send the log to a third-party service
- Router nodes: Store the log record into a stream
Using those nodes, a pipeline is able to parse, split, refine, enrich and route log records to the database.
Root nodes
Direct node (Structured Logs)
POST /api/v1/pipelines/default/logs/struct
Content-Type: application/json

{
  "records": [
    {
      "foo": "bar"
    }
  ]
}

⇒

{
  "timestamp": "...",
  "fields": {
    "foo": "bar"
  }
}
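For example, with curl (a sketch; the address localhost:5080 and the authentication requirements depend on your deployment):

# assumes FlowG's HTTP API is reachable at its default address
# (localhost:5080); depending on your deployment, an authentication
# header (e.g. a personal access token) may also be required
curl -X POST http://localhost:5080/api/v1/pipelines/default/logs/struct \
  -H 'Content-Type: application/json' \
  -d '{"records": [{"foo": "bar"}]}'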
Direct node (Textual logs)
Each line of the request body becomes a separate log record:

POST /api/v1/pipelines/default/logs/text
Content-Type: text/plain

hello
world

⇒

{
  "timestamp": "...",
  "fields": {
    "content": "hello"
  }
}

{
  "timestamp": "...",
  "fields": {
    "content": "world"
  }
}
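Again with curl (same assumptions as above):

# Bash's $'...' quoting embeds the newline, so this sends two lines
# and therefore creates two log records
curl -X POST http://localhost:5080/api/v1/pipelines/default/logs/text \
  -H 'Content-Type: text/plain' \
  --data-binary $'hello\nworld'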
Direct node (OpenTelemetry JSON logs)
POST /api/v1/pipelines/default/logs/otlp
Content-Type: application/json

{
  "resource_logs": [
    {
      "resource": {
        "attributes": [
          {
            "key": "service.name",
            "value": {"stringValue": "myapp"}
          }
        ]
      },
      "scope_logs": [
        {
          "log_records": [
            {
              "time_unix_nano": "...",
              "severity_number": 9,
              "severity_text": "INFO",
              "body": {"stringValue": "hello world"},
              "attributes": [
                {
                  "key": "foo",
                  "value": {"stringValue": "bar"}
                }
              ]
            }
          ]
        }
      ]
    }
  ]
}

⇒

{
  "timestamp": "...",
  "fields": {
    "severity_number": "9",
    "severity_text": "INFO",
    "body": "hello world",
    "dropped_attribute_count": "0",
    "flags": "0",
    "trace_id": "...",
    "span_id": "...",
    "event_name": "...",
    "observed_time_unix_nano": "...",
    "time_unix_nano": "...",
    "attr.service.name": "myapp",
    "attr.foo": "bar"
  }
}
Direct node (OpenTelemetry Protobuf logs)
POST /api/v1/pipelines/default/logs/otlp
Content-Type: application/x-protobuf

...

⇒

{
  "timestamp": "...",
  "fields": {
    "severity_number": "9",
    "severity_text": "INFO",
    "body": "hello world",
    "dropped_attribute_count": "0",
    "flags": "0",
    "trace_id": "...",
    "span_id": "...",
    "event_name": "...",
    "observed_time_unix_nano": "...",
    "time_unix_nano": "...",
    "attr.service.name": "myapp",
    "attr.foo": "bar"
  }
}
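Since this endpoint accepts standard OTLP payloads, any OTLP/HTTP-capable client can target it. For instance, an OpenTelemetry Collector exporter could be pointed at the pipeline like this (a sketch; the FlowG address and the exporter name otlphttp/flowg are assumptions):

exporters:
  otlphttp/flowg:
    # assumes FlowG's HTTP API is reachable at localhost:5080; add a
    # headers section here if your deployment requires authentication
    logs_endpoint: http://localhost:5080/api/v1/pipelines/default/logs/otlp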
Syslog node (RFC3164)
logger -n localhost -P 5514 -d -t myapp --rfc3164 "hello world"
⇒
{
  "timestamp": "...",
  "fields": {
    "timestamp": "...",
    "client": "...",
    "hostname": "...",
    "tls_peer": "...",
    "tag": "myapp",
    "content": "hello world",
    "priority": "13",
    "facility": "1",
    "severity": "6"
  }
}
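The command above sends the message over UDP (the -d flag). If your Syslog endpoint is configured for TCP, util-linux's logger can use -T instead (a sketch; adjust the port to your configuration):

logger -n localhost -T -P 5514 -t myapp --rfc3164 "hello world"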
Syslog node (RFC5424)
logger -n localhost -P 5514 -d -t myapp --rfc5424 "hello world"
⇒
{
  "timestamp": "...",
  "fields": {
    "timestamp": "...",
    "client": "...",
    "hostname": "...",
    "tls_peer": "...",
    "app_name": "myapp",
    "message": "hello world",
    "priority": "13",
    "facility": "1",
    "severity": "6",
    "version": "1",
    "msg_id": "",
    "proc_id": "",
    "structured_data": "[key key=\"value\"]"
  }
}
NB: The priority value encodes both the facility and the severity (priority = facility × 8 + severity). For more information about Syslog priorities, facilities, and severities, please refer to this page.
Transformer nodes
Let's assume we have a transformer named from-json with the following source code:

. = parse_json!(.content)
{
  "timestamp": "...",
  "fields": {
    "content": "{\"foo\": \"bar\"}"
  }
}

⇓

{
  "timestamp": "...",
  "fields": {
    "foo": "bar"
  }
}
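Transformers are written in the Vector Remap Language (VRL), so any VRL parsing function can be used. For instance, a hypothetical transformer that splits a plain-text line into named fields with a regular expression (the field names here are purely illustrative):

. = parse_regex!(.content, r'^(?P<level>\w+): (?P<message>.*)$')

A record whose content field is "ERROR: disk full" would be refined into the fields level ("ERROR") and message ("disk full").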
Switch nodes
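Let's assume the switch node is configured with a filter that matches log records whose tag field equals "myapp" (for instance, a condition like tag = "myapp"; refer to the filtering documentation for the exact syntax). A matching record is passed on to the next nodes unchanged: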
{
  "timestamp": "...",
  "fields": {
    "tag": "myapp",
    "content": "hello world"
  }
}

⇓

{
  "timestamp": "...",
  "fields": {
    "tag": "myapp",
    "content": "hello world"
  }
}
A record that does not match the filter is not passed on:

{
  "timestamp": "...",
  "fields": {
    "tag": "notmyapp",
    "content": "hello world"
  }
}

⇓

☓
Pipeline nodes
Passing a log record to a pipeline node is equivalent to ingesting it directly via the target pipeline's API endpoint. The following are equivalent:

{
  "timestamp": "...",
  "fields": {
    "content": "hello world"
  }
}

⇒

OR

POST /api/v1/pipelines/default/logs/struct

{
  "records": [
    {
      "content": "hello world"
    }
  ]
}
Forwarder nodes
Let's assume we have configured a Webhook Forwarder named zapier with the following webhook URL:

http://example-zapier-webhook.com
The following are equivalent:
{
  "timestamp": "...",
  "fields": {
    "content": "hello world"
  }
}

⇒

OR

POST http://example-zapier-webhook.com

{
  "timestamp": "...",
  "fields": {
    "content": "hello world"
  }
}
Router nodes
This will store the log record into the default stream:

{
  "timestamp": "...",
  "fields": {
    "content": "hello world"
  }
}

⇒