记得上下班打卡 | git大法好,push需谨慎

Commit 75383d61 authored by wanglele's avatar wanglele

批量空投

parent a5c3a10b
...@@ -274,7 +274,8 @@ public class MQConst { ...@@ -274,7 +274,8 @@ public class MQConst {
GOBLIN_STORE_ORDER_OPERA("goblin:stream:order:store", "group.order:store", "商铺订单操作"), GOBLIN_STORE_ORDER_OPERA("goblin:stream:order:store", "group.order:store", "商铺订单操作"),
GOBLIN_USER_ORDER_OPERA("goblin:stream:order:user", "group.order:user", "用户订单操作"), GOBLIN_USER_ORDER_OPERA("goblin:stream:order:user", "group.order:user", "用户订单操作"),
GOBLIN_XLS_OPERA("goblin:stream:xls", "group.xls", "xls文件操作"), GOBLIN_XLS_OPERA("goblin:stream:xls", "group.xls", "xls文件操作"),
GOBLIN_PHONE_CODE_OPERA("goblin:stream:phone:code","goblin.phone.code","批量空投操作"), GOBLIN_PHONE_CODE_OPERA("goblin:stream:phone:code","group.phone:code","批量空投操作"),
GOBLIN_CODE_OPERA("goblin:stream:code","group.code","修改兑换码操作人"),
GOBLIN_UN_PAY_0("goblin:stream:order:back:0", "group.order:back", "回滚关闭订单库存队列"), GOBLIN_UN_PAY_0("goblin:stream:order:back:0", "group.order:back", "回滚关闭订单库存队列"),
GOBLIN_UN_PAY_1("goblin:stream:order:back:1", "group.order:back", "回滚关闭订单库存队列"), GOBLIN_UN_PAY_1("goblin:stream:order:back:1", "group.order:back", "回滚关闭订单库存队列"),
......
package com.liquidnet.service.consumer.kylin.config;
import com.liquidnet.common.cache.redis.config.RedisStreamConfig;
import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.consumer.kylin.receiver.ConsumerGoblinCodeReceiver;
import lombok.var;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;
@Configuration
public class ConsumerGoblinCodeStreamConfig extends RedisStreamConfig {
@Autowired
ConsumerGoblinCodeReceiver consumerGoblinCodeReceiver;
public Subscription receiveCodeExamine(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) {
return listenerContainer.receiveAutoAck(Consumer.from(MQConst.GoblinQueue.GOBLIN_CODE_OPERA.getGroup(), getConsumerName(MQConst.GoblinQueue.GOBLIN_CODE_OPERA.name() + t)),
StreamOffset.create(MQConst.GoblinQueue.GOBLIN_CODE_OPERA.getKey(), ReadOffset.lastConsumed()), consumerGoblinCodeReceiver);
}
@Bean
public Subscription codeExamine0(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveCodeExamine(listenerContainer, 0);
listenerContainer.start();
return subscription;
}
}
package com.liquidnet.service.consumer.kylin.config;
import com.liquidnet.common.cache.redis.config.RedisStreamConfig;
import com.liquidnet.service.base.constant.MQConst;
import lombok.var;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;
import com.liquidnet.service.consumer.kylin.receiver.ConsumerGoblinPhoneCodeReceiver;
@Configuration
public class ConsumerGoblinPhoneCodeStreamConfig extends RedisStreamConfig {
@Autowired
ConsumerGoblinPhoneCodeReceiver consumerGoblinPhoneCodeReceiver;
public Subscription receivePhoneCodeExamine(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) {
return listenerContainer.receiveAutoAck(Consumer.from(MQConst.GoblinQueue.GOBLIN_PHONE_CODE_OPERA.getGroup(), getConsumerName(MQConst.GoblinQueue.GOBLIN_PHONE_CODE_OPERA.name() + t)),
StreamOffset.create(MQConst.GoblinQueue.GOBLIN_PHONE_CODE_OPERA.getKey(), ReadOffset.lastConsumed()), consumerGoblinPhoneCodeReceiver);
}
@Bean
public Subscription phoneCodeExamine0(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receivePhoneCodeExamine(listenerContainer, 0);
listenerContainer.start();
return subscription;
}
/* @Bean
public Subscription phoneCodeExamine1(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receivePhoneCodeExamine(listenerContainer, 1);
listenerContainer.start();
return subscription;
}*/
}
package com.liquidnet.service.consumer.kylin.receiver;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.fasterxml.jackson.core.type.TypeReference;
import com.liquidnet.commons.lang.util.CollectionUtil;
import com.liquidnet.commons.lang.util.HttpUtil;
import com.liquidnet.commons.lang.util.JsonUtils;
import com.liquidnet.service.base.ResponseDto;
import com.liquidnet.service.base.SqlMapping;
import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.goblin.mapper.GoblinNftExCodeMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import java.util.*;
@Slf4j
public abstract class AbstractHttpRedisReceiver implements StreamListener<String, MapRecord<String, String, String>> {
@Autowired
StringRedisTemplate stringRedisTemplate;
@Value("${liquidnet.service.order.url}")
private String orderUrl;
@Override
public void onMessage(MapRecord<String, String, String> message) {
String redisStreamKey = this.getRedisStreamKey();
log.debug("CONSUMER MSG[streamKey:{},messageId:{},stream:{},body:{}]", redisStreamKey, message.getId(), message.getStream(), message.getValue());
boolean result = this.consumerMessageHandler(message.getValue());
log.info("CONSUMER MSG RESULT:{} ==> [{}]MESSAGE_ID:{}", result, redisStreamKey, message.getId());
try {
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
} catch (Exception e) {
log.error("#CONSUMER MSG EX_ACK ==> [{}]RESULT:{},MESSAGE:{}", redisStreamKey, result, message.getValue(), e);
}
try {
stringRedisTemplate.opsForStream().delete(redisStreamKey, message.getId());
} catch (Exception e) {
log.error("#CONSUMER MSG EX_DEL ==> [{}]RESULT:{},MESSAGE:{}", redisStreamKey, result, message.getValue(), e);
}
}
private boolean consumerMessageHandler(Map<String, String> message) {
try {
String codestr = message.get("codes");
String userIds = message.get("userIds");
String adminUid = message.get("adminUid");
List<String> codeList = Arrays.asList(codestr.split(","));
List<String> userIdList = Arrays.asList(userIds.split(","));
LinkedList<Object[]> addLink = CollectionUtil.linkedListObjectArr();
for (int i = 0; i < userIdList.size(); i++) {
MultiValueMap<String, String> params = new LinkedMultiValueMap();
params.add("code", codeList.get(i));
params.add("userId", userIdList.get(i));
MultiValueMap<String, String> headers = CollectionUtil.linkedMultiValueMapStringString();
headers.add("Accept", "application/json;charset=UTF-8");
String post = HttpUtil.post(orderUrl + "/order/goblin/nft/airdrop", params, headers);
ResponseDto<Boolean> rsp = JsonUtils.fromJson(post, new TypeReference<ResponseDto<Boolean>>() {
});
if (rsp.isSuccess()) {
addLink.add(new Object[]{adminUid,codeList.get(i)});
}
}
LinkedList<String> sqls = CollectionUtil.linkedListString();
sqls.add(SqlMapping.get("goblin_activity.code.admin"));
String sqlData = SqlMapping.gets(sqls, addLink);
sendMsgByRedis(MQConst.GoblinQueue.GOBLIN_CODE_OPERA.getKey(),sqlData);
} catch (Exception e) {
e.printStackTrace();
log.error("发送空投失败!");
}
return true;
}
public void sendMsgByRedis(String streamKey, String jsonMsg) {
HashMap<String, String> map = CollectionUtil.mapStringString();
map.put(MQConst.QUEUE_MESSAGE_KEY, jsonMsg);
stringRedisTemplate.opsForStream().add(StreamRecords.mapBacked(map).withStreamKey(streamKey));
}
protected abstract String getRedisStreamKey();
protected abstract String getRedisStreamGroup();
}
package com.liquidnet.service.consumer.kylin.receiver;
import com.liquidnet.service.base.constant.MQConst;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class ConsumerGoblinCodeReceiver extends AbstractSqlRedisReceiver{
@Override
protected String getRedisStreamKey() {
return MQConst.GoblinQueue.GOBLIN_CODE_OPERA.getKey();
}
@Override
protected String getRedisStreamGroup() {
return MQConst.GoblinQueue.GOBLIN_CODE_OPERA.getGroup();
}
}
...@@ -10,6 +10,7 @@ import com.liquidnet.service.goblin.mapper.GoblinNftExCodeMapper; ...@@ -10,6 +10,7 @@ import com.liquidnet.service.goblin.mapper.GoblinNftExCodeMapper;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap; import org.springframework.util.MultiValueMap;
...@@ -20,50 +21,7 @@ import java.util.Map; ...@@ -20,50 +21,7 @@ import java.util.Map;
@Slf4j @Slf4j
@Component @Component
public class ConsumerGoblinPhoneCodeReceiver extends AbstractBizRedisReceiver{ public class ConsumerGoblinPhoneCodeReceiver extends AbstractHttpRedisReceiver{
@Autowired
private GoblinNftExCodeMapper goblinNftExCodeMapper;
@Value("${liquidnet.service.order.url}")
private String orderUrl;
@Override
protected boolean consumerMessageHandler(String msg) {
Boolean bol = false;
try{
JSONObject jsonObject = JSON.parseObject(msg);
String codestr = jsonObject.getString("codes");
String userIds = jsonObject.getString("userIds");
String adminUid = jsonObject.getString("adminUid");
List<String> codeList = Arrays.asList(codestr.split(","));
List<String> userIdList = Arrays.asList(userIds.split(","));
StringBuffer codes = new StringBuffer();
for (int i = 0; i < userIdList.size(); i++) {
MultiValueMap<String, String> params = new LinkedMultiValueMap();
params.add("code", codeList.get(i));
params.add("userId", userIdList.get(i));
MultiValueMap<String, String> headers = CollectionUtil.linkedMultiValueMapStringString();
headers.add("Accept", "application/json;charset=UTF-8");
headers.add("Authorization", "Bearer " + CurrentUtil.getToken());
String post = HttpUtil.post(orderUrl + "/order/goblin/nft/airdrop", params, headers);
ResponseDto<Boolean> rsp = JsonUtils.fromJson(post, new TypeReference<ResponseDto<Boolean>>() {
});
if (rsp.isSuccess()) {
codes.append(codeList.get(i)).append(",");
}
}
if (StringUtil.isNotBlank(codes)){
codes.deleteCharAt(codes.length()-1);
}
goblinNftExCodeMapper.updateCodeAdminUids(codes.toString(),adminUid);
}catch (Exception e){
e.printStackTrace();
log.error("发送空投失败!");
}
return bol;
}
@Override @Override
protected String getRedisStreamKey() { protected String getRedisStreamKey() {
...@@ -74,4 +32,5 @@ public class ConsumerGoblinPhoneCodeReceiver extends AbstractBizRedisReceiver{ ...@@ -74,4 +32,5 @@ public class ConsumerGoblinPhoneCodeReceiver extends AbstractBizRedisReceiver{
protected String getRedisStreamGroup() { protected String getRedisStreamGroup() {
return MQConst.GoblinQueue.GOBLIN_PHONE_CODE_OPERA.getGroup(); return MQConst.GoblinQueue.GOBLIN_PHONE_CODE_OPERA.getGroup();
} }
} }
...@@ -3,4 +3,7 @@ goblin_order.close.order=UPDATE goblin_store_order SET status = ? ,updated_at = ...@@ -3,4 +3,7 @@ goblin_order.close.order=UPDATE goblin_store_order SET status = ? ,updated_at =
goblin_order.close.sku=UPDATE goblin_order_sku SET status = ? ,updated_at = ? WHERE order_sku_id = ? and (updated_at <= ? or created_at = ? or updated_at is null) goblin_order.close.sku=UPDATE goblin_order_sku SET status = ? ,updated_at = ? WHERE order_sku_id = ? and (updated_at <= ? or created_at = ? or updated_at is null)
goblin_user_coupon.updateState=UPDATE goblin_user_coupon SET state = ? , used_for = ? ,updated_at = ? where ucoupon_id = ? goblin_user_coupon.updateState=UPDATE goblin_user_coupon SET state = ? , used_for = ? ,updated_at = ? where ucoupon_id = ?
#-------- NFT ------- #-------- NFT -------
goblin_nft_order.update.close=UPDATE goblin_nft_order SET status = ?, updated_at = ?, cancel_time = ?, cancel_reason = ? WHERE order_id = ? and (updated_at <= ? or created_at = ? or updated_at is null) goblin_nft_order.update.close=UPDATE goblin_nft_order SET status = ?, updated_at = ?, cancel_time = ?, cancel_reason = ? WHERE order_id = ? and (updated_at <= ? or created_at = ? or updated_at is null)
\ No newline at end of file
#---- 兑换码操作
goblin_activity.code.admin=UPDATE goblin_nft_ex_code SET admin_uid = ? where code = ?
\ No newline at end of file
...@@ -416,18 +416,35 @@ public class GoblinNftExCodeServiceImpl implements IGoblinNftExCodeService { ...@@ -416,18 +416,35 @@ public class GoblinNftExCodeServiceImpl implements IGoblinNftExCodeService {
for (GoblinNftExCode goblinNftExCode : goblinNftExCodeList) { for (GoblinNftExCode goblinNftExCode : goblinNftExCodeList) {
codes.append(goblinNftExCode.getCode()).append(","); codes.append(goblinNftExCode.getCode()).append(",");
} }
JSONObject jsonObject = new JSONObject(); HashMap<String, String> hashMap = CollectionUtil.mapStringString();
codes.deleteCharAt(codes.length() - 1); codes.deleteCharAt(codes.length() - 1);
userIds.deleteCharAt(userIds.length() - 1); userIds.deleteCharAt(userIds.length() - 1);
jsonObject.put("codes", codes); hashMap.put("codes", codes.toString());
jsonObject.put("userIds", userIds); hashMap.put("userIds", userIds.toString());
jsonObject.put("adminUid", adminUid); hashMap.put("adminUid", adminUid);
queueUtils.sendMssPhoneCodeRedis(MQConst.GoblinQueue.GOBLIN_PHONE_CODE_OPERA.getKey(), jsonObject.toJSONString()); queueUtils.sendMssPhoneCodeRedis(hashMap);
userIdList.clear(); userIdList.clear();
i++; i++;
} }
} }
if (userIdList.size() > 0) {
PageHelper.startPage(i, pageSize, true);
List<GoblinNftExCode> goblinNftExCodeList = goblinNftExCodeMapper.selectCodeAvailableByActivityId(activityId);
StringBuffer codes = new StringBuffer();
for (GoblinNftExCode goblinNftExCode : goblinNftExCodeList) {
codes.append(goblinNftExCode.getCode()).append(",");
}
HashMap<String, String> hashMap = CollectionUtil.mapStringString();
codes.deleteCharAt(codes.length() - 1);
userIds.deleteCharAt(userIds.length() - 1);
hashMap.put("codes", codes.toString());
hashMap.put("userIds", userIds.toString());
hashMap.put("adminUid", adminUid);
queueUtils.sendMssPhoneCodeRedis(hashMap);
userIdList.clear();
}
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
......
...@@ -8,6 +8,7 @@ import org.springframework.data.redis.connection.stream.StreamRecords; ...@@ -8,6 +8,7 @@ import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map;
@Component @Component
public class QueueUtils { public class QueueUtils {
...@@ -70,10 +71,8 @@ public class QueueUtils { ...@@ -70,10 +71,8 @@ public class QueueUtils {
* *
* @param * @param
*/ */
public void sendMssPhoneCodeRedis(String streamKey, String jsonMsg) { public void sendMssPhoneCodeRedis(Map<String,String> map) {
HashMap<String, String> map = CollectionUtil.mapStringString(); redisDataSourceUtil.getRedisGoblinUtil().getStringRedisTemplate().opsForStream().add(StreamRecords.mapBacked(map).withStreamKey(MQConst.GoblinQueue.GOBLIN_PHONE_CODE_OPERA.getKey()));
map.put(MQConst.QUEUE_MESSAGE_KEY, jsonMsg);
redisDataSourceUtil.getRedisGoblinUtil().getStringRedisTemplate().opsForStream().add(StreamRecords.mapBacked(map).withStreamKey(streamKey));
} }
} }
...@@ -3,4 +3,4 @@ goblin_order.close.order=UPDATE goblin_store_order SET status = ? ,updated_at = ...@@ -3,4 +3,4 @@ goblin_order.close.order=UPDATE goblin_store_order SET status = ? ,updated_at =
goblin_order.close.sku=UPDATE goblin_order_sku SET status = ? ,updated_at = ? WHERE order_sku_id = ? and (updated_at <= ? or created_at = ? or updated_at is null) goblin_order.close.sku=UPDATE goblin_order_sku SET status = ? ,updated_at = ? WHERE order_sku_id = ? and (updated_at <= ? or created_at = ? or updated_at is null)
# ------------------------????---------------------------- # ------------------------????----------------------------
kylin_camera_record.insert=INSERT INTO kylin_camera_record (camera_record_id, camera_id, person_num) VALUES (?,?,?) kylin_camera_record.insert=INSERT INTO kylin_camera_record (camera_record_id, camera_id, person_num) VALUES (?,?,?)
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment