记得上下班打卡 | git大法好,push需谨慎

Commit 9ccbb68f authored by jiangxiulong's avatar jiangxiulong

Merge remote-tracking branch 'origin/dev' into dev

parents 62100e15 6398aa93
...@@ -3,7 +3,6 @@ package com.liquidnet.service.consumer.dragon.receiver; ...@@ -3,7 +3,6 @@ package com.liquidnet.service.consumer.dragon.receiver;
import com.liquidnet.commons.lang.util.JsonUtils; import com.liquidnet.commons.lang.util.JsonUtils;
import com.liquidnet.service.base.SqlMapping; import com.liquidnet.service.base.SqlMapping;
import com.liquidnet.service.consumer.dragon.service.IBaseDao; import com.liquidnet.service.consumer.dragon.service.IBaseDao;
import com.liquidnet.service.dragon.constant.DragonConstant;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.MapRecord; import org.springframework.data.redis.connection.stream.MapRecord;
...@@ -37,19 +36,19 @@ public abstract class AbstractRedisReceiver implements StreamListener<String, Ma ...@@ -37,19 +36,19 @@ public abstract class AbstractRedisReceiver implements StreamListener<String, Ma
log.info("body " + message.getValue()); log.info("body " + message.getValue());
boolean result = this.consumerSqlDaoHandler(message.getValue().get("message")); boolean result = this.consumerSqlDaoHandler(message.getValue().get("message"));
if(result){ // if(result){
// 消费成功确认,消息删除和消息确认是一个事务 // 消费成功确认,消息删除和消息确认是一个事务
log.info("consumer success delete message messageId:{} ",message.getId()); log.info("consumer success delete message messageId:{} ",message.getId());
try { try {
// stringRedisTemplate.multi(); // stringRedisTemplate.multi();
stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId()); stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId());
// stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message); stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
// stringRedisTemplate.exec(); // stringRedisTemplate.exec();
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
log.error("delete redis queue message error messageId:{} errMsg:{}",message.getId(),e.getMessage()); log.error("delete redis queue message error messageId:{} errMsg:{}",message.getId(),e.getMessage());
} }
} // }
} }
private boolean consumerSqlDaoHandler(String msg) { private boolean consumerSqlDaoHandler(String msg) {
...@@ -61,6 +60,8 @@ public abstract class AbstractRedisReceiver implements StreamListener<String, Ma ...@@ -61,6 +60,8 @@ public abstract class AbstractRedisReceiver implements StreamListener<String, Ma
log.debug("CONSUMER SQL result of execution:{}", rstBatchSqls); log.debug("CONSUMER SQL result of execution:{}", rstBatchSqls);
if (rstBatchSqls) { if (rstBatchSqls) {
return true; return true;
}else{
sendMySqlRedis(msg);
} }
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
...@@ -79,7 +80,7 @@ public abstract class AbstractRedisReceiver implements StreamListener<String, Ma ...@@ -79,7 +80,7 @@ public abstract class AbstractRedisReceiver implements StreamListener<String, Ma
try { try {
HashMap<String, String> map = new HashMap<>(); HashMap<String, String> map = new HashMap<>();
map.put("message", msg); map.put("message", msg);
MapRecord<String, String, String> record = StreamRecords.mapBacked(map).withStreamKey(DragonConstant.MysqlRedisQueueEnum.DRAGON_PAY_KEY.getCode()); MapRecord<String, String, String> record = StreamRecords.mapBacked(map).withStreamKey(getRedisStreamKey());
stringRedisTemplate.opsForStream().add(record); stringRedisTemplate.opsForStream().add(record);
return true; return true;
} catch (Exception e) { } catch (Exception e) {
......
...@@ -31,18 +31,18 @@ public class RedisRefundReceiver implements StreamListener<String, MapRecord<Str ...@@ -31,18 +31,18 @@ public class RedisRefundReceiver implements StreamListener<String, MapRecord<Str
log.info("stream " + message.getStream()); log.info("stream " + message.getStream());
log.info("body " + message.getValue()); log.info("body " + message.getValue());
boolean result = this.consumerSqlDaoHandler(message.getValue().get("message")); boolean result = this.consumerSqlDaoHandler(message.getValue().get("message"));
if(result){ // if(result){
log.error("consumer success delete message messageId:{} ",message.getId()); log.error("consumer success delete message messageId:{} ",message.getId());
try { try {
// stringRedisTemplate.multi(); // stringRedisTemplate.multi();
stringRedisTemplate.opsForStream().delete(DragonConstant.MysqlRedisQueueEnum.DRAGON_REFUND_KEY.getCode(), message.getId()); stringRedisTemplate.opsForStream().delete(DragonConstant.MysqlRedisQueueEnum.DRAGON_REFUND_KEY.getCode(), message.getId());
// stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message); stringRedisTemplate.opsForStream().acknowledge(DragonConstant.MysqlRedisQueueEnum.DRAGON_REFUND_GROUP.getCode(), message);
// stringRedisTemplate.exec(); // stringRedisTemplate.exec();
} catch (Exception e) { } catch (Exception e) {
log.error("delete redis queue message Exception error: {} ",e); log.error("delete redis queue message Exception error: {} ",e);
log.error("delete redis queue message error messageId:{} errMsg:{}",message.getId(),e.getMessage()); log.error("delete redis queue message error messageId:{} errMsg:{}",message.getId(),e.getMessage());
} }
} // }
} }
private boolean consumerSqlDaoHandler(String msg) { private boolean consumerSqlDaoHandler(String msg) {
...@@ -54,6 +54,8 @@ public class RedisRefundReceiver implements StreamListener<String, MapRecord<Str ...@@ -54,6 +54,8 @@ public class RedisRefundReceiver implements StreamListener<String, MapRecord<Str
log.debug("CONSUMER SQL result of execution:{}", rstBatchSqls); log.debug("CONSUMER SQL result of execution:{}", rstBatchSqls);
if (rstBatchSqls) { if (rstBatchSqls) {
return true; return true;
}else{
sendMySqlRedis(msg);
} }
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment