/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.replication;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;

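/**
 * Tests that replication to a peer cluster keeps working while the number of
 * region servers in that peer cluster changes: one region server is stopped
 * and a new one is started between batches of puts.
 */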
@Category(LargeTests.class)
public class TestReplicationChangingPeerRegionservers extends TestReplicationBase {

  private static final Log LOG =
      LogFactory.getLog(TestReplicationChangingPeerRegionservers.class);

  @Before
  public void setUp() throws Exception {
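    // Starting and stopping replication can make us miss new logs; rolling
    // the WAL on every source region server makes sure the most recent log
    // gets added to the replication queue before the table is emptied.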
    for (JVMClusterUtil.RegionServerThread r :
        utility1.getHBaseCluster().getRegionServerThreads()) {
      utility1.getHBaseAdmin().rollWALWriter(r.getRegionServer().getServerName());
    }
    utility1.deleteTableData(tableName);

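    // Emptying the table sends one Delete per row to the peer cluster. Poll
    // the peer table until those deletes have been applied and it is empty
    // again, so each test starts from a clean slate.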
    Scan scan = new Scan();
    int lastCount = 0;
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for truncate");
      }
      ResultScanner scanner = htable2.getScanner(scan);
      Result[] res = scanner.next(NB_ROWS_IN_BIG_BATCH);
      scanner.close();
      if (res.length != 0) {
        if (res.length < lastCount) {
          // The row count is still shrinking, so the deletes are being
          // applied; do not consume a retry while progress is being made.
          i--;
        }
        lastCount = res.length;
        LOG.info("Still got " + res.length + " rows");
        Thread.sleep(SLEEP_TIME);
      } else {
        break;
      }
    }
  }

  @Test(timeout = 300000)
  public void testChangingNumberOfPeerRegionServers() throws IOException, InterruptedException {
    LOG.info("testChangingNumberOfPeerRegionServers");
    MiniHBaseCluster peerCluster = utility2.getMiniHBaseCluster();
    int numRS = peerCluster.getRegionServerThreads().size();

    doPutTest(Bytes.toBytes(1));

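    // Stop one region server in the peer cluster, picking one that does not
    // host hbase:meta so the peer cluster stays usable.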
    int rsToStop = peerCluster.getServerWithMeta() == 0 ? 1 : 0;
    peerCluster.stopRegionServer(rsToStop);
    peerCluster.waitOnRegionServer(rsToStop);

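    // Sanity check: the peer cluster should now be running one region server fewer.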
    assertEquals(numRS - 1, peerCluster.getRegionServerThreads().size());

    doPutTest(Bytes.toBytes(2));

    peerCluster.startRegionServer();

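    // Starting a fresh region server brings the peer cluster back to its original size.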
    assertEquals(numRS, peerCluster.getRegionServerThreads().size());

    doPutTest(Bytes.toBytes(3));
  }

  private void doPutTest(byte[] row) throws IOException, InterruptedException {
    Put put = new Put(row);
    put.add(famName, row, row);

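    // Acquire the source table lazily if it has not been set up yet.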
    if (htable1 == null) {
      htable1 = utility1.getConnection().getTable(tableName);
    }

    htable1.put(put);

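    // Poll the peer table until the put has been replicated, failing after
    // NB_RETRIES attempts.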
    Get get = new Get(row);
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for put replication");
      }
      Result res = htable2.get(get);
      if (res.size() == 0) {
        LOG.info("Row not available");
        Thread.sleep(SLEEP_TIME);
      } else {
        assertArrayEquals(row, res.value());
        break;
      }
    }
  }
}