记得上下班打卡 | git大法好,push需谨慎

Commit aa53db51 authored by anjiabin's avatar anjiabin

dragon-redis队列迁移

parent 2ec0899b
...@@ -353,6 +353,39 @@ public class MQConst { ...@@ -353,6 +353,39 @@ public class MQConst {
} }
} }
public enum DragonQueue {
//支付相关队列
DRAGON_PAY_KEY("dragon:stream:dragon-pay","dragon-pay-group","同步数据-支付"),
DRAGON_REFUND_KEY ("dragon:stream:dragon-refund","dragon-refund-group","同步数据-退款"),
//三方异步通知相关队列
DRAGON_PAY_NOTIFY_KEY("dragon:stream:dragon-pay-notify-key","dragon-pay-notify-group","支付异步通知"),
//商户通知相关队列
PAY_MCH_NOTIFY_KEY("dragon:stream:pay-mch-notify-key","pay-mch-notify-group","商户异步通知"),
PAY_MCH_NOTIFY_ERROR_KEY("dragon:stream:pay-mch-notify-error-key","pay-mch-notify-error-group","商户异步通知失败");
private final String key;
private final String group;
private final String desc;
DragonQueue(String key, String group, String desc) {
this.key = key;
this.group = group;
this.desc = desc;
}
public String getKey() {
return key;
}
public String getGroup() {
return group;
}
public String getDesc() {
return desc;
}
}
public enum CommonQueue { public enum CommonQueue {
/** /**
* SQL持久化:优先级0<1<2<3... * SQL持久化:优先级0<1<2<3...
......
package com.liquidnet.service.consumer.base.receiver.dragon;
import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.consumer.base.receiver.AbstractSqlRedisReceiver;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
* 支付商户通知消费
*/
@Slf4j
@Component
public class ConsumerDragonMchNotifyFailReceiver extends AbstractSqlRedisReceiver {
@Override
protected String getRedisStreamKey() {
return MQConst.DragonQueue.PAY_MCH_NOTIFY_ERROR_KEY.getKey();
}
@Override
protected String getRedisStreamGroup() {
return MQConst.DragonQueue.PAY_MCH_NOTIFY_ERROR_KEY.getGroup();
}
}
package com.liquidnet.service.consumer.base.receiver.dragon;
import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.consumer.base.receiver.AbstractSqlRedisReceiver;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
* 支付商户通知消费
*/
@Slf4j
@Component
public class ConsumerDragonMchNotifyReceiver extends AbstractSqlRedisReceiver {
@Override
protected String getRedisStreamKey() {
return MQConst.DragonQueue.PAY_MCH_NOTIFY_KEY.getKey();
}
@Override
protected String getRedisStreamGroup() {
return MQConst.DragonQueue.PAY_MCH_NOTIFY_KEY.getGroup();
}
}
package com.liquidnet.service.consumer.base.receiver.dragon;
import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.consumer.base.receiver.AbstractSqlRedisReceiver;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
* 支付商户通知消费
*/
@Slf4j
@Component
public class ConsumerDragonPayNotifyReceiver extends AbstractSqlRedisReceiver {
@Override
protected String getRedisStreamKey() {
return MQConst.DragonQueue.DRAGON_PAY_NOTIFY_KEY.getKey();
}
@Override
protected String getRedisStreamGroup() {
return MQConst.DragonQueue.DRAGON_PAY_NOTIFY_KEY.getGroup();
}
}
package com.liquidnet.service.consumer.base.receiver.dragon;
import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.consumer.base.receiver.AbstractSqlRedisReceiver;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class ConsumerDragonPayReceiver extends AbstractSqlRedisReceiver {
@Override
protected String getRedisStreamKey() {
return MQConst.DragonQueue.DRAGON_PAY_KEY.getKey();
}
@Override
protected String getRedisStreamGroup() {
return MQConst.DragonQueue.DRAGON_PAY_KEY.getGroup();
}
}
package com.liquidnet.service.consumer.base.receiver.dragon;
import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.consumer.base.receiver.AbstractSqlRedisReceiver;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class ConsumerDragonRefundReceiver extends AbstractSqlRedisReceiver {
@Override
protected String getRedisStreamKey() {
return MQConst.DragonQueue.DRAGON_REFUND_KEY.getKey();
}
@Override
protected String getRedisStreamGroup() {
return MQConst.DragonQueue.DRAGON_REFUND_KEY.getGroup();
}
// @Override
// public void onMessage(MapRecord<String, String, String> message) {
// log.info("接受到来自redis REFUND 的消息");
// log.info("message id " + message.getId());
// log.info("stream " + message.getStream());
// log.info("body " + message.getValue());
// boolean result = this.consumerSqlDaoHandler(message.getValue().get("message"));
//// if(result){
// log.info("consumer success delete message messageId:{} ",message.getId());
// try {
//// stringRedisTemplate.multi();
// stringRedisTemplate.opsForStream().acknowledge(DragonConstant.MysqlRedisQueueEnum.DRAGON_REFUND_GROUP.getCode(), message);
// stringRedisTemplate.opsForStream().delete(DragonConstant.MysqlRedisQueueEnum.DRAGON_REFUND_KEY.getCode(), message.getId());
//// stringRedisTemplate.exec();
// } catch (Exception e) {
// log.error("delete redis queue message Exception error: {} ",e);
// log.error("delete redis queue message error messageId:{} errMsg:{}",message.getId(),e.getMessage());
// }finally {
// stringRedisTemplate.opsForStream().acknowledge(DragonConstant.MysqlRedisQueueEnum.DRAGON_REFUND_GROUP.getCode(), message);
// stringRedisTemplate.opsForStream().delete(DragonConstant.MysqlRedisQueueEnum.DRAGON_REFUND_KEY.getCode(), message.getId());
// }
//// }
// }
//
// private boolean consumerSqlDaoHandler(String msg) {
// try {
// SqlMapping.SqlMessage sqlMessage = JsonUtils.fromJson(msg, SqlMapping.SqlMessage.class);
// log.debug("CONSUMER MSG ==> Preparing:{}", JsonUtils.toJson(sqlMessage.getSqls()));
// log.debug("CONSUMER MSG ==> Parameters:{}", JsonUtils.toJson(sqlMessage.getArgs()));
// Boolean rstBatchSqls = baseDao.batchSqls(sqlMessage.getSqls(), sqlMessage.getArgs());
// log.debug("CONSUMER MSG result of execution:{}", rstBatchSqls);
// if (rstBatchSqls) {
// return true;
// }else{
// sendMySqlRedis(msg);
// }
// } catch (Exception e) {
// e.printStackTrace();
// log.error("CONSUMER MSG Exception error:{}", e);
// }
// return false;
// }
//
// /**
// * 给 REDIS 队列发送消息 数据库相关
// *
// * @param msg 接收到的内容
// * @return
// */
// private boolean sendMySqlRedis(String msg) {
// try {
// HashMap<String, String> map = new HashMap<>();
// map.put("message", msg);
// MapRecord<String, String, String> record = StreamRecords.mapBacked(map).withStreamKey(DragonConstant.MysqlRedisQueueEnum.DRAGON_REFUND_KEY.getCode());
// stringRedisTemplate.opsForStream().add(record);
// return true;
// } catch (Exception e) {
// e.printStackTrace();
// return false;
// }
// }
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment