手写一个线程池(2)拒绝策略的实现

上一次实现了一个简单的线程池,这一次我们来实现一些简单的拒绝策略

模拟任务数超过任务队列的情况

首先为了模拟这种情况,我们需要多增加一些任务,修改一下之前的代码添加一些命令输出方便观察

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
@Slf4j(topic = "c.TestPool")
public class TestPool {
public static void main(String[] args) {
ThreadPool threadPool = new ThreadPool(2,1000,TimeUnit.MILLISECONDS,10);
for(int i = 0; i < 15;i++){
int j = i;
threadPool.execute(()-> {
try {
Thread.sleep(100000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("{}",j);
});
}
}
}
@Slf4j(topic = "c.ThreadPool")
class ThreadPool{
//任务队列
private BlockingQueue<Runnable> taskQueue;

//线程集合
private HashSet<Worker> workers = new HashSet<>();

//核心线程数
private int coreSize;

//获取任务的超时时间
private long timeout;

private TimeUnit timeUnit;

//执行任务
public void execute(Runnable task){
//任务数没有超过coreSize时,直接交给Worker对象执行
//如果任务数超过coreSize时,加入任务队列暂存
synchronized (workers){
if(workers.size() < coreSize){
log.debug("新增 worker{},{}",workers,task);
Worker worker = new Worker(task);
workers.add(worker);
worker.start();
}else {
taskQueue.put(task);
}
}
}

public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapcity) {
this.coreSize = coreSize;
this.timeout = timeout;
this.timeUnit = timeUnit;
this.taskQueue = new BlockingQueue<>(queueCapcity);
}


class Worker extends Thread{
private Runnable task;

public Worker(Runnable task) {
this.task = task;
}

@Override
public void run() {
//执行任务
// 1.当task不为空,执行任务
// 2.当task执行完毕,再接着从任务队列获取任务并执行
//while (task != null || (task = taskQueue.take()) != null){
while (task != null || (task = taskQueue.poll(timeout,timeUnit)) != null){
try {
log.debug("正在执行...{}",task);
task.run();
}catch (Exception e){
e.printStackTrace();
} finally {
task = null;
}
}
synchronized (workers){
log.debug("worker 被移除{}",this);
workers.remove(this);
}
}
}
}


@Slf4j(topic = "c.BlockingQueue")
class BlockingQueue<T>{
//1.任务队列
private Deque<T> queue = new ArrayDeque<>();

//2.锁
private ReentrantLock lock = new ReentrantLock();

//3.生产者条件变量
private Condition fullWaitSet = lock.newCondition();

//4.消费者条件变量
private Condition emptyWaitSet = lock.newCondition();

//5.容量
private int capcity;

public BlockingQueue(int capcity) {
this.capcity = capcity;
}

//带超时的阻塞获取
public T poll(long timeout, TimeUnit unit){
lock.lock();
try {
//把其他时间单位统一转换位纳秒
long nanos = unit.toNanos(timeout);
while (queue.isEmpty()){
try {
//awaitNanos()的返回值是等待时间减去已经经过的时间,也就是剩余时间
//返回的是剩余时间,防止了虚假唤醒的问题
if(nanos <= 0){
return null;
}
nanos = emptyWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//元素获取完从队列移除
T t = queue.removeFirst();
//获取完需要唤醒等待空位的线程
fullWaitSet.signal();
return t;
}finally {
lock.unlock();
}
}

//阻塞获取的方法
public T take(){
lock.lock();
try {
while (queue.isEmpty()){
try {
emptyWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//元素获取完从队列移除
T t = queue.removeFirst();
//获取完需要唤醒等待空位的线程
fullWaitSet.signal();
return t;
}finally {
lock.unlock();
}
}

//阻塞添加的方法
public void put(T element){
lock.lock();
try {
while(queue.size() == capcity){
try {
log.debug("等待加入任务队列{}...",element);
fullWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("加入任务队列{}",element);
queue.addLast(element);
// 添加完毕后,当队列不为空时需要唤醒消费者线程
emptyWaitSet.signal();
}finally {
lock.unlock();
}
}

//获取队列大小的方法
public int size(){
lock.lock();
try {
return queue.size();
}finally {
lock.unlock();
}
}
}

模拟队列满了

我们可以看出来任务已经满了,正在等待加入队列,目前我们的策略是让任务死等,这样非常不人性化,接下来我们实现一些不同的拒绝策略

拒绝策略的介绍

ThreadPoolExecutor是一个典型的缓存池化设计的产物,因为池子有大小,当池子体积不够承载时,就涉及到拒绝策略。JDK中已经预设了4种线程池拒绝策略,下面结合场景详细聊聊这些策略的使用场景,以及我们还能扩展哪些拒绝策略。
当前提交任务数大于(maxPoolSize + queueCapacity)时就会触发线程池的拒绝策略了。

实现拒绝策略

那么接下来实现几个策略,我们需要使用设计模式里的策略模式,我们先来实现一个接口

  • 1.一直等待
  • 2.带超时等待
  • 3.让调用者放弃任务执行
  • 4.让调用者抛出异常
  • 5.让调用者自己执行任务

那么我们来改造代码,首先我们需要添加一个拒绝策略的参数在ThreadPool类里

1
2
//拒绝策略的属性
private RejectPolicy<Runnable> rejectPolicy;

记得把构造方法里添加拒绝策略的参数

增加一个接口

1
2
3
4
@FunctionalInterface //拒绝策略
interface RejectPolicy<T>{
void reject(BlockingQueue<T> queue, T task);
}

将execute方法修改

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public void execute(Runnable task){
//任务数没有超过coreSize时,直接交给Worker对象执行
//如果任务数超过coreSize时,加入任务队列暂存
synchronized (workers){
if(workers.size() < coreSize){
log.debug("新增 worker{},{}",workers,task);
Worker worker = new Worker(task);
workers.add(worker);
worker.start();
}else {
//taskQueue.put(task);
//1.死等
//2.带超时等待
//3.让调用者放弃任务执行
//4.让调用者抛出异常
//5.让调用者自己执行任务
taskQueue.tryPut(rejectPolicy,task);
}
}
}

接下来我们来实现taskQueue的tryPut方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
lock.lock();
try {
//判断队列是否已满
if(queue.size() == capcity){
rejectPolicy.reject(this,task);
}else {
//队列有空闲
log.debug("加入任务队列{}",task);
queue.addLast(task);
emptyWaitSet.signal();
}
}finally {
lock.unlock();
}
}

实现让线程一直等待的策略

接下来我们就可以实现拒绝策略了,来试试第一种拒绝策略,让线程一直等待。编写测试类,把响应的参数调好就可以开始测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
@Slf4j(topic = "c.TestPool")
public class TestPool {
public static void main(String[] args) {
ThreadPool threadPool = new ThreadPool(1,1000,TimeUnit.MILLISECONDS,1,((queue, task) -> {
//1.实现了一直等待的逻辑(死等)
queue.put(task);
//2.带超时等待
//3.让调用者放弃任务执行
//4.让调用者抛出异常
//5.让调用者自己执行任务

}));
for(int i = 0; i < 3;i++){
int j = i;
threadPool.execute(()-> {
try {
Thread.sleep(100000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("{}",j);
});
}
}
}


@FunctionalInterface //拒绝策略
interface RejectPolicy<T>{
void reject(BlockingQueue<T> queue, T task);
}


@Slf4j(topic = "c.ThreadPool")
class ThreadPool{
//任务队列
private BlockingQueue<Runnable> taskQueue;

//线程集合
private HashSet<Worker> workers = new HashSet<>();

//核心线程数
private int coreSize;

//获取任务的超时时间
private long timeout;

private TimeUnit timeUnit;

//拒绝策略的属性
private RejectPolicy<Runnable> rejectPolicy;

//执行任务
public void execute(Runnable task){
//任务数没有超过coreSize时,直接交给Worker对象执行
//如果任务数超过coreSize时,加入任务队列暂存
synchronized (workers){
if(workers.size() < coreSize){
log.debug("新增 worker{},{}",workers,task);
Worker worker = new Worker(task);
workers.add(worker);
worker.start();
}else {
//taskQueue.put(task);
//1.死等
//2.带超时等待
//3.让调用者放弃任务执行
//4.让调用者抛出异常
//5.让调用者自己执行任务
taskQueue.tryPut(rejectPolicy,task);
}
}
}

public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapcity,RejectPolicy<Runnable> rejectPolicy) {
this.coreSize = coreSize;
this.timeout = timeout;
this.timeUnit = timeUnit;
this.taskQueue = new BlockingQueue<>(queueCapcity);
this.rejectPolicy = rejectPolicy;
}


class Worker extends Thread{
private Runnable task;

public Worker(Runnable task) {
this.task = task;
}

@Override
public void run() {
//执行任务
// 1.当task不为空,执行任务
// 2.当task执行完毕,再接着从任务队列获取任务并执行
//while (task != null || (task = taskQueue.take()) != null){
while (task != null || (task = taskQueue.poll(timeout,timeUnit)) != null){
try {
log.debug("正在执行...{}",task);
task.run();
}catch (Exception e){
e.printStackTrace();
} finally {
task = null;
}
}
synchronized (workers){
log.debug("worker 被移除{}",this);
workers.remove(this);
}
}
}
}


@Slf4j(topic = "c.BlockingQueue")
class BlockingQueue<T>{
//1.任务队列
private Deque<T> queue = new ArrayDeque<>();

//2.锁
private ReentrantLock lock = new ReentrantLock();

//3.生产者条件变量
private Condition fullWaitSet = lock.newCondition();

//4.消费者条件变量
private Condition emptyWaitSet = lock.newCondition();

//5.容量
private int capcity;

public BlockingQueue(int capcity) {
this.capcity = capcity;
}

//带超时的阻塞获取
public T poll(long timeout, TimeUnit unit){
lock.lock();
try {
//把其他时间单位统一转换位纳秒
long nanos = unit.toNanos(timeout);
while (queue.isEmpty()){
try {
//awaitNanos()的返回值是等待时间减去已经经过的时间,也就是剩余时间
//返回的是剩余时间,防止了虚假唤醒的问题
if(nanos <= 0){
return null;
}
nanos = emptyWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//元素获取完从队列移除
T t = queue.removeFirst();
//获取完需要唤醒等待空位的线程
fullWaitSet.signal();
return t;
}finally {
lock.unlock();
}
}

//阻塞获取的方法
public T take(){
lock.lock();
try {
while (queue.isEmpty()){
try {
emptyWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//元素获取完从队列移除
T t = queue.removeFirst();
//获取完需要唤醒等待空位的线程
fullWaitSet.signal();
return t;
}finally {
lock.unlock();
}
}

//阻塞添加的方法
public void put(T element){
lock.lock();
try {
while(queue.size() == capcity){
try {
log.debug("等待加入任务队列{}...",element);
fullWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("加入任务队列{}",element);
queue.addLast(element);
// 添加完毕后,当队列不为空时需要唤醒消费者线程
emptyWaitSet.signal();
}finally {
lock.unlock();
}
}

//获取队列大小的方法
public int size(){
lock.lock();
try {
return queue.size();
}finally {
lock.unlock();
}
}

public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
lock.lock();
try {
//判断队列是否已满
if(queue.size() == capcity){
rejectPolicy.reject(this,task);
}else {
//队列有空闲
log.debug("加入任务队列{}",task);
queue.addLast(task);
emptyWaitSet.signal();
}
}finally {
lock.unlock();
}
}
}


一直等待的拒绝策略

实现让线程超时后移除的策略(带超时等待的策略)

接下来让我们实现其他的拒绝策略吧

添加一个offer方法在BlookQueue里

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 带超时时间阻塞添加
public boolean offer(T task, long timeout, TimeUnit timeUnit) {
lock.lock();
try {
long nanos = timeUnit.toNanos(timeout);
while (queue.size() == capcity) {
try {
if(nanos <= 0) {
return false;
}
log.debug("等待加入任务队列 {} ...", task);
nanos = fullWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("加入任务队列 {}", task);
queue.addLast(task);
emptyWaitSet.signal();
return true;
} finally {
lock.unlock();
}
}

我们将睡眠时间设置为一秒就可以开始我们的测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public static void main(String[] args) {
ThreadPool threadPool = new ThreadPool(1,1000,TimeUnit.MILLISECONDS,1,((queue, task) -> {
//1.实现了一直等待的逻辑(死等)
//queue.put(task);

//2.带超时等待
queue.offer(task,500,TimeUnit.MILLISECONDS);

//3.让调用者放弃任务执行
//4.让调用者抛出异常
//5.让调用者自己执行任务

}));
for(int i = 0; i < 3;i++){
int j = i;
threadPool.execute(()-> {
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("{}",j);
});
}
}

带超时等待的策略

这下我们可以看出,线程并没有死等,一旦超过时间便会移除线程

线程放弃执行的策略

这种策略是最简单的,只要你什么都不做,线程就会自动放弃,我们可以打印一句话来观察一下结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public static void main(String[] args) {
ThreadPool threadPool = new ThreadPool(1,1000,TimeUnit.MILLISECONDS,1,((queue, task) -> {
//1.实现了一直等待的逻辑(死等)
//queue.put(task);

//2.带超时等待
//queue.offer(task,1500,TimeUnit.MILLISECONDS);

//3.让调用者放弃任务执行
log.debug("放弃{}",task);

//4.让调用者抛出异常
//5.让调用者自己执行任务

}));
for(int i = 0; i < 3;i++){
int j = i;
threadPool.execute(()-> {
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("{}",j);
});
}
}

放弃执行的策略

让线程抛出异常的策略

我们只需要在执行时抛出一个异常就好了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public static void main(String[] args) {
ThreadPool threadPool = new ThreadPool(1,1000,TimeUnit.MILLISECONDS,1,((queue, task) -> {
//1.实现了一直等待的逻辑(死等)
//queue.put(task);

//2.带超时等待
//queue.offer(task,1500,TimeUnit.MILLISECONDS);

//3.让调用者放弃任务执行
//log.debug("放弃{}",task);

//4.让调用者抛出异常
throw new RuntimeException("任务执行失败"+ task);

//5.让调用者自己执行任务

}));
for(int i = 0; i < 4;i++){
int j = i;
threadPool.execute(()-> {
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("{}",j);
});
}
}

抛异常的方式

注意这种策略和第三种放弃的策略有一些不同,假如有4个线程,第三种放弃策略的情况下,第三个任务和第四个任务是会被放弃了,是会进入执行log.debug打印出来的
而第四种抛异常的策略,第三个任务和第四个任务是没有进入程序的,第四个任务是不会显示的因为抛出异常程序直接就中断了

第五种策略 让调用者自己执行任务

让调用者自己执行任务的策略也非常简单,只需要调用task.run()就可以了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public static void main(String[] args) {
ThreadPool threadPool = new ThreadPool(1,1000,TimeUnit.MILLISECONDS,1,((queue, task) -> {
//1.实现了一直等待的逻辑(死等)
//queue.put(task);

//2.带超时等待
//queue.offer(task,1500,TimeUnit.MILLISECONDS);

//3.让调用者放弃任务执行
//log.debug("放弃{}",task);

//4.让调用者抛出异常
//throw new RuntimeException("任务执行失败"+ task);

//5.让调用者自己执行任务
task.run();

}));
for(int i = 0; i < 4;i++){
int j = i;
threadPool.execute(()-> {
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("{}",j);
});
}
}

让调用者自己执行任务
可以看到2,3任务被main线程执行了,到现在为止我们已经实现了5种策略模式,还有其他的策略模式大家可以自己探索一下。下面我会将完整代码贴出来,感谢大家的观看,最近忙着秋招实在太忙了,等秋招后会保持一定的更新频率的

完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
@Slf4j(topic = "c.TestPool")
public class TestPool {
public static void main(String[] args) {
ThreadPool threadPool = new ThreadPool(1,1000,TimeUnit.MILLISECONDS,1,((queue, task) -> {
//1.实现了一直等待的逻辑(死等)
//queue.put(task);

//2.带超时等待
//queue.offer(task,1500,TimeUnit.MILLISECONDS);

//3.让调用者放弃任务执行
//log.debug("放弃{}",task);

//4.让调用者抛出异常
//throw new RuntimeException("任务执行失败"+ task);

//5.让调用者自己执行任务
task.run();

}));
for(int i = 0; i < 4;i++){
int j = i;
threadPool.execute(()-> {
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("{}",j);
});
}
}
}


@FunctionalInterface //拒绝策略
interface RejectPolicy<T>{
void reject(BlockingQueue<T> queue, T task);
}


@Slf4j(topic = "c.ThreadPool")
class ThreadPool{
//任务队列
private BlockingQueue<Runnable> taskQueue;

//线程集合
private HashSet<Worker> workers = new HashSet<>();

//核心线程数
private int coreSize;

//获取任务的超时时间
private long timeout;

private TimeUnit timeUnit;

//拒绝策略的属性
private RejectPolicy<Runnable> rejectPolicy;

//执行任务
public void execute(Runnable task){
//任务数没有超过coreSize时,直接交给Worker对象执行
//如果任务数超过coreSize时,加入任务队列暂存
synchronized (workers){
if(workers.size() < coreSize){
log.debug("新增 worker{},{}",workers,task);
Worker worker = new Worker(task);
workers.add(worker);
worker.start();
}else {
//taskQueue.put(task);
//1.死等
//2.带超时等待
//3.让调用者放弃任务执行
//4.让调用者抛出异常
//5.让调用者自己执行任务
taskQueue.tryPut(rejectPolicy,task);
}
}
}

public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapcity,RejectPolicy<Runnable> rejectPolicy) {
this.coreSize = coreSize;
this.timeout = timeout;
this.timeUnit = timeUnit;
this.taskQueue = new BlockingQueue<>(queueCapcity);
this.rejectPolicy = rejectPolicy;
}


class Worker extends Thread{
private Runnable task;

public Worker(Runnable task) {
this.task = task;
}

@Override
public void run() {
//执行任务
// 1.当task不为空,执行任务
// 2.当task执行完毕,再接着从任务队列获取任务并执行
//while (task != null || (task = taskQueue.take()) != null){
while (task != null || (task = taskQueue.poll(timeout,timeUnit)) != null){
try {
log.debug("正在执行...{}",task);
task.run();
}catch (Exception e){
e.printStackTrace();
} finally {
task = null;
}
}
synchronized (workers){
log.debug("worker 被移除{}",this);
workers.remove(this);
}
}
}
}


@Slf4j(topic = "c.BlockingQueue")
class BlockingQueue<T>{
//1.任务队列
private Deque<T> queue = new ArrayDeque<>();

//2.锁
private ReentrantLock lock = new ReentrantLock();

//3.生产者条件变量
private Condition fullWaitSet = lock.newCondition();

//4.消费者条件变量
private Condition emptyWaitSet = lock.newCondition();

//5.容量
private int capcity;

public BlockingQueue(int capcity) {
this.capcity = capcity;
}

//带超时的阻塞获取
public T poll(long timeout, TimeUnit unit){
lock.lock();
try {
//把其他时间单位统一转换位纳秒
long nanos = unit.toNanos(timeout);
while (queue.isEmpty()){
try {
//awaitNanos()的返回值是等待时间减去已经经过的时间,也就是剩余时间
//返回的是剩余时间,防止了虚假唤醒的问题
if(nanos <= 0){
return null;
}
nanos = emptyWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//元素获取完从队列移除
T t = queue.removeFirst();
//获取完需要唤醒等待空位的线程
fullWaitSet.signal();
return t;
}finally {
lock.unlock();
}
}

//阻塞获取的方法
public T take(){
lock.lock();
try {
while (queue.isEmpty()){
try {
emptyWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//元素获取完从队列移除
T t = queue.removeFirst();
//获取完需要唤醒等待空位的线程
fullWaitSet.signal();
return t;
}finally {
lock.unlock();
}
}

//阻塞添加的方法
public void put(T element){
lock.lock();
try {
while(queue.size() == capcity){
try {
log.debug("等待加入任务队列{}...",element);
fullWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("加入任务队列{}",element);
queue.addLast(element);
// 添加完毕后,当队列不为空时需要唤醒消费者线程
emptyWaitSet.signal();
}finally {
lock.unlock();
}
}

// 带超时时间阻塞添加
public boolean offer(T task, long timeout, TimeUnit timeUnit) {
lock.lock();
try {
long nanos = timeUnit.toNanos(timeout);
while (queue.size() == capcity) {
try {
if(nanos <= 0) {
return false;
}
log.debug("等待加入任务队列 {} ...", task);
nanos = fullWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("加入任务队列 {}", task);
queue.addLast(task);
emptyWaitSet.signal();
return true;
} finally {
lock.unlock();
}
}

//获取队列大小的方法
public int size(){
lock.lock();
try {
return queue.size();
}finally {
lock.unlock();
}
}

public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
lock.lock();
try {
//判断队列是否已满
if(queue.size() == capcity){
rejectPolicy.reject(this,task);
}else {
//队列有空闲
log.debug("加入任务队列{}",task);
queue.addLast(task);
emptyWaitSet.signal();
}
}finally {
lock.unlock();
}
}
}

感谢观看


本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!