记得上下班打卡 | git大法好,push需谨慎

Commit dc12e159 authored by 胡佳晨's avatar 胡佳晨

封装 rabbit 和 redis 的mq

parent 948429c7
package com.liquidnet.common.mq.constant;
public class MQConst {
// public static final String EXCHANGES_LIQUIDNET_SQL = "liquidnet.sql";
// public static final String ROUTING_KEY_SQL = "sql";
// public static final String QUEUES_SQL_MAIN = "queue.sql.main";
/* -------------------------------------------------------- | 短信中心 */
// 短信发送
public static final String EX_LNS_SMS_SENDER = "lns.sms.sender";
// 验证码
// public static final String RK_SMS_CODE = "sms.code";
// public static final String QUEUES_SMS_CODE = "queue.sms.code";
// 通知
public static final String RK_SMS_NOTICE = "sms.notice";
public static final String QUEUES_SMS_NOTICE = "queue.sms.notice";
......
package com.liquidnet.service.adam.util;
import com.liquidnet.common.exception.LiquidnetServiceException;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.util.HashMap;
@Component
public class QueueUtils {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
StringRedisTemplate stringRedisTemplate;
/**
* 发送 SqlMapping Json 字符串
*
* @param exchange 交换机
* @param route 路径
* @param sqlStr Json字符串
*/
public void sendSqlRabbit(String exchange, String route, String sqlStr) {
rabbitTemplate.convertAndSend(exchange, route, sqlStr);
}
/**
* 给 REDIS 队列发送消息 数据库相关
*
* @param redisKey RedisKey 消费Key
* @param sqlStr Json字符串
* @return
*/
public void sendSqlRedis(String redisKey, String sqlStr) {
HashMap<String, String> map = new HashMap<>();
map.put("message", sqlStr);
MapRecord<String, String, String> record = StreamRecords.mapBacked(map).withStreamKey(redisKey);
stringRedisTemplate.opsForStream().add(record);
}
}
......@@ -9,6 +9,7 @@ import com.liquidnet.service.base.SqlMapping;
import com.liquidnet.service.kylin.entity.KylinLackRegisters;
import com.liquidnet.service.kylin.mapper.KylinLackRegistersMapper;
import com.liquidnet.service.kylin.service.IKylinLackRegistersService;
import com.liquidnet.service.kylin.utils.QueueUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
......@@ -21,7 +22,7 @@ import java.util.Map;
public class KylinLackRegistersServiceImpl implements IKylinLackRegistersService {
@Autowired
private RabbitTemplate rabbitTemplate;
private QueueUtils queueUtils;
@Override
public ResponseDto<String> addLackRegister(String performanceId, String ticketId) {
......@@ -36,7 +37,7 @@ public class KylinLackRegistersServiceImpl implements IKylinLackRegistersService
lackRegisters.setUserMobile(StringUtils.defaultString(((String) token.get("mobile")), ""));
lackRegisters.setIpAddress(CurrentUtil.getCliIpAddr());
lackRegisters.setCreatedAt(LocalDateTime.now());
rabbitTemplate.convertAndSend(MQConst.EXCHANGES_LIQUIDNET_SQL_PERFORMANCE_LACK, MQConst.ROUTING_KEY_SQL_PERFORMANCE_LACK,
queueUtils.sendSqlRabbit(MQConst.EXCHANGES_LIQUIDNET_SQL_PERFORMANCE_LACK, MQConst.ROUTING_KEY_SQL_PERFORMANCE_LACK,
SqlMapping.get("kylin_lack_register.insert", lackRegisters.getInsertObj()));
return ResponseDto.success("登记成功");
}catch (Exception e){
......
......@@ -10,6 +10,7 @@ import com.liquidnet.service.kylin.dto.vo.mongo.KylinOrderTicketEntitiesVo;
import com.liquidnet.service.kylin.entity.KylinOrderTicketEntities;
import com.liquidnet.service.kylin.mapper.KylinOrderTicketEntitiesMapper;
import com.liquidnet.service.kylin.service.IKylinOrderTicketEntitiesService;
import com.liquidnet.service.kylin.utils.QueueUtils;
import com.mongodb.bulk.BulkWriteResult;
import com.mongodb.client.model.UpdateOneModel;
import com.mongodb.client.model.WriteModel;
......@@ -41,7 +42,7 @@ public class KylinOrderTicketEntitiesServiceImpl implements IKylinOrderTicketEn
@Autowired
MongoTemplate mongoTemplate;
@Autowired
RabbitTemplate rabbitTemplate;
QueueUtils queueUtils;
@Override
// @Transactional(propagation = Propagation.REQUIRED, rollbackFor = Exception.class)
......@@ -82,7 +83,7 @@ public class KylinOrderTicketEntitiesServiceImpl implements IKylinOrderTicketEn
BulkWriteResult bulkWriteResult = mongoTemplate.getCollection(KylinOrderTicketEntitiesVo.class.getSimpleName()).bulkWrite(list);
log.info("bulkWriteResult:{}", JsonUtils.toJson(bulkWriteResult));
rabbitTemplate.convertAndSend(MQConst.EX_LNS_SQL_STATION, MQConst.RK_SQL_STATION,
queueUtils.sendSqlRabbit(MQConst.EX_LNS_SQL_STATION, MQConst.RK_SQL_STATION,
SqlMapping.get("kylin_order_ticket_entities.updateStatusByStation", paramsList));
}
}
......@@ -24,6 +24,7 @@ import com.liquidnet.service.kylin.entity.KylinOrderTicketStatus;
import com.liquidnet.service.kylin.service.IKylinOrderTicketsService;
import com.liquidnet.service.kylin.utils.DataUtils;
import com.liquidnet.service.kylin.utils.OrderUtils;
import com.liquidnet.service.kylin.utils.QueueUtils;
import com.mongodb.BasicDBObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
......@@ -60,7 +61,7 @@ public class KylinOrderTicketsServiceImpl implements IKylinOrderTicketsService {
@Autowired
private MongoTemplate mongoTemplate;
@Autowired
private RabbitTemplate rabbitTemplate;
private QueueUtils queueUtils;
@Autowired
private KylinRefundsStatusServiceImpl refundsStatusService;
......@@ -538,7 +539,7 @@ public class KylinOrderTicketsServiceImpl implements IKylinOrderTicketsService {
sqls.add(SqlMapping.get("kylin_order_ticket_entities.withDraw"));
sqls.add(SqlMapping.get("kylin_order_refund.withDraw"));
rabbitTemplate.convertAndSend(MQConst.EXCHANGES_LIQUIDNET_SQL_ORDER_WITHDRAW, MQConst.ROUTING_KEY_SQL_ORDER_WITHDRAW,
queueUtils.sendSqlRabbit(MQConst.EXCHANGES_LIQUIDNET_SQL_ORDER_WITHDRAW, MQConst.ROUTING_KEY_SQL_ORDER_WITHDRAW,
SqlMapping.gets(sqls, sqlsDataA, sqlsDataB, sqlsDataC));
return ResponseDto.success(true);
} catch (Exception e) {
......
......@@ -10,6 +10,7 @@ import com.liquidnet.service.kylin.dto.vo.returns.KylinOrderRefundsVo;
import com.liquidnet.service.kylin.entity.*;
import com.liquidnet.service.kylin.utils.DataUtils;
import com.liquidnet.service.kylin.utils.OrderUtils;
import com.liquidnet.service.kylin.utils.QueueUtils;
import com.mongodb.BasicDBObject;
import com.mongodb.client.result.UpdateResult;
import lombok.extern.slf4j.Slf4j;
......@@ -48,7 +49,7 @@ public class KylinRefundsStatusServiceImpl {
@Autowired
private OrderUtils orderUtils;
@Autowired
private RabbitTemplate rabbitTemplate;
private QueueUtils queueUtils;
public Boolean orderTicketRefunding(
KylinOrderTicketVo orderInfo, String orderTicketsId,
......@@ -141,7 +142,7 @@ public class KylinRefundsStatusServiceImpl {
/*int rows = kylinOrderRefundsMapper.insert(kylinOrderRefunds);*/
KylinOrderRefundsVo orderRefundsVo = new KylinOrderRefundsVo();
BeanUtils.copyProperties(kylinOrderRefunds,orderRefundsVo);
BeanUtils.copyProperties(kylinOrderRefunds, orderRefundsVo);
mongoTemplate.insert(orderRefundsVo, KylinOrderRefundsVo.class.getSimpleName());
// 退款入场人表
......@@ -162,8 +163,8 @@ public class KylinRefundsStatusServiceImpl {
/*int rowsR = kylinOrderRefundsEntitiesMapper.insert(kylinOrderRefundEntities);*/
KylinOrderRefundEntitiesVo orderRefundEntitiesVo = new KylinOrderRefundEntitiesVo();
BeanUtils.copyProperties(kylinOrderRefundEntities,orderRefundEntitiesVo);
mongoTemplate.insert(orderRefundEntitiesVo,KylinOrderRefundEntitiesVo.class.getSimpleName());
BeanUtils.copyProperties(kylinOrderRefundEntities, orderRefundEntitiesVo);
mongoTemplate.insert(orderRefundEntitiesVo, KylinOrderRefundEntitiesVo.class.getSimpleName());
}
//mq更改数据库
......@@ -188,7 +189,7 @@ public class KylinRefundsStatusServiceImpl {
kylinOrderRefunds.getRefundCate(), kylinOrderRefunds.getCreatedAt()
});
rabbitTemplate.convertAndSend(MQConst.EXCHANGES_LIQUIDNET_SQL_ORDER_OVERTIME_REFUND, MQConst.ROUTING_KEY_SQL_ORDER_OVERTIME_REFUND,
queueUtils.sendSqlRabbit(MQConst.EXCHANGES_LIQUIDNET_SQL_ORDER_OVERTIME_REFUND, MQConst.ROUTING_KEY_SQL_ORDER_OVERTIME_REFUND,
SqlMapping.gets(sqls, sqlsDataA, sqlsDataB, sqlsDataC, sqlsDataD));
return true;
......@@ -268,13 +269,12 @@ public class KylinRefundsStatusServiceImpl {
kylinOrderRefunds.setCreatedAt(time);
KylinOrderRefundsVo orderRefundsVo = new KylinOrderRefundsVo();
BeanUtils.copyProperties(kylinOrderRefunds,orderRefundsVo);
BeanUtils.copyProperties(kylinOrderRefunds, orderRefundsVo);
orderRefundsVo.setCreatedAt(time);
orderRefundsVo.setApplicantAt(time);
mongoTemplate.insert(orderRefundsVo, KylinOrderRefundsVo.class.getSimpleName());
// 退款入场人表
KylinOrderRefundEntities kylinOrderRefundEntities = new KylinOrderRefundEntities();
String orderRefundsEntitiesId = IDGenerator.nextSnowId();
......@@ -285,9 +285,9 @@ public class KylinRefundsStatusServiceImpl {
kylinOrderRefundEntities.setCreatedAt(time);
KylinOrderRefundEntitiesVo orderRefundEntitiesVo = new KylinOrderRefundEntitiesVo();
BeanUtils.copyProperties(kylinOrderRefundEntities,orderRefundEntitiesVo);
BeanUtils.copyProperties(kylinOrderRefundEntities, orderRefundEntitiesVo);
orderRefundEntitiesVo.setCreatedAt(strTime);
mongoTemplate.insert(orderRefundEntitiesVo,KylinOrderRefundEntitiesVo.class.getSimpleName());
mongoTemplate.insert(orderRefundEntitiesVo, KylinOrderRefundEntitiesVo.class.getSimpleName());
//退款图片
KylinOrderRefundPic orderRefundPic = new KylinOrderRefundPic();
......@@ -297,9 +297,9 @@ public class KylinRefundsStatusServiceImpl {
orderRefundPic.setCreatedAt(time);
KylinOrderRefundPicVo orderRefundPicVo = new KylinOrderRefundPicVo();
BeanUtils.copyProperties(orderRefundPic,orderRefundPicVo);
BeanUtils.copyProperties(orderRefundPic, orderRefundPicVo);
orderRefundPicVo.setCreatedAt(strTime);
mongoTemplate.insert(orderRefundPicVo,KylinOrderRefundPicVo.class.getSimpleName());
mongoTemplate.insert(orderRefundPicVo, KylinOrderRefundPicVo.class.getSimpleName());
dataUtils.delOrderRefundVoByOrderId(orderInfo.getOrderTicketsId());
//MQ
......@@ -308,7 +308,7 @@ public class KylinRefundsStatusServiceImpl {
LinkedList<Object[]> sqlsDataB = new LinkedList<>();
LinkedList<Object[]> sqlsDataC = new LinkedList<>();
LinkedList<Object[]> sqlsDataD = new LinkedList<>();
LinkedList <Object[]> sqlsDataE = new LinkedList<>();
LinkedList<Object[]> sqlsDataE = new LinkedList<>();
sqls.add(SqlMapping.get("kylin_order_ticket_status.refund"));
sqls.add(SqlMapping.get("kylin_order_ticket_entities.refund"));
......@@ -338,11 +338,11 @@ public class KylinRefundsStatusServiceImpl {
});
//TODO 生成新QUERY
rabbitTemplate.convertAndSend(MQConst.EXCHANGES_LIQUIDNET_SQL_ORDER_REFUND, MQConst.ROUTING_KEY_SQL_ORDER_REFUND,
queueUtils.sendSqlRabbit(MQConst.EXCHANGES_LIQUIDNET_SQL_ORDER_REFUND, MQConst.ROUTING_KEY_SQL_ORDER_REFUND,
SqlMapping.gets(sqls, sqlsDataA, sqlsDataB, sqlsDataC, sqlsDataD, sqlsDataE));
return kylinOrderRefunds.getOrderRefundsId();
}catch (Exception e){
} catch (Exception e) {
e.printStackTrace();
return "";
}
......
package com.liquidnet.service.kylin.utils;
import com.liquidnet.common.exception.LiquidnetServiceException;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.util.HashMap;
@Component
public class QueueUtils {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
StringRedisTemplate stringRedisTemplate;
/**
* 发送 SqlMapping Json 字符串
*
* @param exchange 交换机
* @param route 路径
* @param sqlStr Json字符串
*/
public void sendSqlRabbit(String exchange, String route, String sqlStr) {
rabbitTemplate.convertAndSend(exchange, route, sqlStr);
}
/**
* 给 REDIS 队列发送消息 数据库相关
*
* @param redisKey RedisKey 消费Key
* @param sqlStr Json字符串
* @return
*/
public void sendSqlRedis(String redisKey, String sqlStr) {
HashMap<String, String> map = new HashMap<>();
map.put("message", sqlStr);
MapRecord<String, String, String> record = StreamRecords.mapBacked(map).withStreamKey(redisKey);
stringRedisTemplate.opsForStream().add(record);
}
}
......@@ -27,10 +27,7 @@ import com.liquidnet.service.kylin.entity.KylinOrderTicketRelations;
import com.liquidnet.service.kylin.entity.KylinOrderTicketStatus;
import com.liquidnet.service.kylin.entity.KylinOrderTickets;
import com.liquidnet.service.kylin.service.IKylinOrderTicketsOrderService;
import com.liquidnet.service.order.utils.DataUtils;
import com.liquidnet.service.order.utils.ObjectUtil;
import com.liquidnet.service.order.utils.OrderUtils;
import com.liquidnet.service.order.utils.TaobaoTicketUtils;
import com.liquidnet.service.order.utils.*;
import com.mongodb.BasicDBObject;
import com.taobao.api.TaobaoClient;
import com.taobao.api.request.AlibabaDamaiMevOpenBatchpushticketRequest;
......@@ -87,7 +84,7 @@ public class KylinOrderTicketsServiceImpl implements IKylinOrderTicketsOrderServ
@Autowired
private MongoConverter mongoConverter;
@Autowired
private RabbitTemplate rabbitTemplate;
private QueueUtils queueUtils;
@Autowired
private TaobaoTicketUtils taobaoTicketUtils;
@Autowired
......@@ -107,9 +104,9 @@ public class KylinOrderTicketsServiceImpl implements IKylinOrderTicketsOrderServ
List<AdamEntersVo> entersVoList = ObjectUtil.cloneArrayListObject();
String uid = CurrentUtil.getCurrentUid();
String lock = "userId:" + uid;
// if (!redisLockUtil.tryLock(lock, 1, 5)) {
// return ResponseDto.failure(ErrorMapping.get("20023"));//参数错误
// }
if (!redisLockUtil.tryLock(lock, 1, 5)) {
return ResponseDto.failure(ErrorMapping.get("20023"));//参数错误
}
try {
currentTime = System.currentTimeMillis();
KylinPerformanceVo performanceData = dataUtils.getPerformanceVo(payOrderParam.getPerformanceId());
......@@ -591,7 +588,7 @@ public class KylinOrderTicketsServiceImpl implements IKylinOrderTicketsOrderServ
// 执行sql
String sqlData = SqlMapping.gets(sqls, sqlsDataB, sqlsDataC, sqlsDataD, sqlsDataA);
currentTime = System.currentTimeMillis();
rabbitTemplate.convertAndSend(MQConst.EXCHANGES_LIQUIDNET_SQL_ORDER_CREADE, MQConst.ROUTING_KEY_SQL_ORDER_CREATE,
queueUtils.sendSqlRabbit(MQConst.EXCHANGES_LIQUIDNET_SQL_ORDER_CREADE, MQConst.ROUTING_KEY_SQL_ORDER_CREATE,
sqlData);
currentTime = System.currentTimeMillis() - currentTime;
log.debug("MQ 发送 -> time:" + (currentTime) + "毫秒");
......@@ -714,7 +711,7 @@ public class KylinOrderTicketsServiceImpl implements IKylinOrderTicketsOrderServ
sqlsDataD.add(objectD);
String sqlData = SqlMapping.gets(sqls, sqlsDataA, sqlsDataB, sqlsDataC, sqlsDataD);
rabbitTemplate.convertAndSend(MQConst.EXCHANGES_LIQUIDNET_SQL_ORDER_AGAIN, MQConst.ROUTING_KEY_SQL_ORDER_AGAIN, sqlData);
queueUtils.sendSqlRabbit(MQConst.EXCHANGES_LIQUIDNET_SQL_ORDER_AGAIN, MQConst.ROUTING_KEY_SQL_ORDER_AGAIN, sqlData);
log.info(UserPathDto.setData("再次支付", payAgainParam, payResultVo.getData()));
return ResponseDto.success(payResultVo.getData());
}
......@@ -836,7 +833,7 @@ public class KylinOrderTicketsServiceImpl implements IKylinOrderTicketsOrderServ
ObjectUtil.cloneBasicDBObject().append("$set", mongoConverter.convertToMongoType(orderTicketEntitiesVo))
);
rabbitTemplate.convertAndSend(MQConst.EXCHANGES_LIQUIDNET_SQL_ORDER_PAY, MQConst.ROUTING_KEY_SQL_ORDER_PAY,
queueUtils.sendSqlRabbit(MQConst.EXCHANGES_LIQUIDNET_SQL_ORDER_PAY, MQConst.ROUTING_KEY_SQL_ORDER_PAY,
SqlMapping.gets(sqls, sqlsDataA, sqlsDataB, sqlsDataC, sqlsDataD));
//生成vo redis
......@@ -875,7 +872,7 @@ public class KylinOrderTicketsServiceImpl implements IKylinOrderTicketsOrderServ
}
if (null != adTemplate) {
rabbitTemplate.convertAndSend(MQConst.EX_LNS_SMS_SENDER, MQConst.RK_SMS_NOTICE,
queueUtils.sendSqlRabbit(MQConst.EX_LNS_SMS_SENDER, MQConst.RK_SMS_NOTICE,
SmsMessage.builder().setPhone(orderTicketData.getUserMobile())
.setSignName(SmsEnum.ADSignName.M02.getVal())
.setTemplateCode(adTemplate.name())
......
......@@ -17,6 +17,7 @@ import com.liquidnet.service.kylin.entity.*;
import com.liquidnet.service.order.utils.DataUtils;
import com.liquidnet.service.order.utils.ObjectUtil;
import com.liquidnet.service.order.utils.OrderUtils;
import com.liquidnet.service.order.utils.QueueUtils;
import com.mongodb.BasicDBObject;
import com.mongodb.client.result.UpdateResult;
import lombok.extern.slf4j.Slf4j;
......@@ -55,7 +56,7 @@ public class KylinRefundsStatusServiceImpl {
@Autowired
private OrderUtils orderUtils;
@Autowired
private RabbitTemplate rabbitTemplate;
private QueueUtils queueUtils;
public Boolean orderTicketRefunding(
KylinOrderTicketVo orderInfo, String orderTicketsId,
......@@ -208,7 +209,7 @@ public class KylinRefundsStatusServiceImpl {
objectC[12]=kylinOrderRefunds.getCreatedAt();
sqlsDataC.add(objectC);
rabbitTemplate.convertAndSend(MQConst.EXCHANGES_LIQUIDNET_SQL_ORDER_OVERTIME_REFUND, MQConst.ROUTING_KEY_SQL_ORDER_OVERTIME_REFUND,
queueUtils.sendSqlRabbit(MQConst.EXCHANGES_LIQUIDNET_SQL_ORDER_OVERTIME_REFUND, MQConst.ROUTING_KEY_SQL_ORDER_OVERTIME_REFUND,
SqlMapping.gets(sqls, sqlsDataA, sqlsDataB, sqlsDataC, sqlsDataD));
return true;
......
package com.liquidnet.service.order.utils;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.util.HashMap;
@Component
public class QueueUtils {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
StringRedisTemplate stringRedisTemplate;
/**
* 发送 SqlMapping Json 字符串
*
* @param exchange 交换机
* @param route 路径
* @param sqlStr Json字符串
*/
public void sendSqlRabbit(String exchange, String route, String sqlStr) {
rabbitTemplate.convertAndSend(exchange, route, sqlStr);
}
/**
* 给 REDIS 队列发送消息 数据库相关
*
* @param redisKey RedisKey 消费Key
* @param sqlStr Json字符串
* @return
*/
public void sendSqlRedis(String redisKey, String sqlStr) {
HashMap<String, String> map = ObjectUtil.cloneHashMapStringAndString();
map.put("message", sqlStr);
MapRecord<String, String, String> record = StreamRecords.mapBacked(map).withStreamKey(redisKey);
stringRedisTemplate.opsForStream().add(record);
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment