/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.io.hfile;

import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;

/**
 * Writes HFile format version 2.
 */
@InterfaceAudience.Private
public class HFileWriterV2 extends AbstractHFileWriter {
  static final Log LOG = LogFactory.getLog(HFileWriterV2.class);

  /** Max memstore (mvcc) timestamp in FileInfo */
  public static final byte [] MAX_MEMSTORE_TS_KEY =
      Bytes.toBytes("MAX_MEMSTORE_TS_KEY");

  /** KeyValue version in FileInfo */
  public static final byte [] KEY_VALUE_VERSION =
      Bytes.toBytes("KEY_VALUE_VERSION");

  /** Version for KeyValue which includes memstore timestamp */
  public static final int KEY_VALUE_VER_WITH_MEMSTORE = 1;

  /** Inline block writers for multi-level block index and compound Blooms. */
  private List<InlineBlockWriter> inlineBlockWriters =
      new ArrayList<InlineBlockWriter>();

  /** Unified version 2 block writer */
  protected HFileBlock.Writer fsBlockWriter;

  private HFileBlockIndex.BlockIndexWriter dataBlockIndexWriter;
  private HFileBlockIndex.BlockIndexWriter metaBlockIndexWriter;

  /** The offset of the first data block or -1 if the file is empty. */
  private long firstDataBlockOffset = -1;

  /** The offset of the last data block or 0 if the file is empty. */
  protected long lastDataBlockOffset;

  /**
   * The last (stop) Cell of the previous data block.
   * This reference should be short-lived since we write hfiles in a burst.
   */
  private Cell lastCellOfPreviousBlock = null;

  /** Additional data items to be written to the "load-on-open" section. */
  private List<BlockWritable> additionalLoadOnOpenData =
    new ArrayList<BlockWritable>();

  protected long maxMemstoreTS = 0;

  /** Whether we still need to warn, once, on the first cell seen with tags */
  private static boolean WARN_CELL_WITH_TAGS = true;

  static class WriterFactoryV2 extends HFile.WriterFactory {
    WriterFactoryV2(Configuration conf, CacheConfig cacheConf) {
      super(conf, cacheConf);
    }

    @Override
    public Writer createWriter(FileSystem fs, Path path,
        FSDataOutputStream ostream,
        KVComparator comparator, HFileContext context) throws IOException {
      context.setIncludesTags(false); // HFile V2 does not deal with tags at all!
      return new HFileWriterV2(conf, cacheConf, fs, path, ostream,
          comparator, context);
    }
  }

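  // A minimal usage sketch (not part of this class). Callers normally go
  // through HFile.getWriterFactory(conf, cacheConf), which picks the factory
  // for the configured format version, rather than instantiating the factory
  // directly; "conf", "fs" and "path" below are assumed to exist:
  //
  //   HFileContext ctx = new HFileContextBuilder()
  //       .withBlockSize(64 * 1024)
  //       .withIncludesMvcc(true)
  //       .build();
  //   HFile.Writer writer = HFile.getWriterFactory(conf, new CacheConfig(conf))
  //       .withPath(fs, path)
  //       .withFileContext(ctx)
  //       .create();
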
  /**
   * Constructor that takes a path and creates the output stream if one is
   * not passed in; a stream created here is closed on {@link #close()}.
   */
  public HFileWriterV2(Configuration conf, CacheConfig cacheConf,
      FileSystem fs, Path path, FSDataOutputStream ostream,
      final KVComparator comparator, final HFileContext context) throws IOException {
    super(cacheConf,
        ostream == null ? createOutputStream(conf, fs, path, null) : ostream,
        path, comparator, context);
    finishInit(conf);
  }

  /** Additional initialization steps */
  protected void finishInit(final Configuration conf) {
    if (fsBlockWriter != null)
      throw new IllegalStateException("finishInit called twice");

    fsBlockWriter = new HFileBlock.Writer(blockEncoder, hFileContext);

    // Data block index writer
    boolean cacheIndexesOnWrite = cacheConf.shouldCacheIndexesOnWrite();
    dataBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter(fsBlockWriter,
        cacheIndexesOnWrite ? cacheConf : null,
        cacheIndexesOnWrite ? name : null);
    dataBlockIndexWriter.setMaxChunkSize(
        HFileBlockIndex.getMaxChunkSize(conf));
    inlineBlockWriters.add(dataBlockIndexWriter);

    // Meta data block index writer
    metaBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter();
    if (LOG.isTraceEnabled()) LOG.trace("Initialized with " + cacheConf);
  }

  /**
   * At a block boundary, writes all the inline blocks and opens a new block.
   * Note that the configured block size is a soft limit: this check runs
   * before each append, so the last cell written can push a block past it.
   *
   * @throws IOException
   */
  protected void checkBlockBoundary() throws IOException {
    if (fsBlockWriter.blockSizeWritten() < hFileContext.getBlocksize())
      return;

    finishBlock();
    writeInlineBlocks(false);
    newBlock();
  }

  /** Finishes off the current data block, writing it out and indexing it */
  private void finishBlock() throws IOException {
    if (!fsBlockWriter.isWriting() || fsBlockWriter.blockSizeWritten() == 0)
      return;

    // Update the first data block offset for scanning.
    if (firstDataBlockOffset == -1) {
      firstDataBlockOffset = outputStream.getPos();
    }
    // Update the last data block offset
    lastDataBlockOffset = outputStream.getPos();
    fsBlockWriter.writeHeaderAndData(outputStream);
    int onDiskSize = fsBlockWriter.getOnDiskSizeWithHeader();

    // Index the block under a key that sorts between the last cell of the
    // previous block and the first cell of this one; the shortened midpoint
    // keeps index entries small.
    Cell indexEntry =
      CellComparator.getMidpoint(this.comparator, lastCellOfPreviousBlock, firstCellInBlock);
    dataBlockIndexWriter.addEntry(CellUtil.getCellKeySerializedAsKeyValueKey(indexEntry),
      lastDataBlockOffset, onDiskSize);
    totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
    if (cacheConf.shouldCacheDataOnWrite()) {
      doCacheOnWrite(lastDataBlockOffset);
    }
  }

  /** Gives inline block writers an opportunity to contribute blocks. */
  private void writeInlineBlocks(boolean closing) throws IOException {
    for (InlineBlockWriter ibw : inlineBlockWriters) {
      while (ibw.shouldWriteBlock(closing)) {
        long offset = outputStream.getPos();
        boolean cacheThisBlock = ibw.getCacheOnWrite();
        ibw.writeInlineBlock(fsBlockWriter.startWriting(
            ibw.getInlineBlockType()));
        fsBlockWriter.writeHeaderAndData(outputStream);
        ibw.blockWritten(offset, fsBlockWriter.getOnDiskSizeWithHeader(),
            fsBlockWriter.getUncompressedSizeWithoutHeader());
        totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();

        if (cacheThisBlock) {
          doCacheOnWrite(offset);
        }
      }
    }
  }

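  // With the writers registered in this class, "inline" blocks are leaf-level
  // chunks of the data block index plus whatever is registered through
  // addInlineBlockWriter (typically compound Bloom filter chunks); passing
  // closing=true on close() flushes any partially filled chunk.
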
  /**
   * Caches the last written HFile block.
   * @param offset the offset of the block we want to cache. Used to determine
   *          the cache key.
   */
  private void doCacheOnWrite(long offset) {
    HFileBlock cacheFormatBlock = fsBlockWriter.getBlockForCaching(cacheConf);
    cacheConf.getBlockCache().cacheBlock(
        new BlockCacheKey(name, offset), cacheFormatBlock);
  }

  /**
   * Ready a new block for writing.
   *
   * @throws IOException
   */
  protected void newBlock() throws IOException {
    // This is where the next block begins.
    fsBlockWriter.startWriting(BlockType.DATA);
    firstCellInBlock = null;
    if (lastCell != null) {
      lastCellOfPreviousBlock = lastCell;
    }
  }

  /**
   * Add a meta block to the end of the file. Call before close(). Metadata
   * blocks are expensive: fill one with a bunch of serialized data rather than
   * writing a metadata block per metadata instance. If metadata is small,
   * consider adding it to file info using {@link #appendFileInfo(byte[], byte[])}
   *
   * @param metaBlockName
   *          name of the block
   * @param content
   *          the Writable whose data is written out at close time, so it must
   *          not be reused or modified in the meantime (DO NOT REUSE)
   */
  @Override
  public void appendMetaBlock(String metaBlockName, Writable content) {
    byte[] key = Bytes.toBytes(metaBlockName);
    int i;
    for (i = 0; i < metaNames.size(); ++i) {
      // stop when the current key is greater than our own
      byte[] cur = metaNames.get(i);
      if (Bytes.BYTES_RAWCOMPARATOR.compare(cur, 0, cur.length, key, 0,
          key.length) > 0) {
        break;
      }
    }
    metaNames.add(i, key);
    metaData.add(i, content);
  }

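  // Sketch of the sorted-insert behavior above (hypothetical names and
  // Writables): regardless of call order, the meta block index stores
  // "aaa" before "zzz", since names are kept sorted by the raw byte
  // comparator.
  //
  //   writer.appendMetaBlock("zzz", firstContent);
  //   writer.appendMetaBlock("aaa", secondContent);
  //   writer.close();
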
  /**
   * Add key/value to file. Keys must be added in an order that agrees with the
   * Comparator passed on construction.
   *
   * @param cell Cell to add. Cannot be empty nor null.
   * @throws IOException
   */
  @Override
  public void append(final Cell cell) throws IOException {
    byte[] value = cell.getValueArray();
    int voffset = cell.getValueOffset();
    int vlength = cell.getValueLength();
    // checkKey uses comparator to check we are writing in order.
    boolean dupKey = checkKey(cell);
    checkValue(value, voffset, vlength);
    if (!dupKey) {
      checkBlockBoundary();
    }

    if (!fsBlockWriter.isWriting()) {
      newBlock();
    }

    synchronized (HFileWriterV2.class) {
      if (WARN_CELL_WITH_TAGS && getFileContext().isIncludesTags()) {
        LOG.warn("A minimum HFile version of " + HFile.MIN_FORMAT_VERSION_WITH_TAGS
          + " is required to support cell attributes/tags. Consider setting "
          + HFile.FORMAT_VERSION_KEY + " accordingly.");
        WARN_CELL_WITH_TAGS = false;
      }
    }

    fsBlockWriter.write(cell);

    totalKeyLength += CellUtil.estimatedSerializedSizeOfKey(cell);
    totalValueLength += vlength;

    // Are we the first key in this block?
    if (firstCellInBlock == null) {
      // If cell is big, block will be closed and this firstCellInBlock reference will only last
      // a short while.
      firstCellInBlock = cell;
    }

    // TODO: What if cell is 10MB and we write infrequently?  We'll hold on to the cell here
    // indefinitely?
    lastCell = cell;
    entryCount++;
    this.maxMemstoreTS = Math.max(this.maxMemstoreTS, cell.getSequenceId());
  }

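  // Sketch of the ordering contract above (hypothetical row/family/qualifier
  // byte arrays): cells must arrive in comparator order, and checkKey throws
  // an IOException for an out-of-order key.
  //
  //   writer.append(new KeyValue(row1, fam, qual, value)); // ok
  //   writer.append(new KeyValue(row2, fam, qual, value)); // ok, row2 > row1
  //   writer.append(new KeyValue(row0, fam, qual, value)); // IOException
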
  @Override
  public void close() throws IOException {
    if (outputStream == null) {
      return;
    }
    // Save data block encoder metadata in the file info.
    blockEncoder.saveMetadata(this);
    // Write out the end of the data blocks, then write the metadata blocks,
    // followed by fileinfo, data block index and meta block index.

    finishBlock();
    writeInlineBlocks(true);

    FixedFileTrailer trailer = new FixedFileTrailer(getMajorVersion(), getMinorVersion());

    // Write out the metadata blocks if any.
    if (!metaNames.isEmpty()) {
      for (int i = 0; i < metaNames.size(); ++i) {
        // store the beginning offset
        long offset = outputStream.getPos();
        // write the metadata content
        DataOutputStream dos = fsBlockWriter.startWriting(BlockType.META);
        metaData.get(i).write(dos);

        fsBlockWriter.writeHeaderAndData(outputStream);
        totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();

        // Add the new meta block to the meta index.
        metaBlockIndexWriter.addEntry(metaNames.get(i), offset,
            fsBlockWriter.getOnDiskSizeWithHeader());
      }
    }

    // Load-on-open section.

    // Data block index.
    //
    // In version 2, this section of the file starts with the root level data
    // block index. We call a function that writes intermediate-level blocks
    // first, then root level, and returns the offset of the root level block
    // index.

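    // For reference, the overall v2 file layout produced by this class is,
    // roughly:
    //
    //   [data blocks | inline index/Bloom chunks]* [meta blocks]
    //   [load-on-open: root data index, meta index, file info, Bloom metadata]
    //   [fixed file trailer]
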
    long rootIndexOffset = dataBlockIndexWriter.writeIndexBlocks(outputStream);
    trailer.setLoadOnOpenOffset(rootIndexOffset);

    // Meta block index.
    metaBlockIndexWriter.writeSingleLevelIndex(fsBlockWriter.startWriting(
        BlockType.ROOT_INDEX), "meta");
    fsBlockWriter.writeHeaderAndData(outputStream);
    totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();

    if (this.hFileContext.isIncludesMvcc()) {
      appendFileInfo(MAX_MEMSTORE_TS_KEY, Bytes.toBytes(maxMemstoreTS));
      appendFileInfo(KEY_VALUE_VERSION, Bytes.toBytes(KEY_VALUE_VER_WITH_MEMSTORE));
    }

    // File info
    writeFileInfo(trailer, fsBlockWriter.startWriting(BlockType.FILE_INFO));
    fsBlockWriter.writeHeaderAndData(outputStream);
    totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();

    // Load-on-open data supplied by higher levels, e.g. Bloom filters.
    for (BlockWritable w : additionalLoadOnOpenData) {
      fsBlockWriter.writeBlock(w, outputStream);
      totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
    }

    // Now finish off the trailer.
    trailer.setNumDataIndexLevels(dataBlockIndexWriter.getNumLevels());
    trailer.setUncompressedDataIndexSize(
        dataBlockIndexWriter.getTotalUncompressedSize());
    trailer.setFirstDataBlockOffset(firstDataBlockOffset);
    trailer.setLastDataBlockOffset(lastDataBlockOffset);
    trailer.setComparatorClass(comparator.getClass());
    trailer.setDataIndexCount(dataBlockIndexWriter.getNumRootEntries());

    finishClose(trailer);

    fsBlockWriter.release();
  }

  @Override
  public void addInlineBlockWriter(InlineBlockWriter ibw) {
    inlineBlockWriters.add(ibw);
  }

  @Override
  public void addGeneralBloomFilter(final BloomFilterWriter bfw) {
    this.addBloomFilter(bfw, BlockType.GENERAL_BLOOM_META);
  }

  @Override
  public void addDeleteFamilyBloomFilter(final BloomFilterWriter bfw) {
    this.addBloomFilter(bfw, BlockType.DELETE_FAMILY_BLOOM_META);
  }

  private void addBloomFilter(final BloomFilterWriter bfw,
      final BlockType blockType) {
    if (bfw.getKeyCount() <= 0)
      return;

    if (blockType != BlockType.GENERAL_BLOOM_META &&
        blockType != BlockType.DELETE_FAMILY_BLOOM_META) {
      throw new RuntimeException("Block Type: " + blockType.toString() +
          " is not supported");
    }
    additionalLoadOnOpenData.add(new BlockWritable() {
      @Override
      public BlockType getBlockType() {
        return blockType;
      }

      @Override
      public void writeToBlock(DataOutput out) throws IOException {
        bfw.getMetaWriter().write(out);
        Writable dataWriter = bfw.getDataWriter();
        if (dataWriter != null)
          dataWriter.write(out);
      }
    });
  }

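  // Sketch of how the Bloom hooks above are typically used (caller-side,
  // hypothetical "bloomWriter" implementing both BloomFilterWriter and
  // InlineBlockWriter, as a compound Bloom writer does): register it for
  // inline chunk flushing, then hand it over before close() so its metadata
  // lands in the load-on-open section.
  //
  //   writer.addInlineBlockWriter(bloomWriter);
  //   // ... append cells, feeding bloomWriter ...
  //   writer.addGeneralBloomFilter(bloomWriter);
  //   writer.close();
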
  protected int getMajorVersion() {
    return 2;
  }

  protected int getMinorVersion() {
    return HFileReaderV2.MAX_MINOR_VERSION;
  }

  @Override
  public HFileContext getFileContext() {
    return hFileContext;
  }
}