记得上下班打卡 | git大法好,push需谨慎

Commit fe4c3436 authored by 张国柄's avatar 张国柄

MQ消费调整;

parent 7e68aba0
......@@ -24,7 +24,7 @@ public abstract class AbstractSmsRedisReceiver implements StreamListener<String,
@Override
public void onMessage(MapRecord<String, String, String> message) {
log.info("CONSUMER SMS[streamKey:{},messageId:{},stream:{},body:{}]",
log.debug("CONSUMER SMS[streamKey:{},messageId:{},stream:{},body:{}]",
this.getRedisStreamKey(), message.getId(), message.getStream(), message.getValue());
boolean result = this.consumerSmsSendHandler(message.getValue().get("message"));
......@@ -34,7 +34,7 @@ public abstract class AbstractSmsRedisReceiver implements StreamListener<String,
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId());
} catch (Exception e) {
log.error("CONSUMER SMS RESULT:{} ==> DEL_REDIS_QUEUE_MSG_EXCEPTION[MESSAGE_ID:{},MSG:{}]", result, message.getId(), JsonUtils.toJson(message), e);
log.error("#CONSUMER SMS RESULT:{} ==> DEL_REDIS_QUEUE_MSG_EXCEPTION[MESSAGE_ID:{},MSG:{}]", result, message.getId(), JsonUtils.toJson(message), e);
} finally {
try {
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
......
......@@ -23,17 +23,17 @@ public abstract class AbstractSqlRedisReceiver implements StreamListener<String,
@Override
public void onMessage(MapRecord<String, String, String> message) {
log.info("CONSUMER SQL[streamKey:{},messageId:{},stream:{},body:{}]",
log.debug("CONSUMER SQL[streamKey:{},messageId:{},stream:{},body:{}]",
this.getRedisStreamKey(), message.getId(), message.getStream(), message.getValue());
boolean result = this.consumerSqlDaoHandler(message.getValue().get("message"));
log.info("CONSUMER SMS RESULT:{} ==> MESSAGE_ID:{}", result, message.getId());
log.info("CONSUMER SQL RESULT:{} ==> MESSAGE_ID:{}", result, message.getId());
try {
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId());
} catch (Exception e) {
log.error("CONSUMER SQL RESULT:{} ==> DEL_REDIS_QUEUE_MSG_EXCEPTION[MESSAGE_ID:{},MSG:{}]", result, message.getId(), JsonUtils.toJson(message), e);
log.error("#CONSUMER SQL RESULT:{} ==> DEL_REDIS_QUEUE_MSG_EXCEPTION[MESSAGE_ID:{},MSG:{}]", result, message.getId(), JsonUtils.toJson(message), e);
} finally {
try {
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
......@@ -48,7 +48,7 @@ public abstract class AbstractSqlRedisReceiver implements StreamListener<String,
SqlMapping.SqlMessage sqlMessage = JsonUtils.fromJson(msg, SqlMapping.SqlMessage.class);
aBoolean = baseDao.batchSqls(sqlMessage.getSqls(), sqlMessage.getArgs());
} catch (Exception e) {
log.error("CONSUMER SMS FAIL ==> {}", e.getMessage(), e);
log.error("CONSUMER SQL FAIL ==> {}", e.getMessage(), e);
} finally {
if (!aBoolean) {
HashMap<String, String> map = CollectionUtil.mapStringString();
......
package com.liquidnet.service.consumer.kylin.receiver;
import com.liquidnet.common.sms.processor.SmsProcessor;
import com.liquidnet.commons.lang.util.CollectionUtil;
import com.liquidnet.commons.lang.util.JsonUtils;
import com.liquidnet.service.base.SmsMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamListener;
import javax.annotation.Resource;
import java.util.HashMap;
@Slf4j
public abstract class AbstractSmsRedisReceiver implements StreamListener<String, MapRecord<String, String, String>> {
......@@ -21,33 +24,44 @@ public abstract class AbstractSmsRedisReceiver implements StreamListener<String,
@Override
public void onMessage(MapRecord<String, String, String> message) {
log.info("CONSUMER SMS[streamKey:{},messageId:{},stream:{},body:{}]",
log.debug("CONSUMER SMS[streamKey:{},messageId:{},stream:{},body:{}]",
this.getRedisStreamKey(), message.getId(), message.getStream(), message.getValue());
boolean result = this.consumerSmsSendHandler(message.getValue().get("message"));
log.info("CONSUMER SMS RESULT:{} ==> MESSAGE_ID:{}", result, message.getId());
log.debug("CONSUMER SMS RESULT:{}", result);
// 消费成功确认,消息删除和消息确认是一个事务
if (result) {
log.info("CONSUMER SMS SUCC ==> MESSAGE_ID:{}", message.getId());
try {
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId());
} catch (Exception e) {
log.error("#CONSUMER SMS RESULT:{} ==> DEL_REDIS_QUEUE_MSG_EXCEPTION[MESSAGE_ID:{},MSG:{}]", result, message.getId(), JsonUtils.toJson(message), e);
} finally {
try {
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId());
} catch (Exception e) {
log.error("CONSUMER SMS SUC ==> DEL_REDIS_QUEUE_MSG_EXCEPTION[MESSAGE_ID:{}]", message.getId(), e);
} catch (Exception ignored) {
}
}
}
private boolean consumerSmsSendHandler(String msg) {
boolean aBoolean = false;
try {
SmsMessage smsMessage = JsonUtils.fromJson(msg, SmsMessage.class);
return smsProcessor.send(smsMessage.getPhone(), smsMessage.getSignName(), smsMessage.getTemplateCode(), smsMessage.getTemplateParam().toString());
aBoolean = smsProcessor.send(smsMessage.getPhone(), smsMessage.getSignName(), smsMessage.getTemplateCode(), smsMessage.getTemplateParam().toString());
} catch (Exception e) {
log.error("CONSUMER SMS FAIL ==> {}", e.getMessage(), e);
return false;
} finally {
if (!aBoolean) {
HashMap<String, String> map = CollectionUtil.mapStringString();
map.put("message", msg);
stringRedisTemplate.opsForStream().add(StreamRecords.mapBacked(map).withStreamKey(this.getRedisStreamKey()));
}
}
return aBoolean;
}
protected abstract String getRedisStreamKey();
protected abstract String getRedisStreamGroup();
}
\ No newline at end of file
package com.liquidnet.service.consumer.kylin.receiver;
import com.liquidnet.common.cache.redis.util.RedisUtil;
import com.liquidnet.commons.lang.util.CollectionUtil;
import com.liquidnet.commons.lang.util.JsonUtils;
import com.liquidnet.service.base.OrderCloseMapping;
import com.liquidnet.service.consumer.kylin.Utils.KylinUtils;
......@@ -9,9 +10,12 @@ import com.liquidnet.service.kylin.dto.vo.mongo.KylinOrderTicketVo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamListener;
import java.util.HashMap;
@Slf4j
public abstract class AbstractSqlOptOrderCloseRedisReceiver implements StreamListener<String, MapRecord<String, String, String>> {
@Autowired
......@@ -23,25 +27,28 @@ public abstract class AbstractSqlOptOrderCloseRedisReceiver implements StreamLis
@Override
public void onMessage(MapRecord<String, String, String> message) {
log.info("CONSUMER SQL[streamKey:{},messageId:{},stream:{},body:{}]",
log.debug("CONSUMER SQL[streamKey:{},messageId:{},stream:{},body:{}]",
this.getRedisStreamKey(), message.getId(), message.getStream(), message.getValue());
boolean result = this.consumerSqlOperationOrderCloseHandler(message.getValue().get("message"));
log.info("CONSUMER SQL RESULT:{} ==> MESSAGE_ID:{}", result, message.getId());
log.debug("CONSUMER SQL RESULT:{}", result);
// 消费成功确认,消息删除和消息确认是一个事务
if (result) {
log.info("CONSUMER SMS SUCC ==> MESSAGE_ID:{}", message.getId());
try {
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId());
} catch (Exception e) {
log.error("#CONSUMER SQL RESULT:{} ==> DEL_REDIS_QUEUE_MSG_EXCEPTION[MESSAGE_ID:{},MSG:{}]", result, message.getId(), JsonUtils.toJson(message), e);
} finally {
try {
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId());
} catch (Exception e) {
log.error("CONSUMER SMS SUCC ==> DEL_REDIS_QUEUE_MSG_EXCEPTION[MESSAGE_ID:{}]", message.getId(), e);
} catch (Exception ignored) {
}
}
}
private boolean consumerSqlOperationOrderCloseHandler(String msg) {
boolean aBoolean = false;
try {
OrderCloseMapping.orderCloseMessage mqMessage = JsonUtils.fromJson(msg, OrderCloseMapping.orderCloseMessage.class);
......@@ -61,12 +68,20 @@ public abstract class AbstractSqlOptOrderCloseRedisReceiver implements StreamLis
}
}
return true;
aBoolean = true;
} catch (Exception e) {
log.error("CONSUMER SMS FAIL ==> {}", e.getMessage(), e);
return false;
log.error("CONSUMER SQL FAIL ==> {}", e.getMessage(), e);
} finally {
if (!aBoolean) {
HashMap<String, String> map = CollectionUtil.mapStringString();
map.put("message", msg);
stringRedisTemplate.opsForStream().add(StreamRecords.mapBacked(map).withStreamKey(this.getRedisStreamKey()));
}
}
return aBoolean;
}
protected abstract String getRedisStreamKey();
protected abstract String getRedisStreamGroup();
}
package com.liquidnet.service.consumer.kylin.receiver;
import com.liquidnet.commons.lang.util.CollectionUtil;
import com.liquidnet.commons.lang.util.JsonUtils;
import com.liquidnet.service.base.SqlMapping;
import com.liquidnet.service.consumer.service.IBaseDao;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamListener;
import java.util.HashMap;
@Slf4j
public abstract class AbstractSqlRedisReceiver implements StreamListener<String, MapRecord<String, String, String>> {
@Autowired
......@@ -18,33 +22,44 @@ public abstract class AbstractSqlRedisReceiver implements StreamListener<String,
@Override
public void onMessage(MapRecord<String, String, String> message) {
log.info("CONSUMER SQL[streamKey:{},messageId:{},stream:{},body:{}]",
log.debug("CONSUMER SQL[streamKey:{},messageId:{},stream:{},body:{}]",
this.getRedisStreamKey(), message.getId(), message.getStream(), message.getValue());
boolean result = this.consumerSqlDaoHandler(message.getValue().get("message"));
log.info("CONSUMER SQL RESULT:{} ==> MESSAGE_ID:{}", result, message.getId());
log.debug("CONSUMER SQL RESULT:{}", result);
// 消费成功确认,消息删除和消息确认是一个事务
if (result) {
log.info("CONSUMER SMS SUCC ==> MESSAGE_ID:{}", message.getId());
try {
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId());
} catch (Exception e) {
log.error("#CONSUMER SQL RESULT:{} ==> DEL_REDIS_QUEUE_MSG_EXCEPTION[MESSAGE_ID:{},MSG:{}]", result, message.getId(), JsonUtils.toJson(message), e);
} finally {
try {
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId());
} catch (Exception e) {
log.error("CONSUMER SMS SUCC ==> DEL_REDIS_QUEUE_MSG_EXCEPTION[MESSAGE_ID:{}]", message.getId(), e);
} catch (Exception ignored) {
}
}
}
private boolean consumerSqlDaoHandler(String msg) {
Boolean aBoolean = false;
try {
SqlMapping.SqlMessage sqlMessage = JsonUtils.fromJson(msg, SqlMapping.SqlMessage.class);
return baseDao.batchSqls(sqlMessage.getSqls(), sqlMessage.getArgs());
aBoolean = baseDao.batchSqls(sqlMessage.getSqls(), sqlMessage.getArgs());
} catch (Exception e) {
log.error("CONSUMER SMS FAIL ==> {}", e.getMessage(), e);
return false;
log.error("CONSUMER SQL FAIL ==> {}", e.getMessage(), e);
} finally {
if (!aBoolean) {
HashMap<String, String> map = CollectionUtil.mapStringString();
map.put("message", msg);
stringRedisTemplate.opsForStream().add(StreamRecords.mapBacked(map).withStreamKey(this.getRedisStreamKey()));
}
}
return aBoolean;
}
protected abstract String getRedisStreamKey();
protected abstract String getRedisStreamGroup();
}
\ No newline at end of file
......@@ -9,4 +9,9 @@ public class ConsumerKylinSmsNoticeRdsReceiver extends AbstractSmsRedisReceiver
protected String getRedisStreamKey() {
return MQConst.KylinQueue.SMS_NOTICE.getKey();
}
@Override
protected String getRedisStreamGroup() {
return MQConst.KylinQueue.SMS_NOTICE.getGroup();
}
}
......@@ -9,4 +9,9 @@ public class ConsumerKylinSqlOptOrderCloseRedisReceiver extends AbstractSqlOptOr
protected String getRedisStreamKey() {
return MQConst.KylinQueue.SQL_ORDER_CLOSE.getKey();
}
@Override
protected String getRedisStreamGroup() {
return MQConst.KylinQueue.SQL_ORDER_CLOSE.getGroup();
}
}
......@@ -9,4 +9,9 @@ public class ConsumerKylinSqlOrderAgainRdsReceiver extends AbstractSqlRedisRecei
protected String getRedisStreamKey() {
return MQConst.KylinQueue.SQL_ORDER_AGAIN.getKey();
}
@Override
protected String getRedisStreamGroup() {
return MQConst.KylinQueue.SQL_ORDER_AGAIN.getGroup();
}
}
......@@ -9,4 +9,9 @@ public class ConsumerKylinSqlOrderCreateRdsReceiver extends AbstractSqlRedisRece
protected String getRedisStreamKey() {
return MQConst.KylinQueue.SQL_ORDER_CREATE.getKey();
}
@Override
protected String getRedisStreamGroup() {
return MQConst.KylinQueue.SQL_ORDER_CREATE.getGroup();
}
}
......@@ -9,4 +9,9 @@ public class ConsumerKylinSqlOrderOvertimeRefundRdsReceiver extends AbstractSqlR
protected String getRedisStreamKey() {
return MQConst.KylinQueue.SQL_ORDER_OVERTIME_REFUND.getKey();
}
@Override
protected String getRedisStreamGroup() {
return MQConst.KylinQueue.SQL_ORDER_OVERTIME_REFUND.getGroup();
}
}
......@@ -9,4 +9,9 @@ public class ConsumerKylinSqlOrderPayRdsReceiver extends AbstractSqlRedisReceive
protected String getRedisStreamKey() {
return MQConst.KylinQueue.SQL_ORDER_PAY.getKey();
}
@Override
protected String getRedisStreamGroup() {
return MQConst.KylinQueue.SQL_ORDER_PAY.getGroup();
}
}
......@@ -9,4 +9,9 @@ public class ConsumerKylinSqlOrderRefundRdsReceiver extends AbstractSqlRedisRece
protected String getRedisStreamKey() {
return MQConst.KylinQueue.SQL_ORDER_REFUND.getKey();
}
@Override
protected String getRedisStreamGroup() {
return MQConst.KylinQueue.SQL_ORDER_REFUND.getGroup();
}
}
......@@ -9,4 +9,9 @@ public class ConsumerKylinSqlOrderWithdrawRdsReceiver extends AbstractSqlRedisRe
protected String getRedisStreamKey() {
return MQConst.KylinQueue.SQL_ORDER_WITHDRAW.getKey();
}
@Override
protected String getRedisStreamGroup() {
return MQConst.KylinQueue.SQL_ORDER_WITHDRAW.getGroup();
}
}
......@@ -9,4 +9,9 @@ public class ConsumerKylinSqlPerformanceLackRdsReceiver extends AbstractSqlRedis
protected String getRedisStreamKey() {
return MQConst.KylinQueue.SQL_PERFORMANCE_LACK.getKey();
}
@Override
protected String getRedisStreamGroup() {
return MQConst.KylinQueue.SQL_PERFORMANCE_LACK.getGroup();
}
}
......@@ -9,4 +9,9 @@ public class ConsumerKylinSqlStationRdsReceiver extends AbstractSqlRedisReceiver
protected String getRedisStreamKey() {
return MQConst.KylinQueue.SQL_STATION.getKey();
}
@Override
protected String getRedisStreamGroup() {
return MQConst.KylinQueue.SQL_STATION.getGroup();
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment