.. _concept-log-pipeline:
.. _log.pipeline:

==============================
Understanding the Log Pipeline
==============================

.. meta::
   :description: How rsyslog processes logs through a pipeline of inputs, rulesets, transformations, queues, and outputs.
   :keywords: rsyslog, log pipeline, message pipeline, input, ruleset, action, queue, transform, json, mmjsonparse, mmjsontransform

.. summary-start

The log pipeline connects rsyslog inputs, rulesets, and actions: receive → parse/transform → deliver. This page introduces the concept with diagrams and a commented example.

.. summary-end

:Status: **Draft**
:Audience: intermediate
:Tier: conceptual core

Overview
--------
rsyslog processes logs through a **log pipeline** — internally known as the *message pipeline*.
An **input** receives events, a **ruleset** filters or transforms them, and **actions** deliver them.

.. mermaid::
   flowchart LR
     subgraph Ingest[Input stage]
       I1[imuxsock]
       I2[imjournal]
       I3["imtcp/imudp"]
       I4[imfile]
     end
     subgraph Logic["Ruleset (filter/transform)"]
       F1[if/then conditions]
       P1[mmjsonparse]
       T1[mmjsontransform]
     end
     subgraph Delivery["Actions (outputs)"]
       A1[omfile]
       A2[omfwd/omrelp]
       A3[omelasticsearch]
       A4[ompgsql]
     end
     I1 --> Logic
     I2 --> Logic
     I3 --> Logic
     I4 --> Logic
     Logic --> A1
     Logic --> A2
     Logic --> A3
     Logic --> A4

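Queues connect these stages to provide buffering and concurrency. Inputs submit messages to the main message queue (or to a ruleset's own queue, if one is configured), and each action can be given its own action queue. Actions run in direct mode (no queue) unless you configure otherwise.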

.. mermaid::
   flowchart LR
     I[Input] -->|"main or ruleset queue"| Q1[(queue)]
     Q1 --> R[Ruleset]
     R -->|"action queue(s)"| Q2[(queue)]
     Q2 --> A["Action(s)"]
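
As a minimal sketch of where an action queue is configured, the fragment below gives one forwarding action its own in-memory queue that can spill to disk, so an unreachable receiver backs up that queue instead of stalling the rest of the ruleset. The target host, port, work directory, and queue file name are placeholders chosen for illustration.

.. code-block:: rsyslog
   :caption: Sketch: a forwarding action with its own queue (placeholder values)

   # Directory for queue spill files (placeholder path)
   global(workDirectory="/var/spool/rsyslog")

   # LinkedList: an in-memory queue used by this action only.
   # queue.filename makes it disk-assisted; saveOnShutdown persists
   # queued messages on shutdown; resumeRetryCount="-1" retries forever.
   # Host and port are placeholders.
   action(
     type="omfwd"
     target="central.example.net"
     port="514"
     protocol="tcp"
     queue.type="LinkedList"
     queue.filename="fwd_central"
     queue.saveOnShutdown="on"
     action.resumeRetryCount="-1"
   )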

Pipeline stages
---------------
- **Input** – how rsyslog receives data (e.g., ``imuxsock``, ``imjournal``, ``imtcp``, ``imudp``, ``imfile``).
- **Ruleset** – processing logic (filters, parsing, transformation). Inputs can be assigned to different rulesets.
- **Actions** – destinations (``omfile``, ``omfwd``, ``omrelp``, ``omelasticsearch``, ``ompgsql``).
- **Queues** – buffers between stages; tune for throughput and reliability.
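
The input → ruleset → action mapping can be sketched with a single TCP input bound to a named ruleset that writes every message to a file. The port, the ruleset name ``remote``, and the file path are assumptions for illustration; inputs without a ``ruleset`` parameter feed the default ruleset instead.

.. code-block:: rsyslog
   :caption: Sketch: binding an input to a ruleset (hypothetical names)

   # Input stage: listen on TCP (placeholder port) and hand messages to "remote"
   module(load="imtcp")
   input(type="imtcp" port="10514" ruleset="remote")

   # Ruleset stage: logic that applies only to messages from that input
   ruleset(name="remote") {
     # Action stage: write every message to a file (placeholder path)
     action(type="omfile" file="/var/log/remote.log")
   }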

Example: parse and transform JSON, then write
---------------------------------------------
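This example parses JSON from the raw payload, converts dotted keys into nested objects (``unflatten``), and writes **only** the structured subtree. It declares no ``input()`` of its own, so its statements apply to messages from any input bound to the default ruleset.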

.. code-block:: rsyslog
   :caption: Input → transform → output (commented)

   # Output only the structured subtree we build during processing
   template(name="outfmt" type="subtree" subtree="$!qradar_structured")

   # Load parser/transformer modules used in this pipeline
   module(load="mmjsonparse")
   module(load="mmjsontransform")

   # Step 1: Parse JSON from the raw message payload.
   # useRawMsg="on" parses the raw message as received instead of only the
   # MSG property, which helps when a syslog header may or may not be present.
   action(
     type="mmjsonparse"
     container="$!qradar"
     mode="find-json"
     useRawMsg="on"
   )

   # Step 2: Only proceed if parsing succeeded to avoid partial outputs
   if $parsesuccess == "OK" then {
     # Convert dotted keys (e.g., src.ip) to nested JSON ("src": {"ip": ...})
     action(
       type="mmjsontransform"
       input="$!qradar"
       output="$!qradar_structured"
       mode="unflatten"
     )

     # Step 3: Write just the structured subtree to a file
     action(
       type="omfile"
       file="/var/log/qradar-structured.json"
       template="outfmt"
     )
   }
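
In the example above, messages that fail the ``$parsesuccess`` check are not written anywhere by these actions. One possible variation keeps them in a separate file for later inspection. Only the gating block is shown (the template and module loads stay as above), and the failure file path is an assumption.

.. code-block:: rsyslog
   :caption: Sketch: keeping parse failures (hypothetical file path)

   if $parsesuccess == "OK" then {
     action(
       type="mmjsontransform"
       input="$!qradar"
       output="$!qradar_structured"
       mode="unflatten"
     )
     action(type="omfile" file="/var/log/qradar-structured.json" template="outfmt")
   } else {
     # Messages without parseable JSON, written in the default file format
     # (placeholder path)
     action(type="omfile" file="/var/log/qradar-parse-failures.log")
   }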

Design patterns (with diagrams)
-------------------------------
**1) Fan-out (branching to multiple actions)**

.. mermaid::
   flowchart LR
     RS[Ruleset]
     RS --> A1[omfile: local archive]
     RS --> A2[omrelp: reliable ship]
     RS --> A3[omelasticsearch: search/index]

Best practice: log locally for audit, ship reliably for centralization, index for search.
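
A fan-out is several actions in the same ruleset; each message is passed to every action in order. A minimal sketch at the top level (default ruleset), assuming placeholder host names and an Elasticsearch index named ``logs``:

.. code-block:: rsyslog
   :caption: Sketch: fan-out to three destinations (placeholder hosts)

   module(load="omrelp")
   module(load="omelasticsearch")

   # Local archive for audit (placeholder path)
   action(type="omfile" file="/var/log/archive.log")

   # Reliable shipping to a central relay (placeholder host and port)
   action(type="omrelp" target="relay.example.net" port="2514")

   # Indexing for search (placeholder server and index name)
   action(type="omelasticsearch" server="es.example.net" searchIndex="logs")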

**2) Staged processing (ruleset chaining)**

.. mermaid::
   flowchart LR
     I["Input(s)"] --> R1[Ruleset: parse/normalize]
     R1 --> R2[Ruleset: enrich/filter]
     R2 --> R3[Ruleset: route/select action]
     R3 --> A["Action(s)"]

Use ``call`` to hand off between rulesets. Gate transforms with ``$parsesuccess``.
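
Ruleset chaining can be sketched as three small rulesets linked with ``call``, where the parse stage hands off only when ``$parsesuccess`` is ``OK``. The ruleset names, the ``$!data`` container, the added ``pipeline`` field, the port, and the file path are all hypothetical. The rulesets are defined before they are called, so the stages appear in reverse order.

.. code-block:: rsyslog
   :caption: Sketch: staged processing with call (hypothetical names)

   module(load="imtcp")
   module(load="mmjsonparse")

   # Stage 3: route to an action (placeholder path)
   ruleset(name="route") {
     action(type="omfile" file="/var/log/staged.log")
   }

   # Stage 2: enrich (here: add a hypothetical static field), then route
   ruleset(name="enrich") {
     set $!data!pipeline = "staged";
     call route
   }

   # Stage 1: parse; continue only if parsing succeeded
   ruleset(name="parse") {
     action(type="mmjsonparse" container="$!data" mode="find-json" useRawMsg="on")
     if $parsesuccess == "OK" then call enrich
   }

   # Entry point: bind an input to the first stage (placeholder port)
   input(type="imtcp" port="10514" ruleset="parse")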

**3) Isolate workloads with per-ruleset queues**

.. mermaid::
   flowchart LR
     I1[Remote TCP] --> QR[(ruleset queue)]
     I2[Local system] --> QL[(ruleset queue)]
     QR --> Rrem[Ruleset remote]
     QL --> Rloc[Ruleset local]
     Rrem --> Arem[Remote actions]
     Rloc --> Aloc[Local actions]

Tune queue size and worker threads per workload so that a burst on one input does not delay processing of the other.
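
A minimal sketch of per-ruleset queues, assuming remote senders arrive over TCP and local senders over UDP on the loopback address (binding the system log socket is not shown). Queue sizes, worker counts, ports, and paths are illustrative placeholders, not tuning recommendations.

.. code-block:: rsyslog
   :caption: Sketch: isolated ruleset queues (placeholder values)

   module(load="imtcp")
   module(load="imudp")

   # Remote traffic: larger queue and more workers to absorb bursts
   ruleset(name="remote"
           queue.type="LinkedList"
           queue.size="100000"
           queue.workerThreads="4") {
     action(type="omfile" file="/var/log/remote.log")
   }

   # Local traffic: a smaller, separate queue
   ruleset(name="local"
           queue.type="LinkedList"
           queue.size="10000"
           queue.workerThreads="1") {
     action(type="omfile" file="/var/log/local.log")
   }

   # Each input feeds its own ruleset queue (placeholder ports)
   input(type="imtcp" port="10514" ruleset="remote")
   input(type="imudp" address="127.0.0.1" port="514" ruleset="local")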

Key ideas & tips
----------------
- **Gate transforms** using ``$parsesuccess`` to avoid writing malformed data.
- Prefer **subtree templates** for clean, stable downstream contracts.
- **Filter early** to drop noise before expensive transforms.
- Configure **action/ruleset queues** for resilience under load.
- Compose stages with modules (``mmjsonparse``, ``mmjsontransform``, ``mmnormalize``) as needed.
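
Filtering early can be as simple as property checks with ``stop`` placed before the parsing action. A minimal sketch, assuming debug-severity messages and a hypothetical noisy program named ``healthcheck`` are safe to discard:

.. code-block:: rsyslog
   :caption: Sketch: drop noise before parsing (hypothetical program name)

   module(load="mmjsonparse")

   # Discard debug-level messages (severity 7) before any parsing
   if $syslogseverity == 7 then stop

   # Discard a noisy program by name ("healthcheck" is hypothetical)
   if $programname == "healthcheck" then stop

   # Only messages that passed the filters reach the parser
   action(type="mmjsonparse" container="$!qradar" mode="find-json" useRawMsg="on")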

Troubleshooting checklist
-------------------------
- **Validate:** ``rsyslogd -N1`` (syntax & module checks).
- **Inspect fields:** temporarily add
  ``action(type="omfile" file="/tmp/debug" template="RSYSLOG_DebugFormat")``.
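- **Verify JSON presence:** ``mode="find-json"`` parses only if the message actually contains JSON; check ``$parsesuccess`` when expected output is missing.
- **Check queues:** monitor queue sizes (for example with ``impstats``); increase worker threads if a queue keeps growing.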
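
To watch queue sizes, one common approach is the ``impstats`` module, which periodically emits counters (such as current and maximum queue size) for the main queue, ruleset queues, and action queues. A minimal sketch, assuming a 60-second interval and a dedicated stats file at a placeholder path:

.. code-block:: rsyslog
   :caption: Sketch: periodic queue statistics (placeholder path)

   # Write JSON-formatted counters every 60 seconds to a separate file
   # instead of injecting them into the syslog stream
   module(load="impstats"
          interval="60"
          format="json"
          log.syslog="off"
          log.file="/var/log/rsyslog-stats.log")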

See also
--------
- :doc:`../configuration/basic_structure`
- :doc:`../configuration/modules/workflow`
- :doc:`../concepts/queues`
- :doc:`../configuration/templates`
- :doc:`../configuration/modules/mmjsonparse`
- :doc:`../configuration/modules/mmjsontransform`

