`

java多线程任务生成

 
阅读更多
protected final void exeTaskList(final List<Task> tasklist) {
        if (tasklist == null || tasklist.size() == 0) {
            log.info(getTaskName() + ":当前没有需要同步 整帐户数据的任务!");
            return;
        }
        final SemaphoreCountDownExecutor executor = new SemaphoreCountDownExecutor(SEMAPHORE_PERMIT, tasklist.size());
        try {
            for (final Task task : tasklist) {
                try {
                    if (taskIsExecuting(task)) {//任务正在执行中
                        log.info("task:正在执行中," + task);
                        executor.countDown();
                        continue;
                    }
                    synchronized (log) {
                        if (taskRuleService.canExec(task, taskService.findExecuting())) {
                            log.info("doTask after canExec" + task);
                            updateTaskExecuting(task);
                            log.info("doTask after updateExecuting" + task);
                            executor.submitTask(new Runnable() {
//                                @Transactional
                                @Override
                                public void run() {
                                    try {
                                        execute(task);//执行任务
                                        updateTaskSuccess(task);//更新任务状态
                                        log.info("doTask after updateSuccess" + task);
                                    } catch (Exception e) {
                                        log.error("executing error!", e);
                                        if (e.getMessage() != null) {
                                            taskService.updateErrorInfo(task.getId(), e.getMessage());
                                        }
                                        Integer count = task.getRetryCount();
                                        if (null != count && count.intValue() >= 5) {
                                            updateTaskFailed(task);
                                        } else {
                                            updateTaskRetry(task);
                                        }
                                    }
                                }
                            });
                        }
                    }
                } catch (Exception e) {//某个线程异常
                    log.error("exeTaskList exception"+e.getMessage(), e);
                }
            }
            log.info("exeTaskList before executor.await");
            executor.await(60 * 60);
        } catch (Exception e) {
            log.error(e.getMessage());
        } finally {
            log.info(getTaskName() + ", end");
        }
    }

public class SemaphoreCountDownExecutor {
	/**
	 * Logger for this class
	 */
	private static final Logger logger = LoggerFactory.getLogger(SemaphoreCountDownExecutor.class);

	private static final ExecutorService executor = 
		new ThreadPoolExecutor(8, 10000,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
	private final Semaphore semaphore;
	private final CountDownLatch countDown;

	/**
	 * @param semaphorePermits :new Semaphore(semaphorePermits);
	 * @param count : new CountDownLatch(count);
	 */
	public SemaphoreCountDownExecutor(final int semaphorePermits,final int count) {
		super();
		this.semaphore = new Semaphore(semaphorePermits);
		this.countDown = new CountDownLatch(count);
	}

	/**
	 * <p>
	 * 堵塞,等待计数器为0 ,然后 关闭线程池
	 * </p>
	 * @author chenlong
	 * @version 2013-8-8
	 */
	public void await(final long timeSeconds) {
		try {
			if (timeSeconds <= 0){
				countDown.await(60 * 60, TimeUnit.SECONDS);
			}else{
				countDown.await(timeSeconds, TimeUnit.SECONDS);
			}
		} catch (InterruptedException e) {
			logger.error(e.getMessage());
		} 
		
	}

	/**
	 * <p>计数器减一</p>
	 */
	public void countDown(){
		countDown.countDown();
	}
	/***
	 * <p>
	 * 提交一个任务到带有信号变量的线程池,执行完后及异常情况释放信号变量且计数器自动减一
	 * </p>
	 * @param command
	 * @throws InterruptedException
	 */
	public void submitTask(final Runnable command) throws InterruptedException {
		semaphore.acquire(); // 接收一个信号量,此处可能抛出InterruptedException
		try {
			executor.execute(new Runnable() {
				@Override
				public void run() {
					// run中是同步执行
					try {
						command.run();
					} finally {
						semaphore.release();// 线程异常时释放信号量
						countDown.countDown();// 执行结束计数器-1
					}
				}
			});
		} catch (Exception e) {// 捕获线程池异常时, 释放信号及计数器-1
			logger.error(e.getMessage());
			countDown.countDown();// 执行结束计数器-1
			semaphore.release();
		}

	}

}
说明:
这里主要是从数据库里查询出当前需要执行的任务,然后判断当前任务是否能被执行,如果能被执行,则submit执行。
问题分析:
待续
 

 

分享到:
评论

相关推荐

    java-多线程下载器(支持断点续传、线程加减)包含源码和可运行jar包 第二版

    2、支持多任务多线程同时下载; 3、每个任务的线程数由用户在新建任务时自定义,缺省为5个线程; 4、任务下载过程中可以点击“线程+”或“线程-”即时增减线程; 5、选择任务,可以在任务信息栏中查看任务下载的信息...

    Java多线程编程的Java中的线程.docx

    程序、进程和线程 程序是一组指令的有序集合,也可以将其通俗地理解为若干行代码。...一个进程可以包含多个线程,每个线程执行自己的任务,同一个进程中的所有线程共享该进程中的资源,如内存空间、文件句柄等。

    SM2 SM3 X.509 Cert 国密 数字签名 算法 国密证书 生成 签发 证书请求 keystore 纯java.zip

    多线程支持: Java内置了对多线程的支持,允许程序同时执行多个任务。这对于开发需要高并发性能的应用程序(如服务器端应用、网络应用等)非常重要。 自动内存管理(垃圾回收): Java具有自动内存管理机制,通过...

    Linux系统设计-Java编写的用于多线程获取腾讯云Linux系统磁盘分区状态的小工具,可以生成Excel或者HTML文件

    Linux系统是一个免费使用和自由传播的类Unix操作系统,基于POSIX和UNIX的多用户、多任务、支持多线程和多CPU的操作系统。它继承了Unix以网络为核心的设计思想,是一个性能稳定的多用户网络操作系统,Linux是许多企业...

    JAVA上百实例源码以及开源项目

     Java局域网通信——飞鸽传书源代码,大家都知道VB版、VC版还有Delphi版的飞鸽传书软件,但是Java版的确实不多,因此这个Java文件传输实例不可错过,Java网络编程技能的提升很有帮助。 Java聊天程序,包括服务端和...

    JAVA上百实例源码以及开源项目源代码

     Java局域网通信——飞鸽传书源代码,大家都知道VB版、VC版还有Delphi版的飞鸽传书软件,但是Java版的确实不多,因此这个Java文件传输实例不可错过,Java网络编程技能的提升很有帮助。 Java聊天程序,包括服务端和...

    Java开发技术大全(500个源代码).

    ThreadImRunnable.java 继承Runnable接口实现多线程 mulThread.java 创建多个线程对象的类 demoJoin.java 演示使用join()以确保主线程最后结束 clicker.java 一个计数用的线程类 demoPri.java 调用上面这个类...

    java开源包11

    Java多线程程序死锁检查 JCarder JCarder 是一个用来查找多线程应用程序中一些潜在的死锁,通过对 Java 字节码的动态分析来完成死锁分析。 Java的Flash解析、生成器 jActionScript jActionScript 是一个使用了 Java...

    java开源包6

    Java多线程程序死锁检查 JCarder JCarder 是一个用来查找多线程应用程序中一些潜在的死锁,通过对 Java 字节码的动态分析来完成死锁分析。 Java的Flash解析、生成器 jActionScript jActionScript 是一个使用了 Java...

    java开源包4

    Java多线程程序死锁检查 JCarder JCarder 是一个用来查找多线程应用程序中一些潜在的死锁,通过对 Java 字节码的动态分析来完成死锁分析。 Java的Flash解析、生成器 jActionScript jActionScript 是一个使用了 Java...

    java开源包9

    Java多线程程序死锁检查 JCarder JCarder 是一个用来查找多线程应用程序中一些潜在的死锁,通过对 Java 字节码的动态分析来完成死锁分析。 Java的Flash解析、生成器 jActionScript jActionScript 是一个使用了 Java...

Global site tag (gtag.js) - Google Analytics