1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.util;
20
21 import org.apache.hadoop.hbase.testclassification.SmallTests;
22 import org.junit.Before;
23 import org.junit.Test;
24 import org.junit.experimental.categories.Category;
25
26 import java.util.concurrent.BlockingQueue;
27 import java.util.concurrent.CountDownLatch;
28 import java.util.concurrent.ThreadPoolExecutor;
29 import java.util.concurrent.TimeUnit;
30 import java.util.concurrent.atomic.AtomicInteger;
31
32 import static org.junit.Assert.*;
33
34
35 @Category(SmallTests.class)
36 public class TestStealJobQueue {
37
38 StealJobQueue<Integer> stealJobQueue;
39 BlockingQueue stealFromQueue;
40
41 @Before
42 public void setup() {
43 stealJobQueue = new StealJobQueue<>();
44 stealFromQueue = stealJobQueue.getStealFromQueue();
45
46 }
47
48
49 @Test
50 public void testTake() throws InterruptedException {
51 stealJobQueue.offer(3);
52 stealFromQueue.offer(10);
53 stealJobQueue.offer(15);
54 stealJobQueue.offer(4);
55 assertEquals(3, stealJobQueue.take().intValue());
56 assertEquals(4, stealJobQueue.take().intValue());
57 assertEquals("always take from the main queue before trying to steal", 15,
58 stealJobQueue.take().intValue());
59 assertEquals(10, stealJobQueue.take().intValue());
60 assertTrue(stealFromQueue.isEmpty());
61 assertTrue(stealJobQueue.isEmpty());
62 }
63
64 @Test
65 public void testOfferInStealQueueFromShouldUnblock() throws InterruptedException {
66 final AtomicInteger taken = new AtomicInteger();
67 Thread consumer = new Thread() {
68 @Override
69 public void run() {
70 try {
71 Integer n = stealJobQueue.take();
72 taken.set(n);
73 } catch (InterruptedException e) {
74 e.printStackTrace();
75 }
76 }
77 };
78 consumer.start();
79 stealFromQueue.offer(3);
80 consumer.join(1000);
81 assertEquals(3, taken.get());
82 consumer.interrupt();
83 }
84
85
86 @Test
87 public void testOfferInStealJobQueueShouldUnblock() throws InterruptedException {
88 final AtomicInteger taken = new AtomicInteger();
89 Thread consumer = new Thread() {
90 @Override
91 public void run() {
92 try {
93 Integer n = stealJobQueue.take();
94 taken.set(n);
95 } catch (InterruptedException e) {
96 e.printStackTrace();
97 }
98 }
99 };
100 consumer.start();
101 stealJobQueue.offer(3);
102 consumer.join(1000);
103 assertEquals(3, taken.get());
104 consumer.interrupt();
105 }
106
107
108 @Test
109 public void testPoll() throws InterruptedException {
110 stealJobQueue.offer(3);
111 stealFromQueue.offer(10);
112 stealJobQueue.offer(15);
113 stealJobQueue.offer(4);
114 assertEquals(3, stealJobQueue.poll(1, TimeUnit.SECONDS).intValue());
115 assertEquals(4, stealJobQueue.poll(1, TimeUnit.SECONDS).intValue());
116 assertEquals("always take from the main queue before trying to steal", 15,
117 stealJobQueue.poll(1, TimeUnit.SECONDS).intValue());
118 assertEquals(10, stealJobQueue.poll(1, TimeUnit.SECONDS).intValue());
119 assertTrue(stealFromQueue.isEmpty());
120 assertTrue(stealJobQueue.isEmpty());
121 assertNull(stealJobQueue.poll(10, TimeUnit.MILLISECONDS));
122 }
123
124 @Test
125 public void testPutInStealQueueFromShouldUnblockPoll() throws InterruptedException {
126 final AtomicInteger taken = new AtomicInteger();
127 Thread consumer = new Thread() {
128 @Override
129 public void run() {
130 try {
131 Integer n = stealJobQueue.poll(3, TimeUnit.SECONDS);
132 taken.set(n);
133 } catch (InterruptedException e) {
134 e.printStackTrace();
135 }
136 }
137 };
138 consumer.start();
139 stealFromQueue.put(3);
140 consumer.join(1000);
141 assertEquals(3, taken.get());
142 consumer.interrupt();
143
144 }
145
146
147 @Test
148 public void testAddInStealJobQueueShouldUnblockPoll() throws InterruptedException {
149 final AtomicInteger taken = new AtomicInteger();
150 Thread consumer = new Thread() {
151 @Override
152 public void run() {
153 try {
154 Integer n = stealJobQueue.poll(3, TimeUnit.SECONDS);
155 taken.set(n);
156 } catch (InterruptedException e) {
157 e.printStackTrace();
158 }
159 }
160 };
161 consumer.start();
162 stealJobQueue.add(3);
163 consumer.join(1000);
164 assertEquals(3, taken.get());
165 consumer.interrupt();
166 }
167
168
169 @Test
170 public void testInteractWithThreadPool() throws InterruptedException {
171 StealJobQueue<Runnable> stealTasksQueue = new StealJobQueue<>();
172 final CountDownLatch stealJobCountDown = new CountDownLatch(3);
173 final CountDownLatch stealFromCountDown = new CountDownLatch(3);
174 ThreadPoolExecutor stealPool = new ThreadPoolExecutor(3, 3, 1, TimeUnit.DAYS, stealTasksQueue) {
175 @Override
176 protected void afterExecute(Runnable r, Throwable t) {
177 super.afterExecute(r, t);
178 stealJobCountDown.countDown();
179 }
180
181 };
182
183
184 stealPool.prestartAllCoreThreads();
185
186 ThreadPoolExecutor stealFromPool = new ThreadPoolExecutor(3, 3, 1, TimeUnit.DAYS,
187 stealTasksQueue.getStealFromQueue()) {
188 @Override
189 protected void afterExecute(Runnable r, Throwable t) {
190 super.afterExecute(r, t);
191 stealFromCountDown.countDown();
192 }
193 };
194
195 for (int i = 0; i < 4; i++) {
196 TestTask task = new TestTask();
197 stealFromPool.execute(task);
198 }
199
200 for (int i = 0; i < 2; i++) {
201 TestTask task = new TestTask();
202 stealPool.execute(task);
203 }
204
205 stealJobCountDown.await(1, TimeUnit.SECONDS);
206 stealFromCountDown.await(1, TimeUnit.SECONDS);
207 assertEquals(0, stealFromCountDown.getCount());
208 assertEquals(0, stealJobCountDown.getCount());
209 }
210
211 class TestTask extends Thread implements Comparable<TestTask> {
212 @Override
213 public int compareTo(TestTask o) {
214 return 0;
215 }
216
217 @Override
218 public void run() {
219 try {
220 Thread.sleep(200);
221 } catch (InterruptedException e) {
222 e.printStackTrace();
223 }
224 }
225 }
226
227 }