1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.apache.hadoop.hbase.io.hfile.bucket;
22
23 import java.io.File;
24 import java.io.FileInputStream;
25 import java.io.FileNotFoundException;
26 import java.io.FileOutputStream;
27 import java.io.IOException;
28 import java.io.ObjectInputStream;
29 import java.io.ObjectOutputStream;
30 import java.io.Serializable;
31 import java.nio.ByteBuffer;
32 import java.util.ArrayList;
33 import java.util.Comparator;
34 import java.util.Iterator;
35 import java.util.List;
36 import java.util.Map;
37 import java.util.PriorityQueue;
38 import java.util.Set;
39 import java.util.concurrent.ArrayBlockingQueue;
40 import java.util.concurrent.BlockingQueue;
41 import java.util.concurrent.ConcurrentHashMap;
42 import java.util.concurrent.ConcurrentMap;
43 import java.util.concurrent.Executors;
44 import java.util.concurrent.ScheduledExecutorService;
45 import java.util.concurrent.TimeUnit;
46 import java.util.concurrent.atomic.AtomicLong;
47 import java.util.concurrent.locks.Lock;
48 import java.util.concurrent.locks.ReentrantLock;
49 import java.util.concurrent.locks.ReentrantReadWriteLock;
50
51 import org.apache.commons.logging.Log;
52 import org.apache.commons.logging.LogFactory;
53 import org.apache.hadoop.hbase.classification.InterfaceAudience;
54 import org.apache.hadoop.hbase.io.HeapSize;
55 import org.apache.hadoop.hbase.io.hfile.BlockCache;
56 import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
57 import org.apache.hadoop.hbase.io.hfile.BlockCacheUtil;
58 import org.apache.hadoop.hbase.io.hfile.BlockPriority;
59 import org.apache.hadoop.hbase.io.hfile.BlockType;
60 import org.apache.hadoop.hbase.io.hfile.CacheStats;
61 import org.apache.hadoop.hbase.io.hfile.Cacheable;
62 import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
63 import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager;
64 import org.apache.hadoop.hbase.io.hfile.CachedBlock;
65 import org.apache.hadoop.hbase.io.hfile.HFileBlock;
66 import org.apache.hadoop.hbase.util.ConcurrentIndex;
67 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
68 import org.apache.hadoop.hbase.util.HasThread;
69 import org.apache.hadoop.hbase.util.IdReadWriteLock;
70 import org.apache.hadoop.util.StringUtils;
71
72 import com.google.common.annotations.VisibleForTesting;
73 import com.google.common.collect.ImmutableList;
74 import com.google.common.util.concurrent.ThreadFactoryBuilder;
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94 @InterfaceAudience.Private
95 public class BucketCache implements BlockCache, HeapSize {
  private static final Log LOG = LogFactory.getLog(BucketCache.class);

  /** Fraction of capacity reserved for blocks accessed exactly once. */
  private static final float DEFAULT_SINGLE_FACTOR = 0.25f;
  /** Fraction of capacity reserved for blocks accessed more than once. */
  private static final float DEFAULT_MULTI_FACTOR = 0.50f;
  /** Fraction of capacity reserved for in-memory (MEMORY priority) blocks. */
  private static final float DEFAULT_MEMORY_FACTOR = 0.25f;
  /** Extra headroom freed beyond the strict goal during a free-space pass. */
  private static final float DEFAULT_EXTRA_FREE_FACTOR = 0.10f;

  /** Usage above this fraction of total size triggers a free-space pass. */
  private static final float DEFAULT_ACCEPT_FACTOR = 0.95f;
  /** Free-space passes aim to bring each bucket back under this fraction. */
  private static final float DEFAULT_MIN_FACTOR = 0.85f;

  /** Statistics thread period, in seconds (5 minutes). */
  private static final int statThreadPeriod = 5 * 60;

  final static int DEFAULT_WRITER_THREADS = 3;
  final static int DEFAULT_WRITER_QUEUE_ITEMS = 64;

  /** Engine that stores and reads back the serialized block data. */
  final IOEngine ioEngine;

  /** Blocks waiting to be written to the IOEngine by a WriterThread. */
  @VisibleForTesting
  final ConcurrentMap<BlockCacheKey, RAMQueueEntry> ramCache;

  /** Blocks already written to the IOEngine, mapped to their bucket location. */
  @VisibleForTesting
  ConcurrentMap<BlockCacheKey, BucketEntry> backingMap;

  /** Cleared by disableCache(); all public operations become no-ops once false. */
  private volatile boolean cacheEnabled;

  /** One bounded queue per writer thread; a key always hashes to the same queue. */
  @VisibleForTesting
  final ArrayList<BlockingQueue<RAMQueueEntry>> writerQueues =
      new ArrayList<BlockingQueue<RAMQueueEntry>>();
  @VisibleForTesting
  final WriterThread[] writerThreads;

  /** True while a free-space pass runs; writers check it to avoid piling on. */
  private volatile boolean freeInProgress = false;
  /** tryLock'ed by freeSpace() so only one pass runs at a time. */
  private final Lock freeSpaceLock = new ReentrantLock();

  /** Maps deserializer identifiers to the compact byte index stored per entry. */
  private UniqueIndexMap<Integer> deserialiserMap = new UniqueIndexMap<Integer>();

  /** Bytes actually written into the IOEngine. */
  private final AtomicLong realCacheSize = new AtomicLong(0);
  /** On-heap bytes held by blocks still sitting in ramCache. */
  private final AtomicLong heapSize = new AtomicLong(0);
  /** Current number of cached blocks across both tiers. */
  private final AtomicLong blockNumber = new AtomicLong(0);

  /** Monotonic access clock used to order entries for eviction. */
  private final AtomicLong accessCount = new AtomicLong(0);

  /** Max milliseconds to wait for writer-queue space when caching with wait. */
  private static final int DEFAULT_CACHE_WAIT_TIME = 50;

  // If true, cacheBlock() waits briefly for queue space instead of dropping the block.
  boolean wait_when_cache = false;

  private final BucketCacheStats cacheStats = new BucketCacheStats();

  /** Where the backing map is persisted across restarts; may be null. */
  private final String persistencePath;
  private final long cacheCapacity;
  /** Approximate block size; used to size internal structures. */
  private final long blockSize;

  /** Duration (ms) of continuous IO errors tolerated before self-disabling. */
  private final int ioErrorsTolerationDuration;

  public static final int DEFAULT_ERROR_TOLERATION_DURATION = 60 * 1000;

  /** Start of the current IO-error streak; -1 while healthy. */
  private volatile long ioErrorStartTime = -1;

  /**
   * Striped read/write locks keyed by bucket offset. Readers hold the read lock
   * while copying bytes out; eviction takes the write lock so a bucket is never
   * read while it is being freed.
   */
  @VisibleForTesting
  final IdReadWriteLock offsetLock = new IdReadWriteLock();

  // Index of cached keys by HFile name so whole-file eviction is cheap.
  // Keys for the same file are ordered by block offset.
  private final ConcurrentIndex<String, BlockCacheKey> blocksByHFile =
      new ConcurrentIndex<String, BlockCacheKey>(new Comparator<BlockCacheKey>() {
        @Override
        public int compare(BlockCacheKey a, BlockCacheKey b) {
          if (a.getOffset() == b.getOffset()) {
            return 0;
          } else if (a.getOffset() < b.getOffset()) {
            return -1;
          }
          return 1;
        }
      });

  /** Daemon executor that periodically logs cache statistics. */
  private final ScheduledExecutorService scheduleThreadPool = Executors.newScheduledThreadPool(1,
      new ThreadFactoryBuilder().setNameFormat("BucketCacheStatsExecutor").setDaemon(true).build());

  /** Allocates and frees bucket slots within the IOEngine's space. */
  private BucketAllocator bucketAllocator;
205
  /**
   * Constructs a BucketCache using the default IO-error toleration duration
   * ({@link #DEFAULT_ERROR_TOLERATION_DURATION}); delegates to the full constructor.
   */
  public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
      int writerThreadNum, int writerQLen, String persistencePath) throws FileNotFoundException,
      IOException {
    this(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen,
        persistencePath, DEFAULT_ERROR_TOLERATION_DURATION);
  }
212
  /**
   * Constructs and starts a BucketCache: builds the IOEngine and allocator,
   * optionally restores persisted state, then starts writer threads and the
   * statistics logger.
   * @param ioEngineName engine spec: "file:&lt;path&gt;", "offheap" or "heap"
   * @param capacity cache capacity in bytes; capacity/blockSize must fit in an int
   * @param blockSize approximate block size, used for sizing internal structures
   * @param bucketSizes bucket sizes handed to the BucketAllocator
   * @param writerThreadNum number of threads draining blocks to the engine
   * @param writerQLen capacity of each writer's queue
   * @param persistencePath file used to persist the backing map; may be null
   * @param ioErrorsTolerationDuration ms of continuous IO errors tolerated before
   *          the cache disables itself
   */
  public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
      int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration)
      throws FileNotFoundException, IOException {
    this.ioEngine = getIOEngineFromName(ioEngineName, capacity);
    this.writerThreads = new WriterThread[writerThreadNum];
    long blockNumCapacity = capacity / blockSize;
    if (blockNumCapacity >= Integer.MAX_VALUE) {
      // Backing map is sized with an int; with the default block size this caps out near 32TB.
      throw new IllegalArgumentException("Cache capacity is too large, only support 32TB now");
    }

    this.cacheCapacity = capacity;
    this.persistencePath = persistencePath;
    this.blockSize = blockSize;
    this.ioErrorsTolerationDuration = ioErrorsTolerationDuration;

    bucketAllocator = new BucketAllocator(capacity, bucketSizes);
    for (int i = 0; i < writerThreads.length; ++i) {
      writerQueues.add(new ArrayBlockingQueue<RAMQueueEntry>(writerQLen));
    }

    assert writerQueues.size() == writerThreads.length;
    this.ramCache = new ConcurrentHashMap<BlockCacheKey, RAMQueueEntry>();

    this.backingMap = new ConcurrentHashMap<BlockCacheKey, BucketEntry>((int) blockNumCapacity);

    // Attempt to rebuild state from a previous run before enabling the cache.
    if (ioEngine.isPersistent() && persistencePath != null) {
      try {
        retrieveFromFile(bucketSizes);
      } catch (IOException ioex) {
        // Non-fatal: start with an empty cache.
        LOG.error("Can't restore from file because of", ioex);
      } catch (ClassNotFoundException cnfe) {
        LOG.error("Can't restore from file in rebuild because can't deserialise",cnfe);
        throw new RuntimeException(cnfe);
      }
    }
    final String threadName = Thread.currentThread().getName();
    this.cacheEnabled = true;
    for (int i = 0; i < writerThreads.length; ++i) {
      writerThreads[i] = new WriterThread(writerQueues.get(i));
      writerThreads[i].setName(threadName + "-BucketCacheWriter-" + i);
      writerThreads[i].setDaemon(true);
    }
    startWriterThreads();

    // Schedule the periodic statistics logger.
    this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this),
        statThreadPeriod, statThreadPeriod, TimeUnit.SECONDS);
    LOG.info("Started bucket cache; ioengine=" + ioEngineName +
        ", capacity=" + StringUtils.byteDesc(capacity) +
        ", blockSize=" + StringUtils.byteDesc(blockSize) + ", writerThreadNum=" +
        writerThreadNum + ", writerQLen=" + writerQLen + ", persistencePath=" +
        persistencePath + ", bucketAllocator=" + this.bucketAllocator.getClass().getName());
  }
269
270
271
272
273
274 @VisibleForTesting
275 protected void startWriterThreads() {
276 for (WriterThread thread : writerThreads) {
277 thread.start();
278 }
279 }
280
281 @VisibleForTesting
282 boolean isCacheEnabled() {
283 return this.cacheEnabled;
284 }
285
286 public long getMaxSize() {
287 return this.cacheCapacity;
288 }
289
290 public String getIoEngine() {
291 return ioEngine.toString();
292 }
293
294
295
296
297
298
299
300
301 private IOEngine getIOEngineFromName(String ioEngineName, long capacity)
302 throws IOException {
303 if (ioEngineName.startsWith("file:"))
304 return new FileIOEngine(ioEngineName.substring(5), capacity);
305 else if (ioEngineName.startsWith("offheap"))
306 return new ByteBufferIOEngine(capacity, true);
307 else if (ioEngineName.startsWith("heap"))
308 return new ByteBufferIOEngine(capacity, false);
309 else
310 throw new IllegalArgumentException(
311 "Don't understand io engine name for cache - prefix with file:, heap or offheap");
312 }
313
314
315
316
317
318
  /**
   * Caches a block with the default priority (not in-memory, no wait).
   * @param cacheKey block's cache key
   * @param buf block buffer
   */
  @Override
  public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
    cacheBlock(cacheKey, buf, false, false);
  }
323
324
325
326
327
328
329
330
  /**
   * Caches a block, honouring the in-memory flag.
   * @param cacheKey block's cache key
   * @param cachedItem block buffer
   * @param inMemory if block is in-memory
   * @param cacheDataInL1 ignored here; L1 placement is decided by the caller
   */
  @Override
  public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory,
      final boolean cacheDataInL1) {
    cacheBlockWithWait(cacheKey, cachedItem, inMemory, wait_when_cache);
  }
336
337
338
339
340
341
342
343
  /**
   * Caches the block into ramCache and hands it to a writer queue; writer
   * threads later flush it to the IOEngine and move it into the backing map.
   * @param cacheKey block's cache key
   * @param cachedItem block buffer
   * @param inMemory if block is in-memory
   * @param wait if true, block briefly for queue space instead of dropping
   */
  public void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory,
      boolean wait) {
    if (!cacheEnabled) {
      return;
    }

    // Already written to the IOEngine; nothing to do.
    if (backingMap.containsKey(cacheKey)) {
      return;
    }

    // Stash in ramCache first; putIfAbsent makes concurrent callers for the
    // same key idempotent.
    RAMQueueEntry re =
        new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(), inMemory);
    if (ramCache.putIfAbsent(cacheKey, re) != null) {
      return;
    }
    // Hash to a fixed writer queue so a given key always goes to the same writer.
    int queueNum = (cacheKey.hashCode() & 0x7FFFFFFF) % writerQueues.size();
    BlockingQueue<RAMQueueEntry> bq = writerQueues.get(queueNum);
    boolean successfulAddition = false;
    if (wait) {
      try {
        successfulAddition = bq.offer(re, DEFAULT_CACHE_WAIT_TIME, TimeUnit.MILLISECONDS);
      } catch (InterruptedException e) {
        // Preserve the interrupt; the failed offer is handled below.
        Thread.currentThread().interrupt();
      }
    } else {
      successfulAddition = bq.offer(re);
    }
    if (!successfulAddition) {
      // Queue full: undo the ramCache insert so we don't leak the entry.
      ramCache.remove(cacheKey);
      cacheStats.failInsert();
    } else {
      this.blockNumber.incrementAndGet();
      this.heapSize.addAndGet(cachedItem.heapSize());
      blocksByHFile.put(cacheKey.getHfileName(), cacheKey);
    }
  }
383
384
385
386
387
388
389
390
391
  /**
   * Gets the block for the specified key, checking ramCache first and then the
   * IOEngine-backed map.
   * @param key block's cache key
   * @param caching true if the caller will cache blocks on a miss
   * @param repeat whether this is a repeat lookup for the same block
   * @param updateCacheMetrics whether to update hit/miss metrics
   * @return the cached block, or null if not present
   */
  @Override
  public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat,
      boolean updateCacheMetrics) {
    if (!cacheEnabled) {
      return null;
    }
    // Fast path: the block may still be waiting in the RAM queue.
    RAMQueueEntry re = ramCache.get(key);
    if (re != null) {
      if (updateCacheMetrics) {
        cacheStats.hit(caching, key.isPrimary());
      }
      re.access(accessCount.incrementAndGet());
      return re.getData();
    }
    BucketEntry bucketEntry = backingMap.get(key);
    if (bucketEntry != null) {
      long start = System.nanoTime();
      ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset());
      try {
        lock.readLock().lock();
        // Re-check under the read lock: the entry may have been evicted and its
        // bucket re-allocated between the map lookup and lock acquisition, so we
        // only read if the map still holds this exact entry.
        if (bucketEntry.equals(backingMap.get(key))) {
          int len = bucketEntry.getLength();
          ByteBuffer bb = ByteBuffer.allocate(len);
          int lenRead = ioEngine.read(bb, bucketEntry.offset());
          if (lenRead != len) {
            throw new RuntimeException("Only " + lenRead + " bytes read, " + len + " expected");
          }
          CacheableDeserializer<Cacheable> deserializer =
              bucketEntry.deserializerReference(this.deserialiserMap);
          Cacheable cachedBlock = deserializer.deserialize(bb, true);
          long timeTaken = System.nanoTime() - start;
          if (updateCacheMetrics) {
            cacheStats.hit(caching, key.isPrimary());
            cacheStats.ioHit(timeTaken);
          }
          bucketEntry.access(accessCount.incrementAndGet());
          // A successful read ends any in-progress IO-error streak.
          if (this.ioErrorStartTime > 0) {
            ioErrorStartTime = -1;
          }
          return cachedBlock;
        }
      } catch (IOException ioex) {
        LOG.error("Failed reading block " + key + " from bucket cache", ioex);
        checkIOErrorIsTolerated();
      } finally {
        lock.readLock().unlock();
      }
    }
    if (!repeat && updateCacheMetrics) {
      cacheStats.miss(caching, key.isPrimary());
    }
    return null;
  }
448
  /**
   * Releases the bucket slot and index entries of an evicted block. Both call
   * sites (evictBlock, doDrain) hold the offset write lock when calling this.
   * @param cacheKey key of the evicted block
   * @param bucketEntry entry whose bucket is returned to the allocator
   * @param decrementBlockNumber false when the caller already decremented the count
   */
  @VisibleForTesting
  void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decrementBlockNumber) {
    bucketAllocator.freeBlock(bucketEntry.offset());
    realCacheSize.addAndGet(-1 * bucketEntry.getLength());
    blocksByHFile.remove(cacheKey.getHfileName(), cacheKey);
    if (decrementBlockNumber) {
      this.blockNumber.decrementAndGet();
    }
  }
458
  /**
   * Evicts the block for the given key from both the RAM cache and the backing map.
   * @param cacheKey key of the block to evict
   * @return true if a block was removed from either tier
   */
  @Override
  public boolean evictBlock(BlockCacheKey cacheKey) {
    if (!cacheEnabled) {
      return false;
    }
    // Pull from the RAM queue first, in case the block has not been flushed yet.
    RAMQueueEntry removedBlock = ramCache.remove(cacheKey);
    if (removedBlock != null) {
      this.blockNumber.decrementAndGet();
      this.heapSize.addAndGet(-1 * removedBlock.getData().heapSize());
    }
    BucketEntry bucketEntry = backingMap.get(cacheKey);
    if (bucketEntry == null) {
      if (removedBlock != null) {
        cacheStats.evicted(0, cacheKey.isPrimary());
        return true;
      } else {
        return false;
      }
    }
    // Take the offset write lock so no reader is copying from the bucket while we free it.
    ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset());
    try {
      lock.writeLock().lock();
      // remove(key, value) guards against racing with another eviction or a re-cache
      // that replaced the entry.
      if (backingMap.remove(cacheKey, bucketEntry)) {
        blockEvicted(cacheKey, bucketEntry, removedBlock == null);
      } else {
        return false;
      }
    } finally {
      lock.writeLock().unlock();
    }
    cacheStats.evicted(bucketEntry.getCachedTime(), cacheKey.isPrimary());
    return true;
  }
492
493
494
495
496 private static class StatisticsThread extends Thread {
497 private final BucketCache bucketCache;
498
499 public StatisticsThread(BucketCache bucketCache) {
500 super("BucketCacheStatsThread");
501 setDaemon(true);
502 this.bucketCache = bucketCache;
503 }
504
505 @Override
506 public void run() {
507 bucketCache.logStats();
508 }
509 }
510
  /**
   * Logs a one-line summary of allocator usage and hit/eviction statistics,
   * then resets the per-period counters.
   */
  public void logStats() {
    long totalSize = bucketAllocator.getTotalSize();
    long usedSize = bucketAllocator.getUsedSize();
    long freeSize = totalSize - usedSize;
    long cacheSize = getRealCacheSize();
    LOG.info("failedBlockAdditions=" + cacheStats.getFailedInserts() + ", " +
        "totalSize=" + StringUtils.byteDesc(totalSize) + ", " +
        "freeSize=" + StringUtils.byteDesc(freeSize) + ", " +
        "usedSize=" + StringUtils.byteDesc(usedSize) +", " +
        "cacheSize=" + StringUtils.byteDesc(cacheSize) +", " +
        "accesses=" + cacheStats.getRequestCount() + ", " +
        "hits=" + cacheStats.getHitCount() + ", " +
        "IOhitsPerSecond=" + cacheStats.getIOHitsPerSecond() + ", " +
        "IOTimePerHit=" + String.format("%.2f", cacheStats.getIOTimePerHit())+ ", " +
        "hitRatio=" + (cacheStats.getHitCount() == 0 ? "0," :
          (StringUtils.formatPercent(cacheStats.getHitRatio(), 2)+ ", ")) +
        "cachingAccesses=" + cacheStats.getRequestCachingCount() + ", " +
        "cachingHits=" + cacheStats.getHitCachingCount() + ", " +
        "cachingHitsRatio=" +(cacheStats.getHitCachingCount() == 0 ? "0," :
          (StringUtils.formatPercent(cacheStats.getHitCachingRatio(), 2)+ ", ")) +
        "evictions=" + cacheStats.getEvictionCount() + ", " +
        "evicted=" + cacheStats.getEvictedCount() + ", " +
        "evictedPerRun=" + cacheStats.evictedPerEviction());
    // Stats are windowed: clear the counters so the next period stands alone.
    cacheStats.reset();
  }
536
537 public long getRealCacheSize() {
538 return this.realCacheSize.get();
539 }
540
541 private long acceptableSize() {
542 return (long) Math.floor(bucketAllocator.getTotalSize() * DEFAULT_ACCEPT_FACTOR);
543 }
544
545 private long singleSize() {
546 return (long) Math.floor(bucketAllocator.getTotalSize()
547 * DEFAULT_SINGLE_FACTOR * DEFAULT_MIN_FACTOR);
548 }
549
550 private long multiSize() {
551 return (long) Math.floor(bucketAllocator.getTotalSize() * DEFAULT_MULTI_FACTOR
552 * DEFAULT_MIN_FACTOR);
553 }
554
555 private long memorySize() {
556 return (long) Math.floor(bucketAllocator.getTotalSize() * DEFAULT_MEMORY_FACTOR
557 * DEFAULT_MIN_FACTOR);
558 }
559
560
561
562
563
564
565
566 private void freeSpace(final String why) {
567
568 if (!freeSpaceLock.tryLock()) return;
569 try {
570 freeInProgress = true;
571 long bytesToFreeWithoutExtra = 0;
572
573 StringBuffer msgBuffer = LOG.isDebugEnabled()? new StringBuffer(): null;
574 BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics();
575 long[] bytesToFreeForBucket = new long[stats.length];
576 for (int i = 0; i < stats.length; i++) {
577 bytesToFreeForBucket[i] = 0;
578 long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - DEFAULT_MIN_FACTOR));
579 freeGoal = Math.max(freeGoal, 1);
580 if (stats[i].freeCount() < freeGoal) {
581 bytesToFreeForBucket[i] = stats[i].itemSize() * (freeGoal - stats[i].freeCount());
582 bytesToFreeWithoutExtra += bytesToFreeForBucket[i];
583 if (msgBuffer != null) {
584 msgBuffer.append("Free for bucketSize(" + stats[i].itemSize() + ")="
585 + StringUtils.byteDesc(bytesToFreeForBucket[i]) + ", ");
586 }
587 }
588 }
589 if (msgBuffer != null) {
590 msgBuffer.append("Free for total=" + StringUtils.byteDesc(bytesToFreeWithoutExtra) + ", ");
591 }
592
593 if (bytesToFreeWithoutExtra <= 0) {
594 return;
595 }
596 long currentSize = bucketAllocator.getUsedSize();
597 long totalSize=bucketAllocator.getTotalSize();
598 if (LOG.isDebugEnabled() && msgBuffer != null) {
599 LOG.debug("Free started because \"" + why + "\"; " + msgBuffer.toString() +
600 " of current used=" + StringUtils.byteDesc(currentSize) + ", actual cacheSize=" +
601 StringUtils.byteDesc(realCacheSize.get()) + ", total=" + StringUtils.byteDesc(totalSize));
602 }
603
604 long bytesToFreeWithExtra = (long) Math.floor(bytesToFreeWithoutExtra
605 * (1 + DEFAULT_EXTRA_FREE_FACTOR));
606
607
608 BucketEntryGroup bucketSingle = new BucketEntryGroup(bytesToFreeWithExtra,
609 blockSize, singleSize());
610 BucketEntryGroup bucketMulti = new BucketEntryGroup(bytesToFreeWithExtra,
611 blockSize, multiSize());
612 BucketEntryGroup bucketMemory = new BucketEntryGroup(bytesToFreeWithExtra,
613 blockSize, memorySize());
614
615
616
617 for (Map.Entry<BlockCacheKey, BucketEntry> bucketEntryWithKey : backingMap.entrySet()) {
618 switch (bucketEntryWithKey.getValue().getPriority()) {
619 case SINGLE: {
620 bucketSingle.add(bucketEntryWithKey);
621 break;
622 }
623 case MULTI: {
624 bucketMulti.add(bucketEntryWithKey);
625 break;
626 }
627 case MEMORY: {
628 bucketMemory.add(bucketEntryWithKey);
629 break;
630 }
631 }
632 }
633
634 PriorityQueue<BucketEntryGroup> bucketQueue = new PriorityQueue<BucketEntryGroup>(3);
635
636 bucketQueue.add(bucketSingle);
637 bucketQueue.add(bucketMulti);
638 bucketQueue.add(bucketMemory);
639
640 int remainingBuckets = 3;
641 long bytesFreed = 0;
642
643 BucketEntryGroup bucketGroup;
644 while ((bucketGroup = bucketQueue.poll()) != null) {
645 long overflow = bucketGroup.overflow();
646 if (overflow > 0) {
647 long bucketBytesToFree = Math.min(overflow,
648 (bytesToFreeWithoutExtra - bytesFreed) / remainingBuckets);
649 bytesFreed += bucketGroup.free(bucketBytesToFree);
650 }
651 remainingBuckets--;
652 }
653
654
655
656
657
658 stats = bucketAllocator.getIndexStatistics();
659 boolean needFreeForExtra = false;
660 for (int i = 0; i < stats.length; i++) {
661 long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - DEFAULT_MIN_FACTOR));
662 freeGoal = Math.max(freeGoal, 1);
663 if (stats[i].freeCount() < freeGoal) {
664 needFreeForExtra = true;
665 break;
666 }
667 }
668
669 if (needFreeForExtra) {
670 bucketQueue.clear();
671 remainingBuckets = 2;
672
673 bucketQueue.add(bucketSingle);
674 bucketQueue.add(bucketMulti);
675
676 while ((bucketGroup = bucketQueue.poll()) != null) {
677 long bucketBytesToFree = (bytesToFreeWithExtra - bytesFreed) / remainingBuckets;
678 bytesFreed += bucketGroup.free(bucketBytesToFree);
679 remainingBuckets--;
680 }
681 }
682
683 if (LOG.isDebugEnabled()) {
684 long single = bucketSingle.totalSize();
685 long multi = bucketMulti.totalSize();
686 long memory = bucketMemory.totalSize();
687 if (LOG.isDebugEnabled()) {
688 LOG.debug("Bucket cache free space completed; " + "freed="
689 + StringUtils.byteDesc(bytesFreed) + ", " + "total="
690 + StringUtils.byteDesc(totalSize) + ", " + "single="
691 + StringUtils.byteDesc(single) + ", " + "multi="
692 + StringUtils.byteDesc(multi) + ", " + "memory="
693 + StringUtils.byteDesc(memory));
694 }
695 }
696
697 } finally {
698 cacheStats.evict();
699 freeInProgress = false;
700 freeSpaceLock.unlock();
701 }
702 }
703
704
  // This handles flushing the RAM cache to the IOEngine.
  @VisibleForTesting
  class WriterThread extends HasThread {
    private final BlockingQueue<RAMQueueEntry> inputQueue;
    // Cleared only by disableWriter() in tests; run() exits when false.
    private volatile boolean writerEnabled = true;

    WriterThread(BlockingQueue<RAMQueueEntry> queue) {
      this.inputQueue = queue;
    }

    // Used for tests only: stops the drain loop without disabling the cache.
    @VisibleForTesting
    void disableWriter() {
      this.writerEnabled = false;
    }

    public void run() {
      List<RAMQueueEntry> entries = new ArrayList<RAMQueueEntry>();
      try {
        while (cacheEnabled && writerEnabled) {
          try {
            try {
              // Blocks until at least one entry is available.
              entries = getRAMQueueEntries(inputQueue, entries);
            } catch (InterruptedException ie) {
              // disableCache() interrupts us to force shutdown.
              if (!cacheEnabled) break;
            }
            doDrain(entries);
          } catch (Exception ioe) {
            // Keep the writer alive; a single bad batch must not kill the thread.
            LOG.error("WriterThread encountered error", ioe);
          }
        }
      } catch (Throwable t) {
        LOG.warn("Failed doing drain", t);
      }
      LOG.info(this.getName() + " exiting, cacheEnabled=" + cacheEnabled);
    }

    /**
     * Flush the entries in ramCache to the IOEngine and add bucket entries to
     * backingMap. Processes everything passed in, making sure each entry is
     * removed from ramCache whether or not the write succeeded — otherwise
     * ramCache grows without bound.
     * @param entries list owned by this invocation only; no interference expected
     * @throws InterruptedException if interrupted while waiting for free space
     */
    @VisibleForTesting
    void doDrain(final List<RAMQueueEntry> entries) throws InterruptedException {
      if (entries.isEmpty()) {
        return;
      }
      // Phase 1: write each entry to the IOEngine, recording the resulting
      // BucketEntry (or null on permanent failure) at the same index. Phase 2
      // below publishes successes into backingMap and always clears ramCache.
      final int size = entries.size();
      BucketEntry[] bucketEntries = new BucketEntry[size];
      // The index advances on success or permanent failure; on CacheFullException
      // or transient IOException we retry the same entry without advancing.
      int index = 0;
      while (cacheEnabled && index < size) {
        RAMQueueEntry re = null;
        try {
          re = entries.get(index);
          if (re == null) {
            LOG.warn("Couldn't get entry or changed on us; who else is messing with it?");
            index++;
            continue;
          }
          BucketEntry bucketEntry =
              re.writeToCache(ioEngine, bucketAllocator, deserialiserMap, realCacheSize);
          // Successful write clears any in-progress IO-error streak.
          bucketEntries[index] = bucketEntry;
          if (ioErrorStartTime > 0) {
            ioErrorStartTime = -1;
          }
          index++;
        } catch (BucketAllocatorException fle) {
          LOG.warn("Failed allocation for " + (re == null ? "" : re.getKey()) + "; " + fle);
          // Permanent for this entry (e.g. too big). Leave null; cleaned up below.
          bucketEntries[index] = null;
          index++;
        } catch (CacheFullException cfe) {
          // Cache full: trigger (or wait out) a free-space pass, then retry this entry.
          if (!freeInProgress) {
            freeSpace("Full!");
          } else {
            Thread.sleep(50);
          }
        } catch (IOException ioex) {
          // Hopefully transient; retry. checkIOErrorIsTolerated disables the
          // cache if errors persist too long.
          LOG.error("Failed writing to bucket cache", ioex);
          checkIOErrorIsTolerated();
        }
      }

      // Make sure the data pages are on media before publishing into backingMap.
      try {
        ioEngine.sync();
      } catch (IOException ioex) {
        LOG.error("Failed syncing IO engine", ioex);
        checkIOErrorIsTolerated();
        // Sync failed: give the buckets back and treat every write as failed.
        for (int i = 0; i < entries.size(); ++i) {
          if (bucketEntries[i] != null) {
            bucketAllocator.freeBlock(bucketEntries[i].offset());
            bucketEntries[i] = null;
          }
        }
      }

      // Phase 2: publish successes into backingMap; remove every entry from
      // ramCache regardless of outcome.
      for (int i = 0; i < size; ++i) {
        BlockCacheKey key = entries.get(i).getKey();
        if (bucketEntries[i] != null) {
          backingMap.put(key, bucketEntries[i]);
        }
        RAMQueueEntry ramCacheEntry = ramCache.remove(key);
        if (ramCacheEntry != null) {
          heapSize.addAndGet(-1 * entries.get(i).getData().heapSize());
        } else if (bucketEntries[i] != null){
          // The block was evicted while we were writing it: undo our publish
          // and free the bucket, under the offset write lock.
          ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntries[i].offset());
          try {
            lock.writeLock().lock();
            if (backingMap.remove(key, bucketEntries[i])) {
              blockEvicted(key, bucketEntries[i], false);
            }
          } finally {
            lock.writeLock().unlock();
          }
        }
      }

      long used = bucketAllocator.getUsedSize();
      if (used > acceptableSize()) {
        freeSpace("Used=" + used + " > acceptable=" + acceptableSize());
      }
      return;
    }
  }
850
851
852
853
854
855
856
857
858
  /**
   * Blocks until at least one entry arrives on the queue, then drains whatever
   * else is immediately available into the passed-in list.
   * @param q writer queue to take from
   * @param receptical reused list; cleared before filling to avoid reallocation
   * @return the same list, holding at least one entry
   * @throws InterruptedException if interrupted while waiting on the queue
   */
  @VisibleForTesting
  static List<RAMQueueEntry> getRAMQueueEntries(final BlockingQueue<RAMQueueEntry> q,
      final List<RAMQueueEntry> receptical)
  throws InterruptedException {
    receptical.clear();
    // take() blocks for the first entry; drainTo() grabs the rest without blocking.
    receptical.add(q.take());
    q.drainTo(receptical);
    return receptical;
  }
870
871 private void persistToFile() throws IOException {
872 assert !cacheEnabled;
873 FileOutputStream fos = null;
874 ObjectOutputStream oos = null;
875 try {
876 if (!ioEngine.isPersistent())
877 throw new IOException(
878 "Attempt to persist non-persistent cache mappings!");
879 fos = new FileOutputStream(persistencePath, false);
880 oos = new ObjectOutputStream(fos);
881 oos.writeLong(cacheCapacity);
882 oos.writeUTF(ioEngine.getClass().getName());
883 oos.writeUTF(backingMap.getClass().getName());
884 oos.writeObject(deserialiserMap);
885 oos.writeObject(backingMap);
886 } finally {
887 if (oos != null) oos.close();
888 if (fos != null) fos.close();
889 }
890 }
891
892 @SuppressWarnings("unchecked")
893 private void retrieveFromFile(int[] bucketSizes) throws IOException, BucketAllocatorException,
894 ClassNotFoundException {
895 File persistenceFile = new File(persistencePath);
896 if (!persistenceFile.exists()) {
897 return;
898 }
899 assert !cacheEnabled;
900 FileInputStream fis = null;
901 ObjectInputStream ois = null;
902 try {
903 if (!ioEngine.isPersistent())
904 throw new IOException(
905 "Attempt to restore non-persistent cache mappings!");
906 fis = new FileInputStream(persistencePath);
907 ois = new ObjectInputStream(fis);
908 long capacitySize = ois.readLong();
909 if (capacitySize != cacheCapacity)
910 throw new IOException("Mismatched cache capacity:"
911 + StringUtils.byteDesc(capacitySize) + ", expected: "
912 + StringUtils.byteDesc(cacheCapacity));
913 String ioclass = ois.readUTF();
914 String mapclass = ois.readUTF();
915 if (!ioEngine.getClass().getName().equals(ioclass))
916 throw new IOException("Class name for IO engine mismatch: " + ioclass
917 + ", expected:" + ioEngine.getClass().getName());
918 if (!backingMap.getClass().getName().equals(mapclass))
919 throw new IOException("Class name for cache map mismatch: " + mapclass
920 + ", expected:" + backingMap.getClass().getName());
921 UniqueIndexMap<Integer> deserMap = (UniqueIndexMap<Integer>) ois
922 .readObject();
923 BucketAllocator allocator = new BucketAllocator(cacheCapacity, bucketSizes,
924 backingMap, realCacheSize);
925 backingMap = (ConcurrentHashMap<BlockCacheKey, BucketEntry>) ois
926 .readObject();
927 bucketAllocator = allocator;
928 deserialiserMap = deserMap;
929 } finally {
930 if (ois != null) ois.close();
931 if (fis != null) fis.close();
932 if (!persistenceFile.delete()) {
933 throw new IOException("Failed deleting persistence file "
934 + persistenceFile.getAbsolutePath());
935 }
936 }
937 }
938
939
940
941
942
943
944 private void checkIOErrorIsTolerated() {
945 long now = EnvironmentEdgeManager.currentTime();
946 if (this.ioErrorStartTime > 0) {
947 if (cacheEnabled && (now - ioErrorStartTime) > this.ioErrorsTolerationDuration) {
948 LOG.error("IO errors duration time has exceeded " + ioErrorsTolerationDuration +
949 "ms, disabing cache, please check your IOEngine");
950 disableCache();
951 }
952 } else {
953 this.ioErrorStartTime = now;
954 }
955 }
956
957
958
959
960
  /**
   * Used to shut down the cache -or- turn it off in the case of something broken:
   * stops accepting work, shuts down the engine and stats pool, and interrupts
   * the writer threads.
   */
  private void disableCache() {
    if (!cacheEnabled)
      return;
    cacheEnabled = false;
    ioEngine.shutdown();
    this.scheduleThreadPool.shutdown();
    // Writers block on their queues; the interrupt wakes them so run() can exit.
    for (int i = 0; i < writerThreads.length; ++i)
      writerThreads[i].interrupt();
    this.ramCache.clear();
    // Keep the backing map when persistent so shutdown() can write it to disk.
    if (!ioEngine.isPersistent() || persistencePath == null) {
      this.backingMap.clear();
    }
  }
974
975 private void join() throws InterruptedException {
976 for (int i = 0; i < writerThreads.length; ++i)
977 writerThreads[i].join();
978 }
979
  /**
   * Shuts the cache down. If the IOEngine is persistent and a persistence path
   * is configured, waits for the writer threads and then writes the backing map
   * to disk for the next start-up.
   */
  @Override
  public void shutdown() {
    disableCache();
    LOG.info("Shutdown bucket cache: IO persistent=" + ioEngine.isPersistent()
        + "; path to write=" + persistencePath);
    if (ioEngine.isPersistent() && persistencePath != null) {
      try {
        join();
        persistToFile();
      } catch (IOException ex) {
        LOG.error("Unable to persist data on exit: " + ex.toString(), ex);
      } catch (InterruptedException e) {
        LOG.warn("Failed to persist data on exit", e);
      }
    }
  }
996
997 @Override
998 public CacheStats getStats() {
999 return cacheStats;
1000 }
1001
1002 public BucketAllocator getAllocator() {
1003 return this.bucketAllocator;
1004 }
1005
1006 @Override
1007 public long heapSize() {
1008 return this.heapSize.get();
1009 }
1010
1011 @Override
1012 public long size() {
1013 return this.realCacheSize.get();
1014 }
1015
1016 @Override
1017 public long getFreeSize() {
1018 return this.bucketAllocator.getFreeSize();
1019 }
1020
1021 @Override
1022 public long getBlockCount() {
1023 return this.blockNumber.get();
1024 }
1025
1026 @Override
1027 public long getCurrentSize() {
1028 return this.bucketAllocator.getUsedSize();
1029 }
1030
1031
1032
1033
1034
1035
1036
1037
1038 @Override
1039 public int evictBlocksByHfileName(String hfileName) {
1040
1041
1042 Set<BlockCacheKey> keySet = blocksByHFile.values(hfileName);
1043 if (keySet == null) {
1044 return 0;
1045 }
1046 int numEvicted = 0;
1047 List<BlockCacheKey> keysForHFile = ImmutableList.copyOf(keySet);
1048 for (BlockCacheKey key : keysForHFile) {
1049 if (evictBlock(key)) {
1050 ++numEvicted;
1051 }
1052 }
1053
1054 return numEvicted;
1055 }
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065 static class BucketEntry implements Serializable {
1066 private static final long serialVersionUID = -6741504807982257534L;
1067
1068
1069 static final Comparator<BucketEntry> COMPARATOR = new Comparator<BucketCache.BucketEntry>() {
1070
1071 @Override
1072 public int compare(BucketEntry o1, BucketEntry o2) {
1073 long accessCounter1 = o1.accessCounter;
1074 long accessCounter2 = o2.accessCounter;
1075 return accessCounter1 < accessCounter2 ? 1 : accessCounter1 == accessCounter2 ? 0 : -1;
1076 }
1077 };
1078
1079 private int offsetBase;
1080 private int length;
1081 private byte offset1;
1082 byte deserialiserIndex;
1083 private volatile long accessCounter;
1084 private BlockPriority priority;
1085
1086
1087
1088 private final long cachedTime = System.nanoTime();
1089
1090 BucketEntry(long offset, int length, long accessCounter, boolean inMemory) {
1091 setOffset(offset);
1092 this.length = length;
1093 this.accessCounter = accessCounter;
1094 if (inMemory) {
1095 this.priority = BlockPriority.MEMORY;
1096 } else {
1097 this.priority = BlockPriority.SINGLE;
1098 }
1099 }
1100
1101 long offset() {
1102 long o = ((long) offsetBase) & 0xFFFFFFFF;
1103 o += (((long) (offset1)) & 0xFF) << 32;
1104 return o << 8;
1105 }
1106
1107 private void setOffset(long value) {
1108 assert (value & 0xFF) == 0;
1109 value >>= 8;
1110 offsetBase = (int) value;
1111 offset1 = (byte) (value >> 32);
1112 }
1113
1114 public int getLength() {
1115 return length;
1116 }
1117
1118 protected CacheableDeserializer<Cacheable> deserializerReference(
1119 UniqueIndexMap<Integer> deserialiserMap) {
1120 return CacheableDeserializerIdManager.getDeserializer(deserialiserMap
1121 .unmap(deserialiserIndex));
1122 }
1123
1124 protected void setDeserialiserReference(
1125 CacheableDeserializer<Cacheable> deserializer,
1126 UniqueIndexMap<Integer> deserialiserMap) {
1127 this.deserialiserIndex = ((byte) deserialiserMap.map(deserializer
1128 .getDeserialiserIdentifier()));
1129 }
1130
1131
1132
1133
1134 public void access(long accessCounter) {
1135 this.accessCounter = accessCounter;
1136 if (this.priority == BlockPriority.SINGLE) {
1137 this.priority = BlockPriority.MULTI;
1138 }
1139 }
1140
1141 public BlockPriority getPriority() {
1142 return this.priority;
1143 }
1144
1145 public long getCachedTime() {
1146 return cachedTime;
1147 }
1148 }
1149
1150
1151
1152
1153
1154
1155
1156 private class BucketEntryGroup implements Comparable<BucketEntryGroup> {
1157
1158 private CachedEntryQueue queue;
1159 private long totalSize = 0;
1160 private long bucketSize;
1161
1162 public BucketEntryGroup(long bytesToFree, long blockSize, long bucketSize) {
1163 this.bucketSize = bucketSize;
1164 queue = new CachedEntryQueue(bytesToFree, blockSize);
1165 totalSize = 0;
1166 }
1167
1168 public void add(Map.Entry<BlockCacheKey, BucketEntry> block) {
1169 totalSize += block.getValue().getLength();
1170 queue.add(block);
1171 }
1172
1173 public long free(long toFree) {
1174 Map.Entry<BlockCacheKey, BucketEntry> entry;
1175 long freedBytes = 0;
1176 while ((entry = queue.pollLast()) != null) {
1177 evictBlock(entry.getKey());
1178 freedBytes += entry.getValue().getLength();
1179 if (freedBytes >= toFree) {
1180 return freedBytes;
1181 }
1182 }
1183 return freedBytes;
1184 }
1185
1186 public long overflow() {
1187 return totalSize - bucketSize;
1188 }
1189
1190 public long totalSize() {
1191 return totalSize;
1192 }
1193
1194 @Override
1195 public int compareTo(BucketEntryGroup that) {
1196 if (this.overflow() == that.overflow())
1197 return 0;
1198 return this.overflow() > that.overflow() ? 1 : -1;
1199 }
1200
1201 @Override
1202 public boolean equals(Object that) {
1203 return this == that;
1204 }
1205
1206 }
1207
1208
1209
1210
@VisibleForTesting
// A block sitting in the RAM queue waiting to be flushed to the IOEngine
// by a writer thread, plus the bookkeeping needed to place it in a bucket.
static class RAMQueueEntry {
  private BlockCacheKey key;
  private Cacheable data;
  // Access-clock value carried over into the BucketEntry on write-out.
  private long accessCounter;
  // Whether the block belongs to an in-memory column family (drives the
  // BucketEntry's initial MEMORY priority).
  private boolean inMemory;

  public RAMQueueEntry(BlockCacheKey bck, Cacheable data, long accessCounter,
      boolean inMemory) {
    this.key = bck;
    this.data = data;
    this.accessCounter = accessCounter;
    this.inMemory = inMemory;
  }

  public Cacheable getData() {
    return data;
  }

  public BlockCacheKey getKey() {
    return key;
  }

  // Refreshes the access clock so the entry keeps its recency while queued.
  public void access(long accessCounter) {
    this.accessCounter = accessCounter;
  }

  /**
   * Serializes the block into a freshly allocated bucket slot.
   *
   * HFileBlocks are written in two pieces: the (possibly header-prefixed)
   * block buffer first, then the extra serialization info appended at the
   * tail of the slot. Other Cacheables are serialized into one buffer. If
   * the IOEngine write fails, the allocated slot is released before the
   * IOException is rethrown, so no bucket space leaks.
   *
   * @return the new BucketEntry describing where the block landed, or
   *     null when the block serializes to zero bytes
   */
  public BucketEntry writeToCache(final IOEngine ioEngine,
      final BucketAllocator bucketAllocator,
      final UniqueIndexMap<Integer> deserialiserMap,
      final AtomicLong realCacheSize) throws CacheFullException, IOException,
      BucketAllocatorException {
    int len = data.getSerializedLength();
    // This cacheable thing can't be serialized; nothing to write.
    if (len == 0) return null;
    long offset = bucketAllocator.allocateBlock(len);
    BucketEntry bucketEntry = new BucketEntry(offset, len, accessCounter, inMemory);
    bucketEntry.setDeserialiserReference(data.getDeserializer(), deserialiserMap);
    try {
      if (data instanceof HFileBlock) {
        HFileBlock block = (HFileBlock) data;
        ByteBuffer sliceBuf = block.getBufferReadOnlyWithHeader();
        sliceBuf.rewind();
        // len covers the buffer plus EXTRA_SERIALIZATION_SPACE, with or
        // without the block header depending on how the buffer was built.
        assert len == sliceBuf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE ||
          len == sliceBuf.limit() + block.headerSize() + HFileBlock.EXTRA_SERIALIZATION_SPACE;
        ByteBuffer extraInfoBuffer = ByteBuffer.allocate(HFileBlock.EXTRA_SERIALIZATION_SPACE);
        block.serializeExtraInfo(extraInfoBuffer);
        // Block bytes first, extra info at the tail of the slot.
        ioEngine.write(sliceBuf, offset);
        ioEngine.write(extraInfoBuffer, offset + len - HFileBlock.EXTRA_SERIALIZATION_SPACE);
      } else {
        ByteBuffer bb = ByteBuffer.allocate(len);
        data.serialize(bb);
        ioEngine.write(bb, offset);
      }
    } catch (IOException ioe) {
      // Write failed: release the slot so bucket space is not leaked.
      bucketAllocator.freeBlock(offset);
      throw ioe;
    }

    realCacheSize.addAndGet(len);
    return bucketEntry;
  }
}
1275
1276
1277
1278
1279
1280 void stopWriterThreads() throws InterruptedException {
1281 for (WriterThread writerThread : writerThreads) {
1282 writerThread.disableWriter();
1283 writerThread.interrupt();
1284 writerThread.join();
1285 }
1286 }
1287
1288 @Override
1289 public Iterator<CachedBlock> iterator() {
1290
1291 final Iterator<Map.Entry<BlockCacheKey, BucketEntry>> i =
1292 this.backingMap.entrySet().iterator();
1293 return new Iterator<CachedBlock>() {
1294 private final long now = System.nanoTime();
1295
1296 @Override
1297 public boolean hasNext() {
1298 return i.hasNext();
1299 }
1300
1301 @Override
1302 public CachedBlock next() {
1303 final Map.Entry<BlockCacheKey, BucketEntry> e = i.next();
1304 return new CachedBlock() {
1305 @Override
1306 public String toString() {
1307 return BlockCacheUtil.toString(this, now);
1308 }
1309
1310 @Override
1311 public BlockPriority getBlockPriority() {
1312 return e.getValue().getPriority();
1313 }
1314
1315 @Override
1316 public BlockType getBlockType() {
1317
1318 return null;
1319 }
1320
1321 @Override
1322 public long getOffset() {
1323 return e.getKey().getOffset();
1324 }
1325
1326 @Override
1327 public long getSize() {
1328 return e.getValue().getLength();
1329 }
1330
1331 @Override
1332 public long getCachedTime() {
1333 return e.getValue().getCachedTime();
1334 }
1335
1336 @Override
1337 public String getFilename() {
1338 return e.getKey().getHfileName();
1339 }
1340
1341 @Override
1342 public int compareTo(CachedBlock other) {
1343 int diff = this.getFilename().compareTo(other.getFilename());
1344 if (diff != 0) return diff;
1345 diff = (int)(this.getOffset() - other.getOffset());
1346 if (diff != 0) return diff;
1347 if (other.getCachedTime() < 0 || this.getCachedTime() < 0) {
1348 throw new IllegalStateException("" + this.getCachedTime() + ", " +
1349 other.getCachedTime());
1350 }
1351 return (int)(other.getCachedTime() - this.getCachedTime());
1352 }
1353
1354 @Override
1355 public int hashCode() {
1356 return e.getKey().hashCode();
1357 }
1358
1359 @Override
1360 public boolean equals(Object obj) {
1361 if (obj instanceof CachedBlock) {
1362 CachedBlock cb = (CachedBlock)obj;
1363 return compareTo(cb) == 0;
1364 } else {
1365 return false;
1366 }
1367 }
1368 };
1369 }
1370
1371 @Override
1372 public void remove() {
1373 throw new UnsupportedOperationException();
1374 }
1375 };
1376 }
1377
@Override
public BlockCache[] getBlockCaches() {
  // BucketCache is a single-level cache; there are no nested caches to
  // expose, so the contract's "no inner caches" value (null) is returned.
  return null;
}
1382 }