记得上下班打卡 | git大法好,push需谨慎

Commit 3036a748 authored by 张国柄's avatar 张国柄

+queue:创建藏品+NFT素材上传、声明;

parent 227a3e48
......@@ -49,13 +49,16 @@ public class GoblinStoreMgtDigitalGoodsAddParam implements Serializable {
@ApiModelProperty(position = 18, required = false, value = "商品标签列表")
private List<String> tagList;
@ApiModelProperty(position = 19, required = false, value = "注意事项[256]", example = "注意事项...")
@Size(max = 256, message = "商品简介内容过长")
@ApiModelProperty(position = 19, required = true, value = "注意事项[256]", example = "注意事项...")
// @NotBlank(message = "注意事项不能为空")
@Size(max = 256, message = "注意事项内容过长")
private String attention;
@ApiModelProperty(position = 20, required = false, value = "创作者[25]", example = "创作者...")
@ApiModelProperty(position = 20, required = true, value = "创作者[25]", example = "创作者...")
@NotBlank(message = "创作者不能为空")
@Size(max = 25, message = "创作者内容过长")
private String author;
@ApiModelProperty(position = 21, required = false, value = "发行方[25]", example = "发行方...")
@ApiModelProperty(position = 21, required = true, value = "发行方[25]", example = "发行方...")
@NotBlank(message = "发行方不能为空")
@Size(max = 25, message = "发行方内容过长")
private String publisher;
......
......@@ -27,6 +27,10 @@ public class GoblinStoreMgtDigitalGoodsAddSkuParam implements Serializable {
@ApiModelProperty(position = 12, required = false, value = "单品ID,编辑时必传")
private String skuId;
@ApiModelProperty(position = 12, required = false, value = "NFT区块链[zxinchain-至信链]", allowableValues = "zxinchain", example = "zxinchain")
@Pattern(regexp = "\\b(zxinchain)\\b", message = "NFT区块链参数无效")
private String routeType;
@ApiModelProperty(position = 13, required = true, value = "藏品名称[36]", example = "藏品名称...")
@NotNull(message = "藏品名称不能为空")
@Size(max = 36, message = "藏品名称内容过长")
......@@ -35,11 +39,11 @@ public class GoblinStoreMgtDigitalGoodsAddSkuParam implements Serializable {
@NotNull(message = "款式名称不能为空")
@Size(max = 36, message = "款式名称内容过长")
private String subtitle;
@ApiModelProperty(position = 15, required = true, value = "藏品封面图片URL[256]", example = "http://123")
@ApiModelProperty(position = 15, required = true, value = "藏品封面图片URL[256]", example = "https://img.zhengzai.tv/files/2020/08/31/5f4c75095e9bc.png")
@NotNull(message = "藏品封面图片不能为空")
@Size(max = 256, message = "藏品封面图片URL过长")
private String skuPic;
@ApiModelProperty(position = 16, required = true, value = "展示文件URL[256]", example = "1")
@ApiModelProperty(position = 16, required = true, value = "展示文件URL[256]", example = "https://img.zhengzai.tv/files/2020/08/31/5f4c75095e9bc.png")
@NotNull(message = "展示文件不能为空")
@Size(max = 256, message = "展示文件URL过长")
private String skuWatch;
......@@ -50,7 +54,7 @@ public class GoblinStoreMgtDigitalGoodsAddSkuParam implements Serializable {
@ApiModelProperty(position = 18, required = false, value = "我的藏品展示文件类型[1-图片|2-视频|3-模型],`unbox=0`时必传", example = "1")
@Pattern(regexp = "\\b(1|2|3)\\b", message = "我的藏品展示文件类型参数无效")
private String materialType;
@ApiModelProperty(position = 19, required = false, value = "我的藏品展示文件URL[256],`unbox=0`时必传", example = "http://123")
@ApiModelProperty(position = 19, required = false, value = "我的藏品展示文件URL[256],`unbox=0`时必传", example = "https://img.zhengzai.tv/files/2020/08/31/5f4c75095e9bc.png")
@Size(max = 256, message = "我的藏品展示文件URL过长")
private String materialUrl;
......@@ -138,6 +142,7 @@ public class GoblinStoreMgtDigitalGoodsAddSkuParam implements Serializable {
GoblinGoodsSkuInfoVo initVo = GoblinGoodsSkuInfoVo.getNew();
initVo.setSkuId(this.getSpuId().concat(StringUtils.right(String.valueOf(System.nanoTime()), 5)));
initVo.setSpuId(this.getSpuId());
initVo.setRouteType(StringUtils.isNotBlank(this.getRouteType()) ? this.getRouteType() : "zxinchain");
initVo.setSkuNo(this.getSpuId());
initVo.setSkuType(1);
initVo.setName(this.getName());
......@@ -185,6 +190,7 @@ public class GoblinStoreMgtDigitalGoodsAddSkuParam implements Serializable {
public GoblinGoodsSkuInfoVo initEditGoodsSkuInfoVo() {
GoblinGoodsSkuInfoVo updateVo = GoblinGoodsSkuInfoVo.getNew();
updateVo.setSkuId(this.getSkuId());
// updateVo.setRouteType(this.getRouteType());
updateVo.setDetails(this.getDetails());
updateVo.setShelvesHandle(this.getShelvesHandle());
......
......@@ -109,7 +109,7 @@ public class GoblinGoodsSkuInfoVo implements Serializable, Cloneable {
private Integer openingLimit;
@ApiModelProperty(position = 33, value = "NFT路由")
private String routeType;
@ApiModelProperty(position = 33, value = "NFT上传声明状态[0-待上传|1-已上传|2-上传失败|9-上传中]")
@ApiModelProperty(position = 33, value = "NFT上传声明状态[0-待上传|1-已声明|2-声明失败|9-声明中]")
private Integer upchain;
@ApiModelProperty(position = 33, value = "NFT预览图访问URL")
private String displayUrl;
......
......@@ -2,6 +2,7 @@ package com.liquidnet.client.admin.zhengzai.kylin.utils;
import com.liquidnet.common.cache.redis.util.RedisDataSourceUtil;
import com.liquidnet.commons.lang.util.CollectionUtil;
import com.liquidnet.service.base.constant.MQConst;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.stereotype.Component;
......@@ -22,7 +23,7 @@ public class QueueUtils {
*/
public void sendMsgByRedis(String streamKey, String jsonMsg) {
HashMap<String, String> map = CollectionUtil.mapStringString();
map.put("message", jsonMsg);
map.put(MQConst.QUEUE_MESSAGE_KEY, jsonMsg);
redisDataSourceUtil.getRedisKylinUtil().getStringRedisTemplate().opsForStream().add(StreamRecords.mapBacked(map).withStreamKey(streamKey));
}
}
......@@ -324,7 +324,7 @@ public class HttpUtil {
// header
HttpHeaders httpHeaders = new HttpHeaders();
httpHeaders.add("Accept", MediaType.APPLICATION_JSON.toString());
httpHeaders.add("Accept", MediaType.APPLICATION_JSON_UTF8_VALUE);
httpHeaders.set("Content-Type", MediaType.APPLICATION_JSON_UTF8_VALUE);
if (headers != null) {
httpHeaders.addAll(headers);
......
package com.liquidnet.service.base.constant;
public class MQConst {
public static final String QUEUE_MESSAGE_KEY = "message";
public enum AdamQueue {
SMS_NOTICE("adam:stream:rk.sms.notice", "group.sms.sender", "短信通知"),
// SMS_SPREAD("adam:stream:rk.sms.spread", "group.sms.sender", "短信推广"),
......@@ -285,7 +287,8 @@ public class MQConst {
GOBLIN_UN_PAY_8("goblin:stream:order:back:8", "group.order:back", "回滚关闭订单库存队列"),
GOBLIN_UN_PAY_9("goblin:stream:order:back:9", "group.order:back", "回滚关闭订单库存队列"),
BIZ_ARTWORK_UPC("goblin:stream:biz_art:upc", "group.biz.artwork", "藏品声明查询"),
BIZ_ARTWORK_UPL("goblin:stream:biz_art:upl", "group.biz.artwork", "藏品上传声明"),
BIZ_ARTWORK_CLQ("goblin:stream:biz_art:clq", "group.biz.artwork", "藏品声明查询"),
BIZ_ARTWORK_GEN("goblin:stream:biz_art:gen", "group.biz.artwork", "藏品生成"),
SQL_ARTWORK_GEN("goblin:stream:sql_art:gen", "group.biz.artwork", "藏品生成"),
;
......
......@@ -45,7 +45,7 @@ public class GoblinGoodsSkuNft implements Serializable {
private String materialUrl;
/**
* NFT上传声明状态[0-待上传|1-已上传|2-上传失败|9-上传中]
* NFT上传声明状态[0-待上传|1-已声明|2-声明失败|9-声明中]
*/
private Integer upchain;
......
package com.liquidnet.service.adam.util;
import com.liquidnet.commons.lang.util.CollectionUtil;
import com.liquidnet.service.base.constant.MQConst;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.core.StringRedisTemplate;
......@@ -35,7 +36,7 @@ public class QueueUtils {
*/
public void sendMsgByRedis(String streamKey, String jsonMsg) {
HashMap<String, String> map = CollectionUtil.mapStringString();
map.put("message", jsonMsg);
map.put(MQConst.QUEUE_MESSAGE_KEY, jsonMsg);
stringRedisTemplate.opsForStream().add(StreamRecords.mapBacked(map).withStreamKey(streamKey));
}
}
package com.liquidnet.service.candy.util;
import com.liquidnet.commons.lang.util.CollectionUtil;
import com.liquidnet.service.base.constant.MQConst;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.core.StringRedisTemplate;
......@@ -22,7 +23,7 @@ public class QueueUtils {
*/
public void sendMsgByRedis(String streamKey, String jsonMsg) {
HashMap<String, String> map = CollectionUtil.mapStringString();
map.put("message", jsonMsg);
map.put(MQConst.QUEUE_MESSAGE_KEY, jsonMsg);
stringRedisTemplate.opsForStream().add(StreamRecords.mapBacked(map).withStreamKey(streamKey));
}
}
......@@ -45,7 +45,7 @@ public class QueueUtils {
}
// public void sendMsgByRedis(String streamKey, String jsonMsg) {
// HashMap<String, String> map = CollectionUtil.mapStringString();
// map.put("message", jsonMsg);
// map.put(MQConst.QUEUE_MESSAGE_KEY, jsonMsg);
// try {
// stringRedisTemplate.opsForStream().add(StreamRecords.mapBacked(map).withStreamKey(streamKey));
// } catch (Exception e) {
......
......@@ -3,7 +3,8 @@ package com.liquidnet.service.consumer.kylin.config;
import com.liquidnet.common.cache.redis.config.RedisStreamConfig;
import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.consumer.kylin.receiver.ConsumerGoblinBizArtworkGenRdsReceiver;
import com.liquidnet.service.consumer.kylin.receiver.ConsumerGoblinBizArtworkUpcRdsReceiver;
import com.liquidnet.service.consumer.kylin.receiver.ConsumerGoblinBizArtworkClqRdsReceiver;
import com.liquidnet.service.consumer.kylin.receiver.ConsumerGoblinBizArtworkUplRdsReceiver;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
......@@ -24,20 +25,38 @@ public class ConsumerCommonBizRedisStreamConfig extends RedisStreamConfig {
@Autowired
StringRedisTemplate stringRedisTemplate;
@Autowired
ConsumerGoblinBizArtworkUpcRdsReceiver consumerGoblinBizArtworkUpcRdsReceiver;
ConsumerGoblinBizArtworkUplRdsReceiver consumerGoblinBizArtworkUplRdsReceiver;
@Autowired
ConsumerGoblinBizArtworkClqRdsReceiver consumerGoblinBizArtworkClqRdsReceiver;
@Autowired
ConsumerGoblinBizArtworkGenRdsReceiver consumerGoblinBizArtworkGenRdsReceiver;
@Bean// 藏品上传声明
public List<Subscription> subscriptionBizArtworkUpl(RedisConnectionFactory factory) {
List<Subscription> subscriptionList = new ArrayList<>();
MQConst.GoblinQueue stream = MQConst.GoblinQueue.BIZ_ARTWORK_UPL;
this.initStream(stringRedisTemplate, stream.getKey(), stream.getGroup());
for (int i = 0; i < 10; i++) {
StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = this.buildStreamMessageListenerContainer(factory);
subscriptionList.add(listenerContainer.receiveAutoAck(
Consumer.from(stream.getGroup(), getConsumerName(stream.name() + i)),
StreamOffset.create(stream.getKey(), ReadOffset.lastConsumed()), consumerGoblinBizArtworkUplRdsReceiver
));
listenerContainer.start();
}
return subscriptionList;
}
@Bean// 藏品声明查询
public List<Subscription> subscriptionBizArtworkUpc(RedisConnectionFactory factory) {
public List<Subscription> subscriptionBizArtworkClq(RedisConnectionFactory factory) {
List<Subscription> subscriptionList = new ArrayList<>();
MQConst.GoblinQueue stream = MQConst.GoblinQueue.BIZ_ARTWORK_UPC;
MQConst.GoblinQueue stream = MQConst.GoblinQueue.BIZ_ARTWORK_CLQ;
this.initStream(stringRedisTemplate, stream.getKey(), stream.getGroup());
for (int i = 0; i < 10; i++) {
StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = this.buildStreamMessageListenerContainer(factory);
subscriptionList.add(listenerContainer.receiveAutoAck(
Consumer.from(stream.getGroup(), getConsumerName(stream.name() + i)),
StreamOffset.create(stream.getKey(), ReadOffset.lastConsumed()), consumerGoblinBizArtworkUpcRdsReceiver
StreamOffset.create(stream.getKey(), ReadOffset.lastConsumed()), consumerGoblinBizArtworkClqRdsReceiver
));
listenerContainer.start();
}
......
package com.liquidnet.service.consumer.kylin.receiver;
import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.consumer.kylin.service.IBaseDao;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
......@@ -24,7 +25,7 @@ public abstract class AbstractBizRedisReceiver implements StreamListener<String,
public void onMessage(MapRecord<String, String, String> message) {
String redisStreamKey = this.getRedisStreamKey();
log.debug("CONSUMER MSG[streamKey:{},messageId:{},stream:{},body:{}]", redisStreamKey, message.getId(), message.getStream(), message.getValue());
boolean result = this.consumerMessageHandler(message.getValue().get("message"));
boolean result = this.consumerMessageHandler(message.getValue().get(MQConst.QUEUE_MESSAGE_KEY));
log.info("CONSUMER MSG RESULT:{} ==> [{}]MESSAGE_ID:{}", result, redisStreamKey, message.getId());
try {
......
......@@ -8,6 +8,8 @@ import com.liquidnet.commons.lang.util.HttpUtil;
import com.liquidnet.commons.lang.util.JsonUtils;
import com.liquidnet.service.base.SqlMapping;
import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.galaxy.dto.param.GalaxyArtSeriesClaimResultQueryReqDto;
import com.liquidnet.service.galaxy.dto.param.GalaxyArtSeriesClaimResultQueryRespDto;
import com.liquidnet.service.goblin.constant.GoblinRedisConst;
import com.liquidnet.service.goblin.dto.vo.GoblinGoodsSkuInfoVo;
import lombok.extern.slf4j.Slf4j;
......@@ -19,20 +21,16 @@ import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.Update;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.util.LinkedMultiValueMap;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
@Slf4j
@Component
public class ConsumerGoblinBizArtworkUpcRdsReceiver extends AbstractBizRedisReceiver {
public class ConsumerGoblinBizArtworkClqRdsReceiver extends AbstractBizRedisReceiver {
@Autowired
private RedisUtil redisUtil;
@Autowired
......@@ -44,16 +42,16 @@ public class ConsumerGoblinBizArtworkUpcRdsReceiver extends AbstractBizRedisRece
// @Value("${liquidnet.service.galaxy.url}")// TODO: 2022/4/1 ==zhanggb
// private String sevGalaxyUrl;
private static final String SQL_UPDATE_GOODS_SKU_NFT = "UPDATE goblin_goods_sku_nft SET upchain=?,series_id=?,series_hash=?,nft_hash=?,declare_at=?,updated_at=? WHERE skuId=? AND upchain=9 ";
private static final String SQL_UPDATE_GOODS_SKU_NFT = "UPDATE goblin_goods_sku_nft SET upchain=?,series_id=?,series_hash=?,nft_hash=?,declare_at=?,updated_at=? WHERE sku_id=? ";
@Override
protected String getRedisStreamKey() {
return MQConst.GoblinQueue.BIZ_ARTWORK_UPC.getKey();
return MQConst.GoblinQueue.BIZ_ARTWORK_CLQ.getKey();
}
@Override
protected String getRedisStreamGroup() {
return MQConst.GoblinQueue.BIZ_ARTWORK_UPC.getGroup();
return MQConst.GoblinQueue.BIZ_ARTWORK_CLQ.getGroup();
}
@Override
......@@ -61,7 +59,7 @@ public class ConsumerGoblinBizArtworkUpcRdsReceiver extends AbstractBizRedisRece
boolean aBoolean = false;
try {
if (StringUtils.isEmpty(msg)) {
log.warn("CONSUMER MSG NULL_DTO ==> [{}]:{}", this.getRedisStreamKey(), msg);
log.warn("CONSUMER MSG NULL_MSG ==> [{}]:{}", this.getRedisStreamKey(), msg);
aBoolean = true;
} else {
String[] msgArr = msg.split(",");
......@@ -71,13 +69,13 @@ public class ConsumerGoblinBizArtworkUpcRdsReceiver extends AbstractBizRedisRece
LocalDateTime createAt = StringUtils.isEmpty(time) ? checkTime : LocalDateTime.parse(time);
long durationToMillis = Duration.between(createAt, checkTime).toMillis();
if (durationToMillis >= 0) {
aBoolean = this.bizArtworkUpcProcessing(skuId);
aBoolean = this.bizArtworkClqProcessing(skuId);
} else {
try {
Thread.sleep(Math.abs(durationToMillis));
} catch (InterruptedException ignored) {
}
aBoolean = this.bizArtworkUpcProcessing(skuId);
aBoolean = this.bizArtworkClqProcessing(skuId);
}
}
} catch (Exception e) {
......@@ -85,34 +83,35 @@ public class ConsumerGoblinBizArtworkUpcRdsReceiver extends AbstractBizRedisRece
} finally {
if (!aBoolean) {
HashMap<String, String> map = CollectionUtil.mapStringString();
map.put("message", msg);
map.put(MQConst.QUEUE_MESSAGE_KEY, msg);
stringRedisTemplate.opsForStream().add(StreamRecords.mapBacked(map).withStreamKey(this.getRedisStreamKey()));
}
}
return aBoolean;
}
private boolean bizArtworkUpcProcessing(String skuId) {
GoblinGoodsSkuInfoVo goodsSkuInfoVo = this.getGoodsSkuInfoVoFromMdb(skuId);
if (null == goodsSkuInfoVo) {
private boolean bizArtworkClqProcessing(String skuId) {
GoblinGoodsSkuInfoVo mgtGoodsSkuInfoVo = this.getGoodsSkuInfoVoFromMdb(skuId);
if (null == mgtGoodsSkuInfoVo) {
log.warn("#CONSUMER MSG NULL_SKU[{}]:[skuId={}]", this.getRedisStreamKey(), skuId);
return true;
}
int skuType = goodsSkuInfoVo.getSkuType(), upchain = goodsSkuInfoVo.getUpchain();
String unbox = goodsSkuInfoVo.getUnbox();
if (1 != skuType || !"0".equals(unbox) || 9 != upchain) {// 非数字藏品 || 盲盒 || 非上传中
log.warn("#CONSUMER MSG VOID_SKU[{}]:[skuId={},skuType={},unbox={},upchain={}]",
this.getRedisStreamKey(), skuId, skuType, unbox, upchain);
int skuType = mgtGoodsSkuInfoVo.getSkuType(), upchain = mgtGoodsSkuInfoVo.getUpchain();
String unbox = mgtGoodsSkuInfoVo.getUnbox();
// 非数字藏品 || 盲盒 || 非声明中 || 已有声明系列ID
if (1 != skuType || !"0".equals(unbox) || 9 != upchain || StringUtils.isNotEmpty(mgtGoodsSkuInfoVo.getSeriesId())) {
log.warn("#CONSUMER MSG VOID_SKU[{}]:[skuId={},skuType={},unbox={},upchain={},seriesId={}]",
this.getRedisStreamKey(), skuId, skuType, unbox, upchain, mgtGoodsSkuInfoVo.getSeriesId());
return true;
}
List<String> checkNftClaimResult = this.checkNftClaimFromGalaxy(skuId, goodsSkuInfoVo.getRouteType());
if (CollectionUtils.isEmpty(checkNftClaimResult)) return false;
GalaxyArtSeriesClaimResultQueryRespDto resultQueryRespDto = this.queryNftSeriesClaimResult(skuId, mgtGoodsSkuInfoVo.getRouteType());
if (null == resultQueryRespDto) return false;// 声明查询失败,重新入队处理
String seriesId = checkNftClaimResult.get(0);
String txHash = checkNftClaimResult.get(1);
String nftHash = checkNftClaimResult.get(2);
String chainTimestamp = checkNftClaimResult.get(3);
String seriesId = resultQueryRespDto.getSeriesId();
String txHash = resultQueryRespDto.getTxHash();
String nftHash = resultQueryRespDto.getNftHash();
String chainTimestamp = resultQueryRespDto.getChainTimestamp();
LocalDateTime now = LocalDateTime.now();
mongoTemplate.getCollection(GoblinGoodsSkuInfoVo.class.getSimpleName()).updateOne(
......@@ -128,48 +127,43 @@ public class ConsumerGoblinBizArtworkUpcRdsReceiver extends AbstractBizRedisRece
LinkedList<Object[]> updateGoodsSkuNftObjs = CollectionUtil.linkedListObjectArr();
updateGoodsSkuNftObjs.add(new Object[]{1, seriesId, txHash, nftHash, chainTimestamp, now, skuId});
sqlUpdateMap.put("message", SqlMapping.gets(toMqSqls, updateGoodsSkuNftObjs));
sqlUpdateMap.put(MQConst.QUEUE_MESSAGE_KEY, SqlMapping.gets(toMqSqls, updateGoodsSkuNftObjs));
stringRedisTemplate.opsForStream().add(StreamRecords.mapBacked(sqlUpdateMap).withStreamKey(MQConst.GoblinQueue.SQL_GOODS.getKey()));
return true;
}
/* ------------------------------------------------------------------------------------ */
/* ------------------------------------------------------------------------------------ */
/* ------------------------------------------------------------------------------------ */
/* ------------------------------------------------------------------------------------ */
/* ------------------------------------------------------------------------------------ */
private List<String> checkNftClaimFromGalaxy(String skuId, String routerType) {
LinkedMultiValueMap<String, String> paramsMap = CollectionUtil.linkedMultiValueMapStringString();
paramsMap.add("skuId", skuId);
paramsMap.add("routerType", routerType);
private GalaxyArtSeriesClaimResultQueryRespDto queryNftSeriesClaimResult(String skuId, String routerType) {
GalaxyArtSeriesClaimResultQueryReqDto artSeriesClaimResultQueryReqDto = GalaxyArtSeriesClaimResultQueryReqDto.getNew();
artSeriesClaimResultQueryReqDto.setSkuId(skuId);
artSeriesClaimResultQueryReqDto.setRouterType(routerType);
// String postUrl = sevGalaxyUrl + "/user/register", blockChainAddress;// TODO: 2022/4/1 ==zhanggb
String postUrl = "https://ENVgalaxy.zhengzai.tv/galaxy/artwork/seriesClaimResultQuery".replace("ENV", env.getProperty(CurrentUtil.CK_ENV_ACTIVE)), blockChainAddress;
String postUrl = "https://ENVgalaxy.zhengzai.tv/galaxy/artwork/seriesClaimResultQuery".replace("ENV", env.getProperty(CurrentUtil.CK_ENV_ACTIVE));
String postBoby = JsonUtils.toJson(artSeriesClaimResultQueryReqDto);
try {
LinkedMultiValueMap<String, String> headerMap = CollectionUtil.linkedMultiValueMapStringString();
headerMap.add("Accept", MediaType.APPLICATION_JSON_VALUE);
String postRespStr = HttpUtil.post(postUrl, paramsMap, headerMap);
JsonNode postRespJNode = JsonUtils.fromJson(postRespStr, JsonNode.class), postRespDataJNode;
if (null == postRespJNode || null == postRespJNode.get("code") || !postRespJNode.get("code").asText().equals("0")) {
log.warn("#CONSUMER MSG FAIL_UPC[{}]查询失败[paramsMap={},postRespStr={}]", this.getRedisStreamKey(), paramsMap, postRespStr);
String postRespStr = HttpUtil.postRaw(postUrl, postBoby, null);
JsonNode postRespJNode = JsonUtils.fromJson(postRespStr, JsonNode.class), postRespCode, postRespDataJNode;
if (null == postRespJNode || null == (postRespCode = postRespJNode.get("code")) || !postRespCode.asText().equals("0")) {
log.warn("#CONSUMER MSG FAIL_UPC[{}]查询失败[paramsStr={},postRespStr={}]", this.getRedisStreamKey(), postBoby, postRespStr);
return null;
}
if (1 != (postRespDataJNode = postRespJNode.get("data")).get("taskStatus").asInt()) {
log.warn("#CONSUMER MSG FAIL_UPC[{}]声明失败[paramsMap={},postRespStr={}]", this.getRedisStreamKey(), paramsMap, postRespStr);
log.warn("#CONSUMER MSG FAIL_UPC[{}]声明失败[paramsStr={},postRespStr={}]", this.getRedisStreamKey(), postBoby, postRespStr);
return null;
}
List<String> respDataList = CollectionUtil.arrayListString();
respDataList.add(postRespDataJNode.get("seriesId").asText());
respDataList.add(postRespDataJNode.get("txHash").asText());
respDataList.add(postRespDataJNode.get("nftHash").asText());
respDataList.add(postRespDataJNode.get("chainTimestamp").asText());
return respDataList;
return JsonUtils.OM().convertValue(postRespDataJNode, GalaxyArtSeriesClaimResultQueryRespDto.class);
} catch (Exception e) {
log.error("Ex.CONSUMER MSG ERROR_UPC[{}]请求异常[skuId={},url={},paramsMap={}],ex:{}",
this.getRedisStreamKey(), skuId, postUrl, paramsMap, e.getMessage());
log.error("Ex.CONSUMER MSG ERROR_UPC[{}]请求异常[url={},paramsStr={}],ex:{}", this.getRedisStreamKey(), postUrl, postBoby, e.getMessage());
return null;
}
}
/* ------------------------------------------------------------------------------------ */
/* ------------------------------------------------------------------------------------ */
/* ------------------------------------------------------------------------------------ */
......
package com.liquidnet.service.consumer.kylin.receiver;
import com.fasterxml.jackson.databind.JsonNode;
import com.liquidnet.common.cache.redis.util.RedisUtil;
import com.liquidnet.commons.lang.util.CollectionUtil;
import com.liquidnet.commons.lang.util.CurrentUtil;
import com.liquidnet.commons.lang.util.HttpUtil;
import com.liquidnet.commons.lang.util.JsonUtils;
import com.liquidnet.service.base.SqlMapping;
import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.galaxy.dto.param.GalaxyArtSeriesClaimReqDto;
import com.liquidnet.service.galaxy.dto.param.GalaxyArtSeriesClaimRespDto;
import com.liquidnet.service.galaxy.dto.param.GalaxyNftUploadReqDto;
import com.liquidnet.service.galaxy.dto.param.GalaxyNftUploadRespDto;
import com.liquidnet.service.goblin.constant.GoblinRedisConst;
import com.liquidnet.service.goblin.dto.vo.GoblinGoodsInfoVo;
import com.liquidnet.service.goblin.dto.vo.GoblinGoodsSkuInfoVo;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.Update;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.core.StreamOperations;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.LinkedList;
@Slf4j
@Component
public class ConsumerGoblinBizArtworkUplRdsReceiver extends AbstractBizRedisReceiver {
@Autowired
private RedisUtil redisUtil;
@Autowired
private MongoTemplate mongoTemplate;
@Autowired
Environment env;
// @Value("${liquidnet.service.galaxy.url}")// TODO: 2022/4/1 ==zhanggb
// private String sevGalaxyUrl;
private static final String SQL_UPDATE_GOODS_SKU_NFT = "UPDATE goblin_goods_sku_nft SET upchain=?,display_url=?,nft_url=? WHERE sku_id=? AND upchain=0 ";
@Override
protected String getRedisStreamKey() {
return MQConst.GoblinQueue.BIZ_ARTWORK_UPL.getKey();
}
@Override
protected String getRedisStreamGroup() {
return MQConst.GoblinQueue.BIZ_ARTWORK_UPL.getGroup();
}
@Override
protected boolean consumerMessageHandler(String msg) {
boolean aBoolean = false;
try {
if (StringUtils.isEmpty(msg)) {
log.warn("CONSUMER MSG NULL_DTO ==> [{}]:{}", this.getRedisStreamKey(), msg);
aBoolean = true;
} else {
aBoolean = this.bizArtworkUplProcessing(msg);
}
} catch (Exception e) {
log.error("CONSUMER MSG EX_HANDLE ==> [{}]:{}", this.getRedisStreamKey(), msg, e);
} finally {
if (!aBoolean) {
HashMap<String, String> map = CollectionUtil.mapStringString();
map.put(MQConst.QUEUE_MESSAGE_KEY, msg);
stringRedisTemplate.opsForStream().add(StreamRecords.mapBacked(map).withStreamKey(this.getRedisStreamKey()));
}
}
return aBoolean;
}
private boolean bizArtworkUplProcessing(String skuId) {
GoblinGoodsSkuInfoVo mgtGoodsSkuInfoVo = this.getGoodsSkuInfoVoFromMdb(skuId);
if (null == mgtGoodsSkuInfoVo) {
log.warn("#CONSUMER MSG NULL_SKU[{}]:[skuId={}]", this.getRedisStreamKey(), skuId);
return true;
}
int skuType = mgtGoodsSkuInfoVo.getSkuType(), upchain = mgtGoodsSkuInfoVo.getUpchain();
String unbox = mgtGoodsSkuInfoVo.getUnbox();
// 非数字藏品 || 盲盒 || 非声明中 || 已有声明系列ID
if (1 != skuType || !"0".equals(unbox) || 0 != upchain || StringUtils.isNotEmpty(mgtGoodsSkuInfoVo.getSeriesId())) {
log.warn("#CONSUMER MSG VOID_SKU[{}]:[skuId={},skuType={},unbox={},upchain={},seriesId={}]",
this.getRedisStreamKey(), skuId, skuType, unbox, upchain, mgtGoodsSkuInfoVo.getSeriesId());
return true;
}
String displayUrl = mgtGoodsSkuInfoVo.getDisplayUrl(), nftUrl = mgtGoodsSkuInfoVo.getNftUrl();
if (StringUtils.isBlank(displayUrl)) {// 未上传过的直接上传处理,已上传过的跳过上传直接声明
GalaxyNftUploadReqDto galaxyNftUploadReqDto = GalaxyNftUploadReqDto.getNew();
galaxyNftUploadReqDto.setSkuId(skuId);
galaxyNftUploadReqDto.setOriginalDisplayUrl(mgtGoodsSkuInfoVo.getSkuPic());
galaxyNftUploadReqDto.setOriginalNftUrl(mgtGoodsSkuInfoVo.getMaterialUrl());
galaxyNftUploadReqDto.setRouterType(mgtGoodsSkuInfoVo.getRouteType());
GalaxyNftUploadRespDto galaxyNftUploadRespDto = this.uploadNftMaterial(galaxyNftUploadReqDto);
if (null == galaxyNftUploadRespDto) return false;// 上传失败,重新入队处理
displayUrl = galaxyNftUploadRespDto.getDisplayUrl();
nftUrl = galaxyNftUploadRespDto.getNftUrl();
}
GoblinGoodsInfoVo mgtGoodsInfoVo = this.getGoodsInfoVoFromMdb(mgtGoodsSkuInfoVo.getSpuId());
String skuTitle = mgtGoodsSkuInfoVo.getName() + mgtGoodsSkuInfoVo.getSubtitle();
GalaxyArtSeriesClaimReqDto galaxyArtSeriesClaimReqDto = GalaxyArtSeriesClaimReqDto.getNew();
galaxyArtSeriesClaimReqDto.setAuthor(mgtGoodsInfoVo.getAuthor());
galaxyArtSeriesClaimReqDto.setCoverUrl(displayUrl);
galaxyArtSeriesClaimReqDto.setDisplayUrl(displayUrl);
galaxyArtSeriesClaimReqDto.setNftDesc(skuTitle);
galaxyArtSeriesClaimReqDto.setNftName(skuTitle);
galaxyArtSeriesClaimReqDto.setNftUrl(nftUrl);
galaxyArtSeriesClaimReqDto.setRouterType(mgtGoodsSkuInfoVo.getRouteType());
galaxyArtSeriesClaimReqDto.setSellCount(String.valueOf(mgtGoodsSkuInfoVo.getPrice()));
galaxyArtSeriesClaimReqDto.setSeriesDesc(skuTitle);
galaxyArtSeriesClaimReqDto.setSkuId(skuId);
galaxyArtSeriesClaimReqDto.setTotalCount(Long.valueOf(mgtGoodsSkuInfoVo.getSkuStock()));
// 声明失败,标记`声明失败`
upchain = null == this.claimNftSeries(galaxyArtSeriesClaimReqDto) ? 2 : 9;
mongoTemplate.getCollection(GoblinGoodsSkuInfoVo.class.getSimpleName()).updateOne(
Query.query(Criteria.where("skuId").is(skuId).and("delFlg").is("0")).getQueryObject(),
Update.update("upchain", upchain).set("displayUrl", displayUrl).set("nftUrl", nftUrl).getUpdateObject()
);
redisUtil.del(GoblinRedisConst.BASIC_GOODS_SKU.concat(skuId));
// Mysql持久化
HashMap<String, String> sqlUpdateMap = CollectionUtil.mapStringString();
LinkedList<String> toMqSqls = CollectionUtil.linkedListString();
toMqSqls.add(SQL_UPDATE_GOODS_SKU_NFT);
LinkedList<Object[]> updateGoodsSkuNftObjs = CollectionUtil.linkedListObjectArr();
updateGoodsSkuNftObjs.add(new Object[]{upchain, displayUrl, nftUrl, skuId});
sqlUpdateMap.put(MQConst.QUEUE_MESSAGE_KEY, SqlMapping.gets(toMqSqls, updateGoodsSkuNftObjs));
StreamOperations<String, Object, Object> streamOperations = stringRedisTemplate.opsForStream();
streamOperations.add(StreamRecords.mapBacked(sqlUpdateMap).withStreamKey(MQConst.GoblinQueue.SQL_GOODS.getKey()));
HashMap<String, String> toQueueBeClaimQueryMsg = CollectionUtil.mapStringString();
toQueueBeClaimQueryMsg.put(MQConst.QUEUE_MESSAGE_KEY, skuId);
streamOperations.add(StreamRecords.mapBacked(toQueueBeClaimQueryMsg).withStreamKey(MQConst.GoblinQueue.BIZ_ARTWORK_CLQ.getKey()));
return true;
}
/* ------------------------------------------------------------------------------------ */
/* ------------------------------------------------------------------------------------ */
/* ------------------------------------------------------------------------------------ */
/* ------------------------------------------------------------------------------------ */
/* ------------------------------------------------------------------------------------ */
/**
* NFT素材上传
*
* @param nftUploadReqDto GalaxyNftUploadReqDto
* @return GalaxyNftUploadRespDto
*/
public GalaxyNftUploadRespDto uploadNftMaterial(GalaxyNftUploadReqDto nftUploadReqDto) {
// String postUrl = sevGalaxyUrl + "/user/register", blockChainAddress;// TODO: 2022/3/31 ==zhanggb
String postUrl = "https://ENVgalaxy.zhengzai.tv/galaxy/artwork/nftUpload".replace("ENV", env.getProperty(CurrentUtil.CK_ENV_ACTIVE)), blockChainAddress;
String postBody = JsonUtils.toJson(nftUploadReqDto);
try {
String postRespStr = HttpUtil.postRaw(postUrl, postBody, null);
JsonNode postRespJNode = JsonUtils.fromJson(postRespStr, JsonNode.class), postRespCode;
if (null == postRespJNode || null == (postRespCode = postRespJNode.get("code")) || !postRespCode.asText().equals("0")) {
log.warn("#NFT素材上传:请求失败[paramsStr={},postRespStr={}]", postBody, postRespStr);
return null;
}
return JsonUtils.OM().convertValue(postRespJNode.get("data"), GalaxyNftUploadRespDto.class);
} catch (Exception e) {
log.error("Ex.NFT素材上传:请求异常[url={},paramsStr={}],ex:{}", postUrl, postBody, e.getMessage());
return null;
}
}
/**
* NFT系列声明
*
* @param seriesClaimReqDto GalaxyArtSeriesClaimReqDto
* @return GalaxyArtSeriesClaimRespDto
*/
public GalaxyArtSeriesClaimRespDto claimNftSeries(GalaxyArtSeriesClaimReqDto seriesClaimReqDto) {
// String postUrl = sevGalaxyUrl + "/user/register", blockChainAddress;// TODO: 2022/3/31 ==zhanggb
String postUrl = "https://ENVgalaxy.zhengzai.tv/galaxy/artwork/seriesClaim".replace("ENV", env.getProperty(CurrentUtil.CK_ENV_ACTIVE)), blockChainAddress;
String postBody = JsonUtils.toJson(seriesClaimReqDto);
try {
String postRespStr = HttpUtil.postRaw(postUrl, postBody, null);
JsonNode postRespJNode = JsonUtils.fromJson(postRespStr, JsonNode.class), postRespCode;
if (null == postRespJNode || null == (postRespCode = postRespJNode.get("code")) || !postRespCode.asText().equals("0")) {
log.warn("#NFT系列声明:请求失败[paramsStr={},postRespStr={}]", postBody, postRespStr);
return null;
}
return JsonUtils.OM().convertValue(postRespJNode.get("data"), GalaxyArtSeriesClaimRespDto.class);
} catch (Exception e) {
log.error("Ex.NFT系列声明:请求异常[url={},paramsStr={}],ex:{}", postUrl, postBody, e.getMessage());
return null;
}
}
/* ------------------------------------------------------------------------------------ */
/* ------------------------------------------------------------------------------------ */
public GoblinGoodsSkuInfoVo getGoodsSkuInfoVoFromMdb(String skuId) {
return mongoTemplate.findOne(Query.query(Criteria.where("skuId").is(skuId).and("delFlg").is("0")),
GoblinGoodsSkuInfoVo.class, GoblinGoodsSkuInfoVo.class.getSimpleName());
}
public GoblinGoodsInfoVo getGoodsInfoVoFromMdb(String spuId) {
return mongoTemplate.findOne(Query.query(Criteria.where("spuId").is(spuId).and("delFlg").is("0")),
GoblinGoodsInfoVo.class, GoblinGoodsInfoVo.class.getSimpleName());
}
/* ------------------------------------------------------------------------------------ */
}
......@@ -33,7 +33,7 @@ alter table goblin_goods_sku add opening_limit int default 0 comment '盲盒开
# alter table goblin_goods_sku add route_type varchar(20) null comment 'NFT路由' after opening_limit;
# alter table goblin_goods_sku add material_type char null comment '素材原始文件类型[1-图片|2-视频|3-模型]' after watch_type;
# alter table goblin_goods_sku add material_url varchar(256) null comment '素材原始文件URL' after material_type;
# alter table goblin_goods_sku add upchain tinyint default 0 null comment 'NFT上传声明状态[0-待上传|1-已上传|2-上传失败|9-上传中]' after route_type;
# alter table goblin_goods_sku add upchain tinyint default 0 null comment 'NFT上传声明状态[0-待上传|1-已声明|2-声明失败|9-声明中]' after route_type;
# alter table goblin_goods_sku add display_url varchar(500) null comment 'NFT预览图URL' after upchain;
# alter table goblin_goods_sku add nft_url varchar(500) null comment 'NFT素材访问URL' after display_url;
# alter table goblin_goods_sku add series_id varchar(256) null comment 'NFT系列ID' after nft_url;
......@@ -49,7 +49,7 @@ create table goblin_goods_sku_nft
route_type varchar(20) null comment 'NFT路由',
material_type char null comment '素材原始文件类型[1-图片|2-视频|3-模型]',
material_url varchar(256) null comment '素材原始文件URL',
upchain tinyint default 0 comment 'NFT上传声明状态[0-待上传|1-已上传|2-上传失败|9-上传中]',
upchain tinyint default 0 comment 'NFT上传声明状态[0-待上传|1-已声明|2-声明失败|9-声明中]',
display_url varchar(500) null comment 'NFT预览图URL',
nft_url varchar(500) null comment 'NFT素材访问URL',
series_id varchar(256) null comment 'NFT系列ID',
......
......@@ -953,7 +953,7 @@ public class GoblinStoreMgtGoodsServiceImpl implements IGoblinstoreMgtGoodsServi
@Override
public void digitalGoodsAddSku(String uid, GoblinGoodsSkuInfoVo initGoodsSkuInfoVo, GoblinGoodsInfoVo mgtGoodsInfoVo) {
LocalDateTime now = LocalDateTime.now();
initGoodsSkuInfoVo.setRouteType("zxinchain");// TODO: 2022/3/25 ==zhanggb
// initGoodsSkuInfoVo.setRouteType("zxinchain");
initGoodsSkuInfoVo.setUpchain(0);
initGoodsSkuInfoVo.setCreatedAt(now);
initGoodsSkuInfoVo.setCreatedBy(uid);
......@@ -1020,6 +1020,9 @@ public class GoblinStoreMgtGoodsServiceImpl implements IGoblinstoreMgtGoodsServi
// LinkedList<Object[]> updateGoodsObjs = CollectionUtil.linkedListObjectArr();
queueUtils.sendMsgByRedis(MQConst.GoblinQueue.SQL_GOODS.getKey(), SqlMapping.gets(toMqSqls, initGoodsSkuObjs, initGoodsSkuNftObjs, updateGoodsObjs));
if ("0".equals(initGoodsSkuInfoVo.getUnbox())) {
queueUtils.sendMsgByRedis(MQConst.GoblinQueue.BIZ_ARTWORK_UPL.getKey(), skuId);// 藏品NFT上传、声明通过队列处理
}
}
@Override
......
......@@ -36,7 +36,7 @@ public class QueueUtils {
*/
public void sendMsgByRedis(String streamKey, String jsonMsg) {
HashMap<String, String> map = CollectionUtil.mapStringString();
map.put("message", jsonMsg);
map.put(MQConst.QUEUE_MESSAGE_KEY, jsonMsg);
stringRedisTemplate.opsForStream().add(StreamRecords.mapBacked(map).withStreamKey(streamKey));
}
......@@ -52,7 +52,7 @@ public class QueueUtils {
} else {
HashMap<String, String> map = CollectionUtil.mapStringString();
map.put("message", xlsPath);
map.put(MQConst.QUEUE_MESSAGE_KEY, xlsPath);
map.put("type", type);
map.put("skuId", skuId);
stringRedisTemplate.opsForStream().add(StreamRecords.mapBacked(map).withStreamKey(MQConst.GoblinQueue.GOBLIN_XLS_OPERA.getKey()));
......
......@@ -41,7 +41,7 @@ public class QueueUtils {
*/
public void sendMsgByRedis(String streamKey, String jsonMsg) {
HashMap<String, String> map = CollectionUtil.mapStringString();
map.put("message", jsonMsg);
map.put(MQConst.QUEUE_MESSAGE_KEY, jsonMsg);
stringRedisTemplate.opsForStream().add(StreamRecords.mapBacked(map).withStreamKey(streamKey));
}
......
......@@ -40,7 +40,7 @@ public class QueueUtils {
*/
public void sendMsgByRedis(String streamKey, String jsonMsg) {
HashMap<String, String> map = ObjectUtil.cloneHashMapStringAndString();
map.put("message", jsonMsg);
map.put(MQConst.QUEUE_MESSAGE_KEY, jsonMsg);
stringRedisTemplate.opsForStream().add(StreamRecords.mapBacked(map).withStreamKey(streamKey));
}
......
......@@ -2,6 +2,7 @@ package com.liquidnet.service.platform.utils;
import com.liquidnet.common.cache.redis.util.RedisDataSourceUtil;
import com.liquidnet.commons.lang.util.CollectionUtil;
import com.liquidnet.service.base.constant.MQConst;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.stereotype.Component;
......@@ -36,7 +37,7 @@ public class QueueUtils {
*/
public void sendMsgByRedis(String streamKey, String jsonMsg) {
HashMap<String, String> map = CollectionUtil.mapStringString();
map.put("message", jsonMsg);
map.put(MQConst.QUEUE_MESSAGE_KEY, jsonMsg);
redisDataSourceUtil.getRedisKylinUtil().getStringRedisTemplate().opsForStream().add(StreamRecords.mapBacked(map).withStreamKey(streamKey));
}
......@@ -48,7 +49,7 @@ public class QueueUtils {
*/
public void sendMsgByCandyRedis(String streamKey, String jsonMsg) {
HashMap<String, String> map = CollectionUtil.mapStringString();
map.put("message", jsonMsg);
map.put(MQConst.QUEUE_MESSAGE_KEY, jsonMsg);
redisDataSourceUtil.getRedisCandyUtil().getStringRedisTemplate().opsForStream().add(StreamRecords.mapBacked(map).withStreamKey(streamKey));
}
......@@ -60,7 +61,7 @@ public class QueueUtils {
*/
public void sendMsgByGoblinRedis(String streamKey, String jsonMsg) {
HashMap<String, String> map = CollectionUtil.mapStringString();
map.put("message", jsonMsg);
map.put(MQConst.QUEUE_MESSAGE_KEY, jsonMsg);
redisDataSourceUtil.getRedisGoblinUtil().getStringRedisTemplate().opsForStream().add(StreamRecords.mapBacked(map).withStreamKey(streamKey));
}
}
......@@ -36,7 +36,7 @@ public class QueueUtils {
*/
public void sendMsgByRedis(String streamKey, String jsonMsg) {
HashMap<String, String> map = CollectionUtil.mapStringString();
map.put("message", jsonMsg);
map.put(MQConst.QUEUE_MESSAGE_KEY, jsonMsg);
stringRedisTemplate.opsForStream().add(StreamRecords.mapBacked(map).withStreamKey(streamKey));
}
......@@ -52,7 +52,7 @@ public class QueueUtils {
} else {
HashMap<String, String> map = CollectionUtil.mapStringString();
map.put("message", xlsPath);
map.put(MQConst.QUEUE_MESSAGE_KEY, xlsPath);
map.put("type", type);
map.put("skuId", skuId);
stringRedisTemplate.opsForStream().add(StreamRecords.mapBacked(map).withStreamKey(MQConst.GoblinQueue.GOBLIN_XLS_OPERA.getKey()));
......
package com.liquidnet.service.stone.util;
import com.liquidnet.commons.lang.util.CollectionUtil;
import com.liquidnet.service.base.constant.MQConst;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.core.StringRedisTemplate;
......@@ -35,7 +36,7 @@ public class QueueUtils {
*/
public void sendMsgByRedis(String streamKey, String jsonMsg) {
HashMap<String, String> map = CollectionUtil.mapStringString();
map.put("message", jsonMsg);
map.put(MQConst.QUEUE_MESSAGE_KEY, jsonMsg);
stringRedisTemplate.opsForStream().add(StreamRecords.mapBacked(map).withStreamKey(streamKey));
}
}
package com.liquidnet.service.sweet.utils;
import com.liquidnet.commons.lang.util.CollectionUtil;
import com.liquidnet.service.base.constant.MQConst;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.core.StringRedisTemplate;
......@@ -35,7 +36,7 @@ public class QueueUtils {
*/
public void sendMsgByRedis(String streamKey, String jsonMsg) {
HashMap<String, String> map = CollectionUtil.mapStringString();
map.put("message", jsonMsg);
map.put(MQConst.QUEUE_MESSAGE_KEY, jsonMsg);
stringRedisTemplate.opsForStream().add(StreamRecords.mapBacked(map).withStreamKey(streamKey));
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment