1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.coprocessor;
21
22 import static org.junit.Assert.assertEquals;
23 import static org.junit.Assert.assertFalse;
24 import static org.junit.Assert.assertNotNull;
25 import static org.junit.Assert.assertTrue;
26
27 import java.io.IOException;
28 import java.security.PrivilegedExceptionAction;
29 import java.util.Arrays;
30 import java.util.List;
31 import java.util.Map;
32
33 import org.apache.commons.logging.Log;
34 import org.apache.commons.logging.LogFactory;
35 import org.apache.hadoop.conf.Configuration;
36 import org.apache.hadoop.fs.FileSystem;
37 import org.apache.hadoop.fs.Path;
38 import org.apache.hadoop.hbase.Cell;
39 import org.apache.hadoop.hbase.Coprocessor;
40 import org.apache.hadoop.hbase.HBaseConfiguration;
41 import org.apache.hadoop.hbase.HBaseTestingUtility;
42 import org.apache.hadoop.hbase.HColumnDescriptor;
43 import org.apache.hadoop.hbase.HConstants;
44 import org.apache.hadoop.hbase.HRegionInfo;
45 import org.apache.hadoop.hbase.HTableDescriptor;
46 import org.apache.hadoop.hbase.KeyValue;
47 import org.apache.hadoop.hbase.testclassification.MediumTests;
48 import org.apache.hadoop.hbase.TableName;
49 import org.apache.hadoop.hbase.client.Put;
50 import org.apache.hadoop.hbase.regionserver.HRegion;
51 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
52 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
53 import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
54 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
55 import org.apache.hadoop.hbase.wal.DefaultWALProvider;
56 import org.apache.hadoop.hbase.wal.WAL;
57 import org.apache.hadoop.hbase.wal.WALFactory;
58 import org.apache.hadoop.hbase.wal.WALKey;
59 import org.apache.hadoop.hbase.wal.WALSplitter;
60 import org.apache.hadoop.hbase.security.User;
61 import org.apache.hadoop.hbase.util.Bytes;
62 import org.apache.hadoop.hbase.util.EnvironmentEdge;
63 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
64 import org.apache.hadoop.hbase.util.FSUtils;
65 import org.junit.After;
66 import org.junit.AfterClass;
67 import org.junit.Before;
68 import org.junit.BeforeClass;
69 import org.junit.Rule;
70 import org.junit.Test;
71 import org.junit.rules.TestName;
72 import org.junit.experimental.categories.Category;
73
74
75
76
77
78
79 @Category(MediumTests.class)
80 public class TestWALObserver {
81 private static final Log LOG = LogFactory.getLog(TestWALObserver.class);
82 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
83
84 private static byte[] TEST_TABLE = Bytes.toBytes("observedTable");
85 private static byte[][] TEST_FAMILY = { Bytes.toBytes("fam1"),
86 Bytes.toBytes("fam2"), Bytes.toBytes("fam3"), };
87 private static byte[][] TEST_QUALIFIER = { Bytes.toBytes("q1"),
88 Bytes.toBytes("q2"), Bytes.toBytes("q3"), };
89 private static byte[][] TEST_VALUE = { Bytes.toBytes("v1"),
90 Bytes.toBytes("v2"), Bytes.toBytes("v3"), };
91 private static byte[] TEST_ROW = Bytes.toBytes("testRow");
92
93 @Rule
94 public TestName currentTest = new TestName();
95
96 private Configuration conf;
97 private FileSystem fs;
98 private Path dir;
99 private Path hbaseRootDir;
100 private String logName;
101 private Path oldLogDir;
102 private Path logDir;
103 private WALFactory wals;
104
105 @BeforeClass
106 public static void setupBeforeClass() throws Exception {
107 Configuration conf = TEST_UTIL.getConfiguration();
108 conf.setStrings(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
109 SampleRegionWALObserver.class.getName(), SampleRegionWALObserver.Legacy.class.getName());
110 conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
111 SampleRegionWALObserver.class.getName());
112 conf.setBoolean("dfs.support.append", true);
113 conf.setInt("dfs.client.block.recovery.retries", 2);
114
115 TEST_UTIL.startMiniCluster(1);
116 Path hbaseRootDir = TEST_UTIL.getDFSCluster().getFileSystem()
117 .makeQualified(new Path("/hbase"));
118 LOG.info("hbase.rootdir=" + hbaseRootDir);
119 FSUtils.setRootDir(conf, hbaseRootDir);
120 }
121
122 @AfterClass
123 public static void teardownAfterClass() throws Exception {
124 TEST_UTIL.shutdownMiniCluster();
125 }
126
127 @Before
128 public void setUp() throws Exception {
129 this.conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
130
131 this.fs = TEST_UTIL.getDFSCluster().getFileSystem();
132 this.hbaseRootDir = FSUtils.getRootDir(conf);
133 this.dir = new Path(this.hbaseRootDir, TestWALObserver.class.getName());
134 this.oldLogDir = new Path(this.hbaseRootDir,
135 HConstants.HREGION_OLDLOGDIR_NAME);
136 this.logDir = new Path(this.hbaseRootDir,
137 DefaultWALProvider.getWALDirectoryName(currentTest.getMethodName()));
138 this.logName = HConstants.HREGION_LOGDIR_NAME;
139
140 if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseRootDir)) {
141 TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true);
142 }
143 this.wals = new WALFactory(conf, null, currentTest.getMethodName());
144 }
145
146 @After
147 public void tearDown() throws Exception {
148 try {
149 wals.shutdown();
150 } catch (IOException exception) {
151
152 LOG.warn("Ignoring failure to close wal factory. " + exception.getMessage());
153 LOG.debug("details of failure to close wal factory.", exception);
154 }
155 TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true);
156 }
157
158
159
160
161
162
163 @Test
164 public void testWALObserverWriteToWAL() throws Exception {
165 final WAL log = wals.getWAL(UNSPECIFIED_REGION);
166 verifyWritesSeen(log, getCoprocessor(log, SampleRegionWALObserver.class), false);
167 }
168
169
170
171
172
173
174 @Test
175 public void testLegacyWALObserverWriteToWAL() throws Exception {
176 final WAL log = wals.getWAL(UNSPECIFIED_REGION);
177 verifyWritesSeen(log, getCoprocessor(log, SampleRegionWALObserver.Legacy.class), true);
178 }
179
180 private void verifyWritesSeen(final WAL log, final SampleRegionWALObserver cp,
181 final boolean seesLegacy) throws Exception {
182 HRegionInfo hri = createBasic3FamilyHRegionInfo(Bytes.toString(TEST_TABLE));
183 final HTableDescriptor htd = createBasic3FamilyHTD(Bytes
184 .toString(TEST_TABLE));
185
186 Path basedir = new Path(this.hbaseRootDir, Bytes.toString(TEST_TABLE));
187 deleteDir(basedir);
188 fs.mkdirs(new Path(basedir, hri.getEncodedName()));
189
190
191
192
193 cp.setTestValues(TEST_TABLE, TEST_ROW, TEST_FAMILY[0], TEST_QUALIFIER[0],
194 TEST_FAMILY[1], TEST_QUALIFIER[1], TEST_FAMILY[2], TEST_QUALIFIER[2]);
195
196 assertFalse(cp.isPreWALWriteCalled());
197 assertFalse(cp.isPostWALWriteCalled());
198 assertFalse(cp.isPreWALWriteDeprecatedCalled());
199 assertFalse(cp.isPostWALWriteDeprecatedCalled());
200
201
202
203
204 Put p = creatPutWith2Families(TEST_ROW);
205
206 Map<byte[], List<Cell>> familyMap = p.getFamilyCellMap();
207 WALEdit edit = new WALEdit();
208 addFamilyMapToWALEdit(familyMap, edit);
209
210 boolean foundFamily0 = false;
211 boolean foundFamily2 = false;
212 boolean modifiedFamily1 = false;
213
214 List<Cell> cells = edit.getCells();
215
216 for (Cell cell : cells) {
217 if (Arrays.equals(cell.getFamily(), TEST_FAMILY[0])) {
218 foundFamily0 = true;
219 }
220 if (Arrays.equals(cell.getFamily(), TEST_FAMILY[2])) {
221 foundFamily2 = true;
222 }
223 if (Arrays.equals(cell.getFamily(), TEST_FAMILY[1])) {
224 if (!Arrays.equals(cell.getValue(), TEST_VALUE[1])) {
225 modifiedFamily1 = true;
226 }
227 }
228 }
229 assertTrue(foundFamily0);
230 assertFalse(foundFamily2);
231 assertFalse(modifiedFamily1);
232
233
234 long now = EnvironmentEdgeManager.currentTime();
235
236 long txid = log.append(htd, hri, new HLogKey(hri.getEncodedNameAsBytes(), hri.getTable(), now),
237 edit, true);
238 log.sync(txid);
239
240
241 foundFamily0 = false;
242 foundFamily2 = false;
243 modifiedFamily1 = false;
244 for (Cell cell : cells) {
245 if (Arrays.equals(cell.getFamily(), TEST_FAMILY[0])) {
246 foundFamily0 = true;
247 }
248 if (Arrays.equals(cell.getFamily(), TEST_FAMILY[2])) {
249 foundFamily2 = true;
250 }
251 if (Arrays.equals(cell.getFamily(), TEST_FAMILY[1])) {
252 if (!Arrays.equals(cell.getValue(), TEST_VALUE[1])) {
253 modifiedFamily1 = true;
254 }
255 }
256 }
257 assertFalse(foundFamily0);
258 assertTrue(foundFamily2);
259 assertTrue(modifiedFamily1);
260
261 assertTrue(cp.isPreWALWriteCalled());
262 assertTrue(cp.isPostWALWriteCalled());
263 assertEquals(seesLegacy, cp.isPreWALWriteDeprecatedCalled());
264 assertEquals(seesLegacy, cp.isPostWALWriteDeprecatedCalled());
265 }
266
267 @Test
268 public void testNonLegacyWALKeysDoNotExplode() throws Exception {
269 TableName tableName = TableName.valueOf(TEST_TABLE);
270 final HTableDescriptor htd = createBasic3FamilyHTD(Bytes
271 .toString(TEST_TABLE));
272 final HRegionInfo hri = new HRegionInfo(tableName, null, null);
273 MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
274
275 fs.mkdirs(new Path(FSUtils.getTableDir(hbaseRootDir, tableName), hri.getEncodedName()));
276
277 final Configuration newConf = HBaseConfiguration.create(this.conf);
278
279 final WAL wal = wals.getWAL(UNSPECIFIED_REGION);
280 final SampleRegionWALObserver newApi = getCoprocessor(wal, SampleRegionWALObserver.class);
281 newApi.setTestValues(TEST_TABLE, TEST_ROW, null, null, null, null, null, null);
282 final SampleRegionWALObserver oldApi = getCoprocessor(wal,
283 SampleRegionWALObserver.Legacy.class);
284 oldApi.setTestValues(TEST_TABLE, TEST_ROW, null, null, null, null, null, null);
285
286 LOG.debug("ensuring wal entries haven't happened before we start");
287 assertFalse(newApi.isPreWALWriteCalled());
288 assertFalse(newApi.isPostWALWriteCalled());
289 assertFalse(newApi.isPreWALWriteDeprecatedCalled());
290 assertFalse(newApi.isPostWALWriteDeprecatedCalled());
291 assertFalse(oldApi.isPreWALWriteCalled());
292 assertFalse(oldApi.isPostWALWriteCalled());
293 assertFalse(oldApi.isPreWALWriteDeprecatedCalled());
294 assertFalse(oldApi.isPostWALWriteDeprecatedCalled());
295
296 LOG.debug("writing to WAL with non-legacy keys.");
297 final int countPerFamily = 5;
298 for (HColumnDescriptor hcd : htd.getFamilies()) {
299 addWALEdits(tableName, hri, TEST_ROW, hcd.getName(), countPerFamily,
300 EnvironmentEdgeManager.getDelegate(), wal, htd, mvcc);
301 }
302
303 LOG.debug("Verify that only the non-legacy CP saw edits.");
304 assertTrue(newApi.isPreWALWriteCalled());
305 assertTrue(newApi.isPostWALWriteCalled());
306 assertFalse(newApi.isPreWALWriteDeprecatedCalled());
307 assertFalse(newApi.isPostWALWriteDeprecatedCalled());
308
309 assertFalse(oldApi.isPreWALWriteCalled());
310 assertFalse(oldApi.isPostWALWriteCalled());
311 assertFalse(oldApi.isPreWALWriteDeprecatedCalled());
312 assertFalse(oldApi.isPostWALWriteDeprecatedCalled());
313
314 LOG.debug("reseting cp state.");
315 newApi.setTestValues(TEST_TABLE, TEST_ROW, null, null, null, null, null, null);
316 oldApi.setTestValues(TEST_TABLE, TEST_ROW, null, null, null, null, null, null);
317
318 LOG.debug("write a log edit that supports legacy cps.");
319 final long now = EnvironmentEdgeManager.currentTime();
320 final WALKey legacyKey = new HLogKey(hri.getEncodedNameAsBytes(), hri.getTable(), now);
321 final WALEdit edit = new WALEdit();
322 final byte[] nonce = Bytes.toBytes("1772");
323 edit.add(new KeyValue(TEST_ROW, TEST_FAMILY[0], nonce, now, nonce));
324 final long txid = wal.append(htd, hri, legacyKey, edit, true);
325 wal.sync(txid);
326
327 LOG.debug("Make sure legacy cps can see supported edits after having been skipped.");
328 assertTrue("non-legacy WALObserver didn't see pre-write.", newApi.isPreWALWriteCalled());
329 assertTrue("non-legacy WALObserver didn't see post-write.", newApi.isPostWALWriteCalled());
330 assertFalse("non-legacy WALObserver shouldn't have seen legacy pre-write.",
331 newApi.isPreWALWriteDeprecatedCalled());
332 assertFalse("non-legacy WALObserver shouldn't have seen legacy post-write.",
333 newApi.isPostWALWriteDeprecatedCalled());
334 assertTrue("legacy WALObserver didn't see pre-write.", oldApi.isPreWALWriteCalled());
335 assertTrue("legacy WALObserver didn't see post-write.", oldApi.isPostWALWriteCalled());
336 assertTrue("legacy WALObserver didn't see legacy pre-write.",
337 oldApi.isPreWALWriteDeprecatedCalled());
338 assertTrue("legacy WALObserver didn't see legacy post-write.",
339 oldApi.isPostWALWriteDeprecatedCalled());
340 }
341
342
343
344
345 @Test
346 public void testEmptyWALEditAreNotSeen() throws Exception {
347 final HRegionInfo hri = createBasic3FamilyHRegionInfo(Bytes.toString(TEST_TABLE));
348 final HTableDescriptor htd = createBasic3FamilyHTD(Bytes.toString(TEST_TABLE));
349 final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
350
351 WAL log = wals.getWAL(UNSPECIFIED_REGION);
352 try {
353 SampleRegionWALObserver cp = getCoprocessor(log, SampleRegionWALObserver.class);
354
355 cp.setTestValues(TEST_TABLE, null, null, null, null, null, null, null);
356
357 assertFalse(cp.isPreWALWriteCalled());
358 assertFalse(cp.isPostWALWriteCalled());
359
360 final long now = EnvironmentEdgeManager.currentTime();
361 long txid = log.append(htd, hri,
362 new WALKey(hri.getEncodedNameAsBytes(), hri.getTable(), now, mvcc),
363 new WALEdit(), true);
364 log.sync(txid);
365
366 assertFalse("Empty WALEdit should skip coprocessor evaluation.", cp.isPreWALWriteCalled());
367 assertFalse("Empty WALEdit should skip coprocessor evaluation.", cp.isPostWALWriteCalled());
368 } finally {
369 log.close();
370 }
371 }
372
373
374
375
376 @Test
377 public void testWALCoprocessorReplay() throws Exception {
378
379
380 TableName tableName = TableName.valueOf("testWALCoprocessorReplay");
381 final HTableDescriptor htd = getBasic3FamilyHTableDescriptor(tableName);
382 MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
383
384
385
386
387 final HRegionInfo hri = new HRegionInfo(tableName, null, null);
388
389 final Path basedir =
390 FSUtils.getTableDir(this.hbaseRootDir, tableName);
391 deleteDir(basedir);
392 fs.mkdirs(new Path(basedir, hri.getEncodedName()));
393
394 final Configuration newConf = HBaseConfiguration.create(this.conf);
395
396
397 WAL wal = wals.getWAL(UNSPECIFIED_REGION);
398
399 WALEdit edit = new WALEdit();
400 long now = EnvironmentEdgeManager.currentTime();
401
402 final int countPerFamily = 1000;
403
404 for (HColumnDescriptor hcd : htd.getFamilies()) {
405 addWALEdits(tableName, hri, TEST_ROW, hcd.getName(), countPerFamily,
406 EnvironmentEdgeManager.getDelegate(), wal, htd, mvcc);
407 }
408 wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, now, mvcc), edit, true);
409
410 wal.sync();
411
412 User user = HBaseTestingUtility.getDifferentUser(newConf,
413 ".replay.wal.secondtime");
414 user.runAs(new PrivilegedExceptionAction() {
415 public Object run() throws Exception {
416 Path p = runWALSplit(newConf);
417 LOG.info("WALSplit path == " + p);
418 FileSystem newFS = FileSystem.get(newConf);
419
420 final WALFactory wals2 = new WALFactory(conf, null, currentTest.getMethodName()+"2");
421 WAL wal2 = wals2.getWAL(UNSPECIFIED_REGION);;
422 HRegion region = HRegion.openHRegion(newConf, FileSystem.get(newConf), hbaseRootDir,
423 hri, htd, wal2, TEST_UTIL.getHBaseCluster().getRegionServer(0), null);
424 long seqid2 = region.getOpenSeqNum();
425
426 SampleRegionWALObserver cp2 =
427 (SampleRegionWALObserver)region.getCoprocessorHost().findCoprocessor(
428 SampleRegionWALObserver.class.getName());
429
430 assertNotNull(cp2);
431 assertTrue(cp2.isPreWALRestoreCalled());
432 assertTrue(cp2.isPostWALRestoreCalled());
433 assertFalse(cp2.isPreWALRestoreDeprecatedCalled());
434 assertFalse(cp2.isPostWALRestoreDeprecatedCalled());
435 region.close();
436 wals2.close();
437 return null;
438 }
439 });
440 }
441
442
443
444
445
446
447 @Test
448 public void testWALObserverLoaded() throws Exception {
449 WAL log = wals.getWAL(UNSPECIFIED_REGION);
450 assertNotNull(getCoprocessor(log, SampleRegionWALObserver.class));
451 }
452
453 private SampleRegionWALObserver getCoprocessor(WAL wal,
454 Class<? extends SampleRegionWALObserver> clazz) throws Exception {
455 WALCoprocessorHost host = wal.getCoprocessorHost();
456 Coprocessor c = host.findCoprocessor(clazz.getName());
457 return (SampleRegionWALObserver) c;
458 }
459
460
461
462
463
464
465
466 private HRegionInfo createBasic3FamilyHRegionInfo(final String tableName) {
467 HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
468
469 for (int i = 0; i < TEST_FAMILY.length; i++) {
470 HColumnDescriptor a = new HColumnDescriptor(TEST_FAMILY[i]);
471 htd.addFamily(a);
472 }
473 return new HRegionInfo(htd.getTableName(), null, null, false);
474 }
475
476
477
478
479 private void deleteDir(final Path p) throws IOException {
480 if (this.fs.exists(p)) {
481 if (!this.fs.delete(p, true)) {
482 throw new IOException("Failed remove of " + p);
483 }
484 }
485 }
486
487 private Put creatPutWith2Families(byte[] row) throws IOException {
488 Put p = new Put(row);
489 for (int i = 0; i < TEST_FAMILY.length - 1; i++) {
490 p.add(TEST_FAMILY[i], TEST_QUALIFIER[i], TEST_VALUE[i]);
491 }
492 return p;
493 }
494
495
496
497
498
499
500
501
502
503 private void addFamilyMapToWALEdit(Map<byte[], List<Cell>> familyMap,
504 WALEdit walEdit) {
505 for (List<Cell> edits : familyMap.values()) {
506 for (Cell cell : edits) {
507
508 walEdit.add(cell);
509 }
510 }
511 }
512
513 private Path runWALSplit(final Configuration c) throws IOException {
514 List<Path> splits = WALSplitter.split(
515 hbaseRootDir, logDir, oldLogDir, FileSystem.get(c), c, wals);
516
517 assertEquals(1, splits.size());
518
519 assertTrue(fs.exists(splits.get(0)));
520 LOG.info("Split file=" + splits.get(0));
521 return splits.get(0);
522 }
523
524 private static final byte[] UNSPECIFIED_REGION = new byte[]{};
525
526 private void addWALEdits(final TableName tableName, final HRegionInfo hri, final byte[] rowName,
527 final byte[] family, final int count, EnvironmentEdge ee, final WAL wal,
528 final HTableDescriptor htd, final MultiVersionConcurrencyControl mvcc) throws IOException {
529 String familyStr = Bytes.toString(family);
530 long txid = -1;
531 for (int j = 0; j < count; j++) {
532 byte[] qualifierBytes = Bytes.toBytes(Integer.toString(j));
533 byte[] columnBytes = Bytes.toBytes(familyStr + ":" + Integer.toString(j));
534 WALEdit edit = new WALEdit();
535 edit.add(new KeyValue(rowName, family, qualifierBytes, ee.currentTime(), columnBytes));
536
537
538 txid = wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName,
539 ee.currentTime(), mvcc), edit, true);
540 }
541 if (-1 != txid) {
542 wal.sync(txid);
543 }
544 }
545
546 private HTableDescriptor getBasic3FamilyHTableDescriptor(
547 final TableName tableName) {
548 HTableDescriptor htd = new HTableDescriptor(tableName);
549
550 for (int i = 0; i < TEST_FAMILY.length; i++) {
551 HColumnDescriptor a = new HColumnDescriptor(TEST_FAMILY[i]);
552 htd.addFamily(a);
553 }
554 return htd;
555 }
556
557 private HTableDescriptor createBasic3FamilyHTD(final String tableName) {
558 HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
559 HColumnDescriptor a = new HColumnDescriptor(Bytes.toBytes("a"));
560 htd.addFamily(a);
561 HColumnDescriptor b = new HColumnDescriptor(Bytes.toBytes("b"));
562 htd.addFamily(b);
563 HColumnDescriptor c = new HColumnDescriptor(Bytes.toBytes("c"));
564 htd.addFamily(c);
565 return htd;
566 }
567 }