1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.io.hfile;
21
22 import java.io.DataOutput;
23 import java.io.DataOutputStream;
24 import java.io.IOException;
25 import java.util.ArrayList;
26 import java.util.List;
27
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.apache.hadoop.conf.Configuration;
31 import org.apache.hadoop.fs.FSDataOutputStream;
32 import org.apache.hadoop.fs.FileSystem;
33 import org.apache.hadoop.fs.Path;
34 import org.apache.hadoop.hbase.Cell;
35 import org.apache.hadoop.hbase.CellComparator;
36 import org.apache.hadoop.hbase.CellUtil;
37 import org.apache.hadoop.hbase.KeyValue.KVComparator;
38 import org.apache.hadoop.hbase.classification.InterfaceAudience;
39 import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
40 import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable;
41 import org.apache.hadoop.hbase.util.BloomFilterWriter;
42 import org.apache.hadoop.hbase.util.Bytes;
43 import org.apache.hadoop.io.Writable;
44
45
46
47
48 @InterfaceAudience.Private
49 public class HFileWriterV2 extends AbstractHFileWriter {
50 static final Log LOG = LogFactory.getLog(HFileWriterV2.class);
51
52
53 public static final byte [] MAX_MEMSTORE_TS_KEY =
54 Bytes.toBytes("MAX_MEMSTORE_TS_KEY");
55
56
57 public static final byte [] KEY_VALUE_VERSION =
58 Bytes.toBytes("KEY_VALUE_VERSION");
59
60
61 public static final int KEY_VALUE_VER_WITH_MEMSTORE = 1;
62
63
64 private List<InlineBlockWriter> inlineBlockWriters =
65 new ArrayList<InlineBlockWriter>();
66
67
68 protected HFileBlock.Writer fsBlockWriter;
69
70 private HFileBlockIndex.BlockIndexWriter dataBlockIndexWriter;
71 private HFileBlockIndex.BlockIndexWriter metaBlockIndexWriter;
72
73
74 private long firstDataBlockOffset = -1;
75
76
77 protected long lastDataBlockOffset;
78
79
80
81
82
83 private Cell lastCellOfPreviousBlock = null;
84
85
86 private List<BlockWritable> additionalLoadOnOpenData =
87 new ArrayList<BlockWritable>();
88
89 protected long maxMemstoreTS = 0;
90
91
92 private static boolean WARN_CELL_WITH_TAGS = true;
93
94 static class WriterFactoryV2 extends HFile.WriterFactory {
95 WriterFactoryV2(Configuration conf, CacheConfig cacheConf) {
96 super(conf, cacheConf);
97 }
98
99 @Override
100 public Writer createWriter(FileSystem fs, Path path,
101 FSDataOutputStream ostream,
102 KVComparator comparator, HFileContext context) throws IOException {
103 context.setIncludesTags(false);
104 return new HFileWriterV2(conf, cacheConf, fs, path, ostream,
105 comparator, context);
106 }
107 }
108
109
/**
 * Creates a version-2 writer.
 *
 * <p>When {@code ostream} is {@code null}, an output stream is opened through
 * {@code createOutputStream(conf, fs, path, null)}; otherwise the supplied stream is used.
 * Initialization is completed by {@link #finishInit(Configuration)}.
 *
 * @param conf configuration passed to stream creation and to {@code finishInit}
 * @param cacheConf cache configuration forwarded to the superclass
 * @param fs file system used only when a stream has to be created
 * @param path file path forwarded to the superclass
 * @param ostream existing output stream, or {@code null} to create one
 * @param comparator key comparator forwarded to the superclass
 * @param context file context forwarded to the superclass
 * @throws IOException declared by the constructor; presumably raised when stream creation fails
 */
public HFileWriterV2(Configuration conf, CacheConfig cacheConf, FileSystem fs, Path path,
    FSDataOutputStream ostream, KVComparator comparator, HFileContext context)
    throws IOException {
  super(
      cacheConf,
      ostream == null ? createOutputStream(conf, fs, path, null) : ostream,
      path,
      comparator,
      context);
  finishInit(conf);
}
118
119
120 protected void finishInit(final Configuration conf) {
121 if (fsBlockWriter != null)
122 throw new IllegalStateException("finishInit called twice");
123
124 fsBlockWriter = new HFileBlock.Writer(blockEncoder, hFileContext);
125
126
127 boolean cacheIndexesOnWrite = cacheConf.shouldCacheIndexesOnWrite();
128 dataBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter(fsBlockWriter,
129 cacheIndexesOnWrite ? cacheConf : null,
130 cacheIndexesOnWrite ? name : null);
131 dataBlockIndexWriter.setMaxChunkSize(
132 HFileBlockIndex.getMaxChunkSize(conf));
133 inlineBlockWriters.add(dataBlockIndexWriter);
134
135
136 metaBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter();
137 if (LOG.isTraceEnabled()) LOG.trace("Initialized with " + cacheConf);
138 }
139
140
141
142
143
144
145 protected void checkBlockBoundary() throws IOException {
146 if (fsBlockWriter.blockSizeWritten() < hFileContext.getBlocksize())
147 return;
148
149 finishBlock();
150 writeInlineBlocks(false);
151 newBlock();
152 }
153
154
155 private void finishBlock() throws IOException {
156 if (!fsBlockWriter.isWriting() || fsBlockWriter.blockSizeWritten() == 0)
157 return;
158
159
160 if (firstDataBlockOffset == -1) {
161 firstDataBlockOffset = outputStream.getPos();
162 }
163
164 lastDataBlockOffset = outputStream.getPos();
165 fsBlockWriter.writeHeaderAndData(outputStream);
166 int onDiskSize = fsBlockWriter.getOnDiskSizeWithHeader();
167
168 Cell indexEntry =
169 CellComparator.getMidpoint(this.comparator, lastCellOfPreviousBlock, firstCellInBlock);
170 dataBlockIndexWriter.addEntry(CellUtil.getCellKeySerializedAsKeyValueKey(indexEntry),
171 lastDataBlockOffset, onDiskSize);
172 totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
173 if (cacheConf.shouldCacheDataOnWrite()) {
174 doCacheOnWrite(lastDataBlockOffset);
175 }
176 }
177
178
179 private void writeInlineBlocks(boolean closing) throws IOException {
180 for (InlineBlockWriter ibw : inlineBlockWriters) {
181 while (ibw.shouldWriteBlock(closing)) {
182 long offset = outputStream.getPos();
183 boolean cacheThisBlock = ibw.getCacheOnWrite();
184 ibw.writeInlineBlock(fsBlockWriter.startWriting(
185 ibw.getInlineBlockType()));
186 fsBlockWriter.writeHeaderAndData(outputStream);
187 ibw.blockWritten(offset, fsBlockWriter.getOnDiskSizeWithHeader(),
188 fsBlockWriter.getUncompressedSizeWithoutHeader());
189 totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
190
191 if (cacheThisBlock) {
192 doCacheOnWrite(offset);
193 }
194 }
195 }
196 }
197
198
199
200
201
202
203 private void doCacheOnWrite(long offset) {
204 HFileBlock cacheFormatBlock = fsBlockWriter.getBlockForCaching(cacheConf);
205 cacheConf.getBlockCache().cacheBlock(
206 new BlockCacheKey(name, offset), cacheFormatBlock);
207 }
208
209
210
211
212
213
214 protected void newBlock() throws IOException {
215
216 fsBlockWriter.startWriting(BlockType.DATA);
217 firstCellInBlock = null;
218 if (lastCell != null) {
219 lastCellOfPreviousBlock = lastCell;
220 }
221 }
222
223
224
225
226
227
228
229
230
231
232
233
234 @Override
235 public void appendMetaBlock(String metaBlockName, Writable content) {
236 byte[] key = Bytes.toBytes(metaBlockName);
237 int i;
238 for (i = 0; i < metaNames.size(); ++i) {
239
240 byte[] cur = metaNames.get(i);
241 if (Bytes.BYTES_RAWCOMPARATOR.compare(cur, 0, cur.length, key, 0,
242 key.length) > 0) {
243 break;
244 }
245 }
246 metaNames.add(i, key);
247 metaData.add(i, content);
248 }
249
250
251
252
253
254
255
256
257 @Override
258 public void append(final Cell cell) throws IOException {
259 byte[] value = cell.getValueArray();
260 int voffset = cell.getValueOffset();
261 int vlength = cell.getValueLength();
262
263 boolean dupKey = checkKey(cell);
264 checkValue(value, voffset, vlength);
265 if (!dupKey) {
266 checkBlockBoundary();
267 }
268
269 if (!fsBlockWriter.isWriting()) {
270 newBlock();
271 }
272
273 synchronized (HFileWriterV2.class) {
274 if (WARN_CELL_WITH_TAGS && getFileContext().isIncludesTags()) {
275 LOG.warn("A minimum HFile version of " + HFile.MIN_FORMAT_VERSION_WITH_TAGS
276 + " is required to support cell attributes/tags. Consider setting "
277 + HFile.FORMAT_VERSION_KEY + " accordingly.");
278 WARN_CELL_WITH_TAGS = false;
279 }
280 }
281
282 fsBlockWriter.write(cell);
283
284 totalKeyLength += CellUtil.estimatedSerializedSizeOfKey(cell);
285 totalValueLength += vlength;
286
287
288 if (firstCellInBlock == null) {
289
290
291 firstCellInBlock = cell;
292 }
293
294
295
296 lastCell = cell;
297 entryCount++;
298 this.maxMemstoreTS = Math.max(this.maxMemstoreTS, cell.getSequenceId());
299 }
300
301 @Override
302 public void close() throws IOException {
303 if (outputStream == null) {
304 return;
305 }
306
307 blockEncoder.saveMetadata(this);
308
309
310
311 finishBlock();
312 writeInlineBlocks(true);
313
314 FixedFileTrailer trailer = new FixedFileTrailer(getMajorVersion(), getMinorVersion());
315
316
317 if (!metaNames.isEmpty()) {
318 for (int i = 0; i < metaNames.size(); ++i) {
319
320 long offset = outputStream.getPos();
321
322 DataOutputStream dos = fsBlockWriter.startWriting(BlockType.META);
323 metaData.get(i).write(dos);
324
325 fsBlockWriter.writeHeaderAndData(outputStream);
326 totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
327
328
329 metaBlockIndexWriter.addEntry(metaNames.get(i), offset,
330 fsBlockWriter.getOnDiskSizeWithHeader());
331 }
332 }
333
334
335
336
337
338
339
340
341
342
343 long rootIndexOffset = dataBlockIndexWriter.writeIndexBlocks(outputStream);
344 trailer.setLoadOnOpenOffset(rootIndexOffset);
345
346
347 metaBlockIndexWriter.writeSingleLevelIndex(fsBlockWriter.startWriting(
348 BlockType.ROOT_INDEX), "meta");
349 fsBlockWriter.writeHeaderAndData(outputStream);
350 totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
351
352 if (this.hFileContext.isIncludesMvcc()) {
353 appendFileInfo(MAX_MEMSTORE_TS_KEY, Bytes.toBytes(maxMemstoreTS));
354 appendFileInfo(KEY_VALUE_VERSION, Bytes.toBytes(KEY_VALUE_VER_WITH_MEMSTORE));
355 }
356
357
358 writeFileInfo(trailer, fsBlockWriter.startWriting(BlockType.FILE_INFO));
359 fsBlockWriter.writeHeaderAndData(outputStream);
360 totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
361
362
363 for (BlockWritable w : additionalLoadOnOpenData){
364 fsBlockWriter.writeBlock(w, outputStream);
365 totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
366 }
367
368
369 trailer.setNumDataIndexLevels(dataBlockIndexWriter.getNumLevels());
370 trailer.setUncompressedDataIndexSize(
371 dataBlockIndexWriter.getTotalUncompressedSize());
372 trailer.setFirstDataBlockOffset(firstDataBlockOffset);
373 trailer.setLastDataBlockOffset(lastDataBlockOffset);
374 trailer.setComparatorClass(comparator.getClass());
375 trailer.setDataIndexCount(dataBlockIndexWriter.getNumRootEntries());
376
377
378 finishClose(trailer);
379
380 fsBlockWriter.release();
381 }
382
383 @Override
384 public void addInlineBlockWriter(InlineBlockWriter ibw) {
385 inlineBlockWriters.add(ibw);
386 }
387
388 @Override
389 public void addGeneralBloomFilter(final BloomFilterWriter bfw) {
390 this.addBloomFilter(bfw, BlockType.GENERAL_BLOOM_META);
391 }
392
393 @Override
394 public void addDeleteFamilyBloomFilter(final BloomFilterWriter bfw) {
395 this.addBloomFilter(bfw, BlockType.DELETE_FAMILY_BLOOM_META);
396 }
397
398 private void addBloomFilter(final BloomFilterWriter bfw,
399 final BlockType blockType) {
400 if (bfw.getKeyCount() <= 0)
401 return;
402
403 if (blockType != BlockType.GENERAL_BLOOM_META &&
404 blockType != BlockType.DELETE_FAMILY_BLOOM_META) {
405 throw new RuntimeException("Block Type: " + blockType.toString() +
406 "is not supported");
407 }
408 additionalLoadOnOpenData.add(new BlockWritable() {
409 @Override
410 public BlockType getBlockType() {
411 return blockType;
412 }
413
414 @Override
415 public void writeToBlock(DataOutput out) throws IOException {
416 bfw.getMetaWriter().write(out);
417 Writable dataWriter = bfw.getDataWriter();
418 if (dataWriter != null)
419 dataWriter.write(out);
420 }
421 });
422 }
423
424 protected int getMajorVersion() {
425 return 2;
426 }
427
428 protected int getMinorVersion() {
429 return HFileReaderV2.MAX_MINOR_VERSION;
430 }
431
432 @Override
433 public HFileContext getFileContext() {
434 return hFileContext;
435 }
436 }