View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.regionserver.compactions;
19  
20  import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_KEY;
21  import static org.junit.Assert.assertEquals;
22  import static org.junit.Assert.assertFalse;
23  import static org.junit.Assert.assertNull;
24  import static org.junit.Assert.assertTrue;
25  import static org.mockito.AdditionalMatchers.aryEq;
26  import static org.mockito.Matchers.any;
27  import static org.mockito.Matchers.anyBoolean;
28  import static org.mockito.Matchers.anyInt;
29  import static org.mockito.Matchers.anyLong;
30  import static org.mockito.Matchers.argThat;
31  import static org.mockito.Matchers.eq;
32  import static org.mockito.Matchers.isNull;
33  import static org.mockito.Mockito.mock;
34  import static org.mockito.Mockito.only;
35  import static org.mockito.Mockito.times;
36  import static org.mockito.Mockito.verify;
37  import static org.mockito.Mockito.when;
38  
39  import java.io.IOException;
40  import java.util.ArrayList;
41  import java.util.Arrays;
42  import java.util.Collection;
43  import java.util.Iterator;
44  import java.util.List;
45  
46  import org.apache.hadoop.conf.Configuration;
47  import org.apache.hadoop.fs.Path;
48  import org.apache.hadoop.hbase.Cell;
49  import org.apache.hadoop.hbase.HBaseConfiguration;
50  import org.apache.hadoop.hbase.HColumnDescriptor;
51  import org.apache.hadoop.hbase.HRegionInfo;
52  import org.apache.hadoop.hbase.KeyValue;
53  import org.apache.hadoop.hbase.io.compress.Compression;
54  import org.apache.hadoop.hbase.io.hfile.HFile;
55  import org.apache.hadoop.hbase.regionserver.BloomType;
56  import org.apache.hadoop.hbase.regionserver.InternalScanner;
57  import org.apache.hadoop.hbase.regionserver.ScanType;
58  import org.apache.hadoop.hbase.regionserver.ScannerContext;
59  import org.apache.hadoop.hbase.regionserver.Store;
60  import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
61  import org.apache.hadoop.hbase.regionserver.StoreFile;
62  import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
63  import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter;
64  import org.apache.hadoop.hbase.regionserver.StripeStoreConfig;
65  import org.apache.hadoop.hbase.regionserver.StripeStoreFileManager;
66  import org.apache.hadoop.hbase.regionserver.StripeStoreFlusher;
67  import org.apache.hadoop.hbase.regionserver.TestStripeCompactor.StoreFileWritersCapture;
68  import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy.StripeInformationProvider;
69  import org.apache.hadoop.hbase.security.User;
70  import org.apache.hadoop.hbase.testclassification.SmallTests;
71  import org.apache.hadoop.hbase.util.Bytes;
72  import org.apache.hadoop.hbase.util.ConcatenatedLists;
73  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
74  import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
75  import org.junit.Test;
76  import org.junit.experimental.categories.Category;
77  import org.mockito.ArgumentMatcher;
78  
79  import com.google.common.collect.ImmutableList;
80  import com.google.common.collect.Lists;
81  
82  @Category(SmallTests.class)
83  public class TestStripeCompactionPolicy {
84    private static final byte[] KEY_A = Bytes.toBytes("aaa");
85    private static final byte[] KEY_B = Bytes.toBytes("bbb");
86    private static final byte[] KEY_C = Bytes.toBytes("ccc");
87    private static final byte[] KEY_D = Bytes.toBytes("ddd");
88    private static final byte[] KEY_E = Bytes.toBytes("eee");
89    private static final KeyValue KV_A = new KeyValue(KEY_A, 0L);
90    private static final KeyValue KV_B = new KeyValue(KEY_B, 0L);
91    private static final KeyValue KV_C = new KeyValue(KEY_C, 0L);
92    private static final KeyValue KV_D = new KeyValue(KEY_D, 0L);
93    private static final KeyValue KV_E = new KeyValue(KEY_E, 0L);
94  
95  
96    private static long defaultSplitSize = 18;
97    private static float defaultSplitCount = 1.8F;
98    private final static int defaultInitialCount = 1;
99    private static long defaultTtl = 1000 * 1000;
100 
101   @Test
102   public void testNoStripesFromFlush() throws Exception {
103     Configuration conf = HBaseConfiguration.create();
104     conf.setBoolean(StripeStoreConfig.FLUSH_TO_L0_KEY, true);
105     StripeCompactionPolicy policy = createPolicy(conf);
106     StripeInformationProvider si = createStripesL0Only(0, 0);
107 
108     KeyValue[] input = new KeyValue[] { KV_A, KV_B, KV_C, KV_D, KV_E };
109     KeyValue[][] expected = new KeyValue[][] { input };
110     verifyFlush(policy, si, input, expected, null);
111   }
112 
113   @Test
114   public void testOldStripesFromFlush() throws Exception {
115     StripeCompactionPolicy policy = createPolicy(HBaseConfiguration.create());
116     StripeInformationProvider si = createStripes(0, KEY_C, KEY_D);
117 
118     KeyValue[] input = new KeyValue[] { KV_B, KV_C, KV_C, KV_D, KV_E };
119     KeyValue[][] expected = new KeyValue[][] { new KeyValue[] { KV_B },
120         new KeyValue[] { KV_C, KV_C }, new KeyValue[] {  KV_D, KV_E } };
121     verifyFlush(policy, si, input, expected, new byte[][] { OPEN_KEY, KEY_C, KEY_D, OPEN_KEY });
122   }
123 
124   @Test
125   public void testNewStripesFromFlush() throws Exception {
126     StripeCompactionPolicy policy = createPolicy(HBaseConfiguration.create());
127     StripeInformationProvider si = createStripesL0Only(0, 0);
128     KeyValue[] input = new KeyValue[] { KV_B, KV_C, KV_C, KV_D, KV_E };
129     // Starts with one stripe; unlike flush results, must have metadata
130     KeyValue[][] expected = new KeyValue[][] { input };
131     verifyFlush(policy, si, input, expected, new byte[][] { OPEN_KEY, OPEN_KEY });
132   }
133 
134   @Test
135   public void testSingleStripeCompaction() throws Exception {
136     // Create a special policy that only compacts single stripes, using standard methods.
137     Configuration conf = HBaseConfiguration.create();
138     conf.setFloat(CompactionConfiguration.HBASE_HSTORE_COMPACTION_RATIO_KEY, 1.0F);
139     conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 3);
140     conf.setInt(StripeStoreConfig.MAX_FILES_KEY, 4);
141     conf.setLong(StripeStoreConfig.SIZE_TO_SPLIT_KEY, 1000); // make sure the are no splits
142     StoreConfigInformation sci = mock(StoreConfigInformation.class);
143     StripeStoreConfig ssc = new StripeStoreConfig(conf, sci);
144     StripeCompactionPolicy policy = new StripeCompactionPolicy(conf, sci, ssc) {
145       @Override
146       public StripeCompactionRequest selectCompaction(StripeInformationProvider si,
147           List<StoreFile> filesCompacting, boolean isOffpeak) throws IOException {
148         if (!filesCompacting.isEmpty()) return null;
149         return selectSingleStripeCompaction(si, false, false, isOffpeak);
150       }
151 
152       @Override
153       public boolean needsCompactions(
154           StripeInformationProvider si, List<StoreFile> filesCompacting) {
155         if (!filesCompacting.isEmpty()) return false;
156         return needsSingleStripeCompaction(si);
157       }
158     };
159 
160     // No compaction due to min files or ratio
161     StripeInformationProvider si = createStripesWithSizes(0, 0,
162         new Long[] { 2L }, new Long[] { 3L, 3L }, new Long[] { 5L, 1L });
163     verifyNoCompaction(policy, si);
164     // No compaction due to min files or ratio - will report needed, but not do any.
165     si = createStripesWithSizes(0, 0,
166         new Long[] { 2L }, new Long[] { 3L, 3L }, new Long[] { 5L, 1L, 1L });
167     assertNull(policy.selectCompaction(si, al(), false));
168     assertTrue(policy.needsCompactions(si, al()));
169     // One stripe has possible compaction
170     si = createStripesWithSizes(0, 0,
171         new Long[] { 2L }, new Long[] { 3L, 3L }, new Long[] { 5L, 4L, 3L });
172     verifySingleStripeCompaction(policy, si, 2, null);
173     // Several stripes have possible compactions; choose best quality (removes most files)
174     si = createStripesWithSizes(0, 0,
175         new Long[] { 3L, 2L, 2L }, new Long[] { 2L, 2L, 1L }, new Long[] { 3L, 2L, 2L, 1L });
176     verifySingleStripeCompaction(policy, si, 2, null);
177     si = createStripesWithSizes(0, 0,
178         new Long[] { 5L }, new Long[] { 3L, 2L, 2L, 1L }, new Long[] { 3L, 2L, 2L });
179     verifySingleStripeCompaction(policy, si, 1, null);
180     // Or with smallest files, if the count is the same 
181     si = createStripesWithSizes(0, 0,
182         new Long[] { 3L, 3L, 3L }, new Long[] { 3L, 1L, 2L }, new Long[] { 3L, 2L, 2L });
183     verifySingleStripeCompaction(policy, si, 1, null);
184     // Verify max count is respected.
185     si = createStripesWithSizes(0, 0, new Long[] { 5L }, new Long[] { 5L, 4L, 4L, 4L, 4L });
186     List<StoreFile> sfs = si.getStripes().get(1).subList(1, 5);
187     verifyCompaction(policy, si, sfs, null, 1, null, si.getStartRow(1), si.getEndRow(1), true);
188     // Verify ratio is applied.
189     si = createStripesWithSizes(0, 0, new Long[] { 5L }, new Long[] { 50L, 4L, 4L, 4L, 4L });
190     sfs = si.getStripes().get(1).subList(1, 5);
191     verifyCompaction(policy, si, sfs, null, 1, null, si.getStartRow(1), si.getEndRow(1), true);
192   }
193 
194   @Test
195   public void testWithParallelCompaction() throws Exception {
196     // TODO: currently only one compaction at a time per store is allowed. If this changes,
197     //       the appropriate file exclusion testing would need to be done in respective tests.
198     assertNull(createPolicy(HBaseConfiguration.create()).selectCompaction(
199         mock(StripeInformationProvider.class), al(createFile()), false));
200   }
201 
202   @Test
203   public void testWithReferences() throws Exception {
204     StripeCompactionPolicy policy = createPolicy(HBaseConfiguration.create());
205     StripeCompactor sc = mock(StripeCompactor.class);
206     StoreFile ref = createFile();
207     when(ref.isReference()).thenReturn(true);
208     StripeInformationProvider si = mock(StripeInformationProvider.class);
209     Collection<StoreFile> sfs = al(ref, createFile());
210     when(si.getStorefiles()).thenReturn(sfs);
211 
212     assertTrue(policy.needsCompactions(si, al()));
213     StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
214     assertEquals(si.getStorefiles(), scr.getRequest().getFiles());
215     scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE, null);
216     verify(sc, only()).compact(eq(scr.getRequest()), anyInt(), anyLong(), aryEq(OPEN_KEY),
217       aryEq(OPEN_KEY), aryEq(OPEN_KEY), aryEq(OPEN_KEY),
218       any(NoLimitCompactionThroughputController.class), any(User.class));
219   }
220 
221   @Test
222   public void testInitialCountFromL0() throws Exception {
223     Configuration conf = HBaseConfiguration.create();
224     conf.setInt(StripeStoreConfig.MIN_FILES_L0_KEY, 2);
225     StripeCompactionPolicy policy = createPolicy(
226         conf, defaultSplitSize, defaultSplitCount, 2, false);
227     StripeCompactionPolicy.StripeInformationProvider si = createStripesL0Only(3, 8);
228     verifyCompaction(policy, si, si.getStorefiles(), true, 2, 12L, OPEN_KEY, OPEN_KEY, true);
229     si = createStripesL0Only(3, 10); // If result would be too large, split into smaller parts.
230     verifyCompaction(policy, si, si.getStorefiles(), true, 3, 10L, OPEN_KEY, OPEN_KEY, true);
231     policy = createPolicy(conf, defaultSplitSize, defaultSplitCount, 6, false);
232     verifyCompaction(policy, si, si.getStorefiles(), true, 6, 5L, OPEN_KEY, OPEN_KEY, true);
233   }
234 
235   @Test
236   public void testExistingStripesFromL0() throws Exception {
237     Configuration conf = HBaseConfiguration.create();
238     conf.setInt(StripeStoreConfig.MIN_FILES_L0_KEY, 3);
239     StripeCompactionPolicy.StripeInformationProvider si = createStripes(3, KEY_A);
240     verifyCompaction(
241         createPolicy(conf), si, si.getLevel0Files(), null, null, si.getStripeBoundaries());
242   }
243 
244   @Test
245   public void testNothingToCompactFromL0() throws Exception {
246     Configuration conf = HBaseConfiguration.create();
247     conf.setInt(StripeStoreConfig.MIN_FILES_L0_KEY, 4);
248     StripeCompactionPolicy.StripeInformationProvider si = createStripesL0Only(3, 10);
249     StripeCompactionPolicy policy = createPolicy(conf);
250     verifyNoCompaction(policy, si);
251 
252     si = createStripes(3, KEY_A);
253     verifyNoCompaction(policy, si);
254   }
255 
256   @Test
257   public void testSplitOffStripe() throws Exception {
258     Configuration conf = HBaseConfiguration.create();
259     // First test everything with default split count of 2, then split into more.
260     conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 2);
261     Long[] toSplit = new Long[] { defaultSplitSize - 2, 1L, 1L };
262     Long[] noSplit = new Long[] { defaultSplitSize - 2, 1L };
263     long splitTargetSize = (long)(defaultSplitSize / defaultSplitCount);
264     // Don't split if not eligible for compaction.
265     StripeCompactionPolicy.StripeInformationProvider si =
266         createStripesWithSizes(0, 0, new Long[] { defaultSplitSize - 2, 2L });
267     assertNull(createPolicy(conf).selectCompaction(si, al(), false));
268     // Make sure everything is eligible.
269     conf.setFloat(CompactionConfiguration.HBASE_HSTORE_COMPACTION_RATIO_KEY, 500f);
270     StripeCompactionPolicy policy = createPolicy(conf);
271     verifyWholeStripesCompaction(policy, si, 0, 0, null, 2, splitTargetSize);
272     // Add some extra stripes...
273     si = createStripesWithSizes(0, 0, noSplit, noSplit, toSplit);
274     verifyWholeStripesCompaction(policy, si, 2, 2, null, 2, splitTargetSize);
275     // In the middle.
276     si = createStripesWithSizes(0, 0, noSplit, toSplit, noSplit);
277     verifyWholeStripesCompaction(policy, si, 1, 1, null, 2, splitTargetSize);
278     // No split-off with different config (larger split size).
279     // However, in this case some eligible stripe will just be compacted alone.
280     StripeCompactionPolicy specPolicy = createPolicy(
281         conf, defaultSplitSize + 1, defaultSplitCount, defaultInitialCount, false);
282     verifySingleStripeCompaction(specPolicy, si, 1, null);
283   }
284 
285   @Test
286   public void testSplitOffStripeOffPeak() throws Exception {
287     // for HBASE-11439
288     Configuration conf = HBaseConfiguration.create();
289     conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 2);
290     // Select the last 2 files.
291     StripeCompactionPolicy.StripeInformationProvider si =
292         createStripesWithSizes(0, 0, new Long[] { defaultSplitSize - 2, 1L, 1L });
293     assertEquals(2, createPolicy(conf).selectCompaction(si, al(), false).getRequest().getFiles()
294         .size());
295     // Make sure everything is eligible in offpeak.
296     conf.setFloat("hbase.hstore.compaction.ratio.offpeak", 500f);
297     assertEquals(3, createPolicy(conf).selectCompaction(si, al(), true).getRequest().getFiles()
298         .size());
299   }
300 
301   @Test
302   public void testSplitOffStripeDropDeletes() throws Exception {
303     Configuration conf = HBaseConfiguration.create();
304     conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 2);
305     StripeCompactionPolicy policy = createPolicy(conf);
306     Long[] toSplit = new Long[] { defaultSplitSize / 2, defaultSplitSize / 2 };
307     Long[] noSplit = new Long[] { 1L };
308     long splitTargetSize = (long)(defaultSplitSize / defaultSplitCount);
309 
310     // Verify the deletes can be dropped if there are no L0 files.
311     StripeCompactionPolicy.StripeInformationProvider si =
312         createStripesWithSizes(0, 0, noSplit, toSplit);
313     verifyWholeStripesCompaction(policy, si, 1, 1,    true, null, splitTargetSize);
314     // But cannot be dropped if there are.
315     si = createStripesWithSizes(2, 2, noSplit, toSplit);
316     verifyWholeStripesCompaction(policy, si, 1, 1,    false, null, splitTargetSize);
317   }
318 
319   @SuppressWarnings("unchecked")
320   @Test
321   public void testMergeExpiredFiles() throws Exception {
322     ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
323     long now = defaultTtl + 2;
324     edge.setValue(now);
325     EnvironmentEdgeManager.injectEdge(edge);
326     try {
327       StoreFile expiredFile = createFile(), notExpiredFile = createFile();
328       when(expiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl - 1);
329       when(notExpiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl + 1);
330       List<StoreFile> expired = Lists.newArrayList(expiredFile, expiredFile);
331       List<StoreFile> notExpired = Lists.newArrayList(notExpiredFile, notExpiredFile);
332       List<StoreFile> mixed = Lists.newArrayList(expiredFile, notExpiredFile);
333 
334       StripeCompactionPolicy policy = createPolicy(HBaseConfiguration.create(),
335           defaultSplitSize, defaultSplitCount, defaultInitialCount, true);
336       // Merge expired if there are eligible stripes.
337       StripeCompactionPolicy.StripeInformationProvider si =
338           createStripesWithFiles(expired, expired, expired);
339       verifyWholeStripesCompaction(policy, si, 0, 2, null, 1, Long.MAX_VALUE, false);
340       // Don't merge if nothing expired.
341       si = createStripesWithFiles(notExpired, notExpired, notExpired);
342       assertNull(policy.selectCompaction(si, al(), false));
343       // Merge one expired stripe with next.
344       si = createStripesWithFiles(notExpired, expired, notExpired);
345       verifyWholeStripesCompaction(policy, si, 1, 2, null, 1, Long.MAX_VALUE, false);
346       // Merge the biggest run out of multiple options.
347       // Merge one expired stripe with next.
348       si = createStripesWithFiles(notExpired, expired, notExpired, expired, expired, notExpired);
349       verifyWholeStripesCompaction(policy, si, 3, 4, null, 1, Long.MAX_VALUE, false);
350       // Stripe with a subset of expired files is not merged.
351       si = createStripesWithFiles(expired, expired, notExpired, expired, mixed);
352       verifyWholeStripesCompaction(policy, si, 0, 1, null, 1, Long.MAX_VALUE, false);
353     } finally {
354       EnvironmentEdgeManager.reset();
355     }
356   }
357 
358   @SuppressWarnings("unchecked")
359   @Test
360   public void testMergeExpiredStripes() throws Exception {
361     // HBASE-11397
362     ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
363     long now = defaultTtl + 2;
364     edge.setValue(now);
365     EnvironmentEdgeManager.injectEdge(edge);
366     try {
367       StoreFile expiredFile = createFile(), notExpiredFile = createFile();
368       when(expiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl - 1);
369       when(notExpiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl + 1);
370       List<StoreFile> expired = Lists.newArrayList(expiredFile, expiredFile);
371       List<StoreFile> notExpired = Lists.newArrayList(notExpiredFile, notExpiredFile);
372 
373       StripeCompactionPolicy policy =
374           createPolicy(HBaseConfiguration.create(), defaultSplitSize, defaultSplitCount,
375             defaultInitialCount, true);
376 
377       // Merge all three expired stripes into one.
378       StripeCompactionPolicy.StripeInformationProvider si =
379           createStripesWithFiles(expired, expired, expired);
380       verifyMergeCompatcion(policy, si, 0, 2);
381 
382       // Merge two adjacent expired stripes into one.
383       si = createStripesWithFiles(notExpired, expired, notExpired, expired, expired, notExpired);
384       verifyMergeCompatcion(policy, si, 3, 4);
385     } finally {
386       EnvironmentEdgeManager.reset();
387     }
388   }
389 
390   private static StripeCompactionPolicy.StripeInformationProvider createStripesWithFiles(
391       List<StoreFile>... stripeFiles) throws Exception {
392     return createStripesWithFiles(createBoundaries(stripeFiles.length),
393         Lists.newArrayList(stripeFiles), new ArrayList<StoreFile>());
394   }
395 
396   @Test
397   public void testSingleStripeDropDeletes() throws Exception {
398     Configuration conf = HBaseConfiguration.create();
399     StripeCompactionPolicy policy = createPolicy(conf);
400     // Verify the deletes can be dropped if there are no L0 files.
401     Long[][] stripes = new Long[][] { new Long[] { 3L, 2L, 2L, 2L }, new Long[] { 6L } };
402     StripeInformationProvider si = createStripesWithSizes(0, 0, stripes);
403     verifySingleStripeCompaction(policy, si, 0, true);
404     // But cannot be dropped if there are.
405     si = createStripesWithSizes(2, 2, stripes);
406     verifySingleStripeCompaction(policy, si, 0, false);
407     // Unless there are enough to cause L0 compaction.
408     si = createStripesWithSizes(6, 2, stripes);
409     ConcatenatedLists<StoreFile> sfs = new ConcatenatedLists<StoreFile>();
410     sfs.addSublist(si.getLevel0Files());
411     sfs.addSublist(si.getStripes().get(0));
412     verifyCompaction(
413         policy, si, sfs, si.getStartRow(0), si.getEndRow(0), si.getStripeBoundaries());
414     // If we cannot actually compact all files in some stripe, L0 is chosen.
415     si = createStripesWithSizes(6, 2,
416         new Long[][] { new Long[] { 10L, 1L, 1L, 1L, 1L }, new Long[] { 12L } });
417     verifyCompaction(policy, si, si.getLevel0Files(), null, null, si.getStripeBoundaries());
418     // even if L0 has no file
419     // if all files of stripe aren't selected, delete must not be dropped.
420     stripes = new Long[][] { new Long[] { 100L, 3L, 2L, 2L, 2L }, new Long[] { 6L } };
421     si = createStripesWithSizes(0, 0, stripes);
422     List<StoreFile> compact_file = new ArrayList<StoreFile>();
423     Iterator<StoreFile> iter = si.getStripes().get(0).listIterator(1);
424     while (iter.hasNext()) {
425         compact_file.add(iter.next());
426     }
427     verifyCompaction(policy, si, compact_file, false, 1, null, si.getStartRow(0), si.getEndRow(0), true);
428   }
429 
430   /********* HELPER METHODS ************/
431   private static StripeCompactionPolicy createPolicy(
432       Configuration conf) throws Exception {
433     return createPolicy(conf, defaultSplitSize, defaultSplitCount, defaultInitialCount, false);
434   }
435 
436   private static StripeCompactionPolicy createPolicy(Configuration conf,
437       long splitSize, float splitCount, int initialCount, boolean hasTtl) throws Exception {
438     conf.setLong(StripeStoreConfig.SIZE_TO_SPLIT_KEY, splitSize);
439     conf.setFloat(StripeStoreConfig.SPLIT_PARTS_KEY, splitCount);
440     conf.setInt(StripeStoreConfig.INITIAL_STRIPE_COUNT_KEY, initialCount);
441     StoreConfigInformation sci = mock(StoreConfigInformation.class);
442     when(sci.getStoreFileTtl()).thenReturn(hasTtl ? defaultTtl : Long.MAX_VALUE);
443     StripeStoreConfig ssc = new StripeStoreConfig(conf, sci);
444     return new StripeCompactionPolicy(conf, sci, ssc);
445   }
446 
447   private static ArrayList<StoreFile> al(StoreFile... sfs) {
448     return new ArrayList<StoreFile>(Arrays.asList(sfs));
449   }
450 
451   private void verifyMergeCompatcion(StripeCompactionPolicy policy, StripeInformationProvider si,
452       int from, int to) throws Exception {
453     StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
454     Collection<StoreFile> sfs = getAllFiles(si, from, to);
455     verifyCollectionsEqual(sfs, scr.getRequest().getFiles());
456 
457     // All the Stripes are expired, so the Compactor will not create any Writers. We need to create
458     // an empty file to preserve metadata
459     StripeCompactor sc = createCompactor();
460     List<Path> paths = scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE, null);
461     assertEquals(1, paths.size());
462   }
463 
464   /**
465    * Verify the compaction that includes several entire stripes.
466    * @param policy Policy to test.
467    * @param si Stripe information pre-set with stripes to test.
468    * @param from Starting stripe.
469    * @param to Ending stripe (inclusive).
470    * @param dropDeletes Whether to drop deletes from compaction range.
471    * @param count Expected # of resulting stripes, null if not checked.
472    * @param size Expected target stripe size, null if not checked.
473    */
474   private void verifyWholeStripesCompaction(StripeCompactionPolicy policy,
475       StripeInformationProvider si, int from, int to, Boolean dropDeletes,
476       Integer count, Long size, boolean needsCompaction) throws IOException {
477     verifyCompaction(policy, si, getAllFiles(si, from, to), dropDeletes,
478         count, size, si.getStartRow(from), si.getEndRow(to), needsCompaction);
479   }
480 
481   private void verifyWholeStripesCompaction(StripeCompactionPolicy policy,
482       StripeInformationProvider si, int from, int to, Boolean dropDeletes,
483       Integer count, Long size) throws IOException {
484     verifyWholeStripesCompaction(policy, si, from, to, dropDeletes, count, size, true);
485   }
486 
487   private void verifySingleStripeCompaction(StripeCompactionPolicy policy,
488       StripeInformationProvider si, int index, Boolean dropDeletes) throws IOException {
489     verifyWholeStripesCompaction(policy, si, index, index, dropDeletes, 1, null, true);
490   }
491 
492   /**
493    * Verify no compaction is needed or selected.
494    * @param policy Policy to test.
495    * @param si Stripe information pre-set with stripes to test.
496    */
497   private void verifyNoCompaction(
498       StripeCompactionPolicy policy, StripeInformationProvider si) throws IOException {
499     assertNull(policy.selectCompaction(si, al(), false));
500     assertFalse(policy.needsCompactions(si, al()));
501   }
502 
503   /**
504    * Verify arbitrary compaction.
505    * @param policy Policy to test.
506    * @param si Stripe information pre-set with stripes to test.
507    * @param sfs Files that should be compacted.
508    * @param dropDeletesFrom Row from which to drop deletes.
509    * @param dropDeletesTo Row to which to drop deletes.
510    * @param boundaries Expected target stripe boundaries.
511    */
512   private void verifyCompaction(StripeCompactionPolicy policy, StripeInformationProvider si,
513       Collection<StoreFile> sfs, byte[] dropDeletesFrom, byte[] dropDeletesTo,
514       final List<byte[]> boundaries) throws Exception {
515     StripeCompactor sc = mock(StripeCompactor.class);
516     assertTrue(policy.needsCompactions(si, al()));
517     StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
518     verifyCollectionsEqual(sfs, scr.getRequest().getFiles());
519     scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE, null);
520     verify(sc, times(1)).compact(eq(scr.getRequest()), argThat(new ArgumentMatcher<List<byte[]>>() {
521       @Override
522       public boolean matches(Object argument) {
523         @SuppressWarnings("unchecked")
524         List<byte[]> other = (List<byte[]>) argument;
525         if (other.size() != boundaries.size()) return false;
526         for (int i = 0; i < other.size(); ++i) {
527           if (!Bytes.equals(other.get(i), boundaries.get(i))) return false;
528         }
529         return true;
530       }
531     }), dropDeletesFrom == null ? isNull(byte[].class) : aryEq(dropDeletesFrom),
532       dropDeletesTo == null ? isNull(byte[].class) : aryEq(dropDeletesTo),
533       any(NoLimitCompactionThroughputController.class), any(User.class));
534   }
535 
536   /**
537    * Verify arbitrary compaction.
538    * @param policy Policy to test.
539    * @param si Stripe information pre-set with stripes to test.
540    * @param sfs Files that should be compacted.
541    * @param dropDeletes Whether to drop deletes from compaction range.
542    * @param count Expected # of resulting stripes, null if not checked.
543    * @param size Expected target stripe size, null if not checked.
544    * @param start Left boundary of the compaction.
545    * @param righr Right boundary of the compaction.
546    */
547   private void verifyCompaction(StripeCompactionPolicy policy, StripeInformationProvider si,
548       Collection<StoreFile> sfs, Boolean dropDeletes, Integer count, Long size,
549       byte[] start, byte[] end, boolean needsCompaction) throws IOException {
550     StripeCompactor sc = mock(StripeCompactor.class);
551     assertTrue(!needsCompaction || policy.needsCompactions(si, al()));
552     StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
553     verifyCollectionsEqual(sfs, scr.getRequest().getFiles());
554     scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE, null);
555     verify(sc, times(1)).compact(eq(scr.getRequest()),
556       count == null ? anyInt() : eq(count.intValue()),
557       size == null ? anyLong() : eq(size.longValue()), aryEq(start), aryEq(end),
558       dropDeletesMatcher(dropDeletes, start), dropDeletesMatcher(dropDeletes, end),
559       any(NoLimitCompactionThroughputController.class), any(User.class));
560   }
561 
562   /** Verify arbitrary flush. */
563   protected void verifyFlush(StripeCompactionPolicy policy, StripeInformationProvider si,
564       KeyValue[] input, KeyValue[][] expected, byte[][] boundaries) throws IOException {
565     StoreFileWritersCapture writers = new StoreFileWritersCapture();
566     StripeStoreFlusher.StripeFlushRequest req = policy.selectFlush(si, input.length);
567     StripeMultiFileWriter mw = req.createWriter();
568     mw.init(null, writers, new KeyValue.KVComparator());
569     for (KeyValue kv : input) {
570       mw.append(kv);
571     }
572     boolean hasMetadata = boundaries != null;
573     mw.commitWriters(0, false);
574     writers.verifyKvs(expected, true, hasMetadata);
575     if (hasMetadata) {
576       writers.verifyBoundaries(boundaries);
577     }
578   }
579 
580 
581   private byte[] dropDeletesMatcher(Boolean dropDeletes, byte[] value) {
582     return dropDeletes == null ? any(byte[].class)
583             : (dropDeletes.booleanValue() ? aryEq(value) : isNull(byte[].class));
584   }
585 
586   private void verifyCollectionsEqual(Collection<StoreFile> sfs, Collection<StoreFile> scr) {
587     // Dumb.
588     assertEquals(sfs.size(), scr.size());
589     assertTrue(scr.containsAll(sfs));
590   }
591 
592   private static List<StoreFile> getAllFiles(
593       StripeInformationProvider si, int fromStripe, int toStripe) {
594     ArrayList<StoreFile> expected = new ArrayList<StoreFile>();
595     for (int i = fromStripe; i <= toStripe; ++i) {
596       expected.addAll(si.getStripes().get(i));
597     }
598     return expected;
599   }
600 
601   /**
602    * @param l0Count Number of L0 files.
603    * @param boundaries Target boundaries.
604    * @return Mock stripes.
605    */
606   private static StripeInformationProvider createStripes(
607       int l0Count, byte[]... boundaries) throws Exception {
608     List<Long> l0Sizes = new ArrayList<Long>();
609     for (int i = 0; i < l0Count; ++i) {
610       l0Sizes.add(5L);
611     }
612     List<List<Long>> sizes = new ArrayList<List<Long>>();
613     for (int i = 0; i <= boundaries.length; ++i) {
614       sizes.add(Arrays.asList(Long.valueOf(5)));
615     }
616     return createStripes(Arrays.asList(boundaries), sizes, l0Sizes);
617   }
618 
619   /**
620    * @param l0Count Number of L0 files.
621    * @param l0Size Size of each file.
622    * @return Mock stripes.
623    */
624   private static StripeInformationProvider createStripesL0Only(
625       int l0Count, long l0Size) throws Exception {
626     List<Long> l0Sizes = new ArrayList<Long>();
627     for (int i = 0; i < l0Count; ++i) {
628       l0Sizes.add(l0Size);
629     }
630     return createStripes(null, new ArrayList<List<Long>>(), l0Sizes);
631   }
632 
633   /**
634    * @param l0Count Number of L0 files.
635    * @param l0Size Size of each file.
636    * @param sizes Sizes of the files; each sub-array representing a stripe.
637    * @return Mock stripes.
638    */
639   private static StripeInformationProvider createStripesWithSizes(
640       int l0Count, long l0Size, Long[]... sizes) throws Exception {
641     ArrayList<List<Long>> sizeList = new ArrayList<List<Long>>();
642     for (Long[] size : sizes) {
643       sizeList.add(Arrays.asList(size));
644     }
645     return createStripesWithSizes(l0Count, l0Size, sizeList);
646   }
647 
648   private static StripeInformationProvider createStripesWithSizes(
649       int l0Count, long l0Size, List<List<Long>> sizes) throws Exception {
650     List<byte[]> boundaries = createBoundaries(sizes.size());
651     List<Long> l0Sizes = new ArrayList<Long>();
652     for (int i = 0; i < l0Count; ++i) {
653       l0Sizes.add(l0Size);
654     }
655     return createStripes(boundaries, sizes, l0Sizes);
656   }
657 
658   private static List<byte[]> createBoundaries(int stripeCount) {
659     byte[][] keys = new byte[][] { KEY_A, KEY_B, KEY_C, KEY_D, KEY_E };
660     assert stripeCount <= keys.length + 1;
661     List<byte[]> boundaries = new ArrayList<byte[]>();
662     boundaries.addAll(Arrays.asList(keys).subList(0, stripeCount - 1));
663     return boundaries;
664   }
665 
666   private static StripeInformationProvider createStripes(List<byte[]> boundaries,
667       List<List<Long>> stripeSizes, List<Long> l0Sizes) throws Exception {
668     List<List<StoreFile>> stripeFiles = new ArrayList<List<StoreFile>>(stripeSizes.size());
669     for (List<Long> sizes : stripeSizes) {
670       List<StoreFile> sfs = new ArrayList<StoreFile>();
671       for (Long size : sizes) {
672         sfs.add(createFile(size));
673       }
674       stripeFiles.add(sfs);
675     }
676     List<StoreFile> l0Files = new ArrayList<StoreFile>();
677     for (Long size : l0Sizes) {
678       l0Files.add(createFile(size));
679     }
680     return createStripesWithFiles(boundaries, stripeFiles, l0Files);
681   }
682 
683   /**
684    * This method actually does all the work.
685    */
686   private static StripeInformationProvider createStripesWithFiles(List<byte[]> boundaries,
687       List<List<StoreFile>> stripeFiles, List<StoreFile> l0Files) throws Exception {
688     ArrayList<ImmutableList<StoreFile>> stripes = new ArrayList<ImmutableList<StoreFile>>();
689     ArrayList<byte[]> boundariesList = new ArrayList<byte[]>();
690     StripeInformationProvider si = mock(StripeInformationProvider.class);
691     if (!stripeFiles.isEmpty()) {
692       assert stripeFiles.size() == (boundaries.size() + 1);
693       boundariesList.add(OPEN_KEY);
694       for (int i = 0; i <= boundaries.size(); ++i) {
695         byte[] startKey = ((i == 0) ? OPEN_KEY : boundaries.get(i - 1));
696         byte[] endKey = ((i == boundaries.size()) ? OPEN_KEY : boundaries.get(i));
697         boundariesList.add(endKey);
698         for (StoreFile sf : stripeFiles.get(i)) {
699           setFileStripe(sf, startKey, endKey);
700         }
701         stripes.add(ImmutableList.copyOf(stripeFiles.get(i)));
702         when(si.getStartRow(eq(i))).thenReturn(startKey);
703         when(si.getEndRow(eq(i))).thenReturn(endKey);
704       }
705     }
706     ConcatenatedLists<StoreFile> sfs = new ConcatenatedLists<StoreFile>();
707     sfs.addAllSublists(stripes);
708     sfs.addSublist(l0Files);
709     when(si.getStorefiles()).thenReturn(sfs);
710     when(si.getStripes()).thenReturn(stripes);
711     when(si.getStripeBoundaries()).thenReturn(boundariesList);
712     when(si.getStripeCount()).thenReturn(stripes.size());
713     when(si.getLevel0Files()).thenReturn(l0Files);
714     return si;
715   }
716 
717   private static StoreFile createFile(long size) throws Exception {
718     StoreFile sf = mock(StoreFile.class);
719     when(sf.getPath()).thenReturn(new Path("moo"));
720     StoreFile.Reader r = mock(StoreFile.Reader.class);
721     when(r.getEntries()).thenReturn(size);
722     when(r.length()).thenReturn(size);
723     when(r.getBloomFilterType()).thenReturn(BloomType.NONE);
724     when(r.getHFileReader()).thenReturn(mock(HFile.Reader.class));
725     when(r.getStoreFileScanner(anyBoolean(), anyBoolean(), anyBoolean(), anyLong())).thenReturn(
726       mock(StoreFileScanner.class));
727     when(sf.getReader()).thenReturn(r);
728     when(sf.createReader(anyBoolean())).thenReturn(r);
729     when(sf.createReader()).thenReturn(r);
730     return sf;
731   }
732 
733   private static StoreFile createFile() throws Exception {
734     return createFile(0);
735   }
736 
737   private static void setFileStripe(StoreFile sf, byte[] startKey, byte[] endKey) {
738     when(sf.getMetadataValue(StripeStoreFileManager.STRIPE_START_KEY)).thenReturn(startKey);
739     when(sf.getMetadataValue(StripeStoreFileManager.STRIPE_END_KEY)).thenReturn(endKey);
740   }
741 
742   private static StripeCompactor createCompactor() throws Exception {
743     HColumnDescriptor col = new HColumnDescriptor(Bytes.toBytes("foo"));
744     StoreFileWritersCapture writers = new StoreFileWritersCapture();
745     Store store = mock(Store.class);
746     HRegionInfo info = mock(HRegionInfo.class);
747     when(info.getRegionNameAsString()).thenReturn("testRegion");
748     when(store.getFamily()).thenReturn(col);
749     when(store.getRegionInfo()).thenReturn(info);
750     when(
751       store.createWriterInTmp(anyLong(), any(Compression.Algorithm.class), anyBoolean(),
752         anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
753 
754     Configuration conf = HBaseConfiguration.create();
755     final Scanner scanner = new Scanner();
756     return new StripeCompactor(conf, store) {
757       @Override
758       protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
759           long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow,
760           byte[] dropDeletesToRow) throws IOException {
761         return scanner;
762       }
763 
764       @Override
765       protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
766           ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {
767         return scanner;
768       }
769     };
770   }
771 
772   private static class Scanner implements InternalScanner {
773     private final ArrayList<KeyValue> kvs;
774 
775     public Scanner(KeyValue... kvs) {
776       this.kvs = new ArrayList<KeyValue>(Arrays.asList(kvs));
777     }
778 
779     @Override
780     public boolean next(List<Cell> results) throws IOException {
781       if (kvs.isEmpty()) return false;
782       results.add(kvs.remove(0));
783       return !kvs.isEmpty();
784     }
785 
786     @Override
787     public boolean next(List<Cell> result, ScannerContext scannerContext)
788         throws IOException {
789       return next(result);
790     }
791 
792     @Override
793     public void close() throws IOException {
794     }
795   }
796 }