View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import static org.apache.hadoop.hbase.HConstants.NO_NONCE;
22  import static org.junit.Assert.assertEquals;
23  import static org.junit.Assert.assertFalse;
24  import static org.junit.Assert.assertTrue;
25  import static org.junit.Assert.fail;
26  
27  import java.util.concurrent.CountDownLatch;
28  import java.util.concurrent.atomic.AtomicInteger;
29  
30  import org.apache.hadoop.conf.Configuration;
31  import org.apache.hadoop.hbase.HBaseConfiguration;
32  import org.apache.hadoop.hbase.ScheduledChore;
33  import org.apache.hadoop.hbase.Stoppable;
34  import org.apache.hadoop.hbase.testclassification.SmallTests;
35  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
36  import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
37  import org.apache.hadoop.hbase.util.Threads;
38  import org.junit.Test;
39  import org.junit.experimental.categories.Category;
40  import org.mockito.Mockito;
41  import org.mockito.invocation.InvocationOnMock;
42  import org.mockito.stubbing.Answer;
43  
44  @Category(SmallTests.class)
45  public class TestServerNonceManager {
46  
47    @Test
48    public void testNormalStartEnd() throws Exception {
49      final long[] numbers = new long[] { NO_NONCE, 1, 2, Long.MAX_VALUE, Long.MIN_VALUE };
50      ServerNonceManager nm = createManager();
51      for (int i = 0; i < numbers.length; ++i) {
52        for (int j = 0; j < numbers.length; ++j) {
53          assertTrue(nm.startOperation(numbers[i], numbers[j], createStoppable()));
54        }
55      }
56      // Should be able to start operation the second time w/o nonces.
57      for (int i = 0; i < numbers.length; ++i) {
58        assertTrue(nm.startOperation(numbers[i], NO_NONCE, createStoppable()));
59      }
60      // Fail all operations - should be able to restart.
61      for (int i = 0; i < numbers.length; ++i) {
62        for (int j = 0; j < numbers.length; ++j) {
63          nm.endOperation(numbers[i], numbers[j], false);
64          assertTrue(nm.startOperation(numbers[i], numbers[j], createStoppable()));
65        }
66      }
67      // Succeed all operations - should not be able to restart, except for NO_NONCE.
68      for (int i = 0; i < numbers.length; ++i) {
69        for (int j = 0; j < numbers.length; ++j) {
70          nm.endOperation(numbers[i], numbers[j], true);
71          assertEquals(numbers[j] == NO_NONCE,
72              nm.startOperation(numbers[i], numbers[j], createStoppable()));
73        }
74      }
75    }
76  
77    @Test
78    public void testNoEndWithoutStart() {
79      ServerNonceManager nm = createManager();
80      try {
81        nm.endOperation(NO_NONCE, 1, true);
82        fail("Should have thrown");
83      } catch (AssertionError err) {}
84    }
85  
86    @Test
87    public void testCleanup() throws Exception {
88      ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
89      EnvironmentEdgeManager.injectEdge(edge);
90      try {
91        ServerNonceManager nm = createManager(6);
92        ScheduledChore cleanup = nm.createCleanupScheduledChore(Mockito.mock(Stoppable.class));
93        edge.setValue(1);
94        assertTrue(nm.startOperation(NO_NONCE, 1, createStoppable()));
95        assertTrue(nm.startOperation(NO_NONCE, 2, createStoppable()));
96        assertTrue(nm.startOperation(NO_NONCE, 3, createStoppable()));
97        edge.setValue(2);
98        nm.endOperation(NO_NONCE, 1, true);
99        edge.setValue(4);
100       nm.endOperation(NO_NONCE, 2, true);
101       edge.setValue(9);
102       cleanup.choreForTesting();
103       // Nonce 1 has been cleaned up.
104       assertTrue(nm.startOperation(NO_NONCE, 1, createStoppable()));
105       // Nonce 2 has not been cleaned up.
106       assertFalse(nm.startOperation(NO_NONCE, 2, createStoppable()));
107       // Nonce 3 was active and active ops should never be cleaned up; try to end and start.
108       nm.endOperation(NO_NONCE, 3, false);
109       assertTrue(nm.startOperation(NO_NONCE, 3, createStoppable()));
110       edge.setValue(11);
111       cleanup.choreForTesting();
112       // Now, nonce 2 has been cleaned up.
113       assertTrue(nm.startOperation(NO_NONCE, 2, createStoppable()));
114     } finally {
115       EnvironmentEdgeManager.reset();
116     }
117   }
118 
119   @Test
120   public void testWalNonces() throws Exception {
121     ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
122     EnvironmentEdgeManager.injectEdge(edge);
123     try {
124       ServerNonceManager nm = createManager(6);
125       ScheduledChore cleanup = nm.createCleanupScheduledChore(Mockito.mock(Stoppable.class));
126       // Add nonces from WAL, including dups.
127       edge.setValue(12);
128       nm.reportOperationFromWal(NO_NONCE, 1, 8);
129       nm.reportOperationFromWal(NO_NONCE, 2, 2);
130       nm.reportOperationFromWal(NO_NONCE, 3, 5);
131       nm.reportOperationFromWal(NO_NONCE, 3, 6);
132       // WAL nonces should prevent cross-server conflicts.
133       assertFalse(nm.startOperation(NO_NONCE, 1, createStoppable()));
134       // Make sure we ignore very old nonces, but not borderline old nonces.
135       assertTrue(nm.startOperation(NO_NONCE, 2, createStoppable()));
136       assertFalse(nm.startOperation(NO_NONCE, 3, createStoppable()));
137       // Make sure grace period is counted from recovery time.
138       edge.setValue(17);
139       cleanup.choreForTesting();
140       assertFalse(nm.startOperation(NO_NONCE, 1, createStoppable()));
141       assertFalse(nm.startOperation(NO_NONCE, 3, createStoppable()));
142       edge.setValue(19);
143       cleanup.choreForTesting();
144       assertTrue(nm.startOperation(NO_NONCE, 1, createStoppable()));
145       assertTrue(nm.startOperation(NO_NONCE, 3, createStoppable()));
146     } finally {
147       EnvironmentEdgeManager.reset();
148     }
149   }
150 
151   @Test
152   public void testConcurrentAttempts() throws Exception {
153     final ServerNonceManager nm = createManager();
154 
155     nm.startOperation(NO_NONCE, 1, createStoppable());
156     TestRunnable tr = new TestRunnable(nm, 1, false, createStoppable());
157     Thread t = tr.start();
158     waitForThreadToBlockOrExit(t);
159     nm.endOperation(NO_NONCE, 1, true); // operation succeeded
160     t.join(); // thread must now unblock and not proceed (result checked inside).
161     tr.propagateError();
162 
163     nm.startOperation(NO_NONCE, 2, createStoppable());
164     tr = new TestRunnable(nm, 2, true, createStoppable());
165     t = tr.start();
166     waitForThreadToBlockOrExit(t);
167     nm.endOperation(NO_NONCE, 2, false);
168     t.join(); // thread must now unblock and allow us to proceed (result checked inside).
169     tr.propagateError();
170     nm.endOperation(NO_NONCE, 2, true); // that is to say we should be able to end operation
171 
172     nm.startOperation(NO_NONCE, 3, createStoppable());
173     tr = new TestRunnable(nm, 4, true, createStoppable());
174     tr.start().join();  // nonce 3 must have no bearing on nonce 4
175     tr.propagateError();
176   }
177 
178   @Test
179   public void testStopWaiting() throws Exception {
180     final ServerNonceManager nm = createManager();
181     nm.setConflictWaitIterationMs(1);
182     Stoppable stoppingStoppable = createStoppable();
183     Mockito.when(stoppingStoppable.isStopped()).thenAnswer(new Answer<Boolean>() {
184       AtomicInteger answer = new AtomicInteger(3);
185       @Override
186       public Boolean answer(InvocationOnMock invocation) throws Throwable {
187         return 0 < answer.decrementAndGet();
188       }
189     });
190 
191     nm.startOperation(NO_NONCE, 1, createStoppable());
192     TestRunnable tr = new TestRunnable(nm, 1, null, stoppingStoppable);
193     Thread t = tr.start();
194     waitForThreadToBlockOrExit(t);
195     // thread must eventually throw
196     t.join();
197     tr.propagateError();
198   }
199 
200   private void waitForThreadToBlockOrExit(Thread t) throws InterruptedException {
201     for (int i = 9; i >= 0; --i) {
202       if (t.getState() == Thread.State.TIMED_WAITING || t.getState() == Thread.State.WAITING
203           || t.getState() == Thread.State.BLOCKED || t.getState() == Thread.State.TERMINATED) {
204         return;
205       }
206       if (i > 0) Thread.sleep(300);
207     }
208     // Thread didn't block in 3 seconds. What is it doing? Continue the test, we'd rather
209     // have a very strange false positive then false negative due to timing.
210   }
211 
212   private static class TestRunnable implements Runnable {
213     public final CountDownLatch startedLatch = new CountDownLatch(1); // It's the final countdown!
214 
215     private final ServerNonceManager nm;
216     private final long nonce;
217     private final Boolean expected;
218     private final Stoppable stoppable;
219 
220     private Throwable throwable = null;
221 
222     public TestRunnable(ServerNonceManager nm, long nonce, Boolean expected, Stoppable stoppable) {
223       this.nm = nm;
224       this.nonce = nonce;
225       this.expected = expected;
226       this.stoppable = stoppable;
227     }
228 
229     public void propagateError() throws Exception {
230       if (throwable == null) return;
231       throw new Exception(throwable);
232     }
233 
234     public Thread start() {
235       Thread t = new Thread(this);
236       t = Threads.setDaemonThreadRunning(t);
237       try {
238         startedLatch.await();
239       } catch (InterruptedException e) {
240         fail("Unexpected");
241       }
242       return t;
243     }
244 
245     @Override
246     public void run() {
247       startedLatch.countDown();
248       boolean shouldThrow = expected == null;
249       boolean hasThrown = true;
250       try {
251         boolean result = nm.startOperation(NO_NONCE, nonce, stoppable);
252         hasThrown = false;
253         if (!shouldThrow) {
254           assertEquals(expected.booleanValue(), result);
255         }
256       } catch (Throwable t) {
257         if (!shouldThrow) {
258           throwable = t;
259         }
260       }
261       if (shouldThrow && !hasThrown) {
262         throwable = new AssertionError("Should have thrown");
263       }
264     }
265   }
266 
267   private Stoppable createStoppable() {
268     Stoppable s = Mockito.mock(Stoppable.class);
269     Mockito.when(s.isStopped()).thenReturn(false);
270     return s;
271   }
272 
273   private ServerNonceManager createManager() {
274     return createManager(null);
275   }
276 
277   private ServerNonceManager createManager(Integer gracePeriod) {
278     Configuration conf = HBaseConfiguration.create();
279     if (gracePeriod != null) {
280       conf.setInt(ServerNonceManager.HASH_NONCE_GRACE_PERIOD_KEY, gracePeriod.intValue());
281     }
282     return new ServerNonceManager(conf);
283   }
284 }