记得上下班打卡 | git大法好,push需谨慎

Commit e8a31236 authored by 张国柄's avatar 张国柄

Merge remote-tracking branch 'origin/jxl_422_mongo_list' into pre

parents 6186dfa7 d826483d
package com.liquidnet.service.goblin.dto;
import com.liquidnet.commons.lang.util.JsonUtils;
import lombok.Data;
import java.io.Serializable;
/**
* mongo队列消息体
*
* @author jiangxiulong
*/
@Data
public class GoblinQueueBizMongoDto implements Serializable, Cloneable {
private static final long serialVersionUID = -5850588358672254766L;
/**
* Mongo集合名
*/
private String collect;
/**
* Mongo集合中的字段属性名称
*/
private String column;
/**
* Mongo集合中的字段属性值
*/
private String bizId;
/**
* Redis缓存中的Key前缀
*/
private String prefix;
/**
* 操作类型[1-insert|2-update]
*/
private int opType;
private static final GoblinQueueBizMongoDto obj = new GoblinQueueBizMongoDto();
public static GoblinQueueBizMongoDto getNew() {
try {
return (GoblinQueueBizMongoDto) obj.clone();
} catch (CloneNotSupportedException e) {
return new GoblinQueueBizMongoDto();
}
}
public String toJson() {
return JsonUtils.toJson(this);
}
}
......@@ -293,6 +293,7 @@ public class MQConst {
GOBLIN_NFT_ORDER("goblin:stream:nftOrder:create", "group.nftOrder:create", "NFT订单处理"),
BIZ_INTEGRAL("goblin:stream:biz_integral", "group.biz.integral", "增减积分操作"),
BIZ_NFT_MONGO("goblin:stream:biz:mongo:nft", "group.biz.mongo.nft", "NFT的mongo操作"),
;
......
......@@ -2,9 +2,7 @@ package com.liquidnet.service.consumer.kylin.config;
import com.liquidnet.common.cache.redis.config.RedisStreamConfig;
import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.consumer.kylin.receiver.ConsumerGoblinBizArtworkClqRdsReceiver;
import com.liquidnet.service.consumer.kylin.receiver.ConsumerGoblinBizArtworkUplRdsReceiver;
import com.liquidnet.service.consumer.kylin.receiver.ConsumerGoblinBizIntegralReceiver;
import com.liquidnet.service.consumer.kylin.receiver.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
......@@ -32,6 +30,10 @@ public class ConsumerCommonBizRedisStreamConfig extends RedisStreamConfig {
// ConsumerGoblinBizArtworkGenRdsReceiver consumerGoblinBizArtworkGenRdsReceiver;
@Autowired
ConsumerGoblinBizIntegralReceiver consumerGoblinBizIntegralReceiver;
@Autowired
ConsumerGoblinMdbNftOrderReceiver consumerGoblinMdbNftOrderReceiver;
@Autowired
ConsumerGoblinMdbNftArtworkReceiver consumerGoblinMdbNftArtworkReceiver;
@Bean// 藏品上传声明
public List<Subscription> subscriptionBizArtworkUpl(RedisConnectionFactory factory) {
......@@ -96,4 +98,36 @@ public class ConsumerCommonBizRedisStreamConfig extends RedisStreamConfig {
}
return subscriptionList;
}
@Bean// NFT订单 mongo操作
public List<Subscription> subscriptionBizNftOrder(RedisConnectionFactory factory) {
List<Subscription> subscriptionList = new ArrayList<>();
MQConst.GoblinQueue stream = MQConst.GoblinQueue.BIZ_NFT_MONGO;
this.initStream(stringRedisTemplate, stream.getKey(), stream.getGroup());
for (int i = 0; i < 20; i++) {
StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = this.buildStreamMessageListenerContainer(factory);
subscriptionList.add(listenerContainer.receiveAutoAck(
Consumer.from(stream.getGroup(), getConsumerName(stream.name() + i)),
StreamOffset.create(stream.getKey(), ReadOffset.lastConsumed()), consumerGoblinMdbNftOrderReceiver
));
listenerContainer.start();
}
return subscriptionList;
}
@Bean// NFT数字藏品 mongo操作
public List<Subscription> subscriptionBizNftArtwork(RedisConnectionFactory factory) {
List<Subscription> subscriptionList = new ArrayList<>();
MQConst.GoblinQueue stream = MQConst.GoblinQueue.BIZ_ARTWORK_GEN;
this.initStream(stringRedisTemplate, stream.getKey(), stream.getGroup());
for (int i = 0; i < 5; i++) {
StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = this.buildStreamMessageListenerContainer(factory);
subscriptionList.add(listenerContainer.receiveAutoAck(
Consumer.from(stream.getGroup(), getConsumerName(stream.name() + i)),
StreamOffset.create(stream.getKey(), ReadOffset.lastConsumed()), consumerGoblinMdbNftArtworkReceiver
));
listenerContainer.start();
}
return subscriptionList;
}
}
package com.liquidnet.service.consumer.kylin.receiver;
import com.liquidnet.common.cache.redis.util.RedisUtil;
import com.liquidnet.commons.lang.util.CollectionUtil;
import com.liquidnet.commons.lang.util.JsonUtils;
import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.goblin.dto.GoblinQueueBizMongoDto;
import com.mongodb.BasicDBObject;
import com.mongodb.client.result.UpdateResult;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamListener;
import java.util.HashMap;
/**
* 公共的业务Mongo数据处理队列消息监听器,具体业务消费逻辑通过`consumerMessageHandler`实现
*
* @author zhanggb
* Created by IntelliJ IDEA at 2022/4/22
*/
@Slf4j
public abstract class AbstractMdbRedisReceiver implements StreamListener<String, MapRecord<String, String, String>> {
@Autowired
public StringRedisTemplate stringRedisTemplate;
@Autowired
private MongoTemplate mongoTemplate;
@Autowired
private RedisUtil redisUtil;
private static final BasicDBObject BASIC_DB_OBJECT = new BasicDBObject();
@Override
public void onMessage(MapRecord<String, String, String> message) {
String redisStreamKey = this.getRedisStreamKey();
log.debug("CONSUMER MSG[streamKey:{},messageId:{},stream:{},body:{}]", redisStreamKey, message.getId(), message.getStream(), message.getValue());
boolean result = this.consumerMessageHandler(message.getValue().get(MQConst.QUEUE_MESSAGE_KEY));
log.info("CONSUMER MSG RESULT:{} ==> [{}]MESSAGE_ID:{}", result, redisStreamKey, message.getId());
try {
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
} catch (Exception e) {
log.error("#CONSUMER MSG EX_ACK ==> [{}]RESULT:{},MESSAGE:{}", redisStreamKey, result, message.getValue(), e);
}
try {
stringRedisTemplate.opsForStream().delete(redisStreamKey, message.getId());
} catch (Exception e) {
log.error("#CONSUMER MSG EX_DEL ==> [{}]RESULT:{},MESSAGE:{}", redisStreamKey, result, message.getValue(), e);
}
}
private boolean consumerMessageHandler(String msg) {
boolean aBoolean = true;
try {
GoblinQueueBizMongoDto queueBizMongoDto = JsonUtils.fromJson(msg, GoblinQueueBizMongoDto.class);
if (null != queueBizMongoDto) {
String collectName = queueBizMongoDto.getCollect(), columnName = queueBizMongoDto.getColumn();
String prefix = queueBizMongoDto.getPrefix(), bizId = queueBizMongoDto.getBizId();
Object o;
switch (queueBizMongoDto.getOpType()) {
case 1:// insert
o = redisUtil.get(prefix.concat(bizId));
if (null != o) {
mongoTemplate.insert(o, collectName);
}
break;
case 2:// update
o = redisUtil.get(prefix.concat(bizId));
if (null != o) {
BasicDBObject basicDBObject = (BasicDBObject) AbstractMdbRedisReceiver.BASIC_DB_OBJECT.clone();
UpdateResult updateResult = mongoTemplate.getCollection(collectName).updateOne(
Query.query(Criteria.where(columnName).is(bizId)).getQueryObject(),
basicDBObject.append("$set", mongoTemplate.getConverter().convertToMongoType(o))
);
}
break;
default:
log.error("CONSUMER MSG ERR_HANDLE[未知的操作类型,{}]", msg);
break;
}
}
} catch (Exception e) {
log.error("CONSUMER MSG EX_HANDLE ==> [{}]:{}", this.getRedisStreamKey(), msg, e);
aBoolean = false;
} finally {
if (!aBoolean) {
HashMap<String, String> map = CollectionUtil.mapStringString();
map.put(MQConst.QUEUE_MESSAGE_KEY, msg);
stringRedisTemplate.opsForStream().add(StreamRecords.mapBacked(map).withStreamKey(this.getRedisStreamKey()));
}
}
return aBoolean;
}
protected abstract String getRedisStreamKey();
protected abstract String getRedisStreamGroup();
}
\ No newline at end of file
package com.liquidnet.service.consumer.kylin.receiver;
import com.liquidnet.service.base.constant.MQConst;
import org.springframework.stereotype.Component;
@Component
public class ConsumerGoblinMdbNftArtworkReceiver extends AbstractMdbRedisReceiver {
@Override
protected String getRedisStreamKey() {
return MQConst.GoblinQueue.BIZ_ARTWORK_GEN.getKey();
}
@Override
protected String getRedisStreamGroup() {
return MQConst.GoblinQueue.BIZ_ARTWORK_GEN.getGroup();
}
}
package com.liquidnet.service.consumer.kylin.receiver;
import com.liquidnet.service.base.constant.MQConst;
import org.springframework.stereotype.Component;
@Component
public class ConsumerGoblinMdbNftOrderReceiver extends AbstractMdbRedisReceiver {
@Override
protected String getRedisStreamKey() {
return MQConst.GoblinQueue.BIZ_NFT_MONGO.getKey();
}
@Override
protected String getRedisStreamGroup() {
return MQConst.GoblinQueue.BIZ_NFT_MONGO.getGroup();
}
}
......@@ -1045,10 +1045,12 @@ public class GoblinStoreMgtGoodsServiceImpl implements IGoblinstoreMgtGoodsServi
});
toMqSqls.add(SqlMapping.get("goblin_goods_sku_nft.insert_for_digital"));
LinkedList<Object[]> initGoodsSkuNftObjs = CollectionUtil.linkedListObjectArr();
initGoodsSkuNftObjs.add(new Object[]{
skuId, initGoodsSkuInfoVo.getRouteType(), initGoodsSkuInfoVo.getMaterialType(), initGoodsSkuInfoVo.getMaterialUrl(), initGoodsSkuInfoVo.getUpchain(),
initGoodsSkuInfoVo.getDisplayUrl(), initGoodsSkuInfoVo.getNftUrl(), now
});
if ("0".equals(initGoodsSkuInfoVo.getUnbox())) {
initGoodsSkuNftObjs.add(new Object[]{
skuId, initGoodsSkuInfoVo.getRouteType(), initGoodsSkuInfoVo.getMaterialType(), initGoodsSkuInfoVo.getMaterialUrl(), initGoodsSkuInfoVo.getUpchain(),
initGoodsSkuInfoVo.getDisplayUrl(), initGoodsSkuInfoVo.getNftUrl(), now
});
}
toMqSqls.add(SqlMapping.get("goblin_goods.update_by_edit_sku_for_digital"));
// LinkedList<Object[]> updateGoodsObjs = CollectionUtil.linkedListObjectArr();
......
......@@ -7,6 +7,7 @@ import com.liquidnet.service.base.ResponseDto;
import com.liquidnet.service.base.SqlMapping;
import com.liquidnet.service.base.UserPathDto;
import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.goblin.constant.GoblinRedisConst;
import com.liquidnet.service.goblin.constant.GoblinStatusConst;
import com.liquidnet.service.goblin.dto.vo.*;
import com.liquidnet.service.goblin.entity.GoblinNftOrder;
......@@ -82,7 +83,7 @@ public class GoblinNftOrderServiceImpl implements IGoblinNftOrderService {
return ResponseDto.failure("该商品当前状态不可购买~");
}
// 单独设置的售罄
if (null != skuVo.getSoldoutStatus() && skuVo.getSoldoutStatus().equals(1)) {
if (StringUtils.equals("1", skuVo.getSoldoutStatus())) {
return ResponseDto.failure("该商品已售罄~");
}
// 判断开售、停售时间
......@@ -453,7 +454,8 @@ public class GoblinNftOrderServiceImpl implements IGoblinNftOrderService {
// nftOrderUtils.addNftOrderList(uid, orderVo.getOrderId());
// mongo
goblinMongoUtils.setGoblinNftOrderVo(orderVo);
goblinOrderUtils.setMongoList(GoblinNftOrderVo.class.getSimpleName(), "orderId", orderVo.getOrderId(), GoblinRedisConst.REDIS_GOBLIN_NFT_ORDER_INFO, 1);
// goblinMongoUtils.setGoblinNftOrderVo(orderVo);
// 执行sql
String sqlData = SqlMapping.gets(sqls, sqlDataOrder);
......@@ -645,7 +647,8 @@ public class GoblinNftOrderServiceImpl implements IGoblinNftOrderService {
// redis
nftOrderUtils.setNftOrder(orderVo);
// mongo
goblinMongoUtils.updateGoblinNftOrderVo(orderVo);
goblinOrderUtils.setMongoList(GoblinNftOrderVo.class.getSimpleName(), "orderId", orderVo.getOrderId(), GoblinRedisConst.REDIS_GOBLIN_NFT_ORDER_INFO, 2);
// goblinMongoUtils.updateGoblinNftOrderVo(orderVo);
// mysql
LinkedList<String> sqls = CollectionUtil.linkedListString();
LinkedList<Object[]> sqlDataOrder = CollectionUtil.linkedListObjectArr();
......@@ -721,8 +724,13 @@ public class GoblinNftOrderServiceImpl implements IGoblinNftOrderService {
nftOrderUtils.setNftOrder(nftOrder);
nftOrderUtils.setBackOrderVo(nftOrderRefundVo);
//mongo
goblinMongoUtils.updateGoblinNftOrderVo(nftOrder);
goblinMongoUtils.updateGoblinNftOrderRefundVo(nftOrderRefundVo);
goblinOrderUtils.setMongoList(GoblinNftOrderVo.class.getSimpleName(), "orderId", orderId, GoblinRedisConst.REDIS_GOBLIN_NFT_ORDER_INFO, 2);
// goblinMongoUtils.updateGoblinNftOrderVo(nftOrder);
HashMap<String, Object> updateRefundMdbMap = CollectionUtil.mapStringObject();
updateRefundMdbMap.put("status", nftOrderRefundVo.getStatus());
updateRefundMdbMap.put("refundAt", nftOrderRefundVo.getRefundAt());
updateRefundMdbMap.put("updatedAt", nftOrderRefundVo.getUpdatedAt());
goblinMongoUtils.updateGoblinNftOrderRefundVo(nftOrderRefundVo.getOrderRefundId(), updateRefundMdbMap);
//mysql
LinkedList<String> sqls = CollectionUtil.linkedListString();
LinkedList<Object[]> sqlsOrder = CollectionUtil.linkedListObjectArr();
......@@ -778,7 +786,8 @@ public class GoblinNftOrderServiceImpl implements IGoblinNftOrderService {
digitalArtworkVo.setPublisher(goodsInfoVo.getPublisher());
// Mongo记录VO
goblinMongoUtils.insertGoblinUserDigitalArtworkVo(digitalArtworkVo);
goblinOrderUtils.setMongoList(GoblinUserDigitalArtworkVo.class.getSimpleName(), "artworkId", digitalArtworkVo.getArtworkId(), GoblinRedisConst.USER_DIGITAL_ARTWORK, 1);
// goblinMongoUtils.insertGoblinUserDigitalArtworkVo(digitalArtworkVo);
// Redis记录VO
goblinRedisUtils.setGoblinUserDigitalArtworkVo(digitalArtworkVo);
// Redis更新藏品ID列表
......
......@@ -10,6 +10,7 @@ import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.List;
@Component
......@@ -113,11 +114,12 @@ public class GoblinMongoUtils {
public void setGoblinNftOrderVo(GoblinNftOrderVo vo) {
mongoTemplate.insert(vo, GoblinNftOrderVo.class.getSimpleName());
}
public UpdateResult updateGoblinNftOrderVo(GoblinNftOrderVo data) {
BasicDBObject object = ObjectUtil.cloneBasicDBObject().append("$set", mongoConverter.convertToMongoType(data));
public UpdateResult updateGoblinNftOrderVo(String orderId, HashMap<String, Object> data) {
return mongoTemplate.getCollection(GoblinNftOrderVo.class.getSimpleName()).updateOne(
Query.query(Criteria.where("orderId").is(data.getOrderId())).getQueryObject(),
object);
Query.query(Criteria.where("orderId").is(orderId)).getQueryObject(),
ObjectUtil.cloneBasicDBObject().append("$set", mongoConverter.convertToMongoType(data))
);
}
//添加 订单退款数据
......@@ -129,11 +131,11 @@ public class GoblinMongoUtils {
return mongoTemplate.findOne(Query.query(Criteria.where("refundCode").is(refundCode)), GoblinNftOrderRefundVo.class, GoblinNftOrderRefundVo.class.getSimpleName());
}
//修改 订单退款数据
public UpdateResult updateGoblinNftOrderRefundVo(GoblinNftOrderRefundVo data) {
BasicDBObject object = ObjectUtil.cloneBasicDBObject().append("$set", mongoConverter.convertToMongoType(data));
public UpdateResult updateGoblinNftOrderRefundVo(String orderRefundId, HashMap<String, Object> data) {
return mongoTemplate.getCollection(GoblinNftOrderRefundVo.class.getSimpleName()).updateOne(
Query.query(Criteria.where("orderRefundId").is(data.getOrderRefundId())).getQueryObject(),
object);
Query.query(Criteria.where("orderRefundId").is(orderRefundId)).getQueryObject(),
ObjectUtil.cloneBasicDBObject().append("$set", mongoConverter.convertToMongoType(data))
);
}
/**
......
......@@ -11,6 +11,7 @@ import com.liquidnet.service.candy.param.BackCouponParam;
import com.liquidnet.service.candy.vo.CandyUseResultVo;
import com.liquidnet.service.goblin.constant.GoblinStatusConst;
import com.liquidnet.service.goblin.dto.GoblinQueueBizIntegralDto;
import com.liquidnet.service.goblin.dto.GoblinQueueBizMongoDto;
import com.liquidnet.service.goblin.dto.vo.*;
import com.liquidnet.service.goblin.entity.GoblinBackOrder;
import com.liquidnet.service.goblin.entity.GoblinBackOrderLog;
......@@ -220,6 +221,24 @@ public class GoblinOrderUtils {
}
}
/**
* mongo操作入队列
* @param collect Mongo集合名
* @param column Mongo集合中的字段属性名称
* @param bizId Mongo集合中的字段属性值
* @param prefix Redis缓存中的Key前缀
* @param opType 操作类型[1-insert|2-update]
*/
public void setMongoList(String collect, String column, String bizId, String prefix, int opType) {
GoblinQueueBizMongoDto goblinQueueBizMongoDto = GoblinQueueBizMongoDto.getNew();
goblinQueueBizMongoDto.setCollect(collect);
goblinQueueBizMongoDto.setColumn(column);
goblinQueueBizMongoDto.setBizId(bizId);
goblinQueueBizMongoDto.setPrefix(prefix);
goblinQueueBizMongoDto.setOpType(opType);
queueUtils.sendMsgByRedis(MQConst.GoblinQueue.BIZ_NFT_MONGO.getKey(), goblinQueueBizMongoDto.toJson());
}
public GoblinUseResultVo useStoreCoupon(String ucouponId, String content, BigDecimal totalPrice, List<GoblinOrderSku> goblinOrderSkuList, String uid) {
try {
List<GoblinUserCouponVo> voList = redisUtils.getUserCouponVos(uid);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment