记得上下班打卡 | git大法好,push需谨慎

Commit ab1901f3 authored by jiangxiulong's avatar jiangxiulong

消费

parent c7927f5a
...@@ -8,6 +8,7 @@ import com.liquidnet.service.feign.stone.api.FeignStoneIntegralClient; ...@@ -8,6 +8,7 @@ import com.liquidnet.service.feign.stone.api.FeignStoneIntegralClient;
import com.liquidnet.service.sweet.param.SweetStoneIntegralParam; import com.liquidnet.service.sweet.param.SweetStoneIntegralParam;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.StreamRecords; import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
...@@ -30,6 +31,28 @@ public class ConsumerSweetStoneIntegralReceiver extends AbstractSqlRedisReceiver ...@@ -30,6 +31,28 @@ public class ConsumerSweetStoneIntegralReceiver extends AbstractSqlRedisReceiver
return MQConst.SweetQueue.SWEET_STONE_INTEGRAL.getGroup(); return MQConst.SweetQueue.SWEET_STONE_INTEGRAL.getGroup();
} }
@Override
public void onMessage(MapRecord<String, String, String> message) {
log.debug("CONSUMER SQL[streamKey:{},messageId:{},stream:{},body:{}]",
this.getRedisStreamKey(), message.getId(), message.getStream(), message.getValue());
boolean result = this.consumerSqlDaoHandler(message.getValue().get("message"));
log.info("CONSUMER SQL RESULT:{} ==> MESSAGE_ID:{}", result, message.getId());
try {
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId());
} catch (Exception e) {
log.error("#CONSUMER SQL RESULT:{} ==> DEL_REDIS_QUEUE_MSG_EXCEPTION[MESSAGE_ID:{},MSG:{}]", result, message.getId(), JsonUtils.toJson(message), e);
} finally {
try {
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId());
} catch (Exception ignored) {
}
}
}
private boolean consumerSqlDaoHandler(String msg) { private boolean consumerSqlDaoHandler(String msg) {
Boolean aBoolean = false; Boolean aBoolean = false;
try { try {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment