Posts Java多线程 - 简单的线程池模式
Post
Cancel

Java多线程 - 简单的线程池模式

在中间件等容器中,线程池是用的最多的管理线程与任务请求分配的方式。创建线程是需要消耗时间和内存的。在服务启动之初,就初始化一定数量的线程,其优点是保证这些数量的线程在内存设定的情况下一定会创建成功,并且节约了临时创建线程的时间。

线程池的工作原理大致为:为可以预期的大量请求创建一定数目的线程,当请求到来时,请求会被放入一个FIFO的列表中,线程池中的线程按照一定的优先调度获取这些请求,执行完后,再从请求列表中挨个获取下一个请求加以处理。在线程与请求处理过程中,会有两种情形出现:

  • 一定数目的线程来不及处理大量的请求,请求会被挂起,直到线程有能力可以处理
  • 没有请求到来,所有线程处于等待状态,等待请求列表中出现第一个请求

这两种情形,都可以通过wait()来模拟,并使用notify()来唤醒对方,条件已经满足。

这里涉及到几个类:

  • 线程池管理
  • 任务处理线程
  • 请求线程
  • 请求对象

先从主程序开始看,通过ThreadPoolManager线程池管理类创建一个线程池,池内分配2个线程,通过setup()初始化这2个线程。然后通过RequesterThread请求线程类模拟三个用户发出的任务请求,每个人发出的任务请求都是大量的。

1
2
3
4
5
6
7
8
9
public class Main {
	public static void main(String args[]) {
		ThreadPoolManager tpMgr = new ThreadPoolManager(2);
		tpMgr.setup();  //启动池内的所有线程
		new RequesterThread("Tom", tpMgr).start();
		new RequesterThread("Jerry", tpMgr).start();
		new RequesterThread("Peter", tpMgr).start();
	}
}

请求对象Task

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class Task {
	private String requester;
	private int reqID;
	public Task(String requester, int reqID) {
		this.requester = requester;
		this.reqID = reqID;        
	}
	public void execute(){
		System.out.println(Thread.currentThread().getName() + " executes " + this);
		try{
			Thread.sleep(1000);
		}catch(Exception e){
			//
		}
	}
	public String toString(){
		return requester + " @taskID=" + reqID;
	}
}

Task类,简单地通过execute()方法来假设执行一段工作,耗时1秒。在构造函数中,requester代表的是请求者名称, 就是Tom/Jerry/Peter等。reqID代表的是每个人发出的请求ID号。

请求线程RequesterThread

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class RequesterThread extends Thread{
	private ThreadPoolManager tpMgr;
	RequesterThread(String name, ThreadPoolManager threadPoolManager) {
		super(name);
		this.tpMgr = threadPoolManager;
	}
	public void run(){
		for (int i=0; true; i++){
			tpMgr.putTask(new Task(getName(), i));
			try {
				Thread.sleep(50);
			} catch (InterruptedException e) {
			}
			;
		}
	}
}

在Main主程序中,通过new RequesterThread(“Tom”, tpMgr).start()来创建Tom这个人发出的多个任务请求。在run()中的for循环中,不停地创建new Task(),i代表的是请求ID号,递增。for循环没有终止条件,模拟不停的用户请求。而新创建的任务通过ThreadPoolManager.putTask()来添加到FIFO的请求列表中。

任务处理线程WorkerThread

1
2
3
4
5
6
7
8
9
10
11
12
13
public class WorkerThread extends Thread{
   private ThreadPoolManager tpMgr;
   public WorkerThread(String thName, ThreadPoolManager tpMgr){
	   super(thName);
	   this.tpMgr = tpMgr;
   }
   public void run(){
	   while(true){
		   Task task = tpMgr.getTask();
		   task.execute();
	   }
   }
}

WorkerThread正好和RequesterThread相反,它是从ThreadPoolManager内置的请求列表中按照先进先出通过getTask()获取一个任务,并执行。

线程池管理ThreadPoolManager

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class ThreadPoolManager {
	private static final int MAX_REQUESTS = 10;
	LinkedList<Task> tasks = new LinkedList<Task>();
	
	private WorkerThread[] threadPool;
	
	// count: threads number
	public ThreadPoolManager(int threads) {
		threadPool = new WorkerThread[threads];
		for (int i=0; i<threads; i++){
			threadPool[i] = new WorkerThread("Thread-" + i, this);
		}        
	}
	public void setup(){
		for (int i = 0; i < threadPool.length; i++) {
			threadPool[i].start();
		}
	}

	public synchronized Task getTask() {
		while (tasks.isEmpty()){
			try {
				wait();
			} catch (InterruptedException e) {
			}
		}
		Task task = tasks.removeFirst();
		notifyAll();
		return task;
	}
	public synchronized void putTask(Task task){
		while (tasks.size() >= MAX_REQUESTS){
			try {
				System.out.println("!! New task blocked due to maximum tasks size: " + tasks.size());
				wait();
			} catch (InterruptedException e) {
			}
		}
		tasks.add(task);
		System.out.println("+ New task added, size: " + tasks.size());
		notifyAll();        
	}
}

线程池管理是最复杂的,它负责处理线程池的管理,任务请求的先进先出列表等。

Main主程序中,通过ThreadPoolManager的构造函数和setup方法,初始化了一定数量的线程,放到池里。

在任务请求类中,for无限循环里,通过tpMgr.putTask(new Task(getName(), i));添加新建的任务。在具体的putTask()方法中,需要处理一个情形:任务列表中的数量超出线程池能接受的最大任务数。此时需要等待。等待的条件是任务数量小于最大任务数。

同样,在任务处理线程中,调用了ThreadPoolManager的getTask()。当任务请求列表为空时,线程暂挂,需要等待条件成熟:任务请求列表不为空。

执行过程

从执行结果中,可以很清晰地看到请求任务列表在空和满的情况下等待的状态。因为我们只创建了2个线程,所以线程池中只有Thread-0和Thread-1。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
+ New task added, size: 1
Thread-1 executes Tom @taskID=0
+ New task added, size: 1
Thread-0 executes Jerry @taskID=0
+ New task added, size: 1
+ New task added, size: 2
+ New task added, size: 3
+ New task added, size: 4
+ New task added, size: 5
+ New task added, size: 6
+ New task added, size: 7
+ New task added, size: 8
+ New task added, size: 9
+ New task added, size: 10
!! New task blocked due to maximum tasks size: 10
!! New task blocked due to maximum tasks size: 10
!! New task blocked due to maximum tasks size: 10
Thread-1 executes Peter @taskID=0
...
...
...
!! New task blocked due to maximum tasks size: 10
!! New task blocked due to maximum tasks size: 10
!! New task blocked due to maximum tasks size: 10
Thread-0 executes Peter @taskID=13
+ New task added, size: 10
!! New task blocked due to maximum tasks size: 10
!! New task blocked due to maximum tasks size: 10
Thread-1 executes Jerry @taskID=11
+ New task added, size: 10
!! New task blocked due to maximum tasks size: 10
!! New task blocked due to maximum tasks size: 10
!! New task blocked due to maximum tasks size: 10
Thread-0 executes Jerry @taskID=12
+ New task added, size: 10
!! New task blocked due to maximum tasks size: 10
!! New task blocked due to maximum tasks size: 10
+ New task added, size: 10
Thread-1 executes Tom @taskID=16
!! New task blocked due to maximum tasks size: 10
!! New task blocked due to maximum tasks size: 10
!! New task blocked due to maximum tasks size: 10
...
...
This post is licensed under CC BY 4.0 by the author.

Java多线程 - 任务指派的临时线程

Oracle数据库学习-手动创建数据库