View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.procedure;
19  
20  import static org.mockito.Matchers.any;
21  import static org.mockito.Matchers.anyString;
22  import static org.mockito.Matchers.eq;
23  import static org.mockito.Mockito.doAnswer;
24  import static org.mockito.Mockito.doThrow;
25  import static org.mockito.Mockito.inOrder;
26  import static org.mockito.Mockito.mock;
27  import static org.mockito.Mockito.never;
28  import static org.mockito.Mockito.reset;
29  import static org.mockito.Mockito.spy;
30  import static org.mockito.Mockito.verify;
31  import static org.mockito.Mockito.verifyZeroInteractions;
32  import static org.mockito.Mockito.when;
33  
34  import java.io.IOException;
35  import java.util.concurrent.ThreadPoolExecutor;
36  
37  import org.apache.hadoop.hbase.testclassification.SmallTests;
38  import org.apache.hadoop.hbase.errorhandling.ForeignException;
39  import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
40  import org.apache.hadoop.hbase.errorhandling.TimeoutException;
41  import org.apache.hadoop.hbase.procedure.Subprocedure.SubprocedureImpl;
42  import org.junit.After;
43  import org.junit.Test;
44  import org.junit.experimental.categories.Category;
45  import org.mockito.InOrder;
46  import org.mockito.Mockito;
47  import org.mockito.invocation.InvocationOnMock;
48  import org.mockito.stubbing.Answer;
49  
50  /**
51   * Test the procedure member, and it's error handling mechanisms.
52   */
53  @Category(SmallTests.class)
54  public class TestProcedureMember {
55    private static final long WAKE_FREQUENCY = 100;
56    private static final long TIMEOUT = 100000;
57    private static final long POOL_KEEP_ALIVE = 1;
58  
59    private final String op = "some op";
60    private final byte[] data = new byte[0];
61    private final ForeignExceptionDispatcher mockListener = Mockito
62        .spy(new ForeignExceptionDispatcher());
63    private final SubprocedureFactory mockBuilder = mock(SubprocedureFactory.class);
64    private final ProcedureMemberRpcs mockMemberComms = Mockito
65        .mock(ProcedureMemberRpcs.class);
66    private ProcedureMember member;
67    private ForeignExceptionDispatcher dispatcher;
68    Subprocedure spySub;
69  
70    /**
71     * Reset all the mock objects
72     */
73    @After
74    public void resetTest() {
75      reset(mockListener, mockBuilder, mockMemberComms);
76      if (member != null)
77        try {
78          member.close();
79        } catch (IOException e) {
80          e.printStackTrace();
81        }
82    }
83  
84    /**
85     * Build a member using the class level mocks
86     * @return member to use for tests
87     */
88    private ProcedureMember buildCohortMember() {
89      String name = "node";
90      ThreadPoolExecutor pool = ProcedureMember.defaultPool(name, 1, POOL_KEEP_ALIVE);
91      return new ProcedureMember(mockMemberComms, pool, mockBuilder);
92    }
93  
94    /**
95     * Setup a procedure member that returns the spied-upon {@link Subprocedure}.
96     */
97    private void buildCohortMemberPair() throws IOException {
98      dispatcher = new ForeignExceptionDispatcher();
99      String name = "node";
100     ThreadPoolExecutor pool = ProcedureMember.defaultPool(name, 1, POOL_KEEP_ALIVE);
101     member = new ProcedureMember(mockMemberComms, pool, mockBuilder);
102     when(mockMemberComms.getMemberName()).thenReturn("membername"); // needed for generating exception
103     Subprocedure subproc = new EmptySubprocedure(member, dispatcher);
104     spySub = spy(subproc);
105     when(mockBuilder.buildSubprocedure(op, data)).thenReturn(spySub);
106     addCommitAnswer();
107   }
108 
109 
110   /**
111    * Add a 'in barrier phase' response to the mock controller when it gets a acquired notification
112    */
113   private void addCommitAnswer() throws IOException {
114     doAnswer(new Answer<Void>() {
115       @Override
116       public Void answer(InvocationOnMock invocation) throws Throwable {
117         member.receivedReachedGlobalBarrier(op);
118         return null;
119       }
120     }).when(mockMemberComms).sendMemberAcquired(any(Subprocedure.class));
121   }
122 
123   /**
124    * Test the normal sub procedure execution case.
125    */
126   @Test(timeout = 500)
127   public void testSimpleRun() throws Exception {
128     member = buildCohortMember();
129     EmptySubprocedure subproc = new EmptySubprocedure(member, mockListener);
130     EmptySubprocedure spy = spy(subproc);
131     when(mockBuilder.buildSubprocedure(op, data)).thenReturn(spy);
132 
133     // when we get a prepare, then start the commit phase
134     addCommitAnswer();
135 
136     // run the operation
137     // build a new operation
138     Subprocedure subproc1 = member.createSubprocedure(op, data);
139     member.submitSubprocedure(subproc1);
140     // and wait for it to finish
141     subproc.waitForLocallyCompleted();
142 
143     // make sure everything ran in order
144     InOrder order = inOrder(mockMemberComms, spy);
145     order.verify(spy).acquireBarrier();
146     order.verify(mockMemberComms).sendMemberAcquired(eq(spy));
147     order.verify(spy).insideBarrier();
148     order.verify(mockMemberComms).sendMemberCompleted(eq(spy), eq(data));
149     order.verify(mockMemberComms, never()).sendMemberAborted(eq(spy),
150         any(ForeignException.class));
151   }
152 
153   /**
154    * Make sure we call cleanup etc, when we have an exception during
155    * {@link Subprocedure#acquireBarrier()}.
156    */
157   @Test(timeout = 60000)
158   public void testMemberPrepareException() throws Exception {
159     buildCohortMemberPair();
160 
161     // mock an exception on Subprocedure's prepare
162     doAnswer(
163         new Answer<Void>() {
164           @Override
165           public Void answer(InvocationOnMock invocation) throws Throwable {
166             throw new IOException("Forced IOException in member acquireBarrier");
167           }
168         }).when(spySub).acquireBarrier();
169 
170     // run the operation
171     // build a new operation
172     Subprocedure subproc = member.createSubprocedure(op, data);
173     member.submitSubprocedure(subproc);
174     // if the operation doesn't die properly, then this will timeout
175     member.closeAndWait(TIMEOUT);
176 
177     // make sure everything ran in order
178     InOrder order = inOrder(mockMemberComms, spySub);
179     order.verify(spySub).acquireBarrier();
180     // Later phases not run
181     order.verify(mockMemberComms, never()).sendMemberAcquired(eq(spySub));
182     order.verify(spySub, never()).insideBarrier();
183     order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub), eq(data));
184     // error recovery path exercised
185     order.verify(spySub).cancel(anyString(), any(Exception.class));
186     order.verify(spySub).cleanup(any(Exception.class));
187   }
188 
189   /**
190    * Make sure we call cleanup etc, when we have an exception during prepare.
191    */
192   @Test(timeout = 60000)
193   public void testSendMemberAcquiredCommsFailure() throws Exception {
194     buildCohortMemberPair();
195 
196     // mock an exception on Subprocedure's prepare
197     doAnswer(
198         new Answer<Void>() {
199           @Override
200           public Void answer(InvocationOnMock invocation) throws Throwable {
201             throw new IOException("Forced IOException in memeber prepare");
202           }
203         }).when(mockMemberComms).sendMemberAcquired(any(Subprocedure.class));
204 
205     // run the operation
206     // build a new operation
207     Subprocedure subproc = member.createSubprocedure(op, data);
208     member.submitSubprocedure(subproc);
209     // if the operation doesn't die properly, then this will timeout
210     member.closeAndWait(TIMEOUT);
211 
212     // make sure everything ran in order
213     InOrder order = inOrder(mockMemberComms, spySub);
214     order.verify(spySub).acquireBarrier();
215     order.verify(mockMemberComms).sendMemberAcquired(eq(spySub));
216 
217     // Later phases not run
218     order.verify(spySub, never()).insideBarrier();
219     order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub), eq(data));
220     // error recovery path exercised
221     order.verify(spySub).cancel(anyString(), any(Exception.class));
222     order.verify(spySub).cleanup(any(Exception.class));
223   }
224 
225   /**
226    * Fail correctly if coordinator aborts the procedure.  The subprocedure will not interrupt a
227    * running {@link Subprocedure#prepare} -- prepare needs to finish first, and the the abort
228    * is checked.  Thus, the {@link Subprocedure#prepare} should succeed but later get rolled back
229    * via {@link Subprocedure#cleanup}.
230    */
231   @Test(timeout = 60000)
232   public void testCoordinatorAbort() throws Exception {
233     buildCohortMemberPair();
234 
235     // mock that another node timed out or failed to prepare
236     final TimeoutException oate = new TimeoutException("bogus timeout", 1,2,0);
237     doAnswer(
238         new Answer<Void>() {
239           @Override
240           public Void answer(InvocationOnMock invocation) throws Throwable {
241             // inject a remote error (this would have come from an external thread)
242             spySub.cancel("bogus message", oate);
243             // sleep the wake frequency since that is what we promised
244             Thread.sleep(WAKE_FREQUENCY);
245             return null;
246           }
247         }).when(spySub).waitForReachedGlobalBarrier();
248 
249     // run the operation
250     // build a new operation
251     Subprocedure subproc = member.createSubprocedure(op, data);
252     member.submitSubprocedure(subproc);
253     // if the operation doesn't die properly, then this will timeout
254     member.closeAndWait(TIMEOUT);
255 
256     // make sure everything ran in order
257     InOrder order = inOrder(mockMemberComms, spySub);
258     order.verify(spySub).acquireBarrier();
259     order.verify(mockMemberComms).sendMemberAcquired(eq(spySub));
260     // Later phases not run
261     order.verify(spySub, never()).insideBarrier();
262     order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub), eq(data));
263     // error recovery path exercised
264     order.verify(spySub).cancel(anyString(), any(Exception.class));
265     order.verify(spySub).cleanup(any(Exception.class));
266   }
267 
268   /**
269    * Handle failures if a member's commit phase fails.
270    *
271    * NOTE: This is the core difference that makes this different from traditional 2PC.  In true
272    * 2PC the transaction is committed just before the coordinator sends commit messages to the
273    * member.  Members are then responsible for reading its TX log.  This implementation actually
274    * rolls back, and thus breaks the normal TX guarantees.
275   */
276   @Test(timeout = 60000)
277   public void testMemberCommitException() throws Exception {
278     buildCohortMemberPair();
279 
280     // mock an exception on Subprocedure's prepare
281     doAnswer(
282         new Answer<Void>() {
283           @Override
284           public Void answer(InvocationOnMock invocation) throws Throwable {
285             throw new IOException("Forced IOException in memeber prepare");
286           }
287         }).when(spySub).insideBarrier();
288 
289     // run the operation
290     // build a new operation
291     Subprocedure subproc = member.createSubprocedure(op, data);
292     member.submitSubprocedure(subproc);
293     // if the operation doesn't die properly, then this will timeout
294     member.closeAndWait(TIMEOUT);
295 
296     // make sure everything ran in order
297     InOrder order = inOrder(mockMemberComms, spySub);
298     order.verify(spySub).acquireBarrier();
299     order.verify(mockMemberComms).sendMemberAcquired(eq(spySub));
300     order.verify(spySub).insideBarrier();
301 
302     // Later phases not run
303     order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub), eq(data));
304     // error recovery path exercised
305     order.verify(spySub).cancel(anyString(), any(Exception.class));
306     order.verify(spySub).cleanup(any(Exception.class));
307   }
308 
309   /**
310    * Handle Failures if a member's commit phase succeeds but notification to coordinator fails
311    *
312    * NOTE: This is the core difference that makes this different from traditional 2PC.  In true
313    * 2PC the transaction is committed just before the coordinator sends commit messages to the
314    * member.  Members are then responsible for reading its TX log.  This implementation actually
315    * rolls back, and thus breaks the normal TX guarantees.
316   */
317   @Test(timeout = 60000)
318   public void testMemberCommitCommsFailure() throws Exception {
319     buildCohortMemberPair();
320     final TimeoutException oate = new TimeoutException("bogus timeout",1,2,0);
321     doAnswer(
322         new Answer<Void>() {
323           @Override
324           public Void answer(InvocationOnMock invocation) throws Throwable {
325             // inject a remote error (this would have come from an external thread)
326             spySub.cancel("commit comms fail", oate);
327             // sleep the wake frequency since that is what we promised
328             Thread.sleep(WAKE_FREQUENCY);
329             return null;
330           }
331         }).when(mockMemberComms).sendMemberCompleted(any(Subprocedure.class), eq(data));
332 
333     // run the operation
334     // build a new operation
335     Subprocedure subproc = member.createSubprocedure(op, data);
336     member.submitSubprocedure(subproc);
337     // if the operation doesn't die properly, then this will timeout
338     member.closeAndWait(TIMEOUT);
339 
340     // make sure everything ran in order
341     InOrder order = inOrder(mockMemberComms, spySub);
342     order.verify(spySub).acquireBarrier();
343     order.verify(mockMemberComms).sendMemberAcquired(eq(spySub));
344     order.verify(spySub).insideBarrier();
345     order.verify(mockMemberComms).sendMemberCompleted(eq(spySub), eq(data));
346     // error recovery path exercised
347     order.verify(spySub).cancel(anyString(), any(Exception.class));
348     order.verify(spySub).cleanup(any(Exception.class));
349   }
350 
351   /**
352    * Fail correctly on getting an external error while waiting for the prepared latch
353    * @throws Exception on failure
354    */
355   @Test(timeout = 60000)
356   public void testPropagateConnectionErrorBackToManager() throws Exception {
357     // setup the operation
358     member = buildCohortMember();
359     ProcedureMember memberSpy = spy(member);
360 
361     // setup the commit and the spy
362     final ForeignExceptionDispatcher dispatcher = new ForeignExceptionDispatcher();
363     ForeignExceptionDispatcher dispSpy = spy(dispatcher);
364     Subprocedure commit = new EmptySubprocedure(member, dispatcher);
365     Subprocedure spy = spy(commit);
366     when(mockBuilder.buildSubprocedure(op, data)).thenReturn(spy);
367 
368     // fail during the prepare phase
369     doThrow(new ForeignException("SRC", "prepare exception")).when(spy).acquireBarrier();
370     // and throw a connection error when we try to tell the controller about it
371     doThrow(new IOException("Controller is down!")).when(mockMemberComms)
372         .sendMemberAborted(eq(spy), any(ForeignException.class));
373 
374 
375     // run the operation
376     // build a new operation
377     Subprocedure subproc = memberSpy.createSubprocedure(op, data);
378     memberSpy.submitSubprocedure(subproc);
379     // if the operation doesn't die properly, then this will timeout
380     memberSpy.closeAndWait(TIMEOUT);
381 
382     // make sure everything ran in order
383     InOrder order = inOrder(mockMemberComms, spy, dispSpy);
384     // make sure we acquire.
385     order.verify(spy).acquireBarrier();
386     order.verify(mockMemberComms, never()).sendMemberAcquired(spy);
387 
388     // TODO Need to do another refactor to get this to propagate to the coordinator.
389     // make sure we pass a remote exception back the controller
390 //    order.verify(mockMemberComms).sendMemberAborted(eq(spy),
391 //      any(ExternalException.class));
392 //    order.verify(dispSpy).receiveError(anyString(),
393 //        any(ExternalException.class), any());
394   }
395 
396   /**
397    * Test that the cohort member correctly doesn't attempt to start a task when the builder cannot
398    * correctly build a new task for the requested operation
399    * @throws Exception on failure
400    */
401   @Test
402   public void testNoTaskToBeRunFromRequest() throws Exception {
403     ThreadPoolExecutor pool = mock(ThreadPoolExecutor.class);
404     when(mockBuilder.buildSubprocedure(op, data)).thenReturn(null)
405       .thenThrow(new IllegalStateException("Wrong state!"), new IllegalArgumentException("can't understand the args"));
406     member = new ProcedureMember(mockMemberComms, pool, mockBuilder);
407     // builder returns null
408     // build a new operation
409     Subprocedure subproc = member.createSubprocedure(op, data);
410     member.submitSubprocedure(subproc);
411     // throws an illegal state exception
412     try {
413       // build a new operation
414       Subprocedure subproc2 = member.createSubprocedure(op, data);
415       member.submitSubprocedure(subproc2);
416     } catch (IllegalStateException ise) {
417     }
418     // throws an illegal argument exception
419     try {
420       // build a new operation
421       Subprocedure subproc3 = member.createSubprocedure(op, data);
422       member.submitSubprocedure(subproc3);
423     } catch (IllegalArgumentException iae) {
424     }
425 
426     // no request should reach the pool
427     verifyZeroInteractions(pool);
428     // get two abort requests
429     // TODO Need to do another refactor to get this to propagate to the coordinator.
430     // verify(mockMemberComms, times(2)).sendMemberAborted(any(Subprocedure.class), any(ExternalException.class));
431   }
432 
433   /**
434    * Helper {@link Procedure} who's phase for each step is just empty
435    */
436   public class EmptySubprocedure extends SubprocedureImpl {
437     public EmptySubprocedure(ProcedureMember member, ForeignExceptionDispatcher dispatcher) {
438       super( member, op, dispatcher,
439       // TODO 1000000 is an arbitrary number that I picked.
440           WAKE_FREQUENCY, TIMEOUT);
441     }
442   }
443 }