RocketMQ 链路追踪

最近更新时间: 2025-01-15 17:01:00

TSF 从1.23.0版本开始支持 RocketMQ 使用联路追踪能力。

链路追踪原理

利用 springboot 提供自动配置原理,加入一个能插手 bean DefaultMQProducer、和 bean DefaultMQPushConsumer 创建过程的自动配置类。使用代理来增强 DefaultMQProducer/DefaultMQPushConsumer,在调用相应方法是加入 tracing 的逻辑,在方法结束时将 tracing 数据搜集起来,统一存储。最后分析展示给前端页面。

引入依赖及配置

    <!-- TSF 启动器 -->
    <dependency>
        <groupId>com.tencent.tsf</groupId>
        <artifactId>spring-cloud-tsf-starter</artifactId>
    </dependency>
    <!--RocketMQ-->
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.4.0</version>
    </dependency>
apache:
  rocketmq:
    consumer:
      pushConsumer: myConsumer
    producer:
      producerGroup: myProducer
    name-server: xxx.xxx.xx.xx #rocketMq ip
    topic: long-topic

生产者示例

向 Spring 容器中加入 bean DefaultMQProducer:

@Configuration
public class ProducerConfiguration {
    private static final Logger logger = LoggerFactory.getLogger(ProducerConfiguration.class);
    @Value("${apache.rocketmq.name-server}")
    private String namesrvAddr;
    @Value("${apache.rocketmq.producer.producerGroup}")
    private String producerGroup;
    @Bean
    public DefaultMQProducer defaultMQProducer() throws MQClientException {
        DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
        producer.setNamesrvAddr(namesrvAddr);
        producer.setVipChannelEnabled(false);
        producer.setRetryTimesWhenSendAsyncFailed(0);
        producer.start();
        logger.info("rocketmq producer is starting");
        return producer;
    }
}

引用 DefaultMQProducer,发送消息:

@Service
public class RocketmqService {
    private static  final Logger logger = LoggerFactory.getLogger(RocketmqService.class);
    @Autowired
    private DefaultMQProducer defaultMQProducer;
    // 使用 application.properties 里定义的 topic 属性
    @Value("${apache.rocketmq.topic}")
    private String springTopic;
    private AtomicLong id =new AtomicLong(0);
    @Scheduled(fixedDelayString = "${consumer.auto.test.interval:5000}")
    public String prepareSend() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
       return send();
    }
    public String send() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
        String sentText = "rocketmq message: msg id="+id.addAndGet(1);
        Message message = new Message(springTopic,"push", sentText.getBytes());
        message.putUserProperty("traceID","1234567");
        SendResult sendResult = defaultMQProducer.send(message);
        logger.info("消息id:"+id.get()+","+"发送状态:"+sendResult.getSendStatus());
        return sendResult.getMsgId();
    }
}

消费者示例

@Configuration
public class ConsumerConfiguration {
    private static final Logger logger = LoggerFactory.getLogger(ConsumerConfiguration.class);
    @Value("${apache.rocketmq.name-server}")
    private String namesrvAddr;
    @Value("${apache.rocketmq.topic}")
    private String springTopic;
    @Value("${apache.rocketmq.consumer.pushConsumer}")
    private String consumerGroup;
    @Bean
    public DefaultMQPushConsumer pushConsumer() {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
        consumer.setNamesrvAddr(namesrvAddr);
        try {
            // 订阅PushTopic下Tag为push的消息,都订阅消息
            consumer.subscribe(springTopic, "push");
            // 程序第一次启动从消息队列头获取数据
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            //可以修改每次消费消息的数量,默认设置是每次消费一条
            consumer.setConsumeMessageBatchMaxSize(1);
            //在此监听中消费信息,并返回消费的状态信息
            consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
                // 会把不同的消息分别放置到不同的队列中
                for (Message msg : msgs) {
                    System.out.println("接收到了消息:" + new String(msg.getBody()));
                    logger.info("接收到了消息:" + new String(msg.getBody()));
                    try {
                        Thread.sleep(50);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            });
            consumer.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return consumer;
    }
}