记得上下班打卡 | git大法好,push需谨慎

Commit 533ba47c authored by anjiabin's avatar anjiabin

修改galaxy消费相关

parent a8ed4fb1
package com.liquidnet.service.consumer.kylin.receiver;
import com.liquidnet.commons.lang.util.CollectionUtil;
import com.liquidnet.commons.lang.util.JsonUtils;
import com.liquidnet.service.consumer.kylin.service.processor.ConsumerGalaxyJsonNftPublishAndBuyProcessor;
import com.liquidnet.service.galaxy.dto.param.GalaxyNftPublishAndBuyReqDto;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamListener;
import java.util.HashMap;
@Slf4j
public abstract class AbstractJsonRedisReceiver implements StreamListener<String, MapRecord<String, String, String>> {
@Autowired
StringRedisTemplate stringRedisTemplate;
@Autowired
private ConsumerGalaxyJsonNftPublishAndBuyProcessor jsonNftPublishAndBuyProcessor;
@Override
public void onMessage(MapRecord<String, String, String> message) {
String redisStreamKey = this.getRedisStreamKey();
log.debug("CONSUMER MSG[streamKey:{},messageId:{},stream:{},body:{}]", redisStreamKey, message.getId(), message.getStream(), message.getValue());
boolean result = this.consumerMessageHandler(message.getValue().get("message"));
log.info("CONSUMER MSG RESULT:{} ==> [{}]MESSAGE_ID:{}", result, redisStreamKey, message.getId());
try {
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
} catch (Exception e) {
log.error("#CONSUMER MSG EX_ACK ==> [{}]RESULT:{},MESSAGE:{}", redisStreamKey, result, message.getValue(), e);
}
try {
stringRedisTemplate.opsForStream().delete(redisStreamKey, message.getId());
} catch (Exception e) {
log.error("#CONSUMER MSG EX_DEL ==> [{}]RESULT:{},MESSAGE:{}", redisStreamKey, result, message.getValue(), e);
}
}
private boolean consumerMessageHandler(String msg) {
boolean aBoolean = false;
try {
GalaxyNftPublishAndBuyReqDto textMessage = JsonUtils.fromJson(msg, GalaxyNftPublishAndBuyReqDto.class);
if (textMessage == null) {
aBoolean = true;
} else {
//执行计数
jsonNftPublishAndBuyProcessor.nftPublishAndBuy(textMessage);
aBoolean = true;
}
} catch (Exception e) {
log.error("CONSUMER MSG EX_HANDLE ==> [{}]:{}", this.getRedisStreamKey(), msg, e);
} finally {
if (!aBoolean) {
HashMap<String, String> map = CollectionUtil.mapStringString();
map.put("message", msg);
stringRedisTemplate.opsForStream().add(StreamRecords.mapBacked(map).withStreamKey(this.getRedisStreamKey()));
}
}
return aBoolean;
}
protected abstract String getRedisStreamKey();
protected abstract String getRedisStreamGroup();
}
package com.liquidnet.service.consumer.kylin.receiver; package com.liquidnet.service.consumer.kylin.receiver;
import com.liquidnet.commons.lang.util.CollectionUtil;
import com.liquidnet.commons.lang.util.JsonUtils;
import com.liquidnet.service.base.constant.MQConst; import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.consumer.kylin.service.processor.ConsumerGalaxyJsonNftPublishAndBuyProcessor;
import com.liquidnet.service.galaxy.dto.param.GalaxyNftPublishAndBuyReqDto;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.HashMap;
@Slf4j @Slf4j
@Component @Component
public class ConsumerGalaxyJsonNftPublishAndBuyReceiver extends AbstractJsonRedisReceiver { public class ConsumerGalaxyJsonNftPublishAndBuyReceiver extends AbstractBizRedisReceiver {
@Autowired
private ConsumerGalaxyJsonNftPublishAndBuyProcessor jsonNftPublishAndBuyProcessor;
@Override
protected boolean consumerMessageHandler(String msg) {
boolean aBoolean = false;
try {
GalaxyNftPublishAndBuyReqDto textMessage = JsonUtils.fromJson(msg, GalaxyNftPublishAndBuyReqDto.class);
if (textMessage == null) {
aBoolean = true;
} else {
//执行计数
jsonNftPublishAndBuyProcessor.nftPublishAndBuy(textMessage);
aBoolean = true;
}
} catch (Exception e) {
log.error("CONSUMER MSG EX_HANDLE ==> [{}]:{}", this.getRedisStreamKey(), msg, e);
} finally {
if (!aBoolean) {
HashMap<String, String> map = CollectionUtil.mapStringString();
map.put("message", msg);
stringRedisTemplate.opsForStream().add(StreamRecords.mapBacked(map).withStreamKey(this.getRedisStreamKey()));
}
}
return aBoolean;
}
@Override @Override
protected String getRedisStreamKey() { protected String getRedisStreamKey() {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment