`
juforg
  • 浏览: 44661 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

java jdk 自带多线程 线程池 框架 ThreadPoolExecutor

 
阅读更多

1.java.util.concurrent & ThreadPoolExecutor 相关概念 及参考

  http://www.cnblogs.com/sarafill/archive/2011/05/18/2049461.html

  http://heipark.iteye.com/blog/1156011

  http://dongxuan.iteye.com/blog/901689

  http://blog.csdn.net/wangwenhui11/article/details/6760474

2.

public final class ThreadExecutorFactory {
	
	static TraceLogger logger = new TraceLogger(ThreadExecutorFactory.class);
	//参看user-config.xml
	private static int corePoolSize ;
	private static int maximumPoolSize ;
	private static long keepAliveTime ;
	
	private static TimeUnit unit = TimeUnit.SECONDS; //线程池维护线程所允许的空闲时间的单位

	private static BlockingQueue<Runnable> blockingQueue = null;
	static{
		init();
	}
	private static void init() {
		//读取配置,也可以写死
		corePoolSize = Integer.valueOf(ConfigurationUtil.getUserConfigSingleValue("thread_pool", "pool_config", "core_pool_size"));
		maximumPoolSize = Integer.valueOf(ConfigurationUtil.getUserConfigSingleValue("thread_pool", "pool_config", "maximum_pool_size"));
		keepAliveTime = Long.valueOf(ConfigurationUtil.getUserConfigSingleValue("thread_pool", "pool_config", "keep_alive_time"));
		String queueSize = ConfigurationUtil.getUserConfigSingleValue("thread_pool", "pool_config", "queueSize");
		
		if (!StringUtil.isBlank(queueSize)) {
			blockingQueue = new ArrayBlockingQueue<Runnable>(Integer.valueOf(queueSize));
		}
		if (FactoryHolder.threadPool == null) {
			FactoryHolder.threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime ,unit ,blockingQueue ,new ThreadPoolExecutor.AbortPolicy());
		}
	}
	
	
	private static class FactoryHolder{
		static ThreadPoolExecutor threadPool = null;
	}
	
	public static void execute(Runnable runnable){
		FactoryHolder.threadPool.execute(runnable);
		logger.info(runnable + " 加入线程池!");
		System.out.println(runnable + " 加入线程池!");
	}
}

 简单监听器的运用,首先产生线程,将要监听的对象放入map中, 循环Map,当map中的元素都移出后,结束线程, 再次将要监听的对象放入map时,又会产生新的线程监听,如果在此线程没结束时 加入一个新的监听对象到map 不会启动新线程。

public class BillTaskStatusListener implements Runnable ,Serializable{
	private static final long serialVersionUID = -6416044623618376341L;
	static TraceLogger logger = new TraceLogger(BillTaskStatusListener.class);
	
	private static final String RUN_STATUS_SUCCESS = "Y";	//成功
	private static final String RUN_STATUS_FAILED = "N";	//失败
	private static final String RUN_STATUS_RUNNING = "R"; //执行中
	//线程安全集合,用于存储待查询结果的任务实例Id

	private static Map<String,DataObject> billTaskMaps = new ConcurrentHashMap<String, DataObject>();
		
	/**
	 * 同步添加到监听循环
	 * @param taskInstId
	 */
	private synchronized static void addToListener(Long taskInstId,DataObject paramObj){
		if (billTaskMaps.isEmpty()) {
			billTaskMaps.put(String.valueOf(taskInstId), paramObj);
			ThreadExecutorFactory.execute(new BillTaskStatusListener());
		}else {
			billTaskMaps.put(String.valueOf(taskInstId), paramObj);
		}
		logger.info("任务实例:" + paramObj.getString("taskInstId") + " 加入<作业-任务结果监听>");
		System.out.println("任务实例:" + paramObj.getString("taskInstId") + " 加入<作业-任务结果监听>");
	}
	
	/**
	 * 
	 */
	public void run() {
		
		while (!billTaskMaps.isEmpty()) {
			
			try {
				//获取所有监听的任务实例对象
				DataObject[] taskInstancesArray = taskInstanceService.getTaskExcuteResultByTaskInstanceId(getInstanceIds(billTaskMaps));
				
				if (null != taskInstancesArray && taskInstancesArray.length>0) {
					
					for (DataObject object : taskInstancesArray) {
						DataObject paramObject = billTaskMaps.get(object.getString("taskInstanceId"));
						
						//查询对应作业实例
						DataObject tmpTaskDO = queryBillItemByItemId(paramObject.getString("billItemId"));
						if (null ==tmpTaskDO) {
							continue;
						}
						//任务正常完成
						if (6 == object.getInt("exeState")) {
							//将任务结果插入作业表
							updateFillinContent(object , billTaskMaps.get(object.getString("taskInstanceId")),tmpTaskDO , RUN_STATUS_SUCCESS);
							
							//移除当前监听的任务实例
							billTaskMaps.remove(object.getString("taskInstanceId"));
							
							
							logger.info("作业-任务:" + object.getLong("taskInstanceId") + " 移出<作业-任务结果监听循环>");
							System.out.println("作业-任务:" + object.getLong("taskInstanceId") + " 移出<作业-任务结果监听循环>");
							//任务未正常结束 重启
						}else if (4 <= object.getInt("exeState") ) {
							
							
								
								//任务终止
						}else if ("1".equals(paramObject.get("exceptionProcessMode"))) {
								//将任务结果插入作业表
								updateFillinContent(object , billTaskMaps.get(object.getString("taskInstanceId")),tmpTaskDO,RUN_STATUS_FAILED);
								billTaskMaps.remove(object.getString("taskInstanceId"));
								logger.info("作业-任务:" + object.getLong("taskInstanceId") + " 移出<作业-任务结果监听循环>");
								System.out.println("作业-任务:" + object.getLong("taskInstanceId") + " 移出<作业-任务结果监听循环>");
							
						}
					}
					
				}
				Thread.sleep(5000);
				//System.out.println("监听作业-任务 sleep 5s!");
			} catch (Exception e) {
				logger.info("监听作业-任务 执行结果出错!");
				billTaskMaps.clear();
				manager.rollback();
				e.printStackTrace();
			}
		}

	}
	
	
}

 

 

在其他地方只需调用BillTaskStatusListener的静态方法addToListener(Long taskInstId,DataObject paramObj),就可以了

注意:改类不能够拿过去直接用,需根据自己的时间情况修改,像读配置文件,读监听状态等等。。。

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics