1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.client;
21
22 import java.io.IOException;
23 import java.util.ArrayList;
24 import java.util.HashMap;
25 import java.util.Iterator;
26 import java.util.List;
27 import java.util.Random;
28 import java.util.Set;
29 import java.util.concurrent.CountDownLatch;
30 import java.util.concurrent.TimeUnit;
31 import java.util.concurrent.atomic.AtomicBoolean;
32 import java.util.concurrent.atomic.AtomicInteger;
33 import java.util.concurrent.atomic.AtomicLong;
34 import java.util.concurrent.atomic.AtomicReference;
35
36 import org.apache.commons.logging.Log;
37 import org.apache.commons.logging.LogFactory;
38 import org.apache.commons.logging.impl.Log4JLogger;
39 import org.apache.hadoop.conf.Configuration;
40 import org.apache.hadoop.hbase.Cell;
41 import org.apache.hadoop.hbase.HBaseTestingUtility;
42 import org.apache.hadoop.hbase.HConstants;
43 import org.apache.hadoop.hbase.HRegionInfo;
44 import org.apache.hadoop.hbase.HTableDescriptor;
45 import org.apache.hadoop.hbase.KeyValue;
46 import org.apache.hadoop.hbase.NotServingRegionException;
47 import org.apache.hadoop.hbase.RegionLocations;
48 import org.apache.hadoop.hbase.TableNotFoundException;
49 import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
50 import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFutureImpl;
51 import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
52 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
53 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
54 import org.apache.hadoop.hbase.protobuf.RequestConverter;
55 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
56 import org.apache.hadoop.hbase.regionserver.HRegionServer;
57 import org.apache.hadoop.hbase.regionserver.InternalScanner;
58 import org.apache.hadoop.hbase.regionserver.RegionScanner;
59 import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
60 import org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster;
61 import org.apache.hadoop.hbase.testclassification.MediumTests;
62 import org.apache.hadoop.hbase.util.Bytes;
63 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
64 import org.apache.log4j.Level;
65 import org.apache.zookeeper.KeeperException;
66 import org.junit.After;
67 import org.junit.AfterClass;
68 import org.junit.Assert;
69 import org.junit.Before;
70 import org.junit.BeforeClass;
71 import org.junit.Test;
72 import org.junit.experimental.categories.Category;
73
74
75
76
77
78 @Category(MediumTests.class)
79 public class TestReplicasClient {
80 private static final Log LOG = LogFactory.getLog(TestReplicasClient.class);
81
82 static {
83 ((Log4JLogger)RpcRetryingCaller.LOG).getLogger().setLevel(Level.ALL);
84 }
85
86 private static final int NB_SERVERS = 1;
87 private static HTable table = null;
88 private static final byte[] row = TestReplicasClient.class.getName().getBytes();
89
90 private static HRegionInfo hriPrimary;
91 private static HRegionInfo hriSecondary;
92
93 private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
94 private static final byte[] f = HConstants.CATALOG_FAMILY;
95
96 private final static int REFRESH_PERIOD = 1000;
97
98
99
100
101 public static class SlowMeCopro extends BaseRegionObserver {
102 static final AtomicLong sleepTime = new AtomicLong(0);
103 static final AtomicBoolean slowDownNext = new AtomicBoolean(false);
104 static final AtomicInteger countOfNext = new AtomicInteger(0);
105 private static final AtomicReference<CountDownLatch> cdl =
106 new AtomicReference<CountDownLatch>(new CountDownLatch(0));
107 Random r = new Random();
108 public SlowMeCopro() {
109 }
110
111 @Override
112 public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
113 final Get get, final List<Cell> results) throws IOException {
114 slowdownCode(e);
115 }
116
117 @Override
118 public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
119 final Scan scan, final RegionScanner s) throws IOException {
120 slowdownCode(e);
121 return s;
122 }
123
124 @Override
125 public boolean preScannerNext(final ObserverContext<RegionCoprocessorEnvironment> e,
126 final InternalScanner s, final List<Result> results,
127 final int limit, final boolean hasMore) throws IOException {
128
129
130 if (slowDownNext.get()) {
131
132 if (countOfNext.incrementAndGet() == 2) {
133 sleepTime.set(2000);
134 slowdownCode(e);
135 }
136 }
137 return true;
138 }
139
140 private void slowdownCode(final ObserverContext<RegionCoprocessorEnvironment> e) {
141 if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) {
142 CountDownLatch latch = getCdl().get();
143 try {
144 if (sleepTime.get() > 0) {
145 LOG.info("Sleeping for " + sleepTime.get() + " ms");
146 Thread.sleep(sleepTime.get());
147 } else if (latch.getCount() > 0) {
148 LOG.info("Waiting for the counterCountDownLatch");
149 latch.await(2, TimeUnit.MINUTES);
150 if (latch.getCount() > 0) {
151 throw new RuntimeException("Can't wait more");
152 }
153 }
154 } catch (InterruptedException e1) {
155 LOG.error(e1);
156 }
157 } else {
158 LOG.info("We're not the primary replicas.");
159 }
160 }
161
162 public static AtomicReference<CountDownLatch> getCdl() {
163 return cdl;
164 }
165 }
166
167 @BeforeClass
168 public static void beforeClass() throws Exception {
169
170 HTU.getConfiguration().setInt(
171 StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, REFRESH_PERIOD);
172 HTU.getConfiguration().setBoolean("hbase.client.log.scanner.activity", true);
173 ConnectionUtils.setupMasterlessConnection(HTU.getConfiguration());
174 HTU.startMiniCluster(NB_SERVERS);
175
176
177 HTableDescriptor hdt = HTU.createTableDescriptor(TestReplicasClient.class.getSimpleName());
178 hdt.addCoprocessor(SlowMeCopro.class.getName());
179 table = HTU.createTable(hdt, new byte[][]{f}, HTU.getConfiguration());
180
181 hriPrimary = table.getRegionLocation(row, false).getRegionInfo();
182
183
184 hriSecondary = new HRegionInfo(hriPrimary.getTable(), hriPrimary.getStartKey(),
185 hriPrimary.getEndKey(), hriPrimary.isSplit(), hriPrimary.getRegionId(), 1);
186
187
188 LOG.info("Master is going to be stopped");
189 TestRegionServerNoMaster.stopMasterAndAssignMeta(HTU);
190 Configuration c = new Configuration(HTU.getConfiguration());
191 c.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
192 LOG.info("Master has stopped");
193 }
194
195 @AfterClass
196 public static void afterClass() throws Exception {
197 if (table != null) table.close();
198 HTU.shutdownMiniCluster();
199 }
200
201 @Before
202 public void before() throws IOException {
203 HTU.getHBaseAdmin().getConnection().clearRegionCache();
204 try {
205 openRegion(hriPrimary);
206 } catch (Exception ignored) {
207 }
208 try {
209 openRegion(hriSecondary);
210 } catch (Exception ignored) {
211 }
212 }
213
214 @After
215 public void after() throws IOException, KeeperException {
216 try {
217 closeRegion(hriSecondary);
218 } catch (Exception ignored) {
219 }
220 try {
221 closeRegion(hriPrimary);
222 } catch (Exception ignored) {
223 }
224 ZKAssign.deleteNodeFailSilent(HTU.getZooKeeperWatcher(), hriPrimary);
225 ZKAssign.deleteNodeFailSilent(HTU.getZooKeeperWatcher(), hriSecondary);
226
227 HTU.getHBaseAdmin().getConnection().clearRegionCache();
228 }
229
230 private HRegionServer getRS() {
231 return HTU.getMiniHBaseCluster().getRegionServer(0);
232 }
233
234 private void openRegion(HRegionInfo hri) throws Exception {
235 try {
236 if (isRegionOpened(hri)) return;
237 } catch (Exception e){}
238 ZKAssign.createNodeOffline(HTU.getZooKeeperWatcher(), hri, getRS().getServerName());
239
240 AdminProtos.OpenRegionRequest orr = RequestConverter.buildOpenRegionRequest(
241 getRS().getServerName(), hri, 0, null, null);
242 AdminProtos.OpenRegionResponse responseOpen = getRS().getRSRpcServices().openRegion(null, orr);
243 Assert.assertEquals(responseOpen.getOpeningStateCount(), 1);
244 Assert.assertEquals(responseOpen.getOpeningState(0),
245 AdminProtos.OpenRegionResponse.RegionOpeningState.OPENED);
246 checkRegionIsOpened(hri);
247 }
248
249 private void closeRegion(HRegionInfo hri) throws Exception {
250 ZKAssign.createNodeClosing(HTU.getZooKeeperWatcher(), hri, getRS().getServerName());
251
252 AdminProtos.CloseRegionRequest crr = RequestConverter.buildCloseRegionRequest(
253 getRS().getServerName(), hri.getEncodedName(), true);
254 AdminProtos.CloseRegionResponse responseClose = getRS()
255 .getRSRpcServices().closeRegion(null, crr);
256 Assert.assertTrue(responseClose.getClosed());
257
258 checkRegionIsClosed(hri.getEncodedName());
259
260 ZKAssign.deleteClosedNode(HTU.getZooKeeperWatcher(), hri.getEncodedName(), null);
261 }
262
263 private void checkRegionIsOpened(HRegionInfo hri) throws Exception {
264
265 while (!getRS().getRegionsInTransitionInRS().isEmpty()) {
266 Thread.sleep(1);
267 }
268
269 Assert.assertTrue(
270 ZKAssign.deleteOpenedNode(HTU.getZooKeeperWatcher(), hri.getEncodedName(), null));
271 }
272
273 private boolean isRegionOpened(HRegionInfo hri) throws Exception {
274 return getRS().getRegionByEncodedName(hri.getEncodedName()).isAvailable();
275 }
276
277 private void checkRegionIsClosed(String encodedRegionName) throws Exception {
278
279 while (!getRS().getRegionsInTransitionInRS().isEmpty()) {
280 Thread.sleep(1);
281 }
282
283 try {
284 Assert.assertFalse(getRS().getRegionByEncodedName(encodedRegionName).isAvailable());
285 } catch (NotServingRegionException expected) {
286
287 }
288
289
290 }
291
292 private void flushRegion(HRegionInfo regionInfo) throws IOException {
293 TestRegionServerNoMaster.flushRegion(HTU, regionInfo);
294 }
295
296 @Test
297 public void testUseRegionWithoutReplica() throws Exception {
298 byte[] b1 = "testUseRegionWithoutReplica".getBytes();
299 openRegion(hriSecondary);
300 SlowMeCopro.getCdl().set(new CountDownLatch(0));
301 try {
302 Get g = new Get(b1);
303 Result r = table.get(g);
304 Assert.assertFalse(r.isStale());
305 } finally {
306 closeRegion(hriSecondary);
307 }
308 }
309
310 @Test
311 public void testLocations() throws Exception {
312 byte[] b1 = "testLocations".getBytes();
313 openRegion(hriSecondary);
314 ClusterConnection hc = (ClusterConnection) HTU.getHBaseAdmin().getConnection();
315
316 try {
317 hc.clearRegionCache();
318 RegionLocations rl = hc.locateRegion(table.getName(), b1, false, false);
319 Assert.assertEquals(2, rl.size());
320
321 rl = hc.locateRegion(table.getName(), b1, true, false);
322 Assert.assertEquals(2, rl.size());
323
324 hc.clearRegionCache();
325 rl = hc.locateRegion(table.getName(), b1, true, false);
326 Assert.assertEquals(2, rl.size());
327
328 rl = hc.locateRegion(table.getName(), b1, false, false);
329 Assert.assertEquals(2, rl.size());
330 } finally {
331 closeRegion(hriSecondary);
332 }
333 }
334
335 @Test
336 public void testGetNoResultNoStaleRegionWithReplica() throws Exception {
337 byte[] b1 = "testGetNoResultNoStaleRegionWithReplica".getBytes();
338 openRegion(hriSecondary);
339
340 try {
341
342 Get g = new Get(b1);
343 Result r = table.get(g);
344 Assert.assertFalse(r.isStale());
345 } finally {
346 closeRegion(hriSecondary);
347 }
348 }
349
350
351 @Test
352 public void testGetNoResultStaleRegionWithReplica() throws Exception {
353 byte[] b1 = "testGetNoResultStaleRegionWithReplica".getBytes();
354 openRegion(hriSecondary);
355
356 SlowMeCopro.getCdl().set(new CountDownLatch(1));
357 try {
358 Get g = new Get(b1);
359 g.setConsistency(Consistency.TIMELINE);
360 Result r = table.get(g);
361 Assert.assertTrue(r.isStale());
362 } finally {
363 SlowMeCopro.getCdl().get().countDown();
364 closeRegion(hriSecondary);
365 }
366 }
367
368 @Test
369 public void testGetNoResultNotStaleSleepRegionWithReplica() throws Exception {
370 byte[] b1 = "testGetNoResultNotStaleSleepRegionWithReplica".getBytes();
371 openRegion(hriSecondary);
372
373 try {
374
375 SlowMeCopro.sleepTime.set(2000);
376 Get g = new Get(b1);
377 Result r = table.get(g);
378 Assert.assertFalse(r.isStale());
379
380 } finally {
381 SlowMeCopro.sleepTime.set(0);
382 closeRegion(hriSecondary);
383 }
384 }
385
386 @Test
387 public void testFlushTable() throws Exception {
388 openRegion(hriSecondary);
389 try {
390 flushRegion(hriPrimary);
391 flushRegion(hriSecondary);
392
393 Put p = new Put(row);
394 p.add(f, row, row);
395 table.put(p);
396
397 flushRegion(hriPrimary);
398 flushRegion(hriSecondary);
399 } finally {
400 Delete d = new Delete(row);
401 table.delete(d);
402 closeRegion(hriSecondary);
403 }
404 }
405
406 @Test
407 public void testFlushPrimary() throws Exception {
408 openRegion(hriSecondary);
409
410 try {
411 flushRegion(hriPrimary);
412
413 Put p = new Put(row);
414 p.add(f, row, row);
415 table.put(p);
416
417 flushRegion(hriPrimary);
418 } finally {
419 Delete d = new Delete(row);
420 table.delete(d);
421 closeRegion(hriSecondary);
422 }
423 }
424
425 @Test
426 public void testFlushSecondary() throws Exception {
427 openRegion(hriSecondary);
428 try {
429 flushRegion(hriSecondary);
430
431 Put p = new Put(row);
432 p.add(f, row, row);
433 table.put(p);
434
435 flushRegion(hriSecondary);
436 } catch (TableNotFoundException expected) {
437 } finally {
438 Delete d = new Delete(row);
439 table.delete(d);
440 closeRegion(hriSecondary);
441 }
442 }
443
444 @Test
445 public void testUseRegionWithReplica() throws Exception {
446 byte[] b1 = "testUseRegionWithReplica".getBytes();
447 openRegion(hriSecondary);
448
449 try {
450
451 Put p = new Put(b1);
452 p.add(f, b1, b1);
453 table.put(p);
454 LOG.info("Put done");
455
456
457 Get g = new Get(b1);
458 Result r = table.get(g);
459 Assert.assertFalse(r.isStale());
460 Assert.assertFalse(r.getColumnCells(f, b1).isEmpty());
461 LOG.info("get works and is not stale done");
462
463
464 SlowMeCopro.sleepTime.set(2000);
465 g = new Get(b1);
466 r = table.get(g);
467 Assert.assertFalse(r.isStale());
468 Assert.assertFalse(r.getColumnCells(f, b1).isEmpty());
469 SlowMeCopro.sleepTime.set(0);
470 LOG.info("sleep and is not stale done");
471
472
473 SlowMeCopro.getCdl().set(new CountDownLatch(1));
474 g = new Get(b1);
475 g.setConsistency(Consistency.TIMELINE);
476 r = table.get(g);
477 Assert.assertTrue(r.isStale());
478 Assert.assertTrue(r.getColumnCells(f, b1).isEmpty());
479 SlowMeCopro.getCdl().get().countDown();
480
481 LOG.info("stale done");
482
483
484 g = new Get(b1);
485 g.setCheckExistenceOnly(true);
486 r = table.get(g);
487 Assert.assertFalse(r.isStale());
488 Assert.assertTrue(r.getExists());
489 LOG.info("exists not stale done");
490
491
492 SlowMeCopro.getCdl().set(new CountDownLatch(1));
493 g = new Get(b1);
494 g.setCheckExistenceOnly(true);
495 g.setConsistency(Consistency.TIMELINE);
496 r = table.get(g);
497 Assert.assertTrue(r.isStale());
498 Assert.assertFalse("The secondary has stale data", r.getExists());
499 SlowMeCopro.getCdl().get().countDown();
500 LOG.info("exists stale before flush done");
501
502 flushRegion(hriPrimary);
503 flushRegion(hriSecondary);
504 LOG.info("flush done");
505 Thread.sleep(1000 + REFRESH_PERIOD * 2);
506
507
508 SlowMeCopro.getCdl().set(new CountDownLatch(1));
509 g = new Get(b1);
510 g.setConsistency(Consistency.TIMELINE);
511 r = table.get(g);
512 Assert.assertTrue(r.isStale());
513 Assert.assertFalse(r.isEmpty());
514 SlowMeCopro.getCdl().get().countDown();
515 LOG.info("stale done");
516
517
518 SlowMeCopro.getCdl().set(new CountDownLatch(1));
519 g = new Get(b1);
520 g.setCheckExistenceOnly(true);
521 g.setConsistency(Consistency.TIMELINE);
522 r = table.get(g);
523 Assert.assertTrue(r.isStale());
524 Assert.assertTrue(r.getExists());
525 SlowMeCopro.getCdl().get().countDown();
526 LOG.info("exists stale after flush done");
527
528 } finally {
529 SlowMeCopro.getCdl().get().countDown();
530 SlowMeCopro.sleepTime.set(0);
531 Delete d = new Delete(b1);
532 table.delete(d);
533 closeRegion(hriSecondary);
534 }
535 }
536
537 @Test
538 public void testCancelOfMultiGet() throws Exception {
539 openRegion(hriSecondary);
540 try {
541 List<Put> puts = new ArrayList<Put>(2);
542 byte[] b1 = Bytes.toBytes("testCancelOfMultiGet" + 0);
543 Put p = new Put(b1);
544 p.add(f, b1, b1);
545 puts.add(p);
546
547 byte[] b2 = Bytes.toBytes("testCancelOfMultiGet" + 1);
548 p = new Put(b2);
549 p.add(f, b2, b2);
550 puts.add(p);
551 table.put(puts);
552 LOG.debug("PUT done");
553 flushRegion(hriPrimary);
554 LOG.info("flush done");
555
556 Thread.sleep(1000 + REFRESH_PERIOD * 2);
557
558 AsyncProcess ap = ((ClusterConnection) HTU.getHBaseAdmin().getConnection())
559 .getAsyncProcess();
560
561
562 SlowMeCopro.getCdl().set(new CountDownLatch(1));
563
564 List<Get> gets = new ArrayList<Get>();
565 Get g = new Get(b1);
566 g.setCheckExistenceOnly(true);
567 g.setConsistency(Consistency.TIMELINE);
568 gets.add(g);
569 g = new Get(b2);
570 g.setCheckExistenceOnly(true);
571 g.setConsistency(Consistency.TIMELINE);
572 gets.add(g);
573 Object[] results = new Object[2];
574 AsyncRequestFuture reqs = ap.submitAll(table.getPool(), table.getName(),
575 gets, null, results);
576 reqs.waitUntilDone();
577
578 for (Object r : results) {
579 Assert.assertTrue(((Result)r).isStale());
580 Assert.assertTrue(((Result)r).getExists());
581 }
582 Set<MultiServerCallable<Row>> set = ((AsyncRequestFutureImpl<?>)reqs).getCallsInProgress();
583
584 Assert.assertTrue(!set.isEmpty());
585 for (MultiServerCallable<Row> m : set) {
586 Assert.assertTrue(m.isCancelled());
587 }
588 } finally {
589 SlowMeCopro.getCdl().get().countDown();
590 SlowMeCopro.sleepTime.set(0);
591 SlowMeCopro.slowDownNext.set(false);
592 SlowMeCopro.countOfNext.set(0);
593 for (int i = 0; i < 2; i++) {
594 byte[] b1 = Bytes.toBytes("testCancelOfMultiGet" + i);
595 Delete d = new Delete(b1);
596 table.delete(d);
597 }
598 closeRegion(hriSecondary);
599 }
600 }
601
602 @Test
603 public void testScanWithReplicas() throws Exception {
604
605 runMultipleScansOfOneType(false, false);
606 }
607
608 @Test
609 public void testSmallScanWithReplicas() throws Exception {
610
611 runMultipleScansOfOneType(false, true);
612 }
613
614 @Test
615 public void testReverseScanWithReplicas() throws Exception {
616
617 runMultipleScansOfOneType(true, false);
618 }
619
620 @Test
621 public void testCancelOfScan() throws Exception {
622 openRegion(hriSecondary);
623 int NUMROWS = 100;
624 try {
625 for (int i = 0; i < NUMROWS; i++) {
626 byte[] b1 = Bytes.toBytes("testUseRegionWithReplica" + i);
627 Put p = new Put(b1);
628 p.add(f, b1, b1);
629 table.put(p);
630 }
631 LOG.debug("PUT done");
632 int caching = 20;
633 byte[] start;
634 start = Bytes.toBytes("testUseRegionWithReplica" + 0);
635
636 flushRegion(hriPrimary);
637 LOG.info("flush done");
638 Thread.sleep(1000 + REFRESH_PERIOD * 2);
639
640
641 SlowMeCopro.slowDownNext.set(true);
642 SlowMeCopro.countOfNext.set(0);
643 SlowMeCopro.sleepTime.set(5000);
644
645 Scan scan = new Scan(start);
646 scan.setCaching(caching);
647 scan.setConsistency(Consistency.TIMELINE);
648 ResultScanner scanner = table.getScanner(scan);
649 Iterator<Result> iter = scanner.iterator();
650 iter.next();
651 Assert.assertTrue(((ClientScanner)scanner).isAnyRPCcancelled());
652 SlowMeCopro.slowDownNext.set(false);
653 SlowMeCopro.countOfNext.set(0);
654 } finally {
655 SlowMeCopro.cdl.get().countDown();
656 SlowMeCopro.sleepTime.set(0);
657 SlowMeCopro.slowDownNext.set(false);
658 SlowMeCopro.countOfNext.set(0);
659 for (int i = 0; i < NUMROWS; i++) {
660 byte[] b1 = Bytes.toBytes("testUseRegionWithReplica" + i);
661 Delete d = new Delete(b1);
662 table.delete(d);
663 }
664 closeRegion(hriSecondary);
665 }
666 }
667
668 private void runMultipleScansOfOneType(boolean reversed, boolean small) throws Exception {
669 openRegion(hriSecondary);
670 int NUMROWS = 100;
671 int NUMCOLS = 10;
672 try {
673 for (int i = 0; i < NUMROWS; i++) {
674 byte[] b1 = Bytes.toBytes("testUseRegionWithReplica" + i);
675 for (int col = 0; col < NUMCOLS; col++) {
676 Put p = new Put(b1);
677 String qualifier = "qualifer" + col;
678 KeyValue kv = new KeyValue(b1, f, qualifier.getBytes());
679 p.add(kv);
680 table.put(p);
681 }
682 }
683 LOG.debug("PUT done");
684 int caching = 20;
685 long maxResultSize = Long.MAX_VALUE;
686
687 byte[] start;
688 if (reversed) start = Bytes.toBytes("testUseRegionWithReplica" + (NUMROWS - 1));
689 else start = Bytes.toBytes("testUseRegionWithReplica" + 0);
690
691 scanWithReplicas(reversed, small, Consistency.TIMELINE, caching, maxResultSize,
692 start, NUMROWS, NUMCOLS, false, false);
693
694
695
696 SlowMeCopro.sleepTime.set(5000);
697 scanWithReplicas(reversed, small, Consistency.STRONG, caching, maxResultSize, start, NUMROWS,
698 NUMCOLS, false, false);
699 SlowMeCopro.sleepTime.set(0);
700
701 flushRegion(hriPrimary);
702 LOG.info("flush done");
703 Thread.sleep(1000 + REFRESH_PERIOD * 2);
704
705
706 SlowMeCopro.sleepTime.set(5000);
707 scanWithReplicas(reversed, small, Consistency.TIMELINE, caching, maxResultSize,
708 start, NUMROWS, NUMCOLS, true, false);
709 SlowMeCopro.sleepTime.set(0);
710
711
712 SlowMeCopro.slowDownNext.set(true);
713 SlowMeCopro.countOfNext.set(0);
714 scanWithReplicas(reversed, small, Consistency.TIMELINE, caching, maxResultSize, start,
715 NUMROWS, NUMCOLS, true, true);
716 SlowMeCopro.slowDownNext.set(false);
717 SlowMeCopro.countOfNext.set(0);
718
719
720 SlowMeCopro.sleepTime.set(5000);
721 scanWithReplicas(reversed, small, Consistency.STRONG, caching, maxResultSize,
722 start, NUMROWS, NUMCOLS, false, false);
723 SlowMeCopro.sleepTime.set(0);
724
725
726
727 maxResultSize = 1;
728 SlowMeCopro.slowDownNext.set(true);
729 SlowMeCopro.countOfNext.set(0);
730 scanWithReplicas(reversed, small, Consistency.TIMELINE, caching, maxResultSize, start,
731 NUMROWS, NUMCOLS, true, true);
732 maxResultSize = Long.MAX_VALUE;
733 SlowMeCopro.slowDownNext.set(false);
734 SlowMeCopro.countOfNext.set(0);
735 } finally {
736 SlowMeCopro.getCdl().get().countDown();
737 SlowMeCopro.sleepTime.set(0);
738 SlowMeCopro.slowDownNext.set(false);
739 SlowMeCopro.countOfNext.set(0);
740 for (int i = 0; i < NUMROWS; i++) {
741 byte[] b1 = Bytes.toBytes("testUseRegionWithReplica" + i);
742 Delete d = new Delete(b1);
743 table.delete(d);
744 }
745 closeRegion(hriSecondary);
746 }
747 }
748
749 private void scanWithReplicas(boolean reversed, boolean small, Consistency consistency,
750 int caching, long maxResultSize, byte[] startRow, int numRows, int numCols,
751 boolean staleExpected, boolean slowNext)
752 throws Exception {
753 Scan scan = new Scan(startRow);
754 scan.setCaching(caching);
755 scan.setMaxResultSize(maxResultSize);
756 scan.setReversed(reversed);
757 scan.setSmall(small);
758 scan.setConsistency(consistency);
759 ResultScanner scanner = table.getScanner(scan);
760 Iterator<Result> iter = scanner.iterator();
761
762
763 HashMap<String, Boolean> map = new HashMap<String, Boolean>();
764
765
766 int rowCount = 0;
767 int cellCount = 0;
768 int countOfStale = 0;
769
770 while (iter.hasNext()) {
771 rowCount++;
772 Result r = iter.next();
773 String row = new String(r.getRow());
774
775 if (map.containsKey(row)) {
776 throw new Exception("Unexpected scan result. Repeated row " + Bytes.toString(r.getRow()));
777 }
778
779 map.put(row, true);
780
781 for (Cell cell : r.rawCells()) {
782 cellCount++;
783 }
784
785 if (!slowNext) Assert.assertTrue(r.isStale() == staleExpected);
786 if (r.isStale()) countOfStale++;
787 }
788 Assert.assertTrue("Count of rows " + rowCount + " num rows expected " + numRows,
789 rowCount == numRows);
790 Assert.assertTrue("Count of cells: " + cellCount + " cells expected: " + numRows * numCols,
791 cellCount == (numRows * numCols));
792
793 if (slowNext) {
794 LOG.debug("Count of Stale " + countOfStale);
795 Assert.assertTrue(countOfStale > 1);
796
797
798
799 if (maxResultSize != Long.MAX_VALUE) {
800 Assert.assertTrue(countOfStale <= numRows);
801 } else {
802 Assert.assertTrue(countOfStale < numRows);
803 }
804 }
805 }
806 }