Enqueue tasks for execution
This commit is contained in:
parent 8c7e12e063
commit a81ca0a5bb
@@ -0,0 +1,195 @@
package com.ruoyi.kettle.config;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.lettuce.core.cluster.ClusterClientOptions;
import io.lettuce.core.cluster.ClusterTopologyRefreshOptions;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.data.redis.RedisProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisNode;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.connection.lettuce.LettucePoolingClientConfiguration;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

import java.time.Duration;
import java.util.HashSet;
import java.util.Set;

/**
 * @author zh
 * @date 2020/12/1 16:49
 */
@Configuration
public class RedisConfig {

    /** Redis server host */
    @Value("${spring.redis.host}")
    private String host;

    /** Redis port */
    @Value("${spring.redis.port}")
    private int port;

    /** Redis password */
    @Value("${spring.redis.password}")
    private String password;

    /** Maximum number of connections in the pool (negative = no limit) */
    @Value("${spring.redis.lettuce.pool.max-active}")
    private int maxActive;

    /** Maximum number of idle connections in the pool */
    @Value("${spring.redis.lettuce.pool.max-idle}")
    private int maxIdle;

    /** Minimum number of idle connections in the pool */
    @Value("${spring.redis.lettuce.pool.min-idle}")
    private int minIdle;

    /** Maximum blocking wait time of the pool (negative = no limit) */
    @Value("${spring.redis.lettuce.pool.max-wait}")
    private int maxWait;

    /** Redis database index (default 0) */
    @Value("${spring.redis.database}")
    private int database;

    /** Redis timeout */
    @Value("${spring.redis.timeout}")
    private int timeout;

    @Autowired
    private RedisProperties redisProperties;

    // The usual boilerplate:
    // a custom RedisTemplate backed by the Lettuce connection factory defined below
    @Bean
    @SuppressWarnings("all")
    public RedisTemplate<String, Object> redisTemplate(@Qualifier("lettuceConnectionFactoryUvPv") RedisConnectionFactory factory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(factory);

        // JSON serialization configuration
        Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        // activateDefaultTyping replaces the deprecated enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL)
        om.activateDefaultTyping(om.getPolymorphicTypeValidator(), ObjectMapper.DefaultTyping.NON_FINAL);
        // Avoid deserialization failures on unknown properties
        om.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        jackson2JsonRedisSerializer.setObjectMapper(om);

        // String serializer
        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();

        // Keys are serialized as plain strings
        template.setKeySerializer(stringRedisSerializer);
        // Hash keys are serialized as plain strings as well
        template.setHashKeySerializer(stringRedisSerializer);

        // Values are serialized with Jackson
        template.setValueSerializer(jackson2JsonRedisSerializer);

        // Hash values are serialized with Jackson
        template.setHashValueSerializer(jackson2JsonRedisSerializer);
        template.afterPropertiesSet();

        return template;
    }

    /**
     * Redis connection factory implementation used by the RedisTemplate.
     * LettuceConnectionFactory implements the RedisConnectionFactory interface.
     * Used for the UV/PV Redis instance.
     *
     * @return LettuceConnectionFactory
     */
    @Bean(destroyMethod = "destroy")
    // Note: when building the LettuceConnectionFactory, not relying on the built-in destroyMethod
    // may cause the Redis connection to be destroyed before other beans.
    public LettuceConnectionFactory lettuceConnectionFactoryUvPv() throws Exception {

        // List<String> clusterNodes = redisProperties.getCluster().getNodes();
        // Set<RedisNode> nodes = new HashSet<>();
        // clusterNodes.forEach(address -> nodes.add(new RedisNode(address.split(":")[0].trim(), Integer.parseInt(address.split(":")[1]))));
        // RedisClusterConfiguration clusterConfiguration = new RedisClusterConfiguration();
        // clusterConfiguration.setClusterNodes(nodes);
        // clusterConfiguration.setPassword(RedisPassword.of(redisProperties.getPassword()));
        // clusterConfiguration.setMaxRedirects(redisProperties.getCluster().getMaxRedirects());

        // A standalone Redis is used here; for a cluster use the commented-out code above
        Set<RedisNode> nodes = new HashSet<>();
        nodes.add(new RedisNode(redisProperties.getHost(), redisProperties.getPort()));

        RedisStandaloneConfiguration redisStandaloneConfiguration = new RedisStandaloneConfiguration();
        redisStandaloneConfiguration.setHostName(redisProperties.getHost());
        redisStandaloneConfiguration.setPassword(redisProperties.getPassword());
        redisStandaloneConfiguration.setDatabase(redisProperties.getDatabase());
        redisStandaloneConfiguration.setPort(redisProperties.getPort());

        GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
        poolConfig.setMaxIdle(redisProperties.getLettuce().getPool().getMaxIdle());
        poolConfig.setMinIdle(redisProperties.getLettuce().getPool().getMinIdle());
        poolConfig.setMaxTotal(redisProperties.getLettuce().getPool().getMaxActive());

        return new LettuceConnectionFactory(redisStandaloneConfiguration, getLettuceClientConfiguration(poolConfig));
    }

    /**
     * Build the LettuceClientConfiguration, covering the connection pool and security settings.
     *
     * @param genericObjectPoolConfig commons-pool2 pool configuration
     * @return lettuceClientConfiguration
     */
    private LettuceClientConfiguration getLettuceClientConfiguration(GenericObjectPoolConfig genericObjectPoolConfig) {
        /*
         * ClusterTopologyRefreshOptions enables adaptive and periodic topology refresh.
         * Without adaptive refresh, Redis cluster topology changes lead to connection errors!
         */
        ClusterTopologyRefreshOptions topologyRefreshOptions = ClusterTopologyRefreshOptions.builder()
                // Enable adaptive refresh for selected triggers only:
                //.enableAdaptiveRefreshTrigger(ClusterTopologyRefreshOptions.RefreshTrigger.MOVED_REDIRECT, ClusterTopologyRefreshOptions.RefreshTrigger.PERSISTENT_RECONNECTS)
                // Enable all adaptive refresh triggers (MOVED, ASK, PERSISTENT_RECONNECTS, ...)
                .enableAllAdaptiveRefreshTriggers()
                // Adaptive refresh trigger timeout (disabled by default; 30 seconds once enabled)
                .adaptiveRefreshTriggersTimeout(Duration.ofSeconds(25))
                // Enable periodic refresh (disabled by default; 60 seconds once enabled, see ClusterTopologyRefreshOptions.DEFAULT_REFRESH_PERIOD);
                // .enablePeriodicRefresh(Duration.ofSeconds(2)) is equivalent to .enablePeriodicRefresh().refreshPeriod(Duration.ofSeconds(2))
                .enablePeriodicRefresh(Duration.ofSeconds(20))
                .build();
        return LettucePoolingClientConfiguration.builder()
                .poolConfig(genericObjectPoolConfig)
                .clientOptions(ClusterClientOptions.builder().topologyRefreshOptions(topologyRefreshOptions).build())
                // Pass the appId in as the client name so the connection is easy to spot in Redis monitoring:
                //.clientName(appName + "_lettuce")
                .build();
    }

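    // Note (an inference, not stated in this commit): the Jedis pool below is kept alongside the
    // Lettuce factory above because the Redis Stream commands added in this commit (XADD/XREADGROUP/XACK
    // in RedisStreamUtil and RedisStreamController) are issued through the Jedis client.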
    @Bean
    public JedisPool jedisPool() {
        JedisPool jedisPool = new JedisPool(getRedisConfig(), host, port, timeout, password);
        return jedisPool;
    }

    @Bean
    public JedisPoolConfig getRedisConfig() {
        JedisPoolConfig config = new JedisPoolConfig();
        config.setMaxTotal(maxActive);
        config.setMaxIdle(maxIdle);
        config.setMinIdle(minIdle);
        config.setMaxWaitMillis(maxWait);
        return config;
    }

}

@@ -162,9 +162,9 @@ public class KettleTransController extends BaseController
    @RequiresPermissions("kettle:trans:run")
    @PostMapping("/run")
    @ResponseBody
    public AjaxResult run(KettleTrans trans)
    public AjaxResult runToQueue(KettleTrans trans)
    {
        AjaxResult result = kettleTransService.run(trans);
        AjaxResult result = kettleTransService.runToQueue(trans);
        return result;
    }

@@ -0,0 +1,105 @@
package com.ruoyi.kettle.controller;

import com.ruoyi.common.core.controller.BaseController;
import com.ruoyi.common.core.domain.AjaxResult;
import com.ruoyi.common.core.page.TableDataInfo;
import com.ruoyi.kettle.domain.KettleJob;
import org.apache.shiro.authz.annotation.RequiresPermissions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.*;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.format.annotation.DateTimeFormat;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.StreamEntry;
import redis.clients.jedis.StreamEntryID;

import java.time.Duration;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.TimeUnit;

@Controller
@RequestMapping("/redis/stream")
public class RedisStreamController extends BaseController {

    String koneConsumer = "koneConsumer";

    String koneStream = "koneStream2";

    String koneGroup = "koneGroup2";

    int i = 1;

    @Autowired
    private RedisTemplate redisTemplate;

    @Autowired
    private JedisPool jedisPool;

    @GetMapping("/createComsumer")
    public AjaxResult createCrgoup() {
        Jedis jedis = jedisPool.getResource();
        //String key, String groupname, StreamEntryID id, boolean makeStream
        /**
         * key is the stream name, group is the consumer group, id is the last-read position (re-read when
         * empty), and makeStream controls whether the stream is created (not needed if it already exists)
         */
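        // Note: XGROUP CREATE expects an explicit start ID such as "$" or "0"; passing null below
        // relies on how this Jedis version handles a null StreamEntryID.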
        System.out.println(jedis.xgroupCreate(koneStream, koneGroup, null, true));
        return AjaxResult.success();
    }

    @GetMapping("/add")
    @ResponseBody
    public AjaxResult addMessage() {
        // More attributes can be added here
        Map map = new HashMap();
        map.put("date" + i++, System.currentTimeMillis() + "");
        Jedis jedis = jedisPool.getResource();

        jedis.xadd(koneStream, StreamEntryID.NEW_ENTRY, map);
        return AjaxResult.success();
    }

    @ResponseBody
    @GetMapping("/read")
    public AjaxResult readGroup() {
        Jedis jedis = jedisPool.getResource();
        Map<String, StreamEntryID> t = new HashMap();
        // null is treated as ">" and reads from the start; "$" would accept only new messages,
        // and a concrete ID would resume from a previously unfinished message
        t.put(koneStream, null);
        Map.Entry e = null;
        for (Map.Entry c : t.entrySet()) {
            e = c;
        }
        // With noAck=false the message must be acknowledged manually, true means auto-ack;
        // the consumer itself is created implicitly by XREADGROUP
        System.out.println("开始:" + ZonedDateTime.now().format(DateTimeFormatter.ofPattern("HH:mm:ss ms")));
        List<Map.Entry<String, StreamEntryID>> list = jedis.xreadGroup(koneGroup, koneConsumer, 1, 0, false, e);
        System.out.println("结束:" + ZonedDateTime.now().format(DateTimeFormatter.ofPattern("HH:mm:ss ms")));
        if (list == null) {
            System.out.println("list为空");
            return AjaxResult.error("list为空");
        }
        for (Map.Entry m : list) {
            System.out.println(m.getKey() + "---" + m.getValue().getClass());
            if (m.getValue() instanceof ArrayList) {
                List<StreamEntry> l = (List) m.getValue();
                Map<String, String> result = l.get(0).getFields();
                for (Map.Entry entry : result.entrySet()) {
                    System.out.println(entry.getKey() + "---" + entry.getValue());
                }
                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
                jedis.xack(koneStream, koneGroup, l.get(0).getID());
                System.out.println("消息消费成功");
            }
        }
        return AjaxResult.success();
    }
}

@@ -68,4 +68,6 @@ public interface IKettleJobService
    Long checkQuartzExist(String checkStr);

    public AjaxResult runJobQuartz(String id, String jobName);

    void runJobRightNow(Long valueOf);
}

@@ -67,7 +67,7 @@ public interface IKettleTransService
     * @param trans :
     * @return: void
     **/
    AjaxResult run(KettleTrans trans);
    AjaxResult runToQueue(KettleTrans trans);

    List<String> queryTransLog(KettleTrans trans);

    /**
@@ -81,4 +81,6 @@ public interface IKettleTransService
    public AjaxResult runTransQuartz(String id, String transName);

    Long checkQuartzExist(String checkStr);

    void runTransRightNow(Long valueOf);
}

@@ -12,6 +12,9 @@ import com.ruoyi.kettle.domain.KettleTrans;
import com.ruoyi.kettle.domain.XRepository;
import com.ruoyi.kettle.mapper.XRepositoryMapper;
import com.ruoyi.kettle.tools.KettleUtil;
import com.ruoyi.kettle.tools.RedisStreamUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.ruoyi.kettle.mapper.KettleJobMapper;

@@ -28,6 +31,7 @@ import com.ruoyi.common.core.text.Convert;
@Service("kettleJobServiceImpl")
public class KettleJobServiceImpl implements IKettleJobService
{
    private static final Logger log = LoggerFactory.getLogger(KettleJobServiceImpl.class);
    @Autowired
    private KettleJobMapper kettleJobMapper;
    @Autowired

@@ -36,6 +40,9 @@ public class KettleJobServiceImpl implements IKettleJobService

    @Autowired
    private KettleUtil kettleUtil;

    @Autowired
    private RedisStreamUtil redisStreamUtil;
    /**
     * Query job schedules
     *
@@ -84,11 +91,14 @@ public class KettleJobServiceImpl implements IKettleJobService
        }
        String userName = (String) PermissionUtils.getPrincipalProperty("userName");
        if(kettleJob.getRoleKey()==null){
            kettleJob.setRoleKey("admin");
            kettleJob.setRoleKey("admin,bpsadmin");
        }else{
            if(!kettleJob.getRoleKey().contains("admin")){
                kettleJob.setRoleKey(kettleJob.getRoleKey().concat(",admin"));
            }
            if(!kettleJob.getRoleKey().contains("bpsadmin")){
                kettleJob.setRoleKey(kettleJob.getRoleKey().concat(",bpsadmin"));
            }
        }
        kettleJob.setCreatedBy(userName);
        kettleJob.setUpdateBy(userName);

@@ -105,7 +115,21 @@ public class KettleJobServiceImpl implements IKettleJobService
    @Override
    public int updateKettleJob(KettleJob kettleJob)
    {
        String userName = (String) PermissionUtils.getPrincipalProperty("userName");

        kettleJob.setUpdateTime(DateUtils.getNowDate());
        kettleJob.setUpdateBy(userName);
        kettleJob.setJobType("File");
        if(kettleJob.getRoleKey()==null){
            kettleJob.setRoleKey("admin,bpsadmin");
        }else{
            if(!kettleJob.getRoleKey().contains("admin")){
                kettleJob.setRoleKey(kettleJob.getRoleKey().concat(",admin"));
            }
            if(!kettleJob.getRoleKey().contains("bpsadmin")){
                kettleJob.setRoleKey(kettleJob.getRoleKey().concat(",bpsadmin"));
            }
        }
        return kettleJobMapper.updateKettleJob(kettleJob);
    }

@@ -144,6 +168,43 @@ public class KettleJobServiceImpl implements IKettleJobService
        if(repository==null){
            return AjaxResult.error("资源库不存在!");
        }
        // Put the job into the queue to wait for execution
        redisStreamUtil.addKettleJob(kettleJob);
        // Update the status
        kettleJob.setJobStatus("等待中");
        kettleJobMapper.updateKettleJob(kettleJob);
        return AjaxResult.success("已加入执行队列,请等待运行结果通知!");
        // String path = kettleJob.getJobPath();
        // try {
        //     kettleUtil.KETTLE_LOG_LEVEL=kettleJob.getJobLogLevel();
        //     kettleUtil.KETTLE_REPO_ID=String.valueOf(kettleJob.getJobRepositoryId());
        //     kettleUtil.KETTLE_REPO_NAME=repository.getRepoName();
        //     kettleUtil.KETTLE_REPO_PATH=repository.getBaseDir();
        //     kettleUtil.callJob(path,kettleJob.getJobName(),null,null);
        // } catch (Exception e) {
        //     e.printStackTrace();
        // }
        //

    }

    @Override
    public void runJobRightNow(Long id) {
        KettleJob kettleJob = kettleJobMapper.selectKettleJobById(id);
        if(kettleJob ==null){
            log.error("作业不存在!");
            return;
        }
        XRepository repository=repositoryMapper.selectXRepositoryById(kettleJob.getJobRepositoryId());
        if(repository==null){
            log.error("资源库不存在!");
            return;
        }
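        // Note: unlike runTransRightNow in KettleTransServiceImpl, this method enqueues the job again
        // via redisStreamUtil.addKettleJob() even though it is itself invoked from the queue consumer;
        // if unintended, this re-delivers the same job and looks like a copy-paste leftover.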
        // Put the job into the queue to wait for execution
        redisStreamUtil.addKettleJob(kettleJob);
        // Update the status
        kettleJob.setJobStatus("运行中");
        kettleJobMapper.updateKettleJob(kettleJob);
        String path = kettleJob.getJobPath();
        try {
            kettleUtil.KETTLE_LOG_LEVEL=kettleJob.getJobLogLevel();

@@ -151,13 +212,15 @@ public class KettleJobServiceImpl implements IKettleJobService
            kettleUtil.KETTLE_REPO_NAME=repository.getRepoName();
            kettleUtil.KETTLE_REPO_PATH=repository.getBaseDir();
            kettleUtil.callJob(path,kettleJob.getJobName(),null,null);
            kettleJob.setJobStatus("已结束");
            kettleJobMapper.updateKettleJob(kettleJob);
        } catch (Exception e) {
            kettleJob.setJobStatus("异常");
            kettleJobMapper.updateKettleJob(kettleJob);
            e.printStackTrace();
        }


        return AjaxResult.success("执行成功!"); }

    }
    @Override
    public List<String> queryJobLog(KettleJob kettleJob) {
        List<String> logs=kettleJobMapper.queryJobLog(kettleJob.getJobName());

@@ -173,4 +236,5 @@ public class KettleJobServiceImpl implements IKettleJobService
        KettleJob kettleJob = kettleJobMapper.selectKettleJobById(Long.valueOf(id));
        return run(kettleJob);
    }

}

@@ -12,6 +12,9 @@ import com.ruoyi.kettle.domain.XRepository;
import com.ruoyi.kettle.mapper.XRepositoryMapper;
import com.ruoyi.kettle.service.IKettleTransService;
import com.ruoyi.kettle.tools.KettleUtil;
import com.ruoyi.kettle.tools.RedisStreamUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.ruoyi.kettle.mapper.KettleTransMapper;

@@ -27,6 +30,8 @@ import com.ruoyi.common.core.text.Convert;
@Service("kettleTransServiceImpl")
public class KettleTransServiceImpl implements IKettleTransService
{

    private static final Logger log = LoggerFactory.getLogger(KettleTransServiceImpl.class);
    @Autowired
    private KettleTransMapper kettleTransMapper;
    @Autowired

@@ -35,6 +40,9 @@ public class KettleTransServiceImpl implements IKettleTransService
    @Autowired
    private KettleUtil kettleUtil;

    @Autowired
    private RedisStreamUtil redisStreamUtil;

    /**
     * Query transformations
     *
@@ -86,11 +94,14 @@ public class KettleTransServiceImpl implements IKettleTransService
        }
        String userName = (String) PermissionUtils.getPrincipalProperty("userName");
        if(kettleTrans.getRoleKey()==null){
            kettleTrans.setRoleKey("admin");
            kettleTrans.setRoleKey("admin,bpsadmin");
        }else{
            if(!kettleTrans.getRoleKey().contains("admin")){
                kettleTrans.setRoleKey(kettleTrans.getRoleKey().concat(",admin"));
            }
            if(!kettleTrans.getRoleKey().contains("bpsadmin")){
                kettleTrans.setRoleKey(kettleTrans.getRoleKey().concat(",bpsadmin"));
            }
        }
        kettleTrans.setCreatedBy(userName);
        kettleTrans.setUpdateBy(userName);

@@ -112,11 +123,14 @@ public class KettleTransServiceImpl implements IKettleTransService
        kettleTrans.setUpdateTime(DateUtils.getNowDate());
        kettleTrans.setTransType("File");
        if(kettleTrans.getRoleKey()==null){
            kettleTrans.setRoleKey("admin");
            kettleTrans.setRoleKey("admin,bpsadmin");
        }else{
            if(!kettleTrans.getRoleKey().contains("admin")){
                kettleTrans.setRoleKey(kettleTrans.getRoleKey().concat(",admin"));
            }
            if(!kettleTrans.getRoleKey().contains("bpsadmin")){
                kettleTrans.setRoleKey(kettleTrans.getRoleKey().concat(",bpsadmin"));
            }
        }
        return kettleTransMapper.updateKettleTrans(kettleTrans);
    }

@@ -146,23 +160,46 @@ public class KettleTransServiceImpl implements IKettleTransService


    /**
     * @Description: run the transformation once immediately
     * @Description: run the transformation once by enqueuing it into the Redis queue
     * @Author: Kone.wang
     * @Date: 2021/7/15 14:31
     * @param trans :
     * @return: void
     **/
    @Override
    public AjaxResult run(KettleTrans trans) {
    public AjaxResult runToQueue(KettleTrans trans) {
        Long id = trans.getId();
        KettleTrans kettleTrans = kettleTransMapper.selectKettleTransById(id);
        if(kettleTrans ==null){
        if(kettleTrans ==null || kettleTrans.getId()==null){
            return AjaxResult.error("转换不存在!");
        }
        XRepository repository=repositoryMapper.selectXRepositoryById(kettleTrans.getTransRepositoryId());
        if(repository==null){
            return AjaxResult.error("资源库不存在!");
        }
        // Put the transformation into the queue to wait for execution
        redisStreamUtil.addKettleTrans(kettleTrans);
        // Update the status
        trans.setTransStatus("等待中");
        kettleTransMapper.updateKettleTrans(trans);
        return AjaxResult.success("已加入执行队列,请等待运行结果通知!");
    }

    @Override
    public void runTransRightNow(Long id) {
        KettleTrans kettleTrans = kettleTransMapper.selectKettleTransById(id);
        if(kettleTrans ==null || kettleTrans.getId()==null){
            log.error("转换不存在!:"+id);
            return;
        }
        XRepository repository=repositoryMapper.selectXRepositoryById(kettleTrans.getTransRepositoryId());
        if(repository==null){
            log.error("资源库不存在!");
            return;
        }
        // Update the status to "running"
        kettleTrans.setTransStatus("运行中");
        kettleTransMapper.updateKettleTrans(kettleTrans);
        String path = kettleTrans.getTransPath();
        try {
            kettleUtil.KETTLE_LOG_LEVEL=kettleTrans.getTransLogLevel();

@@ -170,12 +207,13 @@ public class KettleTransServiceImpl implements IKettleTransService
            kettleUtil.KETTLE_REPO_NAME=repository.getRepoName();
            kettleUtil.KETTLE_REPO_PATH=repository.getBaseDir();
            kettleUtil.callTrans(path,kettleTrans.getTransName(),null,null);
            kettleTrans.setTransStatus("已结束");
            kettleTransMapper.updateKettleTrans(kettleTrans);
        } catch (Exception e) {
            e.printStackTrace();
            kettleTrans.setTransStatus("异常");
            kettleTransMapper.updateKettleTrans(kettleTrans);
            log.error(id+"的trans执行失败:"+e.getMessage());
        }


        return AjaxResult.success("执行成功!");
    }
    /**
     * @Description: query transformation execution logs

@@ -200,7 +238,7 @@ public class KettleTransServiceImpl implements IKettleTransService
    @Override
    public AjaxResult runTransQuartz(String id, String transName) {
        KettleTrans kettleTrans = kettleTransMapper.selectKettleTransById(Long.valueOf(id));
        return run(kettleTrans);
        return runToQueue(kettleTrans);
    }
    /**
     * @Description: check whether a scheduled task is configured for this transformation

@@ -214,4 +252,5 @@ public class KettleTransServiceImpl implements IKettleTransService

        return kettleTransMapper.checkQuartzExist(checkStr);
    }

}

@@ -0,0 +1,18 @@
package com.ruoyi.kettle.tools;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

@Component
public class CommandLineRunnerImpl implements CommandLineRunner {

    @Autowired
    private RedisStreamUtil redisStreamUtil;

    @Override
    public void run(String... args) throws Exception {
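        // Note: readGroup() loops forever, so this CommandLineRunner never returns and the queue
        // consumer runs on the application startup thread; moving it to a dedicated thread would be
        // an option if that turns out to block anything else at startup.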
        redisStreamUtil.readGroup();
    }
}

@@ -22,6 +22,8 @@ import org.springframework.stereotype.Component;

import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.TimeUnit;

@Component
public class KettleUtil {
    public static final Logger log = LoggerFactory.getLogger(KettleUtil.class);

@@ -43,7 +45,7 @@ public class KettleUtil {
    public void callTrans(String transPath, String transName, Map<String,String> namedParams, String[] clParams) throws Exception {
        KettleEnv.init();
        DatabaseMeta databaseMeta=new DatabaseMeta("kettle_trans_log", "mysql", "Native(JDBC)",
                "xxx.xxx.x.xx","bps?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8", "3306", "root", "password");
                "192.168.2.18","bps?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8", "3306", "root", "abc.123");

        String msg;
        KettleFileRepository repo = this.fileRepositoryCon();

@@ -87,6 +89,7 @@ public class KettleUtil {
            log.error(msg);
            throw new Exception(msg);
        }
        TimeUnit.SECONDS.sleep(10);
    }

    /**

@@ -98,7 +101,7 @@ public class KettleUtil {
        KettleEnv.init();
        String msg;
        DatabaseMeta databaseMeta=new DatabaseMeta("kettle_job_log", "mysql", "Native(JDBC)",
                "xxx.xxx.x.xx","bps?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8", "3306", "root", "password");
                "192.168.2.18","bps?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8", "3306", "root", "abc.123");
        KettleFileRepository repo = this.fileRepositoryCon();
        JobMeta jobMeta = this.loadJob(repo, jobPath, jobName);
        jobMeta.addDatabase(databaseMeta);

@@ -155,13 +158,6 @@ public class KettleUtil {
        }
        return transMeta;
    }







    /**
     * Load a job
     * @param repo Kettle file repository

@@ -0,0 +1,141 @@
package com.ruoyi.kettle.tools;

import com.ruoyi.common.core.domain.AjaxResult;
import com.ruoyi.kettle.domain.KettleJob;
import com.ruoyi.kettle.domain.KettleTrans;
import com.ruoyi.kettle.service.IKettleJobService;
import com.ruoyi.kettle.service.IKettleTransService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.StreamEntry;
import redis.clients.jedis.StreamEntryID;

import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

@Component
public class RedisStreamUtil {
    private static final Logger log = LoggerFactory.getLogger(RedisStreamUtil.class);
    String koneConsumer = "koneConsumer";

    String koneStream = "koneStream2";

    String koneGroup = "koneGroup2";

    @Autowired
    private JedisPool jedisPool;

    @Autowired
    private IKettleTransService transService;

    @Autowired
    private IKettleJobService jobService;

    /**
     * @Description: push a trans onto the queue
     * @Author: Kone.wang
     * @Date: 2021/8/6 13:50
     * @param trans:
     * @return: com.ruoyi.common.core.domain.AjaxResult
     **/
    public void addKettleTrans(KettleTrans trans) {
        String transName = trans.getTransName();
        Long trandId = trans.getId();

        // More attributes can be added here
        Map map = new HashMap();
        map.put("trans_" + trandId, transName);
        Jedis jedis = jedisPool.getResource();

        jedis.xadd(koneStream, StreamEntryID.NEW_ENTRY, map);
    }

    /**
     * @Description: push a job onto the queue
     * @Author: Kone.wang
     * @Date: 2021/8/6 13:50
     * @param job:
     * @return: com.ruoyi.common.core.domain.AjaxResult
     **/
    public void addKettleJob(KettleJob job) {
        String jobName = job.getJobName();
        Long jobId = job.getId();
        // More attributes can be added here
        Map map = new HashMap();
        map.put("job_" + jobId, jobName);
        Jedis jedis = jedisPool.getResource();

        jedis.xadd(koneStream, StreamEntryID.NEW_ENTRY, map);
    }

    /**
     * @Description: read messages from the queue in a loop
     * @Author: Kone.wang
     * @Date: 2021/8/6 13:50
     * @return: void
     **/
    public void readGroup() {

        while (true) {
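            // Note: the Jedis instances taken from the pool in this loop (and in the controller above)
            // are never closed/returned, so the pool can eventually be exhausted; try-with-resources
            // would be the safer pattern here.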
            Jedis jedis = jedisPool.getResource();
            Map<String, StreamEntryID> t = new HashMap();
            List<Map.Entry<String, StreamEntryID>> list = new ArrayList<>();
            // null is treated as ">" and reads from the start; "$" would accept only new messages,
            // and a concrete ID would resume from a previously unfinished message
            t.put(koneStream, null);
            Map.Entry e = null;
            for (Map.Entry c : t.entrySet()) {
                e = c;
            }
            // With noAck=false the message must be acknowledged manually, true means auto-ack;
            // the consumer itself is created implicitly by XREADGROUP
            System.out.println("开始:" + ZonedDateTime.now().format(DateTimeFormatter.ofPattern("HH:mm:ss ms")));
            try {
                list = jedis.xreadGroup(koneGroup, koneConsumer, 1, 3600000, false, e);
            } catch (Exception ex) {
                log.error("超时了!!!!!!!!");
            }
            System.out.println("结束:" + ZonedDateTime.now().format(DateTimeFormatter.ofPattern("HH:mm:ss")));
            if (list == null) {
                log.error("list为空");
            } else {
                for (Map.Entry m : list) {
                    if (m.getValue() instanceof ArrayList) {
                        List<StreamEntry> l = (List) m.getValue();
                        Map<String, String> result = l.get(0).getFields();
                        for (Map.Entry entry : result.entrySet()) {
                            System.out.println(entry.getKey() + "---" + entry.getValue());
                            if (entry.getKey() != null) {
                                String key = String.valueOf(entry.getKey());
                                String value = String.valueOf(entry.getValue());
                                String id = key.substring(key.indexOf("_") + 1);
                                if (key.startsWith("trans_")) {
                                    log.info(value + "的trans:开始执行");
                                    transService.runTransRightNow(Long.valueOf(id));
                                    log.info(value + "的trans:结束执行");
                                } else if (key.startsWith("job_")) {
                                    log.info(value + "的job:开始执行");
                                    jobService.runJobRightNow(Long.valueOf(id));
                                    log.info(value + "的job:结束执行");
                                }
                            }
                        }
                        jedis.xack(koneStream, koneGroup, l.get(0).getID());
                        System.out.println("消息消费成功");
                    }
                }
            }

        }
    }
}

@@ -75,7 +75,23 @@ spring:
    base: OU=bp,DC=bpsemi,DC=com
    username: administrator@bpsemi.com
    password: Bps@2831!

  redis:
    host: 127.0.0.1
    port: 6379
    password: "2129"
    timeout: 3600000
    database: 2
    lettuce:
      pool:
        max-active: 100
        max-idle: 10
        min-idle: 0
        max-wait: 3600000
      cluster:
        refresh:
          adaptive: true
          # refresh automatically every 20 seconds
          period: 20
# MyBatis
mybatis:
  # Package to scan for type aliases