View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import static java.util.Arrays.asList;
22  import static org.junit.Assert.assertEquals;
23  import static org.junit.Assert.assertNotNull;
24  import static org.junit.Assert.assertTrue;
25  
26  import java.io.File;
27  import java.io.FileNotFoundException;
28  import java.io.FileOutputStream;
29  import java.io.IOException;
30  import java.util.ArrayList;
31  import java.util.Arrays;
32  import java.util.List;
33  import java.util.Random;
34  
35  import org.apache.hadoop.conf.Configuration;
36  import org.apache.hadoop.fs.FSDataOutputStream;
37  import org.apache.hadoop.fs.Path;
38  import org.apache.hadoop.hbase.CellUtil;
39  import org.apache.hadoop.hbase.DoNotRetryIOException;
40  import org.apache.hadoop.hbase.HBaseConfiguration;
41  import org.apache.hadoop.hbase.HBaseTestingUtility;
42  import org.apache.hadoop.hbase.HColumnDescriptor;
43  import org.apache.hadoop.hbase.HRegionInfo;
44  import org.apache.hadoop.hbase.HTableDescriptor;
45  import org.apache.hadoop.hbase.KeyValue;
46  import org.apache.hadoop.hbase.TableName;
47  import org.apache.hadoop.hbase.io.hfile.HFile;
48  import org.apache.hadoop.hbase.io.hfile.HFileContext;
49  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
50  import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
51  import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
52  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
53  import org.apache.hadoop.hbase.testclassification.SmallTests;
54  import org.apache.hadoop.hbase.util.Bytes;
55  import org.apache.hadoop.hbase.util.Pair;
56  import org.apache.hadoop.hbase.wal.WAL;
57  import org.apache.hadoop.hbase.wal.WALKey;
58  import org.hamcrest.Description;
59  import org.hamcrest.Matcher;
60  import org.hamcrest.TypeSafeMatcher;
61  
62  import static org.mockito.Matchers.*;
63  import static org.mockito.Mockito.mock;
64  import static org.mockito.Mockito.verify;
65  import static org.mockito.Mockito.when;
66  import org.mockito.invocation.InvocationOnMock;
67  import org.mockito.stubbing.Answer;
68  
69  import org.junit.Before;
70  import org.junit.ClassRule;
71  import org.junit.Rule;
72  import org.junit.Test;
73  import org.junit.experimental.categories.Category;
74  import org.junit.rules.TemporaryFolder;
75  import org.junit.rules.TestName;
76  
77  /**
78   * This class attempts to unit test bulk HLog loading.
79   */
80  @Category(SmallTests.class)
81  public class TestBulkLoad {
82  
83    @ClassRule
84    public static TemporaryFolder testFolder = new TemporaryFolder();
85    private static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
86    private final WAL log = mock(WAL.class);
87    private final Configuration conf = HBaseConfiguration.create();
88    private final Random random = new Random();
89    private final byte[] randomBytes = new byte[100];
90    private final byte[] family1 = Bytes.toBytes("family1");
91    private final byte[] family2 = Bytes.toBytes("family2");
92    @Rule
93    public TestName name = new TestName();
94  
95    @Before
96    public void before() throws IOException {
97      random.nextBytes(randomBytes);
98      // Mockito.when(log.append(htd, info, key, edits, inMemstore));
99    }
100 
101   @Test
102   public void verifyBulkLoadEvent() throws IOException {
103     TableName tableName = TableName.valueOf("test", "test");
104     List<Pair<byte[], String>> familyPaths = withFamilyPathsFor(family1);
105     byte[] familyName = familyPaths.get(0).getFirst();
106     String storeFileName = familyPaths.get(0).getSecond();
107     storeFileName = (new Path(storeFileName)).getName();
108     List<String> storeFileNames = new ArrayList<String>();
109     storeFileNames.add(storeFileName);
110     when(log.append(any(HTableDescriptor.class), any(HRegionInfo.class), any(WALKey.class),
111             argThat(bulkLogWalEdit(WALEdit.BULK_LOAD, tableName.toBytes(),
112                     familyName, storeFileNames)),
113             any(boolean.class))).thenAnswer(new Answer() {
114       public Object answer(InvocationOnMock invocation) {
115         WALKey walKey = invocation.getArgumentAt(2, WALKey.class);
116         MultiVersionConcurrencyControl mvcc = walKey.getMvcc();
117         if (mvcc != null) {
118           MultiVersionConcurrencyControl.WriteEntry we = mvcc.begin();
119           walKey.setWriteEntry(we);
120         }
121         return 01L;
122       };
123     });
124     testRegionWithFamiliesAndSpecifiedTableName(tableName, family1)
125         .bulkLoadHFiles(familyPaths, false, null);
126     verify(log).sync(anyLong());
127   }
128 
129   @Test
130   public void bulkHLogShouldThrowNoErrorAndWriteMarkerWithBlankInput() throws IOException {
131     testRegionWithFamilies(family1).bulkLoadHFiles(new ArrayList<Pair<byte[], String>>(),
132       false, null);
133   }
134 
135   @Test
136   public void shouldBulkLoadSingleFamilyHLog() throws IOException {
137     when(log.append(any(HTableDescriptor.class), any(HRegionInfo.class),
138             any(WALKey.class), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD)),
139             any(boolean.class))).thenAnswer(new Answer() {
140       public Object answer(InvocationOnMock invocation) {
141         WALKey walKey = invocation.getArgumentAt(2, WALKey.class);
142         MultiVersionConcurrencyControl mvcc = walKey.getMvcc();
143         if (mvcc != null) {
144           MultiVersionConcurrencyControl.WriteEntry we = mvcc.begin();
145           walKey.setWriteEntry(we);
146         }
147         return 01L;
148       };
149     });
150     testRegionWithFamilies(family1).bulkLoadHFiles(withFamilyPathsFor(family1), false, null);
151     verify(log).sync(anyLong());
152   }
153 
154   @Test
155   public void shouldBulkLoadManyFamilyHLog() throws IOException {
156     when(log.append(any(HTableDescriptor.class), any(HRegionInfo.class),
157             any(WALKey.class), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD)),
158             any(boolean.class))).thenAnswer(new Answer() {
159               public Object answer(InvocationOnMock invocation) {
160                 WALKey walKey = invocation.getArgumentAt(2, WALKey.class);
161                 MultiVersionConcurrencyControl mvcc = walKey.getMvcc();
162                 if (mvcc != null) {
163                   MultiVersionConcurrencyControl.WriteEntry we = mvcc.begin();
164                   walKey.setWriteEntry(we);
165                 }
166                 return 01L;
167               };
168             });
169     testRegionWithFamilies(family1, family2).bulkLoadHFiles(withFamilyPathsFor(family1, family2),
170             false, null);
171     verify(log).sync(anyLong());
172   }
173 
174   @Test
175   public void shouldBulkLoadManyFamilyHLogEvenWhenTableNameNamespaceSpecified() throws IOException {
176     when(log.append(any(HTableDescriptor.class), any(HRegionInfo.class),
177             any(WALKey.class), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD)),
178             any(boolean.class))).thenAnswer(new Answer() {
179       public Object answer(InvocationOnMock invocation) {
180         WALKey walKey = invocation.getArgumentAt(2, WALKey.class);
181         MultiVersionConcurrencyControl mvcc = walKey.getMvcc();
182         if (mvcc != null) {
183           MultiVersionConcurrencyControl.WriteEntry we = mvcc.begin();
184           walKey.setWriteEntry(we);
185         }
186         return 01L;
187       };
188     });
189     TableName tableName = TableName.valueOf("test", "test");
190     testRegionWithFamiliesAndSpecifiedTableName(tableName, family1, family2)
191         .bulkLoadHFiles(withFamilyPathsFor(family1, family2), false, null);
192     verify(log).sync(anyLong());
193   }
194 
195   @Test(expected = DoNotRetryIOException.class)
196   public void shouldCrashIfBulkLoadFamiliesNotInTable() throws IOException {
197     testRegionWithFamilies(family1).bulkLoadHFiles(withFamilyPathsFor(family1, family2), false,
198       null);
199   }
200 
201   @Test(expected = DoNotRetryIOException.class)
202   public void bulkHLogShouldThrowErrorWhenFamilySpecifiedAndHFileExistsButNotInTableDescriptor()
203       throws IOException {
204     testRegionWithFamilies().bulkLoadHFiles(withFamilyPathsFor(family1), false, null);
205   }
206 
207   @Test(expected = DoNotRetryIOException.class)
208   public void shouldThrowErrorIfBadFamilySpecifiedAsFamilyPath() throws IOException {
209     testRegionWithFamilies()
210         .bulkLoadHFiles(asList(withInvalidColumnFamilyButProperHFileLocation(family1)),
211             false, null);
212   }
213 
214   @Test(expected = FileNotFoundException.class)
215   public void shouldThrowErrorIfHFileDoesNotExist() throws IOException {
216     List<Pair<byte[], String>> list = asList(withMissingHFileForFamily(family1));
217     testRegionWithFamilies(family1).bulkLoadHFiles(list, false, null);
218   }
219 
220   private Pair<byte[], String> withMissingHFileForFamily(byte[] family) {
221     return new Pair<byte[], String>(family, getNotExistFilePath());
222   }
223 
224   private String getNotExistFilePath() {
225     Path path = new Path(TEST_UTIL.getDataTestDir(), "does_not_exist");
226     return path.toUri().getPath();
227   }
228 
229   private Pair<byte[], String> withInvalidColumnFamilyButProperHFileLocation(byte[] family)
230       throws IOException {
231     createHFileForFamilies(family);
232     return new Pair<byte[], String>(new byte[]{0x00, 0x01, 0x02}, getNotExistFilePath());
233   }
234 
235 
236   private HRegion testRegionWithFamiliesAndSpecifiedTableName(TableName tableName,
237                                                               byte[]... families)
238   throws IOException {
239     HRegionInfo hRegionInfo = new HRegionInfo(tableName);
240     HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName);
241     for (byte[] family : families) {
242       hTableDescriptor.addFamily(new HColumnDescriptor(family));
243     }
244 
245     // TODO We need a way to do this without creating files
246     return HRegion.createHRegion(hRegionInfo,
247         new Path(testFolder.newFolder().toURI()),
248         conf,
249         hTableDescriptor,
250         log);
251 
252   }
253 
254   private HRegion testRegionWithFamilies(byte[]... families) throws IOException {
255     TableName tableName = TableName.valueOf(name.getMethodName());
256     return testRegionWithFamiliesAndSpecifiedTableName(tableName, families);
257   }
258 
259   private List<Pair<byte[], String>> getBlankFamilyPaths(){
260     return new ArrayList<Pair<byte[], String>>();
261   }
262 
263   private List<Pair<byte[], String>> withFamilyPathsFor(byte[]... families) throws IOException {
264     List<Pair<byte[], String>> familyPaths = getBlankFamilyPaths();
265     for (byte[] family : families) {
266       familyPaths.add(new Pair<byte[], String>(family, createHFileForFamilies(family)));
267     }
268     return familyPaths;
269   }
270 
271   private String createHFileForFamilies(byte[] family) throws IOException {
272     HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(conf);
273     // TODO We need a way to do this without creating files
274     File hFileLocation = testFolder.newFile();
275     FSDataOutputStream out = new FSDataOutputStream(new FileOutputStream(hFileLocation));
276     try {
277       hFileFactory.withOutputStream(out);
278       hFileFactory.withFileContext(new HFileContext());
279       HFile.Writer writer = hFileFactory.create();
280       try {
281         writer.append(new KeyValue(CellUtil.createCell(randomBytes,
282             family,
283             randomBytes,
284             0l,
285             KeyValue.Type.Put.getCode(),
286             randomBytes)));
287       } finally {
288         writer.close();
289       }
290     } finally {
291       out.close();
292     }
293     return hFileLocation.getAbsoluteFile().getAbsolutePath();
294   }
295 
296   private static Matcher<WALEdit> bulkLogWalEditType(byte[] typeBytes) {
297     return new WalMatcher(typeBytes);
298   }
299 
300   private static Matcher<WALEdit> bulkLogWalEdit(byte[] typeBytes, byte[] tableName,
301       byte[] familyName, List<String> storeFileNames) {
302     return new WalMatcher(typeBytes, tableName, familyName, storeFileNames);
303   }
304 
305   private static class WalMatcher extends TypeSafeMatcher<WALEdit> {
306     private final byte[] typeBytes;
307     private final byte[] tableName;
308     private final byte[] familyName;
309     private final List<String> storeFileNames;
310 
311     public WalMatcher(byte[] typeBytes) {
312       this(typeBytes, null, null, null);
313     }
314 
315     public WalMatcher(byte[] typeBytes, byte[] tableName, byte[] familyName,
316         List<String> storeFileNames) {
317       this.typeBytes = typeBytes;
318       this.tableName = tableName;
319       this.familyName = familyName;
320       this.storeFileNames = storeFileNames;
321     }
322 
323     @Override
324     protected boolean matchesSafely(WALEdit item) {
325       assertTrue(Arrays.equals(item.getCells().get(0).getQualifier(), typeBytes));
326       BulkLoadDescriptor desc;
327       try {
328         desc = WALEdit.getBulkLoadDescriptor(item.getCells().get(0));
329       } catch (IOException e) {
330         return false;
331       }
332       assertNotNull(desc);
333 
334       if (tableName != null) {
335         assertTrue(Bytes.equals(ProtobufUtil.toTableName(desc.getTableName()).getName(),
336           tableName));
337       }
338 
339       if(storeFileNames != null) {
340         int index=0;
341         StoreDescriptor store = desc.getStores(0);
342         assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), familyName));
343         assertTrue(Bytes.equals(Bytes.toBytes(store.getStoreHomeDir()), familyName));
344         assertEquals(storeFileNames.size(), store.getStoreFileCount());
345       }
346 
347       return true;
348     }
349 
350     @Override
351     public void describeTo(Description description) {
352 
353     }
354   }
355 }