记得上下班打卡 | git大法好,push需谨慎

Commit 2d29b94d authored by zhanggb's avatar zhanggb

~queue:ChimeQueue.USER_OPERATION_LIKE;

~queue:ChimeQueue.USER_OPERATION_DISLIKE;
parent 8bd1a3cf
...@@ -389,7 +389,5 @@ public class MQConst { ...@@ -389,7 +389,5 @@ public class MQConst {
} }
public static void main(String[] args) { public static void main(String[] args) {
System.out.println(ChimeQueue.USER_OPERATION_LIKE.name());
} }
} }
package com.liquidnet.service.consumer.adam.receiver; //package com.liquidnet.service.consumer.adam.receiver;
//
import com.liquidnet.commons.lang.util.CollectionUtil; //import com.liquidnet.commons.lang.util.CollectionUtil;
import com.liquidnet.commons.lang.util.JsonUtils; //import com.liquidnet.commons.lang.util.JsonUtils;
import com.liquidnet.service.chime.vo.mongo.ChimeUserOperLogVo; //import com.liquidnet.service.chime.vo.mongo.ChimeUserOperLogVo;
import com.liquidnet.service.consumer.adam.util.ChimeDataUtils; //import com.liquidnet.service.consumer.adam.util.ChimeDataUtils;
import lombok.extern.slf4j.Slf4j; //import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; //import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.MapRecord; //import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.StreamRecords; //import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.core.StringRedisTemplate; //import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamListener; //import org.springframework.data.redis.stream.StreamListener;
//
import java.util.HashMap; //import java.util.HashMap;
//
@Slf4j //@Slf4j
public abstract class AbstractChimeRedisReceiver implements StreamListener<String, MapRecord<String, String, String>> { //public abstract class AbstractChimeRedisReceiver implements StreamListener<String, MapRecord<String, String, String>> {
@Autowired // @Autowired
StringRedisTemplate stringRedisTemplate; // StringRedisTemplate stringRedisTemplate;
//
@Autowired // @Autowired
private ChimeDataUtils chimeDataUtils; // private ChimeDataUtils chimeDataUtils;
//
@Override // @Override
public void onMessage(MapRecord<String, String, String> message) { // public void onMessage(MapRecord<String, String, String> message) {
String redisStreamKey = this.getRedisStreamKey(); // String redisStreamKey = this.getRedisStreamKey();
log.debug("CONSUMER MSG[streamKey:{},messageId:{},stream:{},body:{}]", redisStreamKey, message.getId(), message.getStream(), message.getValue()); // log.debug("CONSUMER MSG[streamKey:{},messageId:{},stream:{},body:{}]", redisStreamKey, message.getId(), message.getStream(), message.getValue());
boolean result = this.consumerMessageHandler(message.getValue().get("message")); // boolean result = this.consumerMessageHandler(message.getValue().get("message"));
log.info("CONSUMER MSG RESULT:{} ==> [{}]MESSAGE_ID:{}", result, redisStreamKey, message.getId()); // log.info("CONSUMER MSG RESULT:{} ==> [{}]MESSAGE_ID:{}", result, redisStreamKey, message.getId());
//
try { // try {
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message); // stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
} catch (Exception e) { // } catch (Exception e) {
log.error("#CONSUMER MSG EX_ACK ==> [{}]RESULT:{},MESSAGE:{}", redisStreamKey, result, message.getValue(), e); // log.error("#CONSUMER MSG EX_ACK ==> [{}]RESULT:{},MESSAGE:{}", redisStreamKey, result, message.getValue(), e);
} // }
try { // try {
stringRedisTemplate.opsForStream().delete(redisStreamKey, message.getId()); // stringRedisTemplate.opsForStream().delete(redisStreamKey, message.getId());
} catch (Exception e) { // } catch (Exception e) {
log.error("#CONSUMER MSG EX_DEL ==> [{}]RESULT:{},MESSAGE:{}", redisStreamKey, result, message.getValue(), e); // log.error("#CONSUMER MSG EX_DEL ==> [{}]RESULT:{},MESSAGE:{}", redisStreamKey, result, message.getValue(), e);
} // }
} // }
//
private boolean consumerMessageHandler(String msg) { // private boolean consumerMessageHandler(String msg) {
boolean aBoolean = false; // boolean aBoolean = false;
try { // try {
ChimeUserOperLogVo textMessage = JsonUtils.fromJson(msg, ChimeUserOperLogVo.class); // ChimeUserOperLogVo textMessage = JsonUtils.fromJson(msg, ChimeUserOperLogVo.class);
if (textMessage == null) { // if (textMessage == null) {
aBoolean = true; // aBoolean = true;
} else { // } else {
//执行计数 // //执行计数
chimeDataUtils.updateChimeUser(textMessage); // chimeDataUtils.updateChimeUser(textMessage);
//创建操作日志 // //创建操作日志
chimeDataUtils.createUserOperLog(textMessage); // chimeDataUtils.createUserOperLog(textMessage);
aBoolean = true; // aBoolean = true;
} // }
} catch (Exception e) { // } catch (Exception e) {
log.error("CONSUMER MSG EX_HANDLE ==> [{}]:{}", this.getRedisStreamKey(), msg, e); // log.error("CONSUMER MSG EX_HANDLE ==> [{}]:{}", this.getRedisStreamKey(), msg, e);
} finally { // } finally {
if (!aBoolean) { // if (!aBoolean) {
HashMap<String, String> map = CollectionUtil.mapStringString(); // HashMap<String, String> map = CollectionUtil.mapStringString();
map.put("message", msg); // map.put("message", msg);
stringRedisTemplate.opsForStream().add(StreamRecords.mapBacked(map).withStreamKey(this.getRedisStreamKey())); // stringRedisTemplate.opsForStream().add(StreamRecords.mapBacked(map).withStreamKey(this.getRedisStreamKey()));
} // }
} // }
return aBoolean; // return aBoolean;
} // }
//
protected abstract String getRedisStreamKey(); // protected abstract String getRedisStreamKey();
//
protected abstract String getRedisStreamGroup(); // protected abstract String getRedisStreamGroup();
} //}
package com.liquidnet.service.consumer.adam.receiver; //package com.liquidnet.service.consumer.adam.receiver;
//
import com.liquidnet.service.base.constant.MQConst; //import com.liquidnet.service.base.constant.MQConst;
import org.springframework.stereotype.Component; //import org.springframework.stereotype.Component;
//
@Component //@Component
public class ConsumerChimeUserOperationDisLikeRdsReceiver extends AbstractChimeRedisReceiver { //public class ConsumerChimeUserOperationDisLikeRdsReceiver extends AbstractChimeRedisReceiver {
@Override // @Override
protected String getRedisStreamKey() { // protected String getRedisStreamKey() {
return MQConst.ChimeQueue.USER_OPERATION_DISLIKE.getKey(); // return MQConst.ChimeQueue.USER_OPERATION_DISLIKE.getKey();
} // }
//
@Override // @Override
protected String getRedisStreamGroup() { // protected String getRedisStreamGroup() {
return MQConst.ChimeQueue.USER_OPERATION_DISLIKE.getGroup(); // return MQConst.ChimeQueue.USER_OPERATION_DISLIKE.getGroup();
} // }
} //}
package com.liquidnet.service.consumer.adam.receiver; //package com.liquidnet.service.consumer.adam.receiver;
//
import com.liquidnet.service.base.constant.MQConst; //import com.liquidnet.service.base.constant.MQConst;
import org.springframework.stereotype.Component; //import org.springframework.stereotype.Component;
//
@Component //@Component
public class ConsumerChimeUserOperationLikeRdsReceiver extends AbstractChimeRedisReceiver { //public class ConsumerChimeUserOperationLikeRdsReceiver extends AbstractChimeRedisReceiver {
@Override // @Override
protected String getRedisStreamKey() { // protected String getRedisStreamKey() {
return MQConst.ChimeQueue.USER_OPERATION_LIKE.getKey(); // return MQConst.ChimeQueue.USER_OPERATION_LIKE.getKey();
} // }
//
@Override // @Override
protected String getRedisStreamGroup() { // protected String getRedisStreamGroup() {
return MQConst.ChimeQueue.USER_OPERATION_LIKE.getGroup(); // return MQConst.ChimeQueue.USER_OPERATION_LIKE.getGroup();
} // }
} //}
package com.liquidnet.service.consumer.adam.util; //package com.liquidnet.service.consumer.adam.util;
//
import com.liquidnet.service.chime.constant.ChimeConstant; //import com.liquidnet.service.chime.constant.ChimeConstant;
import com.liquidnet.service.chime.vo.mongo.ChimeUserInfoVo; //import com.liquidnet.service.chime.vo.mongo.ChimeUserInfoVo;
import com.liquidnet.service.chime.vo.mongo.ChimeUserOperLogVo; //import com.liquidnet.service.chime.vo.mongo.ChimeUserOperLogVo;
import com.mongodb.client.result.UpdateResult; //import com.mongodb.client.result.UpdateResult;
import lombok.extern.slf4j.Slf4j; //import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; //import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.mongodb.core.MongoTemplate; //import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.query.Criteria; //import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query; //import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.Update; //import org.springframework.data.mongodb.core.query.Update;
import org.springframework.stereotype.Component; //import org.springframework.stereotype.Component;
//
@Slf4j //@Slf4j
@Component //@Component
public class ChimeDataUtils { //public class ChimeDataUtils {
//
@Autowired // @Autowired
private MongoTemplate mongoTemplate; // private MongoTemplate mongoTemplate;
//
/** // /**
* 添加操作日志 // * 添加操作日志
* @param chimeUserOperLogVo // * @param chimeUserOperLogVo
*/ // */
public void createUserOperLog(ChimeUserOperLogVo chimeUserOperLogVo){ // public void createUserOperLog(ChimeUserOperLogVo chimeUserOperLogVo){
mongoTemplate.save(chimeUserOperLogVo,ChimeUserOperLogVo.class.getSimpleName()); // mongoTemplate.save(chimeUserOperLogVo,ChimeUserOperLogVo.class.getSimpleName());
} // }
//
/** // /**
* 修改社交用户操作计数 // * 修改社交用户操作计数
* @param chimeUserOperLogVo // * @param chimeUserOperLogVo
*/ // */
public void updateChimeUser(ChimeUserOperLogVo chimeUserOperLogVo) { // public void updateChimeUser(ChimeUserOperLogVo chimeUserOperLogVo) {
ChimeUserInfoVo chimeUserInfoVo = null; // ChimeUserInfoVo chimeUserInfoVo = null;
if(chimeUserOperLogVo.getOpType().equalsIgnoreCase(ChimeConstant.LOG_USER_OPERATION_LIKE)||chimeUserOperLogVo.getOpType().equalsIgnoreCase(ChimeConstant.LOG_USER_OPERATION_DISLIKE)){ // if(chimeUserOperLogVo.getOpType().equalsIgnoreCase(ChimeConstant.LOG_USER_OPERATION_LIKE)||chimeUserOperLogVo.getOpType().equalsIgnoreCase(ChimeConstant.LOG_USER_OPERATION_DISLIKE)){
chimeUserInfoVo = this.getUserByUserId(chimeUserOperLogVo.getTargetUserId()); // chimeUserInfoVo = this.getUserByUserId(chimeUserOperLogVo.getTargetUserId());
}else if(chimeUserOperLogVo.getOpType().equalsIgnoreCase(ChimeConstant.LOG_USER_OPERATION_LIKE_MYSELF_CLICK)||chimeUserOperLogVo.getOpType().equalsIgnoreCase(ChimeConstant.LOG_USER_OPERATION_DISLIKE_MYSELF_CLICK)){ // }else if(chimeUserOperLogVo.getOpType().equalsIgnoreCase(ChimeConstant.LOG_USER_OPERATION_LIKE_MYSELF_CLICK)||chimeUserOperLogVo.getOpType().equalsIgnoreCase(ChimeConstant.LOG_USER_OPERATION_DISLIKE_MYSELF_CLICK)){
chimeUserInfoVo = this.getUserByUserId(chimeUserOperLogVo.getCurrentUserId()); // chimeUserInfoVo = this.getUserByUserId(chimeUserOperLogVo.getCurrentUserId());
} // }
if(chimeUserInfoVo == null){ // if(chimeUserInfoVo == null){
log.error("chimeUserInfoVo is null userId is not exist:"); // log.error("chimeUserInfoVo is null userId is not exist:");
return; // return;
} // }
Query query = Query.query(Criteria.where("userId").is(chimeUserInfoVo.getUserId()));
Update update = new Update();
if(chimeUserOperLogVo.getOpType().equalsIgnoreCase(ChimeConstant.LOG_USER_OPERATION_LIKE)){
update.set("likeCount", chimeUserInfoVo.getLikeCount() + 1);
}else if(chimeUserOperLogVo.getOpType().equalsIgnoreCase(ChimeConstant.LOG_USER_OPERATION_DISLIKE)){
update.set("disLikeCount", chimeUserInfoVo.getDisLikeCount() + 1);
}else if(chimeUserOperLogVo.getOpType().equalsIgnoreCase(ChimeConstant.LOG_USER_OPERATION_LIKE_MYSELF_CLICK)){
update.set("clickLikeCount", chimeUserInfoVo.getClickLikeCount() + 1);
}else if(chimeUserOperLogVo.getOpType().equalsIgnoreCase(ChimeConstant.LOG_USER_OPERATION_DISLIKE_MYSELF_CLICK)){
update.set("clickDisLikeCount", chimeUserInfoVo.getClickDisLikeCount() + 1);
}
UpdateResult result = mongoTemplate.updateFirst(query,update,ChimeUserInfoVo.class,ChimeUserInfoVo.class.getSimpleName());
log.info("updateChimeUser result:{}",result.toString());
// Query query = Query.query(Criteria.where("userId").is(chimeUserInfoVo.getUserId())); // Query query = Query.query(Criteria.where("userId").is(chimeUserInfoVo.getUserId()));
//// Update update = Update.fromDocument(Document.parse(JsonUtils.toJson(chimeUserInfoVo))); // Update update = new Update();
// Update update = Update.fromDocument(Document.parse(JsonUtils.toJson(chimeUserInfoVo)));
// if(chimeUserOperLogVo.getOpType().equalsIgnoreCase(ChimeConstant.LOG_USER_OPERATION_LIKE)){ // if(chimeUserOperLogVo.getOpType().equalsIgnoreCase(ChimeConstant.LOG_USER_OPERATION_LIKE)){
// update.set("likeCount", chimeUserInfoVo.getLikeCount() + 1); // update.set("likeCount", chimeUserInfoVo.getLikeCount() + 1);
// }else if(chimeUserOperLogVo.getOpType().equalsIgnoreCase(ChimeConstant.LOG_USER_OPERATION_DISLIKE)){ // }else if(chimeUserOperLogVo.getOpType().equalsIgnoreCase(ChimeConstant.LOG_USER_OPERATION_DISLIKE)){
// update.set("disLikeCount", chimeUserInfoVo.getDisLikeCount() + 1); // update.set("disLikeCount", chimeUserInfoVo.getDisLikeCount() + 1);
// }else if(chimeUserOperLogVo.getOpType().equalsIgnoreCase(ChimeConstant.LOG_USER_OPERATION_LIKE_MYSELF_CLICK)){
// update.set("clickLikeCount", chimeUserInfoVo.getClickLikeCount() + 1);
// }else if(chimeUserOperLogVo.getOpType().equalsIgnoreCase(ChimeConstant.LOG_USER_OPERATION_DISLIKE_MYSELF_CLICK)){
// update.set("clickDisLikeCount", chimeUserInfoVo.getClickDisLikeCount() + 1);
// } // }
// update.set("likeCount", 8); //
// update.set("disLikeCount", 9);
// UpdateResult result = mongoTemplate.updateFirst(query,update,ChimeUserInfoVo.class,ChimeUserInfoVo.class.getSimpleName()); // UpdateResult result = mongoTemplate.updateFirst(query,update,ChimeUserInfoVo.class,ChimeUserInfoVo.class.getSimpleName());
// log.info("updateChimeUser result:{}",result.toString()); // log.info("updateChimeUser result:{}",result.toString());
} //
//// Query query = Query.query(Criteria.where("userId").is(chimeUserInfoVo.getUserId()));
/** ////// Update update = Update.fromDocument(Document.parse(JsonUtils.toJson(chimeUserInfoVo)));
* 获取单个用户信息 //// Update update = Update.fromDocument(Document.parse(JsonUtils.toJson(chimeUserInfoVo)));
* @param userId //// if(chimeUserOperLogVo.getOpType().equalsIgnoreCase(ChimeConstant.LOG_USER_OPERATION_LIKE)){
* @return //// update.set("likeCount", chimeUserInfoVo.getLikeCount() + 1);
*/ //// }else if(chimeUserOperLogVo.getOpType().equalsIgnoreCase(ChimeConstant.LOG_USER_OPERATION_DISLIKE)){
public ChimeUserInfoVo getUserByUserId(String userId){ //// update.set("disLikeCount", chimeUserInfoVo.getDisLikeCount() + 1);
Query query = Query.query(Criteria.where("userId").is(userId)); //// }
ChimeUserInfoVo chimeUserInfoVo = mongoTemplate.findOne(query,ChimeUserInfoVo.class,ChimeUserInfoVo.class.getSimpleName()); //// update.set("likeCount", 8);
return chimeUserInfoVo; //// update.set("disLikeCount", 9);
} //// UpdateResult result = mongoTemplate.updateFirst(query,update,ChimeUserInfoVo.class,ChimeUserInfoVo.class.getSimpleName());
} //// log.info("updateChimeUser result:{}",result.toString());
// }
//
// /**
// * 获取单个用户信息
// * @param userId
// * @return
// */
// public ChimeUserInfoVo getUserByUserId(String userId){
// Query query = Query.query(Criteria.where("userId").is(userId));
// ChimeUserInfoVo chimeUserInfoVo = mongoTemplate.findOne(query,ChimeUserInfoVo.class,ChimeUserInfoVo.class.getSimpleName());
// return chimeUserInfoVo;
// }
//}
...@@ -49,17 +49,17 @@ ...@@ -49,17 +49,17 @@
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.liquidnet</groupId> <groupId>com.liquidnet</groupId>
<artifactId>liquidnet-service-sweet-do</artifactId> <artifactId>liquidnet-api-feign-stone</artifactId>
<version>1.0-SNAPSHOT</version> <version>1.0-SNAPSHOT</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.liquidnet</groupId> <groupId>com.liquidnet</groupId>
<artifactId>liquidnet-api-feign-stone</artifactId> <artifactId>liquidnet-service-sweet-api</artifactId>
<version>1.0-SNAPSHOT</version> <version>1.0-SNAPSHOT</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.liquidnet</groupId> <groupId>com.liquidnet</groupId>
<artifactId>liquidnet-service-sweet-api</artifactId> <artifactId>liquidnet-service-chime-api</artifactId>
<version>1.0-SNAPSHOT</version> <version>1.0-SNAPSHOT</version>
</dependency> </dependency>
......
...@@ -47,6 +47,10 @@ public class ConsumerCommonBizRedisStreamConfig extends RedisStreamConfig { ...@@ -47,6 +47,10 @@ public class ConsumerCommonBizRedisStreamConfig extends RedisStreamConfig {
ConsumerSweetTemplateMsgReceiver consumerSweetTemplateMsgReceiver; ConsumerSweetTemplateMsgReceiver consumerSweetTemplateMsgReceiver;
@Autowired @Autowired
ConsumerSweetStoneIntegralReceiver consumerSweetStoneIntegralReceiver; ConsumerSweetStoneIntegralReceiver consumerSweetStoneIntegralReceiver;
@Autowired
ConsumerChimeUserOperationLikeReceiver consumerChimeUserOperationLikeReceiver;
@Autowired
ConsumerChimeUserOperationDislikeReceiver consumerChimeUserOperationDislikeReceiver;
/*------sweet------*/ /*------sweet------*/
@Bean // 发送模版消息 @Bean // 发送模版消息
...@@ -64,7 +68,7 @@ public class ConsumerCommonBizRedisStreamConfig extends RedisStreamConfig { ...@@ -64,7 +68,7 @@ public class ConsumerCommonBizRedisStreamConfig extends RedisStreamConfig {
} }
return subscriptionList; return subscriptionList;
} }
@Bean // stone积分处理 @Bean
public List<Subscription> subscriptionSweetSqlApiStoneIntgral(RedisConnectionFactory factory) { public List<Subscription> subscriptionSweetSqlApiStoneIntgral(RedisConnectionFactory factory) {
List<Subscription> subscriptionList = new ArrayList<>(); List<Subscription> subscriptionList = new ArrayList<>();
MQConst.SweetQueue stream = MQConst.SweetQueue.SWEET_STONE_INTEGRAL; MQConst.SweetQueue stream = MQConst.SweetQueue.SWEET_STONE_INTEGRAL;
...@@ -80,7 +84,7 @@ public class ConsumerCommonBizRedisStreamConfig extends RedisStreamConfig { ...@@ -80,7 +84,7 @@ public class ConsumerCommonBizRedisStreamConfig extends RedisStreamConfig {
return subscriptionList; return subscriptionList;
} }
@Bean// 短信通知 @Bean
public List<Subscription> subscriptionCommonSmsNotice(RedisConnectionFactory factory) { public List<Subscription> subscriptionCommonSmsNotice(RedisConnectionFactory factory) {
List<Subscription> subscriptionList = new ArrayList<>(); List<Subscription> subscriptionList = new ArrayList<>();
MQConst.KylinQueue stream = MQConst.KylinQueue.SMS_NOTICE; MQConst.KylinQueue stream = MQConst.KylinQueue.SMS_NOTICE;
...@@ -96,7 +100,7 @@ public class ConsumerCommonBizRedisStreamConfig extends RedisStreamConfig { ...@@ -96,7 +100,7 @@ public class ConsumerCommonBizRedisStreamConfig extends RedisStreamConfig {
return subscriptionList; return subscriptionList;
} }
@Bean// 藏品上传声明 @Bean
public List<Subscription> subscriptionGoblinBizArtworkUpl(RedisConnectionFactory factory) { public List<Subscription> subscriptionGoblinBizArtworkUpl(RedisConnectionFactory factory) {
List<Subscription> subscriptionList = new ArrayList<>(); List<Subscription> subscriptionList = new ArrayList<>();
MQConst.GoblinQueue stream = MQConst.GoblinQueue.BIZ_ARTWORK_UPL; MQConst.GoblinQueue stream = MQConst.GoblinQueue.BIZ_ARTWORK_UPL;
...@@ -112,7 +116,7 @@ public class ConsumerCommonBizRedisStreamConfig extends RedisStreamConfig { ...@@ -112,7 +116,7 @@ public class ConsumerCommonBizRedisStreamConfig extends RedisStreamConfig {
return subscriptionList; return subscriptionList;
} }
@Bean// 藏品声明查询 @Bean
public List<Subscription> subscriptionGoblinBizArtworkClq(RedisConnectionFactory factory) { public List<Subscription> subscriptionGoblinBizArtworkClq(RedisConnectionFactory factory) {
List<Subscription> subscriptionList = new ArrayList<>(); List<Subscription> subscriptionList = new ArrayList<>();
MQConst.GoblinQueue stream = MQConst.GoblinQueue.BIZ_ARTWORK_CLQ; MQConst.GoblinQueue stream = MQConst.GoblinQueue.BIZ_ARTWORK_CLQ;
...@@ -172,7 +176,7 @@ public class ConsumerCommonBizRedisStreamConfig extends RedisStreamConfig { ...@@ -172,7 +176,7 @@ public class ConsumerCommonBizRedisStreamConfig extends RedisStreamConfig {
return subscriptionList; return subscriptionList;
} }
@Bean// 增减积分 @Bean
public List<Subscription> subscriptionGoblinBizIntegral(RedisConnectionFactory factory) { public List<Subscription> subscriptionGoblinBizIntegral(RedisConnectionFactory factory) {
List<Subscription> subscriptionList = new ArrayList<>(); List<Subscription> subscriptionList = new ArrayList<>();
MQConst.GoblinQueue stream = MQConst.GoblinQueue.BIZ_INTEGRAL; MQConst.GoblinQueue stream = MQConst.GoblinQueue.BIZ_INTEGRAL;
...@@ -187,4 +191,36 @@ public class ConsumerCommonBizRedisStreamConfig extends RedisStreamConfig { ...@@ -187,4 +191,36 @@ public class ConsumerCommonBizRedisStreamConfig extends RedisStreamConfig {
} }
return subscriptionList; return subscriptionList;
} }
@Bean
public List<Subscription> subscriptionChimeUserOperationLike(RedisConnectionFactory factory) {
List<Subscription> subscriptionList = new ArrayList<>();
MQConst.ChimeQueue stream = MQConst.ChimeQueue.USER_OPERATION_LIKE;
this.initStream(stringRedisTemplate, stream.getKey(), stream.getGroup());
for (int i = 0; i < 10; i++) {
StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = this.buildStreamMessageListenerContainer(factory);
subscriptionList.add(listenerContainer.receiveAutoAck(
Consumer.from(stream.getGroup(), getConsumerName(stream.name() + i)),
StreamOffset.create(stream.getKey(), ReadOffset.lastConsumed()), consumerChimeUserOperationLikeReceiver
));
listenerContainer.start();
}
return subscriptionList;
}
@Bean
public List<Subscription> subscriptionChimeUserOperationDislike(RedisConnectionFactory factory) {
List<Subscription> subscriptionList = new ArrayList<>();
MQConst.ChimeQueue stream = MQConst.ChimeQueue.USER_OPERATION_DISLIKE;
this.initStream(stringRedisTemplate, stream.getKey(), stream.getGroup());
for (int i = 0; i < 10; i++) {
StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = this.buildStreamMessageListenerContainer(factory);
subscriptionList.add(listenerContainer.receiveAutoAck(
Consumer.from(stream.getGroup(), getConsumerName(stream.name() + i)),
StreamOffset.create(stream.getKey(), ReadOffset.lastConsumed()), consumerChimeUserOperationDislikeReceiver
));
listenerContainer.start();
}
return subscriptionList;
}
} }
package com.liquidnet.service.consumer.base.receiver;
import com.liquidnet.commons.lang.util.CollectionUtil;
import com.liquidnet.commons.lang.util.JsonUtils;
import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.chime.constant.ChimeConstant;
import com.liquidnet.service.chime.vo.mongo.ChimeUserInfoVo;
import com.liquidnet.service.chime.vo.mongo.ChimeUserOperLogVo;
import com.liquidnet.service.consumer.base.util.ChimeDataUtils;
import com.mongodb.client.result.UpdateResult;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.Update;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.stereotype.Component;
import java.util.HashMap;
@Slf4j
@Component
public class ConsumerChimeUserOperationDislikeReceiver extends AbstractBizRedisReceiver {
@Autowired
private ChimeDataUtils chimeDataUtils;
@Override
protected boolean consumerMessageHandler(String msg) {
boolean aBoolean = false;
try {
ChimeUserOperLogVo textMessage = JsonUtils.fromJson(msg, ChimeUserOperLogVo.class);
if (textMessage == null) {
aBoolean = true;
} else {
//执行计数
chimeDataUtils.updateChimeUser(textMessage);
//创建操作日志
chimeDataUtils.createUserOperLog(textMessage);
aBoolean = true;
}
} catch (Exception e) {
log.error("CONSUMER MSG EX_HANDLE ==> [{}]:{}", this.getRedisStreamKey(), msg, e);
} finally {
if (!aBoolean) {
HashMap<String, String> map = CollectionUtil.mapStringString();
map.put("message", msg);
stringRedisTemplate.opsForStream().add(StreamRecords.mapBacked(map).withStreamKey(this.getRedisStreamKey()));
}
}
return aBoolean;
}
@Override
protected String getRedisStreamKey() {
return MQConst.ChimeQueue.USER_OPERATION_DISLIKE.getKey();
}
@Override
protected String getRedisStreamGroup() {
return MQConst.ChimeQueue.USER_OPERATION_DISLIKE.getGroup();
}
/* -------------------------------------------------------------------------- */
/**
* 添加操作日志
* @param chimeUserOperLogVo
*/
private void createUserOperLog(ChimeUserOperLogVo chimeUserOperLogVo){
mongoTemplate.save(chimeUserOperLogVo,ChimeUserOperLogVo.class.getSimpleName());
}
/**
* 修改社交用户操作计数
* @param chimeUserOperLogVo
*/
private void updateChimeUser(ChimeUserOperLogVo chimeUserOperLogVo) {
ChimeUserInfoVo chimeUserInfoVo = null;
if(chimeUserOperLogVo.getOpType().equalsIgnoreCase(ChimeConstant.LOG_USER_OPERATION_LIKE)||chimeUserOperLogVo.getOpType().equalsIgnoreCase(ChimeConstant.LOG_USER_OPERATION_DISLIKE)){
chimeUserInfoVo = this.getUserByUserId(chimeUserOperLogVo.getTargetUserId());
}else if(chimeUserOperLogVo.getOpType().equalsIgnoreCase(ChimeConstant.LOG_USER_OPERATION_LIKE_MYSELF_CLICK)||chimeUserOperLogVo.getOpType().equalsIgnoreCase(ChimeConstant.LOG_USER_OPERATION_DISLIKE_MYSELF_CLICK)){
chimeUserInfoVo = this.getUserByUserId(chimeUserOperLogVo.getCurrentUserId());
}
if(chimeUserInfoVo == null){
log.error("chimeUserInfoVo is null userId is not exist:");
return;
}
Query query = Query.query(Criteria.where("userId").is(chimeUserInfoVo.getUserId()));
Update update = new Update();
if(chimeUserOperLogVo.getOpType().equalsIgnoreCase(ChimeConstant.LOG_USER_OPERATION_LIKE)){
update.set("likeCount", chimeUserInfoVo.getLikeCount() + 1);
}else if(chimeUserOperLogVo.getOpType().equalsIgnoreCase(ChimeConstant.LOG_USER_OPERATION_DISLIKE)){
update.set("disLikeCount", chimeUserInfoVo.getDisLikeCount() + 1);
}else if(chimeUserOperLogVo.getOpType().equalsIgnoreCase(ChimeConstant.LOG_USER_OPERATION_LIKE_MYSELF_CLICK)){
update.set("clickLikeCount", chimeUserInfoVo.getClickLikeCount() + 1);
}else if(chimeUserOperLogVo.getOpType().equalsIgnoreCase(ChimeConstant.LOG_USER_OPERATION_DISLIKE_MYSELF_CLICK)){
update.set("clickDisLikeCount", chimeUserInfoVo.getClickDisLikeCount() + 1);
}
UpdateResult result = mongoTemplate.updateFirst(query,update,ChimeUserInfoVo.class,ChimeUserInfoVo.class.getSimpleName());
log.info("updateChimeUser result:{}",result.toString());
// Query query = Query.query(Criteria.where("userId").is(chimeUserInfoVo.getUserId()));
//// Update update = Update.fromDocument(Document.parse(JsonUtils.toJson(chimeUserInfoVo)));
// Update update = Update.fromDocument(Document.parse(JsonUtils.toJson(chimeUserInfoVo)));
// if(chimeUserOperLogVo.getOpType().equalsIgnoreCase(ChimeConstant.LOG_USER_OPERATION_LIKE)){
// update.set("likeCount", chimeUserInfoVo.getLikeCount() + 1);
// }else if(chimeUserOperLogVo.getOpType().equalsIgnoreCase(ChimeConstant.LOG_USER_OPERATION_DISLIKE)){
// update.set("disLikeCount", chimeUserInfoVo.getDisLikeCount() + 1);
// }
// update.set("likeCount", 8);
// update.set("disLikeCount", 9);
// UpdateResult result = mongoTemplate.updateFirst(query,update,ChimeUserInfoVo.class,ChimeUserInfoVo.class.getSimpleName());
// log.info("updateChimeUser result:{}",result.toString());
}
/**
* 获取单个用户信息
* @param userId
* @return
*/
private ChimeUserInfoVo getUserByUserId(String userId){
Query query = Query.query(Criteria.where("userId").is(userId));
ChimeUserInfoVo chimeUserInfoVo = mongoTemplate.findOne(query,ChimeUserInfoVo.class,ChimeUserInfoVo.class.getSimpleName());
return chimeUserInfoVo;
}
}
package com.liquidnet.service.consumer.base.receiver;
import com.liquidnet.commons.lang.util.CollectionUtil;
import com.liquidnet.commons.lang.util.JsonUtils;
import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.chime.constant.ChimeConstant;
import com.liquidnet.service.chime.vo.mongo.ChimeUserInfoVo;
import com.liquidnet.service.chime.vo.mongo.ChimeUserOperLogVo;
import com.liquidnet.service.consumer.base.util.ChimeDataUtils;
import com.mongodb.client.result.UpdateResult;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.Update;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.stereotype.Component;
import java.util.HashMap;
@Slf4j
@Component
public class ConsumerChimeUserOperationLikeReceiver extends AbstractBizRedisReceiver {
@Autowired
private ChimeDataUtils chimeDataUtils;
@Override
protected boolean consumerMessageHandler(String msg) {
boolean aBoolean = false;
try {
ChimeUserOperLogVo textMessage = JsonUtils.fromJson(msg, ChimeUserOperLogVo.class);
if (textMessage == null) {
aBoolean = true;
} else {
//执行计数
chimeDataUtils.updateChimeUser(textMessage);
//创建操作日志
chimeDataUtils.createUserOperLog(textMessage);
aBoolean = true;
}
} catch (Exception e) {
log.error("CONSUMER MSG EX_HANDLE ==> [{}]:{}", this.getRedisStreamKey(), msg, e);
} finally {
if (!aBoolean) {
HashMap<String, String> map = CollectionUtil.mapStringString();
map.put("message", msg);
stringRedisTemplate.opsForStream().add(StreamRecords.mapBacked(map).withStreamKey(this.getRedisStreamKey()));
}
}
return aBoolean;
}
@Override
protected String getRedisStreamKey() {
return MQConst.ChimeQueue.USER_OPERATION_LIKE.getKey();
}
@Override
protected String getRedisStreamGroup() {
return MQConst.ChimeQueue.USER_OPERATION_LIKE.getGroup();
}
}
package com.liquidnet.service.consumer.base.util;
import com.liquidnet.service.chime.constant.ChimeConstant;
import com.liquidnet.service.chime.vo.mongo.ChimeUserInfoVo;
import com.liquidnet.service.chime.vo.mongo.ChimeUserOperLogVo;
import com.mongodb.client.result.UpdateResult;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.Update;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class ChimeDataUtils {
@Autowired
private MongoTemplate mongoTemplate;
/**
* 添加操作日志
* @param chimeUserOperLogVo
*/
public void createUserOperLog(ChimeUserOperLogVo chimeUserOperLogVo){
mongoTemplate.save(chimeUserOperLogVo,ChimeUserOperLogVo.class.getSimpleName());
}
/**
* 修改社交用户操作计数
* @param chimeUserOperLogVo
*/
public void updateChimeUser(ChimeUserOperLogVo chimeUserOperLogVo) {
ChimeUserInfoVo chimeUserInfoVo = null;
if(chimeUserOperLogVo.getOpType().equalsIgnoreCase(ChimeConstant.LOG_USER_OPERATION_LIKE)||chimeUserOperLogVo.getOpType().equalsIgnoreCase(ChimeConstant.LOG_USER_OPERATION_DISLIKE)){
chimeUserInfoVo = this.getUserByUserId(chimeUserOperLogVo.getTargetUserId());
}else if(chimeUserOperLogVo.getOpType().equalsIgnoreCase(ChimeConstant.LOG_USER_OPERATION_LIKE_MYSELF_CLICK)||chimeUserOperLogVo.getOpType().equalsIgnoreCase(ChimeConstant.LOG_USER_OPERATION_DISLIKE_MYSELF_CLICK)){
chimeUserInfoVo = this.getUserByUserId(chimeUserOperLogVo.getCurrentUserId());
}
if(chimeUserInfoVo == null){
log.error("chimeUserInfoVo is null userId is not exist:");
return;
}
Query query = Query.query(Criteria.where("userId").is(chimeUserInfoVo.getUserId()));
Update update = new Update();
if(chimeUserOperLogVo.getOpType().equalsIgnoreCase(ChimeConstant.LOG_USER_OPERATION_LIKE)){
update.set("likeCount", chimeUserInfoVo.getLikeCount() + 1);
}else if(chimeUserOperLogVo.getOpType().equalsIgnoreCase(ChimeConstant.LOG_USER_OPERATION_DISLIKE)){
update.set("disLikeCount", chimeUserInfoVo.getDisLikeCount() + 1);
}else if(chimeUserOperLogVo.getOpType().equalsIgnoreCase(ChimeConstant.LOG_USER_OPERATION_LIKE_MYSELF_CLICK)){
update.set("clickLikeCount", chimeUserInfoVo.getClickLikeCount() + 1);
}else if(chimeUserOperLogVo.getOpType().equalsIgnoreCase(ChimeConstant.LOG_USER_OPERATION_DISLIKE_MYSELF_CLICK)){
update.set("clickDisLikeCount", chimeUserInfoVo.getClickDisLikeCount() + 1);
}
UpdateResult result = mongoTemplate.updateFirst(query,update,ChimeUserInfoVo.class,ChimeUserInfoVo.class.getSimpleName());
log.info("updateChimeUser result:{}",result.toString());
// Query query = Query.query(Criteria.where("userId").is(chimeUserInfoVo.getUserId()));
//// Update update = Update.fromDocument(Document.parse(JsonUtils.toJson(chimeUserInfoVo)));
// Update update = Update.fromDocument(Document.parse(JsonUtils.toJson(chimeUserInfoVo)));
// if(chimeUserOperLogVo.getOpType().equalsIgnoreCase(ChimeConstant.LOG_USER_OPERATION_LIKE)){
// update.set("likeCount", chimeUserInfoVo.getLikeCount() + 1);
// }else if(chimeUserOperLogVo.getOpType().equalsIgnoreCase(ChimeConstant.LOG_USER_OPERATION_DISLIKE)){
// update.set("disLikeCount", chimeUserInfoVo.getDisLikeCount() + 1);
// }
// update.set("likeCount", 8);
// update.set("disLikeCount", 9);
// UpdateResult result = mongoTemplate.updateFirst(query,update,ChimeUserInfoVo.class,ChimeUserInfoVo.class.getSimpleName());
// log.info("updateChimeUser result:{}",result.toString());
}
/**
* 获取单个用户信息
* @param userId
* @return
*/
public ChimeUserInfoVo getUserByUserId(String userId){
Query query = Query.query(Criteria.where("userId").is(userId));
ChimeUserInfoVo chimeUserInfoVo = mongoTemplate.findOne(query,ChimeUserInfoVo.class,ChimeUserInfoVo.class.getSimpleName());
return chimeUserInfoVo;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment