记得上下班打卡 | git大法好,push需谨慎

Commit 4468b8d3 authored by 张国柄's avatar 张国柄

Adam:替换RABBIT为REDIS实现MQ;

parent fb4e5638
package com.liquidnet.service.base.constant;
public class MQConst {
public enum AdamQueue {
SMS_NOTICE("adam:stream:rk.sms.notice", "group.sms.sender", "短信通知"),
// SMS_SPREAD("adam:stream:rk.sms.spread", "group.sms.sender", "短信推广"),
SQL_UREGISTER("adam:stream:rk.sql.uregister", "group.sql.ucenter", "用户注册"),
SQL_UCENTER("adam:stream:rk.sql.ucenter", "group.sql.ucenter", "用户中心"),
SQL_UMEMBER("adam:stream:rk.sql.umember", "group.sql.ucenter", "购买会员"),
;
private final String key;
private final String group;
private final String desc;
AdamQueue(String key, String group, String desc) {
this.key = key;
this.group = group;
this.desc = desc;
}
public String getKey() {
return key;
}
public String getGroup() {
return group;
}
public String getDesc() {
return desc;
}
}
}
XADD adam:stream:rk.sms.notice * 0 0
XGROUP CREATE adam:stream:rk.sms.notice group.sms.sender 0
XADD adam:stream:rk.sql.uregister * 0 0
XGROUP CREATE adam:stream:rk.sql.uregister group.sql.ucenter 0
XADD adam:stream:rk.sql.ucenter * 0 0
XGROUP CREATE adam:stream:rk.sql.ucenter group.sql.ucenter 0
XADD adam:stream:rk.sql.umember * 0 0
XGROUP CREATE adam:stream:rk.sql.umember group.sql.ucenter 0
# ==================================================
# XGROUP DESTROY adam:stream:rk.sms.notice group.sms.sender 0
......@@ -73,8 +73,6 @@ public class AdamLoginController {
@Autowired
IAdamUserService adamUserService;
@Autowired
RabbitTemplate rabbitTemplate;
@Autowired
SmsProcessor smsProcessor;
@Value("${liquidnet.reviewer.app-login.mobile}")
......
package com.liquidnet.service.adam.service.impl;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.liquidnet.common.exception.LiquidnetServiceException;
import com.liquidnet.common.mq.constant.MQConst;
import com.liquidnet.commons.lang.util.*;
import com.liquidnet.service.adam.dto.AdamAddressesParam;
import com.liquidnet.service.adam.dto.vo.AdamAddressesVo;
import com.liquidnet.service.adam.entity.AdamAddresses;
import com.liquidnet.service.adam.mapper.AdamAddressesMapper;
import com.liquidnet.service.adam.service.AdamRdmService;
import com.liquidnet.service.adam.service.IAdamAddressesService;
import com.liquidnet.service.adam.util.QueueUtils;
import com.liquidnet.service.base.ErrorMapping;
import com.liquidnet.service.base.SqlMapping;
import com.liquidnet.service.base.constant.MQConst;
import com.mongodb.client.model.FindOneAndUpdateOptions;
import com.mongodb.client.model.ReturnDocument;
import com.mongodb.client.result.DeleteResult;
import lombok.extern.slf4j.Slf4j;
import org.bson.Document;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.convert.MongoConverter;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.stereotype.Service;
......@@ -46,7 +42,7 @@ public class AdamAddressesServiceImpl implements IAdamAddressesService {
@Autowired
MongoTemplate mongoTemplate;
@Autowired
RabbitTemplate rabbitTemplate;
QueueUtils queueUtils;
@Autowired
AdamRdmService adamRdmService;
......@@ -75,7 +71,8 @@ public class AdamAddressesServiceImpl implements IAdamAddressesService {
log.debug("#RDS耗时:{}ms", System.currentTimeMillis() - s);
s = System.currentTimeMillis();
rabbitTemplate.convertAndSend(MQConst.EX_LNS_SQL_UCENTER, MQConst.RK_SQL_UCENTER,
queueUtils.sendMsgByRedis(
MQConst.AdamQueue.SQL_UCENTER.getKey(),
SqlMapping.get("adam_addresses.add",
vo.getAddressesId(), vo.getUid(), vo.getName(), vo.getPhone(), vo.getProvince(), vo.getCity(), vo.getCounty(), vo.getAddress(), vo.getIsDefault(), vo.getState(), now
)
......@@ -133,8 +130,10 @@ public class AdamAddressesServiceImpl implements IAdamAddressesService {
adamRdmService.setAddressesVoByUid(uid, vos);
log.debug("#RDS耗时:{}ms", System.currentTimeMillis() - s);
s = System.currentTimeMillis();
rabbitTemplate.convertAndSend(MQConst.EX_LNS_SQL_UCENTER, MQConst.RK_SQL_UCENTER,
SqlMapping.get("adam_addresses.update.is_default", toMqObjs));
queueUtils.sendMsgByRedis(
MQConst.AdamQueue.SQL_UCENTER.getKey(),
SqlMapping.get("adam_addresses.update.is_default", toMqObjs)
);
log.debug("#MQ耗时:{}ms", System.currentTimeMillis() - s);
}
}
......@@ -163,7 +162,8 @@ public class AdamAddressesServiceImpl implements IAdamAddressesService {
log.debug("#RDS耗时:{}ms", System.currentTimeMillis() - s);
s = System.currentTimeMillis();
rabbitTemplate.convertAndSend(MQConst.EX_LNS_SQL_UCENTER, MQConst.RK_SQL_UCENTER,
queueUtils.sendMsgByRedis(
MQConst.AdamQueue.SQL_UCENTER.getKey(),
SqlMapping.get("adam_addresses.edit",
updateVo.getName(), updateVo.getPhone(), updateVo.getProvince(), updateVo.getCity(), updateVo.getCounty(), updateVo.getAddress(), now, updateVo.getAddressesId()
)
......@@ -196,7 +196,8 @@ public class AdamAddressesServiceImpl implements IAdamAddressesService {
log.debug("#RDS耗时:{}ms", System.currentTimeMillis() - s);
s = System.currentTimeMillis();
rabbitTemplate.convertAndSend(MQConst.EX_LNS_SQL_UCENTER, MQConst.RK_SQL_UCENTER,
queueUtils.sendMsgByRedis(
MQConst.AdamQueue.SQL_UCENTER.getKey(),
SqlMapping.get("adam_addresses.remove", now, now, addressesId)
);
log.debug("#MQ耗时:{}ms", System.currentTimeMillis() - s);
......
package com.liquidnet.service.adam.service.impl;
import com.github.pagehelper.PageInfo;
import com.liquidnet.common.mq.constant.MQConst;
import com.liquidnet.commons.lang.util.CollectionUtil;
import com.liquidnet.service.adam.dto.vo.AdamCollectBaseVo;
import com.liquidnet.service.adam.dto.vo.AdamCollectInfoVo;
import com.liquidnet.service.adam.dto.vo.AdamCollectVo;
import com.liquidnet.service.adam.service.AdamRdmService;
import com.liquidnet.service.adam.service.IAdamCollectionService;
import com.liquidnet.service.adam.util.QueueUtils;
import com.liquidnet.service.base.SqlMapping;
import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.kylin.dto.vo.mongo.KylinPerformanceVo;
import com.mongodb.client.result.DeleteResult;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
......@@ -41,7 +41,7 @@ public class AdamCollectionServiceImpl implements IAdamCollectionService {
@Autowired
MongoTemplate mongoTemplate;
@Autowired
RabbitTemplate rabbitTemplate;
QueueUtils queueUtils;
@Autowired
AdamRdmService adamRdmService;
......@@ -54,7 +54,8 @@ public class AdamCollectionServiceImpl implements IAdamCollectionService {
mongoTemplate.insert(vo, AdamCollectBaseVo.class.getSimpleName());
rabbitTemplate.convertAndSend(MQConst.EX_LNS_SQL_UCENTER, MQConst.RK_SQL_UCENTER,
queueUtils.sendMsgByRedis(
MQConst.AdamQueue.SQL_UCENTER.getKey(),
SqlMapping.get("adam_collection.add",
vo.getUid(), vo.getContentId(), vo.getType(), vo.getState(), now
)
......@@ -82,8 +83,10 @@ public class AdamCollectionServiceImpl implements IAdamCollectionService {
for (String c : contentIds) {
toMqObjs.add(new Object[]{now, uid, c});
}
rabbitTemplate.convertAndSend(MQConst.EX_LNS_SQL_UCENTER, MQConst.RK_SQL_UCENTER,
SqlMapping.get("adam_collection.del", toMqObjs));
queueUtils.sendMsgByRedis(
MQConst.AdamQueue.SQL_UCENTER.getKey(),
SqlMapping.get("adam_collection.del", toMqObjs)
);
}
}
......
......@@ -2,7 +2,6 @@ package com.liquidnet.service.adam.service.impl;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.github.pagehelper.PageInfo;
import com.liquidnet.common.mq.constant.MQConst;
import com.liquidnet.commons.lang.util.CollectionUtil;
import com.liquidnet.service.adam.dto.vo.AdamCollectInfoVo;
import com.liquidnet.service.adam.dto.vo.AdamDisposedBaseVo;
......@@ -12,11 +11,12 @@ import com.liquidnet.service.adam.entity.AdamDisposed;
import com.liquidnet.service.adam.mapper.AdamDisposedMapper;
import com.liquidnet.service.adam.service.AdamRdmService;
import com.liquidnet.service.adam.service.IAdamDisposedService;
import com.liquidnet.service.adam.util.QueueUtils;
import com.liquidnet.service.base.SqlMapping;
import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.kylin.dto.vo.mongo.KylinPerformanceVo;
import com.mongodb.client.result.DeleteResult;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Sort;
......@@ -44,7 +44,7 @@ public class AdamDisposedServiceImpl implements IAdamDisposedService {
@Autowired
MongoTemplate mongoTemplate;
@Autowired
RabbitTemplate rabbitTemplate;
QueueUtils queueUtils;
@Autowired
AdamRdmService adamRdmService;
......@@ -62,7 +62,8 @@ public class AdamDisposedServiceImpl implements IAdamDisposedService {
mongoTemplate.insert(vo, AdamDisposedBaseVo.class.getSimpleName());
rabbitTemplate.convertAndSend(MQConst.EX_LNS_SQL_UCENTER, MQConst.RK_SQL_UCENTER,
queueUtils.sendMsgByRedis(
MQConst.AdamQueue.SQL_UCENTER.getKey(),
SqlMapping.get("adam_disposed.add",
vo.getUid(), vo.getContentId(), vo.getType(), vo.getState(), now
)
......@@ -90,8 +91,10 @@ public class AdamDisposedServiceImpl implements IAdamDisposedService {
for (String c : contentIds) {
toMqObjs.add(new Object[]{now, uid, c});
}
rabbitTemplate.convertAndSend(MQConst.EX_LNS_SQL_UCENTER, MQConst.RK_SQL_UCENTER,
SqlMapping.get("adam_disposed.del", toMqObjs));
queueUtils.sendMsgByRedis(
MQConst.AdamQueue.SQL_UCENTER.getKey(),
SqlMapping.get("adam_disposed.del", toMqObjs)
);
}
}
......
......@@ -2,21 +2,21 @@ package com.liquidnet.service.adam.service.impl;
import com.fasterxml.jackson.databind.JsonNode;
import com.liquidnet.common.exception.LiquidnetServiceException;
import com.liquidnet.common.mq.constant.MQConst;
import com.liquidnet.commons.lang.util.*;
import com.liquidnet.service.adam.dto.AdamEntersParam;
import com.liquidnet.service.adam.dto.vo.AdamEntersVo;
import com.liquidnet.service.adam.service.AdamRdmService;
import com.liquidnet.service.adam.service.IAdamEntersService;
import com.liquidnet.service.adam.util.QueueUtils;
import com.liquidnet.service.base.ErrorMapping;
import com.liquidnet.service.base.SqlMapping;
import com.liquidnet.service.base.constant.MQConst;
import com.mongodb.client.model.FindOneAndUpdateOptions;
import com.mongodb.client.model.ReturnDocument;
import com.mongodb.client.result.DeleteResult;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.bson.Document;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
......@@ -49,7 +49,7 @@ public class AdamEntersServiceImpl implements IAdamEntersService {
@Autowired
MongoTemplate mongoTemplate;
@Autowired
RabbitTemplate rabbitTemplate;
QueueUtils queueUtils;
@Autowired
AdamRdmService adamRdmService;
......@@ -98,7 +98,8 @@ public class AdamEntersServiceImpl implements IAdamEntersService {
log.debug("#SQL.GET耗时:{}ms", System.currentTimeMillis() - s);
s = System.currentTimeMillis();
rabbitTemplate.convertAndSend(MQConst.EX_LNS_SQL_UCENTER, MQConst.RK_SQL_UCENTER,
queueUtils.sendMsgByRedis(
MQConst.AdamQueue.SQL_UCENTER.getKey(),
msg
);
log.debug("#MQ耗时:{}ms", System.currentTimeMillis() - s);
......@@ -155,8 +156,10 @@ public class AdamEntersServiceImpl implements IAdamEntersService {
adamRdmService.setEntersVoByUid(uid, vos);
log.debug("#RDS耗时:{}ms", System.currentTimeMillis() - s);
s = System.currentTimeMillis();
rabbitTemplate.convertAndSend(MQConst.EX_LNS_SQL_UCENTER, MQConst.RK_SQL_UCENTER,
SqlMapping.get("adam_enters.update.is_default", toMqObjs));
queueUtils.sendMsgByRedis(
MQConst.AdamQueue.SQL_UCENTER.getKey(),
SqlMapping.get("adam_enters.update.is_default", toMqObjs)
);
log.debug("#MQ耗时:{}ms", System.currentTimeMillis() - s);
}
}
......@@ -194,7 +197,8 @@ public class AdamEntersServiceImpl implements IAdamEntersService {
log.debug("#RDS耗时:{}ms", System.currentTimeMillis() - s);
s = System.currentTimeMillis();
rabbitTemplate.convertAndSend(MQConst.EX_LNS_SQL_UCENTER, MQConst.RK_SQL_UCENTER,
queueUtils.sendMsgByRedis(
MQConst.AdamQueue.SQL_UCENTER.getKey(),
SqlMapping.get("adam_enters.edit",
updateVo.getType(), updateVo.getName(), updateVo.getMobile(), updateVo.getIdCard(), updateVo.getIsDefault(), updateVo.getState(), now, updateVo.getEntersId()
)
......@@ -220,7 +224,8 @@ public class AdamEntersServiceImpl implements IAdamEntersService {
log.debug("#RDS耗时:{}ms", System.currentTimeMillis() - s);
s = System.currentTimeMillis();
rabbitTemplate.convertAndSend(MQConst.EX_LNS_SQL_UCENTER, MQConst.RK_SQL_UCENTER,
queueUtils.sendMsgByRedis(
MQConst.AdamQueue.SQL_UCENTER.getKey(),
SqlMapping.get("adam_enters.remove", now, now, entersId)
);
log.debug("#MQ耗时:{}ms", System.currentTimeMillis() - s);
......
......@@ -2,7 +2,6 @@ package com.liquidnet.service.adam.service.impl;
import com.fasterxml.jackson.databind.JsonNode;
import com.github.pagehelper.PageInfo;
import com.liquidnet.common.mq.constant.MQConst;
import com.liquidnet.commons.lang.util.*;
import com.liquidnet.service.adam.dto.AdamMemberOrderCallbackParam;
import com.liquidnet.service.adam.dto.AdamMemberOrderCodeParam;
......@@ -13,12 +12,13 @@ import com.liquidnet.service.adam.service.AdamRdmService;
import com.liquidnet.service.adam.service.IAdamMemberOrderService;
import com.liquidnet.service.adam.service.IAdamUserMemberService;
import com.liquidnet.service.adam.util.MemberUtil;
import com.liquidnet.service.adam.util.QueueUtils;
import com.liquidnet.service.base.ErrorMapping;
import com.liquidnet.service.base.ResponseDto;
import com.liquidnet.service.base.SqlMapping;
import com.liquidnet.service.base.constant.MQConst;
import lombok.extern.slf4j.Slf4j;
import org.bson.Document;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.data.domain.PageRequest;
......@@ -45,7 +45,7 @@ public class AdamMemberOrderServiceImpl implements IAdamMemberOrderService {
@Autowired
MongoTemplate mongoTemplate;
@Autowired
RabbitTemplate rabbitTemplate;
QueueUtils queueUtils;
@Autowired
AdamRdmService adamRdmService;
@Autowired
......@@ -303,8 +303,10 @@ public class AdamMemberOrderServiceImpl implements IAdamMemberOrderService {
log.debug("#RDS耗时:{}ms", System.currentTimeMillis() - s);
s = System.currentTimeMillis();
rabbitTemplate.convertAndSend(MQConst.EX_LNS_SQL_UCENTER, MQConst.RK_SQL_UMEMBER,
SqlMapping.gets(toMqSqls, operationObjs, updateMemberOrderObjs));
queueUtils.sendMsgByRedis(
com.liquidnet.service.base.constant.MQConst.AdamQueue.SQL_UMEMBER.getKey(),
SqlMapping.gets(toMqSqls, operationObjs, updateMemberOrderObjs)
);
log.debug("#MQ耗时:{}ms", System.currentTimeMillis() - s);
return ResponseDto.success();
}
......@@ -421,8 +423,10 @@ public class AdamMemberOrderServiceImpl implements IAdamMemberOrderService {
});
s = System.currentTimeMillis();
rabbitTemplate.convertAndSend(MQConst.EX_LNS_SQL_UCENTER, MQConst.RK_SQL_UMEMBER,
SqlMapping.gets(toMqSqls, upsertUserMemberObjs, updateMemberCodeObjs, initMemberOrderObjs));
queueUtils.sendMsgByRedis(
MQConst.AdamQueue.SQL_UMEMBER.getKey(),
SqlMapping.gets(toMqSqls, upsertUserMemberObjs, updateMemberCodeObjs, initMemberOrderObjs)
);
log.debug("#MQ耗时:{}ms", System.currentTimeMillis() - s);
AdamMemberOrderResult result = AdamMemberOrderResult.getNew();
......
package com.liquidnet.service.adam.service.impl;
import com.liquidnet.common.cache.redis.util.RedisUtil;
import com.liquidnet.common.mq.constant.MQConst;
import com.liquidnet.service.adam.dto.vo.AdamRealInfoVo;
import com.liquidnet.service.adam.entity.AdamRealName;
import com.liquidnet.service.adam.service.IAdamRealNameService;
import com.liquidnet.service.adam.util.QueueUtils;
import com.liquidnet.service.base.SqlMapping;
import com.liquidnet.service.base.constant.MQConst;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.stereotype.Service;
......@@ -31,7 +31,7 @@ public class AdamRealNameServiceImpl implements IAdamRealNameService {
@Autowired
RedisUtil redisUtil;
@Autowired
RabbitTemplate rabbitTemplate;
QueueUtils queueUtils;
@Override
// @Transactional(propagation = Propagation.REQUIRED, rollbackFor = Exception.class)
......@@ -50,8 +50,10 @@ public class AdamRealNameServiceImpl implements IAdamRealNameService {
realName.getCreatedAt()
);
s = System.currentTimeMillis();
rabbitTemplate.convertAndSend(MQConst.EX_LNS_SQL_UCENTER, MQConst.RK_SQL_UCENTER,
SqlMapping.get("adam_real_name.add", paramList.toArray()));
queueUtils.sendMsgByRedis(
MQConst.AdamQueue.SQL_UCENTER.getKey(),
SqlMapping.get("adam_real_name.add", paramList.toArray())
);
log.debug("#MQ耗时:{}ms", System.currentTimeMillis() - s);
}
}
package com.liquidnet.service.adam.service.impl;
import com.liquidnet.common.cache.redis.util.RedisUtil;
import com.liquidnet.common.mq.constant.MQConst;
import com.liquidnet.commons.lang.core.JwtValidator;
import com.liquidnet.commons.lang.util.BsonUtil;
import com.liquidnet.commons.lang.util.CollectionUtil;
......@@ -9,13 +8,14 @@ import com.liquidnet.commons.lang.util.JsonUtils;
import com.liquidnet.service.adam.dto.vo.AdamUserInfoVo;
import com.liquidnet.service.adam.service.AdamRdmService;
import com.liquidnet.service.adam.service.IAdamUserInfoService;
import com.liquidnet.service.adam.util.QueueUtils;
import com.liquidnet.service.base.SqlMapping;
import com.liquidnet.service.base.constant.MQConst;
import com.mongodb.client.model.FindOneAndUpdateOptions;
import com.mongodb.client.model.ReturnDocument;
import com.mongodb.client.result.UpdateResult;
import lombok.extern.slf4j.Slf4j;
import org.bson.Document;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.query.Criteria;
......@@ -44,7 +44,7 @@ public class AdamUserInfoServiceImpl implements IAdamUserInfoService {
@Autowired
AdamRdmService adamRdmService;
@Autowired
RabbitTemplate rabbitTemplate;
QueueUtils queueUtils;
@Autowired
RedisUtil redisUtil;
@Autowired
......@@ -92,7 +92,7 @@ public class AdamUserInfoServiceImpl implements IAdamUserInfoService {
log.debug("#SQL.GET耗时:{}ms", System.currentTimeMillis() - s);
s = System.currentTimeMillis();
rabbitTemplate.convertAndSend(MQConst.EX_LNS_SQL_UCENTER, MQConst.RK_SQL_UCENTER,
queueUtils.sendMsgByRedis(MQConst.AdamQueue.SQL_UCENTER.getKey(),
SqlMapping.gets(toMqSqls, updateUserObjs, updateUserInfoObjs)
);
log.debug("#MQ耗时:{}ms", System.currentTimeMillis() - s);
......@@ -124,8 +124,9 @@ public class AdamUserInfoServiceImpl implements IAdamUserInfoService {
log.debug("#RDS耗时:{}ms", System.currentTimeMillis() - s);
s = System.currentTimeMillis();
rabbitTemplate.convertAndSend(MQConst.EX_LNS_SQL_UCENTER, MQConst.RK_SQL_UCENTER,
SqlMapping.get("adam_user.edit.mobile", mobile, now, uid));
queueUtils.sendMsgByRedis(MQConst.AdamQueue.SQL_UCENTER.getKey(),
SqlMapping.get("adam_user.edit.mobile", mobile, now, uid)
);
log.debug("#MQ耗时:{}ms", System.currentTimeMillis() - s);
return this.flushSsoProcess(beforeUserInfoVo);
......
......@@ -3,7 +3,6 @@ package com.liquidnet.service.adam.service.impl;
import com.fasterxml.jackson.databind.JsonNode;
import com.liquidnet.common.cache.redisson.util.RedisLockUtil;
import com.liquidnet.common.exception.LiquidnetServiceException;
import com.liquidnet.common.mq.constant.MQConst;
import com.liquidnet.commons.lang.util.*;
import com.liquidnet.service.adam.constant.AdamRedisConst;
import com.liquidnet.service.adam.dto.AdamThirdPartParam;
......@@ -14,8 +13,10 @@ import com.liquidnet.service.adam.service.AdamRdmService;
import com.liquidnet.service.adam.service.IAdamEntersService;
import com.liquidnet.service.adam.service.IAdamRealNameService;
import com.liquidnet.service.adam.service.IAdamUserService;
import com.liquidnet.service.adam.util.QueueUtils;
import com.liquidnet.service.base.ErrorMapping;
import com.liquidnet.service.base.SqlMapping;
import com.liquidnet.service.base.constant.MQConst;
import com.mongodb.client.model.FindOneAndUpdateOptions;
import com.mongodb.client.model.ReturnDocument;
import com.mongodb.client.result.DeleteResult;
......@@ -23,7 +24,6 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.StringUtils;
import org.bson.Document;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
......@@ -57,7 +57,7 @@ public class AdamUserServiceImpl implements IAdamUserService {
@Autowired
MongoTemplate mongoTemplate;
@Autowired
RabbitTemplate rabbitTemplate;
QueueUtils queueUtils;
@Autowired
IAdamEntersService adamEntersService;
@Autowired
......@@ -104,7 +104,7 @@ public class AdamUserServiceImpl implements IAdamUserService {
toMqSqls.add(SqlMapping.get("adam_user_info.add"));
initUserInfoObjs.add(new Object[]{userInfoVo.getQrCode(), userInfoVo.getUid()});
log.debug("#SQL.GET耗时:{}ms", System.currentTimeMillis() - s);
rabbitTemplate.convertAndSend(MQConst.EX_LNS_SQL_UCENTER, MQConst.RK_SQL_UREGISTER,
queueUtils.sendMsgByRedis(MQConst.AdamQueue.SQL_UREGISTER.getKey(),
SqlMapping.gets(toMqSqls, initUserObjs, initUserInfoObjs)
);
log.debug("#MQ耗时:{}ms", System.currentTimeMillis() - s);
......@@ -173,7 +173,7 @@ public class AdamUserServiceImpl implements IAdamUserService {
log.debug("#RDS耗时:{}ms", System.currentTimeMillis() - s);
s = System.currentTimeMillis();
rabbitTemplate.convertAndSend(MQConst.EX_LNS_SQL_UCENTER, MQConst.RK_SQL_UREGISTER,
queueUtils.sendMsgByRedis(MQConst.AdamQueue.SQL_UREGISTER.getKey(),
SqlMapping.gets(toMqSqls, initUserObjs, initThirdPartObjs)
);
log.debug("#MQ耗时:{}ms", System.currentTimeMillis() - s);
......@@ -209,7 +209,7 @@ public class AdamUserServiceImpl implements IAdamUserService {
log.debug("#RDS耗时:{}ms", System.currentTimeMillis() - s);
s = System.currentTimeMillis();
rabbitTemplate.convertAndSend(MQConst.EX_LNS_SQL_UCENTER, MQConst.RK_SQL_UCENTER,
queueUtils.sendMsgByRedis(MQConst.AdamQueue.SQL_UCENTER.getKey(),
SqlMapping.get(
"adam_third_party.add",
thirdPartInfoVo.getUid(), thirdPartInfoVo.getOpenId(), thirdPartInfoVo.getAvatar(), thirdPartInfoVo.getNickname(), thirdPartInfoVo.getPlatform(), thirdPartInfoVo.getState(), thirdPartInfoVo.getCreatedAt()
......@@ -237,7 +237,7 @@ public class AdamUserServiceImpl implements IAdamUserService {
adamRdmService.delThirdPartVoListByUid(bindUid);
rabbitTemplate.convertAndSend(MQConst.EX_LNS_SQL_UCENTER, MQConst.RK_SQL_UCENTER,
queueUtils.sendMsgByRedis(MQConst.AdamQueue.SQL_UCENTER.getKey(),
SqlMapping.get(
"adam_third_party.add",
thirdPartInfoVo.getUid(), thirdPartInfoVo.getOpenId(), thirdPartInfoVo.getAvatar(), thirdPartInfoVo.getNickname(), thirdPartInfoVo.getPlatform(), thirdPartInfoVo.getState(), thirdPartInfoVo.getCreatedAt()
......@@ -274,8 +274,9 @@ public class AdamUserServiceImpl implements IAdamUserService {
log.debug("#RDS耗时:{}ms", System.currentTimeMillis() - s);
s = System.currentTimeMillis();
rabbitTemplate.convertAndSend(MQConst.EX_LNS_SQL_UCENTER, MQConst.RK_SQL_UCENTER,
SqlMapping.get("adam_third_party.unbind", now, uid, platform));
queueUtils.sendMsgByRedis(MQConst.AdamQueue.SQL_UCENTER.getKey(),
SqlMapping.get("adam_third_party.unbind", now, uid, platform)
);
log.debug("#MQ耗时:{}ms", System.currentTimeMillis() - s);
}
......@@ -352,8 +353,10 @@ public class AdamUserServiceImpl implements IAdamUserService {
}
s = System.currentTimeMillis();
rabbitTemplate.convertAndSend(MQConst.EX_LNS_SQL_UCENTER, MQConst.RK_SQL_UCENTER,
SqlMapping.gets(toMqSqls, objsUser));
queueUtils.sendMsgByRedis(
MQConst.AdamQueue.SQL_UCENTER.getKey(),
SqlMapping.gets(toMqSqls, objsUser)
);
log.debug("#MQ耗时:{}ms", System.currentTimeMillis() - s);
}
......
package com.liquidnet.service.adam.util;
import com.liquidnet.common.exception.LiquidnetServiceException;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
......@@ -19,27 +17,25 @@ public class QueueUtils {
StringRedisTemplate stringRedisTemplate;
/**
* 发送 SqlMapping Json 字符串
* 发送消息 - RABBIT
*
* @param exchange 交换机
* @param route 路径
* @param sqlStr Json字符串
* @param routeKey 路径
* @param jsonMsg Json字符串
*/
public void sendSqlRabbit(String exchange, String route, String sqlStr) {
rabbitTemplate.convertAndSend(exchange, route, sqlStr);
public void sendMsgByRabbit(String exchange, String routeKey, String jsonMsg) {
rabbitTemplate.convertAndSend(exchange, routeKey, jsonMsg);
}
/**
* 给 REDIS 队列发送消息 数据库相关
* 发送消息 - REDIS
*
* @param redisKey RedisKey 消费Key
* @param sqlStr Json字符串
* @return
* @param streamKey Redis消费Key
* @param jsonMsg Json字符串
*/
public void sendSqlRedis(String redisKey, String sqlStr) {
public void sendMsgByRedis(String streamKey, String jsonMsg) {
HashMap<String, String> map = new HashMap<>();
map.put("message", sqlStr);
MapRecord<String, String, String> record = StreamRecords.mapBacked(map).withStreamKey(redisKey);
stringRedisTemplate.opsForStream().add(record);
map.put("message", jsonMsg);
stringRedisTemplate.opsForStream().add(StreamRecords.mapBacked(map).withStreamKey(streamKey));
}
}
......@@ -17,20 +17,6 @@
</properties>
<dependencies>
<dependency>
<groupId>com.liquidnet</groupId>
<artifactId>liquidnet-common-mq</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.liquidnet</groupId>
<artifactId>liquidnet-common-web</artifactId>
</dependency>
<dependency>
<groupId>com.liquidnet</groupId>
<artifactId>liquidnet-common-cache-redisson</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.liquidnet</groupId>
<artifactId>liquidnet-common-sms</artifactId>
......
package com.liquidnet.service.consumer.adam.service.config;
import com.liquidnet.service.consumer.adam.service.receiver.ConsumerAdamSmsNoticeRdsReceiver;
import lombok.var;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;
import java.time.Duration;
import static com.liquidnet.service.base.constant.MQConst.*;
@Configuration
public class ConsumerAdamSmsSenderRedisStreamConfig {
@Autowired
ConsumerAdamSmsNoticeRdsReceiver consumerAdamSmsNoticeRdsReceiver;
private StreamMessageListenerContainer<String, MapRecord<String, String, String>> buildStreamMessageListenerContainer(RedisConnectionFactory factory) {
var options = StreamMessageListenerContainer
.StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofMillis(1))
.build();
return StreamMessageListenerContainer.create(factory, options);
}
/**
* 短信通知
*
* @param listenerContainer
* @param t
* @return
*/
private Subscription receiveSqlURegister(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) {
return listenerContainer.receiveAutoAck(
Consumer.from(AdamQueue.SMS_NOTICE.getGroup(), AdamQueue.SMS_NOTICE.name() + t),
StreamOffset.create(AdamQueue.SMS_NOTICE.getKey(), ReadOffset.lastConsumed()), consumerAdamSmsNoticeRdsReceiver
);
}
/* —————————————————————————— | —————————————————————————— | —————————————————————————— */
/* -------------------------------------------------------- | 短信通知 */
@Bean
public Subscription subscriptionSmsNotice1(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveSqlURegister(listenerContainer, 1);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscriptionSmsNotice2(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveSqlURegister(listenerContainer, 2);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscriptionSmsNotice3(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveSqlURegister(listenerContainer, 3);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscriptionSmsNotice4(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveSqlURegister(listenerContainer, 4);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscriptionSmsNotice5(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveSqlURegister(listenerContainer, 5);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscriptionSmsNotice6(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveSqlURegister(listenerContainer, 6);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscriptionSmsNotice7(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveSqlURegister(listenerContainer, 7);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscriptionSmsNotice8(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveSqlURegister(listenerContainer, 8);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscriptionSmsNotice9(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveSqlURegister(listenerContainer, 9);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscriptionSmsNotice10(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveSqlURegister(listenerContainer, 10);
listenerContainer.start();
return subscription;
}
/* -------------------------------------------------------- | */
}
package com.liquidnet.service.consumer.adam.service.processor;
import com.liquidnet.common.mq.constant.MQConst;
import com.liquidnet.common.sms.processor.SmsProcessor;
import com.liquidnet.commons.lang.util.JsonUtils;
import com.liquidnet.service.base.SmsMessage;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.IOException;
/**
* ConsumerAdamSmsProcessor.class
*
* @author zhanggb
* Created by IntelliJ IDEA at 2021/7/13
*/
@Slf4j
@Component
public class ConsumerAdamSmsProcessor {
@Resource
SmsProcessor smsProcessor;
private void consumerSmsSendHandler(Message msg, Channel channel) {
MessageProperties properties = msg.getMessageProperties();
String consumerQueue = properties.getConsumerQueue();
long deliveryTag = properties.getDeliveryTag();
log.info("CONSUMER SMS ==> [consumerQueue:{},deliveryTag:{}]", consumerQueue, deliveryTag);
String msgBody = new String(msg.getBody());
log.debug("CONSUMER SMS ==> Preparing:{}", msgBody);
try {
SmsMessage smsMessage = JsonUtils.fromJson(msgBody, SmsMessage.class);
boolean result = smsProcessor.send(smsMessage.getPhone(), smsMessage.getSignName(), smsMessage.getTemplateCode(), smsMessage.getTemplateParam().toString());
log.debug("CONSUMER SMS result of execution:{}", result);
if (result) {
channel.basicAck(deliveryTag, false);
} else {
log.warn("###CONSUMER SMS[consumerQueue:{},deliveryTag={},sqlMessage:{}]", consumerQueue, deliveryTag, msgBody);
channel.basicAck(deliveryTag, false);
}
} catch (IOException e) {
log.error("CONSUMER SMS[consumerQueue:{},deliveryTag:{},sqlMessage:{}]", consumerQueue, deliveryTag, msgBody, e);
}
}
/* ================================================================== | 短信验证码 */
//package com.liquidnet.service.consumer.adam.service.processor;
//
//import com.liquidnet.common.mq.constant.MQConst;
//import com.liquidnet.common.sms.processor.SmsProcessor;
//import com.liquidnet.commons.lang.util.JsonUtils;
//import com.liquidnet.service.base.SmsMessage;
//import com.rabbitmq.client.Channel;
//import lombok.extern.slf4j.Slf4j;
//import org.springframework.amqp.core.Message;
//import org.springframework.amqp.core.MessageProperties;
//import org.springframework.amqp.rabbit.annotation.Exchange;
//import org.springframework.amqp.rabbit.annotation.Queue;
//import org.springframework.amqp.rabbit.annotation.QueueBinding;
//import org.springframework.amqp.rabbit.annotation.RabbitListener;
//import org.springframework.stereotype.Component;
//
//import javax.annotation.Resource;
//import java.io.IOException;
//
///**
// * ConsumerAdamSmsProcessor.class
// *
// * @author zhanggb
// * Created by IntelliJ IDEA at 2021/7/13
// */
//@Slf4j
//@Component
//public class ConsumerAdamSmsProcessor {
// @Resource
// SmsProcessor smsProcessor;
//
// private void consumerSmsSendHandler(Message msg, Channel channel) {
// MessageProperties properties = msg.getMessageProperties();
// String consumerQueue = properties.getConsumerQueue();
// long deliveryTag = properties.getDeliveryTag();
// log.info("CONSUMER SMS ==> [consumerQueue:{},deliveryTag:{}]", consumerQueue, deliveryTag);
// String msgBody = new String(msg.getBody());
// log.debug("CONSUMER SMS ==> Preparing:{}", msgBody);
// try {
// SmsMessage smsMessage = JsonUtils.fromJson(msgBody, SmsMessage.class);
// boolean result = smsProcessor.send(smsMessage.getPhone(), smsMessage.getSignName(), smsMessage.getTemplateCode(), smsMessage.getTemplateParam().toString());
// log.debug("CONSUMER SMS result of execution:{}", result);
// if (result) {
// channel.basicAck(deliveryTag, false);
// } else {
// log.warn("###CONSUMER SMS[consumerQueue:{},deliveryTag={},sqlMessage:{}]", consumerQueue, deliveryTag, msgBody);
// channel.basicAck(deliveryTag, false);
// }
// } catch (IOException e) {
// log.error("CONSUMER SMS[consumerQueue:{},deliveryTag:{},sqlMessage:{}]", consumerQueue, deliveryTag, msgBody, e);
// }
// }
//
// /* ================================================================== | 短信验证码 */
//
//// @RabbitListener(
//// bindings = @QueueBinding(
//// exchange = @Exchange(MQConst.EX_LNS_SMS_SENDER),
//// key = MQConst.RK_SMS_CODE,
//// value = @Queue(MQConst.QUEUES_SMS_CODE)
//// ),
//// concurrency = "25"
//// )
//// public void consumerSqlForSmsCode(Message msg, Channel channel) {
//// this.consumerSmsSendHandler(msg, channel);
//// }
//
// /* ================================================================== | 短信通知 */
//
// @RabbitListener(
// bindings = @QueueBinding(
// exchange = @Exchange(MQConst.EX_LNS_SMS_SENDER),
// key = MQConst.RK_SMS_CODE,
// value = @Queue(MQConst.QUEUES_SMS_CODE)
// key = MQConst.RK_SMS_NOTICE,
// value = @Queue(MQConst.QUEUES_SMS_NOTICE)
// ),
// concurrency = "25"
// concurrency = "10"
// )
// public void consumerSqlForSmsCode(Message msg, Channel channel) {
// public void consumerSqlForSmsNotice(Message msg, Channel channel) {
// this.consumerSmsSendHandler(msg, channel);
// }
/* ================================================================== | 短信通知 */
@RabbitListener(
bindings = @QueueBinding(
exchange = @Exchange(MQConst.EX_LNS_SMS_SENDER),
key = MQConst.RK_SMS_NOTICE,
value = @Queue(MQConst.QUEUES_SMS_NOTICE)
),
concurrency = "10"
)
public void consumerSqlForSmsNotice(Message msg, Channel channel) {
this.consumerSmsSendHandler(msg, channel);
}
/* ================================================================== | */
}
//
//
// /* ================================================================== | */
//}
package com.liquidnet.service.consumer.adam.service.receiver;
import com.liquidnet.common.sms.processor.SmsProcessor;
import com.liquidnet.commons.lang.util.JsonUtils;
import com.liquidnet.service.base.SmsMessage;
import com.liquidnet.service.base.SqlMapping;
import com.liquidnet.service.consumer.adam.service.IBaseDao;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamListener;
import javax.annotation.Resource;
@Slf4j
public abstract class AbstractSmsRedisReceiver implements StreamListener<String, MapRecord<String, String, String>> {
@Resource
SmsProcessor smsProcessor;
@Autowired
StringRedisTemplate stringRedisTemplate;
@Override
public void onMessage(MapRecord<String, String, String> message) {
log.info("CONSUMER SMS[streamKey:{},messageId:{},stream:{},body:{}]",
this.getRedisStreamKey(), message.getId(), message.getStream(), message.getValue());
boolean result = this.consumerSmsSendHandler(message.getValue().get("message"));
log.debug("CONSUMER SMS RESULT:{}", result);
// 消费成功确认,消息删除和消息确认是一个事务
if (result) {
log.info("CONSUMER SMS SUCC ==> MESSAGE_ID:{}", message.getId());
try {
stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId());
} catch (Exception e) {
log.error("CONSUMER SMS SUC ==> DEL_REDIS_QUEUE_MSG_EXCEPTION[MESSAGE_ID:{}]", message.getId(), e);
}
}
}
private boolean consumerSmsSendHandler(String msg) {
try {
SmsMessage smsMessage = JsonUtils.fromJson(msg, SmsMessage.class);
return smsProcessor.send(smsMessage.getPhone(), smsMessage.getSignName(), smsMessage.getTemplateCode(), smsMessage.getTemplateParam().toString());
} catch (Exception e) {
log.error("CONSUMER SMS FAIL ==> {}", e.getMessage(), e);
return false;
}
}
protected abstract String getRedisStreamKey();
}
package com.liquidnet.service.consumer.adam.service.receiver;
import com.liquidnet.commons.lang.util.JsonUtils;
import com.liquidnet.service.base.SqlMapping;
import com.liquidnet.service.consumer.adam.service.IBaseDao;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamListener;
@Slf4j
public abstract class AbstractSqlRedisReceiver implements StreamListener<String, MapRecord<String, String, String>> {
@Autowired
private IBaseDao baseDao;
@Autowired
StringRedisTemplate stringRedisTemplate;
@Override
public void onMessage(MapRecord<String, String, String> message) {
log.info("CONSUMER SQL[streamKey:{},messageId:{},stream:{},body:{}]",
this.getRedisStreamKey(), message.getId(), message.getStream(), message.getValue());
boolean result = this.consumerSqlDaoHandler(message.getValue().get("message"));
log.debug("CONSUMER SQL RESULT:{}", result);
// 消费成功确认,消息删除和消息确认是一个事务
if (result) {
log.info("CONSUMER SMS SUCC ==> MESSAGE_ID:{}", message.getId());
try {
stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId());
} catch (Exception e) {
log.error("CONSUMER SMS SUCC ==> DEL_REDIS_QUEUE_MSG_EXCEPTION[MESSAGE_ID:{}]", message.getId(), e);
}
}
}
private boolean consumerSqlDaoHandler(String msg) {
try {
SqlMapping.SqlMessage sqlMessage = JsonUtils.fromJson(msg, SqlMapping.SqlMessage.class);
return baseDao.batchSqls(sqlMessage.getSqls(), sqlMessage.getArgs());
} catch (Exception e) {
log.error("CONSUMER SMS FAIL ==> {}", e.getMessage(), e);
return false;
}
}
protected abstract String getRedisStreamKey();
}
package com.liquidnet.service.consumer.adam.service.receiver;
import com.liquidnet.service.base.constant.MQConst;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class ConsumerAdamSmsNoticeRdsReceiver extends AbstractSmsRedisReceiver {
@Override
protected String getRedisStreamKey() {
return MQConst.AdamQueue.SMS_NOTICE.getKey();
}
}
package com.liquidnet.service.consumer.adam.service.receiver;
import com.liquidnet.service.base.constant.MQConst;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class ConsumerAdamUCenterRdsReceiverSql extends AbstractSqlRedisReceiver {
@Override
protected String getRedisStreamKey() {
return MQConst.AdamQueue.SQL_UCENTER.getKey();
}
}
package com.liquidnet.service.consumer.adam.service.receiver;
import com.liquidnet.service.base.constant.MQConst;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class ConsumerAdamUMemberRdsReceiverSql extends AbstractSqlRedisReceiver {
@Override
protected String getRedisStreamKey() {
return MQConst.AdamQueue.SQL_UMEMBER.getKey();
}
}
package com.liquidnet.service.consumer.adam.service.receiver;
import com.liquidnet.service.base.constant.MQConst;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class ConsumerAdamURegisterRdsReceiverSql extends AbstractSqlRedisReceiver {
@Override
protected String getRedisStreamKey() {
return MQConst.AdamQueue.SQL_UREGISTER.getKey();
}
}
package com.liquidnet.service.consumer.service.processor;
import com.liquidnet.service.consumer.adam.service.processor.ConsumerAdamUCenterProcessor;
import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
public class ConsumerAdamUCenterProcessorTest {
@Autowired
ConsumerAdamUCenterProcessor consumerAdamUCenterProcessor;
@Test
public void test() {
String msg = "{\"sqls\":[\"INSERT INTO kylin_order_tickets(order_tickets_id,user_id,user_name,user_mobile,performance_title,order_code,qr_code,order_type,order_version,number,price,price_member,price_total,price_voucher,price_actual,price_express,price_refund,refund_number,pay_type,payment_type,time_pay,express_contacts,express_address,express_phone,coupon_type,get_ticket_type,get_ticket_describe,pay_countdown_minute,`comment`,created_at,updated_at)VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)\",\"INSERT INTO kylin_order_ticket_status(order_ticket_status_id ,order_id ,express_type ,is_student ,transfer_status ,`status` ,pay_status ,created_at ,updated_at)VALUES(?,?,?,?,?,?,?,?,?)\",\"INSERT INTO kylin_order_ticket_relations(order_ticket_relations_id ,order_id ,transfer_id ,live_id ,agent_id ,is_member ,performance_id ,time_id,ticket_id ,created_at ,updated_at)VALUES(?,?,?,?,?,?,?,?,?,?,?)\",\"INSERT INTO kylin_order_ticket_entities(order_ticket_entities_id ,order_id ,ticket_id ,user_id ,time_id ,performance_id ,enter_type ,enter_name ,enter_mobile,enter_id_code,`status`,sys_damai,check_client,is_payment,`comment`,created_at,updated_at)VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)\"],\"args\":[[[\"73901653639766016\",\"73459712083042304\",\"\",\"15001268203\",\"0607演出0607\",\"T7390165363976516260\",\"\",\"\",\"\",1,0.01,0.01,0.01,0.0,0.01,0.00,0.0,0,\"alipay\",null,null,\"\",\"\",\"\",\"no\",\"electronic\",\"\",5,null,\"2021-06-07T22:18:47.264\",null]],[[\"73901653694291968\",\"73901653639766016\",2,0,0,0,0,\"2021-06-07T22:18:47.267\",null]],[[\"73901653694291969\",\"73901653639766016\",\"\",\"\",\"0\",0,\"73776030099382272\",\"73776073304907776\",\"73776225042243584\",\"2021-06-07T22:18:47.267\",null]],[[\"73901653694291970\",\"73901653639766016\",\"73776225042243584\",\"73459712083042304\",\"73776073304907776\",\"73776030099382272\",0,\"\",\"\",\"\",0,0,\"\",0,\"\",\"2021-06-07T22:18:47.267\",null]]]}";
// consumerAdamProcessor.consumerSqlForUCenterA(null, null);
}
}
......@@ -17,20 +17,6 @@
</properties>
<dependencies>
<dependency>
<groupId>com.liquidnet</groupId>
<artifactId>liquidnet-common-mq</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.liquidnet</groupId>
<artifactId>liquidnet-common-web</artifactId>
</dependency>
<dependency>
<groupId>com.liquidnet</groupId>
<artifactId>liquidnet-common-cache-redis</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.liquidnet</groupId>
<artifactId>liquidnet-service-dragon-api</artifactId>
......
package com.liquidnet.service.consumer.dragon.service.processor;
import com.liquidnet.common.mq.constant.MQConst;
import com.liquidnet.commons.lang.util.JsonUtils;
import com.liquidnet.service.base.SqlMapping;
import com.liquidnet.service.consumer.dragon.service.IBaseDao;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.IOException;
/**
* ConsumerAdamProcessor.class
*
* @author zhanggb
* Created by IntelliJ IDEA at 2021/4/29
*/
@Slf4j
@Component
public class ConsumerAdamUCenterProcessor {
// @Resource
// IBaseDao baseDao;
//
// private void consumerSqlDaoHandler(Message msg, Channel channel) {
// MessageProperties properties = msg.getMessageProperties();
// String consumerQueue = properties.getConsumerQueue();
// long deliveryTag = properties.getDeliveryTag();
// log.info("CONSUMER SQL ==> [consumerQueue:{},deliveryTag:{}]", consumerQueue, deliveryTag);
// SqlMapping.SqlMessage sqlMessage = JsonUtils.fromJson(new String(msg.getBody()), SqlMapping.SqlMessage.class);
// log.debug("CONSUMER SQL ==> Preparing:{}", JsonUtils.toJson(sqlMessage.getSqls()));
// log.debug("CONSUMER SQL ==> Parameters:{}", JsonUtils.toJson(sqlMessage.getArgs()));
// try {
// Boolean rstBatchSqls = baseDao.batchSqls(sqlMessage.getSqls(), sqlMessage.getArgs());
// log.debug("CONSUMER SQL result of execution:{}", rstBatchSqls);
// if (rstBatchSqls) {
// channel.basicAck(deliveryTag, false);
// } else {
// log.warn("###CONSUMER SQL[consumerQueue:{},deliveryTag={},sqlMessage:{}]", consumerQueue, deliveryTag, JsonUtils.toJson(sqlMessage));
// channel.basicAck(deliveryTag, false);
// }
// } catch (IOException e) {
// log.error("CONSUMER SQL[consumerQueue:{},deliveryTag:{},sqlMessage:{}]", consumerQueue, deliveryTag, JsonUtils.toJson(sqlMessage), e);
// }
// }
//
// /* ================================================================== | 用户注册 */
//
// @RabbitListener(
// bindings = @QueueBinding(
// exchange = @Exchange(MQConst.EX_LNS_SQL_UCENTER),
// key = MQConst.RK_SQL_UREGISTER,
// value = @Queue(MQConst.QUEUES_SQL_UREGISTER)
// ),
// concurrency = "5"
// )
// public void consumerSqlForURegister(Message msg, Channel channel) {
// this.consumerSqlDaoHandler(msg, channel);
// }
//
// /* ================================================================== | 用户信息 */
//
// @RabbitListener(
// bindings = @QueueBinding(
// exchange = @Exchange(MQConst.EX_LNS_SQL_UCENTER),
// key = MQConst.RK_SQL_UCENTER,
// value = @Queue(MQConst.QUEUES_SQL_UCENTER)
// ),
// concurrency = "5"
// )
// public void consumerSqlForUCenter(Message msg, Channel channel) {
// this.consumerSqlDaoHandler(msg, channel);
// }
//
// /* ================================================================== | 会员购买 */
//
// @RabbitListener(
// bindings = @QueueBinding(
// exchange = @Exchange(MQConst.EX_LNS_SQL_UCENTER),
// key = MQConst.RK_SQL_UMEMBER,
// value = @Queue(MQConst.QUEUES_SQL_UMEMBER)
// ),
// concurrency = "5"
// )
// public void consumerSqlForUMember(Message msg, Channel channel) {
// this.consumerSqlDaoHandler(msg, channel);
// }
/* ================================================================== | */
}
package com.liquidnet.service.consumer.service.processor;
import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
public class ConsumerDragonUCenterProcessorTest {
@Autowired
ConsumerDragonUCenterProcessorTest consumerAdamUCenterProcessor;
@Test
public void test() {
String msg = "{\"sqls\":[\"INSERT INTO kylin_order_tickets(order_tickets_id,user_id,user_name,user_mobile,performance_title,order_code,qr_code,order_type,order_version,number,price,price_member,price_total,price_voucher,price_actual,price_express,price_refund,refund_number,pay_type,payment_type,time_pay,express_contacts,express_address,express_phone,coupon_type,get_ticket_type,get_ticket_describe,pay_countdown_minute,`comment`,created_at,updated_at)VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)\",\"INSERT INTO kylin_order_ticket_status(order_ticket_status_id ,order_id ,express_type ,is_student ,transfer_status ,`status` ,pay_status ,created_at ,updated_at)VALUES(?,?,?,?,?,?,?,?,?)\",\"INSERT INTO kylin_order_ticket_relations(order_ticket_relations_id ,order_id ,transfer_id ,live_id ,agent_id ,is_member ,performance_id ,time_id,ticket_id ,created_at ,updated_at)VALUES(?,?,?,?,?,?,?,?,?,?,?)\",\"INSERT INTO kylin_order_ticket_entities(order_ticket_entities_id ,order_id ,ticket_id ,user_id ,time_id ,performance_id ,enter_type ,enter_name ,enter_mobile,enter_id_code,`status`,sys_damai,check_client,is_payment,`comment`,created_at,updated_at)VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)\"],\"args\":[[[\"73901653639766016\",\"73459712083042304\",\"\",\"15001268203\",\"0607演出0607\",\"T7390165363976516260\",\"\",\"\",\"\",1,0.01,0.01,0.01,0.0,0.01,0.00,0.0,0,\"alipay\",null,null,\"\",\"\",\"\",\"no\",\"electronic\",\"\",5,null,\"2021-06-07T22:18:47.264\",null]],[[\"73901653694291968\",\"73901653639766016\",2,0,0,0,0,\"2021-06-07T22:18:47.267\",null]],[[\"73901653694291969\",\"73901653639766016\",\"\",\"\",\"0\",0,\"73776030099382272\",\"73776073304907776\",\"73776225042243584\",\"2021-06-07T22:18:47.267\",null]],[[\"73901653694291970\",\"73901653639766016\",\"73776225042243584\",\"73459712083042304\",\"73776073304907776\",\"73776030099382272\",0,\"\",\"\",\"\",0,0,\"\",0,\"\",\"2021-06-07T22:18:47.267\",null]]]}";
// consumerAdamProcessor.consumerSqlForUCenterA(null, null);
}
}
......@@ -17,11 +17,6 @@
</modules>
<dependencies>
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>com.liquidnet</groupId>
<artifactId>liquidnet-common-web</artifactId>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment