diff --git a/bps-kettle/src/main/java/com/ruoyi/kettle/config/RedisConfig.java b/bps-kettle/src/main/java/com/ruoyi/kettle/config/RedisConfig.java new file mode 100644 index 000000000..dcf2025c2 --- /dev/null +++ b/bps-kettle/src/main/java/com/ruoyi/kettle/config/RedisConfig.java @@ -0,0 +1,195 @@ +package com.ruoyi.kettle.config; +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.PropertyAccessor; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.lettuce.core.cluster.ClusterClientOptions; +import io.lettuce.core.cluster.ClusterTopologyRefreshOptions; +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.data.redis.RedisProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.data.redis.connection.RedisNode; +import org.springframework.data.redis.connection.RedisStandaloneConfiguration; +import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration; +import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory; +import org.springframework.data.redis.connection.lettuce.LettucePoolingClientConfiguration; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer; +import org.springframework.data.redis.serializer.StringRedisSerializer; +import redis.clients.jedis.JedisPool; +import redis.clients.jedis.JedisPoolConfig; + +import java.time.Duration; +import java.util.HashSet; +import java.util.Set; + + +/** + * @author zh + * @date 2020/12/1 16:49 + */ +@Configuration +public class RedisConfig { + /** redis 服务器地址 */ + @Value("${spring.redis.host}") + private String host; + + /** redis 端口号 */ + @Value("${spring.redis.port}") + private int port; + + /** redis 服务器密码 */ + @Value("${spring.redis.password}") + private String password; + + /** redis 连接池最大连接数(使用负值无限制) */ + @Value("${spring.redis.lettuce.pool.max-active}") + private int maxActive; + + /** redis 连接池最大空闲数 */ + @Value("${spring.redis.lettuce.pool.max-idle}") + private int maxIdle; + + /** redis 连接池小空闲数 */ + @Value("${spring.redis.lettuce.pool.min-idle}") + private int minIdle; + + /** redis 连接池最大阻塞等待时间(负值无限制) */ + @Value("${spring.redis.lettuce.pool.max-wait}") + private int maxWait; + + /** redis 数据库索引(默认0) */ + @Value("${spring.redis.database}") + private int database; + + /** redis 超时时间 */ + @Value("${spring.redis.timeout}") + private int timeout; + + @Autowired + private RedisProperties redisProperties; + + + //这是固定的模板 + //自己定义了一个RedisTemplate + @Bean + @SuppressWarnings("all") + public RedisTemplate redisTemplate(@Qualifier("lettuceConnectionFactoryUvPv") RedisConnectionFactory factory) { + RedisTemplate template = new RedisTemplate<>(); + template.setConnectionFactory(factory); + + //Json序列化配置 + Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class); + ObjectMapper om = new ObjectMapper(); + om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); + om.activateDefaultTyping(om.getPolymorphicTypeValidator()); + om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); + //解决序列化问题 + om.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + jackson2JsonRedisSerializer.setObjectMapper(om); + + //String的序列化 + StringRedisSerializer stringRedisSerializer = new StringRedisSerializer(); + + //key采用String的序列化方式 + template.setKeySerializer(stringRedisSerializer); + //hash的key也采用String的序列化方式 + template.setHashKeySerializer(stringRedisSerializer); + + //value序列化方式采用jackson + template.setValueSerializer(jackson2JsonRedisSerializer); + + //hash的value序列化方式采用jackson + template.setHashValueSerializer(jackson2JsonRedisSerializer); + template.afterPropertiesSet(); + + return template; + } + + /** + * 为RedisTemplate配置Redis连接工厂实现 + * LettuceConnectionFactory实现了RedisConnectionFactory接口 + * UVPV用Redis + * + * @return 返回LettuceConnectionFactory + */ + @Bean(destroyMethod = "destroy") + //这里要注意的是,在构建LettuceConnectionFactory 时,如果不使用内置的destroyMethod,可能会导致Redis连接早于其它Bean被销毁 + public LettuceConnectionFactory lettuceConnectionFactoryUvPv() throws Exception { + +// List clusterNodes = redisProperties.getCluster().getNodes(); +// Set nodes = new HashSet<>(); +// clusterNodes.forEach(address -> nodes.add(new RedisNode(address.split(":")[0].trim(), Integer.parseInt(address.split(":")[1])))); +// RedisClusterConfiguration clusterConfiguration = new RedisClusterConfiguration(); +// clusterConfiguration.setClusterNodes(nodes); +// clusterConfiguration.setPassword(RedisPassword.of(redisProperties.getPassword())); +// clusterConfiguration.setMaxRedirects(redisProperties.getCluster().getMaxRedirects()); + + //我使用的是单机redis,集群使用上面注释的代码 + Set nodes = new HashSet<>(); + nodes.add(new RedisNode(redisProperties.getHost(), redisProperties.getPort())); + + + RedisStandaloneConfiguration redisStandaloneConfiguration=new RedisStandaloneConfiguration(); + redisStandaloneConfiguration.setHostName(redisProperties.getHost()); + redisStandaloneConfiguration.setPassword(redisProperties.getPassword()); + redisStandaloneConfiguration.setDatabase(redisProperties.getDatabase()); + redisStandaloneConfiguration.setPort(redisProperties.getPort()); + + GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig(); + poolConfig.setMaxIdle(redisProperties.getLettuce().getPool().getMaxIdle()); + poolConfig.setMinIdle(redisProperties.getLettuce().getPool().getMinIdle()); + poolConfig.setMaxTotal(redisProperties.getLettuce().getPool().getMaxActive()); + + return new LettuceConnectionFactory(redisStandaloneConfiguration, getLettuceClientConfiguration(poolConfig)); + } + + /** + * 配置LettuceClientConfiguration 包括线程池配置和安全项配置 + * + * @param genericObjectPoolConfig common-pool2线程池 + * @return lettuceClientConfiguration + */ + private LettuceClientConfiguration getLettuceClientConfiguration(GenericObjectPoolConfig genericObjectPoolConfig) { + /* + ClusterTopologyRefreshOptions配置用于开启自适应刷新和定时刷新。如自适应刷新不开启,Redis集群变更时将会导致连接异常! + */ + ClusterTopologyRefreshOptions topologyRefreshOptions = ClusterTopologyRefreshOptions.builder() + //开启自适应刷新 + //.enableAdaptiveRefreshTrigger(ClusterTopologyRefreshOptions.RefreshTrigger.MOVED_REDIRECT, ClusterTopologyRefreshOptions.RefreshTrigger.PERSISTENT_RECONNECTS) + //开启所有自适应刷新,MOVED,ASK,PERSISTENT都会触发 + .enableAllAdaptiveRefreshTriggers() + // 自适应刷新超时时间(默认30秒) + .adaptiveRefreshTriggersTimeout(Duration.ofSeconds(25)) //默认关闭开启后时间为30秒 + // 开周期刷新 + .enablePeriodicRefresh(Duration.ofSeconds(20)) // 默认关闭开启后时间为60秒 ClusterTopologyRefreshOptions.DEFAULT_REFRESH_PERIOD 60 .enablePeriodicRefresh(Duration.ofSeconds(2)) = .enablePeriodicRefresh().refreshPeriod(Duration.ofSeconds(2)) + .build(); + return LettucePoolingClientConfiguration.builder() + .poolConfig(genericObjectPoolConfig) + .clientOptions(ClusterClientOptions.builder().topologyRefreshOptions(topologyRefreshOptions).build()) + //将appID传入连接,方便Redis监控中查看 + //.clientName(appName + "_lettuce") + .build(); + } + + @Bean + public JedisPool jedisPool() { + JedisPool jedisPool = new JedisPool(getRedisConfig(), host, port, timeout,password); + return jedisPool; + } + @Bean + public JedisPoolConfig getRedisConfig(){ + JedisPoolConfig config = new JedisPoolConfig(); + config.setMaxTotal(maxActive); + config.setMaxIdle(maxIdle); + config.setMinIdle(minIdle); + config.setMaxWaitMillis(maxWait); + return config; + } + +} diff --git a/bps-kettle/src/main/java/com/ruoyi/kettle/controller/KettleTransController.java b/bps-kettle/src/main/java/com/ruoyi/kettle/controller/KettleTransController.java index dfe56dae6..1245529fd 100644 --- a/bps-kettle/src/main/java/com/ruoyi/kettle/controller/KettleTransController.java +++ b/bps-kettle/src/main/java/com/ruoyi/kettle/controller/KettleTransController.java @@ -162,9 +162,9 @@ public class KettleTransController extends BaseController @RequiresPermissions("kettle:trans:run") @PostMapping("/run") @ResponseBody - public AjaxResult run(KettleTrans trans) + public AjaxResult runToQueue(KettleTrans trans) { - AjaxResult result = kettleTransService.run(trans); + AjaxResult result = kettleTransService.runToQueue(trans); return result; } diff --git a/bps-kettle/src/main/java/com/ruoyi/kettle/controller/RedisStreamController.java b/bps-kettle/src/main/java/com/ruoyi/kettle/controller/RedisStreamController.java new file mode 100644 index 000000000..aed3f2ef5 --- /dev/null +++ b/bps-kettle/src/main/java/com/ruoyi/kettle/controller/RedisStreamController.java @@ -0,0 +1,105 @@ +package com.ruoyi.kettle.controller; + +import com.ruoyi.common.core.controller.BaseController; +import com.ruoyi.common.core.domain.AjaxResult; +import com.ruoyi.common.core.page.TableDataInfo; +import com.ruoyi.kettle.domain.KettleJob; +import org.apache.shiro.authz.annotation.RequiresPermissions; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.connection.stream.*; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.format.annotation.DateTimeFormat; +import org.springframework.stereotype.Controller; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.ResponseBody; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisPool; +import redis.clients.jedis.StreamEntry; +import redis.clients.jedis.StreamEntryID; + +import java.time.Duration; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.*; +import java.util.concurrent.TimeUnit; + +@Controller +@RequestMapping("/redis/stream") +public class RedisStreamController extends BaseController { + String koneConsumer="koneConsumer"; + + String koneStream = "koneStream2"; + + String koneGroup= "koneGroup2"; + int i=1; + @Autowired + private RedisTemplate redisTemplate; + + @Autowired + private JedisPool jedisPool; + + + + + @GetMapping("/createComsumer") + public AjaxResult createCrgoup() { + Jedis jedis = jedisPool.getResource(); + //String key, String groupname, StreamEntryID id, boolean makeStream + /** + * key为stream name, group为消费组,id为上次读取的位置,如果空则重新读取,makeStream是否创建流,已有的话就不用创建 + */ + System.out.println(jedis.xgroupCreate(koneStream, koneGroup, null,true)); + return AjaxResult.success(); + + } + @GetMapping("/add") + @ResponseBody + public AjaxResult addMessage() { + //这里可以添加更多的属性 + Map map = new HashMap(); + map.put("date"+i++, System.currentTimeMillis() + ""); + Jedis jedis = jedisPool.getResource(); + + jedis.xadd(koneStream, new StreamEntryID().NEW_ENTRY, map); + return AjaxResult.success(); + } + @ResponseBody + @GetMapping("/read") + public AjaxResult readGroup() { + Jedis jedis = jedisPool.getResource(); + Map t = new HashMap(); + t.put(koneStream, null);//null 则为 > 重头读起,也可以为$接受新消息,还可以是上一次未读完的消息id + Map.Entry e = null; + for(Map.Entry c:t.entrySet()){ + e=c; + } + //noAck为false的话需要手动ack,true则自动ack. commsumer新建的方式为xreadgroup。 + System.out.println("开始:"+ ZonedDateTime.now().format(DateTimeFormatter.ofPattern("HH:mm:ss ms"))); + List> list = jedis.xreadGroup(koneGroup, koneConsumer, 1, 0, false, e); + System.out.println("结束:"+ ZonedDateTime.now().format(DateTimeFormatter.ofPattern("HH:mm:ss ms"))); + if(list ==null){ + System.out.println("list为空"); + return AjaxResult.error("list为空"); + } + for (Map.Entry m : list) { + System.out.println(m.getKey() + "---" + m.getValue().getClass()); + if (m.getValue() instanceof ArrayList) { + List l = (List) m.getValue(); + Map result = l.get(0).getFields(); + for (Map.Entry entry : result.entrySet()) { + System.out.println(entry.getKey() + "---" + entry.getValue()); + } + try { + TimeUnit.SECONDS.sleep(2); + } catch (InterruptedException e1) { + e1.printStackTrace(); + } + jedis.xack(koneStream, koneGroup, l.get(0).getID()); + System.out.println("消息消费成功"); + } + } + return AjaxResult.success(); + } +} diff --git a/bps-kettle/src/main/java/com/ruoyi/kettle/service/IKettleJobService.java b/bps-kettle/src/main/java/com/ruoyi/kettle/service/IKettleJobService.java index a9a6b69a0..ddbff9ebc 100644 --- a/bps-kettle/src/main/java/com/ruoyi/kettle/service/IKettleJobService.java +++ b/bps-kettle/src/main/java/com/ruoyi/kettle/service/IKettleJobService.java @@ -68,4 +68,6 @@ public interface IKettleJobService Long checkQuartzExist(String checkStr); public AjaxResult runJobQuartz(String id, String jobName); + + void runJobRightNow(Long valueOf); } diff --git a/bps-kettle/src/main/java/com/ruoyi/kettle/service/IKettleTransService.java b/bps-kettle/src/main/java/com/ruoyi/kettle/service/IKettleTransService.java index 37629babe..94e7a57f4 100644 --- a/bps-kettle/src/main/java/com/ruoyi/kettle/service/IKettleTransService.java +++ b/bps-kettle/src/main/java/com/ruoyi/kettle/service/IKettleTransService.java @@ -67,7 +67,7 @@ public interface IKettleTransService * @param trans : * @return: void **/ - AjaxResult run(KettleTrans trans); + AjaxResult runToQueue(KettleTrans trans); List queryTransLog(KettleTrans trans) ; /** @@ -81,4 +81,6 @@ public interface IKettleTransService public AjaxResult runTransQuartz(String id,String transName); Long checkQuartzExist(String checkStr); + + void runTransRightNow(Long valueOf); } diff --git a/bps-kettle/src/main/java/com/ruoyi/kettle/service/impl/KettleJobServiceImpl.java b/bps-kettle/src/main/java/com/ruoyi/kettle/service/impl/KettleJobServiceImpl.java index 2a1451d99..3eef4bcfc 100644 --- a/bps-kettle/src/main/java/com/ruoyi/kettle/service/impl/KettleJobServiceImpl.java +++ b/bps-kettle/src/main/java/com/ruoyi/kettle/service/impl/KettleJobServiceImpl.java @@ -12,6 +12,9 @@ import com.ruoyi.kettle.domain.KettleTrans; import com.ruoyi.kettle.domain.XRepository; import com.ruoyi.kettle.mapper.XRepositoryMapper; import com.ruoyi.kettle.tools.KettleUtil; +import com.ruoyi.kettle.tools.RedisStreamUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import com.ruoyi.kettle.mapper.KettleJobMapper; @@ -28,6 +31,7 @@ import com.ruoyi.common.core.text.Convert; @Service("kettleJobServiceImpl") public class KettleJobServiceImpl implements IKettleJobService { + private static final Logger log = LoggerFactory.getLogger(KettleJobServiceImpl.class); @Autowired private KettleJobMapper kettleJobMapper; @Autowired @@ -36,6 +40,9 @@ public class KettleJobServiceImpl implements IKettleJobService @Autowired private KettleUtil kettleUtil; + + @Autowired + private RedisStreamUtil redisStreamUtil; /** * 查询作业调度 * @@ -84,11 +91,14 @@ public class KettleJobServiceImpl implements IKettleJobService } String userName = (String) PermissionUtils.getPrincipalProperty("userName"); if(kettleJob.getRoleKey()==null){ - kettleJob.setRoleKey("admin"); + kettleJob.setRoleKey("admin,bpsadmin"); }else{ if(!kettleJob.getRoleKey().contains("admin")){ kettleJob.setRoleKey(kettleJob.getRoleKey().concat(",admin")); } + if(!kettleJob.getRoleKey().contains("bpsadmin")){ + kettleJob.setRoleKey(kettleJob.getRoleKey().concat(",bpsadmin")); + } } kettleJob.setCreatedBy(userName); kettleJob.setUpdateBy(userName); @@ -105,7 +115,21 @@ public class KettleJobServiceImpl implements IKettleJobService @Override public int updateKettleJob(KettleJob kettleJob) { + String userName = (String) PermissionUtils.getPrincipalProperty("userName"); + kettleJob.setUpdateTime(DateUtils.getNowDate()); + kettleJob.setUpdateBy(userName); + kettleJob.setJobType("File"); + if(kettleJob.getRoleKey()==null){ + kettleJob.setRoleKey("admin,bpsadmin"); + }else{ + if(!kettleJob.getRoleKey().contains("admin")){ + kettleJob.setRoleKey(kettleJob.getRoleKey().concat(",admin")); + } + if(!kettleJob.getRoleKey().contains("bpsadmin")){ + kettleJob.setRoleKey(kettleJob.getRoleKey().concat(",bpsadmin")); + } + } return kettleJobMapper.updateKettleJob(kettleJob); } @@ -144,6 +168,43 @@ public class KettleJobServiceImpl implements IKettleJobService if(repository==null){ return AjaxResult.error("资源库不存在!"); } + //加入队列中,等待执行 + redisStreamUtil.addKettleJob(kettleJob); + //更新一下状态 + kettleJob.setJobStatus("等待中"); + kettleJobMapper.updateKettleJob(kettleJob); + return AjaxResult.success("已加入执行队列,请等待运行结果通知!"); +// String path = kettleJob.getJobPath(); +// try { +// kettleUtil.KETTLE_LOG_LEVEL=kettleJob.getJobLogLevel(); +// kettleUtil.KETTLE_REPO_ID=String.valueOf(kettleJob.getJobRepositoryId()); +// kettleUtil.KETTLE_REPO_NAME=repository.getRepoName(); +// kettleUtil.KETTLE_REPO_PATH=repository.getBaseDir(); +// kettleUtil.callJob(path,kettleJob.getJobName(),null,null); +// } catch (Exception e) { +// e.printStackTrace(); +// } +// + + } + + @Override + public void runJobRightNow(Long id) { + KettleJob kettleJob = kettleJobMapper.selectKettleJobById(id); + if(kettleJob ==null){ + log.error("作业不存在!"); + return; + } + XRepository repository=repositoryMapper.selectXRepositoryById(kettleJob.getJobRepositoryId()); + if(repository==null){ + log.error("资源库不存在!"); + return; + } + //加入队列中,等待执行 + redisStreamUtil.addKettleJob(kettleJob); + //更新一下状态 + kettleJob.setJobStatus("运行中"); + kettleJobMapper.updateKettleJob(kettleJob); String path = kettleJob.getJobPath(); try { kettleUtil.KETTLE_LOG_LEVEL=kettleJob.getJobLogLevel(); @@ -151,13 +212,15 @@ public class KettleJobServiceImpl implements IKettleJobService kettleUtil.KETTLE_REPO_NAME=repository.getRepoName(); kettleUtil.KETTLE_REPO_PATH=repository.getBaseDir(); kettleUtil.callJob(path,kettleJob.getJobName(),null,null); + kettleJob.setJobStatus("已结束"); + kettleJobMapper.updateKettleJob(kettleJob); } catch (Exception e) { + kettleJob.setJobStatus("异常"); + kettleJobMapper.updateKettleJob(kettleJob); e.printStackTrace(); } - - return AjaxResult.success("执行成功!"); } - + } @Override public List queryJobLog(KettleJob kettleJob) { List logs=kettleJobMapper.queryJobLog(kettleJob.getJobName()); @@ -173,4 +236,5 @@ public class KettleJobServiceImpl implements IKettleJobService KettleJob kettleJob = kettleJobMapper.selectKettleJobById(Long.valueOf(id)); return run(kettleJob); } + } diff --git a/bps-kettle/src/main/java/com/ruoyi/kettle/service/impl/KettleTransServiceImpl.java b/bps-kettle/src/main/java/com/ruoyi/kettle/service/impl/KettleTransServiceImpl.java index 499236de8..6c5f32787 100644 --- a/bps-kettle/src/main/java/com/ruoyi/kettle/service/impl/KettleTransServiceImpl.java +++ b/bps-kettle/src/main/java/com/ruoyi/kettle/service/impl/KettleTransServiceImpl.java @@ -12,6 +12,9 @@ import com.ruoyi.kettle.domain.XRepository; import com.ruoyi.kettle.mapper.XRepositoryMapper; import com.ruoyi.kettle.service.IKettleTransService; import com.ruoyi.kettle.tools.KettleUtil; +import com.ruoyi.kettle.tools.RedisStreamUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import com.ruoyi.kettle.mapper.KettleTransMapper; @@ -27,6 +30,8 @@ import com.ruoyi.common.core.text.Convert; @Service("kettleTransServiceImpl") public class KettleTransServiceImpl implements IKettleTransService { + + private static final Logger log = LoggerFactory.getLogger(KettleTransServiceImpl.class); @Autowired private KettleTransMapper kettleTransMapper; @Autowired @@ -35,6 +40,9 @@ public class KettleTransServiceImpl implements IKettleTransService @Autowired private KettleUtil kettleUtil; + @Autowired + private RedisStreamUtil redisStreamUtil; + /** * 查询转换 * @@ -86,11 +94,14 @@ public class KettleTransServiceImpl implements IKettleTransService } String userName = (String) PermissionUtils.getPrincipalProperty("userName"); if(kettleTrans.getRoleKey()==null){ - kettleTrans.setRoleKey("admin"); + kettleTrans.setRoleKey("admin,bpsadmin"); }else{ if(!kettleTrans.getRoleKey().contains("admin")){ kettleTrans.setRoleKey(kettleTrans.getRoleKey().concat(",admin")); } + if(!kettleTrans.getRoleKey().contains("bpsadmin")){ + kettleTrans.setRoleKey(kettleTrans.getRoleKey().concat(",bpsadmin")); + } } kettleTrans.setCreatedBy(userName); kettleTrans.setUpdateBy(userName); @@ -112,11 +123,14 @@ public class KettleTransServiceImpl implements IKettleTransService kettleTrans.setUpdateTime(DateUtils.getNowDate()); kettleTrans.setTransType("File"); if(kettleTrans.getRoleKey()==null){ - kettleTrans.setRoleKey("admin"); + kettleTrans.setRoleKey("admin,bpsadmin"); }else{ if(!kettleTrans.getRoleKey().contains("admin")){ kettleTrans.setRoleKey(kettleTrans.getRoleKey().concat(",admin")); } + if(!kettleTrans.getRoleKey().contains("bpsadmin")){ + kettleTrans.setRoleKey(kettleTrans.getRoleKey().concat(",bpsadmin")); + } } return kettleTransMapper.updateKettleTrans(kettleTrans); } @@ -146,23 +160,46 @@ public class KettleTransServiceImpl implements IKettleTransService /** - * @Description:立即执行一次转换 + * @Description:立即执行一次转换,放到redis队列中 * @Author: Kone.wang * @Date: 2021/7/15 14:31 * @param trans : * @return: void **/ @Override - public AjaxResult run(KettleTrans trans) { + public AjaxResult runToQueue(KettleTrans trans) { Long id = trans.getId(); KettleTrans kettleTrans = kettleTransMapper.selectKettleTransById(id); - if(kettleTrans ==null){ + if(kettleTrans ==null || kettleTrans.getId()==null){ return AjaxResult.error("转换不存在!"); } XRepository repository=repositoryMapper.selectXRepositoryById(kettleTrans.getTransRepositoryId()); if(repository==null){ return AjaxResult.error("资源库不存在!"); } + //加入队列中,等待执行 + redisStreamUtil.addKettleTrans(kettleTrans); + //更新一下状态 + trans.setTransStatus("等待中"); + kettleTransMapper.updateKettleTrans(trans); + return AjaxResult.success("已加入执行队列,请等待运行结果通知!"); + } + + @Override + public void runTransRightNow(Long id) { + KettleTrans kettleTrans = kettleTransMapper.selectKettleTransById(id); + if(kettleTrans ==null || kettleTrans.getId()==null){ + log.error("转换不存在!:"+id); + return; + } + XRepository repository=repositoryMapper.selectXRepositoryById(kettleTrans.getTransRepositoryId()); + if(repository==null){ + log.error("资源库不存在!"); + return; + } + //更新状态未运行中 + kettleTrans.setTransStatus("运行中"); + kettleTransMapper.updateKettleTrans(kettleTrans); String path = kettleTrans.getTransPath(); try { kettleUtil.KETTLE_LOG_LEVEL=kettleTrans.getTransLogLevel(); @@ -170,12 +207,13 @@ public class KettleTransServiceImpl implements IKettleTransService kettleUtil.KETTLE_REPO_NAME=repository.getRepoName(); kettleUtil.KETTLE_REPO_PATH=repository.getBaseDir(); kettleUtil.callTrans(path,kettleTrans.getTransName(),null,null); + kettleTrans.setTransStatus("已结束"); + kettleTransMapper.updateKettleTrans(kettleTrans); } catch (Exception e) { - e.printStackTrace(); + kettleTrans.setTransStatus("异常"); + kettleTransMapper.updateKettleTrans(kettleTrans); + log.error(id+"的trans执行失败:"+e.getMessage()); } - - - return AjaxResult.success("执行成功!"); } /** * @Description:查询抓换执行日志 @@ -200,7 +238,7 @@ public class KettleTransServiceImpl implements IKettleTransService @Override public AjaxResult runTransQuartz(String id, String transName) { KettleTrans kettleTrans = kettleTransMapper.selectKettleTransById(Long.valueOf(id)); - return run(kettleTrans); + return runToQueue(kettleTrans); } /** * @Description:检查该转换是否设置了定时任务 @@ -214,4 +252,5 @@ public class KettleTransServiceImpl implements IKettleTransService return kettleTransMapper.checkQuartzExist(checkStr); } + } diff --git a/bps-kettle/src/main/java/com/ruoyi/kettle/tools/CommandLineRunnerImpl.java b/bps-kettle/src/main/java/com/ruoyi/kettle/tools/CommandLineRunnerImpl.java new file mode 100644 index 000000000..053e429d7 --- /dev/null +++ b/bps-kettle/src/main/java/com/ruoyi/kettle/tools/CommandLineRunnerImpl.java @@ -0,0 +1,18 @@ +package com.ruoyi.kettle.tools; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.CommandLineRunner; +import org.springframework.stereotype.Component; + +@Component +public class CommandLineRunnerImpl implements CommandLineRunner { + + @Autowired + private RedisStreamUtil redisStreamUtil; + + + @Override + public void run(String... args) throws Exception { + redisStreamUtil.readGroup(); + } +} diff --git a/bps-kettle/src/main/java/com/ruoyi/kettle/tools/KettleUtil.java b/bps-kettle/src/main/java/com/ruoyi/kettle/tools/KettleUtil.java index c94960847..11261303c 100644 --- a/bps-kettle/src/main/java/com/ruoyi/kettle/tools/KettleUtil.java +++ b/bps-kettle/src/main/java/com/ruoyi/kettle/tools/KettleUtil.java @@ -22,6 +22,8 @@ import org.springframework.stereotype.Component; import java.util.Iterator; import java.util.Map; +import java.util.concurrent.TimeUnit; + @Component public class KettleUtil { public static final Logger log = LoggerFactory.getLogger(KettleUtil.class); @@ -43,7 +45,7 @@ public class KettleUtil { public void callTrans(String transPath, String transName, Map namedParams, String[] clParams) throws Exception { KettleEnv.init(); DatabaseMeta databaseMeta=new DatabaseMeta("kettle_trans_log", "mysql", "Native(JDBC)", - "xxx.xxx.x.xx","bps?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8", "3306", "root", "password"); + "192.168.2.18","bps?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8", "3306", "root", "abc.123"); String msg; KettleFileRepository repo = this.fileRepositoryCon(); @@ -87,6 +89,7 @@ public class KettleUtil { log.error(msg); throw new Exception(msg); } + TimeUnit.SECONDS.sleep(10); } /** @@ -98,7 +101,7 @@ public class KettleUtil { KettleEnv.init(); String msg; DatabaseMeta databaseMeta=new DatabaseMeta("kettle_job_log", "mysql", "Native(JDBC)", - "xxx.xxx.x.xx","bps?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8", "3306", "root", "password"); + "192.168.2.18","bps?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8", "3306", "root", "abc.123"); KettleFileRepository repo = this.fileRepositoryCon(); JobMeta jobMeta = this.loadJob(repo, jobPath, jobName); jobMeta.addDatabase(databaseMeta); @@ -155,13 +158,6 @@ public class KettleUtil { } return transMeta; } - - - - - - - /** * 加载job * @param repo kettle文件资源库 diff --git a/bps-kettle/src/main/java/com/ruoyi/kettle/tools/RedisStreamUtil.java b/bps-kettle/src/main/java/com/ruoyi/kettle/tools/RedisStreamUtil.java new file mode 100644 index 000000000..fcdccf153 --- /dev/null +++ b/bps-kettle/src/main/java/com/ruoyi/kettle/tools/RedisStreamUtil.java @@ -0,0 +1,141 @@ +package com.ruoyi.kettle.tools; + +import com.ruoyi.common.core.domain.AjaxResult; +import com.ruoyi.kettle.domain.KettleJob; +import com.ruoyi.kettle.domain.KettleTrans; +import com.ruoyi.kettle.service.IKettleJobService; +import com.ruoyi.kettle.service.IKettleTransService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.stereotype.Component; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.ResponseBody; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisPool; +import redis.clients.jedis.StreamEntry; +import redis.clients.jedis.StreamEntryID; + +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +@Component +public class RedisStreamUtil { + private static final Logger log = LoggerFactory.getLogger(RedisStreamUtil.class); + String koneConsumer="koneConsumer"; + + String koneStream = "koneStream2"; + + String koneGroup= "koneGroup2"; + + @Autowired + private JedisPool jedisPool; + + @Autowired + private IKettleTransService transService; + + @Autowired + private IKettleJobService jobService; + + /** + * @Description: 往队列中插入trans + * @Author: Kone.wang + * @Date: 2021/8/6 13:50 + * @param trans: + * @return: com.ruoyi.common.core.domain.AjaxResult + **/ + public void addKettleTrans(KettleTrans trans) { + String transName=trans.getTransName(); + Long trandId = trans.getId(); + + //这里可以添加更多的属性 + Map map = new HashMap(); + map.put("trans_"+trandId, transName); + Jedis jedis = jedisPool.getResource(); + + jedis.xadd(koneStream, new StreamEntryID().NEW_ENTRY, map); + } + + /** + * @Description: 往队列中插入job + * @Author: Kone.wang + * @Date: 2021/8/6 13:50 + * @param job: + * @return: com.ruoyi.common.core.domain.AjaxResult + **/ + public void addKettleJob(KettleJob job) { + String jobName=job.getJobName(); + Long jobId = job.getId(); + //这里可以添加更多的属性 + Map map = new HashMap(); + map.put("job_"+jobId, jobName); + Jedis jedis = jedisPool.getResource(); + + jedis.xadd(koneStream, new StreamEntryID().NEW_ENTRY, map); + } + /** + * @Description: 循环重队列中读消息 + * @Author: Kone.wang + * @Date: 2021/8/6 13:50 + * @return: void + **/ + public void readGroup() { + + while (true){ + Jedis jedis = jedisPool.getResource(); + Map t = new HashMap(); + List> list = new ArrayList<>(); + t.put(koneStream, null);//null 则为 > 重头读起,也可以为$接受新消息,还可以是上一次未读完的消息id + Map.Entry e = null; + for(Map.Entry c:t.entrySet()){ + e=c; + } + //noAck为false的话需要手动ack,true则自动ack. commsumer新建的方式为xreadgroup。 + System.out.println("开始:"+ ZonedDateTime.now().format(DateTimeFormatter.ofPattern("HH:mm:ss ms"))); + try{ + list = jedis.xreadGroup(koneGroup, koneConsumer, 1, 3600000, false, e); + + }catch (Exception ex){ + log.error("超时了!!!!!!!!"); + } + System.out.println("结束:"+ ZonedDateTime.now().format(DateTimeFormatter.ofPattern("HH:mm:ss"))); + if(list ==null){ + log.error("list为空"); + }else{ + for (Map.Entry m : list) { + if (m.getValue() instanceof ArrayList) { + List l = (List) m.getValue(); + Map result = l.get(0).getFields(); + for (Map.Entry entry : result.entrySet()) { + System.out.println(entry.getKey() + "---" + entry.getValue()); + if(entry.getKey() != null){ + String key = String.valueOf(entry.getKey()); + String value =String.valueOf(entry.getValue()); + String id=key.substring(key.indexOf("_")+1); + if(key.startsWith("trans_")){ + log.info(value+"的trans:开始执行"); + transService.runTransRightNow(Long.valueOf(id)); + log.info(value+"的trans:结束执行"); + }else if(key.startsWith("job_")){ + log.info(value+"的job:开始执行"); + jobService.runJobRightNow(Long.valueOf(id)); + log.info(value+"的job:结束执行"); + } + } + } + jedis.xack(koneStream, koneGroup, l.get(0).getID()); + System.out.println("消息消费成功"); + } + } + } + + } + } + +} diff --git a/ruoyi-admin/src/main/resources/application.yml b/ruoyi-admin/src/main/resources/application.yml index fefe9f965..37267eb23 100644 --- a/ruoyi-admin/src/main/resources/application.yml +++ b/ruoyi-admin/src/main/resources/application.yml @@ -75,7 +75,23 @@ spring: base: OU=bp,DC=bpsemi,DC=com username: administrator@bpsemi.com password: Bps@2831! - + redis: + host: 127.0.0.1 + port: 6379 + password: "2129" + timeout: 3600000 + database: 2 + lettuce: + pool: + max-active: 100 + max-idle: 10 + min-idle: 0 + max-wait: 3600000 + cluster: + refresh: + adaptive: true + #20秒自动刷新一次 + period: 20 # MyBatis mybatis: # 搜索指定包别名