记得上下班打卡 | git大法好,push需谨慎

Commit fd709cc4 authored by 胡佳晨's avatar 胡佳晨

迁移 consumer-goblin

parent c074aba1
package com.liquidnet.service.consumer.base.config.candy; package com.liquidnet.service.consumer.base.config.candy;
import com.liquidnet.common.cache.redis.config.RedisStreamConfig; import com.liquidnet.common.cache.redis.config.RedisStreamConfig;
import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.consumer.base.receiver.candy.ConsumerCandyCouponBackRdsReceiver; import com.liquidnet.service.consumer.base.receiver.candy.ConsumerCandyCouponBackRdsReceiver;
import lombok.var;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
...@@ -11,55 +11,36 @@ import org.springframework.data.redis.connection.stream.Consumer; ...@@ -11,55 +11,36 @@ import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord; import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset; import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset; import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamMessageListenerContainer; import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription; import org.springframework.data.redis.stream.Subscription;
import static com.liquidnet.service.base.constant.MQConst.CandyQueue.COUPON_BACK; import java.util.ArrayList;
import java.util.List;
@Configuration @Configuration
public class ConsumerCandyCouponBackRedisStreamConfig extends RedisStreamConfig { public class ConsumerCandyCouponBackRedisStreamConfig extends RedisStreamConfig {
@Autowired @Autowired
ConsumerCandyCouponBackRdsReceiver consumerCandyCouponBackRdsReceiver; ConsumerCandyCouponBackRdsReceiver consumerCandyCouponBackRdsReceiver;
@Autowired
StringRedisTemplate stringRedisTemplate;
/**
* 缺票登记
*
* @param listenerContainer
* @param t
* @return
*/
private Subscription receiveSqlCandyCouponBack(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) {
return listenerContainer.receiveAutoAck(Consumer.from(COUPON_BACK.getGroup(), getConsumerName(COUPON_BACK.name() + t)),
StreamOffset.create(COUPON_BACK.getKey(), ReadOffset.lastConsumed()), consumerCandyCouponBackRdsReceiver);
}
/* —————————————————————————— | —————————————————————————— | —————————————————————————— */
/* -------------------------------------------------------- | 缺票登记 */
@Bean
public Subscription subscriptionSqlCandyCouponBack0(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveSqlCandyCouponBack(listenerContainer, 0);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscriptionSqlCandyCouponBack1(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveSqlCandyCouponBack(listenerContainer, 1);
listenerContainer.start();
return subscription;
}
@Bean @Bean
public Subscription subscriptionSqlCandyCouponBack2(RedisConnectionFactory factory) { public List<Subscription> couponBack(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory); List<Subscription> subscriptionList = new ArrayList<>();
var subscription = receiveSqlCandyCouponBack(listenerContainer, 2); MQConst.CandyQueue stream = MQConst.CandyQueue.COUPON_BACK;
listenerContainer.start(); this.initStream(stringRedisTemplate, stream.getKey(), stream.getGroup());
return subscription; for (int i = 0; i < 5; i++) {
StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = this.buildStreamMessageListenerContainer(factory);
subscriptionList.add(listenerContainer.receiveAutoAck(
Consumer.from(stream.getGroup(), getConsumerName(stream.name() + i)),
StreamOffset.create(stream.getKey(), ReadOffset.lastConsumed()), consumerCandyCouponBackRdsReceiver
));
listenerContainer.start();
}
return subscriptionList;
} }
/* -------------------------------------------------------- | */
} }
package com.liquidnet.service.consumer.base.config.candy; package com.liquidnet.service.consumer.base.config.candy;
import com.liquidnet.common.cache.redis.config.RedisStreamConfig; import com.liquidnet.common.cache.redis.config.RedisStreamConfig;
import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.consumer.base.receiver.candy.ConsumerCandyCouponOrderBackRdsReceiver; import com.liquidnet.service.consumer.base.receiver.candy.ConsumerCandyCouponOrderBackRdsReceiver;
import lombok.var;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
...@@ -11,55 +11,35 @@ import org.springframework.data.redis.connection.stream.Consumer; ...@@ -11,55 +11,35 @@ import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord; import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset; import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset; import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamMessageListenerContainer; import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription; import org.springframework.data.redis.stream.Subscription;
import static com.liquidnet.service.base.constant.MQConst.CandyQueue.COUPON_ORDER_BACK; import java.util.ArrayList;
import java.util.List;
@Configuration @Configuration
public class ConsumerCandyCouponOrderBackRedisStreamConfig extends RedisStreamConfig { public class ConsumerCandyCouponOrderBackRedisStreamConfig extends RedisStreamConfig {
@Autowired @Autowired
ConsumerCandyCouponOrderBackRdsReceiver consumerCandyCouponOrderBackRdsReceiver; ConsumerCandyCouponOrderBackRdsReceiver consumerCandyCouponOrderBackRdsReceiver;
@Autowired
StringRedisTemplate stringRedisTemplate;
/**
* 缺票登记
*
* @param listenerContainer
* @param t
* @return
*/
private Subscription receiveSqlCandyCouponBack(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) {
return listenerContainer.receiveAutoAck(Consumer.from(COUPON_ORDER_BACK.getGroup(), getConsumerName(COUPON_ORDER_BACK.name() + t)),
StreamOffset.create(COUPON_ORDER_BACK.getKey(), ReadOffset.lastConsumed()), consumerCandyCouponOrderBackRdsReceiver);
}
/* —————————————————————————— | —————————————————————————— | —————————————————————————— */
/* -------------------------------------------------------- | 缺票登记 */
@Bean
public Subscription subscriptionSqlCandyCouponOrderBack0(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveSqlCandyCouponBack(listenerContainer, 0);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscriptionSqlCandyCouponOrderBack1(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveSqlCandyCouponBack(listenerContainer, 1);
listenerContainer.start();
return subscription;
}
@Bean @Bean
public Subscription subscriptionSqlCandyCouponOrderBack2(RedisConnectionFactory factory) { public List<Subscription> couponOrderBack(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory); List<Subscription> subscriptionList = new ArrayList<>();
var subscription = receiveSqlCandyCouponBack(listenerContainer, 2); MQConst.CandyQueue stream = MQConst.CandyQueue.COUPON_ORDER_BACK;
listenerContainer.start(); this.initStream(stringRedisTemplate, stream.getKey(), stream.getGroup());
return subscription; for (int i = 0; i < 5; i++) {
StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = this.buildStreamMessageListenerContainer(factory);
subscriptionList.add(listenerContainer.receiveAutoAck(
Consumer.from(stream.getGroup(), getConsumerName(stream.name() + i)),
StreamOffset.create(stream.getKey(), ReadOffset.lastConsumed()), consumerCandyCouponOrderBackRdsReceiver
));
listenerContainer.start();
}
return subscriptionList;
} }
/* -------------------------------------------------------- | */
} }
package com.liquidnet.service.consumer.base.config.candy; package com.liquidnet.service.consumer.base.config.candy;
import com.liquidnet.common.cache.redis.config.RedisStreamConfig; import com.liquidnet.common.cache.redis.config.RedisStreamConfig;
import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.consumer.base.receiver.candy.ConsumerCandyCouponReceiveRdsReceiver; import com.liquidnet.service.consumer.base.receiver.candy.ConsumerCandyCouponReceiveRdsReceiver;
import lombok.var;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
...@@ -11,55 +11,35 @@ import org.springframework.data.redis.connection.stream.Consumer; ...@@ -11,55 +11,35 @@ import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord; import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset; import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset; import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamMessageListenerContainer; import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription; import org.springframework.data.redis.stream.Subscription;
import static com.liquidnet.service.base.constant.MQConst.CandyQueue.COUPON_RECEIVE; import java.util.ArrayList;
import java.util.List;
@Configuration @Configuration
public class ConsumerCandyCouponReceiveRedisStreamConfig extends RedisStreamConfig { public class ConsumerCandyCouponReceiveRedisStreamConfig extends RedisStreamConfig {
@Autowired @Autowired
ConsumerCandyCouponReceiveRdsReceiver consumerCandyCouponReceiveRdsReceiver; ConsumerCandyCouponReceiveRdsReceiver consumerCandyCouponReceiveRdsReceiver;
@Autowired
StringRedisTemplate stringRedisTemplate;
/**
* 缺票登记
*
* @param listenerContainer
* @param t
* @return
*/
private Subscription receiveSqlCandyCouponReceive(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) {
return listenerContainer.receiveAutoAck(Consumer.from(COUPON_RECEIVE.getGroup(), getConsumerName(COUPON_RECEIVE.name() + t)),
StreamOffset.create(COUPON_RECEIVE.getKey(), ReadOffset.lastConsumed()), consumerCandyCouponReceiveRdsReceiver);
}
/* —————————————————————————— | —————————————————————————— | —————————————————————————— */
/* -------------------------------------------------------- | 缺票登记 */
@Bean
public Subscription subscriptionSqlCandyCouponReceive0(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveSqlCandyCouponReceive(listenerContainer, 0);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscriptionSqlCandyCouponReceive1(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveSqlCandyCouponReceive(listenerContainer, 1);
listenerContainer.start();
return subscription;
}
@Bean @Bean
public Subscription subscriptionSqlCandyCouponReceive2(RedisConnectionFactory factory) { public List<Subscription> couponReceive(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory); List<Subscription> subscriptionList = new ArrayList<>();
var subscription = receiveSqlCandyCouponReceive(listenerContainer, 2); MQConst.CandyQueue stream = MQConst.CandyQueue.COUPON_RECEIVE;
listenerContainer.start(); this.initStream(stringRedisTemplate, stream.getKey(), stream.getGroup());
return subscription; for (int i = 0; i < 5; i++) {
StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = this.buildStreamMessageListenerContainer(factory);
subscriptionList.add(listenerContainer.receiveAutoAck(
Consumer.from(stream.getGroup(), getConsumerName(stream.name() + i)),
StreamOffset.create(stream.getKey(), ReadOffset.lastConsumed()), consumerCandyCouponReceiveRdsReceiver
));
listenerContainer.start();
}
return subscriptionList;
} }
/* -------------------------------------------------------- | */
} }
package com.liquidnet.service.consumer.base.config.candy; package com.liquidnet.service.consumer.base.config.candy;
import com.liquidnet.common.cache.redis.config.RedisStreamConfig; import com.liquidnet.common.cache.redis.config.RedisStreamConfig;
import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.consumer.base.receiver.candy.ConsumerCandyCouponUseRdsReceiver; import com.liquidnet.service.consumer.base.receiver.candy.ConsumerCandyCouponUseRdsReceiver;
import lombok.var;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
...@@ -11,56 +11,35 @@ import org.springframework.data.redis.connection.stream.Consumer; ...@@ -11,56 +11,35 @@ import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord; import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset; import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset; import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamMessageListenerContainer; import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription; import org.springframework.data.redis.stream.Subscription;
import static com.liquidnet.service.base.constant.MQConst.CandyQueue.COUPON_USE; import java.util.ArrayList;
import java.util.List;
@Configuration @Configuration
public class ConsumerCandyCouponUseRedisStreamConfig extends RedisStreamConfig { public class ConsumerCandyCouponUseRedisStreamConfig extends RedisStreamConfig {
@Autowired @Autowired
ConsumerCandyCouponUseRdsReceiver consumerCandyCouponUseRdsReceiver; ConsumerCandyCouponUseRdsReceiver consumerCandyCouponUseRdsReceiver;
@Autowired
StringRedisTemplate stringRedisTemplate;
/**
* 缺票登记
*
* @param listenerContainer
* @param t
* @return
*/
private Subscription receiveSqlCandyCouponUse(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) {
return listenerContainer.receiveAutoAck(Consumer.from(COUPON_USE.getGroup(), getConsumerName(COUPON_USE.name() + t)),
StreamOffset.create(COUPON_USE.getKey(), ReadOffset.lastConsumed()), consumerCandyCouponUseRdsReceiver
);
}
/* —————————————————————————— | —————————————————————————— | —————————————————————————— */
/* -------------------------------------------------------- | 缺票登记 */
@Bean
public Subscription subscriptionSqlCandyCouponUse0(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveSqlCandyCouponUse(listenerContainer, 0);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscriptionSqlCandyCouponUse1(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveSqlCandyCouponUse(listenerContainer, 1);
listenerContainer.start();
return subscription;
}
@Bean @Bean
public Subscription subscriptionSqlCandyCouponUse2(RedisConnectionFactory factory) { public List<Subscription> couponUse(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory); List<Subscription> subscriptionList = new ArrayList<>();
var subscription = receiveSqlCandyCouponUse(listenerContainer, 2); MQConst.CandyQueue stream = MQConst.CandyQueue.COUPON_USE;
listenerContainer.start(); this.initStream(stringRedisTemplate, stream.getKey(), stream.getGroup());
return subscription; for (int i = 0; i < 5; i++) {
StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = this.buildStreamMessageListenerContainer(factory);
subscriptionList.add(listenerContainer.receiveAutoAck(
Consumer.from(stream.getGroup(), getConsumerName(stream.name() + i)),
StreamOffset.create(stream.getKey(), ReadOffset.lastConsumed()), consumerCandyCouponUseRdsReceiver
));
listenerContainer.start();
}
return subscriptionList;
} }
/* -------------------------------------------------------- | */
} }
package com.liquidnet.service.consumer.base.config.goblin;
import com.liquidnet.common.cache.redis.config.RedisStreamConfig;
import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.consumer.base.receiver.goblin.ConsumerGoblinOrderAGRdsReceiver;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;
import java.util.ArrayList;
import java.util.List;
@Configuration
public class ConsumerGoblinOrderAGRedisStreamConfig extends RedisStreamConfig {
@Autowired
ConsumerGoblinOrderAGRdsReceiver consumerGoblinOrderAGRdsReceiver;
@Autowired
StringRedisTemplate stringRedisTemplate;
@Bean
public List<Subscription> orderAg(RedisConnectionFactory factory) {
List<Subscription> subscriptionList = new ArrayList<>();
MQConst.GoblinQueue stream = MQConst.GoblinQueue.GOBLIN_ORDER_AGAIN;
this.initStream(stringRedisTemplate, stream.getKey(), stream.getGroup());
for (int i = 0; i < 5; i++) {
StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = this.buildStreamMessageListenerContainer(factory);
subscriptionList.add(listenerContainer.receiveAutoAck(
Consumer.from(stream.getGroup(), getConsumerName(stream.name() + i)),
StreamOffset.create(stream.getKey(), ReadOffset.lastConsumed()), consumerGoblinOrderAGRdsReceiver
));
listenerContainer.start();
}
return subscriptionList;
}
}
package com.liquidnet.service.consumer.base.config.goblin;
import com.liquidnet.common.cache.redis.config.RedisStreamConfig;
import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.consumer.base.receiver.goblin.ConsumerGoblinOrderCPRdsReceiver;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;
import java.util.ArrayList;
import java.util.List;
@Configuration
public class ConsumerGoblinOrderCPRedisStreamConfig extends RedisStreamConfig {
@Autowired
ConsumerGoblinOrderCPRdsReceiver consumerGoblinOrderCPRdsReceiver;
@Autowired
StringRedisTemplate stringRedisTemplate;
@Bean
public List<Subscription> orderCP(RedisConnectionFactory factory) {
List<Subscription> subscriptionList = new ArrayList<>();
MQConst.GoblinQueue stream = MQConst.GoblinQueue.GOBLIN_ORDER_CREATE_PAY;
this.initStream(stringRedisTemplate, stream.getKey(), stream.getGroup());
for (int i = 0; i < 5; i++) {
StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = this.buildStreamMessageListenerContainer(factory);
subscriptionList.add(listenerContainer.receiveAutoAck(
Consumer.from(stream.getGroup(), getConsumerName(stream.name() + i)),
StreamOffset.create(stream.getKey(), ReadOffset.lastConsumed()), consumerGoblinOrderCPRdsReceiver
));
listenerContainer.start();
}
return subscriptionList;
}
}
package com.liquidnet.service.consumer.base.receiver.goblin;
import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.consumer.base.receiver.AbstractSqlRedisReceiver;
import org.springframework.stereotype.Component;
@Component
public class ConsumerGoblinOrderAGRdsReceiver extends AbstractSqlRedisReceiver {
@Override
protected String getRedisStreamKey() {
return MQConst.GoblinQueue.GOBLIN_ORDER_AGAIN.getKey();
}
@Override
protected String getRedisStreamGroup() {
return MQConst.GoblinQueue.GOBLIN_ORDER_AGAIN.getGroup();
}
}
package com.liquidnet.service.consumer.base.receiver.goblin;
import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.consumer.base.receiver.AbstractSqlRedisReceiver;
import org.springframework.stereotype.Component;
@Component
public class ConsumerGoblinOrderCPRdsReceiver extends AbstractSqlRedisReceiver {
@Override
protected String getRedisStreamKey() {
return MQConst.GoblinQueue.GOBLIN_ORDER_CREATE_PAY.getKey();
}
@Override
protected String getRedisStreamGroup() {
return MQConst.GoblinQueue.GOBLIN_ORDER_CREATE_PAY.getGroup();
}
}
package com.liquidnet.service.consumer.goblin.config; //package com.liquidnet.service.consumer.goblin.config;
//
import com.liquidnet.common.cache.redis.config.RedisStreamConfig; //import com.liquidnet.common.cache.redis.config.RedisStreamConfig;
import com.liquidnet.service.base.constant.MQConst; //import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.consumer.goblin.receiver.ConsumerGoblinOrderAGRdsReceiver; //import com.liquidnet.service.consumer.goblin.receiver.ConsumerGoblinOrderAGRdsReceiver;
import lombok.var; //import lombok.var;
import org.springframework.beans.factory.annotation.Autowired; //import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean; //import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; //import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory; //import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer; //import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord; //import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset; //import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset; //import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.stream.StreamMessageListenerContainer; //import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription; //import org.springframework.data.redis.stream.Subscription;
//
@Configuration //@Configuration
public class ConsumerGoblinOrderAGRedisStreamConfig extends RedisStreamConfig { //public class ConsumerGoblinOrderAGRedisStreamConfig extends RedisStreamConfig {
@Autowired // @Autowired
ConsumerGoblinOrderAGRdsReceiver consumerGoblinOrderAGRdsReceiver; // ConsumerGoblinOrderAGRdsReceiver consumerGoblinOrderAGRdsReceiver;
//
/** // /**
* 缺票登记 // * 缺票登记
* // *
* @param listenerContainer // * @param listenerContainer
* @param t // * @param t
* @return // * @return
*/ // */
private Subscription receiveGoblinOrderAG(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) { // private Subscription receiveGoblinOrderAG(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) {
return listenerContainer.receiveAutoAck(Consumer.from(MQConst.GoblinQueue.GOBLIN_ORDER_AGAIN.getGroup(), getConsumerName(MQConst.GoblinQueue.GOBLIN_ORDER_AGAIN.name() + t)), // return listenerContainer.receiveAutoAck(Consumer.from(MQConst.GoblinQueue.GOBLIN_ORDER_AGAIN.getGroup(), getConsumerName(MQConst.GoblinQueue.GOBLIN_ORDER_AGAIN.name() + t)),
StreamOffset.create(MQConst.GoblinQueue.GOBLIN_ORDER_AGAIN.getKey(), ReadOffset.lastConsumed()), consumerGoblinOrderAGRdsReceiver); // StreamOffset.create(MQConst.GoblinQueue.GOBLIN_ORDER_AGAIN.getKey(), ReadOffset.lastConsumed()), consumerGoblinOrderAGRdsReceiver);
} // }
//
/* —————————————————————————— | —————————————————————————— | —————————————————————————— */ // /* —————————————————————————— | —————————————————————————— | —————————————————————————— */
//
/* -------------------------------------------------------- | 缺票登记 */ // /* -------------------------------------------------------- | 缺票登记 */
//
@Bean // @Bean
public Subscription subscriptionGoblinOrderAG0(RedisConnectionFactory factory) { // public Subscription subscriptionGoblinOrderAG0(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory); // var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveGoblinOrderAG(listenerContainer, 0); // var subscription = receiveGoblinOrderAG(listenerContainer, 0);
listenerContainer.start(); // listenerContainer.start();
return subscription; // return subscription;
} // }
//
@Bean // @Bean
public Subscription subscriptionGoblinOrderAG1(RedisConnectionFactory factory) { // public Subscription subscriptionGoblinOrderAG1(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory); // var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveGoblinOrderAG(listenerContainer, 1); // var subscription = receiveGoblinOrderAG(listenerContainer, 1);
listenerContainer.start(); // listenerContainer.start();
return subscription; // return subscription;
} // }
//
@Bean // @Bean
public Subscription subscriptionGoblinOrderAG2(RedisConnectionFactory factory) { // public Subscription subscriptionGoblinOrderAG2(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory); // var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveGoblinOrderAG(listenerContainer, 2); // var subscription = receiveGoblinOrderAG(listenerContainer, 2);
listenerContainer.start(); // listenerContainer.start();
return subscription; // return subscription;
} // }
//
/* -------------------------------------------------------- | */ // /* -------------------------------------------------------- | */
} //}
package com.liquidnet.service.consumer.goblin.config; //package com.liquidnet.service.consumer.goblin.config;
//
import com.liquidnet.common.cache.redis.config.RedisStreamConfig; //import com.liquidnet.common.cache.redis.config.RedisStreamConfig;
import com.liquidnet.service.base.constant.MQConst; //import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.consumer.goblin.receiver.ConsumerGoblinOrderCPRdsReceiver; //import com.liquidnet.service.consumer.goblin.receiver.ConsumerGoblinOrderCPRdsReceiver;
import lombok.var; //import lombok.var;
import org.springframework.beans.factory.annotation.Autowired; //import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean; //import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; //import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory; //import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer; //import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord; //import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset; //import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset; //import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.stream.StreamMessageListenerContainer; //import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription; //import org.springframework.data.redis.stream.Subscription;
//
@Configuration //@Configuration
public class ConsumerGoblinOrderCPRedisStreamConfig extends RedisStreamConfig { //public class ConsumerGoblinOrderCPRedisStreamConfig extends RedisStreamConfig {
@Autowired // @Autowired
ConsumerGoblinOrderCPRdsReceiver consumerGoblinOrderCPRdsReceiver; // ConsumerGoblinOrderCPRdsReceiver consumerGoblinOrderCPRdsReceiver;
//
/** // /**
* 商城订单 // * 商城订单
*/ // */
private Subscription receiveGoblinOrderCP(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) { // private Subscription receiveGoblinOrderCP(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) {
return listenerContainer.receiveAutoAck(Consumer.from(MQConst.GoblinQueue.GOBLIN_ORDER_CREATE_PAY.getGroup(), getConsumerName(MQConst.GoblinQueue.GOBLIN_ORDER_CREATE_PAY.name() + t)), // return listenerContainer.receiveAutoAck(Consumer.from(MQConst.GoblinQueue.GOBLIN_ORDER_CREATE_PAY.getGroup(), getConsumerName(MQConst.GoblinQueue.GOBLIN_ORDER_CREATE_PAY.name() + t)),
StreamOffset.create(MQConst.GoblinQueue.GOBLIN_ORDER_CREATE_PAY.getKey(), ReadOffset.lastConsumed()), consumerGoblinOrderCPRdsReceiver); // StreamOffset.create(MQConst.GoblinQueue.GOBLIN_ORDER_CREATE_PAY.getKey(), ReadOffset.lastConsumed()), consumerGoblinOrderCPRdsReceiver);
} // }
//
/* —————————————————————————— | —————————————————————————— | —————————————————————————— */ // /* —————————————————————————— | —————————————————————————— | —————————————————————————— */
//
/*-------------------------------------- 商城订单 --------------------------------------*/ // /*-------------------------------------- 商城订单 --------------------------------------*/
//
@Bean // @Bean
public Subscription subscriptionGoblinOrderCP0(RedisConnectionFactory factory) { // public Subscription subscriptionGoblinOrderCP0(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory); // var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveGoblinOrderCP(listenerContainer, 0); // var subscription = receiveGoblinOrderCP(listenerContainer, 0);
listenerContainer.start(); // listenerContainer.start();
return subscription; // return subscription;
} // }
//
@Bean // @Bean
public Subscription subscriptionGoblinOrderCP1(RedisConnectionFactory factory) { // public Subscription subscriptionGoblinOrderCP1(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory); // var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveGoblinOrderCP(listenerContainer, 1); // var subscription = receiveGoblinOrderCP(listenerContainer, 1);
listenerContainer.start(); // listenerContainer.start();
return subscription; // return subscription;
} // }
//
@Bean // @Bean
public Subscription subscriptionGoblinOrderCP2(RedisConnectionFactory factory) { // public Subscription subscriptionGoblinOrderCP2(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory); // var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveGoblinOrderCP(listenerContainer, 2); // var subscription = receiveGoblinOrderCP(listenerContainer, 2);
listenerContainer.start(); // listenerContainer.start();
return subscription; // return subscription;
} // }
//
/* -------------------------------------------------------- | */ // /* -------------------------------------------------------- | */
} //}
package com.liquidnet.service.consumer.goblin.receiver; //package com.liquidnet.service.consumer.goblin.receiver;
//
import com.liquidnet.service.base.constant.MQConst; //import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.consumer.goblin.service.IBaseDao; //import com.liquidnet.service.consumer.goblin.service.IBaseDao;
import lombok.extern.slf4j.Slf4j; //import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; //import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.MapRecord; //import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.core.StringRedisTemplate; //import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamListener; //import org.springframework.data.redis.stream.StreamListener;
//
/** ///**
* 公共的业务队列消息监听器,具体业务消费逻辑通过`consumerMessageHandler`实现 // * 公共的业务队列消息监听器,具体业务消费逻辑通过`consumerMessageHandler`实现
* // *
* @author zhanggb // * @author zhanggb
* Created by IntelliJ IDEA at 2022/3/31 // * Created by IntelliJ IDEA at 2022/3/31
*/ // */
@Slf4j //@Slf4j
public abstract class AbstractBizRedisReceiver implements StreamListener<String, MapRecord<String, String, String>> { //public abstract class AbstractBizRedisReceiver implements StreamListener<String, MapRecord<String, String, String>> {
@Autowired // @Autowired
public IBaseDao baseDao; // public IBaseDao baseDao;
@Autowired // @Autowired
public StringRedisTemplate stringRedisTemplate; // public StringRedisTemplate stringRedisTemplate;
//
@Override // @Override
public void onMessage(MapRecord<String, String, String> message) { // public void onMessage(MapRecord<String, String, String> message) {
String redisStreamKey = this.getRedisStreamKey(); // String redisStreamKey = this.getRedisStreamKey();
log.debug("CONSUMER MSG[streamKey:{},messageId:{},stream:{},body:{}]", redisStreamKey, message.getId(), message.getStream(), message.getValue()); // log.debug("CONSUMER MSG[streamKey:{},messageId:{},stream:{},body:{}]", redisStreamKey, message.getId(), message.getStream(), message.getValue());
boolean result = this.consumerMessageHandler(message.getValue().get(MQConst.QUEUE_MESSAGE_KEY)); // boolean result = this.consumerMessageHandler(message.getValue().get(MQConst.QUEUE_MESSAGE_KEY));
log.info("CONSUMER MSG RESULT:{} ==> [{}]MESSAGE_ID:{}", result, redisStreamKey, message.getId()); // log.info("CONSUMER MSG RESULT:{} ==> [{}]MESSAGE_ID:{}", result, redisStreamKey, message.getId());
//
try { // try {
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message); // stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
} catch (Exception e) { // } catch (Exception e) {
log.error("#CONSUMER MSG EX_ACK ==> [{}]RESULT:{},MESSAGE:{}", redisStreamKey, result, message.getValue(), e); // log.error("#CONSUMER MSG EX_ACK ==> [{}]RESULT:{},MESSAGE:{}", redisStreamKey, result, message.getValue(), e);
} // }
try { // try {
stringRedisTemplate.opsForStream().delete(redisStreamKey, message.getId()); // stringRedisTemplate.opsForStream().delete(redisStreamKey, message.getId());
} catch (Exception e) { // } catch (Exception e) {
log.error("#CONSUMER MSG EX_DEL ==> [{}]RESULT:{},MESSAGE:{}", redisStreamKey, result, message.getValue(), e); // log.error("#CONSUMER MSG EX_DEL ==> [{}]RESULT:{},MESSAGE:{}", redisStreamKey, result, message.getValue(), e);
} // }
} // }
//
protected abstract boolean consumerMessageHandler(String msg); // protected abstract boolean consumerMessageHandler(String msg);
//
protected abstract String getRedisStreamKey(); // protected abstract String getRedisStreamKey();
//
protected abstract String getRedisStreamGroup(); // protected abstract String getRedisStreamGroup();
} //}
\ No newline at end of file \ No newline at end of file
package com.liquidnet.service.consumer.goblin.receiver; //package com.liquidnet.service.consumer.goblin.receiver;
//
import com.liquidnet.commons.lang.util.CollectionUtil; //import com.liquidnet.commons.lang.util.CollectionUtil;
import com.liquidnet.commons.lang.util.JsonUtils; //import com.liquidnet.commons.lang.util.JsonUtils;
import com.liquidnet.service.base.SqlMapping; //import com.liquidnet.service.base.SqlMapping;
import com.liquidnet.service.consumer.goblin.service.IBaseDao; //import com.liquidnet.service.consumer.goblin.service.IBaseDao;
import lombok.extern.slf4j.Slf4j; //import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; //import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.MapRecord; //import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.StreamRecords; //import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.core.StringRedisTemplate; //import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamListener; //import org.springframework.data.redis.stream.StreamListener;
//
import java.util.HashMap; //import java.util.HashMap;
//
/** ///**
* 公共的SQL队列消息监听器,具体SQL消费逻辑统一使用`consumerMessageHandler` // * 公共的SQL队列消息监听器,具体SQL消费逻辑统一使用`consumerMessageHandler`
* // *
* @author zhanggb // * @author zhanggb
* Created by IntelliJ IDEA at 2022/3/31 // * Created by IntelliJ IDEA at 2022/3/31
*/ // */
@Slf4j //@Slf4j
public abstract class AbstractSqlRedisReceiver implements StreamListener<String, MapRecord<String, String, String>> { //public abstract class AbstractSqlRedisReceiver implements StreamListener<String, MapRecord<String, String, String>> {
@Autowired // @Autowired
private IBaseDao baseDao; // private IBaseDao baseDao;
@Autowired // @Autowired
StringRedisTemplate stringRedisTemplate; // StringRedisTemplate stringRedisTemplate;
//
@Override // @Override
public void onMessage(MapRecord<String, String, String> message) { // public void onMessage(MapRecord<String, String, String> message) {
String redisStreamKey = this.getRedisStreamKey(); // String redisStreamKey = this.getRedisStreamKey();
log.debug("CONSUMER MSG[streamKey:{},messageId:{},stream:{},body:{}]", redisStreamKey, message.getId(), message.getStream(), message.getValue()); // log.debug("CONSUMER MSG[streamKey:{},messageId:{},stream:{},body:{}]", redisStreamKey, message.getId(), message.getStream(), message.getValue());
boolean result = this.consumerMessageHandler(message.getValue().get("message")); // boolean result = this.consumerMessageHandler(message.getValue().get("message"));
log.info("CONSUMER MSG RESULT:{} ==> [{}]MESSAGE_ID:{}", result, redisStreamKey, message.getId()); // log.info("CONSUMER MSG RESULT:{} ==> [{}]MESSAGE_ID:{}", result, redisStreamKey, message.getId());
//
try { // try {
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message); // stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
} catch (Exception e) { // } catch (Exception e) {
log.error("#CONSUMER MSG EX_ACK ==> [{}]RESULT:{},MESSAGE:{}", redisStreamKey, result, message.getValue(), e); // log.error("#CONSUMER MSG EX_ACK ==> [{}]RESULT:{},MESSAGE:{}", redisStreamKey, result, message.getValue(), e);
} // }
try { // try {
stringRedisTemplate.opsForStream().delete(redisStreamKey, message.getId()); // stringRedisTemplate.opsForStream().delete(redisStreamKey, message.getId());
} catch (Exception e) { // } catch (Exception e) {
log.error("#CONSUMER MSG EX_DEL ==> [{}]RESULT:{},MESSAGE:{}", redisStreamKey, result, message.getValue(), e); // log.error("#CONSUMER MSG EX_DEL ==> [{}]RESULT:{},MESSAGE:{}", redisStreamKey, result, message.getValue(), e);
} // }
} // }
//
private boolean consumerMessageHandler(String msg) { // private boolean consumerMessageHandler(String msg) {
boolean aBoolean = false; // boolean aBoolean = false;
try { // try {
SqlMapping.SqlMessage sqlMessage = JsonUtils.fromJson(msg, SqlMapping.SqlMessage.class); // SqlMapping.SqlMessage sqlMessage = JsonUtils.fromJson(msg, SqlMapping.SqlMessage.class);
if (sqlMessage == null) { // if (sqlMessage == null) {
aBoolean = true; // aBoolean = true;
} else { // } else {
aBoolean = baseDao.batchSqls(sqlMessage.getSqls(), sqlMessage.getArgs()); // aBoolean = baseDao.batchSqls(sqlMessage.getSqls(), sqlMessage.getArgs());
} // }
} catch (Exception e) { // } catch (Exception e) {
log.error("CONSUMER MSG EX_HANDLE ==> [{}]:{}", this.getRedisStreamKey(), msg, e); // log.error("CONSUMER MSG EX_HANDLE ==> [{}]:{}", this.getRedisStreamKey(), msg, e);
} finally { // } finally {
if (!aBoolean) { // if (!aBoolean) {
HashMap<String, String> map = CollectionUtil.mapStringString(); // HashMap<String, String> map = CollectionUtil.mapStringString();
map.put("message", msg); // map.put("message", msg);
stringRedisTemplate.opsForStream().add(StreamRecords.mapBacked(map).withStreamKey(this.getRedisStreamKey())); // stringRedisTemplate.opsForStream().add(StreamRecords.mapBacked(map).withStreamKey(this.getRedisStreamKey()));
} // }
} // }
return aBoolean; // return aBoolean;
} // }
//
protected abstract String getRedisStreamKey(); // protected abstract String getRedisStreamKey();
//
protected abstract String getRedisStreamGroup(); // protected abstract String getRedisStreamGroup();
} //}
\ No newline at end of file \ No newline at end of file
package com.liquidnet.service.consumer.goblin.receiver; //package com.liquidnet.service.consumer.goblin.receiver;
//
import com.liquidnet.service.base.constant.MQConst; //import com.liquidnet.service.base.constant.MQConst;
import org.springframework.stereotype.Component; //import org.springframework.stereotype.Component;
//
@Component //@Component
public class ConsumerGoblinOrderAGRdsReceiver extends AbstractSqlRedisReceiver { //public class ConsumerGoblinOrderAGRdsReceiver extends AbstractSqlRedisReceiver {
@Override // @Override
protected String getRedisStreamKey() { // protected String getRedisStreamKey() {
return MQConst.GoblinQueue.GOBLIN_ORDER_AGAIN.getKey(); // return MQConst.GoblinQueue.GOBLIN_ORDER_AGAIN.getKey();
} // }
//
@Override // @Override
protected String getRedisStreamGroup() { // protected String getRedisStreamGroup() {
return MQConst.GoblinQueue.GOBLIN_ORDER_AGAIN.getGroup(); // return MQConst.GoblinQueue.GOBLIN_ORDER_AGAIN.getGroup();
} // }
} //}
package com.liquidnet.service.consumer.goblin.receiver; //package com.liquidnet.service.consumer.goblin.receiver;
//
import com.liquidnet.service.base.constant.MQConst; //import com.liquidnet.service.base.constant.MQConst;
import org.springframework.stereotype.Component; //import org.springframework.stereotype.Component;
//
@Component //@Component
public class ConsumerGoblinOrderCPRdsReceiver extends AbstractSqlRedisReceiver { //public class ConsumerGoblinOrderCPRdsReceiver extends AbstractSqlRedisReceiver {
@Override // @Override
protected String getRedisStreamKey() { // protected String getRedisStreamKey() {
return MQConst.GoblinQueue.GOBLIN_ORDER_CREATE_PAY.getKey(); // return MQConst.GoblinQueue.GOBLIN_ORDER_CREATE_PAY.getKey();
} // }
//
@Override // @Override
protected String getRedisStreamGroup() { // protected String getRedisStreamGroup() {
return MQConst.GoblinQueue.GOBLIN_ORDER_CREATE_PAY.getGroup(); // return MQConst.GoblinQueue.GOBLIN_ORDER_CREATE_PAY.getGroup();
} // }
} //}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment