记得上下班打卡 | git大法好,push需谨慎

Commit 3b8f4b8e authored by jiangxiulong's avatar jiangxiulong

使用自己的队列

parent 4d015335
...@@ -242,6 +242,7 @@ public class MQConst { ...@@ -242,6 +242,7 @@ public class MQConst {
GOBLIN_STORE_MARKET("goblin:stream:store.market", "group.store.market", "店铺活动"), GOBLIN_STORE_MARKET("goblin:stream:store.market", "group.store.market", "店铺活动"),
GOBLIN_SELF_MARKET("goblin:stream:self.market", "group.self.market", "平台活动"), GOBLIN_SELF_MARKET("goblin:stream:self.market", "group.self.market", "平台活动"),
GOBLIN_ORDER_CREATE_PAY("goblin:stream:order:create_pay", "group.order:create_pay", "订单创建&支付"), GOBLIN_ORDER_CREATE_PAY("goblin:stream:order:create_pay", "group.order:create_pay", "订单创建&支付"),
GOBLIN_NFT_ORDER("goblin:stream:nftOrder:create", "group.nftOrder:create", "NFT订单处理"),
GOBLIN_SHOP_CART("goblin:stream:sql.shopcart","group.shop.shopcart","购物车"), GOBLIN_SHOP_CART("goblin:stream:sql.shopcart","group.shop.shopcart","购物车"),
GOBLIN_ORDER_AGAIN("goblin:stream:order:again", "group.order:again", "订单再次支付"), GOBLIN_ORDER_AGAIN("goblin:stream:order:again", "group.order:again", "订单再次支付"),
GOBLIN_ORDER_CLOSE("goblin:stream:order:close", "group.order:close", "订单关闭"), GOBLIN_ORDER_CLOSE("goblin:stream:order:close", "group.order:close", "订单关闭"),
......
...@@ -2,6 +2,7 @@ package com.liquidnet.service.consumer.kylin.config; ...@@ -2,6 +2,7 @@ package com.liquidnet.service.consumer.kylin.config;
import com.liquidnet.common.cache.redis.config.RedisStreamConfig; import com.liquidnet.common.cache.redis.config.RedisStreamConfig;
import com.liquidnet.service.base.constant.MQConst; import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.consumer.kylin.receiver.ConsumerGoblinNftOrderRdsReceiver;
import com.liquidnet.service.consumer.kylin.receiver.ConsumerGoblinOrderCPRdsReceiver; import com.liquidnet.service.consumer.kylin.receiver.ConsumerGoblinOrderCPRdsReceiver;
import lombok.var; import lombok.var;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
...@@ -20,21 +21,28 @@ public class ConsumerGoblinOrderCPRedisStreamConfig extends RedisStreamConfig { ...@@ -20,21 +21,28 @@ public class ConsumerGoblinOrderCPRedisStreamConfig extends RedisStreamConfig {
@Autowired @Autowired
ConsumerGoblinOrderCPRdsReceiver consumerGoblinOrderCPRdsReceiver; ConsumerGoblinOrderCPRdsReceiver consumerGoblinOrderCPRdsReceiver;
@Autowired
ConsumerGoblinNftOrderRdsReceiver consumerGoblinNftOrderRdsReceiver;
/** /**
* 缺票登记 * 商城订单
*
* @param listenerContainer
* @param t
* @return
*/ */
private Subscription receiveGoblinOrderCP(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) { private Subscription receiveGoblinOrderCP(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) {
return listenerContainer.receiveAutoAck(Consumer.from(MQConst.GoblinQueue.GOBLIN_ORDER_CREATE_PAY.getGroup(), getConsumerName(MQConst.GoblinQueue.GOBLIN_ORDER_CREATE_PAY.name() + t)), return listenerContainer.receiveAutoAck(Consumer.from(MQConst.GoblinQueue.GOBLIN_ORDER_CREATE_PAY.getGroup(), getConsumerName(MQConst.GoblinQueue.GOBLIN_ORDER_CREATE_PAY.name() + t)),
StreamOffset.create(MQConst.GoblinQueue.GOBLIN_ORDER_CREATE_PAY.getKey(), ReadOffset.lastConsumed()), consumerGoblinOrderCPRdsReceiver); StreamOffset.create(MQConst.GoblinQueue.GOBLIN_ORDER_CREATE_PAY.getKey(), ReadOffset.lastConsumed()), consumerGoblinOrderCPRdsReceiver);
} }
/**
* NFT订单
*/
private Subscription receiveGoblinNftOrder(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) {
return listenerContainer.receiveAutoAck(Consumer.from(MQConst.GoblinQueue.GOBLIN_NFT_ORDER.getGroup(), getConsumerName(MQConst.GoblinQueue.GOBLIN_NFT_ORDER.name() + t)),
StreamOffset.create(MQConst.GoblinQueue.GOBLIN_NFT_ORDER.getKey(), ReadOffset.lastConsumed()), consumerGoblinNftOrderRdsReceiver);
}
/* —————————————————————————— | —————————————————————————— | —————————————————————————— */ /* —————————————————————————— | —————————————————————————— | —————————————————————————— */
/* -------------------------------------------------------- | 缺票登记 */ /*-------------------------------------- 商城订单 --------------------------------------*/
@Bean @Bean
public Subscription subscriptionGoblinOrderCP0(RedisConnectionFactory factory) { public Subscription subscriptionGoblinOrderCP0(RedisConnectionFactory factory) {
...@@ -60,5 +68,31 @@ public class ConsumerGoblinOrderCPRedisStreamConfig extends RedisStreamConfig { ...@@ -60,5 +68,31 @@ public class ConsumerGoblinOrderCPRedisStreamConfig extends RedisStreamConfig {
return subscription; return subscription;
} }
/*-------------------------------------- NFT订单 --------------------------------------*/
@Bean
public Subscription subscriptionGoblinNftOrder0(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveGoblinNftOrder(listenerContainer, 0);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscriptionGoblinNftOrder1(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveGoblinNftOrder(listenerContainer, 1);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscriptionGoblinNftOrder2(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveGoblinNftOrder(listenerContainer, 2);
listenerContainer.start();
return subscription;
}
/* -------------------------------------------------------- | */ /* -------------------------------------------------------- | */
} }
package com.liquidnet.service.consumer.kylin.receiver;
import com.liquidnet.service.base.constant.MQConst;
import org.springframework.stereotype.Component;
@Component
public class ConsumerGoblinNftOrderRdsReceiver extends AbstractSqlRedisReceiver {
@Override
protected String getRedisStreamKey() {
return MQConst.GoblinQueue.GOBLIN_NFT_ORDER.getKey();
}
@Override
protected String getRedisStreamGroup() {
return MQConst.GoblinQueue.GOBLIN_NFT_ORDER.getGroup();
}
}
...@@ -57,4 +57,8 @@ XGROUP CREATE goblin:stream:order:back:7 group.order:back 0 ...@@ -57,4 +57,8 @@ XGROUP CREATE goblin:stream:order:back:7 group.order:back 0
XADD goblin:stream:order:back:8 * 0 0 XADD goblin:stream:order:back:8 * 0 0
XGROUP CREATE goblin:stream:order:back:8 group.order:back 0 XGROUP CREATE goblin:stream:order:back:8 group.order:back 0
XADD goblin:stream:order:back:9 * 0 0 XADD goblin:stream:order:back:9 * 0 0
XGROUP CREATE goblin:stream:order:back:9 group.order:back 0 XGROUP CREATE goblin:stream:order:back:9 group.order:back 0
\ No newline at end of file
#--- NFT
XADD goblin:stream:nftOrder:create * 0 0
XGROUP CREATE goblin:stream:nftOrder:create group.nftOrder:create 0
\ No newline at end of file
...@@ -318,7 +318,7 @@ public class GoblinNftOrderServiceImpl implements IGoblinNftOrderService { ...@@ -318,7 +318,7 @@ public class GoblinNftOrderServiceImpl implements IGoblinNftOrderService {
// 执行sql // 执行sql
String sqlData = SqlMapping.gets(sqls, sqlDataOrder); String sqlData = SqlMapping.gets(sqls, sqlDataOrder);
queueUtils.sendMsgByRedis(MQConst.GoblinQueue.GOBLIN_ORDER_CREATE_PAY.getKey(), sqlData); queueUtils.sendMsgByRedis(MQConst.GoblinQueue.GOBLIN_NFT_ORDER.getKey(), sqlData);
log.info(UserPathDto.setData("NFT下单(唤起支付)", nftOrder, NftPayResultVo)); log.info(UserPathDto.setData("NFT下单(唤起支付)", nftOrder, NftPayResultVo));
if (isFree) {// 免费直接回调 if (isFree) {// 免费直接回调
...@@ -484,7 +484,7 @@ public class GoblinNftOrderServiceImpl implements IGoblinNftOrderService { ...@@ -484,7 +484,7 @@ public class GoblinNftOrderServiceImpl implements IGoblinNftOrderService {
// mongo // mongo
goblinMongoUtils.updateGoblinNftOrderVo(orderVo); goblinMongoUtils.updateGoblinNftOrderVo(orderVo);
// mysql // mysql
queueUtils.sendMsgByRedis(MQConst.GoblinQueue.GOBLIN_ORDER_CREATE_PAY.getKey(), queueUtils.sendMsgByRedis(MQConst.GoblinQueue.GOBLIN_NFT_ORDER.getKey(),
SqlMapping.gets(sqls, sqlDataOrder)); SqlMapping.gets(sqls, sqlDataOrder));
// 退款 // 退款
nftOrderUtils.refundOrderSku(orderId, syncOrderParam.getPaymentId(), syncOrderParam.getPaymentType()); nftOrderUtils.refundOrderSku(orderId, syncOrderParam.getPaymentId(), syncOrderParam.getPaymentType());
...@@ -511,11 +511,13 @@ public class GoblinNftOrderServiceImpl implements IGoblinNftOrderService { ...@@ -511,11 +511,13 @@ public class GoblinNftOrderServiceImpl implements IGoblinNftOrderService {
// mongo // mongo
goblinMongoUtils.updateGoblinNftOrderVo(orderVo); goblinMongoUtils.updateGoblinNftOrderVo(orderVo);
// mysql // mysql
queueUtils.sendMsgByRedis(MQConst.GoblinQueue.GOBLIN_ORDER_CREATE_PAY.getKey(), queueUtils.sendMsgByRedis(MQConst.GoblinQueue.GOBLIN_NFT_ORDER.getKey(),
SqlMapping.gets(sqls, sqlDataOrder)); SqlMapping.gets(sqls, sqlDataOrder));
// 加积分 // 加积分
goblinOrderUtils.doTask(uid, orderVo.getPriceActual()); goblinOrderUtils.doTask(uid, orderVo.getPriceActual());
// TODO: jxl 2022/3/31 仍炳上链的对列 盲盒要不要仍
} }
// 写入用户订单列表 因取消的订单不展示 所以放在这里 // 写入用户订单列表 因取消的订单不展示 所以放在这里
nftOrderUtils.addNftOrderList(uid, orderVo.getOrderId()); nftOrderUtils.addNftOrderList(uid, orderVo.getOrderId());
...@@ -578,7 +580,7 @@ public class GoblinNftOrderServiceImpl implements IGoblinNftOrderService { ...@@ -578,7 +580,7 @@ public class GoblinNftOrderServiceImpl implements IGoblinNftOrderService {
sqlsBackOrder.add(new Object[]{ sqlsBackOrder.add(new Object[]{
nftOrderRefundVo.getStatus(), now, now, nftOrderRefundVo.getOrderRefundId(), now, now nftOrderRefundVo.getStatus(), now, now, nftOrderRefundVo.getOrderRefundId(), now, now
}); });
queueUtils.sendMsgByRedis(MQConst.GoblinQueue.GOBLIN_STORE_ORDER_OPERA.getKey(), queueUtils.sendMsgByRedis(MQConst.GoblinQueue.GOBLIN_NFT_ORDER.getKey(),
SqlMapping.gets(sqls, sqlsOrder, sqlsBackOrder)); SqlMapping.gets(sqls, sqlsOrder, sqlsBackOrder));
//减积分 //减积分
goblinOrderUtils.desTask(nftOrder.getUserId(), nftOrderRefundVo.getPrice()); goblinOrderUtils.desTask(nftOrder.getUserId(), nftOrderRefundVo.getPrice());
......
...@@ -279,7 +279,7 @@ public class GoblinNftOrderUtils { ...@@ -279,7 +279,7 @@ public class GoblinNftOrderUtils {
setBackOrderVo(backOrderVo);// redis setBackOrderVo(backOrderVo);// redis
goblinMongoUtils.insertGoblinNftOrderRefundVo(backOrderVo);// mongo goblinMongoUtils.insertGoblinNftOrderRefundVo(backOrderVo);// mongo
queueUtils.sendMsgByRedis(// mysql queueUtils.sendMsgByRedis(// mysql
MQConst.GoblinQueue.GOBLIN_STORE_ORDER_OPERA.getKey(), MQConst.GoblinQueue.GOBLIN_NFT_ORDER.getKey(),
SqlMapping.get("goblin_nft_order_refund.insert", SqlMapping.get("goblin_nft_order_refund.insert",
backOrderVo.getOrderRefundId(), backOrderVo.getRefundCode(), backOrderVo.getOrderId(), backOrderVo.getOrderRefundId(), backOrderVo.getRefundCode(), backOrderVo.getOrderId(),
backOrderVo.getOrderCode(), backOrderVo.getStoreId(), backOrderVo.getUserId(), backOrderVo.getOrderCode(), backOrderVo.getStoreId(), backOrderVo.getUserId(),
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment