记得上下班打卡 | git大法好,push需谨慎
Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
L
liquidnet-bus-v1
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
董敬伟
liquidnet-bus-v1
Commits
c074aba1
Commit
c074aba1
authored
Jun 15, 2022
by
胡佳晨
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
修改 candy消费
parent
eab556a3
Changes
20
Expand all
Hide whitespace changes
Inline
Side-by-side
Showing
20 changed files
with
936 additions
and
512 deletions
+936
-512
CandyConsumerServiceImpl.java
.../service/candy/service/impl/CandyConsumerServiceImpl.java
+11
-2
ConsumerCandyCouponBackRedisStreamConfig.java
...onfig/candy/ConsumerCandyCouponBackRedisStreamConfig.java
+65
-0
ConsumerCandyCouponOrderBackRedisStreamConfig.java
.../candy/ConsumerCandyCouponOrderBackRedisStreamConfig.java
+65
-0
ConsumerCandyCouponReceiveRedisStreamConfig.java
...ig/candy/ConsumerCandyCouponReceiveRedisStreamConfig.java
+65
-0
ConsumerCandyCouponUseRedisStreamConfig.java
...config/candy/ConsumerCandyCouponUseRedisStreamConfig.java
+66
-0
AbstractCouponOrderBackRedisReceiver.java
.../receiver/candy/AbstractCouponOrderBackRedisReceiver.java
+83
-0
ConsumerCandyCouponBackRdsReceiver.java
...se/receiver/candy/ConsumerCandyCouponBackRdsReceiver.java
+18
-0
ConsumerCandyCouponOrderBackRdsReceiver.java
...ceiver/candy/ConsumerCandyCouponOrderBackRdsReceiver.java
+17
-0
ConsumerCandyCouponReceiveRdsReceiver.java
...receiver/candy/ConsumerCandyCouponReceiveRdsReceiver.java
+18
-0
ConsumerCandyCouponUseRdsReceiver.java
...ase/receiver/candy/ConsumerCandyCouponUseRdsReceiver.java
+18
-0
ConsumerCandyCouponBackRedisStreamConfig.java
...andy/config/ConsumerCandyCouponBackRedisStreamConfig.java
+65
-65
ConsumerCandyCouponOrderBackRedisStreamConfig.java
...config/ConsumerCandyCouponOrderBackRedisStreamConfig.java
+65
-65
ConsumerCandyCouponReceiveRedisStreamConfig.java
...y/config/ConsumerCandyCouponReceiveRedisStreamConfig.java
+65
-65
ConsumerCandyCouponUseRedisStreamConfig.java
...candy/config/ConsumerCandyCouponUseRedisStreamConfig.java
+66
-66
AbstractCouponOrderBackRedisReceiver.java
.../candy/receiver/AbstractCouponOrderBackRedisReceiver.java
+119
-119
AbstractSqlRedisReceiver.java
...ice/consumer/candy/receiver/AbstractSqlRedisReceiver.java
+62
-62
ConsumerCandyCouponBackRdsReceiver.java
...er/candy/receiver/ConsumerCandyCouponBackRdsReceiver.java
+17
-17
ConsumerCandyCouponOrderBackRdsReceiver.java
...ndy/receiver/ConsumerCandyCouponOrderBackRdsReceiver.java
+17
-17
ConsumerCandyCouponReceiveRdsReceiver.java
...candy/receiver/ConsumerCandyCouponReceiveRdsReceiver.java
+17
-17
ConsumerCandyCouponUseRdsReceiver.java
...mer/candy/receiver/ConsumerCandyCouponUseRdsReceiver.java
+17
-17
No files found.
liquidnet-bus-service/liquidnet-service-candy/liquidnet-service-candy-impl/src/main/java/com/liquidnet/service/candy/service/impl/CandyConsumerServiceImpl.java
View file @
c074aba1
package
com
.
liquidnet
.
service
.
candy
.
service
.
impl
;
import
com.liquidnet.common.cache.redis.util.RedisUtil
;
import
com.liquidnet.service.base.OrderCloseMapping
;
import
com.liquidnet.service.base.ResponseDto
;
import
com.liquidnet.service.base.constant.MQConst
;
import
com.liquidnet.service.candy.constant.CandyRedisConst
;
import
com.liquidnet.service.candy.dto.CandyUserCouponBasicDto
;
import
com.liquidnet.service.candy.service.ICandyConsumerService
;
import
com.liquidnet.service.candy.util.QueueUtils
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.stereotype.Service
;
import
java.util.ArrayList
;
import
java.util.LinkedList
;
import
java.util.List
;
/**
...
...
@@ -24,13 +28,18 @@ public class CandyConsumerServiceImpl implements ICandyConsumerService {
@Autowired
RedisUtil
redisUtil
;
@Autowired
QueueUtils
queueUtils
;
@Override
public
ResponseDto
<
Boolean
>
couponOrderBackRedis
(
String
uid
,
ArrayList
<
String
>
uCouponIdList
)
{
try
{
List
<
CandyUserCouponBasicDto
>
dtoList
=
backCoupon
(
getCouponByUid
(
uid
),
uCouponIdList
);
setCouponByUid
(
uid
,
dtoList
);
}
catch
(
Exception
e
){
}
catch
(
Exception
e
)
{
LinkedList
<
String
>
mqList
=
new
LinkedList
<>();
mqList
.
add
(
uCouponIdList
+
","
+
uid
);
queueUtils
.
sendMsgByRedis
(
MQConst
.
CandyQueue
.
COUPON_ORDER_BACK
.
getKey
(),
OrderCloseMapping
.
get
(
mqList
));
return
ResponseDto
.
failure
();
}
return
ResponseDto
.
success
();
...
...
@@ -49,7 +58,7 @@ public class CandyConsumerServiceImpl implements ICandyConsumerService {
}
// 覆盖 CandyUserCouponBasicDto数组 根据用户id
private
void
setCouponByUid
(
String
uid
,
List
<
CandyUserCouponBasicDto
>
dtoList
)
{
private
void
setCouponByUid
(
String
uid
,
List
<
CandyUserCouponBasicDto
>
dtoList
)
{
String
redisKey
=
CandyRedisConst
.
BASIC_USER_COUPON
.
concat
(
uid
);
redisUtil
.
set
(
redisKey
,
dtoList
);
}
...
...
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-base/src/main/java/com/liquidnet/service/consumer/base/config/candy/ConsumerCandyCouponBackRedisStreamConfig.java
0 → 100644
View file @
c074aba1
package
com
.
liquidnet
.
service
.
consumer
.
base
.
config
.
candy
;
import
com.liquidnet.common.cache.redis.config.RedisStreamConfig
;
import
com.liquidnet.service.consumer.base.receiver.candy.ConsumerCandyCouponBackRdsReceiver
;
import
lombok.var
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.context.annotation.Bean
;
import
org.springframework.context.annotation.Configuration
;
import
org.springframework.data.redis.connection.RedisConnectionFactory
;
import
org.springframework.data.redis.connection.stream.Consumer
;
import
org.springframework.data.redis.connection.stream.MapRecord
;
import
org.springframework.data.redis.connection.stream.ReadOffset
;
import
org.springframework.data.redis.connection.stream.StreamOffset
;
import
org.springframework.data.redis.stream.StreamMessageListenerContainer
;
import
org.springframework.data.redis.stream.Subscription
;
import
static
com
.
liquidnet
.
service
.
base
.
constant
.
MQConst
.
CandyQueue
.
COUPON_BACK
;
@Configuration
public
class
ConsumerCandyCouponBackRedisStreamConfig
extends
RedisStreamConfig
{
@Autowired
ConsumerCandyCouponBackRdsReceiver
consumerCandyCouponBackRdsReceiver
;
/**
* 缺票登记
*
* @param listenerContainer
* @param t
* @return
*/
private
Subscription
receiveSqlCandyCouponBack
(
StreamMessageListenerContainer
<
String
,
MapRecord
<
String
,
String
,
String
>>
listenerContainer
,
int
t
)
{
return
listenerContainer
.
receiveAutoAck
(
Consumer
.
from
(
COUPON_BACK
.
getGroup
(),
getConsumerName
(
COUPON_BACK
.
name
()
+
t
)),
StreamOffset
.
create
(
COUPON_BACK
.
getKey
(),
ReadOffset
.
lastConsumed
()),
consumerCandyCouponBackRdsReceiver
);
}
/* —————————————————————————— | —————————————————————————— | —————————————————————————— */
/* -------------------------------------------------------- | 缺票登记 */
@Bean
public
Subscription
subscriptionSqlCandyCouponBack0
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlCandyCouponBack
(
listenerContainer
,
0
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionSqlCandyCouponBack1
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlCandyCouponBack
(
listenerContainer
,
1
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionSqlCandyCouponBack2
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlCandyCouponBack
(
listenerContainer
,
2
);
listenerContainer
.
start
();
return
subscription
;
}
/* -------------------------------------------------------- | */
}
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-base/src/main/java/com/liquidnet/service/consumer/base/config/candy/ConsumerCandyCouponOrderBackRedisStreamConfig.java
0 → 100644
View file @
c074aba1
package
com
.
liquidnet
.
service
.
consumer
.
base
.
config
.
candy
;
import
com.liquidnet.common.cache.redis.config.RedisStreamConfig
;
import
com.liquidnet.service.consumer.base.receiver.candy.ConsumerCandyCouponOrderBackRdsReceiver
;
import
lombok.var
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.context.annotation.Bean
;
import
org.springframework.context.annotation.Configuration
;
import
org.springframework.data.redis.connection.RedisConnectionFactory
;
import
org.springframework.data.redis.connection.stream.Consumer
;
import
org.springframework.data.redis.connection.stream.MapRecord
;
import
org.springframework.data.redis.connection.stream.ReadOffset
;
import
org.springframework.data.redis.connection.stream.StreamOffset
;
import
org.springframework.data.redis.stream.StreamMessageListenerContainer
;
import
org.springframework.data.redis.stream.Subscription
;
import
static
com
.
liquidnet
.
service
.
base
.
constant
.
MQConst
.
CandyQueue
.
COUPON_ORDER_BACK
;
@Configuration
public
class
ConsumerCandyCouponOrderBackRedisStreamConfig
extends
RedisStreamConfig
{
@Autowired
ConsumerCandyCouponOrderBackRdsReceiver
consumerCandyCouponOrderBackRdsReceiver
;
/**
* 缺票登记
*
* @param listenerContainer
* @param t
* @return
*/
private
Subscription
receiveSqlCandyCouponBack
(
StreamMessageListenerContainer
<
String
,
MapRecord
<
String
,
String
,
String
>>
listenerContainer
,
int
t
)
{
return
listenerContainer
.
receiveAutoAck
(
Consumer
.
from
(
COUPON_ORDER_BACK
.
getGroup
(),
getConsumerName
(
COUPON_ORDER_BACK
.
name
()
+
t
)),
StreamOffset
.
create
(
COUPON_ORDER_BACK
.
getKey
(),
ReadOffset
.
lastConsumed
()),
consumerCandyCouponOrderBackRdsReceiver
);
}
/* —————————————————————————— | —————————————————————————— | —————————————————————————— */
/* -------------------------------------------------------- | 缺票登记 */
@Bean
public
Subscription
subscriptionSqlCandyCouponOrderBack0
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlCandyCouponBack
(
listenerContainer
,
0
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionSqlCandyCouponOrderBack1
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlCandyCouponBack
(
listenerContainer
,
1
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionSqlCandyCouponOrderBack2
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlCandyCouponBack
(
listenerContainer
,
2
);
listenerContainer
.
start
();
return
subscription
;
}
/* -------------------------------------------------------- | */
}
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-base/src/main/java/com/liquidnet/service/consumer/base/config/candy/ConsumerCandyCouponReceiveRedisStreamConfig.java
0 → 100644
View file @
c074aba1
package
com
.
liquidnet
.
service
.
consumer
.
base
.
config
.
candy
;
import
com.liquidnet.common.cache.redis.config.RedisStreamConfig
;
import
com.liquidnet.service.consumer.base.receiver.candy.ConsumerCandyCouponReceiveRdsReceiver
;
import
lombok.var
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.context.annotation.Bean
;
import
org.springframework.context.annotation.Configuration
;
import
org.springframework.data.redis.connection.RedisConnectionFactory
;
import
org.springframework.data.redis.connection.stream.Consumer
;
import
org.springframework.data.redis.connection.stream.MapRecord
;
import
org.springframework.data.redis.connection.stream.ReadOffset
;
import
org.springframework.data.redis.connection.stream.StreamOffset
;
import
org.springframework.data.redis.stream.StreamMessageListenerContainer
;
import
org.springframework.data.redis.stream.Subscription
;
import
static
com
.
liquidnet
.
service
.
base
.
constant
.
MQConst
.
CandyQueue
.
COUPON_RECEIVE
;
@Configuration
public
class
ConsumerCandyCouponReceiveRedisStreamConfig
extends
RedisStreamConfig
{
@Autowired
ConsumerCandyCouponReceiveRdsReceiver
consumerCandyCouponReceiveRdsReceiver
;
/**
* 缺票登记
*
* @param listenerContainer
* @param t
* @return
*/
private
Subscription
receiveSqlCandyCouponReceive
(
StreamMessageListenerContainer
<
String
,
MapRecord
<
String
,
String
,
String
>>
listenerContainer
,
int
t
)
{
return
listenerContainer
.
receiveAutoAck
(
Consumer
.
from
(
COUPON_RECEIVE
.
getGroup
(),
getConsumerName
(
COUPON_RECEIVE
.
name
()
+
t
)),
StreamOffset
.
create
(
COUPON_RECEIVE
.
getKey
(),
ReadOffset
.
lastConsumed
()),
consumerCandyCouponReceiveRdsReceiver
);
}
/* —————————————————————————— | —————————————————————————— | —————————————————————————— */
/* -------------------------------------------------------- | 缺票登记 */
@Bean
public
Subscription
subscriptionSqlCandyCouponReceive0
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlCandyCouponReceive
(
listenerContainer
,
0
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionSqlCandyCouponReceive1
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlCandyCouponReceive
(
listenerContainer
,
1
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionSqlCandyCouponReceive2
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlCandyCouponReceive
(
listenerContainer
,
2
);
listenerContainer
.
start
();
return
subscription
;
}
/* -------------------------------------------------------- | */
}
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-base/src/main/java/com/liquidnet/service/consumer/base/config/candy/ConsumerCandyCouponUseRedisStreamConfig.java
0 → 100644
View file @
c074aba1
package
com
.
liquidnet
.
service
.
consumer
.
base
.
config
.
candy
;
import
com.liquidnet.common.cache.redis.config.RedisStreamConfig
;
import
com.liquidnet.service.consumer.base.receiver.candy.ConsumerCandyCouponUseRdsReceiver
;
import
lombok.var
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.context.annotation.Bean
;
import
org.springframework.context.annotation.Configuration
;
import
org.springframework.data.redis.connection.RedisConnectionFactory
;
import
org.springframework.data.redis.connection.stream.Consumer
;
import
org.springframework.data.redis.connection.stream.MapRecord
;
import
org.springframework.data.redis.connection.stream.ReadOffset
;
import
org.springframework.data.redis.connection.stream.StreamOffset
;
import
org.springframework.data.redis.stream.StreamMessageListenerContainer
;
import
org.springframework.data.redis.stream.Subscription
;
import
static
com
.
liquidnet
.
service
.
base
.
constant
.
MQConst
.
CandyQueue
.
COUPON_USE
;
@Configuration
public
class
ConsumerCandyCouponUseRedisStreamConfig
extends
RedisStreamConfig
{
@Autowired
ConsumerCandyCouponUseRdsReceiver
consumerCandyCouponUseRdsReceiver
;
/**
* 缺票登记
*
* @param listenerContainer
* @param t
* @return
*/
private
Subscription
receiveSqlCandyCouponUse
(
StreamMessageListenerContainer
<
String
,
MapRecord
<
String
,
String
,
String
>>
listenerContainer
,
int
t
)
{
return
listenerContainer
.
receiveAutoAck
(
Consumer
.
from
(
COUPON_USE
.
getGroup
(),
getConsumerName
(
COUPON_USE
.
name
()
+
t
)),
StreamOffset
.
create
(
COUPON_USE
.
getKey
(),
ReadOffset
.
lastConsumed
()),
consumerCandyCouponUseRdsReceiver
);
}
/* —————————————————————————— | —————————————————————————— | —————————————————————————— */
/* -------------------------------------------------------- | 缺票登记 */
@Bean
public
Subscription
subscriptionSqlCandyCouponUse0
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlCandyCouponUse
(
listenerContainer
,
0
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionSqlCandyCouponUse1
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlCandyCouponUse
(
listenerContainer
,
1
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionSqlCandyCouponUse2
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlCandyCouponUse
(
listenerContainer
,
2
);
listenerContainer
.
start
();
return
subscription
;
}
/* -------------------------------------------------------- | */
}
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-base/src/main/java/com/liquidnet/service/consumer/base/receiver/candy/AbstractCouponOrderBackRedisReceiver.java
0 → 100644
View file @
c074aba1
package
com
.
liquidnet
.
service
.
consumer
.
base
.
receiver
.
candy
;
import
com.fasterxml.jackson.core.type.TypeReference
;
import
com.liquidnet.commons.lang.util.CollectionUtil
;
import
com.liquidnet.commons.lang.util.HttpUtil
;
import
com.liquidnet.commons.lang.util.JsonUtils
;
import
com.liquidnet.service.base.OrderCloseMapping
;
import
com.liquidnet.service.base.ResponseDto
;
import
lombok.extern.slf4j.Slf4j
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.beans.factory.annotation.Value
;
import
org.springframework.data.redis.connection.stream.MapRecord
;
import
org.springframework.data.redis.connection.stream.StreamRecords
;
import
org.springframework.data.redis.core.StringRedisTemplate
;
import
org.springframework.data.redis.stream.StreamListener
;
import
org.springframework.util.LinkedMultiValueMap
;
import
org.springframework.util.MultiValueMap
;
import
java.util.HashMap
;
@Slf4j
public
abstract
class
AbstractCouponOrderBackRedisReceiver
implements
StreamListener
<
String
,
MapRecord
<
String
,
String
,
String
>>
{
@Autowired
StringRedisTemplate
stringRedisTemplate
;
@Value
(
"${liquidnet.service.candy.url}"
)
private
String
candyUrl
;
@Override
public
void
onMessage
(
MapRecord
<
String
,
String
,
String
>
message
)
{
String
redisStreamKey
=
this
.
getRedisStreamKey
();
log
.
debug
(
"CONSUMER MSG[streamKey:{},messageId:{},stream:{},body:{}]"
,
redisStreamKey
,
message
.
getId
(),
message
.
getStream
(),
message
.
getValue
());
boolean
result
=
this
.
consumerMessageHandler
(
message
.
getValue
().
get
(
"message"
));
log
.
info
(
"CONSUMER MSG RESULT:{} ==> [{}]MESSAGE_ID:{}"
,
result
,
redisStreamKey
,
message
.
getId
());
try
{
stringRedisTemplate
.
opsForStream
().
acknowledge
(
getRedisStreamGroup
(),
message
);
}
catch
(
Exception
e
)
{
log
.
error
(
"#CONSUMER MSG EX_ACK ==> [{}]RESULT:{},MESSAGE:{}"
,
redisStreamKey
,
result
,
message
.
getValue
(),
e
);
}
try
{
stringRedisTemplate
.
opsForStream
().
delete
(
redisStreamKey
,
message
.
getId
());
}
catch
(
Exception
e
)
{
log
.
error
(
"#CONSUMER MSG EX_DEL ==> [{}]RESULT:{},MESSAGE:{}"
,
redisStreamKey
,
result
,
message
.
getValue
(),
e
);
}
}
private
boolean
consumerMessageHandler
(
String
msg
)
{
boolean
aBoolean
=
false
;
try
{
OrderCloseMapping
.
orderCloseMessage
mqMessage
=
JsonUtils
.
fromJson
(
msg
,
OrderCloseMapping
.
orderCloseMessage
.
class
);
//这里是 uCouponId和uid 懒。所以没新写方法
for
(
int
x
=
0
;
x
<
mqMessage
.
getOrderTicketIds
().
size
();
x
++)
{
String
t
=
mqMessage
.
getOrderTicketIds
().
get
(
x
);
String
uCouponId
=
t
.
split
(
","
)[
0
];
String
uid
=
t
.
split
(
","
)[
1
];
MultiValueMap
<
String
,
String
>
params
=
new
LinkedMultiValueMap
();
params
.
add
(
"uid"
,
uid
);
params
.
add
(
"uCouponId"
,
uCouponId
);
MultiValueMap
<
String
,
String
>
headers
=
CollectionUtil
.
linkedMultiValueMapStringString
();
headers
.
add
(
"Accept"
,
"application/json;charset=UTF-8"
);
String
returnData
=
HttpUtil
.
post
(
candyUrl
+
"/goblin/inner/candy-consumer/couponOrderBackRedis"
,
params
,
headers
);
ResponseDto
<
Boolean
>
rsp
=
JsonUtils
.
fromJson
(
returnData
,
new
TypeReference
<
ResponseDto
<
Boolean
>>()
{});
}
aBoolean
=
true
;
}
catch
(
Exception
e
)
{
log
.
error
(
"CONSUMER MSG EX_HANDLE ==> [{}]:{}"
,
this
.
getRedisStreamKey
(),
msg
,
e
);
}
finally
{
if
(!
aBoolean
)
{
HashMap
<
String
,
String
>
map
=
CollectionUtil
.
mapStringString
();
map
.
put
(
"message"
,
msg
);
stringRedisTemplate
.
opsForStream
().
add
(
StreamRecords
.
mapBacked
(
map
).
withStreamKey
(
this
.
getRedisStreamKey
()));
}
}
return
aBoolean
;
}
protected
abstract
String
getRedisStreamKey
();
protected
abstract
String
getRedisStreamGroup
();
}
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-base/src/main/java/com/liquidnet/service/consumer/base/receiver/candy/ConsumerCandyCouponBackRdsReceiver.java
0 → 100644
View file @
c074aba1
package
com
.
liquidnet
.
service
.
consumer
.
base
.
receiver
.
candy
;
import
com.liquidnet.service.base.constant.MQConst
;
import
com.liquidnet.service.consumer.base.receiver.AbstractSqlRedisReceiver
;
import
org.springframework.stereotype.Component
;
@Component
public
class
ConsumerCandyCouponBackRdsReceiver
extends
AbstractSqlRedisReceiver
{
@Override
protected
String
getRedisStreamKey
()
{
return
MQConst
.
CandyQueue
.
COUPON_BACK
.
getKey
();
}
@Override
protected
String
getRedisStreamGroup
()
{
return
MQConst
.
CandyQueue
.
COUPON_BACK
.
getGroup
();
}
}
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-base/src/main/java/com/liquidnet/service/consumer/base/receiver/candy/ConsumerCandyCouponOrderBackRdsReceiver.java
0 → 100644
View file @
c074aba1
package
com
.
liquidnet
.
service
.
consumer
.
base
.
receiver
.
candy
;
import
com.liquidnet.service.base.constant.MQConst
;
import
org.springframework.stereotype.Component
;
@Component
public
class
ConsumerCandyCouponOrderBackRdsReceiver
extends
AbstractCouponOrderBackRedisReceiver
{
@Override
protected
String
getRedisStreamKey
()
{
return
MQConst
.
CandyQueue
.
COUPON_ORDER_BACK
.
getKey
();
}
@Override
protected
String
getRedisStreamGroup
()
{
return
MQConst
.
CandyQueue
.
COUPON_ORDER_BACK
.
getGroup
();
}
}
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-base/src/main/java/com/liquidnet/service/consumer/base/receiver/candy/ConsumerCandyCouponReceiveRdsReceiver.java
0 → 100644
View file @
c074aba1
package
com
.
liquidnet
.
service
.
consumer
.
base
.
receiver
.
candy
;
import
com.liquidnet.service.base.constant.MQConst
;
import
com.liquidnet.service.consumer.base.receiver.AbstractSqlRedisReceiver
;
import
org.springframework.stereotype.Component
;
@Component
public
class
ConsumerCandyCouponReceiveRdsReceiver
extends
AbstractSqlRedisReceiver
{
@Override
protected
String
getRedisStreamKey
()
{
return
MQConst
.
CandyQueue
.
COUPON_RECEIVE
.
getKey
();
}
@Override
protected
String
getRedisStreamGroup
()
{
return
MQConst
.
CandyQueue
.
COUPON_RECEIVE
.
getGroup
();
}
}
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-base/src/main/java/com/liquidnet/service/consumer/base/receiver/candy/ConsumerCandyCouponUseRdsReceiver.java
0 → 100644
View file @
c074aba1
package
com
.
liquidnet
.
service
.
consumer
.
base
.
receiver
.
candy
;
import
com.liquidnet.service.base.constant.MQConst
;
import
com.liquidnet.service.consumer.base.receiver.AbstractSqlRedisReceiver
;
import
org.springframework.stereotype.Component
;
@Component
public
class
ConsumerCandyCouponUseRdsReceiver
extends
AbstractSqlRedisReceiver
{
@Override
protected
String
getRedisStreamKey
()
{
return
MQConst
.
CandyQueue
.
COUPON_USE
.
getKey
();
}
@Override
protected
String
getRedisStreamGroup
()
{
return
MQConst
.
CandyQueue
.
COUPON_USE
.
getGroup
();
}
}
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-candy/src/main/java/com/liquidnet/service/consumer/candy/config/ConsumerCandyCouponBackRedisStreamConfig.java
View file @
c074aba1
package
com
.
liquidnet
.
service
.
consumer
.
candy
.
config
;
import
com.liquidnet.common.cache.redis.config.RedisStreamConfig
;
import
com.liquidnet.service.consumer.candy.receiver.ConsumerCandyCouponBackRdsReceiver
;
import
lombok.var
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.context.annotation.Bean
;
import
org.springframework.context.annotation.Configuration
;
import
org.springframework.data.redis.connection.RedisConnectionFactory
;
import
org.springframework.data.redis.connection.stream.Consumer
;
import
org.springframework.data.redis.connection.stream.MapRecord
;
import
org.springframework.data.redis.connection.stream.ReadOffset
;
import
org.springframework.data.redis.connection.stream.StreamOffset
;
import
org.springframework.data.redis.stream.StreamMessageListenerContainer
;
import
org.springframework.data.redis.stream.Subscription
;
import
static
com
.
liquidnet
.
service
.
base
.
constant
.
MQConst
.
CandyQueue
.
COUPON_BACK
;
@Configuration
public
class
ConsumerCandyCouponBackRedisStreamConfig
extends
RedisStreamConfig
{
@Autowired
ConsumerCandyCouponBackRdsReceiver
consumerCandyCouponBackRdsReceiver
;
/**
* 缺票登记
*
* @param listenerContainer
* @param t
* @return
*/
private
Subscription
receiveSqlCandyCouponBack
(
StreamMessageListenerContainer
<
String
,
MapRecord
<
String
,
String
,
String
>>
listenerContainer
,
int
t
)
{
return
listenerContainer
.
receiveAutoAck
(
Consumer
.
from
(
COUPON_BACK
.
getGroup
(),
getConsumerName
(
COUPON_BACK
.
name
()
+
t
)),
StreamOffset
.
create
(
COUPON_BACK
.
getKey
(),
ReadOffset
.
lastConsumed
()),
consumerCandyCouponBackRdsReceiver
);
}
/* —————————————————————————— | —————————————————————————— | —————————————————————————— */
/* -------------------------------------------------------- | 缺票登记 */
@Bean
public
Subscription
subscriptionSqlCandyCouponBack0
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlCandyCouponBack
(
listenerContainer
,
0
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionSqlCandyCouponBack1
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlCandyCouponBack
(
listenerContainer
,
1
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionSqlCandyCouponBack2
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlCandyCouponBack
(
listenerContainer
,
2
);
listenerContainer
.
start
();
return
subscription
;
}
/* -------------------------------------------------------- | */
}
//
package com.liquidnet.service.consumer.candy.config;
//
//
import com.liquidnet.common.cache.redis.config.RedisStreamConfig;
//
import com.liquidnet.service.consumer.candy.receiver.ConsumerCandyCouponBackRdsReceiver;
//
import lombok.var;
//
import org.springframework.beans.factory.annotation.Autowired;
//
import org.springframework.context.annotation.Bean;
//
import org.springframework.context.annotation.Configuration;
//
import org.springframework.data.redis.connection.RedisConnectionFactory;
//
import org.springframework.data.redis.connection.stream.Consumer;
//
import org.springframework.data.redis.connection.stream.MapRecord;
//
import org.springframework.data.redis.connection.stream.ReadOffset;
//
import org.springframework.data.redis.connection.stream.StreamOffset;
//
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
//
import org.springframework.data.redis.stream.Subscription;
//
//
import static com.liquidnet.service.base.constant.MQConst.CandyQueue.COUPON_BACK;
//
//
@Configuration
//
public class ConsumerCandyCouponBackRedisStreamConfig extends RedisStreamConfig {
//
@Autowired
//
ConsumerCandyCouponBackRdsReceiver consumerCandyCouponBackRdsReceiver;
//
//
/**
//
* 缺票登记
//
*
//
* @param listenerContainer
//
* @param t
//
* @return
//
*/
//
private Subscription receiveSqlCandyCouponBack(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) {
//
return listenerContainer.receiveAutoAck(Consumer.from(COUPON_BACK.getGroup(), getConsumerName(COUPON_BACK.name() + t)),
//
StreamOffset.create(COUPON_BACK.getKey(), ReadOffset.lastConsumed()), consumerCandyCouponBackRdsReceiver);
//
}
//
//
/* —————————————————————————— | —————————————————————————— | —————————————————————————— */
//
//
/* -------------------------------------------------------- | 缺票登记 */
//
//
@Bean
//
public Subscription subscriptionSqlCandyCouponBack0(RedisConnectionFactory factory) {
//
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
//
var subscription = receiveSqlCandyCouponBack(listenerContainer, 0);
//
listenerContainer.start();
//
return subscription;
//
}
//
//
@Bean
//
public Subscription subscriptionSqlCandyCouponBack1(RedisConnectionFactory factory) {
//
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
//
var subscription = receiveSqlCandyCouponBack(listenerContainer, 1);
//
listenerContainer.start();
//
return subscription;
//
}
//
//
@Bean
//
public Subscription subscriptionSqlCandyCouponBack2(RedisConnectionFactory factory) {
//
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
//
var subscription = receiveSqlCandyCouponBack(listenerContainer, 2);
//
listenerContainer.start();
//
return subscription;
//
}
//
//
/* -------------------------------------------------------- | */
//
}
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-candy/src/main/java/com/liquidnet/service/consumer/candy/config/ConsumerCandyCouponOrderBackRedisStreamConfig.java
View file @
c074aba1
package
com
.
liquidnet
.
service
.
consumer
.
candy
.
config
;
import
com.liquidnet.common.cache.redis.config.RedisStreamConfig
;
import
com.liquidnet.service.consumer.candy.receiver.ConsumerCandyCouponOrderBackRdsReceiver
;
import
lombok.var
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.context.annotation.Bean
;
import
org.springframework.context.annotation.Configuration
;
import
org.springframework.data.redis.connection.RedisConnectionFactory
;
import
org.springframework.data.redis.connection.stream.Consumer
;
import
org.springframework.data.redis.connection.stream.MapRecord
;
import
org.springframework.data.redis.connection.stream.ReadOffset
;
import
org.springframework.data.redis.connection.stream.StreamOffset
;
import
org.springframework.data.redis.stream.StreamMessageListenerContainer
;
import
org.springframework.data.redis.stream.Subscription
;
import
static
com
.
liquidnet
.
service
.
base
.
constant
.
MQConst
.
CandyQueue
.
COUPON_ORDER_BACK
;
@Configuration
public
class
ConsumerCandyCouponOrderBackRedisStreamConfig
extends
RedisStreamConfig
{
@Autowired
ConsumerCandyCouponOrderBackRdsReceiver
consumerCandyCouponOrderBackRdsReceiver
;
/**
* 缺票登记
*
* @param listenerContainer
* @param t
* @return
*/
private
Subscription
receiveSqlCandyCouponBack
(
StreamMessageListenerContainer
<
String
,
MapRecord
<
String
,
String
,
String
>>
listenerContainer
,
int
t
)
{
return
listenerContainer
.
receiveAutoAck
(
Consumer
.
from
(
COUPON_ORDER_BACK
.
getGroup
(),
getConsumerName
(
COUPON_ORDER_BACK
.
name
()
+
t
)),
StreamOffset
.
create
(
COUPON_ORDER_BACK
.
getKey
(),
ReadOffset
.
lastConsumed
()),
consumerCandyCouponOrderBackRdsReceiver
);
}
/* —————————————————————————— | —————————————————————————— | —————————————————————————— */
/* -------------------------------------------------------- | 缺票登记 */
@Bean
public
Subscription
subscriptionSqlCandyCouponOrderBack0
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlCandyCouponBack
(
listenerContainer
,
0
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionSqlCandyCouponOrderBack1
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlCandyCouponBack
(
listenerContainer
,
1
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionSqlCandyCouponOrderBack2
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlCandyCouponBack
(
listenerContainer
,
2
);
listenerContainer
.
start
();
return
subscription
;
}
/* -------------------------------------------------------- | */
}
//
package com.liquidnet.service.consumer.candy.config;
//
//
import com.liquidnet.common.cache.redis.config.RedisStreamConfig;
//
import com.liquidnet.service.consumer.candy.receiver.ConsumerCandyCouponOrderBackRdsReceiver;
//
import lombok.var;
//
import org.springframework.beans.factory.annotation.Autowired;
//
import org.springframework.context.annotation.Bean;
//
import org.springframework.context.annotation.Configuration;
//
import org.springframework.data.redis.connection.RedisConnectionFactory;
//
import org.springframework.data.redis.connection.stream.Consumer;
//
import org.springframework.data.redis.connection.stream.MapRecord;
//
import org.springframework.data.redis.connection.stream.ReadOffset;
//
import org.springframework.data.redis.connection.stream.StreamOffset;
//
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
//
import org.springframework.data.redis.stream.Subscription;
//
//
import static com.liquidnet.service.base.constant.MQConst.CandyQueue.COUPON_ORDER_BACK;
//
//
@Configuration
//
public class ConsumerCandyCouponOrderBackRedisStreamConfig extends RedisStreamConfig {
//
@Autowired
//
ConsumerCandyCouponOrderBackRdsReceiver consumerCandyCouponOrderBackRdsReceiver;
//
//
/**
//
* 缺票登记
//
*
//
* @param listenerContainer
//
* @param t
//
* @return
//
*/
//
private Subscription receiveSqlCandyCouponBack(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) {
//
return listenerContainer.receiveAutoAck(Consumer.from(COUPON_ORDER_BACK.getGroup(), getConsumerName(COUPON_ORDER_BACK.name() + t)),
//
StreamOffset.create(COUPON_ORDER_BACK.getKey(), ReadOffset.lastConsumed()), consumerCandyCouponOrderBackRdsReceiver);
//
}
//
//
/* —————————————————————————— | —————————————————————————— | —————————————————————————— */
//
//
/* -------------------------------------------------------- | 缺票登记 */
//
//
@Bean
//
public Subscription subscriptionSqlCandyCouponOrderBack0(RedisConnectionFactory factory) {
//
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
//
var subscription = receiveSqlCandyCouponBack(listenerContainer, 0);
//
listenerContainer.start();
//
return subscription;
//
}
//
//
@Bean
//
public Subscription subscriptionSqlCandyCouponOrderBack1(RedisConnectionFactory factory) {
//
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
//
var subscription = receiveSqlCandyCouponBack(listenerContainer, 1);
//
listenerContainer.start();
//
return subscription;
//
}
//
//
@Bean
//
public Subscription subscriptionSqlCandyCouponOrderBack2(RedisConnectionFactory factory) {
//
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
//
var subscription = receiveSqlCandyCouponBack(listenerContainer, 2);
//
listenerContainer.start();
//
return subscription;
//
}
//
//
/* -------------------------------------------------------- | */
//
}
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-candy/src/main/java/com/liquidnet/service/consumer/candy/config/ConsumerCandyCouponReceiveRedisStreamConfig.java
View file @
c074aba1
package
com
.
liquidnet
.
service
.
consumer
.
candy
.
config
;
import
com.liquidnet.common.cache.redis.config.RedisStreamConfig
;
import
com.liquidnet.service.consumer.candy.receiver.ConsumerCandyCouponReceiveRdsReceiver
;
import
lombok.var
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.context.annotation.Bean
;
import
org.springframework.context.annotation.Configuration
;
import
org.springframework.data.redis.connection.RedisConnectionFactory
;
import
org.springframework.data.redis.connection.stream.Consumer
;
import
org.springframework.data.redis.connection.stream.MapRecord
;
import
org.springframework.data.redis.connection.stream.ReadOffset
;
import
org.springframework.data.redis.connection.stream.StreamOffset
;
import
org.springframework.data.redis.stream.StreamMessageListenerContainer
;
import
org.springframework.data.redis.stream.Subscription
;
import
static
com
.
liquidnet
.
service
.
base
.
constant
.
MQConst
.
CandyQueue
.
COUPON_RECEIVE
;
@Configuration
public
class
ConsumerCandyCouponReceiveRedisStreamConfig
extends
RedisStreamConfig
{
@Autowired
ConsumerCandyCouponReceiveRdsReceiver
consumerCandyCouponReceiveRdsReceiver
;
/**
* 缺票登记
*
* @param listenerContainer
* @param t
* @return
*/
private
Subscription
receiveSqlCandyCouponReceive
(
StreamMessageListenerContainer
<
String
,
MapRecord
<
String
,
String
,
String
>>
listenerContainer
,
int
t
)
{
return
listenerContainer
.
receiveAutoAck
(
Consumer
.
from
(
COUPON_RECEIVE
.
getGroup
(),
getConsumerName
(
COUPON_RECEIVE
.
name
()
+
t
)),
StreamOffset
.
create
(
COUPON_RECEIVE
.
getKey
(),
ReadOffset
.
lastConsumed
()),
consumerCandyCouponReceiveRdsReceiver
);
}
/* —————————————————————————— | —————————————————————————— | —————————————————————————— */
/* -------------------------------------------------------- | 缺票登记 */
@Bean
public
Subscription
subscriptionSqlCandyCouponReceive0
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlCandyCouponReceive
(
listenerContainer
,
0
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionSqlCandyCouponReceive1
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlCandyCouponReceive
(
listenerContainer
,
1
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionSqlCandyCouponReceive2
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlCandyCouponReceive
(
listenerContainer
,
2
);
listenerContainer
.
start
();
return
subscription
;
}
/* -------------------------------------------------------- | */
}
//
package com.liquidnet.service.consumer.candy.config;
//
//
import com.liquidnet.common.cache.redis.config.RedisStreamConfig;
//
import com.liquidnet.service.consumer.candy.receiver.ConsumerCandyCouponReceiveRdsReceiver;
//
import lombok.var;
//
import org.springframework.beans.factory.annotation.Autowired;
//
import org.springframework.context.annotation.Bean;
//
import org.springframework.context.annotation.Configuration;
//
import org.springframework.data.redis.connection.RedisConnectionFactory;
//
import org.springframework.data.redis.connection.stream.Consumer;
//
import org.springframework.data.redis.connection.stream.MapRecord;
//
import org.springframework.data.redis.connection.stream.ReadOffset;
//
import org.springframework.data.redis.connection.stream.StreamOffset;
//
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
//
import org.springframework.data.redis.stream.Subscription;
//
//
import static com.liquidnet.service.base.constant.MQConst.CandyQueue.COUPON_RECEIVE;
//
//
@Configuration
//
public class ConsumerCandyCouponReceiveRedisStreamConfig extends RedisStreamConfig {
//
@Autowired
//
ConsumerCandyCouponReceiveRdsReceiver consumerCandyCouponReceiveRdsReceiver;
//
//
/**
//
* 缺票登记
//
*
//
* @param listenerContainer
//
* @param t
//
* @return
//
*/
//
private Subscription receiveSqlCandyCouponReceive(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) {
//
return listenerContainer.receiveAutoAck(Consumer.from(COUPON_RECEIVE.getGroup(), getConsumerName(COUPON_RECEIVE.name() + t)),
//
StreamOffset.create(COUPON_RECEIVE.getKey(), ReadOffset.lastConsumed()), consumerCandyCouponReceiveRdsReceiver);
//
}
//
//
/* —————————————————————————— | —————————————————————————— | —————————————————————————— */
//
//
/* -------------------------------------------------------- | 缺票登记 */
//
//
@Bean
//
public Subscription subscriptionSqlCandyCouponReceive0(RedisConnectionFactory factory) {
//
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
//
var subscription = receiveSqlCandyCouponReceive(listenerContainer, 0);
//
listenerContainer.start();
//
return subscription;
//
}
//
//
@Bean
//
public Subscription subscriptionSqlCandyCouponReceive1(RedisConnectionFactory factory) {
//
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
//
var subscription = receiveSqlCandyCouponReceive(listenerContainer, 1);
//
listenerContainer.start();
//
return subscription;
//
}
//
//
@Bean
//
public Subscription subscriptionSqlCandyCouponReceive2(RedisConnectionFactory factory) {
//
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
//
var subscription = receiveSqlCandyCouponReceive(listenerContainer, 2);
//
listenerContainer.start();
//
return subscription;
//
}
//
//
/* -------------------------------------------------------- | */
//
}
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-candy/src/main/java/com/liquidnet/service/consumer/candy/config/ConsumerCandyCouponUseRedisStreamConfig.java
View file @
c074aba1
package
com
.
liquidnet
.
service
.
consumer
.
candy
.
config
;
import
com.liquidnet.common.cache.redis.config.RedisStreamConfig
;
import
com.liquidnet.service.consumer.candy.receiver.ConsumerCandyCouponUseRdsReceiver
;
import
lombok.var
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.context.annotation.Bean
;
import
org.springframework.context.annotation.Configuration
;
import
org.springframework.data.redis.connection.RedisConnectionFactory
;
import
org.springframework.data.redis.connection.stream.Consumer
;
import
org.springframework.data.redis.connection.stream.MapRecord
;
import
org.springframework.data.redis.connection.stream.ReadOffset
;
import
org.springframework.data.redis.connection.stream.StreamOffset
;
import
org.springframework.data.redis.stream.StreamMessageListenerContainer
;
import
org.springframework.data.redis.stream.Subscription
;
import
static
com
.
liquidnet
.
service
.
base
.
constant
.
MQConst
.
CandyQueue
.
COUPON_USE
;
@Configuration
public
class
ConsumerCandyCouponUseRedisStreamConfig
extends
RedisStreamConfig
{
@Autowired
ConsumerCandyCouponUseRdsReceiver
consumerCandyCouponUseRdsReceiver
;
/**
* 缺票登记
*
* @param listenerContainer
* @param t
* @return
*/
private
Subscription
receiveSqlCandyCouponUse
(
StreamMessageListenerContainer
<
String
,
MapRecord
<
String
,
String
,
String
>>
listenerContainer
,
int
t
)
{
return
listenerContainer
.
receiveAutoAck
(
Consumer
.
from
(
COUPON_USE
.
getGroup
(),
getConsumerName
(
COUPON_USE
.
name
()
+
t
)),
StreamOffset
.
create
(
COUPON_USE
.
getKey
(),
ReadOffset
.
lastConsumed
()),
consumerCandyCouponUseRdsReceiver
);
}
/* —————————————————————————— | —————————————————————————— | —————————————————————————— */
/* -------------------------------------------------------- | 缺票登记 */
@Bean
public
Subscription
subscriptionSqlCandyCouponUse0
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlCandyCouponUse
(
listenerContainer
,
0
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionSqlCandyCouponUse1
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlCandyCouponUse
(
listenerContainer
,
1
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionSqlCandyCouponUse2
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlCandyCouponUse
(
listenerContainer
,
2
);
listenerContainer
.
start
();
return
subscription
;
}
/* -------------------------------------------------------- | */
}
//
package com.liquidnet.service.consumer.candy.config;
//
//
import com.liquidnet.common.cache.redis.config.RedisStreamConfig;
//
import com.liquidnet.service.consumer.candy.receiver.ConsumerCandyCouponUseRdsReceiver;
//
import lombok.var;
//
import org.springframework.beans.factory.annotation.Autowired;
//
import org.springframework.context.annotation.Bean;
//
import org.springframework.context.annotation.Configuration;
//
import org.springframework.data.redis.connection.RedisConnectionFactory;
//
import org.springframework.data.redis.connection.stream.Consumer;
//
import org.springframework.data.redis.connection.stream.MapRecord;
//
import org.springframework.data.redis.connection.stream.ReadOffset;
//
import org.springframework.data.redis.connection.stream.StreamOffset;
//
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
//
import org.springframework.data.redis.stream.Subscription;
//
//
import static com.liquidnet.service.base.constant.MQConst.CandyQueue.COUPON_USE;
//
//
@Configuration
//
public class ConsumerCandyCouponUseRedisStreamConfig extends RedisStreamConfig {
//
@Autowired
//
ConsumerCandyCouponUseRdsReceiver consumerCandyCouponUseRdsReceiver;
//
//
/**
//
* 缺票登记
//
*
//
* @param listenerContainer
//
* @param t
//
* @return
//
*/
//
private Subscription receiveSqlCandyCouponUse(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) {
//
return listenerContainer.receiveAutoAck(Consumer.from(COUPON_USE.getGroup(), getConsumerName(COUPON_USE.name() + t)),
//
StreamOffset.create(COUPON_USE.getKey(), ReadOffset.lastConsumed()), consumerCandyCouponUseRdsReceiver
//
);
//
}
//
//
/* —————————————————————————— | —————————————————————————— | —————————————————————————— */
//
//
/* -------------------------------------------------------- | 缺票登记 */
//
//
@Bean
//
public Subscription subscriptionSqlCandyCouponUse0(RedisConnectionFactory factory) {
//
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
//
var subscription = receiveSqlCandyCouponUse(listenerContainer, 0);
//
listenerContainer.start();
//
return subscription;
//
}
//
//
@Bean
//
public Subscription subscriptionSqlCandyCouponUse1(RedisConnectionFactory factory) {
//
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
//
var subscription = receiveSqlCandyCouponUse(listenerContainer, 1);
//
listenerContainer.start();
//
return subscription;
//
}
//
//
@Bean
//
public Subscription subscriptionSqlCandyCouponUse2(RedisConnectionFactory factory) {
//
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
//
var subscription = receiveSqlCandyCouponUse(listenerContainer, 2);
//
listenerContainer.start();
//
return subscription;
//
}
//
//
/* -------------------------------------------------------- | */
//
}
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-candy/src/main/java/com/liquidnet/service/consumer/candy/receiver/AbstractCouponOrderBackRedisReceiver.java
View file @
c074aba1
This diff is collapsed.
Click to expand it.
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-candy/src/main/java/com/liquidnet/service/consumer/candy/receiver/AbstractSqlRedisReceiver.java
View file @
c074aba1
package
com
.
liquidnet
.
service
.
consumer
.
candy
.
receiver
;
import
com.liquidnet.commons.lang.util.CollectionUtil
;
import
com.liquidnet.commons.lang.util.JsonUtils
;
import
com.liquidnet.service.base.SqlMapping
;
import
com.liquidnet.service.consumer.candy.service.IBaseDao
;
import
lombok.extern.slf4j.Slf4j
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.data.redis.connection.stream.MapRecord
;
import
org.springframework.data.redis.connection.stream.StreamRecords
;
import
org.springframework.data.redis.core.StringRedisTemplate
;
import
org.springframework.data.redis.stream.StreamListener
;
import
java.util.HashMap
;
@Slf4j
public
abstract
class
AbstractSqlRedisReceiver
implements
StreamListener
<
String
,
MapRecord
<
String
,
String
,
String
>>
{
@Autowired
private
IBaseDao
baseDao
;
@Autowired
StringRedisTemplate
stringRedisTemplate
;
@Override
public
void
onMessage
(
MapRecord
<
String
,
String
,
String
>
message
)
{
String
redisStreamKey
=
this
.
getRedisStreamKey
();
log
.
debug
(
"CONSUMER MSG[streamKey:{},messageId:{},stream:{},body:{}]"
,
redisStreamKey
,
message
.
getId
(),
message
.
getStream
(),
message
.
getValue
());
boolean
result
=
this
.
consumerMessageHandler
(
message
.
getValue
().
get
(
"message"
));
log
.
info
(
"CONSUMER MSG RESULT:{} ==> [{}]MESSAGE_ID:{}"
,
result
,
redisStreamKey
,
message
.
getId
());
try
{
stringRedisTemplate
.
opsForStream
().
acknowledge
(
getRedisStreamGroup
(),
message
);
}
catch
(
Exception
e
)
{
log
.
error
(
"#CONSUMER MSG EX_ACK ==> [{}]RESULT:{},MESSAGE:{}"
,
redisStreamKey
,
result
,
message
.
getValue
(),
e
);
}
try
{
stringRedisTemplate
.
opsForStream
().
delete
(
redisStreamKey
,
message
.
getId
());
}
catch
(
Exception
e
)
{
log
.
error
(
"#CONSUMER MSG EX_DEL ==> [{}]RESULT:{},MESSAGE:{}"
,
redisStreamKey
,
result
,
message
.
getValue
(),
e
);
}
}
private
boolean
consumerMessageHandler
(
String
msg
)
{
boolean
aBoolean
=
false
;
try
{
SqlMapping
.
SqlMessage
sqlMessage
=
JsonUtils
.
fromJson
(
msg
,
SqlMapping
.
SqlMessage
.
class
);
aBoolean
=
null
==
sqlMessage
||
baseDao
.
batchSqls
(
sqlMessage
.
getSqls
(),
sqlMessage
.
getArgs
());
}
catch
(
Exception
e
)
{
log
.
error
(
"CONSUMER MSG EX_HANDLE ==> [{}]:{}"
,
this
.
getRedisStreamKey
(),
msg
,
e
);
}
finally
{
if
(!
aBoolean
)
{
HashMap
<
String
,
String
>
map
=
CollectionUtil
.
mapStringString
();
map
.
put
(
"message"
,
msg
);
stringRedisTemplate
.
opsForStream
().
add
(
StreamRecords
.
mapBacked
(
map
).
withStreamKey
(
this
.
getRedisStreamKey
()));
}
}
return
aBoolean
;
}
protected
abstract
String
getRedisStreamKey
();
protected
abstract
String
getRedisStreamGroup
();
}
\ No newline at end of file
//package com.liquidnet.service.consumer.candy.receiver;
//
//import com.liquidnet.commons.lang.util.CollectionUtil;
//import com.liquidnet.commons.lang.util.JsonUtils;
//import com.liquidnet.service.base.SqlMapping;
//import com.liquidnet.service.consumer.candy.service.IBaseDao;
//import lombok.extern.slf4j.Slf4j;
//import org.springframework.beans.factory.annotation.Autowired;
//import org.springframework.data.redis.connection.stream.MapRecord;
//import org.springframework.data.redis.connection.stream.StreamRecords;
//import org.springframework.data.redis.core.StringRedisTemplate;
//import org.springframework.data.redis.stream.StreamListener;
//
//import java.util.HashMap;
//
//@Slf4j
//public abstract class AbstractSqlRedisReceiver implements StreamListener<String, MapRecord<String, String, String>> {
// @Autowired
// private IBaseDao baseDao;
// @Autowired
// StringRedisTemplate stringRedisTemplate;
//
// @Override
// public void onMessage(MapRecord<String, String, String> message) {
// String redisStreamKey = this.getRedisStreamKey();
// log.debug("CONSUMER MSG[streamKey:{},messageId:{},stream:{},body:{}]", redisStreamKey, message.getId(), message.getStream(), message.getValue());
// boolean result = this.consumerMessageHandler(message.getValue().get("message"));
// log.info("CONSUMER MSG RESULT:{} ==> [{}]MESSAGE_ID:{}", result, redisStreamKey, message.getId());
//
// try {
// stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
// } catch (Exception e) {
// log.error("#CONSUMER MSG EX_ACK ==> [{}]RESULT:{},MESSAGE:{}", redisStreamKey, result, message.getValue(), e);
// }
// try {
// stringRedisTemplate.opsForStream().delete(redisStreamKey, message.getId());
// } catch (Exception e) {
// log.error("#CONSUMER MSG EX_DEL ==> [{}]RESULT:{},MESSAGE:{}", redisStreamKey, result, message.getValue(), e);
// }
// }
//
// private boolean consumerMessageHandler(String msg) {
// boolean aBoolean = false;
// try {
// SqlMapping.SqlMessage sqlMessage = JsonUtils.fromJson(msg, SqlMapping.SqlMessage.class);
// aBoolean = null == sqlMessage || baseDao.batchSqls(sqlMessage.getSqls(), sqlMessage.getArgs());
// } catch (Exception e) {
// log.error("CONSUMER MSG EX_HANDLE ==> [{}]:{}", this.getRedisStreamKey(), msg, e);
// } finally {
// if (!aBoolean) {
// HashMap<String, String> map = CollectionUtil.mapStringString();
// map.put("message", msg);
// stringRedisTemplate.opsForStream().add(StreamRecords.mapBacked(map).withStreamKey(this.getRedisStreamKey()));
// }
// }
// return aBoolean;
// }
//
// protected abstract String getRedisStreamKey();
//
// protected abstract String getRedisStreamGroup();
//}
\ No newline at end of file
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-candy/src/main/java/com/liquidnet/service/consumer/candy/receiver/ConsumerCandyCouponBackRdsReceiver.java
View file @
c074aba1
package
com
.
liquidnet
.
service
.
consumer
.
candy
.
receiver
;
import
com.liquidnet.service.base.constant.MQConst
;
import
org.springframework.stereotype.Component
;
@Component
public
class
ConsumerCandyCouponBackRdsReceiver
extends
AbstractSqlRedisReceiver
{
@Override
protected
String
getRedisStreamKey
()
{
return
MQConst
.
CandyQueue
.
COUPON_BACK
.
getKey
();
}
@Override
protected
String
getRedisStreamGroup
()
{
return
MQConst
.
CandyQueue
.
COUPON_BACK
.
getGroup
();
}
}
//
package com.liquidnet.service.consumer.candy.receiver;
//
//
import com.liquidnet.service.base.constant.MQConst;
//
import org.springframework.stereotype.Component;
//
//
@Component
//
public class ConsumerCandyCouponBackRdsReceiver extends AbstractSqlRedisReceiver {
//
@Override
//
protected String getRedisStreamKey() {
//
return MQConst.CandyQueue.COUPON_BACK.getKey();
//
}
//
//
@Override
//
protected String getRedisStreamGroup() {
//
return MQConst.CandyQueue.COUPON_BACK.getGroup();
//
}
//
}
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-candy/src/main/java/com/liquidnet/service/consumer/candy/receiver/ConsumerCandyCouponOrderBackRdsReceiver.java
View file @
c074aba1
package
com
.
liquidnet
.
service
.
consumer
.
candy
.
receiver
;
import
com.liquidnet.service.base.constant.MQConst
;
import
org.springframework.stereotype.Component
;
@Component
public
class
ConsumerCandyCouponOrderBackRdsReceiver
extends
AbstractCouponOrderBackRedisReceiver
{
@Override
protected
String
getRedisStreamKey
()
{
return
MQConst
.
CandyQueue
.
COUPON_ORDER_BACK
.
getKey
();
}
@Override
protected
String
getRedisStreamGroup
()
{
return
MQConst
.
CandyQueue
.
COUPON_ORDER_BACK
.
getGroup
();
}
}
//
package com.liquidnet.service.consumer.candy.receiver;
//
//
import com.liquidnet.service.base.constant.MQConst;
//
import org.springframework.stereotype.Component;
//
//
@Component
//
public class ConsumerCandyCouponOrderBackRdsReceiver extends AbstractCouponOrderBackRedisReceiver {
//
@Override
//
protected String getRedisStreamKey() {
//
return MQConst.CandyQueue.COUPON_ORDER_BACK.getKey();
//
}
//
//
@Override
//
protected String getRedisStreamGroup() {
//
return MQConst.CandyQueue.COUPON_ORDER_BACK.getGroup();
//
}
//
}
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-candy/src/main/java/com/liquidnet/service/consumer/candy/receiver/ConsumerCandyCouponReceiveRdsReceiver.java
View file @
c074aba1
package
com
.
liquidnet
.
service
.
consumer
.
candy
.
receiver
;
import
com.liquidnet.service.base.constant.MQConst
;
import
org.springframework.stereotype.Component
;
@Component
public
class
ConsumerCandyCouponReceiveRdsReceiver
extends
AbstractSqlRedisReceiver
{
@Override
protected
String
getRedisStreamKey
()
{
return
MQConst
.
CandyQueue
.
COUPON_RECEIVE
.
getKey
();
}
@Override
protected
String
getRedisStreamGroup
()
{
return
MQConst
.
CandyQueue
.
COUPON_RECEIVE
.
getGroup
();
}
}
//
package com.liquidnet.service.consumer.candy.receiver;
//
//
import com.liquidnet.service.base.constant.MQConst;
//
import org.springframework.stereotype.Component;
//
//
@Component
//
public class ConsumerCandyCouponReceiveRdsReceiver extends AbstractSqlRedisReceiver {
//
@Override
//
protected String getRedisStreamKey() {
//
return MQConst.CandyQueue.COUPON_RECEIVE.getKey();
//
}
//
//
@Override
//
protected String getRedisStreamGroup() {
//
return MQConst.CandyQueue.COUPON_RECEIVE.getGroup();
//
}
//
}
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-candy/src/main/java/com/liquidnet/service/consumer/candy/receiver/ConsumerCandyCouponUseRdsReceiver.java
View file @
c074aba1
package
com
.
liquidnet
.
service
.
consumer
.
candy
.
receiver
;
import
com.liquidnet.service.base.constant.MQConst
;
import
org.springframework.stereotype.Component
;
@Component
public
class
ConsumerCandyCouponUseRdsReceiver
extends
AbstractSqlRedisReceiver
{
@Override
protected
String
getRedisStreamKey
()
{
return
MQConst
.
CandyQueue
.
COUPON_USE
.
getKey
();
}
@Override
protected
String
getRedisStreamGroup
()
{
return
MQConst
.
CandyQueue
.
COUPON_USE
.
getGroup
();
}
}
//
package com.liquidnet.service.consumer.candy.receiver;
//
//
import com.liquidnet.service.base.constant.MQConst;
//
import org.springframework.stereotype.Component;
//
//
@Component
//
public class ConsumerCandyCouponUseRdsReceiver extends AbstractSqlRedisReceiver {
//
@Override
//
protected String getRedisStreamKey() {
//
return MQConst.CandyQueue.COUPON_USE.getKey();
//
}
//
//
@Override
//
protected String getRedisStreamGroup() {
//
return MQConst.CandyQueue.COUPON_USE.getGroup();
//
}
//
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment