-
Notifications
You must be signed in to change notification settings - Fork 67
Tutorial3
knightliao edited this page Jul 20, 2015
·
1 revision
为了保证事务的一致性,PikaQ 需要一些额外的简单设置。目前的解决方案是使用 PikaData:首先在数据库中创建如下数据表。
-- Storage table for PikaQ message records (table comment: "pikaq data storage").
-- One row holds a message payload together with its processing status.
CREATE TABLE `pikaq_data` (
`id` BIGINT(20) NOT NULL AUTO_INCREMENT,
-- NOTE(review): 14-character timestamps, presumably formatted as yyyyMMddHHmmss - confirm.
`createTime` VARCHAR(14) NOT NULL,
`updateTime` VARCHAR(14) NOT NULL,
-- Column comment: "unique identifier". 36 characters matches the textual length of a UUID
-- (TODO confirm that is what callers store here).
`correlation` CHAR(36) NOT NULL COMMENT '唯一标识',
-- Column comment: "at most 16MB, UTF-8".
`data` MEDIUMTEXT NOT NULL COMMENT '最大16MB,UTF-8',
-- NOTE(review): presumably holds the numeric values of StoreDataStatusEnum - confirm.
`status` TINYINT(4) NOT NULL DEFAULT '0',
`infoMsg` VARCHAR(256) NOT NULL DEFAULT '',
-- Column comment: "last consume time, accurate to the millisecond".
-- NOTE(review): the comment text contains a typo ('毫妙' should read '毫秒').
`consumeTime` BIGINT(20) NOT NULL DEFAULT '0' COMMENT '上次消耗时间,精确到毫妙',
-- Column comment: "retry count".
`retryCount` INT(11) NOT NULL DEFAULT '0' COMMENT '重试次数',
-- NOTE(review): presumably the message-broker exchange and routing key of the message - confirm.
`exchange` VARCHAR(50) NOT NULL DEFAULT '',
`routerKey` VARCHAR(50) NOT NULL DEFAULT '',
PRIMARY KEY (`id`),
-- NOTE(review): this index is not UNIQUE although `correlation` is described as a unique
-- identifier; uniqueness is therefore not enforced by the schema.
INDEX `correlation_id` (`correlation`),
INDEX `status` (`status`)
)
COMMENT='pikaq数据存储'
COLLATE='utf8_general_ci'
ENGINE=InnoDB
;
实现接口 DbStoreProducerUserCallback
示例:
/**
 * Saves pika data.
 *
 * <p>Example {@code DbStoreProducerUserCallback} implementation that persists a message
 * record by delegating to {@code PikaDataDao}.
 */
@Service
public class DbStoreProducerUserCallbackImpl implements DbStoreProducerUserCallback {
// DAO used to persist the record; injected by Spring via @Autowired.
@Autowired
private PikaDataDao pikaDataDao;
/**
 * Creates one stored record from the given values by calling
 * {@code pikaDataDao.createOne(correlation, data, exchange, routerKey)}.
 *
 * @param correlation identifier of the message
 * @param data        message payload
 * @param exchange    exchange value stored with the record
 * @param routerKey   router key value stored with the record
 *
 * @throws StoreUserCallbackException declared by the callback contract; this body does not
 *                                    throw it directly
 */
@Override
public void saveStatusData(String correlation, String data, String exchange, String routerKey)
throws StoreUserCallbackException {
pikaDataDao.createOne(correlation, data, exchange, routerKey);
}
}
其中,PikaDataDao 需要你根据自己使用的 ORM 框架来实现。
pikaq-web-demo 提供了一种 ORM 实现方式,可供参考。
实现接口 DbStoreConsumerUserCallback
示例:
/**
* 保存 pika data
*/
@Service
public class DbStoreConsumerUserCallbackImpl implements DbStoreConsumerUserCallback {
@Autowired
private PikaDataDao pikaDataDao;
/**
* @param correlation
*
* @return
*/
public boolean exist(String correlation) {
PikaData pikaData = pikaDataDao.getByCorrelation(correlation);
if (null != pikaData) {
return true;
}
return false;
}
/**
* @param correlation
*
* @return
*
* @throws StoreException
*/
public boolean isProcessing(String correlation) throws StoreException {
PikaData pikaData = pikaDataDao.getByCorrelation(correlation);
if (null != pikaData) {
return pikaData.getStatus().equals(StoreDataStatusEnum.PROCESS.getValue());
}
throw new StoreUserCallbackException("cannot find this message " + correlation);
}
/**
* 将消息设置为处理中
*
* @param correlation
*
* @throws StoreUserCallbackException
*/
public void update2Processing(String correlation) throws StoreUserCallbackException {
PikaData pikaData = pikaDataDao.getByCorrelation(correlation);
if (null != pikaData) {
pikaData.setStatus(StoreDataStatusEnum.PROCESS.getValue());
pikaDataDao.updateOne(pikaData);
} else {
throw new StoreUserCallbackException("cannot find this message " + correlation);
}
}
/**
* 将消息设置为成功,该条消息必须存在,否则可能就是bug了
*
* @param correlation
* @param infoMsg
*
* @throws StoreUserCallbackException
*/
public void update2Success(String correlation, String infoMsg, Long costTime) throws StoreUserCallbackException {
PikaData pikaData = pikaDataDao.getByCorrelation(correlation);
if (pikaData != null) {
pikaData.setStatus(StoreDataStatusEnum.SUCCESS.getValue());
pikaData.setConsumeTime(costTime);
pikaData.setInfoMsg(infoMsg);
pikaDataDao.updateOne(pikaData);
} else {
throw new StoreUserCallbackException("cannot find this message " + correlation);
}
}
/**
* @param correlation
* @param infoMsg
*
* @throws StoreUserCallbackException
*/
public void update2Failed(String correlation, String infoMsg, Long costTime) throws StoreUserCallbackException {
PikaData pikaData = pikaDataDao.getByCorrelation(correlation);
if (pikaData != null) {
pikaData.setStatus(StoreDataStatusEnum.FAILED.getValue());
pikaData.setConsumeTime(costTime);
pikaData.setInfoMsg(infoMsg);
pikaDataDao.updateOne(pikaData);
} else {
throw new StoreUserCallbackException("update2Failed cannot find this message: " + correlation);
}
}
}