记得上下班打卡 | git大法好,push需谨慎

Commit f7049fa9 authored by jiangxiulong's avatar jiangxiulong

Merge remote-tracking branch 'origin/dev' into dev

parents 8e2b1aa9 d2daec1f
package com.liquidnet.service.dragon.config; package com.liquidnet.service.dragon.config;
import com.liquidnet.service.dragon.receiver.RedisReceiver; import com.liquidnet.service.dragon.receiver.RedisPayReceiver;
import com.liquidnet.service.dragon.receiver.RedisRefundReceiver;
import lombok.var; import lombok.var;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
...@@ -18,18 +19,62 @@ import java.time.Duration; ...@@ -18,18 +19,62 @@ import java.time.Duration;
public class RedisStreamConfig { public class RedisStreamConfig {
@Autowired @Autowired
private RedisReceiver redisReceiver; private RedisPayReceiver redisPayReceiver;
@Autowired
private RedisRefundReceiver redisRefundReceiver;
@Bean
public Subscription subscriptionPay0(RedisConnectionFactory factory) {
var options = StreamMessageListenerContainer
.StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofSeconds(1))
.build();
var listenerContainer = StreamMessageListenerContainer.create(factory, options);
var subscription = listenerContainer.receiveAutoAck(Consumer.from("dragon-pay-group", "dragon-pay-0"),
StreamOffset.create("dragon-pay", ReadOffset.lastConsumed()), redisPayReceiver);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscriptionPay1(RedisConnectionFactory factory) {
var options = StreamMessageListenerContainer
.StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofSeconds(1))
.build();
var listenerContainer = StreamMessageListenerContainer.create(factory, options);
var subscription = listenerContainer.receiveAutoAck(Consumer.from("dragon-pay-group", "dragon-pay-1"),
StreamOffset.create("dragon-pay", ReadOffset.lastConsumed()), redisPayReceiver);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscriptionRefund0(RedisConnectionFactory factory) {
var options = StreamMessageListenerContainer
.StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofSeconds(1))
.build();
var listenerContainer = StreamMessageListenerContainer.create(factory, options);
var subscription = listenerContainer.receiveAutoAck(Consumer.from("dragon-refund-group", "dragon-refund-0"),
StreamOffset.create("dragon-refund", ReadOffset.lastConsumed()), redisRefundReceiver);
listenerContainer.start();
return subscription;
}
@Bean @Bean
public Subscription subscription(RedisConnectionFactory factory){ public Subscription subscriptionRefund1(RedisConnectionFactory factory) {
var options = StreamMessageListenerContainer var options = StreamMessageListenerContainer
.StreamMessageListenerContainerOptions .StreamMessageListenerContainerOptions
.builder() .builder()
.pollTimeout(Duration.ofSeconds(1)) .pollTimeout(Duration.ofSeconds(1))
.build(); .build();
var listenerContainer = StreamMessageListenerContainer.create(factory,options); var listenerContainer = StreamMessageListenerContainer.create(factory, options);
var subscription = listenerContainer.receiveAutoAck(Consumer.from("group-1","consumer-1"), var subscription = listenerContainer.receiveAutoAck(Consumer.from("dragon-refund-group", "dragon-refund-1"),
StreamOffset.create("mystream", ReadOffset.lastConsumed()),redisReceiver); StreamOffset.create("dragon-refund", ReadOffset.lastConsumed()), redisRefundReceiver);
listenerContainer.start(); listenerContainer.start();
return subscription; return subscription;
} }
......
...@@ -7,11 +7,11 @@ import org.springframework.stereotype.Component; ...@@ -7,11 +7,11 @@ import org.springframework.stereotype.Component;
@Slf4j @Slf4j
@Component @Component
public class RedisReceiver implements StreamListener<String, MapRecord<String, String, String>> { public class RedisPayReceiver implements StreamListener<String, MapRecord<String, String, String>> {
@Override @Override
public void onMessage(MapRecord<String, String, String> message) { public void onMessage(MapRecord<String, String, String> message) {
log.info("接受到来自redis的消息"); log.info("接受到来自redis PAY 的消息");
System.out.println("message id "+message.getId()); System.out.println("message id "+message.getId());
System.out.println("stream "+message.getStream()); System.out.println("stream "+message.getStream());
System.out.println("body "+message.getValue()); System.out.println("body "+message.getValue());
......
package com.liquidnet.service.dragon.receiver;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class RedisRefundReceiver implements StreamListener<String, MapRecord<String, String, String>> {
@Override
public void onMessage(MapRecord<String, String, String> message) {
log.info("接受到来自redis REFUND 的消息");
System.out.println("message id "+message.getId());
System.out.println("stream "+message.getStream());
System.out.println("body "+message.getValue());
}
}
...@@ -20,7 +20,7 @@ public class DragonOrderRefundsServiceImpl implements IDragonOrderRefundsService ...@@ -20,7 +20,7 @@ public class DragonOrderRefundsServiceImpl implements IDragonOrderRefundsService
try { try {
HashMap<String ,String> map = new HashMap<>(); HashMap<String ,String> map = new HashMap<>();
map.put("message","测试 redis 订阅信息1"); map.put("message","测试 redis 订阅信息1");
MapRecord<String, String, String> record = StreamRecords.mapBacked(map).withStreamKey("mystream"); MapRecord<String, String, String> record = StreamRecords.mapBacked(map).withStreamKey("dragon-refund");
stringRedisTemplate.opsForStream().add(record); stringRedisTemplate.opsForStream().add(record);
}catch (Exception e){ }catch (Exception e){
e.printStackTrace(); e.printStackTrace();
......
...@@ -17,11 +17,9 @@ public class DragonOrdersServiceImpl implements IDragonOrdersService { ...@@ -17,11 +17,9 @@ public class DragonOrdersServiceImpl implements IDragonOrdersService {
@Override @Override
public void sendRedisQueue() { public void sendRedisQueue() {
try { try {
stringRedisTemplate.convertAndSend("msg2", "测试 redis 订阅信息2");
HashMap<String ,String> map = new HashMap<>(); HashMap<String ,String> map = new HashMap<>();
map.put("message","测试 redis 订阅信息1"); map.put("message","测试 redis 订阅信息1");
MapRecord<String, String, String> record = StreamRecords.mapBacked(map).withStreamKey("dragon"); MapRecord<String, String, String> record = StreamRecords.mapBacked(map).withStreamKey("dragon-pay");
stringRedisTemplate.opsForStream().add(record); stringRedisTemplate.opsForStream().add(record);
}catch (Exception e){ }catch (Exception e){
e.printStackTrace(); e.printStackTrace();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment