记得上下班打卡 | git大法好,push需谨慎

Commit 9ad67db3 authored by 张国柄's avatar 张国柄

~opt:queue.order.close;

parent e0ec05fd
...@@ -22,9 +22,11 @@ import org.springframework.data.redis.connection.stream.StreamRecords; ...@@ -22,9 +22,11 @@ import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamListener; import org.springframework.data.redis.stream.StreamListener;
import java.time.Duration;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.HashMap; import java.util.HashMap;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.Map;
import static com.liquidnet.commons.lang.util.DateUtil.DTF_YMD_HMS; import static com.liquidnet.commons.lang.util.DateUtil.DTF_YMD_HMS;
...@@ -41,10 +43,10 @@ public abstract class AbstractOrderCloseReceiver implements StreamListener<Strin ...@@ -41,10 +43,10 @@ public abstract class AbstractOrderCloseReceiver implements StreamListener<Strin
@Override @Override
public void onMessage(MapRecord<String, String, String> message) { public void onMessage(MapRecord<String, String, String> message) {
log.debug("CONSUMER XLS [streamKey:{},messageId:{},stream:{},body:{}]", log.debug("CONSUMER ORDER_CLOSE [streamKey:{},messageId:{},stream:{},body:{}]",
this.getRedisStreamKey(), message.getId(), message.getStream(), message.getValue()); this.getRedisStreamKey(), message.getId(), message.getStream(), message.getValue());
boolean result = this.consumerOrderCloseHandler(this.getRedisStreamKey(), message.getValue().get("message"), message.getValue().get("type")); boolean result = this.consumerOrderCloseHandler(message.getValue());
log.info("CONSUMER XLS_PATH RESULT:{} ==> MESSAGE_ID:{}", result, message.getId()); log.info("CONSUMER ORDER_CLOSE RESULT:{} ==> MESSAGE_ID:{}", result, message.getId());
try { try {
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message); stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
...@@ -60,44 +62,33 @@ public abstract class AbstractOrderCloseReceiver implements StreamListener<Strin ...@@ -60,44 +62,33 @@ public abstract class AbstractOrderCloseReceiver implements StreamListener<Strin
} }
} }
private int getGoblinOrderUnPayNum(String streamKey) { private boolean consumerOrderCloseHandler(Map<String, String> messageMap) {
String[] keyArr = streamKey.split(":");
return Integer.parseInt(keyArr[keyArr.length]);
}
private boolean consumerOrderCloseHandler(String streamKey, String message, String type) {
boolean aBoolean = false;
try { try {
int unPayNum = this.getGoblinOrderUnPayNum(streamKey); String orderCode = messageMap.get("id"), type = messageMap.get("type");
LocalDateTime now = LocalDateTime.now().minusMinutes(5); LocalDateTime createdAt = LocalDateTime.parse(messageMap.get("time"));
int currMin = now.getMinute() % 10, sleepMin = 1;
if (currMin == unPayNum) { long durationToMillis = Duration.between(createdAt.plusMinutes(5), LocalDateTime.now()).toMillis();
aBoolean = checkOrderTime(message, type);
if (durationToMillis > 0) {
return checkOrderTime(orderCode, type);
} else { } else {
try { try {
Thread.sleep(sleepMin * 60000L); Thread.sleep(Math.abs(durationToMillis));
consumerOrderCloseHandler(streamKey, message, type);
} catch (InterruptedException ignored) { } catch (InterruptedException ignored) {
} }
return consumerOrderCloseHandler(messageMap);
} }
} catch (Exception e) { } catch (Exception e) {
log.error("CONSUMER ORDER {} CLOSE FAIL ==> {}", type, e.getMessage(), e); log.error("CONSUMER ORDER_CLOSE FAIL ==> {}", e.getMessage(), e);
} finally { stringRedisTemplate.opsForStream().add(StreamRecords.mapBacked(messageMap).withStreamKey(this.getRedisStreamKey()));
if (!aBoolean) { return false;
HashMap<String, String> map = CollectionUtil.mapStringString();
map.put("message", message);
map.put("type", type);
stringRedisTemplate.opsForStream().add(StreamRecords.mapBacked(map).withStreamKey(this.getRedisStreamKey()));
}
} }
return aBoolean;
} }
protected abstract String getRedisStreamKey(); protected abstract String getRedisStreamKey();
protected abstract String getRedisStreamGroup(); protected abstract String getRedisStreamGroup();
public boolean checkOrderTime(String valueData, String type) { public boolean checkOrderTime(String valueData, String type) {
LocalDateTime now = LocalDateTime.now(); LocalDateTime now = LocalDateTime.now();
if (type.equals("GOBLIN")) { if (type.equals("GOBLIN")) {
......
package com.liquidnet.service.order.utils; package com.liquidnet.service.order.utils;
import com.liquidnet.commons.lang.util.RandomUtil;
import com.liquidnet.service.base.SqlMapping; import com.liquidnet.service.base.SqlMapping;
import com.liquidnet.service.base.constant.MQConst; import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.kylin.dto.vo.mongo.KylinIpAreaVo; import com.liquidnet.service.kylin.dto.vo.mongo.KylinIpAreaVo;
...@@ -87,9 +86,10 @@ public class QueueUtils { ...@@ -87,9 +86,10 @@ public class QueueUtils {
streamKey = MQConst.GoblinQueue.GOBLIN_UN_PAY_0.getKey(); streamKey = MQConst.GoblinQueue.GOBLIN_UN_PAY_0.getKey();
break; break;
} }
HashMap<String, String> map = ObjectUtil.cloneHashMapStringAndString(); HashMap<String, Object> map = ObjectUtil.cloneHashMapStringAndObject();
map.put("message", masterOrderCode); map.put("id", masterOrderCode);
map.put("type", "GOBLIN"); map.put("type", "GOBLIN");
map.put("time", createTime.toString());
stringRedisTemplate.opsForStream().add(StreamRecords.mapBacked(map).withStreamKey(streamKey)); stringRedisTemplate.opsForStream().add(StreamRecords.mapBacked(map).withStreamKey(streamKey));
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment