View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.procedure;
19  
20  import static org.junit.Assert.assertEquals;
21  import static org.junit.Assert.assertArrayEquals;
22  import static org.mockito.Mockito.never;
23  import static org.mockito.Mockito.spy;
24  import static org.mockito.Mockito.times;
25  import static org.mockito.Mockito.verify;
26  
27  import java.util.ArrayList;
28  import java.util.List;
29  import java.util.concurrent.CountDownLatch;
30  
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  import org.apache.hadoop.hbase.HBaseTestingUtility;
34  import org.apache.hadoop.hbase.testclassification.MediumTests;
35  import org.apache.hadoop.hbase.errorhandling.ForeignException;
36  import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
37  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
38  import org.apache.hadoop.hbase.util.Pair;
39  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
40  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
41  import org.junit.AfterClass;
42  import org.junit.BeforeClass;
43  import org.junit.Test;
44  import org.junit.experimental.categories.Category;
45  import org.mockito.Mockito;
46  import org.mockito.invocation.InvocationOnMock;
47  import org.mockito.stubbing.Answer;
48  import org.mockito.verification.VerificationMode;
49  
50  import com.google.common.collect.Lists;
51  
52  /**
53   * Test zookeeper-based, procedure controllers
54   */
55  @Category(MediumTests.class)
56  public class TestZKProcedureControllers {
57  
58    private static final Log LOG = LogFactory.getLog(TestZKProcedureControllers.class);
59    private final static HBaseTestingUtility UTIL = new HBaseTestingUtility();
60    private static final String COHORT_NODE_NAME = "expected";
61    private static final String CONTROLLER_NODE_NAME = "controller";
62    private static final VerificationMode once = Mockito.times(1);
63  
64    private final byte[] memberData = new String("data from member").getBytes();
65  
66    @BeforeClass
67    public static void setupTest() throws Exception {
68      UTIL.startMiniZKCluster();
69    }
70  
71    @AfterClass
72    public static void cleanupTest() throws Exception {
73      UTIL.shutdownMiniZKCluster();
74    }
75  
76    /**
77     * Smaller test to just test the actuation on the cohort member
78     * @throws Exception on failure
79     */
80    @Test(timeout = 60000)
81    public void testSimpleZKCohortMemberController() throws Exception {
82      ZooKeeperWatcher watcher = UTIL.getZooKeeperWatcher();
83      final String operationName = "instanceTest";
84  
85      final Subprocedure sub = Mockito.mock(Subprocedure.class);
86      Mockito.when(sub.getName()).thenReturn(operationName);
87  
88      final byte[] data = new byte[] { 1, 2, 3 };
89      final CountDownLatch prepared = new CountDownLatch(1);
90      final CountDownLatch committed = new CountDownLatch(1);
91  
92      final ForeignExceptionDispatcher monitor = spy(new ForeignExceptionDispatcher());
93      final ZKProcedureMemberRpcs controller = new ZKProcedureMemberRpcs(
94          watcher, "testSimple");
95  
96      // mock out cohort member callbacks
97      final ProcedureMember member = Mockito
98          .mock(ProcedureMember.class);
99      Mockito.doReturn(sub).when(member).createSubprocedure(operationName, data);
100     Mockito.doAnswer(new Answer<Void>() {
101       @Override
102       public Void answer(InvocationOnMock invocation) throws Throwable {
103         controller.sendMemberAcquired(sub);
104         prepared.countDown();
105         return null;
106       }
107     }).when(member).submitSubprocedure(sub);
108     Mockito.doAnswer(new Answer<Void>() {
109       @Override
110       public Void answer(InvocationOnMock invocation) throws Throwable {
111         controller.sendMemberCompleted(sub, memberData);
112         committed.countDown();
113         return null;
114       }
115     }).when(member).receivedReachedGlobalBarrier(operationName);
116 
117     // start running the listener
118     controller.start(COHORT_NODE_NAME, member);
119 
120     // set a prepare node from a 'coordinator'
121     String prepare = ZKProcedureUtil.getAcquireBarrierNode(controller.getZkController(), operationName);
122     ZKUtil.createSetData(watcher, prepare, ProtobufUtil.prependPBMagic(data));
123     // wait for the operation to be prepared
124     prepared.await();
125 
126     // create the commit node so we update the operation to enter the commit phase
127     String commit = ZKProcedureUtil.getReachedBarrierNode(controller.getZkController(), operationName);
128     LOG.debug("Found prepared, posting commit node:" + commit);
129     ZKUtil.createAndFailSilent(watcher, commit);
130     LOG.debug("Commit node:" + commit + ", exists:" + ZKUtil.checkExists(watcher, commit));
131     committed.await();
132 
133     verify(monitor, never()).receive(Mockito.any(ForeignException.class));
134     // XXX: broken due to composition.
135 //    verify(member, never()).getManager().controllerConnectionFailure(Mockito.anyString(),
136 //      Mockito.any(IOException.class));
137     // cleanup after the test
138     ZKUtil.deleteNodeRecursively(watcher, controller.getZkController().getBaseZnode());
139     assertEquals("Didn't delete prepare node", -1, ZKUtil.checkExists(watcher, prepare));
140     assertEquals("Didn't delete commit node", -1, ZKUtil.checkExists(watcher, commit));
141   }
142 
143   @Test(timeout = 60000)
144   public void testZKCoordinatorControllerWithNoCohort() throws Exception {
145     final String operationName = "no cohort controller test";
146     final byte[] data = new byte[] { 1, 2, 3 };
147 
148     runMockCommitWithOrchestratedControllers(startCoordinatorFirst, operationName, data);
149     runMockCommitWithOrchestratedControllers(startCohortFirst, operationName, data);
150   }
151 
152   @Test(timeout = 60000)
153   public void testZKCoordinatorControllerWithSingleMemberCohort() throws Exception {
154     final String operationName = "single member controller test";
155     final byte[] data = new byte[] { 1, 2, 3 };
156 
157     runMockCommitWithOrchestratedControllers(startCoordinatorFirst, operationName, data, "cohort");
158     runMockCommitWithOrchestratedControllers(startCohortFirst, operationName, data, "cohort");
159   }
160 
161   @Test(timeout = 60000)
162   public void testZKCoordinatorControllerMultipleCohort() throws Exception {
163     final String operationName = "multi member controller test";
164     final byte[] data = new byte[] { 1, 2, 3 };
165 
166     runMockCommitWithOrchestratedControllers(startCoordinatorFirst, operationName, data, "cohort",
167       "cohort2", "cohort3");
168     runMockCommitWithOrchestratedControllers(startCohortFirst, operationName, data, "cohort",
169       "cohort2", "cohort3");
170   }
171 
172   private void runMockCommitWithOrchestratedControllers(StartControllers controllers,
173       String operationName, byte[] data, String... cohort) throws Exception {
174     ZooKeeperWatcher watcher = UTIL.getZooKeeperWatcher();
175     List<String> expected = Lists.newArrayList(cohort);
176 
177     final Subprocedure sub = Mockito.mock(Subprocedure.class);
178     Mockito.when(sub.getName()).thenReturn(operationName);
179 
180     CountDownLatch prepared = new CountDownLatch(expected.size());
181     CountDownLatch committed = new CountDownLatch(expected.size());
182     ArrayList<byte[]> dataFromMembers = new ArrayList<byte[]>();
183 
184     // mock out coordinator so we can keep track of zk progress
185     ProcedureCoordinator coordinator = setupMockCoordinator(operationName,
186       prepared, committed, dataFromMembers);
187 
188     ProcedureMember member = Mockito.mock(ProcedureMember.class);
189 
190     Pair<ZKProcedureCoordinatorRpcs, List<ZKProcedureMemberRpcs>> pair = controllers
191         .start(watcher, operationName, coordinator, CONTROLLER_NODE_NAME, member, expected);
192     ZKProcedureCoordinatorRpcs controller = pair.getFirst();
193     List<ZKProcedureMemberRpcs> cohortControllers = pair.getSecond();
194     // start the operation
195     Procedure p = Mockito.mock(Procedure.class);
196     Mockito.when(p.getName()).thenReturn(operationName);
197 
198     controller.sendGlobalBarrierAcquire(p, data, expected);
199 
200     // post the prepare node for each expected node
201     for (ZKProcedureMemberRpcs cc : cohortControllers) {
202       cc.sendMemberAcquired(sub);
203     }
204 
205     // wait for all the notifications to reach the coordinator
206     prepared.await();
207     // make sure we got the all the nodes and no more
208     Mockito.verify(coordinator, times(expected.size())).memberAcquiredBarrier(Mockito.eq(operationName),
209       Mockito.anyString());
210 
211     // kick off the commit phase
212     controller.sendGlobalBarrierReached(p, expected);
213 
214     // post the committed node for each expected node
215     for (ZKProcedureMemberRpcs cc : cohortControllers) {
216       cc.sendMemberCompleted(sub, memberData);
217     }
218 
219     // wait for all commit notifications to reach the coordinator
220     committed.await();
221     // make sure we got the all the nodes and no more
222     Mockito.verify(coordinator, times(expected.size())).memberFinishedBarrier(Mockito.eq(operationName),
223       Mockito.anyString(), Mockito.eq(memberData));
224 
225     assertEquals("Incorrect number of members returnd data", expected.size(),
226       dataFromMembers.size());
227     for (byte[] result : dataFromMembers) {
228       assertArrayEquals("Incorrect data from member", memberData, result);
229     }
230 
231     controller.resetMembers(p);
232 
233     // verify all behavior
234     verifyZooKeeperClean(operationName, watcher, controller.getZkProcedureUtil());
235     verifyCohort(member, cohortControllers.size(), operationName, data);
236     verifyCoordinator(operationName, coordinator, expected);
237   }
238 
239   // TODO Broken by composition.
240 //  @Test
241 //  public void testCoordinatorControllerHandlesEarlyPrepareNodes() throws Exception {
242 //    runEarlyPrepareNodes(startCoordinatorFirst, "testEarlyPreparenodes", new byte[] { 1, 2, 3 },
243 //      "cohort1", "cohort2");
244 //    runEarlyPrepareNodes(startCohortFirst, "testEarlyPreparenodes", new byte[] { 1, 2, 3 },
245 //      "cohort1", "cohort2");
246 //  }
247 
248   public void runEarlyPrepareNodes(StartControllers controllers, String operationName, byte[] data,
249       String... cohort) throws Exception {
250     ZooKeeperWatcher watcher = UTIL.getZooKeeperWatcher();
251     List<String> expected = Lists.newArrayList(cohort);
252 
253     final Subprocedure sub = Mockito.mock(Subprocedure.class);
254     Mockito.when(sub.getName()).thenReturn(operationName);
255 
256     final CountDownLatch prepared = new CountDownLatch(expected.size());
257     final CountDownLatch committed = new CountDownLatch(expected.size());
258     ArrayList<byte[]> dataFromMembers = new ArrayList<byte[]>();
259 
260     // mock out coordinator so we can keep track of zk progress
261     ProcedureCoordinator coordinator = setupMockCoordinator(operationName,
262       prepared, committed, dataFromMembers);
263 
264     ProcedureMember member = Mockito.mock(ProcedureMember.class);
265     Procedure p = Mockito.mock(Procedure.class);
266     Mockito.when(p.getName()).thenReturn(operationName);
267 
268     Pair<ZKProcedureCoordinatorRpcs, List<ZKProcedureMemberRpcs>> pair = controllers
269         .start(watcher, operationName, coordinator, CONTROLLER_NODE_NAME, member, expected);
270     ZKProcedureCoordinatorRpcs controller = pair.getFirst();
271     List<ZKProcedureMemberRpcs> cohortControllers = pair.getSecond();
272 
273     // post 1/2 the prepare nodes early
274     for (int i = 0; i < cohortControllers.size() / 2; i++) {
275       cohortControllers.get(i).sendMemberAcquired(sub);
276     }
277 
278     // start the operation
279     controller.sendGlobalBarrierAcquire(p, data, expected);
280 
281     // post the prepare node for each expected node
282     for (ZKProcedureMemberRpcs cc : cohortControllers) {
283       cc.sendMemberAcquired(sub);
284     }
285 
286     // wait for all the notifications to reach the coordinator
287     prepared.await();
288     // make sure we got the all the nodes and no more
289     Mockito.verify(coordinator, times(expected.size())).memberAcquiredBarrier(Mockito.eq(operationName),
290       Mockito.anyString());
291 
292     // kick off the commit phase
293     controller.sendGlobalBarrierReached(p, expected);
294 
295     // post the committed node for each expected node
296     for (ZKProcedureMemberRpcs cc : cohortControllers) {
297       cc.sendMemberCompleted(sub, memberData);
298     }
299 
300     // wait for all commit notifications to reach the coordiantor
301     committed.await();
302     // make sure we got the all the nodes and no more
303     Mockito.verify(coordinator, times(expected.size())).memberFinishedBarrier(Mockito.eq(operationName),
304       Mockito.anyString(), Mockito.eq(memberData));
305 
306     controller.resetMembers(p);
307 
308     // verify all behavior
309     verifyZooKeeperClean(operationName, watcher, controller.getZkProcedureUtil());
310     verifyCohort(member, cohortControllers.size(), operationName, data);
311     verifyCoordinator(operationName, coordinator, expected);
312   }
313 
314   /**
315    * @param dataFromMembers
316    * @return a mock {@link ProcedureCoordinator} that just counts down the
317    *         prepared and committed latch for called to the respective method
318    */
319   private ProcedureCoordinator setupMockCoordinator(String operationName,
320       final CountDownLatch prepared, final CountDownLatch committed,
321       final ArrayList<byte[]> dataFromMembers) {
322     ProcedureCoordinator coordinator = Mockito
323         .mock(ProcedureCoordinator.class);
324     Mockito.mock(ProcedureCoordinator.class);
325     Mockito.doAnswer(new Answer<Void>() {
326       @Override
327       public Void answer(InvocationOnMock invocation) throws Throwable {
328         prepared.countDown();
329         return null;
330       }
331     }).when(coordinator).memberAcquiredBarrier(Mockito.eq(operationName), Mockito.anyString());
332     Mockito.doAnswer(new Answer<Void>() {
333       @Override
334       public Void answer(InvocationOnMock invocation) throws Throwable {
335         dataFromMembers.add(memberData);
336         committed.countDown();
337         return null;
338       }
339     }).when(coordinator).memberFinishedBarrier(Mockito.eq(operationName), Mockito.anyString(),
340       Mockito.eq(memberData));
341     return coordinator;
342   }
343 
344   /**
345    * Verify that the prepare, commit and abort nodes for the operation are removed from zookeeper
346    */
347   private void verifyZooKeeperClean(String operationName, ZooKeeperWatcher watcher,
348       ZKProcedureUtil controller) throws Exception {
349     String prepare = ZKProcedureUtil.getAcquireBarrierNode(controller, operationName);
350     String commit = ZKProcedureUtil.getReachedBarrierNode(controller, operationName);
351     String abort = ZKProcedureUtil.getAbortNode(controller, operationName);
352     assertEquals("Didn't delete prepare node", -1, ZKUtil.checkExists(watcher, prepare));
353     assertEquals("Didn't delete commit node", -1, ZKUtil.checkExists(watcher, commit));
354     assertEquals("Didn't delete abort node", -1, ZKUtil.checkExists(watcher, abort));
355   }
356 
357   /**
358    * Verify the cohort controller got called once per expected node to start the operation
359    */
360   private void verifyCohort(ProcedureMember member, int cohortSize,
361       String operationName, byte[] data) {
362 //    verify(member, Mockito.times(cohortSize)).submitSubprocedure(Mockito.eq(operationName),
363 //      (byte[]) Mockito.argThat(new ArrayEquals(data)));
364     Mockito.verify(member,
365       Mockito.atLeast(cohortSize)).submitSubprocedure(Mockito.any(Subprocedure.class));
366 
367   }
368 
369   /**
370    * Verify that the coordinator only got called once for each expected node
371    */
372   private void verifyCoordinator(String operationName,
373       ProcedureCoordinator coordinator, List<String> expected) {
374     // verify that we got all the expected nodes
375     for (String node : expected) {
376       verify(coordinator, once).memberAcquiredBarrier(operationName, node);
377       verify(coordinator, once).memberFinishedBarrier(operationName, node, memberData);
378     }
379   }
380 
381   /**
382    * Specify how the controllers that should be started (not spy/mockable) for the test.
383    */
384   private abstract class StartControllers {
385     public abstract Pair<ZKProcedureCoordinatorRpcs, List<ZKProcedureMemberRpcs>> start(
386         ZooKeeperWatcher watcher, String operationName,
387         ProcedureCoordinator coordinator, String controllerName,
388         ProcedureMember member, List<String> cohortNames) throws Exception;
389   }
390 
391   private final StartControllers startCoordinatorFirst = new StartControllers() {
392 
393     @Override
394     public Pair<ZKProcedureCoordinatorRpcs, List<ZKProcedureMemberRpcs>> start(
395         ZooKeeperWatcher watcher, String operationName,
396         ProcedureCoordinator coordinator, String controllerName,
397         ProcedureMember member, List<String> expected) throws Exception {
398       // start the controller
399       ZKProcedureCoordinatorRpcs controller = new ZKProcedureCoordinatorRpcs(
400           watcher, operationName, CONTROLLER_NODE_NAME);
401       controller.start(coordinator);
402 
403       // make a cohort controller for each expected node
404 
405       List<ZKProcedureMemberRpcs> cohortControllers = new ArrayList<ZKProcedureMemberRpcs>();
406       for (String nodeName : expected) {
407         ZKProcedureMemberRpcs cc = new ZKProcedureMemberRpcs(watcher, operationName);
408         cc.start(nodeName, member);
409         cohortControllers.add(cc);
410       }
411       return new Pair<ZKProcedureCoordinatorRpcs, List<ZKProcedureMemberRpcs>>(
412           controller, cohortControllers);
413     }
414   };
415 
416   /**
417    * Check for the possible race condition where a cohort member starts after the controller and
418    * therefore could miss a new operation
419    */
420   private final StartControllers startCohortFirst = new StartControllers() {
421 
422     @Override
423     public Pair<ZKProcedureCoordinatorRpcs, List<ZKProcedureMemberRpcs>> start(
424         ZooKeeperWatcher watcher, String operationName,
425         ProcedureCoordinator coordinator, String controllerName,
426         ProcedureMember member, List<String> expected) throws Exception {
427 
428       // make a cohort controller for each expected node
429       List<ZKProcedureMemberRpcs> cohortControllers = new ArrayList<ZKProcedureMemberRpcs>();
430       for (String nodeName : expected) {
431         ZKProcedureMemberRpcs cc = new ZKProcedureMemberRpcs(watcher, operationName);
432         cc.start(nodeName, member);
433         cohortControllers.add(cc);
434       }
435 
436       // start the controller
437       ZKProcedureCoordinatorRpcs controller = new ZKProcedureCoordinatorRpcs(
438           watcher, operationName, CONTROLLER_NODE_NAME);
439       controller.start(coordinator);
440 
441       return new Pair<ZKProcedureCoordinatorRpcs, List<ZKProcedureMemberRpcs>>(
442           controller, cohortControllers);
443     }
444   };
445 }