记得上下班打卡 | git大法好,push需谨慎
Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
L
liquidnet-bus-v1
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
董敬伟
liquidnet-bus-v1
Commits
e0537c53
Commit
e0537c53
authored
Jul 26, 2021
by
张国柄
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Kylin:替换RABBIT为REDIS实现MQ;
parent
1eb2cf47
Changes
21
Expand all
Show whitespace changes
Inline
Side-by-side
Showing
21 changed files
with
1571 additions
and
542 deletions
+1571
-542
ConsumerKylinSmsSenderRedisStreamConfig.java
...kylin/config/ConsumerKylinSmsSenderRedisStreamConfig.java
+132
-0
ConsumerKylinSqlOptOrderCloseRedisStreamConfig.java
...onfig/ConsumerKylinSqlOptOrderCloseRedisStreamConfig.java
+132
-0
ConsumerKylinSqlOrderAgainRedisStreamConfig.java
...n/config/ConsumerKylinSqlOrderAgainRedisStreamConfig.java
+61
-0
ConsumerKylinSqlOrderCreateRedisStreamConfig.java
.../config/ConsumerKylinSqlOrderCreateRedisStreamConfig.java
+133
-0
ConsumerKylinSqlOrderOvertimeRefundRedisStreamConfig.java
...ConsumerKylinSqlOrderOvertimeRefundRedisStreamConfig.java
+60
-0
ConsumerKylinSqlOrderPayRedisStreamConfig.java
...lin/config/ConsumerKylinSqlOrderPayRedisStreamConfig.java
+133
-0
ConsumerKylinSqlOrderRefundRedisStreamConfig.java
.../config/ConsumerKylinSqlOrderRefundRedisStreamConfig.java
+93
-0
ConsumerKylinSqlOrderWithdrawRedisStreamConfig.java
...onfig/ConsumerKylinSqlOrderWithdrawRedisStreamConfig.java
+93
-0
ConsumerKylinSqlPerformanceLackRedisStreamConfig.java
...fig/ConsumerKylinSqlPerformanceLackRedisStreamConfig.java
+60
-0
ConsumerKylinSqlStationRedisStreamConfig.java
...ylin/config/ConsumerKylinSqlStationRedisStreamConfig.java
+93
-0
ConsumerKylinSqlOrderAgainRdsReceiver.java
...kylin/receiver/ConsumerKylinSqlOrderAgainRdsReceiver.java
+2
-1
ConsumerKylinSqlOrderCreateRdsReceiver.java
...ylin/receiver/ConsumerKylinSqlOrderCreateRdsReceiver.java
+2
-1
ConsumerKylinSqlOrderOvertimeRefundRdsReceiver.java
...eiver/ConsumerKylinSqlOrderOvertimeRefundRdsReceiver.java
+2
-1
ConsumerKylinSqlOrderPayRdsReceiver.java
...r/kylin/receiver/ConsumerKylinSqlOrderPayRdsReceiver.java
+2
-1
ConsumerKylinSqlOrderRefundRdsReceiver.java
...ylin/receiver/ConsumerKylinSqlOrderRefundRdsReceiver.java
+2
-1
ConsumerKylinSqlOrderWithdrawRdsReceiver.java
...in/receiver/ConsumerKylinSqlOrderWithdrawRdsReceiver.java
+2
-1
ConsumerKylinSqlPerformanceLackRdsReceiver.java
.../receiver/ConsumerKylinSqlPerformanceLackRdsReceiver.java
+2
-1
ConsumerKylinSqlStationRdsReceiver.java
...er/kylin/receiver/ConsumerKylinSqlStationRdsReceiver.java
+2
-1
ConsumerKylinSmsProcessor.java
...er/kylin/service/processor/ConsumerKylinSmsProcessor.java
+77
-77
ConsumerProcessor.java
...e/consumer/kylin/service/processor/ConsumerProcessor.java
+457
-457
redis_queue_create.txt
.../liquidnet-service-kylin-impl/docu/redis_queue_create.txt
+31
-0
No files found.
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-kylin/src/main/java/com/liquidnet/service/consumer/kylin/config/ConsumerKylinSmsSenderRedisStreamConfig.java
0 → 100644
View file @
e0537c53
package
com
.
liquidnet
.
service
.
consumer
.
kylin
.
config
;
import
com.liquidnet.service.base.constant.MQConst
;
import
com.liquidnet.service.consumer.kylin.receiver.ConsumerKylinSmsNoticeRdsReceiver
;
import
lombok.var
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.context.annotation.Bean
;
import
org.springframework.context.annotation.Configuration
;
import
org.springframework.data.redis.connection.RedisConnectionFactory
;
import
org.springframework.data.redis.connection.stream.Consumer
;
import
org.springframework.data.redis.connection.stream.MapRecord
;
import
org.springframework.data.redis.connection.stream.ReadOffset
;
import
org.springframework.data.redis.connection.stream.StreamOffset
;
import
org.springframework.data.redis.stream.StreamMessageListenerContainer
;
import
org.springframework.data.redis.stream.Subscription
;
import
java.time.Duration
;
@Configuration
public
class
ConsumerKylinSmsSenderRedisStreamConfig
{
@Autowired
ConsumerKylinSmsNoticeRdsReceiver
consumerKylinSmsNoticeRdsReceiver
;
private
StreamMessageListenerContainer
<
String
,
MapRecord
<
String
,
String
,
String
>>
buildStreamMessageListenerContainer
(
RedisConnectionFactory
factory
)
{
var
options
=
StreamMessageListenerContainer
.
StreamMessageListenerContainerOptions
.
builder
()
.
pollTimeout
(
Duration
.
ofMillis
(
1
))
.
build
();
return
StreamMessageListenerContainer
.
create
(
factory
,
options
);
}
/**
* 短信通知
*
* @param listenerContainer
* @param t
* @return
*/
private
Subscription
receiveSqlURegister
(
StreamMessageListenerContainer
<
String
,
MapRecord
<
String
,
String
,
String
>>
listenerContainer
,
int
t
)
{
return
listenerContainer
.
receiveAutoAck
(
Consumer
.
from
(
MQConst
.
AdamQueue
.
SMS_NOTICE
.
getGroup
(),
MQConst
.
AdamQueue
.
SMS_NOTICE
.
name
()
+
t
),
StreamOffset
.
create
(
MQConst
.
AdamQueue
.
SMS_NOTICE
.
getKey
(),
ReadOffset
.
lastConsumed
()),
consumerKylinSmsNoticeRdsReceiver
);
}
/* —————————————————————————— | —————————————————————————— | —————————————————————————— */
/* -------------------------------------------------------- | 短信通知 */
@Bean
public
Subscription
subscriptionSmsNotice1
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlURegister
(
listenerContainer
,
1
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionSmsNotice2
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlURegister
(
listenerContainer
,
2
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionSmsNotice3
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlURegister
(
listenerContainer
,
3
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionSmsNotice4
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlURegister
(
listenerContainer
,
4
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionSmsNotice5
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlURegister
(
listenerContainer
,
5
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionSmsNotice6
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlURegister
(
listenerContainer
,
6
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionSmsNotice7
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlURegister
(
listenerContainer
,
7
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionSmsNotice8
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlURegister
(
listenerContainer
,
8
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionSmsNotice9
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlURegister
(
listenerContainer
,
9
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionSmsNotice10
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlURegister
(
listenerContainer
,
10
);
listenerContainer
.
start
();
return
subscription
;
}
/* -------------------------------------------------------- | */
}
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-kylin/src/main/java/com/liquidnet/service/consumer/kylin/config/ConsumerKylinSqlOptOrderCloseRedisStreamConfig.java
0 → 100644
View file @
e0537c53
package
com
.
liquidnet
.
service
.
consumer
.
kylin
.
config
;
import
com.liquidnet.service.base.constant.MQConst
;
import
com.liquidnet.service.consumer.kylin.receiver.ConsumerKylinSqlOptOrderCloseRedisReceiver
;
import
lombok.var
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.context.annotation.Bean
;
import
org.springframework.context.annotation.Configuration
;
import
org.springframework.data.redis.connection.RedisConnectionFactory
;
import
org.springframework.data.redis.connection.stream.Consumer
;
import
org.springframework.data.redis.connection.stream.MapRecord
;
import
org.springframework.data.redis.connection.stream.ReadOffset
;
import
org.springframework.data.redis.connection.stream.StreamOffset
;
import
org.springframework.data.redis.stream.StreamMessageListenerContainer
;
import
org.springframework.data.redis.stream.Subscription
;
import
java.time.Duration
;
@Configuration
public
class
ConsumerKylinSqlOptOrderCloseRedisStreamConfig
{
@Autowired
ConsumerKylinSqlOptOrderCloseRedisReceiver
consumerKylinSqlOptOrderCloseRedisReceiver
;
private
StreamMessageListenerContainer
<
String
,
MapRecord
<
String
,
String
,
String
>>
buildStreamMessageListenerContainer
(
RedisConnectionFactory
factory
)
{
var
options
=
StreamMessageListenerContainer
.
StreamMessageListenerContainerOptions
.
builder
()
.
pollTimeout
(
Duration
.
ofMillis
(
1
))
.
build
();
return
StreamMessageListenerContainer
.
create
(
factory
,
options
);
}
/**
* 订单关闭
*
* @param listenerContainer
* @param t
* @return
*/
private
Subscription
receiveSqlOptOrderClose
(
StreamMessageListenerContainer
<
String
,
MapRecord
<
String
,
String
,
String
>>
listenerContainer
,
int
t
)
{
return
listenerContainer
.
receiveAutoAck
(
Consumer
.
from
(
MQConst
.
AdamQueue
.
SQL_UREGISTER
.
getGroup
(),
MQConst
.
AdamQueue
.
SQL_UREGISTER
.
name
()
+
t
),
StreamOffset
.
create
(
MQConst
.
AdamQueue
.
SQL_UREGISTER
.
getKey
(),
ReadOffset
.
lastConsumed
()),
consumerKylinSqlOptOrderCloseRedisReceiver
);
}
/* —————————————————————————— | —————————————————————————— | —————————————————————————— */
/* -------------------------------------------------------- | 订单关闭 */
@Bean
public
Subscription
subscriptionSqlOptOrderClose1
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlOptOrderClose
(
listenerContainer
,
1
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionSqlOptOrderClose2
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlOptOrderClose
(
listenerContainer
,
2
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionSqlOptOrderClose3
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlOptOrderClose
(
listenerContainer
,
3
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionSqlOptOrderClose4
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlOptOrderClose
(
listenerContainer
,
4
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionSqlOptOrderClose5
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlOptOrderClose
(
listenerContainer
,
5
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionSqlOptOrderClose6
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlOptOrderClose
(
listenerContainer
,
6
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionSqlOptOrderClose7
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlOptOrderClose
(
listenerContainer
,
7
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionSqlOptOrderClose8
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlOptOrderClose
(
listenerContainer
,
8
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionSqlOptOrderClose9
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlOptOrderClose
(
listenerContainer
,
9
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionSqlOptOrderClose10
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlOptOrderClose
(
listenerContainer
,
10
);
listenerContainer
.
start
();
return
subscription
;
}
/* -------------------------------------------------------- | */
}
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-kylin/src/main/java/com/liquidnet/service/consumer/kylin/config/ConsumerKylinSqlOrderAgainRedisStreamConfig.java
0 → 100644
View file @
e0537c53
package
com
.
liquidnet
.
service
.
consumer
.
kylin
.
config
;
import
com.liquidnet.service.base.constant.MQConst
;
import
com.liquidnet.service.consumer.kylin.receiver.ConsumerKylinSqlOrderAgainRdsReceiver
;
import
com.liquidnet.service.consumer.kylin.receiver.ConsumerKylinSqlPerformanceLackRdsReceiver
;
import
lombok.var
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.context.annotation.Bean
;
import
org.springframework.context.annotation.Configuration
;
import
org.springframework.data.redis.connection.RedisConnectionFactory
;
import
org.springframework.data.redis.connection.stream.Consumer
;
import
org.springframework.data.redis.connection.stream.MapRecord
;
import
org.springframework.data.redis.connection.stream.ReadOffset
;
import
org.springframework.data.redis.connection.stream.StreamOffset
;
import
org.springframework.data.redis.stream.StreamMessageListenerContainer
;
import
org.springframework.data.redis.stream.Subscription
;
import
java.time.Duration
;
@Configuration
public
class
ConsumerKylinSqlOrderAgainRedisStreamConfig
{
@Autowired
ConsumerKylinSqlOrderAgainRdsReceiver
consumerKylinSqlOrderAgainRdsReceiver
;
private
StreamMessageListenerContainer
<
String
,
MapRecord
<
String
,
String
,
String
>>
buildStreamMessageListenerContainer
(
RedisConnectionFactory
factory
)
{
var
options
=
StreamMessageListenerContainer
.
StreamMessageListenerContainerOptions
.
builder
()
.
pollTimeout
(
Duration
.
ofMillis
(
1
))
.
build
();
return
StreamMessageListenerContainer
.
create
(
factory
,
options
);
}
/**
* 订单再次支付
*
* @param listenerContainer
* @param t
* @return
*/
private
Subscription
receiveSqlOrderAgain
(
StreamMessageListenerContainer
<
String
,
MapRecord
<
String
,
String
,
String
>>
listenerContainer
,
int
t
)
{
return
listenerContainer
.
receiveAutoAck
(
Consumer
.
from
(
MQConst
.
AdamQueue
.
SQL_UREGISTER
.
getGroup
(),
MQConst
.
AdamQueue
.
SQL_UREGISTER
.
name
()
+
t
),
StreamOffset
.
create
(
MQConst
.
AdamQueue
.
SQL_UREGISTER
.
getKey
(),
ReadOffset
.
lastConsumed
()),
consumerKylinSqlOrderAgainRdsReceiver
);
}
/* —————————————————————————— | —————————————————————————— | —————————————————————————— */
/* -------------------------------------------------------- | 订单再次支付 */
@Bean
public
Subscription
subscriptionSqlOrderAgain1
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlOrderAgain
(
listenerContainer
,
1
);
listenerContainer
.
start
();
return
subscription
;
}
/* -------------------------------------------------------- | */
}
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-kylin/src/main/java/com/liquidnet/service/consumer/kylin/config/ConsumerKylinSqlOrderCreateRedisStreamConfig.java
0 → 100644
View file @
e0537c53
package
com
.
liquidnet
.
service
.
consumer
.
kylin
.
config
;
import
com.liquidnet.service.base.constant.MQConst
;
import
com.liquidnet.service.consumer.kylin.receiver.ConsumerKylinSqlOrderAgainRdsReceiver
;
import
com.liquidnet.service.consumer.kylin.receiver.ConsumerKylinSqlOrderCreateRdsReceiver
;
import
lombok.var
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.context.annotation.Bean
;
import
org.springframework.context.annotation.Configuration
;
import
org.springframework.data.redis.connection.RedisConnectionFactory
;
import
org.springframework.data.redis.connection.stream.Consumer
;
import
org.springframework.data.redis.connection.stream.MapRecord
;
import
org.springframework.data.redis.connection.stream.ReadOffset
;
import
org.springframework.data.redis.connection.stream.StreamOffset
;
import
org.springframework.data.redis.stream.StreamMessageListenerContainer
;
import
org.springframework.data.redis.stream.Subscription
;
import
java.time.Duration
;
@Configuration
public
class
ConsumerKylinSqlOrderCreateRedisStreamConfig
{
@Autowired
ConsumerKylinSqlOrderCreateRdsReceiver
consumerKylinSqlOrderCreateRdsReceiver
;
private
StreamMessageListenerContainer
<
String
,
MapRecord
<
String
,
String
,
String
>>
buildStreamMessageListenerContainer
(
RedisConnectionFactory
factory
)
{
var
options
=
StreamMessageListenerContainer
.
StreamMessageListenerContainerOptions
.
builder
()
.
pollTimeout
(
Duration
.
ofMillis
(
1
))
.
build
();
return
StreamMessageListenerContainer
.
create
(
factory
,
options
);
}
/**
* 创建订单
*
* @param listenerContainer
* @param t
* @return
*/
private
Subscription
receiveSqlOrderCreate
(
StreamMessageListenerContainer
<
String
,
MapRecord
<
String
,
String
,
String
>>
listenerContainer
,
int
t
)
{
return
listenerContainer
.
receiveAutoAck
(
Consumer
.
from
(
MQConst
.
AdamQueue
.
SQL_UREGISTER
.
getGroup
(),
MQConst
.
AdamQueue
.
SQL_UREGISTER
.
name
()
+
t
),
StreamOffset
.
create
(
MQConst
.
AdamQueue
.
SQL_UREGISTER
.
getKey
(),
ReadOffset
.
lastConsumed
()),
consumerKylinSqlOrderCreateRdsReceiver
);
}
/* —————————————————————————— | —————————————————————————— | —————————————————————————— */
/* -------------------------------------------------------- | 创建订单 */
@Bean
public
Subscription
subscriptionSqlOrderCreate1
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlOrderCreate
(
listenerContainer
,
1
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionSqlOrderCreate2
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlOrderCreate
(
listenerContainer
,
2
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionSqlOrderCreate3
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlOrderCreate
(
listenerContainer
,
3
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionSqlOrderCreate4
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlOrderCreate
(
listenerContainer
,
4
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionSqlOrderCreate5
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlOrderCreate
(
listenerContainer
,
5
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionSqlOrderCreate6
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlOrderCreate
(
listenerContainer
,
6
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionSqlOrderCreate7
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlOrderCreate
(
listenerContainer
,
7
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionSqlOrderCreate8
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlOrderCreate
(
listenerContainer
,
8
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionSqlOrderCreate9
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlOrderCreate
(
listenerContainer
,
9
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionSqlOrderCreate10
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlOrderCreate
(
listenerContainer
,
10
);
listenerContainer
.
start
();
return
subscription
;
}
/* -------------------------------------------------------- | */
}
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-kylin/src/main/java/com/liquidnet/service/consumer/kylin/config/ConsumerKylinSqlOrderOvertimeRefundRedisStreamConfig.java
0 → 100644
View file @
e0537c53
package
com
.
liquidnet
.
service
.
consumer
.
kylin
.
config
;
import
com.liquidnet.service.base.constant.MQConst
;
import
com.liquidnet.service.consumer.kylin.receiver.ConsumerKylinSqlOrderOvertimeRefundRdsReceiver
;
import
lombok.var
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.context.annotation.Bean
;
import
org.springframework.context.annotation.Configuration
;
import
org.springframework.data.redis.connection.RedisConnectionFactory
;
import
org.springframework.data.redis.connection.stream.Consumer
;
import
org.springframework.data.redis.connection.stream.MapRecord
;
import
org.springframework.data.redis.connection.stream.ReadOffset
;
import
org.springframework.data.redis.connection.stream.StreamOffset
;
import
org.springframework.data.redis.stream.StreamMessageListenerContainer
;
import
org.springframework.data.redis.stream.Subscription
;
import
java.time.Duration
;
@Configuration
public
class
ConsumerKylinSqlOrderOvertimeRefundRedisStreamConfig
{
@Autowired
ConsumerKylinSqlOrderOvertimeRefundRdsReceiver
consumerKylinSqlOrderOvertimeRefundRdsReceiver
;
private
StreamMessageListenerContainer
<
String
,
MapRecord
<
String
,
String
,
String
>>
buildStreamMessageListenerContainer
(
RedisConnectionFactory
factory
)
{
var
options
=
StreamMessageListenerContainer
.
StreamMessageListenerContainerOptions
.
builder
()
.
pollTimeout
(
Duration
.
ofMillis
(
1
))
.
build
();
return
StreamMessageListenerContainer
.
create
(
factory
,
options
);
}
/**
* 超时支付申请退款
*
* @param listenerContainer
* @param t
* @return
*/
private
Subscription
receiveSqlOrderOvertimeRefund
(
StreamMessageListenerContainer
<
String
,
MapRecord
<
String
,
String
,
String
>>
listenerContainer
,
int
t
)
{
return
listenerContainer
.
receiveAutoAck
(
Consumer
.
from
(
MQConst
.
AdamQueue
.
SQL_UREGISTER
.
getGroup
(),
MQConst
.
AdamQueue
.
SQL_UREGISTER
.
name
()
+
t
),
StreamOffset
.
create
(
MQConst
.
AdamQueue
.
SQL_UREGISTER
.
getKey
(),
ReadOffset
.
lastConsumed
()),
consumerKylinSqlOrderOvertimeRefundRdsReceiver
);
}
/* —————————————————————————— | —————————————————————————— | —————————————————————————— */
/* -------------------------------------------------------- | 超时支付申请退款 */
@Bean
public
Subscription
subscriptionSqlOrderOvertimeRefund1
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlOrderOvertimeRefund
(
listenerContainer
,
1
);
listenerContainer
.
start
();
return
subscription
;
}
/* -------------------------------------------------------- | */
}
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-kylin/src/main/java/com/liquidnet/service/consumer/kylin/config/ConsumerKylinSqlOrderPayRedisStreamConfig.java
0 → 100644
View file @
e0537c53
package
com
.
liquidnet
.
service
.
consumer
.
kylin
.
config
;
import
com.liquidnet.service.base.constant.MQConst
;
import
com.liquidnet.service.consumer.kylin.receiver.ConsumerKylinSqlOrderAgainRdsReceiver
;
import
com.liquidnet.service.consumer.kylin.receiver.ConsumerKylinSqlOrderPayRdsReceiver
;
import
lombok.var
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.context.annotation.Bean
;
import
org.springframework.context.annotation.Configuration
;
import
org.springframework.data.redis.connection.RedisConnectionFactory
;
import
org.springframework.data.redis.connection.stream.Consumer
;
import
org.springframework.data.redis.connection.stream.MapRecord
;
import
org.springframework.data.redis.connection.stream.ReadOffset
;
import
org.springframework.data.redis.connection.stream.StreamOffset
;
import
org.springframework.data.redis.stream.StreamMessageListenerContainer
;
import
org.springframework.data.redis.stream.Subscription
;
import
java.time.Duration
;
@Configuration
public
class
ConsumerKylinSqlOrderPayRedisStreamConfig
{
@Autowired
ConsumerKylinSqlOrderPayRdsReceiver
consumerKylinSqlOrderPayRdsReceiver
;
private
StreamMessageListenerContainer
<
String
,
MapRecord
<
String
,
String
,
String
>>
buildStreamMessageListenerContainer
(
RedisConnectionFactory
factory
)
{
var
options
=
StreamMessageListenerContainer
.
StreamMessageListenerContainerOptions
.
builder
()
.
pollTimeout
(
Duration
.
ofMillis
(
1
))
.
build
();
return
StreamMessageListenerContainer
.
create
(
factory
,
options
);
}
/**
* 订单支付
*
* @param listenerContainer
* @param t
* @return
*/
private
Subscription
receiveSqlOrderPay
(
StreamMessageListenerContainer
<
String
,
MapRecord
<
String
,
String
,
String
>>
listenerContainer
,
int
t
)
{
return
listenerContainer
.
receiveAutoAck
(
Consumer
.
from
(
MQConst
.
AdamQueue
.
SQL_UREGISTER
.
getGroup
(),
MQConst
.
AdamQueue
.
SQL_UREGISTER
.
name
()
+
t
),
StreamOffset
.
create
(
MQConst
.
AdamQueue
.
SQL_UREGISTER
.
getKey
(),
ReadOffset
.
lastConsumed
()),
consumerKylinSqlOrderPayRdsReceiver
);
}
/* —————————————————————————— | —————————————————————————— | —————————————————————————— */
/* -------------------------------------------------------- | 订单支付 */
@Bean
public
Subscription
subscriptionSqlOrderPay1
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlOrderPay
(
listenerContainer
,
1
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionSqlOrderPay2
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlOrderPay
(
listenerContainer
,
2
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionSqlOrderPay3
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlOrderPay
(
listenerContainer
,
3
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionSqlOrderPay4
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlOrderPay
(
listenerContainer
,
4
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionSqlOrderPay5
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlOrderPay
(
listenerContainer
,
5
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionSqlOrderPay6
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlOrderPay
(
listenerContainer
,
6
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionSqlOrderPay7
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlOrderPay
(
listenerContainer
,
7
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionSqlOrderPay8
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlOrderPay
(
listenerContainer
,
8
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionSqlOrderPay9
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlOrderPay
(
listenerContainer
,
9
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionSqlOrderPay10
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlOrderPay
(
listenerContainer
,
10
);
listenerContainer
.
start
();
return
subscription
;
}
/* -------------------------------------------------------- | */
}
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-kylin/src/main/java/com/liquidnet/service/consumer/kylin/config/ConsumerKylinSqlOrderRefundRedisStreamConfig.java
0 → 100644
View file @
e0537c53
package
com
.
liquidnet
.
service
.
consumer
.
kylin
.
config
;
import
com.liquidnet.service.base.constant.MQConst
;
import
com.liquidnet.service.consumer.kylin.receiver.ConsumerKylinSqlOrderAgainRdsReceiver
;
import
com.liquidnet.service.consumer.kylin.receiver.ConsumerKylinSqlOrderRefundRdsReceiver
;
import
lombok.var
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.context.annotation.Bean
;
import
org.springframework.context.annotation.Configuration
;
import
org.springframework.data.redis.connection.RedisConnectionFactory
;
import
org.springframework.data.redis.connection.stream.Consumer
;
import
org.springframework.data.redis.connection.stream.MapRecord
;
import
org.springframework.data.redis.connection.stream.ReadOffset
;
import
org.springframework.data.redis.connection.stream.StreamOffset
;
import
org.springframework.data.redis.stream.StreamMessageListenerContainer
;
import
org.springframework.data.redis.stream.Subscription
;
import
java.time.Duration
;
@Configuration
public
class
ConsumerKylinSqlOrderRefundRedisStreamConfig
{
@Autowired
ConsumerKylinSqlOrderRefundRdsReceiver
consumerKylinSqlOrderRefundRdsReceiver
;
private
StreamMessageListenerContainer
<
String
,
MapRecord
<
String
,
String
,
String
>>
buildStreamMessageListenerContainer
(
RedisConnectionFactory
factory
)
{
var
options
=
StreamMessageListenerContainer
.
StreamMessageListenerContainerOptions
.
builder
()
.
pollTimeout
(
Duration
.
ofMillis
(
1
))
.
build
();
return
StreamMessageListenerContainer
.
create
(
factory
,
options
);
}
/**
* 订单申请退款
*
* @param listenerContainer
* @param t
* @return
*/
private
Subscription
receiveSqlOrderRefund
(
StreamMessageListenerContainer
<
String
,
MapRecord
<
String
,
String
,
String
>>
listenerContainer
,
int
t
)
{
return
listenerContainer
.
receiveAutoAck
(
Consumer
.
from
(
MQConst
.
AdamQueue
.
SQL_UREGISTER
.
getGroup
(),
MQConst
.
AdamQueue
.
SQL_UREGISTER
.
name
()
+
t
),
StreamOffset
.
create
(
MQConst
.
AdamQueue
.
SQL_UREGISTER
.
getKey
(),
ReadOffset
.
lastConsumed
()),
consumerKylinSqlOrderRefundRdsReceiver
);
}
/* —————————————————————————— | —————————————————————————— | —————————————————————————— */
/* -------------------------------------------------------- | 订单申请退款 */
@Bean
public
Subscription
subscriptionSqlOrderRefund1
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlOrderRefund
(
listenerContainer
,
1
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionSqlOrderRefund2
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlOrderRefund
(
listenerContainer
,
2
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionSqlOrderRefund3
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlOrderRefund
(
listenerContainer
,
3
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionSqlOrderRefund4
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlOrderRefund
(
listenerContainer
,
4
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionSqlOrderRefund5
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlOrderRefund
(
listenerContainer
,
5
);
listenerContainer
.
start
();
return
subscription
;
}
/* -------------------------------------------------------- | */
}
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-kylin/src/main/java/com/liquidnet/service/consumer/kylin/config/ConsumerKylinSqlOrderWithdrawRedisStreamConfig.java
0 → 100644
View file @
e0537c53
package
com
.
liquidnet
.
service
.
consumer
.
kylin
.
config
;
import
com.liquidnet.service.base.constant.MQConst
;
import
com.liquidnet.service.consumer.kylin.receiver.ConsumerKylinSqlOrderAgainRdsReceiver
;
import
com.liquidnet.service.consumer.kylin.receiver.ConsumerKylinSqlOrderWithdrawRdsReceiver
;
import
lombok.var
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.context.annotation.Bean
;
import
org.springframework.context.annotation.Configuration
;
import
org.springframework.data.redis.connection.RedisConnectionFactory
;
import
org.springframework.data.redis.connection.stream.Consumer
;
import
org.springframework.data.redis.connection.stream.MapRecord
;
import
org.springframework.data.redis.connection.stream.ReadOffset
;
import
org.springframework.data.redis.connection.stream.StreamOffset
;
import
org.springframework.data.redis.stream.StreamMessageListenerContainer
;
import
org.springframework.data.redis.stream.Subscription
;
import
java.time.Duration
;
@Configuration
public
class
ConsumerKylinSqlOrderWithdrawRedisStreamConfig
{
@Autowired
ConsumerKylinSqlOrderWithdrawRdsReceiver
consumerKylinSqlOrderWithdrawRdsReceiver
;
private
StreamMessageListenerContainer
<
String
,
MapRecord
<
String
,
String
,
String
>>
buildStreamMessageListenerContainer
(
RedisConnectionFactory
factory
)
{
var
options
=
StreamMessageListenerContainer
.
StreamMessageListenerContainerOptions
.
builder
()
.
pollTimeout
(
Duration
.
ofMillis
(
1
))
.
build
();
return
StreamMessageListenerContainer
.
create
(
factory
,
options
);
}
/**
* 订单申请撤回
*
* @param listenerContainer
* @param t
* @return
*/
private
Subscription
receiveSqlOrderWithdraw
(
StreamMessageListenerContainer
<
String
,
MapRecord
<
String
,
String
,
String
>>
listenerContainer
,
int
t
)
{
return
listenerContainer
.
receiveAutoAck
(
Consumer
.
from
(
MQConst
.
AdamQueue
.
SQL_UREGISTER
.
getGroup
(),
MQConst
.
AdamQueue
.
SQL_UREGISTER
.
name
()
+
t
),
StreamOffset
.
create
(
MQConst
.
AdamQueue
.
SQL_UREGISTER
.
getKey
(),
ReadOffset
.
lastConsumed
()),
consumerKylinSqlOrderWithdrawRdsReceiver
);
}
/* —————————————————————————— | —————————————————————————— | —————————————————————————— */
/* -------------------------------------------------------- | 订单申请撤回 */
@Bean
public
Subscription
subscriptionSqlOrderWithdraw1
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlOrderWithdraw
(
listenerContainer
,
1
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionSqlOrderWithdraw2
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlOrderWithdraw
(
listenerContainer
,
2
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionSqlOrderWithdraw3
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlOrderWithdraw
(
listenerContainer
,
3
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionSqlOrderWithdraw4
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlOrderWithdraw
(
listenerContainer
,
4
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionSqlOrderWithdraw5
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlOrderWithdraw
(
listenerContainer
,
5
);
listenerContainer
.
start
();
return
subscription
;
}
/* -------------------------------------------------------- | */
}
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-kylin/src/main/java/com/liquidnet/service/consumer/kylin/config/ConsumerKylinSqlPerformanceLackRedisStreamConfig.java
0 → 100644
View file @
e0537c53
package
com
.
liquidnet
.
service
.
consumer
.
kylin
.
config
;
import
com.liquidnet.service.base.constant.MQConst
;
import
com.liquidnet.service.consumer.kylin.receiver.ConsumerKylinSqlPerformanceLackRdsReceiver
;
import
lombok.var
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.context.annotation.Bean
;
import
org.springframework.context.annotation.Configuration
;
import
org.springframework.data.redis.connection.RedisConnectionFactory
;
import
org.springframework.data.redis.connection.stream.Consumer
;
import
org.springframework.data.redis.connection.stream.MapRecord
;
import
org.springframework.data.redis.connection.stream.ReadOffset
;
import
org.springframework.data.redis.connection.stream.StreamOffset
;
import
org.springframework.data.redis.stream.StreamMessageListenerContainer
;
import
org.springframework.data.redis.stream.Subscription
;
import
java.time.Duration
;
@Configuration
public
class
ConsumerKylinSqlPerformanceLackRedisStreamConfig
{
@Autowired
ConsumerKylinSqlPerformanceLackRdsReceiver
consumerKylinSqlPerformanceLackRdsReceiver
;
private
StreamMessageListenerContainer
<
String
,
MapRecord
<
String
,
String
,
String
>>
buildStreamMessageListenerContainer
(
RedisConnectionFactory
factory
)
{
var
options
=
StreamMessageListenerContainer
.
StreamMessageListenerContainerOptions
.
builder
()
.
pollTimeout
(
Duration
.
ofMillis
(
1
))
.
build
();
return
StreamMessageListenerContainer
.
create
(
factory
,
options
);
}
/**
* 缺票登记
*
* @param listenerContainer
* @param t
* @return
*/
private
Subscription
receiveSqlPerformanceLack
(
StreamMessageListenerContainer
<
String
,
MapRecord
<
String
,
String
,
String
>>
listenerContainer
,
int
t
)
{
return
listenerContainer
.
receiveAutoAck
(
Consumer
.
from
(
MQConst
.
AdamQueue
.
SQL_UREGISTER
.
getGroup
(),
MQConst
.
AdamQueue
.
SQL_UREGISTER
.
name
()
+
t
),
StreamOffset
.
create
(
MQConst
.
AdamQueue
.
SQL_UREGISTER
.
getKey
(),
ReadOffset
.
lastConsumed
()),
consumerKylinSqlPerformanceLackRdsReceiver
);
}
/* —————————————————————————— | —————————————————————————— | —————————————————————————— */
/* -------------------------------------------------------- | 缺票登记 */
@Bean
public
Subscription
subscriptionSqlPerformanceLack1
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlPerformanceLack
(
listenerContainer
,
1
);
listenerContainer
.
start
();
return
subscription
;
}
/* -------------------------------------------------------- | */
}
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-kylin/src/main/java/com/liquidnet/service/consumer/kylin/config/ConsumerKylinSqlStationRedisStreamConfig.java
0 → 100644
View file @
e0537c53
package
com
.
liquidnet
.
service
.
consumer
.
kylin
.
config
;
import
com.liquidnet.service.base.constant.MQConst
;
import
com.liquidnet.service.consumer.kylin.receiver.ConsumerKylinSqlOrderAgainRdsReceiver
;
import
com.liquidnet.service.consumer.kylin.receiver.ConsumerKylinSqlStationRdsReceiver
;
import
lombok.var
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.context.annotation.Bean
;
import
org.springframework.context.annotation.Configuration
;
import
org.springframework.data.redis.connection.RedisConnectionFactory
;
import
org.springframework.data.redis.connection.stream.Consumer
;
import
org.springframework.data.redis.connection.stream.MapRecord
;
import
org.springframework.data.redis.connection.stream.ReadOffset
;
import
org.springframework.data.redis.connection.stream.StreamOffset
;
import
org.springframework.data.redis.stream.StreamMessageListenerContainer
;
import
org.springframework.data.redis.stream.Subscription
;
import
java.time.Duration
;
@Configuration
public
class
ConsumerKylinSqlStationRedisStreamConfig
{
@Autowired
ConsumerKylinSqlStationRdsReceiver
consumerKylinSqlStationRdsReceiver
;
private
StreamMessageListenerContainer
<
String
,
MapRecord
<
String
,
String
,
String
>>
buildStreamMessageListenerContainer
(
RedisConnectionFactory
factory
)
{
var
options
=
StreamMessageListenerContainer
.
StreamMessageListenerContainerOptions
.
builder
()
.
pollTimeout
(
Duration
.
ofMillis
(
1
))
.
build
();
return
StreamMessageListenerContainer
.
create
(
factory
,
options
);
}
/**
* 验票更新
*
* @param listenerContainer
* @param t
* @return
*/
private
Subscription
receiveSqlStation
(
StreamMessageListenerContainer
<
String
,
MapRecord
<
String
,
String
,
String
>>
listenerContainer
,
int
t
)
{
return
listenerContainer
.
receiveAutoAck
(
Consumer
.
from
(
MQConst
.
AdamQueue
.
SQL_UREGISTER
.
getGroup
(),
MQConst
.
AdamQueue
.
SQL_UREGISTER
.
name
()
+
t
),
StreamOffset
.
create
(
MQConst
.
AdamQueue
.
SQL_UREGISTER
.
getKey
(),
ReadOffset
.
lastConsumed
()),
consumerKylinSqlStationRdsReceiver
);
}
/* —————————————————————————— | —————————————————————————— | —————————————————————————— */
/* -------------------------------------------------------- | 验票更新 */
@Bean
public
Subscription
subscriptionSqlStation1
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlStation
(
listenerContainer
,
1
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionSqlStation2
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlStation
(
listenerContainer
,
2
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionSqlStation3
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlStation
(
listenerContainer
,
3
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionSqlStation4
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlStation
(
listenerContainer
,
4
);
listenerContainer
.
start
();
return
subscription
;
}
@Bean
public
Subscription
subscriptionSqlStation5
(
RedisConnectionFactory
factory
)
{
var
listenerContainer
=
this
.
buildStreamMessageListenerContainer
(
factory
);
var
subscription
=
receiveSqlStation
(
listenerContainer
,
5
);
listenerContainer
.
start
();
return
subscription
;
}
/* -------------------------------------------------------- | */
}
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-kylin/src/main/java/com/liquidnet/service/consumer/kylin/receiver/ConsumerKylinSqlOrderAgainRdsReceiver.java
View file @
e0537c53
package
com
.
liquidnet
.
service
.
consumer
.
kylin
.
receiver
;
package
com
.
liquidnet
.
service
.
consumer
.
kylin
.
receiver
;
import
com.liquidnet.service.base.constant.MQConst
;
import
org.springframework.stereotype.Component
;
import
org.springframework.stereotype.Component
;
@Component
@Component
public
class
ConsumerKylinSqlOrderAgainRdsReceiver
extends
AbstractSqlRedisReceiver
{
public
class
ConsumerKylinSqlOrderAgainRdsReceiver
extends
AbstractSqlRedisReceiver
{
@Override
@Override
protected
String
getRedisStreamKey
()
{
protected
String
getRedisStreamKey
()
{
return
null
;
return
MQConst
.
KylinQueue
.
SQL_ORDER_AGAIN
.
getKey
()
;
}
}
}
}
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-kylin/src/main/java/com/liquidnet/service/consumer/kylin/receiver/ConsumerKylinSqlOrderCreateRdsReceiver.java
View file @
e0537c53
package
com
.
liquidnet
.
service
.
consumer
.
kylin
.
receiver
;
package
com
.
liquidnet
.
service
.
consumer
.
kylin
.
receiver
;
import
com.liquidnet.service.base.constant.MQConst
;
import
org.springframework.stereotype.Component
;
import
org.springframework.stereotype.Component
;
@Component
@Component
public
class
ConsumerKylinSqlOrderCreateRdsReceiver
extends
AbstractSqlRedisReceiver
{
public
class
ConsumerKylinSqlOrderCreateRdsReceiver
extends
AbstractSqlRedisReceiver
{
@Override
@Override
protected
String
getRedisStreamKey
()
{
protected
String
getRedisStreamKey
()
{
return
null
;
return
MQConst
.
KylinQueue
.
SQL_ORDER_CREATE
.
getKey
()
;
}
}
}
}
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-kylin/src/main/java/com/liquidnet/service/consumer/kylin/receiver/ConsumerKylinSqlOrderOvertimeRefundRdsReceiver.java
View file @
e0537c53
package
com
.
liquidnet
.
service
.
consumer
.
kylin
.
receiver
;
package
com
.
liquidnet
.
service
.
consumer
.
kylin
.
receiver
;
import
com.liquidnet.service.base.constant.MQConst
;
import
org.springframework.stereotype.Component
;
import
org.springframework.stereotype.Component
;
@Component
@Component
public
class
ConsumerKylinSqlOrderOvertimeRefundRdsReceiver
extends
AbstractSqlRedisReceiver
{
public
class
ConsumerKylinSqlOrderOvertimeRefundRdsReceiver
extends
AbstractSqlRedisReceiver
{
@Override
@Override
protected
String
getRedisStreamKey
()
{
protected
String
getRedisStreamKey
()
{
return
null
;
return
MQConst
.
KylinQueue
.
SQL_ORDER_OVERTIME_REFUND
.
getKey
()
;
}
}
}
}
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-kylin/src/main/java/com/liquidnet/service/consumer/kylin/receiver/ConsumerKylinSqlOrderPayRdsReceiver.java
View file @
e0537c53
package
com
.
liquidnet
.
service
.
consumer
.
kylin
.
receiver
;
package
com
.
liquidnet
.
service
.
consumer
.
kylin
.
receiver
;
import
com.liquidnet.service.base.constant.MQConst
;
import
org.springframework.stereotype.Component
;
import
org.springframework.stereotype.Component
;
@Component
@Component
public
class
ConsumerKylinSqlOrderPayRdsReceiver
extends
AbstractSqlRedisReceiver
{
public
class
ConsumerKylinSqlOrderPayRdsReceiver
extends
AbstractSqlRedisReceiver
{
@Override
@Override
protected
String
getRedisStreamKey
()
{
protected
String
getRedisStreamKey
()
{
return
null
;
return
MQConst
.
KylinQueue
.
SQL_ORDER_PAY
.
getKey
()
;
}
}
}
}
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-kylin/src/main/java/com/liquidnet/service/consumer/kylin/receiver/ConsumerKylinSqlOrderRefundRdsReceiver.java
View file @
e0537c53
package
com
.
liquidnet
.
service
.
consumer
.
kylin
.
receiver
;
package
com
.
liquidnet
.
service
.
consumer
.
kylin
.
receiver
;
import
com.liquidnet.service.base.constant.MQConst
;
import
org.springframework.stereotype.Component
;
import
org.springframework.stereotype.Component
;
@Component
@Component
public
class
ConsumerKylinSqlOrderRefundRdsReceiver
extends
AbstractSqlRedisReceiver
{
public
class
ConsumerKylinSqlOrderRefundRdsReceiver
extends
AbstractSqlRedisReceiver
{
@Override
@Override
protected
String
getRedisStreamKey
()
{
protected
String
getRedisStreamKey
()
{
return
null
;
return
MQConst
.
KylinQueue
.
SQL_ORDER_REFUND
.
getKey
()
;
}
}
}
}
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-kylin/src/main/java/com/liquidnet/service/consumer/kylin/receiver/ConsumerKylinSqlOrderWithdrawRdsReceiver.java
View file @
e0537c53
package
com
.
liquidnet
.
service
.
consumer
.
kylin
.
receiver
;
package
com
.
liquidnet
.
service
.
consumer
.
kylin
.
receiver
;
import
com.liquidnet.service.base.constant.MQConst
;
import
org.springframework.stereotype.Component
;
import
org.springframework.stereotype.Component
;
@Component
@Component
public
class
ConsumerKylinSqlOrderWithdrawRdsReceiver
extends
AbstractSqlRedisReceiver
{
public
class
ConsumerKylinSqlOrderWithdrawRdsReceiver
extends
AbstractSqlRedisReceiver
{
@Override
@Override
protected
String
getRedisStreamKey
()
{
protected
String
getRedisStreamKey
()
{
return
null
;
return
MQConst
.
KylinQueue
.
SQL_ORDER_WITHDRAW
.
getKey
()
;
}
}
}
}
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-kylin/src/main/java/com/liquidnet/service/consumer/kylin/receiver/ConsumerKylinSqlPerformanceLackRdsReceiver.java
View file @
e0537c53
package
com
.
liquidnet
.
service
.
consumer
.
kylin
.
receiver
;
package
com
.
liquidnet
.
service
.
consumer
.
kylin
.
receiver
;
import
com.liquidnet.service.base.constant.MQConst
;
import
org.springframework.stereotype.Component
;
import
org.springframework.stereotype.Component
;
@Component
@Component
public
class
ConsumerKylinSqlPerformanceLackRdsReceiver
extends
AbstractSqlRedisReceiver
{
public
class
ConsumerKylinSqlPerformanceLackRdsReceiver
extends
AbstractSqlRedisReceiver
{
@Override
@Override
protected
String
getRedisStreamKey
()
{
protected
String
getRedisStreamKey
()
{
return
null
;
return
MQConst
.
KylinQueue
.
SQL_PERFORMANCE_LACK
.
getKey
()
;
}
}
}
}
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-kylin/src/main/java/com/liquidnet/service/consumer/kylin/receiver/ConsumerKylinSqlStationRdsReceiver.java
View file @
e0537c53
package
com
.
liquidnet
.
service
.
consumer
.
kylin
.
receiver
;
package
com
.
liquidnet
.
service
.
consumer
.
kylin
.
receiver
;
import
com.liquidnet.service.base.constant.MQConst
;
import
org.springframework.stereotype.Component
;
import
org.springframework.stereotype.Component
;
@Component
@Component
public
class
ConsumerKylinSqlStationRdsReceiver
extends
AbstractSqlRedisReceiver
{
public
class
ConsumerKylinSqlStationRdsReceiver
extends
AbstractSqlRedisReceiver
{
@Override
@Override
protected
String
getRedisStreamKey
()
{
protected
String
getRedisStreamKey
()
{
return
null
;
return
MQConst
.
KylinQueue
.
SQL_STATION
.
getKey
()
;
}
}
}
}
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-kylin/src/main/java/com/liquidnet/service/consumer/kylin/service/processor/ConsumerKylinSmsProcessor.java
View file @
e0537c53
package
com
.
liquidnet
.
service
.
consumer
.
kylin
.
service
.
processor
;
//package com.liquidnet.service.consumer.kylin.service.processor;
//
import
com.liquidnet.common.mq.constant.MQConst
;
//import com.liquidnet.common.mq.constant.MQConst;
import
com.liquidnet.common.sms.processor.SmsProcessor
;
//import com.liquidnet.common.sms.processor.SmsProcessor;
import
com.liquidnet.commons.lang.util.JsonUtils
;
//import com.liquidnet.commons.lang.util.JsonUtils;
import
com.liquidnet.service.base.SmsMessage
;
//import com.liquidnet.service.base.SmsMessage;
import
com.rabbitmq.client.Channel
;
//import com.rabbitmq.client.Channel;
import
lombok.extern.slf4j.Slf4j
;
//import lombok.extern.slf4j.Slf4j;
import
org.springframework.amqp.core.Message
;
//import org.springframework.amqp.core.Message;
import
org.springframework.amqp.core.MessageProperties
;
//import org.springframework.amqp.core.MessageProperties;
import
org.springframework.amqp.rabbit.annotation.Exchange
;
//import org.springframework.amqp.rabbit.annotation.Exchange;
import
org.springframework.amqp.rabbit.annotation.Queue
;
//import org.springframework.amqp.rabbit.annotation.Queue;
import
org.springframework.amqp.rabbit.annotation.QueueBinding
;
//import org.springframework.amqp.rabbit.annotation.QueueBinding;
import
org.springframework.amqp.rabbit.annotation.RabbitListener
;
//import org.springframework.amqp.rabbit.annotation.RabbitListener;
import
org.springframework.stereotype.Component
;
//import org.springframework.stereotype.Component;
//
import
javax.annotation.Resource
;
//import javax.annotation.Resource;
import
java.io.IOException
;
//import java.io.IOException;
//
/**
///**
* ConsumerAdamSmsProcessor.class
// * ConsumerAdamSmsProcessor.class
*
// *
* @author zhanggb
// * @author zhanggb
* Created by IntelliJ IDEA at 2021/7/13
// * Created by IntelliJ IDEA at 2021/7/13
*/
// */
@Slf4j
//@Slf4j
@Component
//@Component
public
class
ConsumerKylinSmsProcessor
{
//public class ConsumerKylinSmsProcessor {
@Resource
// @Resource
SmsProcessor
smsProcessor
;
// SmsProcessor smsProcessor;
//
private
void
consumerSmsSendHandler
(
Message
msg
,
Channel
channel
)
{
// private void consumerSmsSendHandler(Message msg, Channel channel) {
MessageProperties
properties
=
msg
.
getMessageProperties
();
// MessageProperties properties = msg.getMessageProperties();
String
consumerQueue
=
properties
.
getConsumerQueue
();
// String consumerQueue = properties.getConsumerQueue();
long
deliveryTag
=
properties
.
getDeliveryTag
();
// long deliveryTag = properties.getDeliveryTag();
log
.
info
(
"CONSUMER SMS ==> [consumerQueue:{},deliveryTag:{}]"
,
consumerQueue
,
deliveryTag
);
// log.info("CONSUMER SMS ==> [consumerQueue:{},deliveryTag:{}]", consumerQueue, deliveryTag);
String
msgBody
=
new
String
(
msg
.
getBody
());
// String msgBody = new String(msg.getBody());
log
.
debug
(
"CONSUMER SMS ==> Preparing:{}"
,
msgBody
);
// log.debug("CONSUMER SMS ==> Preparing:{}", msgBody);
try
{
// try {
SmsMessage
smsMessage
=
JsonUtils
.
fromJson
(
msgBody
,
SmsMessage
.
class
);
// SmsMessage smsMessage = JsonUtils.fromJson(msgBody, SmsMessage.class);
boolean
result
=
smsProcessor
.
send
(
smsMessage
.
getPhone
(),
smsMessage
.
getSignName
(),
smsMessage
.
getTemplateCode
(),
smsMessage
.
getTemplateParam
().
toString
());
// boolean result = smsProcessor.send(smsMessage.getPhone(), smsMessage.getSignName(), smsMessage.getTemplateCode(), smsMessage.getTemplateParam().toString());
log
.
debug
(
"CONSUMER SMS result of execution:{}"
,
result
);
// log.debug("CONSUMER SMS result of execution:{}", result);
if
(
result
)
{
// if (result) {
channel
.
basicAck
(
deliveryTag
,
false
);
// channel.basicAck(deliveryTag, false);
}
else
{
// } else {
log
.
warn
(
"###CONSUMER SMS[consumerQueue:{},deliveryTag={},sqlMessage:{}]"
,
consumerQueue
,
deliveryTag
,
msgBody
);
// log.warn("###CONSUMER SMS[consumerQueue:{},deliveryTag={},sqlMessage:{}]", consumerQueue, deliveryTag, msgBody);
channel
.
basicAck
(
deliveryTag
,
false
);
// channel.basicAck(deliveryTag, false);
}
// }
}
catch
(
IOException
e
)
{
// } catch (IOException e) {
log
.
error
(
"CONSUMER SMS[consumerQueue:{},deliveryTag:{},sqlMessage:{}]"
,
consumerQueue
,
deliveryTag
,
msgBody
,
e
);
// log.error("CONSUMER SMS[consumerQueue:{},deliveryTag:{},sqlMessage:{}]", consumerQueue, deliveryTag, msgBody, e);
}
// }
}
// }
//
/* ================================================================== | 短信验证码 */
// /* ================================================================== | 短信验证码 */
//
//// @RabbitListener(
//// bindings = @QueueBinding(
//// exchange = @Exchange(MQConst.EX_LNS_SMS_SENDER),
//// key = MQConst.RK_SMS_CODE,
//// value = @Queue(MQConst.QUEUES_SMS_CODE)
//// ),
//// concurrency = "25"
//// )
//// public void consumerSqlForSmsCode(Message msg, Channel channel) {
//// this.consumerSmsSendHandler(msg, channel);
//// }
//
// /* ================================================================== | 短信通知 */
//
// @RabbitListener(
// @RabbitListener(
// bindings = @QueueBinding(
// bindings = @QueueBinding(
// exchange = @Exchange(MQConst.EX_LNS_SMS_SENDER),
// exchange = @Exchange(MQConst.EX_LNS_SMS_SENDER),
// key = MQConst.RK_SMS_
COD
E,
// key = MQConst.RK_SMS_
NOTIC
E,
// value = @Queue(MQConst.QUEUES_SMS_
COD
E)
// value = @Queue(MQConst.QUEUES_SMS_
NOTIC
E)
// ),
// ),
// concurrency = "
25
"
// concurrency = "
10
"
// )
// )
// public void consumerSqlForSms
Cod
e(Message msg, Channel channel) {
// public void consumerSqlForSms
Notic
e(Message msg, Channel channel) {
// this.consumerSmsSendHandler(msg, channel);
// this.consumerSmsSendHandler(msg, channel);
// }
// }
//
/* ================================================================== | 短信通知 */
//
// /* ================================================================== | */
@RabbitListener
(
//}
bindings
=
@QueueBinding
(
exchange
=
@Exchange
(
MQConst
.
EX_LNS_SMS_SENDER
),
key
=
MQConst
.
RK_SMS_NOTICE
,
value
=
@Queue
(
MQConst
.
QUEUES_SMS_NOTICE
)
),
concurrency
=
"10"
)
public
void
consumerSqlForSmsNotice
(
Message
msg
,
Channel
channel
)
{
this
.
consumerSmsSendHandler
(
msg
,
channel
);
}
/* ================================================================== | */
}
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-kylin/src/main/java/com/liquidnet/service/consumer/kylin/service/processor/ConsumerProcessor.java
View file @
e0537c53
This diff is collapsed.
Click to expand it.
liquidnet-bus-service/liquidnet-service-kylin/liquidnet-service-kylin-impl/docu/redis_queue_create.txt
0 → 100644
View file @
e0537c53
XADD kylin:stream:rk.performance.lack * 0 0
XGROUP CREATE kylin:stream:rk.performance.lack group.performance.lack 0
XADD kylin:stream:rk.order.create * 0 0
XGROUP CREATE kylin:stream:rk.order.create group.order.create 0
XADD kylin:stream:rk.order.again * 0 0
XGROUP CREATE kylin:stream:rk.order.again group.order.again 0
XADD kylin:stream:rk.order.close * 0 0
XGROUP CREATE kylin:stream:rk.order.close group.order.close 0
XADD kylin:stream:rk.order.pay * 0 0
XGROUP CREATE kylin:stream:rk.order.pay group.order.pay 0
XADD kylin:stream:rk.order.refund * 0 0
XGROUP CREATE kylin:stream:rk.order.refund group.order.refund 0
XADD kylin:stream:rk.order.withdraw * 0 0
XGROUP CREATE kylin:stream:rk.order.withdraw group.order.withdraw 0
XADD kylin:stream:rk.order.overtime.refund * 0 0
XGROUP CREATE kylin:stream:rk.order.overtime.refund group.order.overtime.refund 0
XADD kylin:stream:rk.station * 0 0
XGROUP CREATE kylin:stream:rk.station group.station 0
# ==================================================
# XGROUP DESTROY adam:stream:rk.sms.notice group.sms.sender 0
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment