1 /* 2 线程池解决创建大量线程的问题 3 */ 4 interface ThreadPool{ 5 //执行一个Job,这个Job需要实现Runnable 6 void execute(Job job); 7 //关闭线程 8 void shutDown(); 9 //增加工作线程 10 void addWorkers(int num); 11 //减少工作线程 12 void removeWorker(int num); 13 //得到正在等待执行的任务数量 14 int getJobSize(); 15 } 16 17 /* 18 客户端通过execute(Job)方法将Job提交入线程池执行,而客户端自身不用等待Job的执行。 19 除了该方法外,线程池接口提供了增大/减少工作线程以及关闭线程的方法。 20 这里的工作线程代表着一个重复执行的Job线程,而每个客户提交的Job都将进入到一个工作队列中等待工作者线程处理 21 */ 22 class DefaultThreadPoll implements ThreadPool { 23 //线程池最大限制数 24 private static final int MAX_WORKER_NUMBERS = 10; 25 //线程池默认数量 26 private static final int DEFAULT_WORKER_NUMBERS = 5; 27 //线程池最小数量 28 private static final int MIN_WORKER_NUMBERS = 1; 29 //这是一个工作列表,将会向里面插入工作 30 private final LinkedList jobs = new LinkedList (); 31 //工作者列表 32 private final List workers = Collections.synchronizedList(new ArrayList ()); 33 //工作者线程的数量 34 private int workerNum = DEFAULT_WORKER_NUMBERS; 35 //线程编号生成 36 private AtomicLong threadNum = new AtomicLong(); 37 38 public DefaultThreadPoll(){ 39 initializeWorkers(DEFAULT_WORKER_NUMBERS); 40 } 41 public DefaultThreadPoll(int num){ 42 workerNum = num > MAX_WORKER_NUMBERS ? MAX_WORKER_NUMBERS : num < MIN_WORKER_NUMBERS ? MIN_WORKER_NUMBERS : num; 43 initializeWorkers(workerNum); 44 } 45 public void execute(Job job){ 46 if (job != null){ 47 synchronized (jobs){ 48 jobs.addLast(job); 49 jobs.notify(); 50 } 51 } 52 } 53 public void shutDown(){ 54 for (Worker worker : workers){ 55 worker.shutDown(); 56 } 57 } 58 public void addWorkers(int num){ 59 synchronized (jobs){ 60 //限制新增的Worker数量不超过最大值 61 if (num + this.workerNum > MAX_WORKER_NUMBERS){ 62 num = MAX_WORKER_NUMBERS - workerNum; 63 } 64 initializeWorkers(num); 65 this.workerNum += num; 66 } 67 } 68 public void removeWorker(int num){ 69 synchronized (jobs){ 70 if (num > this.workerNum){ 71 throw new IllegalArgumentException("beyong workNum"); 72 } 73 //按照给定的数量停止Worker 74 int count = 0; 75 while (count < num){ 76 Worker worker = workers.get(count); 77 if (workers.remove(worker)){ 78 worker.shutDown(); 79 count++; 80 } 81 } 82 this.workerNum -= count; 83 } 84 } 85 public int getJobSize(){ 86 return jobs.size(); 87 } 88 89 //初始化线程工作者 90 private void initializeWorkers(int num){ 91 for (int i = 0; i < num; i++){ 92 Worker worker = new Worker(); 93 workers.add(worker); 94 Thread thread = new Thread(worker, "ThreadPool-Worker-" + threadNum.incrementAndGet()); 95 thread.start(); 96 } 97 } 98 99 class Worker implements Runnable{100 //是否工作101 private volatile boolean running = true;102 @Override103 public void run(){104 while (running){105 Job job = null;106 synchronized (jobs){107 //如果工作列表是空的,那么就wait108 while (jobs.isEmpty()){109 try {110 jobs.wait();111 }catch (InterruptedException e){112 //感知到外部对WorkerThread的中断操作,返回113 Thread.currentThread().interrupt();114 return;115 }116 }117 //取出一个job118 job = jobs.removeFirst();119 }120 if (job != null){121 try {122 job.run();123 }catch (Exception e){124 //忽略Job执行中的异常125 }126 }127 }128 }129 public void shutDown(){130 running = false;131 }132 }133 }