1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.mapreduce;
20  
21  import static org.junit.Assert.assertEquals;
22  import static org.junit.Assert.assertFalse;
23  import static org.junit.Assert.assertNotEquals;
24  import static org.junit.Assert.assertNotNull;
25  import static org.junit.Assert.assertTrue;
26  import static org.junit.Assert.fail;
27  
28  import java.io.IOException;
29  import java.io.UnsupportedEncodingException;
30  import java.util.Arrays;
31  import java.util.HashMap;
32  import java.util.Map;
33  import java.util.Map.Entry;
34  import java.util.Random;
35  import java.util.Set;
36  import java.util.concurrent.Callable;
37  
38  import org.apache.commons.logging.Log;
39  import org.apache.commons.logging.LogFactory;
40  import org.apache.hadoop.conf.Configuration;
41  import org.apache.hadoop.fs.FileStatus;
42  import org.apache.hadoop.fs.FileSystem;
43  import org.apache.hadoop.fs.Path;
44  import org.apache.hadoop.hbase.CategoryBasedTimeout;
45  import org.apache.hadoop.hbase.Cell;
46  import org.apache.hadoop.hbase.CellUtil;
47  import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
48  import org.apache.hadoop.hbase.HBaseConfiguration;
49  import org.apache.hadoop.hbase.HBaseTestingUtility;
50  import org.apache.hadoop.hbase.HColumnDescriptor;
51  import org.apache.hadoop.hbase.HConstants;
52  import org.apache.hadoop.hbase.HTableDescriptor;
53  import org.apache.hadoop.hbase.HadoopShims;
54  import org.apache.hadoop.hbase.KeyValue;
55  import org.apache.hadoop.hbase.testclassification.LargeTests;
56  import org.apache.hadoop.hbase.PerformanceEvaluation;
57  import org.apache.hadoop.hbase.TableName;
58  import org.apache.hadoop.hbase.client.Admin;
59  import org.apache.hadoop.hbase.client.Connection;
60  import org.apache.hadoop.hbase.client.ConnectionFactory;
61  import org.apache.hadoop.hbase.client.HBaseAdmin;
62  import org.apache.hadoop.hbase.client.HTable;
63  import org.apache.hadoop.hbase.client.Put;
64  import org.apache.hadoop.hbase.client.RegionLocator;
65  import org.apache.hadoop.hbase.client.Result;
66  import org.apache.hadoop.hbase.client.ResultScanner;
67  import org.apache.hadoop.hbase.client.Scan;
68  import org.apache.hadoop.hbase.client.Table;
69  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
70  import org.apache.hadoop.hbase.io.compress.Compression;
71  import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
72  import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
73  import org.apache.hadoop.hbase.io.hfile.CacheConfig;
74  import org.apache.hadoop.hbase.io.hfile.HFile;
75  import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
76  import org.apache.hadoop.hbase.regionserver.BloomType;
77  import org.apache.hadoop.hbase.regionserver.StoreFile;
78  import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
79  import org.apache.hadoop.hbase.util.Bytes;
80  import org.apache.hadoop.hbase.util.FSUtils;
81  import org.apache.hadoop.hbase.util.Threads;
82  import org.apache.hadoop.hbase.util.Writables;
83  import org.apache.hadoop.io.NullWritable;
84  import org.apache.hadoop.mapreduce.Job;
85  import org.apache.hadoop.mapreduce.Mapper;
86  import org.apache.hadoop.mapreduce.RecordWriter;
87  import org.apache.hadoop.mapreduce.TaskAttemptContext;
88  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
89  import org.junit.Ignore;
90  import org.junit.Rule;
91  import org.junit.Test;
92  import org.junit.experimental.categories.Category;
93  import org.junit.rules.TestRule;
94  import org.mockito.Mockito;
95  
96  /**
97   * Simple test for {@link CellSortReducer} and {@link HFileOutputFormat2}.
98   * Sets up and runs a mapreduce job that writes hfile output.
99   * Creates a few inner classes to implement splits and an inputformat that
100  * emits keys and values like those of {@link PerformanceEvaluation}.
101  */
102 @Category(LargeTests.class)
103 public class TestHFileOutputFormat2  {
104   @Rule public final TestRule timeout = CategoryBasedTimeout.builder().
105       withTimeout(this.getClass()).withLookingForStuckThread(true).build();
106   private final static int ROWSPERSPLIT = 1024;
107 
108   private static final byte[][] FAMILIES
109     = { Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-A"))
110       , Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-B"))};
111   private static final TableName TABLE_NAME =
112       TableName.valueOf("TestTable");
113 
114   private HBaseTestingUtility util = new HBaseTestingUtility();
115 
116   private static final Log LOG = LogFactory.getLog(TestHFileOutputFormat2.class);
117 
118   /**
119    * Simple mapper that makes KeyValue output.
120    */
121   static class RandomKVGeneratingMapper
122       extends Mapper<NullWritable, NullWritable,
123                  ImmutableBytesWritable, Cell> {
124 
125     private int keyLength;
126     private static final int KEYLEN_DEFAULT=10;
127     private static final String KEYLEN_CONF="randomkv.key.length";
128 
129     private int valLength;
130     private static final int VALLEN_DEFAULT=10;
131     private static final String VALLEN_CONF="randomkv.val.length";
132     private static final byte [] QUALIFIER = Bytes.toBytes("data");
133 
134     @Override
135     protected void setup(Context context) throws IOException,
136         InterruptedException {
137       super.setup(context);
138 
139       Configuration conf = context.getConfiguration();
140       keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
141       valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
142     }
143 
144     @Override
145     protected void map(
146         NullWritable n1, NullWritable n2,
147         Mapper<NullWritable, NullWritable,
148                ImmutableBytesWritable,Cell>.Context context)
149         throws IOException, InterruptedException
150     {
151 
152       byte keyBytes[] = new byte[keyLength];
153       byte valBytes[] = new byte[valLength];
154 
155       int taskId = context.getTaskAttemptID().getTaskID().getId();
156       assert taskId < Byte.MAX_VALUE : "Unit tests don't support > 127 tasks!";
157 
158       Random random = new Random();
159       for (int i = 0; i < ROWSPERSPLIT; i++) {
160 
161         random.nextBytes(keyBytes);
162         // Ensure that unique tasks generate unique keys
163         keyBytes[keyLength - 1] = (byte)(taskId & 0xFF);
164         random.nextBytes(valBytes);
165         ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);
166 
167         for (byte[] family : TestHFileOutputFormat2.FAMILIES) {
168           Cell kv = new KeyValue(keyBytes, family, QUALIFIER, valBytes);
169           context.write(key, kv);
170         }
171       }
172     }
173   }
174 
175   private void setupRandomGeneratorMapper(Job job) {
176     job.setInputFormatClass(NMapInputFormat.class);
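        // NMapInputFormat creates a configurable number of map tasks (see
        // NMapInputFormat.getNumMapTasks) and feeds each a single empty record, so the mapper
        // above generates ROWSPERSPLIT rows per task.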
177     job.setMapperClass(RandomKVGeneratingMapper.class);
178     job.setMapOutputKeyClass(ImmutableBytesWritable.class);
179     job.setMapOutputValueClass(KeyValue.class);
180   }
181 
182   /**
183    * Test that {@link HFileOutputFormat2} RecordWriter amends timestamps if
184    * passed a keyvalue whose timestamp is {@link HConstants#LATEST_TIMESTAMP}.
185    * @see <a href="https://issues.apache.org/jira/browse/HBASE-2615">HBASE-2615</a>
186    */
187   @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
188   public void test_LATEST_TIMESTAMP_isReplaced()
189   throws Exception {
190     Configuration conf = new Configuration(this.util.getConfiguration());
191     RecordWriter<ImmutableBytesWritable, Cell> writer = null;
192     TaskAttemptContext context = null;
193     Path dir =
194       util.getDataTestDir("test_LATEST_TIMESTAMP_isReplaced");
195     try {
196       Job job = new Job(conf);
197       FileOutputFormat.setOutputPath(job, dir);
198       context = createTestTaskAttemptContext(job);
199       HFileOutputFormat2 hof = new HFileOutputFormat2();
200       writer = hof.getRecordWriter(context);
201       final byte [] b = Bytes.toBytes("b");
202 
203       // Test 1.  Pass a KV that has a ts of LATEST_TIMESTAMP.  The call to write should
204       // change it.  Check that everything in the kv is the same except the ts.
205       KeyValue kv = new KeyValue(b, b, b);
206       KeyValue original = kv.clone();
207       writer.write(new ImmutableBytesWritable(), kv);
208       assertFalse(original.equals(kv));
209       assertTrue(Bytes.equals(CellUtil.cloneRow(original), CellUtil.cloneRow(kv)));
210       assertTrue(Bytes.equals(CellUtil.cloneFamily(original), CellUtil.cloneFamily(kv)));
211       assertTrue(Bytes.equals(CellUtil.cloneQualifier(original), CellUtil.cloneQualifier(kv)));
212       assertNotEquals(original.getTimestamp(), kv.getTimestamp());
213       assertNotEquals(HConstants.LATEST_TIMESTAMP, kv.getTimestamp());
214 
215       // Test 2. Now test passing a kv that has explicit ts.  It should not be
216       // changed by call to record write.
217       kv = new KeyValue(b, b, b, kv.getTimestamp() - 1, b);
218       original = kv.clone();
219       writer.write(new ImmutableBytesWritable(), kv);
220       assertTrue(original.equals(kv));
221     } finally {
222       if (writer != null && context != null) writer.close(context);
223       dir.getFileSystem(conf).delete(dir, true);
224     }
225   }
226 
227   private TaskAttemptContext createTestTaskAttemptContext(final Job job)
228   throws Exception {
229     HadoopShims hadoop = CompatibilitySingletonFactory.getInstance(HadoopShims.class);
230     TaskAttemptContext context = hadoop.createTestTaskAttemptContext(
231       job, "attempt_201402131733_0001_m_000000_0");
232     return context;
233   }
234 
235   /**
236    * Test that {@link HFileOutputFormat2} creates an HFile with TIMERANGE
237    * metadata used by time-restricted scans.
238    */
239   @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
240   public void test_TIMERANGE() throws Exception {
241     Configuration conf = new Configuration(this.util.getConfiguration());
242     RecordWriter<ImmutableBytesWritable, Cell> writer = null;
243     TaskAttemptContext context = null;
244     Path dir =
245       util.getDataTestDir("test_TIMERANGE_present");
246     LOG.info("Timerange dir writing to dir: "+ dir);
247     try {
248       // build a record writer using HFileOutputFormat2
249       Job job = new Job(conf);
250       FileOutputFormat.setOutputPath(job, dir);
251       context = createTestTaskAttemptContext(job);
252       HFileOutputFormat2 hof = new HFileOutputFormat2();
253       writer = hof.getRecordWriter(context);
254 
255       // Pass two KeyValues with explicit timestamps
256       final byte [] b = Bytes.toBytes("b");
257 
258       // value 1 with timestamp 2000
259       KeyValue kv = new KeyValue(b, b, b, 2000, b);
260       KeyValue original = kv.clone();
261       writer.write(new ImmutableBytesWritable(), kv);
262       assertEquals(original,kv);
263 
264       // value 2 with timestamp 1000
265       kv = new KeyValue(b, b, b, 1000, b);
266       original = kv.clone();
267       writer.write(new ImmutableBytesWritable(), kv);
268       assertEquals(original, kv);
269 
270       // verify that the file has the proper FileInfo.
271       writer.close(context);
272 
273       // the generated file lives 1 directory down from the attempt directory
274       // and is the only file, e.g.
275       // _attempt__0000_r_000000_0/b/1979617994050536795
276       FileSystem fs = FileSystem.get(conf);
277       Path attemptDirectory = hof.getDefaultWorkFile(context, "").getParent();
278       FileStatus[] sub1 = fs.listStatus(attemptDirectory);
279       FileStatus[] file = fs.listStatus(sub1[0].getPath());
280 
281       // open as HFile Reader and pull out TIMERANGE FileInfo.
282       HFile.Reader rd = HFile.createReader(fs, file[0].getPath(),
283           new CacheConfig(conf), conf);
284       Map<byte[],byte[]> finfo = rd.loadFileInfo();
285       byte[] range = finfo.get("TIMERANGE".getBytes());
286       assertNotNull(range);
287 
288       // unmarshall and check values.
289       TimeRangeTracker timeRangeTracker = new TimeRangeTracker();
290       Writables.copyWritable(range, timeRangeTracker);
291       LOG.info(timeRangeTracker.getMinimumTimestamp() +
292           "...." + timeRangeTracker.getMaximumTimestamp());
293       assertEquals(1000, timeRangeTracker.getMinimumTimestamp());
294       assertEquals(2000, timeRangeTracker.getMaximumTimestamp());
295       rd.close();
296     } finally {
297       if (writer != null && context != null) writer.close(context);
298       dir.getFileSystem(conf).delete(dir, true);
299     }
300   }
301 
302   /**
303    * Run small MR job.
304    */
305   @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
306   public void testWritingPEData() throws Exception {
307     Configuration conf = util.getConfiguration();
308     Path testDir = util.getDataTestDirOnTestFS("testWritingPEData");
309     FileSystem fs = testDir.getFileSystem(conf);
310 
311     // Lower this value or we OOME in Eclipse.
312     conf.setInt("mapreduce.task.io.sort.mb", 20);
313     // Write a few files.
314     conf.setLong(HConstants.HREGION_MAX_FILESIZE, 64 * 1024);
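        // HFileOutputFormat2's writer rolls to a new hfile once the current one reaches
        // hbase.hregion.max.filesize, so a small limit here yields several output files per family.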
315 
316     Job job = new Job(conf, "testWritingPEData");
317     setupRandomGeneratorMapper(job);
318     // This partitioner doesn't work well for numeric keys, but we use it anyway
319     // just to demonstrate how to configure it.
320     byte[] startKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT];
321     byte[] endKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT];
322 
323     Arrays.fill(startKey, (byte)0);
324     Arrays.fill(endKey, (byte)0xff);
325 
326     job.setPartitionerClass(SimpleTotalOrderPartitioner.class);
327     // Set start and end rows for partitioner.
328     SimpleTotalOrderPartitioner.setStartKey(job.getConfiguration(), startKey);
329     SimpleTotalOrderPartitioner.setEndKey(job.getConfiguration(), endKey);
330     job.setReducerClass(KeyValueSortReducer.class);
331     job.setOutputFormatClass(HFileOutputFormat2.class);
332     job.setNumReduceTasks(4);
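        // Register HBase's MapReduce serializations so KeyValue, Mutation and Result instances
        // can pass through the shuffle even though they are not Writable.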
333     job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
334         MutationSerialization.class.getName(), ResultSerialization.class.getName(),
335         KeyValueSerialization.class.getName());
336 
337     FileOutputFormat.setOutputPath(job, testDir);
338     assertTrue(job.waitForCompletion(false));
339     FileStatus [] files = fs.listStatus(testDir);
340     assertTrue(files.length > 0);
341   }
342 
343   @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
344   public void testJobConfiguration() throws Exception {
345     Configuration conf = new Configuration(this.util.getConfiguration());
346     conf.set(HConstants.TEMPORARY_FS_DIRECTORY_KEY, util.getDataTestDir("testJobConfiguration")
347         .toString());
348     Job job = new Job(conf);
349     job.setWorkingDirectory(util.getDataTestDir("testJobConfiguration"));
350     RegionLocator regionLocator = Mockito.mock(RegionLocator.class);
351     setupMockStartKeys(regionLocator);
352     HFileOutputFormat2.configureIncrementalLoad(job, new HTableDescriptor(), regionLocator);
353     assertEquals(4, job.getNumReduceTasks());
354   }
355 
356   private byte [][] generateRandomStartKeys(int numKeys) {
357     Random random = new Random();
358     byte[][] ret = new byte[numKeys][];
359     // first region start key is always empty
360     ret[0] = HConstants.EMPTY_BYTE_ARRAY;
361     for (int i = 1; i < numKeys; i++) {
362       ret[i] =
363         PerformanceEvaluation.generateData(random, PerformanceEvaluation.DEFAULT_VALUE_LENGTH);
364     }
365     return ret;
366   }
367 
368   private byte[][] generateRandomSplitKeys(int numKeys) {
369     Random random = new Random();
370     byte[][] ret = new byte[numKeys][];
371     for (int i = 0; i < numKeys; i++) {
372       ret[i] =
373           PerformanceEvaluation.generateData(random, PerformanceEvaluation.DEFAULT_VALUE_LENGTH);
374     }
375     return ret;
376   }
377 
378   @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
379   public void testMRIncrementalLoad() throws Exception {
380     LOG.info("\nStarting test testMRIncrementalLoad\n");
381     doIncrementalLoadTest(false);
382   }
383 
384   @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
385   public void testMRIncrementalLoadWithSplit() throws Exception {
386     LOG.info("\nStarting test testMRIncrementalLoadWithSplit\n");
387     doIncrementalLoadTest(true);
388   }
389 
390   /**
391    * Test for HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY = true.
392    * This test can only check the correctness of the original logic when LOCALITY_SENSITIVE_CONF_KEY
393    * is set to true. Because MiniHBaseCluster always runs with a single hostname (and different ports),
394    * it is not possible to check region locality by comparing region locations with DataNode hostnames.
395    * Once MiniHBaseCluster supports an explicit hostnames parameter (just like MiniDFSCluster does),
396    * region locality can be tested more thoroughly.
397    */
398   @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
399   public void testMRIncrementalLoadWithLocality() throws Exception {
400     LOG.info("\nStarting test testMRIncrementalLoadWithLocality\n");
401     doIncrementalLoadTest(false);
402     doIncrementalLoadTest(true);
403   }
404 
405   private void doIncrementalLoadTest(
406       boolean shouldChangeRegions) throws Exception {
407     util = new HBaseTestingUtility();
408     Configuration conf = util.getConfiguration();
409     byte[][] splitKeys = generateRandomSplitKeys(4);
410     util.setJobWithoutMRCluster();
411     util.startMiniCluster();
412     try {
413       HTable table = util.createTable(TABLE_NAME, FAMILIES, splitKeys);
414       Admin admin = table.getConnection().getAdmin();
415       Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad");
416       assertEquals("Should start with empty table",
417           0, util.countRows(table));
418       int numRegions = -1;
419       try (RegionLocator r = table.getRegionLocator()) {
420         numRegions = r.getStartKeys().length;
421       }
422       assertEquals("Should make 5 regions", numRegions, 5);
423       // Generate the bulk load files
424       runIncrementalPELoad(conf, table.getTableDescriptor(), table.getRegionLocator(), testDir);
425       // This doesn't write into the table, just makes files
426       assertEquals("HFOF should not touch actual table",
427           0, util.countRows(table));
428 
429 
430       // Make sure that a directory was created for every CF
431       int dir = 0;
432       for (FileStatus f : testDir.getFileSystem(conf).listStatus(testDir)) {
433         for (byte[] family : FAMILIES) {
434           if (Bytes.toString(family).equals(f.getPath().getName())) {
435             ++dir;
436           }
437         }
438       }
439       assertEquals("Column family not found in FS.", FAMILIES.length, dir);
440 
441       // handle the split case
442       if (shouldChangeRegions) {
443         LOG.info("Changing regions in table");
444         admin.disableTable(table.getName());
445         while(util.getMiniHBaseCluster().getMaster().getAssignmentManager().
446             getRegionStates().isRegionsInTransition()) {
447           Threads.sleep(200);
448           LOG.info("Waiting on table to finish disabling");
449         }
450         util.deleteTable(table.getName());
451         byte[][] newSplitKeys = generateRandomSplitKeys(14);
452         table = util.createTable(TABLE_NAME, FAMILIES, newSplitKeys);
453 
454         while (table.getRegionLocator().getAllRegionLocations().size() != 15 ||
455             !admin.isTableAvailable(table.getName())) {
456           Thread.sleep(200);
457           LOG.info("Waiting for new region assignment to happen");
458         }
459       }
460 
461       // Perform the actual load
462       new LoadIncrementalHFiles(conf).doBulkLoad(testDir, table);
463 
464       // Ensure data shows up
465       int expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
466       assertEquals("LoadIncrementalHFiles should put expected data in table",
467           expectedRows, util.countRows(table));
468       Scan scan = new Scan();
469       ResultScanner results = table.getScanner(scan);
470       for (Result res : results) {
471         assertEquals(FAMILIES.length, res.rawCells().length);
472         Cell first = res.rawCells()[0];
473         for (Cell kv : res.rawCells()) {
474           assertTrue(CellUtil.matchingRow(first, kv));
475           assertTrue(Bytes.equals(CellUtil.cloneValue(first), CellUtil.cloneValue(kv)));
476         }
477       }
478       results.close();
479       String tableDigestBefore = util.checksumRows(table);
480 
481       // Cause regions to reopen
482       admin.disableTable(TABLE_NAME);
483       while (!admin.isTableDisabled(TABLE_NAME)) {
484         Thread.sleep(200);
485         LOG.info("Waiting for table to disable");
486       }
487       admin.enableTable(TABLE_NAME);
488       util.waitTableAvailable(TABLE_NAME);
489       assertEquals("Data should remain after reopening of regions",
490           tableDigestBefore, util.checksumRows(table));
491     } finally {
492       util.shutdownMiniCluster();
493     }
494   }
495 
496   private void runIncrementalPELoad(Configuration conf, HTableDescriptor tableDescriptor,
497       RegionLocator regionLocator, Path outDir) throws IOException, UnsupportedEncodingException,
498       InterruptedException, ClassNotFoundException {
499     Job job = new Job(conf, "testLocalMRIncrementalLoad");
500     job.setWorkingDirectory(util.getDataTestDirOnTestFS("runIncrementalPELoad"));
501     job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
502         MutationSerialization.class.getName(), ResultSerialization.class.getName(),
503         KeyValueSerialization.class.getName());
504     setupRandomGeneratorMapper(job);
505     HFileOutputFormat2.configureIncrementalLoad(job, tableDescriptor, regionLocator);
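        // configureIncrementalLoad wires up the sort reducer, a TotalOrderPartitioner over the
        // table's region boundaries, and one reduce task per region (asserted below).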
506     FileOutputFormat.setOutputPath(job, outDir);
507 
508     assertFalse(util.getTestFileSystem().exists(outDir));
509 
510     assertEquals(regionLocator.getAllRegionLocations().size(), job.getNumReduceTasks());
511 
512     assertTrue(job.waitForCompletion(true));
513   }
514 
515   /**
516    * Test for {@link HFileOutputFormat2#configureCompression(org.apache.hadoop.hbase.client.Table,
517    * Configuration)} and
518    * {@link HFileOutputFormat2#createFamilyCompressionMap(Configuration)}.
519    * Tests that the compression map is correctly serialized into
520    * and deserialized from the configuration.
521    *
522    * @throws IOException
523    */
524   @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
525   public void testSerializeDeserializeFamilyCompressionMap() throws IOException {
526     for (int numCfs = 0; numCfs <= 3; numCfs++) {
527       Configuration conf = new Configuration(this.util.getConfiguration());
528       Map<String, Compression.Algorithm> familyToCompression =
529           getMockColumnFamiliesForCompression(numCfs);
530       Table table = Mockito.mock(HTable.class);
531       setupMockColumnFamiliesForCompression(table, familyToCompression);
532       HFileOutputFormat2.configureCompression(conf, table.getTableDescriptor());
533 
534       // read back family specific compression setting from the configuration
535       Map<byte[], Algorithm> retrievedFamilyToCompressionMap = HFileOutputFormat2
536           .createFamilyCompressionMap(conf);
537 
538       // test that we have a value for all column families that matches with the
539       // used mock values
540       for (Entry<String, Algorithm> entry : familyToCompression.entrySet()) {
541         assertEquals("Compression configuration incorrect for column family:"
542             + entry.getKey(), entry.getValue(),
543             retrievedFamilyToCompressionMap.get(entry.getKey().getBytes()));
544       }
545     }
546   }
547 
548   private void setupMockColumnFamiliesForCompression(Table table,
549       Map<String, Compression.Algorithm> familyToCompression) throws IOException {
550     HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAME);
551     for (Entry<String, Compression.Algorithm> entry : familyToCompression.entrySet()) {
552       mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey())
553           .setMaxVersions(1)
554           .setCompressionType(entry.getValue())
555           .setBlockCacheEnabled(false)
556           .setTimeToLive(0));
557     }
558     Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
559   }
560 
561   /**
562    * @return a map from column family names to compression algorithms for
563    *         testing column family compression. Column family names have special characters
564    */
565   private Map<String, Compression.Algorithm>
566       getMockColumnFamiliesForCompression (int numCfs) {
567     Map<String, Compression.Algorithm> familyToCompression
568       = new HashMap<String, Compression.Algorithm>();
569     // use column family names having special characters
570     if (numCfs-- > 0) {
571       familyToCompression.put("Family1!@#!@#&", Compression.Algorithm.LZO);
572     }
573     if (numCfs-- > 0) {
574       familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.SNAPPY);
575     }
576     if (numCfs-- > 0) {
577       familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.GZ);
578     }
579     if (numCfs-- > 0) {
580       familyToCompression.put("Family3", Compression.Algorithm.NONE);
581     }
582     return familyToCompression;
583   }
584 
585 
586   /**
587    * Test for {@link HFileOutputFormat2#configureBloomType(org.apache.hadoop.hbase.client.Table,
588    * Configuration)} and
589    * {@link HFileOutputFormat2#createFamilyBloomTypeMap(Configuration)}.
590    * Tests that the bloom type map is correctly serialized into
591    * and deserialized from the configuration.
592    *
593    * @throws IOException
594    */
595   @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
596   public void testSerializeDeserializeFamilyBloomTypeMap() throws IOException {
597     for (int numCfs = 0; numCfs <= 2; numCfs++) {
598       Configuration conf = new Configuration(this.util.getConfiguration());
599       Map<String, BloomType> familyToBloomType =
600           getMockColumnFamiliesForBloomType(numCfs);
601       Table table = Mockito.mock(HTable.class);
602       setupMockColumnFamiliesForBloomType(table,
603           familyToBloomType);
604       HFileOutputFormat2.configureBloomType(table.getTableDescriptor(), conf);
605 
606       // read back family specific data block encoding settings from the
607       // configuration
608       Map<byte[], BloomType> retrievedFamilyToBloomTypeMap =
609           HFileOutputFormat2
610               .createFamilyBloomTypeMap(conf);
611 
612       // test that we have a value for all column families that matches with the
613       // used mock values
614       for (Entry<String, BloomType> entry : familyToBloomType.entrySet()) {
615         assertEquals("BloomType configuration incorrect for column family:"
616             + entry.getKey(), entry.getValue(),
617             retrievedFamilyToBloomTypeMap.get(entry.getKey().getBytes()));
618       }
619     }
620   }
621 
622   private void setupMockColumnFamiliesForBloomType(Table table,
623       Map<String, BloomType> familyToDataBlockEncoding) throws IOException {
624     HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAME);
625     for (Entry<String, BloomType> entry : familyToDataBlockEncoding.entrySet()) {
626       mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey())
627           .setMaxVersions(1)
628           .setBloomFilterType(entry.getValue())
629           .setBlockCacheEnabled(false)
630           .setTimeToLive(0));
631     }
632     Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
633   }
634 
635   /**
636    * @return a map from column family names to bloom filter types for
637    *         testing column family bloom filter settings. Column family names have special characters
638    */
639   private Map<String, BloomType>
640   getMockColumnFamiliesForBloomType (int numCfs) {
641     Map<String, BloomType> familyToBloomType =
642         new HashMap<String, BloomType>();
643     // use column family names having special characters
644     if (numCfs-- > 0) {
645       familyToBloomType.put("Family1!@#!@#&", BloomType.ROW);
646     }
647     if (numCfs-- > 0) {
648       familyToBloomType.put("Family2=asdads&!AASD",
649           BloomType.ROWCOL);
650     }
651     if (numCfs-- > 0) {
652       familyToBloomType.put("Family3", BloomType.NONE);
653     }
654     return familyToBloomType;
655   }
656 
657   /**
658    * Test for {@link HFileOutputFormat2#configureBlockSize(org.apache.hadoop.hbase.client.Table,
659    * Configuration)} and
660    * {@link HFileOutputFormat2#createFamilyBlockSizeMap(Configuration)}.
661    * Tests that the block size map is correctly serialized into
662    * and deserialized from the configuration.
663    *
664    * @throws IOException
665    */
666   @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
667   public void testSerializeDeserializeFamilyBlockSizeMap() throws IOException {
668     for (int numCfs = 0; numCfs <= 3; numCfs++) {
669       Configuration conf = new Configuration(this.util.getConfiguration());
670       Map<String, Integer> familyToBlockSize =
671           getMockColumnFamiliesForBlockSize(numCfs);
672       Table table = Mockito.mock(HTable.class);
673       setupMockColumnFamiliesForBlockSize(table,
674           familyToBlockSize);
675       HFileOutputFormat2.configureBlockSize(table.getTableDescriptor(), conf);
676 
677       // read back family specific data block encoding settings from the
678       // configuration
679       Map<byte[], Integer> retrievedFamilyToBlockSizeMap =
680           HFileOutputFormat2
681               .createFamilyBlockSizeMap(conf);
682 
683       // test that we have a value for all column families that matches with the
684       // used mock values
685       for (Entry<String, Integer> entry : familyToBlockSize.entrySet()
686           ) {
687         assertEquals("BlockSize configuration incorrect for column family:"
688             + entry.getKey(), entry.getValue(),
689             retrievedFamilyToBlockSizeMap.get(entry.getKey().getBytes()));
690       }
691     }
692   }
693 
694   private void setupMockColumnFamiliesForBlockSize(Table table,
695       Map<String, Integer> familyToDataBlockEncoding) throws IOException {
696     HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAME);
697     for (Entry<String, Integer> entry : familyToDataBlockEncoding.entrySet()) {
698       mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey())
699           .setMaxVersions(1)
700           .setBlocksize(entry.getValue())
701           .setBlockCacheEnabled(false)
702           .setTimeToLive(0));
703     }
704     Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
705   }
706 
707   /**
708    * @return a map from column family names to block sizes for
709    *         testing column family block size settings. Column family names have special characters
710    */
711   private Map<String, Integer>
712   getMockColumnFamiliesForBlockSize (int numCfs) {
713     Map<String, Integer> familyToBlockSize =
714         new HashMap<String, Integer>();
715     // use column family names having special characters
716     if (numCfs-- > 0) {
717       familyToBlockSize.put("Family1!@#!@#&", 1234);
718     }
719     if (numCfs-- > 0) {
720       familyToBlockSize.put("Family2=asdads&!AASD",
721           Integer.MAX_VALUE);
722     }
723     if (numCfs-- > 0) {
724       familyToBlockSize.put("Family2=asdads&!AASD",
725           Integer.MAX_VALUE);
726     }
727     if (numCfs-- > 0) {
728       familyToBlockSize.put("Family3", 0);
729     }
730     return familyToBlockSize;
731   }
732 
733   /**
734    * Test for {@link HFileOutputFormat2#configureDataBlockEncoding(HTableDescriptor, Configuration)}
735    * and {@link HFileOutputFormat2#createFamilyDataBlockEncodingMap(Configuration)}.
736    * Tests that the data block encoding map is correctly serialized into
737    * and deserialized from the configuration.
738    *
739    * @throws IOException
740    */
741   @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
742   public void testSerializeDeserializeFamilyDataBlockEncodingMap() throws IOException {
743     for (int numCfs = 0; numCfs <= 3; numCfs++) {
744       Configuration conf = new Configuration(this.util.getConfiguration());
745       Map<String, DataBlockEncoding> familyToDataBlockEncoding =
746           getMockColumnFamiliesForDataBlockEncoding(numCfs);
747       Table table = Mockito.mock(HTable.class);
748       setupMockColumnFamiliesForDataBlockEncoding(table,
749           familyToDataBlockEncoding);
750       HTableDescriptor tableDescriptor = table.getTableDescriptor();
751       HFileOutputFormat2.configureDataBlockEncoding(tableDescriptor, conf);
752 
753       // read back family specific data block encoding settings from the
754       // configuration
755       Map<byte[], DataBlockEncoding> retrievedFamilyToDataBlockEncodingMap =
756           HFileOutputFormat2
757           .createFamilyDataBlockEncodingMap(conf);
758 
759       // test that we have a value for all column families that matches with the
760       // used mock values
761       for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) {
762         assertEquals("DataBlockEncoding configuration incorrect for column family:"
763             + entry.getKey(), entry.getValue(),
764             retrievedFamilyToDataBlockEncodingMap.get(entry.getKey().getBytes()));
765       }
766     }
767   }
768 
769   private void setupMockColumnFamiliesForDataBlockEncoding(Table table,
770       Map<String, DataBlockEncoding> familyToDataBlockEncoding) throws IOException {
771     HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAME);
772     for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) {
773       mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey())
774           .setMaxVersions(1)
775           .setDataBlockEncoding(entry.getValue())
776           .setBlockCacheEnabled(false)
777           .setTimeToLive(0));
778     }
779     Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
780   }
781 
782   /**
783    * @return a map from column family names to data block encodings for
784    *         testing column family data block encoding settings. Column family names have special characters
785    */
786   private Map<String, DataBlockEncoding>
787       getMockColumnFamiliesForDataBlockEncoding (int numCfs) {
788     Map<String, DataBlockEncoding> familyToDataBlockEncoding =
789         new HashMap<String, DataBlockEncoding>();
790     // use column family names having special characters
791     if (numCfs-- > 0) {
792       familyToDataBlockEncoding.put("Family1!@#!@#&", DataBlockEncoding.DIFF);
793     }
794     if (numCfs-- > 0) {
795       familyToDataBlockEncoding.put("Family2=asdads&!AASD",
796           DataBlockEncoding.FAST_DIFF);
797     }
798     if (numCfs-- > 0) {
799       familyToDataBlockEncoding.put("Family2=asdads&!AASD",
800           DataBlockEncoding.PREFIX);
801     }
802     if (numCfs-- > 0) {
803       familyToDataBlockEncoding.put("Family3", DataBlockEncoding.NONE);
804     }
805     return familyToDataBlockEncoding;
806   }
807 
808   private void setupMockStartKeys(RegionLocator table) throws IOException {
809     byte[][] mockKeys = new byte[][] {
810         HConstants.EMPTY_BYTE_ARRAY,
811         Bytes.toBytes("aaa"),
812         Bytes.toBytes("ggg"),
813         Bytes.toBytes("zzz")
814     };
815     Mockito.doReturn(mockKeys).when(table).getStartKeys();
816   }
817 
818   /**
819    * Test that {@link HFileOutputFormat2} RecordWriter uses compression and
820    * bloom filter settings from the column family descriptor
821    */
822   @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
823   public void testColumnFamilySettings() throws Exception {
824     Configuration conf = new Configuration(this.util.getConfiguration());
825     RecordWriter<ImmutableBytesWritable, Cell> writer = null;
826     TaskAttemptContext context = null;
827     Path dir = util.getDataTestDir("testColumnFamilySettings");
828 
829     // Setup table descriptor
830     Table table = Mockito.mock(Table.class);
831     RegionLocator regionLocator = Mockito.mock(RegionLocator.class);
832     HTableDescriptor htd = new HTableDescriptor(TABLE_NAME);
833     Mockito.doReturn(htd).when(table).getTableDescriptor();
834     for (HColumnDescriptor hcd: HBaseTestingUtility.generateColumnDescriptors()) {
835       htd.addFamily(hcd);
836     }
837 
838     // set up the table to return some mock keys
839     setupMockStartKeys(regionLocator);
840 
841     try {
842       // partial map red setup to get an operational writer for testing
843       // We turn off the sequence file compression, because DefaultCodec
844       // pollutes the GZip codec pool with an incompatible compressor.
845       conf.set("io.seqfile.compression.type", "NONE");
846       conf.set("hbase.fs.tmp.dir", dir.toString());
847       Job job = new Job(conf, "testLocalMRIncrementalLoad");
848       job.setWorkingDirectory(util.getDataTestDirOnTestFS("testColumnFamilySettings"));
849       setupRandomGeneratorMapper(job);
850       HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
851       FileOutputFormat.setOutputPath(job, dir);
852       context = createTestTaskAttemptContext(job);
853       HFileOutputFormat2 hof = new HFileOutputFormat2();
854       writer = hof.getRecordWriter(context);
855 
856       // write out random rows
857       writeRandomKeyValues(writer, context, htd.getFamiliesKeys(), ROWSPERSPLIT);
858       writer.close(context);
859 
860       // Make sure that a directory was created for every CF
861       FileSystem fs = dir.getFileSystem(conf);
862 
863       // commit so that the filesystem has one directory per column family
864       hof.getOutputCommitter(context).commitTask(context);
865       hof.getOutputCommitter(context).commitJob(context);
866       FileStatus[] families = FSUtils.listStatus(fs, dir, new FSUtils.FamilyDirFilter(fs));
867       assertEquals(htd.getFamilies().size(), families.length);
868       for (FileStatus f : families) {
869         String familyStr = f.getPath().getName();
870         HColumnDescriptor hcd = htd.getFamily(Bytes.toBytes(familyStr));
871         // verify that the compression on this file matches the configured
872         // compression
873         Path dataFilePath = fs.listStatus(f.getPath())[0].getPath();
874         Reader reader = HFile.createReader(fs, dataFilePath, new CacheConfig(conf), conf);
875         Map<byte[], byte[]> fileInfo = reader.loadFileInfo();
876 
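            // hfiles written without a bloom filter carry no BLOOM_FILTER_TYPE_KEY in their
            // file info; the test treats that as BloomType.NONE, matching the default below.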
877         byte[] bloomFilter = fileInfo.get(StoreFile.BLOOM_FILTER_TYPE_KEY);
878         if (bloomFilter == null) bloomFilter = Bytes.toBytes("NONE");
879         assertEquals("Incorrect bloom filter used for column family " + familyStr +
880           "(reader: " + reader + ")",
881           hcd.getBloomFilterType(), BloomType.valueOf(Bytes.toString(bloomFilter)));
882         assertEquals("Incorrect compression used for column family " + familyStr +
883           "(reader: " + reader + ")", hcd.getCompression(), reader.getFileContext().getCompression());
884       }
885     } finally {
886       dir.getFileSystem(conf).delete(dir, true);
887     }
888   }
889 
890   /**
891    * Write random values to the writer, one cell per row for each of the
892    * given column families
893    */
894   private void writeRandomKeyValues(RecordWriter<ImmutableBytesWritable, Cell> writer,
895       TaskAttemptContext context, Set<byte[]> families, int numRows)
896       throws IOException, InterruptedException {
897     byte keyBytes[] = new byte[Bytes.SIZEOF_INT];
898     int valLength = 10;
899     byte valBytes[] = new byte[valLength];
900 
901     int taskId = context.getTaskAttemptID().getTaskID().getId();
902     assert taskId < Byte.MAX_VALUE : "Unit tests don't support > 127 tasks!";
903     final byte [] qualifier = Bytes.toBytes("data");
904     Random random = new Random();
905     for (int i = 0; i < numRows; i++) {
906 
907       Bytes.putInt(keyBytes, 0, i);
908       random.nextBytes(valBytes);
909       ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);
910 
911       for (byte[] family : families) {
912         Cell kv = new KeyValue(keyBytes, family, qualifier, valBytes);
913         writer.write(key, kv);
914       }
915     }
916   }
917 
918   /**
919    * Tests the scenario from HBASE-6901: all files are bulk loaded and
920    * excluded from minor compaction.
921    * Without the fix for HBASE-6901, an ArrayIndexOutOfBoundsException
922    * is thrown.
923    */
924   @Ignore ("Flakey: See HBASE-9051") @Test
925   public void testExcludeAllFromMinorCompaction() throws Exception {
926     Configuration conf = util.getConfiguration();
927     conf.setInt("hbase.hstore.compaction.min", 2);
928     generateRandomStartKeys(5);
929     util.setJobWithoutMRCluster();
930     util.startMiniCluster();
931     try (Connection conn = ConnectionFactory.createConnection();
932         Admin admin = conn.getAdmin()) {
933       final FileSystem fs = util.getDFSCluster().getFileSystem();
934       HTable table = util.createTable(TABLE_NAME, FAMILIES);
935       assertEquals("Should start with empty table", 0, util.countRows(table));
936 
937       // deep inspection: get the StoreFile dir
938       final Path storePath = new Path(
939         FSUtils.getTableDir(FSUtils.getRootDir(conf), TABLE_NAME),
940           new Path(admin.getTableRegions(TABLE_NAME).get(0).getEncodedName(),
941             Bytes.toString(FAMILIES[0])));
942       assertEquals(0, fs.listStatus(storePath).length);
943 
944       // Generate two bulk load files
945       conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude",
946           true);
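        // With this flag set, HFileOutputFormat2 marks each hfile it writes as excluded from minor
        // compaction, which is what lets both bulk-loaded store files survive the compact() below.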
947 
948       for (int i = 0; i < 2; i++) {
949         Path testDir = util.getDataTestDirOnTestFS("testExcludeAllFromMinorCompaction_" + i);
950         runIncrementalPELoad(conf, table.getTableDescriptor(), conn.getRegionLocator(TABLE_NAME),
951             testDir);
952         // Perform the actual load
953         new LoadIncrementalHFiles(conf).doBulkLoad(testDir, table);
954       }
955 
956       // Ensure data shows up
957       int expectedRows = 2 * NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
958       assertEquals("LoadIncrementalHFiles should put expected data in table",
959           expectedRows, util.countRows(table));
960 
961       // should have a second StoreFile now
962       assertEquals(2, fs.listStatus(storePath).length);
963 
964       // minor compactions shouldn't get rid of the file
965       admin.compact(TABLE_NAME);
966       try {
967         quickPoll(new Callable<Boolean>() {
968           @Override
969           public Boolean call() throws Exception {
970             return fs.listStatus(storePath).length == 1;
971           }
972         }, 5000);
973         throw new IOException("SF# = " + fs.listStatus(storePath).length);
974       } catch (AssertionError ae) {
975         // this is expected behavior
976       }
977 
978       // a major compaction should work though
979       admin.majorCompact(TABLE_NAME);
980       quickPoll(new Callable<Boolean>() {
981         @Override
982         public Boolean call() throws Exception {
983           return fs.listStatus(storePath).length == 1;
984         }
985       }, 5000);
986 
987     } finally {
988       util.shutdownMiniCluster();
989     }
990   }
991 
992   @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
993   public void testExcludeMinorCompaction() throws Exception {
994     Configuration conf = util.getConfiguration();
995     conf.setInt("hbase.hstore.compaction.min", 2);
996     generateRandomStartKeys(5);
997     util.setJobWithoutMRCluster();
998     util.startMiniCluster();
999     try (Connection conn = ConnectionFactory.createConnection(conf);
1000         Admin admin = conn.getAdmin()){
1001       Path testDir = util.getDataTestDirOnTestFS("testExcludeMinorCompaction");
1002       final FileSystem fs = util.getDFSCluster().getFileSystem();
1003       Table table = util.createTable(TABLE_NAME, FAMILIES);
1004       assertEquals("Should start with empty table", 0, util.countRows(table));
1005 
1006       // deep inspection: get the StoreFile dir
1007       final Path storePath = new Path(
1008         FSUtils.getTableDir(FSUtils.getRootDir(conf), TABLE_NAME),
1009           new Path(admin.getTableRegions(TABLE_NAME).get(0).getEncodedName(),
1010             Bytes.toString(FAMILIES[0])));
1011       assertEquals(0, fs.listStatus(storePath).length);
1012 
1013       // put some data in it and flush to create a storefile
1014       Put p = new Put(Bytes.toBytes("test"));
1015       p.add(FAMILIES[0], Bytes.toBytes("1"), Bytes.toBytes("1"));
1016       table.put(p);
1017       admin.flush(TABLE_NAME);
1018       assertEquals(1, util.countRows(table));
1019       quickPoll(new Callable<Boolean>() {
1020         @Override
1021         public Boolean call() throws Exception {
1022           return fs.listStatus(storePath).length == 1;
1023         }
1024       }, 5000);
1025 
1026       // Generate a bulk load file with more rows
1027       conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude",
1028           true);
1029 
1030       RegionLocator regionLocator = conn.getRegionLocator(TABLE_NAME);
1031       runIncrementalPELoad(conf, table.getTableDescriptor(), regionLocator, testDir);
1032 
1033       // Perform the actual load
1034       new LoadIncrementalHFiles(conf).doBulkLoad(testDir, admin, table, regionLocator);
1035 
1036       // Ensure data shows up
1037       int expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
1038       assertEquals("LoadIncrementalHFiles should put expected data in table",
1039           expectedRows + 1, util.countRows(table));
1040 
1041       // should have a second StoreFile now
1042       assertEquals(2, fs.listStatus(storePath).length);
1043 
1044       // minor compactions shouldn't get rid of the file
1045       admin.compact(TABLE_NAME);
1046       try {
1047         quickPoll(new Callable<Boolean>() {
1048           @Override
1049           public Boolean call() throws Exception {
1050             return fs.listStatus(storePath).length == 1;
1051           }
1052         }, 5000);
1053         throw new IOException("SF# = " + fs.listStatus(storePath).length);
1054       } catch (AssertionError ae) {
1055         // this is expected behavior
1056       }
1057 
1058       // a major compaction should work though
1059       admin.majorCompact(TABLE_NAME);
1060       quickPoll(new Callable<Boolean>() {
1061         @Override
1062         public Boolean call() throws Exception {
1063           return fs.listStatus(storePath).length == 1;
1064         }
1065       }, 5000);
1066 
1067     } finally {
1068       util.shutdownMiniCluster();
1069     }
1070   }
1071 
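       /**
        * Polls the callable roughly every 10 ms until it returns true or waitMs elapses,
        * failing the test if the condition is never met.
        */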
1072   private void quickPoll(Callable<Boolean> c, int waitMs) throws Exception {
1073     int sleepMs = 10;
1074     int retries = (int) Math.ceil(((double) waitMs) / sleepMs);
1075     while (retries-- > 0) {
1076       if (c.call().booleanValue()) {
1077         return;
1078       }
1079       Thread.sleep(sleepMs);
1080     }
1081     fail();
1082   }
1083 
1084   public static void main(String args[]) throws Exception {
1085     new TestHFileOutputFormat2().manualTest(args);
1086   }
1087 
1088   public void manualTest(String args[]) throws Exception {
1089     Configuration conf = HBaseConfiguration.create();
1090     util = new HBaseTestingUtility(conf);
1091     if ("newtable".equals(args[0])) {
1092       TableName tname = TableName.valueOf(args[1]);
1093       byte[][] splitKeys = generateRandomSplitKeys(4);
1094       try (HTable table = util.createTable(tname, FAMILIES, splitKeys)) {
1095       }
1096     } else if ("incremental".equals(args[0])) {
1097       TableName tname = TableName.valueOf(args[1]);
1098       try(Connection c = ConnectionFactory.createConnection(conf);
1099           Admin admin = c.getAdmin();
1100           RegionLocator regionLocator = c.getRegionLocator(tname)) {
1101         Path outDir = new Path("incremental-out");
1102         runIncrementalPELoad(conf, admin.getTableDescriptor(tname), regionLocator, outDir);
1103       }
1104     } else {
1105       throw new RuntimeException(
1106           "usage: TestHFileOutputFormat2 newtable | incremental");
1107     }
1108   }
1109 
1110 }