增加关闭redis池资源

This commit is contained in:
18326186802 2021-08-13 09:29:59 +08:00
parent 2da88e0656
commit fdacd1b4ab
1 changed files with 84 additions and 44 deletions

View File

@ -96,8 +96,21 @@ public class RedisStreamUtil {
Map<String,String> map = new HashMap<String,String>();
map.put("trans_"+trandId+"@"+userId, transName);
Jedis jedis = jedisPool.getResource();
StreamEntryID id =jedis.xadd(koneStream, new StreamEntryID().NEW_ENTRY, map);
log.info(userId+"成功增加:trans_"+trandId+"@"+userId+":::"+transName+"[StreamEntryID:"+id+"]");
try{
StreamEntryID id =jedis.xadd(koneStream, new StreamEntryID().NEW_ENTRY, map);
log.info(userId+"成功增加:trans_"+trandId+"@"+userId+":::"+transName+"[StreamEntryID:"+id+"]");
}catch (Exception e){
log.error(userId+"失败增加:trans"+trandId+"@"+userId+":::"+transName+"]");
}finally {
if (jedis != null) {
try {
jedis.close();
} catch (Exception e) {
}
}
}
}
/**
@ -136,9 +149,21 @@ public class RedisStreamUtil {
Map<String,String> map = new HashMap<String,String>();
map.put("job_"+jobId+"@"+userId, jobName);
Jedis jedis = jedisPool.getResource();
try{
StreamEntryID id = jedis.xadd(koneStream, new StreamEntryID().NEW_ENTRY, map);
log.info(userId+"成功增加:job_"+jobId+"@"+userId+":::"+jobName+"[StreamEntryID:"+id+"]");
}catch (Exception e){
log.error(userId+"失败增加:job_"+jobId+"@"+userId+":::"+jobName+"]");
}finally {
if (jedis != null) {
try {
jedis.close();
} catch (Exception e) {
}
}
}
StreamEntryID id = jedis.xadd(koneStream, new StreamEntryID().NEW_ENTRY, map);
log.info(userId+"成功增加:job_"+jobId+"@"+userId+":::"+jobName+"[StreamEntryID:"+id+"]");
}
/**
* @Description: 循环重队列中读消息
@ -164,54 +189,69 @@ public class RedisStreamUtil {
log.error("addKettleJob()获取主机ip异常:"+e);
}
while (true){
Jedis jedis = jedisPool.getResource();
Map<String,StreamEntryID> t = new HashMap<String,StreamEntryID>();
List<Map.Entry<String, List<StreamEntry>>> list = new ArrayList<Map.Entry<String, List<StreamEntry>>>();
t.put(koneStream, null);//null 则为 > 重头读起也可以为$接受新消息还可以是上一次未读完的消息id
Map.Entry e = null;
for(Map.Entry c:t.entrySet()){
e=c;
}
//noAck为false的话需要手动acktrue则自动ack. commsumer新建的方式为xreadgroup
log.info("开始读消息");
try{
list = jedis.xreadGroup(koneGroup, koneConsumer, 1, 30000, false, e);
}catch (Exception ex){
log.error("超时了!!!!!!!!");
}
log.info("读消息结束!");
if(list ==null){
log.error("读到的list为空");
Jedis jedis = jedisPool.getResource();
if(jedis ==null){
return;
}else{
for (Map.Entry m : list) {
if (m.getValue() instanceof ArrayList) {
List<StreamEntry> l = (List) m.getValue();
Map<String, String> result = l.get(0).getFields();
for (Map.Entry entry : result.entrySet()) {
System.out.println(entry.getKey() + "---" + entry.getValue());
if(entry.getKey() != null){
String key = String.valueOf(entry.getKey());
String value =String.valueOf(entry.getValue());
String id=key.substring(key.indexOf("_")+1,key.indexOf("@"));
String userId=key.substring(key.indexOf("@")+1);
if(key.startsWith("trans_")){
log.info(value+"的trans:开始执行");
transService.runTransRightNow(Long.valueOf(id),userId);
log.info(value+"的trans:结束执行");
}else if(key.startsWith("job_")){
log.info(value+"的job:开始执行");
jobService.runJobRightNow(Long.valueOf(id),userId);
log.info(value+"的job:结束执行");
try{
Map<String,StreamEntryID> t = new HashMap<String,StreamEntryID>();
List<java.util.Map.Entry<java.lang.String,java.util.List<redis.clients.jedis.StreamEntry>>> list = new ArrayList<java.util.Map.Entry<java.lang.String,java.util.List<redis.clients.jedis.StreamEntry>>>();
t.put(koneStream, null);//null 则为 > 重头读起也可以为$接受新消息还可以是上一次未读完的消息id
Map.Entry<java.lang.String,redis.clients.jedis.StreamEntryID> e = null;
for(Map.Entry<java.lang.String,redis.clients.jedis.StreamEntryID> c:t.entrySet()){
e=c;
}
//noAck为false的话需要手动acktrue则自动ack. commsumer新建的方式为xreadgroup
log.info("开始读消息");
try{
list = jedis.xreadGroup(koneGroup, koneConsumer, 1, 30000L, false, e);
}catch (Exception ex){
log.error("超时了!!!!!!!!");
}
log.info("读消息结束!");
if(list ==null){
log.error("读到的list为空");
}else{
for (Map.Entry m : list) {
if (m.getValue() instanceof ArrayList) {
List<StreamEntry> l = (List<StreamEntry>) m.getValue();
Map<String, String> result = l.get(0).getFields();
for (Map.Entry entry : result.entrySet()) {
System.out.println(entry.getKey() + "---" + entry.getValue());
if(entry.getKey() != null){
String key = String.valueOf(entry.getKey());
String value =String.valueOf(entry.getValue());
String id=key.substring(key.indexOf("_")+1,key.indexOf("@"));
String userId=key.substring(key.indexOf("@")+1);
if(key.startsWith("trans_")){
log.info(value+"的trans:开始执行");
transService.runTransRightNow(Long.valueOf(id),userId);
log.info(value+"的trans:结束执行");
}else if(key.startsWith("job_")){
log.info(value+"的job:开始执行");
jobService.runJobRightNow(Long.valueOf(id),userId);
log.info(value+"的job:结束执行");
}
}
}
long id = jedis.xack(koneStream, koneGroup, l.get(0).getID());
log.info("消息消费成功:"+id);
}
}
long id = jedis.xack(koneStream, koneGroup, l.get(0).getID());
log.info("消息消费成功:"+id);
}
}catch (Exception e){
log.error(e.getMessage());
}finally {
if (jedis != null) {
try {
jedis.close();
} catch (Exception e) {
}
}
}
}
}
}