记得上下班打卡 | git大法好,push需谨慎

Commit 09eb48f4 authored by 张国柄's avatar 张国柄

+queue:藏品生成队列及消费逻辑实现;

parent 3b8f4b8e
...@@ -166,6 +166,25 @@ public class GoblinRedisConst { ...@@ -166,6 +166,25 @@ public class GoblinRedisConst {
*/ */
public static final String USER_COUPON = PREFIX.concat("user_coupon:"); public static final String USER_COUPON = PREFIX.concat("user_coupon:");
/* ----------------------------------------------------------------- */
/**
* 我的藏品ID列表(首页)
* {goblin:u_d_art:${uid}, JsonUtils.toJson(List<String:artwork_id>)}
*/
public static final String USER_DIGITAL_ARTWORK_IDS = PREFIX.concat("u_d_art_ids:");
/**
* 我的藏品信息
* {goblin:u_d_art:${artwork_id}, JsonUtils.toJson(com.liquidnet.service.goblin.dto.vo.GoblinDigitalArtworkVo)}
*/
public static final String USER_DIGITAL_ARTWORK = PREFIX.concat("u_d_art:");
/**
* 我的藏品生成标记(有时效)
* {goblin:u_d_art:${uid+skuId+orderId},1}
*/
public static final String USER_DIGITAL_ARTWORK_GENMARK = PREFIX.concat("u_d_art_gm:");
/* ----------------------------------------------------------------- */ /* ----------------------------------------------------------------- */
/* ----------------------------------------------------------------- */ /* ----------------------------------------------------------------- */
} }
package com.liquidnet.service.goblin.dto;
import com.liquidnet.commons.lang.util.JsonUtils;
import lombok.Data;
import java.io.Serializable;
/**
* 购买藏品、开启盲盒后,生成藏品队列消息体
*
* @author zhanggb
* Created by IntelliJ IDEA at 2022/3/31
*/
@Data
public class GoblinQueueBizArtworkGenDto implements Serializable, Cloneable {
private static final long serialVersionUID = 8267639695935038399L;
private String uid;
private String skuId;
private String orderId;
private Integer source;
private static final GoblinQueueBizArtworkGenDto obj = new GoblinQueueBizArtworkGenDto();
public static GoblinQueueBizArtworkGenDto getNew() {
try {
return (GoblinQueueBizArtworkGenDto) obj.clone();
} catch (CloneNotSupportedException e) {
return new GoblinQueueBizArtworkGenDto();
}
}
public String toJson() {
return JsonUtils.toJson(this);
}
}
...@@ -31,7 +31,8 @@ public class GoblinStoreMgtDigitalGoodsAddSkuParam implements Serializable { ...@@ -31,7 +31,8 @@ public class GoblinStoreMgtDigitalGoodsAddSkuParam implements Serializable {
@NotNull(message = "藏品名称不能为空") @NotNull(message = "藏品名称不能为空")
@Size(max = 36, message = "藏品名称内容过长") @Size(max = 36, message = "藏品名称内容过长")
private String name; private String name;
@ApiModelProperty(position = 14, required = false, value = "款式名称[36],`unbox=0`时必传", example = "款式名称...") @ApiModelProperty(position = 14, required = true, value = "款式名称[36],`unbox=0`时必传", example = "款式名称...")
@NotNull(message = "款式名称不能为空")
@Size(max = 36, message = "款式名称内容过长") @Size(max = 36, message = "款式名称内容过长")
private String subtitle; private String subtitle;
@ApiModelProperty(position = 15, required = true, value = "藏品封面图片URL[256]", example = "http://123") @ApiModelProperty(position = 15, required = true, value = "藏品封面图片URL[256]", example = "http://123")
...@@ -100,9 +101,12 @@ public class GoblinStoreMgtDigitalGoodsAddSkuParam implements Serializable { ...@@ -100,9 +101,12 @@ public class GoblinStoreMgtDigitalGoodsAddSkuParam implements Serializable {
@NotNull(message = "藏品详情不能为空") @NotNull(message = "藏品详情不能为空")
private String details; private String details;
@ApiModelProperty(position = 31, required = true, value = "上架处理方式[1-等待手动上架|2-直接上架售卖|3-预约定时上架]", example = "1") // @ApiModelProperty(position = 31, required = true, value = "上架处理方式[1-等待手动上架|2-直接上架售卖|3-预约定时上架]", example = "1")
// @NotNull(message = "上架处理方式不能为空")
// @Pattern(regexp = "\\b(1|2|3)\\b", message = "上架处理方式参数无效")
@ApiModelProperty(position = 31, required = true, value = "上架处理方式[2-直接上架售卖|3-预约定时上架]", example = "1")
@NotNull(message = "上架处理方式不能为空") @NotNull(message = "上架处理方式不能为空")
@Pattern(regexp = "\\b(1|2|3)\\b", message = "上架处理方式参数无效") @Pattern(regexp = "\\b(2|3)\\b", message = "上架处理方式参数无效")
private String shelvesHandle; private String shelvesHandle;
@ApiModelProperty(position = 32, required = false, value = "预约上架时间[yyyy-MM-dd HH:mm:ss][上架处理方式为3-预约定时上架时需要指定]", example = "2022-03-25 00:00:00") @ApiModelProperty(position = 32, required = false, value = "预约上架时间[yyyy-MM-dd HH:mm:ss][上架处理方式为3-预约定时上架时需要指定]", example = "2022-03-25 00:00:00")
@Pattern(regexp = LnsRegex.Valid.DATETIME_FULL, message = "预约上架时间格式有误") @Pattern(regexp = LnsRegex.Valid.DATETIME_FULL, message = "预约上架时间格式有误")
......
...@@ -15,11 +15,11 @@ import java.time.LocalDateTime; ...@@ -15,11 +15,11 @@ import java.time.LocalDateTime;
@JsonIgnoreProperties(ignoreUnknown = true) @JsonIgnoreProperties(ignoreUnknown = true)
public class GoblinDigitalArtworkListVo implements Serializable, Cloneable { public class GoblinDigitalArtworkListVo implements Serializable, Cloneable {
private static final long serialVersionUID = -1510553574954846520L; private static final long serialVersionUID = -1510553574954846520L;
@ApiModelProperty(position = 10, value = "是否盲盒[0-否|1-是]") @ApiModelProperty(position = 10, value = "藏品Id")
private String artworkId;
@ApiModelProperty(position = 11, value = "是否盲盒[0-否|1-是]")
private Integer unbox; private Integer unbox;
@ApiModelProperty(position = 11, value = "藏品Id")
private String artworkId;
@ApiModelProperty(position = 12, value = "藏品名称") @ApiModelProperty(position = 12, value = "藏品名称")
private String name; private String name;
@ApiModelProperty(position = 13, value = "藏品副名称") @ApiModelProperty(position = 13, value = "藏品副名称")
...@@ -30,8 +30,8 @@ public class GoblinDigitalArtworkListVo implements Serializable, Cloneable { ...@@ -30,8 +30,8 @@ public class GoblinDigitalArtworkListVo implements Serializable, Cloneable {
private Integer editionSn; private Integer editionSn;
@ApiModelProperty(position = 16, value = "藏品发行量") @ApiModelProperty(position = 16, value = "藏品发行量")
private Integer edition; private Integer edition;
@ApiModelProperty(position = 17, value = "获得方式[0-购买|1-兑换]") @ApiModelProperty(position = 17, value = "获得方式[1-购买|2-兑换|3-赠送|5-受赠]")
private Integer getway; private Integer source;
@ApiModelProperty(position = 18, value = "藏品状态,根据`unbox`区分盲盒来判断[0-生成中/未开启|1-已生成/已开启|2-生成失败/开启失败]") @ApiModelProperty(position = 18, value = "藏品状态,根据`unbox`区分盲盒来判断[0-生成中/未开启|1-已生成/已开启|2-生成失败/开启失败]")
private Integer state; private Integer state;
......
...@@ -11,8 +11,8 @@ import java.time.LocalDateTime; ...@@ -11,8 +11,8 @@ import java.time.LocalDateTime;
public class GoblinDigitalArtworkVo implements Serializable, Cloneable { public class GoblinDigitalArtworkVo implements Serializable, Cloneable {
private static final long serialVersionUID = -5920952408993245963L; private static final long serialVersionUID = -5920952408993245963L;
private String artworkId; private String artworkId;
private String skuId;
private String uid; private String uid;
private String skuId;
private String orderId; private String orderId;
private Integer editionSn; private Integer editionSn;
private String nftId; private String nftId;
......
...@@ -239,6 +239,7 @@ public class MQConst { ...@@ -239,6 +239,7 @@ public class MQConst {
SQL_STORE("goblin:stream:sql.store", "group.sql.ustore", "店铺相关"), SQL_STORE("goblin:stream:sql.store", "group.sql.ustore", "店铺相关"),
SQL_GOODS("goblin:stream:sql.goods", "group.sql.ustore", "商品相关"), SQL_GOODS("goblin:stream:sql.goods", "group.sql.ustore", "商品相关"),
SQL_COUPON("goblin:stream:sql.coupon", "group.sql.ustore", "店铺优惠券相关"), SQL_COUPON("goblin:stream:sql.coupon", "group.sql.ustore", "店铺优惠券相关"),
GOBLIN_STORE_MARKET("goblin:stream:store.market", "group.store.market", "店铺活动"), GOBLIN_STORE_MARKET("goblin:stream:store.market", "group.store.market", "店铺活动"),
GOBLIN_SELF_MARKET("goblin:stream:self.market", "group.self.market", "平台活动"), GOBLIN_SELF_MARKET("goblin:stream:self.market", "group.self.market", "平台活动"),
GOBLIN_ORDER_CREATE_PAY("goblin:stream:order:create_pay", "group.order:create_pay", "订单创建&支付"), GOBLIN_ORDER_CREATE_PAY("goblin:stream:order:create_pay", "group.order:create_pay", "订单创建&支付"),
...@@ -260,6 +261,9 @@ public class MQConst { ...@@ -260,6 +261,9 @@ public class MQConst {
GOBLIN_UN_PAY_7("goblin:stream:order:back:7", "group.order:back", "回滚关闭订单库存队列"), GOBLIN_UN_PAY_7("goblin:stream:order:back:7", "group.order:back", "回滚关闭订单库存队列"),
GOBLIN_UN_PAY_8("goblin:stream:order:back:8", "group.order:back", "回滚关闭订单库存队列"), GOBLIN_UN_PAY_8("goblin:stream:order:back:8", "group.order:back", "回滚关闭订单库存队列"),
GOBLIN_UN_PAY_9("goblin:stream:order:back:9", "group.order:back", "回滚关闭订单库存队列"), GOBLIN_UN_PAY_9("goblin:stream:order:back:9", "group.order:back", "回滚关闭订单库存队列"),
BIZ_ARTWORK_GEN("goblin:stream:biz_art:gen", "group.biz.artwork", "藏品生成"),
SQL_ARTWORK_GEN("goblin:stream:sql_art:gen", "group.biz.artwork", "藏品生成"),
; ;
private final String key; private final String key;
......
...@@ -30,14 +30,14 @@ public class GoblinDigitalArtwork implements Serializable { ...@@ -30,14 +30,14 @@ public class GoblinDigitalArtwork implements Serializable {
private String artworkId; private String artworkId;
/** /**
* 单品ID * 用户UID
*/ */
private String skuId; private String uid;
/** /**
* 用户UID * 单品ID
*/ */
private String uid; private String skuId;
/** /**
* 藏品订单号 * 藏品订单号
......
import com.liquidnet.commons.lang.util.CollectionUtil;
import com.liquidnet.commons.lang.util.DESUtils; import com.liquidnet.commons.lang.util.DESUtils;
import com.liquidnet.commons.lang.util.HttpUtil; import com.liquidnet.commons.lang.util.HttpUtil;
import com.liquidnet.commons.lang.util.IdentityUtils; import com.liquidnet.commons.lang.util.IdentityUtils;
...@@ -74,12 +73,16 @@ public class TestAdam { ...@@ -74,12 +73,16 @@ public class TestAdam {
"sweet:manual:pushList", "sweet:manual:pushList",
"goblin:sku:relation:226416501783461229", "goblin:sku:relation:226416501783461229",
"goblin:bsc:goods:2264185547679ZZ87427", "goblin:bsc:goods:2284183012778",
"goblin:bsc:goods_sku:226418554767964895ZZ87427", "goblin:bsc:goods_sku:226418554767964895ZZ87427",
"goblin:user_coupon:926591841436344323959830", "goblin:user_coupon:926591841436344323959830",
"goblin:bsc:ustore:809406", "goblin:bsc:ustore:809406",
"goblin:bsc:ustore:922833055536291845022973", "goblin:bsc:ustore:922833055536291845022973",
"goblin:bsc:store:839fa8dde17b5bf248575370f1249eab", "goblin:bsc:store:839fa8dde17b5bf248575370f1249eab",
"goblin:bsc:self_g_c",
"goblin:u_d_art:2290114757532",
"goblin:u_d_art_ids:481073045099397123024514",
"goblin:u_d_art:TEST123228418301277894015TEST1234567890",
}; };
for (String key : keys) { for (String key : keys) {
long value = key.hashCode(); long value = key.hashCode();
...@@ -104,12 +107,5 @@ public class TestAdam { ...@@ -104,12 +107,5 @@ public class TestAdam {
@Test @Test
public void testTmp() { public void testTmp() {
// String post = HttpUtil.post("http://ttestkylin.zhengzai.tv/kylin/inner/cache/member?uid=925802662655180800214832", CollectionUtil.linkedMultiValueMapStringString());
// System.out.println("===" + post);
LocalDateTime now = LocalDateTime.now();
for (int i = 0; i < 15; i++) {
System.out.println(now.getSecond() + "----" + now.getSecond()%10);
}
} }
} }
package com.liquidnet.service.consumer.kylin.config;
import com.liquidnet.common.cache.redis.config.RedisStreamConfig;
import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.consumer.kylin.receiver.ConsumerGoblinBizArtworkGenRdsReceiver;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;
import java.util.ArrayList;
import java.util.List;
@Configuration
public class ConsumerCommonBizRedisStreamConfig extends RedisStreamConfig {
@Autowired
StringRedisTemplate stringRedisTemplate;
@Autowired
ConsumerGoblinBizArtworkGenRdsReceiver consumerGoblinBizArtworkGenRdsReceiver;
@Bean// 藏品生成
public List<Subscription> subscriptionBizArtworkGen(RedisConnectionFactory factory) {
List<Subscription> subscriptionList = new ArrayList<>();
MQConst.GoblinQueue stream = MQConst.GoblinQueue.BIZ_ARTWORK_GEN;
this.initStream(stringRedisTemplate, stream.getKey(), stream.getGroup());
for (int i = 0; i < 10; i++) {
StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = this.buildStreamMessageListenerContainer(factory);
subscriptionList.add(listenerContainer.receiveAutoAck(
Consumer.from(stream.getGroup(), getConsumerName(stream.name() + i)),
StreamOffset.create(stream.getKey(), ReadOffset.lastConsumed()), consumerGoblinBizArtworkGenRdsReceiver
));
listenerContainer.start();
}
return subscriptionList;
}
}
package com.liquidnet.service.consumer.kylin.config;
import com.liquidnet.common.cache.redis.config.RedisStreamConfig;
import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.consumer.kylin.receiver.ConsumerGoblinSqlArtworkGenRdsReceiver;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;
import java.util.ArrayList;
import java.util.List;
@Configuration
public class ConsumerCommonSqlRedisStreamConfig extends RedisStreamConfig {
@Autowired
StringRedisTemplate stringRedisTemplate;
@Autowired
ConsumerGoblinSqlArtworkGenRdsReceiver consumerGoblinSqlArtworkGenRdsReceiver;
@Bean// 藏品生成
public List<Subscription> subscriptionSqlArtworkGen(RedisConnectionFactory factory) {
List<Subscription> subscriptionList = new ArrayList<>();
MQConst.GoblinQueue stream = MQConst.GoblinQueue.SQL_ARTWORK_GEN;
this.initStream(stringRedisTemplate, stream.getKey(), stream.getGroup());
for (int i = 0; i < 5; i++) {
StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = this.buildStreamMessageListenerContainer(factory);
subscriptionList.add(listenerContainer.receiveAutoAck(
Consumer.from(stream.getGroup(), getConsumerName(stream.name() + i)),
StreamOffset.create(stream.getKey(), ReadOffset.lastConsumed()), consumerGoblinSqlArtworkGenRdsReceiver
));
listenerContainer.start();
}
return subscriptionList;
}
}
package com.liquidnet.service.consumer.kylin.receiver;
import com.liquidnet.service.consumer.kylin.service.IBaseDao;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamListener;
/**
* 公共的业务队列消息监听器,具体业务消费逻辑通过`consumerMessageHandler`实现
*
* @author zhanggb
* Created by IntelliJ IDEA at 2022/3/31
*/
@Slf4j
public abstract class AbstractBizRedisReceiver implements StreamListener<String, MapRecord<String, String, String>> {
@Autowired
public IBaseDao baseDao;
@Autowired
public StringRedisTemplate stringRedisTemplate;
@Override
public void onMessage(MapRecord<String, String, String> message) {
String redisStreamKey = this.getRedisStreamKey();
log.debug("CONSUMER MSG[streamKey:{},messageId:{},stream:{},body:{}]", redisStreamKey, message.getId(), message.getStream(), message.getValue());
boolean result = this.consumerMessageHandler(message.getValue().get("message"));
log.info("CONSUMER MSG RESULT:{} ==> [{}]MESSAGE_ID:{}", result, redisStreamKey, message.getId());
try {
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
} catch (Exception e) {
log.error("#CONSUMER MSG EX_ACK ==> [{}]RESULT:{},MESSAGE:{}", redisStreamKey, result, message.getValue(), e);
}
try {
stringRedisTemplate.opsForStream().delete(redisStreamKey, message.getId());
} catch (Exception e) {
log.error("#CONSUMER MSG EX_DEL ==> [{}]RESULT:{},MESSAGE:{}", redisStreamKey, result, message.getValue(), e);
}
}
protected abstract boolean consumerMessageHandler(String msg);
protected abstract String getRedisStreamKey();
protected abstract String getRedisStreamGroup();
}
\ No newline at end of file
...@@ -3,7 +3,7 @@ package com.liquidnet.service.consumer.kylin.receiver; ...@@ -3,7 +3,7 @@ package com.liquidnet.service.consumer.kylin.receiver;
import com.liquidnet.commons.lang.util.CollectionUtil; import com.liquidnet.commons.lang.util.CollectionUtil;
import com.liquidnet.commons.lang.util.JsonUtils; import com.liquidnet.commons.lang.util.JsonUtils;
import com.liquidnet.service.base.SqlMapping; import com.liquidnet.service.base.SqlMapping;
import com.liquidnet.service.consumer.service.IBaseDao; import com.liquidnet.service.consumer.kylin.service.IBaseDao;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.MapRecord; import org.springframework.data.redis.connection.stream.MapRecord;
...@@ -13,6 +13,12 @@ import org.springframework.data.redis.stream.StreamListener; ...@@ -13,6 +13,12 @@ import org.springframework.data.redis.stream.StreamListener;
import java.util.HashMap; import java.util.HashMap;
/**
* 公共的SQL队列消息监听器,具体SQL消费逻辑统一使用`consumerMessageHandler`
*
* @author zhanggb
* Created by IntelliJ IDEA at 2022/3/31
*/
@Slf4j @Slf4j
public abstract class AbstractSqlRedisReceiver implements StreamListener<String, MapRecord<String, String, String>> { public abstract class AbstractSqlRedisReceiver implements StreamListener<String, MapRecord<String, String, String>> {
@Autowired @Autowired
......
...@@ -6,7 +6,7 @@ import com.alibaba.fastjson.JSON; ...@@ -6,7 +6,7 @@ import com.alibaba.fastjson.JSON;
import com.liquidnet.common.cache.redis.util.RedisUtil; import com.liquidnet.common.cache.redis.util.RedisUtil;
import com.liquidnet.commons.lang.util.CollectionUtil; import com.liquidnet.commons.lang.util.CollectionUtil;
import com.liquidnet.service.consumer.kylin.dto.PhoneDto; import com.liquidnet.service.consumer.kylin.dto.PhoneDto;
import com.liquidnet.service.consumer.service.IBaseDao; import com.liquidnet.service.consumer.kylin.service.IBaseDao;
import com.liquidnet.service.goblin.constant.GoblinRedisConst; import com.liquidnet.service.goblin.constant.GoblinRedisConst;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
......
package com.liquidnet.service.consumer.kylin.receiver;
import com.liquidnet.common.cache.redis.util.RedisUtil;
import com.liquidnet.commons.lang.util.CollectionUtil;
import com.liquidnet.commons.lang.util.DateUtil;
import com.liquidnet.commons.lang.util.IDGenerator;
import com.liquidnet.commons.lang.util.JsonUtils;
import com.liquidnet.service.base.SqlMapping;
import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.goblin.constant.GoblinRedisConst;
import com.liquidnet.service.goblin.dto.GoblinQueueBizArtworkGenDto;
import com.liquidnet.service.goblin.dto.vo.GoblinDigitalArtworkVo;
import com.liquidnet.service.goblin.dto.vo.GoblinGoodsSkuInfoVo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.core.StreamOperations;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
@Slf4j
@Component
public class ConsumerGoblinBizArtworkGenRdsReceiver extends AbstractBizRedisReceiver {
@Autowired
private RedisUtil redisUtil;
@Autowired
private MongoTemplate mongoTemplate;
private static final String SQL_INSERT_ARTWORK = "INSERT INTO goblin_digital_artwork (artwork_id, sku_id, uid, order_id, source, state, created_at)VALUES(?,?,?,?,?,?,?)";
private static final String QUEUE_MSG_TL = "{\"nftOrderPayId\":\"1\",\"routerType\":\"zxinchain\",\"skuId\":\"2\",\"userId\":\"3\",\"buyTimestamp\":\"4\"}";
private static final String QUEUE_KEY_NFT = "galaxy:stream:rk.json.nftPublishAndBuy";
@Override
protected String getRedisStreamKey() {
return MQConst.GoblinQueue.BIZ_ARTWORK_GEN.getKey();
}
@Override
protected String getRedisStreamGroup() {
return MQConst.GoblinQueue.BIZ_ARTWORK_GEN.getGroup();
}
@Override
protected boolean consumerMessageHandler(String msg) {
boolean aBoolean = false;
try {
GoblinQueueBizArtworkGenDto fromJsonObj = JsonUtils.fromJson(msg, GoblinQueueBizArtworkGenDto.class);
if (fromJsonObj == null) {
log.warn("CONSUMER MSG NULL_DTO ==> [{}]:{}", this.getRedisStreamKey(), msg);
aBoolean = true;
} else {
aBoolean = this.bizArtworkGenProcessing(fromJsonObj);
}
} catch (Exception e) {
log.error("CONSUMER MSG EX_HANDLE ==> [{}]:{}", this.getRedisStreamKey(), msg, e);
} finally {
if (!aBoolean) {
HashMap<String, String> map = CollectionUtil.mapStringString();
map.put("message", msg);
stringRedisTemplate.opsForStream().add(StreamRecords.mapBacked(map).withStreamKey(this.getRedisStreamKey()));
}
}
return aBoolean;
}
private boolean bizArtworkGenProcessing(GoblinQueueBizArtworkGenDto dto) {
String uid = dto.getUid(), skuId = dto.getSkuId(), orderId = dto.getOrderId();
Integer source = dto.getSource();
GoblinGoodsSkuInfoVo goodsSkuInfoVo = this.getGoodsSkuInfoVoFromRdb(skuId);
if (null != goodsSkuInfoVo) {
// 判断是否已生成
String genMarkerKey = GoblinRedisConst.USER_DIGITAL_ARTWORK_GENMARK.concat(uid).concat(skuId).concat(orderId);
if (this.hasUserDigitalArtworkVoFromRdb(genMarkerKey)) {// 已生成
log.warn("#CONSUMER MSG EXIST_ART[{}]:[uid={},skuId={},orderId={},source={}]", this.getRedisStreamKey(),
uid, skuId, orderId, source);
return true;
}
// GoblinDigitalArtworkVo existDigitalArtworkVo = this.getDigitalArtworkVoFromMdb(uid, skuId, orderId);
// if (null != existDigitalArtworkVo) {// 已生成
// log.warn("#CONSUMER MSG EXIST_ART[{}]:[uid={},skuId={},orderId={},source={}]", this.getRedisStreamKey(),
// uid, skuId, orderId, source);
// return true;
// }
LocalDateTime now = LocalDateTime.now();
GoblinDigitalArtworkVo digitalArtworkVo = GoblinDigitalArtworkVo.getNew();
String artworkId = IDGenerator.nextMilliId2();
digitalArtworkVo.setArtworkId(artworkId);
digitalArtworkVo.setSkuId(skuId);
digitalArtworkVo.setUid(uid);
digitalArtworkVo.setOrderId(orderId);
digitalArtworkVo.setSource(source);
digitalArtworkVo.setState(0);
digitalArtworkVo.setDelFlg("0");
digitalArtworkVo.setCreatedAt(now);
// Mongo记录VO
mongoTemplate.insert(digitalArtworkVo, GoblinDigitalArtworkVo.class.getSimpleName());
// Redis记录VO
redisUtil.set(GoblinRedisConst.USER_DIGITAL_ARTWORK.concat(digitalArtworkVo.getArtworkId()), JsonUtils.toJson(digitalArtworkVo));
// Redis更新藏品ID列表
List<String> userDigitalArtworkIds = this.getUserDigitalArtworkIdsFromRdb(uid);
if (CollectionUtils.isEmpty(userDigitalArtworkIds)) {
userDigitalArtworkIds = CollectionUtil.arrayListString();
userDigitalArtworkIds.add(artworkId);
} else {
userDigitalArtworkIds.add(0, artworkId);
int size = userDigitalArtworkIds.size();
if (size > 30) userDigitalArtworkIds.remove(size - 1);
}
redisUtil.set(GoblinRedisConst.USER_DIGITAL_ARTWORK_IDS.concat(uid), userDigitalArtworkIds);
StreamOperations<String, Object, Object> streamOperations = stringRedisTemplate.opsForStream();
// Mysql持久化
HashMap<String, String> sqlArtworkMap = CollectionUtil.mapStringString();
LinkedList<String> toMqSqls = CollectionUtil.linkedListString();
toMqSqls.add(SQL_INSERT_ARTWORK);
LinkedList<Object[]> initArtworkObjs = CollectionUtil.linkedListObjectArr();
initArtworkObjs.add(new Object[]{digitalArtworkVo.getArtworkId(), skuId, uid, orderId, source, digitalArtworkVo.getState(), now});
sqlArtworkMap.put("message", SqlMapping.gets(toMqSqls, initArtworkObjs));
streamOperations.add(StreamRecords.mapBacked(sqlArtworkMap).withStreamKey(MQConst.GoblinQueue.SQL_ARTWORK_GEN.getKey()));
// NFT发行购买
HashMap<String, String> bizNftBuyMap = CollectionUtil.mapStringString();
bizNftBuyMap.put("message", QUEUE_MSG_TL.replace("1", orderId).replace("2", skuId).replace("3", uid).replace("4", DateUtil.Formatter.yyyyMMddHHmmss.format(now)));
streamOperations.add(StreamRecords.mapBacked(bizNftBuyMap).withStreamKey(QUEUE_KEY_NFT));// TODO: 2022/3/30 ==zhanggb.anjiabin
// Redis生成标记(缓存三天),用于上面的生成检查
redisUtil.set(genMarkerKey, 1, 259200);
} else {
log.warn("#CONSUMER MSG NULL_SKU[{}]:[uid={},skuId={},orderId={},source={}]", this.getRedisStreamKey(), uid, skuId, orderId, source);
log.warn("#CONSUMER MSG NULL_SKU[{}]:[uid={},skuId={},orderId={},source={}]", this.getRedisStreamKey(), uid, skuId, orderId, source);
log.warn("#CONSUMER MSG NULL_SKU[{}]:[uid={},skuId={},orderId={},source={}]", this.getRedisStreamKey(), uid, skuId, orderId, source);
}
return true;
}
/* ------------------------------------------------------------------------------------ */
// public void setDigitalArtworkVo(GoblinDigitalArtworkVo vo) {
// mongoTemplate.insert(vo, GoblinDigitalArtworkVo.class.getSimpleName());
// redisUtil.set(GoblinRedisConst.USER_DIGITAL_ARTWORK.concat(vo.getArtworkId()), vo);
//
// LinkedList<Object[]> paramsList = CollectionUtil.linkedListObjectArr();
// paramsList.add(new Object[]{vo.getArtworkId(), vo.getSkuId(), vo.getUid(), vo.getOrderId(), vo.getSource(), vo.getState(), vo.getCreatedAt()});
// this.baseDao.batchSql(SQL_INSERT_ARTWORK, paramsList);
// }
//
// public void sendMsgByRedisToQueue(String streamKey, String jsonMsg) {
// HashMap<String, String> map = CollectionUtil.mapStringString();
// map.put("message", jsonMsg);
// stringRedisTemplate.opsForStream().add(StreamRecords.mapBacked(map).withStreamKey(streamKey));
// }
/* ------------------------------------------------------------------------------------ */
/* ------------------------------------------------------------------------------------ */
public List<String> getUserDigitalArtworkIdsFromRdb(String uid) {
return (List<String>) redisUtil.get(GoblinRedisConst.USER_DIGITAL_ARTWORK_IDS.concat(uid));
}
public GoblinGoodsSkuInfoVo getGoodsSkuInfoVoFromRdb(String skuId) {
String rk = GoblinRedisConst.BASIC_GOODS_SKU.concat(skuId);
GoblinGoodsSkuInfoVo vo = (GoblinGoodsSkuInfoVo) redisUtil.get(rk);
if (null == vo && null != (vo = this.getGoodsSkuInfoVoFromMdb(skuId))) {
redisUtil.set(rk, vo);
}
return vo;
}
public boolean hasUserDigitalArtworkVoFromRdb(String genMarkerKey) {
return redisUtil.hasKey(genMarkerKey);
}
/* ------------------------------------------------------------------------------------ */
public GoblinGoodsSkuInfoVo getGoodsSkuInfoVoFromMdb(String skuId) {
return mongoTemplate.findOne(Query.query(Criteria.where("skuId").is(skuId)),
GoblinGoodsSkuInfoVo.class, GoblinGoodsSkuInfoVo.class.getSimpleName());
}
// public GoblinDigitalArtworkVo getDigitalArtworkVoFromMdb(String uid, String skuId, String orderId) {
// return mongoTemplate.findOne(Query.query(Criteria.where("uid").is(uid).and("skuId").is(skuId).and("orderId").is(orderId)),
// GoblinDigitalArtworkVo.class, GoblinDigitalArtworkVo.class.getSimpleName());
// }
}
package com.liquidnet.service.consumer.kylin.receiver;
import com.liquidnet.service.base.constant.MQConst;
import org.springframework.stereotype.Component;
@Component
public class ConsumerGoblinSqlArtworkGenRdsReceiver extends AbstractSqlRedisReceiver {
@Override
protected String getRedisStreamKey() {
return MQConst.GoblinQueue.SQL_ARTWORK_GEN.getKey();
}
@Override
protected String getRedisStreamGroup() {
return MQConst.GoblinQueue.SQL_ARTWORK_GEN.getGroup();
}
}
package com.liquidnet.service.consumer.service; package com.liquidnet.service.consumer.kylin.service;
import java.util.LinkedList; import java.util.LinkedList;
......
package com.liquidnet.service.consumer.kylin.service.impl; package com.liquidnet.service.consumer.kylin.service.impl;
import com.liquidnet.commons.lang.util.JsonUtils; import com.liquidnet.commons.lang.util.JsonUtils;
import com.liquidnet.service.consumer.service.IBaseDao; import com.liquidnet.service.consumer.kylin.service.IBaseDao;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.core.JdbcTemplate;
......
...@@ -83,8 +83,8 @@ create table goblin_digital_artwork ...@@ -83,8 +83,8 @@ create table goblin_digital_artwork
( (
mid bigint auto_increment primary key, mid bigint auto_increment primary key,
artwork_id varchar(64) not null comment '藏品ID', artwork_id varchar(64) not null comment '藏品ID',
sku_id varchar(64) not null comment '单品ID',
uid varchar(64) not null comment '用户UID', uid varchar(64) not null comment '用户UID',
sku_id varchar(64) not null comment '单品ID',
order_id varchar(64) not null comment '藏品订单号', order_id varchar(64) not null comment '藏品订单号',
edition_sn int null comment '序列号', edition_sn int null comment '序列号',
nft_id varchar(256) null comment '藏品NFT ID', nft_id varchar(256) null comment '藏品NFT ID',
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment