Description
I have a global interceptor configured for all my output channels to log messages:
@Slf4j
@RequiredArgsConstructor
public class OutputChannelInterceptor implements ChannelInterceptor {
private final BindingServiceProperties bindingServiceProperties;
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
InterceptorProperties properties =
new InterceptorProperties(message, channel, bindingServiceProperties);
log.debug(
"Outbound channel {} received message in topic {}",
properties.getChannelName(),
properties.getTopicName());
log.trace("Full message is {}", message);
return message;
}
@Override
public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
InterceptorProperties properties =
new InterceptorProperties(message, channel, bindingServiceProperties);
log.debug(
"Outbound channel {} processed message in topic {} and message {} sent",
properties.getChannelName(),
properties.getTopicName(),
sent ? "was" : "was not");
log.trace("Full message is {}", message);
}
@Data
static class InterceptorProperties {
private final String channelName;
private final String topicName;
public InterceptorProperties(
Message<?> message,
MessageChannel channel,
BindingServiceProperties bindingServiceProperties) {
if (channel instanceof DirectWithAttributesChannel directChannel) {
channelName = directChannel.getFullChannelName();
topicName =
directChannel.getComponentName() != null
? Optional.ofNullable(
bindingServiceProperties.getBindingProperties(
directChannel.getComponentName()))
.map(BindingProperties::getDestination)
.orElse("")
: "";
} else {
channelName = "";
topicName = "";
}
}
}
}
I use bindingServiceProperties mostly to determine the Kafka topic that the message is being sent to.
This works fine when I use message processing like:
@Bean
public Function<String, String> interceptor() {
return message -> "interceptorMessage";
}
However, when I need to send a message on demand (e.g., via a REST API call), I use the StreamBridge approach:
@Slf4j
@Component
@RequiredArgsConstructor
static class StreamProcessor {
@Autowired public StreamBridge streamBridge;
public void testProduce(String key, String message) {
streamBridge.send(key, message);
}
}
In this case, I can't extract the channel name from the channel in the interceptor. I traced the StreamBridge code up to the point where the channel is created, and the channel does not appear to be given a name when it is created this way. Is there any way to get at least the channel name in the interceptor, or is there an alternative way to produce the message without StreamBridge?
Version of the framework 2023.0.0
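One possible workaround, sketched here only as an idea and not confirmed against the framework: have the sender attach the binding name as a message header before calling StreamBridge, and let the interceptor fall back to that header when the channel has no name. The sketch below models just the fallback lookup in plain Java. The class BindingNameFallback, the header name x-output-binding, and the destinations map (standing in for BindingServiceProperties) are all hypothetical names introduced for illustration.

```java
import java.util.Map;
import java.util.Optional;

// Plain-Java model of the lookup. Every name in this class is a hypothetical
// stand-in for illustration, not a Spring Cloud Stream type.
final class BindingNameFallback {

    // Hypothetical header the sender would set before calling StreamBridge.send.
    static final String BINDING_HEADER = "x-output-binding";

    private BindingNameFallback() {}

    // Prefer the channel's component name; fall back to the header when the
    // channel is unnamed (the StreamBridge case described above).
    static String resolveBindingName(String componentName, Map<String, Object> headers) {
        if (componentName != null && !componentName.isEmpty()) {
            return componentName;
        }
        Object fromHeader = headers.get(BINDING_HEADER);
        return fromHeader instanceof String ? (String) fromHeader : "";
    }

    // Map the binding name to its configured destination (topic), or "" if
    // unknown. The destinations map stands in for BindingServiceProperties.
    static String resolveTopic(
            String componentName,
            Map<String, Object> headers,
            Map<String, String> destinations) {
        String binding = resolveBindingName(componentName, headers);
        return Optional.ofNullable(destinations.get(binding)).orElse("");
    }
}
```

In the real interceptor, the headers argument would presumably be message.getHeaders(), and the sender could build the message with MessageBuilder.withPayload(...).setHeader(...) before passing it to streamBridge.send(...). Whether that fits depends on the setup, since a custom header may also be propagated to the outgoing Kafka record.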