记得上下班打卡 | git大法好,push需谨慎

Commit 00a41d21 authored by anjiabin's avatar anjiabin

修改redis消费队列方式

parent 86d3c4bf
...@@ -37,6 +37,7 @@ public abstract class AbstractRedisReceiver implements StreamListener<String, Ma ...@@ -37,6 +37,7 @@ public abstract class AbstractRedisReceiver implements StreamListener<String, Ma
log.info("body " + message.getValue()); log.info("body " + message.getValue());
boolean result = this.consumerSqlDaoHandler(message.getValue().get("message")); boolean result = this.consumerSqlDaoHandler(message.getValue().get("message"));
if(result){
// 消费成功确认,消息删除和消息确认是一个事务 // 消费成功确认,消息删除和消息确认是一个事务
log.info("consumer success delete message messageId:{} ",message.getId()); log.info("consumer success delete message messageId:{} ",message.getId());
try { try {
...@@ -49,6 +50,7 @@ public abstract class AbstractRedisReceiver implements StreamListener<String, Ma ...@@ -49,6 +50,7 @@ public abstract class AbstractRedisReceiver implements StreamListener<String, Ma
log.error("delete redis queue message error messageId:{} errMsg:{}",message.getId(),e.getMessage()); log.error("delete redis queue message error messageId:{} errMsg:{}",message.getId(),e.getMessage());
} }
} }
}
private boolean consumerSqlDaoHandler(String msg) { private boolean consumerSqlDaoHandler(String msg) {
try { try {
...@@ -59,8 +61,6 @@ public abstract class AbstractRedisReceiver implements StreamListener<String, Ma ...@@ -59,8 +61,6 @@ public abstract class AbstractRedisReceiver implements StreamListener<String, Ma
log.debug("CONSUMER SQL result of execution:{}", rstBatchSqls); log.debug("CONSUMER SQL result of execution:{}", rstBatchSqls);
if (rstBatchSqls) { if (rstBatchSqls) {
return true; return true;
}else{
sendMySqlRedis(msg);
} }
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
......
...@@ -27,10 +27,11 @@ public class RedisRefundReceiver implements StreamListener<String, MapRecord<Str ...@@ -27,10 +27,11 @@ public class RedisRefundReceiver implements StreamListener<String, MapRecord<Str
@Override @Override
public void onMessage(MapRecord<String, String, String> message) { public void onMessage(MapRecord<String, String, String> message) {
log.info("接受到来自redis REFUND 的消息"); log.info("接受到来自redis REFUND 的消息");
System.out.println("message id " + message.getId()); log.info("message id " + message.getId());
System.out.println("stream " + message.getStream()); log.info("stream " + message.getStream());
System.out.println("body " + message.getValue()); log.info("body " + message.getValue());
consumerSqlDaoHandler(message.getValue().get("message")); boolean result = this.consumerSqlDaoHandler(message.getValue().get("message"));
if(result){
log.error("consumer success delete message messageId:{} ",message.getId()); log.error("consumer success delete message messageId:{} ",message.getId());
try { try {
// stringRedisTemplate.multi(); // stringRedisTemplate.multi();
...@@ -42,8 +43,9 @@ public class RedisRefundReceiver implements StreamListener<String, MapRecord<Str ...@@ -42,8 +43,9 @@ public class RedisRefundReceiver implements StreamListener<String, MapRecord<Str
log.error("delete redis queue message error messageId:{} errMsg:{}",message.getId(),e.getMessage()); log.error("delete redis queue message error messageId:{} errMsg:{}",message.getId(),e.getMessage());
} }
} }
}
private void consumerSqlDaoHandler(String msg) { private boolean consumerSqlDaoHandler(String msg) {
try { try {
SqlMapping.SqlMessage sqlMessage = JsonUtils.fromJson(msg, SqlMapping.SqlMessage.class); SqlMapping.SqlMessage sqlMessage = JsonUtils.fromJson(msg, SqlMapping.SqlMessage.class);
log.debug("CONSUMER SQL ==> Preparing:{}", JsonUtils.toJson(sqlMessage.getSqls())); log.debug("CONSUMER SQL ==> Preparing:{}", JsonUtils.toJson(sqlMessage.getSqls()));
...@@ -51,13 +53,13 @@ public class RedisRefundReceiver implements StreamListener<String, MapRecord<Str ...@@ -51,13 +53,13 @@ public class RedisRefundReceiver implements StreamListener<String, MapRecord<Str
Boolean rstBatchSqls = baseDao.batchSqls(sqlMessage.getSqls(), sqlMessage.getArgs()); Boolean rstBatchSqls = baseDao.batchSqls(sqlMessage.getSqls(), sqlMessage.getArgs());
log.debug("CONSUMER SQL result of execution:{}", rstBatchSqls); log.debug("CONSUMER SQL result of execution:{}", rstBatchSqls);
if (rstBatchSqls) { if (rstBatchSqls) {
//应答 return true;
} else {
sendMySqlRedis(msg);
} }
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
log.error("CONSUMER SQL Exception error:{}", e);
} }
return false;
} }
/** /**
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment