1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.mapred;
20
21 import static org.junit.Assert.assertEquals;
22 import static org.junit.Assert.assertFalse;
23 import static org.junit.Assert.assertTrue;
24 import static org.mockito.Matchers.anyObject;
25 import static org.mockito.Mockito.doAnswer;
26 import static org.mockito.Mockito.doReturn;
27 import static org.mockito.Mockito.doThrow;
28 import static org.mockito.Mockito.mock;
29 import static org.mockito.Mockito.spy;
30
31 import java.io.IOException;
32 import java.util.Arrays;
33 import java.util.Map;
34
35 import org.apache.commons.logging.Log;
36 import org.apache.commons.logging.LogFactory;
37 import org.apache.hadoop.hbase.*;
38 import org.apache.hadoop.hbase.Cell;
39 import org.apache.hadoop.hbase.client.Connection;
40 import org.apache.hadoop.hbase.client.ConnectionFactory;
41 import org.apache.hadoop.hbase.client.HTable;
42 import org.apache.hadoop.hbase.client.Put;
43 import org.apache.hadoop.hbase.client.Result;
44 import org.apache.hadoop.hbase.client.ResultScanner;
45 import org.apache.hadoop.hbase.client.Scan;
46 import org.apache.hadoop.hbase.client.Table;
47 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
48 import org.apache.hadoop.hbase.filter.Filter;
49 import org.apache.hadoop.hbase.filter.RegexStringComparator;
50 import org.apache.hadoop.hbase.filter.RowFilter;
51 import org.apache.hadoop.hbase.mapreduce.MapreduceTestingShim;
52 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
53 import org.apache.hadoop.hbase.testclassification.LargeTests;
54 import org.apache.hadoop.hbase.util.Bytes;
55 import org.apache.hadoop.io.NullWritable;
56 import org.apache.hadoop.mapred.InputFormat;
57 import org.apache.hadoop.mapred.JobClient;
58 import org.apache.hadoop.mapred.JobConf;
59 import org.apache.hadoop.mapred.JobConfigurable;
60 import org.apache.hadoop.mapred.MiniMRCluster;
61 import org.apache.hadoop.mapred.OutputCollector;
62 import org.apache.hadoop.mapred.Reporter;
63 import org.apache.hadoop.mapred.RunningJob;
64 import org.apache.hadoop.mapred.lib.NullOutputFormat;
65 import org.junit.AfterClass;
66 import org.junit.Before;
67 import org.junit.BeforeClass;
68 import org.junit.Test;
69 import org.junit.experimental.categories.Category;
70 import org.mockito.invocation.InvocationOnMock;
71 import org.mockito.stubbing.Answer;
72
73
74
75
76
77 @Category(LargeTests.class)
78 public class TestTableInputFormat {
79
80 private static final Log LOG = LogFactory.getLog(TestTableInputFormat.class);
81
82 private final static HBaseTestingUtility UTIL = new HBaseTestingUtility();
83 private static MiniMRCluster mrCluster;
84 static final byte[] FAMILY = Bytes.toBytes("family");
85
86 private static final byte[][] columns = new byte[][] { FAMILY };
87
88 @BeforeClass
89 public static void beforeClass() throws Exception {
90 UTIL.startMiniCluster();
91 mrCluster = UTIL.startMiniMapReduceCluster();
92 }
93
94 @AfterClass
95 public static void afterClass() throws Exception {
96 UTIL.shutdownMiniMapReduceCluster();
97 UTIL.shutdownMiniCluster();
98 }
99
100 @Before
101 public void before() throws IOException {
102 LOG.info("before");
103 UTIL.ensureSomeRegionServersAvailable(1);
104 LOG.info("before done");
105 }
106
107
108
109
110
111
112
113
114 public static Table createTable(byte[] tableName) throws IOException {
115 return createTable(tableName, new byte[][] { FAMILY });
116 }
117
118
119
120
121
122
123
124
125 public static Table createTable(byte[] tableName, byte[][] families) throws IOException {
126 Table table = UTIL.createTable(TableName.valueOf(tableName), families);
127 Put p = new Put("aaa".getBytes());
128 for (byte[] family : families) {
129 p.add(family, null, "value aaa".getBytes());
130 }
131 table.put(p);
132 p = new Put("bbb".getBytes());
133 for (byte[] family : families) {
134 p.add(family, null, "value bbb".getBytes());
135 }
136 table.put(p);
137 return table;
138 }
139
140
141
142
143
144
145
146
147
148
149 static boolean checkResult(Result r, ImmutableBytesWritable key,
150 byte[] expectedKey, byte[] expectedValue) {
151 assertEquals(0, key.compareTo(expectedKey));
152 Map<byte[], byte[]> vals = r.getFamilyMap(FAMILY);
153 byte[] value = vals.values().iterator().next();
154 assertTrue(Arrays.equals(value, expectedValue));
155 return true;
156 }
157
158
159
160
161
162
163
164
165 static void runTestMapred(Table table) throws IOException {
166 org.apache.hadoop.hbase.mapred.TableRecordReader trr =
167 new org.apache.hadoop.hbase.mapred.TableRecordReader();
168 trr.setStartRow("aaa".getBytes());
169 trr.setEndRow("zzz".getBytes());
170 trr.setHTable(table);
171 trr.setInputColumns(columns);
172
173 trr.init();
174 Result r = new Result();
175 ImmutableBytesWritable key = new ImmutableBytesWritable();
176
177 boolean more = trr.next(key, r);
178 assertTrue(more);
179 checkResult(r, key, "aaa".getBytes(), "value aaa".getBytes());
180
181 more = trr.next(key, r);
182 assertTrue(more);
183 checkResult(r, key, "bbb".getBytes(), "value bbb".getBytes());
184
185
186 more = trr.next(key, r);
187 assertFalse(more);
188 }
189
190
191
192
193
194
195 static Table createIOEScannerTable(byte[] name, final int failCnt)
196 throws IOException {
197
198 Answer<ResultScanner> a = new Answer<ResultScanner>() {
199 int cnt = 0;
200
201 @Override
202 public ResultScanner answer(InvocationOnMock invocation) throws Throwable {
203
204 if (cnt++ < failCnt) {
205
206 Scan scan = mock(Scan.class);
207 doReturn("bogus".getBytes()).when(scan).getStartRow();
208 ResultScanner scanner = mock(ResultScanner.class);
209
210 doThrow(new IOException("Injected exception")).when(scanner).next();
211 return scanner;
212 }
213
214
215 return (ResultScanner) invocation.callRealMethod();
216 }
217 };
218
219 Table htable = spy(createTable(name));
220 doAnswer(a).when(htable).getScanner((Scan) anyObject());
221 return htable;
222 }
223
224
225
226
227
228
229
230 static Table createDNRIOEScannerTable(byte[] name, final int failCnt)
231 throws IOException {
232
233 Answer<ResultScanner> a = new Answer<ResultScanner>() {
234 int cnt = 0;
235
236 @Override
237 public ResultScanner answer(InvocationOnMock invocation) throws Throwable {
238
239 if (cnt++ < failCnt) {
240
241 Scan scan = mock(Scan.class);
242 doReturn("bogus".getBytes()).when(scan).getStartRow();
243 ResultScanner scanner = mock(ResultScanner.class);
244
245 invocation.callRealMethod();
246 doThrow(
247 new NotServingRegionException("Injected simulated TimeoutException"))
248 .when(scanner).next();
249 return scanner;
250 }
251
252
253 return (ResultScanner) invocation.callRealMethod();
254 }
255 };
256
257 Table htable = spy(createTable(name));
258 doAnswer(a).when(htable).getScanner((Scan) anyObject());
259 return htable;
260 }
261
262
263
264
265
266
267 @Test
268 public void testTableRecordReader() throws IOException {
269 Table table = createTable("table1".getBytes());
270 runTestMapred(table);
271 }
272
273
274
275
276
277
278 @Test
279 public void testTableRecordReaderScannerFail() throws IOException {
280 Table htable = createIOEScannerTable("table2".getBytes(), 1);
281 runTestMapred(htable);
282 }
283
284
285
286
287
288
289 @Test(expected = IOException.class)
290 public void testTableRecordReaderScannerFailTwice() throws IOException {
291 Table htable = createIOEScannerTable("table3".getBytes(), 2);
292 runTestMapred(htable);
293 }
294
295
296
297
298
299
300 @Test
301 public void testTableRecordReaderScannerTimeout() throws IOException {
302 Table htable = createDNRIOEScannerTable("table4".getBytes(), 1);
303 runTestMapred(htable);
304 }
305
306
307
308
309
310
311 @Test(expected = org.apache.hadoop.hbase.NotServingRegionException.class)
312 public void testTableRecordReaderScannerTimeoutTwice() throws IOException {
313 Table htable = createDNRIOEScannerTable("table5".getBytes(), 2);
314 runTestMapred(htable);
315 }
316
317
318
319
320 @Test
321 public void testExtensionOfTableInputFormatBase() throws IOException {
322 LOG.info("testing use of an InputFormat taht extends InputFormatBase");
323 final Table table = createTable(Bytes.toBytes("exampleTable"),
324 new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") });
325 testInputFormat(ExampleTIF.class);
326 }
327
328 @Test
329 public void testDeprecatedExtensionOfTableInputFormatBase() throws IOException {
330 LOG.info("testing use of an InputFormat taht extends InputFormatBase, "
331 + "as it was given in 0.98.");
332 final Table table = createTable(Bytes.toBytes("exampleDeprecatedTable"),
333 new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") });
334 testInputFormat(ExampleDeprecatedTIF.class);
335 }
336
337 @Test
338 public void testJobConfigurableExtensionOfTableInputFormatBase() throws IOException {
339 LOG.info("testing use of an InputFormat taht extends InputFormatBase, "
340 + "using JobConfigurable.");
341 final Table table = createTable(Bytes.toBytes("exampleJobConfigurableTable"),
342 new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") });
343 testInputFormat(ExampleJobConfigurableTIF.class);
344 }
345
346 void testInputFormat(Class<? extends InputFormat> clazz) throws IOException {
347 final JobConf job = MapreduceTestingShim.getJobConf(mrCluster);
348 job.setInputFormat(clazz);
349 job.setOutputFormat(NullOutputFormat.class);
350 job.setMapperClass(ExampleVerifier.class);
351 job.setNumReduceTasks(0);
352 LOG.debug("submitting job.");
353 final RunningJob run = JobClient.runJob(job);
354 assertTrue("job failed!", run.isSuccessful());
355 assertEquals("Saw the wrong number of instances of the filtered-for row.", 2, run.getCounters()
356 .findCounter(TestTableInputFormat.class.getName() + ":row", "aaa").getCounter());
357 assertEquals("Saw any instances of the filtered out row.", 0, run.getCounters()
358 .findCounter(TestTableInputFormat.class.getName() + ":row", "bbb").getCounter());
359 assertEquals("Saw the wrong number of instances of columnA.", 1, run.getCounters()
360 .findCounter(TestTableInputFormat.class.getName() + ":family", "columnA").getCounter());
361 assertEquals("Saw the wrong number of instances of columnB.", 1, run.getCounters()
362 .findCounter(TestTableInputFormat.class.getName() + ":family", "columnB").getCounter());
363 assertEquals("Saw the wrong count of values for the filtered-for row.", 2, run.getCounters()
364 .findCounter(TestTableInputFormat.class.getName() + ":value", "value aaa").getCounter());
365 assertEquals("Saw the wrong count of values for the filtered-out row.", 0, run.getCounters()
366 .findCounter(TestTableInputFormat.class.getName() + ":value", "value bbb").getCounter());
367 }
368
369 public static class ExampleVerifier implements TableMap<NullWritable, NullWritable> {
370
371 @Override
372 public void configure(JobConf conf) {
373 }
374
375 @Override
376 public void map(ImmutableBytesWritable key, Result value,
377 OutputCollector<NullWritable,NullWritable> output,
378 Reporter reporter) throws IOException {
379 for (Cell cell : value.listCells()) {
380 reporter.getCounter(TestTableInputFormat.class.getName() + ":row",
381 Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()))
382 .increment(1l);
383 reporter.getCounter(TestTableInputFormat.class.getName() + ":family",
384 Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()))
385 .increment(1l);
386 reporter.getCounter(TestTableInputFormat.class.getName() + ":value",
387 Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()))
388 .increment(1l);
389 }
390 }
391
392 @Override
393 public void close() {
394 }
395
396 }
397
398 public static class ExampleDeprecatedTIF extends TableInputFormatBase implements JobConfigurable {
399
400 @Override
401 public void configure(JobConf job) {
402 try {
403 HTable exampleTable = new HTable(HBaseConfiguration.create(job),
404 Bytes.toBytes("exampleDeprecatedTable"));
405
406 setHTable(exampleTable);
407 byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"),
408 Bytes.toBytes("columnB") };
409
410 setInputColumns(inputColumns);
411 Filter exampleFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*"));
412
413 setRowFilter(exampleFilter);
414 } catch (IOException exception) {
415 throw new RuntimeException("Failed to configure for job.", exception);
416 }
417 }
418
419 }
420
421 public static class ExampleJobConfigurableTIF extends ExampleTIF implements JobConfigurable {
422
423 @Override
424 public void configure(JobConf job) {
425 try {
426 initialize(job);
427 } catch (IOException exception) {
428 throw new RuntimeException("Failed to initialize.", exception);
429 }
430 }
431
432 @Override
433 protected void initialize(JobConf job) throws IOException {
434 initialize(job, "exampleJobConfigurableTable");
435 }
436 }
437
438
439 public static class ExampleTIF extends TableInputFormatBase {
440
441 @Override
442 protected void initialize(JobConf job) throws IOException {
443 initialize(job, "exampleTable");
444 }
445
446 protected void initialize(JobConf job, String table) throws IOException {
447 Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create(job));
448 TableName tableName = TableName.valueOf(table);
449
450 initializeTable(connection, tableName);
451 byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"),
452 Bytes.toBytes("columnB") };
453
454 setInputColumns(inputColumns);
455 Filter exampleFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*"));
456
457 setRowFilter(exampleFilter);
458 }
459
460 }
461
462 }
463