记得上下班打卡 | git大法好,push需谨慎

Commit d3f6a767 authored by 张国柄's avatar 张国柄

~opt;

parent b0bfcade
...@@ -19,11 +19,7 @@ import java.time.Duration; ...@@ -19,11 +19,7 @@ import java.time.Duration;
import java.util.HashMap; import java.util.HashMap;
@Slf4j @Slf4j
@Component
public class RedisStreamConfig { public class RedisStreamConfig {
@Autowired
StringRedisTemplate stringRedisTemplate;
private String hostname; private String hostname;
{ {
try { try {
...@@ -33,7 +29,7 @@ public class RedisStreamConfig { ...@@ -33,7 +29,7 @@ public class RedisStreamConfig {
} }
} }
public void initStream(String key, String group) { public void initStream(StringRedisTemplate stringRedisTemplate, String key, String group) {
try { try {
if (stringRedisTemplate.hasKey(key)) { if (stringRedisTemplate.hasKey(key)) {
log.info("redis stream exist[{},{}]", key, group); log.info("redis stream exist[{},{}]", key, group);
......
...@@ -13,6 +13,7 @@ import org.springframework.data.redis.connection.stream.Consumer; ...@@ -13,6 +13,7 @@ import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord; import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset; import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset; import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamMessageListenerContainer; import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription; import org.springframework.data.redis.stream.Subscription;
...@@ -24,6 +25,8 @@ import static com.liquidnet.service.base.constant.MQConst.AdamQueue; ...@@ -24,6 +25,8 @@ import static com.liquidnet.service.base.constant.MQConst.AdamQueue;
@Slf4j @Slf4j
@Configuration @Configuration
public class ConsumerAdamSqlUcenterRedisStreamConfig extends RedisStreamConfig { public class ConsumerAdamSqlUcenterRedisStreamConfig extends RedisStreamConfig {
@Autowired
StringRedisTemplate stringRedisTemplate;
@Autowired @Autowired
ConsumerAdamSqlURegisterRdsReceiver consumerAdamURegisterRdsReceiver; ConsumerAdamSqlURegisterRdsReceiver consumerAdamURegisterRdsReceiver;
@Autowired @Autowired
...@@ -35,7 +38,7 @@ public class ConsumerAdamSqlUcenterRedisStreamConfig extends RedisStreamConfig { ...@@ -35,7 +38,7 @@ public class ConsumerAdamSqlUcenterRedisStreamConfig extends RedisStreamConfig {
public List<Subscription> subscriptionSqlURegister(RedisConnectionFactory factory) { public List<Subscription> subscriptionSqlURegister(RedisConnectionFactory factory) {
List<Subscription> subscriptionList = new ArrayList<>(); List<Subscription> subscriptionList = new ArrayList<>();
AdamQueue stream = AdamQueue.SQL_UREGISTER; AdamQueue stream = AdamQueue.SQL_UREGISTER;
this.initStream(stream.getKey(), stream.getGroup()); this.initStream(stringRedisTemplate, stream.getKey(), stream.getGroup());
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = this.buildStreamMessageListenerContainer(factory); StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = this.buildStreamMessageListenerContainer(factory);
subscriptionList.add(listenerContainer.receiveAutoAck( subscriptionList.add(listenerContainer.receiveAutoAck(
...@@ -51,7 +54,7 @@ public class ConsumerAdamSqlUcenterRedisStreamConfig extends RedisStreamConfig { ...@@ -51,7 +54,7 @@ public class ConsumerAdamSqlUcenterRedisStreamConfig extends RedisStreamConfig {
public List<Subscription> subscriptionSqlUCenter(RedisConnectionFactory factory) { public List<Subscription> subscriptionSqlUCenter(RedisConnectionFactory factory) {
List<Subscription> subscriptionList = new ArrayList<>(); List<Subscription> subscriptionList = new ArrayList<>();
AdamQueue stream = AdamQueue.SQL_UCENTER; AdamQueue stream = AdamQueue.SQL_UCENTER;
this.initStream(stream.getKey(), stream.getGroup()); this.initStream(stringRedisTemplate, stream.getKey(), stream.getGroup());
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = this.buildStreamMessageListenerContainer(factory); StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = this.buildStreamMessageListenerContainer(factory);
subscriptionList.add(listenerContainer.receiveAutoAck( subscriptionList.add(listenerContainer.receiveAutoAck(
...@@ -67,7 +70,7 @@ public class ConsumerAdamSqlUcenterRedisStreamConfig extends RedisStreamConfig { ...@@ -67,7 +70,7 @@ public class ConsumerAdamSqlUcenterRedisStreamConfig extends RedisStreamConfig {
public List<Subscription> subscriptionSqlUMember(RedisConnectionFactory factory) { public List<Subscription> subscriptionSqlUMember(RedisConnectionFactory factory) {
List<Subscription> subscriptionList = new ArrayList<>(); List<Subscription> subscriptionList = new ArrayList<>();
AdamQueue stream = AdamQueue.SQL_UMEMBER; AdamQueue stream = AdamQueue.SQL_UMEMBER;
this.initStream(stream.getKey(), stream.getGroup()); this.initStream(stringRedisTemplate, stream.getKey(), stream.getGroup());
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = this.buildStreamMessageListenerContainer(factory); StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = this.buildStreamMessageListenerContainer(factory);
subscriptionList.add(listenerContainer.receiveAutoAck( subscriptionList.add(listenerContainer.receiveAutoAck(
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment