记得上下班打卡 | git大法好,push需谨慎

Commit 8219a7d3 authored by 张国柄's avatar 张国柄

MQ消费调整;

parent 9ccbb68f
package com.liquidnet.service.consumer.adam.receiver; package com.liquidnet.service.consumer.adam.receiver;
import com.liquidnet.common.sms.processor.SmsProcessor; import com.liquidnet.common.sms.processor.SmsProcessor;
import com.liquidnet.commons.lang.util.CollectionUtil;
import com.liquidnet.commons.lang.util.JsonUtils; import com.liquidnet.commons.lang.util.JsonUtils;
import com.liquidnet.service.base.SmsMessage; import com.liquidnet.service.base.SmsMessage;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.MapRecord; import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamListener; import org.springframework.data.redis.stream.StreamListener;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.util.HashMap;
@Slf4j @Slf4j
public abstract class AbstractSmsRedisReceiver implements StreamListener<String, MapRecord<String, String, String>> { public abstract class AbstractSmsRedisReceiver implements StreamListener<String, MapRecord<String, String, String>> {
...@@ -25,29 +28,39 @@ public abstract class AbstractSmsRedisReceiver implements StreamListener<String, ...@@ -25,29 +28,39 @@ public abstract class AbstractSmsRedisReceiver implements StreamListener<String,
this.getRedisStreamKey(), message.getId(), message.getStream(), message.getValue()); this.getRedisStreamKey(), message.getId(), message.getStream(), message.getValue());
boolean result = this.consumerSmsSendHandler(message.getValue().get("message")); boolean result = this.consumerSmsSendHandler(message.getValue().get("message"));
log.info("CONSUMER SMS RESULT:{} ==> MESSAGE_ID:{}", result, message.getId());
log.debug("CONSUMER SMS RESULT:{}", result); try {
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
// 消费成功确认,消息删除和消息确认是一个事务 stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId());
if (result) { } catch (Exception e) {
log.info("CONSUMER SMS SUCC ==> MESSAGE_ID:{}", message.getId()); log.error("CONSUMER SMS RESULT:{} ==> DEL_REDIS_QUEUE_MSG_EXCEPTION[MESSAGE_ID:{},MSG:{}]", result, message.getId(), JsonUtils.toJson(message), e);
} finally {
try { try {
stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId()); stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
} catch (Exception e) { } catch (Exception ignored) {
log.error("CONSUMER SMS SUC ==> DEL_REDIS_QUEUE_MSG_EXCEPTION[MESSAGE_ID:{}]", message.getId(), e);
} }
} }
} }
private boolean consumerSmsSendHandler(String msg) { private boolean consumerSmsSendHandler(String msg) {
boolean aBoolean = false;
try { try {
SmsMessage smsMessage = JsonUtils.fromJson(msg, SmsMessage.class); SmsMessage smsMessage = JsonUtils.fromJson(msg, SmsMessage.class);
return smsProcessor.send(smsMessage.getPhone(), smsMessage.getSignName(), smsMessage.getTemplateCode(), smsMessage.getTemplateParam().toString()); aBoolean = smsProcessor.send(smsMessage.getPhone(), smsMessage.getSignName(), smsMessage.getTemplateCode(), smsMessage.getTemplateParam().toString());
} catch (Exception e) { } catch (Exception e) {
log.error("CONSUMER SMS FAIL ==> {}", e.getMessage(), e); log.error("CONSUMER SMS FAIL ==> {}", e.getMessage(), e);
return false; } finally {
if (!aBoolean) {
HashMap<String, String> map = CollectionUtil.mapStringString();
map.put("message", msg);
stringRedisTemplate.opsForStream().add(StreamRecords.mapBacked(map).withStreamKey(this.getRedisStreamKey()));
}
} }
return aBoolean;
} }
protected abstract String getRedisStreamKey(); protected abstract String getRedisStreamKey();
protected abstract String getRedisStreamGroup();
} }
package com.liquidnet.service.consumer.adam.receiver; package com.liquidnet.service.consumer.adam.receiver;
import com.liquidnet.commons.lang.util.CollectionUtil;
import com.liquidnet.commons.lang.util.JsonUtils; import com.liquidnet.commons.lang.util.JsonUtils;
import com.liquidnet.service.base.SqlMapping; import com.liquidnet.service.base.SqlMapping;
import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.consumer.adam.service.IBaseDao; import com.liquidnet.service.consumer.adam.service.IBaseDao;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.MapRecord; import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamListener; import org.springframework.data.redis.stream.StreamListener;
import java.util.HashMap;
@Slf4j @Slf4j
public abstract class AbstractSqlRedisReceiver implements StreamListener<String, MapRecord<String, String, String>> { public abstract class AbstractSqlRedisReceiver implements StreamListener<String, MapRecord<String, String, String>> {
@Autowired @Autowired
...@@ -22,29 +27,39 @@ public abstract class AbstractSqlRedisReceiver implements StreamListener<String, ...@@ -22,29 +27,39 @@ public abstract class AbstractSqlRedisReceiver implements StreamListener<String,
this.getRedisStreamKey(), message.getId(), message.getStream(), message.getValue()); this.getRedisStreamKey(), message.getId(), message.getStream(), message.getValue());
boolean result = this.consumerSqlDaoHandler(message.getValue().get("message")); boolean result = this.consumerSqlDaoHandler(message.getValue().get("message"));
log.info("CONSUMER SMS RESULT:{} ==> MESSAGE_ID:{}", result, message.getId());
log.debug("CONSUMER SQL RESULT:{}", result); try {
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
// 消费成功确认,消息删除和消息确认是一个事务 stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId());
if (result) { } catch (Exception e) {
log.info("CONSUMER SMS SUCC ==> MESSAGE_ID:{}", message.getId()); log.error("CONSUMER SQL RESULT:{} ==> DEL_REDIS_QUEUE_MSG_EXCEPTION[MESSAGE_ID:{},MSG:{}]", result, message.getId(), JsonUtils.toJson(message), e);
} finally {
try { try {
stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId()); stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
} catch (Exception e) { } catch (Exception ignored) {
log.error("CONSUMER SMS SUCC ==> DEL_REDIS_QUEUE_MSG_EXCEPTION[MESSAGE_ID:{}]", message.getId(), e);
} }
} }
} }
private boolean consumerSqlDaoHandler(String msg) { private boolean consumerSqlDaoHandler(String msg) {
Boolean aBoolean = false;
try { try {
SqlMapping.SqlMessage sqlMessage = JsonUtils.fromJson(msg, SqlMapping.SqlMessage.class); SqlMapping.SqlMessage sqlMessage = JsonUtils.fromJson(msg, SqlMapping.SqlMessage.class);
return baseDao.batchSqls(sqlMessage.getSqls(), sqlMessage.getArgs()); aBoolean = baseDao.batchSqls(sqlMessage.getSqls(), sqlMessage.getArgs());
} catch (Exception e) { } catch (Exception e) {
log.error("CONSUMER SMS FAIL ==> {}", e.getMessage(), e); log.error("CONSUMER SMS FAIL ==> {}", e.getMessage(), e);
return false; } finally {
if (!aBoolean) {
HashMap<String, String> map = CollectionUtil.mapStringString();
map.put("message", msg);
stringRedisTemplate.opsForStream().add(StreamRecords.mapBacked(map).withStreamKey(this.getRedisStreamKey()));
}
} }
return aBoolean;
} }
protected abstract String getRedisStreamKey(); protected abstract String getRedisStreamKey();
protected abstract String getRedisStreamGroup();
} }
...@@ -9,4 +9,9 @@ public class ConsumerAdamSmsNoticeRdsReceiver extends AbstractSmsRedisReceiver { ...@@ -9,4 +9,9 @@ public class ConsumerAdamSmsNoticeRdsReceiver extends AbstractSmsRedisReceiver {
protected String getRedisStreamKey() { protected String getRedisStreamKey() {
return MQConst.AdamQueue.SMS_NOTICE.getKey(); return MQConst.AdamQueue.SMS_NOTICE.getKey();
} }
@Override
protected String getRedisStreamGroup() {
return MQConst.AdamQueue.SMS_NOTICE.getGroup();
}
} }
...@@ -9,4 +9,9 @@ public class ConsumerAdamSqlUCenterRdsReceiver extends AbstractSqlRedisReceiver ...@@ -9,4 +9,9 @@ public class ConsumerAdamSqlUCenterRdsReceiver extends AbstractSqlRedisReceiver
protected String getRedisStreamKey() { protected String getRedisStreamKey() {
return MQConst.AdamQueue.SQL_UCENTER.getKey(); return MQConst.AdamQueue.SQL_UCENTER.getKey();
} }
@Override
protected String getRedisStreamGroup() {
return MQConst.AdamQueue.SQL_UCENTER.getGroup();
}
} }
...@@ -9,4 +9,9 @@ public class ConsumerAdamSqlUMemberRdsReceiver extends AbstractSqlRedisReceiver ...@@ -9,4 +9,9 @@ public class ConsumerAdamSqlUMemberRdsReceiver extends AbstractSqlRedisReceiver
protected String getRedisStreamKey() { protected String getRedisStreamKey() {
return MQConst.AdamQueue.SQL_UMEMBER.getKey(); return MQConst.AdamQueue.SQL_UMEMBER.getKey();
} }
@Override
protected String getRedisStreamGroup() {
return MQConst.AdamQueue.SQL_UMEMBER.getGroup();
}
} }
...@@ -9,4 +9,9 @@ public class ConsumerAdamSqlURegisterRdsReceiver extends AbstractSqlRedisReceive ...@@ -9,4 +9,9 @@ public class ConsumerAdamSqlURegisterRdsReceiver extends AbstractSqlRedisReceive
protected String getRedisStreamKey() { protected String getRedisStreamKey() {
return MQConst.AdamQueue.SQL_UREGISTER.getKey(); return MQConst.AdamQueue.SQL_UREGISTER.getKey();
} }
@Override
protected String getRedisStreamGroup() {
return MQConst.AdamQueue.SQL_UREGISTER.getGroup();
}
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment