Skip to content

feat: add additional event handlers#216

Open
jeastham1993 wants to merge 29 commits intomainfrom
feat/add-additional-event-handlers
Open

feat: add additional event handlers#216
jeastham1993 wants to merge 29 commits intomainfrom
feat/add-additional-event-handlers

Conversation

@jeastham1993
Copy link
Copy Markdown
Collaborator

@jeastham1993 jeastham1993 commented Feb 25, 2026

Add additional event handler to the user management service to handle when a user prints a sticker.

This PR adds a new event flow: when a print job completes, the user-management service handles that event and increments the user's printed sticker count.

Deets

  1. Print-service side: The print service now publishes a printJobs.completed.v1 event (via Kafka and EventBridge) when a print job is acknowledged, with manual DSM header injection for Datadog tracing. It also adds DatadogTransactionTracker for tracking transactions through the pipeline.

  2. User-management side: New StickerPrintedHandler in Core handles the event by looking up the user and incrementing PrintedStickerCount (new DB column via migration). New workers consume this event across all four platform adapters — Kafka, SQS, ServiceBus, GCP Pub/Sub, and Lambda — mirroring the existing StickerClaimed pattern.

  3. Infrastructure: AWS CDK adds SQS queues/DLQs and Lambda wiring for the new event. Aspire adds Kafka topics. Docker-compose gets updated for local dev. We've not tested the azure stuff yet.

@jeastham1993
Copy link
Copy Markdown
Collaborator Author

@codex could you review this please?

chatgpt-codex-connector[bot]

This comment was marked as outdated.

@jeastham1993
Copy link
Copy Markdown
Collaborator Author

@codex could you review the most recent commit on this PR please?

chatgpt-codex-connector[bot]

This comment was marked as outdated.

chatgpt-codex-connector[bot]

This comment was marked as outdated.

…ent parsing

JsonSerializer.SerializeToUtf8Bytes on an already-JSON string wraps it in
quotes, breaking DecodeStructuredModeMessageAsync. Switch to
Encoding.UTF8.GetBytes to pass the raw JSON bytes unchanged across Kafka,
Service Bus, and Google Pub/Sub workers.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
chatgpt-codex-connector[bot]

This comment was marked as outdated.

All messaging drivers (Kafka, Service Bus, EventBridge, GCP Pub/Sub) and
Aspire dashboard commands were publishing raw JSON payloads. Consumers call
DecodeStructuredModeMessageAsync and expect a full CloudEvents envelope, so
messages were never successfully processed.

- Use JsonEventFormatter + JsonDocument to encode structured CloudEvents
- Fix GooglePubSubMessaging to route by topic via a per-topic publisher
  dictionary instead of a hardcoded stickerClaimed client
- Add CloudNative.CloudEvents.SystemTextJson to the Aspire project

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
chatgpt-codex-connector[bot]

This comment was marked as outdated.

chatgpt-codex-connector[bot]

This comment was marked as outdated.

…per poll

Both KafkaStickerPrintedWorker and KafakStickerClaimedWorker were
recreating a new IConsumer on every PollAsync() call, which triggered a
Kafka group rebalance cycle on each iteration. In CI environments this
rebalance overhead (2-3s) competed with the 2s Consume() timeout,
causing the WhenStickerIsPrintedThenAUsersPrintedCountShouldIncrement
integration test to fail intermittently.

Consumers are now created once in StartAsync() and subscribed to their
topic, reused across all PollAsync() calls, and closed in StopAsync().
Service scope creation is also moved inside the message-present branch
to avoid unnecessary allocations on idle polls.

Additional changes:
- Add IKafkaConsumerFactory abstraction to enable unit testing of consumer lifecycle
- Add 8 unit tests covering StartAsync/PollAsync/StopAsync lifecycle for both workers
- Fix Log.ReceivedMessage level from Critical to Information

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
chatgpt-codex-connector[bot]

This comment was marked as outdated.

@scottgerring
Copy link
Copy Markdown
Member

@jeastham1993 give us a ping when you would like me to look at this!
(and can you write a brief breakdown of what's changed where and why - its huuuge)

@jeastham1993
Copy link
Copy Markdown
Collaborator Author

@codex could you review this please?

var data = formatter.EncodeStructuredModeMessage(cloudEvent, out _);
var jsonString = System.Text.Encoding.UTF8.GetString(data.Span);

if (Tracer.Instance.ActiveScope is not null)
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This adds DSM support to the AWS publishing

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We never call TrackTransactionAsync in this path, only in the kafka worker, which seems like it is needed

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

jeastham1993 and others added 2 commits March 3, 2026 13:42
…ntBridge

SetHeader was mutating a parsed JsonNode copy but never serializing it back,
so _datadog tracing/DSM headers were silently dropped from all EventBridge
payloads. Parse jsonString into a JsonNode before injection, pass it as the
carrier, then reassign jsonString from ToJsonString() so PutEventsAsync sends
the updated payload.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Copy link
Copy Markdown

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 70623caee6

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

jeastham1993 and others added 2 commits March 3, 2026 14:04
… adapter

Without these registrations, resolving AcknowledgePrintJobCommandHandler in
DRIVEN=AWS deployments failed with an unresolved-service exception. Also adds
Activity span tagging for dsm.transaction_id and dsm.transaction.checkpoint
in both Core and Client transaction trackers.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
WithServiceBusTestCommands sends structured CloudEvents with ContentType
application/cloudevents+json, but the worker was deserializing the raw
body directly to StickerClaimedEventV1, missing the nested data field
and producing empty AccountId/StickerId values that dead-lettered.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Copy link
Copy Markdown

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: ecd11fe3da

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Copy link
Copy Markdown

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 0346c2f6c1

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Copy link
Copy Markdown

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 8dff6e93cb

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Copy link
Copy Markdown

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: ce1040cbfd

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

@scottgerring scottgerring self-requested a review March 5, 2026 09:05
Comment on lines +80 to +81
// Complete the message
await args.CompleteMessageAsync(args.Message, CancellationToken.None);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ACK this - looks like it needs fixing @jeastham1993

- Fix wrong SubscribeOperation type in ServiceBusStickerPrintedWorker
  (was StickerClaimedEventV1, now StickerPrintedEventV1)
- Align envelope handling in ServiceBusStickerPrintedWorker to match
  ServiceBusStickerClaimedWorker (check ContentType, handle both
  CloudEvents and plain JSON)
- Register printJobs.completed.v1 Service Bus queue in Aspire setup
  so ServiceBusStickerPrintedWorker doesn't fail on Azure startup
- Fix activity tag in StickerPrintedHandler: stickerClaim.failed ->
  stickerPrint.failed
- Add unit tests for StickerPrintedHandler covering null event,
  empty userId, user not found, and successful update paths
- Inject DatadogTransactionTracker into EventBridgeEventPublisher
  and call TrackTransactionAsync after publishing PrintJobQueuedEvent,
  matching the Kafka publisher behaviour
- Fix DLQ producer in KafkaStickerPrintedWorker: create once in
  StartAsync and reuse, rather than creating a new instance per call
- Remove duplicate StopAsync override in StickerPrintedWorker and
  StickerClaimedWorker that was calling messagingWorker.StopAsync
  twice (already called in ExecuteAsync finally block)
- Fix log level mismatches in LogMessages.cs: ReceivedMessage
  Critical->Information, GenericWarning Error->Warning
- Fix duplicate EventId 5 in LogMessages.cs: StoppingMessageProcessor
  reassigned to EventId 13
- Fix copy-pasta comment in integration test: "claimed sticker count"
  -> "printed sticker count"
- Rename KafakStickerClaimedWorker -> KafkaStickerClaimedWorker
  (filename and class name typo)

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Copy link
Copy Markdown
Member

@scottgerring scottgerring left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jeastham1993 went through this again; can you fix merge conflicts and address or resolve the codex comments?

Copy link
Copy Markdown

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: a237e7128e

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment on lines +76 to +79
request.Content.Headers.Add("DD-API-KEY", _ddApiKey);

var response = await client.SendAsync(request);
response.EnsureSuccessStatusCode();
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Skip Datadog POST when API key is unset

TrackTransactionAsync always sends a request to the Datadog pipeline endpoint even though _ddApiKey defaults to an empty string when DD_API_KEY is missing. In environments without that secret (common in local/dev/CI), every queued/acknowledged print path now does an outbound HTTP call that is guaranteed to fail and is still awaited by callers, adding avoidable latency and warning noise for each job. Add an early return when the key is empty (using the existing TransactionTrackingSkipped log) before constructing/sending the request.

Useful? React with 👍 / 👎.

@scottgerring
Copy link
Copy Markdown
Member

Code review

Found 3 issues:

  1. SqsStickerPrintedWorker deserializes message.Body directly as StickerPrintedEventV1, but this queue is fed from EventBridge which wraps the payload in an envelope — detail holds the actual event. Every message will deserialize to null and be dead-lettered. Compare with StickerPrintedSqsHandler which correctly unwraps via CloudWatchEvent<JsonElement>.

var evtData = JsonSerializer.Deserialize<StickerPrintedEventV1>(message.Body);
if (evtData == null)
{

  1. ServiceBusStickerPrintedWorker.ProcessMessageAsync calls DeadLetterMessageAsync when deserialization produces null, but does not return — execution falls through to CompleteMessageAsync on the same message. A Service Bus message can only be settled once; the second call throws ServiceBusException, the error handler fires, and the message re-enters the processing loop.

if (stickerPrinted == null) await args.DeadLetterMessageAsync(args.Message, "Message body cannot be deserialized");
try
{
await handler.Handle(stickerPrinted!);
}
catch (InvalidUserException ex)
{
Log.InvalidUser(_logger, ex);
await args.DeadLetterMessageAsync(args.Message, "Invalid account id");
}
// Complete the message
await args.CompleteMessageAsync(args.Message, CancellationToken.None);
}

  1. DatadogTransactionTracker is duplicated: a correct implementation exists in PrintService.Core (implements IDatadogTransactionTracker, uses structured logging helpers), and a second degraded copy lives in PrintService.Client (no interface, bare #pragma warning disable, inline log calls). The print-service CLAUDE.md describes a ports-and-adapters architecture where shared logic belongs in Core — the Client copy violates this and will drift from the Core version over time.

namespace Stickerlandia.PrintService.Client.Telemetry;
internal class DatadogTransactionTracker

🤖 Generated with Claude Code

- If this code review was useful, please react with 👍. Otherwise, react with 👎.

- SqsStickerPrintedWorker: unwrap EventBridge envelope before deserializing
  (was deserializing message.Body directly as StickerPrintedEventV1, missing
  the CloudWatchEvent<JsonElement> outer envelope; mirrored StickerPrintedSqsHandler)
- ServiceBusStickerPrintedWorker: add return after DeadLetterMessageAsync calls
  so CompleteMessageAsync is never called on an already-settled message
- PrintService.Client: remove duplicate DatadogTransactionTracker; reference
  Core implementation via IDatadogTransactionTracker instead
Copy link
Copy Markdown

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 148ea84281

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment on lines +51 to +53
if (_task is not null)
{
await _task;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Avoid blocking poll loop on Pub/Sub subscriber task

PollAsync awaits _task, but _task is created by subscriber.StartAsync(ProcessMessageAsync) and is the long-running subscriber task that only completes after StopAsync. This makes the first poll call block indefinitely, so the hosted worker loop can no longer observe cancellation promptly or reach its finally block to stop the subscriber during shutdown/redeploy in DRIVEN=GCP worker runs.

Useful? React with 👍 / 👎.

};


var serviceBusMessage = new ServiceBusMessage(Encoding.UTF8.GetBytes(messageJson));
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Set CloudEvent content type in Azure test messaging driver

The integration driver now sends raw JSON bytes without a ContentType, but the printed Service Bus worker only unwraps structured CloudEvents when ContentType is application/cloudevents+json. For printJobs.completed.v1 test injections (which are now structured CloudEvents), this path falls back to direct StickerPrintedEventV1 deserialization and fails to extract userId, so Azure integration runs can loop/retry instead of incrementing printed counts.

Useful? React with 👍 / 👎.

…nts content type in Azure test driver

- Add early return in TrackTransactionAsync when _ddApiKey is empty to avoid guaranteed-to-fail outbound HTTP calls in local/dev/CI environments
- Set ContentType = "application/cloudevents+json" on ServiceBusMessage in AzureServiceBusMessaging so test-injected printJobs.completed.v1 messages hit the structured CloudEvents deserialization path in the worker
Copy link
Copy Markdown

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 3e01435131

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

var serviceBusMessage = new ServiceBusMessage(Encoding.UTF8.GetBytes(messageJson))
{
ContentType = "application/json"
ContentType = "application/cloudevents+json"
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Send claimed Azure test messages with JSON content type

AzureServiceBusMessaging.SendMessageAsync now labels every injected message as application/cloudevents+json, but claimed-message tests still publish plain StickerClaimedEventV1 JSON (see AccountDriver.InjectStickerClaimedMessage). In ServiceBusStickerClaimedWorker, that content type forces the CloudEvent parsing branch, which expects a top-level data field and cannot deserialize the claimed payload, so claimed events are dead-lettered instead of incrementing counts in DRIVEN=AZURE integration runs.

Useful? React with 👍 / 👎.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants