1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import static org.apache.hadoop.hbase.HConstants.NO_NONCE;
22 import static org.junit.Assert.assertEquals;
23 import static org.junit.Assert.assertFalse;
24 import static org.junit.Assert.assertTrue;
25 import static org.junit.Assert.fail;
26
27 import java.util.concurrent.CountDownLatch;
28 import java.util.concurrent.atomic.AtomicInteger;
29
30 import org.apache.hadoop.conf.Configuration;
31 import org.apache.hadoop.hbase.HBaseConfiguration;
32 import org.apache.hadoop.hbase.ScheduledChore;
33 import org.apache.hadoop.hbase.Stoppable;
34 import org.apache.hadoop.hbase.testclassification.SmallTests;
35 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
36 import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
37 import org.apache.hadoop.hbase.util.Threads;
38 import org.junit.Test;
39 import org.junit.experimental.categories.Category;
40 import org.mockito.Mockito;
41 import org.mockito.invocation.InvocationOnMock;
42 import org.mockito.stubbing.Answer;
43
44 @Category(SmallTests.class)
45 public class TestServerNonceManager {
46
47 @Test
48 public void testNormalStartEnd() throws Exception {
49 final long[] numbers = new long[] { NO_NONCE, 1, 2, Long.MAX_VALUE, Long.MIN_VALUE };
50 ServerNonceManager nm = createManager();
51 for (int i = 0; i < numbers.length; ++i) {
52 for (int j = 0; j < numbers.length; ++j) {
53 assertTrue(nm.startOperation(numbers[i], numbers[j], createStoppable()));
54 }
55 }
56
57 for (int i = 0; i < numbers.length; ++i) {
58 assertTrue(nm.startOperation(numbers[i], NO_NONCE, createStoppable()));
59 }
60
61 for (int i = 0; i < numbers.length; ++i) {
62 for (int j = 0; j < numbers.length; ++j) {
63 nm.endOperation(numbers[i], numbers[j], false);
64 assertTrue(nm.startOperation(numbers[i], numbers[j], createStoppable()));
65 }
66 }
67
68 for (int i = 0; i < numbers.length; ++i) {
69 for (int j = 0; j < numbers.length; ++j) {
70 nm.endOperation(numbers[i], numbers[j], true);
71 assertEquals(numbers[j] == NO_NONCE,
72 nm.startOperation(numbers[i], numbers[j], createStoppable()));
73 }
74 }
75 }
76
77 @Test
78 public void testNoEndWithoutStart() {
79 ServerNonceManager nm = createManager();
80 try {
81 nm.endOperation(NO_NONCE, 1, true);
82 fail("Should have thrown");
83 } catch (AssertionError err) {}
84 }
85
86 @Test
87 public void testCleanup() throws Exception {
88 ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
89 EnvironmentEdgeManager.injectEdge(edge);
90 try {
91 ServerNonceManager nm = createManager(6);
92 ScheduledChore cleanup = nm.createCleanupScheduledChore(Mockito.mock(Stoppable.class));
93 edge.setValue(1);
94 assertTrue(nm.startOperation(NO_NONCE, 1, createStoppable()));
95 assertTrue(nm.startOperation(NO_NONCE, 2, createStoppable()));
96 assertTrue(nm.startOperation(NO_NONCE, 3, createStoppable()));
97 edge.setValue(2);
98 nm.endOperation(NO_NONCE, 1, true);
99 edge.setValue(4);
100 nm.endOperation(NO_NONCE, 2, true);
101 edge.setValue(9);
102 cleanup.choreForTesting();
103
104 assertTrue(nm.startOperation(NO_NONCE, 1, createStoppable()));
105
106 assertFalse(nm.startOperation(NO_NONCE, 2, createStoppable()));
107
108 nm.endOperation(NO_NONCE, 3, false);
109 assertTrue(nm.startOperation(NO_NONCE, 3, createStoppable()));
110 edge.setValue(11);
111 cleanup.choreForTesting();
112
113 assertTrue(nm.startOperation(NO_NONCE, 2, createStoppable()));
114 } finally {
115 EnvironmentEdgeManager.reset();
116 }
117 }
118
119 @Test
120 public void testWalNonces() throws Exception {
121 ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
122 EnvironmentEdgeManager.injectEdge(edge);
123 try {
124 ServerNonceManager nm = createManager(6);
125 ScheduledChore cleanup = nm.createCleanupScheduledChore(Mockito.mock(Stoppable.class));
126
127 edge.setValue(12);
128 nm.reportOperationFromWal(NO_NONCE, 1, 8);
129 nm.reportOperationFromWal(NO_NONCE, 2, 2);
130 nm.reportOperationFromWal(NO_NONCE, 3, 5);
131 nm.reportOperationFromWal(NO_NONCE, 3, 6);
132
133 assertFalse(nm.startOperation(NO_NONCE, 1, createStoppable()));
134
135 assertTrue(nm.startOperation(NO_NONCE, 2, createStoppable()));
136 assertFalse(nm.startOperation(NO_NONCE, 3, createStoppable()));
137
138 edge.setValue(17);
139 cleanup.choreForTesting();
140 assertFalse(nm.startOperation(NO_NONCE, 1, createStoppable()));
141 assertFalse(nm.startOperation(NO_NONCE, 3, createStoppable()));
142 edge.setValue(19);
143 cleanup.choreForTesting();
144 assertTrue(nm.startOperation(NO_NONCE, 1, createStoppable()));
145 assertTrue(nm.startOperation(NO_NONCE, 3, createStoppable()));
146 } finally {
147 EnvironmentEdgeManager.reset();
148 }
149 }
150
151 @Test
152 public void testConcurrentAttempts() throws Exception {
153 final ServerNonceManager nm = createManager();
154
155 nm.startOperation(NO_NONCE, 1, createStoppable());
156 TestRunnable tr = new TestRunnable(nm, 1, false, createStoppable());
157 Thread t = tr.start();
158 waitForThreadToBlockOrExit(t);
159 nm.endOperation(NO_NONCE, 1, true);
160 t.join();
161 tr.propagateError();
162
163 nm.startOperation(NO_NONCE, 2, createStoppable());
164 tr = new TestRunnable(nm, 2, true, createStoppable());
165 t = tr.start();
166 waitForThreadToBlockOrExit(t);
167 nm.endOperation(NO_NONCE, 2, false);
168 t.join();
169 tr.propagateError();
170 nm.endOperation(NO_NONCE, 2, true);
171
172 nm.startOperation(NO_NONCE, 3, createStoppable());
173 tr = new TestRunnable(nm, 4, true, createStoppable());
174 tr.start().join();
175 tr.propagateError();
176 }
177
178 @Test
179 public void testStopWaiting() throws Exception {
180 final ServerNonceManager nm = createManager();
181 nm.setConflictWaitIterationMs(1);
182 Stoppable stoppingStoppable = createStoppable();
183 Mockito.when(stoppingStoppable.isStopped()).thenAnswer(new Answer<Boolean>() {
184 AtomicInteger answer = new AtomicInteger(3);
185 @Override
186 public Boolean answer(InvocationOnMock invocation) throws Throwable {
187 return 0 < answer.decrementAndGet();
188 }
189 });
190
191 nm.startOperation(NO_NONCE, 1, createStoppable());
192 TestRunnable tr = new TestRunnable(nm, 1, null, stoppingStoppable);
193 Thread t = tr.start();
194 waitForThreadToBlockOrExit(t);
195
196 t.join();
197 tr.propagateError();
198 }
199
200 private void waitForThreadToBlockOrExit(Thread t) throws InterruptedException {
201 for (int i = 9; i >= 0; --i) {
202 if (t.getState() == Thread.State.TIMED_WAITING || t.getState() == Thread.State.WAITING
203 || t.getState() == Thread.State.BLOCKED || t.getState() == Thread.State.TERMINATED) {
204 return;
205 }
206 if (i > 0) Thread.sleep(300);
207 }
208
209
210 }
211
212 private static class TestRunnable implements Runnable {
213 public final CountDownLatch startedLatch = new CountDownLatch(1);
214
215 private final ServerNonceManager nm;
216 private final long nonce;
217 private final Boolean expected;
218 private final Stoppable stoppable;
219
220 private Throwable throwable = null;
221
222 public TestRunnable(ServerNonceManager nm, long nonce, Boolean expected, Stoppable stoppable) {
223 this.nm = nm;
224 this.nonce = nonce;
225 this.expected = expected;
226 this.stoppable = stoppable;
227 }
228
229 public void propagateError() throws Exception {
230 if (throwable == null) return;
231 throw new Exception(throwable);
232 }
233
234 public Thread start() {
235 Thread t = new Thread(this);
236 t = Threads.setDaemonThreadRunning(t);
237 try {
238 startedLatch.await();
239 } catch (InterruptedException e) {
240 fail("Unexpected");
241 }
242 return t;
243 }
244
245 @Override
246 public void run() {
247 startedLatch.countDown();
248 boolean shouldThrow = expected == null;
249 boolean hasThrown = true;
250 try {
251 boolean result = nm.startOperation(NO_NONCE, nonce, stoppable);
252 hasThrown = false;
253 if (!shouldThrow) {
254 assertEquals(expected.booleanValue(), result);
255 }
256 } catch (Throwable t) {
257 if (!shouldThrow) {
258 throwable = t;
259 }
260 }
261 if (shouldThrow && !hasThrown) {
262 throwable = new AssertionError("Should have thrown");
263 }
264 }
265 }
266
267 private Stoppable createStoppable() {
268 Stoppable s = Mockito.mock(Stoppable.class);
269 Mockito.when(s.isStopped()).thenReturn(false);
270 return s;
271 }
272
273 private ServerNonceManager createManager() {
274 return createManager(null);
275 }
276
277 private ServerNonceManager createManager(Integer gracePeriod) {
278 Configuration conf = HBaseConfiguration.create();
279 if (gracePeriod != null) {
280 conf.setInt(ServerNonceManager.HASH_NONCE_GRACE_PERIOD_KEY, gracePeriod.intValue());
281 }
282 return new ServerNonceManager(conf);
283 }
284 }