记得上下班打卡 | git大法好,push需谨慎
Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
L
liquidnet-bus-v1
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
董敬伟
liquidnet-bus-v1
Commits
783757c3
Commit
783757c3
authored
Jul 02, 2021
by
张国柄
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
consumer log;
parent
6a4b2e78
Changes
2
Show whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
51 additions
and
53 deletions
+51
-53
ConsumerAdamProcessor.java
...ice/consumer/service/processor/ConsumerAdamProcessor.java
+7
-17
ConsumerProcessor.java
...service/consumer/service/processor/ConsumerProcessor.java
+44
-36
No files found.
liquidnet-bus-service/liquidnet-service-consumer-adam/src/main/java/com/liquidnet/service/consumer/service/processor/ConsumerAdamProcessor.java
View file @
783757c3
package
com
.
liquidnet
.
service
.
consumer
.
service
.
processor
;
import
com.liquidnet.common.cache.redis.util.RedisUtil
;
import
com.liquidnet.common.mq.constant.MQConst
;
import
com.liquidnet.commons.lang.util.JsonUtils
;
import
com.liquidnet.service.base.SqlMapping
;
...
...
@@ -13,7 +12,6 @@ import org.springframework.amqp.rabbit.annotation.Exchange;
import
org.springframework.amqp.rabbit.annotation.Queue
;
import
org.springframework.amqp.rabbit.annotation.QueueBinding
;
import
org.springframework.amqp.rabbit.annotation.RabbitListener
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.stereotype.Component
;
import
javax.annotation.Resource
;
...
...
@@ -30,8 +28,6 @@ import java.io.IOException;
public
class
ConsumerAdamProcessor
{
@Resource
IBaseDao
baseDao
;
@Autowired
RedisUtil
redisUtil
;
// @RabbitListener(bindings = @QueueBinding(
// exchange = @Exchange(MQConst.EXCHANGES_LIQUIDNET_SQL), key = MQConst.ROUTING_KEY_SQL,
...
...
@@ -63,9 +59,9 @@ public class ConsumerAdamProcessor {
private
void
consumerSqlDaoHandler
(
Message
msg
,
Channel
channel
)
{
MessageProperties
properties
=
msg
.
getMessageProperties
();
String
q
ueue
=
properties
.
getConsumerQueue
();
long
t
ag
=
properties
.
getDeliveryTag
();
log
.
info
(
"CONSUMER SQL ==> [consumerQueue:{},deliveryTag:{}]"
,
queue
,
t
ag
);
String
consumerQ
ueue
=
properties
.
getConsumerQueue
();
long
deliveryT
ag
=
properties
.
getDeliveryTag
();
log
.
info
(
"CONSUMER SQL ==> [consumerQueue:{},deliveryTag:{}]"
,
consumerQueue
,
deliveryT
ag
);
SqlMapping
.
SqlMessage
sqlMessage
=
JsonUtils
.
fromJson
(
new
String
(
msg
.
getBody
()),
SqlMapping
.
SqlMessage
.
class
);
log
.
debug
(
"CONSUMER SQL ==> Preparing:{}"
,
JsonUtils
.
toJson
(
sqlMessage
.
getSqls
()));
log
.
debug
(
"CONSUMER SQL ==> Parameters:{}"
,
JsonUtils
.
toJson
(
sqlMessage
.
getArgs
()));
...
...
@@ -73,19 +69,13 @@ public class ConsumerAdamProcessor {
Boolean
rstBatchSqls
=
baseDao
.
batchSqls
(
sqlMessage
.
getSqls
(),
sqlMessage
.
getArgs
());
log
.
debug
(
"CONSUMER SQL result of execution:{}"
,
rstBatchSqls
);
if
(
rstBatchSqls
)
{
channel
.
basicAck
(
t
ag
,
false
);
channel
.
basicAck
(
deliveryT
ag
,
false
);
}
else
{
String
rk
=
queue
+
":"
+
tag
;
if
(
redisUtil
.
incr
(
rk
,
1
)
>=
3
)
{
log
.
warn
(
"###CONSUMER SQL[consumerQueue:{},deliveryTag={},sqlMessage:{}]"
,
queue
,
tag
,
JsonUtils
.
toJson
(
sqlMessage
));
channel
.
basicAck
(
tag
,
false
);
redisUtil
.
expire
(
rk
,
600
);
}
else
{
channel
.
basicReject
(
tag
,
true
);
}
log
.
warn
(
"###CONSUMER SQL[consumerQueue:{},deliveryTag={},sqlMessage:{}]"
,
consumerQueue
,
deliveryTag
,
JsonUtils
.
toJson
(
sqlMessage
));
channel
.
basicAck
(
deliveryTag
,
false
);
}
}
catch
(
IOException
e
)
{
log
.
error
(
"CONSUMER SQL[consumerQueue:{},deliveryTag:{},sqlMessage:{}]"
,
queue
,
t
ag
,
JsonUtils
.
toJson
(
sqlMessage
),
e
);
log
.
error
(
"CONSUMER SQL[consumerQueue:{},deliveryTag:{},sqlMessage:{}]"
,
consumerQueue
,
deliveryT
ag
,
JsonUtils
.
toJson
(
sqlMessage
),
e
);
}
}
...
...
liquidnet-bus-service/liquidnet-service-consumer/src/main/java/com/liquidnet/service/consumer/service/processor/ConsumerProcessor.java
View file @
783757c3
...
...
@@ -12,6 +12,7 @@ import com.liquidnet.service.kylin.dto.vo.mongo.KylinOrderTicketVo;
import
com.rabbitmq.client.Channel
;
import
lombok.extern.slf4j.Slf4j
;
import
org.apache.commons.lang.StringEscapeUtils
;
import
org.springframework.amqp.core.MessageProperties
;
import
org.springframework.amqp.rabbit.annotation.Exchange
;
import
org.springframework.amqp.rabbit.annotation.Queue
;
import
org.springframework.amqp.rabbit.annotation.QueueBinding
;
...
...
@@ -68,27 +69,35 @@ public class ConsumerProcessor {
// }
private
void
consumerSqlDaoHandler
(
Message
msg
,
Channel
channel
)
{
MessageProperties
properties
=
msg
.
getMessageProperties
();
String
consumerQueue
=
properties
.
getConsumerQueue
();
long
deliveryTag
=
properties
.
getDeliveryTag
();
log
.
info
(
"CONSUMER SQL ==> [consumerQueue:{},deliveryTag:{}]"
,
consumerQueue
,
deliveryTag
);
SqlMapping
.
SqlMessage
sqlMessage
=
JsonUtils
.
fromJson
(
new
String
(
msg
.
getBody
()),
SqlMapping
.
SqlMessage
.
class
);
log
.
debug
(
"
consumer sql
==> Preparing:{}"
,
JsonUtils
.
toJson
(
sqlMessage
.
getSqls
()));
log
.
debug
(
"
consumer sql
==> Parameters:{}"
,
JsonUtils
.
toJson
(
sqlMessage
.
getArgs
()));
log
.
debug
(
"
CONSUMER SQL
==> Preparing:{}"
,
JsonUtils
.
toJson
(
sqlMessage
.
getSqls
()));
log
.
debug
(
"
CONSUMER SQL
==> Parameters:{}"
,
JsonUtils
.
toJson
(
sqlMessage
.
getArgs
()));
try
{
Boolean
rstBatchSqls
=
baseDao
.
batchSqls
(
sqlMessage
.
getSqls
(),
sqlMessage
.
getArgs
());
log
.
debug
(
"
consumer sql
result of execution:{}"
,
rstBatchSqls
);
log
.
debug
(
"
CONSUMER SQL
result of execution:{}"
,
rstBatchSqls
);
if
(
rstBatchSqls
)
{
channel
.
basicAck
(
msg
.
getMessageProperties
().
getDeliveryTag
()
,
false
);
channel
.
basicAck
(
deliveryTag
,
false
);
}
else
{
channel
.
basicReject
(
msg
.
getMessageProperties
().
getDeliveryTag
(),
true
);
log
.
warn
(
"###CONSUMER SQL[consumerQueue:{},deliveryTag={},sqlMessage:{}]"
,
consumerQueue
,
deliveryTag
,
JsonUtils
.
toJson
(
sqlMessage
));
channel
.
basicAck
(
deliveryTag
,
false
);
}
}
catch
(
IOException
e
)
{
log
.
error
(
"
error:consumer sql:Channel.msg.tag:{}"
,
msg
.
getMessageProperties
().
getDeliveryTag
(
),
e
);
log
.
error
(
"
CONSUMER SQL[consumerQueue:{},deliveryTag:{},sqlMessage:{}]"
,
consumerQueue
,
deliveryTag
,
JsonUtils
.
toJson
(
sqlMessage
),
e
);
}
}
// 处理长sql语句
private
void
consumerOperationOrderClose
(
Message
msg
,
Channel
channel
)
{
MessageProperties
properties
=
msg
.
getMessageProperties
();
String
consumerQueue
=
properties
.
getConsumerQueue
();
long
deliveryTag
=
properties
.
getDeliveryTag
();
String
jsonStr
=
StringEscapeUtils
.
unescapeJava
(
new
String
(
msg
.
getBody
()));
OrderCloseMapping
.
orderCloseMessage
mqMessage
=
JsonUtils
.
fromJson
(
jsonStr
.
substring
(
1
,
jsonStr
.
length
()
-
1
),
OrderCloseMapping
.
orderCloseMessage
.
class
);
log
.
debug
(
"
consumer ==> mqMessage
:{}"
,
mqMessage
.
getOrderTicketIds
());
log
.
debug
(
"
CONSUMER SQL ==> orderCloseMessage.orderTicketIds
:{}"
,
mqMessage
.
getOrderTicketIds
());
try
{
for
(
int
x
=
0
;
x
<
mqMessage
.
getOrderTicketIds
().
size
();
x
++)
{
String
t
=
mqMessage
.
getOrderTicketIds
().
get
(
x
);
...
...
@@ -105,39 +114,38 @@ public class ConsumerProcessor {
kylinUtils
.
changeBuyInfo
(
items
.
getUserId
(),
items
.
getEnterIdCode
(),
items
.
getPerformanceId
(),
items
.
getTicketId
(),
-
1
);
}
}
channel
.
basicAck
(
msg
.
getMessageProperties
().
getDeliveryTag
()
,
false
);
channel
.
basicAck
(
deliveryTag
,
false
);
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
log
.
error
(
"error:consumer sql:Channel.msg.tag:{}"
,
msg
.
getMessageProperties
().
getDeliveryTag
(),
e
);
log
.
error
(
"CONSUMER SQL[consumerQueue:{},deliveryTag:{},sqlMessage:{}]"
,
consumerQueue
,
deliveryTag
,
JsonUtils
.
toJson
(
mqMessage
),
e
);
}
}
// 用户注册
@RabbitListener
(
bindings
=
@QueueBinding
(
exchange
=
@Exchange
(
MQConst
.
EX_LNS_SQL_UCENTER
),
key
=
MQConst
.
RK_SQL_UREGISTER
,
value
=
@Queue
(
MQConst
.
QUEUES_SQL_UREGISTER
)
))
public
void
consumerSqlForURegister
(
Message
msg
,
Channel
channel
)
{
this
.
consumerSqlDaoHandler
(
msg
,
channel
);
}
// 用户信息
@RabbitListener
(
bindings
=
@QueueBinding
(
exchange
=
@Exchange
(
MQConst
.
EX_LNS_SQL_UCENTER
),
key
=
MQConst
.
RK_SQL_UCENTER
,
value
=
@Queue
(
MQConst
.
QUEUES_SQL_UCENTER
)
))
public
void
consumerSqlForUCenter
(
Message
msg
,
Channel
channel
)
{
this
.
consumerSqlDaoHandler
(
msg
,
channel
);
}
// 会员购买
@RabbitListener
(
bindings
=
@QueueBinding
(
exchange
=
@Exchange
(
MQConst
.
EX_LNS_SQL_UCENTER
),
key
=
MQConst
.
RK_SQL_UMEMBER
,
value
=
@Queue
(
MQConst
.
QUEUES_SQL_UMEMBER
)
))
public
void
consumerSqlForUMember
(
Message
msg
,
Channel
channel
)
{
this
.
consumerSqlDaoHandler
(
msg
,
channel
);
}
//
// 用户注册
//
@RabbitListener(bindings = @QueueBinding(
//
exchange = @Exchange(MQConst.EX_LNS_SQL_UCENTER), key = MQConst.RK_SQL_UREGISTER,
//
value = @Queue(MQConst.QUEUES_SQL_UREGISTER)
//
))
//
public void consumerSqlForURegister(Message msg, Channel channel) {
//
this.consumerSqlDaoHandler(msg, channel);
//
}
//
//
// 用户信息
//
@RabbitListener(bindings = @QueueBinding(
//
exchange = @Exchange(MQConst.EX_LNS_SQL_UCENTER), key = MQConst.RK_SQL_UCENTER,
//
value = @Queue(MQConst.QUEUES_SQL_UCENTER)
//
))
//
public void consumerSqlForUCenter(Message msg, Channel channel) {
//
this.consumerSqlDaoHandler(msg, channel);
//
}
//
//
// 会员购买
//
@RabbitListener(bindings = @QueueBinding(
//
exchange = @Exchange(MQConst.EX_LNS_SQL_UCENTER), key = MQConst.RK_SQL_UMEMBER,
//
value = @Queue(MQConst.QUEUES_SQL_UMEMBER)
//
))
//
public void consumerSqlForUMember(Message msg, Channel channel) {
//
this.consumerSqlDaoHandler(msg, channel);
//
}
// 验票更新
@RabbitListener
(
bindings
=
@QueueBinding
(
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment