Skip to content
knightliao edited this page Jul 20, 2015 · 1 revision

Tutorial 2 PikaQ安全机制配置

为了实现事务的一致性,PikaQ需要额外的一些简单设置。目前的解决方案是 使用 PikaData。

使用 PikaData

步骤一:执行SQL

CREATE TABLE `pikaq_data` (
    `id` BIGINT(20) NOT NULL AUTO_INCREMENT,
    `createTime` VARCHAR(14) NOT NULL,
    `updateTime` VARCHAR(14) NOT NULL,
    `correlation` CHAR(36) NOT NULL COMMENT '唯一标识',
    `data` MEDIUMTEXT NOT NULL COMMENT '最大16MB,UTF-8',
    `status` TINYINT(4) NOT NULL DEFAULT '0',
    `infoMsg` VARCHAR(256) NOT NULL DEFAULT '',
    `consumeTime` BIGINT(20) NOT NULL DEFAULT '0' COMMENT '上次消耗时间,精确到毫妙',
    `retryCount` INT(11) NOT NULL DEFAULT '0' COMMENT '重试次数',
    `exchange` VARCHAR(50) NOT NULL DEFAULT '',
    `routerKey` VARCHAR(50) NOT NULL DEFAULT '',
    PRIMARY KEY (`id`),
    INDEX `correlation_id` (`correlation`),
    INDEX `status` (`status`)
)
COMMENT='pikaq数据存储'
COLLATE='utf8_general_ci'
ENGINE=InnoDB
;

步骤二:实现 producer 接口

实现接口 DbStoreProducerUserCallback

示例:

/**
 * 保存 pika data
 */
@Service
public class DbStoreProducerUserCallbackImpl implements DbStoreProducerUserCallback {

    @Autowired
    private PikaDataDao pikaDataDao;

    @Override
    public void saveStatusData(String correlation, String data, String exchange, String routerKey)
        throws StoreUserCallbackException {

        pikaDataDao.createOne(correlation, data, exchange, routerKey);
    }
}

其中,PikaDataDao 需要你自己根据你自己的orm类型来实现

pikaq-web-demo 提供一种orm实现方式

步骤三:实现 consumer 接口

实现接口 DbStoreConsumerUserCallback

示例:

/**
 * 保存 pika data
 */
@Service
public class DbStoreConsumerUserCallbackImpl implements DbStoreConsumerUserCallback {

    @Autowired
    private PikaDataDao pikaDataDao;

    /**
     * @param correlation
     *
     * @return
     */
    public boolean exist(String correlation) {

        PikaData pikaData = pikaDataDao.getByCorrelation(correlation);
        if (null != pikaData) {
            return true;
        }
        return false;
    }

    /**
     * @param correlation
     *
     * @return
     *
     * @throws StoreException
     */
    public boolean isProcessing(String correlation) throws StoreException {
        PikaData pikaData = pikaDataDao.getByCorrelation(correlation);
        if (null != pikaData) {
            return pikaData.getStatus().equals(StoreDataStatusEnum.PROCESS.getValue());
        }
        throw new StoreUserCallbackException("cannot find this message " + correlation);
    }

    /**
     * 将消息设置为处理中
     *
     * @param correlation
     *
     * @throws StoreUserCallbackException
     */
    public void update2Processing(String correlation) throws StoreUserCallbackException {

        PikaData pikaData = pikaDataDao.getByCorrelation(correlation);
        if (null != pikaData) {
            pikaData.setStatus(StoreDataStatusEnum.PROCESS.getValue());
            pikaDataDao.updateOne(pikaData);
        } else {

            throw new StoreUserCallbackException("cannot find this message " + correlation);
        }
    }

    /**
     * 将消息设置为成功,该条消息必须存在,否则可能就是bug了
     *
     * @param correlation
     * @param infoMsg
     *
     * @throws StoreUserCallbackException
     */
    public void update2Success(String correlation, String infoMsg, Long costTime) throws StoreUserCallbackException {

        PikaData pikaData = pikaDataDao.getByCorrelation(correlation);
        if (pikaData != null) {
            pikaData.setStatus(StoreDataStatusEnum.SUCCESS.getValue());
            pikaData.setConsumeTime(costTime);
            pikaData.setInfoMsg(infoMsg);
            pikaDataDao.updateOne(pikaData);
        } else {
            throw new StoreUserCallbackException("cannot find this message " + correlation);
        }

    }

    /**
     * @param correlation
     * @param infoMsg
     *
     * @throws StoreUserCallbackException
     */
    public void update2Failed(String correlation, String infoMsg, Long costTime) throws StoreUserCallbackException {

        PikaData pikaData = pikaDataDao.getByCorrelation(correlation);
        if (pikaData != null) {
            pikaData.setStatus(StoreDataStatusEnum.FAILED.getValue());
            pikaData.setConsumeTime(costTime);
            pikaData.setInfoMsg(infoMsg);
            pikaDataDao.updateOne(pikaData);
        } else {
            throw new StoreUserCallbackException("update2Failed cannot find this message: " + correlation);
        }
    }
}

END

Clone this wiki locally