记得上下班打卡 | git大法好,push需谨慎
Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
L
liquidnet-bus-v1
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
董敬伟
liquidnet-bus-v1
Commits
4de40f04
Commit
4de40f04
authored
Oct 28, 2021
by
anjiabin
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
社交增加喜欢和不喜欢计数
parent
454c8d4e
Changes
19
Show whitespace changes
Inline
Side-by-side
Showing
19 changed files
with
643 additions
and
6 deletions
+643
-6
ChimeConstant.java
...a/com/liquidnet/service/chime/constant/ChimeConstant.java
+8
-0
IChimeUserService.java
...om/liquidnet/service/chime/service/IChimeUserService.java
+4
-0
ChimeUserInfoVo.java
...com/liquidnet/service/chime/vo/mongo/ChimeUserInfoVo.java
+4
-0
ChimeUserOperLogVo.java
.../liquidnet/service/chime/vo/mongo/ChimeUserOperLogVo.java
+38
-0
MQConst.java
...ain/java/com/liquidnet/service/base/constant/MQConst.java
+31
-0
liquidnet-service-chime-dev.yml
...s-config/liquidnet-config/liquidnet-service-chime-dev.yml
+1
-1
liquidnet-service-chime.yml
...t-bus-config/liquidnet-config/liquidnet-service-chime.yml
+14
-0
liquidnet-service-consumer-adam-dev.yml
.../liquidnet-config/liquidnet-service-consumer-adam-dev.yml
+3
-0
liquidnet-service-consumer-adam.yml
...nfig/liquidnet-config/liquidnet-service-consumer-adam.yml
+5
-0
redis_queue_create.txt
.../liquidnet-service-chime-impl/docu/redis_queue_create.txt
+6
-3
ChimeUserController.java
...quidnet/service/chime/controller/ChimeUserController.java
+34
-0
ChimeUserServiceImpl.java
...dnet/service/chime/service/impl/ChimeUserServiceImpl.java
+32
-2
QueueUtils.java
...in/java/com/liquidnet/service/chime/utils/QueueUtils.java
+41
-0
pom.xml
...vice-consumer-all/liquidnet-service-consumer-adam/pom.xml
+9
-0
ConsumerChimeRedisStreamConfig.java
.../consumer/adam/config/ConsumerChimeRedisStreamConfig.java
+230
-0
AbstractChimeRedisReceiver.java
...ce/consumer/adam/receiver/AbstractChimeRedisReceiver.java
+74
-0
ConsumerChimeUserOperationDisLikeRdsReceiver.java
...eceiver/ConsumerChimeUserOperationDisLikeRdsReceiver.java
+17
-0
ConsumerChimeUserOperationLikeRdsReceiver.java
...m/receiver/ConsumerChimeUserOperationLikeRdsReceiver.java
+17
-0
ChimeDataUtils.java
.../liquidnet/service/consumer/adam/util/ChimeDataUtils.java
+75
-0
No files found.
liquidnet-bus-api/liquidnet-service-chime-api/src/main/java/com/liquidnet/service/chime/constant/ChimeConstant.java
View file @
4de40f04
...
@@ -13,6 +13,7 @@ import com.liquidnet.commons.lang.util.IDGenerator;
...
@@ -13,6 +13,7 @@ import com.liquidnet.commons.lang.util.IDGenerator;
*/
*/
public
class
ChimeConstant
{
public
class
ChimeConstant
{
public
static
String
USER_ID_PREFIX
=
"CHE"
;
public
static
String
USER_ID_PREFIX
=
"CHE"
;
public
static
String
LOG_ID_PREFIX
=
"LOG"
;
public
static
final
String
PREFIX
=
"chime:"
;
public
static
final
String
PREFIX
=
"chime:"
;
public
static
final
String
REDIS_CITY_NAME_ALLOW
=
PREFIX
.
concat
(
"per:cityName:allow"
);
public
static
final
String
REDIS_CITY_NAME_ALLOW
=
PREFIX
.
concat
(
"per:cityName:allow"
);
...
@@ -22,4 +23,11 @@ public class ChimeConstant {
...
@@ -22,4 +23,11 @@ public class ChimeConstant {
public
static
String
getNewUserId
(){
public
static
String
getNewUserId
(){
return
USER_ID_PREFIX
+
IDGenerator
.
nextTimeId
();
return
USER_ID_PREFIX
+
IDGenerator
.
nextTimeId
();
}
}
public
static
String
getLogMid
(){
return
LOG_ID_PREFIX
+
IDGenerator
.
nextTimeId
();
}
public
static
final
String
LOG_USER_OPERATION_LIKE
=
"1"
;
public
static
final
String
LOG_USER_OPERATION_DISLIKE
=
"2"
;
}
}
liquidnet-bus-api/liquidnet-service-chime-api/src/main/java/com/liquidnet/service/chime/service/IChimeUserService.java
View file @
4de40f04
...
@@ -25,4 +25,8 @@ public interface IChimeUserService {
...
@@ -25,4 +25,8 @@ public interface IChimeUserService {
ChimeUserInfoDto
getUserByUserId
(
String
userId
);
ChimeUserInfoDto
getUserByUserId
(
String
userId
);
boolean
switchPerformanceId
(
String
performanceId
);
boolean
switchPerformanceId
(
String
performanceId
);
boolean
userLikeOperation
(
String
currentUserId
,
String
targetUserId
);
boolean
userDisLikeOperation
(
String
currentUserId
,
String
targetUserId
);
}
}
liquidnet-bus-api/liquidnet-service-chime-api/src/main/java/com/liquidnet/service/chime/vo/mongo/ChimeUserInfoVo.java
View file @
4de40f04
...
@@ -34,6 +34,10 @@ public class ChimeUserInfoVo {
...
@@ -34,6 +34,10 @@ public class ChimeUserInfoVo {
private
String
createdAt
;
private
String
createdAt
;
@ApiModelProperty
(
position
=
15
,
value
=
"更新时间"
)
@ApiModelProperty
(
position
=
15
,
value
=
"更新时间"
)
private
String
updatedAt
;
private
String
updatedAt
;
@ApiModelProperty
(
position
=
10
,
value
=
"喜欢操作"
)
private
long
likeCount
;
@ApiModelProperty
(
position
=
11
,
value
=
"不喜欢操作"
)
private
long
disLikeCount
;
private
static
final
ChimeUserInfoVo
obj
=
new
ChimeUserInfoVo
();
private
static
final
ChimeUserInfoVo
obj
=
new
ChimeUserInfoVo
();
public
static
ChimeUserInfoVo
getNew
()
{
public
static
ChimeUserInfoVo
getNew
()
{
...
...
liquidnet-bus-api/liquidnet-service-chime-api/src/main/java/com/liquidnet/service/chime/vo/mongo/ChimeUserOperLogVo.java
0 → 100644
View file @
4de40f04
package
com
.
liquidnet
.
service
.
chime
.
vo
.
mongo
;
import
io.swagger.annotations.ApiModel
;
import
io.swagger.annotations.ApiModelProperty
;
import
lombok.Data
;
/**
* @author AnJiabin <anjiabin@zhengzai.tv>
* @version V1.0
* @Description: TODO
* @class: ChimeUserTagsMappingVo
* @Package com.liquidnet.service.chime.vo.mongo
* @Copyright: LightNet @ Copyright (c) 2021
* @date 2021/9/3 16:50
*/
@ApiModel
(
value
=
"ChimeUserInfoVo"
,
description
=
"社交用户信息"
)
@Data
public
class
ChimeUserOperLogVo
{
private
String
mid
;
private
static
final
long
serialVersionUID
=
5325511589667456213L
;
@ApiModelProperty
(
position
=
0
,
value
=
"用户ID[64]"
)
private
String
currentUserId
;
@ApiModelProperty
(
position
=
1
,
value
=
"性别"
)
private
String
targetUserId
;
@ApiModelProperty
(
position
=
2
,
value
=
"操作类型"
)
private
String
opType
;
@ApiModelProperty
(
position
=
3
,
value
=
"创建时间"
)
private
String
createdAt
;
private
static
final
ChimeUserOperLogVo
obj
=
new
ChimeUserOperLogVo
();
public
static
ChimeUserOperLogVo
getNew
()
{
try
{
return
(
ChimeUserOperLogVo
)
obj
.
clone
();
}
catch
(
CloneNotSupportedException
e
)
{
return
new
ChimeUserOperLogVo
();
}
}
}
liquidnet-bus-common/liquidnet-common-service-base/src/main/java/com/liquidnet/service/base/constant/MQConst.java
View file @
4de40f04
...
@@ -139,4 +139,35 @@ public class MQConst {
...
@@ -139,4 +139,35 @@ public class MQConst {
return
desc
;
return
desc
;
}
}
}
}
public
enum
ChimeQueue
{
USER_OPERATION_LIKE
(
"chime:stream:rk.user.operation.like"
,
"group.user.operation.like"
,
"划卡操作-喜欢"
),
USER_OPERATION_DISLIKE
(
"chime:stream:rk.user.operation.dislike"
,
"group.user.operation.dislike"
,
"划卡操作-不喜欢"
);
private
final
String
key
;
private
final
String
group
;
private
final
String
desc
;
ChimeQueue
(
String
key
,
String
group
,
String
desc
)
{
this
.
key
=
key
;
this
.
group
=
group
;
this
.
desc
=
desc
;
}
public
String
getKey
()
{
return
key
;
}
public
String
getGroup
()
{
return
group
;
}
public
String
getDesc
()
{
return
desc
;
}
}
public
static
void
main
(
String
[]
args
)
{
System
.
out
.
println
(
ChimeQueue
.
USER_OPERATION_LIKE
.
name
());
}
}
}
liquidnet-bus-config/liquidnet-config/liquidnet-service-chime-dev.yml
View file @
4de40f04
...
@@ -17,4 +17,4 @@ liquidnet:
...
@@ -17,4 +17,4 @@ liquidnet:
level
:
info
level
:
info
mongodb
:
mongodb
:
sslEnabled
:
false
sslEnabled
:
false
database
:
dev
_ln_scene
database
:
test
_ln_scene
liquidnet-bus-config/liquidnet-config/liquidnet-service-chime.yml
View file @
4de40f04
...
@@ -73,6 +73,17 @@ spring:
...
@@ -73,6 +73,17 @@ spring:
exclude
:
exclude
:
-
org.springframework.cloud.bus.BusAutoConfiguration
-
org.springframework.cloud.bus.BusAutoConfiguration
-
org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration
-
org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration
redis
:
database
:
${liquidnet.redis.adam.database}
port
:
${liquidnet.redis.adam.port}
host
:
${liquidnet.redis.adam.host}
password
:
${liquidnet.redis.adam.password}
lettuce
:
pool
:
max-active
:
5
max-wait
:
-1
max-idle
:
8
min-idle
:
0
data
:
data
:
mongodb
:
mongodb
:
uri
:
mongodb://${liquidnet.mongodb.chime.user}:${liquidnet.mongodb.chime.pwd}@${liquidnet.mongodb.chime.host}/?authSource=admin&maxPoolSize=2000&waitQueueMultiple=100
uri
:
mongodb://${liquidnet.mongodb.chime.user}:${liquidnet.mongodb.chime.pwd}@${liquidnet.mongodb.chime.host}/?authSource=admin&maxPoolSize=2000&waitQueueMultiple=100
...
@@ -88,6 +99,9 @@ global-auth:
...
@@ -88,6 +99,9 @@ global-auth:
-
${liquidnet.info.context}/swagger-resources/**
-
${liquidnet.info.context}/swagger-resources/**
-
${liquidnet.info.context}/v2/api-docs*
-
${liquidnet.info.context}/v2/api-docs*
-
${liquidnet.info.context}/user/register
-
${liquidnet.info.context}/user/register
-
${liquidnet.info.context}/user/userLikeOperation
-
${liquidnet.info.context}/user/userDislikeOperation
-
${liquidnet.info.context}/performance/getUserListByCon
# -----------------------------------------------------------
# -----------------------------------------------------------
# -----------------------------------------------------------
# -----------------------------------------------------------
\ No newline at end of file
liquidnet-bus-config/liquidnet-config/liquidnet-service-consumer-adam-dev.yml
View file @
4de40f04
...
@@ -18,5 +18,8 @@ liquidnet:
...
@@ -18,5 +18,8 @@ liquidnet:
level-root
:
debug
level-root
:
debug
mysql
:
mysql
:
database-name
:
dev_ln_scene
database-name
:
dev_ln_scene
mongodb
:
sslEnabled
:
false
database
:
test_ln_scene
#以下为spring各环境个性配置
#以下为spring各环境个性配置
\ No newline at end of file
liquidnet-bus-config/liquidnet-config/liquidnet-service-consumer-adam.yml
View file @
4de40f04
...
@@ -79,6 +79,11 @@ spring:
...
@@ -79,6 +79,11 @@ spring:
maximum-pool-size
:
16
maximum-pool-size
:
16
minimum-idle
:
8
minimum-idle
:
8
connection-test-query
:
SELECT 1
connection-test-query
:
SELECT 1
data
:
mongodb
:
uri
:
mongodb://${liquidnet.mongodb.chime.user}:${liquidnet.mongodb.chime.pwd}@${liquidnet.mongodb.chime.host}/?authSource=admin&maxPoolSize=2000&waitQueueMultiple=100
sslEnabled
:
${liquidnet.mongodb.sslEnabled}
database
:
${liquidnet.mongodb.database}
# -----------------------------------------------------------
# -----------------------------------------------------------
# -----------------------------------------------------------
# -----------------------------------------------------------
liquidnet-bus-service/liquidnet-service-chime/liquidnet-service-chime-impl/docu/redis_queue_create.txt
View file @
4de40f04
-- pay
-- pay
XADD chime:stream: * 0 0
XADD chime:stream:rk.user.operation.like * 0 0
XGROUP CREATE dragon:stream:dragon-pay dragon-pay-group 0
XGROUP CREATE chime:stream:rk.user.operation.like group.user.operation.like 0
XADD chime:stream:rk.user.operation.dislike * 0 0
XGROUP CREATE chime:stream:rk.user.operation.dislike group.user.operation.dislike 0
-- delete
-- delete
XGROUP DESTROY
dragon:stream:dragon-pay dragon-pay-group
0
XGROUP DESTROY
chime:stream:rk.user.operation.like group.user.operation.like
0
liquidnet-bus-service/liquidnet-service-chime/liquidnet-service-chime-impl/src/main/java/com/liquidnet/service/chime/controller/ChimeUserController.java
View file @
4de40f04
...
@@ -91,4 +91,38 @@ public class ChimeUserController {
...
@@ -91,4 +91,38 @@ public class ChimeUserController {
}
}
return
ResponseDto
.
success
(
userInfoDto
);
return
ResponseDto
.
success
(
userInfoDto
);
}
}
@GetMapping
(
"userLikeOperation"
)
@ApiOperation
(
"划卡计数-喜欢"
)
@ApiImplicitParams
({
@ApiImplicitParam
(
type
=
"query"
,
dataType
=
"String"
,
name
=
"currentUserId"
,
value
=
"当前登录用户ID"
,
required
=
true
),
@ApiImplicitParam
(
type
=
"query"
,
dataType
=
"String"
,
name
=
"targetUserId"
,
value
=
"目标用户ID"
,
required
=
true
)
})
public
ResponseDto
<
ChimeUserInfoDto
>
userLikeOperation
(
@RequestParam
(
defaultValue
=
""
)
String
currentUserId
,
@RequestParam
(
defaultValue
=
""
)
String
targetUserId
)
{
boolean
result
=
chimeUserService
.
userLikeOperation
(
currentUserId
,
targetUserId
);
if
(!
result
){
ResponseDto
.
failure
(
"用户划卡-喜欢-操作失败!currentUserId:{} targetUserId:{}"
,
currentUserId
,
targetUserId
);
}
return
ResponseDto
.
success
();
}
@GetMapping
(
"userDislikeOperation"
)
@ApiOperation
(
"划卡计数-不喜欢"
)
@ApiImplicitParams
({
@ApiImplicitParam
(
type
=
"query"
,
dataType
=
"String"
,
name
=
"currentUserId"
,
value
=
"当前登录用户ID"
,
required
=
true
),
@ApiImplicitParam
(
type
=
"query"
,
dataType
=
"String"
,
name
=
"targetUserId"
,
value
=
"目标用户ID"
,
required
=
true
)
})
public
ResponseDto
<
ChimeUserInfoDto
>
userDisLikeOperation
(
@RequestParam
(
defaultValue
=
""
)
String
currentUserId
,
@RequestParam
(
defaultValue
=
""
)
String
targetUserId
)
{
boolean
result
=
chimeUserService
.
userDisLikeOperation
(
currentUserId
,
targetUserId
);
if
(!
result
){
ResponseDto
.
failure
(
"用户划卡-不喜欢-操作失败!currentUserId:{} targetUserId:{}"
,
currentUserId
,
targetUserId
);
}
return
ResponseDto
.
success
();
}
}
}
liquidnet-bus-service/liquidnet-service-chime/liquidnet-service-chime-impl/src/main/java/com/liquidnet/service/chime/service/impl/ChimeUserServiceImpl.java
View file @
4de40f04
package
com
.
liquidnet
.
service
.
chime
.
service
.
impl
;
package
com
.
liquidnet
.
service
.
chime
.
service
.
impl
;
import
com.liquidnet.common.cache.redis.util.RedisUtil
;
import
com.liquidnet.commons.lang.util.BeanUtil
;
import
com.liquidnet.commons.lang.util.BeanUtil
;
import
com.liquidnet.commons.lang.util.CurrentUtil
;
import
com.liquidnet.commons.lang.util.CurrentUtil
;
import
com.liquidnet.commons.lang.util.JsonUtils
;
import
com.liquidnet.commons.lang.util.StringUtil
;
import
com.liquidnet.commons.lang.util.StringUtil
;
import
com.liquidnet.service.adam.dto.rsc.AdamChimeUinfoDto
;
import
com.liquidnet.service.adam.dto.rsc.AdamChimeUinfoDto
;
import
com.liquidnet.service.base.constant.MQConst
;
import
com.liquidnet.service.chime.biz.ChimeUserBiz
;
import
com.liquidnet.service.chime.biz.ChimeUserBiz
;
import
com.liquidnet.service.chime.constant.ChimeConstant
;
import
com.liquidnet.service.chime.dto.ChimeUserInfoDto
;
import
com.liquidnet.service.chime.dto.ChimeUserInfoDto
;
import
com.liquidnet.service.chime.dto.ChimeUserRegisterReqDto
;
import
com.liquidnet.service.chime.dto.ChimeUserRegisterReqDto
;
import
com.liquidnet.service.chime.dto.ChimeUserTagDto
;
import
com.liquidnet.service.chime.dto.ChimeUserTagDto
;
import
com.liquidnet.service.chime.dto.ChimeUserUpdateReqDto
;
import
com.liquidnet.service.chime.dto.ChimeUserUpdateReqDto
;
import
com.liquidnet.service.chime.service.IChimeUserService
;
import
com.liquidnet.service.chime.service.IChimeUserService
;
import
com.liquidnet.service.chime.utils.DataUtils
;
import
com.liquidnet.service.chime.utils.DataUtils
;
import
com.liquidnet.service.chime.utils.QueueUtils
;
import
com.liquidnet.service.chime.vo.mongo.ChimeUserInfoVo
;
import
com.liquidnet.service.chime.vo.mongo.ChimeUserInfoVo
;
import
com.liquidnet.service.chime.vo.mongo.ChimeUserOperLogVo
;
import
com.liquidnet.service.chime.vo.mongo.ChimeUserTagsMappingVo
;
import
com.liquidnet.service.chime.vo.mongo.ChimeUserTagsMappingVo
;
import
lombok.extern.slf4j.Slf4j
;
import
lombok.extern.slf4j.Slf4j
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.beans.factory.annotation.Autowired
;
...
@@ -38,7 +42,7 @@ public class ChimeUserServiceImpl implements IChimeUserService {
...
@@ -38,7 +42,7 @@ public class ChimeUserServiceImpl implements IChimeUserService {
private
DataUtils
dataUtils
;
private
DataUtils
dataUtils
;
@Autowired
@Autowired
private
RedisUtil
redisUtil
;
private
QueueUtils
queueUtils
;
@Autowired
@Autowired
private
ChimeUserBiz
chimeUserBiz
;
private
ChimeUserBiz
chimeUserBiz
;
...
@@ -56,6 +60,8 @@ public class ChimeUserServiceImpl implements IChimeUserService {
...
@@ -56,6 +60,8 @@ public class ChimeUserServiceImpl implements IChimeUserService {
chimeUserInfoVo
.
setUserTagsVoList
(
userTagList
);
chimeUserInfoVo
.
setUserTagsVoList
(
userTagList
);
chimeUserInfoVo
.
setCreatedAt
(
LocalDateTime
.
now
().
toString
());
chimeUserInfoVo
.
setCreatedAt
(
LocalDateTime
.
now
().
toString
());
chimeUserInfoVo
.
setUpdatedAt
(
null
);
chimeUserInfoVo
.
setUpdatedAt
(
null
);
chimeUserInfoVo
.
setLikeCount
(
0
l
);
chimeUserInfoVo
.
setDisLikeCount
(
0
l
);
//插入mongo
//插入mongo
dataUtils
.
createChimeUser
(
chimeUserInfoVo
);
dataUtils
.
createChimeUser
(
chimeUserInfoVo
);
}
}
...
@@ -135,4 +141,28 @@ public class ChimeUserServiceImpl implements IChimeUserService {
...
@@ -135,4 +141,28 @@ public class ChimeUserServiceImpl implements IChimeUserService {
}
}
return
true
;
return
true
;
}
}
@Override
public
boolean
userLikeOperation
(
String
currentUserId
,
String
targetUserId
)
{
ChimeUserOperLogVo
chimeUserOperLogVo
=
ChimeUserOperLogVo
.
getNew
();
chimeUserOperLogVo
.
setMid
(
ChimeConstant
.
getLogMid
());
chimeUserOperLogVo
.
setCurrentUserId
(
currentUserId
);
chimeUserOperLogVo
.
setTargetUserId
(
targetUserId
);
chimeUserOperLogVo
.
setOpType
(
ChimeConstant
.
LOG_USER_OPERATION_LIKE
);
chimeUserOperLogVo
.
setCreatedAt
(
LocalDateTime
.
now
().
toString
());
queueUtils
.
sendMsgByRedis
(
MQConst
.
ChimeQueue
.
USER_OPERATION_LIKE
.
getKey
(),
JsonUtils
.
toJson
(
chimeUserOperLogVo
));
return
true
;
}
@Override
public
boolean
userDisLikeOperation
(
String
currentUserId
,
String
targetUserId
)
{
ChimeUserOperLogVo
chimeUserOperLogVo
=
ChimeUserOperLogVo
.
getNew
();
chimeUserOperLogVo
.
setMid
(
ChimeConstant
.
getLogMid
());
chimeUserOperLogVo
.
setCurrentUserId
(
currentUserId
);
chimeUserOperLogVo
.
setTargetUserId
(
targetUserId
);
chimeUserOperLogVo
.
setOpType
(
ChimeConstant
.
LOG_USER_OPERATION_DISLIKE
);
chimeUserOperLogVo
.
setCreatedAt
(
LocalDateTime
.
now
().
toString
());
queueUtils
.
sendMsgByRedis
(
MQConst
.
ChimeQueue
.
USER_OPERATION_DISLIKE
.
getKey
(),
JsonUtils
.
toJson
(
chimeUserOperLogVo
));
return
true
;
}
}
}
liquidnet-bus-service/liquidnet-service-chime/liquidnet-service-chime-impl/src/main/java/com/liquidnet/service/chime/utils/QueueUtils.java
0 → 100644
View file @
4de40f04
package
com
.
liquidnet
.
service
.
chime
.
utils
;
import
com.liquidnet.commons.lang.util.CollectionUtil
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.data.redis.connection.stream.StreamRecords
;
import
org.springframework.data.redis.core.StringRedisTemplate
;
import
org.springframework.stereotype.Component
;
import
java.util.HashMap
;
@Component
public
class
QueueUtils
{
// @Autowired
// private RabbitTemplate rabbitTemplate;
@Autowired
StringRedisTemplate
stringRedisTemplate
;
/**
* 发送消息 - RABBIT
*
* @param exchange 交换机
* @param routeKey 路径
* @param jsonMsg Json字符串
*/
// public void sendMsgByRabbit(String exchange, String routeKey, String jsonMsg) {
// rabbitTemplate.convertAndSend(exchange, routeKey, jsonMsg);
// }
/**
* 发送消息 - REDIS
*
* @param streamKey Redis消费Key
* @param jsonMsg Json字符串
*/
public
void
sendMsgByRedis
(
String
streamKey
,
String
jsonMsg
)
{
HashMap
<
String
,
String
>
map
=
CollectionUtil
.
mapStringString
();
map
.
put
(
"message"
,
jsonMsg
);
stringRedisTemplate
.
opsForStream
().
add
(
StreamRecords
.
mapBacked
(
map
).
withStreamKey
(
streamKey
));
}
}
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-adam/pom.xml
View file @
4de40f04
...
@@ -21,6 +21,11 @@
...
@@ -21,6 +21,11 @@
<groupId>
org.springframework.boot
</groupId>
<groupId>
org.springframework.boot
</groupId>
<artifactId>
spring-boot-starter-web
</artifactId>
<artifactId>
spring-boot-starter-web
</artifactId>
</dependency>
</dependency>
<dependency>
<groupId>
com.liquidnet
</groupId>
<artifactId>
liquidnet-service-chime-api
</artifactId>
<version>
1.0-SNAPSHOT
</version>
</dependency>
<dependency>
<dependency>
<groupId>
com.liquidnet
</groupId>
<groupId>
com.liquidnet
</groupId>
<artifactId>
liquidnet-common-cache-redis
</artifactId>
<artifactId>
liquidnet-common-cache-redis
</artifactId>
...
@@ -31,6 +36,10 @@
...
@@ -31,6 +36,10 @@
<artifactId>
liquidnet-common-sms
</artifactId>
<artifactId>
liquidnet-common-sms
</artifactId>
<version>
1.0-SNAPSHOT
</version>
<version>
1.0-SNAPSHOT
</version>
</dependency>
</dependency>
<dependency>
<groupId>
org.springframework.boot
</groupId>
<artifactId>
spring-boot-starter-data-mongodb
</artifactId>
</dependency>
</dependencies>
</dependencies>
<build>
<build>
...
...
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-adam/src/main/java/com/liquidnet/service/consumer/adam/config/ConsumerChimeRedisStreamConfig.java
0 → 100644
View file @
4de40f04
package
com
.
liquidnet
.
service
.
consumer
.
adam
.
config
;
import
com.liquidnet.service.base.constant.MQConst
;
import
com.liquidnet.service.consumer.adam.receiver.ConsumerChimeUserOperationDisLikeRdsReceiver
;
import
com.liquidnet.service.consumer.adam.receiver.ConsumerChimeUserOperationLikeRdsReceiver
;
import
lombok.var
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.context.annotation.Bean
;
import
org.springframework.context.annotation.Configuration
;
import
org.springframework.data.redis.connection.RedisConnectionFactory
;
import
org.springframework.data.redis.connection.stream.Consumer
;
import
org.springframework.data.redis.connection.stream.MapRecord
;
import
org.springframework.data.redis.connection.stream.ReadOffset
;
import
org.springframework.data.redis.connection.stream.StreamOffset
;
import
org.springframework.data.redis.stream.StreamMessageListenerContainer
;
import
org.springframework.data.redis.stream.Subscription
;
import
java.time.Duration
;
@Configuration
public
class
ConsumerChimeRedisStreamConfig
{
@Autowired
ConsumerChimeUserOperationDisLikeRdsReceiver
consumerChimeUserOperationDisLikeRdsReceiver
;
@Autowired
ConsumerChimeUserOperationLikeRdsReceiver
consumerChimeUserOperationLikeRdsReceiver
;
private
StreamMessageListenerContainer
<
String
,
MapRecord
<
String
,
String
,
String
>>
buildStreamMessageListenerContainer
(
RedisConnectionFactory
factory
)
{
var
options
=
StreamMessageListenerContainer
.
StreamMessageListenerContainerOptions
.
builder
()
.
pollTimeout
(
Duration
.
ofMillis
(
1
))
.
build
();
return
StreamMessageListenerContainer
.
create
(
factory
,
options
);
}
/**
* 划卡-喜欢
*
* @param listenerContainer
* @param t
* @return
*/
private
Subscription
receiveUserOperationLike
(
StreamMessageListenerContainer
<
String
,
MapRecord
<
String
,
String
,
String
>>
listenerContainer
,
int
t
)
{
return
listenerContainer
.
receiveAutoAck
(
Consumer
.
from
(
MQConst
.
ChimeQueue
.
USER_OPERATION_LIKE
.
getGroup
(),
MQConst
.
ChimeQueue
.
USER_OPERATION_LIKE
.
name
()
+
t
),
StreamOffset
.
create
(
MQConst
.
ChimeQueue
.
USER_OPERATION_LIKE
.
getKey
(),
ReadOffset
.
lastConsumed
()),
consumerChimeUserOperationLikeRdsReceiver
);
}
/**
* 划卡-不喜欢
*
* @param listenerContainer
* @param t
* @return
*/
private
Subscription
receiveUserOperationDisLike
(
StreamMessageListenerContainer
<
String
,
MapRecord
<
String
,
String
,
String
>>
listenerContainer
,
int
t
)
{
return
listenerContainer
.
receiveAutoAck
(
Consumer
.
from
(
MQConst
.
ChimeQueue
.
USER_OPERATION_DISLIKE
.
getGroup
(),
MQConst
.
ChimeQueue
.
USER_OPERATION_DISLIKE
.
name
()
+
t
),
StreamOffset
.
create
(
MQConst
.
ChimeQueue
.
USER_OPERATION_DISLIKE
.
getKey
(),
ReadOffset
.
lastConsumed
()),
consumerChimeUserOperationDisLikeRdsReceiver
);
}
/* —————————————————————————— | —————————————————————————— | —————————————————————————— */
/* -------------------------------------------------------- | 用户注册 */
@Bean
public
Subscription
subscriptionUserOperationLike1
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveUserOperationLike
(
listenerContainer
,
1
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionUserOperationLike2
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveUserOperationLike
(
listenerContainer
,
2
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionUserOperationLike3
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveUserOperationLike
(
listenerContainer
,
3
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionUserOperationLike4
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveUserOperationLike
(
listenerContainer
,
4
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionUserOperationLike5
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveUserOperationLike
(
listenerContainer
,
5
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionUserOperationLike6
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveUserOperationLike
(
listenerContainer
,
6
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionUserOperationLike7
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveUserOperationLike
(
listenerContainer
,
7
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionUserOperationLike8
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveUserOperationLike
(
listenerContainer
,
8
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionUserOperationLike9
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveUserOperationLike
(
listenerContainer
,
9
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionUserOperationLike10
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveUserOperationLike
(
listenerContainer
,
10
);
listenerContainer
.
start
();
return
subscription
;
}
/* -------------------------------------------------------- | 用户中心 */
@Bean
public
Subscription
subscriptionUserOperationDisLike1
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveUserOperationDisLike
(
listenerContainer
,
1
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionUserOperationDisLike2
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveUserOperationDisLike
(
listenerContainer
,
2
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionUserOperationDisLike3
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveUserOperationDisLike
(
listenerContainer
,
3
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionUserOperationDisLike4
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveUserOperationDisLike
(
listenerContainer
,
4
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionUserOperationDisLike5
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveUserOperationDisLike
(
listenerContainer
,
5
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionUserOperationDisLike6
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveUserOperationDisLike
(
listenerContainer
,
6
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionUserOperationDisLike7
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveUserOperationDisLike
(
listenerContainer
,
7
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionUserOperationDisLike8
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveUserOperationDisLike
(
listenerContainer
,
8
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionUserOperationDisLike9
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveUserOperationDisLike
(
listenerContainer
,
9
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionUserOperationDisLike10
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveUserOperationDisLike
(
listenerContainer
,
10
);
listenerContainer
.
start
();
return
subscription
;
}
}
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-adam/src/main/java/com/liquidnet/service/consumer/adam/receiver/AbstractChimeRedisReceiver.java
0 → 100644
View file @
4de40f04
package
com
.
liquidnet
.
service
.
consumer
.
adam
.
receiver
;
import
com.liquidnet.commons.lang.util.CollectionUtil
;
import
com.liquidnet.commons.lang.util.JsonUtils
;
import
com.liquidnet.service.chime.vo.mongo.ChimeUserOperLogVo
;
import
com.liquidnet.service.consumer.adam.util.ChimeDataUtils
;
import
lombok.extern.slf4j.Slf4j
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.data.redis.connection.stream.MapRecord
;
import
org.springframework.data.redis.connection.stream.StreamRecords
;
import
org.springframework.data.redis.core.StringRedisTemplate
;
import
org.springframework.data.redis.stream.StreamListener
;
import
java.util.HashMap
;
@Slf4j
public
abstract
class
AbstractChimeRedisReceiver
implements
StreamListener
<
String
,
MapRecord
<
String
,
String
,
String
>>
{
@Autowired
StringRedisTemplate
stringRedisTemplate
;
@Autowired
private
ChimeDataUtils
chimeDataUtils
;
@Override
public
void
onMessage
(
MapRecord
<
String
,
String
,
String
>
message
)
{
log
.
debug
(
"CONSUMER SQL[streamKey:{},messageId:{},stream:{},body:{}]"
,
this
.
getRedisStreamKey
(),
message
.
getId
(),
message
.
getStream
(),
message
.
getValue
());
boolean
result
=
this
.
consumerMessageHandler
(
message
.
getValue
().
get
(
"message"
));
log
.
info
(
"CONSUMER SQL RESULT:{} ==> MESSAGE_ID:{}"
,
result
,
message
.
getId
());
try
{
stringRedisTemplate
.
opsForStream
().
acknowledge
(
getRedisStreamGroup
(),
message
);
stringRedisTemplate
.
opsForStream
().
delete
(
this
.
getRedisStreamKey
(),
message
.
getId
());
}
catch
(
Exception
e
)
{
log
.
error
(
"#CONSUMER SQL RESULT:{} ==> DEL_REDIS_QUEUE_MSG_EXCEPTION[MESSAGE_ID:{},MSG:{}]"
,
result
,
message
.
getId
(),
JsonUtils
.
toJson
(
message
),
e
);
}
finally
{
try
{
stringRedisTemplate
.
opsForStream
().
acknowledge
(
getRedisStreamGroup
(),
message
);
}
catch
(
Exception
ignored
)
{
}
}
}
private
boolean
consumerMessageHandler
(
String
msg
)
{
Boolean
aBoolean
=
false
;
try
{
ChimeUserOperLogVo
textMessage
=
JsonUtils
.
fromJson
(
msg
,
ChimeUserOperLogVo
.
class
);
if
(
textMessage
==
null
)
{
aBoolean
=
true
;
}
else
{
//执行计数
chimeDataUtils
.
updateChimeUser
(
textMessage
);
//创建操作日志
chimeDataUtils
.
createUserOperLog
(
textMessage
);
aBoolean
=
true
;
log
.
info
(
"consumerMessageHandler.msg===> "
,
msg
);
}
}
catch
(
Exception
e
)
{
log
.
error
(
"CONSUMER SQL FAIL ==> {}"
,
e
.
getMessage
(),
e
);
}
finally
{
if
(!
aBoolean
)
{
HashMap
<
String
,
String
>
map
=
CollectionUtil
.
mapStringString
();
map
.
put
(
"message"
,
msg
);
stringRedisTemplate
.
opsForStream
().
add
(
StreamRecords
.
mapBacked
(
map
).
withStreamKey
(
this
.
getRedisStreamKey
()));
}
}
return
aBoolean
;
}
protected
abstract
String
getRedisStreamKey
();
protected
abstract
String
getRedisStreamGroup
();
}
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-adam/src/main/java/com/liquidnet/service/consumer/adam/receiver/ConsumerChimeUserOperationDisLikeRdsReceiver.java
0 → 100644
View file @
4de40f04
package
com
.
liquidnet
.
service
.
consumer
.
adam
.
receiver
;
import
com.liquidnet.service.base.constant.MQConst
;
import
org.springframework.stereotype.Component
;
@Component
public
class
ConsumerChimeUserOperationDisLikeRdsReceiver
extends
AbstractChimeRedisReceiver
{
@Override
protected
String
getRedisStreamKey
()
{
return
MQConst
.
ChimeQueue
.
USER_OPERATION_DISLIKE
.
getKey
();
}
@Override
protected
String
getRedisStreamGroup
()
{
return
MQConst
.
ChimeQueue
.
USER_OPERATION_DISLIKE
.
getGroup
();
}
}
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-adam/src/main/java/com/liquidnet/service/consumer/adam/receiver/ConsumerChimeUserOperationLikeRdsReceiver.java
0 → 100644
View file @
4de40f04
package
com
.
liquidnet
.
service
.
consumer
.
adam
.
receiver
;
import
com.liquidnet.service.base.constant.MQConst
;
import
org.springframework.stereotype.Component
;
@Component
public
class
ConsumerChimeUserOperationLikeRdsReceiver
extends
AbstractChimeRedisReceiver
{
@Override
protected
String
getRedisStreamKey
()
{
return
MQConst
.
ChimeQueue
.
USER_OPERATION_LIKE
.
getKey
();
}
@Override
protected
String
getRedisStreamGroup
()
{
return
MQConst
.
ChimeQueue
.
USER_OPERATION_LIKE
.
getGroup
();
}
}
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-adam/src/main/java/com/liquidnet/service/consumer/adam/util/ChimeDataUtils.java
0 → 100644
View file @
4de40f04
package
com
.
liquidnet
.
service
.
consumer
.
adam
.
util
;
import
com.liquidnet.service.chime.constant.ChimeConstant
;
import
com.liquidnet.service.chime.vo.mongo.ChimeUserInfoVo
;
import
com.liquidnet.service.chime.vo.mongo.ChimeUserOperLogVo
;
import
com.mongodb.client.result.UpdateResult
;
import
lombok.extern.slf4j.Slf4j
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.data.mongodb.core.MongoTemplate
;
import
org.springframework.data.mongodb.core.query.Criteria
;
import
org.springframework.data.mongodb.core.query.Query
;
import
org.springframework.data.mongodb.core.query.Update
;
import
org.springframework.stereotype.Component
;
@Slf4j
@Component
public
class
ChimeDataUtils
{
@Autowired
private
MongoTemplate
mongoTemplate
;
/**
* 添加操作日志
* @param chimeUserOperLogVo
*/
public
void
createUserOperLog
(
ChimeUserOperLogVo
chimeUserOperLogVo
){
mongoTemplate
.
save
(
chimeUserOperLogVo
,
ChimeUserOperLogVo
.
class
.
getSimpleName
());
}
/**
* 修改社交用户操作计数
* @param chimeUserOperLogVo
*/
public
void
updateChimeUser
(
ChimeUserOperLogVo
chimeUserOperLogVo
)
{
ChimeUserInfoVo
chimeUserInfoVo
=
this
.
getUserByUserId
(
chimeUserOperLogVo
.
getTargetUserId
());
if
(
chimeUserInfoVo
==
null
){
log
.
error
(
"chimeUserInfoVo is null userId is not exist:"
);
return
;
}
Query
query
=
Query
.
query
(
Criteria
.
where
(
"userId"
).
is
(
chimeUserInfoVo
.
getUserId
()));
Update
update
=
new
Update
();
if
(
chimeUserOperLogVo
.
getOpType
().
equalsIgnoreCase
(
ChimeConstant
.
LOG_USER_OPERATION_LIKE
)){
update
.
set
(
"likeCount"
,
chimeUserInfoVo
.
getLikeCount
()
+
1
);
}
else
if
(
chimeUserOperLogVo
.
getOpType
().
equalsIgnoreCase
(
ChimeConstant
.
LOG_USER_OPERATION_DISLIKE
)){
update
.
set
(
"disLikeCount"
,
chimeUserInfoVo
.
getDisLikeCount
()
+
1
);
}
UpdateResult
result
=
mongoTemplate
.
updateFirst
(
query
,
update
,
ChimeUserInfoVo
.
class
,
ChimeUserInfoVo
.
class
.
getSimpleName
());
log
.
info
(
"updateChimeUser result:{}"
,
result
.
toString
());
// Query query = Query.query(Criteria.where("userId").is(chimeUserInfoVo.getUserId()));
//// Update update = Update.fromDocument(Document.parse(JsonUtils.toJson(chimeUserInfoVo)));
// Update update = Update.fromDocument(Document.parse(JsonUtils.toJson(chimeUserInfoVo)));
// if(chimeUserOperLogVo.getOpType().equalsIgnoreCase(ChimeConstant.LOG_USER_OPERATION_LIKE)){
// update.set("likeCount", chimeUserInfoVo.getLikeCount() + 1);
// }else if(chimeUserOperLogVo.getOpType().equalsIgnoreCase(ChimeConstant.LOG_USER_OPERATION_DISLIKE)){
// update.set("disLikeCount", chimeUserInfoVo.getDisLikeCount() + 1);
// }
// update.set("likeCount", 8);
// update.set("disLikeCount", 9);
// UpdateResult result = mongoTemplate.updateFirst(query,update,ChimeUserInfoVo.class,ChimeUserInfoVo.class.getSimpleName());
// log.info("updateChimeUser result:{}",result.toString());
}
/**
* 获取单个用户信息
* @param userId
* @return
*/
public
ChimeUserInfoVo
getUserByUserId
(
String
userId
){
Query
query
=
Query
.
query
(
Criteria
.
where
(
"userId"
).
is
(
userId
));
ChimeUserInfoVo
chimeUserInfoVo
=
mongoTemplate
.
findOne
(
query
,
ChimeUserInfoVo
.
class
,
ChimeUserInfoVo
.
class
.
getSimpleName
());
return
chimeUserInfoVo
;
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment