View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.procedure2;
20  
21  import java.io.IOException;
22  import java.io.InputStream;
23  import java.io.OutputStream;
24  
25  import org.apache.commons.logging.Log;
26  import org.apache.commons.logging.LogFactory;
27  import org.apache.hadoop.conf.Configuration;
28  import org.apache.hadoop.fs.FileSystem;
29  import org.apache.hadoop.fs.Path;
30  import org.apache.hadoop.hbase.HConstants;
31  import org.apache.hadoop.hbase.ProcedureInfo;
32  import org.apache.hadoop.hbase.util.Threads;
33  import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
34  import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
35  import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
36  import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore;
37  import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
38  import org.apache.hadoop.hbase.protobuf.generated.ErrorHandlingProtos.ForeignExceptionMessage;
39  
40  import static org.junit.Assert.assertEquals;
41  import static org.junit.Assert.assertFalse;
42  import static org.junit.Assert.assertTrue;
43  
44  public class ProcedureTestingUtility {
45    private static final Log LOG = LogFactory.getLog(ProcedureTestingUtility.class);
46  
47    private ProcedureTestingUtility() {
48    }
49  
50    public static ProcedureStore createStore(final Configuration conf, final FileSystem fs,
51        final Path baseDir) throws IOException {
52      return createWalStore(conf, fs, baseDir);
53    }
54  
55    public static WALProcedureStore createWalStore(final Configuration conf, final FileSystem fs,
56        final Path logDir) throws IOException {
57      return new WALProcedureStore(conf, fs, logDir, new WALProcedureStore.LeaseRecovery() {
58        @Override
59        public void recoverFileLease(FileSystem fs, Path path) throws IOException {
60          // no-op
61        }
62      });
63    }
64  
65    public static <TEnv> void restart(ProcedureExecutor<TEnv> procExecutor)
66        throws Exception {
67      restart(procExecutor, null, true);
68    }
69  
70    public static <TEnv> void restart(ProcedureExecutor<TEnv> procExecutor,
71        Runnable beforeStartAction, boolean failOnCorrupted) throws Exception {
72      ProcedureStore procStore = procExecutor.getStore();
73      int storeThreads = procExecutor.getNumThreads();
74      int execThreads = procExecutor.getNumThreads();
75      // stop
76      procExecutor.stop();
77      procExecutor.join();
78      procStore.stop(false);
79      // nothing running...
80      if (beforeStartAction != null) {
81        beforeStartAction.run();
82      }
83      // re-start
84      procStore.start(storeThreads);
85      procExecutor.start(execThreads, failOnCorrupted);
86    }
87  
88    public static <TEnv> void setKillBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor,
89        boolean value) {
90      if (procExecutor.testing == null) {
91        procExecutor.testing = new ProcedureExecutor.Testing();
92      }
93      procExecutor.testing.killBeforeStoreUpdate = value;
94      LOG.warn("Set Kill before store update to: " + procExecutor.testing.killBeforeStoreUpdate);
95    }
96  
97    public static <TEnv> void setToggleKillBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor,
98        boolean value) {
99      if (procExecutor.testing == null) {
100       procExecutor.testing = new ProcedureExecutor.Testing();
101     }
102     procExecutor.testing.toggleKillBeforeStoreUpdate = value;
103   }
104 
105   public static <TEnv> void toggleKillBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor) {
106     if (procExecutor.testing == null) {
107       procExecutor.testing = new ProcedureExecutor.Testing();
108     }
109     procExecutor.testing.killBeforeStoreUpdate = !procExecutor.testing.killBeforeStoreUpdate;
110     LOG.warn("Set Kill before store update to: " + procExecutor.testing.killBeforeStoreUpdate);
111   }
112 
113   public static <TEnv> void setKillAndToggleBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor,
114       boolean value) {
115     ProcedureTestingUtility.setKillBeforeStoreUpdate(procExecutor, value);
116     ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExecutor, value);
117   }
118 
119   public static <TEnv> long submitAndWait(Configuration conf, TEnv env, Procedure<TEnv> proc)
120       throws IOException {
121     NoopProcedureStore procStore = new NoopProcedureStore();
122     ProcedureExecutor<TEnv> procExecutor = new ProcedureExecutor<TEnv>(conf, env, procStore);
123     procStore.start(1);
124     procExecutor.start(1, false);
125     try {
126       return submitAndWait(procExecutor, proc, HConstants.NO_NONCE, HConstants.NO_NONCE);
127     } finally {
128       procStore.stop(false);
129       procExecutor.stop();
130     }
131   }
132 
133   public static <TEnv> long submitAndWait(ProcedureExecutor<TEnv> procExecutor, Procedure proc) {
134     return submitAndWait(procExecutor, proc, HConstants.NO_NONCE, HConstants.NO_NONCE);
135   }
136 
137   public static <TEnv> long submitAndWait(ProcedureExecutor<TEnv> procExecutor, Procedure proc,
138       final long nonceGroup,
139       final long nonce) {
140     long procId = procExecutor.submitProcedure(proc, nonceGroup, nonce);
141     waitProcedure(procExecutor, procId);
142     return procId;
143   }
144 
145   public static <TEnv> void waitProcedure(ProcedureExecutor<TEnv> procExecutor, long procId) {
146     while (!procExecutor.isFinished(procId) && procExecutor.isRunning()) {
147       Threads.sleepWithoutInterrupt(250);
148     }
149   }
150 
151   public static <TEnv> void waitNoProcedureRunning(ProcedureExecutor<TEnv> procExecutor) {
152     int stableRuns = 0;
153     while (stableRuns < 10) {
154       if (procExecutor.getActiveExecutorCount() > 0 || procExecutor.getRunnableSet().size() > 0) {
155         stableRuns = 0;
156         Threads.sleepWithoutInterrupt(100);
157       } else {
158         stableRuns++;
159         Threads.sleepWithoutInterrupt(25);
160       }
161     }
162   }
163 
164   public static <TEnv> void assertProcNotYetCompleted(ProcedureExecutor<TEnv> procExecutor,
165       long procId) {
166     assertFalse("expected a running proc", procExecutor.isFinished(procId));
167     assertEquals(null, procExecutor.getResult(procId));
168   }
169 
170   public static <TEnv> void assertProcNotFailed(ProcedureExecutor<TEnv> procExecutor,
171       long procId) {
172     ProcedureInfo result = procExecutor.getResult(procId);
173     assertTrue("expected procedure result", result != null);
174     assertProcNotFailed(result);
175   }
176 
177   public static void assertProcNotFailed(final ProcedureInfo result) {
178     ForeignExceptionMessage exception = result.getForeignExceptionMessage();
179     String msg = exception != null ? result.getExceptionFullMessage() : "no exception found";
180     assertFalse(msg, result.isFailed());
181   }
182 
183   public static void assertIsAbortException(final ProcedureInfo result) {
184     assertEquals(true, result.isFailed());
185     LOG.info(result.getExceptionFullMessage());
186     Throwable cause = getExceptionCause(result);
187     assertTrue("expected abort exception, got " + cause,
188       cause instanceof ProcedureAbortedException);
189   }
190 
191   public static void assertIsTimeoutException(final ProcedureInfo result) {
192     assertEquals(true, result.isFailed());
193     LOG.info(result.getExceptionFullMessage());
194     Throwable cause = getExceptionCause(result);
195     assertTrue("expected TimeoutIOException, got " + cause, cause instanceof TimeoutIOException);
196   }
197 
198   public static void assertIsIllegalArgumentException(final ProcedureInfo result) {
199     assertEquals(true, result.isFailed());
200     LOG.info(result.getExceptionFullMessage());
201     Throwable cause = ProcedureTestingUtility.getExceptionCause(result);
202     assertTrue("expected IllegalArgumentIOException, got " + cause,
203       cause instanceof IllegalArgumentIOException);
204   }
205 
206   public static Throwable getExceptionCause(final ProcedureInfo procInfo) {
207     assert procInfo.getForeignExceptionMessage() != null;
208     return RemoteProcedureException.fromProto(procInfo.getForeignExceptionMessage()).getCause();
209   }
210 
211   public static class TestProcedure extends Procedure<Void> {
212     public TestProcedure() {}
213 
214     public TestProcedure(long procId) {
215       this(procId, 0);
216     }
217 
218     public TestProcedure(long procId, long parentId) {
219       setProcId(procId);
220       if (parentId > 0) {
221         setParentProcId(parentId);
222       }
223     }
224 
225     public void addStackId(final int index) {
226       addStackIndex(index);
227     }
228 
229     @Override
230     protected Procedure[] execute(Void env) { return null; }
231 
232     @Override
233     protected void rollback(Void env) { }
234 
235     @Override
236     protected boolean abort(Void env) { return false; }
237 
238     @Override
239     protected void serializeStateData(final OutputStream stream) throws IOException { }
240 
241     @Override
242     protected void deserializeStateData(final InputStream stream) throws IOException { }
243   }
244 }