/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import static org.junit.Assert.assertTrue;

import java.io.File;
import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TestTableMapReduceBase;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;
import org.junit.experimental.categories.Category;
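
/**
 * Test Map/Reduce job over HBase tables. The map/reduce process we're testing
 * on our tables is simple - take every row in the table, reverse the value of
 * a particular cell, and write it back to the table.
 */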
@Category(LargeTests.class)
@SuppressWarnings("deprecation")
public class TestTableMapReduce extends TestTableMapReduceBase {
  private static final Log LOG =
    LogFactory.getLog(TestTableMapReduce.class.getName());

  protected Log getLog() { return LOG; }
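
  /**
   * Pass the given key and the processed record on to reduce.
   */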
  static class ProcessContentsMapper extends MapReduceBase implements
      TableMap<ImmutableBytesWritable, Put> {
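    /**
     * Pass the key, and the value processed by the shared base-class map, to
     * reduce.
     */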
    public void map(ImmutableBytesWritable key, Result value,
        OutputCollector<ImmutableBytesWritable, Put> output,
        Reporter reporter) throws IOException {
      output.collect(key, TestTableMapReduceBase.map(key, value));
    }
  }

  @Override
  protected void runTestOnTable(HTable table) throws IOException {
    JobConf jobConf = null;
    try {
      LOG.info("Before map/reduce startup");
      jobConf = new JobConf(UTIL.getConfiguration(), TestTableMapReduce.class);
      jobConf.setJobName("process column contents");
      jobConf.setNumReduceTasks(1);
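      // Scan the test table, running ProcessContentsMapper over each row.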
      TableMapReduceUtil.initTableMapJob(table.getName().getNameAsString(),
        Bytes.toString(INPUT_FAMILY), ProcessContentsMapper.class,
        ImmutableBytesWritable.class, Put.class, jobConf);
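      // Write the mapper's Puts back into the same table through the
      // identity table reducer.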
      TableMapReduceUtil.initTableReduceJob(table.getName().getNameAsString(),
        IdentityTableReduce.class, jobConf);

      LOG.info("Started " + table.getName());
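      // Submit the job and block until it completes.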
      RunningJob job = JobClient.runJob(jobConf);
      assertTrue(job.isSuccessful());
      LOG.info("After map/reduce completion");
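
      // verify map-reduce results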
      verify(table.getName());
    } finally {
      if (jobConf != null) {
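        // Clean up the job's local temporary directory.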
        FileUtil.fullyDelete(new File(jobConf.get("hadoop.tmp.dir")));
      }
    }
  }
}