View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.client;
21  
22  
23  import org.apache.commons.logging.Log;
24  import org.apache.commons.logging.LogFactory;
25  import org.apache.hadoop.conf.Configuration;
26  import org.apache.hadoop.hbase.CallQueueTooBigException;
27  import org.apache.hadoop.hbase.Cell;
28  import org.apache.hadoop.hbase.RegionLocations;
29  import org.apache.hadoop.hbase.TableName;
30  import org.apache.hadoop.hbase.HConstants;
31  import org.apache.hadoop.hbase.HRegionInfo;
32  import org.apache.hadoop.hbase.HRegionLocation;
33  import org.apache.hadoop.hbase.testclassification.MediumTests;
34  import org.apache.hadoop.hbase.ServerName;
35  import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
36  import org.apache.hadoop.hbase.client.coprocessor.Batch;
37  import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
38  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
39  import org.apache.hadoop.hbase.util.Bytes;
40  import org.apache.hadoop.hbase.util.Threads;
41  import org.junit.Assert;
42  import org.junit.BeforeClass;
43  import org.junit.Rule;
44  import org.junit.Test;
45  import org.junit.experimental.categories.Category;
46  import org.junit.rules.Timeout;
47  import org.mockito.Mockito;
48  
49  import java.io.IOException;
50  import java.io.InterruptedIOException;
51  import java.util.ArrayList;
52  import java.util.Arrays;
53  import java.util.HashMap;
54  import java.util.List;
55  import java.util.Map;
56  import java.util.Set;
57  import java.util.TreeSet;
58  import java.util.concurrent.BlockingQueue;
59  import java.util.concurrent.ExecutorService;
60  import java.util.concurrent.Future;
61  import java.util.concurrent.LinkedBlockingQueue;
62  import java.util.concurrent.RejectedExecutionException;
63  import java.util.concurrent.SynchronousQueue;
64  import java.util.concurrent.ThreadFactory;
65  import java.util.concurrent.ThreadPoolExecutor;
66  import java.util.concurrent.TimeUnit;
67  import java.util.concurrent.atomic.AtomicBoolean;
68  import java.util.concurrent.atomic.AtomicInteger;
69  import java.util.concurrent.atomic.AtomicLong;
70  
71  @Category(MediumTests.class)
72  public class TestAsyncProcess {
73    private final static Log LOG = LogFactory.getLog(TestAsyncProcess.class);
74    private static final TableName DUMMY_TABLE =
75        TableName.valueOf("DUMMY_TABLE");
76    private static final byte[] DUMMY_BYTES_1 = "DUMMY_BYTES_1".getBytes();
77    private static final byte[] DUMMY_BYTES_2 = "DUMMY_BYTES_2".getBytes();
78    private static final byte[] DUMMY_BYTES_3 = "DUMMY_BYTES_3".getBytes();
79    private static final byte[] FAILS = "FAILS".getBytes();
80    private static final Configuration conf = new Configuration();
81  
82    private static ServerName sn = ServerName.valueOf("s1:1,1");
83    private static ServerName sn2 = ServerName.valueOf("s2:2,2");
84    private static ServerName sn3 = ServerName.valueOf("s3:3,3");
85    private static HRegionInfo hri1 =
86        new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_1, DUMMY_BYTES_2, false, 1);
87    private static HRegionInfo hri2 =
88        new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_2, HConstants.EMPTY_END_ROW, false, 2);
89    private static HRegionInfo hri3 =
90        new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_3, HConstants.EMPTY_END_ROW, false, 3);
91    private static HRegionLocation loc1 = new HRegionLocation(hri1, sn);
92    private static HRegionLocation loc2 = new HRegionLocation(hri2, sn);
93    private static HRegionLocation loc3 = new HRegionLocation(hri3, sn2);
94  
95    // Replica stuff
96    private static HRegionInfo hri1r1 = RegionReplicaUtil.getRegionInfoForReplica(hri1, 1),
97        hri1r2 = RegionReplicaUtil.getRegionInfoForReplica(hri1, 2);
98    private static HRegionInfo hri2r1 = RegionReplicaUtil.getRegionInfoForReplica(hri2, 1);
99    private static RegionLocations hrls1 = new RegionLocations(new HRegionLocation(hri1, sn),
100       new HRegionLocation(hri1r1, sn2), new HRegionLocation(hri1r2, sn3));
101   private static RegionLocations hrls2 = new RegionLocations(new HRegionLocation(hri2, sn2),
102       new HRegionLocation(hri2r1, sn3));
103   private static RegionLocations hrls3 = new RegionLocations(new HRegionLocation(hri3, sn3), null);
104 
105   private static final String success = "success";
106   private static Exception failure = new Exception("failure");
107 
108   private static int NB_RETRIES = 3;
109 
110   @BeforeClass
111   public static void beforeClass(){
112     conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, NB_RETRIES);
113   }
114 
115   static class CountingThreadFactory implements ThreadFactory {
116     final AtomicInteger nbThreads;
117     ThreadFactory realFactory =  Threads.newDaemonThreadFactory("test-TestAsyncProcess");
118     @Override
119     public Thread newThread(Runnable r) {
120       nbThreads.incrementAndGet();
121       return realFactory.newThread(r);
122     }
123 
124     CountingThreadFactory(AtomicInteger nbThreads){
125       this.nbThreads = nbThreads;
126     }
127   }
128 
129   static class MyAsyncProcess extends AsyncProcess {
130     final AtomicInteger nbMultiResponse = new AtomicInteger();
131     final AtomicInteger nbActions = new AtomicInteger();
132     public List<AsyncRequestFuture> allReqs = new ArrayList<AsyncRequestFuture>();
133     public AtomicInteger callsCt = new AtomicInteger();
134 
135     @Override
136     protected <Res> AsyncRequestFutureImpl<Res> createAsyncRequestFuture(TableName tableName,
137         List<Action<Row>> actions, long nonceGroup, ExecutorService pool,
138         Batch.Callback<Res> callback, Object[] results, boolean needResults) {
139       // Test HTable has tableName of null, so pass DUMMY_TABLE
140       AsyncRequestFutureImpl<Res> r = super.createAsyncRequestFuture(
141           DUMMY_TABLE, actions, nonceGroup, pool, callback, results, needResults);
142       allReqs.add(r);
143       callsCt.incrementAndGet();
144       return r;
145     }
146 
147     public MyAsyncProcess(ClusterConnection hc, Configuration conf) {
148       this(hc, conf, new AtomicInteger());
149     }
150 
151     public MyAsyncProcess(ClusterConnection hc, Configuration conf, AtomicInteger nbThreads) {
152       super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
153           new SynchronousQueue<Runnable>(), new CountingThreadFactory(nbThreads)),
154             new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory(conf));
155     }
156 
157     public MyAsyncProcess(
158         ClusterConnection hc, Configuration conf, boolean useGlobalErrors) {
159       super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
160         new SynchronousQueue<Runnable>(), new CountingThreadFactory(new AtomicInteger())),
161           new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf));
162     }
163 
164     public MyAsyncProcess(ClusterConnection hc, Configuration conf, boolean useGlobalErrors,
165         @SuppressWarnings("unused") boolean dummy) {
166       super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
167               new SynchronousQueue<Runnable>(), new CountingThreadFactory(new AtomicInteger())) {
168         @Override
169         public void execute(Runnable command) {
170           throw new RejectedExecutionException("test under failure");
171         }
172       },
173           new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf));
174     }
175 
176     @Override
177     public <Res> AsyncRequestFuture submit(TableName tableName, List<? extends Row> rows,
178         boolean atLeastOne, Callback<Res> callback, boolean needResults)
179             throws InterruptedIOException {
180       // We use results in tests to check things, so override to always save them.
181       return super.submit(DUMMY_TABLE, rows, atLeastOne, callback, true);
182     }
183 
184     @Override
185     protected RpcRetryingCaller<MultiResponse> createCaller(MultiServerCallable<Row> callable) {
186       callsCt.incrementAndGet();
187       final MultiResponse mr = createMultiResponse(
188           callable.getMulti(), nbMultiResponse, nbActions, new ResponseGenerator() {
189             @Override
190             public void addResponse(MultiResponse mr, byte[] regionName, Action<Row> a) {
191               if (Arrays.equals(FAILS, a.getAction().getRow())) {
192                 mr.add(regionName, a.getOriginalIndex(), failure);
193               } else {
194                 mr.add(regionName, a.getOriginalIndex(), success);
195               }
196             }
197           });
198 
199       return new RpcRetryingCaller<MultiResponse>(100, 10, 9) {
200         @Override
201         public MultiResponse callWithoutRetries(RetryingCallable<MultiResponse> callable,
202                                                 int callTimeout)
203         throws IOException, RuntimeException {
204           try {
205             // sleep one second in order for threadpool to start another thread instead of reusing
206             // existing one.
207             Thread.sleep(1000);
208           } catch (InterruptedException e) {
209             // ignore error
210           }
211           return mr;
212         }
213       };
214     }
215   }
216 
217   static class CallerWithFailure extends RpcRetryingCaller<MultiResponse>{
218 
219     private final IOException e;
220 
221     public CallerWithFailure(IOException e) {
222       super(100, 100, 9);
223       this.e = e;
224     }
225 
226     @Override
227     public MultiResponse callWithoutRetries(RetryingCallable<MultiResponse> callable, int callTimeout)
228         throws IOException, RuntimeException {
229       throw e;
230     }
231   }
232 
233 
234   static class AsyncProcessWithFailure extends MyAsyncProcess {
235 
236     private final IOException ioe;
237 
238     public AsyncProcessWithFailure(ClusterConnection hc, Configuration conf, IOException ioe) {
239       super(hc, conf, true);
240       this.ioe = ioe;
241       serverTrackerTimeout = 1;
242     }
243 
244     @Override
245     protected RpcRetryingCaller<MultiResponse> createCaller(MultiServerCallable<Row> callable) {
246       callsCt.incrementAndGet();
247       return new CallerWithFailure(ioe);
248     }
249   }
250 
251   class MyAsyncProcessWithReplicas extends MyAsyncProcess {
252     private Set<byte[]> failures = new TreeSet<byte[]>(new Bytes.ByteArrayComparator());
253     private long primarySleepMs = 0, replicaSleepMs = 0;
254     private Map<ServerName, Long> customPrimarySleepMs = new HashMap<ServerName, Long>();
255     private final AtomicLong replicaCalls = new AtomicLong(0);
256 
257     public void addFailures(HRegionInfo... hris) {
258       for (HRegionInfo hri : hris) {
259         failures.add(hri.getRegionName());
260       }
261     }
262 
263     public long getReplicaCallCount() {
264       return replicaCalls.get();
265     }
266 
267     public void setPrimaryCallDelay(ServerName server, long primaryMs) {
268       customPrimarySleepMs.put(server, primaryMs);
269     }
270 
271     public MyAsyncProcessWithReplicas(ClusterConnection hc, Configuration conf) {
272       super(hc, conf);
273     }
274 
275     public void setCallDelays(long primaryMs, long replicaMs) {
276       this.primarySleepMs = primaryMs;
277       this.replicaSleepMs = replicaMs;
278     }
279 
280     @Override
281     protected RpcRetryingCaller<MultiResponse> createCaller(
282         MultiServerCallable<Row> callable) {
283       final MultiResponse mr = createMultiResponse(
284           callable.getMulti(), nbMultiResponse, nbActions, new ResponseGenerator() {
285             @Override
286             public void addResponse(MultiResponse mr, byte[] regionName, Action<Row> a) {
287               if (failures.contains(regionName)) {
288                 mr.add(regionName, a.getOriginalIndex(), failure);
289               } else {
290                 boolean isStale = !RegionReplicaUtil.isDefaultReplica(a.getReplicaId());
291                 mr.add(regionName, a.getOriginalIndex(),
292                     Result.create(new Cell[0], null, isStale));
293               }
294             }
295           });
296       // Currently AsyncProcess either sends all-replica, or all-primary request.
297       final boolean isDefault = RegionReplicaUtil.isDefaultReplica(
298           callable.getMulti().actions.values().iterator().next().iterator().next().getReplicaId());
299       final ServerName server = ((MultiServerCallable<?>)callable).getServerName();
300       String debugMsg = "Call to " + server + ", primary=" + isDefault + " with "
301           + callable.getMulti().actions.size() + " entries: ";
302       for (byte[] region : callable.getMulti().actions.keySet()) {
303         debugMsg += "[" + Bytes.toStringBinary(region) + "], ";
304       }
305       LOG.debug(debugMsg);
306       if (!isDefault) {
307         replicaCalls.incrementAndGet();
308       }
309 
310       return new RpcRetryingCaller<MultiResponse>(100, 10, 9) {
311         @Override
312         public MultiResponse callWithoutRetries(RetryingCallable<MultiResponse> callable, int callTimeout)
313         throws IOException, RuntimeException {
314           long sleep = -1;
315           if (isDefault) {
316             Long customSleep = customPrimarySleepMs.get(server);
317             sleep = (customSleep == null ? primarySleepMs : customSleep.longValue());
318           } else {
319             sleep = replicaSleepMs;
320           }
321           if (sleep != 0) {
322             try {
323               Thread.sleep(sleep);
324             } catch (InterruptedException e) {
325             }
326           }
327           return mr;
328         }
329       };
330     }
331   }
332 
333   static MultiResponse createMultiResponse(final MultiAction<Row> multi,
334       AtomicInteger nbMultiResponse, AtomicInteger nbActions, ResponseGenerator gen) {
335     final MultiResponse mr = new MultiResponse();
336     nbMultiResponse.incrementAndGet();
337     for (Map.Entry<byte[], List<Action<Row>>> entry : multi.actions.entrySet()) {
338       byte[] regionName = entry.getKey();
339       for (Action<Row> a : entry.getValue()) {
340         nbActions.incrementAndGet();
341         gen.addResponse(mr, regionName, a);
342       }
343     }
344     return mr;
345   }
346 
347   private static interface ResponseGenerator {
348     void addResponse(final MultiResponse mr, byte[] regionName, Action<Row> a);
349   }
350 
351   /**
352    * Returns our async process.
353    */
354   static class MyConnectionImpl extends ConnectionManager.HConnectionImplementation {
355     final AtomicInteger nbThreads = new AtomicInteger(0);
356 
357 
358     protected MyConnectionImpl(Configuration conf) {
359       super(conf);
360     }
361 
362     @Override
363     public RegionLocations locateRegion(TableName tableName,
364         byte[] row, boolean useCache, boolean retry, int replicaId) throws IOException {
365       return new RegionLocations(loc1);
366     }
367 
368     @Override
369     public boolean hasCellBlockSupport() {
370       return false;
371     }
372   }
373 
374   /**
375    * Returns our async process.
376    */
377   static class MyConnectionImpl2 extends MyConnectionImpl {
378     List<HRegionLocation> hrl;
379     final boolean usedRegions[];
380 
381     protected MyConnectionImpl2(List<HRegionLocation> hrl) {
382       super(conf);
383       this.hrl = hrl;
384       this.usedRegions = new boolean[hrl.size()];
385     }
386 
387     @Override
388     public RegionLocations locateRegion(TableName tableName,
389         byte[] row, boolean useCache, boolean retry, int replicaId) throws IOException {
390       int i = 0;
391       for (HRegionLocation hr : hrl){
392         if (Arrays.equals(row, hr.getRegionInfo().getStartKey())) {
393           usedRegions[i] = true;
394           return new RegionLocations(hr);
395         }
396         i++;
397       }
398       return null;
399     }
400 
401   }
402 
403   @Rule
404   public Timeout timeout = Timeout.millis(10000); // 10 seconds max per method tested
405 
406   @Test
407   public void testSubmit() throws Exception {
408     ClusterConnection hc = createHConnection();
409     AsyncProcess ap = new MyAsyncProcess(hc, conf);
410 
411     List<Put> puts = new ArrayList<Put>();
412     puts.add(createPut(1, true));
413 
414     ap.submit(DUMMY_TABLE, puts, false, null, false);
415     Assert.assertTrue(puts.isEmpty());
416   }
417 
418   @Test
419   public void testSubmitWithCB() throws Exception {
420     ClusterConnection hc = createHConnection();
421     final AtomicInteger updateCalled = new AtomicInteger(0);
422     Batch.Callback<Object> cb = new Batch.Callback<Object>() {
423       @Override
424       public void update(byte[] region, byte[] row, Object result) {
425         updateCalled.incrementAndGet();
426       }
427     };
428     AsyncProcess ap = new MyAsyncProcess(hc, conf);
429 
430     List<Put> puts = new ArrayList<Put>();
431     puts.add(createPut(1, true));
432 
433     final AsyncRequestFuture ars = ap.submit(DUMMY_TABLE, puts, false, cb, false);
434     Assert.assertTrue(puts.isEmpty());
435     ars.waitUntilDone();
436     Assert.assertEquals(updateCalled.get(), 1);
437   }
438 
439   @Test
440   public void testSubmitBusyRegion() throws Exception {
441     ClusterConnection hc = createHConnection();
442     AsyncProcess ap = new MyAsyncProcess(hc, conf);
443 
444     List<Put> puts = new ArrayList<Put>();
445     puts.add(createPut(1, true));
446 
447     ap.incTaskCounters(Arrays.asList(hri1.getRegionName()), sn);
448     ap.submit(DUMMY_TABLE, puts, false, null, false);
449     Assert.assertEquals(puts.size(), 1);
450 
451     ap.decTaskCounters(Arrays.asList(hri1.getRegionName()), sn);
452     ap.submit(DUMMY_TABLE, puts, false, null, false);
453     Assert.assertEquals(0, puts.size());
454   }
455 
456 
457   @Test
458   public void testSubmitBusyRegionServer() throws Exception {
459     ClusterConnection hc = createHConnection();
460     AsyncProcess ap = new MyAsyncProcess(hc, conf);
461 
462     ap.taskCounterPerServer.put(sn2, new AtomicInteger(ap.maxConcurrentTasksPerServer));
463 
464     List<Put> puts = new ArrayList<Put>();
465     puts.add(createPut(1, true));
466     puts.add(createPut(3, true)); // <== this one won't be taken, the rs is busy
467     puts.add(createPut(1, true)); // <== this one will make it, the region is already in
468     puts.add(createPut(2, true)); // <== new region, but the rs is ok
469 
470     ap.submit(DUMMY_TABLE, puts, false, null, false);
471     Assert.assertEquals(" puts=" + puts, 1, puts.size());
472 
473     ap.taskCounterPerServer.put(sn2, new AtomicInteger(ap.maxConcurrentTasksPerServer - 1));
474     ap.submit(DUMMY_TABLE, puts, false, null, false);
475     Assert.assertTrue(puts.isEmpty());
476   }
477 
478   @Test
479   public void testFail() throws Exception {
480     MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false);
481 
482     List<Put> puts = new ArrayList<Put>();
483     Put p = createPut(1, false);
484     puts.add(p);
485 
486     AsyncRequestFuture ars = ap.submit(DUMMY_TABLE, puts, false, null, true);
487     Assert.assertEquals(0, puts.size());
488     ars.waitUntilDone();
489     verifyResult(ars, false);
490     Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
491 
492     Assert.assertEquals(1, ars.getErrors().exceptions.size());
493     Assert.assertTrue("was: " + ars.getErrors().exceptions.get(0),
494         failure.equals(ars.getErrors().exceptions.get(0)));
495     Assert.assertTrue("was: " + ars.getErrors().exceptions.get(0),
496         failure.equals(ars.getErrors().exceptions.get(0)));
497 
498     Assert.assertEquals(1, ars.getFailedOperations().size());
499     Assert.assertTrue("was: " + ars.getFailedOperations().get(0),
500         p.equals(ars.getFailedOperations().get(0)));
501   }
502 
503 
504   @Test
505   public void testSubmitTrue() throws IOException {
506     final AsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false);
507     ap.tasksInProgress.incrementAndGet();
508     final AtomicInteger ai = new AtomicInteger(1);
509     ap.taskCounterPerRegion.put(hri1.getRegionName(), ai);
510 
511     final AtomicBoolean checkPoint = new AtomicBoolean(false);
512     final AtomicBoolean checkPoint2 = new AtomicBoolean(false);
513 
514     Thread t = new Thread(){
515       @Override
516       public void run(){
517         Threads.sleep(1000);
518         Assert.assertFalse(checkPoint.get()); // TODO: this is timing-dependent
519         ai.decrementAndGet();
520         ap.tasksInProgress.decrementAndGet();
521         checkPoint2.set(true);
522       }
523     };
524 
525     List<Put> puts = new ArrayList<Put>();
526     Put p = createPut(1, true);
527     puts.add(p);
528 
529     ap.submit(DUMMY_TABLE, puts, false, null, false);
530     Assert.assertFalse(puts.isEmpty());
531 
532     t.start();
533 
534     ap.submit(DUMMY_TABLE, puts, true, null, false);
535     Assert.assertTrue(puts.isEmpty());
536 
537     checkPoint.set(true);
538     while (!checkPoint2.get()){
539       Threads.sleep(1);
540     }
541   }
542 
543   @Test
544   public void testFailAndSuccess() throws Exception {
545     MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false);
546 
547     List<Put> puts = new ArrayList<Put>();
548     puts.add(createPut(1, false));
549     puts.add(createPut(1, true));
550     puts.add(createPut(1, true));
551 
552     AsyncRequestFuture ars = ap.submit(DUMMY_TABLE, puts, false, null, true);
553     Assert.assertTrue(puts.isEmpty());
554     ars.waitUntilDone();
555     verifyResult(ars, false, true, true);
556     Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
557     ap.callsCt.set(0);
558     Assert.assertEquals(1, ars.getErrors().actions.size());
559 
560     puts.add(createPut(1, true));
561     // Wait for AP to be free. While ars might have the result, ap counters are decreased later.
562     ap.waitUntilDone();
563     ars = ap.submit(DUMMY_TABLE, puts, false, null, true);
564     Assert.assertEquals(0, puts.size());
565     ars.waitUntilDone();
566     Assert.assertEquals(2, ap.callsCt.get());
567     verifyResult(ars, true);
568   }
569 
570   @Test
571   public void testFlush() throws Exception {
572     MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false);
573 
574     List<Put> puts = new ArrayList<Put>();
575     puts.add(createPut(1, false));
576     puts.add(createPut(1, true));
577     puts.add(createPut(1, true));
578 
579     AsyncRequestFuture ars = ap.submit(DUMMY_TABLE, puts, false, null, true);
580     ars.waitUntilDone();
581     verifyResult(ars, false, true, true);
582     Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
583 
584     Assert.assertEquals(1, ars.getFailedOperations().size());
585   }
586 
587   @Test
588   public void testMaxTask() throws Exception {
589     final AsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false);
590 
591     for (int i = 0; i < 1000; i++) {
592       ap.incTaskCounters(Arrays.asList("dummy".getBytes()), sn);
593     }
594 
595     final Thread myThread = Thread.currentThread();
596 
597     Thread t = new Thread() {
598       @Override
599       public void run() {
600         Threads.sleep(2000);
601         myThread.interrupt();
602       }
603     };
604 
605     List<Put> puts = new ArrayList<Put>();
606     puts.add(createPut(1, true));
607 
608     t.start();
609 
610     try {
611       ap.submit(DUMMY_TABLE, puts, false, null, false);
612       Assert.fail("We should have been interrupted.");
613     } catch (InterruptedIOException expected) {
614     }
615 
616     final long sleepTime = 2000;
617 
618     Thread t2 = new Thread() {
619       @Override
620       public void run() {
621         Threads.sleep(sleepTime);
622         while (ap.tasksInProgress.get() > 0) {
623           ap.decTaskCounters(Arrays.asList("dummy".getBytes()), sn);
624         }
625       }
626     };
627     t2.start();
628 
629     long start = System.currentTimeMillis();
630     ap.submit(DUMMY_TABLE, new ArrayList<Row>(), false, null, false);
631     long end = System.currentTimeMillis();
632 
633     //Adds 100 to secure us against approximate timing.
634     Assert.assertTrue(start + 100L + sleepTime > end);
635   }
636 
637   private static ClusterConnection createHConnection() throws IOException {
638     ClusterConnection hc = createHConnectionCommon();
639     setMockLocation(hc, DUMMY_BYTES_1, new RegionLocations(loc1));
640     setMockLocation(hc, DUMMY_BYTES_2, new RegionLocations(loc2));
641     setMockLocation(hc, DUMMY_BYTES_3, new RegionLocations(loc3));
642     setMockLocation(hc, FAILS, new RegionLocations(loc2));
643     return hc;
644   }
645 
646   private static ClusterConnection createHConnectionWithReplicas() throws IOException {
647     ClusterConnection hc = createHConnectionCommon();
648     setMockLocation(hc, DUMMY_BYTES_1, hrls1);
649     setMockLocation(hc, DUMMY_BYTES_2, hrls2);
650     setMockLocation(hc, DUMMY_BYTES_3, hrls3);
651     return hc;
652   }
653 
654   private static void setMockLocation(ClusterConnection hc, byte[] row,
655       RegionLocations result) throws IOException {
656     Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE), Mockito.eq(row),
657         Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyInt())).thenReturn(result);
658   }
659 
660   private static ClusterConnection createHConnectionCommon() {
661     ClusterConnection hc = Mockito.mock(ClusterConnection.class);
662     NonceGenerator ng = Mockito.mock(NonceGenerator.class);
663     Mockito.when(ng.getNonceGroup()).thenReturn(HConstants.NO_NONCE);
664     Mockito.when(hc.getNonceGenerator()).thenReturn(ng);
665     Mockito.when(hc.getConfiguration()).thenReturn(conf);
666     return hc;
667   }
668 
669   @Test
670   public void testHTablePutSuccess() throws Exception {
671     BufferedMutatorImpl ht = Mockito.mock(BufferedMutatorImpl.class);
672     ht.ap = new MyAsyncProcess(createHConnection(), conf, true);
673 
674     Put put = createPut(1, true);
675 
676     Assert.assertEquals(0, ht.getWriteBufferSize());
677     ht.mutate(put);
678     Assert.assertEquals(0, ht.getWriteBufferSize());
679   }
680 
681   private void doHTableFailedPut(boolean bufferOn) throws Exception {
682     ClusterConnection conn = createHConnection();
683     HTable ht = new HTable(conn, new BufferedMutatorParams(DUMMY_TABLE));
684     MyAsyncProcess ap = new MyAsyncProcess(conn, conf, true);
685     ht.mutator.ap = ap;
686     if (bufferOn) {
687       ht.setWriteBufferSize(1024L * 1024L);
688     } else {
689       ht.setWriteBufferSize(0L);
690     }
691 
692     Put put = createPut(1, false);
693 
694     Assert.assertEquals(0L, ht.mutator.currentWriteBufferSize.get());
695     try {
696       ht.put(put);
697       if (bufferOn) {
698         ht.flushCommits();
699       }
700       Assert.fail();
701     } catch (RetriesExhaustedException expected) {
702     }
703     Assert.assertEquals(0L, ht.mutator.currentWriteBufferSize.get());
704     // The table should have sent one request, maybe after multiple attempts
705     AsyncRequestFuture ars = null;
706     for (AsyncRequestFuture someReqs : ap.allReqs) {
707       if (someReqs.getResults().length == 0) continue;
708       Assert.assertTrue(ars == null);
709       ars = someReqs;
710     }
711     Assert.assertTrue(ars != null);
712     verifyResult(ars, false);
713 
714     // This should not raise any exception, puts have been 'received' before by the catch.
715     ht.close();
716   }
717 
718   @Test
719   public void testHTableFailedPutWithBuffer() throws Exception {
720     doHTableFailedPut(true);
721   }
722 
723   @Test
724   public void testHTableFailedPutWithoutBuffer() throws Exception {
725     doHTableFailedPut(false);
726   }
727 
728   @Test
729   public void testHTableFailedPutAndNewPut() throws Exception {
730     ClusterConnection conn = createHConnection();
731     BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, null, null,
732         new BufferedMutatorParams(DUMMY_TABLE).writeBufferSize(0));
733     MyAsyncProcess ap = new MyAsyncProcess(conn, conf, true);
734     mutator.ap = ap;
735 
736     Put p = createPut(1, false);
737     mutator.mutate(p);
738 
739     ap.waitUntilDone(); // Let's do all the retries.
740 
741     // We're testing that we're behaving as we were behaving in 0.94: sending exceptions in the
742     //  doPut if it fails.
743     // This said, it's not a very easy going behavior. For example, when we insert a list of
744     //  puts, we may raise an exception in the middle of the list. It's then up to the caller to
745     //  manage what was inserted, what was tried but failed, and what was not even tried.
746     p = createPut(1, true);
747     Assert.assertEquals(0, mutator.getWriteBuffer().size());
748     try {
749       mutator.mutate(p);
750       Assert.fail();
751     } catch (RetriesExhaustedException expected) {
752     }
753     Assert.assertEquals("the put should not been inserted.", 0, mutator.getWriteBuffer().size());
754   }
755 
756   @Test
757   public void testBatch() throws IOException, InterruptedException {
758     ClusterConnection conn = new MyConnectionImpl(conf);
759     HTable ht = new HTable(conn, new BufferedMutatorParams(DUMMY_TABLE));
760     ht.multiAp = new MyAsyncProcess(conn, conf, false);
761 
762     List<Put> puts = new ArrayList<Put>();
763     puts.add(createPut(1, true));
764     puts.add(createPut(1, true));
765     puts.add(createPut(1, true));
766     puts.add(createPut(1, true));
767     puts.add(createPut(1, false)); // <=== the bad apple, position 4
768     puts.add(createPut(1, true));
769     puts.add(createPut(1, false)); // <=== another bad apple, position 6
770 
771     Object[] res = new Object[puts.size()];
772     try {
773       ht.processBatch(puts, res);
774       Assert.fail();
775     } catch (RetriesExhaustedException expected) {
776     }
777 
778     Assert.assertEquals(res[0], success);
779     Assert.assertEquals(res[1], success);
780     Assert.assertEquals(res[2], success);
781     Assert.assertEquals(res[3], success);
782     Assert.assertEquals(res[4], failure);
783     Assert.assertEquals(res[5], success);
784     Assert.assertEquals(res[6], failure);
785   }
786 
787   @Test
788   public void testErrorsServers() throws IOException {
789     Configuration configuration = new Configuration(conf);
790     ClusterConnection conn = new MyConnectionImpl(configuration);
791     BufferedMutatorImpl mutator =
792         new BufferedMutatorImpl(conn, null, null, new BufferedMutatorParams(DUMMY_TABLE));
793     configuration.setBoolean(ConnectionManager.RETRIES_BY_SERVER_KEY, true);
794 
795     MyAsyncProcess ap = new MyAsyncProcess(conn, configuration, true);
796     mutator.ap = ap;
797 
798     Assert.assertNotNull(mutator.ap.createServerErrorTracker());
799     Assert.assertTrue(mutator.ap.serverTrackerTimeout > 200);
800     mutator.ap.serverTrackerTimeout = 1;
801 
802     Put p = createPut(1, false);
803     mutator.mutate(p);
804 
805     try {
806       mutator.flush();
807       Assert.fail();
808     } catch (RetriesExhaustedWithDetailsException expected) {
809     }
810     // Checking that the ErrorsServers came into play and didn't make us stop immediately
811     Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
812   }
813 
814   @Test
815   public void testGlobalErrors() throws IOException {
816     ClusterConnection conn = new MyConnectionImpl(conf);
817     BufferedMutatorImpl mutator = (BufferedMutatorImpl) conn.getBufferedMutator(DUMMY_TABLE);
818     AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, conf, new IOException("test"));
819     mutator.ap = ap;
820 
821     Assert.assertNotNull(mutator.ap.createServerErrorTracker());
822 
823     Put p = createPut(1, true);
824     mutator.mutate(p);
825 
826     try {
827       mutator.flush();
828       Assert.fail();
829     } catch (RetriesExhaustedWithDetailsException expected) {
830     }
831     // Checking that the ErrorsServers came into play and didn't make us stop immediately
832     Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
833   }
834 
835 
836   @Test
837   public void testCallQueueTooLarge() throws IOException {
838     ClusterConnection conn = new MyConnectionImpl(conf);
839     BufferedMutatorImpl mutator = (BufferedMutatorImpl) conn.getBufferedMutator(DUMMY_TABLE);
840     AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, conf, new CallQueueTooBigException());
841     mutator.ap = ap;
842 
843     Assert.assertNotNull(mutator.ap.createServerErrorTracker());
844 
845     Put p = createPut(1, true);
846     mutator.mutate(p);
847 
848     try {
849       mutator.flush();
850       Assert.fail();
851     } catch (RetriesExhaustedWithDetailsException expected) {
852     }
853     // Checking that the ErrorsServers came into play and didn't make us stop immediately
854     Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
855   }
856   /**
857    * This test simulates multiple regions on 2 servers. We should have 2 multi requests and
858    *  2 threads: 1 per server, this whatever the number of regions.
859    */
860   @Test
861   public void testThreadCreation() throws Exception {
862     final int NB_REGS = 100;
863     List<HRegionLocation> hrls = new ArrayList<HRegionLocation>(NB_REGS);
864     List<Get> gets = new ArrayList<Get>(NB_REGS);
865     for (int i = 0; i < NB_REGS; i++) {
866       HRegionInfo hri = new HRegionInfo(
867           DUMMY_TABLE, Bytes.toBytes(i * 10L), Bytes.toBytes(i * 10L + 9L), false, i);
868       HRegionLocation hrl = new HRegionLocation(hri, i % 2 == 0 ? sn : sn2);
869       hrls.add(hrl);
870 
871       Get get = new Get(Bytes.toBytes(i * 10L));
872       gets.add(get);
873     }
874 
875     MyConnectionImpl2 con = new MyConnectionImpl2(hrls);
876     HTable ht = new HTable(con, new BufferedMutatorParams(DUMMY_TABLE));
877     MyAsyncProcess ap = new MyAsyncProcess(con, conf, con.nbThreads);
878     ht.multiAp = ap;
879 
880     ht.batch(gets, new Object[gets.size()]);
881 
882     Assert.assertEquals(ap.nbActions.get(), NB_REGS);
883     Assert.assertEquals("1 multi response per server", 2, ap.nbMultiResponse.get());
884     Assert.assertEquals("1 thread per server", 2, con.nbThreads.get());
885 
886     int nbReg = 0;
887     for (int i =0; i<NB_REGS; i++){
888       if (con.usedRegions[i]) nbReg++;
889     }
890     Assert.assertEquals("nbReg=" + nbReg, nbReg, NB_REGS);
891   }
892 
893   @Test
894   public void testReplicaReplicaSuccess() throws Exception {
895     // Main call takes too long so replicas succeed, except for one region w/o replicas.
896     // One region has no replica, so the main call succeeds for it.
897     MyAsyncProcessWithReplicas ap = createReplicaAp(10, 1000, 0);
898     List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2, DUMMY_BYTES_3);
899     AsyncRequestFuture ars = ap.submitAll(DUMMY_TABLE, rows, null, new Object[3]);
900     verifyReplicaResult(ars, RR.TRUE, RR.TRUE, RR.FALSE);
901     Assert.assertEquals(2, ap.getReplicaCallCount());
902   }
903 
904   @Test
905   public void testReplicaPrimarySuccessWoReplicaCalls() throws Exception {
906     // Main call succeeds before replica calls are kicked off.
907     MyAsyncProcessWithReplicas ap = createReplicaAp(1000, 10, 0);
908     List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2, DUMMY_BYTES_3);
909     AsyncRequestFuture ars = ap.submitAll(DUMMY_TABLE, rows, null, new Object[3]);
910     verifyReplicaResult(ars, RR.FALSE, RR.FALSE, RR.FALSE);
911     Assert.assertEquals(0, ap.getReplicaCallCount());
912   }
913 
914   @Test
915   public void testReplicaParallelCallsSucceed() throws Exception {
916     // Either main or replica can succeed.
917     MyAsyncProcessWithReplicas ap = createReplicaAp(0, 0, 0);
918     List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
919     AsyncRequestFuture ars = ap.submitAll(DUMMY_TABLE, rows, null, new Object[2]);
920     verifyReplicaResult(ars, RR.DONT_CARE, RR.DONT_CARE);
921     long replicaCalls = ap.getReplicaCallCount();
922     Assert.assertTrue(replicaCalls >= 0);
923     Assert.assertTrue(replicaCalls <= 2);
924   }
925 
926   @Test
927   public void testReplicaPartialReplicaCall() throws Exception {
928     // One server is slow, so the result for its region comes from replica, whereas
929     // the result for other region comes from primary before replica calls happen.
930     // There should be no replica call for that region at all.
931     MyAsyncProcessWithReplicas ap = createReplicaAp(1000, 0, 0);
932     ap.setPrimaryCallDelay(sn2, 2000);
933     List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
934     AsyncRequestFuture ars = ap.submitAll(DUMMY_TABLE, rows, null, new Object[2]);
935     verifyReplicaResult(ars, RR.FALSE, RR.TRUE);
936     Assert.assertEquals(1, ap.getReplicaCallCount());
937   }
938 
939   @Test
940   public void testReplicaMainFailsBeforeReplicaCalls() throws Exception {
941     // Main calls fail before replica calls can start - this is currently not handled.
942     // It would probably never happen if we can get location (due to retries),
943     // and it would require additional synchronization.
944     MyAsyncProcessWithReplicas ap = createReplicaAp(1000, 0, 0, 1);
945     ap.addFailures(hri1, hri2);
946     List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
947     AsyncRequestFuture ars = ap.submitAll(DUMMY_TABLE, rows, null, new Object[2]);
948     verifyReplicaResult(ars, RR.FAILED, RR.FAILED);
949     Assert.assertEquals(0, ap.getReplicaCallCount());
950   }
951 
952   @Test
953   public void testReplicaReplicaSuccessWithParallelFailures() throws Exception {
954     // Main calls fails after replica calls start. For two-replica region, one replica call
955     // also fails. Regardless, we get replica results for both regions.
956     MyAsyncProcessWithReplicas ap = createReplicaAp(0, 1000, 1000, 1);
957     ap.addFailures(hri1, hri1r2, hri2);
958     List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
959     AsyncRequestFuture ars = ap.submitAll(DUMMY_TABLE, rows, null, new Object[2]);
960     verifyReplicaResult(ars, RR.TRUE, RR.TRUE);
961     Assert.assertEquals(2, ap.getReplicaCallCount());
962   }
963 
964   @Test
965   public void testReplicaAllCallsFailForOneRegion() throws Exception {
966     // For one of the region, all 3, main and replica, calls fail. For the other, replica
967     // call fails but its exception should not be visible as it did succeed.
968     MyAsyncProcessWithReplicas ap = createReplicaAp(500, 1000, 0, 1);
969     ap.addFailures(hri1, hri1r1, hri1r2, hri2r1);
970     List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
971     AsyncRequestFuture ars = ap.submitAll(DUMMY_TABLE, rows, null, new Object[2]);
972     verifyReplicaResult(ars, RR.FAILED, RR.FALSE);
973     // We should get 3 exceptions, for main + 2 replicas for DUMMY_BYTES_1
974     Assert.assertEquals(3, ars.getErrors().getNumExceptions());
975     for (int i = 0; i < ars.getErrors().getNumExceptions(); ++i) {
976       Assert.assertArrayEquals(DUMMY_BYTES_1, ars.getErrors().getRow(i).getRow());
977     }
978   }
979 
980   private MyAsyncProcessWithReplicas createReplicaAp(
981       int replicaAfterMs, int primaryMs, int replicaMs) throws Exception {
982     return createReplicaAp(replicaAfterMs, primaryMs, replicaMs, -1);
983   }
984 
985   private MyAsyncProcessWithReplicas createReplicaAp(
986       int replicaAfterMs, int primaryMs, int replicaMs, int retries) throws Exception {
987     // TODO: this is kind of timing dependent... perhaps it should detect from createCaller
988     //       that the replica call has happened and that way control the ordering.
989     Configuration conf = new Configuration();
990     ClusterConnection conn = createHConnectionWithReplicas();
991     conf.setInt(AsyncProcess.PRIMARY_CALL_TIMEOUT_KEY, replicaAfterMs * 1000);
992     if (retries > 0) {
993       conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retries);
994     }
995     MyAsyncProcessWithReplicas ap = new MyAsyncProcessWithReplicas(conn, conf);
996     ap.setCallDelays(primaryMs, replicaMs);
997     return ap;
998   }
999 
1000   private static List<Get> makeTimelineGets(byte[]... rows) {
1001     List<Get> result = new ArrayList<Get>();
1002     for (byte[] row : rows) {
1003       Get get = new Get(row);
1004       get.setConsistency(Consistency.TIMELINE);
1005       result.add(get);
1006     }
1007     return result;
1008   }
1009 
1010   private void verifyResult(AsyncRequestFuture ars, boolean... expected) throws Exception {
1011     Object[] actual = ars.getResults();
1012     Assert.assertEquals(expected.length, actual.length);
1013     for (int i = 0; i < expected.length; ++i) {
1014       Assert.assertEquals(expected[i], !(actual[i] instanceof Throwable));
1015     }
1016   }
1017 
  /**
   * Expected outcome of a single get, as checked by {@code verifyReplicaResult}.
   * (After reading TheDailyWtf, I always wanted to create a MyBoolean enum like this!)
   */
  private enum RR {
    /** The result is not a {@code Throwable} and {@code Result.isStale()} is true. */
    TRUE,
    /** The result is not a {@code Throwable} and {@code Result.isStale()} is false. */
    FALSE,
    /** The result is not a {@code Throwable}; its staleness is not checked. */
    DONT_CARE,
    /** The result is a {@code Throwable}. */
    FAILED
  }
1025 
1026   private void verifyReplicaResult(AsyncRequestFuture ars, RR... expecteds) throws Exception {
1027     Object[] actuals = ars.getResults();
1028     Assert.assertEquals(expecteds.length, actuals.length);
1029     for (int i = 0; i < expecteds.length; ++i) {
1030       Object actual = actuals[i];
1031       RR expected = expecteds[i];
1032       Assert.assertEquals(actual.toString(), expected == RR.FAILED, actual instanceof Throwable);
1033       if (expected != RR.FAILED && expected != RR.DONT_CARE) {
1034         Assert.assertEquals(expected == RR.TRUE, ((Result)actual).isStale());
1035       }
1036     }
1037   }
1038 
1039   /**
1040    * @param regCnt  the region: 1 to 3.
1041    * @param success if true, the put will succeed.
1042    * @return a put
1043    */
1044   private Put createPut(int regCnt, boolean success) {
1045     Put p;
1046     if (!success) {
1047       p = new Put(FAILS);
1048     } else switch (regCnt){
1049       case 1 :
1050         p = new Put(DUMMY_BYTES_1);
1051         break;
1052       case 2:
1053         p = new Put(DUMMY_BYTES_2);
1054         break;
1055       case 3:
1056         p = new Put(DUMMY_BYTES_3);
1057         break;
1058       default:
1059         throw new IllegalArgumentException("unknown " + regCnt);
1060     }
1061 
1062     p.add(DUMMY_BYTES_1, DUMMY_BYTES_1, DUMMY_BYTES_1);
1063 
1064     return p;
1065   }
1066 
1067   static class MyThreadPoolExecutor extends ThreadPoolExecutor {
1068     public MyThreadPoolExecutor(int coreThreads, int maxThreads, long keepAliveTime,
1069         TimeUnit timeunit, BlockingQueue<Runnable> blockingqueue) {
1070       super(coreThreads, maxThreads, keepAliveTime, timeunit, blockingqueue);
1071     }
1072 
1073     @Override
1074     public Future submit(Runnable runnable) {
1075       throw new OutOfMemoryError("OutOfMemory error thrown by means");
1076     }
1077   }
1078 
1079   static class AsyncProcessForThrowableCheck extends AsyncProcess {
1080     public AsyncProcessForThrowableCheck(ClusterConnection hc, Configuration conf,
1081         ExecutorService pool) {
1082       super(hc, conf, pool, new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory(
1083           conf));
1084     }
1085   }
1086 
1087   @Test
1088   public void testUncheckedException() throws Exception {
1089     // Test the case pool.submit throws unchecked exception
1090     ClusterConnection hc = createHConnection();
1091     MyThreadPoolExecutor myPool =
1092         new MyThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
1093             new LinkedBlockingQueue<Runnable>(200));
1094     AsyncProcess ap = new AsyncProcessForThrowableCheck(hc, conf, myPool);
1095 
1096     List<Put> puts = new ArrayList<Put>();
1097     puts.add(createPut(1, true));
1098 
1099     ap.submit(DUMMY_TABLE, puts, false, null, false);
1100     Assert.assertTrue(puts.isEmpty());
1101   }
1102 
1103 }