A producer only needs to inject IMqService to send messages; the service submits them to RocketMQ.
@Autowired
private IMqService mqService;
public interface IMqService {

    /**
     * One-way send to the default topic.
     * By default, the destination resolves to ${spring.application.name}-topic:tag.
     * @param tag     message tag
     * @param payload message body
     */
    void sendOneWayDefault(String tag, String payload);

    /**
     * One-way send: the caller does not wait for a response after sending.
     * @param destination destination of the message
     * @param payload     message body
     */
    void sendOneWay(String destination, String payload);

    /**
     * Synchronous send.
     * @param destination destination of the message
     * @param payload     message body
     */
    void send(String destination, String payload);

    /**
     * Synchronous send to the default topic.
     * By default, the destination resolves to ${spring.application.name}-topic:tag.
     * @param tag     message tag
     * @param payload message body
     */
    void sendDefault(String tag, String payload);
}
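To illustrate how the default destination is composed, here is a minimal sketch. It assumes a hypothetical in-memory stand-in, RecordingMqService, in place of the real implementation (which is not shown in this document), and a hypothetical caller, UserLogProducer. Only the `<application name>-topic:<tag>` rule comes from the Javadoc above; everything else is illustrative.

```java
import java.util.ArrayList;
import java.util.List;

// Copy of the interface above, trimmed to the two methods used in this sketch.
interface IMqService {
    void send(String destination, String payload);
    void sendDefault(String tag, String payload);
}

// Hypothetical stand-in: records messages in memory instead of sending to RocketMQ.
class RecordingMqService implements IMqService {
    private final String defaultTopic;
    final List<String> sent = new ArrayList<>();

    RecordingMqService(String applicationName) {
        // Mirrors ${spring.application.name}-topic from the configuration.
        this.defaultTopic = applicationName + "-topic";
    }

    @Override
    public void send(String destination, String payload) {
        sent.add(destination + " <- " + payload);
    }

    @Override
    public void sendDefault(String tag, String payload) {
        // Default destination: <application name>-topic:<tag>
        send(defaultTopic + ":" + tag, payload);
    }
}

// Hypothetical producer: it depends only on IMqService, as a Spring bean would.
class UserLogProducer {
    private final IMqService mqService;

    UserLogProducer(IMqService mqService) {
        this.mqService = mqService;
    }

    void publishLog(String json) {
        // "log" is the tag that the listener in this document filters on.
        mqService.sendDefault("log", json);
    }
}
```

In a real application the stand-in would be replaced by the injected IMqService bean, and the producer would call sendDefault or sendOneWayDefault in the same way.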
Consumers need to add the RocketMQ configuration:
rocketmq:
  name-server: xx.xx.xx.xx:9876
  producer:
    group: ${spring.application.name}-group
  topic: ${spring.application.name}-topic
Use the @RocketMQMessageListener annotation to listen on the queue:
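import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
// Imports for JSON and the project classes (AssertUtil, IDGenerate,
// OfaSysUserLog, IOfaSysUserLogService) depend on the project and are omitted.

@Slf4j
@Service
@RocketMQMessageListener(topic = "${rocketmq.topic}", selectorExpression = "log", consumerGroup = "log-consumer_${rocketmq.topic}")
public class SysUserLogListener implements RocketMQListener<String> {

    @Autowired
    private IOfaSysUserLogService ofaSysUserLogService;

    @Override
    public void onMessage(String message) {
        log.info("receive log,{}", message);
        log.info("save user operation log...");
        // Skip empty messages; otherwise parse the JSON payload and persist it.
        if (AssertUtil.isNotEmpty(message)) {
            OfaSysUserLog ofaSysUserLog = JSON.parseObject(message, OfaSysUserLog.class);
            ofaSysUserLog.setId(IDGenerate.nextId().get());
            ofaSysUserLogService.save(ofaSysUserLog);
        }
    }
}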
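topic
The topic to subscribe to.
selectorExpression
Filters messages by tag. It is recommended that each application use a single topic and distinguish business types by tag.
consumerGroup
The consumer group.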
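To make the tag filtering concrete, the following is a simplified model of how a tag-based selectorExpression decides whether a message is delivered. It assumes RocketMQ's tag expression syntax (`*` for all tags, `||` between alternatives). TagSelector is a hypothetical helper written for this illustration, not part of RocketMQ, and the real filtering is performed by the broker and client rather than by code like this.

```java
import java.util.Arrays;

// Hypothetical helper that models tag matching for illustration only.
final class TagSelector {
    private TagSelector() {}

    static boolean matches(String selectorExpression, String tag) {
        // A null, blank, or "*" expression subscribes to every tag.
        if (selectorExpression == null) {
            return true;
        }
        String expr = selectorExpression.trim();
        if (expr.isEmpty() || "*".equals(expr)) {
            return true;
        }
        // Otherwise the expression is a list of tags separated by "||".
        return Arrays.stream(expr.split("\\|\\|"))
                .map(String::trim)
                .anyMatch(candidate -> candidate.equals(tag));
    }
}
```

With one topic per application, a listener declared with selectorExpression = "log" would receive only messages sent with the log tag, while a second listener on the same topic could use a different tag for another business type.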