记得上下班打卡 | git大法好,push需谨慎

Commit d941ca9c authored by 张国柄's avatar 张国柄

fix:mq;

parent a41f781b
......@@ -3,12 +3,12 @@ package com.liquidnet.common.mq.constant;
public class MQConst {
public static final String EXCHANGES_LIQUIDNET_SQL = "liquidnet.sql";
public static final String ROUTING_KEY_SQL = "sql";
public static final String QUEUES_SQL_MAIN = "queue.sql.main";
public static final String ROUTING_KEY_SQL_KYLIN_STATION = "sql.kylin.station";
public static final String QUEUES_SQL_KYLIN_STATION = "queue.sql.kylin.station";
//缺票登记
public static final String EXCHANGES_LIQUIDNET_SQL_PERFORMANCE_LACK = "liquidnet.sql.performance.lack";
public static final String ROUTING_KEY_SQL_PERFORMANCE_LACK = "performance.lack";
......
......@@ -6,6 +6,9 @@ import com.liquidnet.service.base.SqlMapping;
import com.liquidnet.service.consumer.service.IBaseDao;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import org.springframework.amqp.core.Message;
......@@ -25,17 +28,12 @@ public class ConsumerProcessor {
@Resource
IBaseDao baseDao;
// @RabbitListener(bindings = @QueueBinding(
// exchange = @Exchange("queue.sql"),
// key = "rk",
// value = @Queue("queue.sql.main")
// ))
// @RabbitHandler
// public void consumerSql(Message message) {
// log.info("consumer sql:{}", message);
// }
@RabbitListener(queues = MQConst.QUEUES_SQL_MAIN)
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(MQConst.EXCHANGES_LIQUIDNET_SQL),
key = MQConst.ROUTING_KEY_SQL,
value = @Queue(MQConst.QUEUES_SQL_MAIN)
))
// @RabbitListener(queues = MQConst.QUEUES_SQL_MAIN)
public void consumerSql(Message msg, Channel channel) {
SqlMapping.SqlMessage sqlMessage = JsonUtils.fromJson(new String(msg.getBody()), SqlMapping.SqlMessage.class);
log.debug("consumer sql ==> Preparing:{}", JsonUtils.toJson(sqlMessage.getSqls()));
......
package com.liquidnet.service.kylin.service.impl;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.liquidnet.common.mq.constant.MQConst;
import com.liquidnet.commons.lang.util.DateUtil;
import com.liquidnet.commons.lang.util.JsonUtils;
import com.liquidnet.service.base.SqlMapping;
import com.liquidnet.service.kylin.dto.param.KylinStationCheckOrderParam;
import com.liquidnet.service.kylin.dto.vo.mongo.KylinOrderTicketEntitiesVo;
import com.liquidnet.service.kylin.entity.KylinOrderTicketEntities;
......@@ -83,10 +85,7 @@ public class KylinOrderTicketEntitiesServiceImpl extends ServiceImpl<KylinOrderT
BulkWriteResult bulkWriteResult = mongoTemplate.getCollection(KylinOrderTicketEntitiesVo.class.getSimpleName()).bulkWrite(list);
log.info("bulkWriteResult:{}", JsonUtils.toJson(bulkWriteResult));
// TODO: 2021/6/1 sql to mq
// rabbitTemplate.convertSendAndReceive(MQConst.EXCHANGES_LIQUIDNET_SQL, MQConst.ROUTING_KEY_SQL,
// SqlMapping.get("kylin_order_ticket_entities.updateStatusByStation", paramsList));
rabbitTemplate.convertSendAndReceive(MQConst.EXCHANGES_LIQUIDNET_SQL, MQConst.ROUTING_KEY_SQL_KYLIN_STATION,
SqlMapping.get("kylin_order_ticket_entities.updateStatusByStation", paramsList));
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment