记得上下班打卡 | git大法好,push需谨慎

Commit 80ef69ef authored by anjiabin's avatar anjiabin

优化支付代码

parent 1aba782d
package com.liquidnet.service.consumer.dragon.service.receiver;
import com.liquidnet.commons.lang.util.JsonUtils;
import com.liquidnet.service.base.SqlMapping;
import com.liquidnet.service.consumer.dragon.service.IBaseDao;
import com.liquidnet.service.dragon.constant.DragonConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamListener;
import java.util.HashMap;
/**
* @author AnJiabin <anjiabin@zhengzai.tv>
* @version V1.0
* @Description: TODO
* @class: AbstractRedisReceiver
* @Package com.liquidnet.service.consumer.dragon.service.receiver
* @Copyright: LightNet @ Copyright (c) 2021
* @date 2021/7/22 20:28
*/
@Slf4j
public abstract class AbstractRedisReceiver implements StreamListener<String, MapRecord<String, String, String>> {
IBaseDao baseDao;
@Autowired
StringRedisTemplate stringRedisTemplate;
@Override
public void onMessage(MapRecord<String, String, String> message) {
log.info("接受到来自redis PAY key:{} 的消息",getRedisStreamKey());
log.info("message id " + message.getId());
log.info("stream " + message.getStream());
log.info("body " + message.getValue());
boolean result = this.consumerSqlDaoHandler(message.getValue().get("message"));
// 消费成功确认,消息删除和消息确认是一个事务
if(result){
log.error("consumer success delete message messageId:{} ",message.getId());
try {
// stringRedisTemplate.multi();
stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId());
// stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
// stringRedisTemplate.exec();
} catch (Exception e) {
e.printStackTrace();
log.error("delete redis queue message error messageId:{} errMsg:{}",message.getId(),e.getMessage());
}
}
}
private boolean consumerSqlDaoHandler(String msg) {
try {
SqlMapping.SqlMessage sqlMessage = JsonUtils.fromJson(msg, SqlMapping.SqlMessage.class);
log.debug("CONSUMER SQL ==> Preparing:{}", JsonUtils.toJson(sqlMessage.getSqls()));
log.debug("CONSUMER SQL ==> Parameters:{}", JsonUtils.toJson(sqlMessage.getArgs()));
Boolean rstBatchSqls = baseDao.batchSqls(sqlMessage.getSqls(), sqlMessage.getArgs());
log.debug("CONSUMER SQL result of execution:{}", rstBatchSqls);
if (rstBatchSqls) {
return true;
}
} catch (Exception e) {
e.printStackTrace();
log.error("CONSUMER SQL Exception error:{}", e);
}
return false;
}
/**
* 给 REDIS 队列发送消息 数据库相关
*
* @param msg 接收到的内容
* @return
*/
private boolean sendMySqlRedis(String msg) {
try {
HashMap<String, String> map = new HashMap<>();
map.put("message", msg);
MapRecord<String, String, String> record = StreamRecords.mapBacked(map).withStreamKey(DragonConstant.MysqlRedisQueueEnum.DRAGON_PAY_KEY.getCode());
stringRedisTemplate.opsForStream().add(record);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
protected abstract String getRedisStreamKey();
protected abstract String getRedisStreamGroup();
}
package com.liquidnet.service.consumer.dragon.service.receiver; package com.liquidnet.service.consumer.dragon.service.receiver;
import com.liquidnet.commons.lang.util.JsonUtils; import com.liquidnet.service.dragon.constant.DragonConstant;
import com.liquidnet.service.base.SqlMapping;
import com.liquidnet.service.consumer.dragon.service.IBaseDao;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/** /**
* 支付商户通知消费 * 支付商户通知消费
*/ */
@Slf4j @Slf4j
@Component @Component
public class RedisMchNotifyFailReceiver implements StreamListener<String, MapRecord<String, String, String>> { public class RedisMchNotifyFailReceiver extends AbstractRedisReceiver {
@Resource
IBaseDao baseDao;
@Autowired
StringRedisTemplate stringRedisTemplate;
@Override @Override
public void onMessage(MapRecord<String, String, String> message) { protected String getRedisStreamKey() {
log.info("接受到来自redis pay notify 的消息"); return DragonConstant.MysqlRedisQueueEnum.PAY_MCH_NOTIFY_ERROR_KEY.getCode();
System.out.println("message id " + message.getId());
System.out.println("stream " + message.getStream());
System.out.println("body " + message.getValue());
consumerSqlDaoHandler(message.getValue().get("message"));
} }
private void consumerSqlDaoHandler(String msg) { @Override
try { protected String getRedisStreamGroup() {
SqlMapping.SqlMessage sqlMessage = JsonUtils.fromJson(msg, SqlMapping.SqlMessage.class); return DragonConstant.MysqlRedisQueueEnum.PAY_MCH_NOTIFY_ERROR_GROUP.getCode();
log.debug("CONSUMER SQL ==> Preparing:{}", JsonUtils.toJson(sqlMessage.getSqls()));
log.debug("CONSUMER SQL ==> Parameters:{}", JsonUtils.toJson(sqlMessage.getArgs()));
Boolean rstBatchSqls = baseDao.batchSqls(sqlMessage.getSqls(), sqlMessage.getArgs());
log.debug("CONSUMER SQL result of execution:{}", rstBatchSqls);
if (rstBatchSqls) {
//应答
}
} catch (Exception e) {
e.printStackTrace();
}
} }
} }
package com.liquidnet.service.consumer.dragon.service.receiver; package com.liquidnet.service.consumer.dragon.service.receiver;
import com.liquidnet.commons.lang.util.JsonUtils; import com.liquidnet.service.dragon.constant.DragonConstant;
import com.liquidnet.service.base.SqlMapping;
import com.liquidnet.service.consumer.dragon.service.IBaseDao;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/** /**
* 支付商户通知消费 * 支付商户通知消费
*/ */
@Slf4j @Slf4j
@Component @Component
public class RedisMchNotifyReceiver implements StreamListener<String, MapRecord<String, String, String>> { public class RedisMchNotifyReceiver extends AbstractRedisReceiver {
@Resource
IBaseDao baseDao;
@Autowired
StringRedisTemplate stringRedisTemplate;
@Override @Override
public void onMessage(MapRecord<String, String, String> message) { protected String getRedisStreamKey() {
log.info("接受到来自redis pay notify 的消息"); return DragonConstant.MysqlRedisQueueEnum.PAY_MCH_NOTIFY_KEY.getCode();
System.out.println("message id " + message.getId());
System.out.println("stream " + message.getStream());
System.out.println("body " + message.getValue());
consumerSqlDaoHandler(message.getValue().get("message"));
} }
private void consumerSqlDaoHandler(String msg) { @Override
try { protected String getRedisStreamGroup() {
SqlMapping.SqlMessage sqlMessage = JsonUtils.fromJson(msg, SqlMapping.SqlMessage.class); return DragonConstant.MysqlRedisQueueEnum.PAY_MCH_NOTIFY_GROUP.getCode();
log.debug("CONSUMER SQL ==> Preparing:{}", JsonUtils.toJson(sqlMessage.getSqls()));
log.debug("CONSUMER SQL ==> Parameters:{}", JsonUtils.toJson(sqlMessage.getArgs()));
Boolean rstBatchSqls = baseDao.batchSqls(sqlMessage.getSqls(), sqlMessage.getArgs());
log.debug("CONSUMER SQL result of execution:{}", rstBatchSqls);
if (rstBatchSqls) {
//应答
}
} catch (Exception e) {
e.printStackTrace();
}
} }
} }
package com.liquidnet.service.consumer.dragon.service.receiver; package com.liquidnet.service.consumer.dragon.service.receiver;
import com.liquidnet.commons.lang.util.JsonUtils; import com.liquidnet.service.dragon.constant.DragonConstant;
import com.liquidnet.service.base.SqlMapping;
import com.liquidnet.service.consumer.dragon.service.IBaseDao;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/** /**
* 支付商户通知消费 * 支付商户通知消费
*/ */
@Slf4j @Slf4j
@Component @Component
public class RedisPayNotifyReceiver implements StreamListener<String, MapRecord<String, String, String>> { public class RedisPayNotifyReceiver extends AbstractRedisReceiver {
@Resource
IBaseDao baseDao;
@Autowired
StringRedisTemplate stringRedisTemplate;
@Override @Override
public void onMessage(MapRecord<String, String, String> message) { protected String getRedisStreamKey() {
log.info("接受到来自redis pay notify 的消息"); return DragonConstant.MysqlRedisQueueEnum.DRAGON_PAY_NOTIFY_KEY.getCode();
System.out.println("message id " + message.getId());
System.out.println("stream " + message.getStream());
System.out.println("body " + message.getValue());
consumerSqlDaoHandler(message.getValue().get("message"));
} }
private void consumerSqlDaoHandler(String msg) { @Override
try { protected String getRedisStreamGroup() {
SqlMapping.SqlMessage sqlMessage = JsonUtils.fromJson(msg, SqlMapping.SqlMessage.class); return DragonConstant.MysqlRedisQueueEnum.DRAGON_PAY_NOTIFY_GROUP.getCode();
log.debug("CONSUMER SQL ==> Preparing:{}", JsonUtils.toJson(sqlMessage.getSqls()));
log.debug("CONSUMER SQL ==> Parameters:{}", JsonUtils.toJson(sqlMessage.getArgs()));
Boolean rstBatchSqls = baseDao.batchSqls(sqlMessage.getSqls(), sqlMessage.getArgs());
log.debug("CONSUMER SQL result of execution:{}", rstBatchSqls);
if (rstBatchSqls) {
//应答
}
} catch (Exception e) {
e.printStackTrace();
}
} }
} }
package com.liquidnet.service.consumer.dragon.service.receiver; package com.liquidnet.service.consumer.dragon.service.receiver;
import com.liquidnet.commons.lang.util.JsonUtils;
import com.liquidnet.service.base.SqlMapping;
import com.liquidnet.service.consumer.dragon.service.IBaseDao;
import com.liquidnet.service.dragon.constant.DragonConstant; import com.liquidnet.service.dragon.constant.DragonConstant;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.HashMap;
@Slf4j @Slf4j
@Component @Component
public class RedisPayReceiver implements StreamListener<String, MapRecord<String, String, String>> { public class RedisPayReceiver extends AbstractRedisReceiver {
@Resource
IBaseDao baseDao;
@Autowired
StringRedisTemplate stringRedisTemplate;
@Override @Override
public void onMessage(MapRecord<String, String, String> message) { protected String getRedisStreamKey() {
log.info("接受到来自redis PAY 的消息"); return DragonConstant.MysqlRedisQueueEnum.DRAGON_PAY_KEY.getCode();
System.out.println("message id " + message.getId());
System.out.println("stream " + message.getStream());
System.out.println("body " + message.getValue());
boolean result = consumerSqlDaoHandler(message.getValue().get("message"));
// 消费成功确认,消息删除和消息确认是一个事务
if(result){
stringRedisTemplate.multi();
stringRedisTemplate.opsForStream().delete(DragonConstant.MysqlRedisQueueEnum.DRAGON_PAY_KEY.getCode(), message.getId());
// stringRedisTemplate.opsForStream().acknowledge(DragonConstant.MysqlRedisQueueEnum.DRAGON_PAY_GROUP.getCode(), message);
stringRedisTemplate.exec();
}
}
private boolean consumerSqlDaoHandler(String msg) {
try {
SqlMapping.SqlMessage sqlMessage = JsonUtils.fromJson(msg, SqlMapping.SqlMessage.class);
log.debug("CONSUMER SQL ==> Preparing:{}", JsonUtils.toJson(sqlMessage.getSqls()));
log.debug("CONSUMER SQL ==> Parameters:{}", JsonUtils.toJson(sqlMessage.getArgs()));
Boolean rstBatchSqls = baseDao.batchSqls(sqlMessage.getSqls(), sqlMessage.getArgs());
log.debug("CONSUMER SQL result of execution:{}", rstBatchSqls);
if (rstBatchSqls) {
return true;
}
} catch (Exception e) {
e.printStackTrace();
}
return false;
} }
/** @Override
* 给 REDIS 队列发送消息 数据库相关 protected String getRedisStreamGroup() {
* return DragonConstant.MysqlRedisQueueEnum.DRAGON_PAY_GROUP.getCode();
* @param msg 接收到的内容
* @return
*/
private boolean sendMySqlRedis(String msg) {
try {
HashMap<String, String> map = new HashMap<>();
map.put("message", msg);
MapRecord<String, String, String> record = StreamRecords.mapBacked(map).withStreamKey(DragonConstant.MysqlRedisQueueEnum.DRAGON_PAY_KEY.getCode());
stringRedisTemplate.opsForStream().add(record);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
} }
} }
...@@ -14,7 +14,6 @@ import org.springframework.stereotype.Component; ...@@ -14,7 +14,6 @@ import org.springframework.stereotype.Component;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.util.HashMap; import java.util.HashMap;
import java.util.LinkedList;
@Slf4j @Slf4j
@Component @Component
...@@ -32,6 +31,16 @@ public class RedisRefundReceiver implements StreamListener<String, MapRecord<Str ...@@ -32,6 +31,16 @@ public class RedisRefundReceiver implements StreamListener<String, MapRecord<Str
System.out.println("stream " + message.getStream()); System.out.println("stream " + message.getStream());
System.out.println("body " + message.getValue()); System.out.println("body " + message.getValue());
consumerSqlDaoHandler(message.getValue().get("message")); consumerSqlDaoHandler(message.getValue().get("message"));
log.error("consumer success delete message messageId:{} ",message.getId());
try {
// stringRedisTemplate.multi();
stringRedisTemplate.opsForStream().delete(DragonConstant.MysqlRedisQueueEnum.DRAGON_REFUND_KEY.getCode(), message.getId());
// stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
// stringRedisTemplate.exec();
} catch (Exception e) {
log.error("delete redis queue message Exception error: {} ",e);
log.error("delete redis queue message error messageId:{} errMsg:{}",message.getId(),e.getMessage());
}
} }
private void consumerSqlDaoHandler(String msg) { private void consumerSqlDaoHandler(String msg) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment