kettleV2.0.1:增加队列.增加状态和最后成功时间

This commit is contained in:
18326186802 2021-08-12 14:05:23 +08:00
parent 62ece10f60
commit 2ffac19596
11 changed files with 187 additions and 186 deletions

View File

@ -1,105 +0,0 @@
package com.ruoyi.kettle.controller;
import com.ruoyi.common.core.controller.BaseController;
import com.ruoyi.common.core.domain.AjaxResult;
import com.ruoyi.common.core.page.TableDataInfo;
import com.ruoyi.kettle.domain.KettleJob;
import org.apache.shiro.authz.annotation.RequiresPermissions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.*;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.format.annotation.DateTimeFormat;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.StreamEntry;
import redis.clients.jedis.StreamEntryID;
import java.time.Duration;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.TimeUnit;
@Controller
@RequestMapping("/redis/stream")
public class RedisStreamController extends BaseController {
String koneConsumer="koneConsumer";
String koneStream = "koneStream2";
String koneGroup= "koneGroup2";
int i=1;
@Autowired
private RedisTemplate redisTemplate;
@Autowired
private JedisPool jedisPool;
@GetMapping("/createComsumer")
public AjaxResult createCrgoup() {
Jedis jedis = jedisPool.getResource();
//String key, String groupname, StreamEntryID id, boolean makeStream
/**
* key为stream name, group为消费组id为上次读取的位置如果空则重新读取makeStream是否创建流已有的话就不用创建
*/
System.out.println(jedis.xgroupCreate(koneStream, koneGroup, null,true));
return AjaxResult.success();
}
@GetMapping("/add")
@ResponseBody
public AjaxResult addMessage() {
//这里可以添加更多的属性
Map map = new HashMap();
map.put("date"+i++, System.currentTimeMillis() + "");
Jedis jedis = jedisPool.getResource();
jedis.xadd(koneStream, new StreamEntryID().NEW_ENTRY, map);
return AjaxResult.success();
}
@ResponseBody
@GetMapping("/read")
public AjaxResult readGroup() {
Jedis jedis = jedisPool.getResource();
Map<String,StreamEntryID> t = new HashMap();
t.put(koneStream, null);//null 则为 > 重头读起也可以为$接受新消息还可以是上一次未读完的消息id
Map.Entry e = null;
for(Map.Entry c:t.entrySet()){
e=c;
}
//noAck为false的话需要手动acktrue则自动ack. commsumer新建的方式为xreadgroup
System.out.println("开始:"+ ZonedDateTime.now().format(DateTimeFormatter.ofPattern("HH:mm:ss ms")));
List<Map.Entry<String, StreamEntryID>> list = jedis.xreadGroup(koneGroup, koneConsumer, 1, 0, false, e);
System.out.println("结束:"+ ZonedDateTime.now().format(DateTimeFormatter.ofPattern("HH:mm:ss ms")));
if(list ==null){
System.out.println("list为空");
return AjaxResult.error("list为空");
}
for (Map.Entry m : list) {
System.out.println(m.getKey() + "---" + m.getValue().getClass());
if (m.getValue() instanceof ArrayList) {
List<StreamEntry> l = (List) m.getValue();
Map<String, String> result = l.get(0).getFields();
for (Map.Entry entry : result.entrySet()) {
System.out.println(entry.getKey() + "---" + entry.getValue());
}
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
jedis.xack(koneStream, koneGroup, l.get(0).getID());
System.out.println("消息消费成功");
}
}
return AjaxResult.success();
}
}

View File

@ -69,5 +69,5 @@ public interface IKettleJobService
public AjaxResult runJobQuartz(String id, String jobName); public AjaxResult runJobQuartz(String id, String jobName);
void runJobRightNow(Long valueOf); void runJobRightNow(Long valueOf, String userId);
} }

View File

@ -82,5 +82,5 @@ public interface IKettleTransService
Long checkQuartzExist(String checkStr); Long checkQuartzExist(String checkStr);
void runTransRightNow(Long valueOf); void runTransRightNow(Long valueOf, String userId);
} }

View File

@ -2,17 +2,18 @@ package com.ruoyi.kettle.service.impl;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import com.ruoyi.common.core.domain.AjaxResult; import com.ruoyi.common.core.domain.AjaxResult;
import com.ruoyi.common.core.domain.entity.SysRole; import com.ruoyi.common.core.domain.entity.SysRole;
import com.ruoyi.common.utils.DateUtils; import com.ruoyi.common.utils.DateUtils;
import com.ruoyi.common.utils.security.PermissionUtils; import com.ruoyi.common.utils.security.PermissionUtils;
import com.ruoyi.kettle.domain.KettleTrans;
import com.ruoyi.kettle.domain.XRepository; import com.ruoyi.kettle.domain.XRepository;
import com.ruoyi.kettle.mapper.XRepositoryMapper; import com.ruoyi.kettle.mapper.XRepositoryMapper;
import com.ruoyi.kettle.tools.KettleUtil; import com.ruoyi.kettle.tools.KettleUtil;
import com.ruoyi.kettle.tools.RedisStreamUtil; import com.ruoyi.kettle.tools.RedisStreamUtil;
import com.ruoyi.system.service.IWechatApiService;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -43,6 +44,8 @@ public class KettleJobServiceImpl implements IKettleJobService
@Autowired @Autowired
private RedisStreamUtil redisStreamUtil; private RedisStreamUtil redisStreamUtil;
@Autowired
IWechatApiService wechatApiService;
/** /**
* 查询作业调度 * 查询作业调度
* *
@ -189,7 +192,7 @@ public class KettleJobServiceImpl implements IKettleJobService
} }
@Override @Override
public void runJobRightNow(Long id) { public void runJobRightNow(Long id, String userId) {
KettleJob kettleJob = kettleJobMapper.selectKettleJobById(id); KettleJob kettleJob = kettleJobMapper.selectKettleJobById(id);
if(kettleJob ==null){ if(kettleJob ==null){
log.error("作业不存在!"); log.error("作业不存在!");
@ -204,22 +207,26 @@ public class KettleJobServiceImpl implements IKettleJobService
//更新一下状态 //更新一下状态
kettleJob.setJobStatus("运行中"); kettleJob.setJobStatus("运行中");
kettleJobMapper.updateKettleJob(kettleJob); kettleJobMapper.updateKettleJob(kettleJob);
String path = kettleJob.getJobPath(); StringBuilder title = new StringBuilder(kettleJob.getJobName()).append(".kjb 执行结果:");
StringBuilder msg = new StringBuilder(kettleJob.getJobName()).append(".kjb 执行结果:");
try { try {
kettleUtil.KETTLE_LOG_LEVEL=kettleJob.getJobLogLevel(); kettleUtil.callJob(kettleJob,repository,null,null);
kettleUtil.KETTLE_REPO_ID=String.valueOf(kettleJob.getJobRepositoryId());
kettleUtil.KETTLE_REPO_NAME=repository.getRepoName();
kettleUtil.KETTLE_REPO_PATH=repository.getBaseDir();
kettleUtil.callJob(path,kettleJob.getJobName(),null,null);
kettleJob.setJobStatus("成功"); kettleJob.setJobStatus("成功");
kettleJob.setLastSucceedTime(DateUtils.getNowDate()); kettleJob.setLastSucceedTime(DateUtils.getNowDate());
kettleJobMapper.updateKettleJob(kettleJob); kettleJobMapper.updateKettleJob(kettleJob);
title.append("成功!");
msg.append("成功!");
} catch (Exception e) { } catch (Exception e) {
kettleJob.setJobStatus("异常"); kettleJob.setJobStatus("异常");
kettleJobMapper.updateKettleJob(kettleJob); kettleJobMapper.updateKettleJob(kettleJob);
title.append("异常!");
msg.append("异常!");
e.printStackTrace(); e.printStackTrace();
} }
List<String> userIdList = new ArrayList<>();
userIdList.add(userId);
Map<String, String> resultMap = wechatApiService.SendTextCardMessageToWechatUser(userIdList,title.toString(),msg.toString(),"http://report.bpsemi.cn:8081/it_war");
log.info("job微信消息发送结果"+resultMap);
} }
@Override @Override
public List<String> queryJobLog(KettleJob kettleJob) { public List<String> queryJobLog(KettleJob kettleJob) {

View File

@ -2,6 +2,7 @@ package com.ruoyi.kettle.service.impl;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import com.ruoyi.common.core.domain.AjaxResult; import com.ruoyi.common.core.domain.AjaxResult;
@ -13,6 +14,7 @@ import com.ruoyi.kettle.mapper.XRepositoryMapper;
import com.ruoyi.kettle.service.IKettleTransService; import com.ruoyi.kettle.service.IKettleTransService;
import com.ruoyi.kettle.tools.KettleUtil; import com.ruoyi.kettle.tools.KettleUtil;
import com.ruoyi.kettle.tools.RedisStreamUtil; import com.ruoyi.kettle.tools.RedisStreamUtil;
import com.ruoyi.system.service.IWechatApiService;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -42,7 +44,8 @@ public class KettleTransServiceImpl implements IKettleTransService
@Autowired @Autowired
private RedisStreamUtil redisStreamUtil; private RedisStreamUtil redisStreamUtil;
@Autowired
IWechatApiService wechatApiService;
/** /**
* 查询转换 * 查询转换
* *
@ -186,7 +189,7 @@ public class KettleTransServiceImpl implements IKettleTransService
} }
@Override @Override
public void runTransRightNow(Long id) { public void runTransRightNow(Long id, String userId) {
KettleTrans kettleTrans = kettleTransMapper.selectKettleTransById(id); KettleTrans kettleTrans = kettleTransMapper.selectKettleTransById(id);
if(kettleTrans ==null || kettleTrans.getId()==null){ if(kettleTrans ==null || kettleTrans.getId()==null){
log.error("转换不存在!:"+id); log.error("转换不存在!:"+id);
@ -200,21 +203,27 @@ public class KettleTransServiceImpl implements IKettleTransService
//更新状态未运行中 //更新状态未运行中
kettleTrans.setTransStatus("运行中"); kettleTrans.setTransStatus("运行中");
kettleTransMapper.updateKettleTrans(kettleTrans); kettleTransMapper.updateKettleTrans(kettleTrans);
String path = kettleTrans.getTransPath(); StringBuilder title = new StringBuilder(kettleTrans.getTransName()).append(".ktr 执行结果:");
StringBuilder msg = new StringBuilder(kettleTrans.getTransName()).append(".ktr 执行结果:");
try { try {
kettleUtil.KETTLE_LOG_LEVEL=kettleTrans.getTransLogLevel(); kettleUtil.callTrans(kettleTrans,repository,null,null);
kettleUtil.KETTLE_REPO_ID=String.valueOf(kettleTrans.getTransRepositoryId());
kettleUtil.KETTLE_REPO_NAME=repository.getRepoName();
kettleUtil.KETTLE_REPO_PATH=repository.getBaseDir();
kettleUtil.callTrans(path,kettleTrans.getTransName(),null,null);
kettleTrans.setTransStatus("成功"); kettleTrans.setTransStatus("成功");
kettleTrans.setLastSucceedTime(DateUtils.getNowDate()); kettleTrans.setLastSucceedTime(DateUtils.getNowDate());
kettleTransMapper.updateKettleTrans(kettleTrans); kettleTransMapper.updateKettleTrans(kettleTrans);
title.append("成功!");
msg.append("成功!");
} catch (Exception e) { } catch (Exception e) {
kettleTrans.setTransStatus("异常"); kettleTrans.setTransStatus("异常");
kettleTransMapper.updateKettleTrans(kettleTrans); kettleTransMapper.updateKettleTrans(kettleTrans);
title.append("异常!");
msg.append("异常!");
log.error(id+"的trans执行失败:"+e.getMessage()); log.error(id+"的trans执行失败:"+e.getMessage());
} }
List<String> userIdList = new ArrayList<>();
userIdList.add(userId);
Map<String, String> resultMap = wechatApiService.SendTextCardMessageToWechatUser(userIdList,title.toString(),msg.toString(),"http://report.bpsemi.cn:8081/it_war");
log.info("trans微信消息发送结果"+resultMap);
} }
/** /**
* @Description:查询抓换执行日志 * @Description:查询抓换执行日志

View File

@ -1,7 +1,10 @@
package com.ruoyi.kettle.tools; package com.ruoyi.kettle.tools;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.CommandLineRunner; import org.springframework.boot.CommandLineRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@Component @Component
@ -13,6 +16,12 @@ public class CommandLineRunnerImpl implements CommandLineRunner {
@Override @Override
public void run(String... args) throws Exception { public void run(String... args) throws Exception {
redisStreamUtil.readGroup();
new Thread(){
public void run() {
redisStreamUtil.readGroup();
}
}.start();
} }
} }

View File

@ -1,6 +1,9 @@
package com.ruoyi.kettle.tools; package com.ruoyi.kettle.tools;
import com.ruoyi.common.config.datasource.DynamicDataSourceContextHolder; import com.ruoyi.common.config.datasource.DynamicDataSourceContextHolder;
import com.ruoyi.kettle.domain.KettleJob;
import com.ruoyi.kettle.domain.KettleTrans;
import com.ruoyi.kettle.domain.XRepository;
import org.pentaho.di.core.KettleEnvironment; import org.pentaho.di.core.KettleEnvironment;
import org.pentaho.di.core.database.DatabaseMeta; import org.pentaho.di.core.database.DatabaseMeta;
import org.pentaho.di.core.exception.KettleException; import org.pentaho.di.core.exception.KettleException;
@ -29,27 +32,21 @@ public class KettleUtil {
public static final Logger log = LoggerFactory.getLogger(KettleUtil.class); public static final Logger log = LoggerFactory.getLogger(KettleUtil.class);
public String KETTLE_LOG_LEVEL = "basic";
public String KETTLE_REPO_ID = "2";
public String KETTLE_REPO_NAME = "koneTest";
public String KETTLE_REPO_DESC = "DESC";
public String KETTLE_REPO_PATH = "D:\\etl";
/** /**
* 执行文件资源库转换 * 执行文件资源库转换
* @param transPath 转换路径相对于资源库
* @param transName 转换名称不需要后缀
* @param namedParams 命名参数 * @param namedParams 命名参数
* @param clParams 命令行参数 * @param clParams 命令行参数
*/ */
public void callTrans(String transPath, String transName, Map<String,String> namedParams, String[] clParams) throws Exception { public void callTrans(KettleTrans kettleTrans, XRepository xrepository, Map<String, String> namedParams, String[] clParams) throws Exception {
KettleEnv.init(); KettleEnv.init();
DatabaseMeta databaseMeta=new DatabaseMeta("kettle_trans_log", "mysql", "Native(JDBC)", DatabaseMeta databaseMeta=new DatabaseMeta("kettle_trans_log", "mysql", "Native(JDBC)",
"192.168.2.18","bps?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8", "3306", "root", "abc.123"); "192.168.2.18","bps?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8", "3306", "root", "abc.123");
String msg; String msg;
KettleFileRepository repo = this.fileRepositoryCon(); KettleFileRepository repo = this.fileRepositoryCon(xrepository);
TransMeta transMeta = this.loadTrans(repo, transPath, transName); TransMeta transMeta = this.loadTrans(repo, kettleTrans.getTransPath(), kettleTrans.getTransName());
transMeta.addDatabase(databaseMeta); transMeta.addDatabase(databaseMeta);
VariableSpace space=new Variables(); VariableSpace space=new Variables();
@ -71,7 +68,7 @@ public class KettleUtil {
trans.setParameterValue(entry.getKey(), entry.getValue()); trans.setParameterValue(entry.getKey(), entry.getValue());
} }
} }
trans.setLogLevel(this.getLogerLevel(KETTLE_LOG_LEVEL)); trans.setLogLevel(this.getLogerLevel(kettleTrans.getTransLogLevel()));
//执行 //执行
trans.execute(clParams); trans.execute(clParams);
trans.waitUntilFinished(); trans.waitUntilFinished();
@ -94,16 +91,15 @@ public class KettleUtil {
/** /**
* 执行文件资源库job * 执行文件资源库job
* @param jobName
* @throws Exception * @throws Exception
*/ */
public boolean callJob(String jobPath, String jobName, Map<String,String> variables, String[] clParams) throws Exception { public boolean callJob(KettleJob kettleJob,XRepository xRepository, Map<String,String> variables, String[] clParams) throws Exception {
KettleEnv.init(); KettleEnv.init();
String msg; String msg;
DatabaseMeta databaseMeta=new DatabaseMeta("kettle_job_log", "mysql", "Native(JDBC)", DatabaseMeta databaseMeta=new DatabaseMeta("kettle_job_log", "mysql", "Native(JDBC)",
"192.168.2.18","bps?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8", "3306", "root", "abc.123"); "192.168.2.18","bps?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8", "3306", "root", "abc.123");
KettleFileRepository repo = this.fileRepositoryCon(); KettleFileRepository repo = this.fileRepositoryCon(xRepository);
JobMeta jobMeta = this.loadJob(repo, jobPath, jobName); JobMeta jobMeta = this.loadJob(repo, kettleJob.getJobPath(), kettleJob.getJobName());
jobMeta.addDatabase(databaseMeta); jobMeta.addDatabase(databaseMeta);
VariableSpace space=new Variables(); VariableSpace space=new Variables();
space.setVariable("test","fromDbName"); space.setVariable("test","fromDbName");
@ -121,7 +117,7 @@ public class KettleUtil {
} }
} }
//设置日志级别 //设置日志级别
job.setLogLevel(this.getLogerLevel(KETTLE_LOG_LEVEL)); job.setLogLevel(this.getLogerLevel(kettleJob.getJobLogLevel()));
job.setArguments(clParams); job.setArguments(clParams);
job.start(); job.start();
job.waitUntilFinished(); job.waitUntilFinished();
@ -208,9 +204,9 @@ public class KettleUtil {
* @param jobName * @param jobName
* @throws Exception * @throws Exception
*/ */
public void callNativeJob(String jobName) throws Exception { /* public void callNativeJob(String jobName) throws Exception {
// 初始化 // 初始化
/*KettleEnvironment.init();*/ *//*KettleEnvironment.init();*//*
JobMeta jobMeta = new JobMeta(jobName, null); JobMeta jobMeta = new JobMeta(jobName, null);
Job job = new Job(null, jobMeta); Job job = new Job(null, jobMeta);
@ -223,7 +219,7 @@ public class KettleUtil {
if (job.getErrors() > 0) { if (job.getErrors() > 0) {
throw new Exception("There are errors during job exception!(执行job发生异常)"); throw new Exception("There are errors during job exception!(执行job发生异常)");
} }
} }*/
/** /**
* 取得kettle的日志级别 * 取得kettle的日志级别
@ -253,14 +249,14 @@ public class KettleUtil {
/** /**
* 配置kettle文件库资源库环境 * 配置kettle文件库资源库环境
**/ **/
public KettleFileRepository fileRepositoryCon() throws KettleException { public KettleFileRepository fileRepositoryCon(XRepository xRepository) throws KettleException {
String msg; String msg;
//初始化 //初始化
/*EnvUtil.environmentInit(); /*EnvUtil.environmentInit();
KettleEnvironment.init();*/ KettleEnvironment.init();*/
//资源库元对象 //资源库元对象
KettleFileRepositoryMeta fileRepositoryMeta = new KettleFileRepositoryMeta(this.KETTLE_REPO_ID, this.KETTLE_REPO_NAME, this.KETTLE_REPO_DESC, this.KETTLE_REPO_PATH); KettleFileRepositoryMeta fileRepositoryMeta = new KettleFileRepositoryMeta(String.valueOf(xRepository.getId()), xRepository.getRepoName(), xRepository.getRemark(), xRepository.getBaseDir());
// 文件形式的资源库 // 文件形式的资源库
KettleFileRepository repo = new KettleFileRepository(); KettleFileRepository repo = new KettleFileRepository();
repo.init(fileRepositoryMeta); repo.init(fileRepositoryMeta);
@ -268,11 +264,11 @@ public class KettleUtil {
repo.connect("", "");//默认的连接资源库的用户名和密码 repo.connect("", "");//默认的连接资源库的用户名和密码
if (repo.isConnected()) { if (repo.isConnected()) {
msg = "kettle文件库资源库【" + KETTLE_REPO_PATH + "】连接成功"; msg = "kettle文件库资源库【" + xRepository.getBaseDir() + "】连接成功";
log.info(msg); log.info(msg);
return repo; return repo;
} else { } else {
msg = "kettle文件库资源库【" + KETTLE_REPO_PATH + "】连接失败"; msg = "kettle文件库资源库【" + xRepository.getBaseDir() + "】连接失败";
log.error(msg); log.error(msg);
throw new KettleException(msg); throw new KettleException(msg);
} }

View File

@ -1,18 +1,23 @@
package com.ruoyi.kettle.tools; package com.ruoyi.kettle.tools;
import com.ruoyi.common.utils.security.PermissionUtils;
import com.ruoyi.kettle.domain.KettleJob; import com.ruoyi.kettle.domain.KettleJob;
import com.ruoyi.kettle.domain.KettleTrans; import com.ruoyi.kettle.domain.KettleTrans;
import com.ruoyi.kettle.service.IKettleJobService; import com.ruoyi.kettle.service.IKettleJobService;
import com.ruoyi.kettle.service.IKettleTransService; import com.ruoyi.kettle.service.IKettleTransService;
import com.ruoyi.system.service.ISysConfigService;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import redis.clients.jedis.Jedis; import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPool;
import redis.clients.jedis.StreamEntry; import redis.clients.jedis.StreamEntry;
import redis.clients.jedis.StreamEntryID; import redis.clients.jedis.StreamEntryID;
import java.net.InetAddress;
import java.time.ZonedDateTime; import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatter;
import java.util.ArrayList; import java.util.ArrayList;
@ -20,15 +25,26 @@ import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
/**
* @Description:
* 现在redis安装后新建一个stream: XADD koneStream * user kang msg Hello
*再把他读掉:XREAD streams koneStream 0
*最后创建一个这个steam的消费者:XGROUP CREATE koneStream koneGroup 0
* @Author: Kone.wang
* @Date: 2021/8/10 13:19
**/
@Component @Component
public class RedisStreamUtil { public class RedisStreamUtil {
private static final Logger log = LoggerFactory.getLogger(RedisStreamUtil.class); private static final Logger log = LoggerFactory.getLogger(RedisStreamUtil.class);
String koneConsumer="koneConsumer";
String koneStream = "koneStream2"; String koneConsumer="bpsemi_consumer";
//
String koneGroup= "koneGroup2"; // @Value("${stream.key}")
// String koneStream ;
// @Value("${stream.group}")
// String koneGroup ;
@Value("${spring.redis.timeout}")
Long waitTIme;
@Autowired @Autowired
private JedisPool jedisPool; private JedisPool jedisPool;
@ -38,7 +54,8 @@ public class RedisStreamUtil {
@Autowired @Autowired
private IKettleJobService jobService; private IKettleJobService jobService;
@Autowired
private ISysConfigService configService;
/** /**
* @Description: 往队列中插入trans * @Description: 往队列中插入trans
* @Author: Kone.wang * @Author: Kone.wang
@ -47,15 +64,40 @@ public class RedisStreamUtil {
* @return: com.ruoyi.common.core.domain.AjaxResult * @return: com.ruoyi.common.core.domain.AjaxResult
**/ **/
public void addKettleTrans(KettleTrans trans) { public void addKettleTrans(KettleTrans trans) {
//获取主机ip
String localAddr = configService.selectConfigByKey("sys.local.addr");
localAddr =localAddr!=null?localAddr:"192.168.2.84";
String koneStream="bpsemi_test";
try{
InetAddress addr = InetAddress.getLocalHost();
String address = addr.getHostAddress();
if(address.equals(localAddr)){
koneStream="bpsemi";
}
}catch (Exception e){
log.error("addKettleTrans()获取主机ip异常:"+e);
}
String transName=trans.getTransName(); String transName=trans.getTransName();
Long trandId = trans.getId(); Long trandId = trans.getId();
//这里可以添加更多的属性 //定时任务跑的时候这个会报错,所以捕获一下然后设置默认的
Map map = new HashMap(); String userId ="";
map.put("trans_"+trandId, transName); try{
Jedis jedis = jedisPool.getResource(); userId = String.valueOf(PermissionUtils.getPrincipalProperty("userId"));
}catch (Exception e){
log.warn("定时任务执行的,默认发送给天宁吧408");
userId="454";
}
log.info(userId+"开始增加:trans_"+trandId+"@"+userId+":::"+transName);
//这里可以添加更多的属性
Map<String,String> map = new HashMap<String,String>();
map.put("trans_"+trandId+"@"+userId, transName);
Jedis jedis = jedisPool.getResource();
StreamEntryID id =jedis.xadd(koneStream, new StreamEntryID().NEW_ENTRY, map); StreamEntryID id =jedis.xadd(koneStream, new StreamEntryID().NEW_ENTRY, map);
log.info(userId+"成功增加:trans_"+trandId+"@"+userId+":::"+transName+"[StreamEntryID:"+id+"]");
} }
/** /**
@ -66,14 +108,37 @@ public class RedisStreamUtil {
* @return: com.ruoyi.common.core.domain.AjaxResult * @return: com.ruoyi.common.core.domain.AjaxResult
**/ **/
public void addKettleJob(KettleJob job) { public void addKettleJob(KettleJob job) {
//获取主机ip
String localAddr = configService.selectConfigByKey("sys.local.addr");
localAddr =localAddr!=null?localAddr:"192.168.2.84";
String koneStream="bpsemi_test";
try{
InetAddress addr = InetAddress.getLocalHost();
String address = addr.getHostAddress();
if(address.equals(localAddr)){
koneStream="bpsemi";
}
}catch (Exception e){
log.error("addKettleJob()获取主机ip异常:"+e);
}
String jobName=job.getJobName(); String jobName=job.getJobName();
Long jobId = job.getId(); Long jobId = job.getId();
String userId ="";
try{
userId = String.valueOf(PermissionUtils.getPrincipalProperty("userId"));
}catch (Exception e){
log.warn("定时任务执行的,默认发送给天宁吧408");
userId="454";
}
log.info(userId+"开始增加:job_"+jobId+"@"+userId+":::"+jobName);
//这里可以添加更多的属性 //这里可以添加更多的属性
Map map = new HashMap(); Map<String,String> map = new HashMap<String,String>();
map.put("job_"+jobId, jobName); map.put("job_"+jobId+"@"+userId, jobName);
Jedis jedis = jedisPool.getResource(); Jedis jedis = jedisPool.getResource();
jedis.xadd(koneStream, new StreamEntryID().NEW_ENTRY, map); StreamEntryID id = jedis.xadd(koneStream, new StreamEntryID().NEW_ENTRY, map);
log.info(userId+"成功增加:job_"+jobId+"@"+userId+":::"+jobName+"[StreamEntryID:"+id+"]");
} }
/** /**
* @Description: 循环重队列中读消息 * @Description: 循环重队列中读消息
@ -82,27 +147,42 @@ public class RedisStreamUtil {
* @return: void * @return: void
**/ **/
public void readGroup() { public void readGroup() {
//获取主机ip
String localAddr = configService.selectConfigByKey("sys.local.addr");
localAddr =localAddr!=null?localAddr:"192.168.2.84";
String koneStream="bpsemi_test";
String koneGroup="bpsemi_group_test";
String koneConsumer="bpsemi_consumer";
try{
InetAddress addr = InetAddress.getLocalHost();
String address = addr.getHostAddress();
if(address.equals(localAddr)){
koneStream="bpsemi";
koneGroup="bpsemi_group";
}
}catch (Exception e){
log.error("addKettleJob()获取主机ip异常:"+e);
}
while (true){ while (true){
Jedis jedis = jedisPool.getResource(); Jedis jedis = jedisPool.getResource();
Map<String,StreamEntryID> t = new HashMap(); Map<String,StreamEntryID> t = new HashMap<String,StreamEntryID>();
List<Map.Entry<String, StreamEntryID>> list = new ArrayList<>(); List<Map.Entry<String, List<StreamEntry>>> list = new ArrayList<Map.Entry<String, List<StreamEntry>>>();
t.put(koneStream, null);//null 则为 > 重头读起也可以为$接受新消息还可以是上一次未读完的消息id t.put(koneStream, null);//null 则为 > 重头读起也可以为$接受新消息还可以是上一次未读完的消息id
Map.Entry e = null; Map.Entry e = null;
for(Map.Entry c:t.entrySet()){ for(Map.Entry c:t.entrySet()){
e=c; e=c;
} }
//noAck为false的话需要手动acktrue则自动ack. commsumer新建的方式为xreadgroup //noAck为false的话需要手动acktrue则自动ack. commsumer新建的方式为xreadgroup
System.out.println("开始:"+ ZonedDateTime.now().format(DateTimeFormatter.ofPattern("HH:mm:ss ms"))); log.info("开始读消息");
try{ try{
list = jedis.xreadGroup(koneGroup, koneConsumer, 1, 3600000, false, e); list = jedis.xreadGroup(koneGroup, koneConsumer, 1, 30000, false, e);
}catch (Exception ex){ }catch (Exception ex){
log.error("超时了!!!!!!!!"); log.error("超时了!!!!!!!!");
} }
System.out.println("结束:"+ ZonedDateTime.now().format(DateTimeFormatter.ofPattern("HH:mm:ss"))); log.info("读消息结束!");
if(list ==null){ if(list ==null){
log.error("list为空"); log.error("读到的list为空");
}else{ }else{
for (Map.Entry m : list) { for (Map.Entry m : list) {
if (m.getValue() instanceof ArrayList) { if (m.getValue() instanceof ArrayList) {
@ -113,20 +193,21 @@ public class RedisStreamUtil {
if(entry.getKey() != null){ if(entry.getKey() != null){
String key = String.valueOf(entry.getKey()); String key = String.valueOf(entry.getKey());
String value =String.valueOf(entry.getValue()); String value =String.valueOf(entry.getValue());
String id=key.substring(key.indexOf("_")+1); String id=key.substring(key.indexOf("_")+1,key.indexOf("@"));
String userId=key.substring(key.indexOf("@")+1);
if(key.startsWith("trans_")){ if(key.startsWith("trans_")){
log.info(value+"的trans:开始执行"); log.info(value+"的trans:开始执行");
transService.runTransRightNow(Long.valueOf(id)); transService.runTransRightNow(Long.valueOf(id),userId);
log.info(value+"的trans:结束执行"); log.info(value+"的trans:结束执行");
}else if(key.startsWith("job_")){ }else if(key.startsWith("job_")){
log.info(value+"的job:开始执行"); log.info(value+"的job:开始执行");
jobService.runJobRightNow(Long.valueOf(id)); jobService.runJobRightNow(Long.valueOf(id),userId);
log.info(value+"的job:结束执行"); log.info(value+"的job:结束执行");
} }
} }
} }
jedis.xack(koneStream, koneGroup, l.get(0).getID()); long id = jedis.xack(koneStream, koneGroup, l.get(0).getID());
System.out.println("消息消费成功"); log.info("消息消费成功:"+id);
} }
} }
} }

View File

@ -1,4 +1,4 @@
<?xml version="1.0" encoding="UTF-8"?> <?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
@ -30,7 +30,7 @@
<jna.version>5.8.0</jna.version> <jna.version>5.8.0</jna.version>
<commons.io.version>2.11.0</commons.io.version> <commons.io.version>2.11.0</commons.io.version>
<commons.fileupload.version>1.4</commons.fileupload.version> <commons.fileupload.version>1.4</commons.fileupload.version>
<poi.version>3.17</poi.version> <poi.version>4.1.2</poi.version>
<velocity.version>1.7</velocity.version> <velocity.version>1.7</velocity.version>
</properties> </properties>

View File

@ -12,7 +12,6 @@ ruoyi:
profile: C:/bps-it/uploadPath profile: C:/bps-it/uploadPath
# 获取ip地址开关 # 获取ip地址开关
addressEnabled: false addressEnabled: false
# 开发环境配置 # 开发环境配置
server: server:
# 服务器的HTTP端口默认为80 # 服务器的HTTP端口默认为80
@ -75,24 +74,27 @@ spring:
base: OU=bp,DC=bpsemi,DC=com base: OU=bp,DC=bpsemi,DC=com
username: administrator@bpsemi.com username: administrator@bpsemi.com
password: Bps@2831! password: Bps@2831!
redis: redis:
host: 127.0.0.1 host: 192.168.2.88
port: 6379 port: 6379
password: "2129" password: "bpsemi2021"
timeout: 3600000 timeout: 30000
database: 2 database: 2
lettuce: lettuce:
pool: pool:
max-active: 100 max-active: 100
max-idle: 10 max-idle: 10
min-idle: 0 min-idle: 0
max-wait: 3600000 max-wait: 30000
cluster: cluster:
refresh: refresh:
adaptive: true adaptive: true
#20秒自动刷新一次 #20秒自动刷新一次
period: 20 period: 20
# MyBatis # MyBatis
mybatis: mybatis:
# 搜索指定包别名 # 搜索指定包别名
typeAliasesPackage: com.ruoyi.**.domain typeAliasesPackage: com.ruoyi.**.domain
@ -184,3 +186,5 @@ express:
#登录云平台 https://cloud.kuaidi100.com/buyer/user/info #登录云平台 https://cloud.kuaidi100.com/buyer/user/info
secret_key: secret_key:
secret_secret: secret_secret:

View File

@ -1102,7 +1102,7 @@ public class ExcelUtil<T>
Cell cell = row.getCell(column); Cell cell = row.getCell(column);
if (StringUtils.isNotNull(cell)) if (StringUtils.isNotNull(cell))
{ {
if (cell.getCellTypeEnum() == CellType.NUMERIC || cell.getCellTypeEnum() == CellType.FORMULA) if (cell.getCellType() == CellType.NUMERIC || cell.getCellType() == CellType.FORMULA)
{ {
val = cell.getNumericCellValue(); val = cell.getNumericCellValue();
if (DateUtil.isCellDateFormatted(cell)) if (DateUtil.isCellDateFormatted(cell))
@ -1121,15 +1121,15 @@ public class ExcelUtil<T>
} }
} }
} }
else if (cell.getCellTypeEnum() == CellType.STRING) else if (cell.getCellType() == CellType.STRING)
{ {
val = cell.getStringCellValue(); val = cell.getStringCellValue();
} }
else if (cell.getCellTypeEnum() == CellType.BOOLEAN) else if (cell.getCellType() == CellType.BOOLEAN)
{ {
val = cell.getBooleanCellValue(); val = cell.getBooleanCellValue();
} }
else if (cell.getCellTypeEnum() == CellType.ERROR) else if (cell.getCellType() == CellType.ERROR)
{ {
val = cell.getErrorCellValue(); val = cell.getErrorCellValue();
} }
@ -1158,7 +1158,7 @@ public class ExcelUtil<T>
for (int i = row.getFirstCellNum(); i < row.getLastCellNum(); i++) for (int i = row.getFirstCellNum(); i < row.getLastCellNum(); i++)
{ {
Cell cell = row.getCell(i); Cell cell = row.getCell(i);
if (cell != null && cell.getCellTypeEnum() != CellType.BLANK) if (cell != null && cell.getCellType() != CellType.BLANK)
{ {
return false; return false;
} }