记得上下班打卡 | git大法好,push需谨慎

Commit b9375d4c authored by 胡佳晨's avatar 胡佳晨

dragon 消费

parent 13a02d47
package com.liquidnet.service.dragon.config;
import com.liquidnet.service.dragon.receiver.RedisReceiver;
import com.liquidnet.service.dragon.receiver.RedisPayReceiver;
import com.liquidnet.service.dragon.receiver.RedisRefundReceiver;
import lombok.var;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
......@@ -18,18 +19,62 @@ import java.time.Duration;
public class RedisStreamConfig {
@Autowired
private RedisReceiver redisReceiver;
private RedisPayReceiver redisPayReceiver;
@Autowired
private RedisRefundReceiver redisRefundReceiver;
@Bean
public Subscription subscriptionPay0(RedisConnectionFactory factory) {
var options = StreamMessageListenerContainer
.StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofSeconds(1))
.build();
var listenerContainer = StreamMessageListenerContainer.create(factory, options);
var subscription = listenerContainer.receiveAutoAck(Consumer.from("dragon-pay-group", "dragon-pay-0"),
StreamOffset.create("dragon-pay", ReadOffset.lastConsumed()), redisPayReceiver);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscriptionPay1(RedisConnectionFactory factory) {
var options = StreamMessageListenerContainer
.StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofSeconds(1))
.build();
var listenerContainer = StreamMessageListenerContainer.create(factory, options);
var subscription = listenerContainer.receiveAutoAck(Consumer.from("dragon-pay-group", "dragon-pay-1"),
StreamOffset.create("dragon-pay", ReadOffset.lastConsumed()), redisPayReceiver);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscriptionRefund0(RedisConnectionFactory factory) {
var options = StreamMessageListenerContainer
.StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofSeconds(1))
.build();
var listenerContainer = StreamMessageListenerContainer.create(factory, options);
var subscription = listenerContainer.receiveAutoAck(Consumer.from("dragon-refund-group", "dragon-refund-0"),
StreamOffset.create("dragon-refund", ReadOffset.lastConsumed()), redisRefundReceiver);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscription(RedisConnectionFactory factory){
public Subscription subscriptionRefund1(RedisConnectionFactory factory) {
var options = StreamMessageListenerContainer
.StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofSeconds(1))
.build();
var listenerContainer = StreamMessageListenerContainer.create(factory,options);
var subscription = listenerContainer.receiveAutoAck(Consumer.from("group-1","consumer-1"),
StreamOffset.create("mystream", ReadOffset.lastConsumed()),redisReceiver);
var listenerContainer = StreamMessageListenerContainer.create(factory, options);
var subscription = listenerContainer.receiveAutoAck(Consumer.from("dragon-refund-group", "dragon-refund-1"),
StreamOffset.create("dragon-refund", ReadOffset.lastConsumed()), redisRefundReceiver);
listenerContainer.start();
return subscription;
}
......
......@@ -7,11 +7,11 @@ import org.springframework.stereotype.Component;
@Slf4j
@Component
public class RedisReceiver implements StreamListener<String, MapRecord<String, String, String>> {
public class RedisPayReceiver implements StreamListener<String, MapRecord<String, String, String>> {
@Override
public void onMessage(MapRecord<String, String, String> message) {
log.info("接受到来自redis的消息");
log.info("接受到来自redis PAY 的消息");
System.out.println("message id "+message.getId());
System.out.println("stream "+message.getStream());
System.out.println("body "+message.getValue());
......
package com.liquidnet.service.dragon.receiver;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class RedisRefundReceiver implements StreamListener<String, MapRecord<String, String, String>> {
@Override
public void onMessage(MapRecord<String, String, String> message) {
log.info("接受到来自redis REFUND 的消息");
System.out.println("message id "+message.getId());
System.out.println("stream "+message.getStream());
System.out.println("body "+message.getValue());
}
}
......@@ -20,7 +20,7 @@ public class DragonOrderRefundsServiceImpl implements IDragonOrderRefundsService
try {
HashMap<String ,String> map = new HashMap<>();
map.put("message","测试 redis 订阅信息1");
MapRecord<String, String, String> record = StreamRecords.mapBacked(map).withStreamKey("mystream");
MapRecord<String, String, String> record = StreamRecords.mapBacked(map).withStreamKey("dragon-refund");
stringRedisTemplate.opsForStream().add(record);
}catch (Exception e){
e.printStackTrace();
......
......@@ -17,11 +17,9 @@ public class DragonOrdersServiceImpl implements IDragonOrdersService {
@Override
public void sendRedisQueue() {
try {
stringRedisTemplate.convertAndSend("msg2", "测试 redis 订阅信息2");
HashMap<String ,String> map = new HashMap<>();
map.put("message","测试 redis 订阅信息1");
MapRecord<String, String, String> record = StreamRecords.mapBacked(map).withStreamKey("dragon");
MapRecord<String, String, String> record = StreamRecords.mapBacked(map).withStreamKey("dragon-pay");
stringRedisTemplate.opsForStream().add(record);
}catch (Exception e){
e.printStackTrace();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment