记得上下班打卡 | git大法好,push需谨慎

Commit 6563b39c authored by zhanggb's avatar zhanggb

~que:GoblinQueue.BIZ_NFT_MONGO:重新入队调整;

parent cbc252a7
package com.liquidnet.service.consumer.base.receiver; package com.liquidnet.service.consumer.base.receiver;
import com.fasterxml.jackson.databind.JsonNode;
import com.liquidnet.commons.lang.util.CollectionUtil; import com.liquidnet.commons.lang.util.CollectionUtil;
import com.liquidnet.commons.lang.util.HttpUtil; import com.liquidnet.commons.lang.util.HttpUtil;
import com.liquidnet.commons.lang.util.JsonUtils;
import com.liquidnet.service.base.constant.MQConst; import com.liquidnet.service.base.constant.MQConst;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
...@@ -56,11 +54,12 @@ public class ConsumerGoblinBizNftMongoReceiver extends AbstractBizRedisReceiver ...@@ -56,11 +54,12 @@ public class ConsumerGoblinBizNftMongoReceiver extends AbstractBizRedisReceiver
String postUrl = serviceGoblinUrl.concat("/goblin/que/mdb/adapt"); String postUrl = serviceGoblinUrl.concat("/goblin/que/mdb/adapt");
try { try {
String postRespStr = HttpUtil.postRaw(postUrl, msg, null); String postRespStr = HttpUtil.postRaw(postUrl, msg, null);
JsonNode postRespJNode = JsonUtils.fromJson(postRespStr, JsonNode.class), postRespCode; // 失败重新入队逻辑改至`被调用API`实现
if (null == postRespJNode || null == (postRespCode = postRespJNode.get("code")) || !postRespCode.asText().equals("0")) { // JsonNode postRespJNode = JsonUtils.fromJson(postRespStr, JsonNode.class), postRespCode;
log.warn("#MDB数据处理:处理失败[paramsStr={},postRespStr={}]", msg, postRespStr); // if (null == postRespJNode || null == (postRespCode = postRespJNode.get("code")) || !postRespCode.asText().equals("0")) {
return false; // log.warn("#MDB数据处理:处理失败[paramsStr={},postRespStr={}]", msg, postRespStr);
} // return false;
// }
return true; return true;
} catch (Exception e) { } catch (Exception e) {
log.error("Ex.MDB数据处理:处理异常[url={},paramsStr={}],ex:{}", postUrl, msg, e.getMessage()); log.error("Ex.MDB数据处理:处理异常[url={},paramsStr={}],ex:{}", postUrl, msg, e.getMessage());
......
...@@ -3,8 +3,10 @@ package com.liquidnet.service.goblin.service.impl.inner; ...@@ -3,8 +3,10 @@ package com.liquidnet.service.goblin.service.impl.inner;
import com.liquidnet.commons.lang.util.JsonUtils; import com.liquidnet.commons.lang.util.JsonUtils;
import com.liquidnet.service.base.MdbMessage; import com.liquidnet.service.base.MdbMessage;
import com.liquidnet.service.base.ResponseDto; import com.liquidnet.service.base.ResponseDto;
import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.goblin.util.GoblinRedisUtils; import com.liquidnet.service.goblin.util.GoblinRedisUtils;
import com.liquidnet.service.goblin.util.ObjectUtil; import com.liquidnet.service.goblin.util.ObjectUtil;
import com.liquidnet.service.goblin.util.QueueUtils;
import com.mongodb.BasicDBObject; import com.mongodb.BasicDBObject;
import com.mongodb.client.result.UpdateResult; import com.mongodb.client.result.UpdateResult;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
...@@ -17,6 +19,8 @@ import org.springframework.stereotype.Service; ...@@ -17,6 +19,8 @@ import org.springframework.stereotype.Service;
@Slf4j @Slf4j
@Service @Service
public class GoblinQueBizArtworkMdbService { public class GoblinQueBizArtworkMdbService {
@Autowired
QueueUtils queueUtils;
@Autowired @Autowired
GoblinRedisUtils goblinRedisUtils; GoblinRedisUtils goblinRedisUtils;
@Autowired @Autowired
...@@ -29,39 +33,48 @@ public class GoblinQueBizArtworkMdbService { ...@@ -29,39 +33,48 @@ public class GoblinQueBizArtworkMdbService {
* @return ResponseDto<String> * @return ResponseDto<String>
*/ */
public ResponseDto<String> bizMongoAdaptProcessing(String msg) { public ResponseDto<String> bizMongoAdaptProcessing(String msg) {
MdbMessage queueBizMongoDto = JsonUtils.fromJson(msg, MdbMessage.class); try {
if (null != queueBizMongoDto) { MdbMessage queueBizMongoDto = JsonUtils.fromJson(msg, MdbMessage.class);
String collectName = queueBizMongoDto.getCollect(), columnName = queueBizMongoDto.getColumn(); if (null != queueBizMongoDto) {
String prefix = queueBizMongoDto.getPrefix(), bizId = queueBizMongoDto.getBizId(); String collectName = queueBizMongoDto.getCollect(), columnName = queueBizMongoDto.getColumn();
String prefix = queueBizMongoDto.getPrefix(), bizId = queueBizMongoDto.getBizId();
Object o; Object o;
switch (queueBizMongoDto.getOpType()) { switch (queueBizMongoDto.getOpType()) {
case 1:// insert case 1:// insert
o = goblinRedisUtils.get(prefix.concat(bizId)); o = goblinRedisUtils.get(prefix.concat(bizId));
if (null != o) { if (null != o) {
mongoTemplate.insert(o, collectName); mongoTemplate.insert(o, collectName);
} else { } else {
log.error("#MDB数据处理:处理失败[Redis获取数据为空,msg={}]", msg); log.warn("#MDB数据处理:处理失败[Redis获取数据为空,msg={}]", msg);
return ResponseDto.failure("处理失败"); // queueUtils.sendMsgByRedis(MQConst.GoblinQueue.BIZ_NFT_MONGO.getKey(), msg);
} // return ResponseDto.failure("处理失败");
break; }
case 2:// update break;
o = goblinRedisUtils.get(prefix.concat(bizId)); case 2:// update
if (null != o) { o = goblinRedisUtils.get(prefix.concat(bizId));
BasicDBObject basicDBObject = ObjectUtil.cloneBasicDBObject(); if (null != o) {
UpdateResult updateResult = mongoTemplate.getCollection(collectName).updateOne( BasicDBObject basicDBObject = ObjectUtil.cloneBasicDBObject();
Query.query(Criteria.where(columnName).is(bizId)).getQueryObject(), UpdateResult updateResult = mongoTemplate.getCollection(collectName).updateOne(
basicDBObject.append("$set", mongoTemplate.getConverter().convertToMongoType(o)) Query.query(Criteria.where(columnName).is(bizId)).getQueryObject(),
); basicDBObject.append("$set", mongoTemplate.getConverter().convertToMongoType(o))
} else { );
log.error("#MDB数据处理:处理失败[Redis获取数据为空,msg={}]", msg); } else {
return ResponseDto.failure("处理失败"); log.warn("#MDB数据处理:处理失败[Redis获取数据为空,msg={}]", msg);
} // queueUtils.sendMsgByRedis(MQConst.GoblinQueue.BIZ_NFT_MONGO.getKey(), msg);
break; // return ResponseDto.failure("处理失败");
default: }
log.error("#MDB数据处理:无效操作[未知的操作类型,msg={}]", msg); break;
return ResponseDto.failure("无效操作"); default:
log.warn("#MDB数据处理:无效操作[未知的操作类型,msg={}]", msg);
queueUtils.sendMsgByRedis(MQConst.GoblinQueue.BIZ_NFT_MONGO.getKey(), msg);
return ResponseDto.failure("无效操作");
}
} }
} catch (Exception e) {
log.error("Ex.MDB数据处理:处理异常[msg={}]", msg, e);
queueUtils.sendMsgByRedis(MQConst.GoblinQueue.BIZ_NFT_MONGO.getKey(), msg);
return ResponseDto.failure("操作异常");
} }
return ResponseDto.success(); return ResponseDto.success();
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment