记得上下班打卡 | git大法好,push需谨慎
Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
L
liquidnet-bus-v1
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
董敬伟
liquidnet-bus-v1
Commits
fc0abb39
Commit
fc0abb39
authored
May 18, 2022
by
胡佳晨
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
修改消费 提交到slowly
parent
52adc2eb
Changes
1
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
75 additions
and
23 deletions
+75
-23
AbstractXlsRedisReceiver.java
...ce/consumer/slowly/receiver/AbstractXlsRedisReceiver.java
+75
-23
No files found.
liquidnet-bus-service/liquidnet-service-consumer-all/liquidnet-service-consumer-slowly/src/main/java/com/liquidnet/service/consumer/slowly/receiver/AbstractXlsRedisReceiver.java
View file @
fc0abb39
...
...
@@ -16,6 +16,7 @@ import org.springframework.data.redis.stream.StreamListener;
import
java.net.URL
;
import
java.time.LocalDateTime
;
import
java.util.HashMap
;
import
java.util.LinkedList
;
import
java.util.Map
;
...
...
@@ -35,7 +36,7 @@ public abstract class AbstractXlsRedisReceiver implements StreamListener<String,
String
redisStreamKey
=
this
.
getRedisStreamKey
();
log
.
debug
(
"CONSUMER MSG[streamKey:{},messageId:{},stream:{},body:{}]"
,
redisStreamKey
,
message
.
getId
(),
message
.
getStream
(),
message
.
getValue
());
boolean
result
=
this
.
consumerMessageHandler
(
message
.
getValue
());
log
.
info
(
"CONSUMER MSG RESULT:{} ==> [{}]MESSAGE_ID:{}"
,
result
,
redisStreamKey
,
message
.
getId
());
log
.
info
(
"
XLS MESSAGE
CONSUMER MSG RESULT:{} ==> [{}]MESSAGE_ID:{}"
,
result
,
redisStreamKey
,
message
.
getId
());
try
{
stringRedisTemplate
.
opsForStream
().
acknowledge
(
getRedisStreamGroup
(),
message
);
...
...
@@ -52,40 +53,91 @@ public abstract class AbstractXlsRedisReceiver implements StreamListener<String,
private
boolean
consumerMessageHandler
(
Map
<
String
,
String
>
message
)
{
LinkedList
<
Object
[]>
objs
=
CollectionUtil
.
linkedListObjectArr
();
String
xlsPath
=
null
,
skuId
=
null
;
String
oXlsPath
=
null
;
Integer
type
=
null
;
boolean
aBoolean
=
false
;
try
{
xlsPath
=
message
.
get
(
"message"
);
String
[]
path
=
message
.
get
(
"message"
).
split
(
","
);
if
(
path
.
length
==
0
)
{
xlsPath
=
""
;
}
else
{
xlsPath
=
path
[
0
];
}
if
(
path
.
length
>
1
)
{
oXlsPath
=
path
[
1
];
}
String
finalSkuId
=
(
skuId
=
message
.
get
(
"skuId"
));
String
listId
;
// try {
listId
=
message
.
getOrDefault
(
"listId"
,
""
);
// }catch (Exception e){
// listId = "";
// }
Integer
finalType
=
(
type
=
Integer
.
parseInt
(
message
.
get
(
"type"
)));
EasyExcel
.
read
(
new
URL
(
xlsPath
).
openStream
(),
PhoneDto
.
class
,
new
PageReadListener
<
PhoneDto
>(
dataList
->
{
for
(
PhoneDto
data
:
dataList
)
{
if
(
data
.
getMobile
()
==
null
)
{
continue
;
}
String
redisKey
=
GoblinRedisConst
.
REDIS_CAN_BUY
.
concat
(
finalSkuId
+
":"
).
concat
(
data
.
getMobile
());
if
(
finalType
.
equals
(
1
))
{
//添加
if
(
log
.
isDebugEnabled
())
{
log
.
debug
(
"添加 读取到一条数据{}"
,
JSON
.
toJSONString
(
data
));
if
(
finalType
.
equals
(
1
)
||
finalType
.
equals
(
2
))
{
EasyExcel
.
read
(
new
URL
(
xlsPath
).
openStream
(),
PhoneDto
.
class
,
new
PageReadListener
<
PhoneDto
>(
dataList
->
{
for
(
PhoneDto
data
:
dataList
)
{
if
(
data
.
getMobile
()
==
null
)
{
continue
;
}
redisUtil
.
set
(
redisKey
,
0
);
}
else
{
if
(
log
.
isDebugEnabled
())
{
log
.
debug
(
"删除 读取到一条数据{}"
,
JSON
.
toJSONString
(
data
));
String
redisKey
=
GoblinRedisConst
.
REDIS_CAN_BUY
.
concat
(
finalSkuId
+
":"
).
concat
(
data
.
getMobile
());
if
(
finalType
.
equals
(
1
))
{
//添加
if
(
log
.
isDebugEnabled
())
{
log
.
debug
(
"添加 读取到一条数据{}"
,
JSON
.
toJSONString
(
data
));
}
redisUtil
.
set
(
redisKey
,
0
);
}
else
{
if
(
log
.
isDebugEnabled
())
{
log
.
debug
(
"删除 读取到一条数据{}"
,
JSON
.
toJSONString
(
data
));
}
redisUtil
.
del
(
redisKey
);
}
redisUtil
.
del
(
redisKey
);
}
})).
sheet
().
doRead
();
objs
.
add
(
new
Object
[]{
skuId
,
xlsPath
,
type
,
1
,
LocalDateTime
.
now
()});
aBoolean
=
baseDao
.
batchSql
(
SQL_INSERT_GOODS_BUY_ROSTER_LOG
,
objs
);
}
else
if
(
finalType
.
equals
(
3
)
||
finalType
.
equals
(
4
))
{
if
(
oXlsPath
!=
null
&&
!
oXlsPath
.
equals
(
""
))
{
//删除旧的黑白名单
EasyExcel
.
read
(
new
URL
(
oXlsPath
).
openStream
(),
new
PageReadListener
<
HashMap
<
String
,
String
>>(
dataList
->
{
for
(
HashMap
<
String
,
String
>
data
:
dataList
)
{
if
(
data
.
get
(
0
)
==
null
)
{
continue
;
}
if
(
finalType
.
equals
(
3
))
{
log
.
debug
(
"删除 白名单 读取到一条数据{}"
,
JSON
.
toJSONString
(
data
));
redisUtil
.
del
(
GoblinRedisConst
.
REDIS_WHITE
.
concat
(
listId
+
":"
).
concat
(
finalSkuId
+
":"
).
concat
(
data
.
get
(
0
)));
}
else
{
log
.
debug
(
"删除 黑名单 读取到一条数据{}"
,
JSON
.
toJSONString
(
data
));
redisUtil
.
del
(
GoblinRedisConst
.
REDIS_BLACK
.
concat
(
listId
+
":"
).
concat
(
finalSkuId
+
":"
).
concat
(
data
.
get
(
0
)));
}
}
})).
sheet
().
doRead
();
}
})).
sheet
().
doRead
();
objs
.
add
(
new
Object
[]{
skuId
,
xlsPath
,
type
,
1
,
LocalDateTime
.
now
()});
aBoolean
=
baseDao
.
batchSql
(
SQL_INSERT_GOODS_BUY_ROSTER_LOG
,
objs
);
if
(
xlsPath
!=
null
&&
!
xlsPath
.
equals
(
""
))
{
//添加
EasyExcel
.
read
(
new
URL
(
xlsPath
).
openStream
(),
new
PageReadListener
<
HashMap
<
String
,
String
>>(
dataList
->
{
for
(
HashMap
<
String
,
String
>
data
:
dataList
)
{
if
(
data
.
get
(
0
)
==
null
)
{
continue
;
}
if
(
finalType
.
equals
(
3
))
{
log
.
debug
(
"添加 白名单 读取到一条数据{}"
,
JSON
.
toJSONString
(
data
));
redisUtil
.
set
(
GoblinRedisConst
.
REDIS_WHITE
.
concat
(
listId
+
":"
).
concat
(
finalSkuId
+
":"
).
concat
(
data
.
get
(
0
)),
1
);
}
else
{
log
.
debug
(
"添加 黑名单 读取到一条数据{}"
,
JSON
.
toJSONString
(
data
));
redisUtil
.
set
(
GoblinRedisConst
.
REDIS_BLACK
.
concat
(
listId
+
":"
).
concat
(
finalSkuId
+
":"
).
concat
(
data
.
get
(
0
)),
1
);
}
}
})).
sheet
().
doRead
();
}
aBoolean
=
true
;
}
}
catch
(
Exception
e
)
{
log
.
error
(
"CONSUMER MSG EX_HANDLE ==> [{}]:{}"
,
this
.
getRedisStreamKey
(),
message
,
e
);
try
{
objs
.
add
(
new
Object
[]{
skuId
,
xlsPath
,
type
,
2
,
LocalDateTime
.
now
()});
aBoolean
=
baseDao
.
batchSql
(
SQL_INSERT_GOODS_BUY_ROSTER_LOG
,
objs
);
if
(
type
.
equals
(
1
)
||
type
.
equals
(
2
))
{
objs
.
add
(
new
Object
[]{
skuId
,
xlsPath
,
type
,
2
,
LocalDateTime
.
now
()});
aBoolean
=
baseDao
.
batchSql
(
SQL_INSERT_GOODS_BUY_ROSTER_LOG
,
objs
);
}
}
catch
(
Exception
ignored
)
{
}
}
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment