记得上下班打卡 | git大法好,push需谨慎

Commit 3ba631b6 authored by 张国柄's avatar 张国柄

~config.log;

~consumer.queue;
parent 0800694d
package com.liquidnet.common.cache.redis.config;
import lombok.var;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.time.Duration;
public class RedisStreamConfig {
private static String hostname;
static {
try {
hostname = InetAddress.getLocalHost().getHostName() + "_";
} catch (UnknownHostException ignored) {
hostname = "";
}
}
public StreamMessageListenerContainer<String, MapRecord<String, String, String>> buildStreamMessageListenerContainer(RedisConnectionFactory factory) {
var options = StreamMessageListenerContainer
.StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofMillis(1))
.build();
return StreamMessageListenerContainer.create(factory, options);
}
public static String getHostname() {
return hostname;
}
}
...@@ -115,6 +115,6 @@ logging: ...@@ -115,6 +115,6 @@ logging:
com: com:
liquidnet: info liquidnet: info
pattern: pattern:
console: '%d{yyyy-MM-dd HH:mm:ss.SSS} [ %-5level] %thread [%logger{56}:%line] - %msg%n' console: '%d{yyyy-MM-dd HH:mm:ss.SSS} [ %-5level] %thread [%logger{36}:%line] - %msg%n'
file: '%d{yyyy-MM-dd HH:mm:ss.SSS} [ %-5level] %thread [%logger{56}:%line] - %msg%n' file: '%d{yyyy-MM-dd HH:mm:ss.SSS} [ %-5level] %thread [%logger{36}:%line] - %msg%n'
rolling-file-name: ${liquidnet.logfile.path}/${liquidnet.logfile.name}-%d{yyyy-MM-dd}.%i.log rolling-file-name: ${liquidnet.logfile.path}/${liquidnet.logfile.name}-%d{yyyy-MM-dd}.%i.log
...@@ -18,8 +18,8 @@ logging: ...@@ -18,8 +18,8 @@ logging:
name: ${liquidnet.logfile.path}/${liquidnet.logfile.name}.log name: ${liquidnet.logfile.path}/${liquidnet.logfile.name}.log
max-size: 200MB max-size: 200MB
pattern: pattern:
file: '%d{yyyy-MM-dd HH:mm:ss.SSS} [ %-5level] %thread [%logger{56}:%line] - %msg%n' file: '%d{yyyy-MM-dd HH:mm:ss.SSS} [ %-5level] %thread [%logger{36}:%line] - %msg%n'
console: '%d{yyyy-MM-dd HH:mm:ss.SSS} [ %-5level] %thread [%logger{56}:%line] - %msg%n' console: '%d{yyyy-MM-dd HH:mm:ss.SSS} [ %-5level] %thread [%logger{36}:%line] - %msg%n'
rolling-file-name: ${liquidnet.logfile.path}/${liquidnet.logfile.name}-%d{yyyy-MM-dd}.%i.log rolling-file-name: ${liquidnet.logfile.path}/${liquidnet.logfile.name}-%d{yyyy-MM-dd}.%i.log
level: level:
root: error root: error
......
...@@ -18,8 +18,8 @@ logging: ...@@ -18,8 +18,8 @@ logging:
name: ${liquidnet.logfile.path}/${liquidnet.logfile.name}.log name: ${liquidnet.logfile.path}/${liquidnet.logfile.name}.log
max-size: 200MB max-size: 200MB
pattern: pattern:
file: '%d{yyyy-MM-dd HH:mm:ss.SSS} [ %-5level] %thread [%logger{56}:%line] - %msg%n' file: '%d{yyyy-MM-dd HH:mm:ss.SSS} [ %-5level] %thread [%logger{36}:%line] - %msg%n'
console: '%d{yyyy-MM-dd HH:mm:ss.SSS} [ %-5level] %thread [%logger{56}:%line] - %msg%n' console: '%d{yyyy-MM-dd HH:mm:ss.SSS} [ %-5level] %thread [%logger{36}:%line] - %msg%n'
rolling-file-name: ${liquidnet.logfile.path}/${liquidnet.logfile.name}-%d{yyyy-MM-dd}.%i.log rolling-file-name: ${liquidnet.logfile.path}/${liquidnet.logfile.name}-%d{yyyy-MM-dd}.%i.log
level: level:
root: error root: error
......
...@@ -18,8 +18,8 @@ logging: ...@@ -18,8 +18,8 @@ logging:
name: ${liquidnet.logfile.path}/${liquidnet.logfile.name}.log name: ${liquidnet.logfile.path}/${liquidnet.logfile.name}.log
max-size: 200MB max-size: 200MB
pattern: pattern:
file: '%d{yyyy-MM-dd HH:mm:ss.SSS} [ %-5level] %thread [%logger{56}:%line] - %msg%n' file: '%d{yyyy-MM-dd HH:mm:ss.SSS} [ %-5level] %thread [%logger{36}:%line] - %msg%n'
console: '%d{yyyy-MM-dd HH:mm:ss.SSS} [ %-5level] %thread [%logger{56}:%line] - %msg%n' console: '%d{yyyy-MM-dd HH:mm:ss.SSS} [ %-5level] %thread [%logger{36}:%line] - %msg%n'
rolling-file-name: ${liquidnet.logfile.path}/${liquidnet.logfile.name}-%d{yyyy-MM-dd}.%i.log rolling-file-name: ${liquidnet.logfile.path}/${liquidnet.logfile.name}-%d{yyyy-MM-dd}.%i.log
level: level:
root: error root: error
......
...@@ -18,8 +18,8 @@ logging: ...@@ -18,8 +18,8 @@ logging:
name: ${liquidnet.logfile.path}/${liquidnet.logfile.name}.log name: ${liquidnet.logfile.path}/${liquidnet.logfile.name}.log
max-size: 200MB max-size: 200MB
pattern: pattern:
file: '%d{yyyy-MM-dd HH:mm:ss.SSS} [ %-5level] %thread [%logger{56}:%line] - %msg%n' file: '%d{yyyy-MM-dd HH:mm:ss.SSS} [ %-5level] %thread [%logger{36}:%line] - %msg%n'
console: '%d{yyyy-MM-dd HH:mm:ss.SSS} [ %-5level] %thread [%logger{56}:%line] - %msg%n' console: '%d{yyyy-MM-dd HH:mm:ss.SSS} [ %-5level] %thread [%logger{36}:%line] - %msg%n'
rolling-file-name: ${liquidnet.logfile.path}/${liquidnet.logfile.name}-%d{yyyy-MM-dd}.%i.log rolling-file-name: ${liquidnet.logfile.path}/${liquidnet.logfile.name}-%d{yyyy-MM-dd}.%i.log
level: level:
root: error root: error
......
...@@ -18,8 +18,8 @@ logging: ...@@ -18,8 +18,8 @@ logging:
name: ${liquidnet.logfile.path}/${liquidnet.logfile.name}.log name: ${liquidnet.logfile.path}/${liquidnet.logfile.name}.log
max-size: 200MB max-size: 200MB
pattern: pattern:
file: '%d{yyyy-MM-dd HH:mm:ss.SSS} [ %-5level] %thread [%logger{56}:%line] - %msg%n' file: '%d{yyyy-MM-dd HH:mm:ss.SSS} [ %-5level] %thread [%logger{36}:%line] - %msg%n'
console: '%d{yyyy-MM-dd HH:mm:ss.SSS} [ %-5level] %thread [%logger{56}:%line] - %msg%n' console: '%d{yyyy-MM-dd HH:mm:ss.SSS} [ %-5level] %thread [%logger{36}:%line] - %msg%n'
rolling-file-name: ${liquidnet.logfile.path}/${liquidnet.logfile.name}-%d{yyyy-MM-dd}.%i.log rolling-file-name: ${liquidnet.logfile.path}/${liquidnet.logfile.name}-%d{yyyy-MM-dd}.%i.log
level: level:
root: error root: error
......
...@@ -18,8 +18,8 @@ logging: ...@@ -18,8 +18,8 @@ logging:
name: ${liquidnet.logfile.path}/${liquidnet.logfile.name}.log name: ${liquidnet.logfile.path}/${liquidnet.logfile.name}.log
max-size: 200MB max-size: 200MB
pattern: pattern:
file: '%d{yyyy-MM-dd HH:mm:ss.SSS} [ %-5level] %thread [%logger{56}:%line] - %msg%n' file: '%d{yyyy-MM-dd HH:mm:ss.SSS} [ %-5level] %thread [%logger{36}:%line] - %msg%n'
console: '%d{yyyy-MM-dd HH:mm:ss.SSS} [ %-5level] %thread [%logger{56}:%line] - %msg%n' console: '%d{yyyy-MM-dd HH:mm:ss.SSS} [ %-5level] %thread [%logger{36}:%line] - %msg%n'
rolling-file-name: ${liquidnet.logfile.path}/${liquidnet.logfile.name}-%d{yyyy-MM-dd}.%i.log rolling-file-name: ${liquidnet.logfile.path}/${liquidnet.logfile.name}-%d{yyyy-MM-dd}.%i.log
level: level:
root: error root: error
......
...@@ -18,8 +18,8 @@ logging: ...@@ -18,8 +18,8 @@ logging:
name: ${liquidnet.logfile.path}/${liquidnet.logfile.name}.log name: ${liquidnet.logfile.path}/${liquidnet.logfile.name}.log
max-size: 200MB max-size: 200MB
pattern: pattern:
file: '%d{yyyy-MM-dd HH:mm:ss.SSS} [ %-5level] %thread [%logger{56}:%line] - %msg%n' file: '%d{yyyy-MM-dd HH:mm:ss.SSS} [ %-5level] %thread [%logger{36}:%line] - %msg%n'
console: '%d{yyyy-MM-dd HH:mm:ss.SSS} [ %-5level] %thread [%logger{56}:%line] - %msg%n' console: '%d{yyyy-MM-dd HH:mm:ss.SSS} [ %-5level] %thread [%logger{36}:%line] - %msg%n'
rolling-file-name: ${liquidnet.logfile.path}/${liquidnet.logfile.name}-%d{yyyy-MM-dd}.%i.log rolling-file-name: ${liquidnet.logfile.path}/${liquidnet.logfile.name}-%d{yyyy-MM-dd}.%i.log
level: level:
root: error root: error
......
...@@ -10,8 +10,8 @@ logging: ...@@ -10,8 +10,8 @@ logging:
name: ${liquidnet.logfile.path}/${liquidnet.logfile.name}.log name: ${liquidnet.logfile.path}/${liquidnet.logfile.name}.log
max-size: 200MB max-size: 200MB
pattern: pattern:
file: '%d{yyyy-MM-dd HH:mm:ss.SSS} [ %-5level] %thread [%logger{56}:%line] - %msg%n' file: '%d{yyyy-MM-dd HH:mm:ss.SSS} [ %-5level] %thread [%logger{36}:%line] - %msg%n'
console: '%d{yyyy-MM-dd HH:mm:ss.SSS} [ %-5level] %thread [%logger{56}:%line] - %msg%n' console: '%d{yyyy-MM-dd HH:mm:ss.SSS} [ %-5level] %thread [%logger{36}:%line] - %msg%n'
rolling-file-name: ${liquidnet.logfile.path}/${liquidnet.logfile.name}-%d{yyyy-MM-dd}.%i.log rolling-file-name: ${liquidnet.logfile.path}/${liquidnet.logfile.name}-%d{yyyy-MM-dd}.%i.log
level: level:
root: error root: error
......
...@@ -22,8 +22,8 @@ logging: ...@@ -22,8 +22,8 @@ logging:
name: ${liquidnet.logfile.path}/${liquidnet.logfile.name}.log name: ${liquidnet.logfile.path}/${liquidnet.logfile.name}.log
max-size: 200MB max-size: 200MB
pattern: pattern:
file: '%d{yyyy-MM-dd HH:mm:ss.SSS} [ %-5level] %thread [%logger{56}:%line] - %msg%n' file: '%d{yyyy-MM-dd HH:mm:ss.SSS} [ %-5level] %thread [%logger{36}:%line] - %msg%n'
console: '%d{yyyy-MM-dd HH:mm:ss.SSS} [ %-5level] %thread [%logger{56}:%line] - %msg%n' console: '%d{yyyy-MM-dd HH:mm:ss.SSS} [ %-5level] %thread [%logger{36}:%line] - %msg%n'
rolling-file-name: ${liquidnet.logfile.path}/${liquidnet.logfile.name}-%d{yyyy-MM-dd}.%i.log rolling-file-name: ${liquidnet.logfile.path}/${liquidnet.logfile.name}-%d{yyyy-MM-dd}.%i.log
level: level:
root: info root: info
......
...@@ -15,7 +15,7 @@ import org.springframework.data.redis.stream.Subscription; ...@@ -15,7 +15,7 @@ import org.springframework.data.redis.stream.Subscription;
import java.time.Duration; import java.time.Duration;
import static com.liquidnet.service.base.constant.MQConst.*; import static com.liquidnet.service.base.constant.MQConst.AdamQueue;
@Configuration @Configuration
public class ConsumerAdamSmsSenderRedisStreamConfig { public class ConsumerAdamSmsSenderRedisStreamConfig {
......
...@@ -17,7 +17,7 @@ import org.springframework.data.redis.stream.Subscription; ...@@ -17,7 +17,7 @@ import org.springframework.data.redis.stream.Subscription;
import java.time.Duration; import java.time.Duration;
import static com.liquidnet.service.base.constant.MQConst.*; import static com.liquidnet.service.base.constant.MQConst.AdamQueue;
@Configuration @Configuration
public class ConsumerAdamSqlUcenterRedisStreamConfig { public class ConsumerAdamSqlUcenterRedisStreamConfig {
......
...@@ -23,27 +23,25 @@ public abstract class AbstractChimeRedisReceiver implements StreamListener<Strin ...@@ -23,27 +23,25 @@ public abstract class AbstractChimeRedisReceiver implements StreamListener<Strin
@Override @Override
public void onMessage(MapRecord<String, String, String> message) { public void onMessage(MapRecord<String, String, String> message) {
log.debug("CONSUMER SQL[streamKey:{},messageId:{},stream:{},body:{}]", String redisStreamKey = this.getRedisStreamKey();
this.getRedisStreamKey(), message.getId(), message.getStream(), message.getValue()); log.debug("CONSUMER MSG[streamKey:{},messageId:{},stream:{},body:{}]", redisStreamKey, message.getId(), message.getStream(), message.getValue());
boolean result = this.consumerMessageHandler(message.getValue().get("message")); boolean result = this.consumerMessageHandler(message.getValue().get("message"));
log.info("CONSUMER SQL RESULT:{} ==> MESSAGE_ID:{}", result, message.getId()); log.info("CONSUMER MSG RESULT:{} ==> [{}]MESSAGE_ID:{}", result, redisStreamKey, message.getId());
try { try {
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message); stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId());
} catch (Exception e) { } catch (Exception e) {
log.error("#CONSUMER SQL RESULT:{} ==> DEL_REDIS_QUEUE_MSG_EXCEPTION[MESSAGE_ID:{},MSG:{}]", result, message.getId(), JsonUtils.toJson(message), e); log.error("#CONSUMER MSG EX_ACK ==> [{}]RESULT:{},MESSAGE:{}", redisStreamKey, result, message.getValue(), e);
} finally { }
try { try {
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message); stringRedisTemplate.opsForStream().delete(redisStreamKey, message.getId());
} catch (Exception ignored) { } catch (Exception e) {
} log.error("#CONSUMER MSG EX_DEL ==> [{}]RESULT:{},MESSAGE:{}", redisStreamKey, result, message.getValue(), e);
} }
} }
private boolean consumerMessageHandler(String msg) { private boolean consumerMessageHandler(String msg) {
Boolean aBoolean = false; boolean aBoolean = false;
try { try {
ChimeUserOperLogVo textMessage = JsonUtils.fromJson(msg, ChimeUserOperLogVo.class); ChimeUserOperLogVo textMessage = JsonUtils.fromJson(msg, ChimeUserOperLogVo.class);
if (textMessage == null) { if (textMessage == null) {
...@@ -54,10 +52,9 @@ public abstract class AbstractChimeRedisReceiver implements StreamListener<Strin ...@@ -54,10 +52,9 @@ public abstract class AbstractChimeRedisReceiver implements StreamListener<Strin
//创建操作日志 //创建操作日志
chimeDataUtils.createUserOperLog(textMessage); chimeDataUtils.createUserOperLog(textMessage);
aBoolean = true; aBoolean = true;
log.info("consumerMessageHandler.msg===> ",msg);
} }
} catch (Exception e) { } catch (Exception e) {
log.error("CONSUMER SQL FAIL ==> {}", e.getMessage(), e); log.error("CONSUMER MSG EX_HANDLE ==> [{}]:{}", this.getRedisStreamKey(), msg, e);
} finally { } finally {
if (!aBoolean) { if (!aBoolean) {
HashMap<String, String> map = CollectionUtil.mapStringString(); HashMap<String, String> map = CollectionUtil.mapStringString();
......
...@@ -25,26 +25,24 @@ public abstract class AbstractSmsRedisReceiver implements StreamListener<String, ...@@ -25,26 +25,24 @@ public abstract class AbstractSmsRedisReceiver implements StreamListener<String,
@Override @Override
public void onMessage(MapRecord<String, String, String> message) { public void onMessage(MapRecord<String, String, String> message) {
log.debug("CONSUMER SMS[streamKey:{},messageId:{},stream:{},body:{}]", String redisStreamKey = this.getRedisStreamKey();
this.getRedisStreamKey(), message.getId(), message.getStream(), message.getValue()); log.debug("CONSUMER MSG[streamKey:{},messageId:{},stream:{},body:{}]", redisStreamKey, message.getId(), message.getStream(), message.getValue());
boolean result = this.consumerMessageHandler(message.getValue().get("message"));
boolean result = this.consumerSmsSendHandler(message.getValue().get("message")); log.info("CONSUMER MSG RESULT:{} ==> [{}]MESSAGE_ID:{}", result, redisStreamKey, message.getId());
log.info("CONSUMER SMS RESULT:{} ==> MESSAGE_ID:{}", result, message.getId());
try { try {
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message); stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId());
} catch (Exception e) { } catch (Exception e) {
log.error("#CONSUMER SMS RESULT:{} ==> DEL_REDIS_QUEUE_MSG_EXCEPTION[MESSAGE_ID:{},MSG:{}]", result, message.getId(), JsonUtils.toJson(message), e); log.error("#CONSUMER SMS EX_ACK ==> [{}]RESULT:{},MESSAGE:{}", redisStreamKey, result, message.getValue(), e);
} finally { }
try { try {
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message); stringRedisTemplate.opsForStream().delete(redisStreamKey, message.getId());
} catch (Exception ignored) { } catch (Exception e) {
} log.error("#CONSUMER SMS EX_DEL ==> [{}]RESULT:{},MESSAGE:{}", redisStreamKey, result, message.getValue(), e);
} }
} }
private boolean consumerSmsSendHandler(String msg) { private boolean consumerMessageHandler(String msg) {
boolean aBoolean = false; boolean aBoolean = false;
try { try {
SmsMessage smsMessage = JsonUtils.fromJson(msg, SmsMessage.class); SmsMessage smsMessage = JsonUtils.fromJson(msg, SmsMessage.class);
...@@ -52,7 +50,7 @@ public abstract class AbstractSmsRedisReceiver implements StreamListener<String, ...@@ -52,7 +50,7 @@ public abstract class AbstractSmsRedisReceiver implements StreamListener<String,
ObjectNode templateParam = smsMessage.getTemplateParam(); ObjectNode templateParam = smsMessage.getTemplateParam();
aBoolean = smsProcessor.send(smsMessage.getPhone(), smsMessage.getSignName(), smsMessage.getTemplateCode(), null == templateParam ? "" : templateParam.toString()); aBoolean = smsProcessor.send(smsMessage.getPhone(), smsMessage.getSignName(), smsMessage.getTemplateCode(), null == templateParam ? "" : templateParam.toString());
} catch (Exception e) { } catch (Exception e) {
log.error("CONSUMER SMS FAIL ==> {}", e.getMessage(), e); log.error("CONSUMER MSG EX_HANDLE ==> [{}]:{}", this.getRedisStreamKey(), msg, e);
} finally { } finally {
if (!aBoolean) { if (!aBoolean) {
HashMap<String, String> map = CollectionUtil.mapStringString(); HashMap<String, String> map = CollectionUtil.mapStringString();
......
...@@ -3,7 +3,6 @@ package com.liquidnet.service.consumer.adam.receiver; ...@@ -3,7 +3,6 @@ package com.liquidnet.service.consumer.adam.receiver;
import com.liquidnet.commons.lang.util.CollectionUtil; import com.liquidnet.commons.lang.util.CollectionUtil;
import com.liquidnet.commons.lang.util.JsonUtils; import com.liquidnet.commons.lang.util.JsonUtils;
import com.liquidnet.service.base.SqlMapping; import com.liquidnet.service.base.SqlMapping;
import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.consumer.adam.service.IBaseDao; import com.liquidnet.service.consumer.adam.service.IBaseDao;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
...@@ -23,36 +22,30 @@ public abstract class AbstractSqlRedisReceiver implements StreamListener<String, ...@@ -23,36 +22,30 @@ public abstract class AbstractSqlRedisReceiver implements StreamListener<String,
@Override @Override
public void onMessage(MapRecord<String, String, String> message) { public void onMessage(MapRecord<String, String, String> message) {
log.debug("CONSUMER SQL[streamKey:{},messageId:{},stream:{},body:{}]", String redisStreamKey = this.getRedisStreamKey();
this.getRedisStreamKey(), message.getId(), message.getStream(), message.getValue()); log.debug("CONSUMER MSG[streamKey:{},messageId:{},stream:{},body:{}]",redisStreamKey, message.getId(), message.getStream(), message.getValue());
boolean result = this.consumerMessageHandler(message.getValue().get("message"));
boolean result = this.consumerSqlDaoHandler(message.getValue().get("message")); log.info("CONSUMER MSG RESULT:{} ==> [{}]MESSAGE_ID:{}", result, redisStreamKey, message.getId());
log.info("CONSUMER SQL RESULT:{} ==> MESSAGE_ID:{}", result, message.getId());
try { try {
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message); stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId());
} catch (Exception e) { } catch (Exception e) {
log.error("#CONSUMER SQL RESULT:{} ==> DEL_REDIS_QUEUE_MSG_EXCEPTION[MESSAGE_ID:{},MSG:{}]", result, message.getId(), JsonUtils.toJson(message), e); log.error("#CONSUMER MSG EX_ACK ==> [{}]RESULT:{},MESSAGE:{}", redisStreamKey, result, message.getValue(), e);
} finally { }
try { try {
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message); stringRedisTemplate.opsForStream().delete(redisStreamKey, message.getId());
} catch (Exception ignored) { } catch (Exception e) {
} log.error("#CONSUMER MSG EX_DEL ==> [{}]RESULT:{},MESSAGE:{}", redisStreamKey, result, message.getValue(), e);
} }
} }
private boolean consumerSqlDaoHandler(String msg) { private boolean consumerMessageHandler(String msg) {
Boolean aBoolean = false; boolean aBoolean = false;
try { try {
SqlMapping.SqlMessage sqlMessage = JsonUtils.fromJson(msg, SqlMapping.SqlMessage.class); SqlMapping.SqlMessage sqlMessage = JsonUtils.fromJson(msg, SqlMapping.SqlMessage.class);
if (sqlMessage == null) { aBoolean = null == sqlMessage || baseDao.batchSqls(sqlMessage.getSqls(), sqlMessage.getArgs());
aBoolean = true;
} else {
aBoolean = baseDao.batchSqls(sqlMessage.getSqls(), sqlMessage.getArgs());
}
} catch (Exception e) { } catch (Exception e) {
log.error("CONSUMER SQL FAIL ==> {}", e.getMessage(), e); log.error("CONSUMER MSG EX_HANDLE ==> [{}]:{}", this.getRedisStreamKey(), msg, e);
} finally { } finally {
if (!aBoolean) { if (!aBoolean) {
HashMap<String, String> map = CollectionUtil.mapStringString(); HashMap<String, String> map = CollectionUtil.mapStringString();
......
...@@ -26,27 +26,24 @@ public abstract class AbstractCouponOrderBackRedisReceiver implements StreamList ...@@ -26,27 +26,24 @@ public abstract class AbstractCouponOrderBackRedisReceiver implements StreamList
@Override @Override
public void onMessage(MapRecord<String, String, String> message) { public void onMessage(MapRecord<String, String, String> message) {
log.debug("CONSUMER SQL[streamKey:{},messageId:{},stream:{},body:{}]", String redisStreamKey = this.getRedisStreamKey();
this.getRedisStreamKey(), message.getId(), message.getStream(), message.getValue()); log.debug("CONSUMER MSG[streamKey:{},messageId:{},stream:{},body:{}]", redisStreamKey, message.getId(), message.getStream(), message.getValue());
boolean result = this.consumerMessageHandler(message.getValue().get("message"));
boolean result = this.consumerSqlOperationCouponOrderBackHandler(message.getValue().get("message")); log.info("CONSUMER MSG RESULT:{} ==> [{}]MESSAGE_ID:{}", result, redisStreamKey, message.getId());
log.info("CONSUMER SQL RESULT:{} ==> MESSAGE_ID:{}", result, message.getId());
try { try {
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message); stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId());
} catch (Exception e) { } catch (Exception e) {
log.error("#CONSUMER SQL RESULT:{} ==> DEL_REDIS_QUEUE_MSG_EXCEPTION[MESSAGE_ID:{},MSG:{}]", result, message.getId(), JsonUtils.toJson(message), e); log.error("#CONSUMER MSG EX_ACK ==> [{}]RESULT:{},MESSAGE:{}", redisStreamKey, result, message.getValue(), e);
} finally { }
try { try {
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message); stringRedisTemplate.opsForStream().delete(redisStreamKey, message.getId());
stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId()); } catch (Exception e) {
} catch (Exception ignored) { log.error("#CONSUMER MSG EX_DEL ==> [{}]RESULT:{},MESSAGE:{}", redisStreamKey, result, message.getValue(), e);
}
} }
} }
private boolean consumerSqlOperationCouponOrderBackHandler(String msg) { private boolean consumerMessageHandler(String msg) {
boolean aBoolean = false; boolean aBoolean = false;
try { try {
OrderCloseMapping.orderCloseMessage mqMessage = JsonUtils.fromJson(msg, OrderCloseMapping.orderCloseMessage.class); OrderCloseMapping.orderCloseMessage mqMessage = JsonUtils.fromJson(msg, OrderCloseMapping.orderCloseMessage.class);
...@@ -64,7 +61,7 @@ public abstract class AbstractCouponOrderBackRedisReceiver implements StreamList ...@@ -64,7 +61,7 @@ public abstract class AbstractCouponOrderBackRedisReceiver implements StreamList
aBoolean = true; aBoolean = true;
} catch (Exception e) { } catch (Exception e) {
log.error("CONSUMER SQL FAIL ==> {}", e.getMessage(), e); log.error("CONSUMER MSG EX_HANDLE ==> [{}]:{}", this.getRedisStreamKey(), msg, e);
} finally { } finally {
if (!aBoolean) { if (!aBoolean) {
HashMap<String, String> map = CollectionUtil.mapStringString(); HashMap<String, String> map = CollectionUtil.mapStringString();
......
package com.liquidnet.service.consumer.candy.receiver; package com.liquidnet.service.consumer.candy.receiver;
import com.liquidnet.common.cache.redis.util.RedisUtil;
import com.liquidnet.commons.lang.util.CollectionUtil; import com.liquidnet.commons.lang.util.CollectionUtil;
import com.liquidnet.commons.lang.util.JsonUtils; import com.liquidnet.commons.lang.util.JsonUtils;
import com.liquidnet.service.base.SqlMapping; import com.liquidnet.service.base.SqlMapping;
...@@ -20,42 +19,33 @@ public abstract class AbstractSqlRedisReceiver implements StreamListener<String, ...@@ -20,42 +19,33 @@ public abstract class AbstractSqlRedisReceiver implements StreamListener<String,
private IBaseDao baseDao; private IBaseDao baseDao;
@Autowired @Autowired
StringRedisTemplate stringRedisTemplate; StringRedisTemplate stringRedisTemplate;
@Autowired
private RedisUtil redisUtil;
@Override @Override
public void onMessage(MapRecord<String, String, String> message) { public void onMessage(MapRecord<String, String, String> message) {
log.debug("CONSUMER SQL[streamKey:{},messageId:{},stream:{},body:{}]", String redisStreamKey = this.getRedisStreamKey();
this.getRedisStreamKey(), message.getId(), message.getStream(), message.getValue()); log.debug("CONSUMER MSG[streamKey:{},messageId:{},stream:{},body:{}]", redisStreamKey, message.getId(), message.getStream(), message.getValue());
boolean result = this.consumerMessageHandler(message.getValue().get("message"));
boolean result = this.consumerSqlDaoHandler(message.getValue().get("message")); log.info("CONSUMER MSG RESULT:{} ==> [{}]MESSAGE_ID:{}", result, redisStreamKey, message.getId());
log.info("CONSUMER SQL RESULT:{} ==> MESSAGE_ID:{}", result, message.getId());
try { try {
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message); stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId());
} catch (Exception e) { } catch (Exception e) {
log.error("#CONSUMER SQL RESULT:{} ==> DEL_REDIS_QUEUE_MSG_EXCEPTION[MESSAGE_ID:{},MSG:{}]", result, message.getId(), JsonUtils.toJson(message), e); log.error("#CONSUMER MSG EX_ACK ==> [{}]RESULT:{},MESSAGE:{}", redisStreamKey, result, message.getValue(), e);
} finally { }
try { try {
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message); stringRedisTemplate.opsForStream().delete(redisStreamKey, message.getId());
stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId()); } catch (Exception e) {
} catch (Exception ignored) { log.error("#CONSUMER MSG EX_DEL ==> [{}]RESULT:{},MESSAGE:{}", redisStreamKey, result, message.getValue(), e);
}
} }
} }
private boolean consumerSqlDaoHandler(String msg) { private boolean consumerMessageHandler(String msg) {
Boolean aBoolean = false; boolean aBoolean = false;
try { try {
SqlMapping.SqlMessage sqlMessage = JsonUtils.fromJson(msg, SqlMapping.SqlMessage.class); SqlMapping.SqlMessage sqlMessage = JsonUtils.fromJson(msg, SqlMapping.SqlMessage.class);
if (sqlMessage == null) { aBoolean = null == sqlMessage || baseDao.batchSqls(sqlMessage.getSqls(), sqlMessage.getArgs());
aBoolean = true;
} else {
aBoolean = baseDao.batchSqls(sqlMessage.getSqls(), sqlMessage.getArgs());
}
} catch (Exception e) { } catch (Exception e) {
log.error("CONSUMER SQL FAIL ==> {}", e.getMessage(), e); log.error("CONSUMER MSG EX_HANDLE ==> [{}]:{}", this.getRedisStreamKey(), msg, e);
} finally { } finally {
if (!aBoolean) { if (!aBoolean) {
HashMap<String, String> map = CollectionUtil.mapStringString(); HashMap<String, String> map = CollectionUtil.mapStringString();
......
...@@ -17,7 +17,6 @@ import java.time.Duration; ...@@ -17,7 +17,6 @@ import java.time.Duration;
@Configuration @Configuration
public class RedisStreamConfig { public class RedisStreamConfig {
@Autowired @Autowired
private RedisPayReceiver redisPayReceiver; private RedisPayReceiver redisPayReceiver;
@Autowired @Autowired
......
...@@ -17,7 +17,6 @@ import java.time.Duration; ...@@ -17,7 +17,6 @@ import java.time.Duration;
@Configuration @Configuration
public class RedisStreamConfig2 { public class RedisStreamConfig2 {
@Autowired @Autowired
private RedisPayReceiver redisPayReceiver; private RedisPayReceiver redisPayReceiver;
@Autowired @Autowired
......
package com.liquidnet.service.consumer.dragon.receiver; package com.liquidnet.service.consumer.dragon.receiver;
import com.liquidnet.commons.lang.util.CollectionUtil;
import com.liquidnet.commons.lang.util.JsonUtils; import com.liquidnet.commons.lang.util.JsonUtils;
import com.liquidnet.service.base.SqlMapping; import com.liquidnet.service.base.SqlMapping;
import com.liquidnet.service.consumer.dragon.service.IBaseDao; import com.liquidnet.service.consumer.dragon.service.IBaseDao;
...@@ -30,72 +31,106 @@ public abstract class AbstractRedisReceiver implements StreamListener<String, Ma ...@@ -30,72 +31,106 @@ public abstract class AbstractRedisReceiver implements StreamListener<String, Ma
@Override @Override
public void onMessage(MapRecord<String, String, String> message) { public void onMessage(MapRecord<String, String, String> message) {
log.info("接受到来自redis PAY key:{} 的消息",getRedisStreamKey()); String redisStreamKey = this.getRedisStreamKey();
log.info("message id " + message.getId()); log.debug("CONSUMER MSG[streamKey:{},messageId:{},stream:{},body:{}]", redisStreamKey, message.getId(), message.getStream(), message.getValue());
log.info("stream " + message.getStream()); boolean result = this.consumerMessageHandler(message.getValue().get("message"));
log.info("body " + message.getValue()); log.info("CONSUMER MSG RESULT:{} ==> [{}]MESSAGE_ID:{}", result, redisStreamKey, message.getId());
boolean result = this.consumerSqlDaoHandler(message.getValue().get("message"));
// if(result){
// 消费成功确认,消息删除和消息确认是一个事务
log.info("consumer success delete message messageId:{} ",message.getId());
try {
// stringRedisTemplate.multi();
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId());
// stringRedisTemplate.exec();
} catch (Exception e) {
e.printStackTrace();
log.error("delete redis queue message error messageId:{} errMsg:{}",message.getId(),e.getMessage());
}finally {
try {
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId());
} catch (Exception e) {
log.error("error: {}",e);
}
}
// }
}
private boolean consumerSqlDaoHandler(String msg) {
try { try {
SqlMapping.SqlMessage sqlMessage = JsonUtils.fromJson(msg, SqlMapping.SqlMessage.class); stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
log.debug("CONSUMER SQL ==> Preparing:{}", JsonUtils.toJson(sqlMessage.getSqls())); } catch (Exception e) {
log.debug("CONSUMER SQL ==> Parameters:{}", JsonUtils.toJson(sqlMessage.getArgs())); log.error("#CONSUMER MSG EX_ACK ==> [{}]RESULT:{},MESSAGE:{}", this.getRedisStreamKey(), result, message.getValue(), e);
Boolean rstBatchSqls = baseDao.batchSqls(sqlMessage.getSqls(), sqlMessage.getArgs()); }
log.debug("CONSUMER SQL result of execution:{}", rstBatchSqls); try {
if (rstBatchSqls) { stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId());
return true;
}else{
sendMySqlRedis(msg);
}
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); log.error("#CONSUMER MSG EX_DEL ==> [{}]RESULT:{},MESSAGE:{}", this.getRedisStreamKey(), result, message.getValue(), e);
log.error("CONSUMER SQL Exception error:{}", e);
} }
return false;
} }
/** private boolean consumerMessageHandler(String msg) {
* 给 REDIS 队列发送消息 数据库相关 boolean aBoolean = false;
*
* @param msg 接收到的内容
* @return
*/
private boolean sendMySqlRedis(String msg) {
try { try {
HashMap<String, String> map = new HashMap<>(); SqlMapping.SqlMessage sqlMessage = JsonUtils.fromJson(msg, SqlMapping.SqlMessage.class);
map.put("message", msg); aBoolean = null == sqlMessage || baseDao.batchSqls(sqlMessage.getSqls(), sqlMessage.getArgs());
MapRecord<String, String, String> record = StreamRecords.mapBacked(map).withStreamKey(getRedisStreamKey());
stringRedisTemplate.opsForStream().add(record);
return true;
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); log.error("CONSUMER MSG EX_HANDLE ==> [{}]:{}", this.getRedisStreamKey(), msg, e);
return false; } finally {
if (!aBoolean) {
HashMap<String, String> map = CollectionUtil.mapStringString();
map.put("message", msg);
stringRedisTemplate.opsForStream().add(StreamRecords.mapBacked(map).withStreamKey(this.getRedisStreamKey()));
}
} }
return aBoolean;
} }
// @Override
// public void onMessage(MapRecord<String, String, String> message) {
// log.info("接受到来自redis PAY key:{} 的消息",getRedisStreamKey());
// log.info("message id " + message.getId());
// log.info("stream " + message.getStream());
// log.info("body " + message.getValue());
// boolean result = this.consumerSqlDaoHandler(message.getValue().get("message"));
//
//// if(result){
// // 消费成功确认,消息删除和消息确认是一个事务
// log.info("consumer success delete message messageId:{} ",message.getId());
// try {
//// stringRedisTemplate.multi();
// stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
// stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId());
//// stringRedisTemplate.exec();
// } catch (Exception e) {
// e.printStackTrace();
// log.error("delete redis queue message error messageId:{} errMsg:{}",message.getId(),e.getMessage());
// }finally {
// try {
// stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
// stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId());
// } catch (Exception e) {
// log.error("error: {}",e);
// }
// }
//// }
// }
//
// private boolean consumerMessageHandler(String msg) {
// try {
// SqlMapping.SqlMessage sqlMessage = JsonUtils.fromJson(msg, SqlMapping.SqlMessage.class);
// log.debug("CONSUMER MSG ==> Preparing:{}", JsonUtils.toJson(sqlMessage.getSqls()));
// log.debug("CONSUMER MSG ==> Parameters:{}", JsonUtils.toJson(sqlMessage.getArgs()));
// Boolean rstBatchSqls = baseDao.batchSqls(sqlMessage.getSqls(), sqlMessage.getArgs());
// log.debug("CONSUMER MSG result of execution:{}", rstBatchSqls);
// if (rstBatchSqls) {
// return true;
// } else {
// sendMySqlRedis(msg);
// }
// } catch (Exception e) {
// log.error("CONSUMER MSG EX_HANDLE ==> [{}]:{}", this.getRedisStreamKey(), msg, e);
// }
// return false;
// }
//
// /**
// * 给 REDIS 队列发送消息 数据库相关
// *
// * @param msg 接收到的内容
// * @return
// */
// private boolean sendMySqlRedis(String msg) {
// try {
// HashMap<String, String> map = new HashMap<>();
// map.put("message", msg);
// stringRedisTemplate.opsForStream().add(StreamRecords.mapBacked(map).withStreamKey(getRedisStreamKey()));
// return true;
// } catch (Exception e) {
// e.printStackTrace();
// return false;
// }
// }
protected abstract String getRedisStreamKey(); protected abstract String getRedisStreamKey();
protected abstract String getRedisStreamGroup(); protected abstract String getRedisStreamGroup();
......
package com.liquidnet.service.consumer.dragon.receiver; package com.liquidnet.service.consumer.dragon.receiver;
import com.liquidnet.commons.lang.util.CollectionUtil;
import com.liquidnet.commons.lang.util.JsonUtils; import com.liquidnet.commons.lang.util.JsonUtils;
import com.liquidnet.service.base.SqlMapping; import com.liquidnet.service.base.SqlMapping;
import com.liquidnet.service.consumer.dragon.service.IBaseDao; import com.liquidnet.service.consumer.dragon.service.IBaseDao;
...@@ -26,63 +27,99 @@ public class RedisRefundReceiver implements StreamListener<String, MapRecord<Str ...@@ -26,63 +27,99 @@ public class RedisRefundReceiver implements StreamListener<String, MapRecord<Str
@Override @Override
public void onMessage(MapRecord<String, String, String> message) { public void onMessage(MapRecord<String, String, String> message) {
log.info("接受到来自redis REFUND 的消息"); String redisStreamKey = DragonConstant.MysqlRedisQueueEnum.DRAGON_REFUND_KEY.getCode();
log.info("message id " + message.getId()); log.debug("CONSUMER MSG[streamKey:{},messageId:{},stream:{},body:{}]", redisStreamKey, message.getId(), message.getStream(), message.getValue());
log.info("stream " + message.getStream()); boolean result = this.consumerMessageHandler(message.getValue().get("message"));
log.info("body " + message.getValue()); log.info("CONSUMER MSG RESULT:{} ==> [{}]MESSAGE_ID:{}", result, redisStreamKey, message.getId());
boolean result = this.consumerSqlDaoHandler(message.getValue().get("message"));
// if(result){
log.info("consumer success delete message messageId:{} ",message.getId());
try {
// stringRedisTemplate.multi();
stringRedisTemplate.opsForStream().acknowledge(DragonConstant.MysqlRedisQueueEnum.DRAGON_REFUND_GROUP.getCode(), message);
stringRedisTemplate.opsForStream().delete(DragonConstant.MysqlRedisQueueEnum.DRAGON_REFUND_KEY.getCode(), message.getId());
// stringRedisTemplate.exec();
} catch (Exception e) {
log.error("delete redis queue message Exception error: {} ",e);
log.error("delete redis queue message error messageId:{} errMsg:{}",message.getId(),e.getMessage());
}finally {
stringRedisTemplate.opsForStream().acknowledge(DragonConstant.MysqlRedisQueueEnum.DRAGON_REFUND_GROUP.getCode(), message);
stringRedisTemplate.opsForStream().delete(DragonConstant.MysqlRedisQueueEnum.DRAGON_REFUND_KEY.getCode(), message.getId());
}
// }
}
private boolean consumerSqlDaoHandler(String msg) {
try { try {
SqlMapping.SqlMessage sqlMessage = JsonUtils.fromJson(msg, SqlMapping.SqlMessage.class); stringRedisTemplate.opsForStream().acknowledge(DragonConstant.MysqlRedisQueueEnum.DRAGON_REFUND_GROUP.getCode(), message);
log.debug("CONSUMER SQL ==> Preparing:{}", JsonUtils.toJson(sqlMessage.getSqls()));
log.debug("CONSUMER SQL ==> Parameters:{}", JsonUtils.toJson(sqlMessage.getArgs()));
Boolean rstBatchSqls = baseDao.batchSqls(sqlMessage.getSqls(), sqlMessage.getArgs());
log.debug("CONSUMER SQL result of execution:{}", rstBatchSqls);
if (rstBatchSqls) {
return true;
}else{
sendMySqlRedis(msg);
}
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); log.error("#CONSUMER MSG EX_ACK ==> [{}]RESULT:{},MESSAGE:{}", redisStreamKey, result, message.getValue(), e);
log.error("CONSUMER SQL Exception error:{}", e); }
try {
stringRedisTemplate.opsForStream().delete(redisStreamKey, message.getId());
} catch (Exception e) {
log.error("#CONSUMER MSG EX_DEL ==> [{}]RESULT:{},MESSAGE:{}", redisStreamKey, result, message.getValue(), e);
} }
return false;
} }
/** private boolean consumerMessageHandler(String msg) {
* 给 REDIS 队列发送消息 数据库相关 boolean aBoolean = false;
*
* @param msg 接收到的内容
* @return
*/
private boolean sendMySqlRedis(String msg) {
try { try {
HashMap<String, String> map = new HashMap<>(); SqlMapping.SqlMessage sqlMessage = JsonUtils.fromJson(msg, SqlMapping.SqlMessage.class);
map.put("message", msg); aBoolean = null == sqlMessage || baseDao.batchSqls(sqlMessage.getSqls(), sqlMessage.getArgs());
MapRecord<String, String, String> record = StreamRecords.mapBacked(map).withStreamKey(DragonConstant.MysqlRedisQueueEnum.DRAGON_REFUND_KEY.getCode());
stringRedisTemplate.opsForStream().add(record);
return true;
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); log.error("CONSUMER MSG EX_HANDLE ==> [{}]:{}", DragonConstant.MysqlRedisQueueEnum.DRAGON_REFUND_KEY.getCode(), msg, e);
return false; } finally {
if (!aBoolean) {
HashMap<String, String> map = CollectionUtil.mapStringString();
map.put("message", msg);
stringRedisTemplate.opsForStream().add(StreamRecords.mapBacked(map).withStreamKey(DragonConstant.MysqlRedisQueueEnum.DRAGON_REFUND_KEY.getCode()));
}
} }
return aBoolean;
} }
// @Override
// public void onMessage(MapRecord<String, String, String> message) {
// log.info("接受到来自redis REFUND 的消息");
// log.info("message id " + message.getId());
// log.info("stream " + message.getStream());
// log.info("body " + message.getValue());
// boolean result = this.consumerSqlDaoHandler(message.getValue().get("message"));
//// if(result){
// log.info("consumer success delete message messageId:{} ",message.getId());
// try {
//// stringRedisTemplate.multi();
// stringRedisTemplate.opsForStream().acknowledge(DragonConstant.MysqlRedisQueueEnum.DRAGON_REFUND_GROUP.getCode(), message);
// stringRedisTemplate.opsForStream().delete(DragonConstant.MysqlRedisQueueEnum.DRAGON_REFUND_KEY.getCode(), message.getId());
//// stringRedisTemplate.exec();
// } catch (Exception e) {
// log.error("delete redis queue message Exception error: {} ",e);
// log.error("delete redis queue message error messageId:{} errMsg:{}",message.getId(),e.getMessage());
// }finally {
// stringRedisTemplate.opsForStream().acknowledge(DragonConstant.MysqlRedisQueueEnum.DRAGON_REFUND_GROUP.getCode(), message);
// stringRedisTemplate.opsForStream().delete(DragonConstant.MysqlRedisQueueEnum.DRAGON_REFUND_KEY.getCode(), message.getId());
// }
//// }
// }
//
// private boolean consumerSqlDaoHandler(String msg) {
// try {
// SqlMapping.SqlMessage sqlMessage = JsonUtils.fromJson(msg, SqlMapping.SqlMessage.class);
// log.debug("CONSUMER MSG ==> Preparing:{}", JsonUtils.toJson(sqlMessage.getSqls()));
// log.debug("CONSUMER MSG ==> Parameters:{}", JsonUtils.toJson(sqlMessage.getArgs()));
// Boolean rstBatchSqls = baseDao.batchSqls(sqlMessage.getSqls(), sqlMessage.getArgs());
// log.debug("CONSUMER MSG result of execution:{}", rstBatchSqls);
// if (rstBatchSqls) {
// return true;
// }else{
// sendMySqlRedis(msg);
// }
// } catch (Exception e) {
// e.printStackTrace();
// log.error("CONSUMER MSG Exception error:{}", e);
// }
// return false;
// }
//
// /**
// * 给 REDIS 队列发送消息 数据库相关
// *
// * @param msg 接收到的内容
// * @return
// */
// private boolean sendMySqlRedis(String msg) {
// try {
// HashMap<String, String> map = new HashMap<>();
// map.put("message", msg);
// MapRecord<String, String, String> record = StreamRecords.mapBacked(map).withStreamKey(DragonConstant.MysqlRedisQueueEnum.DRAGON_REFUND_KEY.getCode());
// stringRedisTemplate.opsForStream().add(record);
// return true;
// } catch (Exception e) {
// e.printStackTrace();
// return false;
// }
// }
} }
\ No newline at end of file
...@@ -18,7 +18,7 @@ import org.springframework.data.redis.stream.Subscription; ...@@ -18,7 +18,7 @@ import org.springframework.data.redis.stream.Subscription;
import java.time.Duration; import java.time.Duration;
@Configuration //@Configuration
public class ConsumerGoblinXlsRedisStreamConfig { public class ConsumerGoblinXlsRedisStreamConfig {
@Autowired @Autowired
ConsumerGoblinXlsRdsReceiver consumerGoblinXlsRdsReceiver; ConsumerGoblinXlsRdsReceiver consumerGoblinXlsRdsReceiver;
......
package com.liquidnet.service.consumer.goblin.receiver; package com.liquidnet.service.consumer.goblin.receiver;
import com.liquidnet.common.cache.redis.util.RedisUtil;
import com.liquidnet.commons.lang.util.CollectionUtil; import com.liquidnet.commons.lang.util.CollectionUtil;
import com.liquidnet.commons.lang.util.JsonUtils; import com.liquidnet.commons.lang.util.JsonUtils;
import com.liquidnet.service.base.SqlMapping; import com.liquidnet.service.base.SqlMapping;
...@@ -20,42 +19,33 @@ public abstract class AbstractSqlRedisReceiver implements StreamListener<String, ...@@ -20,42 +19,33 @@ public abstract class AbstractSqlRedisReceiver implements StreamListener<String,
private IBaseDao baseDao; private IBaseDao baseDao;
@Autowired @Autowired
StringRedisTemplate stringRedisTemplate; StringRedisTemplate stringRedisTemplate;
@Autowired
private RedisUtil redisUtil;
@Override @Override
public void onMessage(MapRecord<String, String, String> message) { public void onMessage(MapRecord<String, String, String> message) {
log.debug("CONSUMER SQL[streamKey:{},messageId:{},stream:{},body:{}]", String redisStreamKey = this.getRedisStreamKey();
this.getRedisStreamKey(), message.getId(), message.getStream(), message.getValue()); log.debug("CONSUMER MSG[streamKey:{},messageId:{},stream:{},body:{}]", redisStreamKey, message.getId(), message.getStream(), message.getValue());
boolean result = this.consumerMessageHandler(message.getValue().get("message"));
boolean result = this.consumerSqlDaoHandler(message.getValue().get("message")); log.info("CONSUMER MSG RESULT:{} ==> [{}]MESSAGE_ID:{}", result, redisStreamKey, message.getId());
log.info("CONSUMER SQL RESULT:{} ==> MESSAGE_ID:{}", result, message.getId());
try { try {
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message); stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId());
} catch (Exception e) { } catch (Exception e) {
log.error("#CONSUMER SQL RESULT:{} ==> DEL_REDIS_QUEUE_MSG_EXCEPTION[MESSAGE_ID:{},MSG:{}]", result, message.getId(), JsonUtils.toJson(message), e); log.error("#CONSUMER MSG EX_ACK ==> [{}]RESULT:{},MESSAGE:{}", redisStreamKey, result, message.getValue(), e);
} finally { }
try { try {
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message); stringRedisTemplate.opsForStream().delete(redisStreamKey, message.getId());
stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId()); } catch (Exception e) {
} catch (Exception ignored) { log.error("#CONSUMER MSG EX_DEL ==> [{}]RESULT:{},MESSAGE:{}", redisStreamKey, result, message.getValue(), e);
}
} }
} }
private boolean consumerSqlDaoHandler(String msg) { private boolean consumerMessageHandler(String msg) {
Boolean aBoolean = false; boolean aBoolean = false;
try { try {
SqlMapping.SqlMessage sqlMessage = JsonUtils.fromJson(msg, SqlMapping.SqlMessage.class); SqlMapping.SqlMessage sqlMessage = JsonUtils.fromJson(msg, SqlMapping.SqlMessage.class);
if (sqlMessage == null) { aBoolean = null == sqlMessage || baseDao.batchSqls(sqlMessage.getSqls(), sqlMessage.getArgs());
aBoolean = true;
} else {
aBoolean = baseDao.batchSqls(sqlMessage.getSqls(), sqlMessage.getArgs());
}
} catch (Exception e) { } catch (Exception e) {
log.error("CONSUMER SQL FAIL ==> {}", e.getMessage(), e); log.error("CONSUMER MSG EX_HANDLE ==> [{}]:{}", this.getRedisStreamKey(), msg, e);
} finally { } finally {
if (!aBoolean) { if (!aBoolean) {
HashMap<String, String> map = CollectionUtil.mapStringString(); HashMap<String, String> map = CollectionUtil.mapStringString();
......
...@@ -5,7 +5,6 @@ import com.alibaba.excel.read.listener.PageReadListener; ...@@ -5,7 +5,6 @@ import com.alibaba.excel.read.listener.PageReadListener;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.liquidnet.common.cache.redis.util.RedisUtil; import com.liquidnet.common.cache.redis.util.RedisUtil;
import com.liquidnet.commons.lang.util.CollectionUtil; import com.liquidnet.commons.lang.util.CollectionUtil;
import com.liquidnet.commons.lang.util.JsonUtils;
import com.liquidnet.service.consumer.goblin.dto.PhoneDto; import com.liquidnet.service.consumer.goblin.dto.PhoneDto;
import com.liquidnet.service.consumer.goblin.service.IBaseDao; import com.liquidnet.service.consumer.goblin.service.IBaseDao;
import com.liquidnet.service.goblin.constant.GoblinRedisConst; import com.liquidnet.service.goblin.constant.GoblinRedisConst;
...@@ -15,10 +14,10 @@ import org.springframework.data.redis.connection.stream.MapRecord; ...@@ -15,10 +14,10 @@ import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamListener; import org.springframework.data.redis.stream.StreamListener;
import java.io.InputStream;
import java.net.URL; import java.net.URL;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.Map;
@Slf4j @Slf4j
public abstract class AbstractXlsRedisReceiver implements StreamListener<String, MapRecord<String, String, String>> { public abstract class AbstractXlsRedisReceiver implements StreamListener<String, MapRecord<String, String, String>> {
...@@ -33,38 +32,40 @@ public abstract class AbstractXlsRedisReceiver implements StreamListener<String, ...@@ -33,38 +32,40 @@ public abstract class AbstractXlsRedisReceiver implements StreamListener<String,
@Override @Override
public void onMessage(MapRecord<String, String, String> message) { public void onMessage(MapRecord<String, String, String> message) {
log.debug("CONSUMER XLS [streamKey:{},messageId:{},stream:{},body:{}]", String redisStreamKey = this.getRedisStreamKey();
this.getRedisStreamKey(), message.getId(), message.getStream(), message.getValue()); log.debug("CONSUMER MSG[streamKey:{},messageId:{},stream:{},body:{}]", redisStreamKey, message.getId(), message.getStream(), message.getValue());
boolean result = this.consumerSqlDaoHandler(message.getValue().get("message"), Integer.parseInt(message.getValue().get("type")), message.getValue().get("skuId")); boolean result = this.consumerMessageHandler(message.getValue());
log.info("CONSUMER XLS_PATH RESULT:{} ==> MESSAGE_ID:{}", result, message.getId()); log.info("CONSUMER MSG RESULT:{} ==> [{}]MESSAGE_ID:{}", result, redisStreamKey, message.getId());
try { try {
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message); stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId());
} catch (Exception e) { } catch (Exception e) {
log.error("#CONSUMER SQL RESULT:{} ==> DEL_REDIS_QUEUE_MSG_EXCEPTION[MESSAGE_ID:{},MSG:{}]", result, message.getId(), JsonUtils.toJson(message), e); log.error("#CONSUMER MSG EX_ACK ==> [{}]RESULT:{},MESSAGE:{}", redisStreamKey, result, message.getValue(), e);
} finally { }
try { try {
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message); stringRedisTemplate.opsForStream().delete(redisStreamKey, message.getId());
stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId()); } catch (Exception e) {
} catch (Exception ignored) { log.error("#CONSUMER MSG EX_DEL ==> [{}]RESULT:{},MESSAGE:{}", redisStreamKey, result, message.getValue(), e);
}
} }
} }
private boolean consumerSqlDaoHandler(String xlsPath, Integer type, String skuId) { private boolean consumerMessageHandler(Map<String, String> message) {
LinkedList<Object[]> objs = CollectionUtil.linkedListObjectArr(); LinkedList<Object[]> objs = CollectionUtil.linkedListObjectArr();
Boolean aBoolean = false; String xlsPath = null, skuId = null;
Integer type = null;
boolean aBoolean = false;
try { try {
URL url = new URL(xlsPath); xlsPath = message.get("message");
InputStream is = url.openStream(); String finalSkuId = (skuId = message.get("skuId"));
EasyExcel.read(is, PhoneDto.class, new PageReadListener<PhoneDto>(dataList -> { Integer finalType = (type = Integer.parseInt(message.get("type")));
EasyExcel.read(new URL(xlsPath).openStream(), PhoneDto.class, new PageReadListener<PhoneDto>(dataList -> {
for (PhoneDto data : dataList) { for (PhoneDto data : dataList) {
if (data.getMobile() == null) { if (data.getMobile() == null) {
continue; continue;
} }
String redisKey = GoblinRedisConst.REDIS_CAN_BUY.concat(skuId + ":").concat(data.getMobile()); String redisKey = GoblinRedisConst.REDIS_CAN_BUY.concat(finalSkuId + ":").concat(data.getMobile());
if (type.equals(1)) {//添加 if (finalType.equals(1)) {//添加
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("添加 读取到一条数据{}", JSON.toJSONString(data)); log.debug("添加 读取到一条数据{}", JSON.toJSONString(data));
} }
...@@ -77,23 +78,16 @@ public abstract class AbstractXlsRedisReceiver implements StreamListener<String, ...@@ -77,23 +78,16 @@ public abstract class AbstractXlsRedisReceiver implements StreamListener<String,
} }
} }
})).sheet().doRead(); })).sheet().doRead();
objs.add(new Object[]{skuId, xlsPath, type, 1, LocalDateTime.now()}); objs.add(new Object[]{skuId, xlsPath, type, 1, LocalDateTime.now()});
aBoolean = baseDao.batchSql(SQL_INSERT_GOODS_BUY_ROSTER_LOG, objs);
} catch (Exception e) { } catch (Exception e) {
objs.add(new Object[]{skuId, xlsPath, type, 2, LocalDateTime.now()}); log.error("CONSUMER MSG EX_HANDLE ==> [{}]:{}", this.getRedisStreamKey(), message, e);
aBoolean = false;
log.error("CONSUMER XLS FAIL ==> {}", e.getMessage(), e);
} finally {
try { try {
baseDao.batchSql(SQL_INSERT_GOODS_BUY_ROSTER_LOG, objs); objs.add(new Object[]{skuId, xlsPath, type, 2, LocalDateTime.now()});
} catch (Exception e) { aBoolean = baseDao.batchSql(SQL_INSERT_GOODS_BUY_ROSTER_LOG, objs);
} catch (Exception ignored) {
} }
// if (!aBoolean) {
// HashMap<String, String> map = CollectionUtil.mapStringString();
// map.put("message", xlsPath);
// map.put("skuId", skuId);
// map.put("type", type.toString());
// stringRedisTemplate.opsForStream().add(StreamRecords.mapBacked(map).withStreamKey(this.getRedisStreamKey()));
// }
} }
return aBoolean; return aBoolean;
} }
......
package com.liquidnet.service.consumer.kylin.config; package com.liquidnet.service.consumer.kylin.config;
import com.liquidnet.common.cache.redis.config.RedisStreamConfig;
import com.liquidnet.service.base.constant.MQConst; import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.consumer.kylin.receiver.*; import com.liquidnet.service.consumer.kylin.receiver.*;
import lombok.var; import lombok.var;
...@@ -14,11 +15,8 @@ import org.springframework.data.redis.connection.stream.StreamOffset; ...@@ -14,11 +15,8 @@ import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.stream.StreamMessageListenerContainer; import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription; import org.springframework.data.redis.stream.Subscription;
import java.time.Duration;
@Configuration @Configuration
public class ConsumerOrderCloseRedisStreamConfig { public class ConsumerOrderCloseRedisStreamConfig extends RedisStreamConfig {
@Autowired @Autowired
ConsumerOrderClose0RdsReceiver consumerOrderClose0RdsReceiver; ConsumerOrderClose0RdsReceiver consumerOrderClose0RdsReceiver;
@Autowired @Autowired
...@@ -40,81 +38,72 @@ public class ConsumerOrderCloseRedisStreamConfig { ...@@ -40,81 +38,72 @@ public class ConsumerOrderCloseRedisStreamConfig {
@Autowired @Autowired
ConsumerOrderClose9RdsReceiver consumerOrderClose9RdsReceiver; ConsumerOrderClose9RdsReceiver consumerOrderClose9RdsReceiver;
private StreamMessageListenerContainer<String, MapRecord<String, String, String>> buildStreamMessageListenerContainer(RedisConnectionFactory factory) {
var options = StreamMessageListenerContainer
.StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofMillis(1))
.build();
return StreamMessageListenerContainer.create(factory, options);
}
private Subscription receiveOrderClose0(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) { private Subscription receiveOrderClose0(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) {
return listenerContainer.receiveAutoAck( return listenerContainer.receiveAutoAck(
Consumer.from(MQConst.GoblinQueue.GOBLIN_UN_PAY_0.getGroup(), MQConst.GoblinQueue.GOBLIN_UN_PAY_0.name() + t), Consumer.from(MQConst.GoblinQueue.GOBLIN_UN_PAY_0.getGroup(), getHostname().concat(MQConst.GoblinQueue.GOBLIN_UN_PAY_0.name() + t)),
StreamOffset.create(MQConst.GoblinQueue.GOBLIN_UN_PAY_0.getKey(), ReadOffset.lastConsumed()), consumerOrderClose0RdsReceiver StreamOffset.create(MQConst.GoblinQueue.GOBLIN_UN_PAY_0.getKey(), ReadOffset.lastConsumed()), consumerOrderClose0RdsReceiver
); );
} }
private Subscription receiveOrderClose1(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) { private Subscription receiveOrderClose1(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) {
return listenerContainer.receiveAutoAck( return listenerContainer.receiveAutoAck(
Consumer.from(MQConst.GoblinQueue.GOBLIN_UN_PAY_1.getGroup(), MQConst.GoblinQueue.GOBLIN_UN_PAY_1.name() + t), Consumer.from(MQConst.GoblinQueue.GOBLIN_UN_PAY_1.getGroup(), getHostname().concat(MQConst.GoblinQueue.GOBLIN_UN_PAY_1.name() + t)),
StreamOffset.create(MQConst.GoblinQueue.GOBLIN_UN_PAY_1.getKey(), ReadOffset.lastConsumed()), consumerOrderClose1RdsReceiver StreamOffset.create(MQConst.GoblinQueue.GOBLIN_UN_PAY_1.getKey(), ReadOffset.lastConsumed()), consumerOrderClose1RdsReceiver
); );
} }
private Subscription receiveOrderClose2(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) { private Subscription receiveOrderClose2(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) {
return listenerContainer.receiveAutoAck( return listenerContainer.receiveAutoAck(
Consumer.from(MQConst.GoblinQueue.GOBLIN_UN_PAY_2.getGroup(), MQConst.GoblinQueue.GOBLIN_UN_PAY_2.name() + t), Consumer.from(MQConst.GoblinQueue.GOBLIN_UN_PAY_2.getGroup(), getHostname().concat(MQConst.GoblinQueue.GOBLIN_UN_PAY_2.name() + t)),
StreamOffset.create(MQConst.GoblinQueue.GOBLIN_UN_PAY_2.getKey(), ReadOffset.lastConsumed()), consumerOrderClose2RdsReceiver StreamOffset.create(MQConst.GoblinQueue.GOBLIN_UN_PAY_2.getKey(), ReadOffset.lastConsumed()), consumerOrderClose2RdsReceiver
); );
} }
private Subscription receiveOrderClose3(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) { private Subscription receiveOrderClose3(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) {
return listenerContainer.receiveAutoAck( return listenerContainer.receiveAutoAck(
Consumer.from(MQConst.GoblinQueue.GOBLIN_UN_PAY_3.getGroup(), MQConst.GoblinQueue.GOBLIN_UN_PAY_3.name() + t), Consumer.from(MQConst.GoblinQueue.GOBLIN_UN_PAY_3.getGroup(), getHostname().concat(MQConst.GoblinQueue.GOBLIN_UN_PAY_3.name() + t)),
StreamOffset.create(MQConst.GoblinQueue.GOBLIN_UN_PAY_3.getKey(), ReadOffset.lastConsumed()), consumerOrderClose3RdsReceiver StreamOffset.create(MQConst.GoblinQueue.GOBLIN_UN_PAY_3.getKey(), ReadOffset.lastConsumed()), consumerOrderClose3RdsReceiver
); );
} }
private Subscription receiveOrderClose4(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) { private Subscription receiveOrderClose4(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) {
return listenerContainer.receiveAutoAck( return listenerContainer.receiveAutoAck(
Consumer.from(MQConst.GoblinQueue.GOBLIN_UN_PAY_4.getGroup(), MQConst.GoblinQueue.GOBLIN_UN_PAY_4.name() + t), Consumer.from(MQConst.GoblinQueue.GOBLIN_UN_PAY_4.getGroup(), getHostname().concat(MQConst.GoblinQueue.GOBLIN_UN_PAY_4.name() + t)),
StreamOffset.create(MQConst.GoblinQueue.GOBLIN_UN_PAY_4.getKey(), ReadOffset.lastConsumed()), consumerOrderClose4RdsReceiver StreamOffset.create(MQConst.GoblinQueue.GOBLIN_UN_PAY_4.getKey(), ReadOffset.lastConsumed()), consumerOrderClose4RdsReceiver
); );
} }
private Subscription receiveOrderClose5(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) { private Subscription receiveOrderClose5(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) {
return listenerContainer.receiveAutoAck( return listenerContainer.receiveAutoAck(
Consumer.from(MQConst.GoblinQueue.GOBLIN_UN_PAY_5.getGroup(), MQConst.GoblinQueue.GOBLIN_UN_PAY_5.name() + t), Consumer.from(MQConst.GoblinQueue.GOBLIN_UN_PAY_5.getGroup(), getHostname().concat(MQConst.GoblinQueue.GOBLIN_UN_PAY_5.name() + t)),
StreamOffset.create(MQConst.GoblinQueue.GOBLIN_UN_PAY_5.getKey(), ReadOffset.lastConsumed()), consumerOrderClose5RdsReceiver StreamOffset.create(MQConst.GoblinQueue.GOBLIN_UN_PAY_5.getKey(), ReadOffset.lastConsumed()), consumerOrderClose5RdsReceiver
); );
} }
private Subscription receiveOrderClose6(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) { private Subscription receiveOrderClose6(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) {
return listenerContainer.receiveAutoAck( return listenerContainer.receiveAutoAck(
Consumer.from(MQConst.GoblinQueue.GOBLIN_UN_PAY_6.getGroup(), MQConst.GoblinQueue.GOBLIN_UN_PAY_6.name() + t), Consumer.from(MQConst.GoblinQueue.GOBLIN_UN_PAY_6.getGroup(), getHostname().concat(MQConst.GoblinQueue.GOBLIN_UN_PAY_6.name() + t)),
StreamOffset.create(MQConst.GoblinQueue.GOBLIN_UN_PAY_6.getKey(), ReadOffset.lastConsumed()), consumerOrderClose6RdsReceiver StreamOffset.create(MQConst.GoblinQueue.GOBLIN_UN_PAY_6.getKey(), ReadOffset.lastConsumed()), consumerOrderClose6RdsReceiver
); );
} }
private Subscription receiveOrderClose7(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) { private Subscription receiveOrderClose7(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) {
return listenerContainer.receiveAutoAck( return listenerContainer.receiveAutoAck(
Consumer.from(MQConst.GoblinQueue.GOBLIN_UN_PAY_7.getGroup(), MQConst.GoblinQueue.GOBLIN_UN_PAY_7.name() + t), Consumer.from(MQConst.GoblinQueue.GOBLIN_UN_PAY_7.getGroup(), getHostname().concat(MQConst.GoblinQueue.GOBLIN_UN_PAY_7.name() + t)),
StreamOffset.create(MQConst.GoblinQueue.GOBLIN_UN_PAY_7.getKey(), ReadOffset.lastConsumed()), consumerOrderClose7RdsReceiver StreamOffset.create(MQConst.GoblinQueue.GOBLIN_UN_PAY_7.getKey(), ReadOffset.lastConsumed()), consumerOrderClose7RdsReceiver
); );
} }
private Subscription receiveOrderClose8(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) { private Subscription receiveOrderClose8(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) {
return listenerContainer.receiveAutoAck( return listenerContainer.receiveAutoAck(
Consumer.from(MQConst.GoblinQueue.GOBLIN_UN_PAY_8.getGroup(), MQConst.GoblinQueue.GOBLIN_UN_PAY_8.name() + t), Consumer.from(MQConst.GoblinQueue.GOBLIN_UN_PAY_8.getGroup(), getHostname().concat(MQConst.GoblinQueue.GOBLIN_UN_PAY_8.name() + t)),
StreamOffset.create(MQConst.GoblinQueue.GOBLIN_UN_PAY_8.getKey(), ReadOffset.lastConsumed()), consumerOrderClose8RdsReceiver StreamOffset.create(MQConst.GoblinQueue.GOBLIN_UN_PAY_8.getKey(), ReadOffset.lastConsumed()), consumerOrderClose8RdsReceiver
); );
} }
private Subscription receiveOrderClose9(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) { private Subscription receiveOrderClose9(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) {
return listenerContainer.receiveAutoAck( return listenerContainer.receiveAutoAck(
Consumer.from(MQConst.GoblinQueue.GOBLIN_UN_PAY_9.getGroup(), MQConst.GoblinQueue.GOBLIN_UN_PAY_9.name() + t), Consumer.from(MQConst.GoblinQueue.GOBLIN_UN_PAY_9.getGroup(), getHostname().concat(MQConst.GoblinQueue.GOBLIN_UN_PAY_9.name() + t)),
StreamOffset.create(MQConst.GoblinQueue.GOBLIN_UN_PAY_9.getKey(), ReadOffset.lastConsumed()), consumerOrderClose9RdsReceiver StreamOffset.create(MQConst.GoblinQueue.GOBLIN_UN_PAY_9.getKey(), ReadOffset.lastConsumed()), consumerOrderClose9RdsReceiver
); );
} }
......
...@@ -6,7 +6,6 @@ import com.liquidnet.commons.lang.util.JsonUtils; ...@@ -6,7 +6,6 @@ import com.liquidnet.commons.lang.util.JsonUtils;
import com.liquidnet.service.base.OrderCloseMapping; import com.liquidnet.service.base.OrderCloseMapping;
import com.liquidnet.service.consumer.kylin.Utils.KylinUtils; import com.liquidnet.service.consumer.kylin.Utils.KylinUtils;
import com.liquidnet.service.kylin.constant.KylinRedisConst; import com.liquidnet.service.kylin.constant.KylinRedisConst;
import com.liquidnet.service.kylin.dto.vo.mongo.KylinOrderTicketEntitiesVo;
import com.liquidnet.service.kylin.dto.vo.mongo.KylinOrderTicketVo; import com.liquidnet.service.kylin.dto.vo.mongo.KylinOrderTicketVo;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
...@@ -28,27 +27,24 @@ public abstract class AbstractOptOrderTransferOverTimeRedisReceiver implements S ...@@ -28,27 +27,24 @@ public abstract class AbstractOptOrderTransferOverTimeRedisReceiver implements S
@Override @Override
public void onMessage(MapRecord<String, String, String> message) { public void onMessage(MapRecord<String, String, String> message) {
log.debug("CONSUMER SQL[streamKey:{},messageId:{},stream:{},body:{}]", String redisStreamKey = this.getRedisStreamKey();
this.getRedisStreamKey(), message.getId(), message.getStream(), message.getValue()); log.debug("CONSUMER MSG[streamKey:{},messageId:{},stream:{},body:{}]", redisStreamKey, message.getId(), message.getStream(), message.getValue());
boolean result = this.consumerMessageHandler(message.getValue().get("message"));
boolean result = this.consumerOrderTransferOverTimeHandler(message.getValue().get("message")); log.info("CONSUMER MSG RESULT:{} ==> [{}]MESSAGE_ID:{}", result, redisStreamKey, message.getId());
log.info("CONSUMER SQL RESULT:{} ==> MESSAGE_ID:{}", result, message.getId());
try { try {
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message); stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId());
} catch (Exception e) { } catch (Exception e) {
log.error("#CONSUMER SQL RESULT:{} ==> DEL_REDIS_QUEUE_MSG_EXCEPTION[MESSAGE_ID:{},MSG:{}]", result, message.getId(), JsonUtils.toJson(message), e); log.error("#CONSUMER MSG EX_ACK ==> [{}]RESULT:{},MESSAGE:{}", redisStreamKey, result, message.getValue(), e);
} finally { }
try { try {
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message); stringRedisTemplate.opsForStream().delete(redisStreamKey, message.getId());
stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId()); } catch (Exception e) {
} catch (Exception ignored) { log.error("#CONSUMER MSG EX_DEL ==> [{}]RESULT:{},MESSAGE:{}", redisStreamKey, result, message.getValue(), e);
}
} }
} }
private boolean consumerOrderTransferOverTimeHandler(String msg) { private boolean consumerMessageHandler(String msg) {
boolean aBoolean = false; boolean aBoolean = false;
try { try {
OrderCloseMapping.orderCloseMessage mqMessage = JsonUtils.fromJson(msg, OrderCloseMapping.orderCloseMessage.class); OrderCloseMapping.orderCloseMessage mqMessage = JsonUtils.fromJson(msg, OrderCloseMapping.orderCloseMessage.class);
...@@ -68,7 +64,7 @@ public abstract class AbstractOptOrderTransferOverTimeRedisReceiver implements S ...@@ -68,7 +64,7 @@ public abstract class AbstractOptOrderTransferOverTimeRedisReceiver implements S
} }
aBoolean = true; aBoolean = true;
} catch (Exception e) { } catch (Exception e) {
log.error("CONSUMER SQL FAIL ==> {}", e.getMessage(), e); log.error("CONSUMER MSG EX_HANDLE ==> [{}]:{}", this.getRedisStreamKey(), msg, e);
} finally { } finally {
if (!aBoolean) { if (!aBoolean) {
HashMap<String, String> map = CollectionUtil.mapStringString(); HashMap<String, String> map = CollectionUtil.mapStringString();
......
...@@ -2,7 +2,6 @@ package com.liquidnet.service.consumer.kylin.receiver; ...@@ -2,7 +2,6 @@ package com.liquidnet.service.consumer.kylin.receiver;
import com.liquidnet.common.cache.redis.util.RedisUtil; import com.liquidnet.common.cache.redis.util.RedisUtil;
import com.liquidnet.commons.lang.util.CollectionUtil; import com.liquidnet.commons.lang.util.CollectionUtil;
import com.liquidnet.commons.lang.util.JsonUtils;
import com.liquidnet.service.base.SqlMapping; import com.liquidnet.service.base.SqlMapping;
import com.liquidnet.service.base.constant.MQConst; import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.goblin.constant.GoblinRedisConst; import com.liquidnet.service.goblin.constant.GoblinRedisConst;
...@@ -29,8 +28,6 @@ import java.util.HashMap; ...@@ -29,8 +28,6 @@ import java.util.HashMap;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.Map; import java.util.Map;
import static com.liquidnet.commons.lang.util.DateUtil.DTF_YMD_HMS;
@Slf4j @Slf4j
public abstract class AbstractOrderCloseReceiver implements StreamListener<String, MapRecord<String, String, String>> { public abstract class AbstractOrderCloseReceiver implements StreamListener<String, MapRecord<String, String, String>> {
@Autowired @Autowired
...@@ -44,26 +41,24 @@ public abstract class AbstractOrderCloseReceiver implements StreamListener<Strin ...@@ -44,26 +41,24 @@ public abstract class AbstractOrderCloseReceiver implements StreamListener<Strin
@Override @Override
public void onMessage(MapRecord<String, String, String> message) { public void onMessage(MapRecord<String, String, String> message) {
log.debug("CONSUMER ORDER_CLOSE [streamKey:{},messageId:{},stream:{},body:{}]", String redisStreamKey = this.getRedisStreamKey();
this.getRedisStreamKey(), message.getId(), message.getStream(), message.getValue()); log.debug("CONSUMER MSG[streamKey:{},messageId:{},stream:{},body:{}]", redisStreamKey, message.getId(), message.getStream(), message.getValue());
boolean result = this.consumerOrderCloseHandler(message.getValue()); boolean result = this.consumerMessageHandler(message.getValue());
log.info("CONSUMER ORDER_CLOSE RESULT:{} ==> MESSAGE_ID:{}", result, message.getId()); log.info("CONSUMER MSG RESULT:{} ==> [{}]MESSAGE_ID:{}", result, redisStreamKey, message.getId());
try { try {
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message); stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId());
} catch (Exception e) { } catch (Exception e) {
log.error("#CONSUMER ORDER_CLOSE RESULT:{} ==> DEL_REDIS_QUEUE_MSG_EXCEPTION[MESSAGE_ID:{},MSG:{}]", result, message.getId(), message.getValue(), e); log.error("#CONSUMER MSG EX_ACK ==> [{}]RESULT:{},MESSAGE:{}", redisStreamKey, result, message.getValue(), e);
} finally { }
try { try {
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message); stringRedisTemplate.opsForStream().delete(redisStreamKey, message.getId());
stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId()); } catch (Exception e) {
} catch (Exception ignored) { log.error("#CONSUMER MSG EX_DEL ==> [{}]RESULT:{},MESSAGE:{}", redisStreamKey, result, message.getValue(), e);
}
} }
} }
private boolean consumerOrderCloseHandler(Map<String, String> messageMap) { private boolean consumerMessageHandler(Map<String, String> messageMap) {
try { try {
String orderCode = messageMap.get("id"), type = messageMap.get("type"), time = messageMap.get("time"); String orderCode = messageMap.get("id"), type = messageMap.get("type"), time = messageMap.get("time");
...@@ -78,11 +73,12 @@ public abstract class AbstractOrderCloseReceiver implements StreamListener<Strin ...@@ -78,11 +73,12 @@ public abstract class AbstractOrderCloseReceiver implements StreamListener<Strin
Thread.sleep(Math.abs(durationToMillis)); Thread.sleep(Math.abs(durationToMillis));
} catch (InterruptedException ignored) { } catch (InterruptedException ignored) {
} }
return consumerOrderCloseHandler(messageMap); return consumerMessageHandler(messageMap);
} }
} catch (Exception e) { } catch (Exception e) {
log.error("CONSUMER ORDER_CLOSE FAIL ==> {}", messageMap, e); String redisStreamKey = this.getRedisStreamKey();
stringRedisTemplate.opsForStream().add(StreamRecords.mapBacked(messageMap).withStreamKey(this.getRedisStreamKey())); log.error("CONSUMER MSG EX_HANDLE ==> [{}]:{}", redisStreamKey, messageMap, e);
stringRedisTemplate.opsForStream().add(StreamRecords.mapBacked(messageMap).withStreamKey(redisStreamKey));
return false; return false;
} }
} }
......
...@@ -25,27 +25,24 @@ public abstract class AbstractSmsRedisReceiver implements StreamListener<String, ...@@ -25,27 +25,24 @@ public abstract class AbstractSmsRedisReceiver implements StreamListener<String,
@Override @Override
public void onMessage(MapRecord<String, String, String> message) { public void onMessage(MapRecord<String, String, String> message) {
log.debug("CONSUMER SMS[streamKey:{},messageId:{},stream:{},body:{}]", String redisStreamKey = this.getRedisStreamKey();
this.getRedisStreamKey(), message.getId(), message.getStream(), message.getValue()); log.debug("CONSUMER MSG[streamKey:{},messageId:{},stream:{},body:{}]", redisStreamKey, message.getId(), message.getStream(), message.getValue());
boolean result = this.consumerMessageHandler(message.getValue().get("message"));
boolean result = this.consumerSmsSendHandler(message.getValue().get("message")); log.info("CONSUMER MSG RESULT:{} ==> [{}]MESSAGE_ID:{}", result, redisStreamKey, message.getId());
log.info("CONSUMER SMS RESULT:{} ==> MESSAGE_ID:{}", result, message.getId());
try { try {
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message); stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId());
} catch (Exception e) { } catch (Exception e) {
log.error("#CONSUMER SMS RESULT:{} ==> DEL_REDIS_QUEUE_MSG_EXCEPTION[MESSAGE_ID:{},MSG:{}]", result, message.getId(), JsonUtils.toJson(message), e); log.error("#CONSUMER SMS EX_ACK ==> [{}]RESULT:{},MESSAGE:{}", redisStreamKey, result, message.getValue(), e);
} finally { }
try { try {
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message); stringRedisTemplate.opsForStream().delete(redisStreamKey, message.getId());
stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId()); } catch (Exception e) {
} catch (Exception ignored) { log.error("#CONSUMER SMS EX_DEL ==> [{}]RESULT:{},MESSAGE:{}", redisStreamKey, result, message.getValue(), e);
}
} }
} }
private boolean consumerSmsSendHandler(String msg) { private boolean consumerMessageHandler(String msg) {
boolean aBoolean = false; boolean aBoolean = false;
try { try {
SmsMessage smsMessage = JsonUtils.fromJson(msg, SmsMessage.class); SmsMessage smsMessage = JsonUtils.fromJson(msg, SmsMessage.class);
...@@ -53,7 +50,7 @@ public abstract class AbstractSmsRedisReceiver implements StreamListener<String, ...@@ -53,7 +50,7 @@ public abstract class AbstractSmsRedisReceiver implements StreamListener<String,
ObjectNode templateParam = smsMessage.getTemplateParam(); ObjectNode templateParam = smsMessage.getTemplateParam();
aBoolean = smsProcessor.send(smsMessage.getPhone(), smsMessage.getSignName(), smsMessage.getTemplateCode(), null == templateParam ? "" : templateParam.toString()); aBoolean = smsProcessor.send(smsMessage.getPhone(), smsMessage.getSignName(), smsMessage.getTemplateCode(), null == templateParam ? "" : templateParam.toString());
} catch (Exception e) { } catch (Exception e) {
log.error("CONSUMER SMS FAIL ==> {}", e.getMessage(), e); log.error("CONSUMER MSG EX_HANDLE ==> [{}]:{}", this.getRedisStreamKey(), msg, e);
} finally { } finally {
if (!aBoolean) { if (!aBoolean) {
HashMap<String, String> map = CollectionUtil.mapStringString(); HashMap<String, String> map = CollectionUtil.mapStringString();
......
...@@ -27,23 +27,20 @@ public abstract class AbstractSqlOptOrderCloseRedisReceiver implements StreamLis ...@@ -27,23 +27,20 @@ public abstract class AbstractSqlOptOrderCloseRedisReceiver implements StreamLis
@Override @Override
public void onMessage(MapRecord<String, String, String> message) { public void onMessage(MapRecord<String, String, String> message) {
log.debug("CONSUMER SQL[streamKey:{},messageId:{},stream:{},body:{}]", String redisStreamKey = this.getRedisStreamKey();
this.getRedisStreamKey(), message.getId(), message.getStream(), message.getValue()); log.debug("CONSUMER MSG[streamKey:{},messageId:{},stream:{},body:{}]", redisStreamKey, message.getId(), message.getStream(), message.getValue());
boolean result = this.consumerSqlOperationOrderCloseHandler(message.getValue().get("message")); boolean result = this.consumerSqlOperationOrderCloseHandler(message.getValue().get("message"));
log.info("CONSUMER SQL RESULT:{} ==> MESSAGE_ID:{}", result, message.getId()); log.info("CONSUMER MSG RESULT:{} ==> [{}]MESSAGE_ID:{}", result, redisStreamKey, message.getId());
try { try {
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message); stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId());
} catch (Exception e) { } catch (Exception e) {
log.error("#CONSUMER SQL RESULT:{} ==> DEL_REDIS_QUEUE_MSG_EXCEPTION[MESSAGE_ID:{},MSG:{}]", result, message.getId(), JsonUtils.toJson(message), e); log.error("#CONSUMER MSG EX_ACK ==> [{}]RESULT:{},MESSAGE:{}", redisStreamKey, result, message.getValue(), e);
} finally { }
try { try {
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message); stringRedisTemplate.opsForStream().delete(redisStreamKey, message.getId());
stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId()); } catch (Exception e) {
} catch (Exception ignored) { log.error("#CONSUMER MSG EX_DEL ==> [{}]RESULT:{},MESSAGE:{}", redisStreamKey, result, message.getValue(), e);
}
} }
} }
...@@ -70,7 +67,7 @@ public abstract class AbstractSqlOptOrderCloseRedisReceiver implements StreamLis ...@@ -70,7 +67,7 @@ public abstract class AbstractSqlOptOrderCloseRedisReceiver implements StreamLis
aBoolean = true; aBoolean = true;
} catch (Exception e) { } catch (Exception e) {
log.error("CONSUMER SQL FAIL ==> {}", e.getMessage(), e); log.error("CONSUMER MSG EX_HANDLE ==> [{}]:{}", this.getRedisStreamKey(), msg, e);
} finally { } finally {
if (!aBoolean) { if (!aBoolean) {
HashMap<String, String> map = CollectionUtil.mapStringString(); HashMap<String, String> map = CollectionUtil.mapStringString();
......
package com.liquidnet.service.consumer.kylin.receiver; package com.liquidnet.service.consumer.kylin.receiver;
import com.liquidnet.common.cache.redis.util.RedisUtil;
import com.liquidnet.commons.lang.util.CollectionUtil; import com.liquidnet.commons.lang.util.CollectionUtil;
import com.liquidnet.commons.lang.util.JsonUtils; import com.liquidnet.commons.lang.util.JsonUtils;
import com.liquidnet.service.base.SqlMapping; import com.liquidnet.service.base.SqlMapping;
...@@ -20,33 +19,28 @@ public abstract class AbstractSqlRedisReceiver implements StreamListener<String, ...@@ -20,33 +19,28 @@ public abstract class AbstractSqlRedisReceiver implements StreamListener<String,
private IBaseDao baseDao; private IBaseDao baseDao;
@Autowired @Autowired
StringRedisTemplate stringRedisTemplate; StringRedisTemplate stringRedisTemplate;
@Autowired
private RedisUtil redisUtil;
@Override @Override
public void onMessage(MapRecord<String, String, String> message) { public void onMessage(MapRecord<String, String, String> message) {
log.debug("CONSUMER SQL[streamKey:{},messageId:{},stream:{},body:{}]", String redisStreamKey = this.getRedisStreamKey();
this.getRedisStreamKey(), message.getId(), message.getStream(), message.getValue()); log.debug("CONSUMER MSG[streamKey:{},messageId:{},stream:{},body:{}]", redisStreamKey, message.getId(), message.getStream(), message.getValue());
boolean result = this.consumerMessageHandler(message.getValue().get("message"));
boolean result = this.consumerSqlDaoHandler(message.getValue().get("message")); log.info("CONSUMER MSG RESULT:{} ==> [{}]MESSAGE_ID:{}", result, redisStreamKey, message.getId());
log.info("CONSUMER SQL RESULT:{} ==> MESSAGE_ID:{}", result, message.getId());
try { try {
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message); stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId());
} catch (Exception e) { } catch (Exception e) {
log.error("#CONSUMER SQL RESULT:{} ==> DEL_REDIS_QUEUE_MSG_EXCEPTION[MESSAGE_ID:{},MSG:{}]", result, message.getId(), JsonUtils.toJson(message), e); log.error("#CONSUMER MSG EX_ACK ==> [{}]RESULT:{},MESSAGE:{}", redisStreamKey, result, message.getValue(), e);
} finally { }
try { try {
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message); stringRedisTemplate.opsForStream().delete(redisStreamKey, message.getId());
stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId()); } catch (Exception e) {
} catch (Exception ignored) { log.error("#CONSUMER MSG EX_DEL ==> [{}]RESULT:{},MESSAGE:{}", redisStreamKey, result, message.getValue(), e);
}
} }
} }
private boolean consumerSqlDaoHandler(String msg) { private boolean consumerMessageHandler(String msg) {
Boolean aBoolean = false; boolean aBoolean = false;
try { try {
SqlMapping.SqlMessage sqlMessage = JsonUtils.fromJson(msg, SqlMapping.SqlMessage.class); SqlMapping.SqlMessage sqlMessage = JsonUtils.fromJson(msg, SqlMapping.SqlMessage.class);
if (sqlMessage == null) { if (sqlMessage == null) {
...@@ -55,7 +49,7 @@ public abstract class AbstractSqlRedisReceiver implements StreamListener<String, ...@@ -55,7 +49,7 @@ public abstract class AbstractSqlRedisReceiver implements StreamListener<String,
aBoolean = baseDao.batchSqls(sqlMessage.getSqls(), sqlMessage.getArgs()); aBoolean = baseDao.batchSqls(sqlMessage.getSqls(), sqlMessage.getArgs());
} }
} catch (Exception e) { } catch (Exception e) {
log.error("CONSUMER SQL FAIL ==> {}", e.getMessage(), e); log.error("CONSUMER MSG EX_HANDLE ==> [{}]:{}", this.getRedisStreamKey(), msg, e);
} finally { } finally {
if (!aBoolean) { if (!aBoolean) {
HashMap<String, String> map = CollectionUtil.mapStringString(); HashMap<String, String> map = CollectionUtil.mapStringString();
......
...@@ -5,20 +5,19 @@ import com.alibaba.excel.read.listener.PageReadListener; ...@@ -5,20 +5,19 @@ import com.alibaba.excel.read.listener.PageReadListener;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.liquidnet.common.cache.redis.util.RedisUtil; import com.liquidnet.common.cache.redis.util.RedisUtil;
import com.liquidnet.commons.lang.util.CollectionUtil; import com.liquidnet.commons.lang.util.CollectionUtil;
import com.liquidnet.commons.lang.util.JsonUtils;
import com.liquidnet.service.consumer.kylin.dto.PhoneDto; import com.liquidnet.service.consumer.kylin.dto.PhoneDto;
import com.liquidnet.service.consumer.service.IBaseDao;
import com.liquidnet.service.goblin.constant.GoblinRedisConst; import com.liquidnet.service.goblin.constant.GoblinRedisConst;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.MapRecord; import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamListener; import org.springframework.data.redis.stream.StreamListener;
import com.liquidnet.service.consumer.service.IBaseDao;
import java.io.InputStream;
import java.net.URL; import java.net.URL;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.Map;
@Slf4j @Slf4j
public abstract class AbstractXlsRedisReceiver implements StreamListener<String, MapRecord<String, String, String>> { public abstract class AbstractXlsRedisReceiver implements StreamListener<String, MapRecord<String, String, String>> {
...@@ -33,38 +32,40 @@ public abstract class AbstractXlsRedisReceiver implements StreamListener<String, ...@@ -33,38 +32,40 @@ public abstract class AbstractXlsRedisReceiver implements StreamListener<String,
@Override @Override
public void onMessage(MapRecord<String, String, String> message) { public void onMessage(MapRecord<String, String, String> message) {
log.debug("CONSUMER XLS [streamKey:{},messageId:{},stream:{},body:{}]", String redisStreamKey = this.getRedisStreamKey();
this.getRedisStreamKey(), message.getId(), message.getStream(), message.getValue()); log.debug("CONSUMER MSG[streamKey:{},messageId:{},stream:{},body:{}]", redisStreamKey, message.getId(), message.getStream(), message.getValue());
boolean result = this.consumerSqlDaoHandler(message.getValue().get("message"), Integer.parseInt(message.getValue().get("type")), message.getValue().get("skuId")); boolean result = this.consumerMessageHandler(message.getValue());
log.info("CONSUMER XLS_PATH RESULT:{} ==> MESSAGE_ID:{}", result, message.getId()); log.info("CONSUMER MSG RESULT:{} ==> [{}]MESSAGE_ID:{}", result, redisStreamKey, message.getId());
try { try {
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message); stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId());
} catch (Exception e) { } catch (Exception e) {
log.error("#CONSUMER SQL RESULT:{} ==> DEL_REDIS_QUEUE_MSG_EXCEPTION[MESSAGE_ID:{},MSG:{}]", result, message.getId(), JsonUtils.toJson(message), e); log.error("#CONSUMER MSG EX_ACK ==> [{}]RESULT:{},MESSAGE:{}", redisStreamKey, result, message.getValue(), e);
} finally { }
try { try {
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message); stringRedisTemplate.opsForStream().delete(redisStreamKey, message.getId());
stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId()); } catch (Exception e) {
} catch (Exception ignored) { log.error("#CONSUMER MSG EX_DEL ==> [{}]RESULT:{},MESSAGE:{}", redisStreamKey, result, message.getValue(), e);
}
} }
} }
private boolean consumerSqlDaoHandler(String xlsPath, Integer type, String skuId) { private boolean consumerMessageHandler(Map<String, String> message) {
LinkedList<Object[]> objs = CollectionUtil.linkedListObjectArr(); LinkedList<Object[]> objs = CollectionUtil.linkedListObjectArr();
Boolean aBoolean = false; String xlsPath = null, skuId = null;
Integer type = null;
boolean aBoolean = false;
try { try {
URL url = new URL(xlsPath); xlsPath = message.get("message");
InputStream is = url.openStream(); String finalSkuId = (skuId = message.get("skuId"));
EasyExcel.read(is, PhoneDto.class, new PageReadListener<PhoneDto>(dataList -> { Integer finalType = (type = Integer.parseInt(message.get("type")));
EasyExcel.read(new URL(xlsPath).openStream(), PhoneDto.class, new PageReadListener<PhoneDto>(dataList -> {
for (PhoneDto data : dataList) { for (PhoneDto data : dataList) {
if (data.getMobile() == null) { if (data.getMobile() == null) {
continue; continue;
} }
String redisKey = GoblinRedisConst.REDIS_CAN_BUY.concat(skuId + ":").concat(data.getMobile()); String redisKey = GoblinRedisConst.REDIS_CAN_BUY.concat(finalSkuId + ":").concat(data.getMobile());
if (type.equals(1)) {//添加 if (finalType.equals(1)) {//添加
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("添加 读取到一条数据{}", JSON.toJSONString(data)); log.debug("添加 读取到一条数据{}", JSON.toJSONString(data));
} }
...@@ -77,23 +78,16 @@ public abstract class AbstractXlsRedisReceiver implements StreamListener<String, ...@@ -77,23 +78,16 @@ public abstract class AbstractXlsRedisReceiver implements StreamListener<String,
} }
} }
})).sheet().doRead(); })).sheet().doRead();
objs.add(new Object[]{skuId, xlsPath, type, 1, LocalDateTime.now()}); objs.add(new Object[]{skuId, xlsPath, type, 1, LocalDateTime.now()});
aBoolean = baseDao.batchSql(SQL_INSERT_GOODS_BUY_ROSTER_LOG, objs);
} catch (Exception e) { } catch (Exception e) {
objs.add(new Object[]{skuId, xlsPath, type, 2, LocalDateTime.now()}); log.error("CONSUMER MSG EX_HANDLE ==> [{}]:{}", this.getRedisStreamKey(), message, e);
aBoolean = false;
log.error("CONSUMER XLS FAIL ==> {}", e.getMessage(), e);
} finally {
try { try {
baseDao.batchSql(SQL_INSERT_GOODS_BUY_ROSTER_LOG, objs); objs.add(new Object[]{skuId, xlsPath, type, 2, LocalDateTime.now()});
} catch (Exception e) { aBoolean = baseDao.batchSql(SQL_INSERT_GOODS_BUY_ROSTER_LOG, objs);
} catch (Exception ignored) {
} }
// if (!aBoolean) {
// HashMap<String, String> map = CollectionUtil.mapStringString();
// map.put("message", xlsPath);
// map.put("skuId", skuId);
// map.put("type", type.toString());
// stringRedisTemplate.opsForStream().add(StreamRecords.mapBacked(map).withStreamKey(this.getRedisStreamKey()));
// }
} }
return aBoolean; return aBoolean;
} }
......
package com.liquidnet.servce.consumer.stone.service.receiver; package com.liquidnet.servce.consumer.stone.service.receiver;
import com.liquidnet.commons.lang.util.CollectionUtil;
import com.liquidnet.commons.lang.util.JsonUtils; import com.liquidnet.commons.lang.util.JsonUtils;
import com.liquidnet.servce.consumer.stone.service.IBaseDao; import com.liquidnet.servce.consumer.stone.service.IBaseDao;
import com.liquidnet.service.base.SqlMapping; import com.liquidnet.service.base.SqlMapping;
...@@ -29,72 +30,108 @@ public abstract class AbstractRedisReceiver implements StreamListener<String, Ma ...@@ -29,72 +30,108 @@ public abstract class AbstractRedisReceiver implements StreamListener<String, Ma
@Override @Override
public void onMessage(MapRecord<String, String, String> message) { public void onMessage(MapRecord<String, String, String> message) {
log.info("接受到来自redis PAY key:{} 的消息",getRedisStreamKey()); String redisStreamKey = this.getRedisStreamKey();
log.info("message id " + message.getId()); log.debug("CONSUMER MSG[streamKey:{},messageId:{},stream:{},body:{}]", redisStreamKey, message.getId(), message.getStream(), message.getValue());
log.info("stream " + message.getStream()); boolean result = this.consumerMessageHandler(message.getValue().get("message"));
log.info("body " + message.getValue()); log.info("CONSUMER MSG RESULT:{} ==> [{}]MESSAGE_ID:{}", result, redisStreamKey, message.getId());
boolean result = this.consumerSqlDaoHandler(message.getValue().get("message"));
// if(result){
// 消费成功确认,消息删除和消息确认是一个事务
log.info("consumer success delete message messageId:{} ",message.getId());
try {
// stringRedisTemplate.multi();
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId());
// stringRedisTemplate.exec();
} catch (Exception e) {
e.printStackTrace();
log.error("delete redis queue message error messageId:{} errMsg:{}",message.getId(),e.getMessage());
}finally {
try {
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId());
} catch (Exception e) {
log.error("error: {}",e);
}
}
// }
}
private boolean consumerSqlDaoHandler(String msg) {
try { try {
SqlMapping.SqlMessage sqlMessage = JsonUtils.fromJson(msg, SqlMapping.SqlMessage.class); stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
log.debug("CONSUMER SQL ==> Preparing:{}", JsonUtils.toJson(sqlMessage.getSqls())); } catch (Exception e) {
log.debug("CONSUMER SQL ==> Parameters:{}", JsonUtils.toJson(sqlMessage.getArgs())); log.error("#CONSUMER MSG EX_ACK ==> [{}]RESULT:{},MESSAGE:{}", this.getRedisStreamKey(), result, message.getValue(), e);
Boolean rstBatchSqls = baseDao.batchSqls(sqlMessage.getSqls(), sqlMessage.getArgs()); }
log.debug("CONSUMER SQL result of execution:{}", rstBatchSqls); try {
if (rstBatchSqls) { stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId());
return true;
}else{
sendMySqlRedis(msg);
}
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); log.error("#CONSUMER MSG EX_DEL ==> [{}]RESULT:{},MESSAGE:{}", this.getRedisStreamKey(), result, message.getValue(), e);
log.error("CONSUMER SQL Exception error:{}", e);
} }
return false;
} }
/** private boolean consumerMessageHandler(String msg) {
* 给 REDIS 队列发送消息 数据库相关 boolean aBoolean = false;
*
* @param msg 接收到的内容
* @return
*/
private boolean sendMySqlRedis(String msg) {
try { try {
HashMap<String, String> map = new HashMap<>(); SqlMapping.SqlMessage sqlMessage = JsonUtils.fromJson(msg, SqlMapping.SqlMessage.class);
map.put("message", msg); aBoolean = null == sqlMessage || baseDao.batchSqls(sqlMessage.getSqls(), sqlMessage.getArgs());
MapRecord<String, String, String> record = StreamRecords.mapBacked(map).withStreamKey(getRedisStreamKey());
stringRedisTemplate.opsForStream().add(record);
return true;
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); log.error("CONSUMER MSG EX_HANDLE ==> [{}]:{}", this.getRedisStreamKey(), msg, e);
return false; } finally {
if (!aBoolean) {
HashMap<String, String> map = CollectionUtil.mapStringString();
map.put("message", msg);
stringRedisTemplate.opsForStream().add(StreamRecords.mapBacked(map).withStreamKey(this.getRedisStreamKey()));
}
} }
return aBoolean;
} }
// @Override
// public void onMessage(MapRecord<String, String, String> message) {
// log.info("接受到来自redis PAY key:{} 的消息",getRedisStreamKey());
// log.info("message id " + message.getId());
// log.info("stream " + message.getStream());
// log.info("body " + message.getValue());
// boolean result = this.consumerSqlDaoHandler(message.getValue().get("message"));
//
//// if(result){
// // 消费成功确认,消息删除和消息确认是一个事务
// log.info("consumer success delete message messageId:{} ",message.getId());
// try {
//// stringRedisTemplate.multi();
// stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
// stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId());
//// stringRedisTemplate.exec();
// } catch (Exception e) {
// e.printStackTrace();
// log.error("delete redis queue message error messageId:{} errMsg:{}",message.getId(),e.getMessage());
// }finally {
// try {
// stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
// stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId());
// } catch (Exception e) {
// log.error("error: {}",e);
// }
// }
//// }
// }
//
// private boolean consumerSqlDaoHandler(String msg) {
// try {
// SqlMapping.SqlMessage sqlMessage = JsonUtils.fromJson(msg, SqlMapping.SqlMessage.class);
// log.debug("CONSUMER SQL ==> Preparing:{}", JsonUtils.toJson(sqlMessage.getSqls()));
// log.debug("CONSUMER SQL ==> Parameters:{}", JsonUtils.toJson(sqlMessage.getArgs()));
// Boolean rstBatchSqls = baseDao.batchSqls(sqlMessage.getSqls(), sqlMessage.getArgs());
// log.debug("CONSUMER SQL result of execution:{}", rstBatchSqls);
// if (rstBatchSqls) {
// return true;
// }else{
// sendMySqlRedis(msg);
// }
// } catch (Exception e) {
// e.printStackTrace();
// log.error("CONSUMER SQL Exception error:{}", e);
// }
// return false;
// }
//
// /**
// * 给 REDIS 队列发送消息 数据库相关
// *
// * @param msg 接收到的内容
// * @return
// */
// private boolean sendMySqlRedis(String msg) {
// try {
// HashMap<String, String> map = new HashMap<>();
// map.put("message", msg);
// MapRecord<String, String, String> record = StreamRecords.mapBacked(map).withStreamKey(getRedisStreamKey());
// stringRedisTemplate.opsForStream().add(record);
// return true;
// } catch (Exception e) {
// e.printStackTrace();
// return false;
// }
// }
protected abstract String getRedisStreamKey(); protected abstract String getRedisStreamKey();
protected abstract String getRedisStreamGroup(); protected abstract String getRedisStreamGroup();
......
...@@ -22,37 +22,30 @@ public abstract class AbstractSqlRedisReceiver implements StreamListener<String, ...@@ -22,37 +22,30 @@ public abstract class AbstractSqlRedisReceiver implements StreamListener<String,
@Override @Override
public void onMessage(MapRecord<String, String, String> message) { public void onMessage(MapRecord<String, String, String> message) {
log.debug("CONSUMER SQL[streamKey:{},messageId:{},stream:{},body:{}]", String redisStreamKey = this.getRedisStreamKey();
this.getRedisStreamKey(), message.getId(), message.getStream(), message.getValue()); log.debug("CONSUMER MSG[streamKey:{},messageId:{},stream:{},body:{}]", redisStreamKey, message.getId(), message.getStream(), message.getValue());
boolean result = this.consumerMessageHandler(message.getValue().get("message"));
boolean result = this.consumerSqlDaoHandler(message.getValue().get("message")); log.info("CONSUMER MSG RESULT:{} ==> [{}]MESSAGE_ID:{}", result, redisStreamKey, message.getId());
log.info("CONSUMER SQL RESULT:{} ==> MESSAGE_ID:{}", result, message.getId());
try { try {
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message); stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId());
} catch (Exception e) { } catch (Exception e) {
log.error("#CONSUMER SQL RESULT:{} ==> DEL_REDIS_QUEUE_MSG_EXCEPTION[MESSAGE_ID:{},MSG:{}]", result, message.getId(), JsonUtils.toJson(message), e); log.error("#CONSUMER MSG EX_ACK ==> [{}]RESULT:{},MESSAGE:{}", redisStreamKey, result, message.getValue(), e);
} finally { }
try { try {
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message); stringRedisTemplate.opsForStream().delete(redisStreamKey, message.getId());
stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId()); } catch (Exception e) {
} catch (Exception ignored) { log.error("#CONSUMER MSG EX_DEL ==> [{}]RESULT:{},MESSAGE:{}", redisStreamKey, result, message.getValue(), e);
}
} }
} }
private boolean consumerSqlDaoHandler(String msg) { private boolean consumerMessageHandler(String msg) {
Boolean aBoolean = false; boolean aBoolean = false;
try { try {
SqlMapping.SqlMessage sqlMessage = JsonUtils.fromJson(msg, SqlMapping.SqlMessage.class); SqlMapping.SqlMessage sqlMessage = JsonUtils.fromJson(msg, SqlMapping.SqlMessage.class);
if (sqlMessage == null) { aBoolean = null == sqlMessage || baseDao.batchSqls(sqlMessage.getSqls(), sqlMessage.getArgs());
aBoolean = true;
} else {
aBoolean = baseDao.batchSqls(sqlMessage.getSqls(), sqlMessage.getArgs());
}
} catch (Exception e) { } catch (Exception e) {
log.error("CONSUMER SQL FAIL ==> {}", e.getMessage(), e); log.error("CONSUMER MSG EX_HANDLE ==> [{}]:{}", this.getRedisStreamKey(), msg, e);
} finally { } finally {
if (!aBoolean) { if (!aBoolean) {
HashMap<String, String> map = CollectionUtil.mapStringString(); HashMap<String, String> map = CollectionUtil.mapStringString();
......
...@@ -33,28 +33,25 @@ public class ConsumerSweetStoneIntegralReceiver extends AbstractSqlRedisReceiver ...@@ -33,28 +33,25 @@ public class ConsumerSweetStoneIntegralReceiver extends AbstractSqlRedisReceiver
@Override @Override
public void onMessage(MapRecord<String, String, String> message) { public void onMessage(MapRecord<String, String, String> message) {
log.debug("CONSUMER SQL[streamKey:{},messageId:{},stream:{},body:{}]", String redisStreamKey = this.getRedisStreamKey();
this.getRedisStreamKey(), message.getId(), message.getStream(), message.getValue()); log.debug("CONSUMER MSG[streamKey:{},messageId:{},stream:{},body:{}]", redisStreamKey, message.getId(), message.getStream(), message.getValue());
boolean result = this.consumerMessageHandler(message.getValue().get("message"));
boolean result = this.consumerSqlDaoHandler(message.getValue().get("message")); log.info("CONSUMER MSG RESULT:{} ==> [{}]MESSAGE_ID:{}", result, redisStreamKey, message.getId());
log.info("CONSUMER SQL RESULT:{} ==> MESSAGE_ID:{}", result, message.getId());
try { try {
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message); stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId());
} catch (Exception e) { } catch (Exception e) {
log.error("#CONSUMER SQL RESULT:{} ==> DEL_REDIS_QUEUE_MSG_EXCEPTION[MESSAGE_ID:{},MSG:{}]", result, message.getId(), JsonUtils.toJson(message), e); log.error("#CONSUMER MSG EX_ACK ==> [{}]RESULT:{},MESSAGE:{}", redisStreamKey, result, message.getValue(), e);
} finally { }
try { try {
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message); stringRedisTemplate.opsForStream().delete(redisStreamKey, message.getId());
stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId()); } catch (Exception e) {
} catch (Exception ignored) { log.error("#CONSUMER MSG EX_DEL ==> [{}]RESULT:{},MESSAGE:{}", redisStreamKey, result, message.getValue(), e);
}
} }
} }
private boolean consumerSqlDaoHandler(String msg) { private boolean consumerMessageHandler(String msg) {
Boolean aBoolean = false; boolean aBoolean = false;
try { try {
SweetStoneIntegralParam param = JsonUtils.fromJson(msg, SweetStoneIntegralParam.class); SweetStoneIntegralParam param = JsonUtils.fromJson(msg, SweetStoneIntegralParam.class);
if (param == null) { if (param == null) {
...@@ -83,7 +80,7 @@ public class ConsumerSweetStoneIntegralReceiver extends AbstractSqlRedisReceiver ...@@ -83,7 +80,7 @@ public class ConsumerSweetStoneIntegralReceiver extends AbstractSqlRedisReceiver
} }
} }
} catch (Exception e) { } catch (Exception e) {
log.error("CONSUMER SQL FAIL ==> {}", e.getMessage(), e); log.error("CONSUMER MSG EX_HANDLE ==> [{}]:{}", this.getRedisStreamKey(), msg, e);
} finally { } finally {
if (!aBoolean) { if (!aBoolean) {
HashMap<String, String> map = CollectionUtil.mapStringString(); HashMap<String, String> map = CollectionUtil.mapStringString();
......
...@@ -32,37 +32,30 @@ public class ConsumerSweetTemplateMsgReceiver extends AbstractSqlRedisReceiver { ...@@ -32,37 +32,30 @@ public class ConsumerSweetTemplateMsgReceiver extends AbstractSqlRedisReceiver {
@Override @Override
public void onMessage(MapRecord<String, String, String> message) { public void onMessage(MapRecord<String, String, String> message) {
log.debug("CONSUMER SQL[streamKey:{},messageId:{},stream:{},body:{}]", String redisStreamKey = this.getRedisStreamKey();
this.getRedisStreamKey(), message.getId(), message.getStream(), message.getValue()); log.debug("CONSUMER MSG[streamKey:{},messageId:{},stream:{},body:{}]", redisStreamKey, message.getId(), message.getStream(), message.getValue());
boolean result = this.consumerMessageHandler(message.getValue().get("message"));
boolean result = this.consumerSqlDaoHandler(message.getValue().get("message")); log.info("CONSUMER MSG RESULT:{} ==> [{}]MESSAGE_ID:{}", result, redisStreamKey, message.getId());
log.info("CONSUMER SQL RESULT:{} ==> MESSAGE_ID:{}", result, message.getId());
try { try {
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message); stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId());
} catch (Exception e) { } catch (Exception e) {
log.error("#CONSUMER SQL RESULT:{} ==> DEL_REDIS_QUEUE_MSG_EXCEPTION[MESSAGE_ID:{},MSG:{}]", result, message.getId(), JsonUtils.toJson(message), e); log.error("#CONSUMER MSG EX_ACK ==> [{}]RESULT:{},MESSAGE:{}", redisStreamKey, result, message.getValue(), e);
} finally { }
try { try {
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message); stringRedisTemplate.opsForStream().delete(redisStreamKey, message.getId());
stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId()); } catch (Exception e) {
} catch (Exception ignored) { log.error("#CONSUMER MSG EX_DEL ==> [{}]RESULT:{},MESSAGE:{}", redisStreamKey, result, message.getValue(), e);
}
} }
} }
private boolean consumerSqlDaoHandler(String msg) { private boolean consumerMessageHandler(String msg) {
Boolean aBoolean = false; boolean aBoolean = false;
try { try {
SweetOpenSendMsgParam sweetOpenSendMsgParam = JsonUtils.fromJson(msg, SweetOpenSendMsgParam.class); SweetOpenSendMsgParam sweetOpenSendMsgParam = JsonUtils.fromJson(msg, SweetOpenSendMsgParam.class);
if (sweetOpenSendMsgParam == null) { aBoolean = null == sweetOpenSendMsgParam || sweetWechatTemplateService.openSendMsg(sweetOpenSendMsgParam);
aBoolean = true;
} else {
aBoolean = sweetWechatTemplateService.openSendMsg(sweetOpenSendMsgParam);
}
} catch (Exception e) { } catch (Exception e) {
log.error("CONSUMER SQL FAIL ==> {}", e.getMessage(), e); log.error("CONSUMER MSG EX_HANDLE ==> [{}]:{}", this.getRedisStreamKey(), msg, e);
} finally { } finally {
if (!aBoolean) { if (!aBoolean) {
HashMap<String, String> map = CollectionUtil.mapStringString(); HashMap<String, String> map = CollectionUtil.mapStringString();
......
...@@ -11,7 +11,7 @@ db.createCollection("GoblinStoreInfoVo"); ...@@ -11,7 +11,7 @@ db.createCollection("GoblinStoreInfoVo");
db.createCollection("GoblinGoodsInfoVo"); db.createCollection("GoblinGoodsInfoVo");
db.createCollection("GoblinGoodsSkuInfoVo"); db.createCollection("GoblinGoodsSkuInfoVo");
db.createCollection("GoblinStoreCouponBasicVo"); db.createCollection("GoblinStoreCouponBasicVo");
db.createCollection("GoblinUserCouponVo"); db.createCollection("GoblinUserCouponBasicVo");
db.createCollection("GoblinFrontBanner"); db.createCollection("GoblinFrontBanner");
db.createCollection("GoblinFrontCube"); db.createCollection("GoblinFrontCube");
db.createCollection("GoblinFrontHotWord"); db.createCollection("GoblinFrontHotWord");
...@@ -40,7 +40,7 @@ sh.shardCollection("test_ln_scene.GoblinStoreInfoVo",{"storeId":"hashed"}); ...@@ -40,7 +40,7 @@ sh.shardCollection("test_ln_scene.GoblinStoreInfoVo",{"storeId":"hashed"});
sh.shardCollection("test_ln_scene.GoblinGoodsInfoVo",{"spuId":"hashed"}); sh.shardCollection("test_ln_scene.GoblinGoodsInfoVo",{"spuId":"hashed"});
sh.shardCollection("test_ln_scene.GoblinGoodsSkuInfoVo",{"skuId":"hashed"}); sh.shardCollection("test_ln_scene.GoblinGoodsSkuInfoVo",{"skuId":"hashed"});
sh.shardCollection("test_ln_scene.GoblinStoreCouponBasicVo",{"storeCouponId":"hashed"}); sh.shardCollection("test_ln_scene.GoblinStoreCouponBasicVo",{"storeCouponId":"hashed"});
sh.shardCollection("test_ln_scene.GoblinUserCouponVo",{"ucouponId":"hashed"}); sh.shardCollection("test_ln_scene.GoblinUserCouponBasicVo",{"ucouponId":"hashed"});
sh.shardCollection("test_ln_scene.GoblinFrontBanner",{"bannerId":"hashed"}); sh.shardCollection("test_ln_scene.GoblinFrontBanner",{"bannerId":"hashed"});
sh.shardCollection("test_ln_scene.GoblinFrontCube",{"cubeId":"hashed"}); sh.shardCollection("test_ln_scene.GoblinFrontCube",{"cubeId":"hashed"});
sh.shardCollection("test_ln_scene.GoblinFrontHotWord",{"hotWordId":"hashed"}); sh.shardCollection("test_ln_scene.GoblinFrontHotWord",{"hotWordId":"hashed"});
...@@ -72,7 +72,7 @@ db.GoblinGoodsInfoVo.createIndex({spuNo:"hashed"}); ...@@ -72,7 +72,7 @@ db.GoblinGoodsInfoVo.createIndex({spuNo:"hashed"});
db.GoblinGoodsSkuInfoVo.createIndex({skuId:"hashed"}); db.GoblinGoodsSkuInfoVo.createIndex({skuId:"hashed"});
db.GoblinGoodsSkuInfoVo.createIndex({storeId:"hashed"}); db.GoblinGoodsSkuInfoVo.createIndex({storeId:"hashed"});
db.GoblinStoreCouponBasicVo.createIndex({storeCouponId:"hashed"}); db.GoblinStoreCouponBasicVo.createIndex({storeCouponId:"hashed"});
db.GoblinUserCouponVo.createIndex({uid:"hashed"}); db.GoblinUserCouponBasicVo.createIndex({uid:"hashed"});
db.GoblinFrontBanner.createIndex({bannerId:"hashed"}); db.GoblinFrontBanner.createIndex({bannerId:"hashed"});
db.GoblinFrontCube.createIndex({cubeId:"hashed"}); db.GoblinFrontCube.createIndex({cubeId:"hashed"});
db.GoblinFrontHotWord.createIndex({hotWordId:"hashed"}); db.GoblinFrontHotWord.createIndex({hotWordId:"hashed"});
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment