问题描述:在使用Spring Boot框架时,如何使用BindingsEndpoint和Kafka Stream进行消息处理?
org.springframework.cloud
spring-cloud-stream-binder-kafka-streams
@SpringBootApplication
@EnableBinding
public class MyApp {
@Autowired
private BindingsEndpoint bindingsEndpoint;
// ...
}
management.server.port=8081
management.endpoints.web.exposure.include=bindings # 开启bindings接口
spring.cloud.stream.bindings.input.destination=my-topic # 绑定input,topic为my-topic
@EnableBinding(Processor.class)
public class MyProcessor {
@Autowired
private Processor processor;
public void processData(String input) {
this.processor
.input()
.send(new GenericMessage(input));
}
@StreamListener("input")
@SendTo("output")
public String transform(String message) {
return message.toUpperCase();
}
}
在上面的示例中,@StreamListener注释它标记到一个方法,该方法接收来自名为“input”的主题的消息。然后,您可以使用@SendTo注释,以将输出发送到名为“output”的主题。
以上就是BindingsEndpoint kafka Stream spring boot 的解决方法。通过使用Bindings