消息队列


生产者

生产者只需要注入IMqService即可发送消息,服务会将消息提交到rocketmq。

@Autowired
private IMqService mqService;
public interface IMqService {

    /**
     * 默认情况下,destination会取${spring.application.name}-topic:tag
     * @param tag
     * @param payload
     * @return
     */
    void sendOneWayDefault(String tag, String payload);


    /**
     * 单向发送,即发送后不需要等待响应
     * @param destination 目的地
     * @param payload
     * @return
     */
    void sendOneWay(String destination, String payload);



    /**
     * 同步发送
     * @param destination
     * @param payload
     * @return
     */
    void send(String destination, String payload);

    /**
     * 默认情况下,destination会取${spring.application.name}-topic:tag
     * @param tag
     * @param payload
     * @return
     */
    void sendDefault(String tag, String payload);
}

消费者

消费者需要添加rocketmq配置

rocketmq:
  name-server: xx.xx.xx.xx:9876
  producer:
    group: ${spring.application.name}-group
  topic: ${spring.application.name}-topic

通过注解RocketMQMessageListener监听队列

@Slf4j
@Service
@RocketMQMessageListener(topic = "${rocketmq.topic}", selectorExpression = "log", consumerGroup = "log-consumer_${rocketmq.topic}")
public class SysUserLogListener implements RocketMQListener<String> {

    @Autowired
    private IOfaSysUserLogService ofaSysUserLogService;

    @Override
    public void onMessage(String message) {
        log.info("receive log,{}", message);
        log.info("save user operation log...");
        if (AssertUtil.isNotEmpty(message)){
            OfaSysUserLog ofaSysUserLog = JSON.parseObject(message, OfaSysUserLog.class);
            ofaSysUserLog.setId(IDGenerate.nextId().get());
            ofaSysUserLogService.save(ofaSysUserLog);
        }
    }
}

topic表示订阅哪个主题

selectorExpression可以过滤tag,建议每个应用都使用一个topic,通过tag来区分不同的业务。

consumerGroup表示消费者组