记得上下班打卡 | git大法好,push需谨慎
Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
L
liquidnet-bus-v1
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
董敬伟
liquidnet-bus-v1
Commits
35a8f441
Commit
35a8f441
authored
Jun 15, 2022
by
胡佳晨
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
stone 消费迁移
parent
37503dba
Changes
13
Expand all
Hide whitespace changes
Inline
Side-by-side
Showing
13 changed files
with
579 additions
and
378 deletions
+579
-378
ConsumerStoneLogsRedisStreamConfig.java
...base/config/stone/ConsumerStoneLogsRedisStreamConfig.java
+46
-0
ConsumerStoneOrderRedisStreamConfig.java
...ase/config/stone/ConsumerStoneOrderRedisStreamConfig.java
+47
-0
ConsumerStoneUserRedisStreamConfig.java
...base/config/stone/ConsumerStoneUserRedisStreamConfig.java
+45
-0
RedisInsertLogReceiver.java
.../consumer/base/receiver/stone/RedisInsertLogReceiver.java
+21
-0
RedisInsertOrderReceiver.java
...onsumer/base/receiver/stone/RedisInsertOrderReceiver.java
+21
-0
RedisInsertUserReceiver.java
...consumer/base/receiver/stone/RedisInsertUserReceiver.java
+21
-0
ConsumerStoneLogsRedisStreamConfig.java
...ne/service/config/ConsumerStoneLogsRedisStreamConfig.java
+64
-64
ConsumerStoneOrderRedisStreamConfig.java
...e/service/config/ConsumerStoneOrderRedisStreamConfig.java
+64
-64
ConsumerStoneUserRedisStreamConfig.java
...ne/service/config/ConsumerStoneUserRedisStreamConfig.java
+64
-64
AbstractRedisReceiver.java
...onsumer/stone/service/receiver/AbstractRedisReceiver.java
+126
-126
RedisInsertLogReceiver.java
...nsumer/stone/service/receiver/RedisInsertLogReceiver.java
+20
-20
RedisInsertOrderReceiver.java
...umer/stone/service/receiver/RedisInsertOrderReceiver.java
+20
-20
RedisInsertUserReceiver.java
...sumer/stone/service/receiver/RedisInsertUserReceiver.java
+20
-20
No files found.
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-base/src/main/java/com/liquidnet/service/consumer/base/config/stone/ConsumerStoneLogsRedisStreamConfig.java
0 → 100644
View file @
35a8f441
package
com
.
liquidnet
.
service
.
consumer
.
base
.
config
.
stone
;
import
com.liquidnet.common.cache.redis.config.RedisStreamConfig
;
import
com.liquidnet.service.base.constant.MQConst
;
import
com.liquidnet.service.consumer.base.receiver.ConsumerCommonSQL0Receiver
;
import
com.liquidnet.service.consumer.base.receiver.stone.RedisInsertLogReceiver
;
import
lombok.var
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.context.annotation.Bean
;
import
org.springframework.context.annotation.Configuration
;
import
org.springframework.data.redis.connection.RedisConnectionFactory
;
import
org.springframework.data.redis.connection.stream.Consumer
;
import
org.springframework.data.redis.connection.stream.MapRecord
;
import
org.springframework.data.redis.connection.stream.ReadOffset
;
import
org.springframework.data.redis.connection.stream.StreamOffset
;
import
org.springframework.data.redis.core.StringRedisTemplate
;
import
org.springframework.data.redis.stream.StreamMessageListenerContainer
;
import
org.springframework.data.redis.stream.Subscription
;
import
java.util.ArrayList
;
import
java.util.List
;
@Configuration
public
class
ConsumerStoneLogsRedisStreamConfig
extends
RedisStreamConfig
{
@Autowired
RedisInsertLogReceiver
redisInsertLogReceiver
;
@Autowired
StringRedisTemplate
stringRedisTemplate
;
@Bean
public
List
<
Subscription
>
insertLogs
(
RedisConnectionFactory
factory
)
{
List
<
Subscription
>
subscriptionList
=
new
ArrayList
<>();
MQConst
.
StoneQueue
stream
=
MQConst
.
StoneQueue
.
STONE_INSERT_LOGS
;
this
.
initStream
(
stringRedisTemplate
,
stream
.
getKey
(),
stream
.
getGroup
());
for
(
int
i
=
0
;
i
<
5
;
i
++)
{
StreamMessageListenerContainer
<
String
,
MapRecord
<
String
,
String
,
String
>>
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
subscriptionList
.
add
(
listenerContainer
.
receiveAutoAck
(
Consumer
.
from
(
stream
.
getGroup
(),
getConsumerName
(
stream
.
name
()
+
i
)),
StreamOffset
.
create
(
stream
.
getKey
(),
ReadOffset
.
lastConsumed
()),
redisInsertLogReceiver
));
listenerContainer
.
start
();
}
return
subscriptionList
;
}
/* -------------------------------------------------------- | */
}
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-base/src/main/java/com/liquidnet/service/consumer/base/config/stone/ConsumerStoneOrderRedisStreamConfig.java
0 → 100644
View file @
35a8f441
package
com
.
liquidnet
.
service
.
consumer
.
base
.
config
.
stone
;
import
com.liquidnet.common.cache.redis.config.RedisStreamConfig
;
import
com.liquidnet.service.base.constant.MQConst
;
import
com.liquidnet.service.consumer.base.receiver.stone.RedisInsertOrderReceiver
;
import
lombok.var
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.context.annotation.Bean
;
import
org.springframework.context.annotation.Configuration
;
import
org.springframework.data.redis.connection.RedisConnectionFactory
;
import
org.springframework.data.redis.connection.stream.Consumer
;
import
org.springframework.data.redis.connection.stream.MapRecord
;
import
org.springframework.data.redis.connection.stream.ReadOffset
;
import
org.springframework.data.redis.connection.stream.StreamOffset
;
import
org.springframework.data.redis.core.StringRedisTemplate
;
import
org.springframework.data.redis.stream.StreamMessageListenerContainer
;
import
org.springframework.data.redis.stream.Subscription
;
import
java.util.ArrayList
;
import
java.util.List
;
@Configuration
public
class
ConsumerStoneOrderRedisStreamConfig
extends
RedisStreamConfig
{
@Autowired
RedisInsertOrderReceiver
redisInsertOrderReceiver
;
@Autowired
StringRedisTemplate
stringRedisTemplate
;
@Bean
public
List
<
Subscription
>
insertOrder
(
RedisConnectionFactory
factory
)
{
List
<
Subscription
>
subscriptionList
=
new
ArrayList
<>();
MQConst
.
StoneQueue
stream
=
MQConst
.
StoneQueue
.
STONE_ORDER_COUPON
;
this
.
initStream
(
stringRedisTemplate
,
stream
.
getKey
(),
stream
.
getGroup
());
for
(
int
i
=
0
;
i
<
5
;
i
++)
{
StreamMessageListenerContainer
<
String
,
MapRecord
<
String
,
String
,
String
>>
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
subscriptionList
.
add
(
listenerContainer
.
receiveAutoAck
(
Consumer
.
from
(
stream
.
getGroup
(),
getConsumerName
(
stream
.
name
()
+
i
)),
StreamOffset
.
create
(
stream
.
getKey
(),
ReadOffset
.
lastConsumed
()),
redisInsertOrderReceiver
));
listenerContainer
.
start
();
}
return
subscriptionList
;
}
/* -------------------------------------------------------- | */
}
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-base/src/main/java/com/liquidnet/service/consumer/base/config/stone/ConsumerStoneUserRedisStreamConfig.java
0 → 100644
View file @
35a8f441
package
com
.
liquidnet
.
service
.
consumer
.
base
.
config
.
stone
;
import
com.liquidnet.common.cache.redis.config.RedisStreamConfig
;
import
com.liquidnet.service.base.constant.MQConst
;
import
com.liquidnet.service.consumer.base.receiver.stone.RedisInsertUserReceiver
;
import
lombok.var
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.context.annotation.Bean
;
import
org.springframework.context.annotation.Configuration
;
import
org.springframework.data.redis.connection.RedisConnectionFactory
;
import
org.springframework.data.redis.connection.stream.Consumer
;
import
org.springframework.data.redis.connection.stream.MapRecord
;
import
org.springframework.data.redis.connection.stream.ReadOffset
;
import
org.springframework.data.redis.connection.stream.StreamOffset
;
import
org.springframework.data.redis.core.StringRedisTemplate
;
import
org.springframework.data.redis.stream.StreamMessageListenerContainer
;
import
org.springframework.data.redis.stream.Subscription
;
import
java.util.ArrayList
;
import
java.util.List
;
@Configuration
public
class
ConsumerStoneUserRedisStreamConfig
extends
RedisStreamConfig
{
@Autowired
RedisInsertUserReceiver
redisInsertUserReceiver
;
@Autowired
StringRedisTemplate
stringRedisTemplate
;
@Bean
public
List
<
Subscription
>
insertUser
(
RedisConnectionFactory
factory
)
{
List
<
Subscription
>
subscriptionList
=
new
ArrayList
<>();
MQConst
.
StoneQueue
stream
=
MQConst
.
StoneQueue
.
STONE_INSERT_USER
;
this
.
initStream
(
stringRedisTemplate
,
stream
.
getKey
(),
stream
.
getGroup
());
for
(
int
i
=
0
;
i
<
5
;
i
++)
{
StreamMessageListenerContainer
<
String
,
MapRecord
<
String
,
String
,
String
>>
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
subscriptionList
.
add
(
listenerContainer
.
receiveAutoAck
(
Consumer
.
from
(
stream
.
getGroup
(),
getConsumerName
(
stream
.
name
()
+
i
)),
StreamOffset
.
create
(
stream
.
getKey
(),
ReadOffset
.
lastConsumed
()),
redisInsertUserReceiver
));
listenerContainer
.
start
();
}
return
subscriptionList
;
}
/* -------------------------------------------------------- | */
}
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-base/src/main/java/com/liquidnet/service/consumer/base/receiver/stone/RedisInsertLogReceiver.java
0 → 100644
View file @
35a8f441
package
com
.
liquidnet
.
service
.
consumer
.
base
.
receiver
.
stone
;
import
com.liquidnet.service.base.constant.MQConst
;
import
com.liquidnet.service.consumer.base.receiver.AbstractSqlRedisReceiver
;
import
lombok.extern.slf4j.Slf4j
;
import
org.springframework.stereotype.Component
;
@Slf4j
@Component
public
class
RedisInsertLogReceiver
extends
AbstractSqlRedisReceiver
{
@Override
protected
String
getRedisStreamKey
()
{
return
MQConst
.
StoneQueue
.
STONE_INSERT_LOGS
.
getKey
();
}
@Override
protected
String
getRedisStreamGroup
()
{
return
MQConst
.
StoneQueue
.
STONE_INSERT_LOGS
.
getGroup
();
}
}
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-base/src/main/java/com/liquidnet/service/consumer/base/receiver/stone/RedisInsertOrderReceiver.java
0 → 100644
View file @
35a8f441
package
com
.
liquidnet
.
service
.
consumer
.
base
.
receiver
.
stone
;
import
com.liquidnet.service.base.constant.MQConst
;
import
com.liquidnet.service.consumer.base.receiver.AbstractSqlRedisReceiver
;
import
lombok.extern.slf4j.Slf4j
;
import
org.springframework.stereotype.Component
;
@Slf4j
@Component
public
class
RedisInsertOrderReceiver
extends
AbstractSqlRedisReceiver
{
@Override
protected
String
getRedisStreamKey
()
{
return
MQConst
.
StoneQueue
.
STONE_ORDER_COUPON
.
getKey
();
}
@Override
protected
String
getRedisStreamGroup
()
{
return
MQConst
.
StoneQueue
.
STONE_ORDER_COUPON
.
getGroup
();
}
}
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-base/src/main/java/com/liquidnet/service/consumer/base/receiver/stone/RedisInsertUserReceiver.java
0 → 100644
View file @
35a8f441
package
com
.
liquidnet
.
service
.
consumer
.
base
.
receiver
.
stone
;
import
com.liquidnet.service.base.constant.MQConst
;
import
com.liquidnet.service.consumer.base.receiver.AbstractSqlRedisReceiver
;
import
lombok.extern.slf4j.Slf4j
;
import
org.springframework.stereotype.Component
;
@Slf4j
@Component
public
class
RedisInsertUserReceiver
extends
AbstractSqlRedisReceiver
{
@Override
protected
String
getRedisStreamKey
()
{
return
MQConst
.
StoneQueue
.
STONE_INSERT_USER
.
getKey
();
}
@Override
protected
String
getRedisStreamGroup
()
{
return
MQConst
.
StoneQueue
.
STONE_INSERT_USER
.
getGroup
();
}
}
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-stone/src/main/java/com/liquidnet/servce/consumer/stone/service/config/ConsumerStoneLogsRedisStreamConfig.java
View file @
35a8f441
package
com
.
liquidnet
.
servce
.
consumer
.
stone
.
service
.
config
;
import
com.liquidnet.common.cache.redis.config.RedisStreamConfig
;
import
com.liquidnet.servce.consumer.stone.service.receiver.RedisInsertLogReceiver
;
import
com.liquidnet.service.base.constant.MQConst
;
import
lombok.var
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.context.annotation.Bean
;
import
org.springframework.context.annotation.Configuration
;
import
org.springframework.data.redis.connection.RedisConnectionFactory
;
import
org.springframework.data.redis.connection.stream.Consumer
;
import
org.springframework.data.redis.connection.stream.MapRecord
;
import
org.springframework.data.redis.connection.stream.ReadOffset
;
import
org.springframework.data.redis.connection.stream.StreamOffset
;
import
org.springframework.data.redis.stream.StreamMessageListenerContainer
;
import
org.springframework.data.redis.stream.Subscription
;
@Configuration
public
class
ConsumerStoneLogsRedisStreamConfig
extends
RedisStreamConfig
{
@Autowired
RedisInsertLogReceiver
redisInsertLogReceiver
;
/**
* 缺票登记
*
* @param listenerContainer
* @param t
* @return
*/
private
Subscription
receiveSqlStoneInsertLog
(
StreamMessageListenerContainer
<
String
,
MapRecord
<
String
,
String
,
String
>>
listenerContainer
,
int
t
)
{
return
listenerContainer
.
receiveAutoAck
(
Consumer
.
from
(
MQConst
.
StoneQueue
.
STONE_INSERT_LOGS
.
getGroup
(),
getConsumerName
(
MQConst
.
StoneQueue
.
STONE_INSERT_LOGS
.
name
()
+
t
)),
StreamOffset
.
create
(
MQConst
.
StoneQueue
.
STONE_INSERT_LOGS
.
getKey
(),
ReadOffset
.
lastConsumed
()),
redisInsertLogReceiver
);
}
/* —————————————————————————— | —————————————————————————— | —————————————————————————— */
/* -------------------------------------------------------- | 缺票登记 */
@Bean
public
Subscription
subscriptionSqlStoneInsertLog0
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlStoneInsertLog
(
listenerContainer
,
0
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionSqlStoneInsertLog1
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlStoneInsertLog
(
listenerContainer
,
1
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionSqlStoneInsertLog2
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlStoneInsertLog
(
listenerContainer
,
2
);
listenerContainer
.
start
();
return
subscription
;
}
/* -------------------------------------------------------- | */
}
//
package com.liquidnet.servce.consumer.stone.service.config;
//
//
import com.liquidnet.common.cache.redis.config.RedisStreamConfig;
//
import com.liquidnet.servce.consumer.stone.service.receiver.RedisInsertLogReceiver;
//
import com.liquidnet.service.base.constant.MQConst;
//
import lombok.var;
//
import org.springframework.beans.factory.annotation.Autowired;
//
import org.springframework.context.annotation.Bean;
//
import org.springframework.context.annotation.Configuration;
//
import org.springframework.data.redis.connection.RedisConnectionFactory;
//
import org.springframework.data.redis.connection.stream.Consumer;
//
import org.springframework.data.redis.connection.stream.MapRecord;
//
import org.springframework.data.redis.connection.stream.ReadOffset;
//
import org.springframework.data.redis.connection.stream.StreamOffset;
//
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
//
import org.springframework.data.redis.stream.Subscription;
//
//
@Configuration
//
public class ConsumerStoneLogsRedisStreamConfig extends RedisStreamConfig {
//
@Autowired
//
RedisInsertLogReceiver redisInsertLogReceiver;
//
//
/**
//
* 缺票登记
//
*
//
* @param listenerContainer
//
* @param t
//
* @return
//
*/
//
private Subscription receiveSqlStoneInsertLog(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) {
//
return listenerContainer.receiveAutoAck(Consumer.from(MQConst.StoneQueue.STONE_INSERT_LOGS.getGroup(), getConsumerName(MQConst.StoneQueue.STONE_INSERT_LOGS.name() + t)),
//
StreamOffset.create(MQConst.StoneQueue.STONE_INSERT_LOGS.getKey(), ReadOffset.lastConsumed()), redisInsertLogReceiver);
//
}
//
//
/* —————————————————————————— | —————————————————————————— | —————————————————————————— */
//
//
/* -------------------------------------------------------- | 缺票登记 */
//
//
@Bean
//
public Subscription subscriptionSqlStoneInsertLog0(RedisConnectionFactory factory) {
//
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
//
var subscription = receiveSqlStoneInsertLog(listenerContainer, 0);
//
listenerContainer.start();
//
return subscription;
//
}
//
//
@Bean
//
public Subscription subscriptionSqlStoneInsertLog1(RedisConnectionFactory factory) {
//
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
//
var subscription = receiveSqlStoneInsertLog(listenerContainer, 1);
//
listenerContainer.start();
//
return subscription;
//
}
//
//
@Bean
//
public Subscription subscriptionSqlStoneInsertLog2(RedisConnectionFactory factory) {
//
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
//
var subscription = receiveSqlStoneInsertLog(listenerContainer, 2);
//
listenerContainer.start();
//
return subscription;
//
}
//
//
/* -------------------------------------------------------- | */
//
}
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-stone/src/main/java/com/liquidnet/servce/consumer/stone/service/config/ConsumerStoneOrderRedisStreamConfig.java
View file @
35a8f441
package
com
.
liquidnet
.
servce
.
consumer
.
stone
.
service
.
config
;
import
com.liquidnet.common.cache.redis.config.RedisStreamConfig
;
import
com.liquidnet.servce.consumer.stone.service.receiver.RedisInsertOrderReceiver
;
import
com.liquidnet.service.base.constant.MQConst
;
import
lombok.var
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.context.annotation.Bean
;
import
org.springframework.context.annotation.Configuration
;
import
org.springframework.data.redis.connection.RedisConnectionFactory
;
import
org.springframework.data.redis.connection.stream.Consumer
;
import
org.springframework.data.redis.connection.stream.MapRecord
;
import
org.springframework.data.redis.connection.stream.ReadOffset
;
import
org.springframework.data.redis.connection.stream.StreamOffset
;
import
org.springframework.data.redis.stream.StreamMessageListenerContainer
;
import
org.springframework.data.redis.stream.Subscription
;
@Configuration
public
class
ConsumerStoneOrderRedisStreamConfig
extends
RedisStreamConfig
{
@Autowired
RedisInsertOrderReceiver
redisInsertOrderReceiver
;
/**
* 缺票登记
*
* @param listenerContainer
* @param t
* @return
*/
private
Subscription
receiveSqlStoneInsertOrder
(
StreamMessageListenerContainer
<
String
,
MapRecord
<
String
,
String
,
String
>>
listenerContainer
,
int
t
)
{
return
listenerContainer
.
receiveAutoAck
(
Consumer
.
from
(
MQConst
.
StoneQueue
.
STONE_ORDER_COUPON
.
getGroup
(),
getConsumerName
(
MQConst
.
StoneQueue
.
STONE_ORDER_COUPON
.
name
()
+
t
)),
StreamOffset
.
create
(
MQConst
.
StoneQueue
.
STONE_ORDER_COUPON
.
getKey
(),
ReadOffset
.
lastConsumed
()),
redisInsertOrderReceiver
);
}
/* —————————————————————————— | —————————————————————————— | —————————————————————————— */
/* -------------------------------------------------------- | 缺票登记 */
@Bean
public
Subscription
subscriptionSqlStoneInsertOrder0
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlStoneInsertOrder
(
listenerContainer
,
0
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionSqlStoneInsertOrder1
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlStoneInsertOrder
(
listenerContainer
,
1
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionSqlStoneInsertOrder2
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlStoneInsertOrder
(
listenerContainer
,
2
);
listenerContainer
.
start
();
return
subscription
;
}
/* -------------------------------------------------------- | */
}
//
package com.liquidnet.servce.consumer.stone.service.config;
//
//
import com.liquidnet.common.cache.redis.config.RedisStreamConfig;
//
import com.liquidnet.servce.consumer.stone.service.receiver.RedisInsertOrderReceiver;
//
import com.liquidnet.service.base.constant.MQConst;
//
import lombok.var;
//
import org.springframework.beans.factory.annotation.Autowired;
//
import org.springframework.context.annotation.Bean;
//
import org.springframework.context.annotation.Configuration;
//
import org.springframework.data.redis.connection.RedisConnectionFactory;
//
import org.springframework.data.redis.connection.stream.Consumer;
//
import org.springframework.data.redis.connection.stream.MapRecord;
//
import org.springframework.data.redis.connection.stream.ReadOffset;
//
import org.springframework.data.redis.connection.stream.StreamOffset;
//
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
//
import org.springframework.data.redis.stream.Subscription;
//
//
@Configuration
//
public class ConsumerStoneOrderRedisStreamConfig extends RedisStreamConfig {
//
@Autowired
//
RedisInsertOrderReceiver redisInsertOrderReceiver;
//
//
/**
//
* 缺票登记
//
*
//
* @param listenerContainer
//
* @param t
//
* @return
//
*/
//
private Subscription receiveSqlStoneInsertOrder(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) {
//
return listenerContainer.receiveAutoAck(Consumer.from(MQConst.StoneQueue.STONE_ORDER_COUPON.getGroup(), getConsumerName(MQConst.StoneQueue.STONE_ORDER_COUPON.name() + t)),
//
StreamOffset.create(MQConst.StoneQueue.STONE_ORDER_COUPON.getKey(), ReadOffset.lastConsumed()), redisInsertOrderReceiver);
//
}
//
//
/* —————————————————————————— | —————————————————————————— | —————————————————————————— */
//
//
/* -------------------------------------------------------- | 缺票登记 */
//
//
@Bean
//
public Subscription subscriptionSqlStoneInsertOrder0(RedisConnectionFactory factory) {
//
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
//
var subscription = receiveSqlStoneInsertOrder(listenerContainer, 0);
//
listenerContainer.start();
//
return subscription;
//
}
//
//
@Bean
//
public Subscription subscriptionSqlStoneInsertOrder1(RedisConnectionFactory factory) {
//
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
//
var subscription = receiveSqlStoneInsertOrder(listenerContainer, 1);
//
listenerContainer.start();
//
return subscription;
//
}
//
//
@Bean
//
public Subscription subscriptionSqlStoneInsertOrder2(RedisConnectionFactory factory) {
//
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
//
var subscription = receiveSqlStoneInsertOrder(listenerContainer, 2);
//
listenerContainer.start();
//
return subscription;
//
}
//
//
/* -------------------------------------------------------- | */
//
}
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-stone/src/main/java/com/liquidnet/servce/consumer/stone/service/config/ConsumerStoneUserRedisStreamConfig.java
View file @
35a8f441
package
com
.
liquidnet
.
servce
.
consumer
.
stone
.
service
.
config
;
import
com.liquidnet.common.cache.redis.config.RedisStreamConfig
;
import
com.liquidnet.servce.consumer.stone.service.receiver.RedisInsertUserReceiver
;
import
com.liquidnet.service.base.constant.MQConst
;
import
lombok.var
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.context.annotation.Bean
;
import
org.springframework.context.annotation.Configuration
;
import
org.springframework.data.redis.connection.RedisConnectionFactory
;
import
org.springframework.data.redis.connection.stream.Consumer
;
import
org.springframework.data.redis.connection.stream.MapRecord
;
import
org.springframework.data.redis.connection.stream.ReadOffset
;
import
org.springframework.data.redis.connection.stream.StreamOffset
;
import
org.springframework.data.redis.stream.StreamMessageListenerContainer
;
import
org.springframework.data.redis.stream.Subscription
;
@Configuration
public
class
ConsumerStoneUserRedisStreamConfig
extends
RedisStreamConfig
{
@Autowired
RedisInsertUserReceiver
redisInsertUserReceiver
;
/**
* 缺票登记
*
* @param listenerContainer
* @param t
* @return
*/
private
Subscription
receiveSqlStoneInsertUser
(
StreamMessageListenerContainer
<
String
,
MapRecord
<
String
,
String
,
String
>>
listenerContainer
,
int
t
)
{
return
listenerContainer
.
receiveAutoAck
(
Consumer
.
from
(
MQConst
.
StoneQueue
.
STONE_INSERT_USER
.
getGroup
(),
getConsumerName
(
MQConst
.
StoneQueue
.
STONE_INSERT_USER
.
name
()
+
t
)),
StreamOffset
.
create
(
MQConst
.
StoneQueue
.
STONE_INSERT_USER
.
getKey
(),
ReadOffset
.
lastConsumed
()),
redisInsertUserReceiver
);
}
/* —————————————————————————— | —————————————————————————— | —————————————————————————— */
/* -------------------------------------------------------- | 缺票登记 */
@Bean
public
Subscription
subscriptionSqlStoneInsertUser0
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlStoneInsertUser
(
listenerContainer
,
0
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionSqlStoneInsertUser1
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlStoneInsertUser
(
listenerContainer
,
1
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionSqlStoneInsertUser2
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlStoneInsertUser
(
listenerContainer
,
2
);
listenerContainer
.
start
();
return
subscription
;
}
/* -------------------------------------------------------- | */
}
//
package com.liquidnet.servce.consumer.stone.service.config;
//
//
import com.liquidnet.common.cache.redis.config.RedisStreamConfig;
//
import com.liquidnet.servce.consumer.stone.service.receiver.RedisInsertUserReceiver;
//
import com.liquidnet.service.base.constant.MQConst;
//
import lombok.var;
//
import org.springframework.beans.factory.annotation.Autowired;
//
import org.springframework.context.annotation.Bean;
//
import org.springframework.context.annotation.Configuration;
//
import org.springframework.data.redis.connection.RedisConnectionFactory;
//
import org.springframework.data.redis.connection.stream.Consumer;
//
import org.springframework.data.redis.connection.stream.MapRecord;
//
import org.springframework.data.redis.connection.stream.ReadOffset;
//
import org.springframework.data.redis.connection.stream.StreamOffset;
//
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
//
import org.springframework.data.redis.stream.Subscription;
//
//
@Configuration
//
public class ConsumerStoneUserRedisStreamConfig extends RedisStreamConfig {
//
@Autowired
//
RedisInsertUserReceiver redisInsertUserReceiver;
//
//
/**
//
* 缺票登记
//
*
//
* @param listenerContainer
//
* @param t
//
* @return
//
*/
//
private Subscription receiveSqlStoneInsertUser(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) {
//
return listenerContainer.receiveAutoAck(Consumer.from(MQConst.StoneQueue.STONE_INSERT_USER.getGroup(), getConsumerName(MQConst.StoneQueue.STONE_INSERT_USER.name() + t)),
//
StreamOffset.create(MQConst.StoneQueue.STONE_INSERT_USER.getKey(), ReadOffset.lastConsumed()), redisInsertUserReceiver);
//
}
//
//
/* —————————————————————————— | —————————————————————————— | —————————————————————————— */
//
//
/* -------------------------------------------------------- | 缺票登记 */
//
//
@Bean
//
public Subscription subscriptionSqlStoneInsertUser0(RedisConnectionFactory factory) {
//
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
//
var subscription = receiveSqlStoneInsertUser(listenerContainer, 0);
//
listenerContainer.start();
//
return subscription;
//
}
//
//
@Bean
//
public Subscription subscriptionSqlStoneInsertUser1(RedisConnectionFactory factory) {
//
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
//
var subscription = receiveSqlStoneInsertUser(listenerContainer, 1);
//
listenerContainer.start();
//
return subscription;
//
}
//
//
@Bean
//
public Subscription subscriptionSqlStoneInsertUser2(RedisConnectionFactory factory) {
//
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
//
var subscription = receiveSqlStoneInsertUser(listenerContainer, 2);
//
listenerContainer.start();
//
return subscription;
//
}
//
//
/* -------------------------------------------------------- | */
//
}
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-stone/src/main/java/com/liquidnet/servce/consumer/stone/service/receiver/AbstractRedisReceiver.java
View file @
35a8f441
This diff is collapsed.
Click to expand it.
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-stone/src/main/java/com/liquidnet/servce/consumer/stone/service/receiver/RedisInsertLogReceiver.java
View file @
35a8f441
package
com
.
liquidnet
.
servce
.
consumer
.
stone
.
service
.
receiver
;
import
com.liquidnet.service.base.constant.MQConst
;
import
lombok.extern.slf4j.Slf4j
;
import
org.springframework.stereotype.Component
;
@Slf4j
@Component
public
class
RedisInsertLogReceiver
extends
AbstractRedisReceiver
{
@Override
protected
String
getRedisStreamKey
()
{
return
MQConst
.
StoneQueue
.
STONE_INSERT_LOGS
.
getKey
();
}
@Override
protected
String
getRedisStreamGroup
()
{
return
MQConst
.
StoneQueue
.
STONE_INSERT_LOGS
.
getGroup
();
}
}
//
package com.liquidnet.servce.consumer.stone.service.receiver;
//
//
import com.liquidnet.service.base.constant.MQConst;
//
import lombok.extern.slf4j.Slf4j;
//
import org.springframework.stereotype.Component;
//
//
@Slf4j
//
@Component
//
public class RedisInsertLogReceiver extends AbstractRedisReceiver {
//
//
@Override
//
protected String getRedisStreamKey() {
//
return MQConst.StoneQueue.STONE_INSERT_LOGS.getKey();
//
}
//
//
@Override
//
protected String getRedisStreamGroup() {
//
return MQConst.StoneQueue.STONE_INSERT_LOGS.getGroup();
//
}
//
}
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-stone/src/main/java/com/liquidnet/servce/consumer/stone/service/receiver/RedisInsertOrderReceiver.java
View file @
35a8f441
package
com
.
liquidnet
.
servce
.
consumer
.
stone
.
service
.
receiver
;
import
com.liquidnet.service.base.constant.MQConst
;
import
lombok.extern.slf4j.Slf4j
;
import
org.springframework.stereotype.Component
;
@Slf4j
@Component
public
class
RedisInsertOrderReceiver
extends
AbstractRedisReceiver
{
@Override
protected
String
getRedisStreamKey
()
{
return
MQConst
.
StoneQueue
.
STONE_ORDER_COUPON
.
getKey
();
}
@Override
protected
String
getRedisStreamGroup
()
{
return
MQConst
.
StoneQueue
.
STONE_ORDER_COUPON
.
getGroup
();
}
}
//
package com.liquidnet.servce.consumer.stone.service.receiver;
//
//
import com.liquidnet.service.base.constant.MQConst;
//
import lombok.extern.slf4j.Slf4j;
//
import org.springframework.stereotype.Component;
//
//
@Slf4j
//
@Component
//
public class RedisInsertOrderReceiver extends AbstractRedisReceiver {
//
//
@Override
//
protected String getRedisStreamKey() {
//
return MQConst.StoneQueue.STONE_ORDER_COUPON.getKey();
//
}
//
//
@Override
//
protected String getRedisStreamGroup() {
//
return MQConst.StoneQueue.STONE_ORDER_COUPON.getGroup();
//
}
//
}
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-stone/src/main/java/com/liquidnet/servce/consumer/stone/service/receiver/RedisInsertUserReceiver.java
View file @
35a8f441
package
com
.
liquidnet
.
servce
.
consumer
.
stone
.
service
.
receiver
;
import
com.liquidnet.service.base.constant.MQConst
;
import
lombok.extern.slf4j.Slf4j
;
import
org.springframework.stereotype.Component
;
@Slf4j
@Component
public
class
RedisInsertUserReceiver
extends
AbstractRedisReceiver
{
@Override
protected
String
getRedisStreamKey
()
{
return
MQConst
.
StoneQueue
.
STONE_INSERT_USER
.
getKey
();
}
@Override
protected
String
getRedisStreamGroup
()
{
return
MQConst
.
StoneQueue
.
STONE_INSERT_USER
.
getGroup
();
}
}
//
package com.liquidnet.servce.consumer.stone.service.receiver;
//
//
import com.liquidnet.service.base.constant.MQConst;
//
import lombok.extern.slf4j.Slf4j;
//
import org.springframework.stereotype.Component;
//
//
@Slf4j
//
@Component
//
public class RedisInsertUserReceiver extends AbstractRedisReceiver {
//
//
@Override
//
protected String getRedisStreamKey() {
//
return MQConst.StoneQueue.STONE_INSERT_USER.getKey();
//
}
//
//
@Override
//
protected String getRedisStreamGroup() {
//
return MQConst.StoneQueue.STONE_INSERT_USER.getGroup();
//
}
//
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment