
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.io.hfile;
19  
20  import java.io.ByteArrayOutputStream;
21  import java.io.DataInputStream;
22  import java.io.DataOutput;
23  import java.io.DataOutputStream;
24  import java.io.IOException;
25  import java.io.InputStream;
26  import java.nio.ByteBuffer;
27  import java.util.concurrent.locks.Lock;
28  import java.util.concurrent.locks.ReentrantLock;
29  
30  import org.apache.hadoop.fs.FSDataInputStream;
31  import org.apache.hadoop.fs.FSDataOutputStream;
32  import org.apache.hadoop.fs.Path;
33  import org.apache.hadoop.hbase.Cell;
34  import org.apache.hadoop.hbase.HConstants;
35  import org.apache.hadoop.hbase.classification.InterfaceAudience;
36  import org.apache.hadoop.hbase.fs.HFileSystem;
37  import org.apache.hadoop.hbase.io.ByteBufferInputStream;
38  import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
39  import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
40  import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
41  import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext;
42  import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
43  import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
44  import org.apache.hadoop.hbase.util.ByteBufferUtils;
45  import org.apache.hadoop.hbase.util.Bytes;
46  import org.apache.hadoop.hbase.util.ChecksumType;
47  import org.apache.hadoop.hbase.util.ClassSize;
48  import org.apache.hadoop.io.IOUtils;
49  
50  import com.google.common.annotations.VisibleForTesting;
51  import com.google.common.base.Preconditions;
52  
53  /**
54   * Reading {@link HFile} version 1 and 2 blocks, and writing version 2 blocks.
55   * <ul>
56   * <li>In version 1 all blocks are always compressed or uncompressed, as
57   * specified by the {@link HFile}'s compression algorithm, with a type-specific
58   * magic record stored in the beginning of the compressed data (i.e. one needs
59   * to uncompress the compressed block to determine the block type). There is
60   * only a single compression algorithm setting for all blocks. Offset and size
61   * information from the block index are required to read a block.
62   * <li>In version 2 a block is structured as follows:
63   * <ul>
64   * <li>header (see Writer#finishBlock())
65   * <ul>
66   * <li>Magic record identifying the block type (8 bytes)
67   * <li>Compressed block size, excluding header, including checksum (4 bytes)
68   * <li>Uncompressed block size, excluding header, excluding checksum (4 bytes)
69   * <li>The offset of the previous block of the same type (8 bytes). This is
70   * used to be able to navigate to the previous block without going to the block index.
71   * <li>For minorVersions &gt;=1, the ordinal describing checksum type (1 byte)
72   * <li>For minorVersions &gt;=1, the number of data bytes/checksum chunk (4 bytes)
73   * <li>For minorVersions &gt;=1, the size of data on disk, including header,
74   * excluding checksums (4 bytes)
75   * </ul>
76   * </li>
77   * <li>Raw/Compressed/Encrypted/Encoded data. The compression algorithm is the
78   * same for all the blocks in the {@link HFile}, similarly to what was done in
79   * version 1.
80   * <li>For minorVersions &gt;=1, a series of 4 byte checksums, one each for
81   * the number of bytes specified by bytesPerChecksum.
82   * </ul>
83   * </ul>
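 * <p>
 * As a rough sketch of the above (for minorVersion &gt;= 1, i.e. with HBase checksums),
 * the {@link HConstants#HFILEBLOCK_HEADER_SIZE} byte header lays out as:
 * <pre>
 * offset  0: block magic                    (8 bytes)
 * offset  8: onDiskSizeWithoutHeader        (4 bytes)
 * offset 12: uncompressedSizeWithoutHeader  (4 bytes)
 * offset 16: prevBlockOffset                (8 bytes)
 * offset 24: checksum type ordinal          (1 byte)
 * offset 25: bytesPerChecksum               (4 bytes)
 * offset 29: onDiskDataSizeWithHeader       (4 bytes)
 * </pre>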
84   */
85  @InterfaceAudience.Private
86  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="HE_EQUALS_USE_HASHCODE",
87    justification="Fix!!! Fine for now bug FIXXXXXXX!!!!")
88  public class HFileBlock implements Cacheable {
89  
90    /**
91    * On a checksum failure on a Reader, this many subsequent read
92    * requests switch back to using HDFS checksums before auto-re-enabling
93    * HBase checksum verification.
94     */
95    static final int CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD = 3;
96  
97    public static final boolean FILL_HEADER = true;
98    public static final boolean DONT_FILL_HEADER = false;
99  
100   /**
101    * The size of block header when blockType is {@link BlockType#ENCODED_DATA}.
102    * This extends normal header by adding the id of encoder.
103    */
104   public static final int ENCODED_HEADER_SIZE = HConstants.HFILEBLOCK_HEADER_SIZE
105       + DataBlockEncoding.ID_SIZE;
106 
107   static final byte[] DUMMY_HEADER_NO_CHECKSUM =
108      new byte[HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM];
109 
110   public static final int BYTE_BUFFER_HEAP_SIZE = (int) ClassSize.estimateBase(
111       ByteBuffer.wrap(new byte[0], 0, 0).getClass(), false);
112 
113   // meta.usesHBaseChecksum+offset+nextBlockOnDiskSizeWithHeader
114   public static final int EXTRA_SERIALIZATION_SPACE = Bytes.SIZEOF_BYTE + Bytes.SIZEOF_INT
115       + Bytes.SIZEOF_LONG;
116 
117   /**
118    * Each checksum value is an integer that can be stored in 4 bytes.
119    */
120   static final int CHECKSUM_SIZE = Bytes.SIZEOF_INT;
121 
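  // A cached/serialized block is laid out as the block bytes followed by a small trailer of
  // EXTRA_SERIALIZATION_SPACE bytes: the usesHBaseChecksum flag (1 byte), the block's file
  // offset (8 bytes) and nextBlockOnDiskSizeWithHeader (4 bytes). The deserializer below
  // strips that trailer off again when rebuilding the block.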
122   static final CacheableDeserializer<Cacheable> blockDeserializer =
123       new CacheableDeserializer<Cacheable>() {
124         public HFileBlock deserialize(ByteBuffer buf, boolean reuse) throws IOException{
125           buf.limit(buf.limit() - HFileBlock.EXTRA_SERIALIZATION_SPACE).rewind();
126           ByteBuffer newByteBuffer;
127           if (reuse) {
128             newByteBuffer = buf.slice();
129           } else {
130            newByteBuffer = ByteBuffer.allocate(buf.limit());
131            newByteBuffer.put(buf);
132           }
133           buf.position(buf.limit());
134           buf.limit(buf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE);
135           boolean usesChecksum = buf.get() == (byte)1;
136           HFileBlock hFileBlock = new HFileBlock(newByteBuffer, usesChecksum);
137           hFileBlock.offset = buf.getLong();
138           hFileBlock.nextBlockOnDiskSizeWithHeader = buf.getInt();
139           if (hFileBlock.hasNextBlockHeader()) {
140             hFileBlock.buf.limit(hFileBlock.buf.limit() - hFileBlock.headerSize());
141           }
142           return hFileBlock;
143         }
144 
145         @Override
146         public int getDeserialiserIdentifier() {
147           return deserializerIdentifier;
148         }
149 
150         @Override
151         public HFileBlock deserialize(ByteBuffer b) throws IOException {
152           return deserialize(b, false);
153         }
154       };
155   private static final int deserializerIdentifier;
156   static {
157     deserializerIdentifier = CacheableDeserializerIdManager
158         .registerDeserializer(blockDeserializer);
159   }
160 
161   /** Type of block. Header field 0. */
162   private BlockType blockType;
163 
164   /** Size on disk excluding header, including checksum. Header field 1. */
165   private int onDiskSizeWithoutHeader;
166 
167   /** Size of pure data. Does not include header or checksums. Header field 2. */
168   private final int uncompressedSizeWithoutHeader;
169 
170   /** The offset of the previous block on disk. Header field 3. */
171   private final long prevBlockOffset;
172 
173   /**
174    * Size on disk of header + data. Excludes checksum. Header field 6,
175    * OR calculated from {@link #onDiskSizeWithoutHeader} when using HDFS checksum.
176    */
177   private final int onDiskDataSizeWithHeader;
178 
179   /** The in-memory representation of the hfile block */
180   private ByteBuffer buf;
181 
182   /** Meta data that holds meta information on the hfileblock */
183   private HFileContext fileContext;
184 
185   /**
186    * The offset of this block in the file. Populated by the reader for
187    * convenience of access. This offset is not part of the block header.
188    */
189   private long offset = -1;
190 
191   /**
192    * The on-disk size of the next block, including the header, obtained by
193    * peeking into the first {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes of the next block's
194    * header, or -1 if unknown.
195    */
196   private int nextBlockOnDiskSizeWithHeader = -1;
197 
198   /**
199    * Creates a new {@link HFile} block from the given fields. This constructor
200    * is mostly used when the block data has already been read and uncompressed,
201    * and is sitting in a byte buffer.
202    *
203    * @param blockType the type of this block, see {@link BlockType}
204    * @param onDiskSizeWithoutHeader see {@link #onDiskSizeWithoutHeader}
205    * @param uncompressedSizeWithoutHeader see {@link #uncompressedSizeWithoutHeader}
206    * @param prevBlockOffset see {@link #prevBlockOffset}
207    * @param buf block header ({@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes) followed by
208    *          uncompressed data
209    * @param fillHeader when true, overwrite the header portion of {@code buf} with the field values given above
210    * @param offset the file offset the block was read from
211    * @param onDiskDataSizeWithHeader see {@link #onDiskDataSizeWithHeader}
212    * @param fileContext HFile meta data
213    */
214   HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader, int uncompressedSizeWithoutHeader,
215       long prevBlockOffset, ByteBuffer buf, boolean fillHeader, long offset,
216       int onDiskDataSizeWithHeader, HFileContext fileContext) {
217     this.blockType = blockType;
218     this.onDiskSizeWithoutHeader = onDiskSizeWithoutHeader;
219     this.uncompressedSizeWithoutHeader = uncompressedSizeWithoutHeader;
220     this.prevBlockOffset = prevBlockOffset;
221     this.buf = buf;
222     this.offset = offset;
223     this.onDiskDataSizeWithHeader = onDiskDataSizeWithHeader;
224     this.fileContext = fileContext;
225     if (fillHeader)
226       overwriteHeader();
227     this.buf.rewind();
228   }
229 
230   /**
231    * Copy constructor. Creates a shallow copy of {@code that}'s buffer.
232    */
233   HFileBlock(HFileBlock that) {
234     this.blockType = that.blockType;
235     this.onDiskSizeWithoutHeader = that.onDiskSizeWithoutHeader;
236     this.uncompressedSizeWithoutHeader = that.uncompressedSizeWithoutHeader;
237     this.prevBlockOffset = that.prevBlockOffset;
238     this.buf = that.buf.duplicate();
239     this.offset = that.offset;
240     this.onDiskDataSizeWithHeader = that.onDiskDataSizeWithHeader;
241     this.fileContext = that.fileContext;
242     this.nextBlockOnDiskSizeWithHeader = that.nextBlockOnDiskSizeWithHeader;
243   }
244 
245   /**
246    * Creates a block from an existing buffer starting with a header. Rewinds
247    * and takes ownership of the buffer. By definition of rewind, ignores the
248    * buffer position, but if you slice the buffer beforehand, it will rewind
249    * to that point. The reason this is keyed on the minor version (via {@code usesHBaseChecksum})
250    * and not the major version is that major versions indicate the format of an HFile whereas
251    * minor versions indicate the format inside an HFileBlock.
252    */
253   HFileBlock(ByteBuffer b, boolean usesHBaseChecksum) throws IOException {
254     b.rewind();
255     blockType = BlockType.read(b);
256     onDiskSizeWithoutHeader = b.getInt();
257     uncompressedSizeWithoutHeader = b.getInt();
258     prevBlockOffset = b.getLong();
259     HFileContextBuilder contextBuilder = new HFileContextBuilder();
260     contextBuilder.withHBaseCheckSum(usesHBaseChecksum);
261     if (usesHBaseChecksum) {
262       contextBuilder.withChecksumType(ChecksumType.codeToType(b.get()));
263       contextBuilder.withBytesPerCheckSum(b.getInt());
264       this.onDiskDataSizeWithHeader = b.getInt();
265     } else {
266       contextBuilder.withChecksumType(ChecksumType.NULL);
267       contextBuilder.withBytesPerCheckSum(0);
268       this.onDiskDataSizeWithHeader = onDiskSizeWithoutHeader +
269                                        HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM;
270     }
271     this.fileContext = contextBuilder.build();
272     buf = b;
273     buf.rewind();
274   }
275 
276   public BlockType getBlockType() {
277     return blockType;
278   }
279 
280   /** @return the data block encoding id that was used to encode this block */
281   public short getDataBlockEncodingId() {
282     if (blockType != BlockType.ENCODED_DATA) {
283       throw new IllegalArgumentException("Querying encoder ID of a block " +
284           "of type other than " + BlockType.ENCODED_DATA + ": " + blockType);
285     }
286     return buf.getShort(headerSize());
287   }
288 
289   /**
290    * @return the on-disk size of header + data part + checksum.
291    */
292   public int getOnDiskSizeWithHeader() {
293     return onDiskSizeWithoutHeader + headerSize();
294   }
295 
296   /**
297    * @return the on-disk size of the data part + checksum (header excluded).
298    */
299   public int getOnDiskSizeWithoutHeader() {
300     return onDiskSizeWithoutHeader;
301   }
302 
303   /**
304    * @return the uncompressed size of data part (header and checksum excluded).
305    */
306    public int getUncompressedSizeWithoutHeader() {
307     return uncompressedSizeWithoutHeader;
308   }
309 
310   /**
311    * @return the offset of the previous block of the same type in the file, or
312    *         -1 if unknown
313    */
314   public long getPrevBlockOffset() {
315     return prevBlockOffset;
316   }
317 
318   /**
319    * Rewinds {@code buf} and writes the header fields (including the checksum fields when
320    * HBase checksums are in use). The {@code buf} position is modified as a side effect.
321    */
322   private void overwriteHeader() {
323     buf.rewind();
324     blockType.write(buf);
325     buf.putInt(onDiskSizeWithoutHeader);
326     buf.putInt(uncompressedSizeWithoutHeader);
327     buf.putLong(prevBlockOffset);
328     if (this.fileContext.isUseHBaseChecksum()) {
329       buf.put(fileContext.getChecksumType().getCode());
330       buf.putInt(fileContext.getBytesPerChecksum());
331       buf.putInt(onDiskDataSizeWithHeader);
332     }
333   }
334 
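  // A quick orientation for the buffer accessors that follow (derived from their
  // implementations): getBufferWithoutHeader() excludes header and checksum,
  // getBufferReadOnly() includes the header but not the checksum,
  // getBufferReadOnlyWithHeader() includes both, and getBufferWithHeader() also
  // includes both but returns a rewound duplicate rather than a slice.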
335   /**
336    * Returns a buffer that does not include the header or checksum.
337    *
338    * @return the buffer with header skipped and checksum omitted.
339    */
340   public ByteBuffer getBufferWithoutHeader() {
341     ByteBuffer dup = this.buf.duplicate();
342     dup.position(headerSize());
343     dup.limit(buf.limit() - totalChecksumBytes());
344     return dup.slice();
345   }
346 
347   /**
348    * Returns the buffer this block stores internally. The clients must not
349    * modify the buffer object. This method has to be public because it is
350    * used in {@link org.apache.hadoop.hbase.util.CompoundBloomFilter}
351    * to avoid object creation on every Bloom filter lookup, but has to
352    * be used with caution. Checksum data is not included in the returned
353    * buffer but header data is.
354    *
355    * @return the buffer of this block for read-only operations
356    */
357   public ByteBuffer getBufferReadOnly() {
358     ByteBuffer dup = this.buf.duplicate();
359     dup.limit(buf.limit() - totalChecksumBytes());
360     return dup.slice();
361   }
362 
363   /**
364    * Returns the buffer of this block, including header data. The clients must
365    * not modify the buffer object. This method has to be public because it is
366    * used in {@link org.apache.hadoop.hbase.io.hfile.bucket.BucketCache} to avoid buffer copy.
367    *
368    * @return the buffer with header and checksum included for read-only operations
369    */
370   public ByteBuffer getBufferReadOnlyWithHeader() {
371     ByteBuffer dup = this.buf.duplicate();
372     return dup.slice();
373   }
374 
375   /**
376    * Returns a byte buffer of this block, including header data and checksum, positioned at
377    * the beginning of header. The underlying data array is not copied.
378    *
379    * @return the byte buffer with header and checksum included
380    */
381   ByteBuffer getBufferWithHeader() {
382     ByteBuffer dupBuf = buf.duplicate();
383     dupBuf.rewind();
384     return dupBuf;
385   }
386 
387   private void sanityCheckAssertion(long valueFromBuf, long valueFromField,
388       String fieldName) throws IOException {
389     if (valueFromBuf != valueFromField) {
390       throw new AssertionError(fieldName + " in the buffer (" + valueFromBuf
391           + ") is different from that in the field (" + valueFromField + ")");
392     }
393   }
394 
395   private void sanityCheckAssertion(BlockType valueFromBuf, BlockType valueFromField)
396       throws IOException {
397     if (valueFromBuf != valueFromField) {
398       throw new IOException("Block type stored in the buffer: " +
399         valueFromBuf + ", block type field: " + valueFromField);
400     }
401   }
402 
403   /**
404    * Checks if the block is internally consistent, i.e. the first
405    * {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes of the buffer contain a
406    * valid header consistent with the fields. Assumes a packed block structure.
407    * This function is primary for testing and debugging, and is not
408    * thread-safe, because it alters the internal buffer pointer.
409    */
410   void sanityCheck() throws IOException {
411     buf.rewind();
412 
413     sanityCheckAssertion(BlockType.read(buf), blockType);
414 
415     sanityCheckAssertion(buf.getInt(), onDiskSizeWithoutHeader,
416         "onDiskSizeWithoutHeader");
417 
418     sanityCheckAssertion(buf.getInt(), uncompressedSizeWithoutHeader,
419         "uncompressedSizeWithoutHeader");
420 
421     sanityCheckAssertion(buf.getLong(), prevBlockOffset, "prevBlockOffset");
422     if (this.fileContext.isUseHBaseChecksum()) {
423       sanityCheckAssertion(buf.get(), this.fileContext.getChecksumType().getCode(), "checksumType");
424       sanityCheckAssertion(buf.getInt(), this.fileContext.getBytesPerChecksum(), "bytesPerChecksum");
425       sanityCheckAssertion(buf.getInt(), onDiskDataSizeWithHeader, "onDiskDataSizeWithHeader");
426     }
427 
428     int cksumBytes = totalChecksumBytes();
429     int expectedBufLimit = onDiskDataSizeWithHeader + cksumBytes;
430     if (buf.limit() != expectedBufLimit) {
431       throw new AssertionError("Expected buffer limit " + expectedBufLimit
432           + ", got " + buf.limit());
433     }
434 
435     // We might optionally allocate HFILEBLOCK_HEADER_SIZE more bytes to read the next
436     // block's header, so there are two sensible values for buffer capacity.
437     int hdrSize = headerSize();
438     if (buf.capacity() != expectedBufLimit &&
439         buf.capacity() != expectedBufLimit + hdrSize) {
440       throw new AssertionError("Invalid buffer capacity: " + buf.capacity() +
441           ", expected " + expectedBufLimit + " or " + (expectedBufLimit + hdrSize));
442     }
443   }
444 
445   @Override
446   public String toString() {
447     StringBuilder sb = new StringBuilder()
448       .append("HFileBlock [")
449       .append(" fileOffset=").append(offset)
450       .append(" headerSize()=").append(headerSize())
451       .append(" blockType=").append(blockType)
452       .append(" onDiskSizeWithoutHeader=").append(onDiskSizeWithoutHeader)
453       .append(" uncompressedSizeWithoutHeader=").append(uncompressedSizeWithoutHeader)
454       .append(" prevBlockOffset=").append(prevBlockOffset)
455       .append(" isUseHBaseChecksum()=").append(fileContext.isUseHBaseChecksum());
456     if (fileContext.isUseHBaseChecksum()) {
457       sb.append(" checksumType=").append(ChecksumType.codeToType(this.buf.get(24)))
458         .append(" bytesPerChecksum=").append(this.buf.getInt(24 + 1))
459         .append(" onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader);
460     } else {
461       sb.append(" onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader)
462         .append("(").append(onDiskSizeWithoutHeader)
463         .append("+").append(HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM).append(")");
464     }
465     String dataBegin = null;
466     if (buf.hasArray()) {
467       dataBegin = Bytes.toStringBinary(buf.array(), buf.arrayOffset() + headerSize(),
468           Math.min(32, buf.limit() - buf.arrayOffset() - headerSize()));
469     } else {
470       ByteBuffer bufWithoutHeader = getBufferWithoutHeader();
471       byte[] dataBeginBytes = new byte[Math.min(32,
472           bufWithoutHeader.limit() - bufWithoutHeader.position())];
473       bufWithoutHeader.get(dataBeginBytes);
474       dataBegin = Bytes.toStringBinary(dataBeginBytes);
475     }
476     sb.append(" getOnDiskSizeWithHeader()=").append(getOnDiskSizeWithHeader())
477       .append(" totalChecksumBytes()=").append(totalChecksumBytes())
478       .append(" isUnpacked()=").append(isUnpacked())
479       .append(" buf=[ ").append(buf).append(" ]")
480       .append(" dataBeginsWith=").append(dataBegin)
481       .append(" fileContext=").append(fileContext)
482       .append(" ]");
483     return sb.toString();
484   }
485 
486   /**
487    * Called after reading a block, to validate the caller-provided on-disk size against the size recorded in the header.
488    */
489   private void validateOnDiskSizeWithoutHeader(int expectedOnDiskSizeWithoutHeader)
490   throws IOException {
491     if (onDiskSizeWithoutHeader != expectedOnDiskSizeWithoutHeader) {
492       String dataBegin = null;
493       if (buf.hasArray()) {
494         dataBegin = Bytes.toStringBinary(buf.array(), buf.arrayOffset(), Math.min(32, buf.limit()));
495       } else {
496         ByteBuffer bufDup = getBufferReadOnly();
497         byte[] dataBeginBytes = new byte[Math.min(32, bufDup.limit() - bufDup.position())];
498         bufDup.get(dataBeginBytes);
499         dataBegin = Bytes.toStringBinary(dataBeginBytes);
500       }
501       String blockInfoMsg =
502         "Block offset: " + offset + ", data starts with: " + dataBegin;
503       throw new IOException("On-disk size without header provided is "
504           + expectedOnDiskSizeWithoutHeader + ", but block "
505           + "header contains " + onDiskSizeWithoutHeader + ". " +
506           blockInfoMsg);
507     }
508   }
509 
510   /**
511    * Retrieves the decompressed/decrypted view of this block. An encoded block remains in its
512    * encoded structure. Internal structures are shared between instances where applicable.
513    */
514   HFileBlock unpack(HFileContext fileContext, FSReader reader) throws IOException {
515     if (!fileContext.isCompressedOrEncrypted()) {
516       // TODO: cannot use our own fileContext here because HFileBlock(ByteBuffer, boolean),
517       // which is used for block serialization to L2 cache, does not preserve encoding and
518       // encryption details.
519       return this;
520     }
521 
522     HFileBlock unpacked = new HFileBlock(this);
523     unpacked.allocateBuffer(); // allocates space for the decompressed block
524 
525     HFileBlockDecodingContext ctx = blockType == BlockType.ENCODED_DATA ?
526       reader.getBlockDecodingContext() : reader.getDefaultBlockDecodingContext();
527 
528     ByteBuffer dup = this.buf.duplicate();
529     dup.position(this.headerSize());
530     dup = dup.slice();
531     ctx.prepareDecoding(unpacked.getOnDiskSizeWithoutHeader(),
532       unpacked.getUncompressedSizeWithoutHeader(), unpacked.getBufferWithoutHeader(),
533       dup);
534 
535     // Preserve the next block's header bytes in the new block if we have them.
536     if (unpacked.hasNextBlockHeader()) {
537       // Both buffers are limited to end at the checksum bytes, excluding the next block's header.
538       // The copyFromBufferToBuffer() call below does positional reads/writes from/to the buffers
539       // when either of them is a direct ByteBuffer, so we adjust the limits on duplicate buffers:
540       // no data is copied here, just new ByteBuffer views are created.
541       ByteBuffer inDup = this.buf.duplicate();
542       inDup.limit(inDup.limit() + headerSize());
543       ByteBuffer outDup = unpacked.buf.duplicate();
544       outDup.limit(outDup.limit() + unpacked.headerSize());
545       ByteBufferUtils.copyFromBufferToBuffer(
546           outDup,
547           inDup,
548           this.onDiskDataSizeWithHeader,
549           unpacked.headerSize() + unpacked.uncompressedSizeWithoutHeader
550               + unpacked.totalChecksumBytes(), unpacked.headerSize());
551     }
552     return unpacked;
553   }
554 
555   /**
556    * Return true when this buffer includes next block's header.
557    */
558   private boolean hasNextBlockHeader() {
559     return nextBlockOnDiskSizeWithHeader > 0;
560   }
561 
562   /**
563    * Always allocates a new buffer of the correct size. Copies header bytes
564    * from the existing buffer. Does not change header fields.
565    * Reserves room for the checksum bytes as well.
566    */
567   private void allocateBuffer() {
568     int cksumBytes = totalChecksumBytes();
569     int headerSize = headerSize();
570     int capacityNeeded = headerSize + uncompressedSizeWithoutHeader +
571         cksumBytes + (hasNextBlockHeader() ? headerSize : 0);
572 
573     // TODO: should we consider allocating this buffer off-heap?
574     ByteBuffer newBuf = ByteBuffer.allocate(capacityNeeded);
575 
576     // Copy header bytes into newBuf.
577     // newBuf is HBB so no issue in calling array()
578     ByteBuffer dup = buf.duplicate();
579     dup.position(0);
580     dup.get(newBuf.array(), newBuf.arrayOffset(), headerSize);
581 
582     buf = newBuf;
583     // set limit to exclude next block's header
584     buf.limit(headerSize + uncompressedSizeWithoutHeader + cksumBytes);
585   }
586 
587   /**
588    * Return true when this block's buffer has been unpacked, false otherwise. Note this is a
589    * calculated heuristic, not a tracked attribute of the block.
590    */
591   public boolean isUnpacked() {
592     final int cksumBytes = totalChecksumBytes();
593     final int headerSize = headerSize();
594     final int expectedCapacity = headerSize + uncompressedSizeWithoutHeader + cksumBytes;
595     final int bufCapacity = buf.capacity();
596     return bufCapacity == expectedCapacity || bufCapacity == expectedCapacity + headerSize;
597   }
598 
599   /** An additional sanity-check in case no compression or encryption is being used. */
600   public void assumeUncompressed() throws IOException {
601     if (onDiskSizeWithoutHeader != uncompressedSizeWithoutHeader +
602         totalChecksumBytes()) {
603       throw new IOException("Using no compression but "
604           + "onDiskSizeWithoutHeader=" + onDiskSizeWithoutHeader + ", "
605           + "uncompressedSizeWithoutHeader=" + uncompressedSizeWithoutHeader
606           + ", numChecksumbytes=" + totalChecksumBytes());
607     }
608   }
609 
610   /**
611    * @param expectedType the expected type of this block
612    * @throws IOException if this block's type is different than expected
613    */
614   public void expectType(BlockType expectedType) throws IOException {
615     if (blockType != expectedType) {
616       throw new IOException("Invalid block type: expected=" + expectedType
617           + ", actual=" + blockType);
618     }
619   }
620 
621   /** @return the offset of this block in the file it was read from */
622   public long getOffset() {
623     if (offset < 0) {
624       throw new IllegalStateException(
625           "HFile block offset not initialized properly");
626     }
627     return offset;
628   }
629 
630   /**
631    * @return a byte stream reading the data + checksum of this block
632    */
633   public DataInputStream getByteStream() {
634     ByteBuffer dup = this.buf.duplicate();
635     dup.position(this.headerSize());
636     return new DataInputStream(new ByteBufferInputStream(dup));
637   }
638 
639   @Override
640   public long heapSize() {
641     long size = ClassSize.align(
642         ClassSize.OBJECT +
643         // Block type, byte buffer and meta references
644         3 * ClassSize.REFERENCE +
645         // On-disk size, uncompressed size, next block's on-disk size, and
646         // onDiskDataSizeWithHeader
647         4 * Bytes.SIZEOF_INT +
648         // This and previous block offset
649         2 * Bytes.SIZEOF_LONG +
650         // Heap size of the meta object. meta will be always not null.
651         fileContext.heapSize()
652     );
653 
654     if (buf != null) {
655       // Deep overhead of the byte buffer. Needs to be aligned separately.
656       size += ClassSize.align(buf.capacity() + BYTE_BUFFER_HEAP_SIZE);
657     }
658 
659     return ClassSize.align(size);
660   }
661 
662   /**
663    * Read from an input stream. Analogous to
664    * {@link IOUtils#readFully(InputStream, byte[], int, int)}, but specifies a
665    * number of "extra" bytes that would be desirable but not absolutely
666    * necessary to read.
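   * <p>
   * For example, callers in this class typically pass the next block's header size as
   * {@code extraLen} so that a single read can opportunistically pick up that header as
   * well; a {@code false} return only means those optional bytes were not available.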
667    *
668    * @param in the input stream to read from
669    * @param buf the buffer to read into
670    * @param bufOffset the destination offset in the buffer
671    * @param necessaryLen the number of bytes that are absolutely necessary to
672    *          read
673    * @param extraLen the number of extra bytes that would be nice to read
674    * @return true if succeeded reading the extra bytes
675    * @throws IOException if failed to read the necessary bytes
676    */
677   public static boolean readWithExtra(InputStream in, byte[] buf,
678       int bufOffset, int necessaryLen, int extraLen) throws IOException {
679     int bytesRemaining = necessaryLen + extraLen;
680     while (bytesRemaining > 0) {
681       int ret = in.read(buf, bufOffset, bytesRemaining);
682       if (ret == -1 && bytesRemaining <= extraLen) {
683         // We could not read the "extra data", but that is OK.
684         break;
685       }
686 
687       if (ret < 0) {
688         throw new IOException("Premature EOF from inputStream (read "
689             + "returned " + ret + ", was trying to read " + necessaryLen
690             + " necessary bytes and " + extraLen + " extra bytes, "
691             + "successfully read "
692             + (necessaryLen + extraLen - bytesRemaining));
693       }
694       bufOffset += ret;
695       bytesRemaining -= ret;
696     }
697     return bytesRemaining <= 0;
698   }
699 
700   /**
701    * Read from an input stream. Analogous to
702    * {@link IOUtils#readFully(InputStream, byte[], int, int)}, but uses
703    * positional read and specifies a number of "extra" bytes that would be
704    * desirable but not absolutely necessary to read.
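   * Unlike {@link #readWithExtra}, the read loop stops as soon as {@code necessaryLen}
   * bytes have been read, so extra bytes only count if they arrive in those same reads.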
705    *
706    * @param in the input stream to read from
707    * @param position the position within the stream from which to start reading
708    * @param buf the buffer to read into
709    * @param bufOffset the destination offset in the buffer
710    * @param necessaryLen the number of bytes that are absolutely necessary to
711    *     read
712    * @param extraLen the number of extra bytes that would be nice to read
713    * @return true if and only if extraLen is > 0 and reading those extra bytes
714    *     was successful
715    * @throws IOException if failed to read the necessary bytes
716    */
717   @VisibleForTesting
718   static boolean positionalReadWithExtra(FSDataInputStream in,
719       long position, byte[] buf, int bufOffset, int necessaryLen, int extraLen)
720       throws IOException {
721     int bytesRemaining = necessaryLen + extraLen;
722     int bytesRead = 0;
723     while (bytesRead < necessaryLen) {
724       int ret = in.read(position, buf, bufOffset, bytesRemaining);
725       if (ret < 0) {
726         throw new IOException("Premature EOF from inputStream (positional read "
727             + "returned " + ret + ", was trying to read " + necessaryLen
728             + " necessary bytes and " + extraLen + " extra bytes, "
729             + "successfully read " + bytesRead);
730       }
731       position += ret;
732       bufOffset += ret;
733       bytesRemaining -= ret;
734       bytesRead += ret;
735     }
736     return bytesRead != necessaryLen && bytesRemaining <= 0;
737   }
738 
739   /**
740    * @return the on-disk size of the next block (including the header size)
741    *         that was read by peeking into the next block's header
742    */
743   public int getNextBlockOnDiskSizeWithHeader() {
744     return nextBlockOnDiskSizeWithHeader;
745   }
746 
747   /**
748    * Unified version 2 {@link HFile} block writer. The intended usage pattern
749    * is as follows:
750    * <ol>
751    * <li>Construct an {@link HFileBlock.Writer}, providing a compression algorithm.
752    * <li>Call {@link Writer#startWriting} and get a data stream to write to.
753    * <li>Write your data into the stream.
754    * <li>Call {@link Writer#writeHeaderAndData(FSDataOutputStream)} as many times as you need to
755    * store the serialized block into an external stream.
756    * <li>Repeat to write more blocks.
757    * </ol>
758    * <p>
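   * A minimal usage sketch (the stream and context objects named here are illustrative,
   * not prescribed by this class):
   * <pre>
   * HFileBlock.Writer hbw = new HFileBlock.Writer(NoOpDataBlockEncoder.INSTANCE, fileContext);
   * DataOutputStream dos = hbw.startWriting(BlockType.META);
   * dos.writeUTF("example payload");
   * hbw.writeHeaderAndData(fsDataOutputStream);
   * </pre>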
759    */
760   public static class Writer {
761 
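    // A note on the state machine below: the writer starts in INIT, moves to WRITING on
    // startWriting(), to BLOCK_READY when finishBlock()/ensureBlockReady() runs, and back
    // to WRITING when the next block is started.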
762     private enum State {
763       INIT,
764       WRITING,
765       BLOCK_READY
766     };
767 
768     /** Writer state. Used to ensure the correct usage protocol. */
769     private State state = State.INIT;
770 
771     /** Data block encoder used for data blocks */
772     private final HFileDataBlockEncoder dataBlockEncoder;
773 
774     private HFileBlockEncodingContext dataBlockEncodingCtx;
775 
776     /** block encoding context for non-data blocks */
777     private HFileBlockDefaultEncodingContext defaultBlockEncodingCtx;
778 
779     /**
780      * The stream we use to accumulate data in uncompressed format for each
781      * block. We reset this stream at the end of each block and reuse it. The
782      * header is written as the first {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes into this
783      * stream.
784      */
785     private ByteArrayOutputStream baosInMemory;
786 
787     /**
788      * Current block type. Set in {@link #startWriting(BlockType)}. Could be
789      * changed in {@link #finishBlock()} from {@link BlockType#DATA}
790      * to {@link BlockType#ENCODED_DATA}.
791      */
792     private BlockType blockType;
793 
794     /**
795      * A stream that we write uncompressed bytes to, which compresses them and
796      * writes them to {@link #baosInMemory}.
797      */
798     private DataOutputStream userDataStream;
799 
800     // Size of the actual data being written, before any block encoding/compression. This
801     // includes the header size also.
802     private int unencodedDataSizeWritten;
803 
804     /**
805      * Bytes to be written to the file system, including the header. Compressed
806      * if compression is turned on. It also includes the checksum data that
807      * immediately follows the block data. (header + data + checksums)
808      */
809     private byte[] onDiskBytesWithHeader;
810 
811     /**
812      * The checksum data for this block. Checksums are generated in finishBlock()
813      * over {@link #onDiskBytesWithHeader} and are written out immediately after it,
814      * so this array holds exactly the checksum bytes that follow the block data
815      * on disk.
816      */
817     private byte[] onDiskChecksum;
818 
819     /**
820      * Valid in the BLOCK_READY state. Contains the header and the uncompressed (but
821      * potentially encoded, if this is a data block) bytes, so the length is
822      * {@link #uncompressedSizeWithoutHeader} +
823      * {@link org.apache.hadoop.hbase.HConstants#HFILEBLOCK_HEADER_SIZE}.
824      * Does not store checksums.
825      */
826     private byte[] uncompressedBytesWithHeader;
827 
828     /**
829      * Current block's start offset in the {@link HFile}. Set in
830      * {@link #writeHeaderAndData(FSDataOutputStream)}.
831      */
832     private long startOffset;
833 
834     /**
835      * Offset of previous block by block type. Updated when the next block is
836      * started.
837      */
838     private long[] prevOffsetByType;
839 
840     /** The offset of the previous block of the same type */
841     private long prevOffset;
842     /** Meta data that holds information about the hfileblock. */
843     private HFileContext fileContext;
844 
845     /**
846      * @param dataBlockEncoder data block encoding algorithm to use
847      */
848     public Writer(HFileDataBlockEncoder dataBlockEncoder, HFileContext fileContext) {
849       this.dataBlockEncoder = dataBlockEncoder != null
850           ? dataBlockEncoder : NoOpDataBlockEncoder.INSTANCE;
851       defaultBlockEncodingCtx = new HFileBlockDefaultEncodingContext(null,
852           HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext);
853       dataBlockEncodingCtx = this.dataBlockEncoder
854           .newDataBlockEncodingContext(HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext);
855 
856       if (fileContext.getBytesPerChecksum() < HConstants.HFILEBLOCK_HEADER_SIZE) {
857         throw new RuntimeException("Unsupported value of bytesPerChecksum. " +
858             " Minimum is " + HConstants.HFILEBLOCK_HEADER_SIZE + " but the configured value is " +
859             fileContext.getBytesPerChecksum());
860       }
861 
862       baosInMemory = new ByteArrayOutputStream();
863 
864       prevOffsetByType = new long[BlockType.values().length];
865       for (int i = 0; i < prevOffsetByType.length; ++i)
866         prevOffsetByType[i] = -1;
867 
868       this.fileContext = fileContext;
869     }
870 
871     /**
872      * Starts writing into the block. The previous block's data is discarded.
873      *
874      * @return the stream the user can write their data into
875      * @throws IOException
876      */
877     public DataOutputStream startWriting(BlockType newBlockType)
878         throws IOException {
879       if (state == State.BLOCK_READY && startOffset != -1) {
880         // We had a previous block that was written to a stream at a specific
881         // offset. Save that offset as the last offset of a block of that type.
882         prevOffsetByType[blockType.getId()] = startOffset;
883       }
884 
885       startOffset = -1;
886       blockType = newBlockType;
887 
888       baosInMemory.reset();
889       baosInMemory.write(HConstants.HFILEBLOCK_DUMMY_HEADER);
890 
891       state = State.WRITING;
892 
893       // We will compress it later in finishBlock()
894       userDataStream = new DataOutputStream(baosInMemory);
895       if (newBlockType == BlockType.DATA) {
896         this.dataBlockEncoder.startBlockEncoding(dataBlockEncodingCtx, userDataStream);
897       }
898       this.unencodedDataSizeWritten = 0;
899       return userDataStream;
900     }
901 
902     /**
903      * Writes the Cell to this block
904      * @param cell the Cell to write into this block
905      * @throws IOException
906      */
907     public void write(Cell cell) throws IOException{
908       expectState(State.WRITING);
909       this.unencodedDataSizeWritten += this.dataBlockEncoder.encode(cell, dataBlockEncodingCtx,
910           this.userDataStream);
911     }
912 
913     /**
914      * Returns the stream for the user to write to. The block writer takes care
915      * of handling compression and buffering for caching on write. Can only be
916      * called in the "writing" state.
917      *
918      * @return the data output stream for the user to write to
919      */
920     DataOutputStream getUserDataStream() {
921       expectState(State.WRITING);
922       return userDataStream;
923     }
924 
925     /**
926      * Transitions the block writer from the "writing" state to the "block
927      * ready" state.  Does nothing if a block is already finished.
928      */
929     void ensureBlockReady() throws IOException {
930       Preconditions.checkState(state != State.INIT,
931           "Unexpected state: " + state);
932 
933       if (state == State.BLOCK_READY)
934         return;
935 
936       // This will set state to BLOCK_READY.
937       finishBlock();
938     }
939 
940     /**
941      * An internal method that flushes the compressing stream (if using
942      * compression), serializes the header, and takes care of the separate
943      * uncompressed stream for caching on write, if applicable. Sets block
944      * write state to "block ready".
945      */
946     private void finishBlock() throws IOException {
947       if (blockType == BlockType.DATA) {
948         BufferGrabbingByteArrayOutputStream baosInMemoryCopy =
949             new BufferGrabbingByteArrayOutputStream();
950         baosInMemory.writeTo(baosInMemoryCopy);
951         this.dataBlockEncoder.endBlockEncoding(dataBlockEncodingCtx, userDataStream,
952             baosInMemoryCopy.buf, blockType);
953         blockType = dataBlockEncodingCtx.getBlockType();
954       }
955       userDataStream.flush();
956       // This does an array copy, so it is safe to cache this byte array.
957       uncompressedBytesWithHeader = baosInMemory.toByteArray();
958       prevOffset = prevOffsetByType[blockType.getId()];
959 
960       // We need to set state before we can package the block up for
961       // cache-on-write. In a way, the block is ready, but not yet encoded or
962       // compressed.
963       state = State.BLOCK_READY;
964       if (blockType == BlockType.DATA || blockType == BlockType.ENCODED_DATA) {
965         onDiskBytesWithHeader = dataBlockEncodingCtx
966             .compressAndEncrypt(uncompressedBytesWithHeader);
967       } else {
968         onDiskBytesWithHeader = defaultBlockEncodingCtx
969             .compressAndEncrypt(uncompressedBytesWithHeader);
970       }
971       int numBytes = (int) ChecksumUtil.numBytes(
972           onDiskBytesWithHeader.length,
973           fileContext.getBytesPerChecksum());
974 
975       // put the header for on disk bytes
976       putHeader(onDiskBytesWithHeader, 0,
977           onDiskBytesWithHeader.length + numBytes,
978           uncompressedBytesWithHeader.length, onDiskBytesWithHeader.length);
979       // set the header for the uncompressed bytes (for cache-on-write)
980       putHeader(uncompressedBytesWithHeader, 0,
981           onDiskBytesWithHeader.length + numBytes,
982           uncompressedBytesWithHeader.length, onDiskBytesWithHeader.length);
983 
984       onDiskChecksum = new byte[numBytes];
985       ChecksumUtil.generateChecksums(
986           onDiskBytesWithHeader, 0, onDiskBytesWithHeader.length,
987           onDiskChecksum, 0, fileContext.getChecksumType(), fileContext.getBytesPerChecksum());
988     }
989 
990     public static class BufferGrabbingByteArrayOutputStream extends ByteArrayOutputStream {
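      // Used by finishBlock() above: ByteArrayOutputStream.writeTo(OutputStream) hands over its
      // internal array in a single write(byte[], int, int) call, which this class simply keeps a
      // reference to instead of copying.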
991       private byte[] buf;
992 
993       @Override
994       public void write(byte[] b, int off, int len) {
995         this.buf = b;
996       }
997 
998       public byte[] getBuffer() {
999         return this.buf;
1000       }
1001     }
1002 
1003     /**
1004      * Put the header into the given byte array at the given offset.
1005      * @param onDiskSize size of the block on disk header + data + checksum
1006      * @param uncompressedSize size of the block after decompression (but
1007      *          before optional data block decoding) including header
1008      * @param onDiskDataSize size of the block on disk with header
1009      *        and data but not including the checksums
1010      */
1011     private void putHeader(byte[] dest, int offset, int onDiskSize,
1012         int uncompressedSize, int onDiskDataSize) {
1013       offset = blockType.put(dest, offset);
1014       offset = Bytes.putInt(dest, offset, onDiskSize - HConstants.HFILEBLOCK_HEADER_SIZE);
1015       offset = Bytes.putInt(dest, offset, uncompressedSize - HConstants.HFILEBLOCK_HEADER_SIZE);
1016       offset = Bytes.putLong(dest, offset, prevOffset);
1017       offset = Bytes.putByte(dest, offset, fileContext.getChecksumType().getCode());
1018       offset = Bytes.putInt(dest, offset, fileContext.getBytesPerChecksum());
1019       Bytes.putInt(dest, offset, onDiskDataSize);
1020     }
1021 
1022     /**
1023      * Similar to {@link #finishBlockAndWriteHeaderAndData(DataOutputStream)}, but also records
1024      * the offset of this block so that it can be referenced in the next block
1025      * of the same type.
1026      *
1027      * @param out the output stream to write the block to
1028      * @throws IOException
1029      */
1030     public void writeHeaderAndData(FSDataOutputStream out) throws IOException {
1031       long offset = out.getPos();
1032       if (startOffset != -1 && offset != startOffset) {
1033         throw new IOException("A " + blockType + " block written to a "
1034             + "stream twice, first at offset " + startOffset + ", then at "
1035             + offset);
1036       }
1037       startOffset = offset;
1038 
1039       finishBlockAndWriteHeaderAndData((DataOutputStream) out);
1040     }
1041 
1042     /**
1043      * Writes the header and the compressed data of this block (or uncompressed
1044      * data when not using compression) into the given stream. Can be called in
1045      * the "writing" state or in the "block ready" state. If called in the
1046      * "writing" state, transitions the writer to the "block ready" state.
1047      *
1048      * @param out the output stream to write the block to
1049      * @throws IOException
1050      */
1051     protected void finishBlockAndWriteHeaderAndData(DataOutputStream out)
1052       throws IOException {
1053       ensureBlockReady();
1054       out.write(onDiskBytesWithHeader);
1055       out.write(onDiskChecksum);
1056     }
1057 
1058     /**
1059      * Returns the header and the compressed data (or uncompressed data when not
1060      * using compression) as a byte array. Can be called in the "writing" state
1061      * or in the "block ready" state. If called in the "writing" state,
1062      * transitions the writer to the "block ready" state. This returns
1063      * the header + data + checksums stored on disk.
1064      *
1065      * @return header and data as they would be stored on disk in a byte array
1066      * @throws IOException
1067      */
1068     byte[] getHeaderAndDataForTest() throws IOException {
1069       ensureBlockReady();
1070       // This is not very optimal, because we are doing an extra copy.
1071       // But this method is used only by unit tests.
1072       byte[] output =
1073           new byte[onDiskBytesWithHeader.length
1074               + onDiskChecksum.length];
1075       System.arraycopy(onDiskBytesWithHeader, 0, output, 0,
1076           onDiskBytesWithHeader.length);
1077       System.arraycopy(onDiskChecksum, 0, output,
1078           onDiskBytesWithHeader.length, onDiskChecksum.length);
1079       return output;
1080     }
1081 
1082     /**
1083      * Releases resources used by this writer.
1084      */
1085     public void release() {
1086       if (dataBlockEncodingCtx != null) {
1087         dataBlockEncodingCtx.close();
1088         dataBlockEncodingCtx = null;
1089       }
1090       if (defaultBlockEncodingCtx != null) {
1091         defaultBlockEncodingCtx.close();
1092         defaultBlockEncodingCtx = null;
1093       }
1094     }
1095 
1096     /**
1097      * Returns the on-disk size of the data portion of the block. This is the
1098      * compressed size if compression is enabled. Can only be called in the
1099      * "block ready" state. Header is not compressed, and its size is not
1100      * included in the return value.
1101      *
1102      * @return the on-disk size of the block, not including the header.
1103      */
1104     int getOnDiskSizeWithoutHeader() {
1105       expectState(State.BLOCK_READY);
1106       return onDiskBytesWithHeader.length
1107           + onDiskChecksum.length
1108           - HConstants.HFILEBLOCK_HEADER_SIZE;
1109     }
1110 
1111     /**
1112      * Returns the on-disk size of the block. Can only be called in the
1113      * "block ready" state.
1114      *
1115      * @return the on-disk size of the block ready to be written, including the
1116      *         header size, the data and the checksum data.
1117      */
1118     int getOnDiskSizeWithHeader() {
1119       expectState(State.BLOCK_READY);
1120       return onDiskBytesWithHeader.length + onDiskChecksum.length;
1121     }
1122 
1123     /**
1124      * The uncompressed size of the block data. Does not include header size.
1125      */
1126     int getUncompressedSizeWithoutHeader() {
1127       expectState(State.BLOCK_READY);
1128       return uncompressedBytesWithHeader.length - HConstants.HFILEBLOCK_HEADER_SIZE;
1129     }
1130 
1131     /**
1132      * The uncompressed size of the block data, including header size.
1133      */
1134     int getUncompressedSizeWithHeader() {
1135       expectState(State.BLOCK_READY);
1136       return uncompressedBytesWithHeader.length;
1137     }
1138 
1139     /** @return true if a block is being written  */
1140     public boolean isWriting() {
1141       return state == State.WRITING;
1142     }
1143 
1144     /**
1145      * Returns the number of bytes written into the current block so far, or
1146      * zero if not writing the block at the moment. Note that this will return
1147      * zero in the "block ready" state as well.
1148      *
1149      * @return the number of bytes written
1150      */
1151     public int blockSizeWritten() {
1152       if (state != State.WRITING) return 0;
1153       return this.unencodedDataSizeWritten;
1154     }
1155 
1156     /**
1157      * Returns the header followed by the uncompressed data, even if using
1158      * compression. This is needed for storing uncompressed blocks in the block
1159      * cache. Can be called in the "writing" state or the "block ready" state.
1160      * Returns only the header and data, does not include checksum data.
1161      *
1162      * @return uncompressed block bytes for caching on write
1163      */
1164     ByteBuffer getUncompressedBufferWithHeader() {
1165       expectState(State.BLOCK_READY);
1166       return ByteBuffer.wrap(uncompressedBytesWithHeader);
1167     }
1168 
1169     /**
1170      * Returns the header followed by the on-disk (compressed/encoded/encrypted) data. This is
1171      * needed for storing packed blocks in the block cache. Expects calling semantics identical to
1172      * {@link #getUncompressedBufferWithHeader()}. Returns only the header and data,
1173      * Does not include checksum data.
1174      *
1175      * @return packed block bytes for caching on write
1176      */
1177     ByteBuffer getOnDiskBufferWithHeader() {
1178       expectState(State.BLOCK_READY);
1179       return ByteBuffer.wrap(onDiskBytesWithHeader);
1180     }
1181 
1182     private void expectState(State expectedState) {
1183       if (state != expectedState) {
1184         throw new IllegalStateException("Expected state: " + expectedState +
1185             ", actual state: " + state);
1186       }
1187     }
1188 
1189     /**
1190      * Takes the given {@link BlockWritable} instance, creates a new block of
1191      * its appropriate type, writes the writable into this block, and flushes
1192      * the block into the output stream. The writer is instructed not to buffer
1193      * uncompressed bytes for cache-on-write.
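     * <p>
     * For instance (purely illustrative):
     * <pre>
     * writer.writeBlock(new BlockWritable() {
     *   public BlockType getBlockType() { return BlockType.META; }
     *   public void writeToBlock(DataOutput out) throws IOException {
     *     out.writeUTF("example payload");
     *   }
     * }, out);
     * </pre>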
1194      *
1195      * @param bw the block-writable object to write as a block
1196      * @param out the file system output stream
1197      * @throws IOException
1198      */
1199     public void writeBlock(BlockWritable bw, FSDataOutputStream out)
1200         throws IOException {
1201       bw.writeToBlock(startWriting(bw.getBlockType()));
1202       writeHeaderAndData(out);
1203     }
1204 
1205     /**
1206      * Creates a new HFileBlock. Checksums have already been validated, so
1207      * the byte buffer passed into the constructor of this newly created
1208      * block does not have checksum data even though the header minor
1209      * version is MINOR_VERSION_WITH_CHECKSUM. This is indicated by setting a
1210      * 0 value in bytesPerChecksum.
1211      */
1212     public HFileBlock getBlockForCaching(CacheConfig cacheConf) {
1213       HFileContext newContext = new HFileContextBuilder()
1214                                 .withBlockSize(fileContext.getBlocksize())
1215                                 .withBytesPerCheckSum(0)
1216                                 .withChecksumType(ChecksumType.NULL) // no checksums in cached data
1217                                 .withCompression(fileContext.getCompression())
1218                                 .withDataBlockEncoding(fileContext.getDataBlockEncoding())
1219                                 .withHBaseCheckSum(fileContext.isUseHBaseChecksum())
1220                                 .withCompressTags(fileContext.isCompressTags())
1221                                 .withIncludesMvcc(fileContext.isIncludesMvcc())
1222                                 .withIncludesTags(fileContext.isIncludesTags())
1223                                 .build();
1224       return new HFileBlock(blockType, getOnDiskSizeWithoutHeader(),
1225           getUncompressedSizeWithoutHeader(), prevOffset,
1226           cacheConf.shouldCacheCompressed(blockType.getCategory()) ?
1227             getOnDiskBufferWithHeader() :
1228             getUncompressedBufferWithHeader(),
1229           FILL_HEADER, startOffset,
1230           onDiskBytesWithHeader.length + onDiskChecksum.length, newContext);
1231     }
1232   }
1233 
1234   /** Something that can be written into a block. */
1235   public interface BlockWritable {
1236 
1237     /** The type of block this data should use. */
1238     BlockType getBlockType();
1239 
1240     /**
1241      * Writes the block to the provided stream. Must not write any magic
1242      * records.
1243      *
1244      * @param out a stream to write uncompressed data into
1245      */
1246     void writeToBlock(DataOutput out) throws IOException;
1247   }
1248 
1249   // Block readers and writers
1250 
1251   /** An interface allowing to iterate {@link HFileBlock}s. */
1252   public interface BlockIterator {
1253 
1254     /**
1255      * Get the next block, or null if there are no more blocks to iterate.
1256      */
1257     HFileBlock nextBlock() throws IOException;
1258 
1259     /**
1260      * Similar to {@link #nextBlock()} but checks block type, throws an
1261      * exception if incorrect, and returns the HFile block
1262      */
1263     HFileBlock nextBlockWithBlockType(BlockType blockType) throws IOException;
1264   }
1265 
1266   /** A full-fledged reader with iteration ability. */
1267   public interface FSReader {
1268 
1269     /**
1270      * Reads the block at the given offset in the file with the given on-disk
1271      * size and uncompressed size.
1272      *
1273      * @param offset the offset in the file at which the block starts
1274      * @param onDiskSize the on-disk size of the entire block, including all
1275      *          applicable headers, or -1 if unknown
1276      * @param uncompressedSize the uncompressed size of the compressed part of
1277      *          the block, or -1 if unknown
1278      * @return the newly read block
1279      */
1280     HFileBlock readBlockData(long offset, long onDiskSize,
1281         int uncompressedSize, boolean pread) throws IOException;
1282 
1283     /**
1284      * Creates a block iterator over the given portion of the {@link HFile}.
1285      * The iterator returns blocks starting with offset such that offset &lt;=
1286      * startOffset &lt; endOffset. Returned blocks are always unpacked.
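     * <p>
     * A typical (illustrative) use, with hypothetical offsets:
     * <pre>
     * BlockIterator it = reader.blockRange(startOffset, endOffset);
     * for (HFileBlock b = it.nextBlock(); b != null; b = it.nextBlock()) {
     *   // inspect b ...
     * }
     * </pre>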
1287      *
1288      * @param startOffset the offset of the block to start iteration with
1289      * @param endOffset the offset to end iteration at (exclusive)
1290      * @return an iterator of blocks between the two given offsets
1291      */
1292     BlockIterator blockRange(long startOffset, long endOffset);
1293 
1294     /** Closes the backing streams */
1295     void closeStreams() throws IOException;
1296 
1297     /** Get a decoder for {@link BlockType#ENCODED_DATA} blocks from this file. */
1298     HFileBlockDecodingContext getBlockDecodingContext();
1299 
1300     /** Get the default decoder for blocks from this file. */
1301     HFileBlockDecodingContext getDefaultBlockDecodingContext();
1302   }
1303 
1304   /**
1305    * A common implementation of some methods of {@link FSReader} and some
1306    * tools for implementing HFile format version-specific block readers.
1307    */
1308   private abstract static class AbstractFSReader implements FSReader {
1310 
1311     /** The size of the file we are reading from, or -1 if unknown. */
1312     protected long fileSize;
1313 
1314     /** The size of the header */
1315     protected final int hdrSize;
1316 
1317     /** The filesystem used to access data */
1318     protected HFileSystem hfs;
1319 
1320     /** The path (if any) where this data is coming from */
1321     protected Path path;
1322 
1323     private final Lock streamLock = new ReentrantLock();
1324 
1325     /** The default buffer size for our buffered streams */
1326     public static final int DEFAULT_BUFFER_SIZE = 1 << 20;
1327 
1328     protected HFileContext fileContext;
1329 
1330     public AbstractFSReader(long fileSize, HFileSystem hfs, Path path, HFileContext fileContext)
1331         throws IOException {
1332       this.fileSize = fileSize;
1333       this.hfs = hfs;
1334       this.path = path;
1335       this.fileContext = fileContext;
1336       this.hdrSize = headerSize(fileContext.isUseHBaseChecksum());
1337     }
1338 
1339     @Override
1340     public BlockIterator blockRange(final long startOffset,
1341         final long endOffset) {
1342       final FSReader owner = this; // handle for inner class
1343       return new BlockIterator() {
1344         private long offset = startOffset;
1345 
1346         @Override
1347         public HFileBlock nextBlock() throws IOException {
1348           if (offset >= endOffset)
1349             return null;
1350           HFileBlock b = readBlockData(offset, -1, -1, false);
1351           offset += b.getOnDiskSizeWithHeader();
1352           return b.unpack(fileContext, owner);
1353         }
1354 
1355         @Override
1356         public HFileBlock nextBlockWithBlockType(BlockType blockType)
1357             throws IOException {
1358           HFileBlock blk = nextBlock();
1359           if (blk.getBlockType() != blockType) {
1360             throw new IOException("Expected block of type " + blockType
1361                 + " but found " + blk.getBlockType());
1362           }
1363           return blk;
1364         }
1365       };
1366     }
1367 
1368     /**
1369      * Does a positional read or a seek and read into the given buffer. Returns
1370      * the on-disk size of the next block, or -1 if it could not be determined.
1371      *
1372      * @param dest destination buffer
1373      * @param destOffset offset in the destination buffer
1374      * @param size size of the block to be read
1375      * @param peekIntoNextBlock whether to read the next block's on-disk size
1376      * @param fileOffset position in the stream to read at
1377      * @param pread whether we should do a positional read
1378      * @param istream The input source of data
1379      * @return the on-disk size of the next block with header size included, or
1380      *         -1 if it could not be determined
1381      * @throws IOException
1382      */
1383     protected int readAtOffset(FSDataInputStream istream,
1384         byte[] dest, int destOffset, int size,
1385         boolean peekIntoNextBlock, long fileOffset, boolean pread)
1386         throws IOException {
1387       if (peekIntoNextBlock &&
1388           destOffset + size + hdrSize > dest.length) {
1389         // We are asked to read the next block's header as well, but there is
1390         // not enough room in the array.
1391         throw new IOException("Attempted to read " + size + " bytes and " +
1392             hdrSize + " bytes of next header into a " + dest.length +
1393             "-byte array at offset " + destOffset);
1394       }
1395 
1396       if (!pread && streamLock.tryLock()) {
1397         // Seek + read. Better for scanning.
1398         try {
1399           istream.seek(fileOffset);
1400 
1401           long realOffset = istream.getPos();
1402           if (realOffset != fileOffset) {
1403             throw new IOException("Tried to seek to " + fileOffset + " to "
1404                 + "read " + size + " bytes, but pos=" + realOffset
1405                 + " after seek");
1406           }
1407 
1408           if (!peekIntoNextBlock) {
1409             IOUtils.readFully(istream, dest, destOffset, size);
1410             return -1;
1411           }
1412 
1413           // Try to read the next block header.
1414           if (!readWithExtra(istream, dest, destOffset, size, hdrSize))
1415             return -1;
1416         } finally {
1417           streamLock.unlock();
1418         }
1419       } else {
1420         // Positional read. Better for random reads, or when the streamLock is already locked.
1421         int extraSize = peekIntoNextBlock ? hdrSize : 0;
1422         if (!positionalReadWithExtra(istream, fileOffset, dest, destOffset,
1423             size, extraSize)) {
1424           return -1;
1425         }
1426       }
1427 
1428       assert peekIntoNextBlock;
1429       return Bytes.toInt(dest, destOffset + size + BlockType.MAGIC_LENGTH) + hdrSize;
1430     }
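    /*
     * A sketch of how the peeked size returned above is derived (the layout
     * follows the header fields this class reads back in toStringHeader()).
     * When peekIntoNextBlock is true, the extra hdrSize bytes read past the
     * requested block hold the next block's header; its "on-disk size without
     * header" field sits immediately after the 8-byte magic record:
     *
     *   int sizeFieldOffset = destOffset + size + BlockType.MAGIC_LENGTH;
     *   int nextOnDiskSizeWithoutHeader = Bytes.toInt(dest, sizeFieldOffset);
     *   int nextOnDiskSizeWithHeader = nextOnDiskSizeWithoutHeader + hdrSize;
     */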
1431 
1432   }
1433 
1434   /**
1435    * We always prefetch the header of the next block, so that we know its
1436    * on-disk size in advance and can read it in one operation.
1437    */
1438   private static class PrefetchedHeader {
1439     long offset = -1;
1440     byte[] header = new byte[HConstants.HFILEBLOCK_HEADER_SIZE];
1441     final ByteBuffer buf = ByteBuffer.wrap(header, 0, HConstants.HFILEBLOCK_HEADER_SIZE);
1442   }
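  /*
   * How the prefetched header is consumed (a condensed sketch; the names
   * mirror PrefetchedHeader and readBlockDataInternal below). After a read
   * that peeked into the next block, the thread-local holder caches that
   * header keyed by the next block's offset, so a follow-on sequential read
   * by the same thread can skip re-reading those hdrSize bytes:
   *
   *   PrefetchedHeader cached = prefetchedHeaderForThread.get();
   *   ByteBuffer headerBuf = (cached.offset == offset) ? cached.buf : null;
   *   // headerBuf != null -> header already in memory; only the body is read
   *   // headerBuf == null -> the header must be read from disk as well
   */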
1443 
1444   /** Reads version 2 blocks from the filesystem. */
1445   static class FSReaderImpl extends AbstractFSReader {
1446     /** The filesystem stream of the underlying {@link HFile}, which may or
1447      * may not do checksum validation in the filesystem. */
1448     protected FSDataInputStreamWrapper streamWrapper;
1449 
1450     private HFileBlockDecodingContext encodedBlockDecodingCtx;
1451 
1452     /** Default context used when BlockType != {@link BlockType#ENCODED_DATA}. */
1453     private final HFileBlockDefaultDecodingContext defaultDecodingCtx;
1454 
1455     private ThreadLocal<PrefetchedHeader> prefetchedHeaderForThread =
1456         new ThreadLocal<PrefetchedHeader>() {
1457           @Override
1458           public PrefetchedHeader initialValue() {
1459             return new PrefetchedHeader();
1460           }
1461         };
1462 
1463     public FSReaderImpl(FSDataInputStreamWrapper stream, long fileSize, HFileSystem hfs, Path path,
1464         HFileContext fileContext) throws IOException {
1465       super(fileSize, hfs, path, fileContext);
1466       this.streamWrapper = stream;
1467       // Older versions of HBase didn't support checksum.
1468       this.streamWrapper.prepareForBlockReader(!fileContext.isUseHBaseChecksum());
1469       defaultDecodingCtx = new HFileBlockDefaultDecodingContext(fileContext);
1470       encodedBlockDecodingCtx = defaultDecodingCtx;
1471     }
1472 
1473     /**
1474      * A constructor that reads files with the latest minor version.
1475      * This is used by unit tests only.
1476      */
1477     FSReaderImpl(FSDataInputStream istream, long fileSize, HFileContext fileContext)
1478     throws IOException {
1479       this(new FSDataInputStreamWrapper(istream), fileSize, null, null, fileContext);
1480     }
1481 
1482     /**
1483      * Reads a version 2 block (version 1 blocks not supported and not expected). Tries to do as
1484      * little memory allocation as possible, using the provided on-disk size.
1485      *
1486      * @param offset the offset in the stream to read at
1487      * @param onDiskSizeWithHeaderL the on-disk size of the block, including
1488      *          the header, or -1 if unknown
1489      * @param uncompressedSize the uncompressed size of the block. Always
1490      *          expected to be -1. This parameter is only used in version 1.
1491      * @param pread whether to use a positional read
1492      */
1493     @Override
1494     public HFileBlock readBlockData(long offset, long onDiskSizeWithHeaderL,
1495         int uncompressedSize, boolean pread)
1496     throws IOException {
1497 
1498       // get a copy of the current state of whether to validate
1499       // hbase checksums or not for this read call. This is not
1500       // thread-safe, but the one constraint is that if we decide
1501       // to skip hbase checksum verification then we are
1502       // guaranteed to use hdfs checksum verification.
1503       boolean doVerificationThruHBaseChecksum = streamWrapper.shouldUseHBaseChecksum();
1504       FSDataInputStream is = streamWrapper.getStream(doVerificationThruHBaseChecksum);
1505 
1506       HFileBlock blk = readBlockDataInternal(is, offset,
1507                          onDiskSizeWithHeaderL,
1508                          uncompressedSize, pread,
1509                          doVerificationThruHBaseChecksum);
1510       if (blk == null) {
1511         HFile.LOG.warn("HBase checksum verification failed for file " +
1512                        path + " at offset " +
1513                        offset + " filesize " + fileSize +
1514                        ". Retrying read with HDFS checksums turned on...");
1515 
1516         if (!doVerificationThruHBaseChecksum) {
1517           String msg = "HBase checksum verification failed for file " +
1518                        path + " at offset " +
1519                        offset + " filesize " + fileSize +
1520                        " but this cannot happen because doVerify is " +
1521                        doVerificationThruHBaseChecksum;
1522           HFile.LOG.warn(msg);
1523           throw new IOException(msg); // cannot happen case here
1524         }
1525         HFile.checksumFailures.incrementAndGet(); // update metrics
1526 
1527         // If we have a checksum failure, we fall back into a mode where
1528         // the next few reads use HDFS level checksums. We aim to make the
1529         // next CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD reads avoid
1530         // hbase checksum verification, but since this value is set without
1531         // holding any locks, it can so happen that we might actually do
1532         // a few more than precisely this number.
1533         is = this.streamWrapper.fallbackToFsChecksum(CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD);
1534         doVerificationThruHBaseChecksum = false;
1535         blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL,
1536                                     uncompressedSize, pread,
1537                                     doVerificationThruHBaseChecksum);
1538         if (blk != null) {
1539           HFile.LOG.warn("HDFS checksum verification succeeded for file " +
1540                          path + " at offset " +
1541                          offset + " filesize " + fileSize);
1542         }
1543       }
1544       if (blk == null && !doVerificationThruHBaseChecksum) {
1545         String msg = "readBlockData failed, possibly due to " +
1546                      "checksum verification failed for file " + path +
1547                      " at offset " + offset + " filesize " + fileSize;
1548         HFile.LOG.warn(msg);
1549         throw new IOException(msg);
1550       }
1551 
1552       // If there is a checksum mismatch earlier, then retry with
1553       // HBase checksums switched off and use HDFS checksum verification.
1554       // This triggers HDFS to detect and fix corrupt replicas. The
1555       // next checksumOffCount read requests will use HDFS checksums.
1556       // The decrementing of this.checksumOffCount is not thread-safe,
1557       // but it is harmless because eventually checksumOffCount will be
1558       // a negative number.
1559       streamWrapper.checksumOk();
1560       return blk;
1561     }
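    /*
     * A caller-side sketch (illustrative only; offset, onDiskSizeWithHeader and
     * pread are assumed to come from the block index and the caller's context).
     * Blocks returned by readBlockData are still in on-disk format; callers
     * that need the decompressed, decrypted payload unpack them against the
     * reader, as blockRange() does above:
     *
     *   HFileBlock packed = reader.readBlockData(offset, onDiskSizeWithHeader, -1, pread);
     *   HFileBlock usable = packed.unpack(fileContext, reader);
     */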
1562 
1563     /**
1564      * Reads a version 2 block.
1565      *
1566      * @param offset the offset in the stream to read at
1567      * @param onDiskSizeWithHeaderL the on-disk size of the block, including
1568      *          the header, or -1 if unknown
1569      * @param uncompressedSize the uncompressed size of the block. Always
1570      *          expected to be -1. This parameter is only used in version 1.
1571      * @param pread whether to use a positional read
1572      * @param verifyChecksum Whether to use HBase checksums.
1573      *        If HBase checksum is switched off, then use HDFS checksum.
1574      * @return the HFileBlock or null if there is an HBase checksum mismatch
1575      */
1576     private HFileBlock readBlockDataInternal(FSDataInputStream is, long offset,
1577         long onDiskSizeWithHeaderL, int uncompressedSize, boolean pread,
1578         boolean verifyChecksum)
1579     throws IOException {
1580       if (offset < 0) {
1581         throw new IOException("Invalid offset=" + offset + " trying to read "
1582             + "block (onDiskSize=" + onDiskSizeWithHeaderL
1583             + ", uncompressedSize=" + uncompressedSize + ")");
1584       }
1585 
1586       if (uncompressedSize != -1) {
1587         throw new IOException("Version 2 block reader API does not need " +
1588             "the uncompressed size parameter");
1589       }
1590 
1591       if ((onDiskSizeWithHeaderL < hdrSize && onDiskSizeWithHeaderL != -1)
1592           || onDiskSizeWithHeaderL >= Integer.MAX_VALUE) {
1593         throw new IOException("Invalid onDisksize=" + onDiskSizeWithHeaderL
1594             + ": expected to be at least " + hdrSize
1595             + " and at most " + Integer.MAX_VALUE + ", or -1 (offset="
1596             + offset + ", uncompressedSize=" + uncompressedSize + ")");
1597       }
1598 
1599       int onDiskSizeWithHeader = (int) onDiskSizeWithHeaderL;
1600       // See if we can avoid reading the header. This is desirable, because
1601       // we will not incur a backward seek operation if we have already
1602       // read this block's header as part of the previous read's look-ahead.
1603       // And we also want to skip reading the header again if it has already
1604       // been read.
1605       // TODO: How often does this optimization fire? Has to be same thread so the thread local
1606       // is pertinent and we have to be reading next block as in a big scan.
1607       PrefetchedHeader prefetchedHeader = prefetchedHeaderForThread.get();
1608       ByteBuffer headerBuf = prefetchedHeader.offset == offset? prefetchedHeader.buf: null;
1609 
1610       // Allocate enough space to fit the next block's header too.
1611       int nextBlockOnDiskSize = 0;
1612       byte[] onDiskBlock = null;
1613 
1614       HFileBlock b = null;
1615       if (onDiskSizeWithHeader > 0) {
1616         // We know the total on-disk size. Read the entire block into memory,
1617         // then parse the header. This code path is used when
1618         // doing a random read operation relying on the block index, as well as
1619         // when the client knows the on-disk size from peeking into the next
1620         // block's header (e.g. this block's header) when reading the previous
1621         // block. This is the faster and more preferable case.
1622 
1623         // Size that we have to skip in case we have already read the header.
1624         int preReadHeaderSize = headerBuf == null ? 0 : hdrSize;
1625         onDiskBlock = new byte[onDiskSizeWithHeader + hdrSize]; // room for this block plus the
1626                                                                 // next block's header
1627         nextBlockOnDiskSize = readAtOffset(is, onDiskBlock,
1628             preReadHeaderSize, onDiskSizeWithHeader - preReadHeaderSize,
1629             true, offset + preReadHeaderSize, pread);
1630         if (headerBuf != null) {
1631           // the header has been read when reading the previous block, copy
1632           // to this block's header
1633           // headerBuf is HBB
1634           assert headerBuf.hasArray();
1635           System.arraycopy(headerBuf.array(),
1636               headerBuf.arrayOffset(), onDiskBlock, 0, hdrSize);
1637         } else {
1638           headerBuf = ByteBuffer.wrap(onDiskBlock, 0, hdrSize);
1639         }
1640         // We know the total on-disk size but not the uncompressed size. Parse the header.
1641         try {
1642           // TODO: FIX!!! Expensive parse just to get a length
1643           b = new HFileBlock(headerBuf, fileContext.isUseHBaseChecksum());
1644         } catch (IOException ex) {
1645           // Seen in load testing. Provide comprehensive debug info.
1646           throw new IOException("Failed to read compressed block at "
1647               + offset
1648               + ", onDiskSizeWithHeader="
1649               + onDiskSizeWithHeader
1650               + ", hdrSize="
1651               + hdrSize
1652               + ", header.length="
1653               + prefetchedHeader.header.length
1654               + ", header bytes: "
1655               + Bytes.toStringBinary(prefetchedHeader.header, 0,
1656                   hdrSize), ex);
1657         }
1658         // if the caller specifies an onDiskSizeWithHeader, validate it.
1659         int onDiskSizeWithoutHeader = onDiskSizeWithHeader - hdrSize;
1660         assert onDiskSizeWithoutHeader >= 0;
1661         b.validateOnDiskSizeWithoutHeader(onDiskSizeWithoutHeader);
1662       } else {
1663         // Check headerBuf to see if we have read this block's header as part of
1664         // reading the previous block. This is an optimization of peeking into
1665         // the next block's header (e.g. this block's header) when reading the
1666         // previous block. This is the faster and more preferable case. If the
1667         // header is already there, don't read the header again.
1668 
1669         // Unfortunately, we still have to do a separate read operation to
1670         // read the header.
1671         if (headerBuf == null) {
1672           // From the header, determine the on-disk size of the given hfile
1673           // block, and read the remaining data, thereby incurring two read
1674           // operations. This might happen when we are doing the first read
1675           // in a series of reads or a random read, and we don't have access
1676           // to the block index. This is costly and should happen very rarely.
1677           headerBuf = ByteBuffer.allocate(hdrSize);
1678           // headerBuf is HBB
1679           readAtOffset(is, headerBuf.array(), headerBuf.arrayOffset(),
1680               hdrSize, false, offset, pread);
1681         }
1682         // TODO: FIX!!! Expensive parse just to get a length
1683         b = new HFileBlock(headerBuf, fileContext.isUseHBaseChecksum());
1684         onDiskBlock = new byte[b.getOnDiskSizeWithHeader() + hdrSize];
1685         // headerBuf is HBB
1686         System.arraycopy(headerBuf.array(), headerBuf.arrayOffset(), onDiskBlock, 0, hdrSize);
1687         nextBlockOnDiskSize =
1688           readAtOffset(is, onDiskBlock, hdrSize, b.getOnDiskSizeWithHeader()
1689               - hdrSize, true, offset + hdrSize, pread);
1690         onDiskSizeWithHeader = b.onDiskSizeWithoutHeader + hdrSize;
1691       }
1692 
1693       if (!fileContext.isCompressedOrEncrypted()) {
1694         b.assumeUncompressed();
1695       }
1696 
1697       if (verifyChecksum && !validateBlockChecksum(b, offset, onDiskBlock, hdrSize)) {
1698         return null;             // checksum mismatch
1699       }
1700 
1701       // The onDiskBlock will become the headerAndDataBuffer for this block.
1702       // If nextBlockOnDiskSizeWithHeader is not zero, the onDiskBlock already
1703       // contains the header of the next block, so no need to set the next
1704       // block's header in it.
1705       b = new HFileBlock(ByteBuffer.wrap(onDiskBlock, 0, onDiskSizeWithHeader),
1706         this.fileContext.isUseHBaseChecksum());
1707 
1708       b.nextBlockOnDiskSizeWithHeader = nextBlockOnDiskSize;
1709 
1710       // Set prefetched header
1711       if (b.hasNextBlockHeader()) {
1712         prefetchedHeader.offset = offset + b.getOnDiskSizeWithHeader();
1713         System.arraycopy(onDiskBlock, onDiskSizeWithHeader, prefetchedHeader.header, 0, hdrSize);
1714       }
1715 
1716       b.offset = offset;
1717       b.fileContext.setIncludesTags(this.fileContext.isIncludesTags());
1718       b.fileContext.setIncludesMvcc(this.fileContext.isIncludesMvcc());
1719       return b;
1720     }
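    /*
     * The two read paths above, in condensed form (a sketch; the names mirror
     * the local variables of readBlockDataInternal):
     *
     *   if (onDiskSizeWithHeader > 0) {
     *     // Size known from the block index or a previously peeked header:
     *     // a single read fetches header + data + checksums, plus the next
     *     // block's header when there is room for one.
     *   } else {
     *     // Size unknown: read hdrSize bytes first (unless already prefetched),
     *     // parse the on-disk size out of that header, then read the rest.
     *   }
     */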
1721 
1722     void setIncludesMemstoreTS(boolean includesMemstoreTS) {
1723       this.fileContext.setIncludesMvcc(includesMemstoreTS);
1724     }
1725 
1726     void setDataBlockEncoder(HFileDataBlockEncoder encoder) {
1727       encodedBlockDecodingCtx = encoder.newDataBlockDecodingContext(this.fileContext);
1728     }
1729 
1730     @Override
1731     public HFileBlockDecodingContext getBlockDecodingContext() {
1732       return this.encodedBlockDecodingCtx;
1733     }
1734 
1735     @Override
1736     public HFileBlockDecodingContext getDefaultBlockDecodingContext() {
1737       return this.defaultDecodingCtx;
1738     }
1739 
1740     /**
1741      * Generates the checksum for the header as well as the data and
1742      * then validates that it matches the value stored in the header.
1743      * If there is a checksum mismatch, then return false. Otherwise
1744      * return true.
1745      */
1746     protected boolean validateBlockChecksum(HFileBlock block, long offset, byte[] data,
1747         int hdrSize)
1748     throws IOException {
1749       return ChecksumUtil.validateBlockChecksum(path, offset, block, data, hdrSize);
1750     }
1751 
1752     @Override
1753     public void closeStreams() throws IOException {
1754       streamWrapper.close();
1755     }
1756 
1757     @Override
1758     public String toString() {
1759       return "hfs=" + hfs + ", path=" + path + ", fileContext=" + fileContext;
1760     }
1761   }
1762 
1763   @Override
1764   public int getSerializedLength() {
1765     if (buf != null) {
1766       // include extra bytes for the next header when it's available.
1767       int extraSpace = hasNextBlockHeader() ? headerSize() : 0;
1768       return this.buf.limit() + extraSpace + HFileBlock.EXTRA_SERIALIZATION_SPACE;
1769     }
1770     return 0;
1771   }
1772 
1773   @Override
1774   public void serialize(ByteBuffer destination) {
1775     ByteBufferUtils.copyFromBufferToBuffer(destination, this.buf, 0, getSerializedLength()
1776         - EXTRA_SERIALIZATION_SPACE);
1777     serializeExtraInfo(destination);
1778   }
1779 
1780   public void serializeExtraInfo(ByteBuffer destination) {
1781     destination.put(this.fileContext.isUseHBaseChecksum() ? (byte) 1 : (byte) 0);
1782     destination.putLong(this.offset);
1783     destination.putInt(this.nextBlockOnDiskSizeWithHeader);
1784     destination.rewind();
1785   }
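  /*
   * A sketch of the cache serialization layout produced by the two methods
   * above (the field sizes follow the puts in serializeExtraInfo; the 13-byte
   * total for EXTRA_SERIALIZATION_SPACE is the expected value, shown here only
   * for illustration):
   *
   *   ByteBuffer dest = ByteBuffer.allocate(block.getSerializedLength());
   *   block.serialize(dest);
   *   // [ block bytes, plus the next block's header if present ]
   *   // [ 1 byte : usesHBaseChecksum flag                      ]
   *   // [ 8 bytes: offset of this block in the file            ]
   *   // [ 4 bytes: nextBlockOnDiskSizeWithHeader               ]
   */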
1786 
1787   @Override
1788   public CacheableDeserializer<Cacheable> getDeserializer() {
1789     return HFileBlock.blockDeserializer;
1790   }
1791 
1792   @Override
1793   public boolean equals(Object comparison) {
1794     if (this == comparison) {
1795       return true;
1796     }
1797     if (comparison == null) {
1798       return false;
1799     }
1800     if (comparison.getClass() != this.getClass()) {
1801       return false;
1802     }
1803 
1804     HFileBlock castedComparison = (HFileBlock) comparison;
1805 
1806     if (castedComparison.blockType != this.blockType) {
1807       return false;
1808     }
1809     if (castedComparison.nextBlockOnDiskSizeWithHeader != this.nextBlockOnDiskSizeWithHeader) {
1810       return false;
1811     }
1812     if (castedComparison.offset != this.offset) {
1813       return false;
1814     }
1815     if (castedComparison.onDiskSizeWithoutHeader != this.onDiskSizeWithoutHeader) {
1816       return false;
1817     }
1818     if (castedComparison.prevBlockOffset != this.prevBlockOffset) {
1819       return false;
1820     }
1821     if (castedComparison.uncompressedSizeWithoutHeader != this.uncompressedSizeWithoutHeader) {
1822       return false;
1823     }
1824     if (ByteBufferUtils.compareTo(this.buf, 0, this.buf.limit(), castedComparison.buf, 0,
1825         castedComparison.buf.limit()) != 0) {
1826       return false;
1827     }
1828     return true;
1829   }
1830 
1831   public DataBlockEncoding getDataBlockEncoding() {
1832     if (blockType == BlockType.ENCODED_DATA) {
1833       return DataBlockEncoding.getEncodingById(getDataBlockEncodingId());
1834     }
1835     return DataBlockEncoding.NONE;
1836   }
1837 
1838   byte getChecksumType() {
1839     return this.fileContext.getChecksumType().getCode();
1840   }
1841 
1842   int getBytesPerChecksum() {
1843     return this.fileContext.getBytesPerChecksum();
1844   }
1845 
1846   /** @return the size of data on disk + header. Excludes checksum. */
1847   int getOnDiskDataSizeWithHeader() {
1848     return this.onDiskDataSizeWithHeader;
1849   }
1850 
1851   /**
1852    * Calculate the number of bytes required to store all the checksums
1853    * for this block. Each checksum value is a 4 byte integer.
1854    */
1855   int totalChecksumBytes() {
1856     // If the hfile block has minorVersion 0, then there are no checksum
1857     // data to validate. Similarly, a zero value in this.bytesPerChecksum
1858     // indicates that cached blocks do not have checksum data because
1859     // checksums were already validated when the block was read from disk.
1860     if (!fileContext.isUseHBaseChecksum() || this.fileContext.getBytesPerChecksum() == 0) {
1861       return 0;
1862     }
1863     return (int) ChecksumUtil.numBytes(onDiskDataSizeWithHeader,
1864         this.fileContext.getBytesPerChecksum());
1865   }
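  /*
   * A worked example for totalChecksumBytes() (the 16 KiB chunk size is the
   * usual default for hbase.hstore.bytes.per.checksum and is assumed here only
   * for illustration): an uncompressed 64 KiB data block with a 33-byte header
   * has onDiskDataSizeWithHeader = 65536 + 33 = 65569, which spans
   * ceil(65569 / 16384) = 5 checksum chunks, so 5 * 4 = 20 checksum bytes are
   * appended to the block on disk.
   */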
1866 
1867   /**
1868    * Returns the size of this block header.
1869    */
1870   public int headerSize() {
1871     return headerSize(this.fileContext.isUseHBaseChecksum());
1872   }
1873 
1874   /**
1875    * Maps the checksum setting (a proxy for the minor version) to the size of the header.
1876    */
1877   public static int headerSize(boolean usesHBaseChecksum) {
1878     if (usesHBaseChecksum) {
1879       return HConstants.HFILEBLOCK_HEADER_SIZE;
1880     }
1881     return HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM;
1882   }
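  /*
   * A sketch of where the two header sizes come from (the field widths match
   * the fields read back in toStringHeader() below; the totals are the expected
   * values of the two HConstants, shown here only for illustration):
   *
   *   without HBase checksums: 8 (magic) + 4 (onDiskSizeWithoutHeader)
   *                            + 4 (uncompressedSizeWithoutHeader)
   *                            + 8 (prevBlockOffset)            = 24 bytes
   *   with HBase checksums:    24 + 1 (checksumType)
   *                            + 4 (bytesPerChecksum)
   *                            + 4 (onDiskDataSizeWithHeader)   = 33 bytes
   */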
1883 
1884   /**
1885    * Return the appropriate DUMMY_HEADER for the minor version
1886    */
1887   public byte[] getDummyHeaderForVersion() {
1888     return getDummyHeaderForVersion(this.fileContext.isUseHBaseChecksum());
1889   }
1890 
1891   /**
1892    * Return the appropriate DUMMY_HEADER for the minor version
1893    */
1894   static private byte[] getDummyHeaderForVersion(boolean usesHBaseChecksum) {
1895     if (usesHBaseChecksum) {
1896       return HConstants.HFILEBLOCK_DUMMY_HEADER;
1897     }
1898     return DUMMY_HEADER_NO_CHECKSUM;
1899   }
1900 
1901   /**
1902    * @return the HFileContext used to create this HFileBlock. Not necessarily the
1903    * fileContext for the file from which this block's data was originally read.
1904    */
1905   public HFileContext getHFileContext() {
1906     return this.fileContext;
1907   }
1908 
1909   /**
1910    * Convert the contents of the block header into a human readable string.
1911    * This is mostly helpful for debugging. This assumes that the block
1912    * has minor version > 0.
1913    */
1914   static String toStringHeader(ByteBuffer buf) throws IOException {
1915     byte[] magicBuf = new byte[Math.min(buf.limit() - buf.position(), BlockType.MAGIC_LENGTH)];
1916     buf.get(magicBuf);
1917     BlockType bt = BlockType.parse(magicBuf, 0, BlockType.MAGIC_LENGTH);
1918     int compressedBlockSizeNoHeader = buf.getInt();
1919     int uncompressedBlockSizeNoHeader = buf.getInt();
1920     long prevBlockOffset = buf.getLong();
1921     byte cksumtype = buf.get();
1922     long bytesPerChecksum = buf.getInt();
1923     long onDiskDataSizeWithHeader = buf.getInt();
1924     return " Header dump: magic: " + Bytes.toString(magicBuf) +
1925                    " blockType " + bt +
1926                    " compressedBlockSizeNoHeader " +
1927                    compressedBlockSizeNoHeader +
1928                    " uncompressedBlockSizeNoHeader " +
1929                    uncompressedBlockSizeNoHeader +
1930                    " prevBlockOffset " + prevBlockOffset +
1931                    " checksumType " + ChecksumType.codeToType(cksumtype) +
1932                    " bytesPerChecksum " + bytesPerChecksum +
1933                    " onDiskDataSizeWithHeader " + onDiskDataSizeWithHeader;
1934   }
1935 }