diff --git a/bps-kettle/src/main/java/com/ruoyi/kettle/controller/RedisStreamController.java b/bps-kettle/src/main/java/com/ruoyi/kettle/controller/RedisStreamController.java deleted file mode 100644 index aed3f2ef5..000000000 --- a/bps-kettle/src/main/java/com/ruoyi/kettle/controller/RedisStreamController.java +++ /dev/null @@ -1,105 +0,0 @@ -package com.ruoyi.kettle.controller; - -import com.ruoyi.common.core.controller.BaseController; -import com.ruoyi.common.core.domain.AjaxResult; -import com.ruoyi.common.core.page.TableDataInfo; -import com.ruoyi.kettle.domain.KettleJob; -import org.apache.shiro.authz.annotation.RequiresPermissions; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.data.redis.connection.stream.*; -import org.springframework.data.redis.core.RedisTemplate; -import org.springframework.format.annotation.DateTimeFormat; -import org.springframework.stereotype.Controller; -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.PostMapping; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.ResponseBody; -import redis.clients.jedis.Jedis; -import redis.clients.jedis.JedisPool; -import redis.clients.jedis.StreamEntry; -import redis.clients.jedis.StreamEntryID; - -import java.time.Duration; -import java.time.ZonedDateTime; -import java.time.format.DateTimeFormatter; -import java.util.*; -import java.util.concurrent.TimeUnit; - -@Controller -@RequestMapping("/redis/stream") -public class RedisStreamController extends BaseController { - String koneConsumer="koneConsumer"; - - String koneStream = "koneStream2"; - - String koneGroup= "koneGroup2"; - int i=1; - @Autowired - private RedisTemplate redisTemplate; - - @Autowired - private JedisPool jedisPool; - - - - - @GetMapping("/createComsumer") - public AjaxResult createCrgoup() { - Jedis jedis = jedisPool.getResource(); - //String key, String groupname, StreamEntryID id, boolean makeStream - /** - * key为stream name, group为消费组,id为上次读取的位置,如果空则重新读取,makeStream是否创建流,已有的话就不用创建 - */ - System.out.println(jedis.xgroupCreate(koneStream, koneGroup, null,true)); - return AjaxResult.success(); - - } - @GetMapping("/add") - @ResponseBody - public AjaxResult addMessage() { - //这里可以添加更多的属性 - Map map = new HashMap(); - map.put("date"+i++, System.currentTimeMillis() + ""); - Jedis jedis = jedisPool.getResource(); - - jedis.xadd(koneStream, new StreamEntryID().NEW_ENTRY, map); - return AjaxResult.success(); - } - @ResponseBody - @GetMapping("/read") - public AjaxResult readGroup() { - Jedis jedis = jedisPool.getResource(); - Map t = new HashMap(); - t.put(koneStream, null);//null 则为 > 重头读起,也可以为$接受新消息,还可以是上一次未读完的消息id - Map.Entry e = null; - for(Map.Entry c:t.entrySet()){ - e=c; - } - //noAck为false的话需要手动ack,true则自动ack. commsumer新建的方式为xreadgroup。 - System.out.println("开始:"+ ZonedDateTime.now().format(DateTimeFormatter.ofPattern("HH:mm:ss ms"))); - List> list = jedis.xreadGroup(koneGroup, koneConsumer, 1, 0, false, e); - System.out.println("结束:"+ ZonedDateTime.now().format(DateTimeFormatter.ofPattern("HH:mm:ss ms"))); - if(list ==null){ - System.out.println("list为空"); - return AjaxResult.error("list为空"); - } - for (Map.Entry m : list) { - System.out.println(m.getKey() + "---" + m.getValue().getClass()); - if (m.getValue() instanceof ArrayList) { - List l = (List) m.getValue(); - Map result = l.get(0).getFields(); - for (Map.Entry entry : result.entrySet()) { - System.out.println(entry.getKey() + "---" + entry.getValue()); - } - try { - TimeUnit.SECONDS.sleep(2); - } catch (InterruptedException e1) { - e1.printStackTrace(); - } - jedis.xack(koneStream, koneGroup, l.get(0).getID()); - System.out.println("消息消费成功"); - } - } - return AjaxResult.success(); - } -} diff --git a/bps-kettle/src/main/java/com/ruoyi/kettle/service/IKettleJobService.java b/bps-kettle/src/main/java/com/ruoyi/kettle/service/IKettleJobService.java index ddbff9ebc..83d83e014 100644 --- a/bps-kettle/src/main/java/com/ruoyi/kettle/service/IKettleJobService.java +++ b/bps-kettle/src/main/java/com/ruoyi/kettle/service/IKettleJobService.java @@ -69,5 +69,5 @@ public interface IKettleJobService public AjaxResult runJobQuartz(String id, String jobName); - void runJobRightNow(Long valueOf); + void runJobRightNow(Long valueOf, String userId); } diff --git a/bps-kettle/src/main/java/com/ruoyi/kettle/service/IKettleTransService.java b/bps-kettle/src/main/java/com/ruoyi/kettle/service/IKettleTransService.java index 94e7a57f4..3a950b7a4 100644 --- a/bps-kettle/src/main/java/com/ruoyi/kettle/service/IKettleTransService.java +++ b/bps-kettle/src/main/java/com/ruoyi/kettle/service/IKettleTransService.java @@ -82,5 +82,5 @@ public interface IKettleTransService Long checkQuartzExist(String checkStr); - void runTransRightNow(Long valueOf); + void runTransRightNow(Long valueOf, String userId); } diff --git a/bps-kettle/src/main/java/com/ruoyi/kettle/service/impl/KettleJobServiceImpl.java b/bps-kettle/src/main/java/com/ruoyi/kettle/service/impl/KettleJobServiceImpl.java index ec038c2a4..e1f71949e 100644 --- a/bps-kettle/src/main/java/com/ruoyi/kettle/service/impl/KettleJobServiceImpl.java +++ b/bps-kettle/src/main/java/com/ruoyi/kettle/service/impl/KettleJobServiceImpl.java @@ -2,17 +2,18 @@ package com.ruoyi.kettle.service.impl; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; import com.ruoyi.common.core.domain.AjaxResult; import com.ruoyi.common.core.domain.entity.SysRole; import com.ruoyi.common.utils.DateUtils; import com.ruoyi.common.utils.security.PermissionUtils; -import com.ruoyi.kettle.domain.KettleTrans; import com.ruoyi.kettle.domain.XRepository; import com.ruoyi.kettle.mapper.XRepositoryMapper; import com.ruoyi.kettle.tools.KettleUtil; import com.ruoyi.kettle.tools.RedisStreamUtil; +import com.ruoyi.system.service.IWechatApiService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -43,6 +44,8 @@ public class KettleJobServiceImpl implements IKettleJobService @Autowired private RedisStreamUtil redisStreamUtil; + @Autowired + IWechatApiService wechatApiService; /** * 查询作业调度 * @@ -189,7 +192,7 @@ public class KettleJobServiceImpl implements IKettleJobService } @Override - public void runJobRightNow(Long id) { + public void runJobRightNow(Long id, String userId) { KettleJob kettleJob = kettleJobMapper.selectKettleJobById(id); if(kettleJob ==null){ log.error("作业不存在!"); @@ -204,22 +207,26 @@ public class KettleJobServiceImpl implements IKettleJobService //更新一下状态 kettleJob.setJobStatus("运行中"); kettleJobMapper.updateKettleJob(kettleJob); - String path = kettleJob.getJobPath(); + StringBuilder title = new StringBuilder(kettleJob.getJobName()).append(".kjb 执行结果:"); + StringBuilder msg = new StringBuilder(kettleJob.getJobName()).append(".kjb 执行结果:"); try { - kettleUtil.KETTLE_LOG_LEVEL=kettleJob.getJobLogLevel(); - kettleUtil.KETTLE_REPO_ID=String.valueOf(kettleJob.getJobRepositoryId()); - kettleUtil.KETTLE_REPO_NAME=repository.getRepoName(); - kettleUtil.KETTLE_REPO_PATH=repository.getBaseDir(); - kettleUtil.callJob(path,kettleJob.getJobName(),null,null); + kettleUtil.callJob(kettleJob,repository,null,null); kettleJob.setJobStatus("成功"); kettleJob.setLastSucceedTime(DateUtils.getNowDate()); kettleJobMapper.updateKettleJob(kettleJob); + title.append("成功!"); + msg.append("成功!"); } catch (Exception e) { kettleJob.setJobStatus("异常"); kettleJobMapper.updateKettleJob(kettleJob); + title.append("异常!"); + msg.append("异常!"); e.printStackTrace(); } - + List userIdList = new ArrayList<>(); + userIdList.add(userId); + Map resultMap = wechatApiService.SendTextCardMessageToWechatUser(userIdList,title.toString(),msg.toString(),"http://report.bpsemi.cn:8081/it_war"); + log.info("job微信消息发送结果"+resultMap); } @Override public List queryJobLog(KettleJob kettleJob) { diff --git a/bps-kettle/src/main/java/com/ruoyi/kettle/service/impl/KettleTransServiceImpl.java b/bps-kettle/src/main/java/com/ruoyi/kettle/service/impl/KettleTransServiceImpl.java index 8e732ce23..990bcce37 100644 --- a/bps-kettle/src/main/java/com/ruoyi/kettle/service/impl/KettleTransServiceImpl.java +++ b/bps-kettle/src/main/java/com/ruoyi/kettle/service/impl/KettleTransServiceImpl.java @@ -2,6 +2,7 @@ package com.ruoyi.kettle.service.impl; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; import com.ruoyi.common.core.domain.AjaxResult; @@ -13,6 +14,7 @@ import com.ruoyi.kettle.mapper.XRepositoryMapper; import com.ruoyi.kettle.service.IKettleTransService; import com.ruoyi.kettle.tools.KettleUtil; import com.ruoyi.kettle.tools.RedisStreamUtil; +import com.ruoyi.system.service.IWechatApiService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -42,7 +44,8 @@ public class KettleTransServiceImpl implements IKettleTransService @Autowired private RedisStreamUtil redisStreamUtil; - + @Autowired + IWechatApiService wechatApiService; /** * 查询转换 * @@ -186,7 +189,7 @@ public class KettleTransServiceImpl implements IKettleTransService } @Override - public void runTransRightNow(Long id) { + public void runTransRightNow(Long id, String userId) { KettleTrans kettleTrans = kettleTransMapper.selectKettleTransById(id); if(kettleTrans ==null || kettleTrans.getId()==null){ log.error("转换不存在!:"+id); @@ -200,21 +203,27 @@ public class KettleTransServiceImpl implements IKettleTransService //更新状态未运行中 kettleTrans.setTransStatus("运行中"); kettleTransMapper.updateKettleTrans(kettleTrans); - String path = kettleTrans.getTransPath(); + StringBuilder title = new StringBuilder(kettleTrans.getTransName()).append(".ktr 执行结果:"); + StringBuilder msg = new StringBuilder(kettleTrans.getTransName()).append(".ktr 执行结果:"); try { - kettleUtil.KETTLE_LOG_LEVEL=kettleTrans.getTransLogLevel(); - kettleUtil.KETTLE_REPO_ID=String.valueOf(kettleTrans.getTransRepositoryId()); - kettleUtil.KETTLE_REPO_NAME=repository.getRepoName(); - kettleUtil.KETTLE_REPO_PATH=repository.getBaseDir(); - kettleUtil.callTrans(path,kettleTrans.getTransName(),null,null); + kettleUtil.callTrans(kettleTrans,repository,null,null); kettleTrans.setTransStatus("成功"); kettleTrans.setLastSucceedTime(DateUtils.getNowDate()); kettleTransMapper.updateKettleTrans(kettleTrans); + title.append("成功!"); + msg.append("成功!"); } catch (Exception e) { kettleTrans.setTransStatus("异常"); kettleTransMapper.updateKettleTrans(kettleTrans); + title.append("异常!"); + msg.append("异常!"); log.error(id+"的trans执行失败:"+e.getMessage()); } + List userIdList = new ArrayList<>(); + userIdList.add(userId); + Map resultMap = wechatApiService.SendTextCardMessageToWechatUser(userIdList,title.toString(),msg.toString(),"http://report.bpsemi.cn:8081/it_war"); + log.info("trans微信消息发送结果"+resultMap); + } /** * @Description:查询抓换执行日志 diff --git a/bps-kettle/src/main/java/com/ruoyi/kettle/tools/CommandLineRunnerImpl.java b/bps-kettle/src/main/java/com/ruoyi/kettle/tools/CommandLineRunnerImpl.java index 053e429d7..4790e04a2 100644 --- a/bps-kettle/src/main/java/com/ruoyi/kettle/tools/CommandLineRunnerImpl.java +++ b/bps-kettle/src/main/java/com/ruoyi/kettle/tools/CommandLineRunnerImpl.java @@ -1,7 +1,10 @@ package com.ruoyi.kettle.tools; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; import org.springframework.boot.CommandLineRunner; +import org.springframework.core.annotation.Order; import org.springframework.stereotype.Component; @Component @@ -13,6 +16,12 @@ public class CommandLineRunnerImpl implements CommandLineRunner { @Override public void run(String... args) throws Exception { - redisStreamUtil.readGroup(); + + new Thread(){ + public void run() { + redisStreamUtil.readGroup(); + } + }.start(); } + } diff --git a/bps-kettle/src/main/java/com/ruoyi/kettle/tools/KettleUtil.java b/bps-kettle/src/main/java/com/ruoyi/kettle/tools/KettleUtil.java index 11261303c..08424a27f 100644 --- a/bps-kettle/src/main/java/com/ruoyi/kettle/tools/KettleUtil.java +++ b/bps-kettle/src/main/java/com/ruoyi/kettle/tools/KettleUtil.java @@ -1,6 +1,9 @@ package com.ruoyi.kettle.tools; import com.ruoyi.common.config.datasource.DynamicDataSourceContextHolder; +import com.ruoyi.kettle.domain.KettleJob; +import com.ruoyi.kettle.domain.KettleTrans; +import com.ruoyi.kettle.domain.XRepository; import org.pentaho.di.core.KettleEnvironment; import org.pentaho.di.core.database.DatabaseMeta; import org.pentaho.di.core.exception.KettleException; @@ -29,27 +32,21 @@ public class KettleUtil { public static final Logger log = LoggerFactory.getLogger(KettleUtil.class); - public String KETTLE_LOG_LEVEL = "basic"; - public String KETTLE_REPO_ID = "2"; - public String KETTLE_REPO_NAME = "koneTest"; - public String KETTLE_REPO_DESC = "DESC"; - public String KETTLE_REPO_PATH = "D:\\etl"; + /** * 执行文件资源库转换 - * @param transPath 转换路径(相对于资源库) - * @param transName 转换名称(不需要后缀) * @param namedParams 命名参数 * @param clParams 命令行参数 */ - public void callTrans(String transPath, String transName, Map namedParams, String[] clParams) throws Exception { + public void callTrans(KettleTrans kettleTrans, XRepository xrepository, Map namedParams, String[] clParams) throws Exception { KettleEnv.init(); DatabaseMeta databaseMeta=new DatabaseMeta("kettle_trans_log", "mysql", "Native(JDBC)", "192.168.2.18","bps?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8", "3306", "root", "abc.123"); String msg; - KettleFileRepository repo = this.fileRepositoryCon(); - TransMeta transMeta = this.loadTrans(repo, transPath, transName); + KettleFileRepository repo = this.fileRepositoryCon(xrepository); + TransMeta transMeta = this.loadTrans(repo, kettleTrans.getTransPath(), kettleTrans.getTransName()); transMeta.addDatabase(databaseMeta); VariableSpace space=new Variables(); @@ -71,7 +68,7 @@ public class KettleUtil { trans.setParameterValue(entry.getKey(), entry.getValue()); } } - trans.setLogLevel(this.getLogerLevel(KETTLE_LOG_LEVEL)); + trans.setLogLevel(this.getLogerLevel(kettleTrans.getTransLogLevel())); //执行 trans.execute(clParams); trans.waitUntilFinished(); @@ -94,16 +91,15 @@ public class KettleUtil { /** * 执行文件资源库job - * @param jobName * @throws Exception */ - public boolean callJob(String jobPath, String jobName, Map variables, String[] clParams) throws Exception { + public boolean callJob(KettleJob kettleJob,XRepository xRepository, Map variables, String[] clParams) throws Exception { KettleEnv.init(); String msg; DatabaseMeta databaseMeta=new DatabaseMeta("kettle_job_log", "mysql", "Native(JDBC)", "192.168.2.18","bps?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8", "3306", "root", "abc.123"); - KettleFileRepository repo = this.fileRepositoryCon(); - JobMeta jobMeta = this.loadJob(repo, jobPath, jobName); + KettleFileRepository repo = this.fileRepositoryCon(xRepository); + JobMeta jobMeta = this.loadJob(repo, kettleJob.getJobPath(), kettleJob.getJobName()); jobMeta.addDatabase(databaseMeta); VariableSpace space=new Variables(); space.setVariable("test","fromDbName"); @@ -121,7 +117,7 @@ public class KettleUtil { } } //设置日志级别 - job.setLogLevel(this.getLogerLevel(KETTLE_LOG_LEVEL)); + job.setLogLevel(this.getLogerLevel(kettleJob.getJobLogLevel())); job.setArguments(clParams); job.start(); job.waitUntilFinished(); @@ -208,9 +204,9 @@ public class KettleUtil { * @param jobName * @throws Exception */ - public void callNativeJob(String jobName) throws Exception { +/* public void callNativeJob(String jobName) throws Exception { // 初始化 - /*KettleEnvironment.init();*/ + *//*KettleEnvironment.init();*//* JobMeta jobMeta = new JobMeta(jobName, null); Job job = new Job(null, jobMeta); @@ -223,7 +219,7 @@ public class KettleUtil { if (job.getErrors() > 0) { throw new Exception("There are errors during job exception!(执行job发生异常)"); } - } + }*/ /** * 取得kettle的日志级别 @@ -253,14 +249,14 @@ public class KettleUtil { /** * 配置kettle文件库资源库环境 **/ - public KettleFileRepository fileRepositoryCon() throws KettleException { + public KettleFileRepository fileRepositoryCon(XRepository xRepository) throws KettleException { String msg; //初始化 /*EnvUtil.environmentInit(); KettleEnvironment.init();*/ //资源库元对象 - KettleFileRepositoryMeta fileRepositoryMeta = new KettleFileRepositoryMeta(this.KETTLE_REPO_ID, this.KETTLE_REPO_NAME, this.KETTLE_REPO_DESC, this.KETTLE_REPO_PATH); + KettleFileRepositoryMeta fileRepositoryMeta = new KettleFileRepositoryMeta(String.valueOf(xRepository.getId()), xRepository.getRepoName(), xRepository.getRemark(), xRepository.getBaseDir()); // 文件形式的资源库 KettleFileRepository repo = new KettleFileRepository(); repo.init(fileRepositoryMeta); @@ -268,11 +264,11 @@ public class KettleUtil { repo.connect("", "");//默认的连接资源库的用户名和密码 if (repo.isConnected()) { - msg = "kettle文件库资源库【" + KETTLE_REPO_PATH + "】连接成功"; + msg = "kettle文件库资源库【" + xRepository.getBaseDir() + "】连接成功"; log.info(msg); return repo; } else { - msg = "kettle文件库资源库【" + KETTLE_REPO_PATH + "】连接失败"; + msg = "kettle文件库资源库【" + xRepository.getBaseDir() + "】连接失败"; log.error(msg); throw new KettleException(msg); } diff --git a/bps-kettle/src/main/java/com/ruoyi/kettle/tools/RedisStreamUtil.java b/bps-kettle/src/main/java/com/ruoyi/kettle/tools/RedisStreamUtil.java index b2162d643..5da7532b3 100644 --- a/bps-kettle/src/main/java/com/ruoyi/kettle/tools/RedisStreamUtil.java +++ b/bps-kettle/src/main/java/com/ruoyi/kettle/tools/RedisStreamUtil.java @@ -1,18 +1,23 @@ package com.ruoyi.kettle.tools; +import com.ruoyi.common.utils.security.PermissionUtils; import com.ruoyi.kettle.domain.KettleJob; import com.ruoyi.kettle.domain.KettleTrans; import com.ruoyi.kettle.service.IKettleJobService; import com.ruoyi.kettle.service.IKettleTransService; +import com.ruoyi.system.service.ISysConfigService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.StreamEntry; import redis.clients.jedis.StreamEntryID; +import java.net.InetAddress; import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; import java.util.ArrayList; @@ -20,15 +25,26 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; - +/** + * @Description: + * 现在redis安装后新建一个stream: XADD koneStream * user kang msg Hello + *再把他读掉:XREAD streams koneStream 0 + *最后创建一个这个steam的消费者:XGROUP CREATE koneStream koneGroup 0 + * @Author: Kone.wang + * @Date: 2021/8/10 13:19 + **/ @Component public class RedisStreamUtil { private static final Logger log = LoggerFactory.getLogger(RedisStreamUtil.class); - String koneConsumer="koneConsumer"; - String koneStream = "koneStream2"; - - String koneGroup= "koneGroup2"; + String koneConsumer="bpsemi_consumer"; +// +// @Value("${stream.key}") +// String koneStream ; +// @Value("${stream.group}") +// String koneGroup ; + @Value("${spring.redis.timeout}") + Long waitTIme; @Autowired private JedisPool jedisPool; @@ -38,7 +54,8 @@ public class RedisStreamUtil { @Autowired private IKettleJobService jobService; - + @Autowired + private ISysConfigService configService; /** * @Description: 往队列中插入trans * @Author: Kone.wang @@ -47,15 +64,40 @@ public class RedisStreamUtil { * @return: com.ruoyi.common.core.domain.AjaxResult **/ public void addKettleTrans(KettleTrans trans) { + //获取主机ip + String localAddr = configService.selectConfigByKey("sys.local.addr"); + localAddr =localAddr!=null?localAddr:"192.168.2.84"; + String koneStream="bpsemi_test"; + try{ + InetAddress addr = InetAddress.getLocalHost(); + String address = addr.getHostAddress(); + if(address.equals(localAddr)){ + koneStream="bpsemi"; + } + }catch (Exception e){ + log.error("addKettleTrans()获取主机ip异常:"+e); + } + String transName=trans.getTransName(); Long trandId = trans.getId(); - //这里可以添加更多的属性 - Map map = new HashMap(); - map.put("trans_"+trandId, transName); - Jedis jedis = jedisPool.getResource(); + //定时任务跑的时候这个会报错,所以捕获一下然后设置默认的 + String userId =""; + try{ + userId = String.valueOf(PermissionUtils.getPrincipalProperty("userId")); + }catch (Exception e){ + log.warn("定时任务执行的,默认发送给天宁吧408"); + userId="454"; + } + + log.info(userId+"开始增加:trans_"+trandId+"@"+userId+":::"+transName); + //这里可以添加更多的属性 + Map map = new HashMap(); + map.put("trans_"+trandId+"@"+userId, transName); + Jedis jedis = jedisPool.getResource(); StreamEntryID id =jedis.xadd(koneStream, new StreamEntryID().NEW_ENTRY, map); + log.info(userId+"成功增加:trans_"+trandId+"@"+userId+":::"+transName+"[StreamEntryID:"+id+"]"); } /** @@ -66,14 +108,37 @@ public class RedisStreamUtil { * @return: com.ruoyi.common.core.domain.AjaxResult **/ public void addKettleJob(KettleJob job) { + //获取主机ip + String localAddr = configService.selectConfigByKey("sys.local.addr"); + localAddr =localAddr!=null?localAddr:"192.168.2.84"; + String koneStream="bpsemi_test"; + try{ + InetAddress addr = InetAddress.getLocalHost(); + String address = addr.getHostAddress(); + if(address.equals(localAddr)){ + koneStream="bpsemi"; + } + }catch (Exception e){ + log.error("addKettleJob()获取主机ip异常:"+e); + } String jobName=job.getJobName(); Long jobId = job.getId(); + String userId =""; + try{ + userId = String.valueOf(PermissionUtils.getPrincipalProperty("userId")); + }catch (Exception e){ + log.warn("定时任务执行的,默认发送给天宁吧408"); + userId="454"; + } + + log.info(userId+"开始增加:job_"+jobId+"@"+userId+":::"+jobName); //这里可以添加更多的属性 - Map map = new HashMap(); - map.put("job_"+jobId, jobName); + Map map = new HashMap(); + map.put("job_"+jobId+"@"+userId, jobName); Jedis jedis = jedisPool.getResource(); - jedis.xadd(koneStream, new StreamEntryID().NEW_ENTRY, map); + StreamEntryID id = jedis.xadd(koneStream, new StreamEntryID().NEW_ENTRY, map); + log.info(userId+"成功增加:job_"+jobId+"@"+userId+":::"+jobName+"[StreamEntryID:"+id+"]"); } /** * @Description: 循环重队列中读消息 @@ -82,27 +147,42 @@ public class RedisStreamUtil { * @return: void **/ public void readGroup() { - + //获取主机ip + String localAddr = configService.selectConfigByKey("sys.local.addr"); + localAddr =localAddr!=null?localAddr:"192.168.2.84"; + String koneStream="bpsemi_test"; + String koneGroup="bpsemi_group_test"; + String koneConsumer="bpsemi_consumer"; + try{ + InetAddress addr = InetAddress.getLocalHost(); + String address = addr.getHostAddress(); + if(address.equals(localAddr)){ + koneStream="bpsemi"; + koneGroup="bpsemi_group"; + } + }catch (Exception e){ + log.error("addKettleJob()获取主机ip异常:"+e); + } while (true){ Jedis jedis = jedisPool.getResource(); - Map t = new HashMap(); - List> list = new ArrayList<>(); + Map t = new HashMap(); + List>> list = new ArrayList>>(); t.put(koneStream, null);//null 则为 > 重头读起,也可以为$接受新消息,还可以是上一次未读完的消息id Map.Entry e = null; for(Map.Entry c:t.entrySet()){ e=c; } //noAck为false的话需要手动ack,true则自动ack. commsumer新建的方式为xreadgroup。 - System.out.println("开始:"+ ZonedDateTime.now().format(DateTimeFormatter.ofPattern("HH:mm:ss ms"))); + log.info("开始读消息"); try{ - list = jedis.xreadGroup(koneGroup, koneConsumer, 1, 3600000, false, e); + list = jedis.xreadGroup(koneGroup, koneConsumer, 1, 30000, false, e); }catch (Exception ex){ log.error("超时了!!!!!!!!"); } - System.out.println("结束:"+ ZonedDateTime.now().format(DateTimeFormatter.ofPattern("HH:mm:ss"))); + log.info("读消息结束!"); if(list ==null){ - log.error("list为空"); + log.error("读到的list为空"); }else{ for (Map.Entry m : list) { if (m.getValue() instanceof ArrayList) { @@ -113,20 +193,21 @@ public class RedisStreamUtil { if(entry.getKey() != null){ String key = String.valueOf(entry.getKey()); String value =String.valueOf(entry.getValue()); - String id=key.substring(key.indexOf("_")+1); + String id=key.substring(key.indexOf("_")+1,key.indexOf("@")); + String userId=key.substring(key.indexOf("@")+1); if(key.startsWith("trans_")){ log.info(value+"的trans:开始执行"); - transService.runTransRightNow(Long.valueOf(id)); + transService.runTransRightNow(Long.valueOf(id),userId); log.info(value+"的trans:结束执行"); }else if(key.startsWith("job_")){ log.info(value+"的job:开始执行"); - jobService.runJobRightNow(Long.valueOf(id)); + jobService.runJobRightNow(Long.valueOf(id),userId); log.info(value+"的job:结束执行"); } } } - jedis.xack(koneStream, koneGroup, l.get(0).getID()); - System.out.println("消息消费成功"); + long id = jedis.xack(koneStream, koneGroup, l.get(0).getID()); + log.info("消息消费成功:"+id); } } } diff --git a/pom.xml b/pom.xml index e5a734bfc..7ba912e57 100644 --- a/pom.xml +++ b/pom.xml @@ -1,4 +1,4 @@ - + 4.0.0 @@ -30,7 +30,7 @@ 5.8.0 2.11.0 1.4 - 3.17 + 4.1.2 1.7 diff --git a/ruoyi-admin/src/main/resources/application.yml b/ruoyi-admin/src/main/resources/application.yml index 66d0b7258..d9cb9ffb9 100644 --- a/ruoyi-admin/src/main/resources/application.yml +++ b/ruoyi-admin/src/main/resources/application.yml @@ -12,7 +12,6 @@ ruoyi: profile: C:/bps-it/uploadPath # 获取ip地址开关 addressEnabled: false - # 开发环境配置 server: # 服务器的HTTP端口,默认为80 @@ -75,24 +74,27 @@ spring: base: OU=bp,DC=bpsemi,DC=com username: administrator@bpsemi.com password: Bps@2831! + redis: - host: 127.0.0.1 + host: 192.168.2.88 port: 6379 - password: "2129" - timeout: 3600000 + password: "bpsemi2021" + timeout: 30000 database: 2 lettuce: pool: max-active: 100 max-idle: 10 min-idle: 0 - max-wait: 3600000 + max-wait: 30000 cluster: refresh: adaptive: true #20秒自动刷新一次 period: 20 + # MyBatis + mybatis: # 搜索指定包别名 typeAliasesPackage: com.ruoyi.**.domain @@ -183,4 +185,6 @@ express: #云平台相关(非必填) #登录云平台 https://cloud.kuaidi100.com/buyer/user/info secret_key: - secret_secret: \ No newline at end of file + secret_secret: + + diff --git a/ruoyi-common/src/main/java/com/ruoyi/common/utils/poi/ExcelUtil.java b/ruoyi-common/src/main/java/com/ruoyi/common/utils/poi/ExcelUtil.java index 1456fff81..f60fd299a 100644 --- a/ruoyi-common/src/main/java/com/ruoyi/common/utils/poi/ExcelUtil.java +++ b/ruoyi-common/src/main/java/com/ruoyi/common/utils/poi/ExcelUtil.java @@ -1102,7 +1102,7 @@ public class ExcelUtil Cell cell = row.getCell(column); if (StringUtils.isNotNull(cell)) { - if (cell.getCellTypeEnum() == CellType.NUMERIC || cell.getCellTypeEnum() == CellType.FORMULA) + if (cell.getCellType() == CellType.NUMERIC || cell.getCellType() == CellType.FORMULA) { val = cell.getNumericCellValue(); if (DateUtil.isCellDateFormatted(cell)) @@ -1121,15 +1121,15 @@ public class ExcelUtil } } } - else if (cell.getCellTypeEnum() == CellType.STRING) + else if (cell.getCellType() == CellType.STRING) { val = cell.getStringCellValue(); } - else if (cell.getCellTypeEnum() == CellType.BOOLEAN) + else if (cell.getCellType() == CellType.BOOLEAN) { val = cell.getBooleanCellValue(); } - else if (cell.getCellTypeEnum() == CellType.ERROR) + else if (cell.getCellType() == CellType.ERROR) { val = cell.getErrorCellValue(); } @@ -1158,7 +1158,7 @@ public class ExcelUtil for (int i = row.getFirstCellNum(); i < row.getLastCellNum(); i++) { Cell cell = row.getCell(i); - if (cell != null && cell.getCellTypeEnum() != CellType.BLANK) + if (cell != null && cell.getCellType() != CellType.BLANK) { return false; }