View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.procedure2;
20  
21  import java.io.IOException;
22  import java.io.InputStream;
23  import java.io.OutputStream;
24  import java.util.concurrent.atomic.AtomicBoolean;
25  import java.util.concurrent.CountDownLatch;
26  
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.apache.hadoop.fs.FileSystem;
30  import org.apache.hadoop.fs.FileStatus;
31  import org.apache.hadoop.fs.Path;
32  import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
33  import org.apache.hadoop.hbase.ProcedureInfo;
34  import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
35  import org.apache.hadoop.hbase.testclassification.SmallTests;
36  import org.apache.hadoop.hbase.util.Bytes;
37  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
38  import org.apache.hadoop.hbase.util.Threads;
39  import org.junit.After;
40  import org.junit.Before;
41  import org.junit.Test;
42  import org.junit.experimental.categories.Category;
43  
44  import static org.junit.Assert.assertEquals;
45  import static org.junit.Assert.assertFalse;
46  import static org.junit.Assert.assertTrue;
47  
48  @Category(SmallTests.class)
49  public class TestProcedureRecovery {
50    private static final Log LOG = LogFactory.getLog(TestProcedureRecovery.class);
51  
      // Slot count handed to both procStore.start() and procExecutor.start() in setUp().
52    private static final int PROCEDURE_EXECUTOR_SLOTS = 1;
53  
      // The next four fields are static but are reassigned by setUp() before every test.
      // The static nested procedure classes in this file read procExecutor and
      // procSleepInterval directly, which an instance field would not allow.
54    private static TestProcEnv procEnv;
55    private static ProcedureExecutor<TestProcEnv> procExecutor;
56    private static ProcedureStore procStore;
      // Passed to Threads.sleepWithoutInterrupt() inside the procedure steps; reset to 0 in
      // setUp(). Presumably milliseconds -- TODO confirm against Threads.sleepWithoutInterrupt.
57    private static int procSleepInterval;
58  
      // Per-test fixtures created in setUp(); logDir is testDir/"proc-logs" and is deleted
      // recursively in tearDown().
59    private HBaseCommonTestingUtility htu;
60    private FileSystem fs;
61    private Path testDir;
62    private Path logDir;
63  
64    @Before
65    public void setUp() throws IOException {
66      htu = new HBaseCommonTestingUtility();
67      testDir = htu.getDataTestDir();
68      fs = testDir.getFileSystem(htu.getConfiguration());
69      assertTrue(testDir.depth() > 1);
70  
71      logDir = new Path(testDir, "proc-logs");
72      procEnv = new TestProcEnv();
73      procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), fs, logDir);
74      procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore);
75      procExecutor.testing = new ProcedureExecutor.Testing();
76      procStore.start(PROCEDURE_EXECUTOR_SLOTS);
77      procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true);
78      procSleepInterval = 0;
79    }
80  
81    @After
82    public void tearDown() throws IOException {
83      procExecutor.stop();
84      procStore.stop(false);
85      fs.delete(logDir, true);
86    }
87  
88    private void restart() throws Exception {
        // Restart the executor through the testing utility, logging the content of the
        // procedure log directory right before and right after the restart.
89      dumpLogDirState();
90      ProcedureTestingUtility.restart(procExecutor);
91      dumpLogDirState();
92    }
93  
94    public static class TestSingleStepProcedure extends SequentialProcedure<TestProcEnv> {
95      private int step = 0;
96  
97      public TestSingleStepProcedure() { }
98  
99      @Override
100     protected Procedure[] execute(TestProcEnv env) throws InterruptedException {
101       env.waitOnLatch();
102       LOG.debug("execute procedure " + this + " step=" + step);
103       step++;
104       setResult(Bytes.toBytes(step));
105       return null;
106     }
107 
108     @Override
109     protected void rollback(TestProcEnv env) { }
110 
111     @Override
112     protected boolean abort(TestProcEnv env) { return true; }
113   }
114 
115   public static class BaseTestStepProcedure extends SequentialProcedure<TestProcEnv> {
116     private AtomicBoolean abort = new AtomicBoolean(false);
117     private int step = 0;
118 
119     @Override
120     protected Procedure[] execute(TestProcEnv env) throws InterruptedException {
121       env.waitOnLatch();
122       LOG.debug("execute procedure " + this + " step=" + step);
123       ProcedureTestingUtility.toggleKillBeforeStoreUpdate(procExecutor);
124       step++;
125       Threads.sleepWithoutInterrupt(procSleepInterval);
126       if (isAborted()) {
127         setFailure(new RemoteProcedureException(getClass().getName(),
128           new ProcedureAbortedException(
129             "got an abort at " + getClass().getName() + " step=" + step)));
130         return null;
131       }
132       return null;
133     }
134 
135     @Override
136     protected void rollback(TestProcEnv env) {
137       LOG.debug("rollback procedure " + this + " step=" + step);
138       ProcedureTestingUtility.toggleKillBeforeStoreUpdate(procExecutor);
139       step++;
140     }
141 
142     @Override
143     protected boolean abort(TestProcEnv env) {
144       abort.set(true);
145       return true;
146     }
147 
148     private boolean isAborted() {
149       boolean aborted = abort.get();
150       BaseTestStepProcedure proc = this;
151       while (proc.hasParent() && !aborted) {
152         proc = (BaseTestStepProcedure)procExecutor.getProcedure(proc.getParentProcId());
153         aborted = proc.isAborted();
154       }
155       return aborted;
156     }
157   }
158 
159   public static class TestMultiStepProcedure extends BaseTestStepProcedure {
160     public TestMultiStepProcedure() { }
161 
162     @Override
163     public Procedure[] execute(TestProcEnv env) throws InterruptedException {
164       super.execute(env);
165       return isFailed() ? null : new Procedure[] { new Step1Procedure() };
166     }
167 
168     public static class Step1Procedure extends BaseTestStepProcedure {
169       public Step1Procedure() { }
170 
171       @Override
172       protected Procedure[] execute(TestProcEnv env) throws InterruptedException {
173         super.execute(env);
174         return isFailed() ? null : new Procedure[] { new Step2Procedure() };
175       }
176     }
177 
178     public static class Step2Procedure extends BaseTestStepProcedure {
179       public Step2Procedure() { }
180     }
181   }
182 
183   @Test
184   public void testNoopLoad() throws Exception {
185     restart();
186   }
187 
188   @Test(timeout=30000)
189   public void testSingleStepProcRecovery() throws Exception {
190     Procedure proc = new TestSingleStepProcedure();
191     procExecutor.testing.killBeforeStoreUpdate = true;
192     long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
193     assertFalse(procExecutor.isRunning());
194     procExecutor.testing.killBeforeStoreUpdate = false;
195 
196     // Restart and verify that the procedures restart
197     long restartTs = EnvironmentEdgeManager.currentTime();
198     restart();
199     waitProcedure(procId);
200     ProcedureInfo result = procExecutor.getResult(procId);
201     assertTrue(result.getLastUpdate() > restartTs);
202     ProcedureTestingUtility.assertProcNotFailed(result);
203     assertEquals(1, Bytes.toInt(result.getResult()));
204     long resultTs = result.getLastUpdate();
205 
206     // Verify that after another restart the result is still there
207     restart();
208     result = procExecutor.getResult(procId);
209     ProcedureTestingUtility.assertProcNotFailed(result);
210     assertEquals(resultTs, result.getLastUpdate());
211     assertEquals(1, Bytes.toInt(result.getResult()));
212   }
213 
214   @Test(timeout=30000)
215   public void testMultiStepProcRecovery() throws Exception {
216     // Step 0 - kill
217     Procedure proc = new TestMultiStepProcedure();
218     long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
219     assertFalse(procExecutor.isRunning());
220 
221     // Step 0 exec && Step 1 - kill
222     restart();
223     waitProcedure(procId);
224     ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
225     assertFalse(procExecutor.isRunning());
226 
227     // Step 1 exec && step 2 - kill
228     restart();
229     waitProcedure(procId);
230     ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
231     assertFalse(procExecutor.isRunning());
232 
233     // Step 2 exec
234     restart();
235     waitProcedure(procId);
236     assertTrue(procExecutor.isRunning());
237 
238     // The procedure is completed
239     ProcedureInfo result = procExecutor.getResult(procId);
240     ProcedureTestingUtility.assertProcNotFailed(result);
241   }
242 
243   @Test(timeout=30000)
244   public void testMultiStepRollbackRecovery() throws Exception {
245     // Step 0 - kill
246     Procedure proc = new TestMultiStepProcedure();
247     long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
248     assertFalse(procExecutor.isRunning());
249 
250     // Step 0 exec && Step 1 - kill
251     restart();
252     waitProcedure(procId);
253     ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
254     assertFalse(procExecutor.isRunning());
255 
256     // Step 1 exec && step 2 - kill
257     restart();
258     waitProcedure(procId);
259     ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
260     assertFalse(procExecutor.isRunning());
261 
262     // Step 2 exec - rollback - kill
263     procSleepInterval = 2500;
264     restart();
265     assertTrue(procExecutor.abort(procId));
266     waitProcedure(procId);
267     assertFalse(procExecutor.isRunning());
268 
269     // rollback - kill
270     restart();
271     waitProcedure(procId);
272     ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
273     assertFalse(procExecutor.isRunning());
274 
275     // rollback - complete
276     restart();
277     waitProcedure(procId);
278     ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
279     assertFalse(procExecutor.isRunning());
280 
281     // Restart the executor and get the result
282     restart();
283     waitProcedure(procId);
284 
285     // The procedure is completed
286     ProcedureInfo result = procExecutor.getResult(procId);
287     ProcedureTestingUtility.assertIsAbortException(result);
288   }
289 
290   @Test(timeout=30000)
291   public void testCompletedProcWithSameNonce() throws Exception {
292     final long nonceGroup = 123;
293     final long nonce = 2222;
294     Procedure proc = new TestSingleStepProcedure();
295     // Submit a proc and wait for its completion
296     long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc, nonceGroup, nonce);
297 
298     // Restart
299     restart();
300     waitProcedure(procId);
301 
302     Procedure proc2 = new TestSingleStepProcedure();
303     // Submit a procedure with the same nonce and expect the same procedure would return.
304     long procId2 = ProcedureTestingUtility.submitAndWait(procExecutor, proc2, nonceGroup, nonce);
305     assertTrue(procId == procId2);
306 
307     ProcedureInfo result = procExecutor.getResult(procId2);
308     ProcedureTestingUtility.assertProcNotFailed(result);
309   }
310 
311   @Test(timeout=30000)
312   public void testRunningProcWithSameNonce() throws Exception {
313     final long nonceGroup = 456;
314     final long nonce = 33333;
315     Procedure proc = new TestSingleStepProcedure();
316     long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc, nonceGroup, nonce);
317 
318     // Restart (use a latch to prevent the step execution until we submitted proc2)
319     CountDownLatch latch = new CountDownLatch(1);
320     procEnv.setWaitLatch(latch);
321     restart();
322     // Submit a procedure with the same nonce and expect the same procedure would return.
323     Procedure proc2 = new TestSingleStepProcedure();
324     long procId2 = procExecutor.submitProcedure(proc2, nonceGroup, nonce);
325     latch.countDown();
326     procEnv.setWaitLatch(null);
327 
328     // The original proc is not completed and the new submission should have the same proc Id.
329     assertTrue(procId == procId2);
330   }
331 
332 
333   public static class TestStateMachineProcedure
334       extends StateMachineProcedure<TestProcEnv, TestStateMachineProcedure.State> {
335     enum State { STATE_1, STATE_2, STATE_3, DONE }
336 
337     public TestStateMachineProcedure() {}
338 
339     private AtomicBoolean aborted = new AtomicBoolean(false);
340     private int iResult = 0;
341 
342     @Override
343     protected StateMachineProcedure.Flow executeFromState(TestProcEnv env, State state) {
344       switch (state) {
345         case STATE_1:
346           LOG.info("execute step 1 " + this);
347           setNextState(State.STATE_2);
348           iResult += 3;
349           break;
350         case STATE_2:
351           LOG.info("execute step 2 " + this);
352           setNextState(State.STATE_3);
353           iResult += 5;
354           break;
355         case STATE_3:
356           LOG.info("execute step 3 " + this);
357           Threads.sleepWithoutInterrupt(procSleepInterval);
358           if (aborted.get()) {
359             LOG.info("aborted step 3 " + this);
360             setAbortFailure("test", "aborted");
361             break;
362           }
363           setNextState(State.DONE);
364           iResult += 7;
365           setResult(Bytes.toBytes(iResult));
366           return Flow.NO_MORE_STATE;
367         default:
368           throw new UnsupportedOperationException();
369       }
370       return Flow.HAS_MORE_STATE;
371     }
372 
373     @Override
374     protected void rollbackState(TestProcEnv env, final State state) {
375       switch (state) {
376         case STATE_1:
377           LOG.info("rollback step 1 " + this);
378           break;
379         case STATE_2:
380           LOG.info("rollback step 2 " + this);
381           break;
382         case STATE_3:
383           LOG.info("rollback step 3 " + this);
384           break;
385         default:
386           throw new UnsupportedOperationException();
387       }
388     }
389 
390     @Override
391     protected State getState(final int stateId) {
392       return State.values()[stateId];
393     }
394 
395     @Override
396     protected int getStateId(final State state) {
397       return state.ordinal();
398     }
399 
400     @Override
401     protected State getInitialState() {
402       return State.STATE_1;
403     }
404 
405     @Override
406     protected boolean abort(TestProcEnv env) {
407       aborted.set(true);
408       return true;
409     }
410 
411     @Override
412     protected void serializeStateData(final OutputStream stream) throws IOException {
413       super.serializeStateData(stream);
414       stream.write(Bytes.toBytes(iResult));
415     }
416 
417     @Override
418     protected void deserializeStateData(final InputStream stream) throws IOException {
419       super.deserializeStateData(stream);
420       byte[] data = new byte[4];
421       stream.read(data);
422       iResult = Bytes.toInt(data);
423     }
424   }
425 
426   @Test(timeout=30000)
427   public void testStateMachineRecovery() throws Exception {
428     ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExecutor, true);
429     ProcedureTestingUtility.setKillBeforeStoreUpdate(procExecutor, true);
430 
431     // Step 1 - kill
432     Procedure proc = new TestStateMachineProcedure();
433     long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
434     assertFalse(procExecutor.isRunning());
435 
436     // Step 1 exec && Step 2 - kill
437     restart();
438     waitProcedure(procId);
439     ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
440     assertFalse(procExecutor.isRunning());
441 
442     // Step 2 exec && step 3 - kill
443     restart();
444     waitProcedure(procId);
445     ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
446     assertFalse(procExecutor.isRunning());
447 
448     // Step 3 exec
449     restart();
450     waitProcedure(procId);
451     assertTrue(procExecutor.isRunning());
452 
453     // The procedure is completed
454     ProcedureInfo result = procExecutor.getResult(procId);
455     ProcedureTestingUtility.assertProcNotFailed(result);
456     assertEquals(15, Bytes.toInt(result.getResult()));
457   }
458 
459   @Test(timeout=30000)
460   public void testStateMachineRollbackRecovery() throws Exception {
461     ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExecutor, true);
462     ProcedureTestingUtility.setKillBeforeStoreUpdate(procExecutor, true);
463 
464     // Step 1 - kill
465     Procedure proc = new TestStateMachineProcedure();
466     long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
467     ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
468     assertFalse(procExecutor.isRunning());
469 
470     // Step 1 exec && Step 2 - kill
471     restart();
472     waitProcedure(procId);
473     ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
474     assertFalse(procExecutor.isRunning());
475 
476     // Step 2 exec && step 3 - kill
477     restart();
478     waitProcedure(procId);
479     ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
480     assertFalse(procExecutor.isRunning());
481 
482     // Step 3 exec - rollback step 3 - kill
483     procSleepInterval = 2500;
484     restart();
485     assertTrue(procExecutor.abort(procId));
486     waitProcedure(procId);
487     ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
488     assertFalse(procExecutor.isRunning());
489 
490     // Rollback step 3 - rollback step 2 - kill
491     restart();
492     waitProcedure(procId);
493     assertFalse(procExecutor.isRunning());
494     ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
495 
496     // Rollback step 2 - step 1 - kill
497     restart();
498     waitProcedure(procId);
499     assertFalse(procExecutor.isRunning());
500     ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
501 
502     // Rollback step 1 - complete
503     restart();
504     waitProcedure(procId);
505     assertTrue(procExecutor.isRunning());
506 
507     // The procedure is completed
508     ProcedureInfo result = procExecutor.getResult(procId);
509     ProcedureTestingUtility.assertIsAbortException(result);
510   }
511 
512   private void waitProcedure(final long procId) {
        // Delegate the wait for procId to the testing utility, then log the content of the
        // procedure log directory.
513     ProcedureTestingUtility.waitProcedure(procExecutor, procId);
514     dumpLogDirState();
515   }
516 
517   private void dumpLogDirState() {
518     try {
519       FileStatus[] files = fs.listStatus(logDir);
520       if (files != null && files.length > 0) {
521         for (FileStatus file: files) {
522           assertTrue(file.toString(), file.isFile());
523           LOG.debug("log file " + file.getPath() + " size=" + file.getLen());
524         }
525       } else {
526         LOG.debug("no files under: " + logDir);
527       }
528     } catch (IOException e) {
529       LOG.warn("Unable to dump " + logDir, e);
530     }
531   }
532 
533   private static class TestProcEnv {
534     private CountDownLatch latch = null;
535 
536     /**
537      * set/unset a latch. every procedure execute() step will wait on the latch if any.
538      */
539     public void setWaitLatch(CountDownLatch latch) {
540       this.latch = latch;
541     }
542 
543     public void waitOnLatch() throws InterruptedException {
544       if (latch != null) {
545         latch.await();
546       }
547     }
548   }
549 }