1.java.util.concurrent & ThreadPoolExecutor 相关概念 及参考
http://www.cnblogs.com/sarafill/archive/2011/05/18/2049461.html
http://heipark.iteye.com/blog/1156011
http://dongxuan.iteye.com/blog/901689
http://blog.csdn.net/wangwenhui11/article/details/6760474
2.
public final class ThreadExecutorFactory { static TraceLogger logger = new TraceLogger(ThreadExecutorFactory.class); //参看user-config.xml private static int corePoolSize ; private static int maximumPoolSize ; private static long keepAliveTime ; private static TimeUnit unit = TimeUnit.SECONDS; //线程池维护线程所允许的空闲时间的单位 private static BlockingQueue<Runnable> blockingQueue = null; static{ init(); } private static void init() { //读取配置,也可以写死 corePoolSize = Integer.valueOf(ConfigurationUtil.getUserConfigSingleValue("thread_pool", "pool_config", "core_pool_size")); maximumPoolSize = Integer.valueOf(ConfigurationUtil.getUserConfigSingleValue("thread_pool", "pool_config", "maximum_pool_size")); keepAliveTime = Long.valueOf(ConfigurationUtil.getUserConfigSingleValue("thread_pool", "pool_config", "keep_alive_time")); String queueSize = ConfigurationUtil.getUserConfigSingleValue("thread_pool", "pool_config", "queueSize"); if (!StringUtil.isBlank(queueSize)) { blockingQueue = new ArrayBlockingQueue<Runnable>(Integer.valueOf(queueSize)); } if (FactoryHolder.threadPool == null) { FactoryHolder.threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime ,unit ,blockingQueue ,new ThreadPoolExecutor.AbortPolicy()); } } private static class FactoryHolder{ static ThreadPoolExecutor threadPool = null; } public static void execute(Runnable runnable){ FactoryHolder.threadPool.execute(runnable); logger.info(runnable + " 加入线程池!"); System.out.println(runnable + " 加入线程池!"); } }
简单监听器的运用,首先产生线程,将要监听的对象放入map中, 循环Map,当map中的元素都移出后,结束线程, 再次将要监听的对象放入map时,又会产生新的线程监听,如果在此线程没结束时 加入一个新的监听对象到map 不会启动新线程。
public class BillTaskStatusListener implements Runnable ,Serializable{ private static final long serialVersionUID = -6416044623618376341L; static TraceLogger logger = new TraceLogger(BillTaskStatusListener.class); private static final String RUN_STATUS_SUCCESS = "Y"; //成功 private static final String RUN_STATUS_FAILED = "N"; //失败 private static final String RUN_STATUS_RUNNING = "R"; //执行中 //线程安全集合,用于存储待查询结果的任务实例Id private static Map<String,DataObject> billTaskMaps = new ConcurrentHashMap<String, DataObject>(); /** * 同步添加到监听循环 * @param taskInstId */ private synchronized static void addToListener(Long taskInstId,DataObject paramObj){ if (billTaskMaps.isEmpty()) { billTaskMaps.put(String.valueOf(taskInstId), paramObj); ThreadExecutorFactory.execute(new BillTaskStatusListener()); }else { billTaskMaps.put(String.valueOf(taskInstId), paramObj); } logger.info("任务实例:" + paramObj.getString("taskInstId") + " 加入<作业-任务结果监听>"); System.out.println("任务实例:" + paramObj.getString("taskInstId") + " 加入<作业-任务结果监听>"); } /** * */ public void run() { while (!billTaskMaps.isEmpty()) { try { //获取所有监听的任务实例对象 DataObject[] taskInstancesArray = taskInstanceService.getTaskExcuteResultByTaskInstanceId(getInstanceIds(billTaskMaps)); if (null != taskInstancesArray && taskInstancesArray.length>0) { for (DataObject object : taskInstancesArray) { DataObject paramObject = billTaskMaps.get(object.getString("taskInstanceId")); //查询对应作业实例 DataObject tmpTaskDO = queryBillItemByItemId(paramObject.getString("billItemId")); if (null ==tmpTaskDO) { continue; } //任务正常完成 if (6 == object.getInt("exeState")) { //将任务结果插入作业表 updateFillinContent(object , billTaskMaps.get(object.getString("taskInstanceId")),tmpTaskDO , RUN_STATUS_SUCCESS); //移除当前监听的任务实例 billTaskMaps.remove(object.getString("taskInstanceId")); logger.info("作业-任务:" + object.getLong("taskInstanceId") + " 移出<作业-任务结果监听循环>"); System.out.println("作业-任务:" + object.getLong("taskInstanceId") + " 移出<作业-任务结果监听循环>"); //任务未正常结束 重启 }else if (4 <= object.getInt("exeState") ) { //任务终止 }else if ("1".equals(paramObject.get("exceptionProcessMode"))) { //将任务结果插入作业表 updateFillinContent(object , billTaskMaps.get(object.getString("taskInstanceId")),tmpTaskDO,RUN_STATUS_FAILED); billTaskMaps.remove(object.getString("taskInstanceId")); logger.info("作业-任务:" + object.getLong("taskInstanceId") + " 移出<作业-任务结果监听循环>"); System.out.println("作业-任务:" + object.getLong("taskInstanceId") + " 移出<作业-任务结果监听循环>"); } } } Thread.sleep(5000); //System.out.println("监听作业-任务 sleep 5s!"); } catch (Exception e) { logger.info("监听作业-任务 执行结果出错!"); billTaskMaps.clear(); manager.rollback(); e.printStackTrace(); } } } }
在其他地方只需调用BillTaskStatusListener的静态方法addToListener(Long taskInstId,DataObject paramObj),就可以了
注意:改类不能够拿过去直接用,需根据自己的时间情况修改,像读配置文件,读监听状态等等。。。
相关推荐
JDK1[1].5中的线程池(ThreadPoolExecutor)使用简介
JDK1.5中的线程池(ThreadPoolExecutor)使用简介
而线程池不允许使用Executors去创建,而要通过ThreadPoolExecutor方式,这一方面是由于jdk中Executor框架虽然提供了如newFixedThreadPool()、newSingleThreadExecutor()、newCachedThreadPool()等创建线程池的方法,...
本文档对Java中的线程和线程池进行了简单介绍。首先,阐述了为什么需要线程、...然后,介绍了为什么需要线程池,JDK自带的线程池实现方式ThreadPoolExecutor的使用及其原理,最后强调ThreadPoolExecutor应用的注意点。
JDK1.5中的线程池(java.util.concurrent.ThreadPoolExecutor)使用
本案例详细介绍了JDK自带线程池,与spring的线程池相比,其更好,希望对大家有帮助
JDK1.5中的线程池(java.util.concurrent.ThreadPoolExecutor)使用简介
corePoolSize: 线程池维护线程的最少数量 maximumPoolSize:线程池维护线程的最大数量 keepAliveTime: 线程池维护线程所允许的空闲时间 unit: 线程池维护线程所允许的空闲时间的单位 workQueue: 线程池所使用的...
主要介绍了Java8并行流中自定义线程池操作,结合实例形式分析了并行流的相关概念、定义及自定义线程池的相关操作技巧,需要的朋友可以参考下
介绍一个通用多线程服务模块。是利用jdk线程池,多线程并行处理多任务,以提高执行效率。
java jdk自带的线程并发框架使用帮助文档
JDK7多线程部分类(接口)关系图,根据官网得出
利用jdk自带算法实现的AES加解密工具类及Base64编解码工具类、 文件操作工具类、aes扩展无限制权限策略文件等。 在jdk1.7环境亲测通过。
对java多线程进行了全面的总结,包括了JDK 5的新特性。
目标:Java中多线程技术是一个难点,但是也是一个核心技术。因为Java本身就是一个多线程语言。本人目前在给46班讲授Swing的网络编程--使用Swing来模拟真实的QQ实时聊天软件。因为涉及到Socket编程,所以一定会使用多...
Reference: 《创建Java线程池》[1],《Java线程:新特征-线程池》[2], 《Java线程池学习》[3],《线程池ThreadPoolExecutor使用简介》[4],《Java5中的线程池实例讲解》[5],《ThreadPoolExecutor使用和思考》[6] ...
在多线程大师Doug Lea的贡献下,在JDK1.5中加入了许多对并发特性的支持,例如:线程池。.......................................JAVA线程、线程池资料----下载不扣分,回帖加1分,欢迎下载,童叟无欺JAVA线程、...
javaJDK8javaJDK8
在最近做的一个项目中,需要大量的使用到多线程和线程池,下面就java自带的线程池和大家一起分享
JAVA线程总结,包含线程池,显示使用线程实现异步编程,基于JDK中的Future实现异步编程,JDK中的FutureTask等