记得上下班打卡 | git大法好,push需谨慎

Commit 14a58dc2 authored by jiangxiulong's avatar jiangxiulong

consumer sweet add template message

parent e4522b19
...@@ -78,6 +78,7 @@ public class MQConst { ...@@ -78,6 +78,7 @@ public class MQConst {
SWEET_USER_INSERT_DRAW("sweet:stream:rk.sweetUserInsert", "group.sweetUserInsert", "关注服务号的用户信息"), SWEET_USER_INSERT_DRAW("sweet:stream:rk.sweetUserInsert", "group.sweetUserInsert", "关注服务号的用户信息"),
SWEET_REMIND_INSERT_DRAW("sweet:stream:rk.remindInsert", "group.remindInsert", "提醒记录"), SWEET_REMIND_INSERT_DRAW("sweet:stream:rk.remindInsert", "group.remindInsert", "提醒记录"),
SWEET_APPLET_USER_INSERT_DRAW("sweet:stream:rk.sweetAppletUserInsert", "group.sweetAppletUserInsert", "小程序登录记录用户解密后信息"), SWEET_APPLET_USER_INSERT_DRAW("sweet:stream:rk.sweetAppletUserInsert", "group.sweetAppletUserInsert", "小程序登录记录用户解密后信息"),
SWEET_TEMPLATE_MSG("sweet:stream:rk.sweetTemplateMsg", "group.sweetTemplateMsg", "发送模版消息"),
; ;
private final String key; private final String key;
......
package com.liquidnet.service.consumer.sweet.config;
import com.liquidnet.service.consumer.sweet.receiver.ConsumerSweetTemplateMsgReceiver;
import lombok.var;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;
import java.time.Duration;
import static com.liquidnet.service.base.constant.MQConst.SweetQueue.SWEET_TEMPLATE_MSG;
@Configuration
public class ConsumerSweetTemplateMsgStreamConfig {
@Autowired
ConsumerSweetTemplateMsgReceiver consumerSweetTemplateMsgReceiver;
private StreamMessageListenerContainer<String, MapRecord<String, String, String>> buildStreamMessageListenerContainer(RedisConnectionFactory factory) {
var options = StreamMessageListenerContainer
.StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofMillis(1))
.build();
return StreamMessageListenerContainer.create(factory, options);
}
/**
* 缺票登记
*
* @param listenerContainer
* @param t
* @return
*/
private Subscription receiveSqlTemplateMsg(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) {
return listenerContainer.receiveAutoAck(
Consumer.from(SWEET_TEMPLATE_MSG.getGroup(), SWEET_TEMPLATE_MSG.name() + t),
StreamOffset.create(SWEET_TEMPLATE_MSG.getKey(), ReadOffset.lastConsumed()), consumerSweetTemplateMsgReceiver
);
}
/* —————————————————————————— | —————————————————————————— | —————————————————————————— */
/* -------------------------------------------------------- | 缺票登记 */
@Bean
public Subscription subscriptionSqlTemplateMsg(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveSqlTemplateMsg(listenerContainer, 1);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscriptionSqlTemplateMsg2(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveSqlTemplateMsg(listenerContainer, 1);
listenerContainer.start();
return subscription;
}
@Bean
public Subscription subscriptionSqlTemplateMsg3(RedisConnectionFactory factory) {
var listenerContainer = this.buildStreamMessageListenerContainer(factory);
var subscription = receiveSqlTemplateMsg(listenerContainer, 1);
listenerContainer.start();
return subscription;
}
/* -------------------------------------------------------- | */
}
package com.liquidnet.service.consumer.sweet.receiver; package com.liquidnet.service.consumer.sweet.receiver;
import com.liquidnet.common.cache.redis.util.RedisUtil;
import com.liquidnet.commons.lang.util.CollectionUtil; import com.liquidnet.commons.lang.util.CollectionUtil;
import com.liquidnet.commons.lang.util.JsonUtils; import com.liquidnet.commons.lang.util.JsonUtils;
import com.liquidnet.service.base.SqlMapping; import com.liquidnet.service.base.SqlMapping;
...@@ -20,8 +19,6 @@ public abstract class AbstractSqlRedisReceiver implements StreamListener<String, ...@@ -20,8 +19,6 @@ public abstract class AbstractSqlRedisReceiver implements StreamListener<String,
public IBaseDao baseDao; public IBaseDao baseDao;
@Autowired @Autowired
StringRedisTemplate stringRedisTemplate; StringRedisTemplate stringRedisTemplate;
@Autowired
private RedisUtil redisUtil;
@Override @Override
public void onMessage(MapRecord<String, String, String> message) { public void onMessage(MapRecord<String, String, String> message) {
......
package com.liquidnet.service.consumer.sweet.receiver;
import com.liquidnet.commons.lang.util.CollectionUtil;
import com.liquidnet.commons.lang.util.JsonUtils;
import com.liquidnet.service.base.SqlMapping;
import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.consumer.sweet.param.SweetOpenSendMsgParam;
import com.liquidnet.service.consumer.sweet.service.impl.SweetWechatTemplateServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.stereotype.Component;
import java.util.HashMap;
@Slf4j
@Component
public class ConsumerSweetTemplateMsgReceiver extends AbstractSqlRedisReceiver {
@Autowired
SweetWechatTemplateServiceImpl sweetWechatTemplateService;
@Override
protected String getRedisStreamKey() {
return MQConst.SweetQueue.SWEET_TEMPLATE_MSG.getKey();
}
@Override
protected String getRedisStreamGroup() {
return MQConst.SweetQueue.SWEET_TEMPLATE_MSG.getGroup();
}
@Override
public void onMessage(MapRecord<String, String, String> message) {
log.debug("CONSUMER SQL[streamKey:{},messageId:{},stream:{},body:{}]",
this.getRedisStreamKey(), message.getId(), message.getStream(), message.getValue());
boolean result = this.consumerSqlDaoHandler(message.getValue().get("message"));
log.info("CONSUMER SQL RESULT:{} ==> MESSAGE_ID:{}", result, message.getId());
try {
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId());
} catch (Exception e) {
log.error("#CONSUMER SQL RESULT:{} ==> DEL_REDIS_QUEUE_MSG_EXCEPTION[MESSAGE_ID:{},MSG:{}]", result, message.getId(), JsonUtils.toJson(message), e);
} finally {
try {
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId());
} catch (Exception ignored) {
}
}
}
private boolean consumerSqlDaoHandler(String msg) {
Boolean aBoolean = false;
try {
SweetOpenSendMsgParam sweetOpenSendMsgParam = JsonUtils.fromJson(msg, SweetOpenSendMsgParam.class);
if (sweetOpenSendMsgParam == null) {
aBoolean = true;
} else {
aBoolean = sweetWechatTemplateService.openSendMsg(sweetOpenSendMsgParam);
}
} catch (Exception e) {
log.error("CONSUMER SQL FAIL ==> {}", e.getMessage(), e);
} finally {
if (!aBoolean) {
HashMap<String, String> map = CollectionUtil.mapStringString();
map.put("message", msg);
stringRedisTemplate.opsForStream().add(StreamRecords.mapBacked(map).withStreamKey(this.getRedisStreamKey()));
}
}
return aBoolean;
}
}
...@@ -33,7 +33,7 @@ public class SweetWechatTemplateServiceImpl { ...@@ -33,7 +33,7 @@ public class SweetWechatTemplateServiceImpl {
@Autowired @Autowired
WechatMaConfigure wechatMaConfigure; WechatMaConfigure wechatMaConfigure;
public ResponseDto openSendMsg(SweetOpenSendMsgParam param) { public Boolean openSendMsg(SweetOpenSendMsgParam param) {
WxMpTemplateMessage templateMessage = null; WxMpTemplateMessage templateMessage = null;
if (param.getJumpType() == 5) { if (param.getJumpType() == 5) {
templateMessage = getTemplateMessage(param.getTemplateId(), param.getOpenId(), param.getJumpUrl()); templateMessage = getTemplateMessage(param.getTemplateId(), param.getOpenId(), param.getJumpUrl());
...@@ -48,9 +48,9 @@ public class SweetWechatTemplateServiceImpl { ...@@ -48,9 +48,9 @@ public class SweetWechatTemplateServiceImpl {
} }
String msgId = sendTmpMsg(templateMessage, param.getSendTargetType()); String msgId = sendTmpMsg(templateMessage, param.getSendTargetType());
if (null == msgId) { if (null == msgId) {
return ResponseDto.failure("模版消息发送失败"); return false;
} else { } else {
return ResponseDto.success(msgId); return true;
} }
} }
......
...@@ -23,4 +23,8 @@ XGROUP CREATE sweet:stream:rk.remindInsert group.remindInsert 0 ...@@ -23,4 +23,8 @@ XGROUP CREATE sweet:stream:rk.remindInsert group.remindInsert 0
-- 小程序登录记录用户解密后信息 -- -- 小程序登录记录用户解密后信息 --
XADD sweet:stream:rk.sweetAppletUserInsert * 0 0 XADD sweet:stream:rk.sweetAppletUserInsert * 0 0
XGROUP CREATE sweet:stream:rk.sweetAppletUserInsert group.sweetAppletUserInsert 0 XGROUP CREATE sweet:stream:rk.sweetAppletUserInsert group.sweetAppletUserInsert 0
\ No newline at end of file
-- 发送模版消息 --
XADD sweet:stream:rk.sweetTemplateMsg * 0 0
XGROUP CREATE sweet:stream:rk.sweetTemplateMsg group.sweetTemplateMsg 0
\ No newline at end of file
...@@ -32,6 +32,12 @@ public class SweetWechatTemplateController { ...@@ -32,6 +32,12 @@ public class SweetWechatTemplateController {
return sweetTemplateService.openSendMsg(param); return sweetTemplateService.openSendMsg(param);
} }
@PostMapping("testSend")
@ApiOperation("发送模版消息公共接口")
public ResponseDto testSend(@Valid @RequestBody SweetOpenSendMsgParam param) {
return sweetTemplateService.testSendMsg(param);
}
@PostMapping("remind") @PostMapping("remind")
@ApiOperation("提醒记录") @ApiOperation("提醒记录")
@ApiImplicitParams({ @ApiImplicitParams({
......
...@@ -5,7 +5,9 @@ import com.baomidou.mybatisplus.core.toolkit.Wrappers; ...@@ -5,7 +5,9 @@ import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.liquidnet.common.cache.redis.util.RedisUtil; import com.liquidnet.common.cache.redis.util.RedisUtil;
import com.liquidnet.commons.lang.util.CollectionUtil; import com.liquidnet.commons.lang.util.CollectionUtil;
import com.liquidnet.commons.lang.util.DateUtil; import com.liquidnet.commons.lang.util.DateUtil;
import com.liquidnet.commons.lang.util.JsonUtils;
import com.liquidnet.service.base.ResponseDto; import com.liquidnet.service.base.ResponseDto;
import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.kylin.constant.KylinRedisConst; import com.liquidnet.service.kylin.constant.KylinRedisConst;
import com.liquidnet.service.kylin.dto.vo.middle.KylinTicketTimesVo; import com.liquidnet.service.kylin.dto.vo.middle.KylinTicketTimesVo;
import com.liquidnet.service.kylin.dto.vo.mongo.KylinPerformanceVo; import com.liquidnet.service.kylin.dto.vo.mongo.KylinPerformanceVo;
...@@ -15,6 +17,7 @@ import com.liquidnet.service.sweet.entity.SweetRemind; ...@@ -15,6 +17,7 @@ import com.liquidnet.service.sweet.entity.SweetRemind;
import com.liquidnet.service.sweet.entity.SweetWechatUser; import com.liquidnet.service.sweet.entity.SweetWechatUser;
import com.liquidnet.service.sweet.mapper.SweetRemindMapper; import com.liquidnet.service.sweet.mapper.SweetRemindMapper;
import com.liquidnet.service.sweet.param.SweetOpenSendMsgParam; import com.liquidnet.service.sweet.param.SweetOpenSendMsgParam;
import com.liquidnet.service.sweet.utils.QueueUtils;
import com.liquidnet.service.sweet.utils.RedisDataUtils; import com.liquidnet.service.sweet.utils.RedisDataUtils;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import me.chanjar.weixin.common.error.WxErrorException; import me.chanjar.weixin.common.error.WxErrorException;
...@@ -54,6 +57,8 @@ public class SweetWechatTemplateServiceImpl { ...@@ -54,6 +57,8 @@ public class SweetWechatTemplateServiceImpl {
private RedisDataUtils redisDataUtils; private RedisDataUtils redisDataUtils;
@Autowired @Autowired
private RedisUtil redisUtil; private RedisUtil redisUtil;
@Autowired
private QueueUtils queueUtils;
@Autowired @Autowired
private SweetRemindMapper sweetRemindMapper; private SweetRemindMapper sweetRemindMapper;
...@@ -302,4 +307,9 @@ public class SweetWechatTemplateServiceImpl { ...@@ -302,4 +307,9 @@ public class SweetWechatTemplateServiceImpl {
} }
} }
public ResponseDto testSendMsg(SweetOpenSendMsgParam param) {
queueUtils.sendMsgByRedis(MQConst.SweetQueue.SWEET_TEMPLATE_MSG.getKey(),
JsonUtils.toJson(param));
return ResponseDto.success();
}
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment