View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase;
20  
21  import java.util.HashSet;
22  import java.util.List;
23  import java.util.Set;
24  import java.util.concurrent.ExecutionException;
25  import java.util.concurrent.Future;
26  
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.apache.hadoop.conf.Configuration;
30  
31  public abstract class MultithreadedTestUtil {
32  
33    private static final Log LOG =
34      LogFactory.getLog(MultithreadedTestUtil.class);
35  
36    public static class TestContext {
37      private final Configuration conf;
38      private Throwable err = null;
39      private boolean stopped = false;
40      private int threadDoneCount = 0;
41      private Set<TestThread> testThreads = new HashSet<TestThread>();
42  
43      public TestContext(Configuration configuration) {
44        this.conf = configuration;
45      }
46  
47      protected Configuration getConf() {
48        return conf;
49      }
50  
51      public synchronized boolean shouldRun()  {
52        return !stopped && err == null;
53      }
54  
55      public void addThread(TestThread t) {
56        testThreads.add(t);
57      }
58  
59      public void startThreads() {
60        for (TestThread t : testThreads) {
61          t.start();
62        }
63      }
64  
65      public void waitFor(long millis) throws Exception {
66        long endTime = System.currentTimeMillis() + millis;
67        while (!stopped) {
68          long left = endTime - System.currentTimeMillis();
69          if (left <= 0) break;
70          synchronized (this) {
71            checkException();
72            wait(left);
73          }
74        }
75      }
76      private synchronized void checkException() throws Exception {
77        if (err != null) {
78          throw new RuntimeException("Deferred", err);
79        }
80      }
81  
82      public synchronized void threadFailed(Throwable t) {
83        if (err == null) err = t;
84        LOG.error("Failed!", err);
85        notify();
86      }
87  
88      public synchronized void threadDone() {
89        threadDoneCount++;
90      }
91  
92      public void setStopFlag(boolean s) throws Exception {
93        synchronized (this) {
94          stopped = s;
95        }
96      }
97  
98      public void stop() throws Exception {
99        synchronized (this) {
100         stopped = true;
101       }
102       for (TestThread t : testThreads) {
103         t.join();
104       }
105       checkException();
106     }
107   }
108 
109   /**
110    * A thread that can be added to a test context, and properly
111    * passes exceptions through.
112    */
113   public static abstract class TestThread extends Thread {
114     protected final TestContext ctx;
115     protected boolean stopped;
116 
117     public TestThread(TestContext ctx) {
118       this.ctx = ctx;
119     }
120 
121     public void run() {
122       try {
123         doWork();
124       } catch (Throwable t) {
125         ctx.threadFailed(t);
126       }
127       ctx.threadDone();
128     }
129 
130     public abstract void doWork() throws Exception;
131 
132     protected void stopTestThread() {
133       this.stopped = true;
134     }
135   }
136 
137   /**
138    * A test thread that performs a repeating operation.
139    */
140   public static abstract class RepeatingTestThread extends TestThread {
141     public RepeatingTestThread(TestContext ctx) {
142       super(ctx);
143     }
144 
145     public final void doWork() throws Exception {
146       while (ctx.shouldRun() && !stopped) {
147         doAnAction();
148       }
149     }
150 
151     public abstract void doAnAction() throws Exception;
152   }
153 
154   /**
155    * Verify that no assertions have failed inside a future.
156    * Used for unit tests that spawn threads. E.g.,
157    * <p>
158    * <code>
159    *   List<Future<Void>> results = Lists.newArrayList();
160    *   Future<Void> f = executor.submit(new Callable<Void> {
161    *     public Void call() {
162    *       assertTrue(someMethod());
163    *     }
164    *   });
165    *   results.add(f);
166    *   assertOnFutures(results);
167    * </code>
168    * @param threadResults A list of futures
169    * @param <T>
170    * @throws InterruptedException If interrupted when waiting for a result
171    *                              from one of the futures
172    * @throws ExecutionException If an exception other than AssertionError
173    *                            occurs inside any of the futures
174    */
175   public static <T> void assertOnFutures(List<Future<T>> threadResults)
176   throws InterruptedException, ExecutionException {
177     for (Future<T> threadResult : threadResults) {
178       try {
179         threadResult.get();
180       } catch (ExecutionException e) {
181         if (e.getCause() instanceof AssertionError) {
182           throw (AssertionError) e.getCause();
183         }
184         throw e;
185       }
186     }
187   }
188 }