Pipeline-Oriented Real-time Logging Assistant, or simply a basic toolbox for handling line-based streaming data using Linux pipes and some handy command-line utilities.

"porla" is also a Swedish word for the soothing sound of running water in a small stream.
porla was conceived out of our needs for data logging and data "proxying" across different research projects. The framework had to be:

- Minimalistic: something easy to reach for, even for the smallest of tasks.
- Versatile: usable for many different setups, without the base assumptions working against us.
- Configurable: data pipelines can easily be built from scratch on-site, ideally without rebuilding any software.
- Documentable: every setup can be documented well enough to be recreated as required.
- Non-invasive: no requirement to conform to any specific data format or programming language.
- Extensible: easy to extend, since the inherent nature of research projects means we never know what the next thing will be.
With that said, porla is primarily designed for logging and proxying of line-based, textual data.
porla is built on top of three well-known technologies:
- Linux pipes
- UDP Multicast
- Containerization (Docker containers)
UDP multicast provides the bus that connects user-configured pipelines, while Linux pipes are used to chain multiple commands within each pipeline. Each pipeline is contained within its own containerized environment. Have a look at the Examples section further down.
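The core idea, chaining line-oriented commands and pushing the result onto a shared bus, can be sketched with plain POSIX tools. Here a named pipe stands in for the multicast bus (an illustration only: a FIFO allows a single reader, whereas porla's UDP multicast bus allows many subscribers):

```shell
#!/bin/sh
# A named pipe stands in for porla's multicast bus (illustration only).
bus=$(mktemp -u)
mkfifo "$bus"

# "Pipeline" 1: produce lines and push them onto the bus.
printf 'hello\nworld\n' > "$bus" &

# "Pipeline" 2: read from the bus and transform each line.
tr 'a-z' 'A-Z' < "$bus"    # prints "HELLO" then "WORLD"

rm "$bus"
```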
porla follows a two-tier architecture separating domain-agnostic tooling from domain-specific extensions:
```
porla (base)
├── I/O: serial, UDP, TCP (via socat)
├── Transport: MQTT, Zenoh
├── Transform: JSON, base64, timestamps, shuffle
└── Core: bus, record, limit

porla-{domain} (extensions)
└── Domain-specific formats and protocols
```
What belongs in the base image (domain-agnostic):
- Generic transports (MQTT, Zenoh, UDP, TCP)
- Generic transformations (JSON, base64, timestamps)
- Core pipeline tools (bus, record, limit)
What belongs in domain extensions:
- Data format parsers/encoders (e.g., NMEA, AIS, SignalK)
- Domain-specific protocol handlers
- Format converters between domain standards
This separation allows the base image to remain lightweight and broadly applicable, while domain-specific tooling is available through dedicated extension images.
TODO
porla is packaged as a Docker image, available here: https://github.com/orgs/MO-RISE/packages

The image expects a command to run as its startup argument. Using `docker run`, this manifests as:

```shell
docker run --network=host ghcr.io/rise-maritime/porla "<command>"
```
Using `docker-compose`, it would look like this:

```yaml
version: '3'

services:
  service_1:
    image: ghcr.io/rise-maritime/porla
    network_mode: host
    restart: always
    command: ["<command>"]
```
- `to_bus` and `from_bus`

  Pipes data to or from the bus. Expects a single argument, the `bus_id`, which must be in the range 0-255. In addition, `to_bus` accepts the optional argument `--ttl`, which sets the TTL (Time-To-Live) of the outgoing multicast datagrams; it defaults to `--ttl 0`, which constrains the traffic to the local host only.
- `record`

  Records (appends) data from STDIN to a file. Expects a single required argument, the `file_path`. Optionally accepts:

  - `--rotate-at` (a cron expression for when to rotate logs)
  - `--rotate-count` (number of rotated files to keep, defaults to 7)
  - `--date-format` (strftime format for rotated file dates, defaults to `-%Y%m%d`)

  The cron expression must have exactly 5 fields: minute, hour, day, month, and weekday (e.g., `'0 0 * * *'` for daily at midnight, `'*/15 * * * *'` for every 15 minutes). Rotated files are stored in a `historic` subdirectory next to the original log file, with the original file extension preserved (e.g., `historic/bus_id_1-20251118.log.gz`).
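The `--date-format` value is plain strftime syntax, so GNU `date` can preview what a given format produces for a given day (the `-d` flag used here is GNU-specific):

```shell
# What the default --date-format '-%Y%m%d' expands to for 2025-11-18.
date -d 2025-11-18 +'-%Y%m%d'
# → -20251118
```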
- `b64`

  Base64 encodes (`--encode`) or decodes (`--decode`) data from STDIN to STDOUT. Optionally takes two arguments, the `input_format_specification` and the `output_format_specification`, to flexibly allow only parts of the input to be encoded/decoded.
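For intuition, encoding a whole value is what coreutils `base64` does; porla's `b64` adds the format specifications on top, so that only selected parts of a line are touched:

```shell
# Base64-encode a single value with coreutils (no trailing newline in input).
printf 'hello' | base64
# → aGVsbG8=
```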
- `jsonify`

  Parses each line according to a `parse` format specification (see https://github.com/r1chardj0n3s/parse#format-syntax) and outputs the named values as key-value pairs in a JSON object. Expects a single argument, the `format_specification`.
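As a rough stand-in, the effect of a specification like `'{} {name} {value}'` on a whitespace-separated line can be mimicked with awk: the unnamed `{}` field is dropped, the named fields become JSON keys (this sketch assumes no quoting or escaping is needed):

```shell
# Mimic jsonify '{} {name} {value}': drop field 1, name fields 2 and 3.
echo '1700000000 temp 21.5' \
  | awk '{printf "{\"name\": \"%s\", \"value\": \"%s\"}\n", $2, $3}'
# → {"name": "temp", "value": "21.5"}
```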
- `timestamp`

  Prepends a timestamp to each line. The timestamp is either the Unix epoch (`--epoch`) or in RFC 3339 format (`--rfc3339`).
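The shape of the output is, in spirit, that of the following shell loop (porla's implementation is its own; this only shows what an epoch-prefixed line looks like):

```shell
# Prefix each incoming line with the current Unix epoch.
printf 'hello\nworld\n' | while read -r line; do
  echo "$(date +%s) $line"
done
```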
- `shuffle`

  Rearranges, removes, or adds content on each line using two format specifications (one for the input and one for the output). Expects two arguments, the `input_format_specification` and the `output_format_specification`.
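Field rearrangement of this kind is something awk does natively, which makes for a handy mental model; e.g. a stand-in for an input specification `'{a} {b} {c}'` with output specification `'{c} {a}'`:

```shell
# Keep fields 3 and 1, drop field 2.
echo 'alpha beta gamma' | awk '{print $3, $1}'
# → gamma alpha
```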
- `limit`

  Rate-limits the flow through a pipe on a line-by-line basis. Expects a single required argument, the `interval`, and an optional argument, `--key`, with a format specification for how to find the key of each line by which to "group" the flow.
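The `--key` grouping can be pictured with a much cruder awk filter that passes only the first line seen per key. There is no time interval here, which is the essential difference: porla's `limit` re-admits lines for a key once the interval has elapsed.

```shell
# Pass only the first occurrence of each key (field 1).
printf 'a 1\na 2\nb 1\na 3\n' | awk '!seen[$1]++'
# → a 1
# → b 1
```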
- `mqtt`

  Command-line tool for publishing and subscribing to MQTT. See https://github.com/MO-RISE/mqtt-cli for full documentation.

  Subscribe example:

  ```shell
  mqtt subscribe -t my/topic
  ```

  Publish example:

  ```shell
  echo "hello" | mqtt publish -t my/topic
  ```
- `mosquitto_sub` and `mosquitto_pub`

  Standard Mosquitto MQTT clients. See https://mosquitto.org/man/mosquitto_sub-1.html and https://mosquitto.org/man/mosquitto_pub-1.html

  Subscribe example:

  ```shell
  mosquitto_sub -h broker -t my/topic
  ```

  Publish example (`-l` reads lines from stdin):

  ```shell
  mosquitto_pub -h broker -t my/topic -l
  ```
- `zenoh`

  Command-line tool for interacting with a Zenoh session. See https://github.com/RISE-Maritime/zenoh-cli for full documentation.

  Subscribe example:

  ```shell
  zenoh subscribe -k my/key/expression
  ```

  Put example:

  ```shell
  echo "hello" | zenoh put -k my/key/expression
  ```

  Get example:

  ```shell
  zenoh get -k my/key/expression
  ```
- `socat`

  General-purpose relay between byte streams: sockets (TCP/UDP), serial ports, files, STDIO, and more.

- `jq`

  Command-line JSON processor.

- `parallel`

  GNU parallel, for running shell jobs concurrently.

- `websocat`

  Command-line WebSocket client.
- `modbus`

  Command-line tool for reading and writing Modbus registers over TCP or RTU (serial). Supports encoding/decoding of types larger than 16 bits (floats, 32-bit integers, etc.). See https://github.com/favalex/modbus-cli for full documentation.

  Read example (read holding register 0 as a float):

  ```shell
  modbus -s 1 192.168.1.100:502 h@0/f
  ```

  Write example (write value 42 to holding register 0):

  ```shell
  modbus -s 1 192.168.1.100:502 h@0=42
  ```
This example demonstrates a complete pipeline: a UDP source, two transformation steps, two sinks, and recording of each bus:

```yaml
version: '3.8'

services:
  source_1:
    image: ghcr.io/rise-maritime/porla
    network_mode: host
    restart: always
    command: ["socat UDP4-RECV:1457,reuseaddr STDOUT | timestamp | to_bus 1"]

  transform_1:
    image: ghcr.io/rise-maritime/porla
    network_mode: host
    restart: always
    command: ["from_bus 1 | jsonify '{} {name} {value}' | to_bus 2"]

  transform_2:
    image: ghcr.io/rise-maritime/porla
    network_mode: host
    restart: always
    command: ["from_bus 2 | b64 --encode | to_bus 3"]

  sink_1:
    image: ghcr.io/rise-maritime/porla
    network_mode: host
    restart: always
    command: ["from_bus 3 | socat STDIN UDP4-DATAGRAM:1458"]

  sink_2:
    image: ghcr.io/rise-maritime/porla
    network_mode: host
    restart: always
    command: ["from_bus 3 | to_bus 255 --ttl 1"]

  record_1:
    image: ghcr.io/rise-maritime/porla
    network_mode: host
    restart: always
    volumes:
      - ./recordings:/recordings
    command: ["from_bus 1 | record /recordings/bus_id_1.log --rotate-at '0 0 * * *' --rotate-count 30 --date-format '-%Y-%m-%d'"]

  record_2:
    image: ghcr.io/rise-maritime/porla
    network_mode: host
    restart: always
    volumes:
      - ./recordings:/recordings
    command: ["from_bus 2 | record /recordings/bus_id_2.log"]

  record_3:
    image: ghcr.io/rise-maritime/porla
    network_mode: host
    restart: always
    volumes:
      - ./recordings:/recordings
    command: ["from_bus 3 | record /recordings/bus_id_3.log"]
```
This example demonstrates subscribing to an MQTT topic, transforming the data, and publishing to another topic:
```yaml
version: '3.8'

services:
  mqtt_source:
    image: ghcr.io/rise-maritime/porla
    network_mode: host
    restart: always
    environment:
      - MQTT_HOST=broker.example.com
    command: ["mqtt subscribe -t sensors/temperature | timestamp | to_bus 10"]

  transform:
    image: ghcr.io/rise-maritime/porla
    network_mode: host
    restart: always
    command: ["from_bus 10 | jsonify '{timestamp} {value}' | jq -c '{temp: .value, ts: .timestamp}' | to_bus 11"]

  mqtt_sink:
    image: ghcr.io/rise-maritime/porla
    network_mode: host
    restart: always
    environment:
      - MQTT_HOST=broker.example.com
    command: ["from_bus 11 | mqtt publish -t processed/temperature"]

  record:
    image: ghcr.io/rise-maritime/porla
    network_mode: host
    restart: always
    volumes:
      - ./recordings:/recordings
    command: ["from_bus 10 | record /recordings/temperature.log"]
```

This example demonstrates subscribing to a Zenoh key expression, processing data, and publishing back:
```yaml
version: '3.8'

services:
  zenoh_source:
    image: ghcr.io/rise-maritime/porla
    network_mode: host
    restart: always
    command: ["zenoh subscribe -k sensors/** | timestamp | to_bus 20"]

  transform:
    image: ghcr.io/rise-maritime/porla
    network_mode: host
    restart: always
    command: ["from_bus 20 | jq -c '. + {processed: true}' | to_bus 21"]

  zenoh_sink:
    image: ghcr.io/rise-maritime/porla
    network_mode: host
    restart: always
    command: ["from_bus 21 | zenoh put -k processed/data"]
```

This example demonstrates bridging data between MQTT and Zenoh:
```yaml
version: '3.8'

services:
  # MQTT to Zenoh bridge
  mqtt_to_zenoh:
    image: ghcr.io/rise-maritime/porla
    network_mode: host
    restart: always
    environment:
      - MQTT_HOST=mqtt-broker.local
    command: ["mqtt subscribe -t legacy/sensors/# | to_bus 30"]

  zenoh_publisher:
    image: ghcr.io/rise-maritime/porla
    network_mode: host
    restart: always
    command: ["from_bus 30 | zenoh put -k modern/sensors/data"]

  # Zenoh to MQTT bridge
  zenoh_to_mqtt:
    image: ghcr.io/rise-maritime/porla
    network_mode: host
    restart: always
    command: ["zenoh subscribe -k commands/** | to_bus 31"]

  mqtt_publisher:
    image: ghcr.io/rise-maritime/porla
    network_mode: host
    restart: always
    environment:
      - MQTT_HOST=mqtt-broker.local
    command: ["from_bus 31 | mosquitto_pub -h mqtt-broker.local -t legacy/commands -l"]
```

Extensions provide domain-specific tooling that builds on the porla base image. They are Docker images that add specialized parsers, encoders, and protocol handlers for specific domains.
For available domain-specific extensions, see repositories tagged with porla-extension.
Create a new extension when you need:
- Domain-specific data format parsers/encoders (e.g., NMEA, AIS, SignalK for maritime)
- Specialized protocol handlers for a particular industry
- Format converters between domain standards
Do not create an extension for:
- Generic transports or transformations — these should be proposed for inclusion in the base image
- Tools that are broadly applicable across domains
- Use the porla-extension-template as a starting point
- Name your repository `porla-<domain>` (e.g., `porla-maritime`, `porla-automotive`)
- Add the `porla-extension` topic to your repository for discoverability
- Use `ghcr.io/rise-maritime/porla` as the base image in your Dockerfile
Example extension Dockerfile:
```dockerfile
FROM ghcr.io/rise-maritime/porla:latest

COPY requirements.txt requirements.txt
RUN pip3 install --no-cache-dir -r requirements.txt

COPY --chmod=555 ./bin/* /usr/local/bin/
```