View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.client;
19  
20  import static org.junit.Assert.assertEquals;
21  import static org.junit.Assert.assertTrue;
22  
23  import java.io.IOException;
24  import java.util.ArrayList;
25  import java.util.List;
26  import java.util.Random;
27  import java.util.concurrent.Callable;
28  import java.util.concurrent.CountDownLatch;
29  import java.util.concurrent.ExecutorService;
30  import java.util.concurrent.Executors;
31  import java.util.concurrent.Future;
32  import java.util.concurrent.atomic.AtomicInteger;
33  import java.util.concurrent.atomic.AtomicLong;
34  
35  import org.apache.commons.logging.Log;
36  import org.apache.commons.logging.LogFactory;
37  import org.apache.hadoop.conf.Configuration;
38  import org.apache.hadoop.hbase.ClusterStatus;
39  import org.apache.hadoop.hbase.HBaseTestingUtility;
40  import org.apache.hadoop.hbase.HColumnDescriptor;
41  import org.apache.hadoop.hbase.HConstants;
42  import org.apache.hadoop.hbase.HTableDescriptor;
43  import org.apache.hadoop.hbase.TableName;
44  import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException;
45  import org.apache.hadoop.hbase.regionserver.HRegionServer;
46  import org.apache.hadoop.hbase.regionserver.Region;
47  import org.apache.hadoop.hbase.testclassification.MediumTests;
48  import org.apache.hadoop.hbase.util.Bytes;
49  import org.apache.hadoop.hbase.util.test.LoadTestKVGenerator;
50  import org.junit.After;
51  import org.junit.AfterClass;
52  import org.junit.Before;
53  import org.junit.BeforeClass;
54  import org.junit.Test;
55  import org.junit.Ignore;
56  import org.junit.experimental.categories.Category;
57  
58  @Category({MediumTests.class})
59  public class TestFastFail {
60    private static final Log LOG = LogFactory.getLog(TestFastFail.class);
61    private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
62    private static byte[] FAMILY = Bytes.toBytes("testFamily");
63    private static final Random random = new Random();
64    private static int SLAVES = 3;
65    private static byte[] QUALIFIER = Bytes.toBytes("testQualifier");
66    private static final int SLEEPTIME = 5000;
67  
68    /**
69     * @throws java.lang.Exception
70     */
71    @BeforeClass
72    public static void setUpBeforeClass() throws Exception {
73      TEST_UTIL.startMiniCluster(SLAVES);
74    }
75  
76    /**
77     * @throws java.lang.Exception
78     */
79    @AfterClass
80    public static void tearDownAfterClass() throws Exception {
81      TEST_UTIL.shutdownMiniCluster();
82    }
83  
84    /**
85     * @throws java.lang.Exception
86     */
87    @Before
88    public void setUp() throws Exception {
89      MyPreemptiveFastFailInterceptor.numBraveSouls.set(0);
90    }
91  
92    /**
93     * @throws java.lang.Exception
94     */
95    @After
96    public void tearDown() throws Exception {
97      // Nothing to do.
98    }
99  
100   @Ignore ("Can go zombie -- see HBASE-14421; FIX") @Test
101   public void testFastFail() throws IOException, InterruptedException {
102     Admin admin = TEST_UTIL.getHBaseAdmin();
103 
104     final String tableName = "testClientRelearningExperiment";
105     HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(Bytes
106         .toBytes(tableName)));
107     desc.addFamily(new HColumnDescriptor(FAMILY));
108     admin.createTable(desc, Bytes.toBytes("aaaa"), Bytes.toBytes("zzzz"), 32);
109     final long numRows = 1000;
110 
111     Configuration conf = TEST_UTIL.getConfiguration();
112     conf.setLong(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, SLEEPTIME * 100);
113     conf.setInt(HConstants.HBASE_CLIENT_PAUSE, SLEEPTIME / 10);
114     conf.setBoolean(HConstants.HBASE_CLIENT_FAST_FAIL_MODE_ENABLED, true);
115     conf.setLong(HConstants.HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS, 0);
116     conf.setClass(HConstants.HBASE_CLIENT_FAST_FAIL_INTERCEPTOR_IMPL,
117         MyPreemptiveFastFailInterceptor.class,
118         PreemptiveFastFailInterceptor.class);
119 
120     final Connection connection = ConnectionFactory.createConnection(conf);
121 
122     /**
123      * Write numRows worth of data, so that the workers can arbitrarily read.
124      */
125     List<Put> puts = new ArrayList<>();
126     for (long i = 0; i < numRows; i++) {
127       byte[] rowKey = longToByteArrayKey(i);
128       Put put = new Put(rowKey);
129       byte[] value = rowKey; // value is the same as the row key
130       put.add(FAMILY, QUALIFIER, value);
131       puts.add(put);
132     }
133     try (Table table = connection.getTable(TableName.valueOf(tableName))) {
134       table.put(puts);
135       LOG.info("Written all puts.");
136     }
137 
138     /**
139      * The number of threads that are going to perform actions against the test
140      * table.
141      */
142     int nThreads = 100;
143     ExecutorService service = Executors.newFixedThreadPool(nThreads);
144     final CountDownLatch continueOtherHalf = new CountDownLatch(1);
145     final CountDownLatch doneHalfway = new CountDownLatch(nThreads);
146 
147     final AtomicInteger numSuccessfullThreads = new AtomicInteger(0);
148     final AtomicInteger numFailedThreads = new AtomicInteger(0);
149 
150     // The total time taken for the threads to perform the second put;
151     final AtomicLong totalTimeTaken = new AtomicLong(0);
152     final AtomicInteger numBlockedWorkers = new AtomicInteger(0);
153     final AtomicInteger numPreemptiveFastFailExceptions = new AtomicInteger(0);
154 
155     List<Future<Boolean>> futures = new ArrayList<Future<Boolean>>();
156     for (int i = 0; i < nThreads; i++) {
157       futures.add(service.submit(new Callable<Boolean>() {
158         /**
159          * The workers are going to perform a couple of reads. The second read
160          * will follow the killing of a regionserver so that we make sure that
161          * some of threads go into PreemptiveFastFailExcception
162          */
163         public Boolean call() throws Exception {
164           try (Table table = connection.getTable(TableName.valueOf(tableName))) {
165             Thread.sleep(Math.abs(random.nextInt()) % 250); // Add some jitter here
166             byte[] row = longToByteArrayKey(Math.abs(random.nextLong())
167                 % numRows);
168             Get g = new Get(row);
169             g.addColumn(FAMILY, QUALIFIER);
170             try {
171               table.get(g);
172             } catch (Exception e) {
173               LOG.debug("Get failed : ", e);
174               doneHalfway.countDown();
175               return false;
176             }
177 
178             // Done with one get, proceeding to do the next one.
179             doneHalfway.countDown();
180             continueOtherHalf.await();
181 
182             long startTime = System.currentTimeMillis();
183             g = new Get(row);
184             g.addColumn(FAMILY, QUALIFIER);
185             try {
186               table.get(g);
187               // The get was successful
188               numSuccessfullThreads.addAndGet(1);
189             } catch (Exception e) {
190               if (e instanceof PreemptiveFastFailException) {
191                 // We were issued a PreemptiveFastFailException
192                 numPreemptiveFastFailExceptions.addAndGet(1);
193               }
194               // Irrespective of PFFE, the request failed.
195               numFailedThreads.addAndGet(1);
196               return false;
197             } finally {
198               long enTime = System.currentTimeMillis();
199               totalTimeTaken.addAndGet(enTime - startTime);
200               if ((enTime - startTime) >= SLEEPTIME) {
201                 // Considering the slow workers as the blockedWorkers.
202                 // This assumes that the threads go full throttle at performing
203                 // actions. In case the thread scheduling itself is as slow as
204                 // SLEEPTIME, then this test might fail and so, we might have
205                 // set it to a higher number on slower machines.
206                 numBlockedWorkers.addAndGet(1);
207               }
208             }
209             return true;
210           } catch (Exception e) {
211             LOG.error("Caught unknown exception", e);
212             doneHalfway.countDown();
213             return false;
214           }
215         }
216       }));
217     }
218 
219     doneHalfway.await();
220 
221     ClusterStatus status = TEST_UTIL.getHBaseCluster().getClusterStatus();
222 
223     // Kill a regionserver
224     for (int i = 0; i < SLAVES; i++) {
225       HRegionServer server = TEST_UTIL.getHBaseCluster().getRegionServer(i);
226       List<Region> regions = server.getOnlineRegions(TableName.META_TABLE_NAME);
227       if (regions.size() > 0) continue; // We don't want to kill META table because that adds extra
228                                         // latencies which can't be tested very easily.
229       server.getRpcServer().stop();
230       server.stop("Testing");
231     }
232 
233     // Let the threads continue going
234     continueOtherHalf.countDown();
235 
236     Thread.sleep(2 * SLEEPTIME);
237     // Restore the cluster
238     TEST_UTIL.getHBaseCluster().restoreClusterStatus(status);
239 
240     int numThreadsReturnedFalse = 0;
241     int numThreadsReturnedTrue = 0;
242     int numThreadsThrewExceptions = 0;
243     for (Future<Boolean> f : futures) {
244       try {
245         numThreadsReturnedTrue += f.get() ? 1 : 0;
246         numThreadsReturnedFalse += f.get() ? 0 : 1;
247       } catch (Exception e) {
248         numThreadsThrewExceptions++;
249       }
250     }
251     LOG.debug("numThreadsReturnedFalse:"
252         + numThreadsReturnedFalse
253         + " numThreadsReturnedTrue:"
254         + numThreadsReturnedTrue
255         + " numThreadsThrewExceptions:"
256         + numThreadsThrewExceptions
257         + " numFailedThreads:"
258         + numFailedThreads.get()
259         + " numSuccessfullThreads:"
260         + numSuccessfullThreads.get()
261         + " numBlockedWorkers:"
262         + numBlockedWorkers.get()
263         + " totalTimeWaited: "
264         + totalTimeTaken.get()
265         / (numBlockedWorkers.get() == 0 ? Long.MAX_VALUE : numBlockedWorkers
266             .get()) + " numPFFEs: " + numPreemptiveFastFailExceptions.get());
267 
268     assertEquals("The expected number of all the successfull and the failed "
269         + "threads should equal the total number of threads that we spawned",
270         nThreads, numFailedThreads.get() + numSuccessfullThreads.get());
271     assertEquals(
272         "All the failures should be coming from the secondput failure",
273         numFailedThreads.get(), numThreadsReturnedFalse);
274     assertEquals("Number of threads that threw execution exceptions "
275         + "otherwise should be 0", numThreadsThrewExceptions, 0);
276     assertEquals("The regionservers that returned true should equal to the"
277         + " number of successful threads", numThreadsReturnedTrue,
278         numSuccessfullThreads.get());
279     /* 'should' is not worthy of an assert. Disabling because randomly this seems to randomly
280      * not but true. St.Ack 20151012
281      *
282     assertTrue(
283         "There should be atleast one thread that retried instead of failing",
284         MyPreemptiveFastFailInterceptor.numBraveSouls.get() > 0);
285     assertTrue(
286         "There should be atleast one PreemptiveFastFail exception,"
287             + " otherwise, the test makes little sense."
288             + "numPreemptiveFastFailExceptions: "
289             + numPreemptiveFastFailExceptions.get(),
290         numPreemptiveFastFailExceptions.get() > 0);
291     */
292     assertTrue(
293         "Only few thread should ideally be waiting for the dead "
294             + "regionserver to be coming back. numBlockedWorkers:"
295             + numBlockedWorkers.get() + " threads that retried : "
296             + MyPreemptiveFastFailInterceptor.numBraveSouls.get(),
297         numBlockedWorkers.get() <= MyPreemptiveFastFailInterceptor.numBraveSouls
298             .get());
299   }
300 
301   public static class MyPreemptiveFastFailInterceptor extends
302       PreemptiveFastFailInterceptor {
303     public static AtomicInteger numBraveSouls = new AtomicInteger();
304 
305     @Override
306     protected boolean shouldRetryInspiteOfFastFail(FailureInfo fInfo) {
307       boolean ret = super.shouldRetryInspiteOfFastFail(fInfo);
308       if (ret)
309         numBraveSouls.addAndGet(1);
310       return ret;
311     }
312 
313     public MyPreemptiveFastFailInterceptor(Configuration conf) {
314       super(conf);
315     }
316   }
317 
318   private byte[] longToByteArrayKey(long rowKey) {
319     return LoadTestKVGenerator.md5PrefixedKey(rowKey).getBytes();
320   }
321 }