记得上下班打卡 | git大法好,push需谨慎

Commit a10da690 authored by 张国柄's avatar 张国柄

~consumer:order.replace.redisTemplate;

parent 8168d37c
...@@ -25,7 +25,6 @@ public class ConsumerOrderCloseRedisStreamConfig extends RedisStreamConfig { ...@@ -25,7 +25,6 @@ public class ConsumerOrderCloseRedisStreamConfig extends RedisStreamConfig {
// StringRedisTemplate stringRedisTemplate; // StringRedisTemplate stringRedisTemplate;
@Autowired @Autowired
RedisDataSourceUtil redisDataSourceUtil; RedisDataSourceUtil redisDataSourceUtil;
StringRedisTemplate stringRedisTemplate = redisDataSourceUtil.getRedisQueueUtil().getStringRedisTemplate();
@Autowired @Autowired
ConsumerOrderClose0RdsReceiver consumerOrderClose0RdsReceiver; ConsumerOrderClose0RdsReceiver consumerOrderClose0RdsReceiver;
@Autowired @Autowired
...@@ -51,7 +50,7 @@ public class ConsumerOrderCloseRedisStreamConfig extends RedisStreamConfig { ...@@ -51,7 +50,7 @@ public class ConsumerOrderCloseRedisStreamConfig extends RedisStreamConfig {
public List<Subscription> subscriptionBizOrderClose0(RedisConnectionFactory factory) { public List<Subscription> subscriptionBizOrderClose0(RedisConnectionFactory factory) {
List<Subscription> subscriptionList = new ArrayList<>(); List<Subscription> subscriptionList = new ArrayList<>();
MQConst.GoblinQueue stream = MQConst.GoblinQueue.GOBLIN_UN_PAY_0; MQConst.GoblinQueue stream = MQConst.GoblinQueue.GOBLIN_UN_PAY_0;
this.initStream(stringRedisTemplate, stream.getKey(), stream.getGroup()); this.initStream(redisDataSourceUtil.getRedisQueueUtil().getStringRedisTemplate(), stream.getKey(), stream.getGroup());
for (int i = 0; i < 5; i++) { for (int i = 0; i < 5; i++) {
StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = this.buildStreamMessageListenerContainer(factory); StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = this.buildStreamMessageListenerContainer(factory);
subscriptionList.add(listenerContainer.receiveAutoAck( subscriptionList.add(listenerContainer.receiveAutoAck(
...@@ -67,7 +66,7 @@ public class ConsumerOrderCloseRedisStreamConfig extends RedisStreamConfig { ...@@ -67,7 +66,7 @@ public class ConsumerOrderCloseRedisStreamConfig extends RedisStreamConfig {
public List<Subscription> subscriptionBizOrderClose1(RedisConnectionFactory factory) { public List<Subscription> subscriptionBizOrderClose1(RedisConnectionFactory factory) {
List<Subscription> subscriptionList = new ArrayList<>(); List<Subscription> subscriptionList = new ArrayList<>();
MQConst.GoblinQueue stream = MQConst.GoblinQueue.GOBLIN_UN_PAY_1; MQConst.GoblinQueue stream = MQConst.GoblinQueue.GOBLIN_UN_PAY_1;
this.initStream(stringRedisTemplate, stream.getKey(), stream.getGroup()); this.initStream(redisDataSourceUtil.getRedisQueueUtil().getStringRedisTemplate(), stream.getKey(), stream.getGroup());
for (int i = 0; i < 5; i++) { for (int i = 0; i < 5; i++) {
StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = this.buildStreamMessageListenerContainer(factory); StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = this.buildStreamMessageListenerContainer(factory);
subscriptionList.add(listenerContainer.receiveAutoAck( subscriptionList.add(listenerContainer.receiveAutoAck(
...@@ -83,7 +82,7 @@ public class ConsumerOrderCloseRedisStreamConfig extends RedisStreamConfig { ...@@ -83,7 +82,7 @@ public class ConsumerOrderCloseRedisStreamConfig extends RedisStreamConfig {
public List<Subscription> subscriptionBizOrderClose2(RedisConnectionFactory factory) { public List<Subscription> subscriptionBizOrderClose2(RedisConnectionFactory factory) {
List<Subscription> subscriptionList = new ArrayList<>(); List<Subscription> subscriptionList = new ArrayList<>();
MQConst.GoblinQueue stream = MQConst.GoblinQueue.GOBLIN_UN_PAY_2; MQConst.GoblinQueue stream = MQConst.GoblinQueue.GOBLIN_UN_PAY_2;
this.initStream(stringRedisTemplate, stream.getKey(), stream.getGroup()); this.initStream(redisDataSourceUtil.getRedisQueueUtil().getStringRedisTemplate(), stream.getKey(), stream.getGroup());
for (int i = 0; i < 5; i++) { for (int i = 0; i < 5; i++) {
StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = this.buildStreamMessageListenerContainer(factory); StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = this.buildStreamMessageListenerContainer(factory);
subscriptionList.add(listenerContainer.receiveAutoAck( subscriptionList.add(listenerContainer.receiveAutoAck(
...@@ -99,7 +98,7 @@ public class ConsumerOrderCloseRedisStreamConfig extends RedisStreamConfig { ...@@ -99,7 +98,7 @@ public class ConsumerOrderCloseRedisStreamConfig extends RedisStreamConfig {
public List<Subscription> subscriptionBizOrderClose3(RedisConnectionFactory factory) { public List<Subscription> subscriptionBizOrderClose3(RedisConnectionFactory factory) {
List<Subscription> subscriptionList = new ArrayList<>(); List<Subscription> subscriptionList = new ArrayList<>();
MQConst.GoblinQueue stream = MQConst.GoblinQueue.GOBLIN_UN_PAY_3; MQConst.GoblinQueue stream = MQConst.GoblinQueue.GOBLIN_UN_PAY_3;
this.initStream(stringRedisTemplate, stream.getKey(), stream.getGroup()); this.initStream(redisDataSourceUtil.getRedisQueueUtil().getStringRedisTemplate(), stream.getKey(), stream.getGroup());
for (int i = 0; i < 5; i++) { for (int i = 0; i < 5; i++) {
StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = this.buildStreamMessageListenerContainer(factory); StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = this.buildStreamMessageListenerContainer(factory);
subscriptionList.add(listenerContainer.receiveAutoAck( subscriptionList.add(listenerContainer.receiveAutoAck(
...@@ -115,7 +114,7 @@ public class ConsumerOrderCloseRedisStreamConfig extends RedisStreamConfig { ...@@ -115,7 +114,7 @@ public class ConsumerOrderCloseRedisStreamConfig extends RedisStreamConfig {
public List<Subscription> subscriptionBizOrderClose4(RedisConnectionFactory factory) { public List<Subscription> subscriptionBizOrderClose4(RedisConnectionFactory factory) {
List<Subscription> subscriptionList = new ArrayList<>(); List<Subscription> subscriptionList = new ArrayList<>();
MQConst.GoblinQueue stream = MQConst.GoblinQueue.GOBLIN_UN_PAY_4; MQConst.GoblinQueue stream = MQConst.GoblinQueue.GOBLIN_UN_PAY_4;
this.initStream(stringRedisTemplate, stream.getKey(), stream.getGroup()); this.initStream(redisDataSourceUtil.getRedisQueueUtil().getStringRedisTemplate(), stream.getKey(), stream.getGroup());
for (int i = 0; i < 5; i++) { for (int i = 0; i < 5; i++) {
StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = this.buildStreamMessageListenerContainer(factory); StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = this.buildStreamMessageListenerContainer(factory);
subscriptionList.add(listenerContainer.receiveAutoAck( subscriptionList.add(listenerContainer.receiveAutoAck(
...@@ -131,7 +130,7 @@ public class ConsumerOrderCloseRedisStreamConfig extends RedisStreamConfig { ...@@ -131,7 +130,7 @@ public class ConsumerOrderCloseRedisStreamConfig extends RedisStreamConfig {
public List<Subscription> subscriptionBizOrderClose5(RedisConnectionFactory factory) { public List<Subscription> subscriptionBizOrderClose5(RedisConnectionFactory factory) {
List<Subscription> subscriptionList = new ArrayList<>(); List<Subscription> subscriptionList = new ArrayList<>();
MQConst.GoblinQueue stream = MQConst.GoblinQueue.GOBLIN_UN_PAY_5; MQConst.GoblinQueue stream = MQConst.GoblinQueue.GOBLIN_UN_PAY_5;
this.initStream(stringRedisTemplate, stream.getKey(), stream.getGroup()); this.initStream(redisDataSourceUtil.getRedisQueueUtil().getStringRedisTemplate(), stream.getKey(), stream.getGroup());
for (int i = 0; i < 5; i++) { for (int i = 0; i < 5; i++) {
StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = this.buildStreamMessageListenerContainer(factory); StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = this.buildStreamMessageListenerContainer(factory);
subscriptionList.add(listenerContainer.receiveAutoAck( subscriptionList.add(listenerContainer.receiveAutoAck(
...@@ -147,7 +146,7 @@ public class ConsumerOrderCloseRedisStreamConfig extends RedisStreamConfig { ...@@ -147,7 +146,7 @@ public class ConsumerOrderCloseRedisStreamConfig extends RedisStreamConfig {
public List<Subscription> subscriptionBizOrderClose6(RedisConnectionFactory factory) { public List<Subscription> subscriptionBizOrderClose6(RedisConnectionFactory factory) {
List<Subscription> subscriptionList = new ArrayList<>(); List<Subscription> subscriptionList = new ArrayList<>();
MQConst.GoblinQueue stream = MQConst.GoblinQueue.GOBLIN_UN_PAY_6; MQConst.GoblinQueue stream = MQConst.GoblinQueue.GOBLIN_UN_PAY_6;
this.initStream(stringRedisTemplate, stream.getKey(), stream.getGroup()); this.initStream(redisDataSourceUtil.getRedisQueueUtil().getStringRedisTemplate(), stream.getKey(), stream.getGroup());
for (int i = 0; i < 5; i++) { for (int i = 0; i < 5; i++) {
StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = this.buildStreamMessageListenerContainer(factory); StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = this.buildStreamMessageListenerContainer(factory);
subscriptionList.add(listenerContainer.receiveAutoAck( subscriptionList.add(listenerContainer.receiveAutoAck(
...@@ -163,7 +162,7 @@ public class ConsumerOrderCloseRedisStreamConfig extends RedisStreamConfig { ...@@ -163,7 +162,7 @@ public class ConsumerOrderCloseRedisStreamConfig extends RedisStreamConfig {
public List<Subscription> subscriptionBizOrderClose7(RedisConnectionFactory factory) { public List<Subscription> subscriptionBizOrderClose7(RedisConnectionFactory factory) {
List<Subscription> subscriptionList = new ArrayList<>(); List<Subscription> subscriptionList = new ArrayList<>();
MQConst.GoblinQueue stream = MQConst.GoblinQueue.GOBLIN_UN_PAY_7; MQConst.GoblinQueue stream = MQConst.GoblinQueue.GOBLIN_UN_PAY_7;
this.initStream(stringRedisTemplate, stream.getKey(), stream.getGroup()); this.initStream(redisDataSourceUtil.getRedisQueueUtil().getStringRedisTemplate(), stream.getKey(), stream.getGroup());
for (int i = 0; i < 5; i++) { for (int i = 0; i < 5; i++) {
StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = this.buildStreamMessageListenerContainer(factory); StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = this.buildStreamMessageListenerContainer(factory);
subscriptionList.add(listenerContainer.receiveAutoAck( subscriptionList.add(listenerContainer.receiveAutoAck(
...@@ -179,7 +178,7 @@ public class ConsumerOrderCloseRedisStreamConfig extends RedisStreamConfig { ...@@ -179,7 +178,7 @@ public class ConsumerOrderCloseRedisStreamConfig extends RedisStreamConfig {
public List<Subscription> subscriptionBizOrderClose8(RedisConnectionFactory factory) { public List<Subscription> subscriptionBizOrderClose8(RedisConnectionFactory factory) {
List<Subscription> subscriptionList = new ArrayList<>(); List<Subscription> subscriptionList = new ArrayList<>();
MQConst.GoblinQueue stream = MQConst.GoblinQueue.GOBLIN_UN_PAY_8; MQConst.GoblinQueue stream = MQConst.GoblinQueue.GOBLIN_UN_PAY_8;
this.initStream(stringRedisTemplate, stream.getKey(), stream.getGroup()); this.initStream(redisDataSourceUtil.getRedisQueueUtil().getStringRedisTemplate(), stream.getKey(), stream.getGroup());
for (int i = 0; i < 5; i++) { for (int i = 0; i < 5; i++) {
StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = this.buildStreamMessageListenerContainer(factory); StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = this.buildStreamMessageListenerContainer(factory);
subscriptionList.add(listenerContainer.receiveAutoAck( subscriptionList.add(listenerContainer.receiveAutoAck(
...@@ -195,7 +194,7 @@ public class ConsumerOrderCloseRedisStreamConfig extends RedisStreamConfig { ...@@ -195,7 +194,7 @@ public class ConsumerOrderCloseRedisStreamConfig extends RedisStreamConfig {
public List<Subscription> subscriptionBizOrderClose9(RedisConnectionFactory factory) { public List<Subscription> subscriptionBizOrderClose9(RedisConnectionFactory factory) {
List<Subscription> subscriptionList = new ArrayList<>(); List<Subscription> subscriptionList = new ArrayList<>();
MQConst.GoblinQueue stream = MQConst.GoblinQueue.GOBLIN_UN_PAY_9; MQConst.GoblinQueue stream = MQConst.GoblinQueue.GOBLIN_UN_PAY_9;
this.initStream(stringRedisTemplate, stream.getKey(), stream.getGroup()); this.initStream(redisDataSourceUtil.getRedisQueueUtil().getStringRedisTemplate(), stream.getKey(), stream.getGroup());
for (int i = 0; i < 5; i++) { for (int i = 0; i < 5; i++) {
StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = this.buildStreamMessageListenerContainer(factory); StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = this.buildStreamMessageListenerContainer(factory);
subscriptionList.add(listenerContainer.receiveAutoAck( subscriptionList.add(listenerContainer.receiveAutoAck(
......
...@@ -49,7 +49,6 @@ public abstract class AbstractOrderCloseReceiver implements StreamListener<Strin ...@@ -49,7 +49,6 @@ public abstract class AbstractOrderCloseReceiver implements StreamListener<Strin
// StringRedisTemplate stringRedisTemplate; // StringRedisTemplate stringRedisTemplate;
@Autowired @Autowired
RedisDataSourceUtil redisDataSourceUtil; RedisDataSourceUtil redisDataSourceUtil;
StringRedisTemplate stringRedisTemplate = redisDataSourceUtil.getRedisQueueUtil().getStringRedisTemplate();
@Autowired @Autowired
private RedisUtil redisUtil; private RedisUtil redisUtil;
@Autowired @Autowired
...@@ -70,6 +69,7 @@ public abstract class AbstractOrderCloseReceiver implements StreamListener<Strin ...@@ -70,6 +69,7 @@ public abstract class AbstractOrderCloseReceiver implements StreamListener<Strin
boolean result = this.consumerMessageHandler(message.getValue()); boolean result = this.consumerMessageHandler(message.getValue());
log.info("CONSUMER MSG RESULT:{} ==> [{}]MESSAGE_ID:{}", result, redisStreamKey, message.getId()); log.info("CONSUMER MSG RESULT:{} ==> [{}]MESSAGE_ID:{}", result, redisStreamKey, message.getId());
StringRedisTemplate stringRedisTemplate = redisDataSourceUtil.getRedisQueueUtil().getStringRedisTemplate();
try { try {
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message); stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
} catch (Exception e) { } catch (Exception e) {
...@@ -111,7 +111,7 @@ public abstract class AbstractOrderCloseReceiver implements StreamListener<Strin ...@@ -111,7 +111,7 @@ public abstract class AbstractOrderCloseReceiver implements StreamListener<Strin
} catch (Exception e) { } catch (Exception e) {
String redisStreamKey = this.getRedisStreamKey(); String redisStreamKey = this.getRedisStreamKey();
log.error("CONSUMER MSG EX_HANDLE ==> [{}]:{}", redisStreamKey, messageMap, e); log.error("CONSUMER MSG EX_HANDLE ==> [{}]:{}", redisStreamKey, messageMap, e);
stringRedisTemplate.opsForStream().add(StreamRecords.mapBacked(messageMap).withStreamKey(redisStreamKey)); redisDataSourceUtil.getRedisQueueUtil().getStringRedisTemplate().opsForStream().add(StreamRecords.mapBacked(messageMap).withStreamKey(redisStreamKey));
return false; return false;
} }
} }
...@@ -380,7 +380,7 @@ public abstract class AbstractOrderCloseReceiver implements StreamListener<Strin ...@@ -380,7 +380,7 @@ public abstract class AbstractOrderCloseReceiver implements StreamListener<Strin
public void sendMsgByRedis(String streamKey, String jsonMsg) { public void sendMsgByRedis(String streamKey, String jsonMsg) {
HashMap<String, String> map = CollectionUtil.mapStringString(); HashMap<String, String> map = CollectionUtil.mapStringString();
map.put("message", jsonMsg); map.put("message", jsonMsg);
stringRedisTemplate.opsForStream().add(StreamRecords.mapBacked(map).withStreamKey(streamKey)); redisDataSourceUtil.getRedisQueueUtil().getStringRedisTemplate().opsForStream().add(StreamRecords.mapBacked(map).withStreamKey(streamKey));
} }
public Boolean backStoreCoupon(BackCouponParam params) { public Boolean backStoreCoupon(BackCouponParam params) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment