记得上下班打卡 | git大法好,push需谨慎

Commit a3e07ab9 authored by zhanggb's avatar zhanggb

~queue:consumer-base.+kylin.sms_notice;

parent 07d06dbe
...@@ -30,6 +30,11 @@ ...@@ -30,6 +30,11 @@
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb</artifactId> <artifactId>spring-boot-starter-data-mongodb</artifactId>
</dependency> </dependency>
<dependency>
<groupId>com.liquidnet</groupId>
<artifactId>liquidnet-common-sms</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<!-- liquidnet-bus-api --> <!-- liquidnet-bus-api -->
<dependency> <dependency>
......
...@@ -29,6 +29,8 @@ public class ConsumerCommonBizRedisStreamConfig extends RedisStreamConfig { ...@@ -29,6 +29,8 @@ public class ConsumerCommonBizRedisStreamConfig extends RedisStreamConfig {
@Autowired @Autowired
StringRedisTemplate stringRedisTemplate; StringRedisTemplate stringRedisTemplate;
@Autowired @Autowired
ConsumerKylinSmsNoticeReceiver consumerKylinSmsNoticeReceiver;
@Autowired
ConsumerGoblinBizArtworkUplReceiver consumerGoblinBizArtworkUplReceiver; ConsumerGoblinBizArtworkUplReceiver consumerGoblinBizArtworkUplReceiver;
@Autowired @Autowired
ConsumerGoblinBizArtworkClqReceiver consumerGoblinBizArtworkClqReceiver; ConsumerGoblinBizArtworkClqReceiver consumerGoblinBizArtworkClqReceiver;
...@@ -39,6 +41,22 @@ public class ConsumerCommonBizRedisStreamConfig extends RedisStreamConfig { ...@@ -39,6 +41,22 @@ public class ConsumerCommonBizRedisStreamConfig extends RedisStreamConfig {
@Autowired @Autowired
ConsumerGoblinBizIntegralReceiver consumerGoblinBizIntegralReceiver; ConsumerGoblinBizIntegralReceiver consumerGoblinBizIntegralReceiver;
@Bean// 短信通知
public List<Subscription> subscriptionKylinSmsNotice(RedisConnectionFactory factory) {
List<Subscription> subscriptionList = new ArrayList<>();
MQConst.KylinQueue stream = MQConst.KylinQueue.SMS_NOTICE;
this.initStream(stringRedisTemplate, stream.getKey(), stream.getGroup());
for (int i = 0; i < 2; i++) {
StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = this.buildStreamMessageListenerContainer(factory);
subscriptionList.add(listenerContainer.receiveAutoAck(
Consumer.from(stream.getGroup(), getConsumerName(stream.name() + i)),
StreamOffset.create(stream.getKey(), ReadOffset.lastConsumed()), consumerKylinSmsNoticeReceiver
));
listenerContainer.start();
}
return subscriptionList;
}
@Bean// 藏品上传声明 @Bean// 藏品上传声明
public List<Subscription> subscriptionGoblinBizArtworkUpl(RedisConnectionFactory factory) { public List<Subscription> subscriptionGoblinBizArtworkUpl(RedisConnectionFactory factory) {
List<Subscription> subscriptionList = new ArrayList<>(); List<Subscription> subscriptionList = new ArrayList<>();
......
package com.liquidnet.service.consumer.base.receiver; package com.liquidnet.service.consumer.base.receiver;
import com.liquidnet.service.base.constant.MQConst; import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.consumer.base.service.IBaseDao;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.MapRecord; import org.springframework.data.redis.connection.stream.MapRecord;
...@@ -16,8 +15,6 @@ import org.springframework.data.redis.stream.StreamListener; ...@@ -16,8 +15,6 @@ import org.springframework.data.redis.stream.StreamListener;
*/ */
@Slf4j @Slf4j
public abstract class AbstractBizRedisReceiver implements StreamListener<String, MapRecord<String, String, String>> { public abstract class AbstractBizRedisReceiver implements StreamListener<String, MapRecord<String, String, String>> {
@Autowired
public IBaseDao baseDao;
@Autowired @Autowired
public StringRedisTemplate stringRedisTemplate; public StringRedisTemplate stringRedisTemplate;
......
package com.liquidnet.service.consumer.base.receiver;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.liquidnet.common.sms.processor.SmsProcessor;
import com.liquidnet.commons.lang.util.CollectionUtil;
import com.liquidnet.commons.lang.util.JsonUtils;
import com.liquidnet.service.base.SmsMessage;
import com.liquidnet.service.base.constant.MQConst;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.HashMap;
@Slf4j
@Component
public class ConsumerKylinSmsNoticeReceiver extends AbstractBizRedisReceiver {
@Resource
private SmsProcessor smsProcessor;
@Override
protected boolean consumerMessageHandler(String msg) {
boolean aBoolean = false;
try {
SmsMessage smsMessage = JsonUtils.fromJson(msg, SmsMessage.class);
// aBoolean = smsProcessor.send(smsMessage.getPhone(), smsMessage.getSignName(), smsMessage.getTemplateCode(), smsMessage.getTemplateParam().toString());
ObjectNode templateParam = smsMessage.getTemplateParam();
aBoolean = smsProcessor.send(smsMessage.getPhone(), smsMessage.getSignName(), smsMessage.getTemplateCode(), null == templateParam ? "" : templateParam.toString());
} catch (Exception e) {
log.error("CONSUMER MSG EX_HANDLE ==> [{}]:{}", this.getRedisStreamKey(), msg, e);
} finally {
if (!aBoolean) {
HashMap<String, String> map = CollectionUtil.mapStringString();
map.put("message", msg);
stringRedisTemplate.opsForStream().add(StreamRecords.mapBacked(map).withStreamKey(this.getRedisStreamKey()));
}
}
return aBoolean;
}
@Override
protected String getRedisStreamKey() {
return MQConst.KylinQueue.SMS_NOTICE.getKey();
}
@Override
protected String getRedisStreamGroup() {
return MQConst.KylinQueue.SMS_NOTICE.getGroup();
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment