/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;

import java.io.IOException;
import java.util.Arrays;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.MapreduceTestingShim;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/**
 * Tests {@link TableInputFormat} and its recovery semantics when the
 * underlying scanner fails partway through a read.
 */
@Category(LargeTests.class)
public class TestTableInputFormat {

  private static final Log LOG = LogFactory.getLog(TestTableInputFormat.class);

  private final static HBaseTestingUtility UTIL = new HBaseTestingUtility();
  private static MiniMRCluster mrCluster;
  static final byte[] FAMILY = Bytes.toBytes("family");

  private static final byte[][] columns = new byte[][] { FAMILY };

  @BeforeClass
  public static void beforeClass() throws Exception {
    UTIL.startMiniCluster();
    mrCluster = UTIL.startMiniMapReduceCluster();
  }

  @AfterClass
  public static void afterClass() throws Exception {
    UTIL.shutdownMiniMapReduceCluster();
    UTIL.shutdownMiniCluster();
  }

  @Before
  public void before() throws IOException {
    LOG.info("before");
    UTIL.ensureSomeRegionServersAvailable(1);
    LOG.info("before done");
  }

  /**
   * Set up a table with two rows and values.
   *
   * @param tableName the name of the table to create
   * @return the created table, populated with rows "aaa" and "bbb"
   * @throws IOException if the table cannot be created or written to
   */
  public static Table createTable(byte[] tableName) throws IOException {
    return createTable(tableName, new byte[][] { FAMILY });
  }

  /**
   * Set up a table with two rows and a value in each of the given column families.
   *
   * @param tableName the name of the table to create
   * @param families the column families to create
   * @return the created table, populated with rows "aaa" and "bbb"
   * @throws IOException if the table cannot be created or written to
   */
  public static Table createTable(byte[] tableName, byte[][] families) throws IOException {
    Table table = UTIL.createTable(TableName.valueOf(tableName), families);
    Put p = new Put("aaa".getBytes());
    for (byte[] family : families) {
      p.add(family, null, "value aaa".getBytes());
    }
    table.put(p);
    p = new Put("bbb".getBytes());
    for (byte[] family : families) {
      p.add(family, null, "value bbb".getBytes());
    }
    table.put(p);
    return table;
  }

  /**
   * Verify that the result and key have the expected values.
   *
   * @param r the result to check
   * @param key the row key that was read
   * @param expectedKey the expected row key
   * @param expectedValue the expected cell value
   * @return true if the checks pass; otherwise an assertion above fails
   */
  static boolean checkResult(Result r, ImmutableBytesWritable key,
      byte[] expectedKey, byte[] expectedValue) {
    assertEquals(0, key.compareTo(expectedKey));
    Map<byte[], byte[]> vals = r.getFamilyMap(FAMILY);
    byte[] value = vals.values().iterator().next();
    assertTrue(Arrays.equals(value, expectedValue));
    return true; // only reached if the assertions above pass
  }

  /**
   * Create table data and run tests on the specified table using the
   * o.a.h.hbase.mapred API.
   *
   * @param table the table to read through a TableRecordReader
   * @throws IOException if the reader fails more often than it can recover from
   */
  static void runTestMapred(Table table) throws IOException {
    org.apache.hadoop.hbase.mapred.TableRecordReader trr =
        new org.apache.hadoop.hbase.mapred.TableRecordReader();
    trr.setStartRow("aaa".getBytes());
    trr.setEndRow("zzz".getBytes());
    trr.setHTable(table);
    trr.setInputColumns(columns);

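    // init() opens the underlying scanner for the configured row range and columns.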
    trr.init();
    Result r = new Result();
    ImmutableBytesWritable key = new ImmutableBytesWritable();

    boolean more = trr.next(key, r);
    assertTrue(more);
    checkResult(r, key, "aaa".getBytes(), "value aaa".getBytes());

    more = trr.next(key, r);
    assertTrue(more);
    checkResult(r, key, "bbb".getBytes(), "value bbb".getBytes());

    // no more data
    more = trr.next(key, r);
    assertFalse(more);
  }

  /**
   * Create a table whose first {@code failCnt} scanners throw an IOException
   * from {@code next()}.
   *
   * @param name the name of the table to create
   * @param failCnt the number of scanners that should fail before a working one is returned
   * @return a spied table whose getScanner() calls are intercepted
   * @throws IOException if the underlying table cannot be created
   */
  static Table createIOEScannerTable(byte[] name, final int failCnt)
      throws IOException {
    // build a mock scanner that fails for the first failCnt invocations
    Answer<ResultScanner> a = new Answer<ResultScanner>() {
      int cnt = 0;

      @Override
      public ResultScanner answer(InvocationOnMock invocation) throws Throwable {
        // the first failCnt invocations return the busted mock scanner
        if (cnt++ < failCnt) {
          // create a mock ResultScanner that always fails.
          Scan scan = mock(Scan.class);
          doReturn("bogus".getBytes()).when(scan).getStartRow(); // avoid npe
          ResultScanner scanner = mock(ResultScanner.class);
          // simulate TimeoutException / IOException
          doThrow(new IOException("Injected exception")).when(scanner).next();
          return scanner;
        }

        // otherwise return the real scanner.
        return (ResultScanner) invocation.callRealMethod();
      }
    };

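    // Wrap the real table in a Mockito spy so getScanner() can be stubbed while
    // every other call still goes to the live table.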
    Table htable = spy(createTable(name));
    doAnswer(a).when(htable).getScanner((Scan) anyObject());
    return htable;
  }

  /**
   * Create a table whose first {@code failCnt} scanners throw a
   * DoNotRetryIOException (a NotServingRegionException) from {@code next()}.
   *
   * @param name the name of the table to create
   * @param failCnt the number of scanners that should fail before a working one is returned
   * @return a spied table whose getScanner() calls are intercepted
   * @throws IOException if the underlying table cannot be created
   */
  static Table createDNRIOEScannerTable(byte[] name, final int failCnt)
      throws IOException {
    // build a mock scanner that fails for the first failCnt invocations
    Answer<ResultScanner> a = new Answer<ResultScanner>() {
      int cnt = 0;

      @Override
      public ResultScanner answer(InvocationOnMock invocation) throws Throwable {
        // the first failCnt invocations return the busted mock scanner
        if (cnt++ < failCnt) {
          // create a mock ResultScanner that always fails.
          Scan scan = mock(Scan.class);
          doReturn("bogus".getBytes()).when(scan).getStartRow(); // avoid npe
          ResultScanner scanner = mock(ResultScanner.class);

          invocation.callRealMethod(); // simulate NotServingRegionException
          doThrow(
              new NotServingRegionException("Injected simulated TimeoutException"))
              .when(scanner).next();
          return scanner;
        }

        // otherwise return the real scanner.
        return (ResultScanner) invocation.callRealMethod();
      }
    };

    Table htable = spy(createTable(name));
    doAnswer(a).when(htable).getScanner((Scan) anyObject());
    return htable;
  }

  /**
   * Run the test assuming no errors, using the mapred API.
   *
   * @throws IOException if the reader fails unexpectedly
   */
  @Test
  public void testTableRecordReader() throws IOException {
    Table table = createTable("table1".getBytes());
    runTestMapred(table);
  }

  /**
   * Run the test with a single injected scanner IOException, using the mapred
   * API. The reader should recover transparently.
   *
   * @throws IOException if the reader fails unexpectedly
   */
  @Test
  public void testTableRecordReaderScannerFail() throws IOException {
    Table htable = createIOEScannerTable("table2".getBytes(), 1);
    runTestMapred(htable);
  }

  /**
   * Run the test with two consecutive injected scanner IOExceptions, using the
   * mapred API. The second failure should propagate to the caller.
   *
   * @throws IOException expected, since the reader cannot recover twice in a row
   */
  @Test(expected = IOException.class)
  public void testTableRecordReaderScannerFailTwice() throws IOException {
    Table htable = createIOEScannerTable("table3".getBytes(), 2);
    runTestMapred(htable);
  }

  /**
   * Run the test with a single injected NotServingRegionException, using the
   * mapred API. The reader should recover transparently.
   *
   * @throws IOException if the reader fails unexpectedly
   */
  @Test
  public void testTableRecordReaderScannerTimeout() throws IOException {
    Table htable = createDNRIOEScannerTable("table4".getBytes(), 1);
    runTestMapred(htable);
  }

  /**
   * Run the test with two consecutive injected NotServingRegionExceptions, using
   * the mapred API. The second failure should propagate to the caller.
   *
   * @throws IOException expected, as a NotServingRegionException
   */
  @Test(expected = org.apache.hadoop.hbase.NotServingRegionException.class)
  public void testTableRecordReaderScannerTimeoutTwice() throws IOException {
    Table htable = createDNRIOEScannerTable("table5".getBytes(), 2);
    runTestMapred(htable);
  }

  /**
   * Verify the example we present in the javadocs on TableInputFormatBase.
   */
  @Test
  public void testExtensionOfTableInputFormatBase() throws IOException {
    LOG.info("testing use of an InputFormat that extends TableInputFormatBase");
    final Table table = createTable(Bytes.toBytes("exampleTable"),
      new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") });
    testInputFormat(ExampleTIF.class);
  }

  @Test
  public void testDeprecatedExtensionOfTableInputFormatBase() throws IOException {
    LOG.info("testing use of an InputFormat that extends TableInputFormatBase, "
        + "as it was given in 0.98.");
    final Table table = createTable(Bytes.toBytes("exampleDeprecatedTable"),
      new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") });
    testInputFormat(ExampleDeprecatedTIF.class);
  }

  @Test
  public void testJobConfigurableExtensionOfTableInputFormatBase() throws IOException {
    LOG.info("testing use of an InputFormat that extends TableInputFormatBase, "
        + "using JobConfigurable.");
    final Table table = createTable(Bytes.toBytes("exampleJobConfigurableTable"),
      new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") });
    testInputFormat(ExampleJobConfigurableTIF.class);
  }

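  /**
   * Runs a map-only job over the given InputFormat and uses job counters to
   * verify that only the filtered-for row ("aaa") and its two column families
   * are ever seen by the mapper.
   */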
  void testInputFormat(Class<? extends InputFormat> clazz) throws IOException {
    final JobConf job = MapreduceTestingShim.getJobConf(mrCluster);
    job.setInputFormat(clazz);
    job.setOutputFormat(NullOutputFormat.class);
    job.setMapperClass(ExampleVerifier.class);
    job.setNumReduceTasks(0);
    LOG.debug("submitting job.");
    final RunningJob run = JobClient.runJob(job);
    assertTrue("job failed!", run.isSuccessful());
    assertEquals("Saw the wrong number of instances of the filtered-for row.", 2, run.getCounters()
        .findCounter(TestTableInputFormat.class.getName() + ":row", "aaa").getCounter());
    assertEquals("Saw an instance of the filtered-out row.", 0, run.getCounters()
        .findCounter(TestTableInputFormat.class.getName() + ":row", "bbb").getCounter());
    assertEquals("Saw the wrong number of instances of columnA.", 1, run.getCounters()
        .findCounter(TestTableInputFormat.class.getName() + ":family", "columnA").getCounter());
    assertEquals("Saw the wrong number of instances of columnB.", 1, run.getCounters()
        .findCounter(TestTableInputFormat.class.getName() + ":family", "columnB").getCounter());
    assertEquals("Saw the wrong count of values for the filtered-for row.", 2, run.getCounters()
        .findCounter(TestTableInputFormat.class.getName() + ":value", "value aaa").getCounter());
    assertEquals("Saw the wrong count of values for the filtered-out row.", 0, run.getCounters()
        .findCounter(TestTableInputFormat.class.getName() + ":value", "value bbb").getCounter());
  }

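  /**
   * A TableMap that emits nothing; it only increments per-row, per-family and
   * per-value counters that the driver asserts on in testInputFormat.
   */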
  public static class ExampleVerifier implements TableMap<NullWritable, NullWritable> {

    @Override
    public void configure(JobConf conf) {
    }

    @Override
    public void map(ImmutableBytesWritable key, Result value,
        OutputCollector<NullWritable,NullWritable> output,
        Reporter reporter) throws IOException {
      for (Cell cell : value.listCells()) {
        reporter.getCounter(TestTableInputFormat.class.getName() + ":row",
            Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()))
            .increment(1L);
        reporter.getCounter(TestTableInputFormat.class.getName() + ":family",
            Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()))
            .increment(1L);
        reporter.getCounter(TestTableInputFormat.class.getName() + ":value",
            Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()))
            .increment(1L);
      }
    }

    @Override
    public void close() {
    }

  }

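  /**
   * The 0.98-style extension: everything is wired up from
   * JobConfigurable.configure(JobConf) using the deprecated HTable constructor
   * and setHTable().
   */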
  public static class ExampleDeprecatedTIF extends TableInputFormatBase implements JobConfigurable {

    @Override
    public void configure(JobConf job) {
      try {
        HTable exampleTable = new HTable(HBaseConfiguration.create(job),
          Bytes.toBytes("exampleDeprecatedTable"));
        // mandatory
        setHTable(exampleTable);
        byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"),
          Bytes.toBytes("columnB") };
        // mandatory
        setInputColumns(inputColumns);
        Filter exampleFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*"));
        // optional
        setRowFilter(exampleFilter);
      } catch (IOException exception) {
        throw new RuntimeException("Failed to configure for job.", exception);
      }
    }

  }

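  /**
   * Like ExampleTIF, but drives initialization from JobConfigurable.configure(JobConf).
   */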
  public static class ExampleJobConfigurableTIF extends ExampleTIF implements JobConfigurable {

    @Override
    public void configure(JobConf job) {
      try {
        initialize(job);
      } catch (IOException exception) {
        throw new RuntimeException("Failed to initialize.", exception);
      }
    }

    @Override
    protected void initialize(JobConf job) throws IOException {
      initialize(job, "exampleJobConfigurableTable");
    }
  }

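  /**
   * The example exercised by testExtensionOfTableInputFormatBase: override
   * initialize(JobConf) and wire the table up via initializeTable(Connection,
   * TableName), setInputColumns() and setRowFilter().
   */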
  public static class ExampleTIF extends TableInputFormatBase {

    @Override
    protected void initialize(JobConf job) throws IOException {
      initialize(job, "exampleTable");
    }

    protected void initialize(JobConf job, String table) throws IOException {
      Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create(job));
      TableName tableName = TableName.valueOf(table);
      // mandatory
      initializeTable(connection, tableName);
      byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"),
        Bytes.toBytes("columnB") };
      // mandatory
      setInputColumns(inputColumns);
      Filter exampleFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*"));
      // optional
      setRowFilter(exampleFilter);
    }

  }

}