记得上下班打卡 | git大法好,push需谨慎

Commit bc16895e authored by 胡佳晨's avatar 胡佳晨

修改 mq

parent a5b3cb90
package com.liquidnet.service.kylin.service.impl;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.liquidnet.common.mq.constant.MQConst;
import com.liquidnet.commons.lang.util.CurrentUtil;
import com.liquidnet.commons.lang.util.IDGenerator;
import com.liquidnet.service.base.ResponseDto;
import com.liquidnet.service.base.SqlMapping;
import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.kylin.entity.KylinLackRegisters;
import com.liquidnet.service.kylin.mapper.KylinLackRegistersMapper;
import com.liquidnet.service.kylin.service.IKylinLackRegistersService;
import com.liquidnet.service.kylin.utils.QueueUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
......@@ -37,8 +34,7 @@ public class KylinLackRegistersServiceImpl implements IKylinLackRegistersService
lackRegisters.setUserMobile(StringUtils.defaultString(((String) token.get("mobile")), ""));
lackRegisters.setIpAddress(CurrentUtil.getCliIpAddr());
lackRegisters.setCreatedAt(LocalDateTime.now());
queueUtils.sendSqlRabbit(MQConst.EXCHANGES_LIQUIDNET_SQL_PERFORMANCE_LACK, MQConst.ROUTING_KEY_SQL_PERFORMANCE_LACK,
SqlMapping.get("kylin_lack_register.insert", lackRegisters.getInsertObj()));
queueUtils.sendMsgByRedis(MQConst.KylinQueue.SQL_PERFORMANCE_LACK.getKey(),SqlMapping.get("kylin_lack_register.insert", lackRegisters.getInsertObj()));
return ResponseDto.success("登记成功");
}catch (Exception e){
e.printStackTrace();
......
package com.liquidnet.service.kylin.service.impl;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.liquidnet.common.mq.constant.MQConst;
import com.liquidnet.commons.lang.util.DateUtil;
import com.liquidnet.commons.lang.util.JsonUtils;
import com.liquidnet.service.base.SqlMapping;
import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.kylin.dto.param.KylinStationUploadParam;
import com.liquidnet.service.kylin.dto.vo.mongo.KylinOrderTicketEntitiesVo;
import com.liquidnet.service.kylin.entity.KylinOrderTicketEntities;
import com.liquidnet.service.kylin.mapper.KylinOrderTicketEntitiesMapper;
import com.liquidnet.service.kylin.service.IKylinOrderTicketEntitiesService;
import com.liquidnet.service.kylin.utils.QueueUtils;
import com.mongodb.bulk.BulkWriteResult;
......@@ -16,7 +13,6 @@ import com.mongodb.client.model.UpdateOneModel;
import com.mongodb.client.model.WriteModel;
import lombok.extern.slf4j.Slf4j;
import org.bson.Document;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.query.Criteria;
......@@ -82,8 +78,7 @@ public class KylinOrderTicketEntitiesServiceImpl implements IKylinOrderTicketEn
});
BulkWriteResult bulkWriteResult = mongoTemplate.getCollection(KylinOrderTicketEntitiesVo.class.getSimpleName()).bulkWrite(list);
log.info("bulkWriteResult:{}", JsonUtils.toJson(bulkWriteResult));
queueUtils.sendSqlRabbit(MQConst.EX_LNS_SQL_STATION, MQConst.RK_SQL_STATION,
queueUtils.sendMsgByRedis(MQConst.KylinQueue.SQL_STATION.getKey(),
SqlMapping.get("kylin_order_ticket_entities.updateStatusByStation", paramsList));
}
}
......@@ -2,7 +2,6 @@ package com.liquidnet.service.kylin.service.impl;
import com.alibaba.fastjson.JSON;
import com.github.pagehelper.PageInfo;
import com.liquidnet.common.mq.constant.MQConst;
import com.liquidnet.commons.lang.util.CurrentUtil;
import com.liquidnet.commons.lang.util.DateUtil;
import com.liquidnet.commons.lang.util.JsonUtils;
......@@ -10,6 +9,7 @@ import com.liquidnet.service.base.ErrorMapping;
import com.liquidnet.service.base.ResponseDto;
import com.liquidnet.service.base.SqlMapping;
import com.liquidnet.service.base.UserPathDto;
import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.kylin.constant.KylinTableStatusConst;
import com.liquidnet.service.kylin.dto.vo.middle.KylinTicketTimesVo;
import com.liquidnet.service.kylin.dto.vo.middle.KylinTicketVo;
......@@ -29,7 +29,6 @@ import com.mongodb.BasicDBObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.bson.Document;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.mongodb.core.MongoTemplate;
......@@ -538,8 +537,7 @@ public class KylinOrderTicketsServiceImpl implements IKylinOrderTicketsService {
sqls.add(SqlMapping.get("kylin_order_ticket_status.withDraw"));
sqls.add(SqlMapping.get("kylin_order_ticket_entities.withDraw"));
sqls.add(SqlMapping.get("kylin_order_refund.withDraw"));
queueUtils.sendSqlRabbit(MQConst.EXCHANGES_LIQUIDNET_SQL_ORDER_WITHDRAW, MQConst.ROUTING_KEY_SQL_ORDER_WITHDRAW,
queueUtils.sendMsgByRedis(MQConst.KylinQueue.SQL_ORDER_WITHDRAW.getKey(),
SqlMapping.gets(sqls, sqlsDataA, sqlsDataB, sqlsDataC));
return ResponseDto.success(true);
} catch (Exception e) {
......
package com.liquidnet.service.kylin.service.impl;
import com.alibaba.fastjson.JSON;
import com.liquidnet.common.mq.constant.MQConst;
import com.liquidnet.commons.lang.util.*;
import com.liquidnet.service.base.SqlMapping;
import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.kylin.constant.KylinTableStatusConst;
import com.liquidnet.service.kylin.dto.vo.mongo.*;
import com.liquidnet.service.kylin.dto.vo.returns.KylinOrderRefundsVo;
......@@ -15,7 +15,6 @@ import com.mongodb.BasicDBObject;
import com.mongodb.client.result.UpdateResult;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.mongodb.core.MongoTemplate;
......@@ -189,7 +188,7 @@ public class KylinRefundsStatusServiceImpl {
kylinOrderRefunds.getRefundCate(), kylinOrderRefunds.getCreatedAt()
});
queueUtils.sendSqlRabbit(MQConst.EXCHANGES_LIQUIDNET_SQL_ORDER_OVERTIME_REFUND, MQConst.ROUTING_KEY_SQL_ORDER_OVERTIME_REFUND,
queueUtils.sendMsgByRedis(MQConst.KylinQueue.SQL_ORDER_OVERTIME_REFUND.getKey(),
SqlMapping.gets(sqls, sqlsDataA, sqlsDataB, sqlsDataC, sqlsDataD));
return true;
......@@ -337,8 +336,7 @@ public class KylinRefundsStatusServiceImpl {
orderRefundPic.getOrderRefundsId(), orderRefundPic.getOrderRefundsId(), orderRefundPic.getPicUrl(), orderRefundPic.getCreatedAt()
});
//TODO 生成新QUERY
queueUtils.sendSqlRabbit(MQConst.EXCHANGES_LIQUIDNET_SQL_ORDER_REFUND, MQConst.ROUTING_KEY_SQL_ORDER_REFUND,
queueUtils.sendMsgByRedis(MQConst.KylinQueue.SQL_ORDER_REFUND.getKey(),
SqlMapping.gets(sqls, sqlsDataA, sqlsDataB, sqlsDataC, sqlsDataD, sqlsDataE));
return kylinOrderRefunds.getOrderRefundsId();
......
package com.liquidnet.service.order.service.impl;
import com.alibaba.fastjson.JSON;
import com.liquidnet.common.cache.redis.util.RedisUtil;
import com.liquidnet.common.cache.redisson.util.RedisLockUtil;
import com.liquidnet.common.mq.constant.MQConst;
import com.liquidnet.common.sms.constant.SmsEnum;
import com.liquidnet.commons.lang.util.*;
import com.liquidnet.service.adam.dto.vo.AdamAddressesVo;
import com.liquidnet.service.adam.dto.vo.AdamEntersVo;
import com.liquidnet.service.base.*;
import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.kylin.constant.KylinRedisConst;
import com.liquidnet.service.kylin.constant.KylinTableStatusConst;
import com.liquidnet.service.kylin.dto.param.PayAgainParam;
......@@ -28,14 +27,11 @@ import com.liquidnet.service.kylin.entity.KylinOrderTicketStatus;
import com.liquidnet.service.kylin.entity.KylinOrderTickets;
import com.liquidnet.service.kylin.service.IKylinOrderTicketsOrderService;
import com.liquidnet.service.order.utils.*;
import com.mongodb.BasicDBObject;
import com.taobao.api.TaobaoClient;
import com.taobao.api.request.AlibabaDamaiMevOpenBatchpushticketRequest;
import com.taobao.api.response.AlibabaDamaiMevOpenBatchpushticketResponse;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.time.DateUtils;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
......@@ -588,7 +584,7 @@ public class KylinOrderTicketsServiceImpl implements IKylinOrderTicketsOrderServ
// 执行sql
String sqlData = SqlMapping.gets(sqls, sqlsDataB, sqlsDataC, sqlsDataD, sqlsDataA);
currentTime = System.currentTimeMillis();
queueUtils.sendSqlRabbit(MQConst.EXCHANGES_LIQUIDNET_SQL_ORDER_CREADE, MQConst.ROUTING_KEY_SQL_ORDER_CREATE,
queueUtils.sendMsgByRedis(MQConst.KylinQueue.SQL_ORDER_CREATE.getKey(),
sqlData);
currentTime = System.currentTimeMillis() - currentTime;
log.debug("MQ 发送 -> time:" + (currentTime) + "毫秒");
......@@ -711,7 +707,7 @@ public class KylinOrderTicketsServiceImpl implements IKylinOrderTicketsOrderServ
sqlsDataD.add(objectD);
String sqlData = SqlMapping.gets(sqls, sqlsDataA, sqlsDataB, sqlsDataC, sqlsDataD);
queueUtils.sendSqlRabbit(MQConst.EXCHANGES_LIQUIDNET_SQL_ORDER_AGAIN, MQConst.ROUTING_KEY_SQL_ORDER_AGAIN, sqlData);
queueUtils.sendMsgByRedis(MQConst.KylinQueue.SQL_ORDER_AGAIN.getKey(), sqlData);
log.info(UserPathDto.setData("再次支付", payAgainParam, payResultVo.getData()));
return ResponseDto.success(payResultVo.getData());
}
......@@ -833,7 +829,7 @@ public class KylinOrderTicketsServiceImpl implements IKylinOrderTicketsOrderServ
ObjectUtil.cloneBasicDBObject().append("$set", mongoConverter.convertToMongoType(orderTicketEntitiesVo))
);
queueUtils.sendSqlRabbit(MQConst.EXCHANGES_LIQUIDNET_SQL_ORDER_PAY, MQConst.ROUTING_KEY_SQL_ORDER_PAY,
queueUtils.sendMsgByRedis(MQConst.KylinQueue.SQL_ORDER_PAY.getKey(),
SqlMapping.gets(sqls, sqlsDataA, sqlsDataB, sqlsDataC, sqlsDataD));
//生成vo redis
......@@ -871,7 +867,7 @@ public class KylinOrderTicketsServiceImpl implements IKylinOrderTicketsOrderServ
}
if (null != adTemplate) {
queueUtils.sendSqlRabbit(MQConst.EX_LNS_SMS_SENDER, MQConst.RK_SMS_NOTICE,
queueUtils.sendMsgByRedis(MQConst.KylinQueue.SMS_NOTICE.getKey(),
SmsMessage.builder().setPhone(orderTicketData.getUserMobile())
.setSignName(SmsEnum.ADSignName.M02.getVal())
.setTemplateCode(adTemplate.name())
......
package com.liquidnet.service.order.service.impl;
import com.alibaba.fastjson.JSON;
import com.liquidnet.common.mq.constant.MQConst;
import com.liquidnet.commons.lang.util.CollectionUtil;
import com.liquidnet.commons.lang.util.DateUtil;
import com.liquidnet.commons.lang.util.IDGenerator;
import com.liquidnet.commons.lang.util.JsonUtils;
import com.liquidnet.service.base.SqlMapping;
import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.kylin.constant.KylinTableStatusConst;
import com.liquidnet.service.kylin.dto.vo.mongo.KylinOrderRefundEntitiesVo;
import com.liquidnet.service.kylin.dto.vo.mongo.KylinOrderRefundPicVo;
import com.liquidnet.service.kylin.dto.vo.mongo.KylinOrderTicketEntitiesVo;
import com.liquidnet.service.kylin.dto.vo.mongo.KylinOrderTicketVo;
import com.liquidnet.service.kylin.dto.vo.returns.KylinOrderRefundsVo;
......@@ -19,10 +18,8 @@ import com.liquidnet.service.order.utils.ObjectUtil;
import com.liquidnet.service.order.utils.OrderUtils;
import com.liquidnet.service.order.utils.QueueUtils;
import com.mongodb.BasicDBObject;
import com.mongodb.client.result.UpdateResult;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.mongodb.core.MongoTemplate;
......@@ -219,7 +216,7 @@ public class KylinRefundsStatusServiceImpl {
objectC[12]=kylinOrderRefunds.getCreatedAt();
sqlsDataC.add(objectC);
queueUtils.sendSqlRabbit(MQConst.EXCHANGES_LIQUIDNET_SQL_ORDER_OVERTIME_REFUND, MQConst.ROUTING_KEY_SQL_ORDER_OVERTIME_REFUND,
queueUtils.sendMsgByRedis(MQConst.KylinQueue.SQL_ORDER_OVERTIME_REFUND.getKey(),
SqlMapping.gets(sqls, sqlsDataA, sqlsDataB, sqlsDataC, sqlsDataD,sqlsDataE));
return true;
......
......@@ -29,16 +29,14 @@ public class QueueUtils {
}
/**
* 给 REDIS 队列发送消息 数据库相关
* 发送消息 - REDIS
*
* @param redisKey RedisKey 消费Key
* @param sqlStr Json字符串
* @return
* @param streamKey Redis消费Key
* @param jsonMsg Json字符串
*/
public void sendSqlRedis(String redisKey, String sqlStr) {
HashMap<String, String> map = ObjectUtil.cloneHashMapStringAndString();
map.put("message", sqlStr);
MapRecord<String, String, String> record = StreamRecords.mapBacked(map).withStreamKey(redisKey);
stringRedisTemplate.opsForStream().add(record);
public void sendMsgByRedis(String streamKey, String jsonMsg) {
HashMap<String, String> map = new HashMap<>();
map.put("message", jsonMsg);
stringRedisTemplate.opsForStream().add(StreamRecords.mapBacked(map).withStreamKey(streamKey));
}
}
......@@ -2,9 +2,9 @@ package com.liquidnet.service.platform.service.impl.kylin;
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.liquidnet.common.mq.constant.MQConst;
import com.liquidnet.commons.lang.util.DateUtil;
import com.liquidnet.service.base.OrderCloseMapping;
import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.kylin.constant.KylinTableStatusConst;
import com.liquidnet.service.kylin.dao.OrderScriptDto;
import com.liquidnet.service.kylin.dto.vo.mongo.KylinOrderTicketVo;
......@@ -136,7 +136,7 @@ public class DMCheckOrderTimeImpl extends ServiceImpl<KylinOrderTicketsMapper, K
}
}
log.debug("MQLIST SIZE = " + mqList.size());
queueUtils.sendSqlRabbit(MQConst.EXCHANGES_LIQUIDNET_SQL_ORDER_CLOSE, MQConst.ROUTING_KEY_SQL_ORDER_CLOSE, OrderCloseMapping.get(mqList));
queueUtils.sendMsgByRedis(MQConst.KylinQueue.SQL_ORDER_CLOSE.getKey(), OrderCloseMapping.get(mqList));
}
}
return true;
......
......@@ -28,17 +28,16 @@ public class QueueUtils {
rabbitTemplate.convertAndSend(exchange, route, sqlStr);
}
/**
* 给 REDIS 队列发送消息 数据库相关
* 发送消息 - REDIS
*
* @param redisKey RedisKey 消费Key
* @param sqlStr Json字符串
* @return
* @param streamKey Redis消费Key
* @param jsonMsg Json字符串
*/
public void sendSqlRedis(String redisKey, String sqlStr) {
public void sendMsgByRedis(String streamKey, String jsonMsg) {
HashMap<String, String> map = new HashMap<>();
map.put("message", sqlStr);
MapRecord<String, String, String> record = StreamRecords.mapBacked(map).withStreamKey(redisKey);
stringRedisTemplate.opsForStream().add(record);
map.put("message", jsonMsg);
stringRedisTemplate.opsForStream().add(StreamRecords.mapBacked(map).withStreamKey(streamKey));
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment