记得上下班打卡 | git大法好,push需谨慎

Commit c153d97f authored by 张国柄's avatar 张国柄

~opt;

parent 3b6c7030
package com.liquidnet.service.consumer.nft.config; //package com.liquidnet.service.consumer.nft.config;
//
import com.liquidnet.common.cache.redis.config.RedisStreamConfig; //import com.liquidnet.common.cache.redis.config.RedisStreamConfig;
import org.springframework.beans.factory.annotation.Autowired; //import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration; //import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.StringRedisTemplate; //import org.springframework.data.redis.core.StringRedisTemplate;
//
@Configuration //@Configuration
public class ConsumerCommonBizRedisStreamConfig extends RedisStreamConfig { //public class ConsumerCommonBizRedisStreamConfig extends RedisStreamConfig {
@Autowired // @Autowired
StringRedisTemplate stringRedisTemplate; // StringRedisTemplate stringRedisTemplate;
} //}
package com.liquidnet.service.consumer.nft.receiver; //package com.liquidnet.service.consumer.nft.receiver;
//
import com.liquidnet.service.base.constant.MQConst; //import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.consumer.nft.service.IBaseDao; //import com.liquidnet.service.consumer.nft.service.IBaseDao;
import lombok.extern.slf4j.Slf4j; //import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; //import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.MapRecord; //import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.core.StringRedisTemplate; //import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamListener; //import org.springframework.data.redis.stream.StreamListener;
//
/** ///**
* 公共的业务队列消息监听器,具体业务消费逻辑通过`consumerMessageHandler`实现 // * 公共的业务队列消息监听器,具体业务消费逻辑通过`consumerMessageHandler`实现
* // *
* @author zhanggb // * @author zhanggb
* Created by IntelliJ IDEA at 2022/3/31 // * Created by IntelliJ IDEA at 2022/3/31
*/ // */
@Slf4j //@Slf4j
public abstract class AbstractBizRedisReceiver implements StreamListener<String, MapRecord<String, String, String>> { //public abstract class AbstractBizRedisReceiver implements StreamListener<String, MapRecord<String, String, String>> {
@Autowired // @Autowired
public IBaseDao baseDao; // public IBaseDao baseDao;
@Autowired // @Autowired
public StringRedisTemplate stringRedisTemplate; // public StringRedisTemplate stringRedisTemplate;
//
@Override // @Override
public void onMessage(MapRecord<String, String, String> message) { // public void onMessage(MapRecord<String, String, String> message) {
String redisStreamKey = this.getRedisStreamKey(); // String redisStreamKey = this.getRedisStreamKey();
log.debug("CONSUMER MSG[streamKey:{},messageId:{},stream:{},body:{}]", redisStreamKey, message.getId(), message.getStream(), message.getValue()); // log.debug("CONSUMER MSG[streamKey:{},messageId:{},stream:{},body:{}]", redisStreamKey, message.getId(), message.getStream(), message.getValue());
boolean result = this.consumerMessageHandler(message.getValue().get(MQConst.QUEUE_MESSAGE_KEY)); // boolean result = this.consumerMessageHandler(message.getValue().get(MQConst.QUEUE_MESSAGE_KEY));
log.info("CONSUMER MSG RESULT:{} ==> [{}]MESSAGE_ID:{}", result, redisStreamKey, message.getId()); // log.info("CONSUMER MSG RESULT:{} ==> [{}]MESSAGE_ID:{}", result, redisStreamKey, message.getId());
//
try { // try {
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message); // stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
} catch (Exception e) { // } catch (Exception e) {
log.error("#CONSUMER MSG EX_ACK ==> [{}]RESULT:{},MESSAGE:{}", redisStreamKey, result, message.getValue(), e); // log.error("#CONSUMER MSG EX_ACK ==> [{}]RESULT:{},MESSAGE:{}", redisStreamKey, result, message.getValue(), e);
} // }
try { // try {
stringRedisTemplate.opsForStream().delete(redisStreamKey, message.getId()); // stringRedisTemplate.opsForStream().delete(redisStreamKey, message.getId());
} catch (Exception e) { // } catch (Exception e) {
log.error("#CONSUMER MSG EX_DEL ==> [{}]RESULT:{},MESSAGE:{}", redisStreamKey, result, message.getValue(), e); // log.error("#CONSUMER MSG EX_DEL ==> [{}]RESULT:{},MESSAGE:{}", redisStreamKey, result, message.getValue(), e);
} // }
} // }
//
protected abstract boolean consumerMessageHandler(String msg); // protected abstract boolean consumerMessageHandler(String msg);
//
protected abstract String getRedisStreamKey(); // protected abstract String getRedisStreamKey();
//
protected abstract String getRedisStreamGroup(); // protected abstract String getRedisStreamGroup();
} //}
\ No newline at end of file \ No newline at end of file
package com.liquidnet.service.consumer.nft.receiver; //package com.liquidnet.service.consumer.nft.receiver;
//
import com.liquidnet.commons.lang.util.CollectionUtil; //import com.liquidnet.commons.lang.util.CollectionUtil;
import com.liquidnet.commons.lang.util.JsonUtils; //import com.liquidnet.commons.lang.util.JsonUtils;
import com.liquidnet.service.base.SqlMapping; //import com.liquidnet.service.base.SqlMapping;
import com.liquidnet.service.consumer.nft.service.IBaseDao; //import com.liquidnet.service.consumer.nft.service.IBaseDao;
import lombok.extern.slf4j.Slf4j; //import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; //import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.MapRecord; //import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.StreamRecords; //import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.core.StringRedisTemplate; //import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamListener; //import org.springframework.data.redis.stream.StreamListener;
//
import java.util.HashMap; //import java.util.HashMap;
//
/** ///**
* 公共的SQL队列消息监听器,具体SQL消费逻辑统一使用`consumerMessageHandler` // * 公共的SQL队列消息监听器,具体SQL消费逻辑统一使用`consumerMessageHandler`
* // *
* @author zhanggb // * @author zhanggb
* Created by IntelliJ IDEA at 2022/3/31 // * Created by IntelliJ IDEA at 2022/3/31
*/ // */
@Slf4j //@Slf4j
public abstract class AbstractSqlRedisReceiver implements StreamListener<String, MapRecord<String, String, String>> { //public abstract class AbstractSqlRedisReceiver implements StreamListener<String, MapRecord<String, String, String>> {
@Autowired // @Autowired
private IBaseDao baseDao; // private IBaseDao baseDao;
@Autowired // @Autowired
StringRedisTemplate stringRedisTemplate; // StringRedisTemplate stringRedisTemplate;
//
@Override // @Override
public void onMessage(MapRecord<String, String, String> message) { // public void onMessage(MapRecord<String, String, String> message) {
String redisStreamKey = this.getRedisStreamKey(); // String redisStreamKey = this.getRedisStreamKey();
log.debug("CONSUMER MSG[streamKey:{},messageId:{},stream:{},body:{}]", redisStreamKey, message.getId(), message.getStream(), message.getValue()); // log.debug("CONSUMER MSG[streamKey:{},messageId:{},stream:{},body:{}]", redisStreamKey, message.getId(), message.getStream(), message.getValue());
boolean result = this.consumerMessageHandler(message.getValue().get("message")); // boolean result = this.consumerMessageHandler(message.getValue().get("message"));
log.info("CONSUMER MSG RESULT:{} ==> [{}]MESSAGE_ID:{}", result, redisStreamKey, message.getId()); // log.info("CONSUMER MSG RESULT:{} ==> [{}]MESSAGE_ID:{}", result, redisStreamKey, message.getId());
//
try { // try {
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message); // stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
} catch (Exception e) { // } catch (Exception e) {
log.error("#CONSUMER MSG EX_ACK ==> [{}]RESULT:{},MESSAGE:{}", redisStreamKey, result, message.getValue(), e); // log.error("#CONSUMER MSG EX_ACK ==> [{}]RESULT:{},MESSAGE:{}", redisStreamKey, result, message.getValue(), e);
} // }
try { // try {
stringRedisTemplate.opsForStream().delete(redisStreamKey, message.getId()); // stringRedisTemplate.opsForStream().delete(redisStreamKey, message.getId());
} catch (Exception e) { // } catch (Exception e) {
log.error("#CONSUMER MSG EX_DEL ==> [{}]RESULT:{},MESSAGE:{}", redisStreamKey, result, message.getValue(), e); // log.error("#CONSUMER MSG EX_DEL ==> [{}]RESULT:{},MESSAGE:{}", redisStreamKey, result, message.getValue(), e);
} // }
} // }
//
private boolean consumerMessageHandler(String msg) { // private boolean consumerMessageHandler(String msg) {
boolean aBoolean = false; // boolean aBoolean = false;
try { // try {
SqlMapping.SqlMessage sqlMessage = JsonUtils.fromJson(msg, SqlMapping.SqlMessage.class); // SqlMapping.SqlMessage sqlMessage = JsonUtils.fromJson(msg, SqlMapping.SqlMessage.class);
if (sqlMessage == null) { // if (sqlMessage == null) {
aBoolean = true; // aBoolean = true;
} else { // } else {
aBoolean = baseDao.batchSqls(sqlMessage.getSqls(), sqlMessage.getArgs()); // aBoolean = baseDao.batchSqls(sqlMessage.getSqls(), sqlMessage.getArgs());
} // }
} catch (Exception e) { // } catch (Exception e) {
log.error("CONSUMER MSG EX_HANDLE ==> [{}]:{}", this.getRedisStreamKey(), msg, e); // log.error("CONSUMER MSG EX_HANDLE ==> [{}]:{}", this.getRedisStreamKey(), msg, e);
} finally { // } finally {
if (!aBoolean) { // if (!aBoolean) {
HashMap<String, String> map = CollectionUtil.mapStringString(); // HashMap<String, String> map = CollectionUtil.mapStringString();
map.put("message", msg); // map.put("message", msg);
stringRedisTemplate.opsForStream().add(StreamRecords.mapBacked(map).withStreamKey(this.getRedisStreamKey())); // stringRedisTemplate.opsForStream().add(StreamRecords.mapBacked(map).withStreamKey(this.getRedisStreamKey()));
} // }
} // }
return aBoolean; // return aBoolean;
} // }
//
protected abstract String getRedisStreamKey(); // protected abstract String getRedisStreamKey();
//
protected abstract String getRedisStreamGroup(); // protected abstract String getRedisStreamGroup();
} //}
\ No newline at end of file \ No newline at end of file
package com.liquidnet.service.consumer.nft.receiver; //package com.liquidnet.service.consumer.nft.receiver;
//
import com.liquidnet.commons.lang.util.CollectionUtil; //import com.liquidnet.commons.lang.util.CollectionUtil;
import com.liquidnet.commons.lang.util.JsonUtils; //import com.liquidnet.commons.lang.util.JsonUtils;
import com.liquidnet.service.base.ResponseDto; //import com.liquidnet.service.base.ResponseDto;
import com.liquidnet.service.base.constant.MQConst; //import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.consumer.nft.service.processor.ConsumerGalaxyJsonNftPublishAndBuyProcessor; //import com.liquidnet.service.consumer.nft.service.processor.ConsumerGalaxyJsonNftPublishAndBuyProcessor;
import com.liquidnet.service.galaxy.dto.param.GalaxyNftPublishAndBuyReqDto; //import com.liquidnet.service.galaxy.dto.param.GalaxyNftPublishAndBuyReqDto;
import com.liquidnet.service.galaxy.dto.param.GalaxyNftPublishAndBuyRespDto; //import com.liquidnet.service.galaxy.dto.param.GalaxyNftPublishAndBuyRespDto;
import lombok.extern.slf4j.Slf4j; //import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; //import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.StreamRecords; //import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.stereotype.Component; //import org.springframework.stereotype.Component;
//
import java.util.HashMap; //import java.util.HashMap;
//
@Slf4j //@Slf4j
@Component //@Component
public class ConsumerGalaxyJsonNftPublishAndBuyReceiver extends AbstractBizRedisReceiver { //public class ConsumerGalaxyJsonNftPublishAndBuyReceiver extends AbstractBizRedisReceiver {
@Autowired // @Autowired
private ConsumerGalaxyJsonNftPublishAndBuyProcessor jsonNftPublishAndBuyProcessor; // private ConsumerGalaxyJsonNftPublishAndBuyProcessor jsonNftPublishAndBuyProcessor;
//
@Override // @Override
protected boolean consumerMessageHandler(String msg) { // protected boolean consumerMessageHandler(String msg) {
boolean aBoolean = false; // boolean aBoolean = false;
try { // try {
GalaxyNftPublishAndBuyReqDto textMessage = JsonUtils.fromJson(msg, GalaxyNftPublishAndBuyReqDto.class); // GalaxyNftPublishAndBuyReqDto textMessage = JsonUtils.fromJson(msg, GalaxyNftPublishAndBuyReqDto.class);
if (textMessage == null) { // if (textMessage == null) {
aBoolean = true; // aBoolean = true;
} else { // } else {
//执行计数 // //执行计数
ResponseDto<GalaxyNftPublishAndBuyRespDto> responseDto = jsonNftPublishAndBuyProcessor.nftPublishAndBuy(textMessage); // ResponseDto<GalaxyNftPublishAndBuyRespDto> responseDto = jsonNftPublishAndBuyProcessor.nftPublishAndBuy(textMessage);
if(responseDto.isSuccess()){ // if(responseDto.isSuccess()){
aBoolean = true; // aBoolean = true;
} // }
} // }
} catch (Exception e) { // } catch (Exception e) {
log.error("CONSUMER MSG EX_HANDLE ==> [{}]:{}", this.getRedisStreamKey(), msg, e); // log.error("CONSUMER MSG EX_HANDLE ==> [{}]:{}", this.getRedisStreamKey(), msg, e);
} finally { // } finally {
if (!aBoolean) { // if (!aBoolean) {
HashMap<String, String> map = CollectionUtil.mapStringString(); // HashMap<String, String> map = CollectionUtil.mapStringString();
map.put("message", msg); // map.put("message", msg);
stringRedisTemplate.opsForStream().add(StreamRecords.mapBacked(map).withStreamKey(this.getRedisStreamKey())); // stringRedisTemplate.opsForStream().add(StreamRecords.mapBacked(map).withStreamKey(this.getRedisStreamKey()));
} // }
} // }
return aBoolean; // return aBoolean;
} // }
//
@Override // @Override
protected String getRedisStreamKey() { // protected String getRedisStreamKey() {
return MQConst.GalaxyQueue.JSON_NFT_PUBLISH_AND_BUY.getKey(); // return MQConst.GalaxyQueue.JSON_NFT_PUBLISH_AND_BUY.getKey();
} // }
//
@Override // @Override
protected String getRedisStreamGroup() { // protected String getRedisStreamGroup() {
return MQConst.GalaxyQueue.JSON_NFT_PUBLISH_AND_BUY.getGroup(); // return MQConst.GalaxyQueue.JSON_NFT_PUBLISH_AND_BUY.getGroup();
} // }
} //}
package com.liquidnet.service.consumer.nft.receiver; //package com.liquidnet.service.consumer.nft.receiver;
//
import com.liquidnet.commons.lang.util.CollectionUtil; //import com.liquidnet.commons.lang.util.CollectionUtil;
import com.liquidnet.commons.lang.util.JsonUtils; //import com.liquidnet.commons.lang.util.JsonUtils;
import com.liquidnet.service.base.ResponseDto; //import com.liquidnet.service.base.ResponseDto;
import com.liquidnet.service.base.constant.MQConst; //import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.consumer.nft.service.processor.ConsumerGalaxyJsonNftUserRegisterProcessor; //import com.liquidnet.service.consumer.nft.service.processor.ConsumerGalaxyJsonNftUserRegisterProcessor;
import com.liquidnet.service.galaxy.dto.param.GalaxyUserRegisterReqDto; //import com.liquidnet.service.galaxy.dto.param.GalaxyUserRegisterReqDto;
import com.liquidnet.service.galaxy.dto.param.GalaxyUserRegisterRespDto; //import com.liquidnet.service.galaxy.dto.param.GalaxyUserRegisterRespDto;
import lombok.extern.slf4j.Slf4j; //import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; //import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.StreamRecords; //import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.stereotype.Component; //import org.springframework.stereotype.Component;
//
import java.util.HashMap; //import java.util.HashMap;
//
@Slf4j //@Slf4j
@Component //@Component
public class ConsumerGalaxyJsonNftUserRegisterReceiver extends AbstractBizRedisReceiver { //public class ConsumerGalaxyJsonNftUserRegisterReceiver extends AbstractBizRedisReceiver {
@Autowired // @Autowired
private ConsumerGalaxyJsonNftUserRegisterProcessor jsonNftUserRegisterProcessor; // private ConsumerGalaxyJsonNftUserRegisterProcessor jsonNftUserRegisterProcessor;
//
@Override // @Override
protected boolean consumerMessageHandler(String msg) { // protected boolean consumerMessageHandler(String msg) {
boolean aBoolean = false; // boolean aBoolean = false;
try { // try {
GalaxyUserRegisterReqDto textMessage = JsonUtils.fromJson(msg, GalaxyUserRegisterReqDto.class); // GalaxyUserRegisterReqDto textMessage = JsonUtils.fromJson(msg, GalaxyUserRegisterReqDto.class);
if (textMessage == null) { // if (textMessage == null) {
aBoolean = true; // aBoolean = true;
} else { // } else {
//执行计数 // //执行计数
ResponseDto<GalaxyUserRegisterRespDto> responseDto = jsonNftUserRegisterProcessor.userRegister(textMessage); // ResponseDto<GalaxyUserRegisterRespDto> responseDto = jsonNftUserRegisterProcessor.userRegister(textMessage);
// if(responseDto.isSuccess()){ //// if(responseDto.isSuccess()){
aBoolean = true; // aBoolean = true;
// } //// }
} // }
} catch (Exception e) { // } catch (Exception e) {
log.error("CONSUMER MSG EX_HANDLE ==> [{}]:{}", this.getRedisStreamKey(), msg, e); // log.error("CONSUMER MSG EX_HANDLE ==> [{}]:{}", this.getRedisStreamKey(), msg, e);
} finally { // } finally {
if (!aBoolean) { // if (!aBoolean) {
HashMap<String, String> map = CollectionUtil.mapStringString(); // HashMap<String, String> map = CollectionUtil.mapStringString();
map.put("message", msg); // map.put("message", msg);
stringRedisTemplate.opsForStream().add(StreamRecords.mapBacked(map).withStreamKey(this.getRedisStreamKey())); // stringRedisTemplate.opsForStream().add(StreamRecords.mapBacked(map).withStreamKey(this.getRedisStreamKey()));
} // }
} // }
return aBoolean; // return aBoolean;
} // }
//
@Override // @Override
protected String getRedisStreamKey() { // protected String getRedisStreamKey() {
return MQConst.GalaxyQueue.JSON_NFT_USER_REGISTER.getKey(); // return MQConst.GalaxyQueue.JSON_NFT_USER_REGISTER.getKey();
} // }
//
@Override // @Override
protected String getRedisStreamGroup() { // protected String getRedisStreamGroup() {
return MQConst.GalaxyQueue.JSON_NFT_USER_REGISTER.getGroup(); // return MQConst.GalaxyQueue.JSON_NFT_USER_REGISTER.getGroup();
} // }
} //}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment