记得上下班打卡 | git大法好,push需谨慎

Commit 51252913 authored by 胡佳晨's avatar 胡佳晨

增加 队列,goblin消费合并到kylin

parent 6092d222
......@@ -21,6 +21,14 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>easyexcel</artifactId>
</dependency>
<dependency>
<groupId>com.liquidnet</groupId>
<artifactId>liquidnet-common-cache-redis</artifactId>
......@@ -40,6 +48,11 @@
<artifactId>liquidnet-service-candy-api</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.liquidnet</groupId>
<artifactId>liquidnet-service-goblin-api</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.liquidnet</groupId>
<artifactId>liquidnet-common-sms</artifactId>
......
package com.liquidnet.service.consumer.kylin.config;
import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.consumer.kylin.receiver.ConsumerGoblinOrderAGRdsReceiver;
import lombok.var;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;
import java.time.Duration;
@Configuration
public class ConsumerGoblinOrderAGRedisStreamConfig {
@Autowired
ConsumerGoblinOrderAGRdsReceiver consumerGoblinOrderAGRdsReceiver;
private StreamMessageListenerContainer<String, MapRecord<String, String, String>> buildStreamMessageListenerContainer(RedisConnectionFactory factory) {
var options = StreamMessageListenerContainer
.StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofMillis(1))
.build();
return StreamMessageListenerContainer.create(factory, options);
}
/**
* 缺票登记
*
* @param listenerContainer
* @param t
* @return
*/
private Subscription receiveGoblinOrderAG(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) {
return listenerContainer.receiveAutoAck(
Consumer.from(MQConst.GoblinQueue.GOBLIN_ORDER_AGAIN.getGroup(), MQConst.GoblinQueue.GOBLIN_ORDER_AGAIN.name() + t),
StreamOffset.create(MQConst.GoblinQueue.GOBLIN_ORDER_AGAIN.getKey(), ReadOffset.lastConsumed()), consumerGoblinOrderAGRdsReceiver
);
}
/* —————————————————————————— | —————————————————————————— | —————————————————————————— */
/* -------------------------------------------------------- | 缺票登记 */
@Bean
public Subscription subscriptionGoblinOrderAG(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveGoblinOrderAG(listenerContainer, 1);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscriptionGoblinOrderAG2(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveGoblinOrderAG(listenerContainer, 2);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscriptionGoblinOrderAG3(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveGoblinOrderAG(listenerContainer, 3);
listenerContainer.start();
return subscription;
}
/* -------------------------------------------------------- | */
}
package com.liquidnet.service.consumer.kylin.config;
import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.consumer.kylin.receiver.ConsumerGoblinOrderCPRdsReceiver;
import lombok.var;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;
import java.time.Duration;
@Configuration
public class ConsumerGoblinOrderCPRedisStreamConfig {
@Autowired
ConsumerGoblinOrderCPRdsReceiver consumerGoblinOrderCPRdsReceiver;
private StreamMessageListenerContainer<String, MapRecord<String, String, String>> buildStreamMessageListenerContainer(RedisConnectionFactory factory) {
var options = StreamMessageListenerContainer
.StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofMillis(1))
.build();
return StreamMessageListenerContainer.create(factory, options);
}
/**
* 缺票登记
*
* @param listenerContainer
* @param t
* @return
*/
private Subscription receiveGoblinOrderCP(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) {
return listenerContainer.receiveAutoAck(
Consumer.from(MQConst.GoblinQueue.GOBLIN_ORDER_CREATE_PAY.getGroup(), MQConst.GoblinQueue.GOBLIN_ORDER_CREATE_PAY.name() + t),
StreamOffset.create(MQConst.GoblinQueue.GOBLIN_ORDER_CREATE_PAY.getKey(), ReadOffset.lastConsumed()), consumerGoblinOrderCPRdsReceiver
);
}
/* —————————————————————————— | —————————————————————————— | —————————————————————————— */
/* -------------------------------------------------------- | 缺票登记 */
@Bean
public Subscription subscriptionGoblinOrderCP(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveGoblinOrderCP(listenerContainer, 1);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscriptionGoblinOrderCP2(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveGoblinOrderCP(listenerContainer, 2);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscriptionGoblinOrderCP3(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveGoblinOrderCP(listenerContainer, 3);
listenerContainer.start();
return subscription;
}
/* -------------------------------------------------------- | */
}
package com.liquidnet.service.consumer.kylin.config;
import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.consumer.kylin.receiver.ConsumerGoblinOrderCloseRdsReceiver;
import lombok.var;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;
import java.time.Duration;
@Configuration
public class ConsumerGoblinOrderCloseRedisStreamConfig {
@Autowired
ConsumerGoblinOrderCloseRdsReceiver consumerGoblinOrderCloseRdsReceiver;
private StreamMessageListenerContainer<String, MapRecord<String, String, String>> buildStreamMessageListenerContainer(RedisConnectionFactory factory) {
var options = StreamMessageListenerContainer
.StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofMillis(1))
.build();
return StreamMessageListenerContainer.create(factory, options);
}
/**
* 缺票登记
*
* @param listenerContainer
* @param t
* @return
*/
private Subscription receiveGoblinOrderClose(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) {
return listenerContainer.receiveAutoAck(
Consumer.from(MQConst.GoblinQueue.GOBLIN_ORDER_CLOSE.getGroup(), MQConst.GoblinQueue.GOBLIN_ORDER_CLOSE.name() + t),
StreamOffset.create(MQConst.GoblinQueue.GOBLIN_ORDER_CLOSE.getKey(), ReadOffset.lastConsumed()), consumerGoblinOrderCloseRdsReceiver
);
}
/* —————————————————————————— | —————————————————————————— | —————————————————————————— */
/* -------------------------------------------------------- | 缺票登记 */
@Bean
public Subscription subscriptionGoblinOrderClose(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveGoblinOrderClose(listenerContainer, 1);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscriptionGoblinOrderClose2(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveGoblinOrderClose(listenerContainer, 2);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscriptionGoblinOrderClose3(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveGoblinOrderClose(listenerContainer, 3);
listenerContainer.start();
return subscription;
}
/* -------------------------------------------------------- | */
}
package com.liquidnet.service.consumer.kylin.config;
import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.consumer.kylin.receiver.ConsumerGoblinSelfMarketRdsReceiver;
import lombok.var;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;
import java.time.Duration;
@Configuration
public class ConsumerGoblinSelfMarketRedisStreamConfig {
@Autowired
ConsumerGoblinSelfMarketRdsReceiver consumerGoblinSelfMarketRdsReceiver;
private StreamMessageListenerContainer<String, MapRecord<String, String, String>> buildStreamMessageListenerContainer(RedisConnectionFactory factory) {
var options = StreamMessageListenerContainer
.StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofMillis(1))
.build();
return StreamMessageListenerContainer.create(factory, options);
}
/**
* 缺票登记
*
* @param listenerContainer
* @param t
* @return
*/
private Subscription receiveGoblinSelfMarket(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) {
return listenerContainer.receiveAutoAck(
Consumer.from(MQConst.GoblinQueue.GOBLIN_SELF_MARKET.getGroup(), MQConst.GoblinQueue.GOBLIN_SELF_MARKET.name() + t),
StreamOffset.create(MQConst.GoblinQueue.GOBLIN_SELF_MARKET.getKey(), ReadOffset.lastConsumed()), consumerGoblinSelfMarketRdsReceiver
);
}
/* —————————————————————————— | —————————————————————————— | —————————————————————————— */
/* -------------------------------------------------------- | 缺票登记 */
@Bean
public Subscription subscriptionGoblinSelfMarket(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveGoblinSelfMarket(listenerContainer, 1);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscriptionGoblinSelfMarket2(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveGoblinSelfMarket(listenerContainer, 2);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscriptionGoblinSelfMarket3(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveGoblinSelfMarket(listenerContainer, 3);
listenerContainer.start();
return subscription;
}
/* -------------------------------------------------------- | */
}
package com.liquidnet.service.consumer.kylin.config;
import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.consumer.kylin.receiver.ConsumerGoblinShopCartReceiver;
import lombok.var;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;
import java.time.Duration;
@Configuration
public class ConsumerGoblinShopCartStreamConfig {
@Autowired
ConsumerGoblinShopCartReceiver consumerGoblinShopCartReceiver;
private StreamMessageListenerContainer<String, MapRecord<String, String, String>> buildStreamMessageListenerContainer(RedisConnectionFactory factory) {
var options = StreamMessageListenerContainer
.StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofMillis(1))
.build();
return StreamMessageListenerContainer.create(factory, options);
}
/**
* 购物车
* @return Subscription
*/
private Subscription receiveSqlShopCart(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) {
return listenerContainer.receiveAutoAck(
Consumer.from(MQConst.GoblinQueue.GOBLIN_SHOP_CART.getGroup(), MQConst.GoblinQueue.GOBLIN_SHOP_CART.name() + t),
StreamOffset.create(MQConst.GoblinQueue.GOBLIN_SHOP_CART.getKey(), ReadOffset.lastConsumed()), consumerGoblinShopCartReceiver
);
}
/* -------------------------------------------------------- | */
@Bean
public Subscription subscriptionSqlShopCart1(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveSqlShopCart(listenerContainer, 1);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscriptionSqlShopCart2(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveSqlShopCart(listenerContainer, 2);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscriptionSqlShopCart3(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveSqlShopCart(listenerContainer, 2);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscriptionSqlShopCart4(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveSqlShopCart(listenerContainer, 2);
listenerContainer.start();
return subscription;
}
}
package com.liquidnet.service.consumer.kylin.config;
import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.consumer.kylin.receiver.ConsumerGoblinSqlCouponRdsReceiver;
import com.liquidnet.service.consumer.kylin.receiver.ConsumerGoblinSqlGoodsRdsReceiver;
import com.liquidnet.service.consumer.kylin.receiver.ConsumerGoblinSqlStoreRdsReceiver;
import lombok.var;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;
import java.time.Duration;
@Configuration
public class ConsumerGoblinSqlUstoreRedisStreamConfig {
@Autowired
ConsumerGoblinSqlStoreRdsReceiver consumerGoblinSqlStoreRdsReceiver;
@Autowired
ConsumerGoblinSqlGoodsRdsReceiver consumerGoblinSqlGoodsRdsReceiver;
@Autowired
ConsumerGoblinSqlCouponRdsReceiver consumerGoblinSqlCouponRdsReceiver;
private StreamMessageListenerContainer<String, MapRecord<String, String, String>> buildStreamMessageListenerContainer(RedisConnectionFactory factory) {
var options = StreamMessageListenerContainer
.StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofMillis(1))
.build();
return StreamMessageListenerContainer.create(factory, options);
}
/**
* 店铺相关
*
* @param listenerContainer StreamMessageListenerContainer
* @param t 消费者序号
* @return Subscription
*/
private Subscription receiveSqlStore(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) {
return listenerContainer.receiveAutoAck(Consumer.from(MQConst.GoblinQueue.SQL_STORE.getGroup(), MQConst.GoblinQueue.SQL_STORE.name() + t),
StreamOffset.create(MQConst.GoblinQueue.SQL_STORE.getKey(), ReadOffset.lastConsumed()), consumerGoblinSqlStoreRdsReceiver);
}
/**
* 商品相关
*
* @param listenerContainer StreamMessageListenerContainer
* @param t 消费者序号
* @return Subscription
*/
private Subscription receiveSqlGoods(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) {
return listenerContainer.receiveAutoAck(Consumer.from(MQConst.GoblinQueue.SQL_GOODS.getGroup(), MQConst.GoblinQueue.SQL_GOODS.name() + t),
StreamOffset.create(MQConst.GoblinQueue.SQL_GOODS.getKey(), ReadOffset.lastConsumed()), consumerGoblinSqlGoodsRdsReceiver);
}
/**
* 店铺优惠券相关
*
* @param listenerContainer StreamMessageListenerContainer
* @param t 消费者序号
* @return Subscription
*/
private Subscription receiveSqlCoupon(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) {
return listenerContainer.receiveAutoAck(Consumer.from(MQConst.GoblinQueue.SQL_COUPON.getGroup(), MQConst.GoblinQueue.SQL_COUPON.name() + t),
StreamOffset.create(MQConst.GoblinQueue.SQL_COUPON.getKey(), ReadOffset.lastConsumed()), consumerGoblinSqlCouponRdsReceiver);
}
/* —————————————————————————— | —————————————————————————— | —————————————————————————— */
/* -------------------------------------------------------- | 店铺相关 */
@Bean
public Subscription subscriptionSqlStore1(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveSqlStore(listenerContainer, 1);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscriptionSqlStore2(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveSqlStore(listenerContainer, 2);
listenerContainer.start();
return subscription;
}
/* -------------------------------------------------------- | 用户中心 */
@Bean
public Subscription subscriptionSqlGoods1(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveSqlGoods(listenerContainer, 1);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscriptionSqlGoods2(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveSqlGoods(listenerContainer, 2);
listenerContainer.start();
return subscription;
}
/* -------------------------------------------------------- | 店铺优惠券相关 */
@Bean
public Subscription subscriptionSqlCoupon1(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveSqlCoupon(listenerContainer, 1);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscriptionSqlCoupon2(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveSqlCoupon(listenerContainer, 2);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscriptionSqlCoupon3(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveSqlCoupon(listenerContainer, 3);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscriptionSqlCoupon4(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveSqlCoupon(listenerContainer, 4);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscriptionSqlCoupon5(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveSqlCoupon(listenerContainer, 5);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscriptionSqlCoupon6(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveSqlCoupon(listenerContainer, 6);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscriptionSqlCoupon7(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveSqlCoupon(listenerContainer, 7);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscriptionSqlCoupon8(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveSqlCoupon(listenerContainer, 8);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscriptionSqlCoupon9(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveSqlCoupon(listenerContainer, 9);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscriptionSqlCoupon10(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveSqlCoupon(listenerContainer, 10);
listenerContainer.start();
return subscription;
}
/* -------------------------------------------------------- | */
}
package com.liquidnet.service.consumer.kylin.config;
import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.consumer.kylin.receiver.ConsumerGoblinStoreMarketRdsReceiver;
import lombok.var;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;
import java.time.Duration;
@Configuration
public class ConsumerGoblinStoneMarketRedisStreamConfig {
@Autowired
ConsumerGoblinStoreMarketRdsReceiver consumerGoblinStoreMarketRdsReceiver;
private StreamMessageListenerContainer<String, MapRecord<String, String, String>> buildStreamMessageListenerContainer(RedisConnectionFactory factory) {
var options = StreamMessageListenerContainer
.StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofMillis(1))
.build();
return StreamMessageListenerContainer.create(factory, options);
}
/**
* 缺票登记
*
* @param listenerContainer
* @param t
* @return
*/
private Subscription receiveGoblinStoreMarket(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) {
return listenerContainer.receiveAutoAck(
Consumer.from(MQConst.GoblinQueue.GOBLIN_STORE_MARKET.getGroup(), MQConst.GoblinQueue.GOBLIN_STORE_MARKET.name() + t),
StreamOffset.create(MQConst.GoblinQueue.GOBLIN_STORE_MARKET.getKey(), ReadOffset.lastConsumed()), consumerGoblinStoreMarketRdsReceiver
);
}
/* —————————————————————————— | —————————————————————————— | —————————————————————————— */
/* -------------------------------------------------------- | 缺票登记 */
@Bean
public Subscription subscriptionGoblinStoreMarket(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveGoblinStoreMarket(listenerContainer, 1);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscriptionGoblinStoreMarket2(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveGoblinStoreMarket(listenerContainer, 2);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscriptionGoblinStoreMarket3(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveGoblinStoreMarket(listenerContainer, 3);
listenerContainer.start();
return subscription;
}
/* -------------------------------------------------------- | */
}
package com.liquidnet.service.consumer.kylin.config;
import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.consumer.kylin.receiver.ConsumerGoblinStoreOrderRdsReceiver;
import lombok.var;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;
import java.time.Duration;
@Configuration
public class ConsumerGoblinStoreOrderRedisStreamConfig {
@Autowired
ConsumerGoblinStoreOrderRdsReceiver consumerGoblinStoreOrderRdsReceiver;
private StreamMessageListenerContainer<String, MapRecord<String, String, String>> buildStreamMessageListenerContainer(RedisConnectionFactory factory) {
var options = StreamMessageListenerContainer
.StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofMillis(1))
.build();
return StreamMessageListenerContainer.create(factory, options);
}
/**
* 缺票登记
*
* @param listenerContainer
* @param t
* @return
*/
private Subscription receiveGoblinStoreOrder(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) {
return listenerContainer.receiveAutoAck(
Consumer.from(MQConst.GoblinQueue.GOBLIN_STORE_ORDER_OPERA.getGroup(), MQConst.GoblinQueue.GOBLIN_STORE_ORDER_OPERA.name() + t),
StreamOffset.create(MQConst.GoblinQueue.GOBLIN_STORE_ORDER_OPERA.getKey(), ReadOffset.lastConsumed()), consumerGoblinStoreOrderRdsReceiver
);
}
/* —————————————————————————— | —————————————————————————— | —————————————————————————— */
/* -------------------------------------------------------- | 缺票登记 */
@Bean
public Subscription subscriptionGoblinStoreOrder(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveGoblinStoreOrder(listenerContainer, 1);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscriptionGoblinStoreOrder2(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveGoblinStoreOrder(listenerContainer, 2);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscriptionGoblinStoreOrder3(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveGoblinStoreOrder(listenerContainer, 3);
listenerContainer.start();
return subscription;
}
/* -------------------------------------------------------- | */
}
package com.liquidnet.service.consumer.kylin.config;
import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.consumer.kylin.receiver.ConsumerGoblinUserOrderRdsReceiver;
import lombok.var;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;
import java.time.Duration;
@Configuration
public class ConsumerGoblinUserOrderRedisStreamConfig {
@Autowired
ConsumerGoblinUserOrderRdsReceiver consumerGoblinUserOrderRdsReceiver;
private StreamMessageListenerContainer<String, MapRecord<String, String, String>> buildStreamMessageListenerContainer(RedisConnectionFactory factory) {
var options = StreamMessageListenerContainer
.StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofMillis(1))
.build();
return StreamMessageListenerContainer.create(factory, options);
}
/**
* 缺票登记
*
* @param listenerContainer
* @param t
* @return
*/
private Subscription receiveGoblinUserOrder(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) {
return listenerContainer.receiveAutoAck(
Consumer.from(MQConst.GoblinQueue.GOBLIN_USER_ORDER_OPERA.getGroup(), MQConst.GoblinQueue.GOBLIN_USER_ORDER_OPERA.name() + t),
StreamOffset.create(MQConst.GoblinQueue.GOBLIN_USER_ORDER_OPERA.getKey(), ReadOffset.lastConsumed()), consumerGoblinUserOrderRdsReceiver
);
}
/* —————————————————————————— | —————————————————————————— | —————————————————————————— */
/* -------------------------------------------------------- | 缺票登记 */
@Bean
public Subscription subscriptionGoblinUserOrder(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveGoblinUserOrder(listenerContainer, 1);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscriptionGoblinUserOrder2(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveGoblinUserOrder(listenerContainer, 2);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscriptionGoblinUserOrder3(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveGoblinUserOrder(listenerContainer, 3);
listenerContainer.start();
return subscription;
}
/* -------------------------------------------------------- | */
}
package com.liquidnet.service.consumer.kylin.config;
import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.consumer.kylin.receiver.ConsumerGoblinXlsRdsReceiver;
import lombok.var;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;
import java.time.Duration;
@Configuration
public class ConsumerGoblinXlsRedisStreamConfig {
@Autowired
ConsumerGoblinXlsRdsReceiver consumerGoblinXlsRdsReceiver;
private StreamMessageListenerContainer<String, MapRecord<String, String, String>> buildStreamMessageListenerContainer(RedisConnectionFactory factory) {
var options = StreamMessageListenerContainer
.StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofMillis(1))
.build();
return StreamMessageListenerContainer.create(factory, options);
}
/**
* 缺票登记
*
* @param listenerContainer
* @param t
* @return
*/
private Subscription receiveGoblinXls(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) {
return listenerContainer.receiveAutoAck(
Consumer.from(MQConst.GoblinQueue.GOBLIN_XLS_OPERA.getGroup(), MQConst.GoblinQueue.GOBLIN_XLS_OPERA.name() + t),
StreamOffset.create(MQConst.GoblinQueue.GOBLIN_XLS_OPERA.getKey(), ReadOffset.lastConsumed()), consumerGoblinXlsRdsReceiver
);
}
/* —————————————————————————— | —————————————————————————— | —————————————————————————— */
/* -------------------------------------------------------- | 缺票登记 */
@Bean
public Subscription subscriptionGoblinXls1(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveGoblinXls(listenerContainer, 1);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscriptionGoblinXls2(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveGoblinXls(listenerContainer, 2);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscriptionGoblinXls3(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveGoblinXls(listenerContainer, 3);
listenerContainer.start();
return subscription;
}
/* -------------------------------------------------------- | */
}
package com.liquidnet.service.consumer.kylin.dto;
import lombok.Data;
import lombok.EqualsAndHashCode;
@Data
@EqualsAndHashCode
public class PhoneDto {
private String uid;
private String mobile;
}
package com.liquidnet.service.consumer.kylin.receiver;
import com.alibaba.excel.EasyExcel;
import com.alibaba.fastjson.JSON;
import com.liquidnet.common.cache.redis.util.RedisUtil;
import com.liquidnet.commons.lang.util.CollectionUtil;
import com.liquidnet.commons.lang.util.JsonUtils;
import com.liquidnet.service.base.SqlMapping;
import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.goblin.constant.GoblinRedisConst;
import com.liquidnet.service.goblin.constant.GoblinStatusConst;
import com.liquidnet.service.goblin.dto.vo.GoblinOrderSkuVo;
import com.liquidnet.service.goblin.dto.vo.GoblinStoreOrderVo;
import com.mongodb.BasicDBObject;
import com.mongodb.client.result.UpdateResult;
import io.netty.util.internal.ObjectUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.convert.MongoConverter;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamListener;
import java.io.InputStream;
import java.net.URL;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.LinkedList;
@Slf4j
public abstract class AbstractGoblinOrderCloseReceiver implements StreamListener<String, MapRecord<String, String, String>> {
@Autowired
StringRedisTemplate stringRedisTemplate;
@Autowired
private RedisUtil redisUtil;
@Autowired
private MongoTemplate mongoTemplate;
@Autowired
private MongoConverter mongoConverter;
@Override
public void onMessage(MapRecord<String, String, String> message) {
log.debug("CONSUMER XLS [streamKey:{},messageId:{},stream:{},body:{}]",
this.getRedisStreamKey(), message.getId(), message.getStream(), message.getValue());
boolean result = this.consumerSqlDaoHandler(message.getValue().get("message"), message.getValue().get("type"));
log.info("CONSUMER XLS_PATH RESULT:{} ==> MESSAGE_ID:{}", result, message.getId());
try {
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId());
} catch (Exception e) {
log.error("#CONSUMER SQL RESULT:{} ==> DEL_REDIS_QUEUE_MSG_EXCEPTION[MESSAGE_ID:{},MSG:{}]", result, message.getId(), JsonUtils.toJson(message), e);
} finally {
try {
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId());
} catch (Exception ignored) {
}
}
}
private boolean consumerSqlDaoHandler(String message, String type) {
boolean aBoolean = false;
try {
aBoolean = checkOrderTime(message, type);
} catch (Exception e) {
log.error("CONSUMER ORDER {} CLOSE FAIL ==> {}", type, e.getMessage(), e);
} finally {
if (!aBoolean) {
HashMap<String, String> map = CollectionUtil.mapStringString();
map.put("message", message);
map.put("type", type);
stringRedisTemplate.opsForStream().add(StreamRecords.mapBacked(map).withStreamKey(this.getRedisStreamKey()));
}
}
return aBoolean;
}
protected abstract String getRedisStreamKey();
protected abstract String getRedisStreamGroup();
public boolean checkOrderTime(String valueData, String type) {
LocalDateTime now = LocalDateTime.now();
if (type.equals("GOBLIN")) {
String[] orderIds = getMasterCode(valueData);
for (String orderId : orderIds) {
GoblinStoreOrderVo orderVo = getGoblinOrder(orderId);
if (orderVo.getStatus().equals(GoblinStatusConst.Status.ORDER_STATUS_0.getValue())) {//订单回滚
LinkedList<String> sqls = CollectionUtil.linkedListString();
LinkedList<Object[]> sqlDataOrder = CollectionUtil.linkedListObjectArr();
LinkedList<Object[]> sqlDataSku = CollectionUtil.linkedListObjectArr();
sqls.add(SqlMapping.get("goblin_order.close.order"));
sqls.add(SqlMapping.get("goblin_order.close.sku"));
for (String orderSkuId : orderVo.getOrderSkuVoIds()) {
GoblinOrderSkuVo skuVo = getGoblinOrderSkuVo(orderSkuId);
//订单详情
skuVo.setStatus(GoblinStatusConst.Status.ORDER_STATUS_5.getValue());
updateGoblinOrderSkuVo(skuVo.getOrderSkuId(), skuVo);
setGoblinOrderSku(skuVo.getOrderSkuId(), skuVo);
//库存&限购&&待支付订单
String pre = GoblinStatusConst.MarketPreStatus.getPre(skuVo.getSkuId());
incrSkuStock(pre, skuVo.getSkuId(), skuVo.getNum());
decrSkuCountByUid(orderVo.getUserId(), skuVo.getSkuId(), skuVo.getNum());
//mysql
sqlDataSku.add(new Object[]{
skuVo.getStatus(), now, orderVo.getOrderId(), now, now
});
}
//订单
orderVo.setStatus(GoblinStatusConst.Status.ORDER_STATUS_5.getValue());
updateGoblinStoreOrderVo(orderVo.getOrderId(), orderVo);
setGoblinOrder(orderVo.getOrderId(), orderVo);
//mysql
sqlDataOrder.add(new Object[]{
orderVo.getStatus(), now, now, "超时关闭", orderVo.getOrderId(), now, now
});
//执行sql
sendMsgByRedis(MQConst.GoblinQueue.GOBLIN_ORDER_CLOSE.getKey(),
SqlMapping.gets(sqls, sqlDataOrder, sqlDataSku));
}
}
}
return true;
}
private UpdateResult updateGoblinStoreOrderVo(String orderId, GoblinStoreOrderVo data) {
BasicDBObject object = cloneBasicDBObject().append("$set", mongoConverter.convertToMongoType(data));
return mongoTemplate.getCollection(GoblinStoreOrderVo.class.getSimpleName()).updateOne(
Query.query(Criteria.where("orderId").is(orderId)).getQueryObject(),
object);
}
private UpdateResult updateGoblinOrderSkuVo(String orderSkuId, GoblinOrderSkuVo data) {
BasicDBObject object = cloneBasicDBObject().append("$set", mongoConverter.convertToMongoType(data));
return mongoTemplate.getCollection(GoblinOrderSkuVo.class.getSimpleName()).updateOne(
Query.query(Criteria.where("orderSkuId").is(orderSkuId)).getQueryObject(),
object);
}
public String[] getMasterCode(String masterCode) {
String redisKey = GoblinRedisConst.REDIS_GOBLIN_ORDER_MASTER.concat(masterCode);
Object obj = redisUtil.get(redisKey);
if (obj == null) {
return null;
} else {
return ((String) obj).split(",");
}
}
// 获取 订单相关vo
public GoblinStoreOrderVo getGoblinOrder(String orderId) {
String redisKey = GoblinRedisConst.REDIS_GOBLIN_ORDER.concat(orderId);
Object obj = redisUtil.get(redisKey);
if (obj == null) {
return null;
} else {
return (GoblinStoreOrderVo) obj;
}
}
// 获取 订单相关Skuvo
public GoblinOrderSkuVo getGoblinOrderSkuVo(String orderSkuId) {
String redisKey = GoblinRedisConst.REDIS_GOBLIN_ORDER_SKU.concat(orderSkuId);
Object obj = redisUtil.get(redisKey);
if (obj == null) {
return null;
} else {
return (GoblinOrderSkuVo) obj;
}
}
// 赋值 订单相关Skuvo
public void setGoblinOrderSku(String orderSkuId, GoblinOrderSkuVo vo) {
String redisKey = GoblinRedisConst.REDIS_GOBLIN_ORDER_SKU.concat(orderSkuId);
redisUtil.set(redisKey, vo);
}
public int incrSkuStock(String marketPre, String skuId, Integer stock) {
String rk = GoblinRedisConst.REAL_STOCK_SKU;
if (marketPre != null && !marketPre.equals("null")) {
rk = rk.concat(marketPre + ":");
}
rk = rk.concat(skuId);
return (int) redisUtil.incr(rk, stock);
}
// 减少 用户sku购买个数
public int decrSkuCountByUid(String uid, String skuId, int number) {
String redisKey = GoblinRedisConst.REDIS_GOBLIN_BUY_COUNT.concat(uid + ":skuId:" + skuId);
return (int) redisUtil.decr(redisKey, number);
}
// 赋值 订单相关vo
public void setGoblinOrder(String orderId, GoblinStoreOrderVo vo) {
String redisKey = GoblinRedisConst.REDIS_GOBLIN_ORDER.concat(orderId);
redisUtil.set(redisKey, vo);
}
public void sendMsgByRedis(String streamKey, String jsonMsg) {
HashMap<String, String> map = CollectionUtil.mapStringString();
map.put("message", jsonMsg);
stringRedisTemplate.opsForStream().add(StreamRecords.mapBacked(map).withStreamKey(streamKey));
}
private static final BasicDBObject basicDBObject = new BasicDBObject();
public static BasicDBObject cloneBasicDBObject() {
return (BasicDBObject) basicDBObject.clone();
}
}
\ No newline at end of file
package com.liquidnet.service.consumer.kylin.receiver;
import com.alibaba.excel.EasyExcel;
import com.alibaba.excel.read.listener.PageReadListener;
import com.alibaba.fastjson.JSON;
import com.liquidnet.common.cache.redis.util.RedisUtil;
import com.liquidnet.commons.lang.util.CollectionUtil;
import com.liquidnet.commons.lang.util.JsonUtils;
import com.liquidnet.service.consumer.kylin.dto.PhoneDto;
import com.liquidnet.service.goblin.constant.GoblinRedisConst;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamListener;
import com.liquidnet.service.consumer.service.IBaseDao;
import java.io.InputStream;
import java.net.URL;
import java.time.LocalDateTime;
import java.util.LinkedList;
@Slf4j
public abstract class AbstractXlsRedisReceiver implements StreamListener<String, MapRecord<String, String, String>> {
@Autowired
private IBaseDao baseDao;
@Autowired
StringRedisTemplate stringRedisTemplate;
@Autowired
private RedisUtil redisUtil;
private static final String SQL_INSERT_GOODS_BUY_ROSTER_LOG = "INSERT INTO goblin_goods_buy_roster_log (sku_id,buy_roster,buy_roster_type,parsing_result,created_at)VALUES(?,?,?,?,?)";
@Override
public void onMessage(MapRecord<String, String, String> message) {
log.debug("CONSUMER XLS [streamKey:{},messageId:{},stream:{},body:{}]",
this.getRedisStreamKey(), message.getId(), message.getStream(), message.getValue());
boolean result = this.consumerSqlDaoHandler(message.getValue().get("message"), Integer.parseInt(message.getValue().get("type")), message.getValue().get("skuId"));
log.info("CONSUMER XLS_PATH RESULT:{} ==> MESSAGE_ID:{}", result, message.getId());
try {
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId());
} catch (Exception e) {
log.error("#CONSUMER SQL RESULT:{} ==> DEL_REDIS_QUEUE_MSG_EXCEPTION[MESSAGE_ID:{},MSG:{}]", result, message.getId(), JsonUtils.toJson(message), e);
} finally {
try {
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId());
} catch (Exception ignored) {
}
}
}
private boolean consumerSqlDaoHandler(String xlsPath, Integer type, String skuId) {
LinkedList<Object[]> objs = CollectionUtil.linkedListObjectArr();
Boolean aBoolean = false;
try {
URL url = new URL(xlsPath);
InputStream is = url.openStream();
EasyExcel.read(is, PhoneDto.class, new PageReadListener<PhoneDto>(dataList -> {
for (PhoneDto data : dataList) {
if (data.getMobile() == null) {
continue;
}
String redisKey = GoblinRedisConst.REDIS_CAN_BUY.concat(skuId + ":").concat(data.getMobile());
if (type.equals(1)) {//添加
if (log.isDebugEnabled()) {
log.debug("添加 读取到一条数据{}", JSON.toJSONString(data));
}
redisUtil.set(redisKey, 0);
} else {
if (log.isDebugEnabled()) {
log.debug("删除 读取到一条数据{}", JSON.toJSONString(data));
}
redisUtil.del(redisKey);
}
}
})).sheet().doRead();
objs.add(new Object[]{skuId, xlsPath, type, 1, LocalDateTime.now()});
} catch (Exception e) {
objs.add(new Object[]{skuId, xlsPath, type, 2, LocalDateTime.now()});
aBoolean = false;
log.error("CONSUMER XLS FAIL ==> {}", e.getMessage(), e);
} finally {
try {
baseDao.batchSql(SQL_INSERT_GOODS_BUY_ROSTER_LOG, objs);
} catch (Exception e) {
}
// if (!aBoolean) {
// HashMap<String, String> map = CollectionUtil.mapStringString();
// map.put("message", xlsPath);
// map.put("skuId", skuId);
// map.put("type", type.toString());
// stringRedisTemplate.opsForStream().add(StreamRecords.mapBacked(map).withStreamKey(this.getRedisStreamKey()));
// }
}
return aBoolean;
}
protected abstract String getRedisStreamKey();
protected abstract String getRedisStreamGroup();
}
\ No newline at end of file
package com.liquidnet.service.consumer.kylin.receiver;
import com.liquidnet.service.base.constant.MQConst;
import org.springframework.stereotype.Component;
@Component
public class ConsumerGoblinOrderAGRdsReceiver extends AbstractSqlRedisReceiver {
@Override
protected String getRedisStreamKey() {
return MQConst.GoblinQueue.GOBLIN_ORDER_AGAIN.getKey();
}
@Override
protected String getRedisStreamGroup() {
return MQConst.GoblinQueue.GOBLIN_ORDER_AGAIN.getGroup();
}
}
package com.liquidnet.service.consumer.kylin.receiver;
import com.liquidnet.service.base.constant.MQConst;
import org.springframework.stereotype.Component;
@Component
public class ConsumerGoblinOrderCPRdsReceiver extends AbstractSqlRedisReceiver {
@Override
protected String getRedisStreamKey() {
return MQConst.GoblinQueue.GOBLIN_ORDER_CREATE_PAY.getKey();
}
@Override
protected String getRedisStreamGroup() {
return MQConst.GoblinQueue.GOBLIN_ORDER_CREATE_PAY.getGroup();
}
}
package com.liquidnet.service.consumer.kylin.receiver;
import com.liquidnet.service.base.constant.MQConst;
import org.springframework.stereotype.Component;
@Component
public class ConsumerGoblinOrderCloseRdsReceiver extends AbstractSqlRedisReceiver {
@Override
protected String getRedisStreamKey() {
return MQConst.GoblinQueue.GOBLIN_ORDER_CLOSE.getKey();
}
@Override
protected String getRedisStreamGroup() {
return MQConst.GoblinQueue.GOBLIN_ORDER_CLOSE.getGroup();
}
}
package com.liquidnet.service.consumer.kylin.receiver;
import com.liquidnet.service.base.constant.MQConst;
import org.springframework.stereotype.Component;
@Component
public class ConsumerGoblinSelfMarketRdsReceiver extends AbstractSqlRedisReceiver {
@Override
protected String getRedisStreamKey() {
return MQConst.GoblinQueue.GOBLIN_SELF_MARKET.getKey();
}
@Override
protected String getRedisStreamGroup() {
return MQConst.GoblinQueue.GOBLIN_SELF_MARKET.getGroup();
}
}
package com.liquidnet.service.consumer.kylin.receiver;
import com.liquidnet.service.base.constant.MQConst;
import org.springframework.stereotype.Component;
@Component
public class ConsumerGoblinShopCartReceiver extends AbstractSqlRedisReceiver {
@Override
protected String getRedisStreamKey() {
return MQConst.GoblinQueue.GOBLIN_SHOP_CART.getKey();
}
@Override
protected String getRedisStreamGroup() {
return MQConst.GoblinQueue.GOBLIN_SHOP_CART.getGroup();
}
}
package com.liquidnet.service.consumer.kylin.receiver;
import com.liquidnet.service.base.constant.MQConst;
import org.springframework.stereotype.Component;
@Component
public class ConsumerGoblinSqlCouponRdsReceiver extends AbstractSqlRedisReceiver {
@Override
protected String getRedisStreamKey() {
return MQConst.GoblinQueue.SQL_COUPON.getKey();
}
@Override
protected String getRedisStreamGroup() {
return MQConst.GoblinQueue.SQL_COUPON.getGroup();
}
}
package com.liquidnet.service.consumer.kylin.receiver;
import com.liquidnet.service.base.constant.MQConst;
import org.springframework.stereotype.Component;
@Component
public class ConsumerGoblinSqlGoodsRdsReceiver extends AbstractSqlRedisReceiver {
@Override
protected String getRedisStreamKey() {
return MQConst.GoblinQueue.SQL_GOODS.getKey();
}
@Override
protected String getRedisStreamGroup() {
return MQConst.GoblinQueue.SQL_GOODS.getGroup();
}
}
package com.liquidnet.service.consumer.kylin.receiver;
import com.liquidnet.service.base.constant.MQConst;
import org.springframework.stereotype.Component;
@Component
public class ConsumerGoblinSqlStoreRdsReceiver extends AbstractSqlRedisReceiver {
@Override
protected String getRedisStreamKey() {
return MQConst.GoblinQueue.SQL_STORE.getKey();
}
@Override
protected String getRedisStreamGroup() {
return MQConst.GoblinQueue.SQL_STORE.getGroup();
}
}
package com.liquidnet.service.consumer.kylin.receiver;
import com.liquidnet.service.base.constant.MQConst;
import org.springframework.stereotype.Component;
@Component
public class ConsumerGoblinStoreMarketRdsReceiver extends AbstractSqlRedisReceiver {
@Override
protected String getRedisStreamKey() {
return MQConst.GoblinQueue.GOBLIN_STORE_MARKET.getKey();
}
@Override
protected String getRedisStreamGroup() {
return MQConst.GoblinQueue.GOBLIN_STORE_MARKET.getGroup();
}
}
package com.liquidnet.service.consumer.kylin.receiver;
import com.liquidnet.service.base.constant.MQConst;
import org.springframework.stereotype.Component;
@Component
public class ConsumerGoblinStoreOrderRdsReceiver extends AbstractSqlRedisReceiver {
@Override
protected String getRedisStreamKey() {
return MQConst.GoblinQueue.GOBLIN_STORE_ORDER_OPERA.getKey();
}
@Override
protected String getRedisStreamGroup() {
return MQConst.GoblinQueue.GOBLIN_STORE_ORDER_OPERA.getGroup();
}
}
package com.liquidnet.service.consumer.kylin.receiver;
import com.liquidnet.service.base.constant.MQConst;
import org.springframework.stereotype.Component;
@Component
public class ConsumerGoblinUserOrderRdsReceiver extends AbstractSqlRedisReceiver {
@Override
protected String getRedisStreamKey() {
return MQConst.GoblinQueue.GOBLIN_USER_ORDER_OPERA.getKey();
}
@Override
protected String getRedisStreamGroup() {
return MQConst.GoblinQueue.GOBLIN_USER_ORDER_OPERA.getGroup();
}
}
package com.liquidnet.service.consumer.kylin.receiver;
import com.liquidnet.service.base.constant.MQConst;
import org.springframework.stereotype.Component;
@Component
public class ConsumerGoblinXlsRdsReceiver extends AbstractXlsRedisReceiver {
@Override
protected String getRedisStreamKey() {
return MQConst.GoblinQueue.GOBLIN_XLS_OPERA.getKey();
}
@Override
protected String getRedisStreamGroup() {
return MQConst.GoblinQueue.GOBLIN_XLS_OPERA.getGroup();
}
}
......@@ -54,7 +54,7 @@ public class QueueUtils {
*/
public void sendMsgByRedisGoblinStock(String masterOrderCode, LocalDateTime createTime) {
String streamKey;
int key = createTime.getSecond() % 10;
int key = createTime.getMinute() % 10;
switch (key) {
case 1:
streamKey = MQConst.GoblinQueue.GOBLIN_UN_PAY_1.getKey();
......@@ -89,6 +89,7 @@ public class QueueUtils {
}
HashMap<String, String> map = ObjectUtil.cloneHashMapStringAndString();
map.put("message", masterOrderCode);
map.put("type", "GOBLIN");
stringRedisTemplate.opsForStream().add(StreamRecords.mapBacked(map).withStreamKey(streamKey));
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment