Skip to content
knightliao edited this page Jul 20, 2015 · 4 revisions

Tutorial 2 可靠消息组件使用(下游)

这里以 pikaq-demos/pikaq-web-demo 某个程序片段为例,详细介绍了一个 分布式事务 的简单示例程序(仅显示下游consumer的做法)。

本教程是 https://github.com/knightliao/pikaQ/wiki/Tutorial1 的延续。

第一部分:使用 原生的 rabbitmq 来做下游处理

使用原生的rabbitmq处理下游,必须人工保证下游处理函数是幂等的,否则可能会处理重复处理问题。

第二部分:使用 PikaQ 来做 下游处理

使用pikaQ进行处理,pikaQ 可以保证函数的幂等性,不需要人工处理。

步骤一:通用配置

<!-- queue -->
<rabbit:queue id="pikaQWebDemoLogMessageQueue" name="pikaQWebDemoLogMessageQueue"/>

<!-- consumer service -->
<bean name="createHandler"
      class="com.baidu.pikaq.demo.service.campaign.message.consumer.CampaignCreateConsumerHandler">
</bean>

<!-- listener container -->
<listener-container concurrency="5" connection-factory="connectionFactory"
                    xmlns="http://www.springframework.org/schema/rabbit">
    <listener ref="createHandler" queues="pikaQWebDemoLogMessageQueue"/>
</listener-container>

这里指定了Q,指定了处理函数,指定了监听容器

步骤二:处理函数

package com.baidu.pikaq.demo.service.campaign.message.consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.baidu.pikaq.client.consumer.ConsumerListenerBase;
import com.baidu.pikaq.demo.service.campaign.message.CampaignPikaMessage;

/**
 * 强一致性的消息处理
 */
public class CampaignCreateConsumerHandler extends ConsumerListenerBase<CampaignPikaMessage> {

    protected static final Logger LOG = LoggerFactory.getLogger(CampaignCreateConsumerHandler.class);

    /**
     * @param campaignPikaMessage
     */
    public void handleMessage(CampaignPikaMessage campaignPikaMessage) {

        LOG.info("receive: " + campaignPikaMessage.toString());
    }
}

处理函数必须实现 ConsumerListenerBase 接口,并实现 handleMessage 方法,即可。

Clone this wiki locally