View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.procedure;
19  
20  import static org.mockito.Matchers.any;
21  import static org.mockito.Matchers.anyString;
22  import static org.mockito.Mockito.mock;
23  import static org.mockito.Mockito.never;
24  import static org.mockito.Mockito.spy;
25  import static org.mockito.Mockito.verify;
26  import static org.mockito.Mockito.when;
27  
28  import java.util.ArrayList;
29  import java.util.List;
30  import java.util.concurrent.CountDownLatch;
31  
32  import org.apache.hadoop.hbase.testclassification.SmallTests;
33  import org.apache.hadoop.hbase.errorhandling.ForeignException;
34  import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
35  import org.junit.Before;
36  import org.junit.Test;
37  import org.junit.experimental.categories.Category;
38  
39  /**
40   * Demonstrate how Procedure handles single members, multiple members, and errors semantics
41   */
42  @Category(SmallTests.class)
43  public class TestProcedure {
44  
45    ProcedureCoordinator coord;
46  
47    @Before
48    public void setup() {
49      coord = mock(ProcedureCoordinator.class);
50      final ProcedureCoordinatorRpcs comms = mock(ProcedureCoordinatorRpcs.class);
51      when(coord.getRpcs()).thenReturn(comms); // make it not null
52    }
53  
54    class LatchedProcedure extends Procedure {
55      CountDownLatch startedAcquireBarrier = new CountDownLatch(1);
56      CountDownLatch startedDuringBarrier = new CountDownLatch(1);
57      CountDownLatch completedProcedure = new CountDownLatch(1);
58  
59      public LatchedProcedure(ProcedureCoordinator coord, ForeignExceptionDispatcher monitor,
60          long wakeFreq, long timeout, String opName, byte[] data,
61          List<String> expectedMembers) {
62        super(coord, monitor, wakeFreq, timeout, opName, data, expectedMembers);
63      }
64  
65      @Override
66      public void sendGlobalBarrierStart() {
67        startedAcquireBarrier.countDown();
68      }
69  
70      @Override
71      public void sendGlobalBarrierReached() {
72        startedDuringBarrier.countDown();
73      }
74  
75      @Override
76      public void sendGlobalBarrierComplete() {
77        completedProcedure.countDown();
78      }
79    };
80  
81    /**
82     * With a single member, verify ordered execution.  The Coordinator side is run in a separate
83     * thread so we can only trigger from members and wait for particular state latches.
84     */
85    @Test(timeout = 60000)
86    public void testSingleMember() throws Exception {
87      // The member
88      List<String> members =  new ArrayList<String>();
89      members.add("member");
90      LatchedProcedure proc = new LatchedProcedure(coord, new ForeignExceptionDispatcher(), 100,
91          Integer.MAX_VALUE, "op", null, members);
92      final LatchedProcedure procspy = spy(proc);
93      // coordinator: start the barrier procedure
94      new Thread() {
95        public void run() {
96          procspy.call();
97        }
98      }.start();
99  
100     // coordinator: wait for the barrier to be acquired, then send start barrier
101     proc.startedAcquireBarrier.await();
102 
103     // we only know that {@link Procedure#sendStartBarrier()} was called, and others are blocked.
104     verify(procspy).sendGlobalBarrierStart();
105     verify(procspy, never()).sendGlobalBarrierReached();
106     verify(procspy, never()).sendGlobalBarrierComplete();
107     verify(procspy, never()).barrierAcquiredByMember(anyString());
108 
109     // member: trigger global barrier acquisition
110     proc.barrierAcquiredByMember(members.get(0));
111 
112     // coordinator: wait for global barrier to be acquired.
113     proc.acquiredBarrierLatch.await();
114     verify(procspy).sendGlobalBarrierStart(); // old news
115 
116     // since two threads, we cannot guarantee that {@link Procedure#sendSatsifiedBarrier()} was
117     // or was not called here.
118 
119     // member: trigger global barrier release
120     proc.barrierReleasedByMember(members.get(0), new byte[0]);
121 
122     // coordinator: wait for procedure to be completed
123     proc.completedProcedure.await();
124     verify(procspy).sendGlobalBarrierReached();
125     verify(procspy).sendGlobalBarrierComplete();
126     verify(procspy, never()).receive(any(ForeignException.class));
127   }
128 
129   @Test(timeout = 60000)
130   public void testMultipleMember() throws Exception {
131     // 2 members
132     List<String> members =  new ArrayList<String>();
133     members.add("member1");
134     members.add("member2");
135 
136     LatchedProcedure proc = new LatchedProcedure(coord, new ForeignExceptionDispatcher(), 100,
137         Integer.MAX_VALUE, "op", null, members);
138     final LatchedProcedure procspy = spy(proc);
139     // start the barrier procedure
140     new Thread() {
141       public void run() {
142         procspy.call();
143       }
144     }.start();
145 
146     // coordinator: wait for the barrier to be acquired, then send start barrier
147     procspy.startedAcquireBarrier.await();
148 
149     // we only know that {@link Procedure#sendStartBarrier()} was called, and others are blocked.
150     verify(procspy).sendGlobalBarrierStart();
151     verify(procspy, never()).sendGlobalBarrierReached();
152     verify(procspy, never()).sendGlobalBarrierComplete();
153     verify(procspy, never()).barrierAcquiredByMember(anyString()); // no externals
154 
155     // member0: [1/2] trigger global barrier acquisition.
156     procspy.barrierAcquiredByMember(members.get(0));
157 
158     // coordinator not satisified.
159     verify(procspy).sendGlobalBarrierStart();
160     verify(procspy, never()).sendGlobalBarrierReached();
161     verify(procspy, never()).sendGlobalBarrierComplete();
162 
163     // member 1: [2/2] trigger global barrier acquisition.
164     procspy.barrierAcquiredByMember(members.get(1));
165 
166     // coordinator: wait for global barrier to be acquired.
167     procspy.startedDuringBarrier.await();
168     verify(procspy).sendGlobalBarrierStart(); // old news
169 
170     // member 1, 2: trigger global barrier release
171     procspy.barrierReleasedByMember(members.get(0), new byte[0]);
172     procspy.barrierReleasedByMember(members.get(1), new byte[0]);
173 
174     // coordinator wait for procedure to be completed
175     procspy.completedProcedure.await();
176     verify(procspy).sendGlobalBarrierReached();
177     verify(procspy).sendGlobalBarrierComplete();
178     verify(procspy, never()).receive(any(ForeignException.class));
179   }
180 
181   @Test(timeout = 60000)
182   public void testErrorPropagation() throws Exception {
183     List<String> members =  new ArrayList<String>();
184     members.add("member");
185     Procedure proc = new Procedure(coord, new ForeignExceptionDispatcher(), 100,
186         Integer.MAX_VALUE, "op", null, members);
187     final Procedure procspy = spy(proc);
188 
189     ForeignException cause = new ForeignException("SRC", "External Exception");
190     proc.receive(cause);
191 
192     // start the barrier procedure
193     Thread t = new Thread() {
194       public void run() {
195         procspy.call();
196       }
197     };
198     t.start();
199     t.join();
200 
201     verify(procspy, never()).sendGlobalBarrierStart();
202     verify(procspy, never()).sendGlobalBarrierReached();
203     verify(procspy).sendGlobalBarrierComplete();
204   }
205 
206   @Test(timeout = 60000)
207   public void testBarrieredErrorPropagation() throws Exception {
208     List<String> members =  new ArrayList<String>();
209     members.add("member");
210     LatchedProcedure proc = new LatchedProcedure(coord, new ForeignExceptionDispatcher(), 100,
211         Integer.MAX_VALUE, "op", null, members);
212     final LatchedProcedure procspy = spy(proc);
213 
214     // start the barrier procedure
215     Thread t = new Thread() {
216       public void run() {
217         procspy.call();
218       }
219     };
220     t.start();
221 
222     // now test that we can put an error in before the commit phase runs
223     procspy.startedAcquireBarrier.await();
224     ForeignException cause = new ForeignException("SRC", "External Exception");
225     procspy.receive(cause);
226     procspy.barrierAcquiredByMember(members.get(0));
227     t.join();
228 
229     // verify state of all the object
230     verify(procspy).sendGlobalBarrierStart();
231     verify(procspy).sendGlobalBarrierComplete();
232     verify(procspy, never()).sendGlobalBarrierReached();
233   }
234 }