记得上下班打卡 | git大法好,push需谨慎
Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
L
liquidnet-bus-v1
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
董敬伟
liquidnet-bus-v1
Commits
2e324a78
Commit
2e324a78
authored
Jun 06, 2022
by
zhanggb
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
~queue:consumer-base.+biz、+mdb;
~queue:consumer-base.+biz.artwork.upl;
parent
c15885c1
Changes
14
Show whitespace changes
Inline
Side-by-side
Showing
14 changed files
with
732 additions
and
15 deletions
+732
-15
MQConst.java
...ain/java/com/liquidnet/service/base/constant/MQConst.java
+35
-0
liquidnet-service-consumer-base-dev.yml
.../liquidnet-config/liquidnet-service-consumer-base-dev.yml
+21
-0
liquidnet-service-consumer-base-test.yml
...liquidnet-config/liquidnet-service-consumer-base-test.yml
+21
-0
liquidnet-service-consumer-base.yml
...nfig/liquidnet-config/liquidnet-service-consumer-base.yml
+90
-0
ConsumerCommonBizRedisStreamConfig.java
...sumer/base/config/ConsumerCommonBizRedisStreamConfig.java
+49
-0
ConsumerCommonMdbRedisStreamConfig.java
...sumer/base/config/ConsumerCommonMdbRedisStreamConfig.java
+49
-0
ConsumerCommonSqlRedisStreamConfig.java
...sumer/base/config/ConsumerCommonSqlRedisStreamConfig.java
+15
-15
AbstractBizRedisReceiver.java
...vice/consumer/base/receiver/AbstractBizRedisReceiver.java
+48
-0
AbstractMdbRedisReceiver.java
...vice/consumer/base/receiver/AbstractMdbRedisReceiver.java
+105
-0
ConsumerCommonMDB0Receiver.java
...ce/consumer/base/receiver/ConsumerCommonMDB0Receiver.java
+18
-0
ConsumerGoblinBizArtworkUplReceiver.java
...er/base/receiver/ConsumerGoblinBizArtworkUplReceiver.java
+72
-0
bootstrap-service-consumer-base.yml
...se/src/main/resources/bootstrap-service-consumer-base.yml
+1
-0
GoblinQueBizArtworkController.java
...oblin/controller/Inner/GoblinQueBizArtworkController.java
+36
-0
GoblinQueBizArtworkService.java
...goblin/service/impl/inner/GoblinQueBizArtworkService.java
+172
-0
No files found.
liquidnet-bus-common/liquidnet-common-service-base/src/main/java/com/liquidnet/service/base/constant/MQConst.java
View file @
2e324a78
...
@@ -353,6 +353,41 @@ public class MQConst {
...
@@ -353,6 +353,41 @@ public class MQConst {
}
}
}
}
public
enum
CommonQueue
{
/**
* SQL持久化:优先级0<1<2<3...
*/
SQL0
(
"stream:common:sql:0"
,
"group.sql"
,
"Mysql持久化0"
),
SQL1
(
"stream:common:sql:1"
,
"group.sql"
,
"Mysql持久化1"
),
SQL2
(
"stream:common:sql:2"
,
"group.sql"
,
"Mysql持久化2"
),
SQL3
(
"stream:common:sql:3"
,
"group.sql"
,
"Mysql持久化3"
),
MDB0
(
"stream:common:mdb:0"
,
"group.mdb"
,
"Mongo持久化0"
),
;
private
final
String
key
;
private
final
String
group
;
private
final
String
desc
;
CommonQueue
(
String
key
,
String
group
,
String
desc
)
{
this
.
key
=
key
;
this
.
group
=
group
;
this
.
desc
=
desc
;
}
public
String
getKey
()
{
return
key
;
}
public
String
getGroup
()
{
return
group
;
}
public
String
getDesc
()
{
return
desc
;
}
}
public
static
void
main
(
String
[]
args
)
{
public
static
void
main
(
String
[]
args
)
{
System
.
out
.
println
(
ChimeQueue
.
USER_OPERATION_LIKE
.
name
());
System
.
out
.
println
(
ChimeQueue
.
USER_OPERATION_LIKE
.
name
());
}
}
...
...
liquidnet-bus-config/liquidnet-config/liquidnet-service-consumer-base-dev.yml
0 → 100644
View file @
2e324a78
liquidnet
:
system
:
updating
:
switch
:
false
info
:
port
:
9989
context
:
# context: /service-consumer
name
:
liquidnet-service-consumer-base
logfile
:
path
:
/data/logs
name
:
service-consumer-base
max-history
:
7
level
:
debug
mysql
:
database-name
:
dev_ln_scene
mongodb
:
sslEnabled
:
false
database
:
dev_ln_scene
#以下为spring各环境个性配置
\ No newline at end of file
liquidnet-bus-config/liquidnet-config/liquidnet-service-consumer-base-test.yml
0 → 100644
View file @
2e324a78
liquidnet
:
system
:
updating
:
switch
:
false
info
:
port
:
9989
context
:
# context: /service-consumer
name
:
liquidnet-service-consumer-base
logfile
:
path
:
/data/logs
name
:
service-consumer-base
max-history
:
7
level
:
info
mysql
:
database-name
:
test_ln_scene
mongodb
:
sslEnabled
:
false
database
:
test_ln_scene
#以下为spring各环境个性配置
liquidnet-bus-config/liquidnet-config/liquidnet-service-consumer-base.yml
0 → 100644
View file @
2e324a78
server
:
port
:
${liquidnet.info.port}
tomcat
:
uri-encoding
:
UTF-8
servlet
:
context-path
:
${liquidnet.info.context}
# -----------------------------------------------------------
knife4j
:
production
:
${liquidnet.knife4j.disable}
basic
:
enable
:
true
username
:
${liquidnet.security.username}
password
:
${liquidnet.security.password}
# -----------------------------------------------------------
logging
:
# config: ${liquidnet.logfile.config}
file
:
name
:
${liquidnet.logfile.path}/${liquidnet.logfile.name}.log
max-size
:
200MB
max-history
:
${liquidnet.logfile.max-history}
pattern
:
file
:
'
%d{yyyy-MM-dd
HH:mm:ss.SSS}
[
%-5level]
%thread
[%logger{36}:%line]
-
%msg%n'
console
:
'
%d{yyyy-MM-dd
HH:mm:ss.SSS}
[
%-5level]
%thread
[%logger{36}:%line]
-
%msg%n'
rolling-file-name
:
${liquidnet.logfile.path}/${liquidnet.logfile.name}-%d{yyyy-MM-dd}.%i.log
level
:
root
:
error
#以下是为指定包设置日志级别
com.liquidnet.service.feign
:
error
com.liquidnet
:
${liquidnet.logfile.level}
# -----------------------------------------------------------
eureka
:
# client:
# register-with-eureka: true
# fetch-registry: true
# serviceUrl:
# defaultZone: http://${liquidnet.security.username}:${liquidnet.security.password}@${liquidnet.eureka.host}/eureka-server/eureka
instance
:
hostname
:
${spring.cloud.client.ip-address}
lease-expiration-duration-in-seconds
:
15
#服务过期时间配置,超过这个时间没有接收到心跳EurekaServer就会将这个实例剔除
lease-renewal-interval-in-seconds
:
5
#服务刷新时间配置,每隔这个时间会主动心跳一次
prefer-ip-address
:
true
instance-id
:
${spring.application.name}:${spring.cloud.client.ip-address}:${spring.application.instance_id:${server.port}}
# -----------------------------------------------------------
#actuator/info
info
:
app
:
name
:
${liquidnet.info.name}
company
:
name
:
zhengzai.tv
build
:
groupId
:
'
@project.groupId@'
artifactId
:
'
@project.artifactId@'
version
:
'
@project.version@'
# -----------------------------------------------------------
spring
:
application
:
name
:
${liquidnet.info.name}
profiles
:
include
:
common-service
#这里加载management相关公共配置
redis
:
database
:
${liquidnet.redis.kylin.database}
dbs
:
${liquidnet.redis.kylin.dbs}
port
:
${liquidnet.redis.kylin.port}
host
:
${liquidnet.redis.kylin.host}
password
:
${liquidnet.redis.kylin.password}
lettuce
:
pool
:
max-active
:
20
max-wait
:
-1
max-idle
:
8
min-idle
:
0
data
:
mongodb
:
uri
:
mongodb://${liquidnet.mongodb.user}:${liquidnet.mongodb.pwd}@${liquidnet.mongodb.host}/?authSource=admin&maxPoolSize=200&waitQueueMultiple=100
sslEnabled
:
${liquidnet.mongodb.sslEnabled}
database
:
${liquidnet.mongodb.database}
datasource
:
name
:
${liquidnet.mysql.database-name}
url
:
jdbc:mysql://${liquidnet.mysql.urlHostAndPort}/${liquidnet.mysql.database-name}?serverTimezone=UTC&characterEncoding=utf-8&useSSL=false
username
:
${liquidnet.mysql.username}
password
:
${liquidnet.mysql.password}
# type: org.apache.tomcat.jdbc.pool.DataSource
driver-class-name
:
com.mysql.cj.jdbc.Driver
hikari
:
maximum-pool-size
:
45
minimum-idle
:
8
connection-test-query
:
SELECT 1
# -----------------------------------------------------------
# -----------------------------------------------------------
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-base/src/main/java/com/liquidnet/service/consumer/base/config/ConsumerCommonBizRedisStreamConfig.java
0 → 100644
View file @
2e324a78
package
com
.
liquidnet
.
service
.
consumer
.
base
.
config
;
import
com.liquidnet.common.cache.redis.config.RedisStreamConfig
;
import
com.liquidnet.service.base.constant.MQConst
;
import
com.liquidnet.service.consumer.base.receiver.ConsumerGoblinBizArtworkUplReceiver
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.context.annotation.Bean
;
import
org.springframework.context.annotation.Configuration
;
import
org.springframework.data.redis.connection.RedisConnectionFactory
;
import
org.springframework.data.redis.connection.stream.Consumer
;
import
org.springframework.data.redis.connection.stream.MapRecord
;
import
org.springframework.data.redis.connection.stream.ReadOffset
;
import
org.springframework.data.redis.connection.stream.StreamOffset
;
import
org.springframework.data.redis.core.StringRedisTemplate
;
import
org.springframework.data.redis.stream.StreamMessageListenerContainer
;
import
org.springframework.data.redis.stream.Subscription
;
import
java.util.ArrayList
;
import
java.util.List
;
/**
* 公共的业务队列消息消费器初始化配置
*
* @author zhanggb
* Created by IntelliJ IDEA at 2022/6/2
*/
@Configuration
public
class
ConsumerCommonBizRedisStreamConfig
extends
RedisStreamConfig
{
@Autowired
StringRedisTemplate
stringRedisTemplate
;
@Autowired
ConsumerGoblinBizArtworkUplReceiver
consumerGoblinBizArtworkUplReceiver
;
@Bean
// 藏品上传声明
public
List
<
Subscription
>
subscriptionGoblinBizArtworkUpl
(
RedisConnectionFactory
factory
)
{
List
<
Subscription
>
subscriptionList
=
new
ArrayList
<>();
MQConst
.
GoblinQueue
stream
=
MQConst
.
GoblinQueue
.
BIZ_ARTWORK_UPL
;
this
.
initStream
(
stringRedisTemplate
,
stream
.
getKey
(),
stream
.
getGroup
());
for
(
int
i
=
0
;
i
<
1
;
i
++)
{
StreamMessageListenerContainer
<
String
,
MapRecord
<
String
,
String
,
String
>>
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
subscriptionList
.
add
(
listenerContainer
.
receiveAutoAck
(
Consumer
.
from
(
stream
.
getGroup
(),
getConsumerName
(
stream
.
name
()
+
i
)),
StreamOffset
.
create
(
stream
.
getKey
(),
ReadOffset
.
lastConsumed
()),
consumerGoblinBizArtworkUplReceiver
));
listenerContainer
.
start
();
}
return
subscriptionList
;
}
}
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-base/src/main/java/com/liquidnet/service/consumer/base/config/ConsumerCommonMdbRedisStreamConfig.java
0 → 100644
View file @
2e324a78
package
com
.
liquidnet
.
service
.
consumer
.
base
.
config
;
import
com.liquidnet.common.cache.redis.config.RedisStreamConfig
;
import
com.liquidnet.service.base.constant.MQConst
;
import
com.liquidnet.service.consumer.base.receiver.ConsumerCommonMDB0Receiver
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.context.annotation.Bean
;
import
org.springframework.context.annotation.Configuration
;
import
org.springframework.data.redis.connection.RedisConnectionFactory
;
import
org.springframework.data.redis.connection.stream.Consumer
;
import
org.springframework.data.redis.connection.stream.MapRecord
;
import
org.springframework.data.redis.connection.stream.ReadOffset
;
import
org.springframework.data.redis.connection.stream.StreamOffset
;
import
org.springframework.data.redis.core.StringRedisTemplate
;
import
org.springframework.data.redis.stream.StreamMessageListenerContainer
;
import
org.springframework.data.redis.stream.Subscription
;
import
java.util.ArrayList
;
import
java.util.List
;
/**
* 公共的Mongo队列消息消费器初始化配置
*
* @author zhanggb
* Created by IntelliJ IDEA at 2022/6/2
*/
@Configuration
public
class
ConsumerCommonMdbRedisStreamConfig
extends
RedisStreamConfig
{
@Autowired
StringRedisTemplate
stringRedisTemplate
;
@Autowired
ConsumerCommonMDB0Receiver
consumerCommonMDB0Receiver
;
// @Bean
// public List<Subscription> subscriptionMDB0(RedisConnectionFactory factory) {
// List<Subscription> subscriptionList = new ArrayList<>();
// MQConst.CommonQueue stream = MQConst.CommonQueue.MDB0;
// this.initStream(stringRedisTemplate, stream.getKey(), stream.getGroup());
// for (int i = 0; i < 5; i++) {
// StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = this.buildStreamMessageListenerContainer(factory);
// subscriptionList.add(listenerContainer.receiveAutoAck(
// Consumer.from(stream.getGroup(), getConsumerName(stream.name() + i)),
// StreamOffset.create(stream.getKey(), ReadOffset.lastConsumed()), consumerCommonMDB0Receiver
// ));
// listenerContainer.start();
// }
// return subscriptionList;
// }
}
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-base/src/main/java/com/liquidnet/service/consumer/base/config/ConsumerCommonSqlRedisStreamConfig.java
View file @
2e324a78
...
@@ -31,19 +31,19 @@ public class ConsumerCommonSqlRedisStreamConfig extends RedisStreamConfig {
...
@@ -31,19 +31,19 @@ public class ConsumerCommonSqlRedisStreamConfig extends RedisStreamConfig {
@Autowired
@Autowired
ConsumerCommonSQL0Receiver
consumerCommonSQL0Receiver
;
ConsumerCommonSQL0Receiver
consumerCommonSQL0Receiver
;
@Bean
//
@Bean
public
List
<
Subscription
>
subscriptionSQL0
(
RedisConnectionFactory
factory
)
{
//
public List<Subscription> subscriptionSQL0(RedisConnectionFactory factory) {
List
<
Subscription
>
subscriptionList
=
new
ArrayList
<>();
//
List<Subscription> subscriptionList = new ArrayList<>();
MQConst
.
CommonQueue
stream
=
MQConst
.
CommonQueue
.
SQL0
;
//
MQConst.CommonQueue stream = MQConst.CommonQueue.SQL0;
this
.
initStream
(
stringRedisTemplate
,
stream
.
getKey
(),
stream
.
getGroup
());
//
this.initStream(stringRedisTemplate, stream.getKey(), stream.getGroup());
for
(
int
i
=
0
;
i
<
5
;
i
++)
{
//
for (int i = 0; i < 5; i++) {
StreamMessageListenerContainer
<
String
,
MapRecord
<
String
,
String
,
String
>>
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
//
StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = this.buildStreamMessageListenerContainer(factory);
subscriptionList
.
add
(
listenerContainer
.
receiveAutoAck
(
//
subscriptionList.add(listenerContainer.receiveAutoAck(
Consumer
.
from
(
stream
.
getGroup
(),
getConsumerName
(
stream
.
name
()
+
i
)),
//
Consumer.from(stream.getGroup(), getConsumerName(stream.name() + i)),
StreamOffset
.
create
(
stream
.
getKey
(),
ReadOffset
.
lastConsumed
()),
consumerCommonSQL0Receiver
//
StreamOffset.create(stream.getKey(), ReadOffset.lastConsumed()), consumerCommonSQL0Receiver
));
//
));
listenerContainer
.
start
();
//
listenerContainer.start();
}
//
}
return
subscriptionList
;
//
return subscriptionList;
}
//
}
}
}
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-base/src/main/java/com/liquidnet/service/consumer/base/receiver/AbstractBizRedisReceiver.java
0 → 100644
View file @
2e324a78
package
com
.
liquidnet
.
service
.
consumer
.
base
.
receiver
;
import
com.liquidnet.service.base.constant.MQConst
;
import
com.liquidnet.service.consumer.base.service.IBaseDao
;
import
lombok.extern.slf4j.Slf4j
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.data.redis.connection.stream.MapRecord
;
import
org.springframework.data.redis.core.StringRedisTemplate
;
import
org.springframework.data.redis.stream.StreamListener
;
/**
* 公共的业务队列消息监听器,具体消费逻辑通过`consumerMessageHandler`实现
*
* @author zhanggb
* Created by IntelliJ IDEA at 2022/3/31
*/
@Slf4j
public
abstract
class
AbstractBizRedisReceiver
implements
StreamListener
<
String
,
MapRecord
<
String
,
String
,
String
>>
{
@Autowired
public
IBaseDao
baseDao
;
@Autowired
public
StringRedisTemplate
stringRedisTemplate
;
@Override
public
void
onMessage
(
MapRecord
<
String
,
String
,
String
>
message
)
{
String
redisStreamKey
=
this
.
getRedisStreamKey
();
log
.
debug
(
"CONSUMER MSG_BIZ[streamKey:{},messageId:{},stream:{},body:{}]"
,
redisStreamKey
,
message
.
getId
(),
message
.
getStream
(),
message
.
getValue
());
boolean
result
=
this
.
consumerMessageHandler
(
message
.
getValue
().
get
(
MQConst
.
QUEUE_MESSAGE_KEY
));
log
.
info
(
"CONSUMER MSG_BIZ RESULT:{} ==> [{}]MESSAGE_ID:{}"
,
result
,
redisStreamKey
,
message
.
getId
());
try
{
stringRedisTemplate
.
opsForStream
().
acknowledge
(
getRedisStreamGroup
(),
message
);
}
catch
(
Exception
e
)
{
log
.
error
(
"#CONSUMER MSG_BIZ EX_ACK ==> [{}]RESULT:{},MESSAGE:{}"
,
redisStreamKey
,
result
,
message
.
getValue
(),
e
);
}
try
{
stringRedisTemplate
.
opsForStream
().
delete
(
redisStreamKey
,
message
.
getId
());
}
catch
(
Exception
e
)
{
log
.
error
(
"#CONSUMER MSG_BIZ EX_DEL ==> [{}]RESULT:{},MESSAGE:{}"
,
redisStreamKey
,
result
,
message
.
getValue
(),
e
);
}
}
protected
abstract
boolean
consumerMessageHandler
(
String
msg
);
protected
abstract
String
getRedisStreamKey
();
protected
abstract
String
getRedisStreamGroup
();
}
\ No newline at end of file
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-base/src/main/java/com/liquidnet/service/consumer/base/receiver/AbstractMdbRedisReceiver.java
0 → 100644
View file @
2e324a78
package
com
.
liquidnet
.
service
.
consumer
.
base
.
receiver
;
import
com.liquidnet.common.cache.redis.util.RedisUtil
;
import
com.liquidnet.commons.lang.util.CollectionUtil
;
import
com.liquidnet.commons.lang.util.JsonUtils
;
import
com.liquidnet.service.base.MdbMessage
;
import
com.liquidnet.service.base.constant.MQConst
;
import
com.mongodb.BasicDBObject
;
import
com.mongodb.client.result.UpdateResult
;
import
lombok.extern.slf4j.Slf4j
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.data.mongodb.core.MongoTemplate
;
import
org.springframework.data.mongodb.core.query.Criteria
;
import
org.springframework.data.mongodb.core.query.Query
;
import
org.springframework.data.redis.connection.stream.MapRecord
;
import
org.springframework.data.redis.connection.stream.StreamRecords
;
import
org.springframework.data.redis.core.StringRedisTemplate
;
import
org.springframework.data.redis.stream.StreamListener
;
import
java.util.HashMap
;
/**
* 公共的Mongo数据处理队列消息监听器,具体消费逻辑统一使用`consumerMessageHandler`
*
* @author zhanggb
* Created by IntelliJ IDEA at 2022/4/22
*/
@Slf4j
public
abstract
class
AbstractMdbRedisReceiver
implements
StreamListener
<
String
,
MapRecord
<
String
,
String
,
String
>>
{
@Autowired
public
StringRedisTemplate
stringRedisTemplate
;
@Autowired
private
MongoTemplate
mongoTemplate
;
@Autowired
private
RedisUtil
redisUtil
;
private
static
final
BasicDBObject
BASIC_DB_OBJECT
=
new
BasicDBObject
();
@Override
public
void
onMessage
(
MapRecord
<
String
,
String
,
String
>
message
)
{
String
redisStreamKey
=
this
.
getRedisStreamKey
();
log
.
debug
(
"CONSUMER MSG_MDB[streamKey:{},messageId:{},stream:{},body:{}]"
,
redisStreamKey
,
message
.
getId
(),
message
.
getStream
(),
message
.
getValue
());
boolean
result
=
this
.
consumerMessageHandler
(
message
.
getValue
().
get
(
MQConst
.
QUEUE_MESSAGE_KEY
));
log
.
info
(
"CONSUMER MSG_MDB RESULT:{} ==> [{}]MESSAGE_ID:{}"
,
result
,
redisStreamKey
,
message
.
getId
());
try
{
stringRedisTemplate
.
opsForStream
().
acknowledge
(
getRedisStreamGroup
(),
message
);
}
catch
(
Exception
e
)
{
log
.
error
(
"#CONSUMER MSG_MDB EX_ACK ==> [{}]RESULT:{},MESSAGE:{}"
,
redisStreamKey
,
result
,
message
.
getValue
(),
e
);
}
try
{
stringRedisTemplate
.
opsForStream
().
delete
(
redisStreamKey
,
message
.
getId
());
}
catch
(
Exception
e
)
{
log
.
error
(
"#CONSUMER MSG_MDB EX_DEL ==> [{}]RESULT:{},MESSAGE:{}"
,
redisStreamKey
,
result
,
message
.
getValue
(),
e
);
}
}
private
boolean
consumerMessageHandler
(
String
msg
)
{
boolean
aBoolean
=
true
;
try
{
MdbMessage
mdbMessage
=
JsonUtils
.
fromJson
(
msg
,
MdbMessage
.
class
);
if
(
null
!=
mdbMessage
)
{
String
collectName
=
mdbMessage
.
getCollect
(),
columnName
=
mdbMessage
.
getColumn
();
String
prefix
=
mdbMessage
.
getPrefix
(),
bizId
=
mdbMessage
.
getBizId
();
Object
o
;
switch
(
mdbMessage
.
getOpType
())
{
case
1
:
// insert
o
=
redisUtil
.
get
(
prefix
.
concat
(
bizId
));
if
(
null
!=
o
)
{
mongoTemplate
.
insert
(
o
,
collectName
);
}
break
;
case
2
:
// update
o
=
redisUtil
.
get
(
prefix
.
concat
(
bizId
));
if
(
null
!=
o
)
{
BasicDBObject
basicDBObject
=
(
BasicDBObject
)
AbstractMdbRedisReceiver
.
BASIC_DB_OBJECT
.
clone
();
UpdateResult
updateResult
=
mongoTemplate
.
getCollection
(
collectName
).
updateOne
(
Query
.
query
(
Criteria
.
where
(
columnName
).
is
(
bizId
)).
getQueryObject
(),
basicDBObject
.
append
(
"$set"
,
mongoTemplate
.
getConverter
().
convertToMongoType
(
o
))
);
}
break
;
default
:
log
.
error
(
"CONSUMER MSG_MDB ERR_HANDLE[未知的操作类型,{}]"
,
msg
);
break
;
}
}
}
catch
(
Exception
e
)
{
log
.
error
(
"CONSUMER MSG_MDB EX_HANDLE ==> [{}]:{}"
,
this
.
getRedisStreamKey
(),
msg
,
e
);
aBoolean
=
false
;
}
finally
{
if
(!
aBoolean
)
{
HashMap
<
String
,
String
>
map
=
CollectionUtil
.
mapStringString
();
map
.
put
(
MQConst
.
QUEUE_MESSAGE_KEY
,
msg
);
stringRedisTemplate
.
opsForStream
().
add
(
StreamRecords
.
mapBacked
(
map
).
withStreamKey
(
this
.
getRedisStreamKey
()));
}
}
return
aBoolean
;
}
protected
abstract
String
getRedisStreamKey
();
protected
abstract
String
getRedisStreamGroup
();
}
\ No newline at end of file
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-base/src/main/java/com/liquidnet/service/consumer/base/receiver/ConsumerCommonMDB0Receiver.java
0 → 100644
View file @
2e324a78
package
com
.
liquidnet
.
service
.
consumer
.
base
.
receiver
;
import
com.liquidnet.service.base.constant.MQConst
;
import
org.springframework.stereotype.Component
;
@Component
public
class
ConsumerCommonMDB0Receiver
extends
AbstractSqlRedisReceiver
{
@Override
protected
String
getRedisStreamKey
()
{
return
MQConst
.
CommonQueue
.
MDB0
.
getKey
();
}
@Override
protected
String
getRedisStreamGroup
()
{
return
MQConst
.
CommonQueue
.
MDB0
.
getGroup
();
}
}
\ No newline at end of file
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-base/src/main/java/com/liquidnet/service/consumer/base/receiver/ConsumerGoblinBizArtworkUplReceiver.java
0 → 100644
View file @
2e324a78
package
com
.
liquidnet
.
service
.
consumer
.
base
.
receiver
;
import
com.fasterxml.jackson.databind.JsonNode
;
import
com.liquidnet.commons.lang.util.CollectionUtil
;
import
com.liquidnet.commons.lang.util.HttpUtil
;
import
com.liquidnet.commons.lang.util.JsonUtils
;
import
com.liquidnet.service.base.constant.MQConst
;
import
lombok.extern.slf4j.Slf4j
;
import
org.apache.commons.lang3.StringUtils
;
import
org.springframework.beans.factory.annotation.Value
;
import
org.springframework.data.redis.connection.stream.StreamRecords
;
import
org.springframework.stereotype.Component
;
import
org.springframework.util.LinkedMultiValueMap
;
import
java.util.HashMap
;
@Slf4j
@Component
public
class
ConsumerGoblinBizArtworkUplReceiver
extends
AbstractBizRedisReceiver
{
@Value
(
"${liquidnet.service.goblin.url}"
)
private
String
serviceGoblinUrl
;
@Override
protected
boolean
consumerMessageHandler
(
String
msg
)
{
boolean
aBoolean
=
false
;
try
{
if
(
StringUtils
.
isEmpty
(
msg
))
{
log
.
warn
(
"CONSUMER MSG NULL_DTO ==> [{}]:{}"
,
this
.
getRedisStreamKey
(),
msg
);
aBoolean
=
true
;
}
else
{
aBoolean
=
this
.
bizArtworkUplProcessing
(
msg
);
}
}
catch
(
Exception
e
)
{
log
.
error
(
"CONSUMER MSG EX_HANDLE ==> [{}]:{}"
,
this
.
getRedisStreamKey
(),
msg
,
e
);
}
finally
{
if
(!
aBoolean
)
{
HashMap
<
String
,
String
>
map
=
CollectionUtil
.
mapStringString
();
map
.
put
(
MQConst
.
QUEUE_MESSAGE_KEY
,
msg
);
stringRedisTemplate
.
opsForStream
().
add
(
StreamRecords
.
mapBacked
(
map
).
withStreamKey
(
this
.
getRedisStreamKey
()));
}
}
return
aBoolean
;
}
@Override
protected
String
getRedisStreamKey
()
{
return
MQConst
.
GoblinQueue
.
BIZ_ARTWORK_UPL
.
getKey
();
}
@Override
protected
String
getRedisStreamGroup
()
{
return
MQConst
.
GoblinQueue
.
BIZ_ARTWORK_UPL
.
getGroup
();
}
private
boolean
bizArtworkUplProcessing
(
String
skuId
)
{
String
postUrl
=
serviceGoblinUrl
+
"/goblin/que/artwork/upl"
;
LinkedMultiValueMap
<
String
,
String
>
postDataMap
=
CollectionUtil
.
linkedMultiValueMapStringString
();
try
{
postDataMap
.
add
(
"skuId"
,
skuId
);
String
postRespStr
=
HttpUtil
.
post
(
postUrl
,
postDataMap
);
JsonNode
postRespJNode
=
JsonUtils
.
fromJson
(
postRespStr
,
JsonNode
.
class
),
postRespCode
;
if
(
null
==
postRespJNode
||
null
==
(
postRespCode
=
postRespJNode
.
get
(
"code"
))
||
!
postRespCode
.
asText
().
equals
(
"0"
))
{
log
.
warn
(
"#NFT素材上传:请求失败[paramsStr={},postRespStr={}]"
,
postDataMap
,
postRespStr
);
return
false
;
}
return
true
;
}
catch
(
Exception
e
)
{
log
.
error
(
"Ex.NFT素材上传:请求异常[url={},paramsStr={}],ex:{}"
,
postUrl
,
postDataMap
,
e
.
getMessage
());
return
false
;
}
}
}
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-base/src/main/resources/bootstrap-service-consumer-base.yml
View file @
2e324a78
...
@@ -9,6 +9,7 @@ eureka:
...
@@ -9,6 +9,7 @@ eureka:
spring
:
spring
:
cloud
:
cloud
:
config
:
config
:
# uri: http://127.0.0.1:7002/support-config
# uri: http://39.107.71.112:7002/support-config
# uri: http://39.107.71.112:7002/support-config
profile
:
${liquidnet.cloudConfig.profile}
profile
:
${liquidnet.cloudConfig.profile}
name
:
${spring.application.name}
#默认为spring.application.name
name
:
${spring.application.name}
#默认为spring.application.name
...
...
liquidnet-bus-service/liquidnet-service-goblin/liquidnet-service-goblin-impl/src/main/java/com/liquidnet/service/goblin/controller/Inner/GoblinQueBizArtworkController.java
0 → 100644
View file @
2e324a78
package
com
.
liquidnet
.
service
.
goblin
.
controller
.
Inner
;
import
com.liquidnet.service.base.ResponseDto
;
import
com.liquidnet.service.goblin.service.impl.inner.GoblinQueBizArtworkService
;
import
io.swagger.annotations.Api
;
import
io.swagger.annotations.ApiImplicitParam
;
import
io.swagger.annotations.ApiImplicitParams
;
import
io.swagger.annotations.ApiOperation
;
import
lombok.extern.slf4j.Slf4j
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.validation.annotation.Validated
;
import
org.springframework.web.bind.annotation.PostMapping
;
import
org.springframework.web.bind.annotation.RequestMapping
;
import
org.springframework.web.bind.annotation.RequestParam
;
import
org.springframework.web.bind.annotation.RestController
;
import
javax.validation.constraints.NotBlank
;
@Slf4j
@Api
(
tags
=
"@API:QUE"
)
@RestController
@Validated
@RequestMapping
(
"que/artwork"
)
public
class
GoblinQueBizArtworkController
{
@Autowired
private
GoblinQueBizArtworkService
goblinQueBizArtworkService
;
@PostMapping
(
"upl"
)
@ApiOperation
(
"藏品上传声明"
)
@ApiImplicitParams
({
@ApiImplicitParam
(
type
=
"form"
,
required
=
true
,
dataType
=
"String"
,
name
=
"skuId"
,
value
=
"藏品ID"
,
example
=
"1"
),
})
public
ResponseDto
<
String
>
orderDetails
(
@NotBlank
(
message
=
"藏品ID不能为空"
)
@RequestParam
String
skuId
)
{
return
goblinQueBizArtworkService
.
bizArtworkUplProcessing
(
skuId
);
}
}
liquidnet-bus-service/liquidnet-service-goblin/liquidnet-service-goblin-impl/src/main/java/com/liquidnet/service/goblin/service/impl/inner/GoblinQueBizArtworkService.java
0 → 100644
View file @
2e324a78
package
com
.
liquidnet
.
service
.
goblin
.
service
.
impl
.
inner
;
import
com.liquidnet.commons.lang.util.CollectionUtil
;
import
com.liquidnet.commons.lang.util.JsonUtils
;
import
com.liquidnet.service.base.ResponseDto
;
import
com.liquidnet.service.base.SqlMapping
;
import
com.liquidnet.service.base.constant.MQConst
;
import
com.liquidnet.service.galaxy.dto.param.GalaxyArtSeriesClaimReqDto
;
import
com.liquidnet.service.galaxy.dto.param.GalaxyArtSeriesClaimRespDto
;
import
com.liquidnet.service.galaxy.dto.param.GalaxyNftUploadReqDto
;
import
com.liquidnet.service.galaxy.dto.param.GalaxyNftUploadRespDto
;
import
com.liquidnet.service.galaxy.service.IGalaxyArtworkService
;
import
com.liquidnet.service.goblin.constant.GoblinRedisConst
;
import
com.liquidnet.service.goblin.dto.vo.GoblinGoodsInfoVo
;
import
com.liquidnet.service.goblin.dto.vo.GoblinGoodsSkuInfoVo
;
import
com.liquidnet.service.goblin.util.GoblinMongoUtils
;
import
com.liquidnet.service.goblin.util.GoblinRedisUtils
;
import
com.liquidnet.service.goblin.util.QueueUtils
;
import
lombok.extern.slf4j.Slf4j
;
import
org.apache.commons.lang3.StringUtils
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.data.mongodb.core.MongoTemplate
;
import
org.springframework.data.mongodb.core.query.Criteria
;
import
org.springframework.data.mongodb.core.query.Query
;
import
org.springframework.data.mongodb.core.query.Update
;
import
org.springframework.stereotype.Service
;
import
javax.annotation.Resource
;
import
java.time.LocalDateTime
;
import
java.util.LinkedList
;
@Slf4j
@Service
public
class
GoblinQueBizArtworkService
{
@Autowired
QueueUtils
queueUtils
;
@Autowired
GoblinRedisUtils
goblinRedisUtils
;
@Autowired
GoblinMongoUtils
goblinMongoUtils
;
@Autowired
MongoTemplate
mongoTemplate
;
@Resource
(
name
=
"galaxyArtworkServiceImpl"
)
private
IGalaxyArtworkService
galaxyArtworkService
;
private
static
final
String
SQL_UPDATE_GOODS_SKU_NFT
=
"UPDATE goblin_goods_sku_nft SET upchain=?,display_url=?,nft_url=? WHERE sku_id=? AND upchain=0 "
;
public
ResponseDto
<
String
>
bizArtworkUplProcessing
(
String
skuId
)
{
GoblinGoodsSkuInfoVo
mgtGoodsSkuInfoVo
=
goblinMongoUtils
.
getGoodsSkuInfoVo
(
skuId
);
if
(
null
==
mgtGoodsSkuInfoVo
)
{
log
.
warn
(
"#NFT素材上传:藏品SKU不存在[skuId={}]"
,
skuId
);
return
ResponseDto
.
success
(
String
.
format
(
"藏品SKU不存在[skuId:%s]"
,
skuId
));
}
int
skuType
=
mgtGoodsSkuInfoVo
.
getSkuType
(),
upchain
=
mgtGoodsSkuInfoVo
.
getUpchain
();
String
unbox
=
mgtGoodsSkuInfoVo
.
getUnbox
();
// 非数字藏品 || 盲盒 || 非声明中 || 已有声明系列ID
if
(
1
!=
skuType
||
!
"0"
.
equals
(
unbox
)
||
0
!=
upchain
||
StringUtils
.
isNotEmpty
(
mgtGoodsSkuInfoVo
.
getSeriesId
()))
{
log
.
warn
(
"#NFT素材上传:藏品SKU无效或已声明[skuId={},skuType={},unbox={},upchain={},seriesId={}]"
,
skuId
,
skuType
,
unbox
,
upchain
,
mgtGoodsSkuInfoVo
.
getSeriesId
());
return
ResponseDto
.
success
(
String
.
format
(
"藏品SKU无效或已声明[skuId:%s]"
,
skuId
));
}
String
displayUrl
=
mgtGoodsSkuInfoVo
.
getDisplayUrl
(),
nftUrl
=
mgtGoodsSkuInfoVo
.
getNftUrl
();
if
(
StringUtils
.
isBlank
(
displayUrl
))
{
// 未上传过的直接上传处理,已上传过的跳过上传直接声明
GalaxyNftUploadReqDto
galaxyNftUploadReqDto
=
GalaxyNftUploadReqDto
.
getNew
();
galaxyNftUploadReqDto
.
setSkuId
(
skuId
);
galaxyNftUploadReqDto
.
setOriginalDisplayUrl
(
mgtGoodsSkuInfoVo
.
getSkuPic
());
galaxyNftUploadReqDto
.
setOriginalNftUrl
(
mgtGoodsSkuInfoVo
.
getMaterialUrl
());
galaxyNftUploadReqDto
.
setRouterType
(
mgtGoodsSkuInfoVo
.
getRouteType
());
GalaxyNftUploadRespDto
galaxyNftUploadRespDto
=
this
.
uploadNftMaterial
(
galaxyNftUploadReqDto
);
if
(
null
==
galaxyNftUploadRespDto
)
{
return
ResponseDto
.
failure
(
String
.
format
(
"藏品上传失败[skuId:%s]"
,
skuId
));
// 上传失败,重新入队处理
}
displayUrl
=
galaxyNftUploadRespDto
.
getDisplayUrl
();
nftUrl
=
galaxyNftUploadRespDto
.
getNftUrl
();
}
GoblinGoodsInfoVo
mgtGoodsInfoVo
=
goblinMongoUtils
.
getGoodsInfoVo
(
mgtGoodsSkuInfoVo
.
getSpuId
());
String
skuTitle
=
mgtGoodsSkuInfoVo
.
getName
()
+
mgtGoodsSkuInfoVo
.
getSubtitle
();
GalaxyArtSeriesClaimReqDto
galaxyArtSeriesClaimReqDto
=
GalaxyArtSeriesClaimReqDto
.
getNew
();
galaxyArtSeriesClaimReqDto
.
setAuthor
(
mgtGoodsInfoVo
.
getAuthor
());
galaxyArtSeriesClaimReqDto
.
setCoverUrl
(
displayUrl
);
galaxyArtSeriesClaimReqDto
.
setDisplayUrl
(
displayUrl
);
galaxyArtSeriesClaimReqDto
.
setNftDesc
(
skuTitle
);
galaxyArtSeriesClaimReqDto
.
setNftName
(
skuTitle
);
galaxyArtSeriesClaimReqDto
.
setNftUrl
(
nftUrl
);
galaxyArtSeriesClaimReqDto
.
setRouterType
(
mgtGoodsSkuInfoVo
.
getRouteType
());
galaxyArtSeriesClaimReqDto
.
setSellCount
(
String
.
valueOf
(
mgtGoodsSkuInfoVo
.
getPrice
()));
galaxyArtSeriesClaimReqDto
.
setSeriesDesc
(
skuTitle
);
galaxyArtSeriesClaimReqDto
.
setSkuId
(
skuId
);
galaxyArtSeriesClaimReqDto
.
setTotalCount
(
Long
.
valueOf
(
mgtGoodsSkuInfoVo
.
getSkuStock
()));
// 声明失败,标记`声明失败`
upchain
=
null
==
this
.
claimNftSeries
(
galaxyArtSeriesClaimReqDto
)
?
2
:
9
;
mongoTemplate
.
getCollection
(
GoblinGoodsSkuInfoVo
.
class
.
getSimpleName
()).
updateOne
(
Query
.
query
(
Criteria
.
where
(
"skuId"
).
is
(
skuId
).
and
(
"delFlg"
).
is
(
"0"
)).
getQueryObject
(),
Update
.
update
(
"upchain"
,
upchain
).
set
(
"displayUrl"
,
displayUrl
).
set
(
"nftUrl"
,
nftUrl
).
getUpdateObject
()
);
goblinRedisUtils
.
del
(
GoblinRedisConst
.
BASIC_GOODS_SKU
.
concat
(
skuId
));
// Mysql持久化
// HashMap<String, String> sqlUpdateMap = CollectionUtil.mapStringString();
LinkedList
<
String
>
toMqSqls
=
CollectionUtil
.
linkedListString
();
toMqSqls
.
add
(
SQL_UPDATE_GOODS_SKU_NFT
);
LinkedList
<
Object
[]>
updateGoodsSkuNftObjs
=
CollectionUtil
.
linkedListObjectArr
();
updateGoodsSkuNftObjs
.
add
(
new
Object
[]{
upchain
,
displayUrl
,
nftUrl
,
skuId
});
// sqlUpdateMap.put(MQConst.QUEUE_MESSAGE_KEY, SqlMapping.gets(toMqSqls, updateGoodsSkuNftObjs));
queueUtils
.
sendMsgByRedis
(
MQConst
.
GoblinQueue
.
SQL_GOODS
.
getKey
(),
SqlMapping
.
gets
(
toMqSqls
,
updateGoodsSkuNftObjs
));
// StreamOperations<String, Object, Object> streamOperations = stringRedisTemplate.opsForStream();
// streamOperations.add(StreamRecords.mapBacked(sqlUpdateMap).withStreamKey(MQConst.GoblinQueue.SQL_GOODS.getKey()));
queueUtils
.
sendMsgByRedis
(
MQConst
.
GoblinQueue
.
BIZ_ARTWORK_CLQ
.
getKey
(),
skuId
.
concat
(
","
).
concat
(
String
.
valueOf
(
LocalDateTime
.
now
())));
// HashMap<String, String> toQueueBeClaimQueryMsg = CollectionUtil.mapStringString();
// toQueueBeClaimQueryMsg.put(MQConst.QUEUE_MESSAGE_KEY, skuId.concat(",").concat(String.valueOf(LocalDateTime.now())));
// streamOperations.add(StreamRecords.mapBacked(toQueueBeClaimQueryMsg).withStreamKey(MQConst.GoblinQueue.BIZ_ARTWORK_CLQ.getKey()));
// return true;
return
null
;
}
/* ------------------------------------------------------------------------------------ */
/* ------------------------------------------------------------------------------------ */
/* ------------------------------------------------------------------------------------ */
/* ------------------------------------------------------------------------------------ */
/* ------------------------------------------------------------------------------------ */
/**
* NFT素材上传
*
* @param nftUploadReqDto GalaxyNftUploadReqDto
* @return GalaxyNftUploadRespDto
*/
private
GalaxyNftUploadRespDto
uploadNftMaterial
(
GalaxyNftUploadReqDto
nftUploadReqDto
)
{
ResponseDto
<
GalaxyNftUploadRespDto
>
responseDto
=
null
;
try
{
responseDto
=
galaxyArtworkService
.
nftUpload
(
nftUploadReqDto
);
if
(!
responseDto
.
isSuccess
())
{
log
.
warn
(
"#NFT素材上传:请求失败[paramsStr={},postRespStr={}]"
,
JsonUtils
.
toJson
(
nftUploadReqDto
),
JsonUtils
.
toJson
(
responseDto
));
return
null
;
}
return
responseDto
.
getData
();
}
catch
(
Exception
e
)
{
log
.
error
(
"Ex.NFT素材上传:请求异常[paramsStr={},postRespStr={}],ex:{}"
,
JsonUtils
.
toJson
(
nftUploadReqDto
),
JsonUtils
.
toJson
(
responseDto
),
e
.
getMessage
());
return
null
;
}
}
/**
* NFT系列声明
*
* @param requestDto GalaxyArtSeriesClaimReqDto
* @return GalaxyArtSeriesClaimRespDto
*/
private
GalaxyArtSeriesClaimRespDto
claimNftSeries
(
GalaxyArtSeriesClaimReqDto
requestDto
)
{
ResponseDto
<
GalaxyArtSeriesClaimRespDto
>
responseDto
=
null
;
try
{
responseDto
=
galaxyArtworkService
.
seriesClaim
(
requestDto
);
if
(!
responseDto
.
isSuccess
())
{
log
.
warn
(
"#NFT系列声明:请求失败[paramsStr={},postRespStr={}]"
,
JsonUtils
.
toJson
(
requestDto
),
JsonUtils
.
toJson
(
responseDto
));
return
null
;
}
return
responseDto
.
getData
();
}
catch
(
Exception
e
)
{
log
.
error
(
"Ex.NFT系列声明:请求异常[paramsStr={},postRespStr={}],ex:{}"
,
JsonUtils
.
toJson
(
requestDto
),
JsonUtils
.
toJson
(
responseDto
),
e
.
getMessage
());
return
null
;
}
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment