1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.procedure2.store.wal;
20
21 import java.io.FileNotFoundException;
22 import java.io.IOException;
23 import java.io.InputStream;
24 import java.io.OutputStream;
25 import java.util.concurrent.atomic.AtomicInteger;
26 import java.util.ArrayList;
27 import java.util.Arrays;
28 import java.util.Comparator;
29 import java.util.Iterator;
30 import java.util.HashSet;
31 import java.util.Set;
32 import java.util.Random;
33 import java.util.concurrent.atomic.AtomicLong;
34
35 import org.apache.commons.logging.Log;
36 import org.apache.commons.logging.LogFactory;
37 import org.apache.hadoop.fs.FileSystem;
38 import org.apache.hadoop.fs.FileStatus;
39 import org.apache.hadoop.fs.Path;
40 import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
41 import org.apache.hadoop.hbase.procedure2.Procedure;
42 import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
43 import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
44 import org.apache.hadoop.hbase.procedure2.SequentialProcedure;
45 import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
46 import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
47 import org.apache.hadoop.hbase.testclassification.SmallTests;
48 import org.apache.hadoop.hbase.util.Bytes;
49 import org.apache.hadoop.io.IOUtils;
50
51 import org.junit.After;
52 import org.junit.Before;
53 import org.junit.Assert;
54 import org.junit.Test;
55 import org.junit.experimental.categories.Category;
56
57 import static org.junit.Assert.assertEquals;
58 import static org.junit.Assert.assertFalse;
59 import static org.junit.Assert.assertTrue;
60 import static org.junit.Assert.fail;
61
62 @Category(SmallTests.class)
63 public class TestWALProcedureStore {
64 private static final Log LOG = LogFactory.getLog(TestWALProcedureStore.class);
65
66 private static final int PROCEDURE_STORE_SLOTS = 1;
67 private static final Procedure NULL_PROC = null;
68
69 private WALProcedureStore procStore;
70
71 private HBaseCommonTestingUtility htu;
72 private FileSystem fs;
73 private Path testDir;
74 private Path logDir;
75
76 @Before
77 public void setUp() throws IOException {
78 htu = new HBaseCommonTestingUtility();
79 testDir = htu.getDataTestDir();
80 fs = testDir.getFileSystem(htu.getConfiguration());
81 assertTrue(testDir.depth() > 1);
82
83 logDir = new Path(testDir, "proc-logs");
84 procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir);
85 procStore.start(PROCEDURE_STORE_SLOTS);
86 procStore.recoverLease();
87 procStore.load(new LoadCounter());
88 }
89
90 @After
91 public void tearDown() throws IOException {
92 procStore.stop(false);
93 fs.delete(logDir, true);
94 }
95
96 private void storeRestart(ProcedureStore.ProcedureLoader loader) throws Exception {
97 procStore.stop(false);
98 procStore.start(PROCEDURE_STORE_SLOTS);
99 procStore.recoverLease();
100 procStore.load(loader);
101 }
102
103 @Test
104 public void testEmptyRoll() throws Exception {
105 for (int i = 0; i < 10; ++i) {
106 procStore.periodicRollForTesting();
107 }
108 FileStatus[] status = fs.listStatus(logDir);
109 assertEquals(1, status.length);
110 }
111
112 @Test
113 public void testEmptyLogLoad() throws Exception {
114 LoadCounter loader = new LoadCounter();
115 storeRestart(loader);
116 assertEquals(0, loader.getMaxProcId());
117 assertEquals(0, loader.getLoadedCount());
118 assertEquals(0, loader.getCorruptedCount());
119 }
120
121 @Test
122 public void testLoad() throws Exception {
123 Set<Long> procIds = new HashSet<>();
124
125
126 Procedure proc1 = new TestSequentialProcedure();
127 procIds.add(proc1.getProcId());
128 procStore.insert(proc1, null);
129
130 Procedure proc2 = new TestSequentialProcedure();
131 Procedure[] child2 = new Procedure[2];
132 child2[0] = new TestSequentialProcedure();
133 child2[1] = new TestSequentialProcedure();
134
135 procIds.add(proc2.getProcId());
136 procIds.add(child2[0].getProcId());
137 procIds.add(child2[1].getProcId());
138 procStore.insert(proc2, child2);
139
140
141 verifyProcIdsOnRestart(procIds);
142
143
144 procStore.update(proc1);
145 procStore.update(child2[1]);
146 procStore.delete(child2[1].getProcId());
147 procIds.remove(child2[1].getProcId());
148
149
150 verifyProcIdsOnRestart(procIds);
151
152
153 procStore.stop(false);
154 FileStatus[] logs = fs.listStatus(logDir);
155 assertEquals(3, logs.length);
156 for (int i = 0; i < logs.length; ++i) {
157 corruptLog(logs[i], 4);
158 }
159 verifyProcIdsOnRestart(procIds);
160 }
161
162 @Test
163 public void testNoTrailerDoubleRestart() throws Exception {
164
165 Procedure proc0 = new TestSequentialProcedure();
166 procStore.insert(proc0, null);
167 Procedure proc1 = new TestSequentialProcedure();
168 procStore.insert(proc1, null);
169 Procedure proc2 = new TestSequentialProcedure();
170 procStore.insert(proc2, null);
171 procStore.rollWriterForTesting();
172
173
174 procStore.delete(proc1.getProcId());
175 procStore.rollWriterForTesting();
176
177
178 procStore.update(proc2);
179 procStore.rollWriterForTesting();
180
181
182 procStore.delete(proc2.getProcId());
183
184
185 procStore.stop(false);
186 FileStatus[] logs = fs.listStatus(logDir);
187 assertEquals(4, logs.length);
188 for (int i = 0; i < logs.length; ++i) {
189 corruptLog(logs[i], 4);
190 }
191
192
193 LoadCounter loader = new LoadCounter();
194 storeRestart(loader);
195 assertEquals(1, loader.getLoadedCount());
196 assertEquals(0, loader.getCorruptedCount());
197
198
199 assertEquals(5, fs.listStatus(logDir).length);
200 loader = new LoadCounter();
201 storeRestart(loader);
202 assertEquals(1, loader.getLoadedCount());
203 assertEquals(0, loader.getCorruptedCount());
204
205
206 procStore.delete(proc0.getProcId());
207 procStore.periodicRollForTesting();
208 assertEquals(1, fs.listStatus(logDir).length);
209 storeRestart(loader);
210 }
211
212 @Test
213 public void testCorruptedTrailer() throws Exception {
214
215 for (int i = 0; i < 100; ++i) {
216 procStore.insert(new TestSequentialProcedure(), null);
217 }
218
219
220 procStore.stop(false);
221
222
223 FileStatus[] logs = fs.listStatus(logDir);
224 assertEquals(1, logs.length);
225 corruptLog(logs[0], 4);
226
227 LoadCounter loader = new LoadCounter();
228 storeRestart(loader);
229 assertEquals(100, loader.getLoadedCount());
230 assertEquals(0, loader.getCorruptedCount());
231 }
232
233 @Test
234 public void testCorruptedEntries() throws Exception {
235
236 for (int i = 0; i < 100; ++i) {
237 procStore.insert(new TestSequentialProcedure(), null);
238 }
239
240
241 procStore.stop(false);
242
243
244
245 FileStatus[] logs = fs.listStatus(logDir);
246 assertEquals(1, logs.length);
247 corruptLog(logs[0], 1823);
248
249 LoadCounter loader = new LoadCounter();
250 storeRestart(loader);
251 assertTrue(procStore.getCorruptedLogs() != null);
252 assertEquals(1, procStore.getCorruptedLogs().size());
253 assertEquals(85, loader.getLoadedCount());
254 assertEquals(0, loader.getCorruptedCount());
255 }
256
257 @Test
258 public void testCorruptedProcedures() throws Exception {
259
260 TestProcedure[] rootProcs = new TestProcedure[10];
261 for (int i = 1; i <= rootProcs.length; i++) {
262 rootProcs[i-1] = new TestProcedure(i, 0);
263 procStore.insert(rootProcs[i-1], null);
264 rootProcs[i-1].addStackId(0);
265 procStore.update(rootProcs[i-1]);
266 }
267
268 procStore.rollWriterForTesting();
269 for (int i = 1; i <= rootProcs.length; i++) {
270 TestProcedure b = new TestProcedure(rootProcs.length + i, i);
271 rootProcs[i-1].addStackId(1);
272 procStore.insert(rootProcs[i-1], new Procedure[] { b });
273 }
274
275 procStore.rollWriterForTesting();
276 for (int i = 1; i <= rootProcs.length; i++) {
277 procStore.update(new TestProcedure(rootProcs.length + i, i));
278 }
279
280
281 procStore.stop(false);
282
283
284
285 FileStatus[] logs = fs.listStatus(logDir);
286 assertEquals(Arrays.toString(logs), 2, logs.length);
287 Arrays.sort(logs, new Comparator<FileStatus>() {
288 @Override
289 public int compare(FileStatus o1, FileStatus o2) {
290 return o1.getPath().getName().compareTo(o2.getPath().getName());
291 }
292 });
293
294 LoadCounter loader = new LoadCounter();
295 storeRestart(loader);
296 assertEquals(rootProcs.length * 2, loader.getLoadedCount());
297 assertEquals(0, loader.getCorruptedCount());
298
299
300 fs.delete(logs[0].getPath(), false);
301 loader.reset();
302 storeRestart(loader);
303 assertEquals(0, loader.getLoadedCount());
304 assertEquals(rootProcs.length, loader.getCorruptedCount());
305 for (Procedure proc: loader.getCorrupted()) {
306 assertTrue(proc.toString(), proc.getParentProcId() <= rootProcs.length);
307 assertTrue(proc.toString(),
308 proc.getProcId() > rootProcs.length &&
309 proc.getProcId() <= (rootProcs.length * 2));
310 }
311 }
312
313 @Test(timeout=60000)
314 public void testWalReplayOrder_AB_A() throws Exception {
315
316
317
318 TestProcedure a = new TestProcedure(1, 0);
319 TestProcedure b = new TestProcedure(2, 1);
320
321 procStore.insert(a, null);
322 a.addStackId(0);
323 procStore.update(a);
324
325 procStore.insert(a, new Procedure[] { b });
326 b.addStackId(1);
327 procStore.update(b);
328
329 procStore.rollWriterForTesting();
330
331 a.addStackId(2);
332 procStore.update(a);
333
334 storeRestart(new ProcedureStore.ProcedureLoader() {
335 @Override
336 public void setMaxProcId(long maxProcId) {
337 assertEquals(2, maxProcId);
338 }
339
340 @Override
341 public void load(ProcedureIterator procIter) throws IOException {
342 assertTrue(procIter.hasNext());
343 assertEquals(1, procIter.nextAsProcedureInfo().getProcId());
344 assertTrue(procIter.hasNext());
345 assertEquals(2, procIter.nextAsProcedureInfo().getProcId());
346 assertFalse(procIter.hasNext());
347 }
348
349 @Override
350 public void handleCorrupted(ProcedureIterator procIter) throws IOException {
351 assertFalse(procIter.hasNext());
352 }
353 });
354 }
355
356 @Test(timeout=60000)
357 public void testWalReplayOrder_ABC_BAD() throws Exception {
358
359
360
361 TestProcedure a = new TestProcedure(1, 0);
362 TestProcedure b = new TestProcedure(2, 1);
363 TestProcedure c = new TestProcedure(3, 2);
364 TestProcedure d = new TestProcedure(4, 0);
365
366 procStore.insert(a, null);
367 a.addStackId(0);
368 procStore.update(a);
369
370 procStore.insert(a, new Procedure[] { b });
371 b.addStackId(1);
372 procStore.update(b);
373
374 procStore.insert(b, new Procedure[] { c });
375 b.addStackId(2);
376 procStore.update(b);
377
378 procStore.rollWriterForTesting();
379
380 b.addStackId(3);
381 procStore.update(b);
382
383 a.addStackId(4);
384 procStore.update(a);
385
386 procStore.insert(d, null);
387 d.addStackId(0);
388 procStore.update(d);
389
390 storeRestart(new ProcedureStore.ProcedureLoader() {
391 @Override
392 public void setMaxProcId(long maxProcId) {
393 assertEquals(4, maxProcId);
394 }
395
396 @Override
397 public void load(ProcedureIterator procIter) throws IOException {
398 assertTrue(procIter.hasNext());
399 assertEquals(4, procIter.nextAsProcedureInfo().getProcId());
400
401
402
403 assertTrue(procIter.hasNext());
404 assertEquals(1, procIter.nextAsProcedureInfo().getProcId());
405 assertTrue(procIter.hasNext());
406 assertEquals(2, procIter.nextAsProcedureInfo().getProcId());
407 assertTrue(procIter.hasNext());
408 assertEquals(3, procIter.nextAsProcedureInfo().getProcId());
409 assertFalse(procIter.hasNext());
410 }
411
412 @Override
413 public void handleCorrupted(ProcedureIterator procIter) throws IOException {
414 assertFalse(procIter.hasNext());
415 }
416 });
417 }
418
419 @Test
420 public void testInsertUpdateDelete() throws Exception {
421 final int NTHREAD = 2;
422
423 procStore.stop(false);
424 fs.delete(logDir, true);
425
426 org.apache.hadoop.conf.Configuration conf =
427 new org.apache.hadoop.conf.Configuration(htu.getConfiguration());
428 conf.setBoolean("hbase.procedure.store.wal.use.hsync", false);
429 conf.setInt("hbase.procedure.store.wal.periodic.roll.msec", 10000);
430 conf.setInt("hbase.procedure.store.wal.roll.threshold", 128 * 1024);
431
432 fs.mkdirs(logDir);
433 procStore = ProcedureTestingUtility.createWalStore(conf, fs, logDir);
434 procStore.start(NTHREAD);
435 procStore.recoverLease();
436
437 LoadCounter loader = new LoadCounter();
438 procStore.load(loader);
439 assertEquals(0, loader.getMaxProcId());
440 assertEquals(0, loader.getLoadedCount());
441 assertEquals(0, loader.getCorruptedCount());
442
443 final long LAST_PROC_ID = 9999;
444 final Thread[] thread = new Thread[NTHREAD];
445 final AtomicLong procCounter = new AtomicLong((long)Math.round(Math.random() * 100));
446 for (int i = 0; i < thread.length; ++i) {
447 thread[i] = new Thread() {
448 @Override
449 public void run() {
450 Random rand = new Random();
451 TestProcedure proc;
452 do {
453 proc = new TestProcedure(procCounter.addAndGet(1));
454
455 procStore.insert(proc, null);
456
457 for (int i = 0, nupdates = rand.nextInt(10); i <= nupdates; ++i) {
458 try { Thread.sleep(0, rand.nextInt(15)); } catch (InterruptedException e) {}
459 procStore.update(proc);
460 }
461
462 procStore.delete(proc.getProcId());
463 } while (proc.getProcId() < LAST_PROC_ID);
464 }
465 };
466 thread[i].start();
467 }
468
469 for (int i = 0; i < thread.length; ++i) {
470 thread[i].join();
471 }
472
473 procStore.getStoreTracker().dump();
474 assertTrue(procCounter.get() >= LAST_PROC_ID);
475 assertTrue(procStore.getStoreTracker().isEmpty());
476 assertEquals(1, procStore.getActiveLogs().size());
477 }
478
479 @Test
480 public void testRollAndRemove() throws IOException {
481
482 Procedure proc1 = new TestSequentialProcedure();
483 procStore.insert(proc1, null);
484
485 Procedure proc2 = new TestSequentialProcedure();
486 procStore.insert(proc2, null);
487
488
489 procStore.rollWriterForTesting();
490 assertEquals(2, procStore.getActiveLogs().size());
491
492
493
494 procStore.update(proc1);
495 procStore.update(proc2);
496 assertEquals(1, procStore.getActiveLogs().size());
497
498
499 procStore.rollWriterForTesting();
500 assertEquals(2, procStore.getActiveLogs().size());
501
502
503
504 procStore.delete(proc1.getProcId());
505 procStore.delete(proc2.getProcId());
506 assertEquals(1, procStore.getActiveLogs().size());
507 }
508
509 private void corruptLog(final FileStatus logFile, final long dropBytes)
510 throws IOException {
511 assertTrue(logFile.getLen() > dropBytes);
512 LOG.debug("corrupt log " + logFile.getPath() +
513 " size=" + logFile.getLen() + " drop=" + dropBytes);
514 Path tmpPath = new Path(testDir, "corrupted.log");
515 InputStream in = fs.open(logFile.getPath());
516 OutputStream out = fs.create(tmpPath);
517 IOUtils.copyBytes(in, out, logFile.getLen() - dropBytes, true);
518 if (!fs.rename(tmpPath, logFile.getPath())) {
519 throw new IOException("Unable to rename");
520 }
521 }
522
523 private void verifyProcIdsOnRestart(final Set<Long> procIds) throws Exception {
524 LOG.debug("expected: " + procIds);
525 LoadCounter loader = new LoadCounter();
526 storeRestart(loader);
527 assertEquals(procIds.size(), loader.getLoadedCount());
528 assertEquals(0, loader.getCorruptedCount());
529 }
530
531 private void assertEmptyLogDir() {
532 try {
533 FileStatus[] status = fs.listStatus(logDir);
534 assertTrue("expected empty state-log dir", status == null || status.length == 0);
535 } catch (FileNotFoundException e) {
536 fail("expected the state-log dir to be present: " + logDir);
537 } catch (IOException e) {
538 fail("got en exception on state-log dir list: " + e.getMessage());
539 }
540 }
541
542 public static class TestSequentialProcedure extends SequentialProcedure<Void> {
543 private static long seqid = 0;
544
545 public TestSequentialProcedure() {
546 setProcId(++seqid);
547 }
548
549 @Override
550 protected Procedure[] execute(Void env) { return null; }
551
552 @Override
553 protected void rollback(Void env) { }
554
555 @Override
556 protected boolean abort(Void env) { return false; }
557
558 @Override
559 protected void serializeStateData(final OutputStream stream) throws IOException {
560 long procId = getProcId();
561 if (procId % 2 == 0) {
562 stream.write(Bytes.toBytes(procId));
563 }
564 }
565
566 @Override
567 protected void deserializeStateData(InputStream stream) throws IOException {
568 long procId = getProcId();
569 if (procId % 2 == 0) {
570 byte[] bProcId = new byte[8];
571 assertEquals(8, stream.read(bProcId));
572 assertEquals(procId, Bytes.toLong(bProcId));
573 } else {
574 assertEquals(0, stream.available());
575 }
576 }
577 }
578
579 private class LoadCounter implements ProcedureStore.ProcedureLoader {
580 private final ArrayList<Procedure> corrupted = new ArrayList<Procedure>();
581 private final ArrayList<Procedure> loaded = new ArrayList<Procedure>();
582
583 private Set<Long> procIds;
584 private long maxProcId = 0;
585
586 public LoadCounter() {
587 this(null);
588 }
589
590 public LoadCounter(final Set<Long> procIds) {
591 this.procIds = procIds;
592 }
593
594 public void reset() {
595 reset(null);
596 }
597
598 public void reset(final Set<Long> procIds) {
599 corrupted.clear();
600 loaded.clear();
601 this.procIds = procIds;
602 this.maxProcId = 0;
603 }
604
605 public long getMaxProcId() {
606 return maxProcId;
607 }
608
609 public ArrayList<Procedure> getLoaded() {
610 return loaded;
611 }
612
613 public int getLoadedCount() {
614 return loaded.size();
615 }
616
617 public ArrayList<Procedure> getCorrupted() {
618 return corrupted;
619 }
620
621 public int getCorruptedCount() {
622 return corrupted.size();
623 }
624
625 @Override
626 public void setMaxProcId(long maxProcId) {
627 maxProcId = maxProcId;
628 }
629
630 @Override
631 public void load(ProcedureIterator procIter) throws IOException {
632 while (procIter.hasNext()) {
633 Procedure proc = procIter.nextAsProcedure();
634 LOG.debug("loading procId=" + proc.getProcId() + ": " + proc);
635 if (procIds != null) {
636 assertTrue("procId=" + proc.getProcId() + " unexpected",
637 procIds.contains(proc.getProcId()));
638 }
639 loaded.add(proc);
640 }
641 }
642
643 @Override
644 public void handleCorrupted(ProcedureIterator procIter) throws IOException {
645 while (procIter.hasNext()) {
646 Procedure proc = procIter.nextAsProcedure();
647 LOG.debug("corrupted procId=" + proc.getProcId() + ": " + proc);
648 corrupted.add(proc);
649 }
650 }
651 }
652 }