1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.regionserver;
19
20 import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_KEY;
21 import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.STRIPE_END_KEY;
22 import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.STRIPE_START_KEY;
23 import static org.junit.Assert.assertArrayEquals;
24 import static org.junit.Assert.assertEquals;
25 import static org.junit.Assert.assertFalse;
26 import static org.junit.Assert.assertNotNull;
27 import static org.junit.Assert.assertNull;
28 import static org.mockito.Matchers.any;
29 import static org.mockito.Matchers.anyBoolean;
30 import static org.mockito.Matchers.anyLong;
31 import static org.mockito.Mockito.doAnswer;
32 import static org.mockito.Mockito.mock;
33 import static org.mockito.Mockito.when;
34
35 import java.io.IOException;
36 import java.util.ArrayList;
37 import java.util.Arrays;
38 import java.util.List;
39 import java.util.TreeMap;
40
41 import org.apache.hadoop.conf.Configuration;
42 import org.apache.hadoop.fs.FileSystem;
43 import org.apache.hadoop.fs.Path;
44 import org.apache.hadoop.hbase.Cell;
45 import org.apache.hadoop.hbase.HBaseConfiguration;
46 import org.apache.hadoop.hbase.HColumnDescriptor;
47 import org.apache.hadoop.hbase.HRegionInfo;
48 import org.apache.hadoop.hbase.KeyValue;
49 import org.apache.hadoop.hbase.KeyValue.KVComparator;
50 import org.apache.hadoop.hbase.TableName;
51 import org.apache.hadoop.hbase.io.compress.Compression;
52 import org.apache.hadoop.hbase.io.hfile.HFile;
53 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
54 import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
55 import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactor;
56 import org.apache.hadoop.hbase.testclassification.SmallTests;
57 import org.apache.hadoop.hbase.util.Bytes;
58 import org.junit.Test;
59 import org.junit.experimental.categories.Category;
60 import org.mockito.invocation.InvocationOnMock;
61 import org.mockito.stubbing.Answer;
62
63
64 @Category(SmallTests.class)
65 public class TestStripeCompactor {
66 private static final byte[] NAME_OF_THINGS = Bytes.toBytes("foo");
67 private static final TableName TABLE_NAME = TableName.valueOf(NAME_OF_THINGS, NAME_OF_THINGS);
68
69 private static final byte[] KEY_B = Bytes.toBytes("bbb");
70 private static final byte[] KEY_C = Bytes.toBytes("ccc");
71 private static final byte[] KEY_D = Bytes.toBytes("ddd");
72
73 private static final KeyValue KV_A = kvAfter(Bytes.toBytes("aaa"));
74 private static final KeyValue KV_B = kvAfter(KEY_B);
75 private static final KeyValue KV_C = kvAfter(KEY_C);
76 private static final KeyValue KV_D = kvAfter(KEY_D);
77
78 private static KeyValue kvAfter(byte[] key) {
79 return new KeyValue(Arrays.copyOf(key, key.length + 1), 0L);
80 }
81
82 private static <T> T[] a(T... a) {
83 return a;
84 }
85
86 private static KeyValue[] e() {
87 return TestStripeCompactor.<KeyValue>a();
88 }
89
90 @Test
91 public void testBoundaryCompactions() throws Exception {
92
93 verifyBoundaryCompaction(a(KV_A, KV_A, KV_B, KV_B, KV_C, KV_D),
94 a(OPEN_KEY, KEY_B, KEY_D, OPEN_KEY), a(a(KV_A, KV_A), a(KV_B, KV_B, KV_C), a(KV_D)));
95 verifyBoundaryCompaction(a(KV_B, KV_C), a(KEY_B, KEY_C, KEY_D), a(a(KV_B), a(KV_C)));
96 verifyBoundaryCompaction(a(KV_B, KV_C), a(KEY_B, KEY_D), new KeyValue[][] { a(KV_B, KV_C) });
97 }
98
99 @Test
100 public void testBoundaryCompactionEmptyFiles() throws Exception {
101
102 verifyBoundaryCompaction(
103 a(KV_B), a(KEY_B, KEY_C, KEY_D, OPEN_KEY), a(a(KV_B), null, null), null, null, false);
104 verifyBoundaryCompaction(a(KV_A, KV_C),
105 a(OPEN_KEY, KEY_B, KEY_C, KEY_D), a(a(KV_A), null, a(KV_C)), null, null, false);
106
107 verifyBoundaryCompaction(
108 e(), a(OPEN_KEY, KEY_B, KEY_C, OPEN_KEY), a(null, null, e()), null, null, false);
109
110 verifyBoundaryCompaction(
111 e(), a(OPEN_KEY, KEY_B, KEY_C, OPEN_KEY), a(null, e(), null), KEY_B, KEY_C, false);
112 verifyBoundaryCompaction(
113 e(), a(OPEN_KEY, KEY_B, KEY_C, OPEN_KEY), a(e(), e(), null), OPEN_KEY, KEY_C, false);
114
115 verifyBoundaryCompaction(a(KV_A), a(OPEN_KEY, KEY_B, KEY_C, KEY_D, OPEN_KEY),
116 a(a(KV_A), e(), e(), null), KEY_B, KEY_D, false);
117 verifyBoundaryCompaction(a(KV_C), a(OPEN_KEY, KEY_B, KEY_C, KEY_D, OPEN_KEY),
118 a(null, null, a(KV_C), e()), KEY_C, OPEN_KEY, false);
119
120 }
121
122 public static void verifyBoundaryCompaction(
123 KeyValue[] input, byte[][] boundaries, KeyValue[][] output) throws Exception {
124 verifyBoundaryCompaction(input, boundaries, output, null, null, true);
125 }
126
127 public static void verifyBoundaryCompaction(KeyValue[] input, byte[][] boundaries,
128 KeyValue[][] output, byte[] majorFrom, byte[] majorTo, boolean allFiles)
129 throws Exception {
130 StoreFileWritersCapture writers = new StoreFileWritersCapture();
131 StripeCompactor sc = createCompactor(writers, input);
132 List<Path> paths =
133 sc.compact(createDummyRequest(), Arrays.asList(boundaries), majorFrom, majorTo,
134 NoLimitCompactionThroughputController.INSTANCE);
135 writers.verifyKvs(output, allFiles, true);
136 if (allFiles) {
137 assertEquals(output.length, paths.size());
138 writers.verifyBoundaries(boundaries);
139 }
140 }
141
142 @Test
143 public void testSizeCompactions() throws Exception {
144
145 verifySizeCompaction(a(KV_A, KV_A, KV_B, KV_C, KV_D), 3, 2, OPEN_KEY, OPEN_KEY,
146 a(a(KV_A, KV_A), a(KV_B, KV_C), a(KV_D)));
147 verifySizeCompaction(a(KV_A, KV_B, KV_C, KV_D), 4, 1, OPEN_KEY, OPEN_KEY,
148 a(a(KV_A), a(KV_B), a(KV_C), a(KV_D)));
149 verifySizeCompaction(a(KV_B, KV_C), 2, 1, KEY_B, KEY_D, a(a(KV_B), a(KV_C)));
150
151 verifySizeCompaction(a(KV_A, KV_A, KV_A, KV_C, KV_D), 3, 2, OPEN_KEY, OPEN_KEY,
152 a(a(KV_A, KV_A, KV_A), a(KV_C, KV_D)));
153 verifySizeCompaction(a(KV_A, KV_B, KV_B, KV_C), 3, 1, OPEN_KEY, OPEN_KEY,
154 a(a(KV_A), a(KV_B, KV_B), a(KV_C)));
155
156 verifySizeCompaction(a(KV_A, KV_B, KV_C, KV_D), 2, 1, OPEN_KEY, OPEN_KEY,
157 a(a(KV_A), a(KV_B, KV_C, KV_D)));
158 verifySizeCompaction(a(KV_A, KV_B, KV_C), 1, Long.MAX_VALUE, OPEN_KEY, KEY_D,
159 new KeyValue[][] { a(KV_A, KV_B, KV_C) });
160
161 verifySizeCompaction(a(KV_A, KV_B, KV_C, KV_D), Integer.MAX_VALUE, 2, OPEN_KEY, OPEN_KEY,
162 a(a(KV_A, KV_B), a(KV_C, KV_D)));
163 }
164
165 public static void verifySizeCompaction(KeyValue[] input, int targetCount, long targetSize,
166 byte[] left, byte[] right, KeyValue[][] output) throws Exception {
167 StoreFileWritersCapture writers = new StoreFileWritersCapture();
168 StripeCompactor sc = createCompactor(writers, input);
169 List<Path> paths =
170 sc.compact(createDummyRequest(), targetCount, targetSize, left, right, null, null,
171 NoLimitCompactionThroughputController.INSTANCE);
172 assertEquals(output.length, paths.size());
173 writers.verifyKvs(output, true, true);
174 List<byte[]> boundaries = new ArrayList<byte[]>();
175 boundaries.add(left);
176 for (int i = 1; i < output.length; ++i) {
177 boundaries.add(output[i][0].getRow());
178 }
179 boundaries.add(right);
180 writers.verifyBoundaries(boundaries.toArray(new byte[][] {}));
181 }
182
183 private static StripeCompactor createCompactor(
184 StoreFileWritersCapture writers, KeyValue[] input) throws Exception {
185 Configuration conf = HBaseConfiguration.create();
186 final Scanner scanner = new Scanner(input);
187
188
189 HColumnDescriptor col = new HColumnDescriptor(NAME_OF_THINGS);
190 ScanInfo si = new ScanInfo(conf, col, Long.MAX_VALUE, 0, new KVComparator());
191 Store store = mock(Store.class);
192 when(store.getFamily()).thenReturn(col);
193 when(store.getScanInfo()).thenReturn(si);
194 when(store.areWritesEnabled()).thenReturn(true);
195 when(store.getFileSystem()).thenReturn(mock(FileSystem.class));
196 when(store.getRegionInfo()).thenReturn(new HRegionInfo(TABLE_NAME));
197 when(store.createWriterInTmp(anyLong(), any(Compression.Algorithm.class),
198 anyBoolean(), anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
199 when(store.getComparator()).thenReturn(new KVComparator());
200
201 return new StripeCompactor(conf, store) {
202 @Override
203 protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
204 long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow,
205 byte[] dropDeletesToRow) throws IOException {
206 return scanner;
207 }
208
209 @Override
210 protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
211 ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {
212 return scanner;
213 }
214 };
215 }
216
217 private static CompactionRequest createDummyRequest() throws Exception {
218
219
220 StoreFile sf = mock(StoreFile.class);
221 StoreFile.Reader r = mock(StoreFile.Reader.class);
222 when(r.length()).thenReturn(1L);
223 when(r.getBloomFilterType()).thenReturn(BloomType.NONE);
224 when(r.getHFileReader()).thenReturn(mock(HFile.Reader.class));
225 when(r.getStoreFileScanner(anyBoolean(), anyBoolean(), anyBoolean(), anyLong()))
226 .thenReturn(mock(StoreFileScanner.class));
227 when(sf.getReader()).thenReturn(r);
228 when(sf.createReader()).thenReturn(r);
229 when(sf.createReader(anyBoolean())).thenReturn(r);
230 return new CompactionRequest(Arrays.asList(sf));
231 }
232
233 private static class Scanner implements InternalScanner {
234 private final ArrayList<KeyValue> kvs;
235 public Scanner(KeyValue... kvs) {
236 this.kvs = new ArrayList<KeyValue>(Arrays.asList(kvs));
237 }
238
239 @Override
240 public boolean next(List<Cell> results) throws IOException {
241 if (kvs.isEmpty()) return false;
242 results.add(kvs.remove(0));
243 return !kvs.isEmpty();
244 }
245
246 @Override
247 public boolean next(List<Cell> result, ScannerContext scannerContext)
248 throws IOException {
249 return next(result);
250 }
251
252 @Override
253 public void close() throws IOException {}
254 }
255
256
257 public static class StoreFileWritersCapture implements
258 Answer<StoreFile.Writer>, StripeMultiFileWriter.WriterFactory {
259 public static class Writer {
260 public ArrayList<KeyValue> kvs = new ArrayList<KeyValue>();
261 public TreeMap<byte[], byte[]> data = new TreeMap<byte[], byte[]>(Bytes.BYTES_COMPARATOR);
262 }
263
264 private List<Writer> writers = new ArrayList<Writer>();
265
266 @Override
267 public StoreFile.Writer createWriter() throws IOException {
268 final Writer realWriter = new Writer();
269 writers.add(realWriter);
270 StoreFile.Writer writer = mock(StoreFile.Writer.class);
271 doAnswer(new Answer<Object>() {
272 public Object answer(InvocationOnMock invocation) {
273 return realWriter.kvs.add((KeyValue)invocation.getArguments()[0]);
274 }}).when(writer).append(any(KeyValue.class));
275 doAnswer(new Answer<Object>() {
276 public Object answer(InvocationOnMock invocation) {
277 Object[] args = invocation.getArguments();
278 return realWriter.data.put((byte[])args[0], (byte[])args[1]);
279 }}).when(writer).appendFileInfo(any(byte[].class), any(byte[].class));
280 return writer;
281 }
282
283 @Override
284 public StoreFile.Writer answer(InvocationOnMock invocation) throws Throwable {
285 return createWriter();
286 }
287
288 public void verifyKvs(KeyValue[][] kvss, boolean allFiles, boolean requireMetadata) {
289 if (allFiles) {
290 assertEquals(kvss.length, writers.size());
291 }
292 int skippedWriters = 0;
293 for (int i = 0; i < kvss.length; ++i) {
294 KeyValue[] kvs = kvss[i];
295 if (kvs != null) {
296 Writer w = writers.get(i - skippedWriters);
297 if (requireMetadata) {
298 assertNotNull(w.data.get(STRIPE_START_KEY));
299 assertNotNull(w.data.get(STRIPE_END_KEY));
300 } else {
301 assertNull(w.data.get(STRIPE_START_KEY));
302 assertNull(w.data.get(STRIPE_END_KEY));
303 }
304 assertEquals(kvs.length, w.kvs.size());
305 for (int j = 0; j < kvs.length; ++j) {
306 assertEquals(kvs[j], w.kvs.get(j));
307 }
308 } else {
309 assertFalse(allFiles);
310 ++skippedWriters;
311 }
312 }
313 }
314
315 public void verifyBoundaries(byte[][] boundaries) {
316 assertEquals(boundaries.length - 1, writers.size());
317 for (int i = 0; i < writers.size(); ++i) {
318 assertArrayEquals("i = " + i, boundaries[i], writers.get(i).data.get(STRIPE_START_KEY));
319 assertArrayEquals("i = " + i, boundaries[i + 1], writers.get(i).data.get(STRIPE_END_KEY));
320 }
321 }
322 }
323 }