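Spring Cloud Stream: consuming the messages the application itself produces. This post modifies the base code from "Integrating Spring Cloud with Stream and RabbitMQ to implement message-driven functionality".
1. Code implementation:
Add the dependency (Lombok, which provides the @Slf4j logger used by the listener)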
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
</dependency>
yml configuration
spring:
  cloud:
    stream:
      bindings:
        # Both bindings point at the same destination,
        # so the application consumes what it produces.
        example-topic-input:
          destination: default.messages
          binder: cxh-topic
        example-topic-output:
          destination: default.messages
          binder: cxh-topic
      binders:
        cxh-topic:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
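Why this configuration makes the application receive its own messages can be illustrated with a small in-memory model. This is only a sketch under stated assumptions: the class SharedDestinationSketch and its methods bindInput and send are hypothetical stand-ins for the broker and the bindings, and nothing here is part of the Spring Cloud Stream or RabbitMQ API. It shows only the routing idea: a message sent to a destination reaches every handler subscribed to that same destination name.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical in-memory stand-in for a broker; not a Spring Cloud Stream class.
public class SharedDestinationSketch {
    // destination name -> handlers subscribed to it
    private final Map<String, List<Consumer<String>>> subscribers = new HashMap<>();

    // Models an input binding: register a handler on a destination.
    public void bindInput(String destination, Consumer<String> handler) {
        subscribers.computeIfAbsent(destination, d -> new ArrayList<>()).add(handler);
    }

    // Models an output binding: deliver the payload to every handler on the destination.
    public void send(String destination, String payload) {
        List<Consumer<String>> handlers =
                subscribers.getOrDefault(destination, Collections.emptyList());
        for (Consumer<String> handler : handlers) {
            handler.accept(payload);
        }
    }

    public static void main(String[] args) {
        SharedDestinationSketch broker = new SharedDestinationSketch();
        List<String> received = new ArrayList<>();

        // Input and output use the same destination name, as in the yml above.
        broker.bindInput("default.messages", received::add);
        broker.send("default.messages", "hello-cxh");

        // A message sent to a different destination is not delivered.
        broker.send("other.messages", "ignored");

        System.out.println(received); // prints [hello-cxh]
    }
}
```

If the output binding pointed at a different destination than the input binding, the listener in this application would no longer see the messages sent by the controller.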
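Add the classes
// TestController.java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class TestController {
    @Autowired
    private TestTopic testTopic;

    // Publishes the request parameter as a message on the output channel.
    @GetMapping("/sendMessage")
    public String messageWithMQ(@RequestParam String message) {
        testTopic.output().send(MessageBuilder.withPayload(message).build());
        return "ok";
    }
}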
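// TestTopic.java
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

public interface TestTopic {
    // Channel names; they must match the binding names in the yml configuration.
    String OUTPUT = "example-topic-output";
    String INPUT = "example-topic-input";

    @Output(OUTPUT)
    MessageChannel output();

    @Input(INPUT)
    SubscribableChannel input();
}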
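// TestListener.java
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class TestListener {
    // Invoked for every message that arrives on the input channel.
    @StreamListener(TestTopic.INPUT)
    public void receive(String payload) {
        log.info("Received2: {}", payload);
    }
}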
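// StreamApplication.java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;

// @EnableBinding belongs to the annotation-based programming model,
// which is deprecated as of Spring Cloud Stream 3.1.
@EnableBinding(TestTopic.class)
@SpringBootApplication
public class StreamApplication {
    public static void main(String[] args) {
        SpringApplication.run(StreamApplication.class, args);
    }
}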
2. Result:
Start RabbitMQ and the stream project, then open http://localhost:8080/sendMessage?message=hello-cxh in a browser.
Check the console output:
com.cxh.stream.msg.TestListener : Received2: hello-cxh
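The endpoint can also be called without a browser. The following is a minimal sketch, assuming Java 11 or later and that the application is running on localhost:8080 as above. The class name SendMessageClient and the helper buildUri are hypothetical names introduced for illustration. The helper URL-encodes the message so that payloads containing spaces or other special characters are passed safely as the message query parameter.

```java
import java.io.IOException;
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

// Hypothetical helper client; not part of the original project.
public class SendMessageClient {
    // Builds the /sendMessage URL with the message encoded as a query parameter.
    public static URI buildUri(String message) {
        String encoded = URLEncoder.encode(message, StandardCharsets.UTF_8);
        return URI.create("http://localhost:8080/sendMessage?message=" + encoded);
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        HttpRequest request = HttpRequest.newBuilder(buildUri("hello-cxh")).GET().build();

        // Requires the stream application and RabbitMQ to be running.
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());

        System.out.println(response.statusCode() + " " + response.body());
    }
}
```

After the request is sent, the listener's log line should appear in the application console in the same way as when the URL is opened in a browser.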