View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.procedure2;
20  
21  import java.io.IOException;
22  import java.io.InputStream;
23  import java.io.OutputStream;
24  import java.util.ArrayList;
25  import java.util.concurrent.atomic.AtomicLong;
26  
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.apache.hadoop.fs.FileSystem;
30  import org.apache.hadoop.fs.Path;
31  import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
32  import org.apache.hadoop.hbase.io.util.StreamUtils;
33  import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
34  import org.apache.hadoop.hbase.testclassification.LargeTests;
35  
36  import org.junit.After;
37  import org.junit.Before;
38  import org.junit.Test;
39  import org.junit.experimental.categories.Category;
40  
41  import static org.junit.Assert.assertEquals;
42  import static org.junit.Assert.assertTrue;
43  import static org.junit.Assert.fail;
44  
45  @Category(LargeTests.class)
46  public class TestProcedureReplayOrder {
47    private static final Log LOG = LogFactory.getLog(TestProcedureReplayOrder.class);
48  
49    private static final int NUM_THREADS = 16;
50  
51    private ProcedureExecutor<Void> procExecutor;
52    private TestProcedureEnv procEnv;
53    private ProcedureStore procStore;
54  
55    private HBaseCommonTestingUtility htu;
56    private FileSystem fs;
57    private Path testDir;
58    private Path logDir;
59  
60    @Before
61    public void setUp() throws IOException {
62      htu = new HBaseCommonTestingUtility();
63      htu.getConfiguration().setInt("hbase.procedure.store.wal.sync.wait.msec", 25);
64  
65      testDir = htu.getDataTestDir();
66      fs = testDir.getFileSystem(htu.getConfiguration());
67      assertTrue(testDir.depth() > 1);
68  
69      logDir = new Path(testDir, "proc-logs");
70      procEnv = new TestProcedureEnv();
71      procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir);
72      procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore);
73      procStore.start(NUM_THREADS);
74      procExecutor.start(1, true);
75    }
76  
77    @After
78    public void tearDown() throws IOException {
79      procExecutor.stop();
80      procStore.stop(false);
81      fs.delete(logDir, true);
82    }
83  
84    @Test(timeout=90000)
85    public void testSingleStepReplayOrder() throws Exception {
86      final int NUM_PROC_XTHREAD = 32;
87      final int NUM_PROCS = NUM_THREADS * NUM_PROC_XTHREAD;
88  
89      // submit the procedures
90      submitProcedures(NUM_THREADS, NUM_PROC_XTHREAD, TestSingleStepProcedure.class);
91  
92      while (procEnv.getExecId() < NUM_PROCS) {
93        Thread.sleep(100);
94      }
95  
96      // restart the executor and allow the procedures to run
97      ProcedureTestingUtility.restart(procExecutor);
98  
99      // wait the execution of all the procedures and
100     // assert that the execution order was sorted by procId
101     ProcedureTestingUtility.waitNoProcedureRunning(procExecutor);
102     procEnv.assertSortedExecList(NUM_PROCS);
103   }
104 
105   @Test(timeout=90000)
106   public void testMultiStepReplayOrder() throws Exception {
107     final int NUM_PROC_XTHREAD = 24;
108     final int NUM_PROCS = NUM_THREADS * (NUM_PROC_XTHREAD * 2);
109 
110     // submit the procedures
111     submitProcedures(NUM_THREADS, NUM_PROC_XTHREAD, TestTwoStepProcedure.class);
112 
113     while (procEnv.getExecId() < NUM_PROCS) {
114       Thread.sleep(100);
115     }
116 
117     // restart the executor and allow the procedures to run
118     ProcedureTestingUtility.restart(procExecutor);
119 
120     // wait the execution of all the procedures and
121     // assert that the execution order was sorted by procId
122     ProcedureTestingUtility.waitNoProcedureRunning(procExecutor);
123     procEnv.assertSortedExecList(NUM_PROCS);
124   }
125 
126   private void submitProcedures(final int nthreads, final int nprocPerThread,
127       final Class<?> procClazz) throws Exception {
128     Thread[] submitThreads = new Thread[nthreads];
129     for (int i = 0; i < submitThreads.length; ++i) {
130       submitThreads[i] = new Thread() {
131         @Override
132         public void run() {
133           for (int i = 0; i < nprocPerThread; ++i) {
134             try {
135               procExecutor.submitProcedure((Procedure)procClazz.newInstance());
136             } catch (InstantiationException|IllegalAccessException e) {
137               LOG.error("unable to instantiate the procedure", e);
138               fail("failure during the proc.newInstance(): " + e.getMessage());
139             }
140           }
141         }
142       };
143     }
144 
145     for (int i = 0; i < submitThreads.length; ++i) {
146       submitThreads[i].start();
147     }
148 
149     for (int i = 0; i < submitThreads.length; ++i) {
150       submitThreads[i].join();
151     }
152   }
153 
154   private static class TestProcedureEnv {
155     private ArrayList<TestProcedure> execList = new ArrayList<TestProcedure>();
156     private AtomicLong execTimestamp = new AtomicLong(0);
157 
158     public long getExecId() {
159       return execTimestamp.get();
160     }
161 
162     public long nextExecId() {
163       return execTimestamp.incrementAndGet();
164     }
165 
166     public void addToExecList(final TestProcedure proc) {
167       execList.add(proc);
168     }
169 
170     public void assertSortedExecList(int numProcs) {
171       assertEquals(numProcs, execList.size());
172       LOG.debug("EXEC LIST: " + execList);
173       for (int i = 0; i < execList.size() - 1; ++i) {
174         TestProcedure a = execList.get(i);
175         TestProcedure b = execList.get(i + 1);
176         assertTrue("exec list not sorted: " + a + " < " + b, a.getExecId() > b.getExecId());
177       }
178     }
179   }
180 
181   public static abstract class TestProcedure extends Procedure<TestProcedureEnv> {
182     protected long execId = 0;
183     protected int step = 0;
184 
185     public long getExecId() {
186       return execId;
187     }
188 
189     @Override
190     protected void rollback(TestProcedureEnv env) { }
191 
192     @Override
193     protected boolean abort(TestProcedureEnv env) { return true; }
194 
195     @Override
196     protected void serializeStateData(final OutputStream stream) throws IOException {
197       StreamUtils.writeLong(stream, execId);
198     }
199 
200     @Override
201     protected void deserializeStateData(final InputStream stream) throws IOException {
202       execId = StreamUtils.readLong(stream);
203       step = 2;
204     }
205   }
206 
207   public static class TestSingleStepProcedure extends TestProcedure {
208     public TestSingleStepProcedure() { }
209 
210     @Override
211     protected Procedure[] execute(TestProcedureEnv env) throws ProcedureYieldException {
212       LOG.trace("execute procedure step=" + step + ": " + this);
213       if (step == 0) {
214         step = 1;
215         execId = env.nextExecId();
216         return new Procedure[] { this };
217       } else if (step == 2) {
218         env.addToExecList(this);
219         return null;
220       }
221       throw new ProcedureYieldException();
222     }
223 
224     @Override
225     public String toString() {
226       return "SingleStep(procId=" + getProcId() + " execId=" + execId + ")";
227     }
228   }
229 
230   public static class TestTwoStepProcedure extends TestProcedure {
231     public TestTwoStepProcedure() { }
232 
233     @Override
234     protected Procedure[] execute(TestProcedureEnv env) throws ProcedureYieldException {
235       LOG.trace("execute procedure step=" + step + ": " + this);
236       if (step == 0) {
237         step = 1;
238         execId = env.nextExecId();
239         return new Procedure[] { new TestSingleStepProcedure() };
240       } else if (step == 2) {
241         env.addToExecList(this);
242         return null;
243       }
244       throw new ProcedureYieldException();
245     }
246 
247     @Override
248     public String toString() {
249       return "TwoStep(procId=" + getProcId() + " execId=" + execId + ")";
250     }
251   }
252 }