记得上下班打卡 | git大法好,push需谨慎

Commit 2ddb4997 authored by 张国柄's avatar 张国柄

+短信发送队列消费;

parent 9becb120
......@@ -4,8 +4,22 @@ public class MQConst {
// public static final String EXCHANGES_LIQUIDNET_SQL = "liquidnet.sql";
// public static final String ROUTING_KEY_SQL = "sql";
// public static final String QUEUES_SQL_MAIN = "queue.sql.main";
/* -------------------------------------------------------- | 短信中心 */
/* -------------------------------------------------------- */
// 短信发送
public static final String EX_LNS_SMS_SENDER = "lns.sms.sender";
// 验证码
public static final String RK_SMS_CODE = "sms.code";
public static final String QUEUES_SMS_CODE = "queue.sms.code";
// 通知
public static final String RK_SMS_NOTICE = "sms.notice";
public static final String QUEUES_SMS_NOTICE = "queue.sms.notice";
// 推广(Batch)
public static final String RK_SMS_SPREAD = "sms.spread";
public static final String QUEUES_SMS_SPREAD = "queue.sms.spread";
/* -------------------------------------------------------- | 用户中心 */
// 用户中心
public static final String EX_LNS_SQL_UCENTER = "lns.sql.ucenter";
......@@ -19,7 +33,7 @@ public class MQConst {
public static final String RK_SQL_UMEMBER = "sql.umember";
public static final String QUEUES_SQL_UMEMBER = "queue.sql.umember";
/* -------------------------------------------------------- */
/* -------------------------------------------------------- | 票务中心 */
//缺票登记
public static final String EXCHANGES_LIQUIDNET_SQL_PERFORMANCE_LACK = "liquidnet.sql.performance.lack";
......
package com.liquidnet.service.base;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.liquidnet.commons.lang.util.JsonUtils;
import java.io.Serializable;
public class SmsMessage implements Serializable, Cloneable {
private static final long serialVersionUID = 8484332083582497676L;
private String phone;
private String signName;
private String templateCode;
private ObjectNode templateParam;
public String getPhone() {
return phone;
}
public String getSignName() {
return signName;
}
public String getTemplateCode() {
return templateCode;
}
public ObjectNode getTemplateParam() {
return templateParam;
}
private final static SmsMessage instance = new SmsMessage();
public static SmsMessage getNew() {
try {
return (SmsMessage) instance.clone();
} catch (CloneNotSupportedException e) {
return new SmsMessage();
}
}
public SmsMessage setPhone(String phone) {
this.phone = phone;
return this;
}
public SmsMessage setSignName(String signName) {
this.signName = signName;
return this;
}
public SmsMessage setTemplateCode(String templateCode) {
this.templateCode = templateCode;
return this;
}
public SmsMessage setTemplateParam(String key, String val) {
if (null == this.templateParam) {
this.templateParam = JsonUtils.OM().createObjectNode();
}
this.templateParam.put(key, val);
return this;
}
}
......@@ -31,6 +31,11 @@
<artifactId>liquidnet-common-cache-redisson</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.liquidnet</groupId>
<artifactId>liquidnet-common-sms</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
</dependencies>
<build>
......
package com.liquidnet.service.consumer.adam.service.processor;
import com.liquidnet.common.mq.constant.MQConst;
import com.liquidnet.common.sms.processor.SmsProcessor;
import com.liquidnet.commons.lang.util.JsonUtils;
import com.liquidnet.service.base.SmsMessage;
import com.liquidnet.service.base.SqlMapping;
import com.liquidnet.service.consumer.adam.service.IBaseDao;
import com.rabbitmq.client.Channel;
......@@ -28,6 +30,8 @@ import java.io.IOException;
public class ConsumerAdamUCenterProcessor {
@Resource
IBaseDao baseDao;
@Resource
SmsProcessor smsProcessor;
// @RabbitListener(bindings = @QueueBinding(
// exchange = @Exchange(MQConst.EXCHANGES_LIQUIDNET_SQL), key = MQConst.ROUTING_KEY_SQL,
......@@ -62,20 +66,42 @@ public class ConsumerAdamUCenterProcessor {
String consumerQueue = properties.getConsumerQueue();
long deliveryTag = properties.getDeliveryTag();
log.info("CONSUMER SQL ==> [consumerQueue:{},deliveryTag:{}]", consumerQueue, deliveryTag);
SqlMapping.SqlMessage sqlMessage = JsonUtils.fromJson(new String(msg.getBody()), SqlMapping.SqlMessage.class);
log.debug("CONSUMER SQL ==> Preparing:{}", JsonUtils.toJson(sqlMessage.getSqls()));
log.debug("CONSUMER SQL ==> Parameters:{}", JsonUtils.toJson(sqlMessage.getArgs()));
String msgBody = new String(msg.getBody());
log.debug("CONSUMER SQL ==> Preparing:{}", msgBody);
SqlMapping.SqlMessage sqlMessage = JsonUtils.fromJson(msgBody, SqlMapping.SqlMessage.class);
try {
Boolean rstBatchSqls = baseDao.batchSqls(sqlMessage.getSqls(), sqlMessage.getArgs());
log.debug("CONSUMER SQL result of execution:{}", rstBatchSqls);
if (rstBatchSqls) {
channel.basicAck(deliveryTag, false);
} else {
log.warn("###CONSUMER SQL[consumerQueue:{},deliveryTag={},sqlMessage:{}]", consumerQueue, deliveryTag, JsonUtils.toJson(sqlMessage));
log.warn("###CONSUMER SQL[consumerQueue:{},deliveryTag={},sqlMessage:{}]", consumerQueue, deliveryTag, msgBody);
channel.basicAck(deliveryTag, false);
}
} catch (IOException e) {
log.error("CONSUMER SQL[consumerQueue:{},deliveryTag:{},sqlMessage:{}]", consumerQueue, deliveryTag, JsonUtils.toJson(sqlMessage), e);
log.error("CONSUMER SQL[consumerQueue:{},deliveryTag:{},sqlMessage:{}]", consumerQueue, deliveryTag, msgBody, e);
}
}
private void consumerSmsSendHandler(Message msg, Channel channel) {
MessageProperties properties = msg.getMessageProperties();
String consumerQueue = properties.getConsumerQueue();
long deliveryTag = properties.getDeliveryTag();
log.info("CONSUMER SMS ==> [consumerQueue:{},deliveryTag:{}]", consumerQueue, deliveryTag);
String msgBody = new String(msg.getBody());
log.debug("CONSUMER SMS ==> Preparing:{}", msgBody);
SmsMessage smsMessage = JsonUtils.fromJson(msgBody, SmsMessage.class);
try {
boolean result = smsProcessor.aliyunDysmsSend(smsMessage.getPhone(), smsMessage.getSignName(), smsMessage.getTemplateCode(), smsMessage.getTemplateParam().toString());
log.debug("CONSUMER SMS result of execution:{}", result);
if (result) {
channel.basicAck(deliveryTag, false);
} else {
log.warn("###CONSUMER SMS[consumerQueue:{},deliveryTag={},sqlMessage:{}]", consumerQueue, deliveryTag, msgBody);
channel.basicAck(deliveryTag, false);
}
} catch (IOException e) {
log.error("CONSUMER SMS[consumerQueue:{},deliveryTag:{},sqlMessage:{}]", consumerQueue, deliveryTag, msgBody, e);
}
}
......@@ -121,5 +147,34 @@ public class ConsumerAdamUCenterProcessor {
this.consumerSqlDaoHandler(msg, channel);
}
/* ================================================================== | 短信验证码 */
@RabbitListener(
bindings = @QueueBinding(
exchange = @Exchange(MQConst.EX_LNS_SMS_SENDER),
key = MQConst.RK_SMS_CODE,
value = @Queue(MQConst.QUEUES_SMS_CODE)
),
concurrency = "10"
)
public void consumerSqlForSmsCode(Message msg, Channel channel) {
this.consumerSmsSendHandler(msg, channel);
}
/* ================================================================== | 短信通知 */
@RabbitListener(
bindings = @QueueBinding(
exchange = @Exchange(MQConst.EX_LNS_SMS_SENDER),
key = MQConst.RK_SMS_NOTICE,
value = @Queue(MQConst.QUEUES_SMS_NOTICE)
),
concurrency = "5"
)
public void consumerSqlForSmsNotice(Message msg, Channel channel) {
this.consumerSmsSendHandler(msg, channel);
}
/* ================================================================== | */
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment