Problem intercepting messages sent with StreamBridge #2885

@jgsmarques

I have a global interceptor configured for all my output channels to log messages:

@Slf4j
@RequiredArgsConstructor
public class OutputChannelInterceptor implements ChannelInterceptor {
  private final BindingServiceProperties bindingServiceProperties;

  @Override
  public Message<?> preSend(Message<?> message, MessageChannel channel) {
    InterceptorProperties properties =
        new InterceptorProperties(message, channel, bindingServiceProperties);
    log.debug(
        "Outbound channel {} received message in topic {}",
        properties.getChannelName(),
        properties.getTopicName());
    log.trace("Full message is {}", message);
    return message;
  }

  @Override
  public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
    InterceptorProperties properties =
        new InterceptorProperties(message, channel, bindingServiceProperties);
    log.debug(
        "Outbound channel {} processed message in topic {} and message {} sent",
        properties.getChannelName(),
        properties.getTopicName(),
        sent ? "was" : "was not");
    log.trace("Full message is {}", message);
  }

  @Data
  static class InterceptorProperties {
    private final String channelName;
    private final String topicName;

    public InterceptorProperties(
        Message<?> message,
        MessageChannel channel,
        BindingServiceProperties bindingServiceProperties) {
      if (channel instanceof DirectWithAttributesChannel directChannel) {
        channelName = directChannel.getFullChannelName();
        topicName =
            directChannel.getComponentName() != null
                ? Optional.ofNullable(
                        bindingServiceProperties.getBindingProperties(
                            directChannel.getComponentName()))
                    .map(BindingProperties::getDestination)
                    .orElse("")
                : "";
      } else {
        channelName = "";
        topicName = "";
      }
    }
  }
}

I use bindingServiceProperties mainly to determine the Kafka topic that the message is being sent to.
This works fine when I use functional message processing like:

@Bean
public Function<String, String> interceptor() {
  return message -> "interceptorMessage";
}

However, when I need to send a message on demand (e.g., via a REST API call), I use the StreamBridge approach:

@Slf4j
@Component
@RequiredArgsConstructor
static class StreamProcessor {
  @Autowired public StreamBridge streamBridge;

  public void testProduce(String key, String message) {
    streamBridge.send(key, "message");
  }
}

In this case, I can't extract the channel name from the channel in the interceptor. I stepped through the StreamBridge code up to the point where the channel is created, and the channel does not seem to be given a name at all when it is created this way. Is there any way to get at least the channel name in the interceptor, or is there an alternative way to produce the message without StreamBridge?

Version of the framework: 2023.0.0
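
One workaround I am considering, in case the channel really has no name on the StreamBridge path: have the sender copy the binding name into a message header, and let the interceptor fall back to that header when the channel gives nothing. Below is a minimal sketch of that fallback logic only. It uses stand-in types instead of the Spring classes, and the header key `x-binding-name`, the `SketchMessage` record, and both helper methods are hypothetical names made up for illustration. In the real application I would expect to build the message with `MessageBuilder.withPayload(...).setHeader(...)` and read it back through `message.getHeaders()`, but I have not verified this end to end.

```java
import java.util.Map;
import java.util.Optional;

// Stand-in types only: none of these are Spring classes.
class BindingNameFallbackSketch {

  // Hypothetical header key; any name not used elsewhere would do.
  static final String BINDING_NAME_HEADER = "x-binding-name";

  // Minimal stand-in for a message: a payload plus string headers.
  record SketchMessage(Object payload, Map<String, String> headers) {}

  // Sender side: copy the binding name into a header before sending.
  static SketchMessage withBindingName(String bindingName, Object payload) {
    return new SketchMessage(payload, Map.of(BINDING_NAME_HEADER, bindingName));
  }

  // Interceptor side: prefer the channel's own name, then the header, then "".
  static String resolveChannelName(String nameFromChannel, SketchMessage message) {
    return Optional.ofNullable(nameFromChannel)
        .filter(name -> !name.isBlank())
        .or(() -> Optional.ofNullable(message.headers().get(BINDING_NAME_HEADER)))
        .orElse("");
  }

  public static void main(String[] args) {
    SketchMessage message = withBindingName("orders-out-0", "payload");
    // Channel has no name (the StreamBridge case): the header is used.
    System.out.println(resolveChannelName(null, message)); // prints "orders-out-0"
    // Channel has a name (the functional case): the channel name wins.
    System.out.println(resolveChannelName("interceptor-out-0", message)); // prints "interceptor-out-0"
  }
}
```

The resolved name could then be passed to `bindingServiceProperties.getBindingProperties(...)` in the same way the interceptor above already uses the component name. One drawback of this design is that the extra header may travel with the message to the broker, depending on how the binder maps headers.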
