1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.regionserver.compactions;
19
20 import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_KEY;
21 import static org.junit.Assert.assertEquals;
22 import static org.junit.Assert.assertFalse;
23 import static org.junit.Assert.assertNull;
24 import static org.junit.Assert.assertTrue;
25 import static org.mockito.AdditionalMatchers.aryEq;
26 import static org.mockito.Matchers.any;
27 import static org.mockito.Matchers.anyBoolean;
28 import static org.mockito.Matchers.anyInt;
29 import static org.mockito.Matchers.anyLong;
30 import static org.mockito.Matchers.argThat;
31 import static org.mockito.Matchers.eq;
32 import static org.mockito.Matchers.isNull;
33 import static org.mockito.Mockito.mock;
34 import static org.mockito.Mockito.only;
35 import static org.mockito.Mockito.times;
36 import static org.mockito.Mockito.verify;
37 import static org.mockito.Mockito.when;
38
39 import java.io.IOException;
40 import java.util.ArrayList;
41 import java.util.Arrays;
42 import java.util.Collection;
43 import java.util.Iterator;
44 import java.util.List;
45
46 import org.apache.hadoop.conf.Configuration;
47 import org.apache.hadoop.fs.Path;
48 import org.apache.hadoop.hbase.Cell;
49 import org.apache.hadoop.hbase.HBaseConfiguration;
50 import org.apache.hadoop.hbase.HColumnDescriptor;
51 import org.apache.hadoop.hbase.HRegionInfo;
52 import org.apache.hadoop.hbase.KeyValue;
53 import org.apache.hadoop.hbase.io.compress.Compression;
54 import org.apache.hadoop.hbase.io.hfile.HFile;
55 import org.apache.hadoop.hbase.regionserver.BloomType;
56 import org.apache.hadoop.hbase.regionserver.InternalScanner;
57 import org.apache.hadoop.hbase.regionserver.ScanType;
58 import org.apache.hadoop.hbase.regionserver.ScannerContext;
59 import org.apache.hadoop.hbase.regionserver.Store;
60 import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
61 import org.apache.hadoop.hbase.regionserver.StoreFile;
62 import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
63 import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter;
64 import org.apache.hadoop.hbase.regionserver.StripeStoreConfig;
65 import org.apache.hadoop.hbase.regionserver.StripeStoreFileManager;
66 import org.apache.hadoop.hbase.regionserver.StripeStoreFlusher;
67 import org.apache.hadoop.hbase.regionserver.TestStripeCompactor.StoreFileWritersCapture;
68 import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy.StripeInformationProvider;
69 import org.apache.hadoop.hbase.security.User;
70 import org.apache.hadoop.hbase.testclassification.SmallTests;
71 import org.apache.hadoop.hbase.util.Bytes;
72 import org.apache.hadoop.hbase.util.ConcatenatedLists;
73 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
74 import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
75 import org.junit.Test;
76 import org.junit.experimental.categories.Category;
77 import org.mockito.ArgumentMatcher;
78
79 import com.google.common.collect.ImmutableList;
80 import com.google.common.collect.Lists;
81
82 @Category(SmallTests.class)
83 public class TestStripeCompactionPolicy {
84 private static final byte[] KEY_A = Bytes.toBytes("aaa");
85 private static final byte[] KEY_B = Bytes.toBytes("bbb");
86 private static final byte[] KEY_C = Bytes.toBytes("ccc");
87 private static final byte[] KEY_D = Bytes.toBytes("ddd");
88 private static final byte[] KEY_E = Bytes.toBytes("eee");
89 private static final KeyValue KV_A = new KeyValue(KEY_A, 0L);
90 private static final KeyValue KV_B = new KeyValue(KEY_B, 0L);
91 private static final KeyValue KV_C = new KeyValue(KEY_C, 0L);
92 private static final KeyValue KV_D = new KeyValue(KEY_D, 0L);
93 private static final KeyValue KV_E = new KeyValue(KEY_E, 0L);
94
95
96 private static long defaultSplitSize = 18;
97 private static float defaultSplitCount = 1.8F;
98 private final static int defaultInitialCount = 1;
99 private static long defaultTtl = 1000 * 1000;
100
101 @Test
102 public void testNoStripesFromFlush() throws Exception {
103 Configuration conf = HBaseConfiguration.create();
104 conf.setBoolean(StripeStoreConfig.FLUSH_TO_L0_KEY, true);
105 StripeCompactionPolicy policy = createPolicy(conf);
106 StripeInformationProvider si = createStripesL0Only(0, 0);
107
108 KeyValue[] input = new KeyValue[] { KV_A, KV_B, KV_C, KV_D, KV_E };
109 KeyValue[][] expected = new KeyValue[][] { input };
110 verifyFlush(policy, si, input, expected, null);
111 }
112
113 @Test
114 public void testOldStripesFromFlush() throws Exception {
115 StripeCompactionPolicy policy = createPolicy(HBaseConfiguration.create());
116 StripeInformationProvider si = createStripes(0, KEY_C, KEY_D);
117
118 KeyValue[] input = new KeyValue[] { KV_B, KV_C, KV_C, KV_D, KV_E };
119 KeyValue[][] expected = new KeyValue[][] { new KeyValue[] { KV_B },
120 new KeyValue[] { KV_C, KV_C }, new KeyValue[] { KV_D, KV_E } };
121 verifyFlush(policy, si, input, expected, new byte[][] { OPEN_KEY, KEY_C, KEY_D, OPEN_KEY });
122 }
123
124 @Test
125 public void testNewStripesFromFlush() throws Exception {
126 StripeCompactionPolicy policy = createPolicy(HBaseConfiguration.create());
127 StripeInformationProvider si = createStripesL0Only(0, 0);
128 KeyValue[] input = new KeyValue[] { KV_B, KV_C, KV_C, KV_D, KV_E };
129
130 KeyValue[][] expected = new KeyValue[][] { input };
131 verifyFlush(policy, si, input, expected, new byte[][] { OPEN_KEY, OPEN_KEY });
132 }
133
134 @Test
135 public void testSingleStripeCompaction() throws Exception {
136
137 Configuration conf = HBaseConfiguration.create();
138 conf.setFloat(CompactionConfiguration.HBASE_HSTORE_COMPACTION_RATIO_KEY, 1.0F);
139 conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 3);
140 conf.setInt(StripeStoreConfig.MAX_FILES_KEY, 4);
141 conf.setLong(StripeStoreConfig.SIZE_TO_SPLIT_KEY, 1000);
142 StoreConfigInformation sci = mock(StoreConfigInformation.class);
143 StripeStoreConfig ssc = new StripeStoreConfig(conf, sci);
144 StripeCompactionPolicy policy = new StripeCompactionPolicy(conf, sci, ssc) {
145 @Override
146 public StripeCompactionRequest selectCompaction(StripeInformationProvider si,
147 List<StoreFile> filesCompacting, boolean isOffpeak) throws IOException {
148 if (!filesCompacting.isEmpty()) return null;
149 return selectSingleStripeCompaction(si, false, false, isOffpeak);
150 }
151
152 @Override
153 public boolean needsCompactions(
154 StripeInformationProvider si, List<StoreFile> filesCompacting) {
155 if (!filesCompacting.isEmpty()) return false;
156 return needsSingleStripeCompaction(si);
157 }
158 };
159
160
161 StripeInformationProvider si = createStripesWithSizes(0, 0,
162 new Long[] { 2L }, new Long[] { 3L, 3L }, new Long[] { 5L, 1L });
163 verifyNoCompaction(policy, si);
164
165 si = createStripesWithSizes(0, 0,
166 new Long[] { 2L }, new Long[] { 3L, 3L }, new Long[] { 5L, 1L, 1L });
167 assertNull(policy.selectCompaction(si, al(), false));
168 assertTrue(policy.needsCompactions(si, al()));
169
170 si = createStripesWithSizes(0, 0,
171 new Long[] { 2L }, new Long[] { 3L, 3L }, new Long[] { 5L, 4L, 3L });
172 verifySingleStripeCompaction(policy, si, 2, null);
173
174 si = createStripesWithSizes(0, 0,
175 new Long[] { 3L, 2L, 2L }, new Long[] { 2L, 2L, 1L }, new Long[] { 3L, 2L, 2L, 1L });
176 verifySingleStripeCompaction(policy, si, 2, null);
177 si = createStripesWithSizes(0, 0,
178 new Long[] { 5L }, new Long[] { 3L, 2L, 2L, 1L }, new Long[] { 3L, 2L, 2L });
179 verifySingleStripeCompaction(policy, si, 1, null);
180
181 si = createStripesWithSizes(0, 0,
182 new Long[] { 3L, 3L, 3L }, new Long[] { 3L, 1L, 2L }, new Long[] { 3L, 2L, 2L });
183 verifySingleStripeCompaction(policy, si, 1, null);
184
185 si = createStripesWithSizes(0, 0, new Long[] { 5L }, new Long[] { 5L, 4L, 4L, 4L, 4L });
186 List<StoreFile> sfs = si.getStripes().get(1).subList(1, 5);
187 verifyCompaction(policy, si, sfs, null, 1, null, si.getStartRow(1), si.getEndRow(1), true);
188
189 si = createStripesWithSizes(0, 0, new Long[] { 5L }, new Long[] { 50L, 4L, 4L, 4L, 4L });
190 sfs = si.getStripes().get(1).subList(1, 5);
191 verifyCompaction(policy, si, sfs, null, 1, null, si.getStartRow(1), si.getEndRow(1), true);
192 }
193
194 @Test
195 public void testWithParallelCompaction() throws Exception {
196
197
198 assertNull(createPolicy(HBaseConfiguration.create()).selectCompaction(
199 mock(StripeInformationProvider.class), al(createFile()), false));
200 }
201
202 @Test
203 public void testWithReferences() throws Exception {
204 StripeCompactionPolicy policy = createPolicy(HBaseConfiguration.create());
205 StripeCompactor sc = mock(StripeCompactor.class);
206 StoreFile ref = createFile();
207 when(ref.isReference()).thenReturn(true);
208 StripeInformationProvider si = mock(StripeInformationProvider.class);
209 Collection<StoreFile> sfs = al(ref, createFile());
210 when(si.getStorefiles()).thenReturn(sfs);
211
212 assertTrue(policy.needsCompactions(si, al()));
213 StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
214 assertEquals(si.getStorefiles(), scr.getRequest().getFiles());
215 scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE, null);
216 verify(sc, only()).compact(eq(scr.getRequest()), anyInt(), anyLong(), aryEq(OPEN_KEY),
217 aryEq(OPEN_KEY), aryEq(OPEN_KEY), aryEq(OPEN_KEY),
218 any(NoLimitCompactionThroughputController.class), any(User.class));
219 }
220
221 @Test
222 public void testInitialCountFromL0() throws Exception {
223 Configuration conf = HBaseConfiguration.create();
224 conf.setInt(StripeStoreConfig.MIN_FILES_L0_KEY, 2);
225 StripeCompactionPolicy policy = createPolicy(
226 conf, defaultSplitSize, defaultSplitCount, 2, false);
227 StripeCompactionPolicy.StripeInformationProvider si = createStripesL0Only(3, 8);
228 verifyCompaction(policy, si, si.getStorefiles(), true, 2, 12L, OPEN_KEY, OPEN_KEY, true);
229 si = createStripesL0Only(3, 10);
230 verifyCompaction(policy, si, si.getStorefiles(), true, 3, 10L, OPEN_KEY, OPEN_KEY, true);
231 policy = createPolicy(conf, defaultSplitSize, defaultSplitCount, 6, false);
232 verifyCompaction(policy, si, si.getStorefiles(), true, 6, 5L, OPEN_KEY, OPEN_KEY, true);
233 }
234
235 @Test
236 public void testExistingStripesFromL0() throws Exception {
237 Configuration conf = HBaseConfiguration.create();
238 conf.setInt(StripeStoreConfig.MIN_FILES_L0_KEY, 3);
239 StripeCompactionPolicy.StripeInformationProvider si = createStripes(3, KEY_A);
240 verifyCompaction(
241 createPolicy(conf), si, si.getLevel0Files(), null, null, si.getStripeBoundaries());
242 }
243
244 @Test
245 public void testNothingToCompactFromL0() throws Exception {
246 Configuration conf = HBaseConfiguration.create();
247 conf.setInt(StripeStoreConfig.MIN_FILES_L0_KEY, 4);
248 StripeCompactionPolicy.StripeInformationProvider si = createStripesL0Only(3, 10);
249 StripeCompactionPolicy policy = createPolicy(conf);
250 verifyNoCompaction(policy, si);
251
252 si = createStripes(3, KEY_A);
253 verifyNoCompaction(policy, si);
254 }
255
256 @Test
257 public void testSplitOffStripe() throws Exception {
258 Configuration conf = HBaseConfiguration.create();
259
260 conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 2);
261 Long[] toSplit = new Long[] { defaultSplitSize - 2, 1L, 1L };
262 Long[] noSplit = new Long[] { defaultSplitSize - 2, 1L };
263 long splitTargetSize = (long)(defaultSplitSize / defaultSplitCount);
264
265 StripeCompactionPolicy.StripeInformationProvider si =
266 createStripesWithSizes(0, 0, new Long[] { defaultSplitSize - 2, 2L });
267 assertNull(createPolicy(conf).selectCompaction(si, al(), false));
268
269 conf.setFloat(CompactionConfiguration.HBASE_HSTORE_COMPACTION_RATIO_KEY, 500f);
270 StripeCompactionPolicy policy = createPolicy(conf);
271 verifyWholeStripesCompaction(policy, si, 0, 0, null, 2, splitTargetSize);
272
273 si = createStripesWithSizes(0, 0, noSplit, noSplit, toSplit);
274 verifyWholeStripesCompaction(policy, si, 2, 2, null, 2, splitTargetSize);
275
276 si = createStripesWithSizes(0, 0, noSplit, toSplit, noSplit);
277 verifyWholeStripesCompaction(policy, si, 1, 1, null, 2, splitTargetSize);
278
279
280 StripeCompactionPolicy specPolicy = createPolicy(
281 conf, defaultSplitSize + 1, defaultSplitCount, defaultInitialCount, false);
282 verifySingleStripeCompaction(specPolicy, si, 1, null);
283 }
284
285 @Test
286 public void testSplitOffStripeOffPeak() throws Exception {
287
288 Configuration conf = HBaseConfiguration.create();
289 conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 2);
290
291 StripeCompactionPolicy.StripeInformationProvider si =
292 createStripesWithSizes(0, 0, new Long[] { defaultSplitSize - 2, 1L, 1L });
293 assertEquals(2, createPolicy(conf).selectCompaction(si, al(), false).getRequest().getFiles()
294 .size());
295
296 conf.setFloat("hbase.hstore.compaction.ratio.offpeak", 500f);
297 assertEquals(3, createPolicy(conf).selectCompaction(si, al(), true).getRequest().getFiles()
298 .size());
299 }
300
301 @Test
302 public void testSplitOffStripeDropDeletes() throws Exception {
303 Configuration conf = HBaseConfiguration.create();
304 conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 2);
305 StripeCompactionPolicy policy = createPolicy(conf);
306 Long[] toSplit = new Long[] { defaultSplitSize / 2, defaultSplitSize / 2 };
307 Long[] noSplit = new Long[] { 1L };
308 long splitTargetSize = (long)(defaultSplitSize / defaultSplitCount);
309
310
311 StripeCompactionPolicy.StripeInformationProvider si =
312 createStripesWithSizes(0, 0, noSplit, toSplit);
313 verifyWholeStripesCompaction(policy, si, 1, 1, true, null, splitTargetSize);
314
315 si = createStripesWithSizes(2, 2, noSplit, toSplit);
316 verifyWholeStripesCompaction(policy, si, 1, 1, false, null, splitTargetSize);
317 }
318
319 @SuppressWarnings("unchecked")
320 @Test
321 public void testMergeExpiredFiles() throws Exception {
322 ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
323 long now = defaultTtl + 2;
324 edge.setValue(now);
325 EnvironmentEdgeManager.injectEdge(edge);
326 try {
327 StoreFile expiredFile = createFile(), notExpiredFile = createFile();
328 when(expiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl - 1);
329 when(notExpiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl + 1);
330 List<StoreFile> expired = Lists.newArrayList(expiredFile, expiredFile);
331 List<StoreFile> notExpired = Lists.newArrayList(notExpiredFile, notExpiredFile);
332 List<StoreFile> mixed = Lists.newArrayList(expiredFile, notExpiredFile);
333
334 StripeCompactionPolicy policy = createPolicy(HBaseConfiguration.create(),
335 defaultSplitSize, defaultSplitCount, defaultInitialCount, true);
336
337 StripeCompactionPolicy.StripeInformationProvider si =
338 createStripesWithFiles(expired, expired, expired);
339 verifyWholeStripesCompaction(policy, si, 0, 2, null, 1, Long.MAX_VALUE, false);
340
341 si = createStripesWithFiles(notExpired, notExpired, notExpired);
342 assertNull(policy.selectCompaction(si, al(), false));
343
344 si = createStripesWithFiles(notExpired, expired, notExpired);
345 verifyWholeStripesCompaction(policy, si, 1, 2, null, 1, Long.MAX_VALUE, false);
346
347
348 si = createStripesWithFiles(notExpired, expired, notExpired, expired, expired, notExpired);
349 verifyWholeStripesCompaction(policy, si, 3, 4, null, 1, Long.MAX_VALUE, false);
350
351 si = createStripesWithFiles(expired, expired, notExpired, expired, mixed);
352 verifyWholeStripesCompaction(policy, si, 0, 1, null, 1, Long.MAX_VALUE, false);
353 } finally {
354 EnvironmentEdgeManager.reset();
355 }
356 }
357
358 @SuppressWarnings("unchecked")
359 @Test
360 public void testMergeExpiredStripes() throws Exception {
361
362 ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
363 long now = defaultTtl + 2;
364 edge.setValue(now);
365 EnvironmentEdgeManager.injectEdge(edge);
366 try {
367 StoreFile expiredFile = createFile(), notExpiredFile = createFile();
368 when(expiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl - 1);
369 when(notExpiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl + 1);
370 List<StoreFile> expired = Lists.newArrayList(expiredFile, expiredFile);
371 List<StoreFile> notExpired = Lists.newArrayList(notExpiredFile, notExpiredFile);
372
373 StripeCompactionPolicy policy =
374 createPolicy(HBaseConfiguration.create(), defaultSplitSize, defaultSplitCount,
375 defaultInitialCount, true);
376
377
378 StripeCompactionPolicy.StripeInformationProvider si =
379 createStripesWithFiles(expired, expired, expired);
380 verifyMergeCompatcion(policy, si, 0, 2);
381
382
383 si = createStripesWithFiles(notExpired, expired, notExpired, expired, expired, notExpired);
384 verifyMergeCompatcion(policy, si, 3, 4);
385 } finally {
386 EnvironmentEdgeManager.reset();
387 }
388 }
389
390 private static StripeCompactionPolicy.StripeInformationProvider createStripesWithFiles(
391 List<StoreFile>... stripeFiles) throws Exception {
392 return createStripesWithFiles(createBoundaries(stripeFiles.length),
393 Lists.newArrayList(stripeFiles), new ArrayList<StoreFile>());
394 }
395
396 @Test
397 public void testSingleStripeDropDeletes() throws Exception {
398 Configuration conf = HBaseConfiguration.create();
399 StripeCompactionPolicy policy = createPolicy(conf);
400
401 Long[][] stripes = new Long[][] { new Long[] { 3L, 2L, 2L, 2L }, new Long[] { 6L } };
402 StripeInformationProvider si = createStripesWithSizes(0, 0, stripes);
403 verifySingleStripeCompaction(policy, si, 0, true);
404
405 si = createStripesWithSizes(2, 2, stripes);
406 verifySingleStripeCompaction(policy, si, 0, false);
407
408 si = createStripesWithSizes(6, 2, stripes);
409 ConcatenatedLists<StoreFile> sfs = new ConcatenatedLists<StoreFile>();
410 sfs.addSublist(si.getLevel0Files());
411 sfs.addSublist(si.getStripes().get(0));
412 verifyCompaction(
413 policy, si, sfs, si.getStartRow(0), si.getEndRow(0), si.getStripeBoundaries());
414
415 si = createStripesWithSizes(6, 2,
416 new Long[][] { new Long[] { 10L, 1L, 1L, 1L, 1L }, new Long[] { 12L } });
417 verifyCompaction(policy, si, si.getLevel0Files(), null, null, si.getStripeBoundaries());
418
419
420 stripes = new Long[][] { new Long[] { 100L, 3L, 2L, 2L, 2L }, new Long[] { 6L } };
421 si = createStripesWithSizes(0, 0, stripes);
422 List<StoreFile> compact_file = new ArrayList<StoreFile>();
423 Iterator<StoreFile> iter = si.getStripes().get(0).listIterator(1);
424 while (iter.hasNext()) {
425 compact_file.add(iter.next());
426 }
427 verifyCompaction(policy, si, compact_file, false, 1, null, si.getStartRow(0), si.getEndRow(0), true);
428 }
429
430
431 private static StripeCompactionPolicy createPolicy(
432 Configuration conf) throws Exception {
433 return createPolicy(conf, defaultSplitSize, defaultSplitCount, defaultInitialCount, false);
434 }
435
436 private static StripeCompactionPolicy createPolicy(Configuration conf,
437 long splitSize, float splitCount, int initialCount, boolean hasTtl) throws Exception {
438 conf.setLong(StripeStoreConfig.SIZE_TO_SPLIT_KEY, splitSize);
439 conf.setFloat(StripeStoreConfig.SPLIT_PARTS_KEY, splitCount);
440 conf.setInt(StripeStoreConfig.INITIAL_STRIPE_COUNT_KEY, initialCount);
441 StoreConfigInformation sci = mock(StoreConfigInformation.class);
442 when(sci.getStoreFileTtl()).thenReturn(hasTtl ? defaultTtl : Long.MAX_VALUE);
443 StripeStoreConfig ssc = new StripeStoreConfig(conf, sci);
444 return new StripeCompactionPolicy(conf, sci, ssc);
445 }
446
447 private static ArrayList<StoreFile> al(StoreFile... sfs) {
448 return new ArrayList<StoreFile>(Arrays.asList(sfs));
449 }
450
451 private void verifyMergeCompatcion(StripeCompactionPolicy policy, StripeInformationProvider si,
452 int from, int to) throws Exception {
453 StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
454 Collection<StoreFile> sfs = getAllFiles(si, from, to);
455 verifyCollectionsEqual(sfs, scr.getRequest().getFiles());
456
457
458
459 StripeCompactor sc = createCompactor();
460 List<Path> paths = scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE, null);
461 assertEquals(1, paths.size());
462 }
463
464
465
466
467
468
469
470
471
472
473
474 private void verifyWholeStripesCompaction(StripeCompactionPolicy policy,
475 StripeInformationProvider si, int from, int to, Boolean dropDeletes,
476 Integer count, Long size, boolean needsCompaction) throws IOException {
477 verifyCompaction(policy, si, getAllFiles(si, from, to), dropDeletes,
478 count, size, si.getStartRow(from), si.getEndRow(to), needsCompaction);
479 }
480
481 private void verifyWholeStripesCompaction(StripeCompactionPolicy policy,
482 StripeInformationProvider si, int from, int to, Boolean dropDeletes,
483 Integer count, Long size) throws IOException {
484 verifyWholeStripesCompaction(policy, si, from, to, dropDeletes, count, size, true);
485 }
486
487 private void verifySingleStripeCompaction(StripeCompactionPolicy policy,
488 StripeInformationProvider si, int index, Boolean dropDeletes) throws IOException {
489 verifyWholeStripesCompaction(policy, si, index, index, dropDeletes, 1, null, true);
490 }
491
492
493
494
495
496
497 private void verifyNoCompaction(
498 StripeCompactionPolicy policy, StripeInformationProvider si) throws IOException {
499 assertNull(policy.selectCompaction(si, al(), false));
500 assertFalse(policy.needsCompactions(si, al()));
501 }
502
503
504
505
506
507
508
509
510
511
512 private void verifyCompaction(StripeCompactionPolicy policy, StripeInformationProvider si,
513 Collection<StoreFile> sfs, byte[] dropDeletesFrom, byte[] dropDeletesTo,
514 final List<byte[]> boundaries) throws Exception {
515 StripeCompactor sc = mock(StripeCompactor.class);
516 assertTrue(policy.needsCompactions(si, al()));
517 StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
518 verifyCollectionsEqual(sfs, scr.getRequest().getFiles());
519 scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE, null);
520 verify(sc, times(1)).compact(eq(scr.getRequest()), argThat(new ArgumentMatcher<List<byte[]>>() {
521 @Override
522 public boolean matches(Object argument) {
523 @SuppressWarnings("unchecked")
524 List<byte[]> other = (List<byte[]>) argument;
525 if (other.size() != boundaries.size()) return false;
526 for (int i = 0; i < other.size(); ++i) {
527 if (!Bytes.equals(other.get(i), boundaries.get(i))) return false;
528 }
529 return true;
530 }
531 }), dropDeletesFrom == null ? isNull(byte[].class) : aryEq(dropDeletesFrom),
532 dropDeletesTo == null ? isNull(byte[].class) : aryEq(dropDeletesTo),
533 any(NoLimitCompactionThroughputController.class), any(User.class));
534 }
535
536
537
538
539
540
541
542
543
544
545
546
547 private void verifyCompaction(StripeCompactionPolicy policy, StripeInformationProvider si,
548 Collection<StoreFile> sfs, Boolean dropDeletes, Integer count, Long size,
549 byte[] start, byte[] end, boolean needsCompaction) throws IOException {
550 StripeCompactor sc = mock(StripeCompactor.class);
551 assertTrue(!needsCompaction || policy.needsCompactions(si, al()));
552 StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
553 verifyCollectionsEqual(sfs, scr.getRequest().getFiles());
554 scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE, null);
555 verify(sc, times(1)).compact(eq(scr.getRequest()),
556 count == null ? anyInt() : eq(count.intValue()),
557 size == null ? anyLong() : eq(size.longValue()), aryEq(start), aryEq(end),
558 dropDeletesMatcher(dropDeletes, start), dropDeletesMatcher(dropDeletes, end),
559 any(NoLimitCompactionThroughputController.class), any(User.class));
560 }
561
562
563 protected void verifyFlush(StripeCompactionPolicy policy, StripeInformationProvider si,
564 KeyValue[] input, KeyValue[][] expected, byte[][] boundaries) throws IOException {
565 StoreFileWritersCapture writers = new StoreFileWritersCapture();
566 StripeStoreFlusher.StripeFlushRequest req = policy.selectFlush(si, input.length);
567 StripeMultiFileWriter mw = req.createWriter();
568 mw.init(null, writers, new KeyValue.KVComparator());
569 for (KeyValue kv : input) {
570 mw.append(kv);
571 }
572 boolean hasMetadata = boundaries != null;
573 mw.commitWriters(0, false);
574 writers.verifyKvs(expected, true, hasMetadata);
575 if (hasMetadata) {
576 writers.verifyBoundaries(boundaries);
577 }
578 }
579
580
581 private byte[] dropDeletesMatcher(Boolean dropDeletes, byte[] value) {
582 return dropDeletes == null ? any(byte[].class)
583 : (dropDeletes.booleanValue() ? aryEq(value) : isNull(byte[].class));
584 }
585
586 private void verifyCollectionsEqual(Collection<StoreFile> sfs, Collection<StoreFile> scr) {
587
588 assertEquals(sfs.size(), scr.size());
589 assertTrue(scr.containsAll(sfs));
590 }
591
592 private static List<StoreFile> getAllFiles(
593 StripeInformationProvider si, int fromStripe, int toStripe) {
594 ArrayList<StoreFile> expected = new ArrayList<StoreFile>();
595 for (int i = fromStripe; i <= toStripe; ++i) {
596 expected.addAll(si.getStripes().get(i));
597 }
598 return expected;
599 }
600
601
602
603
604
605
606 private static StripeInformationProvider createStripes(
607 int l0Count, byte[]... boundaries) throws Exception {
608 List<Long> l0Sizes = new ArrayList<Long>();
609 for (int i = 0; i < l0Count; ++i) {
610 l0Sizes.add(5L);
611 }
612 List<List<Long>> sizes = new ArrayList<List<Long>>();
613 for (int i = 0; i <= boundaries.length; ++i) {
614 sizes.add(Arrays.asList(Long.valueOf(5)));
615 }
616 return createStripes(Arrays.asList(boundaries), sizes, l0Sizes);
617 }
618
619
620
621
622
623
624 private static StripeInformationProvider createStripesL0Only(
625 int l0Count, long l0Size) throws Exception {
626 List<Long> l0Sizes = new ArrayList<Long>();
627 for (int i = 0; i < l0Count; ++i) {
628 l0Sizes.add(l0Size);
629 }
630 return createStripes(null, new ArrayList<List<Long>>(), l0Sizes);
631 }
632
633
634
635
636
637
638
639 private static StripeInformationProvider createStripesWithSizes(
640 int l0Count, long l0Size, Long[]... sizes) throws Exception {
641 ArrayList<List<Long>> sizeList = new ArrayList<List<Long>>();
642 for (Long[] size : sizes) {
643 sizeList.add(Arrays.asList(size));
644 }
645 return createStripesWithSizes(l0Count, l0Size, sizeList);
646 }
647
648 private static StripeInformationProvider createStripesWithSizes(
649 int l0Count, long l0Size, List<List<Long>> sizes) throws Exception {
650 List<byte[]> boundaries = createBoundaries(sizes.size());
651 List<Long> l0Sizes = new ArrayList<Long>();
652 for (int i = 0; i < l0Count; ++i) {
653 l0Sizes.add(l0Size);
654 }
655 return createStripes(boundaries, sizes, l0Sizes);
656 }
657
658 private static List<byte[]> createBoundaries(int stripeCount) {
659 byte[][] keys = new byte[][] { KEY_A, KEY_B, KEY_C, KEY_D, KEY_E };
660 assert stripeCount <= keys.length + 1;
661 List<byte[]> boundaries = new ArrayList<byte[]>();
662 boundaries.addAll(Arrays.asList(keys).subList(0, stripeCount - 1));
663 return boundaries;
664 }
665
666 private static StripeInformationProvider createStripes(List<byte[]> boundaries,
667 List<List<Long>> stripeSizes, List<Long> l0Sizes) throws Exception {
668 List<List<StoreFile>> stripeFiles = new ArrayList<List<StoreFile>>(stripeSizes.size());
669 for (List<Long> sizes : stripeSizes) {
670 List<StoreFile> sfs = new ArrayList<StoreFile>();
671 for (Long size : sizes) {
672 sfs.add(createFile(size));
673 }
674 stripeFiles.add(sfs);
675 }
676 List<StoreFile> l0Files = new ArrayList<StoreFile>();
677 for (Long size : l0Sizes) {
678 l0Files.add(createFile(size));
679 }
680 return createStripesWithFiles(boundaries, stripeFiles, l0Files);
681 }
682
683
684
685
686 private static StripeInformationProvider createStripesWithFiles(List<byte[]> boundaries,
687 List<List<StoreFile>> stripeFiles, List<StoreFile> l0Files) throws Exception {
688 ArrayList<ImmutableList<StoreFile>> stripes = new ArrayList<ImmutableList<StoreFile>>();
689 ArrayList<byte[]> boundariesList = new ArrayList<byte[]>();
690 StripeInformationProvider si = mock(StripeInformationProvider.class);
691 if (!stripeFiles.isEmpty()) {
692 assert stripeFiles.size() == (boundaries.size() + 1);
693 boundariesList.add(OPEN_KEY);
694 for (int i = 0; i <= boundaries.size(); ++i) {
695 byte[] startKey = ((i == 0) ? OPEN_KEY : boundaries.get(i - 1));
696 byte[] endKey = ((i == boundaries.size()) ? OPEN_KEY : boundaries.get(i));
697 boundariesList.add(endKey);
698 for (StoreFile sf : stripeFiles.get(i)) {
699 setFileStripe(sf, startKey, endKey);
700 }
701 stripes.add(ImmutableList.copyOf(stripeFiles.get(i)));
702 when(si.getStartRow(eq(i))).thenReturn(startKey);
703 when(si.getEndRow(eq(i))).thenReturn(endKey);
704 }
705 }
706 ConcatenatedLists<StoreFile> sfs = new ConcatenatedLists<StoreFile>();
707 sfs.addAllSublists(stripes);
708 sfs.addSublist(l0Files);
709 when(si.getStorefiles()).thenReturn(sfs);
710 when(si.getStripes()).thenReturn(stripes);
711 when(si.getStripeBoundaries()).thenReturn(boundariesList);
712 when(si.getStripeCount()).thenReturn(stripes.size());
713 when(si.getLevel0Files()).thenReturn(l0Files);
714 return si;
715 }
716
717 private static StoreFile createFile(long size) throws Exception {
718 StoreFile sf = mock(StoreFile.class);
719 when(sf.getPath()).thenReturn(new Path("moo"));
720 StoreFile.Reader r = mock(StoreFile.Reader.class);
721 when(r.getEntries()).thenReturn(size);
722 when(r.length()).thenReturn(size);
723 when(r.getBloomFilterType()).thenReturn(BloomType.NONE);
724 when(r.getHFileReader()).thenReturn(mock(HFile.Reader.class));
725 when(r.getStoreFileScanner(anyBoolean(), anyBoolean(), anyBoolean(), anyLong())).thenReturn(
726 mock(StoreFileScanner.class));
727 when(sf.getReader()).thenReturn(r);
728 when(sf.createReader(anyBoolean())).thenReturn(r);
729 when(sf.createReader()).thenReturn(r);
730 return sf;
731 }
732
733 private static StoreFile createFile() throws Exception {
734 return createFile(0);
735 }
736
737 private static void setFileStripe(StoreFile sf, byte[] startKey, byte[] endKey) {
738 when(sf.getMetadataValue(StripeStoreFileManager.STRIPE_START_KEY)).thenReturn(startKey);
739 when(sf.getMetadataValue(StripeStoreFileManager.STRIPE_END_KEY)).thenReturn(endKey);
740 }
741
742 private static StripeCompactor createCompactor() throws Exception {
743 HColumnDescriptor col = new HColumnDescriptor(Bytes.toBytes("foo"));
744 StoreFileWritersCapture writers = new StoreFileWritersCapture();
745 Store store = mock(Store.class);
746 HRegionInfo info = mock(HRegionInfo.class);
747 when(info.getRegionNameAsString()).thenReturn("testRegion");
748 when(store.getFamily()).thenReturn(col);
749 when(store.getRegionInfo()).thenReturn(info);
750 when(
751 store.createWriterInTmp(anyLong(), any(Compression.Algorithm.class), anyBoolean(),
752 anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
753
754 Configuration conf = HBaseConfiguration.create();
755 final Scanner scanner = new Scanner();
756 return new StripeCompactor(conf, store) {
757 @Override
758 protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
759 long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow,
760 byte[] dropDeletesToRow) throws IOException {
761 return scanner;
762 }
763
764 @Override
765 protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
766 ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {
767 return scanner;
768 }
769 };
770 }
771
772 private static class Scanner implements InternalScanner {
773 private final ArrayList<KeyValue> kvs;
774
775 public Scanner(KeyValue... kvs) {
776 this.kvs = new ArrayList<KeyValue>(Arrays.asList(kvs));
777 }
778
779 @Override
780 public boolean next(List<Cell> results) throws IOException {
781 if (kvs.isEmpty()) return false;
782 results.add(kvs.remove(0));
783 return !kvs.isEmpty();
784 }
785
786 @Override
787 public boolean next(List<Cell> result, ScannerContext scannerContext)
788 throws IOException {
789 return next(result);
790 }
791
792 @Override
793 public void close() throws IOException {
794 }
795 }
796 }