记得上下班打卡 | git大法好,push需谨慎

Commit 83061f16 authored by 胡佳晨's avatar 胡佳晨

脚本 消费 和 订单回调 修改

parent 360e4363
......@@ -6,8 +6,11 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Serializable;
import java.lang.reflect.Array;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
public class SqlMapping {
......@@ -35,6 +38,7 @@ public class SqlMapping {
private static final long serialVersionUID = 2208924091512163151L;
private LinkedList<String> sqls;
private LinkedList<Object[]>[] args;
private String redisKey;
private SqlMessage() {
}
......@@ -61,6 +65,14 @@ public class SqlMapping {
this.args = args;
}
public String getRedisKey() {
return redisKey;
}
public void setRedisKey(String redisKey) {
this.redisKey = redisKey;
}
private final static SqlMapping.SqlMessage instance = new SqlMapping.SqlMessage();
public static SqlMapping.SqlMessage getInstance() {
......@@ -106,4 +118,10 @@ public class SqlMapping {
sqlMessage.setArgs(paramsList);
return JsonUtils.toJson(sqlMessage);
}
public static String getSqlRedis(String redisKey) {
SqlMapping.SqlMessage sqlMessage = SqlMapping.SqlMessage.getInstance();
sqlMessage.setRedisKey(redisKey);
return JsonUtils.toJson(sqlMessage);
}
}
......@@ -31,6 +31,11 @@
<artifactId>liquidnet-common-cache-redisson</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.liquidnet</groupId>
<artifactId>liquidnet-common-cache-redis</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
</dependencies>
<build>
......
......@@ -21,6 +21,14 @@ public interface IBaseDao {
*/
Boolean batchSqls(LinkedList<String> sql, LinkedList<Object[]>... values);
/**
* 执行sql语句 无 参数
*
* @param sql
* @return
*/
Boolean batchSqlNoArgs(LinkedList<String> sql);
/**
* xs 新增一条记录且返回主键Id
*
......
......@@ -82,6 +82,26 @@ public class BaseDao implements IBaseDao {
}
}
@Override
public Boolean batchSqlNoArgs(final LinkedList<String> sql) {
try {
TransactionCallback<Boolean> callback = new TransactionCallback<Boolean>() {
@Override
public Boolean doInTransaction(final TransactionStatus transactionStatus) {
for (String o : sql) {
jdbcTemplate.execute(o);
}
return true;
}
};
TransactionTemplate tt = new TransactionTemplate(transactionManager);
return tt.execute(callback);
} catch (Exception ex) {
log.error("###Error.Sqls:{}\nParameters:{},Ex:{}", sql);
return false;
}
}
/**
* xs 新增一条记录且返回主键Id
......
package com.liquidnet.service.consumer.service.processor;
import com.liquidnet.common.cache.redis.util.RedisUtil;
import com.liquidnet.common.mq.constant.MQConst;
import com.liquidnet.commons.lang.util.JsonUtils;
import com.liquidnet.service.base.SqlMapping;
......@@ -10,11 +11,14 @@ import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.amqp.core.Message;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
/**
* ConsumerProcessor.class
......@@ -27,6 +31,8 @@ import java.io.IOException;
public class ConsumerProcessor {
@Resource
IBaseDao baseDao;
@Autowired
private RedisUtil redisUtil;
// @RabbitListener(bindings = @QueueBinding(
// exchange = @Exchange(MQConst.EXCHANGES_LIQUIDNET_SQL), key = MQConst.ROUTING_KEY_SQL,
......@@ -73,6 +79,25 @@ public class ConsumerProcessor {
}
}
// 处理长sql语句
private void consumerSqlDaoHandlerLongSql(Message msg, Channel channel) {
String redisKey = new String(msg.getBody());
log.debug("consumer ==> redisKey:{}", redisKey);
try {
LinkedList<String> sqlList = (LinkedList<String>) redisUtil.get(redisKey);
Boolean rstBatchSqls = baseDao.batchSqlNoArgs(sqlList);
log.debug("consumer sql result of execution:{}", rstBatchSqls);
if (rstBatchSqls) {
channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
} else {
channel.basicReject(msg.getMessageProperties().getDeliveryTag(), true);
}
} catch (IOException e) {
log.error("error:consumer sql:Channel.msg.tag:{}", msg.getMessageProperties().getDeliveryTag(), e);
}
}
// 用户注册
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(MQConst.EX_LNS_SQL_UCENTER), key = MQConst.RK_SQL_UREGISTER,
......@@ -118,6 +143,7 @@ public class ConsumerProcessor {
log.info("=== CONSUMER_ORDER_CREATE ===");
this.consumerSqlDaoHandler(msg, channel);
}
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(MQConst.EXCHANGES_LIQUIDNET_SQL_ORDER_CREADE), key = MQConst.ROUTING_KEY_SQL_ORDER_CREATE,
value = @Queue(MQConst.QUEUES_SQL_ORDER_CREATE)
......@@ -126,6 +152,7 @@ public class ConsumerProcessor {
log.info("=== CONSUMER_ORDER_CREATE ===");
this.consumerSqlDaoHandler(msg, channel);
}
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(MQConst.EXCHANGES_LIQUIDNET_SQL_ORDER_CREADE), key = MQConst.ROUTING_KEY_SQL_ORDER_CREATE,
value = @Queue(MQConst.QUEUES_SQL_ORDER_CREATE)
......@@ -134,6 +161,7 @@ public class ConsumerProcessor {
log.info("=== CONSUMER_ORDER_CREATE ===");
this.consumerSqlDaoHandler(msg, channel);
}
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(MQConst.EXCHANGES_LIQUIDNET_SQL_ORDER_CREADE), key = MQConst.ROUTING_KEY_SQL_ORDER_CREATE,
value = @Queue(MQConst.QUEUES_SQL_ORDER_CREATE)
......@@ -142,6 +170,7 @@ public class ConsumerProcessor {
log.info("=== CONSUMER_ORDER_CREATE ===");
this.consumerSqlDaoHandler(msg, channel);
}
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(MQConst.EXCHANGES_LIQUIDNET_SQL_ORDER_CREADE), key = MQConst.ROUTING_KEY_SQL_ORDER_CREATE,
value = @Queue(MQConst.QUEUES_SQL_ORDER_CREATE)
......@@ -150,6 +179,7 @@ public class ConsumerProcessor {
log.info("=== CONSUMER_ORDER_CREATE ===");
this.consumerSqlDaoHandler(msg, channel);
}
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(MQConst.EXCHANGES_LIQUIDNET_SQL_ORDER_CREADE), key = MQConst.ROUTING_KEY_SQL_ORDER_CREATE,
value = @Queue(MQConst.QUEUES_SQL_ORDER_CREATE)
......@@ -158,6 +188,7 @@ public class ConsumerProcessor {
log.info("=== CONSUMER_ORDER_CREATE ===");
this.consumerSqlDaoHandler(msg, channel);
}
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(MQConst.EXCHANGES_LIQUIDNET_SQL_ORDER_CREADE), key = MQConst.ROUTING_KEY_SQL_ORDER_CREATE,
value = @Queue(MQConst.QUEUES_SQL_ORDER_CREATE)
......@@ -166,6 +197,7 @@ public class ConsumerProcessor {
log.info("=== CONSUMER_ORDER_CREATE ===");
this.consumerSqlDaoHandler(msg, channel);
}
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(MQConst.EXCHANGES_LIQUIDNET_SQL_ORDER_CREADE), key = MQConst.ROUTING_KEY_SQL_ORDER_CREATE,
value = @Queue(MQConst.QUEUES_SQL_ORDER_CREATE)
......@@ -174,6 +206,7 @@ public class ConsumerProcessor {
log.info("=== CONSUMER_ORDER_CREATE ===");
this.consumerSqlDaoHandler(msg, channel);
}
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(MQConst.EXCHANGES_LIQUIDNET_SQL_ORDER_CREADE), key = MQConst.ROUTING_KEY_SQL_ORDER_CREATE,
value = @Queue(MQConst.QUEUES_SQL_ORDER_CREATE)
......@@ -182,6 +215,7 @@ public class ConsumerProcessor {
log.info("=== CONSUMER_ORDER_CREATE ===");
this.consumerSqlDaoHandler(msg, channel);
}
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(MQConst.EXCHANGES_LIQUIDNET_SQL_ORDER_CREADE), key = MQConst.ROUTING_KEY_SQL_ORDER_CREATE,
value = @Queue(MQConst.QUEUES_SQL_ORDER_CREATE)
......@@ -208,79 +242,88 @@ public class ConsumerProcessor {
))
public void consumerOrderClose(Message msg, Channel channel) {
log.info("=== CONSUMER_ORDER_CLOSE ===");
this.consumerSqlDaoHandler(msg, channel);
this.consumerSqlDaoHandlerLongSql(msg, channel);
}
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(MQConst.EXCHANGES_LIQUIDNET_SQL_ORDER_CLOSE), key = MQConst.ROUTING_KEY_SQL_ORDER_CLOSE,
value = @Queue(MQConst.QUEUES_SQL_ORDER_CLOSE)
))
public void consumerOrderClose1(Message msg, Channel channel) {
log.info("=== CONSUMER_ORDER_CLOSE ===");
this.consumerSqlDaoHandler(msg, channel);
this.consumerSqlDaoHandlerLongSql(msg, channel);
}
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(MQConst.EXCHANGES_LIQUIDNET_SQL_ORDER_CLOSE), key = MQConst.ROUTING_KEY_SQL_ORDER_CLOSE,
value = @Queue(MQConst.QUEUES_SQL_ORDER_CLOSE)
))
public void consumerOrderClose2(Message msg, Channel channel) {
log.info("=== CONSUMER_ORDER_CLOSE ===");
this.consumerSqlDaoHandler(msg, channel);
this.consumerSqlDaoHandlerLongSql(msg, channel);
}
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(MQConst.EXCHANGES_LIQUIDNET_SQL_ORDER_CLOSE), key = MQConst.ROUTING_KEY_SQL_ORDER_CLOSE,
value = @Queue(MQConst.QUEUES_SQL_ORDER_CLOSE)
))
public void consumerOrderClose3(Message msg, Channel channel) {
log.info("=== CONSUMER_ORDER_CLOSE ===");
this.consumerSqlDaoHandler(msg, channel);
this.consumerSqlDaoHandlerLongSql(msg, channel);
}
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(MQConst.EXCHANGES_LIQUIDNET_SQL_ORDER_CLOSE), key = MQConst.ROUTING_KEY_SQL_ORDER_CLOSE,
value = @Queue(MQConst.QUEUES_SQL_ORDER_CLOSE)
))
public void consumerOrderClose4(Message msg, Channel channel) {
log.info("=== CONSUMER_ORDER_CLOSE ===");
this.consumerSqlDaoHandler(msg, channel);
this.consumerSqlDaoHandlerLongSql(msg, channel);
}
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(MQConst.EXCHANGES_LIQUIDNET_SQL_ORDER_CLOSE), key = MQConst.ROUTING_KEY_SQL_ORDER_CLOSE,
value = @Queue(MQConst.QUEUES_SQL_ORDER_CLOSE)
))
public void consumerOrderClose5(Message msg, Channel channel) {
log.info("=== CONSUMER_ORDER_CLOSE ===");
this.consumerSqlDaoHandler(msg, channel);
this.consumerSqlDaoHandlerLongSql(msg, channel);
}
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(MQConst.EXCHANGES_LIQUIDNET_SQL_ORDER_CLOSE), key = MQConst.ROUTING_KEY_SQL_ORDER_CLOSE,
value = @Queue(MQConst.QUEUES_SQL_ORDER_CLOSE)
))
public void consumerOrderClose6(Message msg, Channel channel) {
log.info("=== CONSUMER_ORDER_CLOSE ===");
this.consumerSqlDaoHandler(msg, channel);
this.consumerSqlDaoHandlerLongSql(msg, channel);
}
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(MQConst.EXCHANGES_LIQUIDNET_SQL_ORDER_CLOSE), key = MQConst.ROUTING_KEY_SQL_ORDER_CLOSE,
value = @Queue(MQConst.QUEUES_SQL_ORDER_CLOSE)
))
public void consumerOrderClose7(Message msg, Channel channel) {
log.info("=== CONSUMER_ORDER_CLOSE ===");
this.consumerSqlDaoHandler(msg, channel);
this.consumerSqlDaoHandlerLongSql(msg, channel);
}
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(MQConst.EXCHANGES_LIQUIDNET_SQL_ORDER_CLOSE), key = MQConst.ROUTING_KEY_SQL_ORDER_CLOSE,
value = @Queue(MQConst.QUEUES_SQL_ORDER_CLOSE)
))
public void consumerOrderClose8(Message msg, Channel channel) {
log.info("=== CONSUMER_ORDER_CLOSE ===");
this.consumerSqlDaoHandler(msg, channel);
this.consumerSqlDaoHandlerLongSql(msg, channel);
}
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(MQConst.EXCHANGES_LIQUIDNET_SQL_ORDER_CLOSE), key = MQConst.ROUTING_KEY_SQL_ORDER_CLOSE,
value = @Queue(MQConst.QUEUES_SQL_ORDER_CLOSE)
))
public void consumerOrderClose9(Message msg, Channel channel) {
log.info("=== CONSUMER_ORDER_CLOSE ===");
this.consumerSqlDaoHandler(msg, channel);
this.consumerSqlDaoHandlerLongSql(msg, channel);
}
......@@ -293,6 +336,7 @@ public class ConsumerProcessor {
log.info("=== CONSUMER_ORDER_PAY ===");
this.consumerSqlDaoHandler(msg, channel);
}
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(MQConst.EXCHANGES_LIQUIDNET_SQL_ORDER_PAY), key = MQConst.ROUTING_KEY_SQL_ORDER_PAY,
value = @Queue(MQConst.QUEUES_SQL_ORDER_PAY)
......@@ -301,6 +345,7 @@ public class ConsumerProcessor {
log.info("=== CONSUMER_ORDER_PAY ===");
this.consumerSqlDaoHandler(msg, channel);
}
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(MQConst.EXCHANGES_LIQUIDNET_SQL_ORDER_PAY), key = MQConst.ROUTING_KEY_SQL_ORDER_PAY,
value = @Queue(MQConst.QUEUES_SQL_ORDER_PAY)
......@@ -309,6 +354,7 @@ public class ConsumerProcessor {
log.info("=== CONSUMER_ORDER_PAY ===");
this.consumerSqlDaoHandler(msg, channel);
}
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(MQConst.EXCHANGES_LIQUIDNET_SQL_ORDER_PAY), key = MQConst.ROUTING_KEY_SQL_ORDER_PAY,
value = @Queue(MQConst.QUEUES_SQL_ORDER_PAY)
......@@ -317,6 +363,7 @@ public class ConsumerProcessor {
log.info("=== CONSUMER_ORDER_PAY ===");
this.consumerSqlDaoHandler(msg, channel);
}
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(MQConst.EXCHANGES_LIQUIDNET_SQL_ORDER_PAY), key = MQConst.ROUTING_KEY_SQL_ORDER_PAY,
value = @Queue(MQConst.QUEUES_SQL_ORDER_PAY)
......@@ -325,6 +372,7 @@ public class ConsumerProcessor {
log.info("=== CONSUMER_ORDER_PAY ===");
this.consumerSqlDaoHandler(msg, channel);
}
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(MQConst.EXCHANGES_LIQUIDNET_SQL_ORDER_PAY), key = MQConst.ROUTING_KEY_SQL_ORDER_PAY,
value = @Queue(MQConst.QUEUES_SQL_ORDER_PAY)
......@@ -333,6 +381,7 @@ public class ConsumerProcessor {
log.info("=== CONSUMER_ORDER_PAY ===");
this.consumerSqlDaoHandler(msg, channel);
}
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(MQConst.EXCHANGES_LIQUIDNET_SQL_ORDER_PAY), key = MQConst.ROUTING_KEY_SQL_ORDER_PAY,
value = @Queue(MQConst.QUEUES_SQL_ORDER_PAY)
......@@ -341,6 +390,7 @@ public class ConsumerProcessor {
log.info("=== CONSUMER_ORDER_PAY ===");
this.consumerSqlDaoHandler(msg, channel);
}
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(MQConst.EXCHANGES_LIQUIDNET_SQL_ORDER_PAY), key = MQConst.ROUTING_KEY_SQL_ORDER_PAY,
value = @Queue(MQConst.QUEUES_SQL_ORDER_PAY)
......@@ -349,6 +399,7 @@ public class ConsumerProcessor {
log.info("=== CONSUMER_ORDER_PAY ===");
this.consumerSqlDaoHandler(msg, channel);
}
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(MQConst.EXCHANGES_LIQUIDNET_SQL_ORDER_PAY), key = MQConst.ROUTING_KEY_SQL_ORDER_PAY,
value = @Queue(MQConst.QUEUES_SQL_ORDER_PAY)
......@@ -356,7 +407,9 @@ public class ConsumerProcessor {
public void consumerOrderPay8(Message msg, Channel channel) {
log.info("=== CONSUMER_ORDER_PAY ===");
this.consumerSqlDaoHandler(msg, channel);
}@RabbitListener(bindings = @QueueBinding(
}
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(MQConst.EXCHANGES_LIQUIDNET_SQL_ORDER_PAY), key = MQConst.ROUTING_KEY_SQL_ORDER_PAY,
value = @Queue(MQConst.QUEUES_SQL_ORDER_PAY)
))
......@@ -364,6 +417,7 @@ public class ConsumerProcessor {
log.info("=== CONSUMER_ORDER_PAY ===");
this.consumerSqlDaoHandler(msg, channel);
}
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(MQConst.EXCHANGES_LIQUIDNET_SQL_ORDER_PAY), key = MQConst.ROUTING_KEY_SQL_ORDER_PAY,
value = @Queue(MQConst.QUEUES_SQL_ORDER_PAY)
......
......@@ -156,7 +156,7 @@ public class KylinOrderTicketsServiceImpl extends ServiceImpl<KylinOrderTicketsM
//redisLockUtil.unlock(lock);
return ResponseDto.failure(ErrorMapping.get("20004"));//参数错误
}
if (payOrderParam.getNumber()%ticketData.getCounts()!=0) {
if (payOrderParam.getNumber() % ticketData.getCounts() != 0) {
//redisLockUtil.unlock(lock);
return ResponseDto.failure(ErrorMapping.get("20007"));//数量错误
}
......@@ -1083,20 +1083,23 @@ public class KylinOrderTicketsServiceImpl extends ServiceImpl<KylinOrderTicketsM
}
currentTime = System.currentTimeMillis() - currentTime;
log.debug("redis 库存 -> time:" + (currentTime) + "毫秒");
//mysql
LinkedList<String> sqls = new LinkedList<>();
String orderStr = "";
log.debug("orderIdList -> size() :" + orderIdList.size());
for (int i = 0; i < orderIdList.size(); i++) {
String orderTicketId = orderIdList.get(i).split(",")[0];
orderStr += "'"+orderTicketId + "',";
orderStr += "'" + orderTicketId + "',";
}
log.debug("orderStr -> STR :" + orderStr.toString());
orderStr = orderStr.substring(0, orderStr.length() - 1);
LinkedList<String> sqls = new LinkedList();
sqls.add("UPDATE kylin_order_tickets SET updated_at = '" + now + "' WHERE order_tickets_id in (" + orderStr + ") ");
sqls.add("UPDATE kylin_order_ticket_status SET `status` = 2,updated_at = '" + now + "' WHERE order_id in (" + orderStr + ") ");
sqls.add("UPDATE kylin_order_ticket_relations SET updated_at = '" + now + "' WHERE order_id in (" + orderStr + ")");
rabbitTemplate.convertAndSend(MQConst.EXCHANGES_LIQUIDNET_SQL_ORDER_CLOSE, MQConst.ROUTING_KEY_SQL_ORDER_CLOSE, SqlMapping.gets(sqls));
String redisKey = "kylin:ORDER_CLOSE:" + IDGenerator.nextMilliId();
redisUtil.set(redisKey, sqls);
rabbitTemplate.convertAndSend(MQConst.EXCHANGES_LIQUIDNET_SQL_ORDER_CLOSE, MQConst.ROUTING_KEY_SQL_ORDER_CLOSE, sqls);
currentTime = System.currentTimeMillis() - currentTime;
log.debug("mysql -> time:" + (currentTime) + "毫秒");
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment