/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

@Category(MediumTests.class)
public class TestReplicaWithCluster {
  private static final Log LOG = LogFactory.getLog(TestReplicaWithCluster.class);

  private static final int NB_SERVERS = 2;
  private static final byte[] row = TestReplicaWithCluster.class.getName().getBytes();
  private static final HBaseTestingUtility HTU = new HBaseTestingUtility();

  private static HBaseTestingUtility HTU2;
  private static final byte[] f = HConstants.CATALOG_FAMILY;

  private final static int REFRESH_PERIOD = 1000;

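  /**
   * A region observer that can block or delay reads served by the primary replica (replica id 0),
   * so the tests can force TIMELINE-consistency gets to fall back to a secondary replica and
   * return stale results.
   */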
  public static class SlowMeCopro extends BaseRegionObserver {
    static final AtomicLong sleepTime = new AtomicLong(0);
    static final AtomicReference<CountDownLatch> cdl =
        new AtomicReference<CountDownLatch>(new CountDownLatch(0));

    public SlowMeCopro() {
    }

    @Override
    public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
        final Get get, final List<Cell> results) throws IOException {

      if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) {
        CountDownLatch latch = cdl.get();
        try {
          if (sleepTime.get() > 0) {
            LOG.info("Sleeping for " + sleepTime.get() + " ms");
            Thread.sleep(sleepTime.get());
          } else if (latch.getCount() > 0) {
            LOG.info("Waiting for the countdown latch");
            latch.await(2, TimeUnit.MINUTES); // Bounded wait, to let the tests finish.
            if (latch.getCount() > 0) {
              throw new RuntimeException("Can't wait more");
            }
          }
        } catch (InterruptedException e1) {
          LOG.error(e1);
        }
      } else {
        LOG.info("We're not the primary replica.");
      }
    }
  }

  @BeforeClass
  public static void beforeClass() throws Exception {
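    // Enable frequent store file refreshing so secondary replicas see new files quickly.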
    HTU.getConfiguration().setInt(
        StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, REFRESH_PERIOD);

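    // Speed up WAL rolling, replication retries, and ZooKeeper recovery so the tests run quickly.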
    HTU.getConfiguration().setFloat("hbase.regionserver.logroll.multiplier", 0.0001f);
    HTU.getConfiguration().setInt("replication.source.size.capacity", 10240);
    HTU.getConfiguration().setLong("replication.source.sleepforretries", 100);
    HTU.getConfiguration().setInt("hbase.regionserver.maxlogs", 2);
    HTU.getConfiguration().setLong("hbase.master.logcleaner.ttl", 10);
    HTU.getConfiguration().setInt("zookeeper.recovery.retry", 1);
    HTU.getConfiguration().setInt("zookeeper.recovery.retry.intervalmill", 10);

    HTU.startMiniCluster(NB_SERVERS);
    HTU.getHBaseCluster().startMaster();
  }
129
130 @AfterClass
131 public static void afterClass() throws Exception {
132 HTU2.shutdownMiniCluster();
133 HTU.shutdownMiniCluster();
134 }
135
136 @Test (timeout=30000)
137 public void testCreateDeleteTable() throws IOException {
138
139 HTableDescriptor hdt = HTU.createTableDescriptor("testCreateDeleteTable");
140 hdt.setRegionReplication(NB_SERVERS);
141 hdt.addCoprocessor(SlowMeCopro.class.getName());
142 Table table = HTU.createTable(hdt, new byte[][]{f}, HTU.getConfiguration());
143
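    // Write a row; a default-consistency get must be served by the primary and never be stale.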
    Put p = new Put(row);
    p.add(f, row, row);
    table.put(p);

    Get g = new Get(row);
    Result r = table.get(g);
    Assert.assertFalse(r.isStale());

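    // Block reads on the primary with the latch; a TIMELINE get must then be answered
    // by a secondary replica and be flagged as stale.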
    try {
      SlowMeCopro.cdl.set(new CountDownLatch(1));
      g = new Get(row);
      g.setConsistency(Consistency.TIMELINE);
      r = table.get(g);
      Assert.assertTrue(r.isStale());
      SlowMeCopro.cdl.get().countDown();
    } finally {
      SlowMeCopro.cdl.get().countDown();
      SlowMeCopro.sleepTime.set(0);
    }

    HTU.getHBaseAdmin().disableTable(hdt.getTableName());
    HTU.deleteTable(hdt.getTableName());
  }

  @Test (timeout=120000)
  public void testChangeTable() throws Exception {
    HTableDescriptor hdt = HTU.createTableDescriptor("testChangeTable");
    hdt.setRegionReplication(NB_SERVERS);
    hdt.addCoprocessor(SlowMeCopro.class.getName());
    Table table = HTU.createTable(hdt, new byte[][]{f}, HTU.getConfiguration());

    Put p = new Put(row);
    p.add(f, row, row);
    table.put(p);

    Get g = new Get(row);
    Result r = table.get(g);
    Assert.assertFalse(r.isStale());

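    // Add a new column family (named after the test row key) while the table is disabled,
    // then re-enable it and check that the new schema is visible.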
    HTableDescriptor bHdt = HTU.getHBaseAdmin().getTableDescriptor(hdt.getTableName());
    HColumnDescriptor hcd = new HColumnDescriptor(row);
    hdt.addFamily(hcd);
    HTU.getHBaseAdmin().disableTable(hdt.getTableName());
    HTU.getHBaseAdmin().modifyTable(hdt.getTableName(), hdt);
    HTU.getHBaseAdmin().enableTable(hdt.getTableName());
    HTableDescriptor nHdt = HTU.getHBaseAdmin().getTableDescriptor(hdt.getTableName());
    Assert.assertEquals("fams=" + Arrays.toString(nHdt.getColumnFamilies()),
        bHdt.getColumnFamilies().length + 1, nHdt.getColumnFamilies().length);

    p = new Put(row);
    p.add(row, row, row);
    table.put(p);

    g = new Get(row);
    r = table.get(g);
    Assert.assertFalse(r.isStale());

    try {
      SlowMeCopro.cdl.set(new CountDownLatch(1));
      g = new Get(row);
      g.setConsistency(Consistency.TIMELINE);
      r = table.get(g);
      Assert.assertTrue(r.isStale());
    } finally {
      SlowMeCopro.cdl.get().countDown();
      SlowMeCopro.sleepTime.set(0);
    }

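    // Even with the active master stopped, a fresh client must still be able to read the
    // modified schema.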
    HTU.getHBaseCluster().stopMaster(0);
    Admin admin = new HBaseAdmin(HTU.getConfiguration());
    nHdt = admin.getTableDescriptor(hdt.getTableName());
    Assert.assertEquals("fams=" + Arrays.toString(nHdt.getColumnFamilies()),
        bHdt.getColumnFamilies().length + 1, nHdt.getColumnFamilies().length);

    admin.disableTable(hdt.getTableName());
    admin.deleteTable(hdt.getTableName());
    HTU.getHBaseCluster().startMaster();
    admin.close();
  }

  @SuppressWarnings("deprecation")
  @Test (timeout=300000)
  public void testReplicaAndReplication() throws Exception {
    HTableDescriptor hdt = HTU.createTableDescriptor("testReplicaAndReplication");
    hdt.setRegionReplication(NB_SERVERS);

    HColumnDescriptor fam = new HColumnDescriptor(row);
    fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
    hdt.addFamily(fam);

    hdt.addCoprocessor(SlowMeCopro.class.getName());
    HTU.getHBaseAdmin().createTable(hdt, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);

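    // Start a second cluster that shares the same mini ZooKeeper cluster but lives under
    // a different znode parent, so it can act as a replication peer.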
    Configuration conf2 = HBaseConfiguration.create(HTU.getConfiguration());
    conf2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
    MiniZooKeeperCluster miniZK = HTU.getZkCluster();

    HTU2 = new HBaseTestingUtility(conf2);
    HTU2.setZkCluster(miniZK);
    HTU2.startMiniCluster(NB_SERVERS);
    LOG.info("Setup second Zk");
    HTU2.getHBaseAdmin().createTable(hdt, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);

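    // Add the second cluster as a replication peer of the first one.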
    ReplicationAdmin admin = new ReplicationAdmin(HTU.getConfiguration());
    admin.addPeer("2", HTU2.getClusterKey());
    admin.close();

    Put p = new Put(row);
    p.add(row, row, row);
    final Table table = new HTable(HTU.getConfiguration(), hdt.getTableName());
    table.put(p);

    HTU.getHBaseAdmin().flush(table.getName());
    LOG.info("Put & flush done on the first cluster. Now doing a get on the same cluster.");

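    // The flush makes the row visible to the secondary replicas through the store file
    // refresher; wait until a TIMELINE get on the first cluster returns the (stale) row.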
    Waiter.waitFor(HTU.getConfiguration(), 1000, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        try {
          SlowMeCopro.cdl.set(new CountDownLatch(1));
          Get g = new Get(row);
          g.setConsistency(Consistency.TIMELINE);
          Result r = table.get(g);
          Assert.assertTrue(r.isStale());
          return !r.isEmpty();
        } finally {
          SlowMeCopro.cdl.get().countDown();
          SlowMeCopro.sleepTime.set(0);
        }
      }});
    table.close();
    LOG.info("stale get on the first cluster done. Now for the second.");

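    // Wait until replication has shipped the row to the second cluster, again forcing a
    // stale read from a replica there.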
    final Table table2 = new HTable(HTU2.getConfiguration(), hdt.getTableName());
    Waiter.waitFor(HTU.getConfiguration(), 1000, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        try {
          SlowMeCopro.cdl.set(new CountDownLatch(1));
          Get g = new Get(row);
          g.setConsistency(Consistency.TIMELINE);
          Result r = table2.get(g);
          Assert.assertTrue(r.isStale());
          return !r.isEmpty();
        } finally {
          SlowMeCopro.cdl.get().countDown();
          SlowMeCopro.sleepTime.set(0);
        }
      }});
    table2.close();

    HTU.getHBaseAdmin().disableTable(hdt.getTableName());
    HTU.deleteTable(hdt.getTableName());

    HTU2.getHBaseAdmin().disableTable(hdt.getTableName());
    HTU2.deleteTable(hdt.getTableName());
  }

  @Test (timeout=30000)
  public void testBulkLoad() throws IOException {
    LOG.debug("Creating test table");
    HTableDescriptor hdt = HTU.createTableDescriptor("testBulkLoad");
    hdt.setRegionReplication(NB_SERVERS);
    hdt.addCoprocessor(SlowMeCopro.class.getName());
    Table table = HTU.createTable(hdt, new byte[][]{f}, HTU.getConfiguration());

    LOG.debug("Creating test data");
    Path dir = HTU.getDataTestDirOnTestFS("testBulkLoad");
    final int numRows = 10;
    final byte[] qual = Bytes.toBytes("qual");
    final byte[] val = Bytes.toBytes("val");
    final List<Pair<byte[], String>> famPaths = new ArrayList<Pair<byte[], String>>();
    for (HColumnDescriptor col : hdt.getColumnFamilies()) {
      Path hfile = new Path(dir, col.getNameAsString());
      TestHRegionServerBulkLoad.createHFile(HTU.getTestFileSystem(), hfile, col.getName(),
          qual, val, numRows);
      famPaths.add(new Pair<byte[], String>(col.getName(), hfile.toString()));
    }

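    // Bulk load the HFiles through a direct bulkLoadHFile RPC to the region server hosting
    // the primary region, wrapped in the standard retrying caller.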
    LOG.debug("Loading test data");
    @SuppressWarnings("deprecation")
    final HConnection conn = HTU.getHBaseAdmin().getConnection();
    RegionServerCallable<Void> callable = new RegionServerCallable<Void>(
        conn, hdt.getTableName(), TestHRegionServerBulkLoad.rowkey(0)) {
      @Override
      public Void call(int timeout) throws Exception {
        LOG.debug("Going to connect to server " + getLocation() + " for row "
            + Bytes.toStringBinary(getRow()));
        byte[] regionName = getLocation().getRegionInfo().getRegionName();
        BulkLoadHFileRequest request =
            RequestConverter.buildBulkLoadHFileRequest(famPaths, regionName, true);
        getStub().bulkLoadHFile(null, request);
        return null;
      }
    };
    RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(HTU.getConfiguration());
    RpcRetryingCaller<Void> caller = factory.<Void> newCaller();
    caller.callWithRetries(callable, 10000);

    LOG.debug("Verifying data load");
    for (int i = 0; i < numRows; i++) {
      byte[] row = TestHRegionServerBulkLoad.rowkey(i);
      Get g = new Get(row);
      Result r = table.get(g);
      Assert.assertFalse(r.isStale());
    }

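    // With the primary blocked, TIMELINE gets for the bulk-loaded rows must be served by a
    // secondary replica and flagged as stale.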
    LOG.debug("Verifying replica queries");
    try {
      SlowMeCopro.cdl.set(new CountDownLatch(1));
      for (int i = 0; i < numRows; i++) {
        byte[] row = TestHRegionServerBulkLoad.rowkey(i);
        Get g = new Get(row);
        g.setConsistency(Consistency.TIMELINE);
        Result r = table.get(g);
        Assert.assertTrue(r.isStale());
      }
      SlowMeCopro.cdl.get().countDown();
    } finally {
      SlowMeCopro.cdl.get().countDown();
      SlowMeCopro.sleepTime.set(0);
    }

    HTU.getHBaseAdmin().disableTable(hdt.getTableName());
    HTU.deleteTable(hdt.getTableName());
  }
}