1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.client;
19
20 import static org.junit.Assert.assertEquals;
21 import static org.junit.Assert.assertTrue;
22
23 import java.io.IOException;
24 import java.util.ArrayList;
25 import java.util.List;
26 import java.util.Random;
27 import java.util.concurrent.Callable;
28 import java.util.concurrent.CountDownLatch;
29 import java.util.concurrent.ExecutorService;
30 import java.util.concurrent.Executors;
31 import java.util.concurrent.Future;
32 import java.util.concurrent.atomic.AtomicInteger;
33 import java.util.concurrent.atomic.AtomicLong;
34
35 import org.apache.commons.logging.Log;
36 import org.apache.commons.logging.LogFactory;
37 import org.apache.hadoop.conf.Configuration;
38 import org.apache.hadoop.hbase.ClusterStatus;
39 import org.apache.hadoop.hbase.HBaseTestingUtility;
40 import org.apache.hadoop.hbase.HColumnDescriptor;
41 import org.apache.hadoop.hbase.HConstants;
42 import org.apache.hadoop.hbase.HTableDescriptor;
43 import org.apache.hadoop.hbase.TableName;
44 import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException;
45 import org.apache.hadoop.hbase.regionserver.HRegionServer;
46 import org.apache.hadoop.hbase.regionserver.Region;
47 import org.apache.hadoop.hbase.testclassification.MediumTests;
48 import org.apache.hadoop.hbase.util.Bytes;
49 import org.apache.hadoop.hbase.util.test.LoadTestKVGenerator;
50 import org.junit.After;
51 import org.junit.AfterClass;
52 import org.junit.Before;
53 import org.junit.BeforeClass;
54 import org.junit.Test;
55 import org.junit.Ignore;
56 import org.junit.experimental.categories.Category;
57
58 @Category({MediumTests.class})
59 public class TestFastFail {
60 private static final Log LOG = LogFactory.getLog(TestFastFail.class);
61 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
62 private static byte[] FAMILY = Bytes.toBytes("testFamily");
63 private static final Random random = new Random();
64 private static int SLAVES = 3;
65 private static byte[] QUALIFIER = Bytes.toBytes("testQualifier");
66 private static final int SLEEPTIME = 5000;
67
68
69
70
71 @BeforeClass
72 public static void setUpBeforeClass() throws Exception {
73 TEST_UTIL.startMiniCluster(SLAVES);
74 }
75
76
77
78
79 @AfterClass
80 public static void tearDownAfterClass() throws Exception {
81 TEST_UTIL.shutdownMiniCluster();
82 }
83
84
85
86
87 @Before
88 public void setUp() throws Exception {
89 MyPreemptiveFastFailInterceptor.numBraveSouls.set(0);
90 }
91
92
93
94
95 @After
96 public void tearDown() throws Exception {
97
98 }
99
100 @Ignore ("Can go zombie -- see HBASE-14421; FIX") @Test
101 public void testFastFail() throws IOException, InterruptedException {
102 Admin admin = TEST_UTIL.getHBaseAdmin();
103
104 final String tableName = "testClientRelearningExperiment";
105 HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(Bytes
106 .toBytes(tableName)));
107 desc.addFamily(new HColumnDescriptor(FAMILY));
108 admin.createTable(desc, Bytes.toBytes("aaaa"), Bytes.toBytes("zzzz"), 32);
109 final long numRows = 1000;
110
111 Configuration conf = TEST_UTIL.getConfiguration();
112 conf.setLong(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, SLEEPTIME * 100);
113 conf.setInt(HConstants.HBASE_CLIENT_PAUSE, SLEEPTIME / 10);
114 conf.setBoolean(HConstants.HBASE_CLIENT_FAST_FAIL_MODE_ENABLED, true);
115 conf.setLong(HConstants.HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS, 0);
116 conf.setClass(HConstants.HBASE_CLIENT_FAST_FAIL_INTERCEPTOR_IMPL,
117 MyPreemptiveFastFailInterceptor.class,
118 PreemptiveFastFailInterceptor.class);
119
120 final Connection connection = ConnectionFactory.createConnection(conf);
121
122
123
124
125 List<Put> puts = new ArrayList<>();
126 for (long i = 0; i < numRows; i++) {
127 byte[] rowKey = longToByteArrayKey(i);
128 Put put = new Put(rowKey);
129 byte[] value = rowKey;
130 put.add(FAMILY, QUALIFIER, value);
131 puts.add(put);
132 }
133 try (Table table = connection.getTable(TableName.valueOf(tableName))) {
134 table.put(puts);
135 LOG.info("Written all puts.");
136 }
137
138
139
140
141
142 int nThreads = 100;
143 ExecutorService service = Executors.newFixedThreadPool(nThreads);
144 final CountDownLatch continueOtherHalf = new CountDownLatch(1);
145 final CountDownLatch doneHalfway = new CountDownLatch(nThreads);
146
147 final AtomicInteger numSuccessfullThreads = new AtomicInteger(0);
148 final AtomicInteger numFailedThreads = new AtomicInteger(0);
149
150
151 final AtomicLong totalTimeTaken = new AtomicLong(0);
152 final AtomicInteger numBlockedWorkers = new AtomicInteger(0);
153 final AtomicInteger numPreemptiveFastFailExceptions = new AtomicInteger(0);
154
155 List<Future<Boolean>> futures = new ArrayList<Future<Boolean>>();
156 for (int i = 0; i < nThreads; i++) {
157 futures.add(service.submit(new Callable<Boolean>() {
158
159
160
161
162
163 public Boolean call() throws Exception {
164 try (Table table = connection.getTable(TableName.valueOf(tableName))) {
165 Thread.sleep(Math.abs(random.nextInt()) % 250);
166 byte[] row = longToByteArrayKey(Math.abs(random.nextLong())
167 % numRows);
168 Get g = new Get(row);
169 g.addColumn(FAMILY, QUALIFIER);
170 try {
171 table.get(g);
172 } catch (Exception e) {
173 LOG.debug("Get failed : ", e);
174 doneHalfway.countDown();
175 return false;
176 }
177
178
179 doneHalfway.countDown();
180 continueOtherHalf.await();
181
182 long startTime = System.currentTimeMillis();
183 g = new Get(row);
184 g.addColumn(FAMILY, QUALIFIER);
185 try {
186 table.get(g);
187
188 numSuccessfullThreads.addAndGet(1);
189 } catch (Exception e) {
190 if (e instanceof PreemptiveFastFailException) {
191
192 numPreemptiveFastFailExceptions.addAndGet(1);
193 }
194
195 numFailedThreads.addAndGet(1);
196 return false;
197 } finally {
198 long enTime = System.currentTimeMillis();
199 totalTimeTaken.addAndGet(enTime - startTime);
200 if ((enTime - startTime) >= SLEEPTIME) {
201
202
203
204
205
206 numBlockedWorkers.addAndGet(1);
207 }
208 }
209 return true;
210 } catch (Exception e) {
211 LOG.error("Caught unknown exception", e);
212 doneHalfway.countDown();
213 return false;
214 }
215 }
216 }));
217 }
218
219 doneHalfway.await();
220
221 ClusterStatus status = TEST_UTIL.getHBaseCluster().getClusterStatus();
222
223
224 for (int i = 0; i < SLAVES; i++) {
225 HRegionServer server = TEST_UTIL.getHBaseCluster().getRegionServer(i);
226 List<Region> regions = server.getOnlineRegions(TableName.META_TABLE_NAME);
227 if (regions.size() > 0) continue;
228
229 server.getRpcServer().stop();
230 server.stop("Testing");
231 }
232
233
234 continueOtherHalf.countDown();
235
236 Thread.sleep(2 * SLEEPTIME);
237
238 TEST_UTIL.getHBaseCluster().restoreClusterStatus(status);
239
240 int numThreadsReturnedFalse = 0;
241 int numThreadsReturnedTrue = 0;
242 int numThreadsThrewExceptions = 0;
243 for (Future<Boolean> f : futures) {
244 try {
245 numThreadsReturnedTrue += f.get() ? 1 : 0;
246 numThreadsReturnedFalse += f.get() ? 0 : 1;
247 } catch (Exception e) {
248 numThreadsThrewExceptions++;
249 }
250 }
251 LOG.debug("numThreadsReturnedFalse:"
252 + numThreadsReturnedFalse
253 + " numThreadsReturnedTrue:"
254 + numThreadsReturnedTrue
255 + " numThreadsThrewExceptions:"
256 + numThreadsThrewExceptions
257 + " numFailedThreads:"
258 + numFailedThreads.get()
259 + " numSuccessfullThreads:"
260 + numSuccessfullThreads.get()
261 + " numBlockedWorkers:"
262 + numBlockedWorkers.get()
263 + " totalTimeWaited: "
264 + totalTimeTaken.get()
265 / (numBlockedWorkers.get() == 0 ? Long.MAX_VALUE : numBlockedWorkers
266 .get()) + " numPFFEs: " + numPreemptiveFastFailExceptions.get());
267
268 assertEquals("The expected number of all the successfull and the failed "
269 + "threads should equal the total number of threads that we spawned",
270 nThreads, numFailedThreads.get() + numSuccessfullThreads.get());
271 assertEquals(
272 "All the failures should be coming from the secondput failure",
273 numFailedThreads.get(), numThreadsReturnedFalse);
274 assertEquals("Number of threads that threw execution exceptions "
275 + "otherwise should be 0", numThreadsThrewExceptions, 0);
276 assertEquals("The regionservers that returned true should equal to the"
277 + " number of successful threads", numThreadsReturnedTrue,
278 numSuccessfullThreads.get());
279
280
281
282
283
284
285
286
287
288
289
290
291
292 assertTrue(
293 "Only few thread should ideally be waiting for the dead "
294 + "regionserver to be coming back. numBlockedWorkers:"
295 + numBlockedWorkers.get() + " threads that retried : "
296 + MyPreemptiveFastFailInterceptor.numBraveSouls.get(),
297 numBlockedWorkers.get() <= MyPreemptiveFastFailInterceptor.numBraveSouls
298 .get());
299 }
300
301 public static class MyPreemptiveFastFailInterceptor extends
302 PreemptiveFastFailInterceptor {
303 public static AtomicInteger numBraveSouls = new AtomicInteger();
304
305 @Override
306 protected boolean shouldRetryInspiteOfFastFail(FailureInfo fInfo) {
307 boolean ret = super.shouldRetryInspiteOfFastFail(fInfo);
308 if (ret)
309 numBraveSouls.addAndGet(1);
310 return ret;
311 }
312
313 public MyPreemptiveFastFailInterceptor(Configuration conf) {
314 super(conf);
315 }
316 }
317
318 private byte[] longToByteArrayKey(long rowKey) {
319 return LoadTestKVGenerator.md5PrefixedKey(rowKey).getBytes();
320 }
321 }