记得上下班打卡 | git大法好,push需谨慎

Commit 86fc3936 authored by anjiabin's avatar anjiabin

修改队列消费pending相关

parent 4ea9f164
......@@ -2,8 +2,6 @@ package com.liquidnet.common.cache.redis.config;
import lombok.extern.slf4j.Slf4j;
import lombok.var;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.RecordId;
......@@ -11,7 +9,6 @@ import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.core.StreamOperations;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.stereotype.Component;
import java.net.InetAddress;
import java.net.UnknownHostException;
......@@ -54,6 +51,22 @@ public class RedisStreamConfig {
.builder()
.pollTimeout(Duration.ofMillis(1))
.build();
// 创建配置对象
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> streamMessageListenerContainerOptions = StreamMessageListenerContainer.StreamMessageListenerContainerOptions
.builder()
// 一次性最多拉取多少条消息
.batchSize(10)
// 执行消息轮询的执行器
// .executor(this.threadPoolTaskExecutor)
// 消息消费异常的handler
.errorHandler(t -> {
// StreamListener中的异常会在这里抛出
System.out.println("errorHandler: " + t.getMessage());
})
// 超时时间,设置为0,表示不超时(超时后会抛出异常)
.pollTimeout(Duration.ZERO)
.build();
return StreamMessageListenerContainer.create(factory, options);
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment