1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.procedure;
19
20 import static org.junit.Assert.assertEquals;
21 import static org.junit.Assert.assertArrayEquals;
22 import static org.mockito.Mockito.never;
23 import static org.mockito.Mockito.spy;
24 import static org.mockito.Mockito.times;
25 import static org.mockito.Mockito.verify;
26
27 import java.util.ArrayList;
28 import java.util.List;
29 import java.util.concurrent.CountDownLatch;
30
31 import org.apache.commons.logging.Log;
32 import org.apache.commons.logging.LogFactory;
33 import org.apache.hadoop.hbase.HBaseTestingUtility;
34 import org.apache.hadoop.hbase.testclassification.MediumTests;
35 import org.apache.hadoop.hbase.errorhandling.ForeignException;
36 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
37 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
38 import org.apache.hadoop.hbase.util.Pair;
39 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
40 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
41 import org.junit.AfterClass;
42 import org.junit.BeforeClass;
43 import org.junit.Test;
44 import org.junit.experimental.categories.Category;
45 import org.mockito.Mockito;
46 import org.mockito.invocation.InvocationOnMock;
47 import org.mockito.stubbing.Answer;
48 import org.mockito.verification.VerificationMode;
49
50 import com.google.common.collect.Lists;
51
52
53
54
55 @Category(MediumTests.class)
56 public class TestZKProcedureControllers {
57
58 private static final Log LOG = LogFactory.getLog(TestZKProcedureControllers.class);
59 private final static HBaseTestingUtility UTIL = new HBaseTestingUtility();
60 private static final String COHORT_NODE_NAME = "expected";
61 private static final String CONTROLLER_NODE_NAME = "controller";
62 private static final VerificationMode once = Mockito.times(1);
63
64 private final byte[] memberData = new String("data from member").getBytes();
65
66 @BeforeClass
67 public static void setupTest() throws Exception {
68 UTIL.startMiniZKCluster();
69 }
70
71 @AfterClass
72 public static void cleanupTest() throws Exception {
73 UTIL.shutdownMiniZKCluster();
74 }
75
76
77
78
79
80 @Test(timeout = 60000)
81 public void testSimpleZKCohortMemberController() throws Exception {
82 ZooKeeperWatcher watcher = UTIL.getZooKeeperWatcher();
83 final String operationName = "instanceTest";
84
85 final Subprocedure sub = Mockito.mock(Subprocedure.class);
86 Mockito.when(sub.getName()).thenReturn(operationName);
87
88 final byte[] data = new byte[] { 1, 2, 3 };
89 final CountDownLatch prepared = new CountDownLatch(1);
90 final CountDownLatch committed = new CountDownLatch(1);
91
92 final ForeignExceptionDispatcher monitor = spy(new ForeignExceptionDispatcher());
93 final ZKProcedureMemberRpcs controller = new ZKProcedureMemberRpcs(
94 watcher, "testSimple");
95
96
97 final ProcedureMember member = Mockito
98 .mock(ProcedureMember.class);
99 Mockito.doReturn(sub).when(member).createSubprocedure(operationName, data);
100 Mockito.doAnswer(new Answer<Void>() {
101 @Override
102 public Void answer(InvocationOnMock invocation) throws Throwable {
103 controller.sendMemberAcquired(sub);
104 prepared.countDown();
105 return null;
106 }
107 }).when(member).submitSubprocedure(sub);
108 Mockito.doAnswer(new Answer<Void>() {
109 @Override
110 public Void answer(InvocationOnMock invocation) throws Throwable {
111 controller.sendMemberCompleted(sub, memberData);
112 committed.countDown();
113 return null;
114 }
115 }).when(member).receivedReachedGlobalBarrier(operationName);
116
117
118 controller.start(COHORT_NODE_NAME, member);
119
120
121 String prepare = ZKProcedureUtil.getAcquireBarrierNode(controller.getZkController(), operationName);
122 ZKUtil.createSetData(watcher, prepare, ProtobufUtil.prependPBMagic(data));
123
124 prepared.await();
125
126
127 String commit = ZKProcedureUtil.getReachedBarrierNode(controller.getZkController(), operationName);
128 LOG.debug("Found prepared, posting commit node:" + commit);
129 ZKUtil.createAndFailSilent(watcher, commit);
130 LOG.debug("Commit node:" + commit + ", exists:" + ZKUtil.checkExists(watcher, commit));
131 committed.await();
132
133 verify(monitor, never()).receive(Mockito.any(ForeignException.class));
134
135
136
137
138 ZKUtil.deleteNodeRecursively(watcher, controller.getZkController().getBaseZnode());
139 assertEquals("Didn't delete prepare node", -1, ZKUtil.checkExists(watcher, prepare));
140 assertEquals("Didn't delete commit node", -1, ZKUtil.checkExists(watcher, commit));
141 }
142
143 @Test(timeout = 60000)
144 public void testZKCoordinatorControllerWithNoCohort() throws Exception {
145 final String operationName = "no cohort controller test";
146 final byte[] data = new byte[] { 1, 2, 3 };
147
148 runMockCommitWithOrchestratedControllers(startCoordinatorFirst, operationName, data);
149 runMockCommitWithOrchestratedControllers(startCohortFirst, operationName, data);
150 }
151
152 @Test(timeout = 60000)
153 public void testZKCoordinatorControllerWithSingleMemberCohort() throws Exception {
154 final String operationName = "single member controller test";
155 final byte[] data = new byte[] { 1, 2, 3 };
156
157 runMockCommitWithOrchestratedControllers(startCoordinatorFirst, operationName, data, "cohort");
158 runMockCommitWithOrchestratedControllers(startCohortFirst, operationName, data, "cohort");
159 }
160
161 @Test(timeout = 60000)
162 public void testZKCoordinatorControllerMultipleCohort() throws Exception {
163 final String operationName = "multi member controller test";
164 final byte[] data = new byte[] { 1, 2, 3 };
165
166 runMockCommitWithOrchestratedControllers(startCoordinatorFirst, operationName, data, "cohort",
167 "cohort2", "cohort3");
168 runMockCommitWithOrchestratedControllers(startCohortFirst, operationName, data, "cohort",
169 "cohort2", "cohort3");
170 }
171
172 private void runMockCommitWithOrchestratedControllers(StartControllers controllers,
173 String operationName, byte[] data, String... cohort) throws Exception {
174 ZooKeeperWatcher watcher = UTIL.getZooKeeperWatcher();
175 List<String> expected = Lists.newArrayList(cohort);
176
177 final Subprocedure sub = Mockito.mock(Subprocedure.class);
178 Mockito.when(sub.getName()).thenReturn(operationName);
179
180 CountDownLatch prepared = new CountDownLatch(expected.size());
181 CountDownLatch committed = new CountDownLatch(expected.size());
182 ArrayList<byte[]> dataFromMembers = new ArrayList<byte[]>();
183
184
185 ProcedureCoordinator coordinator = setupMockCoordinator(operationName,
186 prepared, committed, dataFromMembers);
187
188 ProcedureMember member = Mockito.mock(ProcedureMember.class);
189
190 Pair<ZKProcedureCoordinatorRpcs, List<ZKProcedureMemberRpcs>> pair = controllers
191 .start(watcher, operationName, coordinator, CONTROLLER_NODE_NAME, member, expected);
192 ZKProcedureCoordinatorRpcs controller = pair.getFirst();
193 List<ZKProcedureMemberRpcs> cohortControllers = pair.getSecond();
194
195 Procedure p = Mockito.mock(Procedure.class);
196 Mockito.when(p.getName()).thenReturn(operationName);
197
198 controller.sendGlobalBarrierAcquire(p, data, expected);
199
200
201 for (ZKProcedureMemberRpcs cc : cohortControllers) {
202 cc.sendMemberAcquired(sub);
203 }
204
205
206 prepared.await();
207
208 Mockito.verify(coordinator, times(expected.size())).memberAcquiredBarrier(Mockito.eq(operationName),
209 Mockito.anyString());
210
211
212 controller.sendGlobalBarrierReached(p, expected);
213
214
215 for (ZKProcedureMemberRpcs cc : cohortControllers) {
216 cc.sendMemberCompleted(sub, memberData);
217 }
218
219
220 committed.await();
221
222 Mockito.verify(coordinator, times(expected.size())).memberFinishedBarrier(Mockito.eq(operationName),
223 Mockito.anyString(), Mockito.eq(memberData));
224
225 assertEquals("Incorrect number of members returnd data", expected.size(),
226 dataFromMembers.size());
227 for (byte[] result : dataFromMembers) {
228 assertArrayEquals("Incorrect data from member", memberData, result);
229 }
230
231 controller.resetMembers(p);
232
233
234 verifyZooKeeperClean(operationName, watcher, controller.getZkProcedureUtil());
235 verifyCohort(member, cohortControllers.size(), operationName, data);
236 verifyCoordinator(operationName, coordinator, expected);
237 }
238
239
240
241
242
243
244
245
246
247
248 public void runEarlyPrepareNodes(StartControllers controllers, String operationName, byte[] data,
249 String... cohort) throws Exception {
250 ZooKeeperWatcher watcher = UTIL.getZooKeeperWatcher();
251 List<String> expected = Lists.newArrayList(cohort);
252
253 final Subprocedure sub = Mockito.mock(Subprocedure.class);
254 Mockito.when(sub.getName()).thenReturn(operationName);
255
256 final CountDownLatch prepared = new CountDownLatch(expected.size());
257 final CountDownLatch committed = new CountDownLatch(expected.size());
258 ArrayList<byte[]> dataFromMembers = new ArrayList<byte[]>();
259
260
261 ProcedureCoordinator coordinator = setupMockCoordinator(operationName,
262 prepared, committed, dataFromMembers);
263
264 ProcedureMember member = Mockito.mock(ProcedureMember.class);
265 Procedure p = Mockito.mock(Procedure.class);
266 Mockito.when(p.getName()).thenReturn(operationName);
267
268 Pair<ZKProcedureCoordinatorRpcs, List<ZKProcedureMemberRpcs>> pair = controllers
269 .start(watcher, operationName, coordinator, CONTROLLER_NODE_NAME, member, expected);
270 ZKProcedureCoordinatorRpcs controller = pair.getFirst();
271 List<ZKProcedureMemberRpcs> cohortControllers = pair.getSecond();
272
273
274 for (int i = 0; i < cohortControllers.size() / 2; i++) {
275 cohortControllers.get(i).sendMemberAcquired(sub);
276 }
277
278
279 controller.sendGlobalBarrierAcquire(p, data, expected);
280
281
282 for (ZKProcedureMemberRpcs cc : cohortControllers) {
283 cc.sendMemberAcquired(sub);
284 }
285
286
287 prepared.await();
288
289 Mockito.verify(coordinator, times(expected.size())).memberAcquiredBarrier(Mockito.eq(operationName),
290 Mockito.anyString());
291
292
293 controller.sendGlobalBarrierReached(p, expected);
294
295
296 for (ZKProcedureMemberRpcs cc : cohortControllers) {
297 cc.sendMemberCompleted(sub, memberData);
298 }
299
300
301 committed.await();
302
303 Mockito.verify(coordinator, times(expected.size())).memberFinishedBarrier(Mockito.eq(operationName),
304 Mockito.anyString(), Mockito.eq(memberData));
305
306 controller.resetMembers(p);
307
308
309 verifyZooKeeperClean(operationName, watcher, controller.getZkProcedureUtil());
310 verifyCohort(member, cohortControllers.size(), operationName, data);
311 verifyCoordinator(operationName, coordinator, expected);
312 }
313
314
315
316
317
318
319 private ProcedureCoordinator setupMockCoordinator(String operationName,
320 final CountDownLatch prepared, final CountDownLatch committed,
321 final ArrayList<byte[]> dataFromMembers) {
322 ProcedureCoordinator coordinator = Mockito
323 .mock(ProcedureCoordinator.class);
324 Mockito.mock(ProcedureCoordinator.class);
325 Mockito.doAnswer(new Answer<Void>() {
326 @Override
327 public Void answer(InvocationOnMock invocation) throws Throwable {
328 prepared.countDown();
329 return null;
330 }
331 }).when(coordinator).memberAcquiredBarrier(Mockito.eq(operationName), Mockito.anyString());
332 Mockito.doAnswer(new Answer<Void>() {
333 @Override
334 public Void answer(InvocationOnMock invocation) throws Throwable {
335 dataFromMembers.add(memberData);
336 committed.countDown();
337 return null;
338 }
339 }).when(coordinator).memberFinishedBarrier(Mockito.eq(operationName), Mockito.anyString(),
340 Mockito.eq(memberData));
341 return coordinator;
342 }
343
344
345
346
347 private void verifyZooKeeperClean(String operationName, ZooKeeperWatcher watcher,
348 ZKProcedureUtil controller) throws Exception {
349 String prepare = ZKProcedureUtil.getAcquireBarrierNode(controller, operationName);
350 String commit = ZKProcedureUtil.getReachedBarrierNode(controller, operationName);
351 String abort = ZKProcedureUtil.getAbortNode(controller, operationName);
352 assertEquals("Didn't delete prepare node", -1, ZKUtil.checkExists(watcher, prepare));
353 assertEquals("Didn't delete commit node", -1, ZKUtil.checkExists(watcher, commit));
354 assertEquals("Didn't delete abort node", -1, ZKUtil.checkExists(watcher, abort));
355 }
356
357
358
359
360 private void verifyCohort(ProcedureMember member, int cohortSize,
361 String operationName, byte[] data) {
362
363
364 Mockito.verify(member,
365 Mockito.atLeast(cohortSize)).submitSubprocedure(Mockito.any(Subprocedure.class));
366
367 }
368
369
370
371
372 private void verifyCoordinator(String operationName,
373 ProcedureCoordinator coordinator, List<String> expected) {
374
375 for (String node : expected) {
376 verify(coordinator, once).memberAcquiredBarrier(operationName, node);
377 verify(coordinator, once).memberFinishedBarrier(operationName, node, memberData);
378 }
379 }
380
381
382
383
384 private abstract class StartControllers {
385 public abstract Pair<ZKProcedureCoordinatorRpcs, List<ZKProcedureMemberRpcs>> start(
386 ZooKeeperWatcher watcher, String operationName,
387 ProcedureCoordinator coordinator, String controllerName,
388 ProcedureMember member, List<String> cohortNames) throws Exception;
389 }
390
391 private final StartControllers startCoordinatorFirst = new StartControllers() {
392
393 @Override
394 public Pair<ZKProcedureCoordinatorRpcs, List<ZKProcedureMemberRpcs>> start(
395 ZooKeeperWatcher watcher, String operationName,
396 ProcedureCoordinator coordinator, String controllerName,
397 ProcedureMember member, List<String> expected) throws Exception {
398
399 ZKProcedureCoordinatorRpcs controller = new ZKProcedureCoordinatorRpcs(
400 watcher, operationName, CONTROLLER_NODE_NAME);
401 controller.start(coordinator);
402
403
404
405 List<ZKProcedureMemberRpcs> cohortControllers = new ArrayList<ZKProcedureMemberRpcs>();
406 for (String nodeName : expected) {
407 ZKProcedureMemberRpcs cc = new ZKProcedureMemberRpcs(watcher, operationName);
408 cc.start(nodeName, member);
409 cohortControllers.add(cc);
410 }
411 return new Pair<ZKProcedureCoordinatorRpcs, List<ZKProcedureMemberRpcs>>(
412 controller, cohortControllers);
413 }
414 };
415
416
417
418
419
420 private final StartControllers startCohortFirst = new StartControllers() {
421
422 @Override
423 public Pair<ZKProcedureCoordinatorRpcs, List<ZKProcedureMemberRpcs>> start(
424 ZooKeeperWatcher watcher, String operationName,
425 ProcedureCoordinator coordinator, String controllerName,
426 ProcedureMember member, List<String> expected) throws Exception {
427
428
429 List<ZKProcedureMemberRpcs> cohortControllers = new ArrayList<ZKProcedureMemberRpcs>();
430 for (String nodeName : expected) {
431 ZKProcedureMemberRpcs cc = new ZKProcedureMemberRpcs(watcher, operationName);
432 cc.start(nodeName, member);
433 cohortControllers.add(cc);
434 }
435
436
437 ZKProcedureCoordinatorRpcs controller = new ZKProcedureCoordinatorRpcs(
438 watcher, operationName, CONTROLLER_NODE_NAME);
439 controller.start(coordinator);
440
441 return new Pair<ZKProcedureCoordinatorRpcs, List<ZKProcedureMemberRpcs>>(
442 controller, cohortControllers);
443 }
444 };
445 }