记得上下班打卡 | git大法好,push需谨慎

Commit 98055842 authored by 张国柄's avatar 张国柄

~Consumer:kylin+goblin;

parent a4d85596
......@@ -104,7 +104,12 @@ public class TestAdam {
@Test
public void testTmp() {
String post = HttpUtil.post("http://ttestkylin.zhengzai.tv/kylin/inner/cache/member?uid=925802662655180800214832", CollectionUtil.linkedMultiValueMapStringString());
System.out.println("===" + post);
// String post = HttpUtil.post("http://ttestkylin.zhengzai.tv/kylin/inner/cache/member?uid=925802662655180800214832", CollectionUtil.linkedMultiValueMapStringString());
// System.out.println("===" + post);
LocalDateTime now = LocalDateTime.now();
for (int i = 0; i < 15; i++) {
System.out.println(now.getSecond() + "----" + now.getSecond()%10);
}
}
}
package com.liquidnet.service.consumer.kylin.config;
import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.consumer.kylin.receiver.ConsumerGoblinOrderCloseRdsReceiver;
import lombok.var;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;
import java.time.Duration;
@Configuration
public class ConsumerGoblinOrderCloseRedisStreamConfig {
@Autowired
ConsumerGoblinOrderCloseRdsReceiver consumerGoblinOrderCloseRdsReceiver;
private StreamMessageListenerContainer<String, MapRecord<String, String, String>> buildStreamMessageListenerContainer(RedisConnectionFactory factory) {
var options = StreamMessageListenerContainer
.StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofMillis(1))
.build();
return StreamMessageListenerContainer.create(factory, options);
}
/**
* 缺票登记
*
* @param listenerContainer
* @param t
* @return
*/
private Subscription receiveGoblinOrderClose(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) {
return listenerContainer.receiveAutoAck(
Consumer.from(MQConst.GoblinQueue.GOBLIN_ORDER_CLOSE.getGroup(), MQConst.GoblinQueue.GOBLIN_ORDER_CLOSE.name() + t),
StreamOffset.create(MQConst.GoblinQueue.GOBLIN_ORDER_CLOSE.getKey(), ReadOffset.lastConsumed()), consumerGoblinOrderCloseRdsReceiver
);
}
/* —————————————————————————— | —————————————————————————— | —————————————————————————— */
/* -------------------------------------------------------- | 缺票登记 */
@Bean
public Subscription subscriptionGoblinOrderClose(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveGoblinOrderClose(listenerContainer, 1);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscriptionGoblinOrderClose2(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveGoblinOrderClose(listenerContainer, 2);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscriptionGoblinOrderClose3(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveGoblinOrderClose(listenerContainer, 3);
listenerContainer.start();
return subscription;
}
/* -------------------------------------------------------- | */
}
package com.liquidnet.service.consumer.kylin.receiver;
import com.alibaba.excel.EasyExcel;
import com.alibaba.fastjson.JSON;
import com.liquidnet.common.cache.redis.util.RedisUtil;
import com.liquidnet.commons.lang.util.CollectionUtil;
import com.liquidnet.commons.lang.util.JsonUtils;
......@@ -13,7 +11,6 @@ import com.liquidnet.service.goblin.dto.vo.GoblinOrderSkuVo;
import com.liquidnet.service.goblin.dto.vo.GoblinStoreOrderVo;
import com.mongodb.BasicDBObject;
import com.mongodb.client.result.UpdateResult;
import io.netty.util.internal.ObjectUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.mongodb.core.MongoTemplate;
......@@ -25,8 +22,6 @@ import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamListener;
import java.io.InputStream;
import java.net.URL;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.LinkedList;
......@@ -46,7 +41,7 @@ public abstract class AbstractGoblinOrderCloseReceiver implements StreamListener
public void onMessage(MapRecord<String, String, String> message) {
log.debug("CONSUMER XLS [streamKey:{},messageId:{},stream:{},body:{}]",
this.getRedisStreamKey(), message.getId(), message.getStream(), message.getValue());
boolean result = this.consumerSqlDaoHandler(message.getValue().get("message"), message.getValue().get("type"));
boolean result = this.consumerOrderCloseHandler(this.getRedisStreamKey(), message.getValue().get("message"), message.getValue().get("type"));
log.info("CONSUMER XLS_PATH RESULT:{} ==> MESSAGE_ID:{}", result, message.getId());
try {
......@@ -63,10 +58,26 @@ public abstract class AbstractGoblinOrderCloseReceiver implements StreamListener
}
}
private boolean consumerSqlDaoHandler(String message, String type) {
private int getGoblinOrderUnPayNum(String streamKey) {
String[] keyArr = streamKey.split(":");
return Integer.parseInt(keyArr[keyArr.length]);
}
private boolean consumerOrderCloseHandler(String streamKey, String message, String type) {
boolean aBoolean = false;
try {
aBoolean = checkOrderTime(message, type);
int unPayNum = this.getGoblinOrderUnPayNum(streamKey);
LocalDateTime now = LocalDateTime.now().minusMinutes(5);
int currMin = now.getMinute() % 10, sleepMin = 1;
if (currMin == unPayNum) {
aBoolean = checkOrderTime(message, type);
} else {
try {
Thread.sleep(sleepMin * 60000L);
consumerOrderCloseHandler(streamKey, message, type);
} catch (InterruptedException ignored) {
}
}
} catch (Exception e) {
log.error("CONSUMER ORDER {} CLOSE FAIL ==> {}", type, e.getMessage(), e);
} finally {
......
package com.liquidnet.service.consumer.kylin.receiver;
import com.liquidnet.service.base.constant.MQConst;
import org.springframework.stereotype.Component;
@Component
public class ConsumerOrderClose0RdsReceiver extends AbstractSqlRedisReceiver {
@Override
protected String getRedisStreamKey() {
return MQConst.GoblinQueue.GOBLIN_UN_PAY_0.getKey();
}
@Override
protected String getRedisStreamGroup() {
return MQConst.GoblinQueue.GOBLIN_UN_PAY_0.getGroup();
}
}
package com.liquidnet.service.consumer.kylin.receiver;
import com.liquidnet.service.base.constant.MQConst;
import org.springframework.stereotype.Component;
@Component
public class ConsumerOrderClose1RdsReceiver extends AbstractSqlRedisReceiver {
@Override
protected String getRedisStreamKey() {
return MQConst.GoblinQueue.GOBLIN_UN_PAY_1.getKey();
}
@Override
protected String getRedisStreamGroup() {
return MQConst.GoblinQueue.GOBLIN_UN_PAY_1.getGroup();
}
}
package com.liquidnet.service.consumer.kylin.receiver;
import com.liquidnet.service.base.constant.MQConst;
import org.springframework.stereotype.Component;
@Component
public class ConsumerOrderClose2RdsReceiver extends AbstractSqlRedisReceiver {
@Override
protected String getRedisStreamKey() {
return MQConst.GoblinQueue.GOBLIN_UN_PAY_2.getKey();
}
@Override
protected String getRedisStreamGroup() {
return MQConst.GoblinQueue.GOBLIN_UN_PAY_2.getGroup();
}
}
package com.liquidnet.service.consumer.kylin.receiver;
import com.liquidnet.service.base.constant.MQConst;
import org.springframework.stereotype.Component;
@Component
public class ConsumerOrderClose3RdsReceiver extends AbstractSqlRedisReceiver {
@Override
protected String getRedisStreamKey() {
return MQConst.GoblinQueue.GOBLIN_UN_PAY_3.getKey();
}
@Override
protected String getRedisStreamGroup() {
return MQConst.GoblinQueue.GOBLIN_UN_PAY_3.getGroup();
}
}
package com.liquidnet.service.consumer.kylin.receiver;
import com.liquidnet.service.base.constant.MQConst;
import org.springframework.stereotype.Component;
@Component
public class ConsumerOrderClose4RdsReceiver extends AbstractSqlRedisReceiver {
@Override
protected String getRedisStreamKey() {
return MQConst.GoblinQueue.GOBLIN_UN_PAY_4.getKey();
}
@Override
protected String getRedisStreamGroup() {
return MQConst.GoblinQueue.GOBLIN_UN_PAY_4.getGroup();
}
}
package com.liquidnet.service.consumer.kylin.receiver;
import com.liquidnet.service.base.constant.MQConst;
import org.springframework.stereotype.Component;
@Component
public class ConsumerOrderClose5RdsReceiver extends AbstractSqlRedisReceiver {
@Override
protected String getRedisStreamKey() {
return MQConst.GoblinQueue.GOBLIN_UN_PAY_5.getKey();
}
@Override
protected String getRedisStreamGroup() {
return MQConst.GoblinQueue.GOBLIN_UN_PAY_5.getGroup();
}
}
package com.liquidnet.service.consumer.kylin.receiver;
import com.liquidnet.service.base.constant.MQConst;
import org.springframework.stereotype.Component;
@Component
public class ConsumerOrderClose6RdsReceiver extends AbstractSqlRedisReceiver {
@Override
protected String getRedisStreamKey() {
return MQConst.GoblinQueue.GOBLIN_UN_PAY_6.getKey();
}
@Override
protected String getRedisStreamGroup() {
return MQConst.GoblinQueue.GOBLIN_UN_PAY_6.getGroup();
}
}
package com.liquidnet.service.consumer.kylin.receiver;
import com.liquidnet.service.base.constant.MQConst;
import org.springframework.stereotype.Component;
@Component
public class ConsumerOrderClose7RdsReceiver extends AbstractSqlRedisReceiver {
@Override
protected String getRedisStreamKey() {
return MQConst.GoblinQueue.GOBLIN_UN_PAY_7.getKey();
}
@Override
protected String getRedisStreamGroup() {
return MQConst.GoblinQueue.GOBLIN_UN_PAY_7.getGroup();
}
}
package com.liquidnet.service.consumer.kylin.receiver;
import com.liquidnet.service.base.constant.MQConst;
import org.springframework.stereotype.Component;
@Component
public class ConsumerOrderClose8RdsReceiver extends AbstractSqlRedisReceiver {
@Override
protected String getRedisStreamKey() {
return MQConst.GoblinQueue.GOBLIN_UN_PAY_8.getKey();
}
@Override
protected String getRedisStreamGroup() {
return MQConst.GoblinQueue.GOBLIN_UN_PAY_8.getGroup();
}
}
package com.liquidnet.service.consumer.kylin.receiver;
import com.liquidnet.service.base.constant.MQConst;
import org.springframework.stereotype.Component;
@Component
public class ConsumerOrderClose9RdsReceiver extends AbstractSqlRedisReceiver {
@Override
protected String getRedisStreamKey() {
return MQConst.GoblinQueue.GOBLIN_UN_PAY_9.getKey();
}
@Override
protected String getRedisStreamGroup() {
return MQConst.GoblinQueue.GOBLIN_UN_PAY_9.getGroup();
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment