1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.util;
19
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;

import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.hbase.testclassification.SmallTests;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
39
40 @Category(SmallTests.class)
41 public class TestBoundedPriorityBlockingQueue {
42 private final static int CAPACITY = 16;
43
44 class TestObject {
45 private final int priority;
46 private final int seqId;
47
48 public TestObject(final int priority, final int seqId) {
49 this.priority = priority;
50 this.seqId = seqId;
51 }
52
53 public int getSeqId() {
54 return this.seqId;
55 }
56
57 public int getPriority() {
58 return this.priority;
59 }
60 }
61
62 class TestObjectComparator implements Comparator<TestObject> {
63 public TestObjectComparator() {}
64
65 @Override
66 public int compare(TestObject a, TestObject b) {
67 return a.getPriority() - b.getPriority();
68 }
69 }
70
71 private BoundedPriorityBlockingQueue<TestObject> queue;
72
73 @Before
74 public void setUp() throws Exception {
75 this.queue = new BoundedPriorityBlockingQueue<TestObject>(CAPACITY, new TestObjectComparator());
76 }
77
78 @After
79 public void tearDown() throws Exception {
80 }
81
82 @Test
83 public void tesAppend() throws Exception {
84
85 for (int i = 1; i <= CAPACITY; ++i) {
86 assertTrue(queue.offer(new TestObject(i, i)));
87 assertEquals(i, queue.size());
88 assertEquals(CAPACITY - i, queue.remainingCapacity());
89 }
90 assertFalse(queue.offer(new TestObject(0, -1), 5, TimeUnit.MILLISECONDS));
91
92
93 for (int i = 1; i <= CAPACITY; ++i) {
94 TestObject obj = queue.poll();
95 assertEquals(i, obj.getSeqId());
96 assertEquals(CAPACITY - i, queue.size());
97 assertEquals(i, queue.remainingCapacity());
98 }
99 assertEquals(null, queue.poll());
100 }
101
102 @Test
103 public void tesAppendSamePriority() throws Exception {
104
105 for (int i = 1; i <= CAPACITY; ++i) {
106 assertTrue(queue.offer(new TestObject(0, i)));
107 assertEquals(i, queue.size());
108 assertEquals(CAPACITY - i, queue.remainingCapacity());
109 }
110 assertFalse(queue.offer(new TestObject(0, -1), 5, TimeUnit.MILLISECONDS));
111
112
113 for (int i = 1; i <= CAPACITY; ++i) {
114 TestObject obj = queue.poll();
115 assertEquals(i, obj.getSeqId());
116 assertEquals(CAPACITY - i, queue.size());
117 assertEquals(i, queue.remainingCapacity());
118 }
119 assertEquals(null, queue.poll());
120 }
121
122 @Test
123 public void testPrepend() throws Exception {
124
125 for (int i = 1; i <= CAPACITY; ++i) {
126 assertTrue(queue.offer(new TestObject(CAPACITY - i, i)));
127 assertEquals(i, queue.size());
128 assertEquals(CAPACITY - i, queue.remainingCapacity());
129 }
130
131
132 for (int i = 1; i <= CAPACITY; ++i) {
133 TestObject obj = queue.poll();
134 assertEquals(CAPACITY - (i - 1), obj.getSeqId());
135 assertEquals(CAPACITY - i, queue.size());
136 assertEquals(i, queue.remainingCapacity());
137 }
138 assertEquals(null, queue.poll());
139 }
140
141 @Test
142 public void testInsert() throws Exception {
143
144 for (int i = 1; i <= CAPACITY; i += 2) {
145 assertTrue(queue.offer(new TestObject(i, i)));
146 assertEquals((1 + i) / 2, queue.size());
147 }
148 for (int i = 2; i <= CAPACITY; i += 2) {
149 assertTrue(queue.offer(new TestObject(i, i)));
150 assertEquals(CAPACITY / 2 + (i / 2), queue.size());
151 }
152 assertFalse(queue.offer(new TestObject(0, -1), 5, TimeUnit.MILLISECONDS));
153
154
155 for (int i = 1; i <= CAPACITY; ++i) {
156 TestObject obj = queue.poll();
157 assertEquals(i, obj.getSeqId());
158 assertEquals(CAPACITY - i, queue.size());
159 assertEquals(i, queue.remainingCapacity());
160 }
161 assertEquals(null, queue.poll());
162 }
163
164 @Test
165 public void testFifoSamePriority() throws Exception {
166 assertTrue(CAPACITY >= 6);
167 for (int i = 0; i < 6; ++i) {
168 assertTrue(queue.offer(new TestObject((1 + (i % 2)) * 10, i)));
169 }
170
171 for (int i = 0; i < 6; i += 2) {
172 TestObject obj = queue.poll();
173 assertEquals(10, obj.getPriority());
174 assertEquals(i, obj.getSeqId());
175 }
176
177 for (int i = 1; i < 6; i += 2) {
178 TestObject obj = queue.poll();
179 assertEquals(20, obj.getPriority());
180 assertEquals(i, obj.getSeqId());
181 }
182 assertEquals(null, queue.poll());
183 }
184
185 @Test
186 public void testPoll() {
187 assertNull(queue.poll());
188 PriorityQueue<TestObject> testList = new PriorityQueue<TestObject>(CAPACITY, new TestObjectComparator());
189
190 for (int i = 0; i < CAPACITY; ++i) {
191 TestObject obj = new TestObject(i, i);
192 testList.add(obj);
193 queue.offer(obj);
194 }
195
196 for (int i = 0; i < CAPACITY; ++i) {
197 assertEquals(testList.poll(), queue.poll());
198 }
199
200 assertNull(null, queue.poll());
201 }
202
203 @Test(timeout=10000)
204 public void testPollInExecutor() throws InterruptedException {
205 final TestObject testObj = new TestObject(0, 0);
206
207 final CyclicBarrier threadsStarted = new CyclicBarrier(2);
208 ExecutorService executor = Executors.newFixedThreadPool(2);
209 executor.execute(new Runnable() {
210 public void run() {
211 try {
212 assertNull(queue.poll(1000, TimeUnit.MILLISECONDS));
213 threadsStarted.await();
214 assertSame(testObj, queue.poll(1000, TimeUnit.MILLISECONDS));
215 assertTrue(queue.isEmpty());
216 } catch (Exception e) {
217 throw new RuntimeException(e);
218 }
219 }
220 });
221
222 executor.execute(new Runnable() {
223 public void run() {
224 try {
225 threadsStarted.await();
226 queue.offer(testObj);
227 } catch (Exception e) {
228 throw new RuntimeException(e);
229 }
230 }
231 });
232
233 executor.shutdown();
234 assertTrue(executor.awaitTermination(8000, TimeUnit.MILLISECONDS));
235 }
236 }