
Commit 9d9742f6 authored by anjiabin

Submit galaxy consumer related changes

parent 6d8755f2
...@@ -284,6 +284,35 @@ public class MQConst {
}
}
public enum GalaxyQueue {
SQL_USER_INFO("galaxy:stream:rk.sql.userInfo", "group.sql.userInfo", "用户注册信息"),
SQL_SERIES_INFO("galaxy:stream:rk.sql.seriesInfo", "group.sql.seriesInfo", "系列信息"),
SQL_NFT_ORDER_INFO("galaxy:stream:rk.sql.nftOrderInfo", "group.sql.nftOrderInfo", "订单信息"),
SQL_NFT_TRADE_INFO("galaxy:stream:rk.sql.nftTradeInfo", "group.sql.nftTradeInfo", "交易信息"),
;
private final String key;
private final String group;
private final String desc;
GalaxyQueue(String key, String group, String desc) {
this.key = key;
this.group = group;
this.desc = desc;
}
public String getKey() {
return key;
}
public String getGroup() {
return group;
}
public String getDesc() {
return desc;
}
}
public static void main(String[] args) {
System.out.println(ChimeQueue.USER_OPERATION_LIKE.name());
}
......
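For orientation: the consumer configuration classes added later in this commit look up the Redis Stream key and consumer group from this enum. A minimal illustrative sketch of that lookup (not part of the diff):
MQConst.GalaxyQueue queue = MQConst.GalaxyQueue.SQL_USER_INFO;
String streamKey = queue.getKey();   // "galaxy:stream:rk.sql.userInfo"
String group = queue.getGroup();     // "group.sql.userInfo"
// the Consumer*Config classes below pass these to initStream(stringRedisTemplate, streamKey, group)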
liquidnet:
system:
updating:
switch: false
info:
port: 9992
context:
# context: /service-consumer
name: liquidnet-service-consumer-galaxy
logfile:
path: /data/logs
name: service-consumer-galaxy
config: classpath:logback-spring.xml
level: debug
mysql:
database-name: dev_ln_scene
mongodb:
sslEnabled: false
database: dev_ln_scene
# Spring per-environment overrides below
\ No newline at end of file
liquidnet:
system:
updating:
switch: false
info:
port: 9998
context:
# context: /service-consumer
name: liquidnet-service-consumer-galaxy
logfile:
path: /data/logs
name: service-consumer-galaxy
config: classpath:logback-spring.xml
level: info
mysql:
database-name: test_ln_scene
mongodb:
sslEnabled: false
database: test_ln_scene
# Spring per-environment overrides below
server:
port: ${liquidnet.info.port}
tomcat:
uri-encoding: UTF-8
servlet:
context-path: ${liquidnet.info.context}
# -----------------------------------------------------------
knife4j:
production: ${liquidnet.knife4j.disable}
basic:
enable: true
username: ${liquidnet.security.username}
password: ${liquidnet.security.password}
# -----------------------------------------------------------
logging:
# config: ${liquidnet.logfile.config}
file:
name: ${liquidnet.logfile.path}/${liquidnet.logfile.name}.log
max-size: 200MB
pattern:
file: '%d{yyyy-MM-dd HH:mm:ss.SSS} [ %-5level] %thread [%logger{36}:%line] - %msg%n'
console: '%d{yyyy-MM-dd HH:mm:ss.SSS} [ %-5level] %thread [%logger{36}:%line] - %msg%n'
rolling-file-name: ${liquidnet.logfile.path}/${liquidnet.logfile.name}-%d{yyyy-MM-dd}.%i.log
level:
root: error
# Log levels for specific packages
com.liquidnet: ${liquidnet.logfile.level}
# -----------------------------------------------------------
eureka:
# client:
# register-with-eureka: true
# fetch-registry: true
# serviceUrl:
# defaultZone: http://${liquidnet.security.username}:${liquidnet.security.password}@${liquidnet.eureka.host}/eureka-server/eureka
instance:
hostname: ${spring.cloud.client.ip-address}
lease-expiration-duration-in-seconds: 15 # expiry: if no heartbeat is received within this period, Eureka Server evicts the instance
lease-renewal-interval-in-seconds: 5 # renewal: a heartbeat is sent at this interval
prefer-ip-address: true
instance-id: ${spring.application.name}:${spring.cloud.client.ip-address}:${spring.application.instance_id:${server.port}}
# -----------------------------------------------------------
#actuator/info
info:
app:
name: ${liquidnet.info.name}
company:
name: zhengzai.tv
build:
groupId: '@project.groupId@'
artifactId: '@project.artifactId@'
version: '@project.version@'
# -----------------------------------------------------------
spring:
application:
name: ${liquidnet.info.name}
profiles:
include: common-service # loads the shared management configuration
redis:
database: ${liquidnet.redis.adam.database}
port: ${liquidnet.redis.adam.port}
host: ${liquidnet.redis.adam.host}
password: ${liquidnet.redis.adam.password}
lettuce:
pool:
max-active: 8
max-wait: -1
max-idle: 8
min-idle: 0
datasource:
name: ${liquidnet.mysql.database-name}
url: jdbc:mysql://${liquidnet.mysql.urlHostAndPort}/${liquidnet.mysql.database-name}?serverTimezone=Asia/Shanghai&characterEncoding=utf-8&useSSL=false
username: ${liquidnet.mysql.username}
password: ${liquidnet.mysql.password}
# type: org.apache.tomcat.jdbc.pool.DataSource
driver-class-name: com.mysql.cj.jdbc.Driver
hikari:
maximum-pool-size: 45
minimum-idle: 8
connection-test-query: SELECT 1
data:
mongodb:
uri: mongodb://${liquidnet.mongodb.user}:${liquidnet.mongodb.pwd}@${liquidnet.mongodb.host}/?authSource=admin&maxPoolSize=200&waitQueueMultiple=100
sslEnabled: ${liquidnet.mongodb.sslEnabled}
database: ${liquidnet.mongodb.database}
# -----------------------------------------------------------
# -----------------------------------------------------------
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>liquidnet-service-consumer-all</artifactId>
<groupId>com.liquidnet</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>liquidnet-service-consumer-galaxy</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.liquidnet</groupId>
<artifactId>liquidnet-common-cache-redis</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.liquidnet</groupId>
<artifactId>liquidnet-service-galaxy-api</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
<resources>
<resource>
<directory>src/main/resources</directory>
<filtering>true</filtering>
</resource>
</resources>
</build>
</project>
package com.liquidnet.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.core.env.Environment;
import java.net.InetAddress;
import java.util.Arrays;
@Slf4j
@SpringBootApplication(scanBasePackages = {"com.liquidnet"})
public class ServiceConsumerGalaxyApplication implements CommandLineRunner {
@Autowired
private Environment environment;
public static void main(String[] args) {
SpringApplication.run(ServiceConsumerGalaxyApplication.class, args);
}
@Override
public void run(String... strings) {
try {
log.info("\n----------------------------------------------------------\n\t" +
"Application '{}' is running! Access URLs:\n\t" +
"Local: \t\thttp://127.0.0.1:{}\n\t" +
"External: \thttp://{}:{}{}/doc.html\n\t" +
"Profile(s): \t{}\n----------------------------------------------------------",
environment.getProperty("spring.application.name"),
environment.getProperty("server.port"),
InetAddress.getLocalHost().getHostAddress(),
environment.getProperty("server.port"),
environment.getProperty("server.servlet.context-path"),
Arrays.toString(environment.getActiveProfiles()));
} catch (Exception e) {
e.printStackTrace();
}
}
}
package com.liquidnet.service.consumer.galaxy.config;
import com.liquidnet.common.cache.redis.config.RedisStreamConfig;
import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.consumer.galaxy.receiver.RedisSqlNftOrderInfoReceiver;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;
import java.util.ArrayList;
import java.util.List;
/**
* @author AnJiabin <anjiabin@zhengzai.tv>
* @version V1.0
* @Description: Blockchain NFT order related
* @class: ConsumerSqlNftOrderInfoConfig
* @Package com.liquidnet.service.consumer.galaxy.config
* @Copyright: LightNet @ Copyright (c) 2022
* @date 2022/3/25 14:45
*/
@Configuration
public class ConsumerSqlNftOrderInfoConfig extends RedisStreamConfig {
@Autowired
private RedisSqlNftOrderInfoReceiver sqlNftOrderInfoReceiver;
@Autowired
StringRedisTemplate stringRedisTemplate;
@Bean
public List<Subscription> subscriptionSqlNftOrderInfo(RedisConnectionFactory factory) {
List<Subscription> subscriptionList = new ArrayList<>();
MQConst.GalaxyQueue stream = MQConst.GalaxyQueue.SQL_NFT_ORDER_INFO;
this.initStream(stringRedisTemplate, stream.getKey(), stream.getGroup());
for (int i = 0; i < 10; i++) {
StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = this.buildStreamMessageListenerContainer(factory);
subscriptionList.add(listenerContainer.receiveAutoAck(
Consumer.from(stream.getGroup(), getConsumerName(stream.name() + i)),
StreamOffset.create(stream.getKey(), ReadOffset.lastConsumed()), sqlNftOrderInfoReceiver
));
listenerContainer.start();
}
return subscriptionList;
}
}
package com.liquidnet.service.consumer.galaxy.config;
import com.liquidnet.common.cache.redis.config.RedisStreamConfig;
import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.consumer.galaxy.receiver.RedisSqlNftTradeInfoReceiver;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;
import java.util.ArrayList;
import java.util.List;
/**
* @author AnJiabin <anjiabin@zhengzai.tv>
* @version V1.0
* @Description: Blockchain NFT trade related
* @class: ConsumerSqlNftTradeInfoConfig
* @Package com.liquidnet.service.consumer.galaxy.config
* @Copyright: LightNet @ Copyright (c) 2022
* @date 2022/3/25 14:45
*/
@Configuration
public class ConsumerSqlNftTradeInfoConfig extends RedisStreamConfig {
@Autowired
private RedisSqlNftTradeInfoReceiver sqlNftTradeInfoReceiver;
@Autowired
StringRedisTemplate stringRedisTemplate;
@Bean
public List<Subscription> subscriptionSqlNftTradeInfo(RedisConnectionFactory factory) {
List<Subscription> subscriptionList = new ArrayList<>();
MQConst.GalaxyQueue stream = MQConst.GalaxyQueue.SQL_NFT_TRADE_INFO;
this.initStream(stringRedisTemplate, stream.getKey(), stream.getGroup());
for (int i = 0; i < 10; i++) {
StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = this.buildStreamMessageListenerContainer(factory);
subscriptionList.add(listenerContainer.receiveAutoAck(
Consumer.from(stream.getGroup(), getConsumerName(stream.name() + i)),
StreamOffset.create(stream.getKey(), ReadOffset.lastConsumed()), sqlNftTradeInfoReceiver
));
listenerContainer.start();
}
return subscriptionList;
}
}
package com.liquidnet.service.consumer.galaxy.config;
import com.liquidnet.common.cache.redis.config.RedisStreamConfig;
import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.consumer.galaxy.receiver.RedisSqlSeriesInfoReceiver;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;
import java.util.ArrayList;
import java.util.List;
/**
* @author AnJiabin <anjiabin@zhengzai.tv>
* @version V1.0
* @Description: Blockchain series info
* @class: ConsumerSqlSeriesInfoConfig
* @Package com.liquidnet.service.consumer.galaxy.config
* @Copyright: LightNet @ Copyright (c) 2022
* @date 2022/3/25 14:45
*/
@Configuration
public class ConsumerSqlSeriesInfoConfig extends RedisStreamConfig {
@Autowired
private RedisSqlSeriesInfoReceiver sqlSeriesInfoReceiver;
@Autowired
StringRedisTemplate stringRedisTemplate;
@Bean
public List<Subscription> subscriptionSqlSeriesInfo(RedisConnectionFactory factory) {
List<Subscription> subscriptionList = new ArrayList<>();
MQConst.GalaxyQueue stream = MQConst.GalaxyQueue.SQL_SERIES_INFO;
this.initStream(stringRedisTemplate, stream.getKey(), stream.getGroup());
for (int i = 0; i < 10; i++) {
StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = this.buildStreamMessageListenerContainer(factory);
subscriptionList.add(listenerContainer.receiveAutoAck(
Consumer.from(stream.getGroup(), getConsumerName(stream.name() + i)),
StreamOffset.create(stream.getKey(), ReadOffset.lastConsumed()), sqlSeriesInfoReceiver
));
listenerContainer.start();
}
return subscriptionList;
}
}
package com.liquidnet.service.consumer.galaxy.config;
import com.liquidnet.common.cache.redis.config.RedisStreamConfig;
import com.liquidnet.service.base.constant.MQConst;
import com.liquidnet.service.consumer.galaxy.receiver.RedisSqlUserInfoReceiver;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;
import java.util.ArrayList;
import java.util.List;
/**
* @author AnJiabin <anjiabin@zhengzai.tv>
* @version V1.0
* @Description: Blockchain user registration
* @class: ConsumerSqlUserInfoConfig
* @Package com.liquidnet.service.consumer.galaxy.config
* @Copyright: LightNet @ Copyright (c) 2022
* @date 2022/3/22 14:45
*/
@Configuration
public class ConsumerSqlUserInfoConfig extends RedisStreamConfig {
@Autowired
private RedisSqlUserInfoReceiver sqlUserInfoReceiver;
@Autowired
StringRedisTemplate stringRedisTemplate;
@Bean
public List<Subscription> subscriptionSqlUserInfo(RedisConnectionFactory factory) {
List<Subscription> subscriptionList = new ArrayList<>();
MQConst.GalaxyQueue stream = MQConst.GalaxyQueue.SQL_USER_INFO;
this.initStream(stringRedisTemplate, stream.getKey(), stream.getGroup());
for (int i = 0; i < 10; i++) {
StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = this.buildStreamMessageListenerContainer(factory);
subscriptionList.add(listenerContainer.receiveAutoAck(
Consumer.from(stream.getGroup(), getConsumerName(stream.name() + i)),
StreamOffset.create(stream.getKey(), ReadOffset.lastConsumed()), sqlUserInfoReceiver
));
listenerContainer.start();
}
return subscriptionList;
}
}
package com.liquidnet.service.consumer.galaxy.receiver;
import com.liquidnet.commons.lang.util.CollectionUtil;
import com.liquidnet.commons.lang.util.JsonUtils;
import com.liquidnet.service.base.SqlMapping;
import com.liquidnet.service.consumer.galaxy.service.IBaseDao;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamListener;
import java.util.HashMap;
/**
* @author AnJiabin <anjiabin@zhengzai.tv>
* @version V1.0
* @Description: TODO
* @class: AbstractRedisReceiver
* @Package com.liquidnet.service.consumer.galaxy.receiver
* @Copyright: LightNet @ Copyright (c) 2022
* @date 2022/7/22 20:28
*/
@Slf4j
public abstract class AbstractRedisReceiver implements StreamListener<String, MapRecord<String, String, String>> {
@Autowired
private IBaseDao baseDao;
@Autowired
StringRedisTemplate stringRedisTemplate;
@Override
public void onMessage(MapRecord<String, String, String> message) {
String redisStreamKey = this.getRedisStreamKey();
log.debug("CONSUMER MSG[streamKey:{},messageId:{},stream:{},body:{}]", redisStreamKey, message.getId(), message.getStream(), message.getValue());
boolean result = this.consumerMessageHandler(message.getValue().get("message"));
log.info("CONSUMER MSG RESULT:{} ==> [{}]MESSAGE_ID:{}", result, redisStreamKey, message.getId());
try {
stringRedisTemplate.opsForStream().acknowledge(getRedisStreamGroup(), message);
} catch (Exception e) {
log.error("#CONSUMER MSG EX_ACK ==> [{}]RESULT:{},MESSAGE:{}", this.getRedisStreamKey(), result, message.getValue(), e);
}
try {
stringRedisTemplate.opsForStream().delete(this.getRedisStreamKey(), message.getId());
} catch (Exception e) {
log.error("#CONSUMER MSG EX_DEL ==> [{}]RESULT:{},MESSAGE:{}", this.getRedisStreamKey(), result, message.getValue(), e);
}
}
private boolean consumerMessageHandler(String msg) {
boolean aBoolean = false;
try {
SqlMapping.SqlMessage sqlMessage = JsonUtils.fromJson(msg, SqlMapping.SqlMessage.class);
aBoolean = null == sqlMessage || baseDao.batchSqls(sqlMessage.getSqls(), sqlMessage.getArgs());
} catch (Exception e) {
log.error("CONSUMER MSG EX_HANDLE ==> [{}]:{}", this.getRedisStreamKey(), msg, e);
} finally {
// Re-queue the original payload for retry if the batch failed
if (!aBoolean) {
HashMap<String, String> map = CollectionUtil.mapStringString();
map.put("message", msg);
stringRedisTemplate.opsForStream().add(StreamRecords.mapBacked(map).withStreamKey(this.getRedisStreamKey()));
}
}
return aBoolean;
}
protected abstract String getRedisStreamKey();
protected abstract String getRedisStreamGroup();
}
package com.liquidnet.service.consumer.galaxy.receiver;
import com.liquidnet.service.base.constant.MQConst;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class RedisSqlNftOrderInfoReceiver extends AbstractRedisReceiver {
@Override
protected String getRedisStreamKey() {
return MQConst.GalaxyQueue.SQL_NFT_ORDER_INFO.getKey();
}
@Override
protected String getRedisStreamGroup() {
return MQConst.GalaxyQueue.SQL_NFT_ORDER_INFO.getGroup();
}
}
package com.liquidnet.service.consumer.galaxy.receiver;
import com.liquidnet.service.base.constant.MQConst;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class RedisSqlNftTradeInfoReceiver extends AbstractRedisReceiver {
@Override
protected String getRedisStreamKey() {
return MQConst.GalaxyQueue.SQL_NFT_TRADE_INFO.getKey();
}
@Override
protected String getRedisStreamGroup() {
return MQConst.GalaxyQueue.SQL_NFT_TRADE_INFO.getGroup();
}
}
package com.liquidnet.service.consumer.galaxy.receiver;
import com.liquidnet.service.base.constant.MQConst;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class RedisSqlSeriesInfoReceiver extends AbstractRedisReceiver {
@Override
protected String getRedisStreamKey() {
return MQConst.GalaxyQueue.SQL_SERIES_INFO.getKey();
}
@Override
protected String getRedisStreamGroup() {
return MQConst.GalaxyQueue.SQL_SERIES_INFO.getGroup();
}
}
package com.liquidnet.service.consumer.galaxy.receiver;
import com.liquidnet.service.base.constant.MQConst;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class RedisSqlUserInfoReceiver extends AbstractRedisReceiver {
@Override
protected String getRedisStreamKey() {
return MQConst.GalaxyQueue.SQL_USER_INFO.getKey();
}
@Override
protected String getRedisStreamGroup() {
return MQConst.GalaxyQueue.SQL_USER_INFO.getGroup();
}
}
package com.liquidnet.service.consumer.galaxy.service;
import java.util.LinkedList;
public interface IBaseDao {
/**
* Execute one SQL statement in batch with the given parameter sets
*
* @param sql
* @param values
* @return
*/
Boolean batchSql(String sql, LinkedList<Object[]> values);
/**
* Execute a variable number of SQL statements, each with its own batch of parameters
*
* @param sql
* @param values
* @return
*/
Boolean batchSqls(LinkedList<String> sql, LinkedList<Object[]>... values);
/**
* Execute SQL statements without parameters
*
* @param sql
* @return
*/
Boolean batchSqlNoArgs(LinkedList<String> sql);
/**
* xs Insert a single record and return the generated primary key ID
*
* @param sql insert statement to execute
* @param param statement parameters
* @return generated primary key ID
*/
int insertSqlAndReturnKeyId(final String sql, final Object[] param);
}
package com.liquidnet.service.consumer.galaxy.service.impl;
import com.liquidnet.commons.lang.util.JsonUtils;
import com.liquidnet.service.consumer.galaxy.service.IBaseDao;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.jdbc.support.GeneratedKeyHolder;
import org.springframework.jdbc.support.KeyHolder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionTemplate;
import javax.annotation.Resource;
import java.sql.PreparedStatement;
import java.sql.Statement;
import java.util.LinkedList;
@Service
public class BaseDao implements IBaseDao {
private static final Logger log = LoggerFactory.getLogger(BaseDao.class);
@Resource
public JdbcTemplate jdbcTemplate;
@Resource(name = "transactionManager")
public DataSourceTransactionManager transactionManager;
@Override
public Boolean batchSql(final String sql, final LinkedList<Object[]> values) {
TransactionCallback<Boolean> callback = transactionStatus -> {
if (!values.isEmpty()) {
jdbcTemplate.batchUpdate(sql, values);
}
return true;
};
try {
TransactionTemplate tt = new TransactionTemplate(transactionManager);
return tt.execute(callback);
} catch (Exception ex) {
log.error("###\nSQL.Preparing:{}\nParameters:{}", JsonUtils.toJson(sql), JsonUtils.toJson(values), ex);
return false;
}
}
@Override
public Boolean batchSqls(final LinkedList<String> sql,
final LinkedList<Object[]>... values) {
try {
TransactionCallback<Boolean> callback = transactionStatus -> {
int i = 0;
for (LinkedList<Object[]> o : values) {
if (sql.size() < i + 1) {
break;
}
if (!o.isEmpty()) {
jdbcTemplate.batchUpdate(sql.get(i), o);
}
i++;
}
return true;
};
TransactionTemplate tt = new TransactionTemplate(transactionManager);
return tt.execute(callback);
} catch (Exception ex) {
log.error("###Error.Sqls:{}\nParameters:{},Ex:{}", JsonUtils.toJson(sql), JsonUtils.toJson(values), ex.getMessage());
return false;
}
}
@Override
public Boolean batchSqlNoArgs(final LinkedList<String> sql) {
try {
TransactionCallback<Boolean> callback = transactionStatus -> {
for (String o : sql) {
jdbcTemplate.execute(o);
}
return true;
};
TransactionTemplate tt = new TransactionTemplate(transactionManager);
return tt.execute(callback);
} catch (Exception ex) {
log.error("###Error.Sqls:{}\nParameters:{},Ex:{}", sql);
return false;
}
}
/**
* xs Insert a single record and return the generated primary key ID
*
* @param sql insert statement to execute
* @param param statement parameters
* @return generated primary key ID
*/
public int insertSqlAndReturnKeyId(final String sql, final Object[] param) {
final String innersql = sql;
final Object[] innerO = param;
KeyHolder keyHolder = new GeneratedKeyHolder();
try {
jdbcTemplate.update(con -> {
PreparedStatement ps = con.prepareStatement(innersql,
Statement.RETURN_GENERATED_KEYS);
for (int i = 0; i < innerO.length; i++) {
ps.setObject(i + 1, innerO[i]);
}
return ps;
}, keyHolder);
} catch (Exception e) {
log.error("###\nSQL.Preparing:{}\nParameters:{}", sql, JsonUtils.toJson(param), e);
}
return keyHolder.getKey().intValue();
}
}
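For illustration only, a hedged usage sketch of insertSqlAndReturnKeyId; the values are hypothetical placeholders, the galaxy_user_info table is the one created by the DDL further down, and baseDao is assumed to be an injected IBaseDao:
// Hypothetical example; all literal values are placeholders, not data from this commit.
String sql = "INSERT INTO galaxy_user_info (user_id, user_name, user_type, id_card_type, id_card, router_type) VALUES (?, ?, ?, ?, ?, ?)";
Object[] params = {"u-0001", "test_user", "2", "1", "ID0000001", "zxinchain"};
int mid = baseDao.insertSqlAndReturnKeyId(sql, params); // auto-increment mid of the inserted row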
//package com.liquidnet.service.consumer.dragon.service.processor;
//
//import com.liquidnet.commons.lang.util.JsonUtils;
//import com.liquidnet.service.base.SqlMapping;
//import com.liquidnet.service.consumer.dragon.service.IBaseDao;
//import com.rabbitmq.client.Channel;
//import lombok.extern.slf4j.Slf4j;
//import org.springframework.amqp.core.Message;
//import org.springframework.amqp.core.MessageProperties;
//import org.springframework.data.redis.connection.stream.MapRecord;
//import org.springframework.data.redis.stream.StreamListener;
//import org.springframework.stereotype.Component;
//
//import javax.annotation.Resource;
//import java.io.IOException;
//
///**
// * @author AnJiabin <anjiabin@zhengzai.tv>
// * @version V1.0
// * @Description: TODO
// * @class: ConsumerPayProcessor
// * @Package com.liquidnet.service.consumer.dragon.service.processor
// * @Copyright: LightNet @ Copyright (c) 2021
// * @date 2021/7/8 10:59
// */
//@Slf4j
//@Component
//public class ConsumerPayProcessor implements StreamListener<String, MapRecord<String, String, String>> {
// @Resource
// IBaseDao baseDao;
// @Override
// public void onMessage(MapRecord<String, String, String> message) {
// log.info("接受到来自redis PAY 的消息");
// System.out.println("message id "+message.getId());
// System.out.println("stream "+message.getStream());
// System.out.println("body "+message.getValue());
// }
//
//
// private void consumerSqlDaoHandler(Message msg, Channel channel) {
// MessageProperties properties = msg.getMessageProperties();
// String consumerQueue = properties.getConsumerQueue();
// long deliveryTag = properties.getDeliveryTag();
// log.info("CONSUMER SQL ==> [consumerQueue:{},deliveryTag:{}]", consumerQueue, deliveryTag);
// SqlMapping.SqlMessage sqlMessage = JsonUtils.fromJson(new String(msg.getBody()), SqlMapping.SqlMessage.class);
// log.debug("CONSUMER SQL ==> Preparing:{}", JsonUtils.toJson(sqlMessage.getSqls()));
// log.debug("CONSUMER SQL ==> Parameters:{}", JsonUtils.toJson(sqlMessage.getArgs()));
// try {
// Boolean rstBatchSqls = baseDao.batchSqls(sqlMessage.getSqls(), sqlMessage.getArgs());
// log.debug("CONSUMER SQL result of execution:{}", rstBatchSqls);
// if (rstBatchSqls) {
// channel.basicAck(deliveryTag, false);
// } else {
// log.warn("###CONSUMER SQL[consumerQueue:{},deliveryTag={},sqlMessage:{}]", consumerQueue, deliveryTag, JsonUtils.toJson(sqlMessage));
// channel.basicAck(deliveryTag, false);
// }
// } catch (IOException e) {
// log.error("CONSUMER SQL[consumerQueue:{},deliveryTag:{},sqlMessage:{}]", consumerQueue, deliveryTag, JsonUtils.toJson(sqlMessage), e);
// }
// }
//}
package com.liquidnet.service.consumer.galaxy.service.processor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component;
/**
* @author AnJiabin <anjiabin@zhengzai.tv>
* @version V1.0
* @Description: TODO
* @class: ConsumerRefundProcessor
* @Package com.liquidnet.service.consumer.galaxy.service.processor
* @Copyright: LightNet @ Copyright (c) 2021
* @date 2021/7/8 10:59
*/
@Slf4j
@Component
public class ConsumerRefundProcessor implements StreamListener<String, MapRecord<String, String, String>> {
@Override
public void onMessage(MapRecord<String, String, String> message) {
log.info("接受到来自redis REFUND 的消息");
System.out.println("message id "+message.getId());
System.out.println("stream "+message.getStream());
System.out.println("body "+message.getValue());
}
}
# begin-dev base configuration values
liquidnet:
cloudConfig:
profile: dev
security:
username: user
password: user123
eureka:
host: 127.0.0.1:7001
# end-dev base configuration values
spring:
profiles:
include: service-consumer-galaxy
# begin-prod base configuration values
liquidnet:
cloudConfig:
profile: prod
security:
username: user
password: user123
eureka:
host: 172.17.207.189:7001
# end-prod base configuration values
spring:
profiles:
include: service-consumer-galaxy
\ No newline at end of file
# Eureka Server configuration
eureka:
client:
register-with-eureka: true
fetch-registry: true
serviceUrl:
defaultZone: http://${liquidnet.security.username}:${liquidnet.security.password}@${liquidnet.eureka.host}/eureka-server/eureka
# Config Server configuration
spring:
cloud:
config:
# uri: http://39.106.122.201:7002/support-config
profile: ${liquidnet.cloudConfig.profile}
name: ${spring.application.name} # defaults to spring.application.name
discovery:
enabled: true
service-id: liquidnet-support-config
# begin-test base configuration values
liquidnet:
cloudConfig:
profile: test
security:
username: user
password: user123
eureka:
host: 172.17.207.177:7001
#instance:
# prefer-ip-address: true
#host: eureka-test-0.eureka-test-svc.zhengzai-test:7001/eureka-server/eureka,eureka-test-1.eureka-test-svc.zhengzai-test:7001/eureka-server/eureka,eureka-test-2.eureka-test-svc.zhengzai-test:7001/eureka-server/eureka
#host: 192.168.193.41:7001
# end-test base configuration values
spring:
profiles:
include: service-consumer-galaxy
\ No newline at end of file
spring:
application:
name: liquidnet-service-consumer-galaxy
profiles:
active: dev
\ No newline at end of file
...@@ -17,6 +17,7 @@
<module>liquidnet-service-consumer-sweet</module>
<module>liquidnet-service-consumer-stone</module>
<module>liquidnet-service-consumer-candy</module>
<module>liquidnet-service-consumer-galaxy</module>
</modules>
<dependencies>
......
create table galaxy_user_info
(
mid bigint unsigned NOT NULL AUTO_INCREMENT,
user_id varchar(200) NOT NULL COMMENT '用户ID',
user_name varchar(200) NOT NULL COMMENT '用户姓名',
user_type varchar(3) NOT NULL DEFAULT '2' COMMENT '1企业 2个人',
mobile varchar(20) COMMENT '移动手机号',
id_card_type varchar(3) NOT NULL COMMENT '证件类型( 1-身份证 2-护照 3-港澳通行证 4-台湾通行证 5-外国人永居身份证 6-港澳台居民居住证 7-其它)',
id_card varchar(50) NOT NULL COMMENT '证件号码',
mnemonic varchar(200) COMMENT '助记词',
`index` varchar(3) NOT NULL DEFAULT '0' COMMENT '助记词索引',
user_identification varchar(200) COMMENT '用户链上唯一ID',
user_pub_key varchar(200) COMMENT '用户公钥',
user_pri_key varchar(200) COMMENT '用户私钥',
block_chain_address varchar(200) COMMENT '区块链地址',
router_type varchar(200) NOT NULL COMMENT '路由类型(zxinchain、eth、antchain)',
created_at timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
updated_at timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`mid`)
) ENGINE = InnoDB comment '数字藏品用户信息';
create table galaxy_series_info
(
mid bigint unsigned NOT NULL AUTO_INCREMENT,
sku_id varchar(200) NOT NULL COMMENT '本地系列唯一标识id',
series_name varchar(20) NOT NULL COMMENT '本地系列唯一名称(前缀+skuid)',
series_id varchar(3) COMMENT '区块链上系列ID',
total_count varchar(50) COMMENT '系列发行总量',
crt_count varchar(200) COMMENT '系列已发行个数',
original_nft_url varchar(3) COMMENT 'nft素材原始地址',
original_display_url varchar(200) COMMENT 'nft显示素材原始地址',
series_claim_task_id varchar(200) COMMENT '系列声明任务ID',
series_claim_status varchar(200) COMMENT '系列声明状态',
author varchar(200) COMMENT '作者名',
nft_name varchar(200) COMMENT 'nft名字(sku名称)',
nft_url varchar(600) COMMENT 'nft素材地址',
display_url varchar(600) COMMENT '预览图url,不超过1024个字符',
nft_desc varchar(400) COMMENT 'nft简介,500个字符以内',
nft_flag varchar(200) COMMENT '标签,【文创】,游戏,动漫,30个字符以内',
sell_count varchar(200) COMMENT '可售状态下有意义',
cover_url varchar(200) COMMENT '系列显示封面',
series_desc varchar(200) COMMENT '系列描述信息,不超过500个字符',
nft_hash varchar(200) COMMENT ' 系列声明中nftHash',
router_type varchar(200) NOT NULL COMMENT '路由类型(zxinchain、eth、antchain)',
created_at timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
updated_at timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`mid`)
) ENGINE = InnoDB comment '数字藏品系列信息';
create table galaxy_nft_order_info
(
mid bigint unsigned NOT NULL AUTO_INCREMENT,
nft_order_pay_id varchar(20) COMMENT '订单支付唯一ID',
user_id varchar(200) NOT NULL COMMENT '用户ID',
sku_id varchar(200) NOT NULL COMMENT '应用系统购买系列唯一ID',
series_name varchar(20) NOT NULL COMMENT '系列的唯一名称(前缀+skuid)',
series_id varchar(3) COMMENT '系列的唯一Id',
nft_id varchar(200) COMMENT 'nftId',
nft_price varchar(50) COMMENT 'nft购买价格',
from_address varchar(200) COMMENT '转出方地址',
to_address varchar(200) COMMENT '转入方地址',
nft_publish_task_id varchar(100) COMMENT 'nft发行任务id',
nft_buy_task_id varchar(100) COMMENT 'nft购买任务id',
nft_buy_pay_task_id varchar(100) COMMENT 'nft购买支付任务id',
router_type varchar(20) NOT NULL COMMENT '路由类型(zxinchain、eth、antchain)',
created_at timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
updated_at timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`mid`)
) ENGINE = InnoDB comment '数字藏品订单信息';
create table galaxy_nft_trade_info
(
mid bigint unsigned NOT NULL AUTO_INCREMENT,
nft_id varchar(200) COMMENT 'nftId',
series_id varchar(3) COMMENT '系列的唯一Id',
trade_hash varchar(200) COMMENT '交易hash',
chain_timestamp varchar(20) COMMENT '链上交易时间戳',
from_address varchar(200) COMMENT '转出方地址',
to_address varchar(200) COMMENT '转入方地址',
trade_price bigint COMMENT '交易价格',
trade_type varchar(3) COMMENT '交易类别 1发行 2购买 3转移 4设置价格 5设置状态',
created_at timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
updated_at timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`mid`)
) ENGINE = InnoDB comment '数字藏品交易信息';
\ No newline at end of file
...@@ -6,8 +6,8 @@ liquidnet:
username: user
password: user123
eureka:
# host: 172.17.192.42:7001
host: 127.0.0.1:7001
# end-dev base configuration values
spring:
......