RISE-Maritime/porla

porla

Pipeline-Oriented Real-time Logging Assistant, or simply a basic toolbox for handling line-based streaming data using Linux pipes and some handy command-line utilities.

"porla" is also a Swedish word for the soothing sound of running water from a small stream of water

Motivation and purpose

porla was conceived out of our needs regarding data logging and data "proxying" in various research projects. The framework had to be:

  • Minimalistic, we needed something that was easy to reach for, even for the smallest of tasks.
  • Versatile, we needed something we could use in many different setups, where the base assumptions did not work against us.
  • Configurable, we needed to be able to easily build data pipelines from scratch on-site, ideally without resorting to re-building any software.
  • Documentable, everything needed to be documentable so that any setup could be recreated as required.
  • Non-invasive, we needed something that didn't require us to conform to any specific data format or programming language.
  • Extensible, we needed it to be easily extensible, since the inherent nature of research projects means we never know what the next thing will be.

With that said, porla is primarily designed for logging and proxying of line-based, textual data.

Overview

Schematic

(schematic diagram)

Tech stack

porla is built on top of three well-known technologies:

  • Linux pipes
  • UDP Multicast
  • Containerization (Docker containers)

UDP multicast is leveraged for the bus that connects user-configured pipelines, within which Linux pipes are used to chain multiple commands. Each pipeline is contained within its own containerized environment. Have a look at the Examples section further down.

Architecture

porla follows a two-tier architecture separating domain-agnostic tooling from domain-specific extensions:

porla (base)
├── I/O: serial, UDP, TCP (via socat)
├── Transport: MQTT, Zenoh
├── Transform: JSON, base64, timestamps, shuffle
└── Core: bus, record, limit

porla-{domain} (extensions)
└── Domain-specific formats and protocols

What belongs in the base image (domain-agnostic):

  • Generic transports (MQTT, Zenoh, UDP, TCP)
  • Generic transformations (JSON, base64, timestamps)
  • Core pipeline tools (bus, record, limit)

What belongs in domain extensions:

  • Data format parsers/encoders (e.g., NMEA, AIS, SignalK)
  • Domain-specific protocol handlers
  • Format converters between domain standards

This separation allows the base image to remain lightweight and broadly applicable, while domain-specific tooling is available through dedicated extension images.

Performance

TODO

Usage

porla is packaged as a Docker image, available here: https://github.com/orgs/MO-RISE/packages

The image expects a "run" command passed at startup. Using docker run, this looks as follows:

docker run --network=host porla "<command>"

Using docker-compose it would look like this:

version: '3'
services:
    service_1:
        image: ghcr.io/rise-maritime/porla
        network_mode: host
        restart: always
        command: ["<command>"]

Built-in functionality

  • to_bus and from_bus

    Pipes data to or from the bus. Expects a single argument, the bus_id, which needs to be in the range 0-255. In addition, to_bus accepts the optional argument --ttl, which sets the TTL (Time-to-Live) of the outgoing multicast datagram; it defaults to --ttl 0, which constrains the traffic to the local host only.
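Conceptually, to_bus is a small UDP multicast sender fed by STDIN. The sketch below illustrates the idea; note that the group/port mapping (239.255.0.<bus_id> on port 49152) is an assumption for illustration only, not necessarily porla's actual addressing scheme.

```python
import socket
import struct

def bus_address(bus_id: int) -> tuple:
    """Map a bus id (0-255) to a multicast group and port.
    NOTE: this group/port scheme is illustrative, not porla's actual mapping."""
    if not 0 <= bus_id <= 255:
        raise ValueError("bus_id must be in the range 0-255")
    return (f"239.255.0.{bus_id}", 49152)

def to_bus(lines, bus_id: int, ttl: int = 0) -> None:
    """Send each line as one UDP multicast datagram (what to_bus conceptually does)."""
    group, port = bus_address(bus_id)
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    # --ttl 0 constrains the traffic to the local host only
    sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, struct.pack("b", ttl))
    for line in lines:
        sock.sendto(line.encode(), (group, port))
    sock.close()
```

from_bus is the mirror image: join the group for the given bus_id and write each received datagram to STDOUT as a line.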

  • record

    Records (appends) data from STDIN to a file. Expects a single required argument, the file_path. Optionally accepts:

    • --rotate-at (a cron expression for when to rotate logs)
    • --rotate-count (number of rotated files to keep, defaults to 7)
    • --date-format (strftime format for rotated file dates, defaults to -%Y%m%d)

    The cron expression must have exactly 5 fields: minute, hour, day, month, and weekday (e.g., '0 0 * * *' for daily at midnight, '*/15 * * * *' for every 15 minutes). Rotated files are stored in a historic subdirectory next to the original log file, with the original file extension preserved (e.g., historic/bus_id_1-20251118.log.gz).
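The rotated-file naming described above can be sketched as follows (a simplification for illustration; gzip compression of rotated files is assumed from the .gz suffix in the example):

```python
import os
from datetime import date

def rotated_path(file_path: str, when: date, date_format: str = "-%Y%m%d") -> str:
    """Build the path of a rotated (gzipped) log file in the `historic`
    subdirectory next to the original file, preserving its extension,
    e.g. /recordings/bus_id_1.log -> /recordings/historic/bus_id_1-20251118.log.gz"""
    directory, filename = os.path.split(file_path)
    stem, ext = os.path.splitext(filename)
    return os.path.join(directory, "historic", f"{stem}{when.strftime(date_format)}{ext}.gz")
```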

  • b64

    Base64 encodes (--encode) or decodes (--decode) data from STDIN to STDOUT. Optionally takes two arguments, the input_format_specification and the output_format_specification, to flexibly allow only parts of the input to be encoded/decoded.
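The partial encoding idea can be sketched like this; the fixed "timestamp payload" line layout below stands in for porla's actual format-specification syntax:

```python
import base64

def encode_payload(line: str) -> str:
    """Base64-encode only the second field of a 'timestamp payload' line,
    leaving the timestamp untouched -- the kind of partial encoding the
    input/output format specifications enable."""
    timestamp, payload = line.split(" ", 1)
    encoded = base64.b64encode(payload.encode()).decode()
    return f"{timestamp} {encoded}"
```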

  • jsonify

    Parses each line according to a parse format specification (see https://github.com/r1chardj0n3s/parse#format-syntax) and outputs the named values as key-value pairs in a json object. Expects a single argument, the format specification.
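A rough standard-library equivalent of what jsonify does (porla uses the parse library's format syntax; the regex below merely mimics the specification '{} {name} {value}' used in the Examples section):

```python
import json
import re

# Mimics the parse-style specification '{} {name} {value}':
# an ignored first field followed by two named fields.
PATTERN = re.compile(r"^\S+ (?P<name>\S+) (?P<value>\S+)$")

def jsonify(line: str) -> str:
    """Emit the named values of a matching line as a JSON object."""
    match = PATTERN.match(line)
    if match is None:
        raise ValueError(f"line does not match specification: {line!r}")
    return json.dumps(match.groupdict())
```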

  • timestamp

    Prepends a timestamp to each line. The timestamp is either the unix epoch (--epoch) or in RFC 3339 format (--rfc3339).
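A minimal sketch of the same behaviour, assuming UTC for the RFC 3339 variant:

```python
import time
from datetime import datetime, timezone

def stamp(line: str, rfc3339: bool = False) -> str:
    """Prepend the current time to a line: unix epoch by default,
    RFC 3339 (UTC) when rfc3339=True."""
    if rfc3339:
        now = datetime.now(timezone.utc).isoformat()
    else:
        now = str(time.time())
    return f"{now} {line}"
```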

  • shuffle

    Rearranges, removes or adds content to each line using two format specifications (one for the input and one for the output). Expects two arguments, the input_format_specification and the output_format_specification.
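The effect can be sketched with standard-library tools; a regex with named groups stands in for the input format specification and a str.format template for the output one:

```python
import re

def shuffle(line: str, input_regex: str, output_format: str) -> str:
    """Reorder, drop or add content: parse named fields from the input
    and re-emit them through an output template."""
    match = re.match(input_regex, line)
    if match is None:
        raise ValueError(f"line does not match: {line!r}")
    return output_format.format(**match.groupdict())

# e.g. swap two fields and add a constant prefix:
# shuffle("temperature 21.3", r"(?P<name>\S+) (?P<value>\S+)", "sensor,{value},{name}")
```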

  • limit

    Rate limits the flow through a pipe on a line-by-line basis. Expects a single required argument, interval, and an optional argument, --key, with a format specification describing how to find the key of each line by which to "group" the flow.
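A sketch of the underlying per-key rate limiting (the clock is injectable here purely to make the behaviour easy to demonstrate):

```python
import time
from collections import defaultdict

class Limiter:
    """Pass through at most one line per `interval` seconds,
    tracked per key (mirroring the --key grouping of limit)."""
    def __init__(self, interval: float):
        self.interval = interval
        self.last_seen = defaultdict(lambda: float("-inf"))

    def allow(self, key: str = "", now=None) -> bool:
        now = time.monotonic() if now is None else now
        if now - self.last_seen[key] >= self.interval:
            self.last_seen[key] = now
            return True
        return False
```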

Transport tools

3rd-party tools

Examples

version: '3.8'

services:
    source_1:
        image: ghcr.io/rise-maritime/porla
        network_mode: host
        restart: always
        command: ["socat UDP4-RECV:1457,reuseaddr STDOUT | timestamp | to_bus 1"]

    transform_1:
        image: ghcr.io/rise-maritime/porla
        network_mode: host
        restart: always
        command: ["from_bus 1 | jsonify '{} {name} {value}' | to_bus 2"]

    transform_2:
        image: ghcr.io/rise-maritime/porla
        network_mode: host
        restart: always
        command: ["from_bus 2 | b64 --encode | to_bus 3"]

    sink_1:
        image: ghcr.io/rise-maritime/porla
        network_mode: host
        restart: always
        command: ["from_bus 3 | socat STDIN UDP4-DATAGRAM:1458"]

    sink_2:
        image: ghcr.io/rise-maritime/porla
        network_mode: host
        restart: always
        command: ["from_bus 3 | to_bus 255 --ttl 1"]

    record_1:
        image: ghcr.io/rise-maritime/porla
        network_mode: host
        restart: always
        volumes:
            - ./recordings:/recordings
        command: ["from_bus 1 | record /recordings/bus_id_1.log --rotate-at '0 0 * * *' --rotate-count 30 --date-format '-%Y-%m-%d'"]

    record_2:
        image: ghcr.io/rise-maritime/porla
        network_mode: host
        restart: always
        volumes:
            - ./recordings:/recordings
        command: ["from_bus 2 | record /recordings/bus_id_2.log"]

    record_3:
        image: ghcr.io/rise-maritime/porla
        network_mode: host
        restart: always
        volumes:
            - ./recordings:/recordings
        command: ["from_bus 3 | record /recordings/bus_id_3.log"]

MQTT Pipeline Example

This example demonstrates subscribing to an MQTT topic, transforming the data, and publishing to another topic:

version: '3.8'

services:
    mqtt_source:
        image: ghcr.io/rise-maritime/porla
        network_mode: host
        restart: always
        environment:
            - MQTT_HOST=broker.example.com
        command: ["mqtt subscribe -t sensors/temperature | timestamp | to_bus 10"]

    transform:
        image: ghcr.io/rise-maritime/porla
        network_mode: host
        restart: always
        command: ["from_bus 10 | jsonify '{timestamp} {value}' | jq -c '{temp: .value, ts: .timestamp}' | to_bus 11"]

    mqtt_sink:
        image: ghcr.io/rise-maritime/porla
        network_mode: host
        restart: always
        environment:
            - MQTT_HOST=broker.example.com
        command: ["from_bus 11 | mqtt publish -t processed/temperature"]

    record:
        image: ghcr.io/rise-maritime/porla
        network_mode: host
        restart: always
        volumes:
            - ./recordings:/recordings
        command: ["from_bus 10 | record /recordings/temperature.log"]

Zenoh Pipeline Example

This example demonstrates subscribing to a Zenoh key expression, processing data, and publishing back:

version: '3.8'

services:
    zenoh_source:
        image: ghcr.io/rise-maritime/porla
        network_mode: host
        restart: always
        command: ["zenoh subscribe -k sensors/** | timestamp | to_bus 20"]

    transform:
        image: ghcr.io/rise-maritime/porla
        network_mode: host
        restart: always
        command: ["from_bus 20 | jq -c '. + {processed: true}' | to_bus 21"]

    zenoh_sink:
        image: ghcr.io/rise-maritime/porla
        network_mode: host
        restart: always
        command: ["from_bus 21 | zenoh put -k processed/data"]

Combined MQTT and Zenoh Pipeline

This example demonstrates bridging data between MQTT and Zenoh:

version: '3.8'

services:
    # MQTT to Zenoh bridge
    mqtt_to_zenoh:
        image: ghcr.io/rise-maritime/porla
        network_mode: host
        restart: always
        environment:
            - MQTT_HOST=mqtt-broker.local
        command: ["mqtt subscribe -t legacy/sensors/# | to_bus 30"]

    zenoh_publisher:
        image: ghcr.io/rise-maritime/porla
        network_mode: host
        restart: always
        command: ["from_bus 30 | zenoh put -k modern/sensors/data"]

    # Zenoh to MQTT bridge
    zenoh_to_mqtt:
        image: ghcr.io/rise-maritime/porla
        network_mode: host
        restart: always
        command: ["zenoh subscribe -k commands/** | to_bus 31"]

    mqtt_publisher:
        image: ghcr.io/rise-maritime/porla
        network_mode: host
        restart: always
        environment:
            - MQTT_HOST=mqtt-broker.local
        command: ["from_bus 31 | mosquitto_pub -h mqtt-broker.local -t legacy/commands -l"]

Extensions

Extensions provide domain-specific tooling that builds on the porla base image. They are Docker images that add specialized parsers, encoders, and protocol handlers for specific domains.

For available domain-specific extensions, see repositories tagged with porla-extension.

When to create an extension

Create a new extension when you need:

  • Domain-specific data format parsers/encoders (e.g., NMEA, AIS, SignalK for maritime)
  • Specialized protocol handlers for a particular industry
  • Format converters between domain standards

Do not create an extension for:

  • Generic transports or transformations — these should be proposed for inclusion in the base image
  • Tools that are broadly applicable across domains

Creating an extension

  1. Use the porla-extension-template as a starting point
  2. Name your repository porla-<domain> (e.g., porla-maritime, porla-automotive)
  3. Add the porla-extension topic to your repository for discoverability
  4. Use ghcr.io/rise-maritime/porla as the base image in your Dockerfile

Example extension Dockerfile:

FROM ghcr.io/rise-maritime/porla:latest

COPY requirements.txt requirements.txt
RUN pip3 install --no-cache-dir -r requirements.txt

COPY --chmod=555 ./bin/* /usr/local/bin/
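As an illustration of what such a bin/ script might contain, here is a hypothetical NMEA 0183 checksum filter for a maritime extension; the script name and behaviour are examples only, not part of any published extension:

```python
#!/usr/bin/env python3
"""nmea_filter: pass through only NMEA 0183 sentences with a valid checksum."""
import sys

def checksum_ok(sentence: str) -> bool:
    """An NMEA 0183 checksum is the XOR of all characters between '$' and '*',
    compared against the two hex digits following '*'."""
    sentence = sentence.strip()
    if not sentence.startswith("$") or "*" not in sentence:
        return False
    body, _, expected = sentence[1:].partition("*")
    calculated = 0
    for char in body:
        calculated ^= ord(char)
    return f"{calculated:02X}" == expected.upper()

if __name__ == "__main__":
    for line in sys.stdin:
        if checksum_ok(line):
            sys.stdout.write(line)
```

Installed into /usr/local/bin/ as above, it would slot into a pipeline just like any built-in: `from_bus 1 | nmea_filter | to_bus 2`.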
