记得上下班打卡 | git大法好,push需谨慎
Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
L
liquidnet-bus-v1
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
董敬伟
liquidnet-bus-v1
Commits
98c887bf
Commit
98c887bf
authored
Jun 06, 2022
by
胡佳晨
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
消费迁移 xls相关 转赠订单 定时过期 提交
parent
d1e3d6bf
Changes
17
Expand all
Hide whitespace changes
Inline
Side-by-side
Showing
17 changed files
with
992 additions
and
671 deletions
+992
-671
ConsumerGoblinXlsRedisStreamConfig.java
...sumer/base/config/ConsumerGoblinXlsRedisStreamConfig.java
+46
-0
ConsumerTransferRedisStreamConfig.java
...nsumer/base/config/ConsumerTransferRedisStreamConfig.java
+49
-0
AbstractOptOrderTransferOverTimeRedisReceiver.java
...ceiver/AbstractOptOrderTransferOverTimeRedisReceiver.java
+79
-0
AbstractXlsRedisReceiver.java
...vice/consumer/base/receiver/AbstractXlsRedisReceiver.java
+116
-0
ConsumerGoblinXlsRdsReceiver.java
.../consumer/base/receiver/ConsumerGoblinXlsRdsReceiver.java
+17
-0
ConsumerKylinTransferOverTimeRdsReceiver.java
...se/receiver/ConsumerKylinTransferOverTimeRdsReceiver.java
+17
-0
AbstractOptOrderTransferOverTimeRedisReceiver.java
...ceiver/AbstractOptOrderTransferOverTimeRedisReceiver.java
+81
-81
AbstractXlsRedisReceiver.java
...ice/consumer/kylin/receiver/AbstractXlsRedisReceiver.java
+149
-149
ConsumerGoblinXlsRdsReceiver.java
...consumer/kylin/receiver/ConsumerGoblinXlsRdsReceiver.java
+17
-17
ConsumerKylinTransferOverTimeRdsReceiver.java
...in/receiver/ConsumerKylinTransferOverTimeRdsReceiver.java
+17
-17
ConsumerGoblinXlsRedisStreamConfig.java
...mer/slowly/config/ConsumerGoblinXlsRedisStreamConfig.java
+58
-58
ConsumerKylinOptTransferOverTimeRedisStreamConfig.java
...ig/ConsumerKylinOptTransferOverTimeRedisStreamConfig.java
+81
-81
AbstractOptOrderTransferOverTimeRedisReceiver.java
...ceiver/AbstractOptOrderTransferOverTimeRedisReceiver.java
+81
-81
AbstractXlsRedisReceiver.java
...ce/consumer/slowly/receiver/AbstractXlsRedisReceiver.java
+148
-148
ConsumerGoblinXlsRdsReceiver.java
...onsumer/slowly/receiver/ConsumerGoblinXlsRdsReceiver.java
+17
-17
ConsumerKylinTransferOverTimeRdsReceiver.java
...ly/receiver/ConsumerKylinTransferOverTimeRdsReceiver.java
+17
-17
InnerController.java
...m/liquidnet/service/kylin/controller/InnerController.java
+2
-5
No files found.
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-base/src/main/java/com/liquidnet/service/consumer/base/config/ConsumerGoblinXlsRedisStreamConfig.java
0 → 100644
View file @
98c887bf
package
com
.
liquidnet
.
service
.
consumer
.
base
.
config
;
import
com.liquidnet.common.cache.redis.config.RedisStreamConfig
;
import
com.liquidnet.service.base.constant.MQConst
;
import
com.liquidnet.service.consumer.base.receiver.ConsumerGoblinXlsRdsReceiver
;
import
lombok.var
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.context.annotation.Bean
;
import
org.springframework.context.annotation.Configuration
;
import
org.springframework.data.redis.connection.RedisConnectionFactory
;
import
org.springframework.data.redis.connection.stream.Consumer
;
import
org.springframework.data.redis.connection.stream.MapRecord
;
import
org.springframework.data.redis.connection.stream.ReadOffset
;
import
org.springframework.data.redis.connection.stream.StreamOffset
;
import
org.springframework.data.redis.core.StringRedisTemplate
;
import
org.springframework.data.redis.stream.StreamMessageListenerContainer
;
import
org.springframework.data.redis.stream.Subscription
;
import
java.util.ArrayList
;
import
java.util.List
;
@Configuration
public
class
ConsumerGoblinXlsRedisStreamConfig
extends
RedisStreamConfig
{
@Autowired
StringRedisTemplate
stringRedisTemplate
;
@Autowired
ConsumerGoblinXlsRdsReceiver
consumerGoblinXlsRdsReceiver
;
// xls黑白名单
@Bean
public
List
<
Subscription
>
subscriptionGoblinXls
(
RedisConnectionFactory
factory
)
{
List
<
Subscription
>
subscriptionList
=
new
ArrayList
<>();
MQConst
.
GoblinQueue
stream
=
MQConst
.
GoblinQueue
.
GOBLIN_XLS_OPERA
;
this
.
initStream
(
stringRedisTemplate
,
stream
.
getKey
(),
stream
.
getGroup
());
for
(
int
i
=
0
;
i
<
1
;
i
++)
{
StreamMessageListenerContainer
<
String
,
MapRecord
<
String
,
String
,
String
>>
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
subscriptionList
.
add
(
listenerContainer
.
receiveAutoAck
(
Consumer
.
from
(
stream
.
getGroup
(),
getConsumerName
(
stream
.
name
()
+
i
)),
StreamOffset
.
create
(
stream
.
getKey
(),
ReadOffset
.
lastConsumed
()),
consumerGoblinXlsRdsReceiver
));
listenerContainer
.
start
();
}
return
subscriptionList
;
}
/* -------------------------------------------------------- | */
}
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-base/src/main/java/com/liquidnet/service/consumer/base/config/ConsumerTransferRedisStreamConfig.java
0 → 100644
View file @
98c887bf
package
com
.
liquidnet
.
service
.
consumer
.
base
.
config
;
import
com.liquidnet.common.cache.redis.config.RedisStreamConfig
;
import
com.liquidnet.service.base.constant.MQConst
;
import
com.liquidnet.service.consumer.base.receiver.ConsumerKylinTransferOverTimeRdsReceiver
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.context.annotation.Bean
;
import
org.springframework.context.annotation.Configuration
;
import
org.springframework.data.redis.connection.RedisConnectionFactory
;
import
org.springframework.data.redis.connection.stream.Consumer
;
import
org.springframework.data.redis.connection.stream.MapRecord
;
import
org.springframework.data.redis.connection.stream.ReadOffset
;
import
org.springframework.data.redis.connection.stream.StreamOffset
;
import
org.springframework.data.redis.core.StringRedisTemplate
;
import
org.springframework.data.redis.stream.StreamMessageListenerContainer
;
import
org.springframework.data.redis.stream.Subscription
;
import
java.util.ArrayList
;
import
java.util.List
;
@Configuration
public
class
ConsumerTransferRedisStreamConfig
extends
RedisStreamConfig
{
@Autowired
ConsumerKylinTransferOverTimeRdsReceiver
consumerKylinTransferOverTimeRdsReceiver
;
@Autowired
StringRedisTemplate
stringRedisTemplate
;
/* —————————————————————————— | —————————————————————————— | —————————————————————————— */
/* -------------------------------------------------------- | 验票更新 */
// 转赠过期定时任务
@Bean
public
List
<
Subscription
>
subscriptionKylinTransferOverTime
(
RedisConnectionFactory
factory
)
{
List
<
Subscription
>
subscriptionList
=
new
ArrayList
<>();
MQConst
.
KylinQueue
stream
=
MQConst
.
KylinQueue
.
SQL_TRANSFER_OVERTIME
;
this
.
initStream
(
stringRedisTemplate
,
stream
.
getKey
(),
stream
.
getGroup
());
for
(
int
i
=
0
;
i
<
1
;
i
++)
{
StreamMessageListenerContainer
<
String
,
MapRecord
<
String
,
String
,
String
>>
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
subscriptionList
.
add
(
listenerContainer
.
receiveAutoAck
(
Consumer
.
from
(
stream
.
getGroup
(),
getConsumerName
(
stream
.
name
()
+
i
)),
StreamOffset
.
create
(
stream
.
getKey
(),
ReadOffset
.
lastConsumed
()),
consumerKylinTransferOverTimeRdsReceiver
));
listenerContainer
.
start
();
}
return
subscriptionList
;
}
}
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-base/src/main/java/com/liquidnet/service/consumer/base/receiver/AbstractOptOrderTransferOverTimeRedisReceiver.java
0 → 100644
View file @
98c887bf
package
com
.
liquidnet
.
service
.
consumer
.
base
.
receiver
;
import
com.fasterxml.jackson.core.type.TypeReference
;
import
com.liquidnet.commons.lang.util.CollectionUtil
;
import
com.liquidnet.commons.lang.util.HttpUtil
;
import
com.liquidnet.commons.lang.util.JsonUtils
;
import
com.liquidnet.service.base.OrderCloseMapping
;
import
com.liquidnet.service.base.ResponseDto
;
import
lombok.extern.slf4j.Slf4j
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.beans.factory.annotation.Value
;
import
org.springframework.data.redis.connection.stream.MapRecord
;
import
org.springframework.data.redis.connection.stream.StreamRecords
;
import
org.springframework.data.redis.core.StringRedisTemplate
;
import
org.springframework.data.redis.stream.StreamListener
;
import
org.springframework.util.LinkedMultiValueMap
;
import
org.springframework.util.MultiValueMap
;
import
java.util.HashMap
;
@Slf4j
public
abstract
class
AbstractOptOrderTransferOverTimeRedisReceiver
implements
StreamListener
<
String
,
MapRecord
<
String
,
String
,
String
>>
{
@Autowired
StringRedisTemplate
stringRedisTemplate
;
@Value
(
"${liquidnet.service.kylin.url}"
)
private
String
kylinUrl
;
@Override
public
void
onMessage
(
MapRecord
<
String
,
String
,
String
>
message
)
{
String
redisStreamKey
=
this
.
getRedisStreamKey
();
log
.
debug
(
"CONSUMER MSG[streamKey:{},messageId:{},stream:{},body:{}]"
,
redisStreamKey
,
message
.
getId
(),
message
.
getStream
(),
message
.
getValue
());
boolean
result
=
this
.
consumerMessageHandler
(
message
.
getValue
().
get
(
"message"
));
log
.
info
(
"CONSUMER MSG RESULT:{} ==> [{}]MESSAGE_ID:{}"
,
result
,
redisStreamKey
,
message
.
getId
());
try
{
stringRedisTemplate
.
opsForStream
().
acknowledge
(
getRedisStreamGroup
(),
message
);
}
catch
(
Exception
e
)
{
log
.
error
(
"#CONSUMER MSG EX_ACK ==> [{}]RESULT:{},MESSAGE:{}"
,
redisStreamKey
,
result
,
message
.
getValue
(),
e
);
}
try
{
stringRedisTemplate
.
opsForStream
().
delete
(
redisStreamKey
,
message
.
getId
());
}
catch
(
Exception
e
)
{
log
.
error
(
"#CONSUMER MSG EX_DEL ==> [{}]RESULT:{},MESSAGE:{}"
,
redisStreamKey
,
result
,
message
.
getValue
(),
e
);
}
}
private
boolean
consumerMessageHandler
(
String
msg
)
{
boolean
aBoolean
=
false
;
try
{
OrderCloseMapping
.
orderCloseMessage
mqMessage
=
JsonUtils
.
fromJson
(
msg
,
OrderCloseMapping
.
orderCloseMessage
.
class
);
if
(
mqMessage
==
null
)
{
}
else
{
for
(
int
x
=
0
;
x
<
mqMessage
.
getOrderTicketIds
().
size
();
x
++)
{
String
orderTicketId
=
mqMessage
.
getOrderTicketIds
().
get
(
x
);
MultiValueMap
<
String
,
String
>
params
=
new
LinkedMultiValueMap
();
params
.
add
(
"orderId"
,
orderTicketId
);
MultiValueMap
<
String
,
String
>
headers
=
CollectionUtil
.
linkedMultiValueMapStringString
();
headers
.
add
(
"Accept"
,
"application/json;charset=UTF-8"
);
String
returnData
=
HttpUtil
.
post
(
kylinUrl
+
"/inner/consumer/orderTransferOverTime"
,
params
,
headers
);
ResponseDto
<
Boolean
>
rsp
=
JsonUtils
.
fromJson
(
returnData
,
new
TypeReference
<
ResponseDto
<
Boolean
>>()
{});
}
}
aBoolean
=
true
;
}
catch
(
Exception
e
)
{
log
.
error
(
"CONSUMER MSG EX_HANDLE ==> [{}]:{}"
,
this
.
getRedisStreamKey
(),
msg
,
e
);
}
finally
{
if
(!
aBoolean
)
{
HashMap
<
String
,
String
>
map
=
CollectionUtil
.
mapStringString
();
map
.
put
(
"message"
,
msg
);
stringRedisTemplate
.
opsForStream
().
add
(
StreamRecords
.
mapBacked
(
map
).
withStreamKey
(
this
.
getRedisStreamKey
()));
}
}
return
aBoolean
;
}
protected
abstract
String
getRedisStreamKey
();
protected
abstract
String
getRedisStreamGroup
();
}
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-base/src/main/java/com/liquidnet/service/consumer/base/receiver/AbstractXlsRedisReceiver.java
0 → 100644
View file @
98c887bf
package
com
.
liquidnet
.
service
.
consumer
.
base
.
receiver
;
import
com.alibaba.fastjson.JSON
;
import
com.fasterxml.jackson.core.type.TypeReference
;
import
com.liquidnet.common.cache.redis.util.RedisUtil
;
import
com.liquidnet.commons.lang.util.CollectionUtil
;
import
com.liquidnet.commons.lang.util.HttpUtil
;
import
com.liquidnet.commons.lang.util.JsonUtils
;
import
com.liquidnet.service.base.ResponseDto
;
import
com.liquidnet.service.consumer.base.service.IBaseDao
;
import
lombok.extern.slf4j.Slf4j
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.beans.factory.annotation.Value
;
import
org.springframework.data.redis.connection.stream.MapRecord
;
import
org.springframework.data.redis.core.StringRedisTemplate
;
import
org.springframework.data.redis.stream.StreamListener
;
import
org.springframework.util.LinkedMultiValueMap
;
import
org.springframework.util.MultiValueMap
;
import
java.time.LocalDateTime
;
import
java.util.LinkedList
;
import
java.util.Map
;
@Slf4j
public
abstract
class
AbstractXlsRedisReceiver
implements
StreamListener
<
String
,
MapRecord
<
String
,
String
,
String
>>
{
@Autowired
private
IBaseDao
baseDao
;
@Autowired
StringRedisTemplate
stringRedisTemplate
;
@Value
(
"${liquidnet.service.goblin.url}"
)
private
String
goblinUrl
;
private
static
final
String
SQL_INSERT_GOODS_BUY_ROSTER_LOG
=
"INSERT INTO goblin_goods_buy_roster_log (sku_id,buy_roster,buy_roster_type,parsing_result,created_at)VALUES(?,?,?,?,?)"
;
@Override
public
void
onMessage
(
MapRecord
<
String
,
String
,
String
>
message
)
{
String
redisStreamKey
=
this
.
getRedisStreamKey
();
log
.
debug
(
"CONSUMER MSG[streamKey:{},messageId:{},stream:{},body:{}]"
,
redisStreamKey
,
message
.
getId
(),
message
.
getStream
(),
message
.
getValue
());
boolean
result
=
this
.
consumerMessageHandler
(
message
.
getValue
());
log
.
info
(
"XLS MESSAGE CONSUMER MSG RESULT:{} ==> [{}]MESSAGE_ID:{}"
,
result
,
redisStreamKey
,
message
.
getId
());
try
{
stringRedisTemplate
.
opsForStream
().
acknowledge
(
getRedisStreamGroup
(),
message
);
}
catch
(
Exception
e
)
{
log
.
error
(
"#CONSUMER MSG EX_ACK ==> [{}]RESULT:{},MESSAGE:{}"
,
redisStreamKey
,
result
,
message
.
getValue
(),
e
);
}
try
{
stringRedisTemplate
.
opsForStream
().
delete
(
redisStreamKey
,
message
.
getId
());
}
catch
(
Exception
e
)
{
log
.
error
(
"#CONSUMER MSG EX_DEL ==> [{}]RESULT:{},MESSAGE:{}"
,
redisStreamKey
,
result
,
message
.
getValue
(),
e
);
}
}
private
boolean
consumerMessageHandler
(
Map
<
String
,
String
>
message
)
{
LinkedList
<
Object
[]>
objs
=
CollectionUtil
.
linkedListObjectArr
();
String
xlsPath
=
null
,
skuId
=
null
;
String
oXlsPath
=
null
;
Integer
type
=
null
;
boolean
aBoolean
=
false
;
try
{
String
[]
path
=
message
.
get
(
"message"
).
split
(
","
);
if
(
path
.
length
==
0
)
{
xlsPath
=
""
;
}
else
{
xlsPath
=
path
[
0
];
}
if
(
path
.
length
>
1
)
{
oXlsPath
=
path
[
1
];
}
String
finalSkuId
=
(
skuId
=
message
.
get
(
"skuId"
));
String
listId
;
listId
=
message
.
getOrDefault
(
"listId"
,
""
);
Integer
finalType
=
(
type
=
Integer
.
parseInt
(
message
.
get
(
"type"
)));
if
(
finalType
.
equals
(
1
)
||
finalType
.
equals
(
2
))
{
MultiValueMap
<
String
,
String
>
params
=
new
LinkedMultiValueMap
();
params
.
add
(
"finalSkuId"
,
finalSkuId
);
params
.
add
(
"finalType"
,
finalType
.
toString
());
params
.
add
(
"xlsPath"
,
xlsPath
);
MultiValueMap
<
String
,
String
>
headers
=
CollectionUtil
.
linkedMultiValueMapStringString
();
headers
.
add
(
"Accept"
,
"application/json;charset=UTF-8"
);
String
returnData
=
HttpUtil
.
post
(
goblinUrl
+
"/goblin/inner/consumerType12"
,
params
,
headers
);
ResponseDto
<
Boolean
>
rsp
=
JsonUtils
.
fromJson
(
returnData
,
new
TypeReference
<
ResponseDto
<
Boolean
>>()
{});
objs
.
add
(
new
Object
[]{
skuId
,
xlsPath
,
type
,
1
,
LocalDateTime
.
now
()});
aBoolean
=
baseDao
.
batchSql
(
SQL_INSERT_GOODS_BUY_ROSTER_LOG
,
objs
);
}
else
if
(
finalType
.
equals
(
3
)
||
finalType
.
equals
(
4
))
{
MultiValueMap
<
String
,
String
>
params
=
new
LinkedMultiValueMap
();
params
.
add
(
"finalSkuId"
,
finalSkuId
);
params
.
add
(
"finalType"
,
finalType
.
toString
());
params
.
add
(
"xlsPath"
,
xlsPath
);
params
.
add
(
"oXlsPath"
,
oXlsPath
);
params
.
add
(
"listId"
,
listId
);
MultiValueMap
<
String
,
String
>
headers
=
CollectionUtil
.
linkedMultiValueMapStringString
();
headers
.
add
(
"Accept"
,
"application/json;charset=UTF-8"
);
String
returnData
=
HttpUtil
.
post
(
goblinUrl
+
"/goblin/inner/consumerType34"
,
params
,
headers
);
ResponseDto
<
Boolean
>
rsp
=
JsonUtils
.
fromJson
(
returnData
,
new
TypeReference
<
ResponseDto
<
Boolean
>>()
{});
aBoolean
=
true
;
}
}
catch
(
Exception
e
)
{
log
.
error
(
"CONSUMER MSG EX_HANDLE ==> [{}]:{}"
,
this
.
getRedisStreamKey
(),
message
,
e
);
try
{
if
(
type
.
equals
(
1
)
||
type
.
equals
(
2
))
{
objs
.
add
(
new
Object
[]{
skuId
,
xlsPath
,
type
,
2
,
LocalDateTime
.
now
()});
aBoolean
=
baseDao
.
batchSql
(
SQL_INSERT_GOODS_BUY_ROSTER_LOG
,
objs
);
}
}
catch
(
Exception
ignored
)
{
}
}
return
aBoolean
;
}
protected
abstract
String
getRedisStreamKey
();
protected
abstract
String
getRedisStreamGroup
();
}
\ No newline at end of file
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-base/src/main/java/com/liquidnet/service/consumer/base/receiver/ConsumerGoblinXlsRdsReceiver.java
0 → 100644
View file @
98c887bf
package
com
.
liquidnet
.
service
.
consumer
.
base
.
receiver
;
import
com.liquidnet.service.base.constant.MQConst
;
import
org.springframework.stereotype.Component
;
@Component
public
class
ConsumerGoblinXlsRdsReceiver
extends
AbstractXlsRedisReceiver
{
@Override
protected
String
getRedisStreamKey
()
{
return
MQConst
.
GoblinQueue
.
GOBLIN_XLS_OPERA
.
getKey
();
}
@Override
protected
String
getRedisStreamGroup
()
{
return
MQConst
.
GoblinQueue
.
GOBLIN_XLS_OPERA
.
getGroup
();
}
}
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-base/src/main/java/com/liquidnet/service/consumer/base/receiver/ConsumerKylinTransferOverTimeRdsReceiver.java
0 → 100644
View file @
98c887bf
package
com
.
liquidnet
.
service
.
consumer
.
base
.
receiver
;
import
com.liquidnet.service.base.constant.MQConst
;
import
org.springframework.stereotype.Component
;
@Component
public
class
ConsumerKylinTransferOverTimeRdsReceiver
extends
AbstractOptOrderTransferOverTimeRedisReceiver
{
@Override
protected
String
getRedisStreamKey
()
{
return
MQConst
.
KylinQueue
.
SQL_TRANSFER_OVERTIME
.
getKey
();
}
@Override
protected
String
getRedisStreamGroup
()
{
return
MQConst
.
KylinQueue
.
SQL_TRANSFER_OVERTIME
.
getGroup
();
}
}
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-kylin/src/main/java/com/liquidnet/service/consumer/kylin/receiver/AbstractOptOrderTransferOverTimeRedisReceiver.java
View file @
98c887bf
package
com
.
liquidnet
.
service
.
consumer
.
kylin
.
receiver
;
import
com.liquidnet.common.cache.redis.util.RedisUtil
;
import
com.liquidnet.commons.lang.util.CollectionUtil
;
import
com.liquidnet.commons.lang.util.JsonUtils
;
import
com.liquidnet.service.base.OrderCloseMapping
;
import
com.liquidnet.service.consumer.kylin.utils.KylinUtils
;
import
com.liquidnet.service.kylin.constant.KylinRedisConst
;
import
com.liquidnet.service.kylin.dto.vo.mongo.KylinOrderTicketVo
;
import
lombok.extern.slf4j.Slf4j
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.data.redis.connection.stream.MapRecord
;
import
org.springframework.data.redis.connection.stream.StreamRecords
;
import
org.springframework.data.redis.core.StringRedisTemplate
;
import
org.springframework.data.redis.stream.StreamListener
;
import
java.util.HashMap
;
@Slf4j
public
abstract
class
AbstractOptOrderTransferOverTimeRedisReceiver
implements
StreamListener
<
String
,
MapRecord
<
String
,
String
,
String
>>
{
@Autowired
StringRedisTemplate
stringRedisTemplate
;
@Autowired
private
RedisUtil
redisUtil
;
@Autowired
private
KylinUtils
kylinUtils
;
@Override
public
void
onMessage
(
MapRecord
<
String
,
String
,
String
>
message
)
{
String
redisStreamKey
=
this
.
getRedisStreamKey
();
log
.
debug
(
"CONSUMER MSG[streamKey:{},messageId:{},stream:{},body:{}]"
,
redisStreamKey
,
message
.
getId
(),
message
.
getStream
(),
message
.
getValue
());
boolean
result
=
this
.
consumerMessageHandler
(
message
.
getValue
().
get
(
"message"
));
log
.
info
(
"CONSUMER MSG RESULT:{} ==> [{}]MESSAGE_ID:{}"
,
result
,
redisStreamKey
,
message
.
getId
());
try
{
stringRedisTemplate
.
opsForStream
().
acknowledge
(
getRedisStreamGroup
(),
message
);
}
catch
(
Exception
e
)
{
log
.
error
(
"#CONSUMER MSG EX_ACK ==> [{}]RESULT:{},MESSAGE:{}"
,
redisStreamKey
,
result
,
message
.
getValue
(),
e
);
}
try
{
stringRedisTemplate
.
opsForStream
().
delete
(
redisStreamKey
,
message
.
getId
());
}
catch
(
Exception
e
)
{
log
.
error
(
"#CONSUMER MSG EX_DEL ==> [{}]RESULT:{},MESSAGE:{}"
,
redisStreamKey
,
result
,
message
.
getValue
(),
e
);
}
}
private
boolean
consumerMessageHandler
(
String
msg
)
{
boolean
aBoolean
=
false
;
try
{
OrderCloseMapping
.
orderCloseMessage
mqMessage
=
JsonUtils
.
fromJson
(
msg
,
OrderCloseMapping
.
orderCloseMessage
.
class
);
if
(
mqMessage
==
null
)
{
}
else
{
for
(
int
x
=
0
;
x
<
mqMessage
.
getOrderTicketIds
().
size
();
x
++)
{
String
orderTicketId
=
mqMessage
.
getOrderTicketIds
().
get
(
x
);
KylinOrderTicketVo
vo
=
kylinUtils
.
getOrderTicketVo
(
orderTicketId
);
String
uid
=
vo
.
getTransferUid
();
vo
.
setTransferUid
(
""
);
vo
.
setTransferStatus
(
0
);
redisUtil
.
del
(
KylinRedisConst
.
ORDER_TRANSFER
.
concat
(
uid
));
redisUtil
.
set
(
"kylin:order:id:"
+
orderTicketId
,
vo
);
kylinUtils
.
resetOrderListVo
(
vo
.
getUserId
(),
2
,
orderTicketId
,
vo
);
}
}
aBoolean
=
true
;
}
catch
(
Exception
e
)
{
log
.
error
(
"CONSUMER MSG EX_HANDLE ==> [{}]:{}"
,
this
.
getRedisStreamKey
(),
msg
,
e
);
}
finally
{
if
(!
aBoolean
)
{
HashMap
<
String
,
String
>
map
=
CollectionUtil
.
mapStringString
();
map
.
put
(
"message"
,
msg
);
stringRedisTemplate
.
opsForStream
().
add
(
StreamRecords
.
mapBacked
(
map
).
withStreamKey
(
this
.
getRedisStreamKey
()));
}
}
return
aBoolean
;
}
protected
abstract
String
getRedisStreamKey
();
protected
abstract
String
getRedisStreamGroup
();
}
//
package com.liquidnet.service.consumer.kylin.receiver;
//
//
import com.liquidnet.common.cache.redis.util.RedisUtil;
//
import com.liquidnet.commons.lang.util.CollectionUtil;
//
import com.liquidnet.commons.lang.util.JsonUtils;
//
import com.liquidnet.service.base.OrderCloseMapping;
//
import com.liquidnet.service.consumer.kylin.utils.KylinUtils;
//
import com.liquidnet.service.kylin.constant.KylinRedisConst;
//
import com.liquidnet.service.kylin.dto.vo.mongo.KylinOrderTicketVo;
//
import lombok.extern.slf4j.Slf4j;
//
import org.springframework.beans.factory.annotation.Autowired;
//
import org.springframework.data.redis.connection.stream.MapRecord;
//
import org.springframework.data.redis.connection.stream.StreamRecords;
//
import org.springframework.data.redis.core.StringRedisTemplate;
//
import org.springframework.data.redis.stream.StreamListener;
//
//
import java.util.HashMap;
//
//
@Slf4j
//
public abstract class AbstractOptOrderTransferOverTimeRedisReceiver implements StreamListener<String, MapRecord<String, String, String>> {
//
@Autowired
//
StringRedisTemplate stringRedisTemplate;
//
@Autowired
//
private RedisUtil redisUtil;
//
@Autowired
//
private KylinUtils kylinUtils;
//
//
@Override
//
public void onMessage(MapRecord<String, String, String> message) {
//
String redisStreamKey = this.getRedisStreamKey();
//
log.debug("CONSUMER MSG[streamKey:{},messageId:{},stream:{},body:{}]", redisStreamKey, message.getId(), message.getStream(), message.getValue());
//
boolean result = this.consumerMessageHandler(message.getValue().get("message"));
//
log.info("CONSUMER MSG RESULT:{} ==> [{}]MESSAGE_ID:{}", result, redisStreamKey, message.getId());
//
//
try {
//
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
//
} catch (Exception e) {
//
log.error("#CONSUMER MSG EX_ACK ==> [{}]RESULT:{},MESSAGE:{}", redisStreamKey, result, message.getValue(), e);
//
}
//
try {
//
stringRedisTemplate.opsForStream().delete(redisStreamKey, message.getId());
//
} catch (Exception e) {
//
log.error("#CONSUMER MSG EX_DEL ==> [{}]RESULT:{},MESSAGE:{}", redisStreamKey, result, message.getValue(), e);
//
}
//
}
//
//
private boolean consumerMessageHandler(String msg) {
//
boolean aBoolean = false;
//
try {
//
OrderCloseMapping.orderCloseMessage mqMessage = JsonUtils.fromJson(msg, OrderCloseMapping.orderCloseMessage.class);
//
if (mqMessage == null) {
//
//
} else {
//
for (int x = 0; x < mqMessage.getOrderTicketIds().size(); x++) {
//
String orderTicketId = mqMessage.getOrderTicketIds().get(x);
//
KylinOrderTicketVo vo = kylinUtils.getOrderTicketVo(orderTicketId);
//
String uid = vo.getTransferUid();
//
vo.setTransferUid("");
//
vo.setTransferStatus(0);
//
redisUtil.del(KylinRedisConst.ORDER_TRANSFER.concat(uid));
//
redisUtil.set("kylin:order:id:" + orderTicketId, vo);
//
kylinUtils.resetOrderListVo(vo.getUserId(), 2, orderTicketId, vo);
//
}
//
}
//
aBoolean = true;
//
} catch (Exception e) {
//
log.error("CONSUMER MSG EX_HANDLE ==> [{}]:{}", this.getRedisStreamKey(), msg, e);
//
} finally {
//
if (!aBoolean) {
//
HashMap<String, String> map = CollectionUtil.mapStringString();
//
map.put("message", msg);
//
stringRedisTemplate.opsForStream().add(StreamRecords.mapBacked(map).withStreamKey(this.getRedisStreamKey()));
//
}
//
}
//
return aBoolean;
//
}
//
//
protected abstract String getRedisStreamKey();
//
//
protected abstract String getRedisStreamGroup();
//
}
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-kylin/src/main/java/com/liquidnet/service/consumer/kylin/receiver/AbstractXlsRedisReceiver.java
View file @
98c887bf
This diff is collapsed.
Click to expand it.
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-kylin/src/main/java/com/liquidnet/service/consumer/kylin/receiver/ConsumerGoblinXlsRdsReceiver.java
View file @
98c887bf
package
com
.
liquidnet
.
service
.
consumer
.
kylin
.
receiver
;
import
com.liquidnet.service.base.constant.MQConst
;
import
org.springframework.stereotype.Component
;
@Component
public
class
ConsumerGoblinXlsRdsReceiver
extends
AbstractXlsRedisReceiver
{
@Override
protected
String
getRedisStreamKey
()
{
return
MQConst
.
GoblinQueue
.
GOBLIN_XLS_OPERA
.
getKey
();
}
@Override
protected
String
getRedisStreamGroup
()
{
return
MQConst
.
GoblinQueue
.
GOBLIN_XLS_OPERA
.
getGroup
();
}
}
//
package com.liquidnet.service.consumer.kylin.receiver;
//
//
import com.liquidnet.service.base.constant.MQConst;
//
import org.springframework.stereotype.Component;
//
//
@Component
//
public class ConsumerGoblinXlsRdsReceiver extends AbstractXlsRedisReceiver {
//
@Override
//
protected String getRedisStreamKey() {
//
return MQConst.GoblinQueue.GOBLIN_XLS_OPERA.getKey();
//
}
//
//
@Override
//
protected String getRedisStreamGroup() {
//
return MQConst.GoblinQueue.GOBLIN_XLS_OPERA.getGroup();
//
}
//
}
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-kylin/src/main/java/com/liquidnet/service/consumer/kylin/receiver/ConsumerKylinTransferOverTimeRdsReceiver.java
View file @
98c887bf
package
com
.
liquidnet
.
service
.
consumer
.
kylin
.
receiver
;
import
com.liquidnet.service.base.constant.MQConst
;
import
org.springframework.stereotype.Component
;
@Component
public
class
ConsumerKylinTransferOverTimeRdsReceiver
extends
AbstractOptOrderTransferOverTimeRedisReceiver
{
@Override
protected
String
getRedisStreamKey
()
{
return
MQConst
.
KylinQueue
.
SQL_TRANSFER_OVERTIME
.
getKey
();
}
@Override
protected
String
getRedisStreamGroup
()
{
return
MQConst
.
KylinQueue
.
SQL_TRANSFER_OVERTIME
.
getGroup
();
}
}
//
package com.liquidnet.service.consumer.kylin.receiver;
//
//
import com.liquidnet.service.base.constant.MQConst;
//
import org.springframework.stereotype.Component;
//
//
@Component
//
public class ConsumerKylinTransferOverTimeRdsReceiver extends AbstractOptOrderTransferOverTimeRedisReceiver {
//
@Override
//
protected String getRedisStreamKey() {
//
return MQConst.KylinQueue.SQL_TRANSFER_OVERTIME.getKey();
//
}
//
//
@Override
//
protected String getRedisStreamGroup() {
//
return MQConst.KylinQueue.SQL_TRANSFER_OVERTIME.getGroup();
//
}
//
}
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-slowly/src/main/java/com/liquidnet/service/consumer/slowly/config/ConsumerGoblinXlsRedisStreamConfig.java
View file @
98c887bf
package
com
.
liquidnet
.
service
.
consumer
.
slowly
.
config
;
import
com.liquidnet.common.cache.redis.config.RedisStreamConfig
;
import
com.liquidnet.service.base.constant.MQConst
;
import
com.liquidnet.service.consumer.slowly.receiver.ConsumerGoblinXlsRdsReceiver
;
import
lombok.var
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.context.annotation.Bean
;
import
org.springframework.context.annotation.Configuration
;
import
org.springframework.data.redis.connection.RedisConnectionFactory
;
import
org.springframework.data.redis.connection.stream.Consumer
;
import
org.springframework.data.redis.connection.stream.MapRecord
;
import
org.springframework.data.redis.connection.stream.ReadOffset
;
import
org.springframework.data.redis.connection.stream.StreamOffset
;
import
org.springframework.data.redis.stream.StreamMessageListenerContainer
;
import
org.springframework.data.redis.stream.Subscription
;
@Configuration
public
class
ConsumerGoblinXlsRedisStreamConfig
extends
RedisStreamConfig
{
@Autowired
ConsumerGoblinXlsRdsReceiver
consumerGoblinXlsRdsReceiver
;
/**
* 缺票登记
*
* @param listenerContainer
* @param t
* @return
*/
private
Subscription
receiveGoblinXls
(
StreamMessageListenerContainer
<
String
,
MapRecord
<
String
,
String
,
String
>>
listenerContainer
,
int
t
)
{
return
listenerContainer
.
receiveAutoAck
(
Consumer
.
from
(
MQConst
.
GoblinQueue
.
GOBLIN_XLS_OPERA
.
getGroup
(),
getConsumerName
(
MQConst
.
GoblinQueue
.
GOBLIN_XLS_OPERA
.
name
()
+
t
)),
StreamOffset
.
create
(
MQConst
.
GoblinQueue
.
GOBLIN_XLS_OPERA
.
getKey
(),
ReadOffset
.
lastConsumed
()),
consumerGoblinXlsRdsReceiver
);
}
/* —————————————————————————— | —————————————————————————— | —————————————————————————— */
/* -------------------------------------------------------- | 缺票登记 */
@Bean
public
Subscription
subscriptionGoblinXls0
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveGoblinXls
(
listenerContainer
,
0
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionGoblinXls1
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveGoblinXls
(
listenerContainer
,
1
);
listenerContainer
.
start
();
return
subscription
;
}
//package com.liquidnet.service.consumer.slowly.config;
//
//import com.liquidnet.common.cache.redis.config.RedisStreamConfig;
//import com.liquidnet.service.base.constant.MQConst;
//import com.liquidnet.service.consumer.slowly.receiver.ConsumerGoblinXlsRdsReceiver;
//import lombok.var;
//import org.springframework.beans.factory.annotation.Autowired;
//import org.springframework.context.annotation.Bean;
//import org.springframework.context.annotation.Configuration;
//import org.springframework.data.redis.connection.RedisConnectionFactory;
//import org.springframework.data.redis.connection.stream.Consumer;
//import org.springframework.data.redis.connection.stream.MapRecord;
//import org.springframework.data.redis.connection.stream.ReadOffset;
//import org.springframework.data.redis.connection.stream.StreamOffset;
//import org.springframework.data.redis.stream.StreamMessageListenerContainer;
//import org.springframework.data.redis.stream.Subscription;
//
//@Configuration
//public class ConsumerGoblinXlsRedisStreamConfig extends RedisStreamConfig {
// @Autowired
// ConsumerGoblinXlsRdsReceiver consumerGoblinXlsRdsReceiver;
//
// /**
// * 缺票登记
// *
// * @param listenerContainer
// * @param t
// * @return
// */
// private Subscription receiveGoblinXls(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) {
// return listenerContainer.receiveAutoAck(Consumer.from(MQConst.GoblinQueue.GOBLIN_XLS_OPERA.getGroup(), getConsumerName(MQConst.GoblinQueue.GOBLIN_XLS_OPERA.name() + t)),
// StreamOffset.create(MQConst.GoblinQueue.GOBLIN_XLS_OPERA.getKey(), ReadOffset.lastConsumed()), consumerGoblinXlsRdsReceiver);
// }
//
// /* —————————————————————————— | —————————————————————————— | —————————————————————————— */
//
// /* -------------------------------------------------------- | 缺票登记 */
//
// @Bean
// public Subscription subscriptionGoblinXls
2
(RedisConnectionFactory factory) {
// public Subscription subscriptionGoblinXls
0
(RedisConnectionFactory factory) {
// var listenerContainer = this.buildStreamMessageListenerContainer(factory);
// var subscription = receiveGoblinXls(listenerContainer,
2
);
// var subscription = receiveGoblinXls(listenerContainer,
0
);
// listenerContainer.start();
// return subscription;
// }
/* -------------------------------------------------------- | */
}
//
// @Bean
// public Subscription subscriptionGoblinXls1(RedisConnectionFactory factory) {
// var listenerContainer = this.buildStreamMessageListenerContainer(factory);
// var subscription = receiveGoblinXls(listenerContainer, 1);
// listenerContainer.start();
// return subscription;
// }
////
//// @Bean
//// public Subscription subscriptionGoblinXls2(RedisConnectionFactory factory) {
//// var listenerContainer = this.buildStreamMessageListenerContainer(factory);
//// var subscription = receiveGoblinXls(listenerContainer, 2);
//// listenerContainer.start();
//// return subscription;
//// }
//
// /* -------------------------------------------------------- | */
//}
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-slowly/src/main/java/com/liquidnet/service/consumer/slowly/config/ConsumerKylinOptTransferOverTimeRedisStreamConfig.java
View file @
98c887bf
package
com
.
liquidnet
.
service
.
consumer
.
slowly
.
config
;
import
com.liquidnet.common.cache.redis.config.RedisStreamConfig
;
import
com.liquidnet.service.consumer.slowly.receiver.ConsumerKylinTransferOverTimeRdsReceiver
;
import
lombok.var
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.context.annotation.Bean
;
import
org.springframework.context.annotation.Configuration
;
import
org.springframework.data.redis.connection.RedisConnectionFactory
;
import
org.springframework.data.redis.connection.stream.Consumer
;
import
org.springframework.data.redis.connection.stream.MapRecord
;
import
org.springframework.data.redis.connection.stream.ReadOffset
;
import
org.springframework.data.redis.connection.stream.StreamOffset
;
import
org.springframework.data.redis.stream.StreamMessageListenerContainer
;
import
org.springframework.data.redis.stream.Subscription
;
import
static
com
.
liquidnet
.
service
.
base
.
constant
.
MQConst
.
KylinQueue
.
SQL_TRANSFER_OVERTIME
;
@Configuration
public
class
ConsumerKylinOptTransferOverTimeRedisStreamConfig
extends
RedisStreamConfig
{
@Autowired
ConsumerKylinTransferOverTimeRdsReceiver
consumerKylinTransferOverTimeRdsReceiver
;
/**
* 验票更新
*
* @param listenerContainer
* @param t
* @return
*/
private
Subscription
receiveTransferOverTimeResult
(
StreamMessageListenerContainer
<
String
,
MapRecord
<
String
,
String
,
String
>>
listenerContainer
,
int
t
)
{
return
listenerContainer
.
receiveAutoAck
(
Consumer
.
from
(
SQL_TRANSFER_OVERTIME
.
getGroup
(),
getConsumerName
(
SQL_TRANSFER_OVERTIME
.
name
()
+
t
)),
StreamOffset
.
create
(
SQL_TRANSFER_OVERTIME
.
getKey
(),
ReadOffset
.
lastConsumed
()),
consumerKylinTransferOverTimeRdsReceiver
);
}
/* —————————————————————————— | —————————————————————————— | —————————————————————————— */
/* -------------------------------------------------------- | 验票更新 */
@Bean
public
Subscription
subscriptionReceiveTransferOverTime0
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveTransferOverTimeResult
(
listenerContainer
,
0
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionReceiveTransferOverTime1
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveTransferOverTimeResult
(
listenerContainer
,
1
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionReceiveTransferOverTime2
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveTransferOverTimeResult
(
listenerContainer
,
2
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionReceiveTransferOverTime3
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveTransferOverTimeResult
(
listenerContainer
,
3
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionReceiveTransferOverTime4
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveTransferOverTimeResult
(
listenerContainer
,
4
);
listenerContainer
.
start
();
return
subscription
;
}
/* -------------------------------------------------------- | */
}
//
package com.liquidnet.service.consumer.slowly.config;
//
//
import com.liquidnet.common.cache.redis.config.RedisStreamConfig;
//
import com.liquidnet.service.consumer.slowly.receiver.ConsumerKylinTransferOverTimeRdsReceiver;
//
import lombok.var;
//
import org.springframework.beans.factory.annotation.Autowired;
//
import org.springframework.context.annotation.Bean;
//
import org.springframework.context.annotation.Configuration;
//
import org.springframework.data.redis.connection.RedisConnectionFactory;
//
import org.springframework.data.redis.connection.stream.Consumer;
//
import org.springframework.data.redis.connection.stream.MapRecord;
//
import org.springframework.data.redis.connection.stream.ReadOffset;
//
import org.springframework.data.redis.connection.stream.StreamOffset;
//
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
//
import org.springframework.data.redis.stream.Subscription;
//
//
import static com.liquidnet.service.base.constant.MQConst.KylinQueue.SQL_TRANSFER_OVERTIME;
//
//
@Configuration
//
public class ConsumerKylinOptTransferOverTimeRedisStreamConfig extends RedisStreamConfig {
//
@Autowired
//
ConsumerKylinTransferOverTimeRdsReceiver consumerKylinTransferOverTimeRdsReceiver;
//
//
/**
//
* 验票更新
//
*
//
* @param listenerContainer
//
* @param t
//
* @return
//
*/
//
private Subscription receiveTransferOverTimeResult(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) {
//
return listenerContainer.receiveAutoAck(Consumer.from(SQL_TRANSFER_OVERTIME.getGroup(), getConsumerName(SQL_TRANSFER_OVERTIME.name() + t)),
//
StreamOffset.create(SQL_TRANSFER_OVERTIME.getKey(), ReadOffset.lastConsumed()), consumerKylinTransferOverTimeRdsReceiver);
//
}
//
//
/* —————————————————————————— | —————————————————————————— | —————————————————————————— */
//
//
/* -------------------------------------------------------- | 验票更新 */
//
//
@Bean
//
public Subscription subscriptionReceiveTransferOverTime0(RedisConnectionFactory factory) {
//
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
//
var subscription = receiveTransferOverTimeResult(listenerContainer, 0);
//
listenerContainer.start();
//
return subscription;
//
}
//
//
@Bean
//
public Subscription subscriptionReceiveTransferOverTime1(RedisConnectionFactory factory) {
//
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
//
var subscription = receiveTransferOverTimeResult(listenerContainer, 1);
//
listenerContainer.start();
//
return subscription;
//
}
//
//
@Bean
//
public Subscription subscriptionReceiveTransferOverTime2(RedisConnectionFactory factory) {
//
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
//
var subscription = receiveTransferOverTimeResult(listenerContainer, 2);
//
listenerContainer.start();
//
return subscription;
//
}
//
//
@Bean
//
public Subscription subscriptionReceiveTransferOverTime3(RedisConnectionFactory factory) {
//
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
//
var subscription = receiveTransferOverTimeResult(listenerContainer, 3);
//
listenerContainer.start();
//
return subscription;
//
}
//
//
@Bean
//
public Subscription subscriptionReceiveTransferOverTime4(RedisConnectionFactory factory) {
//
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
//
var subscription = receiveTransferOverTimeResult(listenerContainer, 4);
//
listenerContainer.start();
//
return subscription;
//
}
//
//
/* -------------------------------------------------------- | */
//
}
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-slowly/src/main/java/com/liquidnet/service/consumer/slowly/receiver/AbstractOptOrderTransferOverTimeRedisReceiver.java
View file @
98c887bf
package
com
.
liquidnet
.
service
.
consumer
.
slowly
.
receiver
;
import
com.liquidnet.common.cache.redis.util.RedisUtil
;
import
com.liquidnet.commons.lang.util.CollectionUtil
;
import
com.liquidnet.commons.lang.util.JsonUtils
;
import
com.liquidnet.service.base.OrderCloseMapping
;
import
com.liquidnet.service.consumer.slowly.utils.KylinUtils
;
import
com.liquidnet.service.kylin.constant.KylinRedisConst
;
import
com.liquidnet.service.kylin.dto.vo.mongo.KylinOrderTicketVo
;
import
lombok.extern.slf4j.Slf4j
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.data.redis.connection.stream.MapRecord
;
import
org.springframework.data.redis.connection.stream.StreamRecords
;
import
org.springframework.data.redis.core.StringRedisTemplate
;
import
org.springframework.data.redis.stream.StreamListener
;
import
java.util.HashMap
;
@Slf4j
public
abstract
class
AbstractOptOrderTransferOverTimeRedisReceiver
implements
StreamListener
<
String
,
MapRecord
<
String
,
String
,
String
>>
{
@Autowired
StringRedisTemplate
stringRedisTemplate
;
@Autowired
private
RedisUtil
redisUtil
;
@Autowired
private
KylinUtils
kylinUtils
;
@Override
public
void
onMessage
(
MapRecord
<
String
,
String
,
String
>
message
)
{
String
redisStreamKey
=
this
.
getRedisStreamKey
();
log
.
debug
(
"CONSUMER MSG[streamKey:{},messageId:{},stream:{},body:{}]"
,
redisStreamKey
,
message
.
getId
(),
message
.
getStream
(),
message
.
getValue
());
boolean
result
=
this
.
consumerMessageHandler
(
message
.
getValue
().
get
(
"message"
));
log
.
info
(
"CONSUMER MSG RESULT:{} ==> [{}]MESSAGE_ID:{}"
,
result
,
redisStreamKey
,
message
.
getId
());
try
{
stringRedisTemplate
.
opsForStream
().
acknowledge
(
getRedisStreamGroup
(),
message
);
}
catch
(
Exception
e
)
{
log
.
error
(
"#CONSUMER MSG EX_ACK ==> [{}]RESULT:{},MESSAGE:{}"
,
redisStreamKey
,
result
,
message
.
getValue
(),
e
);
}
try
{
stringRedisTemplate
.
opsForStream
().
delete
(
redisStreamKey
,
message
.
getId
());
}
catch
(
Exception
e
)
{
log
.
error
(
"#CONSUMER MSG EX_DEL ==> [{}]RESULT:{},MESSAGE:{}"
,
redisStreamKey
,
result
,
message
.
getValue
(),
e
);
}
}
private
boolean
consumerMessageHandler
(
String
msg
)
{
boolean
aBoolean
=
false
;
try
{
OrderCloseMapping
.
orderCloseMessage
mqMessage
=
JsonUtils
.
fromJson
(
msg
,
OrderCloseMapping
.
orderCloseMessage
.
class
);
if
(
mqMessage
==
null
)
{
}
else
{
for
(
int
x
=
0
;
x
<
mqMessage
.
getOrderTicketIds
().
size
();
x
++)
{
String
orderTicketId
=
mqMessage
.
getOrderTicketIds
().
get
(
x
);
KylinOrderTicketVo
vo
=
kylinUtils
.
getOrderTicketVo
(
orderTicketId
);
String
uid
=
vo
.
getTransferUid
();
vo
.
setTransferUid
(
""
);
vo
.
setTransferStatus
(
0
);
redisUtil
.
del
(
KylinRedisConst
.
ORDER_TRANSFER
.
concat
(
uid
));
redisUtil
.
set
(
"kylin:order:id:"
+
orderTicketId
,
vo
);
kylinUtils
.
resetOrderListVo
(
vo
.
getUserId
(),
2
,
orderTicketId
,
vo
);
}
}
aBoolean
=
true
;
}
catch
(
Exception
e
)
{
log
.
error
(
"CONSUMER MSG EX_HANDLE ==> [{}]:{}"
,
this
.
getRedisStreamKey
(),
msg
,
e
);
}
finally
{
if
(!
aBoolean
)
{
HashMap
<
String
,
String
>
map
=
CollectionUtil
.
mapStringString
();
map
.
put
(
"message"
,
msg
);
stringRedisTemplate
.
opsForStream
().
add
(
StreamRecords
.
mapBacked
(
map
).
withStreamKey
(
this
.
getRedisStreamKey
()));
}
}
return
aBoolean
;
}
protected
abstract
String
getRedisStreamKey
();
protected
abstract
String
getRedisStreamGroup
();
}
//
package com.liquidnet.service.consumer.slowly.receiver;
//
//
import com.liquidnet.common.cache.redis.util.RedisUtil;
//
import com.liquidnet.commons.lang.util.CollectionUtil;
//
import com.liquidnet.commons.lang.util.JsonUtils;
//
import com.liquidnet.service.base.OrderCloseMapping;
//
import com.liquidnet.service.consumer.slowly.utils.KylinUtils;
//
import com.liquidnet.service.kylin.constant.KylinRedisConst;
//
import com.liquidnet.service.kylin.dto.vo.mongo.KylinOrderTicketVo;
//
import lombok.extern.slf4j.Slf4j;
//
import org.springframework.beans.factory.annotation.Autowired;
//
import org.springframework.data.redis.connection.stream.MapRecord;
//
import org.springframework.data.redis.connection.stream.StreamRecords;
//
import org.springframework.data.redis.core.StringRedisTemplate;
//
import org.springframework.data.redis.stream.StreamListener;
//
//
import java.util.HashMap;
//
//
@Slf4j
//
public abstract class AbstractOptOrderTransferOverTimeRedisReceiver implements StreamListener<String, MapRecord<String, String, String>> {
//
@Autowired
//
StringRedisTemplate stringRedisTemplate;
//
@Autowired
//
private RedisUtil redisUtil;
//
@Autowired
//
private KylinUtils kylinUtils;
//
//
@Override
//
public void onMessage(MapRecord<String, String, String> message) {
//
String redisStreamKey = this.getRedisStreamKey();
//
log.debug("CONSUMER MSG[streamKey:{},messageId:{},stream:{},body:{}]", redisStreamKey, message.getId(), message.getStream(), message.getValue());
//
boolean result = this.consumerMessageHandler(message.getValue().get("message"));
//
log.info("CONSUMER MSG RESULT:{} ==> [{}]MESSAGE_ID:{}", result, redisStreamKey, message.getId());
//
//
try {
//
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
//
} catch (Exception e) {
//
log.error("#CONSUMER MSG EX_ACK ==> [{}]RESULT:{},MESSAGE:{}", redisStreamKey, result, message.getValue(), e);
//
}
//
try {
//
stringRedisTemplate.opsForStream().delete(redisStreamKey, message.getId());
//
} catch (Exception e) {
//
log.error("#CONSUMER MSG EX_DEL ==> [{}]RESULT:{},MESSAGE:{}", redisStreamKey, result, message.getValue(), e);
//
}
//
}
//
//
private boolean consumerMessageHandler(String msg) {
//
boolean aBoolean = false;
//
try {
//
OrderCloseMapping.orderCloseMessage mqMessage = JsonUtils.fromJson(msg, OrderCloseMapping.orderCloseMessage.class);
//
if (mqMessage == null) {
//
//
} else {
//
for (int x = 0; x < mqMessage.getOrderTicketIds().size(); x++) {
//
String orderTicketId = mqMessage.getOrderTicketIds().get(x);
//
KylinOrderTicketVo vo = kylinUtils.getOrderTicketVo(orderTicketId);
//
String uid = vo.getTransferUid();
//
vo.setTransferUid("");
//
vo.setTransferStatus(0);
//
redisUtil.del(KylinRedisConst.ORDER_TRANSFER.concat(uid));
//
redisUtil.set("kylin:order:id:" + orderTicketId, vo);
//
kylinUtils.resetOrderListVo(vo.getUserId(), 2, orderTicketId, vo);
//
}
//
}
//
aBoolean = true;
//
} catch (Exception e) {
//
log.error("CONSUMER MSG EX_HANDLE ==> [{}]:{}", this.getRedisStreamKey(), msg, e);
//
} finally {
//
if (!aBoolean) {
//
HashMap<String, String> map = CollectionUtil.mapStringString();
//
map.put("message", msg);
//
stringRedisTemplate.opsForStream().add(StreamRecords.mapBacked(map).withStreamKey(this.getRedisStreamKey()));
//
}
//
}
//
return aBoolean;
//
}
//
//
protected abstract String getRedisStreamKey();
//
//
protected abstract String getRedisStreamGroup();
//
}
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-slowly/src/main/java/com/liquidnet/service/consumer/slowly/receiver/AbstractXlsRedisReceiver.java
View file @
98c887bf
This diff is collapsed.
Click to expand it.
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-slowly/src/main/java/com/liquidnet/service/consumer/slowly/receiver/ConsumerGoblinXlsRdsReceiver.java
View file @
98c887bf
package
com
.
liquidnet
.
service
.
consumer
.
slowly
.
receiver
;
import
com.liquidnet.service.base.constant.MQConst
;
import
org.springframework.stereotype.Component
;
@Component
public
class
ConsumerGoblinXlsRdsReceiver
extends
AbstractXlsRedisReceiver
{
@Override
protected
String
getRedisStreamKey
()
{
return
MQConst
.
GoblinQueue
.
GOBLIN_XLS_OPERA
.
getKey
();
}
@Override
protected
String
getRedisStreamGroup
()
{
return
MQConst
.
GoblinQueue
.
GOBLIN_XLS_OPERA
.
getGroup
();
}
}
//
package com.liquidnet.service.consumer.slowly.receiver;
//
//
import com.liquidnet.service.base.constant.MQConst;
//
import org.springframework.stereotype.Component;
//
//
@Component
//
public class ConsumerGoblinXlsRdsReceiver extends AbstractXlsRedisReceiver {
//
@Override
//
protected String getRedisStreamKey() {
//
return MQConst.GoblinQueue.GOBLIN_XLS_OPERA.getKey();
//
}
//
//
@Override
//
protected String getRedisStreamGroup() {
//
return MQConst.GoblinQueue.GOBLIN_XLS_OPERA.getGroup();
//
}
//
}
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-slowly/src/main/java/com/liquidnet/service/consumer/slowly/receiver/ConsumerKylinTransferOverTimeRdsReceiver.java
View file @
98c887bf
package
com
.
liquidnet
.
service
.
consumer
.
slowly
.
receiver
;
import
com.liquidnet.service.base.constant.MQConst
;
import
org.springframework.stereotype.Component
;
@Component
public
class
ConsumerKylinTransferOverTimeRdsReceiver
extends
AbstractOptOrderTransferOverTimeRedisReceiver
{
@Override
protected
String
getRedisStreamKey
()
{
return
MQConst
.
KylinQueue
.
SQL_TRANSFER_OVERTIME
.
getKey
();
}
@Override
protected
String
getRedisStreamGroup
()
{
return
MQConst
.
KylinQueue
.
SQL_TRANSFER_OVERTIME
.
getGroup
();
}
}
//
package com.liquidnet.service.consumer.slowly.receiver;
//
//
import com.liquidnet.service.base.constant.MQConst;
//
import org.springframework.stereotype.Component;
//
//
@Component
//
public class ConsumerKylinTransferOverTimeRdsReceiver extends AbstractOptOrderTransferOverTimeRedisReceiver {
//
@Override
//
protected String getRedisStreamKey() {
//
return MQConst.KylinQueue.SQL_TRANSFER_OVERTIME.getKey();
//
}
//
//
@Override
//
protected String getRedisStreamGroup() {
//
return MQConst.KylinQueue.SQL_TRANSFER_OVERTIME.getGroup();
//
}
//
}
liquidnet-bus-service/liquidnet-service-kylin/liquidnet-service-kylin-impl/src/main/java/com/liquidnet/service/kylin/controller/InnerController.java
View file @
98c887bf
...
...
@@ -14,10 +14,7 @@ import lombok.extern.slf4j.Slf4j;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.beans.factory.annotation.Value
;
import
org.springframework.util.MultiValueMap
;
import
org.springframework.web.bind.annotation.GetMapping
;
import
org.springframework.web.bind.annotation.PathVariable
;
import
org.springframework.web.bind.annotation.RequestMapping
;
import
org.springframework.web.bind.annotation.RestController
;
import
org.springframework.web.bind.annotation.*
;
import
javax.validation.constraints.NotBlank
;
import
java.time.LocalDateTime
;
...
...
@@ -66,7 +63,7 @@ public class InnerController {
}
}
@
Ge
tMapping
(
"consumer/orderTransferOverTime"
)
@
Pos
tMapping
(
"consumer/orderTransferOverTime"
)
@ApiOperation
(
"转赠过期消费"
)
public
ResponseDto
<
Boolean
>
orderTransferOverTime
(
String
orderId
)
{
return
kylinConsmerService
.
orderTransferOverTime
(
orderId
);
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment