记得上下班打卡 | git大法好,push需谨慎

Commit b799cdec authored by 胡佳晨's avatar 胡佳晨

消费

parent 314aa040
......@@ -15,7 +15,8 @@ import org.springframework.data.redis.stream.Subscription;
import java.time.Duration;
import static com.liquidnet.service.base.constant.MQConst.SweetQueue.SWEET_USER_INSERT_DRAW;
import static com.liquidnet.service.base.constant.MQConst.CandyQueue.COUPON_BACK;
@Configuration
public class ConsumerCandyCouponBackRedisStreamConfig {
......@@ -40,8 +41,8 @@ public class ConsumerCandyCouponBackRedisStreamConfig {
*/
private Subscription receiveSqlCandyCouponBack(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) {
return listenerContainer.receiveAutoAck(
Consumer.from(SWEET_USER_INSERT_DRAW.getGroup(), SWEET_USER_INSERT_DRAW.name() + t),
StreamOffset.create(SWEET_USER_INSERT_DRAW.getKey(), ReadOffset.lastConsumed()), consumerCandyCouponBackRdsReceiver
Consumer.from(COUPON_BACK.getGroup(), COUPON_BACK.name() + t),
StreamOffset.create(COUPON_BACK.getKey(), ReadOffset.lastConsumed()), consumerCandyCouponBackRdsReceiver
);
}
......
package com.liquidnet.service.consumer.kylin.config;
import com.liquidnet.service.consumer.kylin.receiver.ConsumerCandyCouponBackRdsReceiver;
import com.liquidnet.service.consumer.kylin.receiver.ConsumerCandyCouponReceiveRdsReceiver;
import lombok.var;
import org.springframework.beans.factory.annotation.Autowired;
......@@ -16,6 +15,8 @@ import org.springframework.data.redis.stream.Subscription;
import java.time.Duration;
import static com.liquidnet.service.base.constant.MQConst.CandyQueue.COUPON_RECEIVE;
import static com.liquidnet.service.base.constant.MQConst.CandyQueue.COUPON_USE;
import static com.liquidnet.service.base.constant.MQConst.SweetQueue.SWEET_USER_INSERT_DRAW;
@Configuration
......@@ -41,8 +42,8 @@ public class ConsumerCandyCouponReceiveRedisStreamConfig {
*/
private Subscription receiveSqlCandyCouponReceive(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) {
return listenerContainer.receiveAutoAck(
Consumer.from(SWEET_USER_INSERT_DRAW.getGroup(), SWEET_USER_INSERT_DRAW.name() + t),
StreamOffset.create(SWEET_USER_INSERT_DRAW.getKey(), ReadOffset.lastConsumed()), consumerCandyCouponReceiveRdsReceiver
Consumer.from(COUPON_RECEIVE.getGroup(), COUPON_RECEIVE.name() + t),
StreamOffset.create(COUPON_RECEIVE.getKey(), ReadOffset.lastConsumed()), consumerCandyCouponReceiveRdsReceiver
);
}
......
......@@ -16,6 +16,8 @@ import org.springframework.data.redis.stream.Subscription;
import java.time.Duration;
import static com.liquidnet.service.base.constant.MQConst.CandyQueue.COUPON_RECEIVE;
import static com.liquidnet.service.base.constant.MQConst.CandyQueue.COUPON_USE;
import static com.liquidnet.service.base.constant.MQConst.SweetQueue.SWEET_USER_INSERT_DRAW;
@Configuration
......@@ -41,8 +43,8 @@ public class ConsumerCandyCouponUseRedisStreamConfig {
*/
private Subscription receiveSqlCandyCouponUse(StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer, int t) {
return listenerContainer.receiveAutoAck(
Consumer.from(SWEET_USER_INSERT_DRAW.getGroup(), SWEET_USER_INSERT_DRAW.name() + t),
StreamOffset.create(SWEET_USER_INSERT_DRAW.getKey(), ReadOffset.lastConsumed()), consumerCandyCouponUseRdsReceiver
Consumer.from(COUPON_USE.getGroup(), COUPON_USE.name() + t),
StreamOffset.create(COUPON_USE.getKey(), ReadOffset.lastConsumed()), consumerCandyCouponUseRdsReceiver
);
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment