/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CategoryBasedTimeout;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.HadoopShims;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PerformanceEvaluation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestRule;
import org.mockito.Mockito;

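/**
 * Simple test for {@link HFileOutputFormat2}. Sets up and runs a mapreduce job
 * that writes hfile output, then verifies the output against the table and
 * column family settings it was configured with. The inner mapper emits keys
 * and values like those of {@link PerformanceEvaluation}.
 */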
@Category(LargeTests.class)
public class TestHFileOutputFormat2 {
  @Rule public final TestRule timeout = CategoryBasedTimeout.builder().
      withTimeout(this.getClass()).withLookingForStuckThread(true).build();
  private final static int ROWSPERSPLIT = 1024;

  private static final byte[][] FAMILIES
    = { Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-A"))
      , Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-B"))};
  private static final TableName TABLE_NAME = TableName.valueOf("TestTable");

  private HBaseTestingUtility util = new HBaseTestingUtility();

  private static final Log LOG = LogFactory.getLog(TestHFileOutputFormat2.class);

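  /**
   * Simple mapper that makes KeyValue output.
   */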
  static class RandomKVGeneratingMapper
      extends Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Cell> {

    private int keyLength;
    private static final int KEYLEN_DEFAULT = 10;
    private static final String KEYLEN_CONF = "randomkv.key.length";

    private int valLength;
    private static final int VALLEN_DEFAULT = 10;
    private static final String VALLEN_CONF = "randomkv.val.length";
    private static final byte[] QUALIFIER = Bytes.toBytes("data");

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      super.setup(context);

      Configuration conf = context.getConfiguration();
      keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
      valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
    }

    @Override
    protected void map(NullWritable n1, NullWritable n2,
        Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Cell>.Context context)
        throws IOException, InterruptedException {
      byte[] keyBytes = new byte[keyLength];
      byte[] valBytes = new byte[valLength];

      int taskId = context.getTaskAttemptID().getTaskID().getId();
      assert taskId < Byte.MAX_VALUE : "Unit tests don't support > 127 tasks!";

      Random random = new Random();
      for (int i = 0; i < ROWSPERSPLIT; i++) {
        random.nextBytes(keyBytes);
        // Ensure that unique tasks generate unique keys: the last key byte
        // carries the task id.
        keyBytes[keyLength - 1] = (byte) (taskId & 0xFF);
        random.nextBytes(valBytes);
        ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);

        for (byte[] family : TestHFileOutputFormat2.FAMILIES) {
          Cell kv = new KeyValue(keyBytes, family, QUALIFIER, valBytes);
          context.write(key, kv);
        }
      }
    }
  }

  private void setupRandomGeneratorMapper(Job job) {
    job.setInputFormatClass(NMapInputFormat.class);
    job.setMapperClass(RandomKVGeneratingMapper.class);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(KeyValue.class);
  }

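  /**
   * Test that {@link HFileOutputFormat2} RecordWriter amends timestamps if
   * passed a keyvalue whose timestamp is {@link HConstants#LATEST_TIMESTAMP}.
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-2615">HBASE-2615</a>
   */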
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
  public void test_LATEST_TIMESTAMP_isReplaced() throws Exception {
    Configuration conf = new Configuration(this.util.getConfiguration());
    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
    TaskAttemptContext context = null;
    Path dir = util.getDataTestDir("test_LATEST_TIMESTAMP_isReplaced");
    try {
      Job job = new Job(conf);
      FileOutputFormat.setOutputPath(job, dir);
      context = createTestTaskAttemptContext(job);
      HFileOutputFormat2 hof = new HFileOutputFormat2();
      writer = hof.getRecordWriter(context);
      final byte[] b = Bytes.toBytes("b");

      // Test 1. Pass a KV that has a ts of LATEST_TIMESTAMP. It should be
      // changed by the call to write. Check all in kv is same but ts.
      KeyValue kv = new KeyValue(b, b, b);
      KeyValue original = kv.clone();
      writer.write(new ImmutableBytesWritable(), kv);
      assertFalse(original.equals(kv));
      assertTrue(Bytes.equals(CellUtil.cloneRow(original), CellUtil.cloneRow(kv)));
      assertTrue(Bytes.equals(CellUtil.cloneFamily(original), CellUtil.cloneFamily(kv)));
      assertTrue(Bytes.equals(CellUtil.cloneQualifier(original), CellUtil.cloneQualifier(kv)));
      assertNotSame(original.getTimestamp(), kv.getTimestamp());
      assertNotSame(HConstants.LATEST_TIMESTAMP, kv.getTimestamp());

      // Test 2. Now test passing a kv that has an explicit ts. It should not
      // be changed by the call to record write.
      kv = new KeyValue(b, b, b, kv.getTimestamp() - 1, b);
      original = kv.clone();
      writer.write(new ImmutableBytesWritable(), kv);
      assertTrue(original.equals(kv));
    } finally {
      if (writer != null && context != null) writer.close(context);
      dir.getFileSystem(conf).delete(dir, true);
    }
  }

  private TaskAttemptContext createTestTaskAttemptContext(final Job job)
      throws Exception {
    HadoopShims hadoop = CompatibilitySingletonFactory.getInstance(HadoopShims.class);
    TaskAttemptContext context = hadoop.createTestTaskAttemptContext(
        job, "attempt_201402131733_0001_m_000000_0");
    return context;
  }

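  /**
   * Test that {@link HFileOutputFormat2} RecordWriter tracks the timestamps of
   * the cells it writes and records the min/max in the {@code TIMERANGE}
   * file-info entry of the resulting HFile.
   */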
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
  public void test_TIMERANGE() throws Exception {
    Configuration conf = new Configuration(this.util.getConfiguration());
    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
    TaskAttemptContext context = null;
    Path dir = util.getDataTestDir("test_TIMERANGE_present");
    LOG.info("Timerange dir writing to dir: " + dir);
    try {
      // build a record writer using HFileOutputFormat2
      Job job = new Job(conf);
      FileOutputFormat.setOutputPath(job, dir);
      context = createTestTaskAttemptContext(job);
      HFileOutputFormat2 hof = new HFileOutputFormat2();
      writer = hof.getRecordWriter(context);

      final byte[] b = Bytes.toBytes("b");

      // write two cells, one with an explicit timestamp of 2000 and one with 1000
      KeyValue kv = new KeyValue(b, b, b, 2000, b);
      KeyValue original = kv.clone();
      writer.write(new ImmutableBytesWritable(), kv);
      assertEquals(original, kv);

      kv = new KeyValue(b, b, b, 1000, b);
      original = kv.clone();
      writer.write(new ImmutableBytesWritable(), kv);
      assertEquals(original, kv);

      // verify that the written file has the proper FileInfo.
      writer.close(context);

      // locate the hfile the writer produced under the attempt directory
      // (one subdirectory per column family).
      FileSystem fs = FileSystem.get(conf);
      Path attemptDirectory = hof.getDefaultWorkFile(context, "").getParent();
      FileStatus[] sub1 = fs.listStatus(attemptDirectory);
      FileStatus[] file = fs.listStatus(sub1[0].getPath());

      // open as HFile Reader and pull out the TIMERANGE FileInfo entry.
      HFile.Reader rd = HFile.createReader(fs, file[0].getPath(),
          new CacheConfig(conf), conf);
      Map<byte[], byte[]> finfo = rd.loadFileInfo();
      byte[] range = finfo.get("TIMERANGE".getBytes());
      assertNotNull(range);

      // unmarshall and check values.
      TimeRangeTracker timeRangeTracker = new TimeRangeTracker();
      Writables.copyWritable(range, timeRangeTracker);
      LOG.info(timeRangeTracker.getMinimumTimestamp() +
          "...." + timeRangeTracker.getMaximumTimestamp());
      assertEquals(1000, timeRangeTracker.getMinimumTimestamp());
      assertEquals(2000, timeRangeTracker.getMaximumTimestamp());
      rd.close();
    } finally {
      if (writer != null && context != null) writer.close(context);
      dir.getFileSystem(conf).delete(dir, true);
    }
  }

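  /**
   * Run small MR job: write PerformanceEvaluation-like data through
   * {@link HFileOutputFormat2} and verify that output files were produced.
   */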
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
  public void testWritingPEData() throws Exception {
    Configuration conf = util.getConfiguration();
    Path testDir = util.getDataTestDirOnTestFS("testWritingPEData");
    FileSystem fs = testDir.getFileSystem(conf);

    // Set down this value or we OOME in eclipse.
    conf.setInt("mapreduce.task.io.sort.mb", 20);
    // Write a few files by keeping the max region file size small.
    conf.setLong(HConstants.HREGION_MAX_FILESIZE, 64 * 1024);

    Job job = new Job(conf, "testWritingPEData");
    setupRandomGeneratorMapper(job);
    // Use a partition range that spans the whole key space.
    byte[] startKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT];
    byte[] endKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT];

    Arrays.fill(startKey, (byte) 0);
    Arrays.fill(endKey, (byte) 0xff);

    job.setPartitionerClass(SimpleTotalOrderPartitioner.class);
    // Set start and end rows for partitioner.
    SimpleTotalOrderPartitioner.setStartKey(job.getConfiguration(), startKey);
    SimpleTotalOrderPartitioner.setEndKey(job.getConfiguration(), endKey);
    job.setReducerClass(KeyValueSortReducer.class);
    job.setOutputFormatClass(HFileOutputFormat2.class);
    job.setNumReduceTasks(4);
    job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
        MutationSerialization.class.getName(), ResultSerialization.class.getName(),
        KeyValueSerialization.class.getName());

    FileOutputFormat.setOutputPath(job, testDir);
    assertTrue(job.waitForCompletion(false));
    FileStatus[] files = fs.listStatus(testDir);
    assertTrue(files.length > 0);
  }

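  /**
   * Check that {@link HFileOutputFormat2#configureIncrementalLoad} sets the
   * number of reduce tasks to match the region count of the table.
   */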
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
  public void testJobConfiguration() throws Exception {
    Configuration conf = new Configuration(this.util.getConfiguration());
    conf.set(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
        util.getDataTestDir("testJobConfiguration").toString());
    Job job = new Job(conf);
    job.setWorkingDirectory(util.getDataTestDir("testJobConfiguration"));
    RegionLocator regionLocator = Mockito.mock(RegionLocator.class);
    setupMockStartKeys(regionLocator);
    HFileOutputFormat2.configureIncrementalLoad(job, new HTableDescriptor(), regionLocator);
    // the mock region locator returns four start keys, so expect four reducers
    assertEquals(4, job.getNumReduceTasks());
  }

  private byte[][] generateRandomStartKeys(int numKeys) {
    Random random = new Random();
    byte[][] ret = new byte[numKeys][];
    // first region start key is always empty
    ret[0] = HConstants.EMPTY_BYTE_ARRAY;
    for (int i = 1; i < numKeys; i++) {
      ret[i] =
          PerformanceEvaluation.generateData(random, PerformanceEvaluation.DEFAULT_VALUE_LENGTH);
    }
    return ret;
  }

  private byte[][] generateRandomSplitKeys(int numKeys) {
    Random random = new Random();
    byte[][] ret = new byte[numKeys][];
    for (int i = 0; i < numKeys; i++) {
      ret[i] =
          PerformanceEvaluation.generateData(random, PerformanceEvaluation.DEFAULT_VALUE_LENGTH);
    }
    return ret;
  }

  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
  public void testMRIncrementalLoad() throws Exception {
    LOG.info("\nStarting test testMRIncrementalLoad\n");
    doIncrementalLoadTest(false);
  }

  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
  public void testMRIncrementalLoadWithSplit() throws Exception {
    LOG.info("\nStarting test testMRIncrementalLoadWithSplit\n");
    doIncrementalLoadTest(true);
  }

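  /**
   * Runs the incremental load round trip both without and with a region
   * split between the write and the load.
   */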
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
  public void testMRIncrementalLoadWithLocality() throws Exception {
    LOG.info("\nStarting test testMRIncrementalLoadWithLocality\n");
    doIncrementalLoadTest(false);
    doIncrementalLoadTest(true);
  }

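  /**
   * Writes hfiles with an MR job against a real mini cluster, bulk loads them
   * with {@link LoadIncrementalHFiles}, and verifies the loaded rows survive a
   * region reopen. If {@code shouldChangeRegions} is true, the table is
   * dropped and recreated with different split keys between write and load.
   */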
  private void doIncrementalLoadTest(boolean shouldChangeRegions) throws Exception {
    util = new HBaseTestingUtility();
    Configuration conf = util.getConfiguration();
    byte[][] splitKeys = generateRandomSplitKeys(4);
    util.setJobWithoutMRCluster();
    util.startMiniCluster();
    try {
      HTable table = util.createTable(TABLE_NAME, FAMILIES, splitKeys);
      Admin admin = table.getConnection().getAdmin();
      Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad");
      assertEquals("Should start with empty table", 0, util.countRows(table));
      int numRegions = -1;
      try (RegionLocator r = table.getRegionLocator()) {
        numRegions = r.getStartKeys().length;
      }
      // four split keys yield five regions
      assertEquals("Should make 5 regions", 5, numRegions);

      // Generate the bulk load files
      runIncrementalPELoad(conf, table.getTableDescriptor(), table.getRegionLocator(), testDir);
      // This doesn't write into the table, just makes files
      assertEquals("HFOF should not touch actual table", 0, util.countRows(table));

      // Make sure that a directory was created for every CF
      int dir = 0;
      for (FileStatus f : testDir.getFileSystem(conf).listStatus(testDir)) {
        for (byte[] family : FAMILIES) {
          if (Bytes.toString(family).equals(f.getPath().getName())) {
            ++dir;
          }
        }
      }
      assertEquals("Column family not found in FS.", FAMILIES.length, dir);

      // handle the split case
      if (shouldChangeRegions) {
        LOG.info("Changing regions in table");
        admin.disableTable(table.getName());
        while (util.getMiniHBaseCluster().getMaster().getAssignmentManager().
            getRegionStates().isRegionsInTransition()) {
          Threads.sleep(200);
          LOG.info("Waiting on table to finish disabling");
        }
        util.deleteTable(table.getName());
        byte[][] newSplitKeys = generateRandomSplitKeys(14);
        table = util.createTable(TABLE_NAME, FAMILIES, newSplitKeys);

        while (table.getRegionLocator().getAllRegionLocations().size() != 15 ||
            !admin.isTableAvailable(table.getName())) {
          Thread.sleep(200);
          LOG.info("Waiting for new region assignment to happen");
        }
      }

      // Perform the actual load
      new LoadIncrementalHFiles(conf).doBulkLoad(testDir, table);

      // Ensure data shows up
      int expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
      assertEquals("LoadIncrementalHFiles should put expected data in table",
          expectedRows, util.countRows(table));
      Scan scan = new Scan();
      ResultScanner results = table.getScanner(scan);
      for (Result res : results) {
        assertEquals(FAMILIES.length, res.rawCells().length);
        Cell first = res.rawCells()[0];
        for (Cell kv : res.rawCells()) {
          assertTrue(CellUtil.matchingRow(first, kv));
          assertTrue(Bytes.equals(CellUtil.cloneValue(first), CellUtil.cloneValue(kv)));
        }
      }
      results.close();
      String tableDigestBefore = util.checksumRows(table);

      // Cause regions to reopen
      admin.disableTable(TABLE_NAME);
      while (!admin.isTableDisabled(TABLE_NAME)) {
        Thread.sleep(200);
        LOG.info("Waiting for table to disable");
      }
      admin.enableTable(TABLE_NAME);
      util.waitTableAvailable(TABLE_NAME);
      assertEquals("Data should remain after reopening of regions",
          tableDigestBefore, util.checksumRows(table));
    } finally {
      util.shutdownMiniCluster();
    }
  }

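  /**
   * Runs the random-KV MR job configured for incremental load against the
   * given table, writing hfiles to {@code outDir}.
   */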
  private void runIncrementalPELoad(Configuration conf, HTableDescriptor tableDescriptor,
      RegionLocator regionLocator, Path outDir) throws IOException, UnsupportedEncodingException,
      InterruptedException, ClassNotFoundException {
    Job job = new Job(conf, "testLocalMRIncrementalLoad");
    job.setWorkingDirectory(util.getDataTestDirOnTestFS("runIncrementalPELoad"));
    job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
        MutationSerialization.class.getName(), ResultSerialization.class.getName(),
        KeyValueSerialization.class.getName());
    setupRandomGeneratorMapper(job);
    HFileOutputFormat2.configureIncrementalLoad(job, tableDescriptor, regionLocator);
    FileOutputFormat.setOutputPath(job, outDir);

    assertFalse(util.getTestFileSystem().exists(outDir));

    assertEquals(regionLocator.getAllRegionLocations().size(), job.getNumReduceTasks());

    assertTrue(job.waitForCompletion(true));
  }

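  /**
   * Test for {@link HFileOutputFormat2#configureCompression(Configuration, HTableDescriptor)}
   * and {@link HFileOutputFormat2#createFamilyCompressionMap(Configuration)}.
   * Tests that the compression map is correctly serialized into and
   * deserialized from configuration.
   */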
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
  public void testSerializeDeserializeFamilyCompressionMap() throws IOException {
    for (int numCfs = 0; numCfs <= 3; numCfs++) {
      Configuration conf = new Configuration(this.util.getConfiguration());
      Map<String, Compression.Algorithm> familyToCompression =
          getMockColumnFamiliesForCompression(numCfs);
      Table table = Mockito.mock(HTable.class);
      setupMockColumnFamiliesForCompression(table, familyToCompression);
      HFileOutputFormat2.configureCompression(conf, table.getTableDescriptor());

      // read back family specific compression setting from the configuration
      Map<byte[], Algorithm> retrievedFamilyToCompressionMap = HFileOutputFormat2
          .createFamilyCompressionMap(conf);

      // test that we have a value for all column families that matter
      for (Entry<String, Algorithm> entry : familyToCompression.entrySet()) {
        assertEquals("Compression configuration incorrect for column family:"
            + entry.getKey(), entry.getValue(),
            retrievedFamilyToCompressionMap.get(entry.getKey().getBytes()));
      }
    }
  }

  private void setupMockColumnFamiliesForCompression(Table table,
      Map<String, Compression.Algorithm> familyToCompression) throws IOException {
    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAME);
    for (Entry<String, Compression.Algorithm> entry : familyToCompression.entrySet()) {
      mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey())
          .setMaxVersions(1)
          .setCompressionType(entry.getValue())
          .setBlockCacheEnabled(false)
          .setTimeToLive(0));
    }
    Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
  }

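  /**
   * @return a map from column family names to compression algorithms for
   *         testing column family compression. Column family names have
   *         special characters
   */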
  private Map<String, Compression.Algorithm>
      getMockColumnFamiliesForCompression(int numCfs) {
    Map<String, Compression.Algorithm> familyToCompression =
        new HashMap<String, Compression.Algorithm>();
    // use column family names having special characters
    if (numCfs-- > 0) {
      familyToCompression.put("Family1!@#!@#&", Compression.Algorithm.LZO);
    }
    if (numCfs-- > 0) {
      familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.SNAPPY);
    }
    if (numCfs-- > 0) {
      // deliberately reuses the previous family name; the later GZ entry wins
      familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.GZ);
    }
    if (numCfs-- > 0) {
      familyToCompression.put("Family3", Compression.Algorithm.NONE);
    }
    return familyToCompression;
  }

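  /**
   * Test for {@link HFileOutputFormat2#configureBloomType(HTableDescriptor, Configuration)}
   * and {@link HFileOutputFormat2#createFamilyBloomTypeMap(Configuration)}.
   * Tests that the bloom filter type map is correctly serialized into and
   * deserialized from configuration.
   */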
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
  public void testSerializeDeserializeFamilyBloomTypeMap() throws IOException {
    for (int numCfs = 0; numCfs <= 2; numCfs++) {
      Configuration conf = new Configuration(this.util.getConfiguration());
      Map<String, BloomType> familyToBloomType =
          getMockColumnFamiliesForBloomType(numCfs);
      Table table = Mockito.mock(HTable.class);
      setupMockColumnFamiliesForBloomType(table, familyToBloomType);
      HFileOutputFormat2.configureBloomType(table.getTableDescriptor(), conf);

      // read back family specific bloom type setting from the configuration
      Map<byte[], BloomType> retrievedFamilyToBloomTypeMap =
          HFileOutputFormat2
              .createFamilyBloomTypeMap(conf);

      // test that we have a value for all column families that matter
      for (Entry<String, BloomType> entry : familyToBloomType.entrySet()) {
        assertEquals("BloomType configuration incorrect for column family:"
            + entry.getKey(), entry.getValue(),
            retrievedFamilyToBloomTypeMap.get(entry.getKey().getBytes()));
      }
    }
  }

  private void setupMockColumnFamiliesForBloomType(Table table,
      Map<String, BloomType> familyToBloomType) throws IOException {
    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAME);
    for (Entry<String, BloomType> entry : familyToBloomType.entrySet()) {
      mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey())
          .setMaxVersions(1)
          .setBloomFilterType(entry.getValue())
          .setBlockCacheEnabled(false)
          .setTimeToLive(0));
    }
    Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
  }

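  /**
   * @return a map from column family names to bloom filter types for
   *         testing column family bloom type settings. Column family names
   *         have special characters
   */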
  private Map<String, BloomType>
      getMockColumnFamiliesForBloomType(int numCfs) {
    Map<String, BloomType> familyToBloomType =
        new HashMap<String, BloomType>();
    // use column family names having special characters
    if (numCfs-- > 0) {
      familyToBloomType.put("Family1!@#!@#&", BloomType.ROW);
    }
    if (numCfs-- > 0) {
      familyToBloomType.put("Family2=asdads&!AASD", BloomType.ROWCOL);
    }
    if (numCfs-- > 0) {
      familyToBloomType.put("Family3", BloomType.NONE);
    }
    return familyToBloomType;
  }

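  /**
   * Test for {@link HFileOutputFormat2#configureBlockSize(HTableDescriptor, Configuration)}
   * and {@link HFileOutputFormat2#createFamilyBlockSizeMap(Configuration)}.
   * Tests that the block size map is correctly serialized into and
   * deserialized from configuration.
   */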
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
  public void testSerializeDeserializeFamilyBlockSizeMap() throws IOException {
    for (int numCfs = 0; numCfs <= 3; numCfs++) {
      Configuration conf = new Configuration(this.util.getConfiguration());
      Map<String, Integer> familyToBlockSize =
          getMockColumnFamiliesForBlockSize(numCfs);
      Table table = Mockito.mock(HTable.class);
      setupMockColumnFamiliesForBlockSize(table, familyToBlockSize);
      HFileOutputFormat2.configureBlockSize(table.getTableDescriptor(), conf);

      // read back family specific block size setting from the configuration
      Map<byte[], Integer> retrievedFamilyToBlockSizeMap =
          HFileOutputFormat2
              .createFamilyBlockSizeMap(conf);

      // test that we have a value for all column families that matter
      for (Entry<String, Integer> entry : familyToBlockSize.entrySet()) {
        assertEquals("BlockSize configuration incorrect for column family:"
            + entry.getKey(), entry.getValue(),
            retrievedFamilyToBlockSizeMap.get(entry.getKey().getBytes()));
      }
    }
  }

  private void setupMockColumnFamiliesForBlockSize(Table table,
      Map<String, Integer> familyToBlockSize) throws IOException {
    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAME);
    for (Entry<String, Integer> entry : familyToBlockSize.entrySet()) {
      mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey())
          .setMaxVersions(1)
          .setBlocksize(entry.getValue())
          .setBlockCacheEnabled(false)
          .setTimeToLive(0));
    }
    Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
  }

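  /**
   * @return a map from column family names to block sizes for
   *         testing column family block size settings. Column family names
   *         have special characters
   */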
  private Map<String, Integer>
      getMockColumnFamiliesForBlockSize(int numCfs) {
    Map<String, Integer> familyToBlockSize =
        new HashMap<String, Integer>();
    // use column family names having special characters
    if (numCfs-- > 0) {
      familyToBlockSize.put("Family1!@#!@#&", 1234);
    }
    if (numCfs-- > 0) {
      familyToBlockSize.put("Family2=asdads&!AASD", Integer.MAX_VALUE);
    }
    if (numCfs-- > 0) {
      // deliberately repeats the previous entry with the same value
      familyToBlockSize.put("Family2=asdads&!AASD", Integer.MAX_VALUE);
    }
    if (numCfs-- > 0) {
      familyToBlockSize.put("Family3", 0);
    }
    return familyToBlockSize;
  }

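  /**
   * Test for {@link HFileOutputFormat2#configureDataBlockEncoding(HTableDescriptor, Configuration)}
   * and {@link HFileOutputFormat2#createFamilyDataBlockEncodingMap(Configuration)}.
   * Tests that the data block encoding map is correctly serialized into and
   * deserialized from configuration.
   */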
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
  public void testSerializeDeserializeFamilyDataBlockEncodingMap() throws IOException {
    for (int numCfs = 0; numCfs <= 3; numCfs++) {
      Configuration conf = new Configuration(this.util.getConfiguration());
      Map<String, DataBlockEncoding> familyToDataBlockEncoding =
          getMockColumnFamiliesForDataBlockEncoding(numCfs);
      Table table = Mockito.mock(HTable.class);
      setupMockColumnFamiliesForDataBlockEncoding(table, familyToDataBlockEncoding);
      HTableDescriptor tableDescriptor = table.getTableDescriptor();
      HFileOutputFormat2.configureDataBlockEncoding(tableDescriptor, conf);

      // read back family specific data block encoding setting from the configuration
      Map<byte[], DataBlockEncoding> retrievedFamilyToDataBlockEncodingMap =
          HFileOutputFormat2
              .createFamilyDataBlockEncodingMap(conf);

      // test that we have a value for all column families that matter
      for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) {
        assertEquals("DataBlockEncoding configuration incorrect for column family:"
            + entry.getKey(), entry.getValue(),
            retrievedFamilyToDataBlockEncodingMap.get(entry.getKey().getBytes()));
      }
    }
  }

  private void setupMockColumnFamiliesForDataBlockEncoding(Table table,
      Map<String, DataBlockEncoding> familyToDataBlockEncoding) throws IOException {
    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAME);
    for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) {
      mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey())
          .setMaxVersions(1)
          .setDataBlockEncoding(entry.getValue())
          .setBlockCacheEnabled(false)
          .setTimeToLive(0));
    }
    Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
  }

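  /**
   * @return a map from column family names to data block encodings for
   *         testing column family data block encoding settings. Column family
   *         names have special characters
   */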
  private Map<String, DataBlockEncoding>
      getMockColumnFamiliesForDataBlockEncoding(int numCfs) {
    Map<String, DataBlockEncoding> familyToDataBlockEncoding =
        new HashMap<String, DataBlockEncoding>();
    // use column family names having special characters
    if (numCfs-- > 0) {
      familyToDataBlockEncoding.put("Family1!@#!@#&", DataBlockEncoding.DIFF);
    }
    if (numCfs-- > 0) {
      familyToDataBlockEncoding.put("Family2=asdads&!AASD", DataBlockEncoding.FAST_DIFF);
    }
    if (numCfs-- > 0) {
      // deliberately reuses the previous family name; the later PREFIX entry wins
      familyToDataBlockEncoding.put("Family2=asdads&!AASD", DataBlockEncoding.PREFIX);
    }
    if (numCfs-- > 0) {
      familyToDataBlockEncoding.put("Family3", DataBlockEncoding.NONE);
    }
    return familyToDataBlockEncoding;
  }

  private void setupMockStartKeys(RegionLocator table) throws IOException {
    byte[][] mockKeys = new byte[][] {
        HConstants.EMPTY_BYTE_ARRAY,
        Bytes.toBytes("aaa"),
        Bytes.toBytes("ggg"),
        Bytes.toBytes("zzz")
    };
    Mockito.doReturn(mockKeys).when(table).getStartKeys();
  }

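  /**
   * Test that {@link HFileOutputFormat2} RecordWriter uses compression and
   * bloom filter settings from the column family descriptor
   */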
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
  public void testColumnFamilySettings() throws Exception {
    Configuration conf = new Configuration(this.util.getConfiguration());
    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
    TaskAttemptContext context = null;
    Path dir = util.getDataTestDir("testColumnFamilySettings");

    // Setup table descriptor
    Table table = Mockito.mock(Table.class);
    RegionLocator regionLocator = Mockito.mock(RegionLocator.class);
    HTableDescriptor htd = new HTableDescriptor(TABLE_NAME);
    Mockito.doReturn(htd).when(table).getTableDescriptor();
    for (HColumnDescriptor hcd : HBaseTestingUtility.generateColumnDescriptors()) {
      htd.addFamily(hcd);
    }

    // set up the table to return some mock keys
    setupMockStartKeys(regionLocator);

    try {
      // partial map red setup to get an operational writer for testing
      conf.set("io.seqfile.compression.type", "NONE");
      conf.set("hbase.fs.tmp.dir", dir.toString());
      Job job = new Job(conf, "testLocalMRIncrementalLoad");
      job.setWorkingDirectory(util.getDataTestDirOnTestFS("testColumnFamilySettings"));
      setupRandomGeneratorMapper(job);
      HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
      FileOutputFormat.setOutputPath(job, dir);
      context = createTestTaskAttemptContext(job);
      HFileOutputFormat2 hof = new HFileOutputFormat2();
      writer = hof.getRecordWriter(context);

      // write out random rows
      writeRandomKeyValues(writer, context, htd.getFamiliesKeys(), ROWSPERSPLIT);
      writer.close(context);

      FileSystem fs = dir.getFileSystem(conf);

      // commit so that the filesystem has one directory per column family
      hof.getOutputCommitter(context).commitTask(context);
      hof.getOutputCommitter(context).commitJob(context);
      FileStatus[] families = FSUtils.listStatus(fs, dir, new FSUtils.FamilyDirFilter(fs));
      assertEquals(htd.getFamilies().size(), families.length);
      for (FileStatus f : families) {
        String familyStr = f.getPath().getName();
        HColumnDescriptor hcd = htd.getFamily(Bytes.toBytes(familyStr));

        // verify that the bloom type and compression on each hfile match the
        // family descriptor's settings
        Path dataFilePath = fs.listStatus(f.getPath())[0].getPath();
        Reader reader = HFile.createReader(fs, dataFilePath, new CacheConfig(conf), conf);
        Map<byte[], byte[]> fileInfo = reader.loadFileInfo();

        byte[] bloomFilter = fileInfo.get(StoreFile.BLOOM_FILTER_TYPE_KEY);
        if (bloomFilter == null) bloomFilter = Bytes.toBytes("NONE");
        assertEquals("Incorrect bloom filter used for column family " + familyStr +
            "(reader: " + reader + ")",
            hcd.getBloomFilterType(), BloomType.valueOf(Bytes.toString(bloomFilter)));
        assertEquals("Incorrect compression used for column family " + familyStr +
            "(reader: " + reader + ")",
            hcd.getCompression(), reader.getFileContext().getCompression());
      }
    } finally {
      dir.getFileSystem(conf).delete(dir, true);
    }
  }

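  /**
   * Write random values to the writer assuming a table created using
   * {@link #FAMILIES} as column family descriptors
   */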
  private void writeRandomKeyValues(RecordWriter<ImmutableBytesWritable, Cell> writer,
      TaskAttemptContext context, Set<byte[]> families, int numRows)
      throws IOException, InterruptedException {
    byte[] keyBytes = new byte[Bytes.SIZEOF_INT];
    int valLength = 10;
    byte[] valBytes = new byte[valLength];

    int taskId = context.getTaskAttemptID().getTaskID().getId();
    assert taskId < Byte.MAX_VALUE : "Unit tests don't support > 127 tasks!";
    final byte[] qualifier = Bytes.toBytes("data");
    Random random = new Random();
    for (int i = 0; i < numRows; i++) {
      Bytes.putInt(keyBytes, 0, i);
      random.nextBytes(valBytes);
      ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);

      for (byte[] family : families) {
        Cell kv = new KeyValue(keyBytes, family, qualifier, valBytes);
        writer.write(key, kv);
      }
    }
  }

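  /**
   * This test is to test the scenario happened in HBASE-6901.
   * All files are bulk loaded and excluded from minor compaction.
   * Without the fix of HBASE-6901, an ArrayIndexOutOfBoundsException
   * will be thrown.
   */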
  @Ignore("Flakey: See HBASE-9051") @Test
  public void testExcludeAllFromMinorCompaction() throws Exception {
    Configuration conf = util.getConfiguration();
    conf.setInt("hbase.hstore.compaction.min", 2);
    generateRandomStartKeys(5);
    util.setJobWithoutMRCluster();
    util.startMiniCluster();
    try (Connection conn = ConnectionFactory.createConnection();
        Admin admin = conn.getAdmin()) {
      final FileSystem fs = util.getDFSCluster().getFileSystem();
      HTable table = util.createTable(TABLE_NAME, FAMILIES);
      assertEquals("Should start with empty table", 0, util.countRows(table));

      // deep inspection: get the StoreFile dir
      final Path storePath = new Path(
          FSUtils.getTableDir(FSUtils.getRootDir(conf), TABLE_NAME),
          new Path(admin.getTableRegions(TABLE_NAME).get(0).getEncodedName(),
              Bytes.toString(FAMILIES[0])));
      assertEquals(0, fs.listStatus(storePath).length);

      // Generate two bulk load files, both excluded from minor compaction
      conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", true);

      for (int i = 0; i < 2; i++) {
        Path testDir = util.getDataTestDirOnTestFS("testExcludeAllFromMinorCompaction_" + i);
        runIncrementalPELoad(conf, table.getTableDescriptor(), conn.getRegionLocator(TABLE_NAME),
            testDir);
        // Perform the actual load
        new LoadIncrementalHFiles(conf).doBulkLoad(testDir, table);
      }

      // Ensure data shows up
      int expectedRows = 2 * NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
      assertEquals("LoadIncrementalHFiles should put expected data in table",
          expectedRows, util.countRows(table));

      // should have a second StoreFile now
      assertEquals(2, fs.listStatus(storePath).length);

      // minor compactions shouldn't get rid of the file
      admin.compact(TABLE_NAME);
      try {
        quickPoll(new Callable<Boolean>() {
          @Override
          public Boolean call() throws Exception {
            return fs.listStatus(storePath).length == 1;
          }
        }, 5000);
        throw new IOException("SF# = " + fs.listStatus(storePath).length);
      } catch (AssertionError ae) {
        // this is expected behavior
      }

      // a major compaction should work though
      admin.majorCompact(TABLE_NAME);
      quickPoll(new Callable<Boolean>() {
        @Override
        public Boolean call() throws Exception {
          return fs.listStatus(storePath).length == 1;
        }
      }, 5000);

    } finally {
      util.shutdownMiniCluster();
    }
  }

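  /**
   * Same scenario as above, but alongside a regular flushed store file: the
   * bulk-loaded file is excluded from minor compaction while the flushed one
   * is not, so only a major compaction collapses the store to a single file.
   */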
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
  public void testExcludeMinorCompaction() throws Exception {
    Configuration conf = util.getConfiguration();
    conf.setInt("hbase.hstore.compaction.min", 2);
    generateRandomStartKeys(5);
    util.setJobWithoutMRCluster();
    util.startMiniCluster();
    try (Connection conn = ConnectionFactory.createConnection(conf);
        Admin admin = conn.getAdmin()) {
      Path testDir = util.getDataTestDirOnTestFS("testExcludeMinorCompaction");
      final FileSystem fs = util.getDFSCluster().getFileSystem();
      Table table = util.createTable(TABLE_NAME, FAMILIES);
      assertEquals("Should start with empty table", 0, util.countRows(table));

      // deep inspection: get the StoreFile dir
      final Path storePath = new Path(
          FSUtils.getTableDir(FSUtils.getRootDir(conf), TABLE_NAME),
          new Path(admin.getTableRegions(TABLE_NAME).get(0).getEncodedName(),
              Bytes.toString(FAMILIES[0])));
      assertEquals(0, fs.listStatus(storePath).length);

      // put some data in it and flush to create a storefile
      Put p = new Put(Bytes.toBytes("test"));
      p.add(FAMILIES[0], Bytes.toBytes("1"), Bytes.toBytes("1"));
      table.put(p);
      admin.flush(TABLE_NAME);
      assertEquals(1, util.countRows(table));
      quickPoll(new Callable<Boolean>() {
        @Override
        public Boolean call() throws Exception {
          return fs.listStatus(storePath).length == 1;
        }
      }, 5000);

      // Generate a bulk load file with the record writer configured to
      // exclude the resulting hfile from minor compaction
      conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", true);

      RegionLocator regionLocator = conn.getRegionLocator(TABLE_NAME);
      runIncrementalPELoad(conf, table.getTableDescriptor(), regionLocator, testDir);

      // Perform the actual load
      new LoadIncrementalHFiles(conf).doBulkLoad(testDir, admin, table, regionLocator);

      // Ensure data shows up
      int expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
      assertEquals("LoadIncrementalHFiles should put expected data in table",
          expectedRows + 1, util.countRows(table));

      // should have a second StoreFile now
      assertEquals(2, fs.listStatus(storePath).length);

      // minor compactions shouldn't get rid of the file
      admin.compact(TABLE_NAME);
      try {
        quickPoll(new Callable<Boolean>() {
          @Override
          public Boolean call() throws Exception {
            return fs.listStatus(storePath).length == 1;
          }
        }, 5000);
        throw new IOException("SF# = " + fs.listStatus(storePath).length);
      } catch (AssertionError ae) {
        // this is expected behavior
      }

      // a major compaction should work though
      admin.majorCompact(TABLE_NAME);
      quickPoll(new Callable<Boolean>() {
        @Override
        public Boolean call() throws Exception {
          return fs.listStatus(storePath).length == 1;
        }
      }, 5000);

    } finally {
      util.shutdownMiniCluster();
    }
  }

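  /**
   * Polls the given condition until it returns true or {@code waitMs}
   * milliseconds have elapsed, failing the test on timeout.
   */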
  private void quickPoll(Callable<Boolean> c, int waitMs) throws Exception {
    int sleepMs = 10;
    int retries = (int) Math.ceil(((double) waitMs) / sleepMs);
    while (retries-- > 0) {
      if (c.call().booleanValue()) {
        return;
      }
      Thread.sleep(sleepMs);
    }
    fail();
  }

  public static void main(String[] args) throws Exception {
    new TestHFileOutputFormat2().manualTest(args);
  }

  public void manualTest(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    util = new HBaseTestingUtility(conf);
    if ("newtable".equals(args[0])) {
      TableName tname = TableName.valueOf(args[1]);
      byte[][] splitKeys = generateRandomSplitKeys(4);
      // create the table (and immediately close it); the table is the artifact
      try (HTable table = util.createTable(tname, FAMILIES, splitKeys)) {
      }
    } else if ("incremental".equals(args[0])) {
      TableName tname = TableName.valueOf(args[1]);
      try (Connection c = ConnectionFactory.createConnection(conf);
          Admin admin = c.getAdmin();
          RegionLocator regionLocator = c.getRegionLocator(tname)) {
        Path outDir = new Path("incremental-out");
        runIncrementalPELoad(conf, admin.getTableDescriptor(tname), regionLocator, outDir);
      }
    } else {
      throw new RuntimeException("usage: TestHFileOutputFormat2 newtable | incremental");
    }
  }

}