记得上下班打卡 | git大法好,push需谨慎

Commit a9e6e471 authored by zhanggb's avatar zhanggb

~queue:consumer-base.+biz_integral;

parent db20d1fa
......@@ -37,6 +37,14 @@
<artifactId>liquidnet-service-galaxy-api</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.liquidnet</groupId>
<artifactId>liquidnet-service-goblin-api</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<!-- third party api -->
</dependencies>
<build>
......
......@@ -2,10 +2,7 @@ package com.liquidnet.service.consumer.base.config;
import com.liquidnet.common.cache.redis.config.RedisStreamConfig;
import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.consumer.base.receiver.ConsumerGalaxyJsonNftPublishAndBuyReceiver;
import com.liquidnet.service.consumer.base.receiver.ConsumerGalaxyJsonNftUserRegisterReceiver;
import com.liquidnet.service.consumer.base.receiver.ConsumerGoblinBizArtworkClqReceiver;
import com.liquidnet.service.consumer.base.receiver.ConsumerGoblinBizArtworkUplReceiver;
import com.liquidnet.service.consumer.base.receiver.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
......@@ -36,9 +33,11 @@ public class ConsumerCommonBizRedisStreamConfig extends RedisStreamConfig {
@Autowired
ConsumerGoblinBizArtworkClqReceiver consumerGoblinBizArtworkClqReceiver;
@Autowired
private ConsumerGalaxyJsonNftPublishAndBuyReceiver jsonNftPublishAndBuyReceiver;
ConsumerGalaxyJsonNftPublishAndBuyReceiver jsonNftPublishAndBuyReceiver;
@Autowired
private ConsumerGalaxyJsonNftUserRegisterReceiver jsonNftUserRegisterReceiver;
ConsumerGalaxyJsonNftUserRegisterReceiver jsonNftUserRegisterReceiver;
@Autowired
ConsumerGoblinBizIntegralReceiver consumerGoblinBizIntegralReceiver;
@Bean// 藏品上传声明
public List<Subscription> subscriptionGoblinBizArtworkUpl(RedisConnectionFactory factory) {
......@@ -115,4 +114,20 @@ public class ConsumerCommonBizRedisStreamConfig extends RedisStreamConfig {
}
return subscriptionList;
}
@Bean// 增减积分
public List<Subscription> subscriptionBizIntegral(RedisConnectionFactory factory) {
List<Subscription> subscriptionList = new ArrayList<>();
MQConst.GoblinQueue stream = MQConst.GoblinQueue.BIZ_INTEGRAL;
this.initStream(stringRedisTemplate, stream.getKey(), stream.getGroup());
for (int i = 0; i < 1; i++) {
StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = this.buildStreamMessageListenerContainer(factory);
subscriptionList.add(listenerContainer.receiveAutoAck(
Consumer.from(stream.getGroup(), getConsumerName(stream.name() + i)),
StreamOffset.create(stream.getKey(), ReadOffset.lastConsumed()), consumerGoblinBizIntegralReceiver
));
listenerContainer.start();
}
return subscriptionList;
}
}
package com.liquidnet.service.consumer.base.receiver;
import com.liquidnet.commons.lang.util.CollectionUtil;
import com.liquidnet.commons.lang.util.HttpUtil;
import com.liquidnet.commons.lang.util.JsonUtils;
import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.goblin.dto.GoblinQueueBizIntegralDto;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.stereotype.Component;
import org.springframework.util.MultiValueMap;
import java.util.HashMap;
@Slf4j
@Component
public class ConsumerGoblinBizIntegralReceiver extends AbstractBizRedisReceiver {
@Value("${liquidnet.service.stone.url}")
private String stoneUrl;
@Override
protected String getRedisStreamKey() {
return MQConst.GoblinQueue.BIZ_INTEGRAL.getKey();
}
@Override
protected String getRedisStreamGroup() {
return MQConst.GoblinQueue.BIZ_INTEGRAL.getGroup();
}
@Override
protected boolean consumerMessageHandler(String msg) {
boolean aBoolean = false;
try {
GoblinQueueBizIntegralDto fromJsonObj = JsonUtils.fromJson(msg, GoblinQueueBizIntegralDto.class);
if (fromJsonObj == null) {
log.warn("CONSUMER MSG NULL_DTO ==> [{}]:{}", this.getRedisStreamKey(), msg);
aBoolean = true;
} else {
aBoolean = this.bizIntegralProcessing(fromJsonObj);
}
} catch (Exception e) {
log.error("CONSUMER MSG EX_HANDLE ==> [{}]:{}", this.getRedisStreamKey(), msg, e);
} finally {
if (!aBoolean) {
HashMap<String, String> map = CollectionUtil.mapStringString();
map.put(MQConst.QUEUE_MESSAGE_KEY, msg);
stringRedisTemplate.opsForStream().add(StreamRecords.mapBacked(map).withStreamKey(this.getRedisStreamKey()));
}
}
return aBoolean;
}
private boolean bizIntegralProcessing(GoblinQueueBizIntegralDto dto) {
try {
log.info("bizIntegralProcessing params:{}", dto);
String uid = dto.getUid(), content = dto.getContent();
int type = dto.getType();
MultiValueMap<String, String> header = CollectionUtil.linkedMultiValueMapStringString();
header.add("Accept", "application/json;charset=UTF-8");
MultiValueMap<String, String> params = CollectionUtil.linkedMultiValueMapStringString();
params.add("score", dto.getScore().intValue() + "");
params.add("content", content);
params.add("uid", uid);
String resultData = "";
long ss = System.currentTimeMillis();
log.info("HttpUtil s:{}", ss);
if (type == 1) {
resultData = HttpUtil.post(stoneUrl + "/user/logs/in2111", params, header);
} else {
resultData = HttpUtil.post(stoneUrl + "/user/logs/de2111", params, header);
}
log.info("HttpUtil e:{}", System.currentTimeMillis() - ss);
log.info("bizIntegralProcessing result:{}", resultData);
return true;
} catch (Exception e) {
log.error("bizIntegralProcessing e:{}", e);
return false;
}
}
/* ------------------------------------------------------------------------------------ */
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment