How To Build a Pipeline?
Introduction
A pipeline is the entrypoint for logs in FlowG. Logs can be ingested via:
- the REST API on a specific pipeline's endpoint
- the Syslog Server endpoint (UDP, TCP, or TCP+TLS)
As such, a pipeline flow will always have 2 root nodes:

DIRECT
: for logs ingested via the pipeline's API endpoint

SYSLOG
: for logs received via the Syslog endpoint
From those nodes, you are able to add the following types of node:
- Transformer nodes: Call a transformer to refine the log record and pass the result to the next nodes
- Switch nodes: Pass the log record to the next nodes only if it matches the node's filter
- Pipeline nodes: Pass the log record to another pipeline
- Forwarder nodes: Send the log record to a third-party service
- Router nodes: Store the log record into a stream
Using those nodes, a pipeline is able to parse, split, refine, enrich and route log records to the database.
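Conceptually, those node types compose like functions over a log record. The following is a minimal Python sketch, not the FlowG API: the node implementations and the in-memory stream store are hypothetical, and only model how a record flows through a transformer, a switch, and a router.

```python
# Conceptual sketch (not the FlowG API): a pipeline as a chain of
# node functions that each receive a log record and pass it on.

def transform_node(record):
    # Transformer node: refine the record (here, upper-case a field).
    return dict(record, content=record["content"].upper())

def switch_node(record):
    # Switch node: only pass the record on if it matches a filter.
    return record if record.get("tag") == "myapp" else None

def router_node(record, streams):
    # Router node: store the record into a named stream.
    streams.setdefault("default", []).append(record)

streams = {}
record = {"tag": "myapp", "content": "hello world"}
record = transform_node(record)
if (record := switch_node(record)) is not None:
    router_node(record, streams)

print(streams["default"])  # the refined record, routed to "default"
```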
Root nodes
Direct node
POST /api/v1/pipelines/default/logs
{
"record": {
"foo": "bar"
}
}
⇒
{
"timestamp": "...",
"fields": {
"foo": "bar"
}
}
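The request above can also be built programmatically. Here is a Python sketch; the `localhost:5080` base URL is an assumption, so adjust it to your FlowG instance:

```python
import json

# Sketch: build the request for the Direct endpoint of the "default"
# pipeline. The host/port (localhost:5080) is an assumption.
FLOWG_URL = "http://localhost:5080"

def build_ingest_request(pipeline, record):
    url = f"{FLOWG_URL}/api/v1/pipelines/{pipeline}/logs"
    body = json.dumps({"record": record})
    return url, body

url, body = build_ingest_request("default", {"foo": "bar"})
print(url)   # http://localhost:5080/api/v1/pipelines/default/logs
print(body)  # {"record": {"foo": "bar"}}
```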
Syslog node (RFC3164)
logger -n localhost -P 5514 -d -t myapp --rfc3164 "hello world"
⇒
{
"timestamp": "...",
"fields": {
"timestamp": "...",
"client": "...",
"hostname": "...",
"tls_peer": "...",
"tag": "myapp",
"content": "hello world",
"priority": "13",
"facility": "1",
"severity": "5"
}
}
Syslog node (RFC5424)
logger -n localhost -P 5514 -d -t myapp --rfc5424 "hello world"
⇒
{
"timestamp": "...",
"fields": {
"timestamp": "...",
"client": "...",
"hostname": "...",
"tls_peer": "...",
"app_name": "myapp",
"message": "hello world",
"priority": "13",
"facility": "1",
"severity": "5",
"version": "1",
"msg_id": "",
"proc_id": "",
"structured_data": "[key key=\"value\"]"
}
}
NB: For more information about Syslog priorities, facilities and severities, please refer to this page.
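The arithmetic behind those fields can be sketched: syslog encodes the priority value as facility × 8 + severity, so a priority of 13 decodes to facility 1 (user-level) and severity 5 (notice), which is the default used by `logger`.

```python
# Syslog encodes priority as facility * 8 + severity (RFC 5424, §6.2.1).

def decode_priority(priority):
    return divmod(priority, 8)  # (facility, severity)

facility, severity = decode_priority(13)
print(facility, severity)  # 1 5
assert 13 == facility * 8 + severity
```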
Transformer nodes
Let's assume we have a transformer named from-json with the following source code:
. = parse_json!(.content)
{
"timestamp": "...",
"fields": {
"content": "{\"foo\": \"bar\"}"
}
}
⇓
{
"timestamp": "...",
"fields": {
"foo": "bar"
}
}
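In Python terms, the from-json transformer behaves like this sketch: it parses the JSON document held in the record's `content` field and replaces the record's fields with the result (the VRL source above does the same with `parse_json!`).

```python
import json

# Sketch of the from-json transformer's effect: parse the JSON string
# in "content" and use the result as the record's fields.

def from_json(fields):
    return json.loads(fields["content"])

record = {"timestamp": "...", "fields": {"content": '{"foo": "bar"}'}}
record["fields"] = from_json(record["fields"])
print(record["fields"])  # {'foo': 'bar'}
```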
Switch nodes
{
"timestamp": "...",
"fields": {
"tag": "myapp",
"content": "hello world"
}
}
⇓
{
"timestamp": "...",
"fields": {
"tag": "myapp",
"content": "hello world"
}
}
{
"timestamp": "...",
"fields": {
"tag": "notmyapp",
"content": "hello world"
}
}
⇓
☓ (the log record is dropped)
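The switch behaviour shown above can be modeled as a predicate over the record's fields. The following is a Python sketch of the effect only; the actual filter expression is configured on the switch node in FlowG and its syntax is not shown here.

```python
# Sketch: a switch node's filter lets records through only when they
# match; here, when the "tag" field equals "myapp".

def switch(fields):
    return fields if fields.get("tag") == "myapp" else None

print(switch({"tag": "myapp", "content": "hello world"}))     # passes
print(switch({"tag": "notmyapp", "content": "hello world"}))  # None (dropped)
```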
Pipeline nodes
The following are equivalent:
{
"timestamp": "...",
"fields": {
"content": "hello world"
}
}
⇒
OR
POST /api/v1/pipelines/default/logs
{
"record": {
"content": "hello world"
}
}
Forwarder nodes
Let's assume we have configured a Webhook Forwarder named zapier with the following webhook URL:
http://example-zapier-webhook.com
The following are equivalent:
{
"timestamp": "...",
"fields": {
"content": "hello world"
}
}
⇒
OR
POST http://example-zapier-webhook.com
{
"timestamp": "...",
"fields": {
"content": "hello world"
}
}
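Here is a Python sketch of what the forwarder sends: it only builds the request rather than actually POSTing it, and the URL is the example one configured above.

```python
import json

# Sketch: a webhook forwarder POSTs the full log record (timestamp
# included) as JSON to the configured URL.
WEBHOOK_URL = "http://example-zapier-webhook.com"

def build_webhook_request(record):
    return WEBHOOK_URL, json.dumps(record)

url, body = build_webhook_request(
    {"timestamp": "...", "fields": {"content": "hello world"}}
)
print(url)
print(body)
```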
Router nodes
This will store the log record into the default stream:
{
"timestamp": "...",
"fields": {
"content": "hello world"
}
}