/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.HFile.CachingBlockReader;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.util.StringUtils;

/**
 * Provides functionality to write ({@link BlockIndexWriter}) and read
 * ({@link BlockIndexReader}) single-level and multi-level block indexes.
 *
 * Examples of how to use the block index writer can be found in
 * {@link org.apache.hadoop.hbase.util.CompoundBloomFilterWriter} and
 * {@link HFileWriterV2}. Examples of how to use the reader can be found in
 * {@link HFileReaderV2} and TestHFileBlockIndex.
 */
@InterfaceAudience.Private
public class HFileBlockIndex {

  private static final Log LOG = LogFactory.getLog(HFileBlockIndex.class);

  static final int DEFAULT_MAX_CHUNK_SIZE = 128 * 1024;

  /**
   * The maximum size guideline for index blocks (both leaf, intermediate, and
   * root). If not specified, {@link #DEFAULT_MAX_CHUNK_SIZE} is used.
   */
  public static final String MAX_CHUNK_SIZE_KEY = "hfile.index.block.max.size";

  /**
   * The number of bytes stored in each "secondary index" entry in addition to
   * key bytes in the non-root index block format. The first long is the file
   * offset of the deeper-level block the entry points to, and the int that
   * follows is that block's on-disk size.
   */
  static final int SECONDARY_INDEX_ENTRY_OVERHEAD = Bytes.SIZEOF_INT
      + Bytes.SIZEOF_LONG;

  /** Error message when trying to use inline block API in single-level mode */
  private static final String INLINE_BLOCKS_NOT_ALLOWED =
      "Inline blocks are not allowed in the single-level-only mode";

  /**
   * The size of a meta-data record used for finding the mid-key in a
   * multi-level index. Consists of the middle leaf-level index block offset
   * (long), its on-disk size without header included (int), and the mid-key
   * entry's zero-based index within that leaf-level block (int).
   */
  private static final int MID_KEY_METADATA_SIZE = Bytes.SIZEOF_LONG +
      2 * Bytes.SIZEOF_INT;
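
  /*
   * For reference, a summary of the non-root (leaf or intermediate) index
   * block body that the reader and writer below agree on. All offsets are
   * relative to the start of the block body (header excluded):
   *
   *   int                  numEntries
   *   int[numEntries + 1]  "secondary index": the relative offset of each
   *                        entry, followed by the total size of all entries
   *   then, for each entry:
   *     long   file offset of the referenced (deeper-level or data) block
   *     int    on-disk size of that block
   *     byte[] the entry's first key (not length-prefixed; lengths are
   *            recovered by subtracting consecutive secondary index offsets)
   */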

  /**
   * The reader will always hold the root-level index in memory. Index blocks
   * at all other levels will be cached in the LRU cache in practice, although
   * this API does not enforce that.
   *
   * All non-root (leaf and intermediate) index blocks contain what we call a
   * "secondary index": an array of offsets to the entries within the block.
   * This allows us to do a binary search for the entry corresponding to the
   * given key without having to deserialize the block.
   */
  public static class BlockIndexReader implements HeapSize {
    /** Needed doing lookup on blocks. */
    private final KVComparator comparator;

    // Root-level data.
    private byte[][] blockKeys;
    private long[] blockOffsets;
    private int[] blockDataSizes;
    private int rootCount = 0;

    // Mid-key metadata.
    private long midLeafBlockOffset = -1;
    private int midLeafBlockOnDiskSize = -1;
    private int midKeyEntry = -1;

    /** Pre-computed mid-key */
    private AtomicReference<byte[]> midKey = new AtomicReference<byte[]>();

    /**
     * The number of levels in the block index tree. One if there is only the
     * root level, two for root and leaf levels, etc.
     */
    private int searchTreeLevel;

    /** A way to read {@link HFile} blocks at a given offset. */
    private CachingBlockReader cachingBlockReader;

    public BlockIndexReader(final KVComparator c, final int treeLevel,
        final CachingBlockReader cachingBlockReader) {
      this(c, treeLevel);
      this.cachingBlockReader = cachingBlockReader;
    }

    public BlockIndexReader(final KVComparator c, final int treeLevel) {
      comparator = c;
      searchTreeLevel = treeLevel;
    }

    /**
     * @return true if the block index is empty.
     */
    public boolean isEmpty() {
      return blockKeys.length == 0;
    }

    /**
     * Verifies that the block index is non-empty and throws an
     * {@link IllegalStateException} otherwise.
     */
    public void ensureNonEmpty() {
      if (blockKeys.length == 0) {
        throw new IllegalStateException("Block index is empty or not loaded");
      }
    }

    /**
     * Return the data block which contains this key. This function will only
     * be called when the HFile version is larger than 1.
     *
     * @param key the key we are looking for
     * @param currentBlock the current block, to avoid re-reading the same block
     * @param cacheBlocks whether to cache the blocks read along the way
     * @param pread whether to use positional read
     * @param isCompaction whether the read is part of a compaction
     * @param expectedDataBlockEncoding the data block encoding the caller is
     *          expecting the data block to be in, or null to not perform this
     *          check and return the block irrespective of the encoding
     * @return the data block containing the given key, or null if the key is
     *         before the first key of the file
     * @throws IOException
     */
    public HFileBlock seekToDataBlock(final Cell key, HFileBlock currentBlock, boolean cacheBlocks,
        boolean pread, boolean isCompaction, DataBlockEncoding expectedDataBlockEncoding)
        throws IOException {
      BlockWithScanInfo blockWithScanInfo = loadDataBlockWithScanInfo(key, currentBlock,
          cacheBlocks, pread, isCompaction, expectedDataBlockEncoding);
      if (blockWithScanInfo == null) {
        return null;
      } else {
        return blockWithScanInfo.getHFileBlock();
      }
    }

    /**
     * Return the BlockWithScanInfo, which contains the data block together
     * with other scan info such as the next indexed key. This function will
     * only be called when the HFile version is larger than 1.
     *
     * @param key the key we are looking for
     * @param currentBlock the current block, to avoid re-reading the same block
     * @param cacheBlocks whether to cache the blocks read along the way
     * @param pread whether to use positional read
     * @param isCompaction whether the read is part of a compaction
     * @param expectedDataBlockEncoding the data block encoding the caller is
     *          expecting the data block to be in, or null to not perform this
     *          check and return the block irrespective of the encoding
     * @return the BlockWithScanInfo, or null if the key is before the first
     *         key of the file
     * @throws IOException
     */
    public BlockWithScanInfo loadDataBlockWithScanInfo(Cell key, HFileBlock currentBlock,
        boolean cacheBlocks, boolean pread, boolean isCompaction,
        DataBlockEncoding expectedDataBlockEncoding) throws IOException {
      int rootLevelIndex = rootBlockContainingKey(key);
      if (rootLevelIndex < 0 || rootLevelIndex >= blockOffsets.length) {
        return null;
      }

      // The next indexed key we know for certain: the first key of the next
      // block at the root level, if there is one.
      Cell nextIndexedKey = null;

      // Read the next-level (intermediate or leaf) index block.
      long currentOffset = blockOffsets[rootLevelIndex];
      int currentOnDiskSize = blockDataSizes[rootLevelIndex];

      if (rootLevelIndex < blockKeys.length - 1) {
        nextIndexedKey = new KeyValue.KeyOnlyKeyValue(blockKeys[rootLevelIndex + 1]);
      } else {
        nextIndexedKey = KeyValueScanner.NO_NEXT_INDEXED_KEY;
      }

      int lookupLevel = 1; // How many levels deep we are in our lookup.
      int index = -1;

      HFileBlock block;
      while (true) {
        if (currentBlock != null && currentBlock.getOffset() == currentOffset) {
          // Avoid reading the same block again, even with caching turned off.
          // This is crucial for compaction-type workloads which might have
          // caching turned off. This also saves on looking up in the block
          // cache.
          block = currentBlock;
        } else {
          // Call HFile's caching block reader API. We always cache index
          // blocks, otherwise we might get terrible performance.
          boolean shouldCache = cacheBlocks || (lookupLevel < searchTreeLevel);
          BlockType expectedBlockType;
          if (lookupLevel < searchTreeLevel - 1) {
            expectedBlockType = BlockType.INTERMEDIATE_INDEX;
          } else if (lookupLevel == searchTreeLevel - 1) {
            expectedBlockType = BlockType.LEAF_INDEX;
          } else {
            // this also accounts for ENCODED_DATA
            expectedBlockType = BlockType.DATA;
          }
          block = cachingBlockReader.readBlock(currentOffset,
              currentOnDiskSize, shouldCache, pread, isCompaction, true,
              expectedBlockType, expectedDataBlockEncoding);
        }

        if (block == null) {
          throw new IOException("Failed to read block at offset " +
              currentOffset + ", onDiskSize=" + currentOnDiskSize);
        }

        // Found a data block, break the loop and check our level in the tree.
        if (block.getBlockType().isData()) {
          break;
        }

        // Not a data block. This must be a leaf-level or intermediate-level
        // index block. We don't allow going deeper than searchTreeLevel.
        if (++lookupLevel > searchTreeLevel) {
          throw new IOException("Search Tree Level overflow: lookupLevel=" +
              lookupLevel + ", searchTreeLevel=" + searchTreeLevel);
        }

        // Locate the entry corresponding to the given key in the non-root
        // (leaf or intermediate) index block.
        ByteBuffer buffer = block.getBufferWithoutHeader();
        index = locateNonRootIndexEntry(buffer, key, comparator);
        if (index == -1) {
          // The key is before the first key of this non-root index block.
          KeyValue kv = KeyValueUtil.ensureKeyValue(key);
          throw new IOException("The key "
              + Bytes.toStringBinary(kv.getKey(), kv.getKeyOffset(), kv.getKeyLength())
              + " is before the first key of the non-root index block " + block);
        }

        currentOffset = buffer.getLong();
        currentOnDiskSize = buffer.getInt();

        // Only update the next indexed key if there is one at this level.
        byte[] tmpNextIndexedKey = getNonRootIndexedKey(buffer, index + 1);
        if (tmpNextIndexedKey != null) {
          nextIndexedKey = new KeyValue.KeyOnlyKeyValue(tmpNextIndexedKey);
        }
      }

      if (lookupLevel != searchTreeLevel) {
        throw new IOException("Reached a data block at level " + lookupLevel +
            " but the number of levels is " + searchTreeLevel);
      }

      // Pass the next indexed key along with the data block.
      return new BlockWithScanInfo(block, nextIndexedKey);
    }
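
    /*
     * A minimal usage sketch for the method above (assuming an index loaded
     * via readMultiLevelIndexRoot or readRootIndex, and a CachingBlockReader
     * supplied at construction time):
     *
     *   BlockWithScanInfo info = indexReader.loadDataBlockWithScanInfo(
     *       key, null, true, true, false, null);
     *   if (info != null) {
     *     HFileBlock dataBlock = info.getHFileBlock();
     *     // info also carries the next indexed key for scan optimization
     *   }
     */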

    /**
     * An approximation to the {@link HFile}'s mid-key. Operates on block
     * boundaries, and does not go inside blocks. In other words, returns the
     * first key of the middle block of the file.
     *
     * @return the first key of the middle block
     */
    public byte[] midkey() throws IOException {
      if (rootCount == 0)
        throw new IOException("HFile empty");

      byte[] targetMidKey = this.midKey.get();
      if (targetMidKey != null) {
        return targetMidKey;
      }

      if (midLeafBlockOffset >= 0) {
        if (cachingBlockReader == null) {
          throw new IOException("Have to read the middle leaf block but " +
              "no block reader available");
        }

        // Caching, using pread, assuming this is not a compaction.
        HFileBlock midLeafBlock = cachingBlockReader.readBlock(
            midLeafBlockOffset, midLeafBlockOnDiskSize, true, true, false, true,
            BlockType.LEAF_INDEX, null);

        ByteBuffer b = midLeafBlock.getBufferWithoutHeader();
        int numDataBlocks = b.getInt();
        int keyRelOffset = b.getInt(Bytes.SIZEOF_INT * (midKeyEntry + 1));
        // The difference of two consecutive secondary index offsets is the
        // size of the whole (offset, onDiskSize, key) tuple, so the overhead
        // of the offset and on-disk size must be subtracted out.
        int keyLen = b.getInt(Bytes.SIZEOF_INT * (midKeyEntry + 2)) -
            keyRelOffset - SECONDARY_INDEX_ENTRY_OVERHEAD;
        int keyOffset = Bytes.SIZEOF_INT * (numDataBlocks + 2) + keyRelOffset
            + SECONDARY_INDEX_ENTRY_OVERHEAD;
        targetMidKey = ByteBufferUtils.toBytes(b, keyOffset, keyLen);
      } else {
        // The middle of the root-level index.
        targetMidKey = blockKeys[rootCount / 2];
      }

      this.midKey.set(targetMidKey);
      return targetMidKey;
    }
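
    /*
     * Worked example of the mid-key arithmetic above, assuming a leaf block
     * with numDataBlocks = 3 and secondary index offsets [0, 20, 45, 71]:
     * entries start 4 * (3 + 2) = 20 bytes into the block body. For
     * midKeyEntry = 1, keyRelOffset = 20, so the entry begins at 20 + 20 = 40,
     * its key begins 12 bytes later (at 52), and the key length is
     * 45 - 20 - 12 = 13 bytes.
     */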

    /**
     * @param i index of the root-level block key to get
     * @return the i-th root-level block key
     */
    public byte[] getRootBlockKey(int i) {
      return blockKeys[i];
    }

    /**
     * @param i index of the root-level block to get the offset of
     * @return the file offset of the i-th root-level block
     */
    public long getRootBlockOffset(int i) {
      return blockOffsets[i];
    }

    /**
     * @param i zero-based index of a root-level block
     * @return the on-disk size of the root-level block for version 2, or the
     *         uncompressed size for version 1
     */
    public int getRootBlockDataSize(int i) {
      return blockDataSizes[i];
    }

    /**
     * @return the number of root-level blocks in this block index
     */
    public int getRootBlockCount() {
      return rootCount;
    }

    /**
     * Finds the root-level index block containing the given key.
     *
     * @param key the key to find
     * @param offset the offset of the key in its byte array
     * @param length the length of the key
     * @return the index of the root-level block containing the key (between 0
     *         and the number of blocks - 1), or -1 if the key is before the
     *         first block's key
     */
    public int rootBlockContainingKey(final byte[] key, int offset, int length) {
      int pos = Bytes.binarySearch(blockKeys, key, offset, length, comparator);
      // pos is between -(blockKeys.length + 1) and blockKeys.length - 1, see
      // binarySearch's javadoc.

      if (pos >= 0) {
        // This means this is an exact match with an element of blockKeys.
        assert pos < blockKeys.length;
        return pos;
      }

      // Otherwise, pos = -(i + 1), where blockKeys[i - 1] < key < blockKeys[i],
      // and i is in [0, blockKeys.length]. We are returning j = i - 1 such that
      // blockKeys[j] <= key < blockKeys[j + 1]. In particular, j = -1 if
      // blockKeys[0] > key.
      int i = -pos - 1;
      assert 0 <= i && i <= blockKeys.length;
      return i - 1;
    }

    /**
     * Finds the root-level index block containing the given key.
     *
     * @param key the key to find
     * @return the index of the root-level block containing the key (between 0
     *         and the number of blocks - 1), or -1 if the key is before the
     *         first block's key
     */
    public int rootBlockContainingKey(final Cell key) {
      int pos = Bytes.binarySearch(blockKeys, key, comparator);
      // pos is between -(blockKeys.length + 1) and blockKeys.length - 1, see
      // binarySearch's javadoc.

      if (pos >= 0) {
        // This means this is an exact match with an element of blockKeys.
        assert pos < blockKeys.length;
        return pos;
      }

      // Otherwise, pos = -(i + 1), where blockKeys[i - 1] < key < blockKeys[i],
      // and i is in [0, blockKeys.length]. We are returning j = i - 1 such that
      // blockKeys[j] <= key < blockKeys[j + 1]. In particular, j = -1 if
      // blockKeys[0] > key.
      int i = -pos - 1;
      assert 0 <= i && i <= blockKeys.length;
      return i - 1;
    }
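
    /*
     * Example of the convention shared by both overloads above: with
     * root-level block keys ["d", "m", "t"], a lookup of "k" returns 0
     * (block 0 covers the key range ["d", "m")), a lookup of "m" returns 1,
     * and a lookup of "a" returns -1 because it precedes the first block's
     * key.
     */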

    /**
     * Adds a new entry in the root block index. Only used when reading.
     *
     * @param key the first key of the block
     * @param offset the file offset where the block is stored
     * @param dataSize the on-disk size of the block
     */
    private void add(final byte[] key, final long offset, final int dataSize) {
      blockOffsets[rootCount] = offset;
      blockKeys[rootCount] = key;
      blockDataSizes[rootCount] = dataSize;
      rootCount++;
    }

    /**
     * The indexed key at the i-th position in the given non-root index block.
     * Positions start at 0.
     *
     * @param nonRootIndex buffer over the non-root index block body
     * @param i the position of the key to retrieve
     * @return the i-th indexed key, or null if i is out of range
     */
    private byte[] getNonRootIndexedKey(ByteBuffer nonRootIndex, int i) {
      int numEntries = nonRootIndex.getInt(0);
      if (i < 0 || i >= numEntries) {
        return null;
      }

      // Entries start after the number of entries and the secondary index.
      // The secondary index takes numEntries + 1 ints.
      int entriesOffset = Bytes.SIZEOF_INT * (numEntries + 2);

      // The target key's offset relative to the end of the secondary index.
      int targetKeyRelOffset = nonRootIndex.getInt(
          Bytes.SIZEOF_INT * (i + 1));

      // The offset of the target key in the block index buffer.
      int targetKeyOffset = entriesOffset     // Skip secondary index
          + targetKeyRelOffset                // Skip all entries until ours
          + SECONDARY_INDEX_ENTRY_OVERHEAD;   // Skip offset and on-disk size

      // We subtract the two consecutive secondary index elements, which gives
      // us the size of the whole (offset, onDiskSize, key) tuple. We then
      // need to subtract the overhead of the offset and the on-disk size.
      int targetKeyLength = nonRootIndex.getInt(Bytes.SIZEOF_INT * (i + 2)) -
          targetKeyRelOffset - SECONDARY_INDEX_ENTRY_OVERHEAD;

      return ByteBufferUtils.toBytes(nonRootIndex, targetKeyOffset, targetKeyLength);
    }

    /**
     * Performs a binary search over a non-root level index block. Utilizes the
     * secondary index, which records the offsets of (offset, onDiskSize,
     * firstKey) tuples of all entries.
     *
     * @param key the key we are searching for
     * @param nonRootIndex the non-root index block buffer, starting with the
     *          number of entries followed by the secondary index. The buffer's
     *          position is ignored.
     * @param comparator the comparator used to order keys
     * @return the index i in [0, numEntries - 1] such that keys[i] <= key <
     *         keys[i + 1], if keys is the array of all keys being searched, or
     *         -1 if the key is before all keys in the block
     */
    static int binarySearchNonRootIndex(Cell key, ByteBuffer nonRootIndex,
        KVComparator comparator) {

      int numEntries = nonRootIndex.getInt(0);
      int low = 0;
      int high = numEntries - 1;
      int mid = 0;

      // Entries start after the number of entries and the secondary index.
      // The secondary index takes numEntries + 1 ints.
      int entriesOffset = Bytes.SIZEOF_INT * (numEntries + 2);

      // The secondary index element at position "i + 1" tells us where the
      // entry at position "i" ends, so we can compute each key's length from
      // two consecutive secondary index elements.
      KeyValue.KeyOnlyKeyValue nonRootIndexKV = new KeyValue.KeyOnlyKeyValue();
      while (low <= high) {
        mid = (low + high) >>> 1;

        // The mid key's offset relative to the end of the secondary index.
        int midKeyRelOffset = nonRootIndex.getInt(
            Bytes.SIZEOF_INT * (mid + 1));

        // The offset of the middle key in the block index buffer.
        int midKeyOffset = entriesOffset       // Skip secondary index
            + midKeyRelOffset                  // Skip all entries until mid
            + SECONDARY_INDEX_ENTRY_OVERHEAD;  // Skip offset and on-disk size

        // We subtract the two consecutive secondary index elements, which
        // gives us the size of the whole (offset, onDiskSize, key) tuple. We
        // then need to subtract the overhead of the offset and on-disk size.
        int midLength = nonRootIndex.getInt(Bytes.SIZEOF_INT * (mid + 2)) -
            midKeyRelOffset - SECONDARY_INDEX_ENTRY_OVERHEAD;

        // Compare the search key against the key portion of the mid entry.
        nonRootIndexKV.setKey(nonRootIndex.array(),
            nonRootIndex.arrayOffset() + midKeyOffset, midLength);
        int cmp = comparator.compareOnlyKeyPortion(key, nonRootIndexKV);

        // key lives above the midpoint
        if (cmp > 0)
          low = mid + 1; // Maintain the invariant that keys[low - 1] < key
        // key lives below the midpoint
        else if (cmp < 0)
          high = mid - 1; // Maintain the invariant that key < keys[high + 1]
        else
          return mid; // exact match
      }

      // As per our invariants: keys[low - 1] < key < keys[low], meaning that
      // the key belongs to the range of the entry at position low - 1. The
      // only special case is low = 0, when the key is before all keys in the
      // block, in which case we return -1.
      if (low != high + 1) {
        throw new IllegalStateException("Binary search broken: low=" + low
            + " instead of " + (high + 1));
      }

      // OK, our invariant says that keys[low - 1] < key < keys[low]. We need
      // to return the index of the entry with the greatest key that is less
      // than or equal to the given key: i = low - 1.
      int i = low - 1;

      // Some extra validation on the result.
      if (i < -1 || i >= numEntries) {
        throw new IllegalStateException("Binary search broken: result is " +
            i + " but expected to be between -1 and (numEntries - 1) = " +
            (numEntries - 1));
      }

      return i;
    }
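
    /*
     * Note on the loop above: the midpoint is computed with an unsigned
     * shift, (low + high) >>> 1, so it stays correct even if low + high
     * overflows int, which the classic (low + high) / 2 form would not.
     */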

    /**
     * Searches for a key using the secondary index in a non-root block. On
     * success, positions the provided buffer at the entry of interest, where
     * the file offset and the on-disk size can be read.
     *
     * @param nonRootBlock a non-root block body without the header; its
     *          initial position does not matter
     * @param key the key to search for
     * @param comparator the comparator used to order keys
     * @return the index of the entry containing the key, or -1 if the key is
     *         before the first key in the block
     */
    static int locateNonRootIndexEntry(ByteBuffer nonRootBlock, Cell key,
        KVComparator comparator) {
      int entryIndex = binarySearchNonRootIndex(key, nonRootBlock, comparator);

      if (entryIndex != -1) {
        int numEntries = nonRootBlock.getInt(0);

        // The end of the secondary index and the beginning of the entries.
        int entriesOffset = Bytes.SIZEOF_INT * (numEntries + 2);

        // The offset of the entry we are interested in, relative to the end
        // of the secondary index.
        int entryRelOffset = nonRootBlock.getInt(Bytes.SIZEOF_INT * (1 + entryIndex));

        nonRootBlock.position(entriesOffset + entryRelOffset);
      }

      return entryIndex;
    }

    /**
     * Read in the root-level index from the given input stream. Must match
     * what was written into the root level by
     * {@link BlockIndexWriter#writeIndexBlocks(FSDataOutputStream)} at the
     * offset that function returned.
     *
     * @param in the buffered input stream or wrapped byte input stream
     * @param numEntries the number of root-level index entries
     * @throws IOException
     */
    public void readRootIndex(DataInput in, final int numEntries)
        throws IOException {
      blockOffsets = new long[numEntries];
      blockKeys = new byte[numEntries][];
      blockDataSizes = new int[numEntries];

      // If the index size is zero, no index was written.
      if (numEntries > 0) {
        for (int i = 0; i < numEntries; ++i) {
          long offset = in.readLong();
          int dataSize = in.readInt();
          byte[] key = Bytes.readByteArray(in);
          add(key, offset, dataSize);
        }
      }
    }

    /**
     * Read in the root-level index from the given block. Must match what was
     * written into the root level by
     * {@link BlockIndexWriter#writeIndexBlocks(FSDataOutputStream)} at the
     * offset that function returned.
     *
     * @param blk the HFile block holding the root-level index
     * @param numEntries the number of root-level index entries
     * @return the input stream positioned right after the root-level entries
     * @throws IOException
     */
    public DataInputStream readRootIndex(HFileBlock blk, final int numEntries) throws IOException {
      DataInputStream in = blk.getByteStream();
      readRootIndex(in, numEntries);
      return in;
    }
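
    /*
     * Root-level entries as consumed above, one per root-level block:
     *
     *   long   block offset in the file
     *   int    on-disk block size
     *   vint   key length, followed by the key bytes
     *          (the format written by Bytes.writeByteArray)
     */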

    /**
     * Read the root-level metadata of a multi-level block index. Based on
     * {@link #readRootIndex(DataInput, int)}, but also reads metadata
     * necessary to compute the mid-key in a multi-level index.
     *
     * @param blk the HFile block holding the root-level index
     * @param numEntries the number of root-level index entries
     * @throws IOException
     */
    public void readMultiLevelIndexRoot(HFileBlock blk,
        final int numEntries) throws IOException {
      DataInputStream in = readRootIndex(blk, numEntries);
      // After reading the root index, the checksum bytes have to be
      // subtracted to know if the mid-key metadata exists.
      int checkSumBytes = blk.totalChecksumBytes();
      if ((in.available() - checkSumBytes) < MID_KEY_METADATA_SIZE) {
        // No mid-key metadata available.
        return;
      }
      midLeafBlockOffset = in.readLong();
      midLeafBlockOnDiskSize = in.readInt();
      midKeyEntry = in.readInt();
    }

    @Override
    public String toString() {
      StringBuilder sb = new StringBuilder();
      sb.append("size=").append(rootCount).append("\n");
      for (int i = 0; i < rootCount; i++) {
        sb.append("key=").append(KeyValue.keyToString(blockKeys[i]))
            .append("\n  offset=").append(blockOffsets[i])
            .append(", dataSize=").append(blockDataSizes[i]).append("\n");
      }
      return sb.toString();
    }

    @Override
    public long heapSize() {
      long heapSize = ClassSize.align(6 * ClassSize.REFERENCE +
          2 * Bytes.SIZEOF_INT + ClassSize.OBJECT);

      // Mid-key metadata.
      heapSize += MID_KEY_METADATA_SIZE;

      // Calculating the size of blockKeys.
      if (blockKeys != null) {
        // Adding array + references overhead.
        heapSize += ClassSize.align(ClassSize.ARRAY + blockKeys.length
            * ClassSize.REFERENCE);

        // Adding the byte arrays themselves.
        for (byte[] key : blockKeys) {
          heapSize += ClassSize.align(ClassSize.ARRAY + key.length);
        }
      }

      if (blockOffsets != null) {
        heapSize += ClassSize.align(ClassSize.ARRAY + blockOffsets.length
            * Bytes.SIZEOF_LONG);
      }

      if (blockDataSizes != null) {
        heapSize += ClassSize.align(ClassSize.ARRAY + blockDataSizes.length
            * Bytes.SIZEOF_INT);
      }

      return ClassSize.align(heapSize);
    }

  }

  /**
   * Writes the block index into the output stream. Generates the tree from
   * bottom up. The leaf level is written to disk as a sequence of inline
   * blocks, if it is larger than a certain number of bytes. If the leaf level
   * is not large enough, we write all entries to the root level instead.
   *
   * After all leaf blocks have been written, we end up with an index
   * referencing the resulting leaf index blocks. If that index is larger than
   * the allowed root index size, the writer will break it up into
   * reasonable-size intermediate-level index block chunks, write those chunks
   * out, and create another index referencing those chunks. This is repeated
   * until the remaining index is small enough to become the root index.
   * However, in most practical cases we will only have leaf-level blocks and
   * the root index, or just the root index.
   */
  public static class BlockIndexWriter implements InlineBlockWriter {
    /**
     * While the index is being written, this represents the current block
     * index referencing all leaf blocks, with one exception. If the file is
     * being closed and there are not enough blocks to complete even a single
     * leaf block, no leaf blocks get written and this contains the entire
     * block index. After all levels of the index were written by
     * {@link #writeIndexBlocks(FSDataOutputStream)}, this contains the final
     * root-level index.
     */
    private BlockIndexChunk rootChunk = new BlockIndexChunk();

    /**
     * Current leaf-level chunk. New entries referencing data blocks get added
     * to this chunk until it grows large enough to be written to disk.
     */
    private BlockIndexChunk curInlineChunk = new BlockIndexChunk();

    /**
     * The number of block index levels. This is one if there is only the root
     * level (even empty), two if there is a leaf level and a root level, and
     * higher if there are intermediate levels. This is only final after
     * {@link #writeIndexBlocks(FSDataOutputStream)} has been called. The
     * initial value accounts for the root level, and will be increased to two
     * as soon as we find out there is a leaf level in
     * {@link #blockWritten(long, int, int)}.
     */
    private int numLevels = 1;

    private HFileBlock.Writer blockWriter;

    /**
     * The first key of the current inline chunk; set by
     * {@link #writeInlineBlock(DataOutput)} and consumed by
     * {@link #blockWritten(long, int, int)}.
     */
    private byte[] firstKey = null;

    /**
     * The total number of leaf-level entries, i.e. entries referenced by
     * leaf-level blocks. For the data block index this is equal to the number
     * of data blocks.
     */
    private long totalNumEntries;

    /** Total compressed size of all index blocks. */
    private long totalBlockOnDiskSize;

    /** Total uncompressed size of all index blocks. */
    private long totalBlockUncompressedSize;

    /** The maximum size guideline of all multi-level index blocks. */
    private int maxChunkSize;

    /** Whether we require this block index to always be single-level. */
    private boolean singleLevelOnly;

    /** CacheConfig, or null if cache-on-write is disabled. */
    private CacheConfig cacheConf;

    /** Name to use for computing cache keys. */
    private String nameForCaching;

    /** Creates a single-level block index writer. */
    public BlockIndexWriter() {
      this(null, null, null);
      singleLevelOnly = true;
    }

    /**
     * Creates a multi-level block index writer.
     *
     * @param blockWriter the block writer to use to write index blocks
     * @param cacheConf used to determine when and how a block should be
     *          cached-on-write; null disables cache-on-write
     * @param nameForCaching the file name to use when caching written blocks
     */
    public BlockIndexWriter(HFileBlock.Writer blockWriter,
        CacheConfig cacheConf, String nameForCaching) {
      if ((cacheConf == null) != (nameForCaching == null)) {
        throw new IllegalArgumentException("Block cache and file name for " +
            "caching must be both specified or both null");
      }

      this.blockWriter = blockWriter;
      this.cacheConf = cacheConf;
      this.nameForCaching = nameForCaching;
      this.maxChunkSize = HFileBlockIndex.DEFAULT_MAX_CHUNK_SIZE;
    }

    public void setMaxChunkSize(int maxChunkSize) {
      if (maxChunkSize <= 0) {
        throw new IllegalArgumentException("Invalid maximum index block size");
      }
      this.maxChunkSize = maxChunkSize;
    }

    /**
     * Writes the root level and intermediate levels of the block index into
     * the output stream, generating the tree from bottom up. Assumes that the
     * leaf level has been inline-written to the data stream in a series of
     * "inline blocks". If the leaf level was never large enough to be written
     * out, the whole index is stored in the root level instead, producing a
     * single-level index.
     *
     * @param out the output stream to write the index to
     * @return the offset at which the root-level index starts
     * @throws IOException
     */
    public long writeIndexBlocks(FSDataOutputStream out) throws IOException {
      if (curInlineChunk != null && curInlineChunk.getNumEntries() != 0) {
        throw new IOException("Trying to write a multi-level block index, " +
            "but there are " + curInlineChunk.getNumEntries() + " entries in the " +
            "last inline chunk.");
      }

      // We need to get mid-key metadata before we create intermediate
      // indexes and overwrite the root chunk.
      byte[] midKeyMetadata = numLevels > 1 ? rootChunk.getMidKeyMetadata()
          : null;

      if (curInlineChunk != null) {
        while (rootChunk.getRootSize() > maxChunkSize) {
          rootChunk = writeIntermediateLevel(out, rootChunk);
          numLevels += 1;
        }
      }

      // Write the root level.
      long rootLevelIndexPos = out.getPos();

      {
        DataOutput blockStream =
            blockWriter.startWriting(BlockType.ROOT_INDEX);
        rootChunk.writeRoot(blockStream);
        if (midKeyMetadata != null)
          blockStream.write(midKeyMetadata);
        blockWriter.writeHeaderAndData(out);
      }

      // Add the root index block size.
      totalBlockOnDiskSize += blockWriter.getOnDiskSizeWithoutHeader();
      totalBlockUncompressedSize +=
          blockWriter.getUncompressedSizeWithoutHeader();

      if (LOG.isTraceEnabled()) {
        LOG.trace("Wrote a " + numLevels + "-level index with root level at pos "
            + rootLevelIndexPos + ", " + rootChunk.getNumEntries()
            + " root-level entries, " + totalNumEntries + " total entries, "
            + StringUtils.humanReadableInt(this.totalBlockOnDiskSize) +
            " on-disk size, "
            + StringUtils.humanReadableInt(totalBlockUncompressedSize) +
            " total uncompressed size.");
      }
      return rootLevelIndexPos;
    }
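
    /*
     * A minimal sketch of the writer life cycle (assuming the caller drives
     * an HFileBlock.Writer named "hbw" and an output stream "out"):
     *
     *   BlockIndexWriter biw = new BlockIndexWriter(hbw, null, null);
     *   // for each data block written:
     *   biw.addEntry(firstKeyOfBlock, blockOffset, blockOnDiskSize);
     *   // the inline-block machinery calls shouldWriteBlock(...),
     *   // writeInlineBlock(...) and blockWritten(...) as leaf chunks fill;
     *   // finally, on close:
     *   long rootIndexOffset = biw.writeIndexBlocks(out);
     */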

    /**
     * Writes the block index data as a single level only. Does not do any
     * block framing.
     *
     * @param out the buffered output stream to write the index to. Typically
     *          a stream writing into an {@link HFile} block.
     * @param description a short description of the index being written. Used
     *          in a log message.
     * @throws IOException
     */
    public void writeSingleLevelIndex(DataOutput out, String description)
        throws IOException {
      expectNumLevels(1);

      if (!singleLevelOnly)
        throw new IOException("Single-level mode is turned off");

      if (rootChunk.getNumEntries() > 0)
        throw new IOException("Root-level entries already added in " +
            "single-level mode");

      rootChunk = curInlineChunk;
      curInlineChunk = new BlockIndexChunk();

      if (LOG.isTraceEnabled()) {
        LOG.trace("Wrote a single-level " + description + " index with "
            + rootChunk.getNumEntries() + " entries, " + rootChunk.getRootSize()
            + " bytes");
      }
      rootChunk.writeRoot(out);
    }

    /**
     * Splits the current level of the block index into intermediate index
     * blocks of permitted size and writes those blocks to disk. Returns the
     * next level of the block index referencing those intermediate-level
     * blocks.
     *
     * @param out the output stream to write the intermediate blocks to
     * @param currentLevel the current level of the block index, such as the
     *          chunk referencing all leaf-level index blocks
     * @return the parent-level block index, which becomes the root index
     *         after a few (usually zero) iterations
     * @throws IOException
     */
    private BlockIndexChunk writeIntermediateLevel(FSDataOutputStream out,
        BlockIndexChunk currentLevel) throws IOException {
      // Entries referencing the intermediate-level blocks we are creating.
      BlockIndexChunk parent = new BlockIndexChunk();

      // The current intermediate-level block index chunk.
      BlockIndexChunk curChunk = new BlockIndexChunk();

      for (int i = 0; i < currentLevel.getNumEntries(); ++i) {
        curChunk.add(currentLevel.getBlockKey(i),
            currentLevel.getBlockOffset(i), currentLevel.getOnDiskDataSize(i));

        if (curChunk.getRootSize() >= maxChunkSize)
          writeIntermediateBlock(out, parent, curChunk);
      }

      if (curChunk.getNumEntries() > 0) {
        writeIntermediateBlock(out, parent, curChunk);
      }

      return parent;
    }

    private void writeIntermediateBlock(FSDataOutputStream out,
        BlockIndexChunk parent, BlockIndexChunk curChunk) throws IOException {
      long beginOffset = out.getPos();
      DataOutputStream dos = blockWriter.startWriting(
          BlockType.INTERMEDIATE_INDEX);
      curChunk.writeNonRoot(dos);
      byte[] curFirstKey = curChunk.getBlockKey(0);
      blockWriter.writeHeaderAndData(out);

      if (cacheConf != null) {
        HFileBlock blockForCaching = blockWriter.getBlockForCaching(cacheConf);
        cacheConf.getBlockCache().cacheBlock(new BlockCacheKey(nameForCaching,
            beginOffset), blockForCaching);
      }

      // Add the intermediate index block size.
      totalBlockOnDiskSize += blockWriter.getOnDiskSizeWithoutHeader();
      totalBlockUncompressedSize +=
          blockWriter.getUncompressedSizeWithoutHeader();

      // Add an entry for this block to the parent index: the block's first
      // key, its begin offset, and its on-disk size including the header.
      parent.add(curFirstKey, beginOffset,
          blockWriter.getOnDiskSizeWithHeader());

      // Clear the current block index chunk.
      curChunk.clear();
      curFirstKey = null;
    }
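
    /*
     * Note: writeIntermediateLevel splits on getRootSize() >= maxChunkSize,
     * i.e. on the root-format serialized size of the chunk under
     * construction, even though intermediate blocks are written in the
     * non-root format. Each intermediate block therefore holds as many
     * entries as fit within the maxChunkSize guideline rather than a fixed
     * fan-out.
     */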

    /**
     * @return the number of entries in the root-level index block
     */
    public final int getNumRootEntries() {
      return rootChunk.getNumEntries();
    }

    /**
     * @return the number of levels in this block index
     */
    public int getNumLevels() {
      return numLevels;
    }

    private void expectNumLevels(int expectedNumLevels) {
      if (numLevels != expectedNumLevels) {
        throw new IllegalStateException("Number of block index levels is "
            + numLevels + " but is expected to be " + expectedNumLevels);
      }
    }

    /**
     * Whether there is an inline block ready to be written. In general, we
     * write a leaf-level index block as an inline block as soon as its size,
     * as serialized in the non-root format, reaches a certain threshold.
     */
    @Override
    public boolean shouldWriteBlock(boolean closing) {
      if (singleLevelOnly) {
        throw new UnsupportedOperationException(INLINE_BLOCKS_NOT_ALLOWED);
      }

      if (curInlineChunk == null) {
        throw new IllegalStateException("curInlineChunk is null; has shouldWriteBlock been " +
            "called with closing=true and then called again?");
      }

      if (curInlineChunk.getNumEntries() == 0) {
        return false;
      }

      // We do have some entries in the current inline chunk.
      if (closing) {
        if (rootChunk.getNumEntries() == 0) {
          // We did not add any leaf-level blocks yet. Instead of creating a
          // leaf level with one block, move these entries to the root level.
          expectNumLevels(1);
          rootChunk = curInlineChunk;
          curInlineChunk = null; // Disallow adding any more index entries.
          return false;
        }

        return true;
      } else {
        return curInlineChunk.getNonRootSize() >= maxChunkSize;
      }
    }

    /**
     * Write out the current inline index block. Inline blocks are non-root
     * blocks, so the non-root index format is used.
     *
     * @param out the stream to write the leaf-level inline block to
     */
    @Override
    public void writeInlineBlock(DataOutput out) throws IOException {
      if (singleLevelOnly)
        throw new UnsupportedOperationException(INLINE_BLOCKS_NOT_ALLOWED);

      // Write the inline block index to the output stream in the non-root
      // index block format.
      curInlineChunk.writeNonRoot(out);

      // Save the first key of the inline block so that we can add it to the
      // parent-level index.
      firstKey = curInlineChunk.getBlockKey(0);

      // Start a new inline index block.
      curInlineChunk.clear();
    }

    /**
     * Called after an inline block has been written so that we can add an
     * entry referring to that block to the parent-level index.
     */
    @Override
    public void blockWritten(long offset, int onDiskSize, int uncompressedSize) {
      // Add the leaf index block size.
      totalBlockOnDiskSize += onDiskSize;
      totalBlockUncompressedSize += uncompressedSize;

      if (singleLevelOnly)
        throw new UnsupportedOperationException(INLINE_BLOCKS_NOT_ALLOWED);

      if (firstKey == null) {
        throw new IllegalStateException("Trying to add second-level index " +
            "entry with offset=" + offset + " and onDiskSize=" + onDiskSize +
            " but the first key was not set in writeInlineBlock");
      }

      if (rootChunk.getNumEntries() == 0) {
        // Once we add the first block referencing a leaf-level index block,
        // we know the index has (at least) two levels.
        expectNumLevels(1);
        numLevels = 2;
      }

      // Add another entry to the second-level index. Include the number of
      // entries in all previous leaf-level chunks for mid-key calculation.
      rootChunk.add(firstKey, offset, onDiskSize, totalNumEntries);
      firstKey = null;
    }

    @Override
    public BlockType getInlineBlockType() {
      return BlockType.LEAF_INDEX;
    }

    /**
     * Add one index entry to the current leaf-level block. When the
     * leaf-level block gets large enough, it will be flushed to disk as an
     * inline block.
     *
     * @param firstKey the first key of the data block
     * @param blockOffset the offset of the data block
     * @param blockDataSize the on-disk size of the data block ({@link HFile}
     *          format version 2), or the uncompressed size of the data block
     *          (version 1)
     */
    public void addEntry(byte[] firstKey, long blockOffset, int blockDataSize) {
      curInlineChunk.add(firstKey, blockOffset, blockDataSize);
      ++totalNumEntries;
    }

    /**
     * @throws IOException if we happened to write a multi-level index
     */
    public void ensureSingleLevel() throws IOException {
      if (numLevels > 1) {
        throw new IOException("Wrote a " + numLevels + "-level index with " +
            rootChunk.getNumEntries() + " root-level entries, but " +
            "this is expected to be a single-level block index.");
      }
    }

    /**
     * @return true if we are using cache-on-write. This is configured by the
     *         caller of the constructor, by either passing a valid block
     *         cache or null.
     */
    @Override
    public boolean getCacheOnWrite() {
      return cacheConf != null && cacheConf.shouldCacheIndexesOnWrite();
    }

    /**
     * The total uncompressed size of the root index block, intermediate-level
     * index blocks, and leaf-level index blocks.
     *
     * @return the total uncompressed size of all index blocks
     */
    public long getTotalUncompressedSize() {
      return totalBlockUncompressedSize;
    }

  }

  /**
   * A single chunk of the block index in the process of writing. The data in
   * this chunk can become a leaf-level, intermediate-level, or root index
   * block.
   */
  static class BlockIndexChunk {

    /** First keys of the key range corresponding to each index entry. */
    private final List<byte[]> blockKeys = new ArrayList<byte[]>();

    /** Block offset in the backing stream. */
    private final List<Long> blockOffsets = new ArrayList<Long>();

    /** On-disk data sizes of lower-level data or index blocks. */
    private final List<Integer> onDiskDataSizes = new ArrayList<Integer>();

    /**
     * The cumulative number of sub-entries, i.e. entries on deeper-level
     * block index entries. numSubEntriesAt[i] is the number of sub-entries
     * in the blocks corresponding to this chunk's entries #0 through #i
     * inclusively.
     */
    private final List<Long> numSubEntriesAt = new ArrayList<Long>();

    /**
     * The offset of the next entry to be added, relative to the end of the
     * "secondary index" in the "non-root" format representation of this
     * index chunk. This is the next value to be added to the secondary
     * index.
     */
    private int curTotalNonRootEntrySize = 0;

    /**
     * The accumulated size of this chunk if stored in the root index format.
     */
    private int curTotalRootSize = 0;

    /**
     * The "secondary index" used for binary search over variable-length
     * records in a "non-root" format block. These offsets are relative to
     * the end of this secondary index.
     */
    private final List<Integer> secondaryIndexOffsetMarks =
        new ArrayList<Integer>();

    /**
     * Adds a new entry to this block index chunk.
     *
     * @param firstKey the first key in the block pointed to by this entry
     * @param blockOffset the offset of the next-level block pointed to by
     *          this entry
     * @param onDiskDataSize the on-disk data size of the block pointed to by
     *          this entry, including header size
     * @param curTotalNumSubEntries if this chunk is the root index chunk
     *          under construction, this specifies the current total number of
     *          sub-entries in all leaf-level chunks, including the one
     *          corresponding to the second-level entry being added.
     */
    void add(byte[] firstKey, long blockOffset, int onDiskDataSize,
        long curTotalNumSubEntries) {
      // Record the offset for the secondary index.
      secondaryIndexOffsetMarks.add(curTotalNonRootEntrySize);
      curTotalNonRootEntrySize += SECONDARY_INDEX_ENTRY_OVERHEAD
          + firstKey.length;

      curTotalRootSize += Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT
          + WritableUtils.getVIntSize(firstKey.length) + firstKey.length;

      blockKeys.add(firstKey);
      blockOffsets.add(blockOffset);
      onDiskDataSizes.add(onDiskDataSize);

      if (curTotalNumSubEntries != -1) {
        numSubEntriesAt.add(curTotalNumSubEntries);

        // Make sure the parallel arrays are in sync.
        if (numSubEntriesAt.size() != blockKeys.size()) {
          throw new IllegalStateException("Only have key/value count " +
              "stats for " + numSubEntriesAt.size() + " block index " +
              "entries out of " + blockKeys.size());
        }
      }
    }

    /**
     * The same as {@link #add(byte[], long, int, long)} but does not take
     * the key/value count into account. Used for single-level indexes.
     *
     * @see #add(byte[], long, int, long)
     */
    public void add(byte[] firstKey, long blockOffset, int onDiskDataSize) {
      add(firstKey, blockOffset, onDiskDataSize, -1);
    }

    public void clear() {
      blockKeys.clear();
      blockOffsets.clear();
      onDiskDataSizes.clear();
      secondaryIndexOffsetMarks.clear();
      numSubEntriesAt.clear();
      curTotalNonRootEntrySize = 0;
      curTotalRootSize = 0;
    }

    /**
     * Finds the entry corresponding to the deeper-level index block
     * containing the given deeper-level entry (a "sub-entry"), assuming a
     * global 0-based ordering of sub-entries.
     *
     * <p><i>Implementation note.</i> We are looking for i such that
     * numSubEntriesAt[i - 1] <= k < numSubEntriesAt[i], because a
     * deeper-level block #i (0-based) contains sub-entries
     * #numSubEntriesAt[i - 1] through #numSubEntriesAt[i] - 1, assuming a
     * global 0-based ordering of sub-entries. i is by definition the
     * insertion point of k in numSubEntriesAt.
     *
     * @param k sub-entry index, from 0 to the total number of sub-entries - 1
     * @return the 0-based index of the entry corresponding to the given
     *         sub-entry
     */
    public int getEntryBySubEntry(long k) {
      // We define the mid-key as the key corresponding to the k'th sub-entry
      // (0-based).

      int i = Collections.binarySearch(numSubEntriesAt, k);

      // Exact match: numSubEntriesAt[i] = k. This means chunks #0 through #i
      // contain exactly k sub-entries, and sub-entry #k (0-based) is in the
      // (i + 1)'th chunk.
      if (i >= 0)
        return i + 1;

      // Inexact match. Return the insertion point.
      return -i - 1;
    }
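
    /*
     * Worked example: if numSubEntriesAt = [3, 7, 10], chunk 0 holds
     * sub-entries 0-2, chunk 1 holds 3-6, and chunk 2 holds 7-9. A query for
     * k = 5 misses with insertion point 1 and returns chunk 1; a query for
     * k = 3 matches numSubEntriesAt[0] exactly and returns 0 + 1 = 1.
     */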

    /**
     * Used when writing the root block index of a multi-level block index.
     * Serializes additional information allowing to efficiently identify the
     * mid-key.
     *
     * @return a few serialized fields for finding the mid-key
     * @throws IOException if the metadata for computing the mid-key could not
     *           be created
     */
    public byte[] getMidKeyMetadata() throws IOException {
      ByteArrayOutputStream baos = new ByteArrayOutputStream(
          MID_KEY_METADATA_SIZE);
      DataOutputStream baosDos = new DataOutputStream(baos);
      long totalNumSubEntries = numSubEntriesAt.get(blockKeys.size() - 1);
      if (totalNumSubEntries == 0) {
        throw new IOException("No leaf-level entries, mid-key unavailable");
      }
      long midKeySubEntry = (totalNumSubEntries - 1) / 2;
      int midKeyEntry = getEntryBySubEntry(midKeySubEntry);

      baosDos.writeLong(blockOffsets.get(midKeyEntry));
      baosDos.writeInt(onDiskDataSizes.get(midKeyEntry));

      long numSubEntriesBefore = midKeyEntry > 0
          ? numSubEntriesAt.get(midKeyEntry - 1) : 0;
      long subEntryWithinEntry = midKeySubEntry - numSubEntriesBefore;
      if (subEntryWithinEntry < 0 || subEntryWithinEntry > Integer.MAX_VALUE) {
        throw new IOException("Could not identify mid-key index within the "
            + "leaf-level block containing mid-key: out of range ("
            + subEntryWithinEntry + ", numSubEntriesBefore="
            + numSubEntriesBefore + ", midKeySubEntry=" + midKeySubEntry
            + ")");
      }

      baosDos.writeInt((int) subEntryWithinEntry);

      if (baosDos.size() != MID_KEY_METADATA_SIZE) {
        throw new IOException("Could not write mid-key metadata: size=" +
            baosDos.size() + ", correct size: " + MID_KEY_METADATA_SIZE);
      }

      // Closing the ByteArrayOutputStream has no effect; done for tidiness.
      baos.close();

      return baos.toByteArray();
    }

    /**
     * Writes the block index chunk in the non-root index block format. This
     * format contains the number of entries, an index of integer offsets for
     * quick binary search on variable-length records, and tuples of block
     * offset, on-disk block size, and the first key for each entry.
     *
     * @param out the stream to write the non-root index block to
     * @throws IOException
     */
    void writeNonRoot(DataOutput out) throws IOException {
      // The number of entries in the block.
      out.writeInt(blockKeys.size());

      if (secondaryIndexOffsetMarks.size() != blockKeys.size()) {
        throw new IOException("Corrupted block index chunk writer: " +
            blockKeys.size() + " entries but " +
            secondaryIndexOffsetMarks.size() + " secondary index items");
      }

      // For each entry, write a "secondary index" of relative offsets to the
      // entries from the end of the secondary index. This works, because at
      // read time we read the number of entries and know where the secondary
      // index ends.
      for (int currentSecondaryIndex : secondaryIndexOffsetMarks)
        out.writeInt(currentSecondaryIndex);

      // We include one extra element in the secondary index so the size of
      // each entry can be computed by subtracting consecutive elements.
      out.writeInt(curTotalNonRootEntrySize);

      for (int i = 0; i < blockKeys.size(); ++i) {
        out.writeLong(blockOffsets.get(i));
        out.writeInt(onDiskDataSizes.get(i));
        out.write(blockKeys.get(i));
      }
    }

    /**
     * @return the size of this chunk if stored in the non-root index block
     *         format
     */
    int getNonRootSize() {
      return Bytes.SIZEOF_INT                          // Number of entries
          + Bytes.SIZEOF_INT * (blockKeys.size() + 1)  // Secondary index
          + curTotalNonRootEntrySize;                  // All entries
    }
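
    /*
     * Size example for the non-root format: a chunk with 2 entries whose
     * first keys are 10 and 15 bytes long serializes to 4 (entry count)
     * + 4 * 3 (secondary index) + (12 + 10) + (12 + 15) = 65 bytes, matching
     * getNonRootSize().
     */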

    /**
     * Writes this chunk into the given output stream in the root block index
     * format. This format is similar to the {@link HFile} version 1 block
     * index format, except that we store the on-disk size of the block
     * instead of its uncompressed size.
     *
     * @param out the data output stream to write the block index to.
     *          Typically a stream writing into an {@link HFile} block.
     * @throws IOException
     */
    void writeRoot(DataOutput out) throws IOException {
      for (int i = 0; i < blockKeys.size(); ++i) {
        out.writeLong(blockOffsets.get(i));
        out.writeInt(onDiskDataSizes.get(i));
        Bytes.writeByteArray(out, blockKeys.get(i));
      }
    }

    /**
     * @return the size of this chunk if stored in the root index block format
     */
    int getRootSize() {
      return curTotalRootSize;
    }

    /**
     * @return the number of entries in this block index chunk
     */
    public int getNumEntries() {
      return blockKeys.size();
    }

    public byte[] getBlockKey(int i) {
      return blockKeys.get(i);
    }

    public long getBlockOffset(int i) {
      return blockOffsets.get(i);
    }

    public int getOnDiskDataSize(int i) {
      return onDiskDataSizes.get(i);
    }

    public long getCumulativeNumKV(int i) {
      if (i < 0)
        return 0;
      return numSubEntriesAt.get(i);
    }

  }

  public static int getMaxChunkSize(Configuration conf) {
    return conf.getInt(MAX_CHUNK_SIZE_KEY, DEFAULT_MAX_CHUNK_SIZE);
  }
}