记得上下班打卡 | git大法好,push需谨慎

Commit 3164744c authored by Administrator's avatar Administrator 🎨

Merge branch 'pre' into 'master'

Pre

See merge request !183
parents 652e1507 d3f6a767
......@@ -73,20 +73,21 @@ public class GoblinRedisConst {
public static final String FRONT_TOP_BANNER = PREFIX.concat("front_top_banner"); //前端顶部banner
public static final String FRONT_MIDDLE_BANNER = PREFIX.concat("front_middle_banner"); //前端中部banner
public static final String FRONT_NAVIGATION = PREFIX.concat("front_navigation"); //前端banner
public static final String FRONT_HOTWORD = PREFIX.concat("front_hot_word"); //前端banner
public static final String FRONT_HOTWORD_INDEX = PREFIX.concat("front_hot_word_index"); //前端banner index
public static final String FRONT_FRONT_SECKILL = PREFIX.concat("front_front_seckill"); //前端 首页秒杀
public static final String FRONT_SECKILL = PREFIX.concat("front_seckill"); //前端 秒杀列表
public static final String SELECT_GOODS = PREFIX.concat("select_goods"); //配置的精选商品
public static final String SELECT_GOODS_PAGE1 = PREFIX.concat("select_goods_page1"); //精选商品 第一页
public static final String SELECT_GOODS_SORT = PREFIX.concat("select_goods_sort"); //精选商品 排序规则
public static final String SELECT_GOODS_SPUIDS = PREFIX.concat("select_goods_SPUIDS"); //精选商品 spuids
public static final String MOUDLE_INDEX = PREFIX.concat("moudle_index"); //moudle_index 组件排序
public static final String COMPLIATIONS = PREFIX.concat("goblinFrontCompilations"); // 合集
public static final String FRONT_GOBLINFRONTCUBE = PREFIX.concat("goblinFrontCube"); // 魔方
public static final String FRONT_GOBLIN_RECOMMEND= PREFIX.concat("goblinFrontRECOMMMEND"); // 新品推荐
public static final String FRONT_SHOPCART = PREFIX.concat("goblin_shopcart:"); // 购物车
public static final String FRONT_SHOPCART_TWO = PREFIX.concat("goblin_shopcartTwo:"); // 购物车第二部
public static final String FRONT_HOTWORD = PREFIX.concat("front_hot_word"); //前端banner
public static final String FRONT_HOTWORD_INDEX = PREFIX.concat("front_hot_word_index"); //前端banner index
public static final String FRONT_FRONT_SECKILL = PREFIX.concat("front_front_seckill"); //前端 首页秒杀
public static final String FRONT_SECKILL = PREFIX.concat("front_seckill"); //前端 秒杀列表
public static final String SELECT_GOODS = PREFIX.concat("select_goods"); //配置的精选商品
public static final String SELECT_GOODS_PAGE1 = PREFIX.concat("select_goods_page1"); //精选商品 第一页
public static final String SELECT_GOODS_SORT = PREFIX.concat("select_goods_sort"); //精选商品 排序规则
public static final String SELECT_GOODS_SPUIDS = PREFIX.concat("select_goods_SPUIDS"); //精选商品 spuids
public static final String SELECT_GOODS_SPUIDS_ISHAVE = PREFIX.concat("select_goods_SPUIDS_isHave"); //精选商品 spuids
public static final String MOUDLE_INDEX = PREFIX.concat("moudle_index"); //moudle_index 组件排序
public static final String COMPLIATIONS = PREFIX.concat("goblinFrontCompilations"); // 合集
public static final String FRONT_GOBLINFRONTCUBE = PREFIX.concat("goblinFrontCube"); // 魔方
public static final String FRONT_GOBLIN_RECOMMEND = PREFIX.concat("goblinFrontRECOMMMEND"); // 新品推荐
public static final String FRONT_SHOPCART = PREFIX.concat("goblin_shopcart:"); // 购物车
public static final String FRONT_SHOPCART_TWO = PREFIX.concat("goblin_shopcartTwo:"); // 购物车第二部
/* ----------------------------------------------------------------- */
......@@ -101,7 +102,7 @@ public class GoblinRedisConst {
public static final String REDIS_GOBLIN_BUY_COUNT = PREFIX.concat("uid:");//用户sku购买数量 key:uid:skuId:$skuId
public static final String REDIS_GOBLIN_SALE_COUNT = PREFIX.concat("sale:skuId:");//用户sku购买数量 key:sale:skuId:$skuId
public static final String REDIS_GOBLIN_SALE_SPU_COUNT = PREFIX.concat("sale:skuId:");//用户sku购买数量 key:sale:skuId:$spuId
public static final String REDIS_GOBLIN_SALE_SPU_COUNT = PREFIX.concat("sale:spuId:");//用户sku购买数量 key:sale:skuId:$spuId
public static final String REDIS_GOBLIN_ORDER = PREFIX.concat("order:");//用户sku购买数量 key:$orderId
public static final String REDIS_GOBLIN_ORDER_BACK = PREFIX.concat("order:back:");//用户sku购买数量 key:$backOrderId
public static final String REDIS_GOBLIN_ORDER_SKU = PREFIX.concat("orderSku:");//用户sku购买数量 key:$orderSkuId
......
......@@ -99,8 +99,9 @@ public class GoblinStatusConst {
BACK_REASON_TYPE_5(5, "不想买了"),
BACK_REASON_TYPE_6(6, "商品质量问题"),
BACK_REASON_TYPE_7(7, "收到商品与描述不符"),
BACK_REASON_TYPE_8(8, "商铺发起"),//todo
BACK_REASON_TYPE_8(8, "商铺发起"),
BACK_REASON_TYPE_9(9, "系统自动申请"),
BACK_REASON_TYPE_10(10, "超时支付自动退款"),
BACK_TYPE_1(1, "退款"),
BACK_TYPE_2(2, "退货"),
......
package com.liquidnet.service.goblin.dto.vo;
import com.liquidnet.service.goblin.entity.GoblinBackOrder;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;
......@@ -90,6 +91,45 @@ public class GoblinBackOrderVo implements Serializable, Cloneable {
this.expireAt = LocalDateTime.parse(createdAt, DTF_YMD_HMS).plusDays(7).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
}
public GoblinBackOrderVo copy(GoblinBackOrder source) {
this.setBackOrderId(source.getBackOrderId());
this.setBackCode(source.getBackCode());
this.setOrderId(source.getOrderId());
this.setOrderCode(source.getOrderCode());
this.setStoreId(source.getStoreId());
this.setUserId(source.getUserId());
this.setType(source.getType());
this.setReason(source.getReason());
this.setRefuseReason(source.getRefuseReason());
this.setErrorReason(source.getErrorReason());
this.setDescribes(source.getDescribes());
this.setCredential(source.getCredential());
this.setBackType(source.getBackType());
this.setRealBackPrice(source.getRealBackPrice());
this.setBackPriceExpress(source.getBackPriceExpress());
this.setPics(source.getPics());
this.setStatus(source.getStatus());
this.setPreDepositPay(source.getPreDepositPay());
this.setPayType(source.getBackOrderId());
this.setLogisCompanyName(source.getLogisCompanyName());
this.setMailNo(source.getMailNo());
if(source.getRefundAt()!=null) {
this.setRefundAt(source.getRefundAt().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
}
if(source.getRefuseAt()!=null) {
this.setRefuseAt(source.getRefuseAt().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
}
this.setRefuseSize(source.getRefusedSize());
if(source.getAuditAt()!=null) {
this.setAuditAt(source.getAuditAt().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
}
this.setCreatedAt(source.getCreatedAt().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
if(source.getExpireAt()!=null) {
this.setExpireAt(source.getExpireAt().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
}
return this;
}
private static final GoblinBackOrderVo obj = new GoblinBackOrderVo();
public static GoblinBackOrderVo getNew() {
......
......@@ -23,6 +23,8 @@ public class GoblinSelfZhengzaiSkuVo implements Serializable, Cloneable {
private Integer stock;
@ApiModelProperty(position = 2, value = "可配置库存")
private Integer skuStock;
@ApiModelProperty(position = 2, value = "剩余库存")
private Integer restStock;
@ApiModelProperty(position = 3, value = "原价")
private BigDecimal price;
@ApiModelProperty(position = 4, value = "秒杀价")
......
......@@ -25,9 +25,12 @@ public class GoblinStoreOrderListSkuVo implements Cloneable {
private Integer num;
@ApiModelProperty(value = "订单状态[0-待付款(用户刚下单)|2-代发货(用户付完款 等待商城发货)|3-代收货(商城已经发货 等待用户确认收货)|4-已完成(用户已经确认收货 订单结束)|5-取消订单(用户未付款前取消订单)|6-退款通过(用户已经付款但是商城还未发货,用户发出退款申请,商城同意退款)|7-退货通过(用户已经确认收货后用户发出退货申请,商城同意所有退货申请 ,一个订单可能有多个单品)|61-6的发起状态|71-7的发起状态]")
private int status;
@ApiModelProperty(value = " 价格")
@ApiModelProperty(value = "实付价格")
private BigDecimal skuPriceActual;
//todo hujiachen 缺券相关 活动相关
@ApiModelProperty(value = "原价")
private BigDecimal price;
@ApiModelProperty(value = "优惠价")
private BigDecimal priceVoucher;
private static final GoblinStoreOrderListSkuVo obj = new GoblinStoreOrderListSkuVo();
......
......@@ -71,8 +71,6 @@ public class AdamMemberOrderAdminServiceImpl extends ServiceImpl<AdamMemberOrder
private String applyUrl;
@Value("${liquidnet.service.platform.urls.memberRefundNotify}")
private String notifyUrl;
@Value("${liquidnet.service.kylin.url}")
private String serviceKylinUrl;
@Override
public List<MemberOrderDto> getMemberOrderList(MemberOrderListReq memberOrderListReq) {
......@@ -210,12 +208,6 @@ public class AdamMemberOrderAdminServiceImpl extends ServiceImpl<AdamMemberOrder
reason = existsRefund.getRefundReason();
}
try {
HttpUtil.post(serviceKylinUrl + "/inner/cache/member?uid=".concat(memberOrder.getUid()), CollectionUtil.linkedMultiValueMapStringString());
} catch (Exception e) {
log.error("会员管理:会员订单:退款:同步Kylin会员缓存异常[UID={}]", memberOrder.getUid());
}
LinkedMultiValueMap<String, String> linkedMultiValueMap = new LinkedMultiValueMap<>();
linkedMultiValueMap.add("code", memberOrder.getPayNo());
linkedMultiValueMap.add("notifyUrl", notifyUrl);
......
......@@ -182,6 +182,7 @@ public class GoblinFrontSelectGoodsServiceImpl extends ServiceImpl<GoblinFrontSe
redisGoblinUtil.set(GoblinRedisConst.SELECT_GOODS,list);
redisGoblinUtil.del(GoblinRedisConst.SELECT_GOODS_SPUIDS);
redisGoblinUtil.del(GoblinRedisConst.SELECT_GOODS_PAGE1);
redisGoblinUtil.del(GoblinRedisConst.SELECT_GOODS_SPUIDS_ISHAVE);
}
return true;
......
......@@ -6,7 +6,7 @@ liquidnet:
username: user
password: user123
eureka:
host: 172.17.192.6:7001
host: 172.17.207.177:7001
#instance:
# prefer-ip-address: true
#host: eureka-test-0.eureka-test-svc.zhengzai-test:7001/eureka-server/eureka,eureka-test-1.eureka-test-svc.zhengzai-test:7001/eureka-server/eureka,eureka-test-2.eureka-test-svc.zhengzai-test:7001/eureka-server/eureka
......
......@@ -31,9 +31,9 @@ public class RestTemplateConfig {
PoolingHttpClientConnectionManager poolConnManager =
new PoolingHttpClientConnectionManager(20, TimeUnit.SECONDS);
// 设置最大链接数
poolConnManager.setMaxTotal(2000 * getMaxCpuCore() + 1000);
poolConnManager.setMaxTotal(80 * getMaxCpuCore() * 5);
// 单路由的并发数
poolConnManager.setDefaultMaxPerRoute(1000 * getMaxCpuCore());
poolConnManager.setDefaultMaxPerRoute(80 * getMaxCpuCore());
HttpClientBuilder httpClientBuilder = HttpClients.custom();
httpClientBuilder.setConnectionManager(poolConnManager);
......
package com.liquidnet.common.cache.redis.config;
import lombok.extern.slf4j.Slf4j;
import lombok.var;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.core.StreamOperations;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.stereotype.Component;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.time.Duration;
import java.util.HashMap;
@Slf4j
public class RedisStreamConfig {
private String hostname;
{
try {
hostname = InetAddress.getLocalHost().getHostName() + "_";
} catch (UnknownHostException ignored) {
hostname = "";
}
}
public void initStream(StringRedisTemplate stringRedisTemplate, String key, String group) {
try {
if (stringRedisTemplate.hasKey(key)) {
log.info("redis stream exist[{},{}]", key, group);
} else {
log.info("redis stream init[{},{}]", key, group);
HashMap<String, String> map = new HashMap<>();
map.put("message", " ");
StreamOperations<String, Object, Object> stringObjectObjectStreamOperations = stringRedisTemplate.opsForStream();
RecordId recordId = stringObjectObjectStreamOperations.add(StreamRecords.mapBacked(map).withStreamKey(key));
stringObjectObjectStreamOperations.createGroup(key, group);
stringObjectObjectStreamOperations.delete(key, recordId.getValue());
}
} catch (Exception e) {
log.error("Ex:redis stream init [{},{}]", key, group, e);
}
}
public StreamMessageListenerContainer<String, MapRecord<String, String, String>> buildStreamMessageListenerContainer(RedisConnectionFactory factory) {
var options = StreamMessageListenerContainer
.StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofMillis(1))
.build();
return StreamMessageListenerContainer.create(factory, options);
}
public String getConsumerName(String consumerSeqname) {
return hostname.concat(consumerSeqname);
}
}
......@@ -115,6 +115,6 @@ logging:
com:
liquidnet: info
pattern:
console: '%d{yyyy-MM-dd HH:mm:ss.SSS} [ %-5level] %thread [%logger{56}:%line] - %msg%n'
file: '%d{yyyy-MM-dd HH:mm:ss.SSS} [ %-5level] %thread [%logger{56}:%line] - %msg%n'
console: '%d{yyyy-MM-dd HH:mm:ss.SSS} [ %-5level] %thread [%logger{36}:%line] - %msg%n'
file: '%d{yyyy-MM-dd HH:mm:ss.SSS} [ %-5level] %thread [%logger{36}:%line] - %msg%n'
rolling-file-name: ${liquidnet.logfile.path}/${liquidnet.logfile.name}-%d{yyyy-MM-dd}.%i.log
......@@ -18,8 +18,8 @@ logging:
name: ${liquidnet.logfile.path}/${liquidnet.logfile.name}.log
max-size: 200MB
pattern:
file: '%d{yyyy-MM-dd HH:mm:ss.SSS} [ %-5level] %thread [%logger{56}:%line] - %msg%n'
console: '%d{yyyy-MM-dd HH:mm:ss.SSS} [ %-5level] %thread [%logger{56}:%line] - %msg%n'
file: '%d{yyyy-MM-dd HH:mm:ss.SSS} [ %-5level] %thread [%logger{36}:%line] - %msg%n'
console: '%d{yyyy-MM-dd HH:mm:ss.SSS} [ %-5level] %thread [%logger{36}:%line] - %msg%n'
rolling-file-name: ${liquidnet.logfile.path}/${liquidnet.logfile.name}-%d{yyyy-MM-dd}.%i.log
level:
root: error
......
......@@ -18,8 +18,8 @@ logging:
name: ${liquidnet.logfile.path}/${liquidnet.logfile.name}.log
max-size: 200MB
pattern:
file: '%d{yyyy-MM-dd HH:mm:ss.SSS} [ %-5level] %thread [%logger{56}:%line] - %msg%n'
console: '%d{yyyy-MM-dd HH:mm:ss.SSS} [ %-5level] %thread [%logger{56}:%line] - %msg%n'
file: '%d{yyyy-MM-dd HH:mm:ss.SSS} [ %-5level] %thread [%logger{36}:%line] - %msg%n'
console: '%d{yyyy-MM-dd HH:mm:ss.SSS} [ %-5level] %thread [%logger{36}:%line] - %msg%n'
rolling-file-name: ${liquidnet.logfile.path}/${liquidnet.logfile.name}-%d{yyyy-MM-dd}.%i.log
level:
root: error
......
......@@ -18,8 +18,8 @@ logging:
name: ${liquidnet.logfile.path}/${liquidnet.logfile.name}.log
max-size: 200MB
pattern:
file: '%d{yyyy-MM-dd HH:mm:ss.SSS} [ %-5level] %thread [%logger{56}:%line] - %msg%n'
console: '%d{yyyy-MM-dd HH:mm:ss.SSS} [ %-5level] %thread [%logger{56}:%line] - %msg%n'
file: '%d{yyyy-MM-dd HH:mm:ss.SSS} [ %-5level] %thread [%logger{36}:%line] - %msg%n'
console: '%d{yyyy-MM-dd HH:mm:ss.SSS} [ %-5level] %thread [%logger{36}:%line] - %msg%n'
rolling-file-name: ${liquidnet.logfile.path}/${liquidnet.logfile.name}-%d{yyyy-MM-dd}.%i.log
level:
root: error
......
......@@ -18,8 +18,8 @@ logging:
name: ${liquidnet.logfile.path}/${liquidnet.logfile.name}.log
max-size: 200MB
pattern:
file: '%d{yyyy-MM-dd HH:mm:ss.SSS} [ %-5level] %thread [%logger{56}:%line] - %msg%n'
console: '%d{yyyy-MM-dd HH:mm:ss.SSS} [ %-5level] %thread [%logger{56}:%line] - %msg%n'
file: '%d{yyyy-MM-dd HH:mm:ss.SSS} [ %-5level] %thread [%logger{36}:%line] - %msg%n'
console: '%d{yyyy-MM-dd HH:mm:ss.SSS} [ %-5level] %thread [%logger{36}:%line] - %msg%n'
rolling-file-name: ${liquidnet.logfile.path}/${liquidnet.logfile.name}-%d{yyyy-MM-dd}.%i.log
level:
root: error
......
......@@ -18,8 +18,8 @@ logging:
name: ${liquidnet.logfile.path}/${liquidnet.logfile.name}.log
max-size: 200MB
pattern:
file: '%d{yyyy-MM-dd HH:mm:ss.SSS} [ %-5level] %thread [%logger{56}:%line] - %msg%n'
console: '%d{yyyy-MM-dd HH:mm:ss.SSS} [ %-5level] %thread [%logger{56}:%line] - %msg%n'
file: '%d{yyyy-MM-dd HH:mm:ss.SSS} [ %-5level] %thread [%logger{36}:%line] - %msg%n'
console: '%d{yyyy-MM-dd HH:mm:ss.SSS} [ %-5level] %thread [%logger{36}:%line] - %msg%n'
rolling-file-name: ${liquidnet.logfile.path}/${liquidnet.logfile.name}-%d{yyyy-MM-dd}.%i.log
level:
root: error
......
......@@ -18,8 +18,8 @@ logging:
name: ${liquidnet.logfile.path}/${liquidnet.logfile.name}.log
max-size: 200MB
pattern:
file: '%d{yyyy-MM-dd HH:mm:ss.SSS} [ %-5level] %thread [%logger{56}:%line] - %msg%n'
console: '%d{yyyy-MM-dd HH:mm:ss.SSS} [ %-5level] %thread [%logger{56}:%line] - %msg%n'
file: '%d{yyyy-MM-dd HH:mm:ss.SSS} [ %-5level] %thread [%logger{36}:%line] - %msg%n'
console: '%d{yyyy-MM-dd HH:mm:ss.SSS} [ %-5level] %thread [%logger{36}:%line] - %msg%n'
rolling-file-name: ${liquidnet.logfile.path}/${liquidnet.logfile.name}-%d{yyyy-MM-dd}.%i.log
level:
root: error
......
......@@ -18,8 +18,8 @@ logging:
name: ${liquidnet.logfile.path}/${liquidnet.logfile.name}.log
max-size: 200MB
pattern:
file: '%d{yyyy-MM-dd HH:mm:ss.SSS} [ %-5level] %thread [%logger{56}:%line] - %msg%n'
console: '%d{yyyy-MM-dd HH:mm:ss.SSS} [ %-5level] %thread [%logger{56}:%line] - %msg%n'
file: '%d{yyyy-MM-dd HH:mm:ss.SSS} [ %-5level] %thread [%logger{36}:%line] - %msg%n'
console: '%d{yyyy-MM-dd HH:mm:ss.SSS} [ %-5level] %thread [%logger{36}:%line] - %msg%n'
rolling-file-name: ${liquidnet.logfile.path}/${liquidnet.logfile.name}-%d{yyyy-MM-dd}.%i.log
level:
root: error
......
......@@ -10,8 +10,8 @@ logging:
name: ${liquidnet.logfile.path}/${liquidnet.logfile.name}.log
max-size: 200MB
pattern:
file: '%d{yyyy-MM-dd HH:mm:ss.SSS} [ %-5level] %thread [%logger{56}:%line] - %msg%n'
console: '%d{yyyy-MM-dd HH:mm:ss.SSS} [ %-5level] %thread [%logger{56}:%line] - %msg%n'
file: '%d{yyyy-MM-dd HH:mm:ss.SSS} [ %-5level] %thread [%logger{36}:%line] - %msg%n'
console: '%d{yyyy-MM-dd HH:mm:ss.SSS} [ %-5level] %thread [%logger{36}:%line] - %msg%n'
rolling-file-name: ${liquidnet.logfile.path}/${liquidnet.logfile.name}-%d{yyyy-MM-dd}.%i.log
level:
root: error
......
......@@ -10,6 +10,7 @@ liquidnet:
path: /data/logs
name: service-kylin
config: classpath:logback-spring.xml
file-max-size: 200MB
level: debug
mysql:
database-name: dev_ln_scene
......
......@@ -10,6 +10,7 @@ liquidnet:
path: /data/logs
name: service-kylin
config: classpath:logback-spring.xml
file-max-size: 200MB
level: info
mysql:
database-name: test_ln_scene
......
......@@ -11,7 +11,7 @@ liquidnet:
name: service-order
config: classpath:logback-spring.xml
file-max-size: 200MB
level: info
level: debug
mysql:
database-name: test_ln_scene
mongodb:
......
......@@ -22,8 +22,8 @@ logging:
name: ${liquidnet.logfile.path}/${liquidnet.logfile.name}.log
max-size: 200MB
pattern:
file: '%d{yyyy-MM-dd HH:mm:ss.SSS} [ %-5level] %thread [%logger{56}:%line] - %msg%n'
console: '%d{yyyy-MM-dd HH:mm:ss.SSS} [ %-5level] %thread [%logger{56}:%line] - %msg%n'
file: '%d{yyyy-MM-dd HH:mm:ss.SSS} [ %-5level] %thread [%logger{36}:%line] - %msg%n'
console: '%d{yyyy-MM-dd HH:mm:ss.SSS} [ %-5level] %thread [%logger{36}:%line] - %msg%n'
rolling-file-name: ${liquidnet.logfile.path}/${liquidnet.logfile.name}-%d{yyyy-MM-dd}.%i.log
level:
root: info
......
......@@ -379,11 +379,11 @@ public class AdamLoginController {
@ApiOperation(value = "注销")
@PostMapping(value = {"close"})
public ResponseDto<Object> close() {
log.info("###close_uid:{}", CurrentUtil.getCurrentUid());
this.logout();
adamUserService.close(CurrentUtil.getCurrentUid());
// log.info("###close_uid:{}", CurrentUtil.getCurrentUid());
//
// this.logout();
//
// adamUserService.close(CurrentUtil.getCurrentUid());
return ResponseDto.success();
}
......
......@@ -389,9 +389,9 @@ public class AdamMemberOrderServiceImpl implements IAdamMemberOrderService {
);
try {
HttpUtil.post(serviceKylinUrl + "/inner/cache/member?uid=".concat(handleMemberOrderVo.getUid()), CollectionUtil.linkedMultiValueMapStringString());
HttpUtil.get(serviceKylinUrl + "/inner/cache/member/".concat(handleMemberOrderVo.getUid()), CollectionUtil.linkedMultiValueMapStringString());
} catch (Exception e) {
log.error("购买会员支付回调处理成功:同步Kylin会员缓存异常[UID={}]", handleMemberOrderVo.getUid());
log.error("购买会员支付回调处理成功:同步Kylin会员缓存异常[UID={},URL={}]", handleMemberOrderVo.getUid(), serviceKylinUrl, e);
}
return ResponseDto.success();
} catch (Exception e) {
......
......@@ -6,7 +6,7 @@ liquidnet:
username: user
password: user123
eureka:
host: 172.17.192.6:7001
host: 172.17.207.177:7001
#instance:
# prefer-ip-address: true
#host: eureka-test-0.eureka-test-svc.zhengzai-test:7001/eureka-server/eureka,eureka-test-1.eureka-test-svc.zhengzai-test:7001/eureka-server/eureka,eureka-test-2.eureka-test-svc.zhengzai-test:7001/eureka-server/eureka
......
......@@ -6,7 +6,7 @@ liquidnet:
username: user
password: user123
eureka:
host: 172.17.192.6:7001
host: 172.17.207.177:7001
#instance:
# prefer-ip-address: true
#host: eureka-test-0.eureka-test-svc.zhengzai-test:7001/eureka-server/eureka,eureka-test-1.eureka-test-svc.zhengzai-test:7001/eureka-server/eureka,eureka-test-2.eureka-test-svc.zhengzai-test:7001/eureka-server/eureka
......
......@@ -6,7 +6,7 @@ liquidnet:
username: user
password: user123
eureka:
host: 172.17.192.6:7001
host: 172.17.207.177:7001
#instance:
# prefer-ip-address: true
#host: eureka-test-0.eureka-test-svc.zhengzai-test:7001/eureka-server/eureka,eureka-test-1.eureka-test-svc.zhengzai-test:7001/eureka-server/eureka,eureka-test-2.eureka-test-svc.zhengzai-test:7001/eureka-server/eureka
......
package com.liquidnet.service.consumer.adam.config;
import com.liquidnet.common.cache.redis.config.RedisStreamConfig;
import com.liquidnet.service.consumer.adam.receiver.ConsumerAdamSmsNoticeRdsReceiver;
import lombok.var;
import org.springframework.beans.factory.annotation.Autowired;
......@@ -13,24 +14,13 @@ import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;
import java.time.Duration;
import static com.liquidnet.service.base.constant.MQConst.*;
import static com.liquidnet.service.base.constant.MQConst.AdamQueue;
@Configuration
public class ConsumerAdamSmsSenderRedisStreamConfig {
public class ConsumerAdamSmsSenderRedisStreamConfig extends RedisStreamConfig {
@Autowired
ConsumerAdamSmsNoticeRdsReceiver consumerAdamSmsNoticeRdsReceiver;
private StreamMessageListenerContainer<String, MapRecord<String, String, String>> buildStreamMessageListenerContainer(RedisConnectionFactory factory) {
var options = StreamMessageListenerContainer
.StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofMillis(1))
.build();
return StreamMessageListenerContainer.create(factory, options);
}
/**
* 短信通知
*
......@@ -39,16 +29,22 @@ public class ConsumerAdamSmsSenderRedisStreamConfig {
* @return
*/
private Subscription receiveSqlURegister(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) {
return listenerContainer.receiveAutoAck(
Consumer.from(AdamQueue.SMS_NOTICE.getGroup(), AdamQueue.SMS_NOTICE.name() + t),
StreamOffset.create(AdamQueue.SMS_NOTICE.getKey(), ReadOffset.lastConsumed()), consumerAdamSmsNoticeRdsReceiver
);
return listenerContainer.receiveAutoAck(Consumer.from(AdamQueue.SMS_NOTICE.getGroup(), getConsumerName(AdamQueue.SMS_NOTICE.name() + t)),
StreamOffset.create(AdamQueue.SMS_NOTICE.getKey(), ReadOffset.lastConsumed()), consumerAdamSmsNoticeRdsReceiver);
}
/* —————————————————————————— | —————————————————————————— | —————————————————————————— */
/* -------------------------------------------------------- | 短信通知 */
@Bean
public Subscription subscriptionSmsNotice0(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveSqlURegister(listenerContainer, 0);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscriptionSmsNotice1(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
......@@ -121,13 +117,5 @@ public class ConsumerAdamSmsSenderRedisStreamConfig {
return subscription;
}
@Bean
public Subscription subscriptionSmsNotice10(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveSqlURegister(listenerContainer, 10);
listenerContainer.start();
return subscription;
}
/* -------------------------------------------------------- | */
}
package com.liquidnet.service.consumer.adam.config;
import com.liquidnet.common.cache.redis.config.RedisStreamConfig;
import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.consumer.adam.receiver.ConsumerChimeUserOperationDisLikeRdsReceiver;
import com.liquidnet.service.consumer.adam.receiver.ConsumerChimeUserOperationLikeRdsReceiver;
......@@ -15,24 +16,13 @@ import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;
import java.time.Duration;
@Configuration
public class ConsumerChimeRedisStreamConfig {
public class ConsumerChimeRedisStreamConfig extends RedisStreamConfig {
@Autowired
ConsumerChimeUserOperationDisLikeRdsReceiver consumerChimeUserOperationDisLikeRdsReceiver;
@Autowired
ConsumerChimeUserOperationLikeRdsReceiver consumerChimeUserOperationLikeRdsReceiver;
private StreamMessageListenerContainer<String, MapRecord<String, String, String>> buildStreamMessageListenerContainer(RedisConnectionFactory factory) {
var options = StreamMessageListenerContainer
.StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofMillis(1))
.build();
return StreamMessageListenerContainer.create(factory, options);
}
/**
* 划卡-喜欢
*
......@@ -41,10 +31,8 @@ public class ConsumerChimeRedisStreamConfig {
* @return
*/
private Subscription receiveUserOperationLike(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) {
return listenerContainer.receiveAutoAck(
Consumer.from(MQConst.ChimeQueue.USER_OPERATION_LIKE.getGroup(), MQConst.ChimeQueue.USER_OPERATION_LIKE.name() + t),
StreamOffset.create(MQConst.ChimeQueue.USER_OPERATION_LIKE.getKey(), ReadOffset.lastConsumed()), consumerChimeUserOperationLikeRdsReceiver
);
return listenerContainer.receiveAutoAck(Consumer.from(MQConst.ChimeQueue.USER_OPERATION_LIKE.getGroup(), getConsumerName(MQConst.ChimeQueue.USER_OPERATION_LIKE.name() + t)),
StreamOffset.create(MQConst.ChimeQueue.USER_OPERATION_LIKE.getKey(), ReadOffset.lastConsumed()), consumerChimeUserOperationLikeRdsReceiver);
}
/**
......@@ -55,15 +43,21 @@ public class ConsumerChimeRedisStreamConfig {
* @return
*/
private Subscription receiveUserOperationDisLike(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) {
return listenerContainer.receiveAutoAck(
Consumer.from(MQConst.ChimeQueue.USER_OPERATION_DISLIKE.getGroup(), MQConst.ChimeQueue.USER_OPERATION_DISLIKE.name() + t),
StreamOffset.create(MQConst.ChimeQueue.USER_OPERATION_DISLIKE.getKey(), ReadOffset.lastConsumed()), consumerChimeUserOperationDisLikeRdsReceiver
);
return listenerContainer.receiveAutoAck(Consumer.from(MQConst.ChimeQueue.USER_OPERATION_DISLIKE.getGroup(), getConsumerName(MQConst.ChimeQueue.USER_OPERATION_DISLIKE.name() + t)),
StreamOffset.create(MQConst.ChimeQueue.USER_OPERATION_DISLIKE.getKey(), ReadOffset.lastConsumed()), consumerChimeUserOperationDisLikeRdsReceiver);
}
/* —————————————————————————— | —————————————————————————— | —————————————————————————— */
/* -------------------------------------------------------- | 用户注册 */
/* -------------------------------------------------------- | 划卡-喜欢 */
@Bean
public Subscription subscriptionUserOperationLike0(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveUserOperationLike(listenerContainer, 0);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscriptionUserOperationLike1(RedisConnectionFactory factory) {
......@@ -137,16 +131,16 @@ public class ConsumerChimeRedisStreamConfig {
return subscription;
}
/* -------------------------------------------------------- | 划卡-不喜欢 */
@Bean
public Subscription subscriptionUserOperationLike10(RedisConnectionFactory factory) {
public Subscription subscriptionUserOperationDisLike0(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveUserOperationLike(listenerContainer, 10);
var subscription = receiveUserOperationDisLike(listenerContainer, 0);
listenerContainer.start();
return subscription;
}
/* -------------------------------------------------------- | 用户中心 */
@Bean
public Subscription subscriptionUserOperationDisLike1(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
......@@ -219,12 +213,6 @@ public class ConsumerChimeRedisStreamConfig {
return subscription;
}
@Bean
public Subscription subscriptionUserOperationDisLike10(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveUserOperationDisLike(listenerContainer, 10);
listenerContainer.start();
return subscription;
}
/* -------------------------------------------------------- | */
}
......@@ -23,27 +23,25 @@ public abstract class AbstractChimeRedisReceiver implements StreamListener<Strin
@Override
public void onMessage(MapRecord<String, String, String> message) {
log.debug("CONSUMER SQL[streamKey:{},messageId:{},stream:{},body:{}]",
this.getRedisStreamKey(), message.getId(), message.getStream(), message.getValue());
String redisStreamKey = this.getRedisStreamKey();
log.debug("CONSUMER MSG[streamKey:{},messageId:{},stream:{},body:{}]", redisStreamKey, message.getId(), message.getStream(), message.getValue());
boolean result = this.consumerMessageHandler(message.getValue().get("message"));
log.info("CONSUMER SQL RESULT:{} ==> MESSAGE_ID:{}", result, message.getId());
log.info("CONSUMER MSG RESULT:{} ==> [{}]MESSAGE_ID:{}", result, redisStreamKey, message.getId());
try {
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId());
} catch (Exception e) {
log.error("#CONSUMER SQL RESULT:{} ==> DEL_REDIS_QUEUE_MSG_EXCEPTION[MESSAGE_ID:{},MSG:{}]", result, message.getId(), JsonUtils.toJson(message), e);
} finally {
try {
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
} catch (Exception ignored) {
}
log.error("#CONSUMER MSG EX_ACK ==> [{}]RESULT:{},MESSAGE:{}", redisStreamKey, result, message.getValue(), e);
}
try {
stringRedisTemplate.opsForStream().delete(redisStreamKey, message.getId());
} catch (Exception e) {
log.error("#CONSUMER MSG EX_DEL ==> [{}]RESULT:{},MESSAGE:{}", redisStreamKey, result, message.getValue(), e);
}
}
private boolean consumerMessageHandler(String msg) {
Boolean aBoolean = false;
boolean aBoolean = false;
try {
ChimeUserOperLogVo textMessage = JsonUtils.fromJson(msg, ChimeUserOperLogVo.class);
if (textMessage == null) {
......@@ -54,10 +52,9 @@ public abstract class AbstractChimeRedisReceiver implements StreamListener<Strin
//创建操作日志
chimeDataUtils.createUserOperLog(textMessage);
aBoolean = true;
log.info("consumerMessageHandler.msg===> ",msg);
}
} catch (Exception e) {
log.error("CONSUMER SQL FAIL ==> {}", e.getMessage(), e);
log.error("CONSUMER MSG EX_HANDLE ==> [{}]:{}", this.getRedisStreamKey(), msg, e);
} finally {
if (!aBoolean) {
HashMap<String, String> map = CollectionUtil.mapStringString();
......
......@@ -25,26 +25,24 @@ public abstract class AbstractSmsRedisReceiver implements StreamListener<String,
@Override
public void onMessage(MapRecord<String, String, String> message) {
log.debug("CONSUMER SMS[streamKey:{},messageId:{},stream:{},body:{}]",
this.getRedisStreamKey(), message.getId(), message.getStream(), message.getValue());
boolean result = this.consumerSmsSendHandler(message.getValue().get("message"));
log.info("CONSUMER SMS RESULT:{} ==> MESSAGE_ID:{}", result, message.getId());
String redisStreamKey = this.getRedisStreamKey();
log.debug("CONSUMER MSG[streamKey:{},messageId:{},stream:{},body:{}]", redisStreamKey, message.getId(), message.getStream(), message.getValue());
boolean result = this.consumerMessageHandler(message.getValue().get("message"));
log.info("CONSUMER MSG RESULT:{} ==> [{}]MESSAGE_ID:{}", result, redisStreamKey, message.getId());
try {
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId());
} catch (Exception e) {
log.error("#CONSUMER SMS RESULT:{} ==> DEL_REDIS_QUEUE_MSG_EXCEPTION[MESSAGE_ID:{},MSG:{}]", result, message.getId(), JsonUtils.toJson(message), e);
} finally {
try {
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
} catch (Exception ignored) {
}
log.error("#CONSUMER SMS EX_ACK ==> [{}]RESULT:{},MESSAGE:{}", redisStreamKey, result, message.getValue(), e);
}
try {
stringRedisTemplate.opsForStream().delete(redisStreamKey, message.getId());
} catch (Exception e) {
log.error("#CONSUMER SMS EX_DEL ==> [{}]RESULT:{},MESSAGE:{}", redisStreamKey, result, message.getValue(), e);
}
}
private boolean consumerSmsSendHandler(String msg) {
private boolean consumerMessageHandler(String msg) {
boolean aBoolean = false;
try {
SmsMessage smsMessage = JsonUtils.fromJson(msg, SmsMessage.class);
......@@ -52,7 +50,7 @@ public abstract class AbstractSmsRedisReceiver implements StreamListener<String,
ObjectNode templateParam = smsMessage.getTemplateParam();
aBoolean = smsProcessor.send(smsMessage.getPhone(), smsMessage.getSignName(), smsMessage.getTemplateCode(), null == templateParam ? "" : templateParam.toString());
} catch (Exception e) {
log.error("CONSUMER SMS FAIL ==> {}", e.getMessage(), e);
log.error("CONSUMER MSG EX_HANDLE ==> [{}]:{}", this.getRedisStreamKey(), msg, e);
} finally {
if (!aBoolean) {
HashMap<String, String> map = CollectionUtil.mapStringString();
......
......@@ -3,7 +3,6 @@ package com.liquidnet.service.consumer.adam.receiver;
import com.liquidnet.commons.lang.util.CollectionUtil;
import com.liquidnet.commons.lang.util.JsonUtils;
import com.liquidnet.service.base.SqlMapping;
import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.consumer.adam.service.IBaseDao;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
......@@ -23,36 +22,30 @@ public abstract class AbstractSqlRedisReceiver implements StreamListener<String,
@Override
public void onMessage(MapRecord<String, String, String> message) {
log.debug("CONSUMER SQL[streamKey:{},messageId:{},stream:{},body:{}]",
this.getRedisStreamKey(), message.getId(), message.getStream(), message.getValue());
boolean result = this.consumerSqlDaoHandler(message.getValue().get("message"));
log.info("CONSUMER SQL RESULT:{} ==> MESSAGE_ID:{}", result, message.getId());
String redisStreamKey = this.getRedisStreamKey();
log.debug("CONSUMER MSG[streamKey:{},messageId:{},stream:{},body:{}]",redisStreamKey, message.getId(), message.getStream(), message.getValue());
boolean result = this.consumerMessageHandler(message.getValue().get("message"));
log.info("CONSUMER MSG RESULT:{} ==> [{}]MESSAGE_ID:{}", result, redisStreamKey, message.getId());
try {
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId());
} catch (Exception e) {
log.error("#CONSUMER SQL RESULT:{} ==> DEL_REDIS_QUEUE_MSG_EXCEPTION[MESSAGE_ID:{},MSG:{}]", result, message.getId(), JsonUtils.toJson(message), e);
} finally {
try {
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
} catch (Exception ignored) {
}
log.error("#CONSUMER MSG EX_ACK ==> [{}]RESULT:{},MESSAGE:{}", redisStreamKey, result, message.getValue(), e);
}
try {
stringRedisTemplate.opsForStream().delete(redisStreamKey, message.getId());
} catch (Exception e) {
log.error("#CONSUMER MSG EX_DEL ==> [{}]RESULT:{},MESSAGE:{}", redisStreamKey, result, message.getValue(), e);
}
}
private boolean consumerSqlDaoHandler(String msg) {
Boolean aBoolean = false;
private boolean consumerMessageHandler(String msg) {
boolean aBoolean = false;
try {
SqlMapping.SqlMessage sqlMessage = JsonUtils.fromJson(msg, SqlMapping.SqlMessage.class);
if (sqlMessage == null) {
aBoolean = true;
} else {
aBoolean = baseDao.batchSqls(sqlMessage.getSqls(), sqlMessage.getArgs());
}
aBoolean = null == sqlMessage || baseDao.batchSqls(sqlMessage.getSqls(), sqlMessage.getArgs());
} catch (Exception e) {
log.error("CONSUMER SQL FAIL ==> {}", e.getMessage(), e);
log.error("CONSUMER MSG EX_HANDLE ==> [{}]:{}", this.getRedisStreamKey(), msg, e);
} finally {
if (!aBoolean) {
HashMap<String, String> map = CollectionUtil.mapStringString();
......
......@@ -6,7 +6,7 @@ liquidnet:
username: user
password: user123
eureka:
host: 172.17.192.6:7001
host: 172.17.207.177:7001
#instance:
# prefer-ip-address: true
#host: eureka-test-0.eureka-test-svc.zhengzai-test:7001/eureka-server/eureka,eureka-test-1.eureka-test-svc.zhengzai-test:7001/eureka-server/eureka,eureka-test-2.eureka-test-svc.zhengzai-test:7001/eureka-server/eureka
......
package com.liquidnet.service.consumer.candy.config;
import com.liquidnet.common.cache.redis.config.RedisStreamConfig;
import com.liquidnet.service.consumer.candy.receiver.ConsumerCandyCouponBackRdsReceiver;
import lombok.var;
import org.springframework.beans.factory.annotation.Autowired;
......@@ -13,25 +14,13 @@ import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;
import java.time.Duration;
import static com.liquidnet.service.base.constant.MQConst.CandyQueue.COUPON_BACK;
@Configuration
public class ConsumerCandyCouponBackRedisStreamConfig {
public class ConsumerCandyCouponBackRedisStreamConfig extends RedisStreamConfig {
@Autowired
ConsumerCandyCouponBackRdsReceiver consumerCandyCouponBackRdsReceiver;
private StreamMessageListenerContainer<String, MapRecord<String, String, String>> buildStreamMessageListenerContainer(RedisConnectionFactory factory) {
var options = StreamMessageListenerContainer
.StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofMillis(1))
.build();
return StreamMessageListenerContainer.create(factory, options);
}
/**
* 缺票登记
*
......@@ -40,10 +29,8 @@ public class ConsumerCandyCouponBackRedisStreamConfig {
* @return
*/
private Subscription receiveSqlCandyCouponBack(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) {
return listenerContainer.receiveAutoAck(
Consumer.from(COUPON_BACK.getGroup(), COUPON_BACK.name() + t),
StreamOffset.create(COUPON_BACK.getKey(), ReadOffset.lastConsumed()), consumerCandyCouponBackRdsReceiver
);
return listenerContainer.receiveAutoAck(Consumer.from(COUPON_BACK.getGroup(), getConsumerName(COUPON_BACK.name() + t)),
StreamOffset.create(COUPON_BACK.getKey(), ReadOffset.lastConsumed()), consumerCandyCouponBackRdsReceiver);
}
/* —————————————————————————— | —————————————————————————— | —————————————————————————— */
......@@ -51,15 +38,15 @@ public class ConsumerCandyCouponBackRedisStreamConfig {
/* -------------------------------------------------------- | 缺票登记 */
@Bean
public Subscription subscriptionSqlCandyCouponBack(RedisConnectionFactory factory) {
public Subscription subscriptionSqlCandyCouponBack0(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveSqlCandyCouponBack(listenerContainer, 1);
var subscription = receiveSqlCandyCouponBack(listenerContainer, 0);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscriptionSqlCandyCouponBack2(RedisConnectionFactory factory) {
public Subscription subscriptionSqlCandyCouponBack1(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveSqlCandyCouponBack(listenerContainer, 1);
listenerContainer.start();
......@@ -67,9 +54,9 @@ public class ConsumerCandyCouponBackRedisStreamConfig {
}
@Bean
public Subscription subscriptionSqlCandyCouponBack3(RedisConnectionFactory factory) {
public Subscription subscriptionSqlCandyCouponBack2(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveSqlCandyCouponBack(listenerContainer, 1);
var subscription = receiveSqlCandyCouponBack(listenerContainer, 2);
listenerContainer.start();
return subscription;
}
......
package com.liquidnet.service.consumer.candy.config;
import com.liquidnet.common.cache.redis.config.RedisStreamConfig;
import com.liquidnet.service.consumer.candy.receiver.ConsumerCandyCouponOrderBackRdsReceiver;
import lombok.var;
import org.springframework.beans.factory.annotation.Autowired;
......@@ -13,25 +14,13 @@ import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;
import java.time.Duration;
import static com.liquidnet.service.base.constant.MQConst.CandyQueue.COUPON_ORDER_BACK;
@Configuration
public class ConsumerCandyCouponOrderBackRedisStreamConfig {
public class ConsumerCandyCouponOrderBackRedisStreamConfig extends RedisStreamConfig {
@Autowired
ConsumerCandyCouponOrderBackRdsReceiver consumerCandyCouponOrderBackRdsReceiver;
private StreamMessageListenerContainer<String, MapRecord<String, String, String>> buildStreamMessageListenerContainer(RedisConnectionFactory factory) {
var options = StreamMessageListenerContainer
.StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofMillis(1))
.build();
return StreamMessageListenerContainer.create(factory, options);
}
/**
* 缺票登记
*
......@@ -40,10 +29,8 @@ public class ConsumerCandyCouponOrderBackRedisStreamConfig {
* @return
*/
private Subscription receiveSqlCandyCouponBack(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) {
return listenerContainer.receiveAutoAck(
Consumer.from(COUPON_ORDER_BACK.getGroup(), COUPON_ORDER_BACK.name() + t),
StreamOffset.create(COUPON_ORDER_BACK.getKey(), ReadOffset.lastConsumed()), consumerCandyCouponOrderBackRdsReceiver
);
return listenerContainer.receiveAutoAck(Consumer.from(COUPON_ORDER_BACK.getGroup(), getConsumerName(COUPON_ORDER_BACK.name() + t)),
StreamOffset.create(COUPON_ORDER_BACK.getKey(), ReadOffset.lastConsumed()), consumerCandyCouponOrderBackRdsReceiver);
}
/* —————————————————————————— | —————————————————————————— | —————————————————————————— */
......@@ -51,15 +38,15 @@ public class ConsumerCandyCouponOrderBackRedisStreamConfig {
/* -------------------------------------------------------- | 缺票登记 */
@Bean
public Subscription subscriptionSqlCandyCouponOrderBack(RedisConnectionFactory factory) {
public Subscription subscriptionSqlCandyCouponOrderBack0(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveSqlCandyCouponBack(listenerContainer, 1);
var subscription = receiveSqlCandyCouponBack(listenerContainer, 0);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscriptionSqlCandyCouponOrderBack2(RedisConnectionFactory factory) {
public Subscription subscriptionSqlCandyCouponOrderBack1(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveSqlCandyCouponBack(listenerContainer, 1);
listenerContainer.start();
......@@ -67,9 +54,9 @@ public class ConsumerCandyCouponOrderBackRedisStreamConfig {
}
@Bean
public Subscription subscriptionSqlCandyCouponOrderBack3(RedisConnectionFactory factory) {
public Subscription subscriptionSqlCandyCouponOrderBack2(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveSqlCandyCouponBack(listenerContainer, 1);
var subscription = receiveSqlCandyCouponBack(listenerContainer, 2);
listenerContainer.start();
return subscription;
}
......
package com.liquidnet.service.consumer.candy.config;
import com.liquidnet.common.cache.redis.config.RedisStreamConfig;
import com.liquidnet.service.consumer.candy.receiver.ConsumerCandyCouponReceiveRdsReceiver;
import lombok.var;
import org.springframework.beans.factory.annotation.Autowired;
......@@ -13,24 +14,13 @@ import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;
import java.time.Duration;
import static com.liquidnet.service.base.constant.MQConst.CandyQueue.COUPON_RECEIVE;
@Configuration
public class ConsumerCandyCouponReceiveRedisStreamConfig {
public class ConsumerCandyCouponReceiveRedisStreamConfig extends RedisStreamConfig {
@Autowired
ConsumerCandyCouponReceiveRdsReceiver consumerCandyCouponReceiveRdsReceiver;
private StreamMessageListenerContainer<String, MapRecord<String, String, String>> buildStreamMessageListenerContainer(RedisConnectionFactory factory) {
var options = StreamMessageListenerContainer
.StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofMillis(1))
.build();
return StreamMessageListenerContainer.create(factory, options);
}
/**
* 缺票登记
*
......@@ -39,10 +29,8 @@ public class ConsumerCandyCouponReceiveRedisStreamConfig {
* @return
*/
private Subscription receiveSqlCandyCouponReceive(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) {
return listenerContainer.receiveAutoAck(
Consumer.from(COUPON_RECEIVE.getGroup(), COUPON_RECEIVE.name() + t),
StreamOffset.create(COUPON_RECEIVE.getKey(), ReadOffset.lastConsumed()), consumerCandyCouponReceiveRdsReceiver
);
return listenerContainer.receiveAutoAck(Consumer.from(COUPON_RECEIVE.getGroup(), getConsumerName(COUPON_RECEIVE.name() + t)),
StreamOffset.create(COUPON_RECEIVE.getKey(), ReadOffset.lastConsumed()), consumerCandyCouponReceiveRdsReceiver);
}
/* —————————————————————————— | —————————————————————————— | —————————————————————————— */
......@@ -50,15 +38,15 @@ public class ConsumerCandyCouponReceiveRedisStreamConfig {
/* -------------------------------------------------------- | 缺票登记 */
@Bean
public Subscription subscriptionSqlCandyCouponReceive(RedisConnectionFactory factory) {
public Subscription subscriptionSqlCandyCouponReceive0(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveSqlCandyCouponReceive(listenerContainer, 1);
var subscription = receiveSqlCandyCouponReceive(listenerContainer, 0);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscriptionSqlCandyCouponReceive2(RedisConnectionFactory factory) {
public Subscription subscriptionSqlCandyCouponReceive1(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveSqlCandyCouponReceive(listenerContainer, 1);
listenerContainer.start();
......@@ -66,9 +54,9 @@ public class ConsumerCandyCouponReceiveRedisStreamConfig {
}
@Bean
public Subscription subscriptionSqlCandyCouponReceive3(RedisConnectionFactory factory) {
public Subscription subscriptionSqlCandyCouponReceive2(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveSqlCandyCouponReceive(listenerContainer, 1);
var subscription = receiveSqlCandyCouponReceive(listenerContainer, 2);
listenerContainer.start();
return subscription;
}
......
package com.liquidnet.service.consumer.candy.config;
import com.liquidnet.common.cache.redis.config.RedisStreamConfig;
import com.liquidnet.service.consumer.candy.receiver.ConsumerCandyCouponUseRdsReceiver;
import lombok.var;
import org.springframework.beans.factory.annotation.Autowired;
......@@ -13,24 +14,13 @@ import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;
import java.time.Duration;
import static com.liquidnet.service.base.constant.MQConst.CandyQueue.COUPON_USE;
@Configuration
public class ConsumerCandyCouponUseRedisStreamConfig {
public class ConsumerCandyCouponUseRedisStreamConfig extends RedisStreamConfig {
@Autowired
ConsumerCandyCouponUseRdsReceiver consumerCandyCouponUseRdsReceiver;
private StreamMessageListenerContainer<String, MapRecord<String, String, String>> buildStreamMessageListenerContainer(RedisConnectionFactory factory) {
var options = StreamMessageListenerContainer
.StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofMillis(1))
.build();
return StreamMessageListenerContainer.create(factory, options);
}
/**
* 缺票登记
*
......@@ -39,8 +29,7 @@ public class ConsumerCandyCouponUseRedisStreamConfig {
* @return
*/
private Subscription receiveSqlCandyCouponUse(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) {
return listenerContainer.receiveAutoAck(
Consumer.from(COUPON_USE.getGroup(), COUPON_USE.name() + t),
return listenerContainer.receiveAutoAck(Consumer.from(COUPON_USE.getGroup(), getConsumerName(COUPON_USE.name() + t)),
StreamOffset.create(COUPON_USE.getKey(), ReadOffset.lastConsumed()), consumerCandyCouponUseRdsReceiver
);
}
......@@ -50,15 +39,15 @@ public class ConsumerCandyCouponUseRedisStreamConfig {
/* -------------------------------------------------------- | 缺票登记 */
@Bean
public Subscription subscriptionSqlCandyCouponUse(RedisConnectionFactory factory) {
public Subscription subscriptionSqlCandyCouponUse0(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveSqlCandyCouponUse(listenerContainer, 1);
var subscription = receiveSqlCandyCouponUse(listenerContainer, 0);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscriptionSqlCandyCouponUse2(RedisConnectionFactory factory) {
public Subscription subscriptionSqlCandyCouponUse1(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveSqlCandyCouponUse(listenerContainer, 1);
listenerContainer.start();
......@@ -66,9 +55,9 @@ public class ConsumerCandyCouponUseRedisStreamConfig {
}
@Bean
public Subscription subscriptionSqlCandyCouponUse3(RedisConnectionFactory factory) {
public Subscription subscriptionSqlCandyCouponUse2(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveSqlCandyCouponUse(listenerContainer, 1);
var subscription = receiveSqlCandyCouponUse(listenerContainer, 2);
listenerContainer.start();
return subscription;
}
......
......@@ -26,27 +26,24 @@ public abstract class AbstractCouponOrderBackRedisReceiver implements StreamList
@Override
public void onMessage(MapRecord<String, String, String> message) {
log.debug("CONSUMER SQL[streamKey:{},messageId:{},stream:{},body:{}]",
this.getRedisStreamKey(), message.getId(), message.getStream(), message.getValue());
boolean result = this.consumerSqlOperationCouponOrderBackHandler(message.getValue().get("message"));
log.info("CONSUMER SQL RESULT:{} ==> MESSAGE_ID:{}", result, message.getId());
String redisStreamKey = this.getRedisStreamKey();
log.debug("CONSUMER MSG[streamKey:{},messageId:{},stream:{},body:{}]", redisStreamKey, message.getId(), message.getStream(), message.getValue());
boolean result = this.consumerMessageHandler(message.getValue().get("message"));
log.info("CONSUMER MSG RESULT:{} ==> [{}]MESSAGE_ID:{}", result, redisStreamKey, message.getId());
try {
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId());
} catch (Exception e) {
log.error("#CONSUMER SQL RESULT:{} ==> DEL_REDIS_QUEUE_MSG_EXCEPTION[MESSAGE_ID:{},MSG:{}]", result, message.getId(), JsonUtils.toJson(message), e);
} finally {
try {
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId());
} catch (Exception ignored) {
}
log.error("#CONSUMER MSG EX_ACK ==> [{}]RESULT:{},MESSAGE:{}", redisStreamKey, result, message.getValue(), e);
}
try {
stringRedisTemplate.opsForStream().delete(redisStreamKey, message.getId());
} catch (Exception e) {
log.error("#CONSUMER MSG EX_DEL ==> [{}]RESULT:{},MESSAGE:{}", redisStreamKey, result, message.getValue(), e);
}
}
private boolean consumerSqlOperationCouponOrderBackHandler(String msg) {
private boolean consumerMessageHandler(String msg) {
boolean aBoolean = false;
try {
OrderCloseMapping.orderCloseMessage mqMessage = JsonUtils.fromJson(msg, OrderCloseMapping.orderCloseMessage.class);
......@@ -64,7 +61,7 @@ public abstract class AbstractCouponOrderBackRedisReceiver implements StreamList
aBoolean = true;
} catch (Exception e) {
log.error("CONSUMER SQL FAIL ==> {}", e.getMessage(), e);
log.error("CONSUMER MSG EX_HANDLE ==> [{}]:{}", this.getRedisStreamKey(), msg, e);
} finally {
if (!aBoolean) {
HashMap<String, String> map = CollectionUtil.mapStringString();
......
package com.liquidnet.service.consumer.candy.receiver;
import com.liquidnet.common.cache.redis.util.RedisUtil;
import com.liquidnet.commons.lang.util.CollectionUtil;
import com.liquidnet.commons.lang.util.JsonUtils;
import com.liquidnet.service.base.SqlMapping;
......@@ -20,42 +19,33 @@ public abstract class AbstractSqlRedisReceiver implements StreamListener<String,
private IBaseDao baseDao;
@Autowired
StringRedisTemplate stringRedisTemplate;
@Autowired
private RedisUtil redisUtil;
@Override
public void onMessage(MapRecord<String, String, String> message) {
log.debug("CONSUMER SQL[streamKey:{},messageId:{},stream:{},body:{}]",
this.getRedisStreamKey(), message.getId(), message.getStream(), message.getValue());
boolean result = this.consumerSqlDaoHandler(message.getValue().get("message"));
log.info("CONSUMER SQL RESULT:{} ==> MESSAGE_ID:{}", result, message.getId());
String redisStreamKey = this.getRedisStreamKey();
log.debug("CONSUMER MSG[streamKey:{},messageId:{},stream:{},body:{}]", redisStreamKey, message.getId(), message.getStream(), message.getValue());
boolean result = this.consumerMessageHandler(message.getValue().get("message"));
log.info("CONSUMER MSG RESULT:{} ==> [{}]MESSAGE_ID:{}", result, redisStreamKey, message.getId());
try {
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId());
} catch (Exception e) {
log.error("#CONSUMER SQL RESULT:{} ==> DEL_REDIS_QUEUE_MSG_EXCEPTION[MESSAGE_ID:{},MSG:{}]", result, message.getId(), JsonUtils.toJson(message), e);
} finally {
try {
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId());
} catch (Exception ignored) {
}
log.error("#CONSUMER MSG EX_ACK ==> [{}]RESULT:{},MESSAGE:{}", redisStreamKey, result, message.getValue(), e);
}
try {
stringRedisTemplate.opsForStream().delete(redisStreamKey, message.getId());
} catch (Exception e) {
log.error("#CONSUMER MSG EX_DEL ==> [{}]RESULT:{},MESSAGE:{}", redisStreamKey, result, message.getValue(), e);
}
}
private boolean consumerSqlDaoHandler(String msg) {
Boolean aBoolean = false;
private boolean consumerMessageHandler(String msg) {
boolean aBoolean = false;
try {
SqlMapping.SqlMessage sqlMessage = JsonUtils.fromJson(msg, SqlMapping.SqlMessage.class);
if (sqlMessage == null) {
aBoolean = true;
} else {
aBoolean = baseDao.batchSqls(sqlMessage.getSqls(), sqlMessage.getArgs());
}
aBoolean = null == sqlMessage || baseDao.batchSqls(sqlMessage.getSqls(), sqlMessage.getArgs());
} catch (Exception e) {
log.error("CONSUMER SQL FAIL ==> {}", e.getMessage(), e);
log.error("CONSUMER MSG EX_HANDLE ==> [{}]:{}", this.getRedisStreamKey(), msg, e);
} finally {
if (!aBoolean) {
HashMap<String, String> map = CollectionUtil.mapStringString();
......
......@@ -6,7 +6,7 @@ liquidnet:
username: user
password: user123
eureka:
host: 172.17.192.6:7001
host: 172.17.207.177:7001
#instance:
# prefer-ip-address: true
# host: eureka-test-0.eureka-test-svc.zhengzai-test:7001/eureka-server/eureka,eureka-test-1.eureka-test-svc.zhengzai-test:7001/eureka-server/eureka,eureka-test-2.eureka-test-svc.zhengzai-test:7001/eureka-server/eureka
......
package com.liquidnet.service.consumer.goblin.config;
package com.liquidnet.service.consumer.dragon.config;
import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.consumer.goblin.receiver.ConsumerGoblinStoreOrderRdsReceiver;
import com.liquidnet.service.consumer.goblin.receiver.ConsumerGoblinUserOrderRdsReceiver;
import com.liquidnet.common.cache.redis.config.RedisStreamConfig;
import com.liquidnet.service.consumer.dragon.receiver.RedisMchNotifyFailReceiver;
import com.liquidnet.service.dragon.constant.DragonConstant;
import lombok.var;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
......@@ -15,64 +15,62 @@ import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;
import java.time.Duration;
/**
* @author AnJiabin <anjiabin@zhengzai.tv>
* @version V1.0
* @Description: TODO
* @class: ConsumerPayRedisStreamConfig
* @Package com.liquidnet.service.consumer.dragon.config
* @Copyright: LightNet @ Copyright (c) 2021
* @date 2022/3/11 14:45
*/
@Configuration
public class ConsumerGoblinUserOrderRedisStreamConfig {
public class ConsumerMchNotifyFailRedisStreamConfig extends RedisStreamConfig {
@Autowired
ConsumerGoblinUserOrderRdsReceiver consumerGoblinUserOrderRdsReceiver;
private RedisMchNotifyFailReceiver redisMchNotifyFailReceiver;
private StreamMessageListenerContainer<String, MapRecord<String, String, String>> buildStreamMessageListenerContainer(RedisConnectionFactory factory) {
var options = StreamMessageListenerContainer
.StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofMillis(1))
.build();
return StreamMessageListenerContainer.create(factory, options);
private Subscription buildReceiverSubscription(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) {
return listenerContainer.receiveAutoAck(Consumer.from(DragonConstant.MysqlRedisQueueEnum.PAY_MCH_NOTIFY_ERROR_GROUP.getCode(), getConsumerName(DragonConstant.MysqlRedisQueueEnum.PAY_MCH_NOTIFY_ERROR_KEY.name() + t)),
StreamOffset.create(DragonConstant.MysqlRedisQueueEnum.PAY_MCH_NOTIFY_ERROR_KEY.getCode(), ReadOffset.lastConsumed()), redisMchNotifyFailReceiver);
}
/**
* 缺票登记
*
* @param listenerContainer
* @param t
* @return
*/
private Subscription receiveGoblinUserOrder(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) {
return listenerContainer.receiveAutoAck(
Consumer.from(MQConst.GoblinQueue.GOBLIN_USER_ORDER_OPERA.getGroup(), MQConst.GoblinQueue.GOBLIN_USER_ORDER_OPERA.name() + t),
StreamOffset.create(MQConst.GoblinQueue.GOBLIN_USER_ORDER_OPERA.getKey(), ReadOffset.lastConsumed()), consumerGoblinUserOrderRdsReceiver
);
@Bean
public Subscription subscriptionMchNotifyFail01(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = buildReceiverSubscription(listenerContainer, 1);
listenerContainer.start();
return subscription;
}
/* —————————————————————————— | —————————————————————————— | —————————————————————————— */
/* -------------------------------------------------------- | 缺票登记 */
@Bean
public Subscription subscriptionGoblinUserOrder(RedisConnectionFactory factory) {
public Subscription subscriptionMchNotifyFail02(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveGoblinUserOrder(listenerContainer, 1);
var subscription = buildReceiverSubscription(listenerContainer, 2);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscriptionGoblinUserOrder2(RedisConnectionFactory factory) {
public Subscription subscriptionMchNotifyFail03(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveGoblinUserOrder(listenerContainer, 2);
var subscription = buildReceiverSubscription(listenerContainer, 3);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscriptionGoblinUserOrder3(RedisConnectionFactory factory) {
public Subscription subscriptionMchNotifyFail04(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveGoblinUserOrder(listenerContainer, 3);
var subscription = buildReceiverSubscription(listenerContainer, 4);
listenerContainer.start();
return subscription;
}
/* -------------------------------------------------------- | */
@Bean
public Subscription subscriptionMchNotifyFail05(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = buildReceiverSubscription(listenerContainer, 5);
listenerContainer.start();
return subscription;
}
}
package com.liquidnet.service.consumer.goblin.config;
package com.liquidnet.service.consumer.dragon.config;
import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.consumer.goblin.receiver.ConsumerGoblinOrderAGRdsReceiver;
import com.liquidnet.service.consumer.goblin.receiver.ConsumerGoblinXlsRdsReceiver;
import com.liquidnet.common.cache.redis.config.RedisStreamConfig;
import com.liquidnet.service.consumer.dragon.receiver.RedisMchNotifyReceiver;
import com.liquidnet.service.dragon.constant.DragonConstant;
import lombok.var;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
......@@ -15,64 +15,62 @@ import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;
import java.time.Duration;
/**
* @author AnJiabin <anjiabin@zhengzai.tv>
* @version V1.0
* @Description: TODO
* @class: ConsumerPayRedisStreamConfig
* @Package com.liquidnet.service.consumer.dragon.config
* @Copyright: LightNet @ Copyright (c) 2021
* @date 2022/3/11 14:45
*/
@Configuration
public class ConsumerGoblinXlsRedisStreamConfig {
public class ConsumerMchNotifyRedisStreamConfig extends RedisStreamConfig {
@Autowired
ConsumerGoblinXlsRdsReceiver consumerGoblinXlsRdsReceiver;
private RedisMchNotifyReceiver redisMchNotifyReceiver;
private StreamMessageListenerContainer<String, MapRecord<String, String, String>> buildStreamMessageListenerContainer(RedisConnectionFactory factory) {
var options = StreamMessageListenerContainer
.StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofMillis(1))
.build();
return StreamMessageListenerContainer.create(factory, options);
private Subscription buildReceiverSubscription(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) {
return listenerContainer.receiveAutoAck(Consumer.from(DragonConstant.MysqlRedisQueueEnum.PAY_MCH_NOTIFY_GROUP.getCode(), getConsumerName(DragonConstant.MysqlRedisQueueEnum.PAY_MCH_NOTIFY_KEY.name() + t)),
StreamOffset.create(DragonConstant.MysqlRedisQueueEnum.PAY_MCH_NOTIFY_KEY.getCode(), ReadOffset.lastConsumed()), redisMchNotifyReceiver);
}
/**
* 缺票登记
*
* @param listenerContainer
* @param t
* @return
*/
private Subscription receiveGoblinXls(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) {
return listenerContainer.receiveAutoAck(
Consumer.from(MQConst.GoblinQueue.GOBLIN_XLS_OPERA.getGroup(), MQConst.GoblinQueue.GOBLIN_XLS_OPERA.name() + t),
StreamOffset.create(MQConst.GoblinQueue.GOBLIN_XLS_OPERA.getKey(), ReadOffset.lastConsumed()), consumerGoblinXlsRdsReceiver
);
@Bean
public Subscription subscriptionMchNotify01(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = buildReceiverSubscription(listenerContainer, 1);
listenerContainer.start();
return subscription;
}
/* —————————————————————————— | —————————————————————————— | —————————————————————————— */
/* -------------------------------------------------------- | 缺票登记 */
@Bean
public Subscription subscriptionGoblinXls1(RedisConnectionFactory factory) {
public Subscription subscriptionMchNotify02(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveGoblinXls(listenerContainer, 1);
var subscription = buildReceiverSubscription(listenerContainer, 2);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscriptionGoblinXls2(RedisConnectionFactory factory) {
public Subscription subscriptionMchNotify03(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveGoblinXls(listenerContainer, 2);
var subscription = buildReceiverSubscription(listenerContainer, 3);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscriptionGoblinXls3(RedisConnectionFactory factory) {
public Subscription subscriptionMchNotify04(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveGoblinXls(listenerContainer, 3);
var subscription = buildReceiverSubscription(listenerContainer, 4);
listenerContainer.start();
return subscription;
}
/* -------------------------------------------------------- | */
@Bean
public Subscription subscriptionMchNotify05(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = buildReceiverSubscription(listenerContainer, 5);
listenerContainer.start();
return subscription;
}
}
package com.liquidnet.service.consumer.goblin.config;
package com.liquidnet.service.consumer.dragon.config;
import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.consumer.goblin.receiver.ConsumerGoblinStoreMarketRdsReceiver;
import com.liquidnet.common.cache.redis.config.RedisStreamConfig;
import com.liquidnet.service.consumer.dragon.receiver.RedisPayNotifyReceiver;
import com.liquidnet.service.dragon.constant.DragonConstant;
import lombok.var;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
......@@ -14,64 +15,62 @@ import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;
import java.time.Duration;
/**
* @author AnJiabin <anjiabin@zhengzai.tv>
* @version V1.0
* @Description: TODO
* @class: ConsumerPayRedisStreamConfig
* @Package com.liquidnet.service.consumer.dragon.config
* @Copyright: LightNet @ Copyright (c) 2021
* @date 2022/3/11 14:45
*/
@Configuration
public class ConsumerGoblinStoneMarketRedisStreamConfig {
public class ConsumerPayNotifyRedisStreamConfig extends RedisStreamConfig {
@Autowired
ConsumerGoblinStoreMarketRdsReceiver consumerGoblinStoreMarketRdsReceiver;
private RedisPayNotifyReceiver redisPayNotifyReceiver;
private StreamMessageListenerContainer<String, MapRecord<String, String, String>> buildStreamMessageListenerContainer(RedisConnectionFactory factory) {
var options = StreamMessageListenerContainer
.StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofMillis(1))
.build();
return StreamMessageListenerContainer.create(factory, options);
private Subscription buildReceiverSubscription(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) {
return listenerContainer.receiveAutoAck(Consumer.from(DragonConstant.MysqlRedisQueueEnum.DRAGON_PAY_NOTIFY_GROUP.getCode(), getConsumerName(DragonConstant.MysqlRedisQueueEnum.DRAGON_PAY_NOTIFY_KEY.name() + t)),
StreamOffset.create(DragonConstant.MysqlRedisQueueEnum.DRAGON_PAY_NOTIFY_KEY.getCode(), ReadOffset.lastConsumed()), redisPayNotifyReceiver);
}
/**
* 缺票登记
*
* @param listenerContainer
* @param t
* @return
*/
private Subscription receiveGoblinStoreMarket(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) {
return listenerContainer.receiveAutoAck(
Consumer.from(MQConst.GoblinQueue.GOBLIN_STORE_MARKET.getGroup(), MQConst.GoblinQueue.GOBLIN_STORE_MARKET.name() + t),
StreamOffset.create(MQConst.GoblinQueue.GOBLIN_STORE_MARKET.getKey(), ReadOffset.lastConsumed()), consumerGoblinStoreMarketRdsReceiver
);
@Bean
public Subscription subscriptionPayNotify01(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = buildReceiverSubscription(listenerContainer, 1);
listenerContainer.start();
return subscription;
}
/* —————————————————————————— | —————————————————————————— | —————————————————————————— */
/* -------------------------------------------------------- | 缺票登记 */
@Bean
public Subscription subscriptionGoblinStoreMarket(RedisConnectionFactory factory) {
public Subscription subscriptionPayNotify02(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveGoblinStoreMarket(listenerContainer, 1);
var subscription = buildReceiverSubscription(listenerContainer, 2);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscriptionGoblinStoreMarket2(RedisConnectionFactory factory) {
public Subscription subscriptionPayNotify03(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveGoblinStoreMarket(listenerContainer, 2);
var subscription = buildReceiverSubscription(listenerContainer, 3);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscriptionGoblinStoreMarket3(RedisConnectionFactory factory) {
public Subscription subscriptionPayNotify04(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveGoblinStoreMarket(listenerContainer, 3);
var subscription = buildReceiverSubscription(listenerContainer, 4);
listenerContainer.start();
return subscription;
}
/* -------------------------------------------------------- | */
@Bean
public Subscription subscriptionPayNotify05(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = buildReceiverSubscription(listenerContainer, 5);
listenerContainer.start();
return subscription;
}
}
package com.liquidnet.service.consumer.goblin.config;
package com.liquidnet.service.consumer.dragon.config;
import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.consumer.goblin.receiver.ConsumerGoblinShopCartReceiver;
import com.liquidnet.common.cache.redis.config.RedisStreamConfig;
import com.liquidnet.service.consumer.dragon.receiver.RedisRefundReceiver;
import com.liquidnet.service.dragon.constant.DragonConstant;
import lombok.var;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
......@@ -14,65 +15,62 @@ import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;
import java.time.Duration;
/**
* @author AnJiabin <anjiabin@zhengzai.tv>
* @version V1.0
* @Description: TODO
* @class: ConsumerRefundRedisStreamConfig
* @Package com.liquidnet.service.consumer.dragon.config
* @Copyright: LightNet @ Copyright (c) 2021
* @date 2022/3/11 14:45
*/
@Configuration
public class ConsumerGoblinShopCartStreamConfig {
public class ConsumerRefundRedisStreamConfig extends RedisStreamConfig {
@Autowired
ConsumerGoblinShopCartReceiver consumerGoblinShopCartReceiver;
private RedisRefundReceiver redisRefundReceiver;
private StreamMessageListenerContainer<String, MapRecord<String, String, String>> buildStreamMessageListenerContainer(RedisConnectionFactory factory) {
var options = StreamMessageListenerContainer
.StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofMillis(1))
.build();
return StreamMessageListenerContainer.create(factory, options);
}
/**
* 购物车
* @return Subscription
*/
private Subscription receiveSqlShopCart(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) {
return listenerContainer.receiveAutoAck(
Consumer.from(MQConst.GoblinQueue.GOBLIN_SHOP_CART.getGroup(), MQConst.GoblinQueue.GOBLIN_SHOP_CART.name() + t),
StreamOffset.create(MQConst.GoblinQueue.GOBLIN_SHOP_CART.getKey(), ReadOffset.lastConsumed()), consumerGoblinShopCartReceiver
);
private Subscription buildReceiverSubscription(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) {
return listenerContainer.receiveAutoAck(Consumer.from(DragonConstant.MysqlRedisQueueEnum.DRAGON_REFUND_GROUP.getCode(), getConsumerName(DragonConstant.MysqlRedisQueueEnum.DRAGON_REFUND_KEY.name() + t)),
StreamOffset.create(DragonConstant.MysqlRedisQueueEnum.DRAGON_REFUND_KEY.getCode(), ReadOffset.lastConsumed()), redisRefundReceiver);
}
/* -------------------------------------------------------- | */
@Bean
public Subscription subscriptionSqlShopCart1(RedisConnectionFactory factory) {
public Subscription subscriptionRefund01(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveSqlShopCart(listenerContainer, 1);
var subscription = buildReceiverSubscription(listenerContainer, 1);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscriptionSqlShopCart2(RedisConnectionFactory factory) {
public Subscription subscriptionRefund02(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveSqlShopCart(listenerContainer, 2);
var subscription = buildReceiverSubscription(listenerContainer, 2);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscriptionSqlShopCart3(RedisConnectionFactory factory) {
public Subscription subscriptionRefund03(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveSqlShopCart(listenerContainer, 2);
var subscription = buildReceiverSubscription(listenerContainer, 3);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscriptionSqlShopCart4(RedisConnectionFactory factory) {
public Subscription subscriptionRefund04(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveSqlShopCart(listenerContainer, 2);
var subscription = buildReceiverSubscription(listenerContainer, 4);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscriptionRefund05(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = buildReceiverSubscription(listenerContainer, 5);
listenerContainer.start();
return subscription;
}
}
package com.liquidnet.service.consumer.dragon.receiver;
import com.liquidnet.commons.lang.util.CollectionUtil;
import com.liquidnet.commons.lang.util.JsonUtils;
import com.liquidnet.service.base.SqlMapping;
import com.liquidnet.service.consumer.dragon.service.IBaseDao;
......@@ -30,72 +31,106 @@ public abstract class AbstractRedisReceiver implements StreamListener<String, Ma
@Override
public void onMessage(MapRecord<String, String, String> message) {
log.info("接受到来自redis PAY key:{} 的消息",getRedisStreamKey());
log.info("message id " + message.getId());
log.info("stream " + message.getStream());
log.info("body " + message.getValue());
boolean result = this.consumerSqlDaoHandler(message.getValue().get("message"));
String redisStreamKey = this.getRedisStreamKey();
log.debug("CONSUMER MSG[streamKey:{},messageId:{},stream:{},body:{}]", redisStreamKey, message.getId(), message.getStream(), message.getValue());
boolean result = this.consumerMessageHandler(message.getValue().get("message"));
log.info("CONSUMER MSG RESULT:{} ==> [{}]MESSAGE_ID:{}", result, redisStreamKey, message.getId());
// if(result){
// 消费成功确认,消息删除和消息确认是一个事务
log.info("consumer success delete message messageId:{} ",message.getId());
try {
// stringRedisTemplate.multi();
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId());
// stringRedisTemplate.exec();
} catch (Exception e) {
e.printStackTrace();
log.error("delete redis queue message error messageId:{} errMsg:{}",message.getId(),e.getMessage());
}finally {
try {
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId());
} catch (Exception e) {
log.error("error: {}",e);
}
}
// }
}
private boolean consumerSqlDaoHandler(String msg) {
try {
SqlMapping.SqlMessage sqlMessage = JsonUtils.fromJson(msg, SqlMapping.SqlMessage.class);
log.debug("CONSUMER SQL ==> Preparing:{}", JsonUtils.toJson(sqlMessage.getSqls()));
log.debug("CONSUMER SQL ==> Parameters:{}", JsonUtils.toJson(sqlMessage.getArgs()));
Boolean rstBatchSqls = baseDao.batchSqls(sqlMessage.getSqls(), sqlMessage.getArgs());
log.debug("CONSUMER SQL result of execution:{}", rstBatchSqls);
if (rstBatchSqls) {
return true;
}else{
sendMySqlRedis(msg);
}
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
} catch (Exception e) {
log.error("#CONSUMER MSG EX_ACK ==> [{}]RESULT:{},MESSAGE:{}", this.getRedisStreamKey(), result, message.getValue(), e);
}
try {
stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId());
} catch (Exception e) {
e.printStackTrace();
log.error("CONSUMER SQL Exception error:{}", e);
log.error("#CONSUMER MSG EX_DEL ==> [{}]RESULT:{},MESSAGE:{}", this.getRedisStreamKey(), result, message.getValue(), e);
}
return false;
}
/**
* 给 REDIS 队列发送消息 数据库相关
*
* @param msg 接收到的内容
* @return
*/
private boolean sendMySqlRedis(String msg) {
private boolean consumerMessageHandler(String msg) {
boolean aBoolean = false;
try {
HashMap<String, String> map = new HashMap<>();
map.put("message", msg);
MapRecord<String, String, String> record = StreamRecords.mapBacked(map).withStreamKey(getRedisStreamKey());
stringRedisTemplate.opsForStream().add(record);
return true;
SqlMapping.SqlMessage sqlMessage = JsonUtils.fromJson(msg, SqlMapping.SqlMessage.class);
aBoolean = null == sqlMessage || baseDao.batchSqls(sqlMessage.getSqls(), sqlMessage.getArgs());
} catch (Exception e) {
e.printStackTrace();
return false;
log.error("CONSUMER MSG EX_HANDLE ==> [{}]:{}", this.getRedisStreamKey(), msg, e);
} finally {
if (!aBoolean) {
HashMap<String, String> map = CollectionUtil.mapStringString();
map.put("message", msg);
stringRedisTemplate.opsForStream().add(StreamRecords.mapBacked(map).withStreamKey(this.getRedisStreamKey()));
}
}
return aBoolean;
}
// @Override
// public void onMessage(MapRecord<String, String, String> message) {
// log.info("接受到来自redis PAY key:{} 的消息",getRedisStreamKey());
// log.info("message id " + message.getId());
// log.info("stream " + message.getStream());
// log.info("body " + message.getValue());
// boolean result = this.consumerSqlDaoHandler(message.getValue().get("message"));
//
//// if(result){
// // 消费成功确认,消息删除和消息确认是一个事务
// log.info("consumer success delete message messageId:{} ",message.getId());
// try {
//// stringRedisTemplate.multi();
// stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
// stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId());
//// stringRedisTemplate.exec();
// } catch (Exception e) {
// e.printStackTrace();
// log.error("delete redis queue message error messageId:{} errMsg:{}",message.getId(),e.getMessage());
// }finally {
// try {
// stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
// stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId());
// } catch (Exception e) {
// log.error("error: {}",e);
// }
// }
//// }
// }
//
// private boolean consumerMessageHandler(String msg) {
// try {
// SqlMapping.SqlMessage sqlMessage = JsonUtils.fromJson(msg, SqlMapping.SqlMessage.class);
// log.debug("CONSUMER MSG ==> Preparing:{}", JsonUtils.toJson(sqlMessage.getSqls()));
// log.debug("CONSUMER MSG ==> Parameters:{}", JsonUtils.toJson(sqlMessage.getArgs()));
// Boolean rstBatchSqls = baseDao.batchSqls(sqlMessage.getSqls(), sqlMessage.getArgs());
// log.debug("CONSUMER MSG result of execution:{}", rstBatchSqls);
// if (rstBatchSqls) {
// return true;
// } else {
// sendMySqlRedis(msg);
// }
// } catch (Exception e) {
// log.error("CONSUMER MSG EX_HANDLE ==> [{}]:{}", this.getRedisStreamKey(), msg, e);
// }
// return false;
// }
//
// /**
// * 给 REDIS 队列发送消息 数据库相关
// *
// * @param msg 接收到的内容
// * @return
// */
// private boolean sendMySqlRedis(String msg) {
// try {
// HashMap<String, String> map = new HashMap<>();
// map.put("message", msg);
// stringRedisTemplate.opsForStream().add(StreamRecords.mapBacked(map).withStreamKey(getRedisStreamKey()));
// return true;
// } catch (Exception e) {
// e.printStackTrace();
// return false;
// }
// }
protected abstract String getRedisStreamKey();
protected abstract String getRedisStreamGroup();
......
package com.liquidnet.service.consumer.dragon.receiver;
import com.liquidnet.commons.lang.util.CollectionUtil;
import com.liquidnet.commons.lang.util.JsonUtils;
import com.liquidnet.service.base.SqlMapping;
import com.liquidnet.service.consumer.dragon.service.IBaseDao;
......@@ -26,63 +27,99 @@ public class RedisRefundReceiver implements StreamListener<String, MapRecord<Str
@Override
public void onMessage(MapRecord<String, String, String> message) {
log.info("接受到来自redis REFUND 的消息");
log.info("message id " + message.getId());
log.info("stream " + message.getStream());
log.info("body " + message.getValue());
boolean result = this.consumerSqlDaoHandler(message.getValue().get("message"));
// if(result){
log.info("consumer success delete message messageId:{} ",message.getId());
try {
// stringRedisTemplate.multi();
stringRedisTemplate.opsForStream().acknowledge(DragonConstant.MysqlRedisQueueEnum.DRAGON_REFUND_GROUP.getCode(), message);
stringRedisTemplate.opsForStream().delete(DragonConstant.MysqlRedisQueueEnum.DRAGON_REFUND_KEY.getCode(), message.getId());
// stringRedisTemplate.exec();
} catch (Exception e) {
log.error("delete redis queue message Exception error: {} ",e);
log.error("delete redis queue message error messageId:{} errMsg:{}",message.getId(),e.getMessage());
}finally {
stringRedisTemplate.opsForStream().acknowledge(DragonConstant.MysqlRedisQueueEnum.DRAGON_REFUND_GROUP.getCode(), message);
stringRedisTemplate.opsForStream().delete(DragonConstant.MysqlRedisQueueEnum.DRAGON_REFUND_KEY.getCode(), message.getId());
}
// }
}
String redisStreamKey = DragonConstant.MysqlRedisQueueEnum.DRAGON_REFUND_KEY.getCode();
log.debug("CONSUMER MSG[streamKey:{},messageId:{},stream:{},body:{}]", redisStreamKey, message.getId(), message.getStream(), message.getValue());
boolean result = this.consumerMessageHandler(message.getValue().get("message"));
log.info("CONSUMER MSG RESULT:{} ==> [{}]MESSAGE_ID:{}", result, redisStreamKey, message.getId());
private boolean consumerSqlDaoHandler(String msg) {
try {
SqlMapping.SqlMessage sqlMessage = JsonUtils.fromJson(msg, SqlMapping.SqlMessage.class);
log.debug("CONSUMER SQL ==> Preparing:{}", JsonUtils.toJson(sqlMessage.getSqls()));
log.debug("CONSUMER SQL ==> Parameters:{}", JsonUtils.toJson(sqlMessage.getArgs()));
Boolean rstBatchSqls = baseDao.batchSqls(sqlMessage.getSqls(), sqlMessage.getArgs());
log.debug("CONSUMER SQL result of execution:{}", rstBatchSqls);
if (rstBatchSqls) {
return true;
}else{
sendMySqlRedis(msg);
}
stringRedisTemplate.opsForStream().acknowledge(DragonConstant.MysqlRedisQueueEnum.DRAGON_REFUND_GROUP.getCode(), message);
} catch (Exception e) {
e.printStackTrace();
log.error("CONSUMER SQL Exception error:{}", e);
log.error("#CONSUMER MSG EX_ACK ==> [{}]RESULT:{},MESSAGE:{}", redisStreamKey, result, message.getValue(), e);
}
try {
stringRedisTemplate.opsForStream().delete(redisStreamKey, message.getId());
} catch (Exception e) {
log.error("#CONSUMER MSG EX_DEL ==> [{}]RESULT:{},MESSAGE:{}", redisStreamKey, result, message.getValue(), e);
}
return false;
}
/**
* 给 REDIS 队列发送消息 数据库相关
*
* @param msg 接收到的内容
* @return
*/
private boolean sendMySqlRedis(String msg) {
private boolean consumerMessageHandler(String msg) {
boolean aBoolean = false;
try {
HashMap<String, String> map = new HashMap<>();
map.put("message", msg);
MapRecord<String, String, String> record = StreamRecords.mapBacked(map).withStreamKey(DragonConstant.MysqlRedisQueueEnum.DRAGON_REFUND_KEY.getCode());
stringRedisTemplate.opsForStream().add(record);
return true;
SqlMapping.SqlMessage sqlMessage = JsonUtils.fromJson(msg, SqlMapping.SqlMessage.class);
aBoolean = null == sqlMessage || baseDao.batchSqls(sqlMessage.getSqls(), sqlMessage.getArgs());
} catch (Exception e) {
e.printStackTrace();
return false;
log.error("CONSUMER MSG EX_HANDLE ==> [{}]:{}", DragonConstant.MysqlRedisQueueEnum.DRAGON_REFUND_KEY.getCode(), msg, e);
} finally {
if (!aBoolean) {
HashMap<String, String> map = CollectionUtil.mapStringString();
map.put("message", msg);
stringRedisTemplate.opsForStream().add(StreamRecords.mapBacked(map).withStreamKey(DragonConstant.MysqlRedisQueueEnum.DRAGON_REFUND_KEY.getCode()));
}
}
return aBoolean;
}
// @Override
// public void onMessage(MapRecord<String, String, String> message) {
// log.info("接受到来自redis REFUND 的消息");
// log.info("message id " + message.getId());
// log.info("stream " + message.getStream());
// log.info("body " + message.getValue());
// boolean result = this.consumerSqlDaoHandler(message.getValue().get("message"));
//// if(result){
// log.info("consumer success delete message messageId:{} ",message.getId());
// try {
//// stringRedisTemplate.multi();
// stringRedisTemplate.opsForStream().acknowledge(DragonConstant.MysqlRedisQueueEnum.DRAGON_REFUND_GROUP.getCode(), message);
// stringRedisTemplate.opsForStream().delete(DragonConstant.MysqlRedisQueueEnum.DRAGON_REFUND_KEY.getCode(), message.getId());
//// stringRedisTemplate.exec();
// } catch (Exception e) {
// log.error("delete redis queue message Exception error: {} ",e);
// log.error("delete redis queue message error messageId:{} errMsg:{}",message.getId(),e.getMessage());
// }finally {
// stringRedisTemplate.opsForStream().acknowledge(DragonConstant.MysqlRedisQueueEnum.DRAGON_REFUND_GROUP.getCode(), message);
// stringRedisTemplate.opsForStream().delete(DragonConstant.MysqlRedisQueueEnum.DRAGON_REFUND_KEY.getCode(), message.getId());
// }
//// }
// }
//
// private boolean consumerSqlDaoHandler(String msg) {
// try {
// SqlMapping.SqlMessage sqlMessage = JsonUtils.fromJson(msg, SqlMapping.SqlMessage.class);
// log.debug("CONSUMER MSG ==> Preparing:{}", JsonUtils.toJson(sqlMessage.getSqls()));
// log.debug("CONSUMER MSG ==> Parameters:{}", JsonUtils.toJson(sqlMessage.getArgs()));
// Boolean rstBatchSqls = baseDao.batchSqls(sqlMessage.getSqls(), sqlMessage.getArgs());
// log.debug("CONSUMER MSG result of execution:{}", rstBatchSqls);
// if (rstBatchSqls) {
// return true;
// }else{
// sendMySqlRedis(msg);
// }
// } catch (Exception e) {
// e.printStackTrace();
// log.error("CONSUMER MSG Exception error:{}", e);
// }
// return false;
// }
//
// /**
// * 给 REDIS 队列发送消息 数据库相关
// *
// * @param msg 接收到的内容
// * @return
// */
// private boolean sendMySqlRedis(String msg) {
// try {
// HashMap<String, String> map = new HashMap<>();
// map.put("message", msg);
// MapRecord<String, String, String> record = StreamRecords.mapBacked(map).withStreamKey(DragonConstant.MysqlRedisQueueEnum.DRAGON_REFUND_KEY.getCode());
// stringRedisTemplate.opsForStream().add(record);
// return true;
// } catch (Exception e) {
// e.printStackTrace();
// return false;
// }
// }
}
\ No newline at end of file
......@@ -6,7 +6,7 @@ liquidnet:
username: user
password: user123
eureka:
host: 172.17.192.6:7001
host: 172.17.207.177:7001
#instance:
# prefer-ip-address: true
#host: eureka-test-0.eureka-test-svc.zhengzai-test:7001/eureka-server/eureka,eureka-test-1.eureka-test-svc.zhengzai-test:7001/eureka-server/eureka,eureka-test-2.eureka-test-svc.zhengzai-test:7001/eureka-server/eureka
......
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>liquidnet-service-consumer-all</artifactId>
<groupId>com.liquidnet</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>liquidnet-service-consumer-goblin</artifactId>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.liquidnet</groupId>
<artifactId>liquidnet-common-cache-redis</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.liquidnet</groupId>
<artifactId>liquidnet-service-goblin-api</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>easyexcel</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
<resources>
<resource>
<directory>src/main/resources</directory>
<filtering>true</filtering>
</resource>
</resources>
</build>
</project>
\ No newline at end of file
package com.liquidnet.service;
import lombok.extern.slf4j.Slf4j;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.core.env.Environment;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
@Slf4j
@SpringBootApplication(scanBasePackages = {"com.liquidnet"})
@MapperScan(basePackages = {"com.liquidnet.service.goblin.mapper"})
public class ServiceConsumerGoblinApplication implements CommandLineRunner {
@Autowired
private Environment environment;
public static void main(String[] args) {
SpringApplication.run(ServiceConsumerGoblinApplication.class, args);
}
@Override
public void run(String... strings) throws Exception {
try {
log.info("\n----------------------------------------------------------\n\t" +
"Application '{}' is running! Access URLs:\n\t" +
"Local: \t\thttp://127.0.0.1:{}\n\t" +
"External: \thttp://{}:{}{}/doc.html\n\t" +
"Profile(s): \t{}\n----------------------------------------------------------",
environment.getProperty("spring.application.name"),
environment.getProperty("server.port"),
InetAddress.getLocalHost().getHostAddress(),
environment.getProperty("server.port"),
environment.getProperty("server.servlet.context-path"),
Arrays.toString(environment.getActiveProfiles()));
} catch (UnknownHostException e) {
e.printStackTrace();
}
}
}
package com.liquidnet.service.consumer.goblin.config;
import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.consumer.goblin.receiver.ConsumerGoblinOrderAGRdsReceiver;
import com.liquidnet.service.consumer.goblin.receiver.ConsumerGoblinOrderCPRdsReceiver;
import lombok.var;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;
import java.time.Duration;
@Configuration
public class ConsumerGoblinOrderAGRedisStreamConfig {
@Autowired
ConsumerGoblinOrderAGRdsReceiver consumerGoblinOrderAGRdsReceiver;
private StreamMessageListenerContainer<String, MapRecord<String, String, String>> buildStreamMessageListenerContainer(RedisConnectionFactory factory) {
var options = StreamMessageListenerContainer
.StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofMillis(1))
.build();
return StreamMessageListenerContainer.create(factory, options);
}
/**
* 缺票登记
*
* @param listenerContainer
* @param t
* @return
*/
private Subscription receiveGoblinOrderAG(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) {
return listenerContainer.receiveAutoAck(
Consumer.from(MQConst.GoblinQueue.GOBLIN_ORDER_AGAIN.getGroup(), MQConst.GoblinQueue.GOBLIN_ORDER_AGAIN.name() + t),
StreamOffset.create(MQConst.GoblinQueue.GOBLIN_ORDER_AGAIN.getKey(), ReadOffset.lastConsumed()), consumerGoblinOrderAGRdsReceiver
);
}
/* —————————————————————————— | —————————————————————————— | —————————————————————————— */
/* -------------------------------------------------------- | 缺票登记 */
@Bean
public Subscription subscriptionGoblinOrderAG(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveGoblinOrderAG(listenerContainer, 1);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscriptionGoblinOrderAG2(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveGoblinOrderAG(listenerContainer, 2);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscriptionGoblinOrderAG3(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveGoblinOrderAG(listenerContainer, 3);
listenerContainer.start();
return subscription;
}
/* -------------------------------------------------------- | */
}
package com.liquidnet.service.consumer.goblin.config;
import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.consumer.goblin.receiver.ConsumerGoblinOrderCPRdsReceiver;
import com.liquidnet.service.consumer.goblin.receiver.ConsumerGoblinSelfMarketRdsReceiver;
import lombok.var;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;
import java.time.Duration;
@Configuration
public class ConsumerGoblinOrderCPRedisStreamConfig {
@Autowired
ConsumerGoblinOrderCPRdsReceiver consumerGoblinOrderCPRdsReceiver;
private StreamMessageListenerContainer<String, MapRecord<String, String, String>> buildStreamMessageListenerContainer(RedisConnectionFactory factory) {
var options = StreamMessageListenerContainer
.StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofMillis(1))
.build();
return StreamMessageListenerContainer.create(factory, options);
}
/**
* 缺票登记
*
* @param listenerContainer
* @param t
* @return
*/
private Subscription receiveGoblinOrderCP(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) {
return listenerContainer.receiveAutoAck(
Consumer.from(MQConst.GoblinQueue.GOBLIN_ORDER_CREATE_PAY.getGroup(), MQConst.GoblinQueue.GOBLIN_ORDER_CREATE_PAY.name() + t),
StreamOffset.create(MQConst.GoblinQueue.GOBLIN_ORDER_CREATE_PAY.getKey(), ReadOffset.lastConsumed()), consumerGoblinOrderCPRdsReceiver
);
}
/* —————————————————————————— | —————————————————————————— | —————————————————————————— */
/* -------------------------------------------------------- | 缺票登记 */
@Bean
public Subscription subscriptionGoblinOrderCP(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveGoblinOrderCP(listenerContainer, 1);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscriptionGoblinOrderCP2(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveGoblinOrderCP(listenerContainer, 2);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscriptionGoblinOrderCP3(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveGoblinOrderCP(listenerContainer, 3);
listenerContainer.start();
return subscription;
}
/* -------------------------------------------------------- | */
}
package com.liquidnet.service.consumer.goblin.config;
import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.consumer.goblin.receiver.ConsumerGoblinOrderCPRdsReceiver;
import com.liquidnet.service.consumer.goblin.receiver.ConsumerGoblinOrderCloseRdsReceiver;
import lombok.var;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;
import java.time.Duration;
@Configuration
public class ConsumerGoblinOrderCloseRedisStreamConfig {
@Autowired
ConsumerGoblinOrderCloseRdsReceiver consumerGoblinOrderCloseRdsReceiver;
private StreamMessageListenerContainer<String, MapRecord<String, String, String>> buildStreamMessageListenerContainer(RedisConnectionFactory factory) {
var options = StreamMessageListenerContainer
.StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofMillis(1))
.build();
return StreamMessageListenerContainer.create(factory, options);
}
/**
* 缺票登记
*
* @param listenerContainer
* @param t
* @return
*/
private Subscription receiveGoblinOrderClose(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) {
return listenerContainer.receiveAutoAck(
Consumer.from(MQConst.GoblinQueue.GOBLIN_ORDER_CLOSE.getGroup(), MQConst.GoblinQueue.GOBLIN_ORDER_CLOSE.name() + t),
StreamOffset.create(MQConst.GoblinQueue.GOBLIN_ORDER_CLOSE.getKey(), ReadOffset.lastConsumed()), consumerGoblinOrderCloseRdsReceiver
);
}
/* —————————————————————————— | —————————————————————————— | —————————————————————————— */
/* -------------------------------------------------------- | 缺票登记 */
@Bean
public Subscription subscriptionGoblinOrderClose(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveGoblinOrderClose(listenerContainer, 1);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscriptionGoblinOrderClose2(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveGoblinOrderClose(listenerContainer, 2);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscriptionGoblinOrderClose3(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveGoblinOrderClose(listenerContainer, 3);
listenerContainer.start();
return subscription;
}
/* -------------------------------------------------------- | */
}
package com.liquidnet.service.consumer.goblin.config;
import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.consumer.goblin.receiver.ConsumerGoblinSelfMarketRdsReceiver;
import com.liquidnet.service.consumer.goblin.receiver.ConsumerGoblinStoreMarketRdsReceiver;
import lombok.var;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;
import java.time.Duration;
@Configuration
public class ConsumerGoblinSelfMarketRedisStreamConfig {
@Autowired
ConsumerGoblinSelfMarketRdsReceiver consumerGoblinSelfMarketRdsReceiver;
private StreamMessageListenerContainer<String, MapRecord<String, String, String>> buildStreamMessageListenerContainer(RedisConnectionFactory factory) {
var options = StreamMessageListenerContainer
.StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofMillis(1))
.build();
return StreamMessageListenerContainer.create(factory, options);
}
/**
* 缺票登记
*
* @param listenerContainer
* @param t
* @return
*/
private Subscription receiveGoblinSelfMarket(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) {
return listenerContainer.receiveAutoAck(
Consumer.from(MQConst.GoblinQueue.GOBLIN_SELF_MARKET.getGroup(), MQConst.GoblinQueue.GOBLIN_SELF_MARKET.name() + t),
StreamOffset.create(MQConst.GoblinQueue.GOBLIN_SELF_MARKET.getKey(), ReadOffset.lastConsumed()), consumerGoblinSelfMarketRdsReceiver
);
}
/* —————————————————————————— | —————————————————————————— | —————————————————————————— */
/* -------------------------------------------------------- | 缺票登记 */
@Bean
public Subscription subscriptionGoblinSelfMarket(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveGoblinSelfMarket(listenerContainer, 1);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscriptionGoblinSelfMarket2(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveGoblinSelfMarket(listenerContainer, 2);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscriptionGoblinSelfMarket3(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveGoblinSelfMarket(listenerContainer, 3);
listenerContainer.start();
return subscription;
}
/* -------------------------------------------------------- | */
}
package com.liquidnet.service.consumer.goblin.config;
import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.consumer.goblin.receiver.ConsumerGoblinOrderCloseRdsReceiver;
import com.liquidnet.service.consumer.goblin.receiver.ConsumerGoblinStoreOrderRdsReceiver;
import lombok.var;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;
import java.time.Duration;
@Configuration
public class ConsumerGoblinStoreOrderRedisStreamConfig {
@Autowired
ConsumerGoblinStoreOrderRdsReceiver consumerGoblinStoreOrderRdsReceiver;
private StreamMessageListenerContainer<String, MapRecord<String, String, String>> buildStreamMessageListenerContainer(RedisConnectionFactory factory) {
var options = StreamMessageListenerContainer
.StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofMillis(1))
.build();
return StreamMessageListenerContainer.create(factory, options);
}
/**
* 缺票登记
*
* @param listenerContainer
* @param t
* @return
*/
private Subscription receiveGoblinStoreOrder(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) {
return listenerContainer.receiveAutoAck(
Consumer.from(MQConst.GoblinQueue.GOBLIN_STORE_ORDER_OPERA.getGroup(), MQConst.GoblinQueue.GOBLIN_STORE_ORDER_OPERA.name() + t),
StreamOffset.create(MQConst.GoblinQueue.GOBLIN_STORE_ORDER_OPERA.getKey(), ReadOffset.lastConsumed()), consumerGoblinStoreOrderRdsReceiver
);
}
/* —————————————————————————— | —————————————————————————— | —————————————————————————— */
/* -------------------------------------------------------- | 缺票登记 */
@Bean
public Subscription subscriptionGoblinStoreOrder(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveGoblinStoreOrder(listenerContainer, 1);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscriptionGoblinStoreOrder2(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveGoblinStoreOrder(listenerContainer, 2);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscriptionGoblinStoreOrder3(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveGoblinStoreOrder(listenerContainer, 3);
listenerContainer.start();
return subscription;
}
/* -------------------------------------------------------- | */
}
package com.liquidnet.service.consumer.goblin.dto;
import lombok.Data;
import lombok.EqualsAndHashCode;
@Data
@EqualsAndHashCode
public class PhoneDto {
private String uid;
private String mobile;
}
package com.liquidnet.service.consumer.goblin.receiver;
import com.liquidnet.common.cache.redis.util.RedisUtil;
import com.liquidnet.commons.lang.util.CollectionUtil;
import com.liquidnet.commons.lang.util.JsonUtils;
import com.liquidnet.service.base.SqlMapping;
import com.liquidnet.service.consumer.goblin.service.IBaseDao;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamListener;
import java.util.HashMap;
@Slf4j
public abstract class AbstractSqlRedisReceiver implements StreamListener<String, MapRecord<String, String, String>> {
@Autowired
private IBaseDao baseDao;
@Autowired
StringRedisTemplate stringRedisTemplate;
@Autowired
private RedisUtil redisUtil;
@Override
public void onMessage(MapRecord<String, String, String> message) {
log.debug("CONSUMER SQL[streamKey:{},messageId:{},stream:{},body:{}]",
this.getRedisStreamKey(), message.getId(), message.getStream(), message.getValue());
boolean result = this.consumerSqlDaoHandler(message.getValue().get("message"));
log.info("CONSUMER SQL RESULT:{} ==> MESSAGE_ID:{}", result, message.getId());
try {
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId());
} catch (Exception e) {
log.error("#CONSUMER SQL RESULT:{} ==> DEL_REDIS_QUEUE_MSG_EXCEPTION[MESSAGE_ID:{},MSG:{}]", result, message.getId(), JsonUtils.toJson(message), e);
} finally {
try {
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId());
} catch (Exception ignored) {
}
}
}
private boolean consumerSqlDaoHandler(String msg) {
Boolean aBoolean = false;
try {
SqlMapping.SqlMessage sqlMessage = JsonUtils.fromJson(msg, SqlMapping.SqlMessage.class);
if (sqlMessage == null) {
aBoolean = true;
} else {
aBoolean = baseDao.batchSqls(sqlMessage.getSqls(), sqlMessage.getArgs());
}
} catch (Exception e) {
log.error("CONSUMER SQL FAIL ==> {}", e.getMessage(), e);
} finally {
if (!aBoolean) {
HashMap<String, String> map = CollectionUtil.mapStringString();
map.put("message", msg);
stringRedisTemplate.opsForStream().add(StreamRecords.mapBacked(map).withStreamKey(this.getRedisStreamKey()));
}
}
return aBoolean;
}
protected abstract String getRedisStreamKey();
protected abstract String getRedisStreamGroup();
}
\ No newline at end of file
package com.liquidnet.service.consumer.goblin.receiver;
import com.alibaba.excel.EasyExcel;
import com.alibaba.excel.read.listener.PageReadListener;
import com.alibaba.fastjson.JSON;
import com.liquidnet.common.cache.redis.util.RedisUtil;
import com.liquidnet.commons.lang.util.CollectionUtil;
import com.liquidnet.commons.lang.util.JsonUtils;
import com.liquidnet.service.consumer.goblin.dto.PhoneDto;
import com.liquidnet.service.consumer.goblin.service.IBaseDao;
import com.liquidnet.service.goblin.constant.GoblinRedisConst;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamListener;
import java.io.InputStream;
import java.net.URL;
import java.time.LocalDateTime;
import java.util.LinkedList;
@Slf4j
public abstract class AbstractXlsRedisReceiver implements StreamListener<String, MapRecord<String, String, String>> {
@Autowired
private IBaseDao baseDao;
@Autowired
StringRedisTemplate stringRedisTemplate;
@Autowired
private RedisUtil redisUtil;
private static final String SQL_INSERT_GOODS_BUY_ROSTER_LOG = "INSERT INTO goblin_goods_buy_roster_log (sku_id,buy_roster,buy_roster_type,parsing_result,created_at)VALUES(?,?,?,?,?)";
@Override
public void onMessage(MapRecord<String, String, String> message) {
log.debug("CONSUMER XLS [streamKey:{},messageId:{},stream:{},body:{}]",
this.getRedisStreamKey(), message.getId(), message.getStream(), message.getValue());
boolean result = this.consumerSqlDaoHandler(message.getValue().get("message"), Integer.parseInt(message.getValue().get("type")), message.getValue().get("skuId"));
log.info("CONSUMER XLS_PATH RESULT:{} ==> MESSAGE_ID:{}", result, message.getId());
try {
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId());
} catch (Exception e) {
log.error("#CONSUMER SQL RESULT:{} ==> DEL_REDIS_QUEUE_MSG_EXCEPTION[MESSAGE_ID:{},MSG:{}]", result, message.getId(), JsonUtils.toJson(message), e);
} finally {
try {
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId());
} catch (Exception ignored) {
}
}
}
private boolean consumerSqlDaoHandler(String xlsPath, Integer type, String skuId) {
LinkedList<Object[]> objs = CollectionUtil.linkedListObjectArr();
Boolean aBoolean = false;
try {
URL url = new URL(xlsPath);
InputStream is = url.openStream();
EasyExcel.read(is, PhoneDto.class, new PageReadListener<PhoneDto>(dataList -> {
for (PhoneDto data : dataList) {
if (data.getMobile() == null) {
continue;
}
String redisKey = GoblinRedisConst.REDIS_CAN_BUY.concat(skuId + ":").concat(data.getMobile());
if (type.equals(1)) {//添加
if (log.isDebugEnabled()) {
log.debug("添加 读取到一条数据{}", JSON.toJSONString(data));
}
redisUtil.set(redisKey, 0);
} else {
if (log.isDebugEnabled()) {
log.debug("删除 读取到一条数据{}", JSON.toJSONString(data));
}
redisUtil.del(redisKey);
}
}
})).sheet().doRead();
objs.add(new Object[]{skuId, xlsPath, type, 1, LocalDateTime.now()});
} catch (Exception e) {
objs.add(new Object[]{skuId, xlsPath, type, 2, LocalDateTime.now()});
aBoolean = false;
log.error("CONSUMER XLS FAIL ==> {}", e.getMessage(), e);
} finally {
try {
baseDao.batchSql(SQL_INSERT_GOODS_BUY_ROSTER_LOG, objs);
} catch (Exception e) {
}
// if (!aBoolean) {
// HashMap<String, String> map = CollectionUtil.mapStringString();
// map.put("message", xlsPath);
// map.put("skuId", skuId);
// map.put("type", type.toString());
// stringRedisTemplate.opsForStream().add(StreamRecords.mapBacked(map).withStreamKey(this.getRedisStreamKey()));
// }
}
return aBoolean;
}
protected abstract String getRedisStreamKey();
protected abstract String getRedisStreamGroup();
}
\ No newline at end of file
package com.liquidnet.service.consumer.goblin.receiver;
import com.liquidnet.service.base.constant.MQConst;
import org.springframework.stereotype.Component;
@Component
public class ConsumerGoblinOrderAGRdsReceiver extends AbstractSqlRedisReceiver {
@Override
protected String getRedisStreamKey() {
return MQConst.GoblinQueue.GOBLIN_ORDER_AGAIN.getKey();
}
@Override
protected String getRedisStreamGroup() {
return MQConst.GoblinQueue.GOBLIN_ORDER_AGAIN.getGroup();
}
}
package com.liquidnet.service.consumer.goblin.receiver;
import com.liquidnet.service.base.constant.MQConst;
import org.springframework.stereotype.Component;
@Component
public class ConsumerGoblinOrderCPRdsReceiver extends AbstractSqlRedisReceiver {
@Override
protected String getRedisStreamKey() {
return MQConst.GoblinQueue.GOBLIN_ORDER_CREATE_PAY.getKey();
}
@Override
protected String getRedisStreamGroup() {
return MQConst.GoblinQueue.GOBLIN_ORDER_CREATE_PAY.getGroup();
}
}
package com.liquidnet.service.consumer.goblin.receiver;
import com.liquidnet.service.base.constant.MQConst;
import org.springframework.stereotype.Component;
@Component
public class ConsumerGoblinOrderCloseRdsReceiver extends AbstractSqlRedisReceiver {
@Override
protected String getRedisStreamKey() {
return MQConst.GoblinQueue.GOBLIN_ORDER_CLOSE.getKey();
}
@Override
protected String getRedisStreamGroup() {
return MQConst.GoblinQueue.GOBLIN_ORDER_CLOSE.getGroup();
}
}
package com.liquidnet.service.consumer.goblin.receiver;
import com.liquidnet.service.base.constant.MQConst;
import org.springframework.stereotype.Component;
@Component
public class ConsumerGoblinSelfMarketRdsReceiver extends AbstractSqlRedisReceiver {
@Override
protected String getRedisStreamKey() {
return MQConst.GoblinQueue.GOBLIN_SELF_MARKET.getKey();
}
@Override
protected String getRedisStreamGroup() {
return MQConst.GoblinQueue.GOBLIN_SELF_MARKET.getGroup();
}
}
package com.liquidnet.service.consumer.goblin.receiver;
import com.liquidnet.service.base.constant.MQConst;
import org.springframework.stereotype.Component;
@Component
public class ConsumerGoblinShopCartReceiver extends AbstractSqlRedisReceiver {
@Override
protected String getRedisStreamKey() {
return MQConst.GoblinQueue.GOBLIN_SHOP_CART.getKey();
}
@Override
protected String getRedisStreamGroup() {
return MQConst.GoblinQueue.GOBLIN_SHOP_CART.getGroup();
}
}
package com.liquidnet.service.consumer.goblin.receiver;
import com.liquidnet.service.base.constant.MQConst;
import org.springframework.stereotype.Component;
@Component
public class ConsumerGoblinSqlCouponRdsReceiver extends AbstractSqlRedisReceiver {
@Override
protected String getRedisStreamKey() {
return MQConst.GoblinQueue.SQL_COUPON.getKey();
}
@Override
protected String getRedisStreamGroup() {
return MQConst.GoblinQueue.SQL_COUPON.getGroup();
}
}
package com.liquidnet.service.consumer.goblin.receiver;
import com.liquidnet.service.base.constant.MQConst;
import org.springframework.stereotype.Component;
@Component
public class ConsumerGoblinSqlGoodsRdsReceiver extends AbstractSqlRedisReceiver {
@Override
protected String getRedisStreamKey() {
return MQConst.GoblinQueue.SQL_GOODS.getKey();
}
@Override
protected String getRedisStreamGroup() {
return MQConst.GoblinQueue.SQL_GOODS.getGroup();
}
}
package com.liquidnet.service.consumer.goblin.receiver;
import com.liquidnet.service.base.constant.MQConst;
import org.springframework.stereotype.Component;
@Component
public class ConsumerGoblinSqlStoreRdsReceiver extends AbstractSqlRedisReceiver {
@Override
protected String getRedisStreamKey() {
return MQConst.GoblinQueue.SQL_STORE.getKey();
}
@Override
protected String getRedisStreamGroup() {
return MQConst.GoblinQueue.SQL_STORE.getGroup();
}
}
package com.liquidnet.service.consumer.goblin.receiver;
import com.liquidnet.service.base.constant.MQConst;
import org.springframework.stereotype.Component;
@Component
public class ConsumerGoblinStoreMarketRdsReceiver extends AbstractSqlRedisReceiver {
@Override
protected String getRedisStreamKey() {
return MQConst.GoblinQueue.GOBLIN_STORE_MARKET.getKey();
}
@Override
protected String getRedisStreamGroup() {
return MQConst.GoblinQueue.GOBLIN_STORE_MARKET.getGroup();
}
}
package com.liquidnet.service.consumer.goblin.receiver;
import com.liquidnet.service.base.constant.MQConst;
import org.springframework.stereotype.Component;
@Component
public class ConsumerGoblinStoreOrderRdsReceiver extends AbstractSqlRedisReceiver {
@Override
protected String getRedisStreamKey() {
return MQConst.GoblinQueue.GOBLIN_STORE_ORDER_OPERA.getKey();
}
@Override
protected String getRedisStreamGroup() {
return MQConst.GoblinQueue.GOBLIN_STORE_ORDER_OPERA.getGroup();
}
}
package com.liquidnet.service.consumer.goblin.receiver;
import com.liquidnet.service.base.constant.MQConst;
import org.springframework.stereotype.Component;
@Component
public class ConsumerGoblinUserOrderRdsReceiver extends AbstractSqlRedisReceiver {
@Override
protected String getRedisStreamKey() {
return MQConst.GoblinQueue.GOBLIN_USER_ORDER_OPERA.getKey();
}
@Override
protected String getRedisStreamGroup() {
return MQConst.GoblinQueue.GOBLIN_USER_ORDER_OPERA.getGroup();
}
}
package com.liquidnet.service.consumer.goblin.receiver;
import com.liquidnet.service.base.constant.MQConst;
import org.springframework.stereotype.Component;
@Component
public class ConsumerGoblinXlsRdsReceiver extends AbstractXlsRedisReceiver {
@Override
protected String getRedisStreamKey() {
return MQConst.GoblinQueue.GOBLIN_XLS_OPERA.getKey();
}
@Override
protected String getRedisStreamGroup() {
return MQConst.GoblinQueue.GOBLIN_XLS_OPERA.getGroup();
}
}
package com.liquidnet.service.consumer.goblin.service;
import java.util.LinkedList;
public interface IBaseDao {
/**
* 批量执行sql
*
* @param sql
* @param values
* @return
*/
Boolean batchSql(String sql, LinkedList<Object[]> values);
/**
* 批量执行不定量sql
*
* @param sql
* @param values
* @return
*/
Boolean batchSqls(LinkedList<String> sql, LinkedList<Object[]>... values);
/**
* 执行sql语句 无 参数
*
* @param sql
* @return
*/
Boolean batchSqlNoArgs(LinkedList<String> sql);
/**
* xs 新增一条记录且返回主键Id
*
* @param sql 新增待执行sql
* @param param 参数
* @return 主键ID
*/
int insertSqlAndReturnKeyId(final String sql, final Object[] param);
}
package com.liquidnet.service.consumer.goblin.service.impl;
import com.liquidnet.commons.lang.util.JsonUtils;
import com.liquidnet.service.consumer.goblin.service.IBaseDao;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.PreparedStatementCreator;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.jdbc.support.GeneratedKeyHolder;
import org.springframework.jdbc.support.KeyHolder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionTemplate;
import javax.annotation.Resource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.LinkedList;
@Service
public class BaseDao implements IBaseDao {
private static final Logger log = LoggerFactory.getLogger(BaseDao.class);
@Resource
public JdbcTemplate jdbcTemplate;
@Resource(name = "transactionManager")
public DataSourceTransactionManager transactionManager;
@Override
public Boolean batchSql(final String sql, final LinkedList<Object[]> values) {
TransactionCallback<Boolean> callback = new TransactionCallback<Boolean>() {
@Override
public Boolean doInTransaction(final TransactionStatus transactionStatus) {
if (values.size() > 0) {
int[] ints = jdbcTemplate.batchUpdate(sql, values);
}
return true;
}
};
try {
TransactionTemplate tt = new TransactionTemplate(transactionManager);
return tt.execute(callback);
} catch (Exception ex) {
log.error("###\nSQL.Preparing:{}\nParameters:{}", JsonUtils.toJson(sql), JsonUtils.toJson(values), ex);
return false;
}
}
@Override
public Boolean batchSqls(final LinkedList<String> sql,
final LinkedList<Object[]>... values) {
try {
TransactionCallback<Boolean> callback = new TransactionCallback<Boolean>() {
@Override
public Boolean doInTransaction(final TransactionStatus transactionStatus) {
int i = 0;
for (LinkedList<Object[]> o : values) {
if (sql.size() < i + 1) {
break;
}
if (!o.isEmpty()) {
jdbcTemplate.batchUpdate(sql.get(i), o);
}
i++;
}
return true;
}
};
TransactionTemplate tt = new TransactionTemplate(transactionManager);
return tt.execute(callback);
} catch (Exception ex) {
// if (ex instanceof LiquidnetServiceException) {
// log.error("###Error.Code:{} - {}", ((LiquidnetServiceException) ex).getCode(), ex.getMessage());
// } else {
log.error("###Error.Sqls:{}\nParameters:{},Ex:{}", JsonUtils.toJson(sql), JsonUtils.toJson(values), ex.getMessage());
// }
return false;
}
}
@Override
public Boolean batchSqlNoArgs(final LinkedList<String> sql) {
try {
TransactionCallback<Boolean> callback = new TransactionCallback<Boolean>() {
@Override
public Boolean doInTransaction(final TransactionStatus transactionStatus) {
for (String o : sql) {
jdbcTemplate.execute(o);
}
return true;
}
};
TransactionTemplate tt = new TransactionTemplate(transactionManager);
return tt.execute(callback);
} catch (Exception ex) {
log.error("###Error.Sqls:{}\nParameters:{},Ex:{}", sql);
return false;
}
}
/**
* xs 新增一条记录且返回主键Id
*
* @param sql 新增待执行sql
* @param param 参数
* @return 主键ID
*/
public int insertSqlAndReturnKeyId(final String sql, final Object[] param) {
final String innersql = sql;
final Object[] innerO = param;
KeyHolder keyHolder = new GeneratedKeyHolder();
try {
jdbcTemplate.update(new PreparedStatementCreator() {
@Override
public PreparedStatement createPreparedStatement(final Connection con)
throws SQLException {
PreparedStatement ps = con.prepareStatement(innersql,
Statement.RETURN_GENERATED_KEYS);
for (int i = 0; i < innerO.length; i++) {
ps.setObject(i + 1, innerO[i]);
}
return ps;
}
}, keyHolder);
} catch (Exception e) {
log.error("###\nSQL.Preparing:{}\nParameters:{}", sql, JsonUtils.toJson(param), e);
}
return keyHolder.getKey().intValue();
}
}
# begin-dev-这里是配置信息基本值
liquidnet:
cloudConfig:
profile: dev
security:
username: user
password: user123
eureka:
host: 127.0.0.1:7001
# end-dev-这里是配置信息基本值
spring:
profiles:
include: service-consumer-goblin
# begin-prod-这里是配置信息基本值
liquidnet:
cloudConfig:
profile: prod
security:
username: user
password: user123
eureka:
host: 172.17.207.189:7001
# end-prod-这里是配置信息基本值
spring:
profiles:
include: service-consumer-goblin
\ No newline at end of file
# begin-test-这里是配置信息基本值
liquidnet:
cloudConfig:
profile: test
security:
username: user
password: user123
eureka:
host: 172.17.192.6:7001
#instance:
# prefer-ip-address: true
#host: eureka-test-0.eureka-test-svc.zhengzai-test:7001/eureka-server/eureka,eureka-test-1.eureka-test-svc.zhengzai-test:7001/eureka-server/eureka,eureka-test-2.eureka-test-svc.zhengzai-test:7001/eureka-server/eureka
# end-test-这里是配置信息基本值
spring:
profiles:
include: service-consumer-goblin
\ No newline at end of file
spring:
application:
name: liquidnet-service-consumer-goblin
profiles:
active: dev
\ No newline at end of file
//package com.liquidnet.service.consumer.kylin.config;
//
//import com.liquidnet.service.consumer.kylin.receiver.ConsumerCandyCouponBackRdsReceiver;
//import lombok.var;
//import org.springframework.beans.factory.annotation.Autowired;
//import org.springframework.context.annotation.Bean;
//import org.springframework.context.annotation.Configuration;
//import org.springframework.data.redis.connection.RedisConnectionFactory;
//import org.springframework.data.redis.connection.stream.Consumer;
//import org.springframework.data.redis.connection.stream.MapRecord;
//import org.springframework.data.redis.connection.stream.ReadOffset;
//import org.springframework.data.redis.connection.stream.StreamOffset;
//import org.springframework.data.redis.stream.StreamMessageListenerContainer;
//import org.springframework.data.redis.stream.Subscription;
//
//import java.time.Duration;
//
//import static com.liquidnet.service.base.constant.MQConst.CandyQueue.COUPON_BACK;
//
//
//@Configuration
//public class ConsumerCandyCouponBackRedisStreamConfig {
// @Autowired
// ConsumerCandyCouponBackRdsReceiver consumerCandyCouponBackRdsReceiver;
//
// private StreamMessageListenerContainer<String, MapRecord<String, String, String>> buildStreamMessageListenerContainer(RedisConnectionFactory factory) {
// var options = StreamMessageListenerContainer
// .StreamMessageListenerContainerOptions
// .builder()
// .pollTimeout(Duration.ofMillis(1))
// .build();
// return StreamMessageListenerContainer.create(factory, options);
// }
//
// /**
// * 缺票登记
// *
// * @param listenerContainer
// * @param t
// * @return
// */
// private Subscription receiveSqlCandyCouponBack(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) {
// return listenerContainer.receiveAutoAck(
// Consumer.from(COUPON_BACK.getGroup(), COUPON_BACK.name() + t),
// StreamOffset.create(COUPON_BACK.getKey(), ReadOffset.lastConsumed()), consumerCandyCouponBackRdsReceiver
// );
// }
//
// /* —————————————————————————— | —————————————————————————— | —————————————————————————— */
//
// /* -------------------------------------------------------- | 缺票登记 */
//
// @Bean
// public Subscription subscriptionSqlCandyCouponBack(RedisConnectionFactory factory) {
// var listenerContainer = this.buildStreamMessageListenerContainer(factory);
// var subscription = receiveSqlCandyCouponBack(listenerContainer, 1);
// listenerContainer.start();
// return subscription;
// }
//
// @Bean
// public Subscription subscriptionSqlCandyCouponBack2(RedisConnectionFactory factory) {
// var listenerContainer = this.buildStreamMessageListenerContainer(factory);
// var subscription = receiveSqlCandyCouponBack(listenerContainer, 1);
// listenerContainer.start();
// return subscription;
// }
//
// @Bean
// public Subscription subscriptionSqlCandyCouponBack3(RedisConnectionFactory factory) {
// var listenerContainer = this.buildStreamMessageListenerContainer(factory);
// var subscription = receiveSqlCandyCouponBack(listenerContainer, 1);
// listenerContainer.start();
// return subscription;
// }
//
// /* -------------------------------------------------------- | */
//}
//package com.liquidnet.service.consumer.kylin.config;
//
//import com.liquidnet.service.consumer.kylin.receiver.ConsumerCandyCouponBackRdsReceiver;
//import com.liquidnet.service.consumer.kylin.receiver.ConsumerCandyCouponOrderBackRdsReceiver;
//import lombok.var;
//import org.springframework.beans.factory.annotation.Autowired;
//import org.springframework.context.annotation.Bean;
//import org.springframework.context.annotation.Configuration;
//import org.springframework.data.redis.connection.RedisConnectionFactory;
//import org.springframework.data.redis.connection.stream.Consumer;
//import org.springframework.data.redis.connection.stream.MapRecord;
//import org.springframework.data.redis.connection.stream.ReadOffset;
//import org.springframework.data.redis.connection.stream.StreamOffset;
//import org.springframework.data.redis.stream.StreamMessageListenerContainer;
//import org.springframework.data.redis.stream.Subscription;
//
//import java.time.Duration;
//
//import static com.liquidnet.service.base.constant.MQConst.CandyQueue.COUPON_BACK;
//import static com.liquidnet.service.base.constant.MQConst.CandyQueue.COUPON_ORDER_BACK;
//
//
//@Configuration
//public class ConsumerCandyCouponOrderBackRedisStreamConfig {
// @Autowired
// ConsumerCandyCouponOrderBackRdsReceiver consumerCandyCouponOrderBackRdsReceiver;
//
// private StreamMessageListenerContainer<String, MapRecord<String, String, String>> buildStreamMessageListenerContainer(RedisConnectionFactory factory) {
// var options = StreamMessageListenerContainer
// .StreamMessageListenerContainerOptions
// .builder()
// .pollTimeout(Duration.ofMillis(1))
// .build();
// return StreamMessageListenerContainer.create(factory, options);
// }
//
// /**
// * 缺票登记
// *
// * @param listenerContainer
// * @param t
// * @return
// */
// private Subscription receiveSqlCandyCouponBack(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) {
// return listenerContainer.receiveAutoAck(
// Consumer.from(COUPON_ORDER_BACK.getGroup(), COUPON_ORDER_BACK.name() + t),
// StreamOffset.create(COUPON_ORDER_BACK.getKey(), ReadOffset.lastConsumed()), consumerCandyCouponOrderBackRdsReceiver
// );
// }
//
// /* —————————————————————————— | —————————————————————————— | —————————————————————————— */
//
// /* -------------------------------------------------------- | 缺票登记 */
//
// @Bean
// public Subscription subscriptionSqlCandyCouponOrderBack(RedisConnectionFactory factory) {
// var listenerContainer = this.buildStreamMessageListenerContainer(factory);
// var subscription = receiveSqlCandyCouponBack(listenerContainer, 1);
// listenerContainer.start();
// return subscription;
// }
//
// @Bean
// public Subscription subscriptionSqlCandyCouponOrderBack2(RedisConnectionFactory factory) {
// var listenerContainer = this.buildStreamMessageListenerContainer(factory);
// var subscription = receiveSqlCandyCouponBack(listenerContainer, 1);
// listenerContainer.start();
// return subscription;
// }
//
// @Bean
// public Subscription subscriptionSqlCandyCouponOrderBack3(RedisConnectionFactory factory) {
// var listenerContainer = this.buildStreamMessageListenerContainer(factory);
// var subscription = receiveSqlCandyCouponBack(listenerContainer, 1);
// listenerContainer.start();
// return subscription;
// }
//
// /* -------------------------------------------------------- | */
//}
//package com.liquidnet.service.consumer.kylin.config;
//
//import com.liquidnet.service.consumer.kylin.receiver.ConsumerCandyCouponReceiveRdsReceiver;
//import lombok.var;
//import org.springframework.beans.factory.annotation.Autowired;
//import org.springframework.context.annotation.Bean;
//import org.springframework.context.annotation.Configuration;
//import org.springframework.data.redis.connection.RedisConnectionFactory;
//import org.springframework.data.redis.connection.stream.Consumer;
//import org.springframework.data.redis.connection.stream.MapRecord;
//import org.springframework.data.redis.connection.stream.ReadOffset;
//import org.springframework.data.redis.connection.stream.StreamOffset;
//import org.springframework.data.redis.stream.StreamMessageListenerContainer;
//import org.springframework.data.redis.stream.Subscription;
//
//import java.time.Duration;
//
//import static com.liquidnet.service.base.constant.MQConst.CandyQueue.COUPON_RECEIVE;
//import static com.liquidnet.service.base.constant.MQConst.CandyQueue.COUPON_USE;
//import static com.liquidnet.service.base.constant.MQConst.SweetQueue.SWEET_USER_INSERT_DRAW;
//
//@Configuration
//public class ConsumerCandyCouponReceiveRedisStreamConfig {
// @Autowired
// ConsumerCandyCouponReceiveRdsReceiver consumerCandyCouponReceiveRdsReceiver;
//
// private StreamMessageListenerContainer<String, MapRecord<String, String, String>> buildStreamMessageListenerContainer(RedisConnectionFactory factory) {
// var options = StreamMessageListenerContainer
// .StreamMessageListenerContainerOptions
// .builder()
// .pollTimeout(Duration.ofMillis(1))
// .build();
// return StreamMessageListenerContainer.create(factory, options);
// }
//
// /**
// * 缺票登记
// *
// * @param listenerContainer
// * @param t
// * @return
// */
// private Subscription receiveSqlCandyCouponReceive(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) {
// return listenerContainer.receiveAutoAck(
// Consumer.from(COUPON_RECEIVE.getGroup(), COUPON_RECEIVE.name() + t),
// StreamOffset.create(COUPON_RECEIVE.getKey(), ReadOffset.lastConsumed()), consumerCandyCouponReceiveRdsReceiver
// );
// }
//
// /* —————————————————————————— | —————————————————————————— | —————————————————————————— */
//
// /* -------------------------------------------------------- | 缺票登记 */
//
// @Bean
// public Subscription subscriptionSqlCandyCouponReceive(RedisConnectionFactory factory) {
// var listenerContainer = this.buildStreamMessageListenerContainer(factory);
// var subscription = receiveSqlCandyCouponReceive(listenerContainer, 1);
// listenerContainer.start();
// return subscription;
// }
//
// @Bean
// public Subscription subscriptionSqlCandyCouponReceive2(RedisConnectionFactory factory) {
// var listenerContainer = this.buildStreamMessageListenerContainer(factory);
// var subscription = receiveSqlCandyCouponReceive(listenerContainer, 1);
// listenerContainer.start();
// return subscription;
// }
//
// @Bean
// public Subscription subscriptionSqlCandyCouponReceive3(RedisConnectionFactory factory) {
// var listenerContainer = this.buildStreamMessageListenerContainer(factory);
// var subscription = receiveSqlCandyCouponReceive(listenerContainer, 1);
// listenerContainer.start();
// return subscription;
// }
//
// /* -------------------------------------------------------- | */
//}
//package com.liquidnet.service.consumer.kylin.config;
//
//import com.liquidnet.service.consumer.kylin.receiver.ConsumerCandyCouponUseRdsReceiver;
//import lombok.var;
//import org.springframework.beans.factory.annotation.Autowired;
//import org.springframework.context.annotation.Bean;
//import org.springframework.context.annotation.Configuration;
//import org.springframework.data.redis.connection.RedisConnectionFactory;
//import org.springframework.data.redis.connection.stream.Consumer;
//import org.springframework.data.redis.connection.stream.MapRecord;
//import org.springframework.data.redis.connection.stream.ReadOffset;
//import org.springframework.data.redis.connection.stream.StreamOffset;
//import org.springframework.data.redis.stream.StreamMessageListenerContainer;
//import org.springframework.data.redis.stream.Subscription;
//
//import java.time.Duration;
//
//import static com.liquidnet.service.base.constant.MQConst.CandyQueue.COUPON_USE;
//
//@Configuration
//public class ConsumerCandyCouponUseRedisStreamConfig {
// @Autowired
// ConsumerCandyCouponUseRdsReceiver consumerCandyCouponUseRdsReceiver;
//
// private StreamMessageListenerContainer<String, MapRecord<String, String, String>> buildStreamMessageListenerContainer(RedisConnectionFactory factory) {
// var options = StreamMessageListenerContainer
// .StreamMessageListenerContainerOptions
// .builder()
// .pollTimeout(Duration.ofMillis(1))
// .build();
// return StreamMessageListenerContainer.create(factory, options);
// }
//
// /**
// * 缺票登记
// *
// * @param listenerContainer
// * @param t
// * @return
// */
// private Subscription receiveSqlCandyCouponUse(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) {
// return listenerContainer.receiveAutoAck(
// Consumer.from(COUPON_USE.getGroup(), COUPON_USE.name() + t),
// StreamOffset.create(COUPON_USE.getKey(), ReadOffset.lastConsumed()), consumerCandyCouponUseRdsReceiver
// );
// }
//
// /* —————————————————————————— | —————————————————————————— | —————————————————————————— */
//
// /* -------------------------------------------------------- | 缺票登记 */
//
// @Bean
// public Subscription subscriptionSqlCandyCouponUse(RedisConnectionFactory factory) {
// var listenerContainer = this.buildStreamMessageListenerContainer(factory);
// var subscription = receiveSqlCandyCouponUse(listenerContainer, 1);
// listenerContainer.start();
// return subscription;
// }
//
// @Bean
// public Subscription subscriptionSqlCandyCouponUse2(RedisConnectionFactory factory) {
// var listenerContainer = this.buildStreamMessageListenerContainer(factory);
// var subscription = receiveSqlCandyCouponUse(listenerContainer, 1);
// listenerContainer.start();
// return subscription;
// }
//
// @Bean
// public Subscription subscriptionSqlCandyCouponUse3(RedisConnectionFactory factory) {
// var listenerContainer = this.buildStreamMessageListenerContainer(factory);
// var subscription = receiveSqlCandyCouponUse(listenerContainer, 1);
// listenerContainer.start();
// return subscription;
// }
//
// /* -------------------------------------------------------- | */
//}
package com.liquidnet.service.consumer.kylin.config;
import com.liquidnet.common.cache.redis.config.RedisStreamConfig;
import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.consumer.kylin.receiver.ConsumerGoblinOrderAGRdsReceiver;
import lombok.var;
......@@ -14,23 +15,11 @@ import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;
import java.time.Duration;
@Configuration
public class ConsumerGoblinOrderAGRedisStreamConfig {
public class ConsumerGoblinOrderAGRedisStreamConfig extends RedisStreamConfig {
@Autowired
ConsumerGoblinOrderAGRdsReceiver consumerGoblinOrderAGRdsReceiver;
private StreamMessageListenerContainer<String, MapRecord<String, String, String>> buildStreamMessageListenerContainer(RedisConnectionFactory factory) {
var options = StreamMessageListenerContainer
.StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofMillis(1))
.build();
return StreamMessageListenerContainer.create(factory, options);
}
/**
* 缺票登记
*
......@@ -39,10 +28,8 @@ public class ConsumerGoblinOrderAGRedisStreamConfig {
* @return
*/
private Subscription receiveGoblinOrderAG(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) {
return listenerContainer.receiveAutoAck(
Consumer.from(MQConst.GoblinQueue.GOBLIN_ORDER_AGAIN.getGroup(), MQConst.GoblinQueue.GOBLIN_ORDER_AGAIN.name() + t),
StreamOffset.create(MQConst.GoblinQueue.GOBLIN_ORDER_AGAIN.getKey(), ReadOffset.lastConsumed()), consumerGoblinOrderAGRdsReceiver
);
return listenerContainer.receiveAutoAck(Consumer.from(MQConst.GoblinQueue.GOBLIN_ORDER_AGAIN.getGroup(), getConsumerName(MQConst.GoblinQueue.GOBLIN_ORDER_AGAIN.name() + t)),
StreamOffset.create(MQConst.GoblinQueue.GOBLIN_ORDER_AGAIN.getKey(), ReadOffset.lastConsumed()), consumerGoblinOrderAGRdsReceiver);
}
/* —————————————————————————— | —————————————————————————— | —————————————————————————— */
......@@ -50,25 +37,25 @@ public class ConsumerGoblinOrderAGRedisStreamConfig {
/* -------------------------------------------------------- | 缺票登记 */
@Bean
public Subscription subscriptionGoblinOrderAG(RedisConnectionFactory factory) {
public Subscription subscriptionGoblinOrderAG0(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveGoblinOrderAG(listenerContainer, 1);
var subscription = receiveGoblinOrderAG(listenerContainer, 0);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscriptionGoblinOrderAG2(RedisConnectionFactory factory) {
public Subscription subscriptionGoblinOrderAG1(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveGoblinOrderAG(listenerContainer, 2);
var subscription = receiveGoblinOrderAG(listenerContainer, 1);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscriptionGoblinOrderAG3(RedisConnectionFactory factory) {
public Subscription subscriptionGoblinOrderAG2(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveGoblinOrderAG(listenerContainer, 3);
var subscription = receiveGoblinOrderAG(listenerContainer, 2);
listenerContainer.start();
return subscription;
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment