View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.regionserver;
19  
20  import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_KEY;
21  import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.STRIPE_END_KEY;
22  import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.STRIPE_START_KEY;
23  import static org.junit.Assert.assertArrayEquals;
24  import static org.junit.Assert.assertEquals;
25  import static org.junit.Assert.assertFalse;
26  import static org.junit.Assert.assertNotNull;
27  import static org.junit.Assert.assertNull;
28  import static org.mockito.Matchers.any;
29  import static org.mockito.Matchers.anyBoolean;
30  import static org.mockito.Matchers.anyLong;
31  import static org.mockito.Mockito.doAnswer;
32  import static org.mockito.Mockito.mock;
33  import static org.mockito.Mockito.when;
34  
35  import java.io.IOException;
36  import java.util.ArrayList;
37  import java.util.Arrays;
38  import java.util.List;
39  import java.util.TreeMap;
40  
41  import org.apache.hadoop.conf.Configuration;
42  import org.apache.hadoop.fs.FileSystem;
43  import org.apache.hadoop.fs.Path;
44  import org.apache.hadoop.hbase.Cell;
45  import org.apache.hadoop.hbase.HBaseConfiguration;
46  import org.apache.hadoop.hbase.HColumnDescriptor;
47  import org.apache.hadoop.hbase.HRegionInfo;
48  import org.apache.hadoop.hbase.KeyValue;
49  import org.apache.hadoop.hbase.KeyValue.KVComparator;
50  import org.apache.hadoop.hbase.TableName;
51  import org.apache.hadoop.hbase.io.compress.Compression;
52  import org.apache.hadoop.hbase.io.hfile.HFile;
53  import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
54  import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
55  import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactor;
56  import org.apache.hadoop.hbase.testclassification.SmallTests;
57  import org.apache.hadoop.hbase.util.Bytes;
58  import org.junit.Test;
59  import org.junit.experimental.categories.Category;
60  import org.mockito.invocation.InvocationOnMock;
61  import org.mockito.stubbing.Answer;
62  
63  
64  @Category(SmallTests.class)
65  public class TestStripeCompactor {
66    private static final byte[] NAME_OF_THINGS = Bytes.toBytes("foo");
67    private static final TableName TABLE_NAME = TableName.valueOf(NAME_OF_THINGS, NAME_OF_THINGS);
68  
69    private static final byte[] KEY_B = Bytes.toBytes("bbb");
70    private static final byte[] KEY_C = Bytes.toBytes("ccc");
71    private static final byte[] KEY_D = Bytes.toBytes("ddd");
72  
73    private static final KeyValue KV_A = kvAfter(Bytes.toBytes("aaa"));
74    private static final KeyValue KV_B = kvAfter(KEY_B);
75    private static final KeyValue KV_C = kvAfter(KEY_C);
76    private static final KeyValue KV_D = kvAfter(KEY_D);
77  
78    private static KeyValue kvAfter(byte[] key) {
79      return new KeyValue(Arrays.copyOf(key, key.length + 1), 0L);
80    }
81  
82    private static <T> T[] a(T... a) {
83      return a;
84    }
85  
86    private static KeyValue[] e() {
87      return TestStripeCompactor.<KeyValue>a();
88    }
89  
90    @Test
91    public void testBoundaryCompactions() throws Exception {
92      // General verification
93      verifyBoundaryCompaction(a(KV_A, KV_A, KV_B, KV_B, KV_C, KV_D),
94          a(OPEN_KEY, KEY_B, KEY_D, OPEN_KEY), a(a(KV_A, KV_A), a(KV_B, KV_B, KV_C), a(KV_D)));
95      verifyBoundaryCompaction(a(KV_B, KV_C), a(KEY_B, KEY_C, KEY_D), a(a(KV_B), a(KV_C)));
96      verifyBoundaryCompaction(a(KV_B, KV_C), a(KEY_B, KEY_D), new KeyValue[][] { a(KV_B, KV_C) });
97    }
98  
99    @Test
100   public void testBoundaryCompactionEmptyFiles() throws Exception {
101     // No empty file if there're already files.
102     verifyBoundaryCompaction(
103         a(KV_B), a(KEY_B, KEY_C, KEY_D, OPEN_KEY), a(a(KV_B), null, null), null, null, false);
104     verifyBoundaryCompaction(a(KV_A, KV_C),
105         a(OPEN_KEY, KEY_B, KEY_C, KEY_D), a(a(KV_A), null, a(KV_C)), null, null, false);
106     // But should be created if there are no file.
107     verifyBoundaryCompaction(
108         e(), a(OPEN_KEY, KEY_B, KEY_C, OPEN_KEY), a(null, null, e()), null, null, false);
109     // In major range if there's major range.
110     verifyBoundaryCompaction(
111         e(), a(OPEN_KEY, KEY_B, KEY_C, OPEN_KEY), a(null, e(), null), KEY_B, KEY_C, false);
112     verifyBoundaryCompaction(
113         e(), a(OPEN_KEY, KEY_B, KEY_C, OPEN_KEY), a(e(), e(), null), OPEN_KEY, KEY_C, false);
114     // Major range should have files regardless of KVs.
115     verifyBoundaryCompaction(a(KV_A), a(OPEN_KEY, KEY_B, KEY_C, KEY_D, OPEN_KEY),
116         a(a(KV_A), e(), e(), null), KEY_B, KEY_D, false);
117     verifyBoundaryCompaction(a(KV_C), a(OPEN_KEY, KEY_B, KEY_C, KEY_D, OPEN_KEY),
118         a(null, null, a(KV_C), e()), KEY_C, OPEN_KEY, false);
119 
120   }
121 
122   public static void verifyBoundaryCompaction(
123       KeyValue[] input, byte[][] boundaries, KeyValue[][] output) throws Exception {
124     verifyBoundaryCompaction(input, boundaries, output, null, null, true);
125   }
126 
127   public static void verifyBoundaryCompaction(KeyValue[] input, byte[][] boundaries,
128       KeyValue[][] output, byte[] majorFrom, byte[] majorTo, boolean allFiles)
129           throws Exception {
130     StoreFileWritersCapture writers = new StoreFileWritersCapture();
131     StripeCompactor sc = createCompactor(writers, input);
132     List<Path> paths =
133         sc.compact(createDummyRequest(), Arrays.asList(boundaries), majorFrom, majorTo,
134           NoLimitCompactionThroughputController.INSTANCE);
135     writers.verifyKvs(output, allFiles, true);
136     if (allFiles) {
137       assertEquals(output.length, paths.size());
138       writers.verifyBoundaries(boundaries);
139     }
140   }
141 
142   @Test
143   public void testSizeCompactions() throws Exception {
144     // General verification with different sizes.
145     verifySizeCompaction(a(KV_A, KV_A, KV_B, KV_C, KV_D), 3, 2, OPEN_KEY, OPEN_KEY,
146         a(a(KV_A, KV_A), a(KV_B, KV_C), a(KV_D)));
147     verifySizeCompaction(a(KV_A, KV_B, KV_C, KV_D), 4, 1, OPEN_KEY, OPEN_KEY,
148         a(a(KV_A), a(KV_B), a(KV_C), a(KV_D)));
149     verifySizeCompaction(a(KV_B, KV_C), 2, 1, KEY_B, KEY_D, a(a(KV_B), a(KV_C)));
150     // Verify row boundaries are preserved.
151     verifySizeCompaction(a(KV_A, KV_A, KV_A, KV_C, KV_D), 3, 2, OPEN_KEY, OPEN_KEY,
152         a(a(KV_A, KV_A, KV_A), a(KV_C, KV_D)));
153     verifySizeCompaction(a(KV_A, KV_B, KV_B, KV_C), 3, 1, OPEN_KEY, OPEN_KEY,
154         a(a(KV_A), a(KV_B, KV_B), a(KV_C)));
155     // Too much data, count limits the number of files.
156     verifySizeCompaction(a(KV_A, KV_B, KV_C, KV_D), 2, 1, OPEN_KEY, OPEN_KEY,
157         a(a(KV_A), a(KV_B, KV_C, KV_D)));
158     verifySizeCompaction(a(KV_A, KV_B, KV_C), 1, Long.MAX_VALUE, OPEN_KEY, KEY_D,
159         new KeyValue[][] { a(KV_A, KV_B, KV_C) });
160     // Too little data/large count, no extra files.
161     verifySizeCompaction(a(KV_A, KV_B, KV_C, KV_D), Integer.MAX_VALUE, 2, OPEN_KEY, OPEN_KEY,
162         a(a(KV_A, KV_B), a(KV_C, KV_D)));
163   }
164 
165   public static void verifySizeCompaction(KeyValue[] input, int targetCount, long targetSize,
166       byte[] left, byte[] right, KeyValue[][] output) throws Exception {
167     StoreFileWritersCapture writers = new StoreFileWritersCapture();
168     StripeCompactor sc = createCompactor(writers, input);
169     List<Path> paths =
170         sc.compact(createDummyRequest(), targetCount, targetSize, left, right, null, null,
171           NoLimitCompactionThroughputController.INSTANCE);
172     assertEquals(output.length, paths.size());
173     writers.verifyKvs(output, true, true);
174     List<byte[]> boundaries = new ArrayList<byte[]>();
175     boundaries.add(left);
176     for (int i = 1; i < output.length; ++i) {
177       boundaries.add(output[i][0].getRow());
178     }
179     boundaries.add(right);
180     writers.verifyBoundaries(boundaries.toArray(new byte[][] {}));
181   }
182 
183   private static StripeCompactor createCompactor(
184       StoreFileWritersCapture writers, KeyValue[] input) throws Exception {
185     Configuration conf = HBaseConfiguration.create();
186     final Scanner scanner = new Scanner(input);
187 
188     // Create store mock that is satisfactory for compactor.
189     HColumnDescriptor col = new HColumnDescriptor(NAME_OF_THINGS);
190     ScanInfo si = new ScanInfo(conf, col, Long.MAX_VALUE, 0, new KVComparator());
191     Store store = mock(Store.class);
192     when(store.getFamily()).thenReturn(col);
193     when(store.getScanInfo()).thenReturn(si);
194     when(store.areWritesEnabled()).thenReturn(true);
195     when(store.getFileSystem()).thenReturn(mock(FileSystem.class));
196     when(store.getRegionInfo()).thenReturn(new HRegionInfo(TABLE_NAME));
197     when(store.createWriterInTmp(anyLong(), any(Compression.Algorithm.class),
198         anyBoolean(), anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
199     when(store.getComparator()).thenReturn(new KVComparator());
200 
201     return new StripeCompactor(conf, store) {
202       @Override
203       protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
204           long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow,
205           byte[] dropDeletesToRow) throws IOException {
206         return scanner;
207       }
208 
209       @Override
210       protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
211           ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {
212         return scanner;
213       }
214     };
215   }
216 
217   private static CompactionRequest createDummyRequest() throws Exception {
218     // "Files" are totally unused, it's Scanner class below that gives compactor fake KVs.
219     // But compaction depends on everything under the sun, so stub everything with dummies.
220     StoreFile sf = mock(StoreFile.class);
221     StoreFile.Reader r = mock(StoreFile.Reader.class);
222     when(r.length()).thenReturn(1L);
223     when(r.getBloomFilterType()).thenReturn(BloomType.NONE);
224     when(r.getHFileReader()).thenReturn(mock(HFile.Reader.class));
225     when(r.getStoreFileScanner(anyBoolean(), anyBoolean(), anyBoolean(), anyLong()))
226       .thenReturn(mock(StoreFileScanner.class));
227     when(sf.getReader()).thenReturn(r);
228     when(sf.createReader()).thenReturn(r);
229     when(sf.createReader(anyBoolean())).thenReturn(r);
230     return new CompactionRequest(Arrays.asList(sf));
231   }
232 
233   private static class Scanner implements InternalScanner {
234     private final ArrayList<KeyValue> kvs;
235     public Scanner(KeyValue... kvs) {
236       this.kvs = new ArrayList<KeyValue>(Arrays.asList(kvs));
237     }
238 
239     @Override
240     public boolean next(List<Cell> results) throws IOException {
241       if (kvs.isEmpty()) return false;
242       results.add(kvs.remove(0));
243       return !kvs.isEmpty();
244     }
245 
246     @Override
247     public boolean next(List<Cell> result, ScannerContext scannerContext)
248         throws IOException {
249       return next(result);
250     }
251 
252     @Override
253     public void close() throws IOException {}
254   }
255 
256   // StoreFile.Writer has private ctor and is unwieldy, so this has to be convoluted.
257   public static class StoreFileWritersCapture implements
258     Answer<StoreFile.Writer>, StripeMultiFileWriter.WriterFactory {
259     public static class Writer {
260       public ArrayList<KeyValue> kvs = new ArrayList<KeyValue>();
261       public TreeMap<byte[], byte[]> data = new TreeMap<byte[], byte[]>(Bytes.BYTES_COMPARATOR);
262     }
263 
264     private List<Writer> writers = new ArrayList<Writer>();
265 
266     @Override
267     public StoreFile.Writer createWriter() throws IOException {
268       final Writer realWriter = new Writer();
269       writers.add(realWriter);
270       StoreFile.Writer writer = mock(StoreFile.Writer.class);
271       doAnswer(new Answer<Object>() {
272         public Object answer(InvocationOnMock invocation) {
273           return realWriter.kvs.add((KeyValue)invocation.getArguments()[0]);
274         }}).when(writer).append(any(KeyValue.class));
275       doAnswer(new Answer<Object>() {
276         public Object answer(InvocationOnMock invocation) {
277           Object[] args = invocation.getArguments();
278           return realWriter.data.put((byte[])args[0], (byte[])args[1]);
279         }}).when(writer).appendFileInfo(any(byte[].class), any(byte[].class));
280       return writer;
281     }
282 
283     @Override
284     public StoreFile.Writer answer(InvocationOnMock invocation) throws Throwable {
285       return createWriter();
286     }
287 
288     public void verifyKvs(KeyValue[][] kvss, boolean allFiles, boolean requireMetadata) {
289       if (allFiles) {
290         assertEquals(kvss.length, writers.size());
291       }
292       int skippedWriters = 0;
293       for (int i = 0; i < kvss.length; ++i) {
294         KeyValue[] kvs = kvss[i];
295         if (kvs != null) {
296           Writer w = writers.get(i - skippedWriters);
297           if (requireMetadata) {
298             assertNotNull(w.data.get(STRIPE_START_KEY));
299             assertNotNull(w.data.get(STRIPE_END_KEY));
300           } else {
301             assertNull(w.data.get(STRIPE_START_KEY));
302             assertNull(w.data.get(STRIPE_END_KEY));
303           }
304           assertEquals(kvs.length, w.kvs.size());
305           for (int j = 0; j < kvs.length; ++j) {
306             assertEquals(kvs[j], w.kvs.get(j));
307           }
308         } else {
309           assertFalse(allFiles);
310           ++skippedWriters;
311         }
312       }
313     }
314 
315     public void verifyBoundaries(byte[][] boundaries) {
316       assertEquals(boundaries.length - 1, writers.size());
317       for (int i = 0; i < writers.size(); ++i) {
318         assertArrayEquals("i = " + i, boundaries[i], writers.get(i).data.get(STRIPE_START_KEY));
319         assertArrayEquals("i = " + i, boundaries[i + 1], writers.get(i).data.get(STRIPE_END_KEY));
320       }
321     }
322   }
323 }