1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import static java.util.Arrays.asList;
22 import static org.junit.Assert.assertEquals;
23 import static org.junit.Assert.assertNotNull;
24 import static org.junit.Assert.assertTrue;
25
26 import java.io.File;
27 import java.io.FileNotFoundException;
28 import java.io.FileOutputStream;
29 import java.io.IOException;
30 import java.util.ArrayList;
31 import java.util.Arrays;
32 import java.util.List;
33 import java.util.Random;
34
35 import org.apache.hadoop.conf.Configuration;
36 import org.apache.hadoop.fs.FSDataOutputStream;
37 import org.apache.hadoop.fs.Path;
38 import org.apache.hadoop.hbase.CellUtil;
39 import org.apache.hadoop.hbase.DoNotRetryIOException;
40 import org.apache.hadoop.hbase.HBaseConfiguration;
41 import org.apache.hadoop.hbase.HBaseTestingUtility;
42 import org.apache.hadoop.hbase.HColumnDescriptor;
43 import org.apache.hadoop.hbase.HRegionInfo;
44 import org.apache.hadoop.hbase.HTableDescriptor;
45 import org.apache.hadoop.hbase.KeyValue;
46 import org.apache.hadoop.hbase.TableName;
47 import org.apache.hadoop.hbase.io.hfile.HFile;
48 import org.apache.hadoop.hbase.io.hfile.HFileContext;
49 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
50 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
51 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
52 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
53 import org.apache.hadoop.hbase.testclassification.SmallTests;
54 import org.apache.hadoop.hbase.util.Bytes;
55 import org.apache.hadoop.hbase.util.Pair;
56 import org.apache.hadoop.hbase.wal.WAL;
57 import org.apache.hadoop.hbase.wal.WALKey;
58 import org.hamcrest.Description;
59 import org.hamcrest.Matcher;
60 import org.hamcrest.TypeSafeMatcher;
61
62 import static org.mockito.Matchers.*;
63 import static org.mockito.Mockito.mock;
64 import static org.mockito.Mockito.verify;
65 import static org.mockito.Mockito.when;
66 import org.mockito.invocation.InvocationOnMock;
67 import org.mockito.stubbing.Answer;
68
69 import org.junit.Before;
70 import org.junit.ClassRule;
71 import org.junit.Rule;
72 import org.junit.Test;
73 import org.junit.experimental.categories.Category;
74 import org.junit.rules.TemporaryFolder;
75 import org.junit.rules.TestName;
76
77
78
79
80 @Category(SmallTests.class)
81 public class TestBulkLoad {
82
83 @ClassRule
84 public static TemporaryFolder testFolder = new TemporaryFolder();
85 private static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
86 private final WAL log = mock(WAL.class);
87 private final Configuration conf = HBaseConfiguration.create();
88 private final Random random = new Random();
89 private final byte[] randomBytes = new byte[100];
90 private final byte[] family1 = Bytes.toBytes("family1");
91 private final byte[] family2 = Bytes.toBytes("family2");
92 @Rule
93 public TestName name = new TestName();
94
95 @Before
96 public void before() throws IOException {
97 random.nextBytes(randomBytes);
98
99 }
100
101 @Test
102 public void verifyBulkLoadEvent() throws IOException {
103 TableName tableName = TableName.valueOf("test", "test");
104 List<Pair<byte[], String>> familyPaths = withFamilyPathsFor(family1);
105 byte[] familyName = familyPaths.get(0).getFirst();
106 String storeFileName = familyPaths.get(0).getSecond();
107 storeFileName = (new Path(storeFileName)).getName();
108 List<String> storeFileNames = new ArrayList<String>();
109 storeFileNames.add(storeFileName);
110 when(log.append(any(HTableDescriptor.class), any(HRegionInfo.class), any(WALKey.class),
111 argThat(bulkLogWalEdit(WALEdit.BULK_LOAD, tableName.toBytes(),
112 familyName, storeFileNames)),
113 any(boolean.class))).thenAnswer(new Answer() {
114 public Object answer(InvocationOnMock invocation) {
115 WALKey walKey = invocation.getArgumentAt(2, WALKey.class);
116 MultiVersionConcurrencyControl mvcc = walKey.getMvcc();
117 if (mvcc != null) {
118 MultiVersionConcurrencyControl.WriteEntry we = mvcc.begin();
119 walKey.setWriteEntry(we);
120 }
121 return 01L;
122 };
123 });
124 testRegionWithFamiliesAndSpecifiedTableName(tableName, family1)
125 .bulkLoadHFiles(familyPaths, false, null);
126 verify(log).sync(anyLong());
127 }
128
129 @Test
130 public void bulkHLogShouldThrowNoErrorAndWriteMarkerWithBlankInput() throws IOException {
131 testRegionWithFamilies(family1).bulkLoadHFiles(new ArrayList<Pair<byte[], String>>(),
132 false, null);
133 }
134
135 @Test
136 public void shouldBulkLoadSingleFamilyHLog() throws IOException {
137 when(log.append(any(HTableDescriptor.class), any(HRegionInfo.class),
138 any(WALKey.class), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD)),
139 any(boolean.class))).thenAnswer(new Answer() {
140 public Object answer(InvocationOnMock invocation) {
141 WALKey walKey = invocation.getArgumentAt(2, WALKey.class);
142 MultiVersionConcurrencyControl mvcc = walKey.getMvcc();
143 if (mvcc != null) {
144 MultiVersionConcurrencyControl.WriteEntry we = mvcc.begin();
145 walKey.setWriteEntry(we);
146 }
147 return 01L;
148 };
149 });
150 testRegionWithFamilies(family1).bulkLoadHFiles(withFamilyPathsFor(family1), false, null);
151 verify(log).sync(anyLong());
152 }
153
154 @Test
155 public void shouldBulkLoadManyFamilyHLog() throws IOException {
156 when(log.append(any(HTableDescriptor.class), any(HRegionInfo.class),
157 any(WALKey.class), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD)),
158 any(boolean.class))).thenAnswer(new Answer() {
159 public Object answer(InvocationOnMock invocation) {
160 WALKey walKey = invocation.getArgumentAt(2, WALKey.class);
161 MultiVersionConcurrencyControl mvcc = walKey.getMvcc();
162 if (mvcc != null) {
163 MultiVersionConcurrencyControl.WriteEntry we = mvcc.begin();
164 walKey.setWriteEntry(we);
165 }
166 return 01L;
167 };
168 });
169 testRegionWithFamilies(family1, family2).bulkLoadHFiles(withFamilyPathsFor(family1, family2),
170 false, null);
171 verify(log).sync(anyLong());
172 }
173
174 @Test
175 public void shouldBulkLoadManyFamilyHLogEvenWhenTableNameNamespaceSpecified() throws IOException {
176 when(log.append(any(HTableDescriptor.class), any(HRegionInfo.class),
177 any(WALKey.class), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD)),
178 any(boolean.class))).thenAnswer(new Answer() {
179 public Object answer(InvocationOnMock invocation) {
180 WALKey walKey = invocation.getArgumentAt(2, WALKey.class);
181 MultiVersionConcurrencyControl mvcc = walKey.getMvcc();
182 if (mvcc != null) {
183 MultiVersionConcurrencyControl.WriteEntry we = mvcc.begin();
184 walKey.setWriteEntry(we);
185 }
186 return 01L;
187 };
188 });
189 TableName tableName = TableName.valueOf("test", "test");
190 testRegionWithFamiliesAndSpecifiedTableName(tableName, family1, family2)
191 .bulkLoadHFiles(withFamilyPathsFor(family1, family2), false, null);
192 verify(log).sync(anyLong());
193 }
194
195 @Test(expected = DoNotRetryIOException.class)
196 public void shouldCrashIfBulkLoadFamiliesNotInTable() throws IOException {
197 testRegionWithFamilies(family1).bulkLoadHFiles(withFamilyPathsFor(family1, family2), false,
198 null);
199 }
200
201 @Test(expected = DoNotRetryIOException.class)
202 public void bulkHLogShouldThrowErrorWhenFamilySpecifiedAndHFileExistsButNotInTableDescriptor()
203 throws IOException {
204 testRegionWithFamilies().bulkLoadHFiles(withFamilyPathsFor(family1), false, null);
205 }
206
207 @Test(expected = DoNotRetryIOException.class)
208 public void shouldThrowErrorIfBadFamilySpecifiedAsFamilyPath() throws IOException {
209 testRegionWithFamilies()
210 .bulkLoadHFiles(asList(withInvalidColumnFamilyButProperHFileLocation(family1)),
211 false, null);
212 }
213
214 @Test(expected = FileNotFoundException.class)
215 public void shouldThrowErrorIfHFileDoesNotExist() throws IOException {
216 List<Pair<byte[], String>> list = asList(withMissingHFileForFamily(family1));
217 testRegionWithFamilies(family1).bulkLoadHFiles(list, false, null);
218 }
219
220 private Pair<byte[], String> withMissingHFileForFamily(byte[] family) {
221 return new Pair<byte[], String>(family, getNotExistFilePath());
222 }
223
224 private String getNotExistFilePath() {
225 Path path = new Path(TEST_UTIL.getDataTestDir(), "does_not_exist");
226 return path.toUri().getPath();
227 }
228
229 private Pair<byte[], String> withInvalidColumnFamilyButProperHFileLocation(byte[] family)
230 throws IOException {
231 createHFileForFamilies(family);
232 return new Pair<byte[], String>(new byte[]{0x00, 0x01, 0x02}, getNotExistFilePath());
233 }
234
235
236 private HRegion testRegionWithFamiliesAndSpecifiedTableName(TableName tableName,
237 byte[]... families)
238 throws IOException {
239 HRegionInfo hRegionInfo = new HRegionInfo(tableName);
240 HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName);
241 for (byte[] family : families) {
242 hTableDescriptor.addFamily(new HColumnDescriptor(family));
243 }
244
245
246 return HRegion.createHRegion(hRegionInfo,
247 new Path(testFolder.newFolder().toURI()),
248 conf,
249 hTableDescriptor,
250 log);
251
252 }
253
254 private HRegion testRegionWithFamilies(byte[]... families) throws IOException {
255 TableName tableName = TableName.valueOf(name.getMethodName());
256 return testRegionWithFamiliesAndSpecifiedTableName(tableName, families);
257 }
258
259 private List<Pair<byte[], String>> getBlankFamilyPaths(){
260 return new ArrayList<Pair<byte[], String>>();
261 }
262
263 private List<Pair<byte[], String>> withFamilyPathsFor(byte[]... families) throws IOException {
264 List<Pair<byte[], String>> familyPaths = getBlankFamilyPaths();
265 for (byte[] family : families) {
266 familyPaths.add(new Pair<byte[], String>(family, createHFileForFamilies(family)));
267 }
268 return familyPaths;
269 }
270
271 private String createHFileForFamilies(byte[] family) throws IOException {
272 HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(conf);
273
274 File hFileLocation = testFolder.newFile();
275 FSDataOutputStream out = new FSDataOutputStream(new FileOutputStream(hFileLocation));
276 try {
277 hFileFactory.withOutputStream(out);
278 hFileFactory.withFileContext(new HFileContext());
279 HFile.Writer writer = hFileFactory.create();
280 try {
281 writer.append(new KeyValue(CellUtil.createCell(randomBytes,
282 family,
283 randomBytes,
284 0l,
285 KeyValue.Type.Put.getCode(),
286 randomBytes)));
287 } finally {
288 writer.close();
289 }
290 } finally {
291 out.close();
292 }
293 return hFileLocation.getAbsoluteFile().getAbsolutePath();
294 }
295
296 private static Matcher<WALEdit> bulkLogWalEditType(byte[] typeBytes) {
297 return new WalMatcher(typeBytes);
298 }
299
300 private static Matcher<WALEdit> bulkLogWalEdit(byte[] typeBytes, byte[] tableName,
301 byte[] familyName, List<String> storeFileNames) {
302 return new WalMatcher(typeBytes, tableName, familyName, storeFileNames);
303 }
304
305 private static class WalMatcher extends TypeSafeMatcher<WALEdit> {
306 private final byte[] typeBytes;
307 private final byte[] tableName;
308 private final byte[] familyName;
309 private final List<String> storeFileNames;
310
311 public WalMatcher(byte[] typeBytes) {
312 this(typeBytes, null, null, null);
313 }
314
315 public WalMatcher(byte[] typeBytes, byte[] tableName, byte[] familyName,
316 List<String> storeFileNames) {
317 this.typeBytes = typeBytes;
318 this.tableName = tableName;
319 this.familyName = familyName;
320 this.storeFileNames = storeFileNames;
321 }
322
323 @Override
324 protected boolean matchesSafely(WALEdit item) {
325 assertTrue(Arrays.equals(item.getCells().get(0).getQualifier(), typeBytes));
326 BulkLoadDescriptor desc;
327 try {
328 desc = WALEdit.getBulkLoadDescriptor(item.getCells().get(0));
329 } catch (IOException e) {
330 return false;
331 }
332 assertNotNull(desc);
333
334 if (tableName != null) {
335 assertTrue(Bytes.equals(ProtobufUtil.toTableName(desc.getTableName()).getName(),
336 tableName));
337 }
338
339 if(storeFileNames != null) {
340 int index=0;
341 StoreDescriptor store = desc.getStores(0);
342 assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), familyName));
343 assertTrue(Bytes.equals(Bytes.toBytes(store.getStoreHomeDir()), familyName));
344 assertEquals(storeFileNames.size(), store.getStoreFileCount());
345 }
346
347 return true;
348 }
349
350 @Override
351 public void describeTo(Description description) {
352
353 }
354 }
355 }