记得上下班打卡 | git大法好,push需谨慎

Commit fefc378f authored by jiangxiulong's avatar jiangxiulong

炳 mongo消费

parent fed72ff5
...@@ -56,31 +56,39 @@ public abstract class AbstractMdbRedisReceiver implements StreamListener<String, ...@@ -56,31 +56,39 @@ public abstract class AbstractMdbRedisReceiver implements StreamListener<String,
} }
private boolean consumerMessageHandler(String msg) { private boolean consumerMessageHandler(String msg) {
boolean aBoolean = false; boolean aBoolean = true;
try { try {
GoblinQueueBizMongoDto queueBizMongoDto = JsonUtils.fromJson(msg, GoblinQueueBizMongoDto.class); GoblinQueueBizMongoDto queueBizMongoDto = JsonUtils.fromJson(msg, GoblinQueueBizMongoDto.class);
if (null == queueBizMongoDto) { if (null != queueBizMongoDto) {
aBoolean = true;
} else {
String collectName = queueBizMongoDto.getCollect(), columnName = queueBizMongoDto.getColumn(); String collectName = queueBizMongoDto.getCollect(), columnName = queueBizMongoDto.getColumn();
String prefix = queueBizMongoDto.getBizId(), bizId = queueBizMongoDto.getBizId(); String prefix = queueBizMongoDto.getPrefix(), bizId = queueBizMongoDto.getBizId();
Object o;
switch (queueBizMongoDto.getOpType()) { switch (queueBizMongoDto.getOpType()) {
case 1:// insert case 1:// insert
mongoTemplate.insert(redisUtil.get(prefix.concat(bizId)), collectName); o = redisUtil.get(prefix.concat(bizId));
aBoolean = true; if (null != o) {
mongoTemplate.insert(o, collectName);
}
break; break;
case 2:// update case 2:// update
BasicDBObject basicDBObject = (BasicDBObject) AbstractMdbRedisReceiver.BASIC_DB_OBJECT.clone(); o = redisUtil.get(prefix.concat(bizId));
UpdateResult updateResult = mongoTemplate.getCollection(collectName).updateOne( if (null != o) {
Query.query(Criteria.where(columnName).is(bizId)).getQueryObject(), BasicDBObject basicDBObject = (BasicDBObject) AbstractMdbRedisReceiver.BASIC_DB_OBJECT.clone();
basicDBObject.append("$set", mongoTemplate.getConverter().convertToMongoType(redisUtil.get(prefix.concat(bizId)))) UpdateResult updateResult = mongoTemplate.getCollection(collectName).updateOne(
); Query.query(Criteria.where(columnName).is(bizId)).getQueryObject(),
aBoolean = updateResult.getModifiedCount() > 0; basicDBObject.append("$set", mongoTemplate.getConverter().convertToMongoType(o))
);
}
break;
default:
log.error("CONSUMER MSG ERR_HANDLE[未知的操作类型,{}]", msg);
break;
} }
} }
} catch (Exception e) { } catch (Exception e) {
log.error("CONSUMER MSG EX_HANDLE ==> [{}]:{}", this.getRedisStreamKey(), msg, e); log.error("CONSUMER MSG EX_HANDLE ==> [{}]:{}", this.getRedisStreamKey(), msg, e);
aBoolean = false;
} finally { } finally {
if (!aBoolean) { if (!aBoolean) {
HashMap<String, String> map = CollectionUtil.mapStringString(); HashMap<String, String> map = CollectionUtil.mapStringString();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment