记得上下班打卡 | git大法好,push需谨慎
Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
L
liquidnet-bus-v1
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
董敬伟
liquidnet-bus-v1
Commits
3e8b549a
Commit
3e8b549a
authored
Jul 26, 2021
by
张国柄
Browse files
Options
Browse Files
Download
Plain Diff
Merge branch 'dev' into test
parents
1c89b7f7
8044627a
Changes
30
Expand all
Show whitespace changes
Inline
Side-by-side
Showing
30 changed files
with
976 additions
and
441 deletions
+976
-441
pom.xml
...dnet-common-cache/liquidnet-common-cache-redisson/pom.xml
+4
-0
MQConst.java
...ain/java/com/liquidnet/service/base/constant/MQConst.java
+33
-0
pom.xml
liquidnet-bus-common/pom.xml
+12
-1
redis_queue_create.txt
...m/liquidnet-service-adam-impl/docu/redis_queue_create.txt
+16
-0
AdamLoginController.java
...iquidnet/service/adam/controller/AdamLoginController.java
+0
-2
AdamAddressesServiceImpl.java
...t/service/adam/service/impl/AdamAddressesServiceImpl.java
+13
-12
AdamCollectionServiceImpl.java
.../service/adam/service/impl/AdamCollectionServiceImpl.java
+9
-6
AdamDisposedServiceImpl.java
...et/service/adam/service/impl/AdamDisposedServiceImpl.java
+9
-6
AdamEntersServiceImpl.java
...dnet/service/adam/service/impl/AdamEntersServiceImpl.java
+13
-8
AdamMemberOrderServiceImpl.java
...service/adam/service/impl/AdamMemberOrderServiceImpl.java
+11
-7
AdamRealNameServiceImpl.java
...et/service/adam/service/impl/AdamRealNameServiceImpl.java
+7
-5
AdamUserInfoServiceImpl.java
...et/service/adam/service/impl/AdamUserInfoServiceImpl.java
+7
-6
AdamUserServiceImpl.java
...uidnet/service/adam/service/impl/AdamUserServiceImpl.java
+14
-11
QueueUtils.java
...main/java/com/liquidnet/service/adam/util/QueueUtils.java
+11
-15
pom.xml
...vice-consumer-all/liquidnet-service-consumer-adam/pom.xml
+0
-14
ConsumerAdamSmsSenderRedisStreamConfig.java
...ervice/config/ConsumerAdamSmsSenderRedisStreamConfig.java
+133
-0
ConsumerAdamSqlUcenterRedisStreamConfig.java
...rvice/config/ConsumerAdamSqlUcenterRedisStreamConfig.java
+332
-0
ConsumerAdamSmsProcessor.java
...umer/adam/service/processor/ConsumerAdamSmsProcessor.java
+77
-77
ConsumerAdamUCenterProcessor.java
.../adam/service/processor/ConsumerAdamUCenterProcessor.java
+114
-114
AbstractSmsRedisReceiver.java
...sumer/adam/service/receiver/AbstractSmsRedisReceiver.java
+55
-0
AbstractSqlRedisReceiver.java
...sumer/adam/service/receiver/AbstractSqlRedisReceiver.java
+50
-0
ConsumerAdamSmsNoticeRdsReceiver.java
...am/service/receiver/ConsumerAdamSmsNoticeRdsReceiver.java
+14
-0
ConsumerAdamUCenterRdsReceiverSql.java
...m/service/receiver/ConsumerAdamUCenterRdsReceiverSql.java
+14
-0
ConsumerAdamUMemberRdsReceiverSql.java
...m/service/receiver/ConsumerAdamUMemberRdsReceiverSql.java
+14
-0
ConsumerAdamURegisterRdsReceiverSql.java
...service/receiver/ConsumerAdamURegisterRdsReceiverSql.java
+14
-0
ConsumerAdamUCenterProcessorTest.java
...r/service/processor/ConsumerAdamUCenterProcessorTest.java
+0
-21
pom.xml
...ce-consumer-all/liquidnet-service-consumer-dragon/pom.xml
+0
-14
ConsumerAdamUCenterProcessor.java
...ragon/service/processor/ConsumerAdamUCenterProcessor.java
+0
-97
ConsumerDragonUCenterProcessorTest.java
...service/processor/ConsumerDragonUCenterProcessorTest.java
+0
-20
pom.xml
liquidnet-bus-service/liquidnet-service-consumer-all/pom.xml
+0
-5
No files found.
liquidnet-bus-common/liquidnet-common-cache/liquidnet-common-cache-redisson/pom.xml
View file @
3e8b549a
...
...
@@ -15,6 +15,10 @@
<groupId>
org.redisson
</groupId>
<artifactId>
redisson-spring-boot-starter
</artifactId>
</dependency>
<dependency>
<groupId>
org.redisson
</groupId>
<artifactId>
redisson-spring-data-22
</artifactId>
</dependency>
</dependencies>
...
...
liquidnet-bus-common/liquidnet-common-service-base/src/main/java/com/liquidnet/service/base/constant/MQConst.java
0 → 100644
View file @
3e8b549a
package
com
.
liquidnet
.
service
.
base
.
constant
;
public
class
MQConst
{
public
enum
AdamQueue
{
SMS_NOTICE
(
"adam:stream:rk.sms.notice"
,
"group.sms.sender"
,
"短信通知"
),
// SMS_SPREAD("adam:stream:rk.sms.spread", "group.sms.sender", "短信推广"),
SQL_UREGISTER
(
"adam:stream:rk.sql.uregister"
,
"group.sql.ucenter"
,
"用户注册"
),
SQL_UCENTER
(
"adam:stream:rk.sql.ucenter"
,
"group.sql.ucenter"
,
"用户中心"
),
SQL_UMEMBER
(
"adam:stream:rk.sql.umember"
,
"group.sql.ucenter"
,
"购买会员"
),
;
private
final
String
key
;
private
final
String
group
;
private
final
String
desc
;
AdamQueue
(
String
key
,
String
group
,
String
desc
)
{
this
.
key
=
key
;
this
.
group
=
group
;
this
.
desc
=
desc
;
}
public
String
getKey
()
{
return
key
;
}
public
String
getGroup
()
{
return
group
;
}
public
String
getDesc
()
{
return
desc
;
}
}
}
liquidnet-bus-common/pom.xml
View file @
3e8b549a
...
...
@@ -110,7 +110,18 @@
<dependency>
<groupId>
org.redisson
</groupId>
<artifactId>
redisson-spring-boot-starter
</artifactId>
<version>
3.13.4
</version>
<version>
3.13.6
</version>
<exclusions>
<exclusion>
<groupId>
org.redisson
</groupId>
<artifactId>
redisson-spring-data-23
</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>
org.redisson
</groupId>
<artifactId>
redisson-spring-data-22
</artifactId>
<version>
3.13.6
</version>
</dependency>
<dependency>
<groupId>
com.baomidou
</groupId>
...
...
liquidnet-bus-service/liquidnet-service-adam/liquidnet-service-adam-impl/docu/redis_queue_create.txt
0 → 100644
View file @
3e8b549a
XADD adam:stream:rk.sms.notice * 0 0
XGROUP CREATE adam:stream:rk.sms.notice group.sms.sender 0
XADD adam:stream:rk.sql.uregister * 0 0
XGROUP CREATE adam:stream:rk.sql.uregister group.sql.ucenter 0
XADD adam:stream:rk.sql.ucenter * 0 0
XGROUP CREATE adam:stream:rk.sql.ucenter group.sql.ucenter 0
XADD adam:stream:rk.sql.umember * 0 0
XGROUP CREATE adam:stream:rk.sql.umember group.sql.ucenter 0
# ==================================================
# XGROUP DESTROY adam:stream:rk.sms.notice group.sms.sender 0
liquidnet-bus-service/liquidnet-service-adam/liquidnet-service-adam-impl/src/main/java/com/liquidnet/service/adam/controller/AdamLoginController.java
View file @
3e8b549a
...
...
@@ -73,8 +73,6 @@ public class AdamLoginController {
@Autowired
IAdamUserService
adamUserService
;
@Autowired
RabbitTemplate
rabbitTemplate
;
@Autowired
SmsProcessor
smsProcessor
;
@Value
(
"${liquidnet.reviewer.app-login.mobile}"
)
...
...
liquidnet-bus-service/liquidnet-service-adam/liquidnet-service-adam-impl/src/main/java/com/liquidnet/service/adam/service/impl/AdamAddressesServiceImpl.java
View file @
3e8b549a
package
com
.
liquidnet
.
service
.
adam
.
service
.
impl
;
import
com.baomidou.mybatisplus.extension.service.impl.ServiceImpl
;
import
com.liquidnet.common.exception.LiquidnetServiceException
;
import
com.liquidnet.common.mq.constant.MQConst
;
import
com.liquidnet.commons.lang.util.*
;
import
com.liquidnet.service.adam.dto.AdamAddressesParam
;
import
com.liquidnet.service.adam.dto.vo.AdamAddressesVo
;
import
com.liquidnet.service.adam.entity.AdamAddresses
;
import
com.liquidnet.service.adam.mapper.AdamAddressesMapper
;
import
com.liquidnet.service.adam.service.AdamRdmService
;
import
com.liquidnet.service.adam.service.IAdamAddressesService
;
import
com.liquidnet.service.adam.util.QueueUtils
;
import
com.liquidnet.service.base.ErrorMapping
;
import
com.liquidnet.service.base.SqlMapping
;
import
com.liquidnet.service.base.constant.MQConst
;
import
com.mongodb.client.model.FindOneAndUpdateOptions
;
import
com.mongodb.client.model.ReturnDocument
;
import
com.mongodb.client.result.DeleteResult
;
import
lombok.extern.slf4j.Slf4j
;
import
org.bson.Document
;
import
org.springframework.amqp.rabbit.core.RabbitTemplate
;
import
org.springframework.beans.BeanUtils
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.data.mongodb.core.MongoTemplate
;
import
org.springframework.data.mongodb.core.convert.MongoConverter
;
import
org.springframework.data.mongodb.core.query.Criteria
;
import
org.springframework.data.mongodb.core.query.Query
;
import
org.springframework.stereotype.Service
;
...
...
@@ -46,7 +42,7 @@ public class AdamAddressesServiceImpl implements IAdamAddressesService {
@Autowired
MongoTemplate
mongoTemplate
;
@Autowired
RabbitTemplate
rabbitTemplate
;
QueueUtils
queueUtils
;
@Autowired
AdamRdmService
adamRdmService
;
...
...
@@ -75,7 +71,8 @@ public class AdamAddressesServiceImpl implements IAdamAddressesService {
log
.
debug
(
"#RDS耗时:{}ms"
,
System
.
currentTimeMillis
()
-
s
);
s
=
System
.
currentTimeMillis
();
rabbitTemplate
.
convertAndSend
(
MQConst
.
EX_LNS_SQL_UCENTER
,
MQConst
.
RK_SQL_UCENTER
,
queueUtils
.
sendMsgByRedis
(
MQConst
.
AdamQueue
.
SQL_UCENTER
.
getKey
(),
SqlMapping
.
get
(
"adam_addresses.add"
,
vo
.
getAddressesId
(),
vo
.
getUid
(),
vo
.
getName
(),
vo
.
getPhone
(),
vo
.
getProvince
(),
vo
.
getCity
(),
vo
.
getCounty
(),
vo
.
getAddress
(),
vo
.
getIsDefault
(),
vo
.
getState
(),
now
)
...
...
@@ -133,8 +130,10 @@ public class AdamAddressesServiceImpl implements IAdamAddressesService {
adamRdmService
.
setAddressesVoByUid
(
uid
,
vos
);
log
.
debug
(
"#RDS耗时:{}ms"
,
System
.
currentTimeMillis
()
-
s
);
s
=
System
.
currentTimeMillis
();
rabbitTemplate
.
convertAndSend
(
MQConst
.
EX_LNS_SQL_UCENTER
,
MQConst
.
RK_SQL_UCENTER
,
SqlMapping
.
get
(
"adam_addresses.update.is_default"
,
toMqObjs
));
queueUtils
.
sendMsgByRedis
(
MQConst
.
AdamQueue
.
SQL_UCENTER
.
getKey
(),
SqlMapping
.
get
(
"adam_addresses.update.is_default"
,
toMqObjs
)
);
log
.
debug
(
"#MQ耗时:{}ms"
,
System
.
currentTimeMillis
()
-
s
);
}
}
...
...
@@ -163,7 +162,8 @@ public class AdamAddressesServiceImpl implements IAdamAddressesService {
log
.
debug
(
"#RDS耗时:{}ms"
,
System
.
currentTimeMillis
()
-
s
);
s
=
System
.
currentTimeMillis
();
rabbitTemplate
.
convertAndSend
(
MQConst
.
EX_LNS_SQL_UCENTER
,
MQConst
.
RK_SQL_UCENTER
,
queueUtils
.
sendMsgByRedis
(
MQConst
.
AdamQueue
.
SQL_UCENTER
.
getKey
(),
SqlMapping
.
get
(
"adam_addresses.edit"
,
updateVo
.
getName
(),
updateVo
.
getPhone
(),
updateVo
.
getProvince
(),
updateVo
.
getCity
(),
updateVo
.
getCounty
(),
updateVo
.
getAddress
(),
now
,
updateVo
.
getAddressesId
()
)
...
...
@@ -196,7 +196,8 @@ public class AdamAddressesServiceImpl implements IAdamAddressesService {
log
.
debug
(
"#RDS耗时:{}ms"
,
System
.
currentTimeMillis
()
-
s
);
s
=
System
.
currentTimeMillis
();
rabbitTemplate
.
convertAndSend
(
MQConst
.
EX_LNS_SQL_UCENTER
,
MQConst
.
RK_SQL_UCENTER
,
queueUtils
.
sendMsgByRedis
(
MQConst
.
AdamQueue
.
SQL_UCENTER
.
getKey
(),
SqlMapping
.
get
(
"adam_addresses.remove"
,
now
,
now
,
addressesId
)
);
log
.
debug
(
"#MQ耗时:{}ms"
,
System
.
currentTimeMillis
()
-
s
);
...
...
liquidnet-bus-service/liquidnet-service-adam/liquidnet-service-adam-impl/src/main/java/com/liquidnet/service/adam/service/impl/AdamCollectionServiceImpl.java
View file @
3e8b549a
package
com
.
liquidnet
.
service
.
adam
.
service
.
impl
;
import
com.github.pagehelper.PageInfo
;
import
com.liquidnet.common.mq.constant.MQConst
;
import
com.liquidnet.commons.lang.util.CollectionUtil
;
import
com.liquidnet.service.adam.dto.vo.AdamCollectBaseVo
;
import
com.liquidnet.service.adam.dto.vo.AdamCollectInfoVo
;
import
com.liquidnet.service.adam.dto.vo.AdamCollectVo
;
import
com.liquidnet.service.adam.service.AdamRdmService
;
import
com.liquidnet.service.adam.service.IAdamCollectionService
;
import
com.liquidnet.service.adam.util.QueueUtils
;
import
com.liquidnet.service.base.SqlMapping
;
import
com.liquidnet.service.base.constant.MQConst
;
import
com.liquidnet.service.kylin.dto.vo.mongo.KylinPerformanceVo
;
import
com.mongodb.client.result.DeleteResult
;
import
lombok.extern.slf4j.Slf4j
;
import
org.springframework.amqp.rabbit.core.RabbitTemplate
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.data.domain.PageRequest
;
import
org.springframework.data.domain.Pageable
;
...
...
@@ -41,7 +41,7 @@ public class AdamCollectionServiceImpl implements IAdamCollectionService {
@Autowired
MongoTemplate
mongoTemplate
;
@Autowired
RabbitTemplate
rabbitTemplate
;
QueueUtils
queueUtils
;
@Autowired
AdamRdmService
adamRdmService
;
...
...
@@ -54,7 +54,8 @@ public class AdamCollectionServiceImpl implements IAdamCollectionService {
mongoTemplate
.
insert
(
vo
,
AdamCollectBaseVo
.
class
.
getSimpleName
());
rabbitTemplate
.
convertAndSend
(
MQConst
.
EX_LNS_SQL_UCENTER
,
MQConst
.
RK_SQL_UCENTER
,
queueUtils
.
sendMsgByRedis
(
MQConst
.
AdamQueue
.
SQL_UCENTER
.
getKey
(),
SqlMapping
.
get
(
"adam_collection.add"
,
vo
.
getUid
(),
vo
.
getContentId
(),
vo
.
getType
(),
vo
.
getState
(),
now
)
...
...
@@ -82,8 +83,10 @@ public class AdamCollectionServiceImpl implements IAdamCollectionService {
for
(
String
c
:
contentIds
)
{
toMqObjs
.
add
(
new
Object
[]{
now
,
uid
,
c
});
}
rabbitTemplate
.
convertAndSend
(
MQConst
.
EX_LNS_SQL_UCENTER
,
MQConst
.
RK_SQL_UCENTER
,
SqlMapping
.
get
(
"adam_collection.del"
,
toMqObjs
));
queueUtils
.
sendMsgByRedis
(
MQConst
.
AdamQueue
.
SQL_UCENTER
.
getKey
(),
SqlMapping
.
get
(
"adam_collection.del"
,
toMqObjs
)
);
}
}
...
...
liquidnet-bus-service/liquidnet-service-adam/liquidnet-service-adam-impl/src/main/java/com/liquidnet/service/adam/service/impl/AdamDisposedServiceImpl.java
View file @
3e8b549a
...
...
@@ -2,7 +2,6 @@ package com.liquidnet.service.adam.service.impl;
import
com.baomidou.mybatisplus.extension.service.impl.ServiceImpl
;
import
com.github.pagehelper.PageInfo
;
import
com.liquidnet.common.mq.constant.MQConst
;
import
com.liquidnet.commons.lang.util.CollectionUtil
;
import
com.liquidnet.service.adam.dto.vo.AdamCollectInfoVo
;
import
com.liquidnet.service.adam.dto.vo.AdamDisposedBaseVo
;
...
...
@@ -12,11 +11,12 @@ import com.liquidnet.service.adam.entity.AdamDisposed;
import
com.liquidnet.service.adam.mapper.AdamDisposedMapper
;
import
com.liquidnet.service.adam.service.AdamRdmService
;
import
com.liquidnet.service.adam.service.IAdamDisposedService
;
import
com.liquidnet.service.adam.util.QueueUtils
;
import
com.liquidnet.service.base.SqlMapping
;
import
com.liquidnet.service.base.constant.MQConst
;
import
com.liquidnet.service.kylin.dto.vo.mongo.KylinPerformanceVo
;
import
com.mongodb.client.result.DeleteResult
;
import
lombok.extern.slf4j.Slf4j
;
import
org.springframework.amqp.rabbit.core.RabbitTemplate
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.data.domain.PageRequest
;
import
org.springframework.data.domain.Sort
;
...
...
@@ -44,7 +44,7 @@ public class AdamDisposedServiceImpl implements IAdamDisposedService {
@Autowired
MongoTemplate
mongoTemplate
;
@Autowired
RabbitTemplate
rabbitTemplate
;
QueueUtils
queueUtils
;
@Autowired
AdamRdmService
adamRdmService
;
...
...
@@ -62,7 +62,8 @@ public class AdamDisposedServiceImpl implements IAdamDisposedService {
mongoTemplate
.
insert
(
vo
,
AdamDisposedBaseVo
.
class
.
getSimpleName
());
rabbitTemplate
.
convertAndSend
(
MQConst
.
EX_LNS_SQL_UCENTER
,
MQConst
.
RK_SQL_UCENTER
,
queueUtils
.
sendMsgByRedis
(
MQConst
.
AdamQueue
.
SQL_UCENTER
.
getKey
(),
SqlMapping
.
get
(
"adam_disposed.add"
,
vo
.
getUid
(),
vo
.
getContentId
(),
vo
.
getType
(),
vo
.
getState
(),
now
)
...
...
@@ -90,8 +91,10 @@ public class AdamDisposedServiceImpl implements IAdamDisposedService {
for
(
String
c
:
contentIds
)
{
toMqObjs
.
add
(
new
Object
[]{
now
,
uid
,
c
});
}
rabbitTemplate
.
convertAndSend
(
MQConst
.
EX_LNS_SQL_UCENTER
,
MQConst
.
RK_SQL_UCENTER
,
SqlMapping
.
get
(
"adam_disposed.del"
,
toMqObjs
));
queueUtils
.
sendMsgByRedis
(
MQConst
.
AdamQueue
.
SQL_UCENTER
.
getKey
(),
SqlMapping
.
get
(
"adam_disposed.del"
,
toMqObjs
)
);
}
}
...
...
liquidnet-bus-service/liquidnet-service-adam/liquidnet-service-adam-impl/src/main/java/com/liquidnet/service/adam/service/impl/AdamEntersServiceImpl.java
View file @
3e8b549a
...
...
@@ -2,21 +2,21 @@ package com.liquidnet.service.adam.service.impl;
import
com.fasterxml.jackson.databind.JsonNode
;
import
com.liquidnet.common.exception.LiquidnetServiceException
;
import
com.liquidnet.common.mq.constant.MQConst
;
import
com.liquidnet.commons.lang.util.*
;
import
com.liquidnet.service.adam.dto.AdamEntersParam
;
import
com.liquidnet.service.adam.dto.vo.AdamEntersVo
;
import
com.liquidnet.service.adam.service.AdamRdmService
;
import
com.liquidnet.service.adam.service.IAdamEntersService
;
import
com.liquidnet.service.adam.util.QueueUtils
;
import
com.liquidnet.service.base.ErrorMapping
;
import
com.liquidnet.service.base.SqlMapping
;
import
com.liquidnet.service.base.constant.MQConst
;
import
com.mongodb.client.model.FindOneAndUpdateOptions
;
import
com.mongodb.client.model.ReturnDocument
;
import
com.mongodb.client.result.DeleteResult
;
import
lombok.extern.slf4j.Slf4j
;
import
org.apache.commons.lang3.StringUtils
;
import
org.bson.Document
;
import
org.springframework.amqp.rabbit.core.RabbitTemplate
;
import
org.springframework.beans.BeanUtils
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.core.env.Environment
;
...
...
@@ -49,7 +49,7 @@ public class AdamEntersServiceImpl implements IAdamEntersService {
@Autowired
MongoTemplate
mongoTemplate
;
@Autowired
RabbitTemplate
rabbitTemplate
;
QueueUtils
queueUtils
;
@Autowired
AdamRdmService
adamRdmService
;
...
...
@@ -98,7 +98,8 @@ public class AdamEntersServiceImpl implements IAdamEntersService {
log
.
debug
(
"#SQL.GET耗时:{}ms"
,
System
.
currentTimeMillis
()
-
s
);
s
=
System
.
currentTimeMillis
();
rabbitTemplate
.
convertAndSend
(
MQConst
.
EX_LNS_SQL_UCENTER
,
MQConst
.
RK_SQL_UCENTER
,
queueUtils
.
sendMsgByRedis
(
MQConst
.
AdamQueue
.
SQL_UCENTER
.
getKey
(),
msg
);
log
.
debug
(
"#MQ耗时:{}ms"
,
System
.
currentTimeMillis
()
-
s
);
...
...
@@ -155,8 +156,10 @@ public class AdamEntersServiceImpl implements IAdamEntersService {
adamRdmService
.
setEntersVoByUid
(
uid
,
vos
);
log
.
debug
(
"#RDS耗时:{}ms"
,
System
.
currentTimeMillis
()
-
s
);
s
=
System
.
currentTimeMillis
();
rabbitTemplate
.
convertAndSend
(
MQConst
.
EX_LNS_SQL_UCENTER
,
MQConst
.
RK_SQL_UCENTER
,
SqlMapping
.
get
(
"adam_enters.update.is_default"
,
toMqObjs
));
queueUtils
.
sendMsgByRedis
(
MQConst
.
AdamQueue
.
SQL_UCENTER
.
getKey
(),
SqlMapping
.
get
(
"adam_enters.update.is_default"
,
toMqObjs
)
);
log
.
debug
(
"#MQ耗时:{}ms"
,
System
.
currentTimeMillis
()
-
s
);
}
}
...
...
@@ -194,7 +197,8 @@ public class AdamEntersServiceImpl implements IAdamEntersService {
log
.
debug
(
"#RDS耗时:{}ms"
,
System
.
currentTimeMillis
()
-
s
);
s
=
System
.
currentTimeMillis
();
rabbitTemplate
.
convertAndSend
(
MQConst
.
EX_LNS_SQL_UCENTER
,
MQConst
.
RK_SQL_UCENTER
,
queueUtils
.
sendMsgByRedis
(
MQConst
.
AdamQueue
.
SQL_UCENTER
.
getKey
(),
SqlMapping
.
get
(
"adam_enters.edit"
,
updateVo
.
getType
(),
updateVo
.
getName
(),
updateVo
.
getMobile
(),
updateVo
.
getIdCard
(),
updateVo
.
getIsDefault
(),
updateVo
.
getState
(),
now
,
updateVo
.
getEntersId
()
)
...
...
@@ -220,7 +224,8 @@ public class AdamEntersServiceImpl implements IAdamEntersService {
log
.
debug
(
"#RDS耗时:{}ms"
,
System
.
currentTimeMillis
()
-
s
);
s
=
System
.
currentTimeMillis
();
rabbitTemplate
.
convertAndSend
(
MQConst
.
EX_LNS_SQL_UCENTER
,
MQConst
.
RK_SQL_UCENTER
,
queueUtils
.
sendMsgByRedis
(
MQConst
.
AdamQueue
.
SQL_UCENTER
.
getKey
(),
SqlMapping
.
get
(
"adam_enters.remove"
,
now
,
now
,
entersId
)
);
log
.
debug
(
"#MQ耗时:{}ms"
,
System
.
currentTimeMillis
()
-
s
);
...
...
liquidnet-bus-service/liquidnet-service-adam/liquidnet-service-adam-impl/src/main/java/com/liquidnet/service/adam/service/impl/AdamMemberOrderServiceImpl.java
View file @
3e8b549a
...
...
@@ -2,7 +2,6 @@ package com.liquidnet.service.adam.service.impl;
import
com.fasterxml.jackson.databind.JsonNode
;
import
com.github.pagehelper.PageInfo
;
import
com.liquidnet.common.mq.constant.MQConst
;
import
com.liquidnet.commons.lang.util.*
;
import
com.liquidnet.service.adam.dto.AdamMemberOrderCallbackParam
;
import
com.liquidnet.service.adam.dto.AdamMemberOrderCodeParam
;
...
...
@@ -13,12 +12,13 @@ import com.liquidnet.service.adam.service.AdamRdmService;
import
com.liquidnet.service.adam.service.IAdamMemberOrderService
;
import
com.liquidnet.service.adam.service.IAdamUserMemberService
;
import
com.liquidnet.service.adam.util.MemberUtil
;
import
com.liquidnet.service.adam.util.QueueUtils
;
import
com.liquidnet.service.base.ErrorMapping
;
import
com.liquidnet.service.base.ResponseDto
;
import
com.liquidnet.service.base.SqlMapping
;
import
com.liquidnet.service.base.constant.MQConst
;
import
lombok.extern.slf4j.Slf4j
;
import
org.bson.Document
;
import
org.springframework.amqp.rabbit.core.RabbitTemplate
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.core.env.Environment
;
import
org.springframework.data.domain.PageRequest
;
...
...
@@ -45,7 +45,7 @@ public class AdamMemberOrderServiceImpl implements IAdamMemberOrderService {
@Autowired
MongoTemplate
mongoTemplate
;
@Autowired
RabbitTemplate
rabbitTemplate
;
QueueUtils
queueUtils
;
@Autowired
AdamRdmService
adamRdmService
;
@Autowired
...
...
@@ -303,8 +303,10 @@ public class AdamMemberOrderServiceImpl implements IAdamMemberOrderService {
log
.
debug
(
"#RDS耗时:{}ms"
,
System
.
currentTimeMillis
()
-
s
);
s
=
System
.
currentTimeMillis
();
rabbitTemplate
.
convertAndSend
(
MQConst
.
EX_LNS_SQL_UCENTER
,
MQConst
.
RK_SQL_UMEMBER
,
SqlMapping
.
gets
(
toMqSqls
,
operationObjs
,
updateMemberOrderObjs
));
queueUtils
.
sendMsgByRedis
(
com
.
liquidnet
.
service
.
base
.
constant
.
MQConst
.
AdamQueue
.
SQL_UMEMBER
.
getKey
(),
SqlMapping
.
gets
(
toMqSqls
,
operationObjs
,
updateMemberOrderObjs
)
);
log
.
debug
(
"#MQ耗时:{}ms"
,
System
.
currentTimeMillis
()
-
s
);
return
ResponseDto
.
success
();
}
...
...
@@ -421,8 +423,10 @@ public class AdamMemberOrderServiceImpl implements IAdamMemberOrderService {
});
s
=
System
.
currentTimeMillis
();
rabbitTemplate
.
convertAndSend
(
MQConst
.
EX_LNS_SQL_UCENTER
,
MQConst
.
RK_SQL_UMEMBER
,
SqlMapping
.
gets
(
toMqSqls
,
upsertUserMemberObjs
,
updateMemberCodeObjs
,
initMemberOrderObjs
));
queueUtils
.
sendMsgByRedis
(
MQConst
.
AdamQueue
.
SQL_UMEMBER
.
getKey
(),
SqlMapping
.
gets
(
toMqSqls
,
upsertUserMemberObjs
,
updateMemberCodeObjs
,
initMemberOrderObjs
)
);
log
.
debug
(
"#MQ耗时:{}ms"
,
System
.
currentTimeMillis
()
-
s
);
AdamMemberOrderResult
result
=
AdamMemberOrderResult
.
getNew
();
...
...
liquidnet-bus-service/liquidnet-service-adam/liquidnet-service-adam-impl/src/main/java/com/liquidnet/service/adam/service/impl/AdamRealNameServiceImpl.java
View file @
3e8b549a
package
com
.
liquidnet
.
service
.
adam
.
service
.
impl
;
import
com.liquidnet.common.cache.redis.util.RedisUtil
;
import
com.liquidnet.common.mq.constant.MQConst
;
import
com.liquidnet.service.adam.dto.vo.AdamRealInfoVo
;
import
com.liquidnet.service.adam.entity.AdamRealName
;
import
com.liquidnet.service.adam.service.IAdamRealNameService
;
import
com.liquidnet.service.adam.util.QueueUtils
;
import
com.liquidnet.service.base.SqlMapping
;
import
com.liquidnet.service.base.constant.MQConst
;
import
lombok.extern.slf4j.Slf4j
;
import
org.springframework.amqp.rabbit.core.RabbitTemplate
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.data.mongodb.core.MongoTemplate
;
import
org.springframework.stereotype.Service
;
...
...
@@ -31,7 +31,7 @@ public class AdamRealNameServiceImpl implements IAdamRealNameService {
@Autowired
RedisUtil
redisUtil
;
@Autowired
RabbitTemplate
rabbitTemplate
;
QueueUtils
queueUtils
;
@Override
// @Transactional(propagation = Propagation.REQUIRED, rollbackFor = Exception.class)
...
...
@@ -50,8 +50,10 @@ public class AdamRealNameServiceImpl implements IAdamRealNameService {
realName
.
getCreatedAt
()
);
s
=
System
.
currentTimeMillis
();
rabbitTemplate
.
convertAndSend
(
MQConst
.
EX_LNS_SQL_UCENTER
,
MQConst
.
RK_SQL_UCENTER
,
SqlMapping
.
get
(
"adam_real_name.add"
,
paramList
.
toArray
()));
queueUtils
.
sendMsgByRedis
(
MQConst
.
AdamQueue
.
SQL_UCENTER
.
getKey
(),
SqlMapping
.
get
(
"adam_real_name.add"
,
paramList
.
toArray
())
);
log
.
debug
(
"#MQ耗时:{}ms"
,
System
.
currentTimeMillis
()
-
s
);
}
}
liquidnet-bus-service/liquidnet-service-adam/liquidnet-service-adam-impl/src/main/java/com/liquidnet/service/adam/service/impl/AdamUserInfoServiceImpl.java
View file @
3e8b549a
package
com
.
liquidnet
.
service
.
adam
.
service
.
impl
;
import
com.liquidnet.common.cache.redis.util.RedisUtil
;
import
com.liquidnet.common.mq.constant.MQConst
;
import
com.liquidnet.commons.lang.core.JwtValidator
;
import
com.liquidnet.commons.lang.util.BsonUtil
;
import
com.liquidnet.commons.lang.util.CollectionUtil
;
...
...
@@ -9,13 +8,14 @@ import com.liquidnet.commons.lang.util.JsonUtils;
import
com.liquidnet.service.adam.dto.vo.AdamUserInfoVo
;
import
com.liquidnet.service.adam.service.AdamRdmService
;
import
com.liquidnet.service.adam.service.IAdamUserInfoService
;
import
com.liquidnet.service.adam.util.QueueUtils
;
import
com.liquidnet.service.base.SqlMapping
;
import
com.liquidnet.service.base.constant.MQConst
;
import
com.mongodb.client.model.FindOneAndUpdateOptions
;
import
com.mongodb.client.model.ReturnDocument
;
import
com.mongodb.client.result.UpdateResult
;
import
lombok.extern.slf4j.Slf4j
;
import
org.bson.Document
;
import
org.springframework.amqp.rabbit.core.RabbitTemplate
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.data.mongodb.core.MongoTemplate
;
import
org.springframework.data.mongodb.core.query.Criteria
;
...
...
@@ -44,7 +44,7 @@ public class AdamUserInfoServiceImpl implements IAdamUserInfoService {
@Autowired
AdamRdmService
adamRdmService
;
@Autowired
RabbitTemplate
rabbitTemplate
;
QueueUtils
queueUtils
;
@Autowired
RedisUtil
redisUtil
;
@Autowired
...
...
@@ -92,7 +92,7 @@ public class AdamUserInfoServiceImpl implements IAdamUserInfoService {
log
.
debug
(
"#SQL.GET耗时:{}ms"
,
System
.
currentTimeMillis
()
-
s
);
s
=
System
.
currentTimeMillis
();
rabbitTemplate
.
convertAndSend
(
MQConst
.
EX_LNS_SQL_UCENTER
,
MQConst
.
RK_SQL_UCENTER
,
queueUtils
.
sendMsgByRedis
(
MQConst
.
AdamQueue
.
SQL_UCENTER
.
getKey
()
,
SqlMapping
.
gets
(
toMqSqls
,
updateUserObjs
,
updateUserInfoObjs
)
);
log
.
debug
(
"#MQ耗时:{}ms"
,
System
.
currentTimeMillis
()
-
s
);
...
...
@@ -124,8 +124,9 @@ public class AdamUserInfoServiceImpl implements IAdamUserInfoService {
log
.
debug
(
"#RDS耗时:{}ms"
,
System
.
currentTimeMillis
()
-
s
);
s
=
System
.
currentTimeMillis
();
rabbitTemplate
.
convertAndSend
(
MQConst
.
EX_LNS_SQL_UCENTER
,
MQConst
.
RK_SQL_UCENTER
,
SqlMapping
.
get
(
"adam_user.edit.mobile"
,
mobile
,
now
,
uid
));
queueUtils
.
sendMsgByRedis
(
MQConst
.
AdamQueue
.
SQL_UCENTER
.
getKey
(),
SqlMapping
.
get
(
"adam_user.edit.mobile"
,
mobile
,
now
,
uid
)
);
log
.
debug
(
"#MQ耗时:{}ms"
,
System
.
currentTimeMillis
()
-
s
);
return
this
.
flushSsoProcess
(
beforeUserInfoVo
);
...
...
liquidnet-bus-service/liquidnet-service-adam/liquidnet-service-adam-impl/src/main/java/com/liquidnet/service/adam/service/impl/AdamUserServiceImpl.java
View file @
3e8b549a
...
...
@@ -3,7 +3,6 @@ package com.liquidnet.service.adam.service.impl;
import
com.fasterxml.jackson.databind.JsonNode
;
import
com.liquidnet.common.cache.redisson.util.RedisLockUtil
;
import
com.liquidnet.common.exception.LiquidnetServiceException
;
import
com.liquidnet.common.mq.constant.MQConst
;
import
com.liquidnet.commons.lang.util.*
;
import
com.liquidnet.service.adam.constant.AdamRedisConst
;
import
com.liquidnet.service.adam.dto.AdamThirdPartParam
;
...
...
@@ -14,8 +13,10 @@ import com.liquidnet.service.adam.service.AdamRdmService;
import
com.liquidnet.service.adam.service.IAdamEntersService
;
import
com.liquidnet.service.adam.service.IAdamRealNameService
;
import
com.liquidnet.service.adam.service.IAdamUserService
;
import
com.liquidnet.service.adam.util.QueueUtils
;
import
com.liquidnet.service.base.ErrorMapping
;
import
com.liquidnet.service.base.SqlMapping
;
import
com.liquidnet.service.base.constant.MQConst
;
import
com.mongodb.client.model.FindOneAndUpdateOptions
;
import
com.mongodb.client.model.ReturnDocument
;
import
com.mongodb.client.result.DeleteResult
;
...
...
@@ -23,7 +24,6 @@ import lombok.extern.slf4j.Slf4j;
import
org.apache.commons.lang3.RandomStringUtils
;
import
org.apache.commons.lang3.StringUtils
;
import
org.bson.Document
;
import
org.springframework.amqp.rabbit.core.RabbitTemplate
;
import
org.springframework.beans.BeanUtils
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.core.env.Environment
;
...
...
@@ -57,7 +57,7 @@ public class AdamUserServiceImpl implements IAdamUserService {
@Autowired
MongoTemplate
mongoTemplate
;
@Autowired
RabbitTemplate
rabbitTemplate
;
QueueUtils
queueUtils
;
@Autowired
IAdamEntersService
adamEntersService
;
@Autowired
...
...
@@ -104,7 +104,7 @@ public class AdamUserServiceImpl implements IAdamUserService {
toMqSqls
.
add
(
SqlMapping
.
get
(
"adam_user_info.add"
));
initUserInfoObjs
.
add
(
new
Object
[]{
userInfoVo
.
getQrCode
(),
userInfoVo
.
getUid
()});
log
.
debug
(
"#SQL.GET耗时:{}ms"
,
System
.
currentTimeMillis
()
-
s
);
rabbitTemplate
.
convertAndSend
(
MQConst
.
EX_LNS_SQL_UCENTER
,
MQConst
.
RK_SQL_UREGISTER
,
queueUtils
.
sendMsgByRedis
(
MQConst
.
AdamQueue
.
SQL_UREGISTER
.
getKey
()
,
SqlMapping
.
gets
(
toMqSqls
,
initUserObjs
,
initUserInfoObjs
)
);
log
.
debug
(
"#MQ耗时:{}ms"
,
System
.
currentTimeMillis
()
-
s
);
...
...
@@ -173,7 +173,7 @@ public class AdamUserServiceImpl implements IAdamUserService {
log
.
debug
(
"#RDS耗时:{}ms"
,
System
.
currentTimeMillis
()
-
s
);
s
=
System
.
currentTimeMillis
();
rabbitTemplate
.
convertAndSend
(
MQConst
.
EX_LNS_SQL_UCENTER
,
MQConst
.
RK_SQL_UREGISTER
,
queueUtils
.
sendMsgByRedis
(
MQConst
.
AdamQueue
.
SQL_UREGISTER
.
getKey
()
,
SqlMapping
.
gets
(
toMqSqls
,
initUserObjs
,
initThirdPartObjs
)
);
log
.
debug
(
"#MQ耗时:{}ms"
,
System
.
currentTimeMillis
()
-
s
);
...
...
@@ -209,7 +209,7 @@ public class AdamUserServiceImpl implements IAdamUserService {
log
.
debug
(
"#RDS耗时:{}ms"
,
System
.
currentTimeMillis
()
-
s
);
s
=
System
.
currentTimeMillis
();
rabbitTemplate
.
convertAndSend
(
MQConst
.
EX_LNS_SQL_UCENTER
,
MQConst
.
RK_SQL_UCENTER
,
queueUtils
.
sendMsgByRedis
(
MQConst
.
AdamQueue
.
SQL_UCENTER
.
getKey
()
,
SqlMapping
.
get
(
"adam_third_party.add"
,
thirdPartInfoVo
.
getUid
(),
thirdPartInfoVo
.
getOpenId
(),
thirdPartInfoVo
.
getAvatar
(),
thirdPartInfoVo
.
getNickname
(),
thirdPartInfoVo
.
getPlatform
(),
thirdPartInfoVo
.
getState
(),
thirdPartInfoVo
.
getCreatedAt
()
...
...
@@ -237,7 +237,7 @@ public class AdamUserServiceImpl implements IAdamUserService {
adamRdmService
.
delThirdPartVoListByUid
(
bindUid
);
rabbitTemplate
.
convertAndSend
(
MQConst
.
EX_LNS_SQL_UCENTER
,
MQConst
.
RK_SQL_UCENTER
,
queueUtils
.
sendMsgByRedis
(
MQConst
.
AdamQueue
.
SQL_UCENTER
.
getKey
()
,
SqlMapping
.
get
(
"adam_third_party.add"
,
thirdPartInfoVo
.
getUid
(),
thirdPartInfoVo
.
getOpenId
(),
thirdPartInfoVo
.
getAvatar
(),
thirdPartInfoVo
.
getNickname
(),
thirdPartInfoVo
.
getPlatform
(),
thirdPartInfoVo
.
getState
(),
thirdPartInfoVo
.
getCreatedAt
()
...
...
@@ -274,8 +274,9 @@ public class AdamUserServiceImpl implements IAdamUserService {
log
.
debug
(
"#RDS耗时:{}ms"
,
System
.
currentTimeMillis
()
-
s
);
s
=
System
.
currentTimeMillis
();
rabbitTemplate
.
convertAndSend
(
MQConst
.
EX_LNS_SQL_UCENTER
,
MQConst
.
RK_SQL_UCENTER
,
SqlMapping
.
get
(
"adam_third_party.unbind"
,
now
,
uid
,
platform
));
queueUtils
.
sendMsgByRedis
(
MQConst
.
AdamQueue
.
SQL_UCENTER
.
getKey
(),
SqlMapping
.
get
(
"adam_third_party.unbind"
,
now
,
uid
,
platform
)
);
log
.
debug
(
"#MQ耗时:{}ms"
,
System
.
currentTimeMillis
()
-
s
);
}
...
...
@@ -352,8 +353,10 @@ public class AdamUserServiceImpl implements IAdamUserService {
}
s
=
System
.
currentTimeMillis
();
rabbitTemplate
.
convertAndSend
(
MQConst
.
EX_LNS_SQL_UCENTER
,
MQConst
.
RK_SQL_UCENTER
,
SqlMapping
.
gets
(
toMqSqls
,
objsUser
));
queueUtils
.
sendMsgByRedis
(
MQConst
.
AdamQueue
.
SQL_UCENTER
.
getKey
(),
SqlMapping
.
gets
(
toMqSqls
,
objsUser
)
);
log
.
debug
(
"#MQ耗时:{}ms"
,
System
.
currentTimeMillis
()
-
s
);
}
...
...
liquidnet-bus-service/liquidnet-service-adam/liquidnet-service-adam-impl/src/main/java/com/liquidnet/service/adam/util/QueueUtils.java
View file @
3e8b549a
package
com
.
liquidnet
.
service
.
adam
.
util
;
import
com.liquidnet.common.exception.LiquidnetServiceException
;
import
org.springframework.amqp.rabbit.core.RabbitTemplate
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.data.redis.connection.stream.MapRecord
;
import
org.springframework.data.redis.connection.stream.StreamRecords
;
import
org.springframework.data.redis.core.StringRedisTemplate
;
import
org.springframework.stereotype.Component
;
...
...
@@ -19,27 +17,25 @@ public class QueueUtils {
StringRedisTemplate
stringRedisTemplate
;
/**
* 发送
SqlMapping Json 字符串
* 发送
消息 - RABBIT
*
* @param exchange 交换机
* @param route
路径
* @param
sqlStr
Json字符串
* @param route
Key
路径
* @param
jsonMsg
Json字符串
*/
public
void
send
SqlRabbit
(
String
exchange
,
String
route
,
String
sqlStr
)
{
rabbitTemplate
.
convertAndSend
(
exchange
,
route
,
sqlStr
);
public
void
send
MsgByRabbit
(
String
exchange
,
String
routeKey
,
String
jsonMsg
)
{
rabbitTemplate
.
convertAndSend
(
exchange
,
route
Key
,
jsonMsg
);
}
/**
*
给 REDIS 队列发送消息 数据库相关
*
发送消息 - REDIS
*
* @param redisKey RedisKey 消费Key
* @param sqlStr Json字符串
* @return
* @param streamKey Redis消费Key
* @param jsonMsg Json字符串
*/
public
void
send
SqlRedis
(
String
redisKey
,
String
sqlStr
)
{
public
void
send
MsgByRedis
(
String
streamKey
,
String
jsonMsg
)
{
HashMap
<
String
,
String
>
map
=
new
HashMap
<>();
map
.
put
(
"message"
,
sqlStr
);
MapRecord
<
String
,
String
,
String
>
record
=
StreamRecords
.
mapBacked
(
map
).
withStreamKey
(
redisKey
);
stringRedisTemplate
.
opsForStream
().
add
(
record
);
map
.
put
(
"message"
,
jsonMsg
);
stringRedisTemplate
.
opsForStream
().
add
(
StreamRecords
.
mapBacked
(
map
).
withStreamKey
(
streamKey
));
}
}
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-adam/pom.xml
View file @
3e8b549a
...
...
@@ -17,20 +17,6 @@
</properties>
<dependencies>
<dependency>
<groupId>
com.liquidnet
</groupId>
<artifactId>
liquidnet-common-mq
</artifactId>
<version>
1.0-SNAPSHOT
</version>
</dependency>
<dependency>
<groupId>
com.liquidnet
</groupId>
<artifactId>
liquidnet-common-web
</artifactId>
</dependency>
<dependency>
<groupId>
com.liquidnet
</groupId>
<artifactId>
liquidnet-common-cache-redisson
</artifactId>
<version>
1.0-SNAPSHOT
</version>
</dependency>
<dependency>
<groupId>
com.liquidnet
</groupId>
<artifactId>
liquidnet-common-sms
</artifactId>
...
...
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-adam/src/main/java/com/liquidnet/service/consumer/adam/service/config/ConsumerAdamSmsSenderRedisStreamConfig.java
0 → 100644
View file @
3e8b549a
package
com
.
liquidnet
.
service
.
consumer
.
adam
.
service
.
config
;
import
com.liquidnet.service.consumer.adam.service.receiver.ConsumerAdamSmsNoticeRdsReceiver
;
import
lombok.var
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.context.annotation.Bean
;
import
org.springframework.context.annotation.Configuration
;
import
org.springframework.data.redis.connection.RedisConnectionFactory
;
import
org.springframework.data.redis.connection.stream.Consumer
;
import
org.springframework.data.redis.connection.stream.MapRecord
;
import
org.springframework.data.redis.connection.stream.ReadOffset
;
import
org.springframework.data.redis.connection.stream.StreamOffset
;
import
org.springframework.data.redis.stream.StreamMessageListenerContainer
;
import
org.springframework.data.redis.stream.Subscription
;
import
java.time.Duration
;
import
static
com
.
liquidnet
.
service
.
base
.
constant
.
MQConst
.*;
@Configuration
public
class
ConsumerAdamSmsSenderRedisStreamConfig
{
@Autowired
ConsumerAdamSmsNoticeRdsReceiver
consumerAdamSmsNoticeRdsReceiver
;
private
StreamMessageListenerContainer
<
String
,
MapRecord
<
String
,
String
,
String
>>
buildStreamMessageListenerContainer
(
RedisConnectionFactory
factory
)
{
var
options
=
StreamMessageListenerContainer
.
StreamMessageListenerContainerOptions
.
builder
()
.
pollTimeout
(
Duration
.
ofMillis
(
1
))
.
build
();
return
StreamMessageListenerContainer
.
create
(
factory
,
options
);
}
/**
* 短信通知
*
* @param listenerContainer
* @param t
* @return
*/
private
Subscription
receiveSqlURegister
(
StreamMessageListenerContainer
<
String
,
MapRecord
<
String
,
String
,
String
>>
listenerContainer
,
int
t
)
{
return
listenerContainer
.
receiveAutoAck
(
Consumer
.
from
(
AdamQueue
.
SMS_NOTICE
.
getGroup
(),
AdamQueue
.
SMS_NOTICE
.
name
()
+
t
),
StreamOffset
.
create
(
AdamQueue
.
SMS_NOTICE
.
getKey
(),
ReadOffset
.
lastConsumed
()),
consumerAdamSmsNoticeRdsReceiver
);
}
/* —————————————————————————— | —————————————————————————— | —————————————————————————— */
/* -------------------------------------------------------- | 短信通知 */
@Bean
public
Subscription
subscriptionSmsNotice1
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlURegister
(
listenerContainer
,
1
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionSmsNotice2
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlURegister
(
listenerContainer
,
2
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionSmsNotice3
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlURegister
(
listenerContainer
,
3
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionSmsNotice4
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlURegister
(
listenerContainer
,
4
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionSmsNotice5
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlURegister
(
listenerContainer
,
5
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionSmsNotice6
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlURegister
(
listenerContainer
,
6
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionSmsNotice7
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlURegister
(
listenerContainer
,
7
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionSmsNotice8
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlURegister
(
listenerContainer
,
8
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionSmsNotice9
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlURegister
(
listenerContainer
,
9
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionSmsNotice10
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlURegister
(
listenerContainer
,
10
);
listenerContainer
.
start
();
return
subscription
;
}
/* -------------------------------------------------------- | */
}
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-adam/src/main/java/com/liquidnet/service/consumer/adam/service/config/ConsumerAdamSqlUcenterRedisStreamConfig.java
0 → 100644
View file @
3e8b549a
This diff is collapsed.
Click to expand it.
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-adam/src/main/java/com/liquidnet/service/consumer/adam/service/processor/ConsumerAdamSmsProcessor.java
View file @
3e8b549a
package
com
.
liquidnet
.
service
.
consumer
.
adam
.
service
.
processor
;
import
com.liquidnet.common.mq.constant.MQConst
;
import
com.liquidnet.common.sms.processor.SmsProcessor
;
import
com.liquidnet.commons.lang.util.JsonUtils
;
import
com.liquidnet.service.base.SmsMessage
;
import
com.rabbitmq.client.Channel
;
import
lombok.extern.slf4j.Slf4j
;
import
org.springframework.amqp.core.Message
;
import
org.springframework.amqp.core.MessageProperties
;
import
org.springframework.amqp.rabbit.annotation.Exchange
;
import
org.springframework.amqp.rabbit.annotation.Queue
;
import
org.springframework.amqp.rabbit.annotation.QueueBinding
;
import
org.springframework.amqp.rabbit.annotation.RabbitListener
;
import
org.springframework.stereotype.Component
;
import
javax.annotation.Resource
;
import
java.io.IOException
;
/**
* ConsumerAdamSmsProcessor.class
*
* @author zhanggb
* Created by IntelliJ IDEA at 2021/7/13
*/
@Slf4j
@Component
public
class
ConsumerAdamSmsProcessor
{
@Resource
SmsProcessor
smsProcessor
;
private
void
consumerSmsSendHandler
(
Message
msg
,
Channel
channel
)
{
MessageProperties
properties
=
msg
.
getMessageProperties
();
String
consumerQueue
=
properties
.
getConsumerQueue
();
long
deliveryTag
=
properties
.
getDeliveryTag
();
log
.
info
(
"CONSUMER SMS ==> [consumerQueue:{},deliveryTag:{}]"
,
consumerQueue
,
deliveryTag
);
String
msgBody
=
new
String
(
msg
.
getBody
());
log
.
debug
(
"CONSUMER SMS ==> Preparing:{}"
,
msgBody
);
try
{
SmsMessage
smsMessage
=
JsonUtils
.
fromJson
(
msgBody
,
SmsMessage
.
class
);
boolean
result
=
smsProcessor
.
send
(
smsMessage
.
getPhone
(),
smsMessage
.
getSignName
(),
smsMessage
.
getTemplateCode
(),
smsMessage
.
getTemplateParam
().
toString
());
log
.
debug
(
"CONSUMER SMS result of execution:{}"
,
result
);
if
(
result
)
{
channel
.
basicAck
(
deliveryTag
,
false
);
}
else
{
log
.
warn
(
"###CONSUMER SMS[consumerQueue:{},deliveryTag={},sqlMessage:{}]"
,
consumerQueue
,
deliveryTag
,
msgBody
);
channel
.
basicAck
(
deliveryTag
,
false
);
}
}
catch
(
IOException
e
)
{
log
.
error
(
"CONSUMER SMS[consumerQueue:{},deliveryTag:{},sqlMessage:{}]"
,
consumerQueue
,
deliveryTag
,
msgBody
,
e
);
}
}
/* ================================================================== | 短信验证码 */
//package com.liquidnet.service.consumer.adam.service.processor;
//
//import com.liquidnet.common.mq.constant.MQConst;
//import com.liquidnet.common.sms.processor.SmsProcessor;
//import com.liquidnet.commons.lang.util.JsonUtils;
//import com.liquidnet.service.base.SmsMessage;
//import com.rabbitmq.client.Channel;
//import lombok.extern.slf4j.Slf4j;
//import org.springframework.amqp.core.Message;
//import org.springframework.amqp.core.MessageProperties;
//import org.springframework.amqp.rabbit.annotation.Exchange;
//import org.springframework.amqp.rabbit.annotation.Queue;
//import org.springframework.amqp.rabbit.annotation.QueueBinding;
//import org.springframework.amqp.rabbit.annotation.RabbitListener;
//import org.springframework.stereotype.Component;
//
//import javax.annotation.Resource;
//import java.io.IOException;
//
///**
// * ConsumerAdamSmsProcessor.class
// *
// * @author zhanggb
// * Created by IntelliJ IDEA at 2021/7/13
// */
//@Slf4j
//@Component
//public class ConsumerAdamSmsProcessor {
// @Resource
// SmsProcessor smsProcessor;
//
// private void consumerSmsSendHandler(Message msg, Channel channel) {
// MessageProperties properties = msg.getMessageProperties();
// String consumerQueue = properties.getConsumerQueue();
// long deliveryTag = properties.getDeliveryTag();
// log.info("CONSUMER SMS ==> [consumerQueue:{},deliveryTag:{}]", consumerQueue, deliveryTag);
// String msgBody = new String(msg.getBody());
// log.debug("CONSUMER SMS ==> Preparing:{}", msgBody);
// try {
// SmsMessage smsMessage = JsonUtils.fromJson(msgBody, SmsMessage.class);
// boolean result = smsProcessor.send(smsMessage.getPhone(), smsMessage.getSignName(), smsMessage.getTemplateCode(), smsMessage.getTemplateParam().toString());
// log.debug("CONSUMER SMS result of execution:{}", result);
// if (result) {
// channel.basicAck(deliveryTag, false);
// } else {
// log.warn("###CONSUMER SMS[consumerQueue:{},deliveryTag={},sqlMessage:{}]", consumerQueue, deliveryTag, msgBody);
// channel.basicAck(deliveryTag, false);
// }
// } catch (IOException e) {
// log.error("CONSUMER SMS[consumerQueue:{},deliveryTag:{},sqlMessage:{}]", consumerQueue, deliveryTag, msgBody, e);
// }
// }
//
// /* ================================================================== | 短信验证码 */
//
//// @RabbitListener(
//// bindings = @QueueBinding(
//// exchange = @Exchange(MQConst.EX_LNS_SMS_SENDER),
//// key = MQConst.RK_SMS_CODE,
//// value = @Queue(MQConst.QUEUES_SMS_CODE)
//// ),
//// concurrency = "25"
//// )
//// public void consumerSqlForSmsCode(Message msg, Channel channel) {
//// this.consumerSmsSendHandler(msg, channel);
//// }
//
// /* ================================================================== | 短信通知 */
//
// @RabbitListener(
// bindings = @QueueBinding(
// exchange = @Exchange(MQConst.EX_LNS_SMS_SENDER),
// key = MQConst.RK_SMS_
COD
E,
// value = @Queue(MQConst.QUEUES_SMS_
COD
E)
// key = MQConst.RK_SMS_
NOTIC
E,
// value = @Queue(MQConst.QUEUES_SMS_
NOTIC
E)
// ),
// concurrency = "
25
"
// concurrency = "
10
"
// )
// public void consumerSqlForSms
Cod
e(Message msg, Channel channel) {
// public void consumerSqlForSms
Notic
e(Message msg, Channel channel) {
// this.consumerSmsSendHandler(msg, channel);
// }
/* ================================================================== | 短信通知 */
@RabbitListener
(
bindings
=
@QueueBinding
(
exchange
=
@Exchange
(
MQConst
.
EX_LNS_SMS_SENDER
),
key
=
MQConst
.
RK_SMS_NOTICE
,
value
=
@Queue
(
MQConst
.
QUEUES_SMS_NOTICE
)
),
concurrency
=
"10"
)
public
void
consumerSqlForSmsNotice
(
Message
msg
,
Channel
channel
)
{
this
.
consumerSmsSendHandler
(
msg
,
channel
);
}
/* ================================================================== | */
}
//
//
// /* ================================================================== | */
//}
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-adam/src/main/java/com/liquidnet/service/consumer/adam/service/processor/ConsumerAdamUCenterProcessor.java
View file @
3e8b549a
This diff is collapsed.
Click to expand it.
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-adam/src/main/java/com/liquidnet/service/consumer/adam/service/receiver/AbstractSmsRedisReceiver.java
0 → 100644
View file @
3e8b549a
package
com
.
liquidnet
.
service
.
consumer
.
adam
.
service
.
receiver
;
import
com.liquidnet.common.sms.processor.SmsProcessor
;
import
com.liquidnet.commons.lang.util.JsonUtils
;
import
com.liquidnet.service.base.SmsMessage
;
import
com.liquidnet.service.base.SqlMapping
;
import
com.liquidnet.service.consumer.adam.service.IBaseDao
;
import
lombok.extern.slf4j.Slf4j
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.data.redis.connection.stream.MapRecord
;
import
org.springframework.data.redis.core.StringRedisTemplate
;
import
org.springframework.data.redis.stream.StreamListener
;
import
javax.annotation.Resource
;
@Slf4j
public
abstract
class
AbstractSmsRedisReceiver
implements
StreamListener
<
String
,
MapRecord
<
String
,
String
,
String
>>
{
@Resource
SmsProcessor
smsProcessor
;
@Autowired
StringRedisTemplate
stringRedisTemplate
;
@Override
public
void
onMessage
(
MapRecord
<
String
,
String
,
String
>
message
)
{
log
.
info
(
"CONSUMER SMS[streamKey:{},messageId:{},stream:{},body:{}]"
,
this
.
getRedisStreamKey
(),
message
.
getId
(),
message
.
getStream
(),
message
.
getValue
());
boolean
result
=
this
.
consumerSmsSendHandler
(
message
.
getValue
().
get
(
"message"
));
log
.
debug
(
"CONSUMER SMS RESULT:{}"
,
result
);
// 消费成功确认,消息删除和消息确认是一个事务
if
(
result
)
{
log
.
info
(
"CONSUMER SMS SUCC ==> MESSAGE_ID:{}"
,
message
.
getId
());
try
{
stringRedisTemplate
.
opsForStream
().
delete
(
this
.
getRedisStreamKey
(),
message
.
getId
());
}
catch
(
Exception
e
)
{
log
.
error
(
"CONSUMER SMS SUC ==> DEL_REDIS_QUEUE_MSG_EXCEPTION[MESSAGE_ID:{}]"
,
message
.
getId
(),
e
);
}
}
}
private
boolean
consumerSmsSendHandler
(
String
msg
)
{
try
{
SmsMessage
smsMessage
=
JsonUtils
.
fromJson
(
msg
,
SmsMessage
.
class
);
return
smsProcessor
.
send
(
smsMessage
.
getPhone
(),
smsMessage
.
getSignName
(),
smsMessage
.
getTemplateCode
(),
smsMessage
.
getTemplateParam
().
toString
());
}
catch
(
Exception
e
)
{
log
.
error
(
"CONSUMER SMS FAIL ==> {}"
,
e
.
getMessage
(),
e
);
return
false
;
}
}
protected
abstract
String
getRedisStreamKey
();
}
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-adam/src/main/java/com/liquidnet/service/consumer/adam/service/receiver/AbstractSqlRedisReceiver.java
0 → 100644
View file @
3e8b549a
package
com
.
liquidnet
.
service
.
consumer
.
adam
.
service
.
receiver
;
import
com.liquidnet.commons.lang.util.JsonUtils
;
import
com.liquidnet.service.base.SqlMapping
;
import
com.liquidnet.service.consumer.adam.service.IBaseDao
;
import
lombok.extern.slf4j.Slf4j
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.data.redis.connection.stream.MapRecord
;
import
org.springframework.data.redis.core.StringRedisTemplate
;
import
org.springframework.data.redis.stream.StreamListener
;
@Slf4j
public
abstract
class
AbstractSqlRedisReceiver
implements
StreamListener
<
String
,
MapRecord
<
String
,
String
,
String
>>
{
@Autowired
private
IBaseDao
baseDao
;
@Autowired
StringRedisTemplate
stringRedisTemplate
;
@Override
public
void
onMessage
(
MapRecord
<
String
,
String
,
String
>
message
)
{
log
.
info
(
"CONSUMER SQL[streamKey:{},messageId:{},stream:{},body:{}]"
,
this
.
getRedisStreamKey
(),
message
.
getId
(),
message
.
getStream
(),
message
.
getValue
());
boolean
result
=
this
.
consumerSqlDaoHandler
(
message
.
getValue
().
get
(
"message"
));
log
.
debug
(
"CONSUMER SQL RESULT:{}"
,
result
);
// 消费成功确认,消息删除和消息确认是一个事务
if
(
result
)
{
log
.
info
(
"CONSUMER SMS SUCC ==> MESSAGE_ID:{}"
,
message
.
getId
());
try
{
stringRedisTemplate
.
opsForStream
().
delete
(
this
.
getRedisStreamKey
(),
message
.
getId
());
}
catch
(
Exception
e
)
{
log
.
error
(
"CONSUMER SMS SUCC ==> DEL_REDIS_QUEUE_MSG_EXCEPTION[MESSAGE_ID:{}]"
,
message
.
getId
(),
e
);
}
}
}
private
boolean
consumerSqlDaoHandler
(
String
msg
)
{
try
{
SqlMapping
.
SqlMessage
sqlMessage
=
JsonUtils
.
fromJson
(
msg
,
SqlMapping
.
SqlMessage
.
class
);
return
baseDao
.
batchSqls
(
sqlMessage
.
getSqls
(),
sqlMessage
.
getArgs
());
}
catch
(
Exception
e
)
{
log
.
error
(
"CONSUMER SMS FAIL ==> {}"
,
e
.
getMessage
(),
e
);
return
false
;
}
}
protected
abstract
String
getRedisStreamKey
();
}
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-adam/src/main/java/com/liquidnet/service/consumer/adam/service/receiver/ConsumerAdamSmsNoticeRdsReceiver.java
0 → 100644
View file @
3e8b549a
package
com
.
liquidnet
.
service
.
consumer
.
adam
.
service
.
receiver
;
import
com.liquidnet.service.base.constant.MQConst
;
import
lombok.extern.slf4j.Slf4j
;
import
org.springframework.stereotype.Component
;
@Slf4j
@Component
public
class
ConsumerAdamSmsNoticeRdsReceiver
extends
AbstractSmsRedisReceiver
{
@Override
protected
String
getRedisStreamKey
()
{
return
MQConst
.
AdamQueue
.
SMS_NOTICE
.
getKey
();
}
}
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-adam/src/main/java/com/liquidnet/service/consumer/adam/service/receiver/ConsumerAdamUCenterRdsReceiverSql.java
0 → 100644
View file @
3e8b549a
package
com
.
liquidnet
.
service
.
consumer
.
adam
.
service
.
receiver
;
import
com.liquidnet.service.base.constant.MQConst
;
import
lombok.extern.slf4j.Slf4j
;
import
org.springframework.stereotype.Component
;
@Slf4j
@Component
public
class
ConsumerAdamUCenterRdsReceiverSql
extends
AbstractSqlRedisReceiver
{
@Override
protected
String
getRedisStreamKey
()
{
return
MQConst
.
AdamQueue
.
SQL_UCENTER
.
getKey
();
}
}
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-adam/src/main/java/com/liquidnet/service/consumer/adam/service/receiver/ConsumerAdamUMemberRdsReceiverSql.java
0 → 100644
View file @
3e8b549a
package
com
.
liquidnet
.
service
.
consumer
.
adam
.
service
.
receiver
;
import
com.liquidnet.service.base.constant.MQConst
;
import
lombok.extern.slf4j.Slf4j
;
import
org.springframework.stereotype.Component
;
@Slf4j
@Component
public
class
ConsumerAdamUMemberRdsReceiverSql
extends
AbstractSqlRedisReceiver
{
@Override
protected
String
getRedisStreamKey
()
{
return
MQConst
.
AdamQueue
.
SQL_UMEMBER
.
getKey
();
}
}
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-adam/src/main/java/com/liquidnet/service/consumer/adam/service/receiver/ConsumerAdamURegisterRdsReceiverSql.java
0 → 100644
View file @
3e8b549a
package
com
.
liquidnet
.
service
.
consumer
.
adam
.
service
.
receiver
;
import
com.liquidnet.service.base.constant.MQConst
;
import
lombok.extern.slf4j.Slf4j
;
import
org.springframework.stereotype.Component
;
@Slf4j
@Component
public
class
ConsumerAdamURegisterRdsReceiverSql
extends
AbstractSqlRedisReceiver
{
@Override
protected
String
getRedisStreamKey
()
{
return
MQConst
.
AdamQueue
.
SQL_UREGISTER
.
getKey
();
}
}
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-adam/src/test/java/com/liquidnet/service/consumer/service/processor/ConsumerAdamUCenterProcessorTest.java
deleted
100644 → 0
View file @
1c89b7f7
package
com
.
liquidnet
.
service
.
consumer
.
service
.
processor
;
import
com.liquidnet.service.consumer.adam.service.processor.ConsumerAdamUCenterProcessor
;
import
org.junit.jupiter.api.Test
;
import
org.junit.runner.RunWith
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.boot.test.context.SpringBootTest
;
import
org.springframework.test.context.junit4.SpringJUnit4ClassRunner
;
@RunWith
(
SpringJUnit4ClassRunner
.
class
)
@SpringBootTest
public
class
ConsumerAdamUCenterProcessorTest
{
@Autowired
ConsumerAdamUCenterProcessor
consumerAdamUCenterProcessor
;
@Test
public
void
test
()
{
String
msg
=
"{\"sqls\":[\"INSERT INTO kylin_order_tickets(order_tickets_id,user_id,user_name,user_mobile,performance_title,order_code,qr_code,order_type,order_version,number,price,price_member,price_total,price_voucher,price_actual,price_express,price_refund,refund_number,pay_type,payment_type,time_pay,express_contacts,express_address,express_phone,coupon_type,get_ticket_type,get_ticket_describe,pay_countdown_minute,`comment`,created_at,updated_at)VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)\",\"INSERT INTO kylin_order_ticket_status(order_ticket_status_id ,order_id ,express_type ,is_student ,transfer_status ,`status` ,pay_status ,created_at ,updated_at)VALUES(?,?,?,?,?,?,?,?,?)\",\"INSERT INTO kylin_order_ticket_relations(order_ticket_relations_id ,order_id ,transfer_id ,live_id ,agent_id ,is_member ,performance_id ,time_id,ticket_id ,created_at ,updated_at)VALUES(?,?,?,?,?,?,?,?,?,?,?)\",\"INSERT INTO kylin_order_ticket_entities(order_ticket_entities_id ,order_id ,ticket_id ,user_id ,time_id ,performance_id ,enter_type ,enter_name ,enter_mobile,enter_id_code,`status`,sys_damai,check_client,is_payment,`comment`,created_at,updated_at)VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)\"],\"args\":[[[\"73901653639766016\",\"73459712083042304\",\"\",\"15001268203\",\"0607演出0607\",\"T7390165363976516260\",\"\",\"\",\"\",1,0.01,0.01,0.01,0.0,0.01,0.00,0.0,0,\"alipay\",null,null,\"\",\"\",\"\",\"no\",\"electronic\",\"\",5,null,\"2021-06-07T22:18:47.264\",null]],[[\"73901653694291968\",\"73901653639766016\",2,0,0,0,0,\"2021-06-07T22:18:47.267\",null]],[[\"73901653694291969\",\"73901653639766016\",\"\",\"\",\"0\",0,\"73776030099382272\",\"73776073304907776\",\"73776225042243584\",\"2021-06-07T22:18:47.267\",null]],[[\"73901653694291970\",\"73901653639766016\",\"73776225042243584\",\"73459712083042304\",\"73776073304907776\",\"73776030099382272\",0,\"\",\"\",\"\",0,0,\"\",0,\"\",\"2021-06-07T22:18:47.267\",null]]]}"
;
// consumerAdamProcessor.consumerSqlForUCenterA(null, null);
}
}
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-dragon/pom.xml
View file @
3e8b549a
...
...
@@ -17,20 +17,6 @@
</properties>
<dependencies>
<dependency>
<groupId>
com.liquidnet
</groupId>
<artifactId>
liquidnet-common-mq
</artifactId>
<version>
1.0-SNAPSHOT
</version>
</dependency>
<dependency>
<groupId>
com.liquidnet
</groupId>
<artifactId>
liquidnet-common-web
</artifactId>
</dependency>
<dependency>
<groupId>
com.liquidnet
</groupId>
<artifactId>
liquidnet-common-cache-redis
</artifactId>
<version>
1.0-SNAPSHOT
</version>
</dependency>
<dependency>
<groupId>
com.liquidnet
</groupId>
<artifactId>
liquidnet-service-dragon-api
</artifactId>
...
...
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-dragon/src/main/java/com/liquidnet/service/consumer/dragon/service/processor/ConsumerAdamUCenterProcessor.java
deleted
100644 → 0
View file @
1c89b7f7
package
com
.
liquidnet
.
service
.
consumer
.
dragon
.
service
.
processor
;
import
com.liquidnet.common.mq.constant.MQConst
;
import
com.liquidnet.commons.lang.util.JsonUtils
;
import
com.liquidnet.service.base.SqlMapping
;
import
com.liquidnet.service.consumer.dragon.service.IBaseDao
;
import
com.rabbitmq.client.Channel
;
import
lombok.extern.slf4j.Slf4j
;
import
org.springframework.amqp.core.Message
;
import
org.springframework.amqp.core.MessageProperties
;
import
org.springframework.amqp.rabbit.annotation.Exchange
;
import
org.springframework.amqp.rabbit.annotation.Queue
;
import
org.springframework.amqp.rabbit.annotation.QueueBinding
;
import
org.springframework.amqp.rabbit.annotation.RabbitListener
;
import
org.springframework.stereotype.Component
;
import
javax.annotation.Resource
;
import
java.io.IOException
;
/**
* ConsumerAdamProcessor.class
*
* @author zhanggb
* Created by IntelliJ IDEA at 2021/4/29
*/
@Slf4j
@Component
public
class
ConsumerAdamUCenterProcessor
{
// @Resource
// IBaseDao baseDao;
//
// private void consumerSqlDaoHandler(Message msg, Channel channel) {
// MessageProperties properties = msg.getMessageProperties();
// String consumerQueue = properties.getConsumerQueue();
// long deliveryTag = properties.getDeliveryTag();
// log.info("CONSUMER SQL ==> [consumerQueue:{},deliveryTag:{}]", consumerQueue, deliveryTag);
// SqlMapping.SqlMessage sqlMessage = JsonUtils.fromJson(new String(msg.getBody()), SqlMapping.SqlMessage.class);
// log.debug("CONSUMER SQL ==> Preparing:{}", JsonUtils.toJson(sqlMessage.getSqls()));
// log.debug("CONSUMER SQL ==> Parameters:{}", JsonUtils.toJson(sqlMessage.getArgs()));
// try {
// Boolean rstBatchSqls = baseDao.batchSqls(sqlMessage.getSqls(), sqlMessage.getArgs());
// log.debug("CONSUMER SQL result of execution:{}", rstBatchSqls);
// if (rstBatchSqls) {
// channel.basicAck(deliveryTag, false);
// } else {
// log.warn("###CONSUMER SQL[consumerQueue:{},deliveryTag={},sqlMessage:{}]", consumerQueue, deliveryTag, JsonUtils.toJson(sqlMessage));
// channel.basicAck(deliveryTag, false);
// }
// } catch (IOException e) {
// log.error("CONSUMER SQL[consumerQueue:{},deliveryTag:{},sqlMessage:{}]", consumerQueue, deliveryTag, JsonUtils.toJson(sqlMessage), e);
// }
// }
//
// /* ================================================================== | 用户注册 */
//
// @RabbitListener(
// bindings = @QueueBinding(
// exchange = @Exchange(MQConst.EX_LNS_SQL_UCENTER),
// key = MQConst.RK_SQL_UREGISTER,
// value = @Queue(MQConst.QUEUES_SQL_UREGISTER)
// ),
// concurrency = "5"
// )
// public void consumerSqlForURegister(Message msg, Channel channel) {
// this.consumerSqlDaoHandler(msg, channel);
// }
//
// /* ================================================================== | 用户信息 */
//
// @RabbitListener(
// bindings = @QueueBinding(
// exchange = @Exchange(MQConst.EX_LNS_SQL_UCENTER),
// key = MQConst.RK_SQL_UCENTER,
// value = @Queue(MQConst.QUEUES_SQL_UCENTER)
// ),
// concurrency = "5"
// )
// public void consumerSqlForUCenter(Message msg, Channel channel) {
// this.consumerSqlDaoHandler(msg, channel);
// }
//
// /* ================================================================== | 会员购买 */
//
// @RabbitListener(
// bindings = @QueueBinding(
// exchange = @Exchange(MQConst.EX_LNS_SQL_UCENTER),
// key = MQConst.RK_SQL_UMEMBER,
// value = @Queue(MQConst.QUEUES_SQL_UMEMBER)
// ),
// concurrency = "5"
// )
// public void consumerSqlForUMember(Message msg, Channel channel) {
// this.consumerSqlDaoHandler(msg, channel);
// }
/* ================================================================== | */
}
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-dragon/src/test/java/com/liquidnet/service/consumer/service/processor/ConsumerDragonUCenterProcessorTest.java
deleted
100644 → 0
View file @
1c89b7f7
package
com
.
liquidnet
.
service
.
consumer
.
service
.
processor
;
import
org.junit.jupiter.api.Test
;
import
org.junit.runner.RunWith
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.boot.test.context.SpringBootTest
;
import
org.springframework.test.context.junit4.SpringJUnit4ClassRunner
;
@RunWith
(
SpringJUnit4ClassRunner
.
class
)
@SpringBootTest
public
class
ConsumerDragonUCenterProcessorTest
{
@Autowired
ConsumerDragonUCenterProcessorTest
consumerAdamUCenterProcessor
;
@Test
public
void
test
()
{
String
msg
=
"{\"sqls\":[\"INSERT INTO kylin_order_tickets(order_tickets_id,user_id,user_name,user_mobile,performance_title,order_code,qr_code,order_type,order_version,number,price,price_member,price_total,price_voucher,price_actual,price_express,price_refund,refund_number,pay_type,payment_type,time_pay,express_contacts,express_address,express_phone,coupon_type,get_ticket_type,get_ticket_describe,pay_countdown_minute,`comment`,created_at,updated_at)VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)\",\"INSERT INTO kylin_order_ticket_status(order_ticket_status_id ,order_id ,express_type ,is_student ,transfer_status ,`status` ,pay_status ,created_at ,updated_at)VALUES(?,?,?,?,?,?,?,?,?)\",\"INSERT INTO kylin_order_ticket_relations(order_ticket_relations_id ,order_id ,transfer_id ,live_id ,agent_id ,is_member ,performance_id ,time_id,ticket_id ,created_at ,updated_at)VALUES(?,?,?,?,?,?,?,?,?,?,?)\",\"INSERT INTO kylin_order_ticket_entities(order_ticket_entities_id ,order_id ,ticket_id ,user_id ,time_id ,performance_id ,enter_type ,enter_name ,enter_mobile,enter_id_code,`status`,sys_damai,check_client,is_payment,`comment`,created_at,updated_at)VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)\"],\"args\":[[[\"73901653639766016\",\"73459712083042304\",\"\",\"15001268203\",\"0607演出0607\",\"T7390165363976516260\",\"\",\"\",\"\",1,0.01,0.01,0.01,0.0,0.01,0.00,0.0,0,\"alipay\",null,null,\"\",\"\",\"\",\"no\",\"electronic\",\"\",5,null,\"2021-06-07T22:18:47.264\",null]],[[\"73901653694291968\",\"73901653639766016\",2,0,0,0,0,\"2021-06-07T22:18:47.267\",null]],[[\"73901653694291969\",\"73901653639766016\",\"\",\"\",\"0\",0,\"73776030099382272\",\"73776073304907776\",\"73776225042243584\",\"2021-06-07T22:18:47.267\",null]],[[\"73901653694291970\",\"73901653639766016\",\"73776225042243584\",\"73459712083042304\",\"73776073304907776\",\"73776030099382272\",0,\"\",\"\",\"\",0,0,\"\",0,\"\",\"2021-06-07T22:18:47.267\",null]]]}"
;
// consumerAdamProcessor.consumerSqlForUCenterA(null, null);
}
}
liquidnet-bus-service/liquidnet-service-consumer-all/pom.xml
View file @
3e8b549a
...
...
@@ -17,11 +17,6 @@
</modules>
<dependencies>
<dependency>
<groupId>
com.xuxueli
</groupId>
<artifactId>
xxl-job-core
</artifactId>
<version>
2.3.0
</version>
</dependency>
<dependency>
<groupId>
com.liquidnet
</groupId>
<artifactId>
liquidnet-common-web
</artifactId>
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment