记得上下班打卡 | git大法好,push需谨慎

Commit 387db323 authored by 张国柄's avatar 张国柄

Kylin:替换RABBIT为REDIS实现MQ;

parent 6d2af5d3
......@@ -4,6 +4,7 @@ public class MQConst {
public enum AdamQueue {
SMS_NOTICE("adam:stream:rk.sms.notice", "group.sms.sender", "短信通知"),
// SMS_SPREAD("adam:stream:rk.sms.spread", "group.sms.sender", "短信推广"),
SQL_UREGISTER("adam:stream:rk.sql.uregister", "group.sql.ucenter", "用户注册"),
SQL_UCENTER("adam:stream:rk.sql.ucenter", "group.sql.ucenter", "用户中心"),
SQL_UMEMBER("adam:stream:rk.sql.umember", "group.sql.ucenter", "购买会员"),
......@@ -30,4 +31,41 @@ public class MQConst {
return desc;
}
}
public enum KylinQueue {
SMS_NOTICE("adam:stream:rk.sms.notice", "group.sms.sender", "短信通知"),
// SMS_SPREAD("adam:stream:rk.sms.spread", "group.sms.sender", "短信推广"),
SQL_PERFORMANCE_LACK("kylin:stream:rk.performance.lack", "group.performance.lack", "缺票登记"),
SQL_ORDER_CREATE("kylin:stream:rk.order.create", "group.order.create", "创建订单"),
SQL_ORDER_AGAIN("kylin:stream:rk.order.again", "group.order.again", "订单再次支付"),
SQL_ORDER_CLOSE("kylin:stream:rk.order.close", "group.order.close", "订单关闭"),
SQL_ORDER_PAY("kylin:stream:rk.order.pay", "group.order.pay", "订单支付"),
SQL_ORDER_REFUND("kylin:stream:rk.order.refund", "group.order.refund", "订单申请退款"),
SQL_ORDER_WITHDRAW("kylin:stream:rk.order.withdraw", "group.order.withdraw", "订单申请撤回"),
SQL_ORDER_OVERTIME_REFUND("kylin:stream:rk.order.overtime.refund", "group.order.overtime.refund", "超时支付申请退款"),
SQL_STATION("kylin:stream:rk.station", "group.station", "验票更新"),
;
private final String key;
private final String group;
private final String desc;
KylinQueue(String key, String group, String desc) {
this.key = key;
this.group = group;
this.desc = desc;
}
public String getKey() {
return key;
}
public String getGroup() {
return group;
}
public String getDesc() {
return desc;
}
}
}
package com.liquidnet.service.consumer.adam.receiver;
import com.liquidnet.service.base.constant.MQConst;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class ConsumerAdamSmsNoticeRdsReceiver extends AbstractSmsRedisReceiver {
@Override
......
package com.liquidnet.service.consumer.adam.receiver;
import com.liquidnet.service.base.constant.MQConst;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class ConsumerAdamSqlUCenterRdsReceiver extends AbstractSqlRedisReceiver {
@Override
......
package com.liquidnet.service.consumer.adam.receiver;
import com.liquidnet.service.base.constant.MQConst;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class ConsumerAdamSqlUMemberRdsReceiver extends AbstractSqlRedisReceiver {
@Override
......
package com.liquidnet.service.consumer.adam.receiver;
import com.liquidnet.service.base.constant.MQConst;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class ConsumerAdamSqlURegisterRdsReceiver extends AbstractSqlRedisReceiver {
@Override
......
package com.liquidnet.service.consumer.kylin.receiver;
import com.liquidnet.common.sms.processor.SmsProcessor;
import com.liquidnet.commons.lang.util.JsonUtils;
import com.liquidnet.service.base.SmsMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamListener;
import javax.annotation.Resource;
@Slf4j
public abstract class AbstractSmsRedisReceiver implements StreamListener<String, MapRecord<String, String, String>> {
@Resource
SmsProcessor smsProcessor;
@Autowired
StringRedisTemplate stringRedisTemplate;
@Override
public void onMessage(MapRecord<String, String, String> message) {
log.info("CONSUMER SMS[streamKey:{},messageId:{},stream:{},body:{}]",
this.getRedisStreamKey(), message.getId(), message.getStream(), message.getValue());
boolean result = this.consumerSmsSendHandler(message.getValue().get("message"));
log.debug("CONSUMER SMS RESULT:{}", result);
// 消费成功确认,消息删除和消息确认是一个事务
if (result) {
log.info("CONSUMER SMS SUCC ==> MESSAGE_ID:{}", message.getId());
try {
stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId());
} catch (Exception e) {
log.error("CONSUMER SMS SUC ==> DEL_REDIS_QUEUE_MSG_EXCEPTION[MESSAGE_ID:{}]", message.getId(), e);
}
}
}
private boolean consumerSmsSendHandler(String msg) {
try {
SmsMessage smsMessage = JsonUtils.fromJson(msg, SmsMessage.class);
return smsProcessor.send(smsMessage.getPhone(), smsMessage.getSignName(), smsMessage.getTemplateCode(), smsMessage.getTemplateParam().toString());
} catch (Exception e) {
log.error("CONSUMER SMS FAIL ==> {}", e.getMessage(), e);
return false;
}
}
protected abstract String getRedisStreamKey();
}
\ No newline at end of file
package com.liquidnet.service.consumer.kylin.receiver;
import com.liquidnet.common.cache.redis.util.RedisUtil;
import com.liquidnet.commons.lang.util.JsonUtils;
import com.liquidnet.service.base.OrderCloseMapping;
import com.liquidnet.service.consumer.kylin.Utils.KylinUtils;
import com.liquidnet.service.kylin.dto.vo.mongo.KylinOrderTicketEntitiesVo;
import com.liquidnet.service.kylin.dto.vo.mongo.KylinOrderTicketVo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamListener;
@Slf4j
public abstract class AbstractSqlOptOrderCloseRedisReceiver implements StreamListener<String, MapRecord<String, String, String>> {
@Autowired
StringRedisTemplate stringRedisTemplate;
@Autowired
private RedisUtil redisUtil;
@Autowired
private KylinUtils kylinUtils;
@Override
public void onMessage(MapRecord<String, String, String> message) {
log.info("CONSUMER SQL[streamKey:{},messageId:{},stream:{},body:{}]",
this.getRedisStreamKey(), message.getId(), message.getStream(), message.getValue());
boolean result = this.consumerSqlOperationOrderCloseHandler(message.getValue().get("message"));
log.debug("CONSUMER SQL RESULT:{}", result);
// 消费成功确认,消息删除和消息确认是一个事务
if (result) {
log.info("CONSUMER SMS SUCC ==> MESSAGE_ID:{}", message.getId());
try {
stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId());
} catch (Exception e) {
log.error("CONSUMER SMS SUCC ==> DEL_REDIS_QUEUE_MSG_EXCEPTION[MESSAGE_ID:{}]", message.getId(), e);
}
}
}
private boolean consumerSqlOperationOrderCloseHandler(String msg) {
try {
OrderCloseMapping.orderCloseMessage mqMessage = JsonUtils.fromJson(msg, OrderCloseMapping.orderCloseMessage.class);
for (int x = 0; x < mqMessage.getOrderTicketIds().size(); x++) {
String t = mqMessage.getOrderTicketIds().get(x);
String orderTicketId = t.split(",")[0];
String uid = t.split(",")[1];
KylinOrderTicketVo vo = kylinUtils.getOrderTicketVo(orderTicketId);
vo.setStatus(2);
redisUtil.set("kylin:order:id:" + orderTicketId, vo);
kylinUtils.resetOrderListVo(uid, 2, orderTicketId, vo);
// redis 限购
for (int i = 0; i < vo.getEntitiesVoList().size(); i++) {
KylinOrderTicketEntitiesVo items = vo.getEntitiesVoList().get(i);
kylinUtils.changeBuyInfo(items.getUserId(), items.getEnterIdCode(), items.getPerformanceId(), items.getTicketId(), -1);
}
}
return true;
} catch (Exception e) {
log.error("CONSUMER SMS FAIL ==> {}", e.getMessage(), e);
return false;
}
}
protected abstract String getRedisStreamKey();
}
package com.liquidnet.service.consumer.kylin.receiver;
import com.liquidnet.commons.lang.util.JsonUtils;
import com.liquidnet.service.base.SqlMapping;
import com.liquidnet.service.consumer.service.IBaseDao;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamListener;
@Slf4j
public abstract class AbstractSqlRedisReceiver implements StreamListener<String, MapRecord<String, String, String>> {
@Autowired
private IBaseDao baseDao;
@Autowired
StringRedisTemplate stringRedisTemplate;
@Override
public void onMessage(MapRecord<String, String, String> message) {
log.info("CONSUMER SQL[streamKey:{},messageId:{},stream:{},body:{}]",
this.getRedisStreamKey(), message.getId(), message.getStream(), message.getValue());
boolean result = this.consumerSqlDaoHandler(message.getValue().get("message"));
log.debug("CONSUMER SQL RESULT:{}", result);
// 消费成功确认,消息删除和消息确认是一个事务
if (result) {
log.info("CONSUMER SMS SUCC ==> MESSAGE_ID:{}", message.getId());
try {
stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId());
} catch (Exception e) {
log.error("CONSUMER SMS SUCC ==> DEL_REDIS_QUEUE_MSG_EXCEPTION[MESSAGE_ID:{}]", message.getId(), e);
}
}
}
private boolean consumerSqlDaoHandler(String msg) {
try {
SqlMapping.SqlMessage sqlMessage = JsonUtils.fromJson(msg, SqlMapping.SqlMessage.class);
return baseDao.batchSqls(sqlMessage.getSqls(), sqlMessage.getArgs());
} catch (Exception e) {
log.error("CONSUMER SMS FAIL ==> {}", e.getMessage(), e);
return false;
}
}
protected abstract String getRedisStreamKey();
}
\ No newline at end of file
package com.liquidnet.service.consumer.kylin.receiver;
import com.liquidnet.service.base.constant.MQConst;
import org.springframework.stereotype.Component;
@Component
public class ConsumerKylinSmsNoticeRdsReceiver extends AbstractSmsRedisReceiver {
@Override
protected String getRedisStreamKey() {
return MQConst.KylinQueue.SMS_NOTICE.getKey();
}
}
package com.liquidnet.service.consumer.kylin.receiver;
import com.liquidnet.service.base.constant.MQConst;
import org.springframework.stereotype.Component;
@Component
public class ConsumerKylinSqlOptOrderCloseRedisReceiver extends AbstractSqlOptOrderCloseRedisReceiver {
@Override
protected String getRedisStreamKey() {
return MQConst.KylinQueue.SQL_ORDER_CLOSE.getKey();
}
}
package com.liquidnet.service.consumer.kylin.receiver;
import org.springframework.stereotype.Component;
@Component
public class ConsumerKylinSqlOrderAgainRdsReceiver extends AbstractSqlRedisReceiver {
@Override
protected String getRedisStreamKey() {
return null;
}
}
package com.liquidnet.service.consumer.kylin.receiver;
import org.springframework.stereotype.Component;
@Component
public class ConsumerKylinSqlOrderCreateRdsReceiver extends AbstractSqlRedisReceiver {
@Override
protected String getRedisStreamKey() {
return null;
}
}
package com.liquidnet.service.consumer.kylin.receiver;
import org.springframework.stereotype.Component;
@Component
public class ConsumerKylinSqlOrderOvertimeRefundRdsReceiver extends AbstractSqlRedisReceiver {
@Override
protected String getRedisStreamKey() {
return null;
}
}
package com.liquidnet.service.consumer.kylin.receiver;
import org.springframework.stereotype.Component;
@Component
public class ConsumerKylinSqlOrderPayRdsReceiver extends AbstractSqlRedisReceiver {
@Override
protected String getRedisStreamKey() {
return null;
}
}
package com.liquidnet.service.consumer.kylin.receiver;
import org.springframework.stereotype.Component;
@Component
public class ConsumerKylinSqlOrderRefundRdsReceiver extends AbstractSqlRedisReceiver {
@Override
protected String getRedisStreamKey() {
return null;
}
}
package com.liquidnet.service.consumer.kylin.receiver;
import org.springframework.stereotype.Component;
@Component
public class ConsumerKylinSqlOrderWithdrawRdsReceiver extends AbstractSqlRedisReceiver {
@Override
protected String getRedisStreamKey() {
return null;
}
}
package com.liquidnet.service.consumer.kylin.receiver;
import org.springframework.stereotype.Component;
@Component
public class ConsumerKylinSqlPerformanceLackRdsReceiver extends AbstractSqlRedisReceiver {
@Override
protected String getRedisStreamKey() {
return null;
}
}
package com.liquidnet.service.consumer.kylin.receiver;
import org.springframework.stereotype.Component;
@Component
public class ConsumerKylinSqlStationRdsReceiver extends AbstractSqlRedisReceiver {
@Override
protected String getRedisStreamKey() {
return null;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment