记得上下班打卡 | git大法好,push需谨慎

Commit db20d1fa authored by zhanggb's avatar zhanggb

~queue:consumer-base.+biz.artwork.clq;

parent be62c0a2
...@@ -112,6 +112,7 @@ global-auth: ...@@ -112,6 +112,7 @@ global-auth:
- ${liquidnet.info.context}/nftGoods/list - ${liquidnet.info.context}/nftGoods/list
- ${liquidnet.info.context}/nftGoods/detail - ${liquidnet.info.context}/nftGoods/detail
- ${liquidnet.info.context}/rsc/** - ${liquidnet.info.context}/rsc/**
- ${liquidnet.info.context}/que/**
- ${liquidnet.info.context}/nftArtwork/** - ${liquidnet.info.context}/nftArtwork/**
- ${liquidnet.info.context}/nftPublish/** - ${liquidnet.info.context}/nftPublish/**
- ${liquidnet.info.context}/nftTrade/** - ${liquidnet.info.context}/nftTrade/**
......
...@@ -4,6 +4,7 @@ import com.liquidnet.common.cache.redis.config.RedisStreamConfig; ...@@ -4,6 +4,7 @@ import com.liquidnet.common.cache.redis.config.RedisStreamConfig;
import com.liquidnet.service.base.constant.MQConst; import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.consumer.base.receiver.ConsumerGalaxyJsonNftPublishAndBuyReceiver; import com.liquidnet.service.consumer.base.receiver.ConsumerGalaxyJsonNftPublishAndBuyReceiver;
import com.liquidnet.service.consumer.base.receiver.ConsumerGalaxyJsonNftUserRegisterReceiver; import com.liquidnet.service.consumer.base.receiver.ConsumerGalaxyJsonNftUserRegisterReceiver;
import com.liquidnet.service.consumer.base.receiver.ConsumerGoblinBizArtworkClqReceiver;
import com.liquidnet.service.consumer.base.receiver.ConsumerGoblinBizArtworkUplReceiver; import com.liquidnet.service.consumer.base.receiver.ConsumerGoblinBizArtworkUplReceiver;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
...@@ -33,6 +34,8 @@ public class ConsumerCommonBizRedisStreamConfig extends RedisStreamConfig { ...@@ -33,6 +34,8 @@ public class ConsumerCommonBizRedisStreamConfig extends RedisStreamConfig {
@Autowired @Autowired
ConsumerGoblinBizArtworkUplReceiver consumerGoblinBizArtworkUplReceiver; ConsumerGoblinBizArtworkUplReceiver consumerGoblinBizArtworkUplReceiver;
@Autowired @Autowired
ConsumerGoblinBizArtworkClqReceiver consumerGoblinBizArtworkClqReceiver;
@Autowired
private ConsumerGalaxyJsonNftPublishAndBuyReceiver jsonNftPublishAndBuyReceiver; private ConsumerGalaxyJsonNftPublishAndBuyReceiver jsonNftPublishAndBuyReceiver;
@Autowired @Autowired
private ConsumerGalaxyJsonNftUserRegisterReceiver jsonNftUserRegisterReceiver; private ConsumerGalaxyJsonNftUserRegisterReceiver jsonNftUserRegisterReceiver;
...@@ -53,6 +56,22 @@ public class ConsumerCommonBizRedisStreamConfig extends RedisStreamConfig { ...@@ -53,6 +56,22 @@ public class ConsumerCommonBizRedisStreamConfig extends RedisStreamConfig {
return subscriptionList; return subscriptionList;
} }
@Bean// 藏品声明查询
public List<Subscription> subscriptionGoblinBizArtworkClq(RedisConnectionFactory factory) {
List<Subscription> subscriptionList = new ArrayList<>();
MQConst.GoblinQueue stream = MQConst.GoblinQueue.BIZ_ARTWORK_CLQ;
this.initStream(stringRedisTemplate, stream.getKey(), stream.getGroup());
for (int i = 0; i < 1; i++) {
StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = this.buildStreamMessageListenerContainer(factory);
subscriptionList.add(listenerContainer.receiveAutoAck(
Consumer.from(stream.getGroup(), getConsumerName(stream.name() + i)),
StreamOffset.create(stream.getKey(), ReadOffset.lastConsumed()), consumerGoblinBizArtworkClqReceiver
));
listenerContainer.start();
}
return subscriptionList;
}
/** /**
* galaxy发行和购买 * galaxy发行和购买
* @param factory * @param factory
......
package com.liquidnet.service.consumer.base.receiver;
import com.fasterxml.jackson.databind.JsonNode;
import com.liquidnet.commons.lang.util.CollectionUtil;
import com.liquidnet.commons.lang.util.HttpUtil;
import com.liquidnet.commons.lang.util.JsonUtils;
import com.liquidnet.service.base.constant.MQConst;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.stereotype.Component;
import org.springframework.util.LinkedMultiValueMap;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.HashMap;
@Slf4j
@Component
public class ConsumerGoblinBizArtworkClqReceiver extends AbstractBizRedisReceiver {
@Value("${liquidnet.service.goblin.url}")
private String serviceGoblinUrl;
@Override
protected boolean consumerMessageHandler(String msg) {
boolean aBoolean = false;
try {
if (StringUtils.isEmpty(msg)) {
log.warn("CONSUMER MSG NULL_MSG ==> [{}]:{}", this.getRedisStreamKey(), msg);
aBoolean = true;
} else {
String[] msgArr = msg.split(",");
String skuId = msgArr[0], time = msgArr.length == 2 ? msgArr[1] : null;
LocalDateTime now = LocalDateTime.now(), checkTime = now.minusSeconds(10);
LocalDateTime createAt = StringUtils.isEmpty(time) ? checkTime : LocalDateTime.parse(time);
long durationToMillis = Duration.between(createAt, checkTime).toMillis();
if (durationToMillis >= 0) {
aBoolean = this.bizArtworkClqProcessing(skuId);
} else {
try {
Thread.sleep(Math.abs(durationToMillis));
} catch (InterruptedException ignored) {
}
aBoolean = this.bizArtworkClqProcessing(skuId);
}
}
} catch (Exception e) {
log.error("CONSUMER MSG EX_HANDLE ==> [{}]:{}", this.getRedisStreamKey(), msg, e);
} finally {
if (!aBoolean) {
HashMap<String, String> map = CollectionUtil.mapStringString();
map.put(MQConst.QUEUE_MESSAGE_KEY, msg);
stringRedisTemplate.opsForStream().add(StreamRecords.mapBacked(map).withStreamKey(this.getRedisStreamKey()));
}
}
return aBoolean;
}
@Override
protected String getRedisStreamKey() {
return MQConst.GoblinQueue.BIZ_ARTWORK_CLQ.getKey();
}
@Override
protected String getRedisStreamGroup() {
return MQConst.GoblinQueue.BIZ_ARTWORK_CLQ.getGroup();
}
private boolean bizArtworkClqProcessing(String skuId) {
String postUrl = serviceGoblinUrl + "/goblin/que/artwork/clq";
LinkedMultiValueMap<String, String> postDataMap = CollectionUtil.linkedMultiValueMapStringString();
try {
postDataMap.add("skuId", skuId);
String postRespStr = HttpUtil.post(postUrl, postDataMap);
JsonNode postRespJNode = JsonUtils.fromJson(postRespStr, JsonNode.class), postRespCode;
if (null == postRespJNode || null == (postRespCode = postRespJNode.get("code")) || !postRespCode.asText().equals("0")) {
log.warn("#NFT声明查询:请求失败[paramsStr={},postRespStr={}]", postDataMap, postRespStr);
return false;
}
return true;
} catch (Exception e) {
log.error("Ex.NFT声明查询:处理异常[url={},paramsStr={}],ex:{}", postUrl, postDataMap, e.getMessage());
return false;
}
}
/* ------------------------------------------------------------------------------------ */
/* ------------------------------------------------------------------------------------ */
/* ------------------------------------------------------------------------------------ */
}
...@@ -65,8 +65,12 @@ public class ConsumerGoblinBizArtworkUplReceiver extends AbstractBizRedisReceive ...@@ -65,8 +65,12 @@ public class ConsumerGoblinBizArtworkUplReceiver extends AbstractBizRedisReceive
} }
return true; return true;
} catch (Exception e) { } catch (Exception e) {
log.error("Ex.NFT素材上传:请求异常[url={},paramsStr={}],ex:{}", postUrl, postDataMap, e.getMessage()); log.error("Ex.NFT素材上传:处理异常[url={},paramsStr={}],ex:{}", postUrl, postDataMap, e.getMessage());
return false; return false;
} }
} }
/* ------------------------------------------------------------------------------------ */
/* ------------------------------------------------------------------------------------ */
/* ------------------------------------------------------------------------------------ */
} }
package com.liquidnet.service.goblin.controller.Inner; package com.liquidnet.service.goblin.controller.Inner;
import com.liquidnet.service.base.ResponseDto; import com.liquidnet.service.base.ResponseDto;
import com.liquidnet.service.goblin.service.impl.inner.GoblinQueBizArtworkService; import com.liquidnet.service.goblin.service.impl.inner.GoblinQueBizArtworkClqService;
import com.liquidnet.service.goblin.service.impl.inner.GoblinQueBizArtworkUplService;
import io.swagger.annotations.Api; import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam; import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams; import io.swagger.annotations.ApiImplicitParams;
...@@ -17,20 +18,31 @@ import org.springframework.web.bind.annotation.RestController; ...@@ -17,20 +18,31 @@ import org.springframework.web.bind.annotation.RestController;
import javax.validation.constraints.NotBlank; import javax.validation.constraints.NotBlank;
@Slf4j @Slf4j
@Api(tags = "@API:QUE") @Api(tags = "@API:QUE:藏品上传声明")
@RestController @RestController
@Validated @Validated
@RequestMapping("que/artwork") @RequestMapping("que/artwork")
public class GoblinQueBizArtworkController { public class GoblinQueBizArtworkController {
@Autowired @Autowired
private GoblinQueBizArtworkService goblinQueBizArtworkService; private GoblinQueBizArtworkUplService goblinQueBizArtworkUplService;
@Autowired
private GoblinQueBizArtworkClqService goblinQueBizArtworkClqService;
@PostMapping("upl") @PostMapping("upl")
@ApiOperation("藏品上传声明") @ApiOperation("藏品上传声明")
@ApiImplicitParams({ @ApiImplicitParams({
@ApiImplicitParam(type = "form", required = true, dataType = "String", name = "skuId", value = "藏品ID", example = "1"), @ApiImplicitParam(type = "form", required = true, dataType = "String", name = "skuId", value = "藏品ID", example = "1"),
}) })
public ResponseDto<String> orderDetails(@NotBlank(message = "藏品ID不能为空") @RequestParam String skuId) { public ResponseDto<String> bizArtworkUpl(@NotBlank(message = "藏品ID不能为空") @RequestParam String skuId) {
return goblinQueBizArtworkService.bizArtworkUplProcessing(skuId); return goblinQueBizArtworkUplService.bizArtworkUplProcessing(skuId);
}
@PostMapping("clq")
@ApiOperation("藏品声明查询")
@ApiImplicitParams({
@ApiImplicitParam(type = "form", required = true, dataType = "String", name = "skuId", value = "藏品ID", example = "1"),
})
public ResponseDto<String> bizArtworkClq(@NotBlank(message = "藏品ID不能为空") @RequestParam String skuId) {
return goblinQueBizArtworkClqService.bizArtworkClqProcessing(skuId);
} }
} }
package com.liquidnet.service.goblin.service.impl.inner;
import com.liquidnet.commons.lang.util.CollectionUtil;
import com.liquidnet.commons.lang.util.JsonUtils;
import com.liquidnet.service.base.ResponseDto;
import com.liquidnet.service.base.SqlMapping;
import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.galaxy.dto.param.GalaxyArtSeriesClaimResultQueryReqDto;
import com.liquidnet.service.galaxy.dto.param.GalaxyArtSeriesClaimResultQueryRespDto;
import com.liquidnet.service.galaxy.service.IGalaxyArtworkService;
import com.liquidnet.service.goblin.constant.GoblinRedisConst;
import com.liquidnet.service.goblin.dto.vo.GoblinGoodsSkuInfoVo;
import com.liquidnet.service.goblin.util.GoblinMongoUtils;
import com.liquidnet.service.goblin.util.GoblinRedisUtils;
import com.liquidnet.service.goblin.util.QueueUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.Update;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.LinkedList;
@Slf4j
@Service
public class GoblinQueBizArtworkClqService {
@Autowired
QueueUtils queueUtils;
@Autowired
GoblinRedisUtils goblinRedisUtils;
@Autowired
GoblinMongoUtils goblinMongoUtils;
@Autowired
MongoTemplate mongoTemplate;
@Resource(name = "galaxyArtworkServiceImpl")
private IGalaxyArtworkService galaxyArtworkService;
private static final String SQL_UPDATE_GOODS_SKU_NFT = "UPDATE goblin_goods_sku_nft SET upchain=?,series_id=?,series_hash=?,nft_hash=?,declare_at=?,updated_at=? WHERE sku_id=? ";
public ResponseDto<String> bizArtworkClqProcessing(String skuId) {
GoblinGoodsSkuInfoVo mgtGoodsSkuInfoVo = goblinMongoUtils.getGoodsSkuInfoVo(skuId);
if (null == mgtGoodsSkuInfoVo) {
log.warn("#NFT声明查询:藏品SKU不存在[skuId={}]", skuId);
return ResponseDto.success(String.format("藏品SKU不存在[skuId:%s]", skuId));
}
int skuType = mgtGoodsSkuInfoVo.getSkuType(), upchain = mgtGoodsSkuInfoVo.getUpchain();
String unbox = mgtGoodsSkuInfoVo.getUnbox();
// 非数字藏品 || 盲盒 || 非声明中 || 已有声明系列ID
if (1 != skuType || !"0".equals(unbox) || 9 != upchain || StringUtils.isNotEmpty(mgtGoodsSkuInfoVo.getSeriesId())) {
log.warn("#NFT声明查询:藏品SKU无效或已声明[skuId={},skuType={},unbox={},upchain={},seriesId={}]",
skuId, skuType, unbox, upchain, mgtGoodsSkuInfoVo.getSeriesId());
return ResponseDto.success(String.format("藏品SKU无效或已声明[skuId:%s]", skuId));
}
GalaxyArtSeriesClaimResultQueryRespDto resultQueryRespDto = this.queryNftSeriesClaimResult(skuId, mgtGoodsSkuInfoVo.getRouteType());
if (null == resultQueryRespDto) {
return ResponseDto.failure(String.format("藏品声明查询失败[skuId:%s]", skuId));// 声明查询失败,重新入队处理
}
String seriesId = resultQueryRespDto.getSeriesId();
String txHash = resultQueryRespDto.getTxHash();
String nftHash = resultQueryRespDto.getNftHash();
String chainTimestamp = resultQueryRespDto.getChainTimestamp();
LocalDateTime now = LocalDateTime.now();
mongoTemplate.getCollection(GoblinGoodsSkuInfoVo.class.getSimpleName()).updateOne(
Query.query(Criteria.where("skuId").is(skuId).and("delFlg").is("0")).getQueryObject(),
Update.update("upchain", 1).set("seriesId", seriesId).set("seriesHash", txHash).set("nftHash", nftHash).set("declareAt", chainTimestamp).getUpdateObject()
);
goblinRedisUtils.del(GoblinRedisConst.BASIC_GOODS_SKU.concat(skuId));
// Mysql持久化
// HashMap<String, String> sqlUpdateMap = CollectionUtil.mapStringString();
LinkedList<String> toMqSqls = CollectionUtil.linkedListString();
toMqSqls.add(SQL_UPDATE_GOODS_SKU_NFT);
LinkedList<Object[]> updateGoodsSkuNftObjs = CollectionUtil.linkedListObjectArr();
updateGoodsSkuNftObjs.add(new Object[]{1, seriesId, txHash, nftHash, chainTimestamp, now, skuId});
queueUtils.sendMsgByRedis(MQConst.GoblinQueue.SQL_GOODS.getKey(), SqlMapping.gets(toMqSqls, updateGoodsSkuNftObjs));
// sqlUpdateMap.put(MQConst.QUEUE_MESSAGE_KEY, SqlMapping.gets(toMqSqls, updateGoodsSkuNftObjs));
// stringRedisTemplate.opsForStream().add(StreamRecords.mapBacked(sqlUpdateMap).withStreamKey(MQConst.GoblinQueue.SQL_GOODS.getKey()));
return ResponseDto.success();
}
/* ------------------------------------------------------------------------------------ */
/* ------------------------------------------------------------------------------------ */
/* ------------------------------------------------------------------------------------ */
/* ------------------------------------------------------------------------------------ */
/* ------------------------------------------------------------------------------------ */
private GalaxyArtSeriesClaimResultQueryRespDto queryNftSeriesClaimResult(String skuId, String routerType) {
GalaxyArtSeriesClaimResultQueryReqDto requestDto = GalaxyArtSeriesClaimResultQueryReqDto.getNew();
ResponseDto<GalaxyArtSeriesClaimResultQueryRespDto> responseDto = null;
try {
requestDto.setSkuId(skuId);
requestDto.setRouterType(routerType);
responseDto = galaxyArtworkService.seriesClaimResultQuery(requestDto);
if (!responseDto.isSuccess()) {
log.warn("#NFT声明查询:处理失败[paramsStr={},postRespStr={}]", JsonUtils.toJson(requestDto), JsonUtils.toJson(responseDto));
return null;
}
return responseDto.getData();
} catch (Exception e) {
log.error("Ex.NFT声明查询:处理异常[paramsStr={},postRespStr={}],ex:{}", JsonUtils.toJson(requestDto), JsonUtils.toJson(responseDto), e.getMessage());
return null;
}
}
/* ------------------------------------------------------------------------------------ */
/* ------------------------------------------------------------------------------------ */
/* ------------------------------------------------------------------------------------ */
}
...@@ -31,7 +31,7 @@ import java.util.LinkedList; ...@@ -31,7 +31,7 @@ import java.util.LinkedList;
@Slf4j @Slf4j
@Service @Service
public class GoblinQueBizArtworkService { public class GoblinQueBizArtworkUplService {
@Autowired @Autowired
QueueUtils queueUtils; QueueUtils queueUtils;
@Autowired @Autowired
...@@ -119,7 +119,7 @@ public class GoblinQueBizArtworkService { ...@@ -119,7 +119,7 @@ public class GoblinQueBizArtworkService {
// toQueueBeClaimQueryMsg.put(MQConst.QUEUE_MESSAGE_KEY, skuId.concat(",").concat(String.valueOf(LocalDateTime.now()))); // toQueueBeClaimQueryMsg.put(MQConst.QUEUE_MESSAGE_KEY, skuId.concat(",").concat(String.valueOf(LocalDateTime.now())));
// streamOperations.add(StreamRecords.mapBacked(toQueueBeClaimQueryMsg).withStreamKey(MQConst.GoblinQueue.BIZ_ARTWORK_CLQ.getKey())); // streamOperations.add(StreamRecords.mapBacked(toQueueBeClaimQueryMsg).withStreamKey(MQConst.GoblinQueue.BIZ_ARTWORK_CLQ.getKey()));
// return true; // return true;
return null; return ResponseDto.success();
} }
/* ------------------------------------------------------------------------------------ */ /* ------------------------------------------------------------------------------------ */
...@@ -139,12 +139,12 @@ public class GoblinQueBizArtworkService { ...@@ -139,12 +139,12 @@ public class GoblinQueBizArtworkService {
try { try {
responseDto = galaxyArtworkService.nftUpload(nftUploadReqDto); responseDto = galaxyArtworkService.nftUpload(nftUploadReqDto);
if (!responseDto.isSuccess()) { if (!responseDto.isSuccess()) {
log.warn("#NFT素材上传:请求失败[paramsStr={},postRespStr={}]", JsonUtils.toJson(nftUploadReqDto), JsonUtils.toJson(responseDto)); log.warn("#NFT素材上传:处理失败[paramsStr={},postRespStr={}]", JsonUtils.toJson(nftUploadReqDto), JsonUtils.toJson(responseDto));
return null; return null;
} }
return responseDto.getData(); return responseDto.getData();
} catch (Exception e) { } catch (Exception e) {
log.error("Ex.NFT素材上传:请求异常[paramsStr={},postRespStr={}],ex:{}", JsonUtils.toJson(nftUploadReqDto), JsonUtils.toJson(responseDto), e.getMessage()); log.error("Ex.NFT素材上传:处理异常[paramsStr={},postRespStr={}],ex:{}", JsonUtils.toJson(nftUploadReqDto), JsonUtils.toJson(responseDto), e.getMessage());
return null; return null;
} }
} }
...@@ -160,13 +160,17 @@ public class GoblinQueBizArtworkService { ...@@ -160,13 +160,17 @@ public class GoblinQueBizArtworkService {
try { try {
responseDto = galaxyArtworkService.seriesClaim(requestDto); responseDto = galaxyArtworkService.seriesClaim(requestDto);
if (!responseDto.isSuccess()) { if (!responseDto.isSuccess()) {
log.warn("#NFT系列声明:请求失败[paramsStr={},postRespStr={}]", JsonUtils.toJson(requestDto), JsonUtils.toJson(responseDto)); log.warn("#NFT系列声明:处理失败[paramsStr={},postRespStr={}]", JsonUtils.toJson(requestDto), JsonUtils.toJson(responseDto));
return null; return null;
} }
return responseDto.getData(); return responseDto.getData();
} catch (Exception e) { } catch (Exception e) {
log.error("Ex.NFT系列声明:请求异常[paramsStr={},postRespStr={}],ex:{}", JsonUtils.toJson(requestDto), JsonUtils.toJson(responseDto), e.getMessage()); log.error("Ex.NFT系列声明:处理异常[paramsStr={},postRespStr={}],ex:{}", JsonUtils.toJson(requestDto), JsonUtils.toJson(responseDto), e.getMessage());
return null; return null;
} }
} }
/* ------------------------------------------------------------------------------------ */
/* ------------------------------------------------------------------------------------ */
/* ------------------------------------------------------------------------------------ */
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment