1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.procedure2.store.wal;
20
21 import java.io.IOException;
22 import java.io.FileNotFoundException;
23 import java.util.concurrent.atomic.AtomicBoolean;
24 import java.util.concurrent.atomic.AtomicLong;
25 import java.util.concurrent.atomic.AtomicReference;
26 import java.util.concurrent.locks.Condition;
27 import java.util.concurrent.locks.ReentrantLock;
28 import java.util.concurrent.LinkedTransferQueue;
29 import java.util.concurrent.TimeUnit;
30 import java.util.Arrays;
31 import java.util.ArrayList;
32 import java.util.Collections;
33 import java.util.HashSet;
34 import java.util.Iterator;
35 import java.util.LinkedList;
36 import java.util.Set;
37
38 import org.apache.commons.logging.Log;
39 import org.apache.commons.logging.LogFactory;
40 import org.apache.hadoop.conf.Configuration;
41 import org.apache.hadoop.fs.FSDataOutputStream;
42 import org.apache.hadoop.fs.FileAlreadyExistsException;
43 import org.apache.hadoop.fs.FileStatus;
44 import org.apache.hadoop.fs.FileSystem;
45 import org.apache.hadoop.fs.Path;
46 import org.apache.hadoop.fs.PathFilter;
47 import org.apache.hadoop.hbase.classification.InterfaceAudience;
48 import org.apache.hadoop.hbase.classification.InterfaceStability;
49 import org.apache.hadoop.hbase.procedure2.Procedure;
50 import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreBase;
51 import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
52 import org.apache.hadoop.hbase.procedure2.util.ByteSlot;
53 import org.apache.hadoop.hbase.procedure2.util.StringUtils;
54 import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALHeader;
55 import org.apache.hadoop.hbase.util.Threads;
56 import org.apache.hadoop.ipc.RemoteException;
57
58 import com.google.common.annotations.VisibleForTesting;
59
60
61
62
63 @InterfaceAudience.Private
64 @InterfaceStability.Evolving
65 public class WALProcedureStore extends ProcedureStoreBase {
// Class-wide commons-logging logger used for every diagnostic emitted by this store.
private static final Log LOG = LogFactory.getLog(WALProcedureStore.class);
67
/**
 * Callback used to recover the lease of a WAL file.
 * Within this class it is invoked by initOldLogs() on every existing log file
 * before that file is opened.
 */
public interface LeaseRecovery {
  /**
   * @param fs the file system holding the file
   * @param path the file whose lease has to be recovered
   * @throws IOException declared for implementations that fail to recover the lease
   */
  void recoverFileLease(FileSystem fs, Path path) throws IOException;
}
71
// Number of consecutive failed sync attempts after which syncSlots() rolls the writer.
private static final String MAX_RETRIES_BEFORE_ROLL_CONF_KEY =
  "hbase.procedure.store.wal.max.retries.before.roll";
private static final int DEFAULT_MAX_RETRIES_BEFORE_ROLL = 3;

// Base sleep between roll attempts in rollWriterOrDie(); multiplied by the attempt index.
// NOTE(review): presumably milliseconds - confirm against Threads.sleepWithoutInterrupt().
private static final String WAIT_BEFORE_ROLL_CONF_KEY =
  "hbase.procedure.store.wal.wait.before.roll";
private static final int DEFAULT_WAIT_BEFORE_ROLL = 500;

// Number of roll attempts made by rollWriterOrDie() before giving up.
private static final String ROLL_RETRIES_CONF_KEY =
  "hbase.procedure.store.wal.max.roll.retries";
private static final int DEFAULT_ROLL_RETRIES = 3;

// Number of log rolls syncSlots() may trigger because of sync failures before aborting.
private static final String MAX_SYNC_FAILURE_ROLL_CONF_KEY =
  "hbase.procedure.store.wal.sync.failure.roll.max";
private static final int DEFAULT_MAX_SYNC_FAILURE_ROLL = 3;

// Interval (msec) used by getMillisToNextPeriodicRoll(); the default is one hour.
private static final String PERIODIC_ROLL_CONF_KEY =
  "hbase.procedure.store.wal.periodic.roll.msec";
private static final int DEFAULT_PERIODIC_ROLL = 60 * 60 * 1000;

// Time (msec) the sync loop waits on slotCond for more slots before syncing a partial batch.
private static final String SYNC_WAIT_MSEC_CONF_KEY = "hbase.procedure.store.wal.sync.wait.msec";
private static final int DEFAULT_SYNC_WAIT_MSEC = 100;

// When true the sync calls stream.hsync(), otherwise stream.hflush().
private static final String USE_HSYNC_CONF_KEY = "hbase.procedure.store.wal.use.hsync";
private static final boolean DEFAULT_USE_HSYNC = true;

// periodicRoll() rolls the writer once totalSynced exceeds this value (compared against
// the sum of ByteSlot sizes synced since the last roll).
private static final String ROLL_THRESHOLD_CONF_KEY = "hbase.procedure.store.wal.roll.threshold";
private static final long DEFAULT_ROLL_THRESHOLD = 32 * 1024 * 1024;
100
// Known WAL files; rollWriter(long) appends the newly created log at the end.
private final LinkedList<ProcedureWALFile> logs = new LinkedList<ProcedureWALFile>();
// Tracks inserted/updated/deleted procedure ids; updated by updateStoreTracker().
private final ProcedureStoreTracker storeTracker = new ProcedureStoreTracker();
// Lock held by pushData(), syncLoop(), recoverLease() and the *ForTesting helpers.
private final ReentrantLock lock = new ReentrantLock();
// Awaited by the sync loop while there are no slots; signalled by pushData().
private final Condition waitCond = lock.newCondition();
// Awaited by the sync loop (up to syncWaitMsec) to let the slots array fill up.
private final Condition slotCond = lock.newCondition();
// Awaited by pushData() callers; signalled at the end of every sync-loop iteration.
private final Condition syncCond = lock.newCondition();

private final LeaseRecovery leaseRecovery;
private final Configuration conf;
private final FileSystem fs;
private final Path logDir;

// First throwable raised by the sync loop; non-null means "sync aborted" (see isSyncAborted()).
private final AtomicReference<Throwable> syncException = new AtomicReference<Throwable>();
// True from start() until load() completes; periodicRoll() is skipped by the sync loop meanwhile.
private final AtomicBoolean loading = new AtomicBoolean(true);
// True while the sync loop is inside syncSlots().
private final AtomicBoolean inSync = new AtomicBoolean(false);
// Sum of the slot sizes synced since the last roll (reset to 0 by rollWriter(long)).
private final AtomicLong totalSynced = new AtomicLong(0);
// System.currentTimeMillis() of the last successful roll, 0 if none happened yet.
private final AtomicLong lastRollTs = new AtomicLong(0);

// Pool of reusable ByteSlot buffers, see acquireSlot()/releaseSlot().
private LinkedTransferQueue<ByteSlot> slotsCache = null;
// Lazily created by load() when a log is reported as corrupted; null otherwise.
private Set<ProcedureWALFile> corruptedLogs = null;
// Output stream of the log currently being written; null when closed.
private FSDataOutputStream stream = null;
// Id of the log currently being written.
private long flushLogId = 0;
// Number of entries of 'slots' filled and waiting to be synced.
private int slotIndex = 0;
private Thread syncThread;
// Entries queued by pushData() for the next sync; sized by start(numSlots).
private ByteSlot[] slots;

// Values read from the configuration in start().
private int maxRetriesBeforeRoll;
private int maxSyncFailureRoll;
private int waitBeforeRoll;
private int rollRetries;
private int periodicRollMsec;
private long rollThreshold;
private boolean useHsync;
private int syncWaitMsec;
135
/**
 * Creates a store bound to the given log directory. Nothing is read or written here;
 * the arguments are only stored for later use.
 *
 * @param conf configuration read by start()
 * @param fs file system holding the WAL files
 * @param logDir directory containing the WAL files
 * @param leaseRecovery callback invoked on each existing log by initOldLogs()
 */
public WALProcedureStore(final Configuration conf, final FileSystem fs, final Path logDir,
    final LeaseRecovery leaseRecovery) {
  this.conf = conf;
  this.fs = fs;
  this.logDir = logDir;
  this.leaseRecovery = leaseRecovery;
}
143
144 @Override
145 public void start(int numSlots) throws IOException {
146 if (!setRunning(true)) {
147 return;
148 }
149
150
151 loading.set(true);
152 slots = new ByteSlot[numSlots];
153 slotsCache = new LinkedTransferQueue();
154 while (slotsCache.size() < numSlots) {
155 slotsCache.offer(new ByteSlot());
156 }
157
158
159 maxRetriesBeforeRoll =
160 conf.getInt(MAX_RETRIES_BEFORE_ROLL_CONF_KEY, DEFAULT_MAX_RETRIES_BEFORE_ROLL);
161 maxSyncFailureRoll = conf.getInt(MAX_SYNC_FAILURE_ROLL_CONF_KEY, DEFAULT_MAX_SYNC_FAILURE_ROLL);
162 waitBeforeRoll = conf.getInt(WAIT_BEFORE_ROLL_CONF_KEY, DEFAULT_WAIT_BEFORE_ROLL);
163 rollRetries = conf.getInt(ROLL_RETRIES_CONF_KEY, DEFAULT_ROLL_RETRIES);
164 rollThreshold = conf.getLong(ROLL_THRESHOLD_CONF_KEY, DEFAULT_ROLL_THRESHOLD);
165 periodicRollMsec = conf.getInt(PERIODIC_ROLL_CONF_KEY, DEFAULT_PERIODIC_ROLL);
166 syncWaitMsec = conf.getInt(SYNC_WAIT_MSEC_CONF_KEY, DEFAULT_SYNC_WAIT_MSEC);
167 useHsync = conf.getBoolean(USE_HSYNC_CONF_KEY, DEFAULT_USE_HSYNC);
168
169
170 syncThread = new Thread("WALProcedureStoreSyncThread") {
171 @Override
172 public void run() {
173 try {
174 syncLoop();
175 } catch (Throwable e) {
176 LOG.error("Got an exception from the sync-loop", e);
177 if (!isSyncAborted()) {
178 sendAbortProcessSignal();
179 }
180 }
181 }
182 };
183 syncThread.start();
184 }
185
186 @Override
187 public void stop(boolean abort) {
188 if (!setRunning(false)) {
189 return;
190 }
191
192 LOG.info("Stopping the WAL Procedure Store");
193 sendStopSignal();
194
195 if (!abort) {
196 try {
197 while (syncThread.isAlive()) {
198 sendStopSignal();
199 syncThread.join(250);
200 }
201 } catch (InterruptedException e) {
202 LOG.warn("join interrupted", e);
203 Thread.currentThread().interrupt();
204 }
205 }
206
207
208 closeStream();
209
210
211
212
213 for (ProcedureWALFile log: logs) {
214 log.close();
215 }
216 logs.clear();
217 }
218
219 private void sendStopSignal() {
220 if (lock.tryLock()) {
221 try {
222 waitCond.signalAll();
223 syncCond.signalAll();
224 } finally {
225 lock.unlock();
226 }
227 }
228 }
229
230 @Override
231 public int getNumThreads() {
232 return slots == null ? 0 : slots.length;
233 }
234
235 public ProcedureStoreTracker getStoreTracker() {
236 return storeTracker;
237 }
238
239 public ArrayList<ProcedureWALFile> getActiveLogs() {
240 lock.lock();
241 try {
242 return new ArrayList<ProcedureWALFile>(logs);
243 } finally {
244 lock.unlock();
245 }
246 }
247
248 public Set<ProcedureWALFile> getCorruptedLogs() {
249 return corruptedLogs;
250 }
251
252 @Override
253 public void recoverLease() throws IOException {
254 lock.lock();
255 try {
256 LOG.info("Starting WAL Procedure Store lease recovery");
257 FileStatus[] oldLogs = getLogFiles();
258 while (isRunning()) {
259
260 flushLogId = initOldLogs(oldLogs);
261
262
263 if (!rollWriter(flushLogId + 1)) {
264
265 LOG.debug("someone else has already created log " + flushLogId);
266 continue;
267 }
268
269
270 oldLogs = getLogFiles();
271 if (getMaxLogId(oldLogs) > flushLogId) {
272 if (LOG.isDebugEnabled()) {
273 LOG.debug("Someone else created new logs. Expected maxLogId < " + flushLogId);
274 }
275 logs.getLast().removeFile();
276 continue;
277 }
278
279 LOG.info("Lease acquired for flushLogId: " + flushLogId);
280 break;
281 }
282 } finally {
283 lock.unlock();
284 }
285 }
286
287 @Override
288 public void load(final ProcedureLoader loader) throws IOException {
289 if (logs.isEmpty()) {
290 throw new RuntimeException("recoverLease() must be called before loading data");
291 }
292
293
294 if (logs.size() == 1) {
295 if (LOG.isDebugEnabled()) {
296 LOG.debug("No state logs to replay.");
297 }
298 loader.setMaxProcId(0);
299 loading.set(false);
300 return;
301 }
302
303
304 Iterator<ProcedureWALFile> it = logs.descendingIterator();
305 it.next();
306 try {
307 ProcedureWALFormat.load(it, storeTracker, new ProcedureWALFormat.Loader() {
308 @Override
309 public void setMaxProcId(long maxProcId) {
310 loader.setMaxProcId(maxProcId);
311 }
312
313 @Override
314 public void load(ProcedureIterator procIter) throws IOException {
315 loader.load(procIter);
316 }
317
318 @Override
319 public void handleCorrupted(ProcedureIterator procIter) throws IOException {
320 loader.handleCorrupted(procIter);
321 }
322
323 @Override
324 public void markCorruptedWAL(ProcedureWALFile log, IOException e) {
325 if (corruptedLogs == null) {
326 corruptedLogs = new HashSet<ProcedureWALFile>();
327 }
328 corruptedLogs.add(log);
329
330 }
331 });
332 } finally {
333 loading.set(false);
334 }
335 }
336
337 @Override
338 public void insert(final Procedure proc, final Procedure[] subprocs) {
339 if (LOG.isTraceEnabled()) {
340 LOG.trace("Insert " + proc + ", subproc=" + Arrays.toString(subprocs));
341 }
342
343 ByteSlot slot = acquireSlot();
344 try {
345
346 long[] subProcIds = null;
347 if (subprocs != null) {
348 ProcedureWALFormat.writeInsert(slot, proc, subprocs);
349 subProcIds = new long[subprocs.length];
350 for (int i = 0; i < subprocs.length; ++i) {
351 subProcIds[i] = subprocs[i].getProcId();
352 }
353 } else {
354 assert !proc.hasParent();
355 ProcedureWALFormat.writeInsert(slot, proc);
356 }
357
358
359 pushData(PushType.INSERT, slot, proc.getProcId(), subProcIds);
360 } catch (IOException e) {
361
362
363 LOG.fatal("Unable to serialize one of the procedure: proc=" + proc +
364 ", subprocs=" + Arrays.toString(subprocs), e);
365 throw new RuntimeException(e);
366 } finally {
367 releaseSlot(slot);
368 }
369 }
370
371 @Override
372 public void update(final Procedure proc) {
373 if (LOG.isTraceEnabled()) {
374 LOG.trace("Update " + proc);
375 }
376
377 ByteSlot slot = acquireSlot();
378 try {
379
380 ProcedureWALFormat.writeUpdate(slot, proc);
381
382
383 pushData(PushType.UPDATE, slot, proc.getProcId(), null);
384 } catch (IOException e) {
385
386
387 LOG.fatal("Unable to serialize the procedure: " + proc, e);
388 throw new RuntimeException(e);
389 } finally {
390 releaseSlot(slot);
391 }
392 }
393
394 @Override
395 public void delete(final long procId) {
396 if (LOG.isTraceEnabled()) {
397 LOG.trace("Delete " + procId);
398 }
399
400 ByteSlot slot = acquireSlot();
401 try {
402
403 ProcedureWALFormat.writeDelete(slot, procId);
404
405
406 pushData(PushType.DELETE, slot, procId, null);
407 } catch (IOException e) {
408
409
410 LOG.fatal("Unable to serialize the procedure: " + procId, e);
411 throw new RuntimeException(e);
412 } finally {
413 releaseSlot(slot);
414 }
415 }
416
417 private ByteSlot acquireSlot() {
418 ByteSlot slot = slotsCache.poll();
419 return slot != null ? slot : new ByteSlot();
420 }
421
422 private void releaseSlot(final ByteSlot slot) {
423 slot.reset();
424 slotsCache.offer(slot);
425 }
426
427 private enum PushType { INSERT, UPDATE, DELETE };
428
/**
 * Queues a serialized entry in the slots array and blocks on syncCond until signalled
 * by the sync loop.
 *
 * @param type kind of entry, used to update the store tracker
 * @param slot the serialized entry
 * @param procId id of the procedure the entry refers to
 * @param subProcIds ids of the sub-procedures of an insert, or null
 * @return the value of flushLogId read when the slot was queued
 * @throws RuntimeException if the store is not running, no log is known, the sync has
 *         been aborted, or the calling thread is interrupted while waiting
 */
private long pushData(final PushType type, final ByteSlot slot,
    final long procId, final long[] subProcIds) {
  if (!isRunning()) {
    throw new RuntimeException("the store must be running before inserting data");
  }
  if (logs.isEmpty()) {
    throw new RuntimeException("recoverLease() must be called before inserting data");
  }

  long logId = -1;
  lock.lock();
  try {
    // Wait for the sync to be completed
    while (true) {
      if (!isRunning()) {
        throw new RuntimeException("store no longer running");
      } else if (isSyncAborted()) {
        throw new RuntimeException("sync aborted", syncException.get());
      } else if (inSync.get()) {
        // a sync is in progress: wait for its completion signal
        syncCond.await();
      } else if (slotIndex == slots.length) {
        // no free slot: wake the sync loop and wait for the slots to be flushed
        slotCond.signal();
        syncCond.await();
      } else {
        break;
      }
    }

    updateStoreTracker(type, procId, subProcIds);
    slots[slotIndex++] = slot;
    logId = flushLogId;

    // Notify that there is new data
    if (slotIndex == 1) {
      waitCond.signal();
    }

    // Notify that the slots are full
    if (slotIndex == slots.length) {
      waitCond.signal();
      slotCond.signal();
    }

    // Block until the sync loop signals syncCond (done in its finally block).
    // NOTE(review): single await, not re-checked in a loop - a spurious wakeup would
    // return before the sync; confirm whether that is acceptable.
    syncCond.await();
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    sendAbortProcessSignal();
    throw new RuntimeException(e);
  } finally {
    lock.unlock();
    // Throwing from finally replaces any exception already propagating from the try block.
    if (isSyncAborted()) {
      throw new RuntimeException("sync aborted", syncException.get());
    }
  }
  return logId;
}
485
486 private void updateStoreTracker(final PushType type,
487 final long procId, final long[] subProcIds) {
488 switch (type) {
489 case INSERT:
490 if (subProcIds == null) {
491 storeTracker.insert(procId);
492 } else {
493 storeTracker.insert(procId, subProcIds);
494 }
495 break;
496 case UPDATE:
497 storeTracker.update(procId);
498 break;
499 case DELETE:
500 storeTracker.delete(procId);
501 break;
502 default:
503 throw new RuntimeException("invalid push type " + type);
504 }
505 }
506
507 private boolean isSyncAborted() {
508 return syncException.get() != null;
509 }
510
/**
 * Body of the sync thread. While the store is running it waits for queued slots,
 * gives writers a short time window to fill more slots, writes and syncs them, and then
 * wakes up every thread waiting on syncCond. The lock is held for the whole loop,
 * except while parked on a condition.
 *
 * @throws Throwable any failure of the iteration; it is first recorded in syncException
 */
private void syncLoop() throws Throwable {
  inSync.set(false);
  lock.lock();
  try {
    while (isRunning()) {
      try {
        // Wait until new data is available
        if (slotIndex == 0) {
          // housekeeping (roll/cleanup) is skipped until load() has completed
          if (!loading.get()) {
            periodicRoll();
          }

          if (LOG.isTraceEnabled()) {
            float rollTsSec = getMillisFromLastRoll() / 1000.0f;
            LOG.trace(String.format("Waiting for data. flushed=%s (%s/sec)",
                      StringUtils.humanSize(totalSynced.get()),
                      StringUtils.humanSize(totalSynced.get() / rollTsSec)));
          }

          waitCond.await(getMillisToNextPeriodicRoll(), TimeUnit.MILLISECONDS);
          if (slotIndex == 0) {
            // no data.. probably a stop() or a periodic roll
            continue;
          }
        }

        // Wait SYNC_WAIT_MSEC or the signal of "slots full" before flushing
        long syncWaitSt = System.currentTimeMillis();
        if (slotIndex != slots.length) {
          slotCond.await(syncWaitMsec, TimeUnit.MILLISECONDS);
        }
        long syncWaitMs = System.currentTimeMillis() - syncWaitSt;
        if (LOG.isTraceEnabled() && (syncWaitMs > 10 || slotIndex < slots.length)) {
          float rollSec = getMillisFromLastRoll() / 1000.0f;
          LOG.trace(String.format("Sync wait %s, slotIndex=%s , totalSynced=%s (%s/sec)",
                    StringUtils.humanTimeDiff(syncWaitMs), slotIndex,
                    StringUtils.humanSize(totalSynced.get()),
                    StringUtils.humanSize(totalSynced.get() / rollSec)));
        }

        // inSync makes pushData() callers wait instead of touching the slots being flushed
        inSync.set(true);
        totalSynced.addAndGet(syncSlots());
        slotIndex = 0;
        inSync.set(false);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        sendAbortProcessSignal();
        // only the first failure is kept
        syncException.compareAndSet(null, e);
        throw e;
      } catch (Throwable t) {
        syncException.compareAndSet(null, t);
        throw t;
      } finally {
        // wake up the writers on every iteration, successful or not
        syncCond.signalAll();
      }
    }
  } finally {
    lock.unlock();
  }
}
571
572 private long syncSlots() throws Throwable {
573 int retry = 0;
574 int logRolled = 0;
575 long totalSynced = 0;
576 do {
577 try {
578 totalSynced = syncSlots(stream, slots, 0, slotIndex);
579 break;
580 } catch (Throwable e) {
581 LOG.warn("unable to sync slots, retry=" + retry);
582 if (++retry >= maxRetriesBeforeRoll) {
583 if (logRolled >= maxSyncFailureRoll) {
584 LOG.error("Sync slots after log roll failed, abort.", e);
585 sendAbortProcessSignal();
586 throw e;
587 }
588
589 if (!rollWriterOrDie()) {
590 throw e;
591 }
592
593 logRolled++;
594 retry = 0;
595 }
596 }
597 } while (isRunning());
598 return totalSynced;
599 }
600
601 protected long syncSlots(FSDataOutputStream stream, ByteSlot[] slots, int offset, int count)
602 throws IOException {
603 long totalSynced = 0;
604 for (int i = 0; i < count; ++i) {
605 ByteSlot data = slots[offset + i];
606 data.writeTo(stream);
607 totalSynced += data.size();
608 }
609
610 if (useHsync) {
611 stream.hsync();
612 } else {
613 stream.hflush();
614 }
615 sendPostSyncSignal();
616
617 if (LOG.isTraceEnabled()) {
618 LOG.trace("Sync slots=" + count + '/' + slots.length +
619 ", flushed=" + StringUtils.humanSize(totalSynced));
620 }
621 return totalSynced;
622 }
623
624 private boolean rollWriterOrDie() {
625 for (int i = 0; i < rollRetries; ++i) {
626 if (i > 0) Threads.sleepWithoutInterrupt(waitBeforeRoll * i);
627
628 try {
629 if (rollWriter()) {
630 return true;
631 }
632 } catch (IOException e) {
633 LOG.warn("Unable to roll the log, attempt=" + (i + 1), e);
634 }
635 }
636 LOG.fatal("Unable to roll the log");
637 sendAbortProcessSignal();
638 throw new RuntimeException("unable to roll the log");
639 }
640
641 private boolean tryRollWriter() {
642 try {
643 return rollWriter();
644 } catch (IOException e) {
645 LOG.warn("Unable to roll the log", e);
646 return false;
647 }
648 }
649
650 private long getMillisToNextPeriodicRoll() {
651 if (lastRollTs.get() > 0 && periodicRollMsec > 0) {
652 return periodicRollMsec - getMillisFromLastRoll();
653 }
654 return Long.MAX_VALUE;
655 }
656
657 private long getMillisFromLastRoll() {
658 return (System.currentTimeMillis() - lastRollTs.get());
659 }
660
661 @VisibleForTesting
662 protected void periodicRollForTesting() throws IOException {
663 lock.lock();
664 try {
665 periodicRoll();
666 } finally {
667 lock.unlock();
668 }
669 }
670
671 @VisibleForTesting
672 protected boolean rollWriterForTesting() throws IOException {
673 lock.lock();
674 try {
675 return rollWriter();
676 } finally {
677 lock.unlock();
678 }
679 }
680
681 private void periodicRoll() throws IOException {
682 if (storeTracker.isEmpty()) {
683 if (LOG.isTraceEnabled()) {
684 LOG.trace("no active procedures");
685 }
686 tryRollWriter();
687 removeAllLogs(flushLogId - 1);
688 } else {
689 if (storeTracker.isUpdated()) {
690 if (LOG.isTraceEnabled()) {
691 LOG.trace("all the active procedures are in the latest log");
692 }
693 removeAllLogs(flushLogId - 1);
694 }
695
696
697
698 if (totalSynced.get() > rollThreshold || getMillisToNextPeriodicRoll() <= 0) {
699 tryRollWriter();
700 }
701
702 removeInactiveLogs();
703 }
704 }
705
706 private boolean rollWriter() throws IOException {
707
708 if (!rollWriter(flushLogId + 1)) {
709 LOG.warn("someone else has already created log " + flushLogId);
710 return false;
711 }
712
713
714
715 if (getMaxLogId(getLogFiles()) > flushLogId) {
716 LOG.warn("Someone else created new logs. Expected maxLogId < " + flushLogId);
717 logs.getLast().removeFile();
718 return false;
719 }
720
721
722 return true;
723 }
724
/**
 * Creates the log file with the given id, writes its header and, on success, makes it the
 * current log: the previous stream is closed, the tracker updates are reset and the
 * roll counters/timestamps are restarted.
 * Must be called with the store lock held (asserted).
 *
 * @param logId id of the log to create; expected to be greater than flushLogId (asserted)
 * @return true if the new log is now the current one; false if the file could not be
 *         created or its header could not be written
 * @throws IOException for create() failures other than the two handled below, or if
 *         closing the new stream after a header failure fails
 */
private boolean rollWriter(final long logId) throws IOException {
  assert logId > flushLogId : "logId=" + logId + " flushLogId=" + flushLogId;
  assert lock.isHeldByCurrentThread() : "expected to be the lock owner. " + lock.isLocked();

  ProcedureWALHeader header = ProcedureWALHeader.newBuilder()
    .setVersion(ProcedureWALFormat.HEADER_VERSION)
    .setType(ProcedureWALFormat.LOG_TYPE_STREAM)
    .setMinProcId(storeTracker.getMinProcId())
    .setLogId(logId)
    .build();

  FSDataOutputStream newStream = null;
  Path newLogFile = null;
  long startPos = -1;
  newLogFile = getLogFilePath(logId);
  try {
    // overwrite=false: the creation fails if the file is already there
    newStream = fs.create(newLogFile, false);
  } catch (FileAlreadyExistsException e) {
    LOG.error("Log file with id=" + logId + " already exists", e);
    return false;
  } catch (RemoteException re) {
    LOG.warn("failed to create log file with id=" + logId, re);
    return false;
  }
  try {
    ProcedureWALFormat.writeHeader(newStream, header);
    // position right after the header, recorded in the ProcedureWALFile below
    startPos = newStream.getPos();
  } catch (IOException ioe) {
    LOG.warn("Encountered exception writing header", ioe);
    newStream.close();
    return false;
  }

  // The new log is ready: close the previous stream before switching to it
  closeStream();

  storeTracker.resetUpdates();
  stream = newStream;
  flushLogId = logId;
  totalSynced.set(0);
  lastRollTs.set(System.currentTimeMillis());
  logs.add(new ProcedureWALFile(fs, newLogFile, header, startPos));

  if (LOG.isDebugEnabled()) {
    LOG.debug("Roll new state log: " + logId);
  }
  return true;
}
772
/**
 * Closes the current stream, if any, after trying to write the trailer.
 * Failures are logged and never propagated as IOException; the stream field is always
 * set to null on exit.
 */
private void closeStream() {
  try {
    if (stream != null) {
      try {
        // record the updated proc-id range on the last log, then write the trailer
        ProcedureWALFile log = logs.getLast();
        log.setProcIds(storeTracker.getUpdatedMinProcId(), storeTracker.getUpdatedMaxProcId());
        ProcedureWALFormat.writeTrailer(stream, storeTracker);
      } catch (IOException e) {
        // a missing trailer is only logged; the stream is still closed below
        LOG.warn("Unable to write the trailer: " + e.getMessage());
      }
      stream.close();
    }
  } catch (IOException e) {
    LOG.error("Unable to close the stream", e);
  } finally {
    stream = null;
  }
}
791
792
793
794
795 private void removeInactiveLogs() {
796
797 while (logs.size() > 1) {
798 ProcedureWALFile log = logs.getFirst();
799 if (storeTracker.isTracking(log.getMinProcId(), log.getMaxProcId())) {
800 break;
801 }
802 removeLogFile(log);
803 }
804 }
805
806 private void removeAllLogs(long lastLogId) {
807 if (logs.size() <= 1) return;
808
809 if (LOG.isDebugEnabled()) {
810 LOG.debug("Remove all state logs with ID less than " + lastLogId);
811 }
812 while (logs.size() > 1) {
813 ProcedureWALFile log = logs.getFirst();
814 if (lastLogId < log.getLogId()) {
815 break;
816 }
817 removeLogFile(log);
818 }
819 }
820
821 private boolean removeLogFile(final ProcedureWALFile log) {
822 try {
823 if (LOG.isDebugEnabled()) {
824 LOG.debug("Remove log: " + log);
825 }
826 log.removeFile();
827 logs.remove(log);
828 LOG.info("Remove log: " + log);
829 LOG.info("Removed logs: " + logs);
830 if (logs.size() == 0) { LOG.error("Expected at least one log"); }
831 assert logs.size() > 0 : "expected at least one log";
832 } catch (IOException e) {
833 LOG.error("Unable to remove log: " + log, e);
834 return false;
835 }
836 return true;
837 }
838
839
840
841
842 public Path getLogDir() {
843 return this.logDir;
844 }
845
846 public FileSystem getFileSystem() {
847 return this.fs;
848 }
849
850 protected Path getLogFilePath(final long logId) throws IOException {
851 return new Path(logDir, String.format("state-%020d.log", logId));
852 }
853
854 private static long getLogIdFromName(final String name) {
855 int end = name.lastIndexOf(".log");
856 int start = name.lastIndexOf('-') + 1;
857 while (start < end) {
858 if (name.charAt(start) != '0')
859 break;
860 start++;
861 }
862 return Long.parseLong(name.substring(start, end));
863 }
864
865 private FileStatus[] getLogFiles() throws IOException {
866 try {
867 return fs.listStatus(logDir, new PathFilter() {
868 @Override
869 public boolean accept(Path path) {
870 String name = path.getName();
871 return name.startsWith("state-") && name.endsWith(".log");
872 }
873 });
874 } catch (FileNotFoundException e) {
875 LOG.warn("Log directory not found: " + e.getMessage());
876 return null;
877 }
878 }
879
880 private static long getMaxLogId(final FileStatus[] logFiles) {
881 long maxLogId = 0;
882 if (logFiles != null && logFiles.length > 0) {
883 for (int i = 0; i < logFiles.length; ++i) {
884 maxLogId = Math.max(maxLogId, getLogIdFromName(logFiles[i].getPath().getName()));
885 }
886 }
887 return maxLogId;
888 }
889
890
891
892
893 private long initOldLogs(final FileStatus[] logFiles) throws IOException {
894 this.logs.clear();
895
896 long maxLogId = 0;
897 if (logFiles != null && logFiles.length > 0) {
898 for (int i = 0; i < logFiles.length; ++i) {
899 final Path logPath = logFiles[i].getPath();
900 leaseRecovery.recoverFileLease(fs, logPath);
901 maxLogId = Math.max(maxLogId, getLogIdFromName(logPath.getName()));
902
903 ProcedureWALFile log = initOldLog(logFiles[i]);
904 if (log != null) {
905 this.logs.add(log);
906 }
907 }
908 Collections.sort(this.logs);
909 initTrackerFromOldLogs();
910 }
911 return maxLogId;
912 }
913
914 private void initTrackerFromOldLogs() {
915
916 if (!logs.isEmpty()) {
917 ProcedureWALFile log = logs.getLast();
918 try {
919 log.readTracker(storeTracker);
920 } catch (IOException e) {
921 LOG.warn("Unable to read tracker for " + log + " - " + e.getMessage());
922
923 storeTracker.reset();
924 storeTracker.setPartialFlag(true);
925 }
926 }
927 }
928
929 private ProcedureWALFile initOldLog(final FileStatus logFile) throws IOException {
930 ProcedureWALFile log = new ProcedureWALFile(fs, logFile);
931 if (logFile.getLen() == 0) {
932 LOG.warn("Remove uninitialized log: " + logFile);
933 log.removeFile();
934 return null;
935 }
936 if (LOG.isDebugEnabled()) {
937 LOG.debug("Opening state-log: " + logFile);
938 }
939 try {
940 log.open();
941 } catch (ProcedureWALFormat.InvalidWALDataException e) {
942 LOG.warn("Remove uninitialized log: " + logFile, e);
943 log.removeFile();
944 return null;
945 } catch (IOException e) {
946 String msg = "Unable to read state log: " + logFile;
947 LOG.error(msg, e);
948 throw new IOException(msg, e);
949 }
950
951 if (log.isCompacted()) {
952 try {
953 log.readTrailer();
954 } catch (IOException e) {
955 LOG.warn("Unfinished compacted log: " + logFile, e);
956 log.removeFile();
957 return null;
958 }
959 }
960 return log;
961 }
962 }