-
Notifications
You must be signed in to change notification settings - Fork 385
Expand file tree
/
Copy pathmonitor.py
More file actions
68 lines (55 loc) · 2.13 KB
/
monitor.py
File metadata and controls
68 lines (55 loc) · 2.13 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
"""Log monitoring node demonstrating in-dataflow log aggregation.
Receives structured log entries from processor via send_logs_as routing,
and sensor readings directly. Counts warnings and errors to demonstrate
that log data can be consumed and analyzed within the dataflow graph.
"""
import json
import logging
from dora import Node
# Emit a running summary every this-many "reading" inputs.
_SUMMARY_INTERVAL = 20


def _parse_log_entry(raw):
    """Return ``(level, message)`` for one decoded log value.

    ``raw`` is whatever the Arrow scalar converted to: normally a JSON
    string, but an already-decoded mapping is accepted as well.  Anything
    that cannot be interpreted as a mapping (invalid JSON, or JSON/values
    that are lists, numbers, ``None``, ...) is reported with level
    ``"unknown"`` and its string form as the message, so that a malformed
    entry never raises out of the monitor loop.
    """
    entry = raw
    if isinstance(raw, str):
        try:
            entry = json.loads(raw)
        except json.JSONDecodeError:
            return "unknown", raw

    # Valid JSON is not necessarily an object; only dicts have fields to read.
    if not isinstance(entry, dict):
        return "unknown", str(raw)

    # str() guards against non-string levels such as null or a number.
    level = str(entry.get("level", "")).lower()
    message = entry.get("message", entry.get("msg", ""))
    return level, message


def main() -> None:
    """Run the monitor node until the event stream ends.

    Iterates over the events of a dora ``Node``:

    * ``logs`` inputs: the first element of the value is parsed as a log
      entry.  Entries at level ``error`` or ``warn``/``warning`` are counted
      and re-logged; every other level is logged at debug level.
    * ``reading`` inputs: counted, with a running summary logged every
      ``_SUMMARY_INTERVAL`` readings.

    A final summary is logged once the loop ends, whether that was caused by
    a ``STOP`` event or by the event iterator being exhausted.
    """
    node = Node()
    warn_count = 0
    error_count = 0
    reading_count = 0

    for event in node:
        event_type = event["type"]
        if event_type == "STOP":
            break
        if event_type != "INPUT":
            continue

        input_id = event["id"]
        if input_id == "logs":
            # Log entries arrive as Arrow-encoded JSON strings
            arr = event["value"]
            if len(arr) == 0:
                continue
            # NOTE(review): only the first element is inspected; this assumes
            # one log entry per message -- confirm against the sender.
            level, msg = _parse_log_entry(arr[0].as_py())

            if level == "error":
                error_count += 1
                logging.error("monitor caught error: %s", msg)
            elif level in ("warn", "warning"):
                warn_count += 1
                logging.warning("monitor caught warning: %s", msg)
            else:
                logging.debug("monitor log entry: level=%s msg=%s", level, msg)
        elif input_id == "reading":
            reading_count += 1
            if reading_count % _SUMMARY_INTERVAL == 0:
                logging.info(
                    "monitor summary: readings=%d warnings=%d errors=%d",
                    reading_count,
                    warn_count,
                    error_count,
                )

    # Logged after the loop so the totals are reported even when the stream
    # closes without an explicit STOP event.
    logging.info(
        "monitor final: readings=%d warnings=%d errors=%d",
        reading_count,
        warn_count,
        error_count,
    )
# Start the monitor loop only when this file is executed as a script, so that
# importing the module has no side effects.
if __name__ == "__main__":
    main()