Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions example_scripts/python/basic/logs_to_trace/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from dotenv import load_dotenv
from utils import generate_trace_data
from pathlib import Path
from collections import Counter

load_dotenv(override=True)
parent_dir = Path(__file__).parent.resolve()
Expand All @@ -35,6 +36,11 @@
processed_logs = generate_trace_data(
json.load(open(file_name)), datetime.now(timezone.utc)
)

# Quick sanity check: show the distribution of span types this payload will render as.
log_type_counts = Counter([log.get("log_type", "missing") for log in processed_logs])
print(f"log_type counts: {dict(log_type_counts)}")

response = requests.post(
f"{os.getenv('KEYWORDSAI_BASE_URL')}/v1/traces/ingest",
json=processed_logs,
Expand Down
36 changes: 36 additions & 0 deletions example_scripts/python/basic/logs_to_trace/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,37 @@
from typing import List, Dict, Any


def infer_log_type(log: Dict[str, Any]) -> str:
    """
    Infer the KeywordsAI `log_type` for a single span log.

    The ingest API treats missing/unknown types as "chat" in the UI, which
    makes this example misleading. A best-effort type is derived from the
    span's name/path so the rendered trace shows a realistic mix of span
    types ("chat" / "generation" / "task" / "tool").

    Args:
        log: A span-log dict; only `span_name` and `span_path` are consulted.
             Both may be absent or None.

    Returns:
        One of "tool", "task", "chat", or "generation" (the fallback).
    """
    name = str(log.get("span_name") or "")
    path = str(log.get("span_path") or "")

    if name.endswith(".task"):
        # "store_*" task steps (e.g. persisting to a DB / vector store)
        # render better as tool spans than as plain tasks.
        if "store" in name or ".store" in path:
            return "tool"
        return "task"

    # Workflow spans are grouped with tasks.
    if name.endswith(".workflow"):
        return "task"

    # Provider chat-completion spans.
    if ".chat" in name:
        return "chat"

    # Everything else — including non-chat model calls such as
    # "openai.*" / "*.embeddings" spans — is treated as a generation.
    return "generation"


def deterministic_string_mapper(original_string: str, seed: str) -> str:
"""
Create a deterministic mapping that preserves the original string length.
Expand Down Expand Up @@ -169,6 +200,11 @@ def generate_trace_data(
# Create a shallow copy to avoid modifying the original
processed_log = log.copy()

# Ensure log_type exists so the UI renders correct span types.
# (If the sample log already includes log_type, keep it as-is.)
if not processed_log.get("log_type"):
processed_log["log_type"] = infer_log_type(processed_log)

# Update trace_unique_id (same for all spans in this trace)
if "trace_unique_id" in processed_log:
processed_log["trace_unique_id"] = new_trace_id
Expand Down