1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.mapreduce;
19
20 import java.io.IOException;
21 import java.lang.reflect.Constructor;
22 import java.lang.reflect.InvocationTargetException;
23 import java.lang.reflect.Method;
24
25 import org.apache.hadoop.conf.Configuration;
26 import org.apache.hadoop.mapred.JobConf;
27 import org.apache.hadoop.mapred.MiniMRCluster;
28 import org.apache.hadoop.mapreduce.Job;
29 import org.apache.hadoop.mapreduce.JobContext;
30 import org.apache.hadoop.mapreduce.JobID;
31
32
33
34
35
36
37
38 abstract public class MapreduceTestingShim {
39 private static MapreduceTestingShim instance;
40 private static Class[] emptyParam = new Class[] {};
41
42 static {
43 try {
44
45 Class c = Class
46 .forName("org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl");
47 instance = new MapreduceV2Shim();
48 } catch (Exception e) {
49 instance = new MapreduceV1Shim();
50 }
51 }
52
53 abstract public JobContext newJobContext(Configuration jobConf)
54 throws IOException;
55
56 abstract public Job newJob(Configuration conf) throws IOException;
57
58 abstract public JobConf obtainJobConf(MiniMRCluster cluster);
59
60 abstract public String obtainMROutputDirProp();
61
62 public static JobContext createJobContext(Configuration jobConf)
63 throws IOException {
64 return instance.newJobContext(jobConf);
65 }
66
67 public static JobConf getJobConf(MiniMRCluster cluster) {
68 return instance.obtainJobConf(cluster);
69 }
70
71 public static Job createJob(Configuration conf) throws IOException {
72 return instance.newJob(conf);
73 }
74
75 public static String getMROutputDirProp() {
76 return instance.obtainMROutputDirProp();
77 }
78
79 private static class MapreduceV1Shim extends MapreduceTestingShim {
80 public JobContext newJobContext(Configuration jobConf) throws IOException {
81
82
83 JobID jobId = new JobID();
84 Constructor<JobContext> c;
85 try {
86 c = JobContext.class.getConstructor(Configuration.class, JobID.class);
87 return c.newInstance(jobConf, jobId);
88 } catch (Exception e) {
89 throw new IllegalStateException(
90 "Failed to instantiate new JobContext(jobConf, new JobID())", e);
91 }
92 }
93
94 @Override
95 public Job newJob(Configuration conf) throws IOException {
96
97
98 Constructor<Job> c;
99 try {
100 c = Job.class.getConstructor(Configuration.class);
101 return c.newInstance(conf);
102 } catch (Exception e) {
103 throw new IllegalStateException(
104 "Failed to instantiate new Job(conf)", e);
105 }
106 }
107
108 public JobConf obtainJobConf(MiniMRCluster cluster) {
109 if (cluster == null) return null;
110 try {
111 Object runner = cluster.getJobTrackerRunner();
112 Method meth = runner.getClass().getDeclaredMethod("getJobTracker", emptyParam);
113 Object tracker = meth.invoke(runner, new Object []{});
114 Method m = tracker.getClass().getDeclaredMethod("getConf", emptyParam);
115 return (JobConf) m.invoke(tracker, new Object []{});
116 } catch (NoSuchMethodException nsme) {
117 return null;
118 } catch (InvocationTargetException ite) {
119 return null;
120 } catch (IllegalAccessException iae) {
121 return null;
122 }
123 }
124
125 @Override
126 public String obtainMROutputDirProp() {
127 return "mapred.output.dir";
128 }
129 };
130
131 private static class MapreduceV2Shim extends MapreduceTestingShim {
132 public JobContext newJobContext(Configuration jobConf) {
133 return newJob(jobConf);
134 }
135
136 @Override
137 public Job newJob(Configuration jobConf) {
138
139
140 try {
141 Method m = Job.class.getMethod("getInstance", Configuration.class);
142 return (Job) m.invoke(null, jobConf);
143 } catch (Exception e) {
144 e.printStackTrace();
145 throw new IllegalStateException(
146 "Failed to return from Job.getInstance(jobConf)");
147 }
148 }
149
150 public JobConf obtainJobConf(MiniMRCluster cluster) {
151 try {
152 Method meth = MiniMRCluster.class.getMethod("getJobTrackerConf", emptyParam);
153 return (JobConf) meth.invoke(cluster, new Object []{});
154 } catch (NoSuchMethodException nsme) {
155 return null;
156 } catch (InvocationTargetException ite) {
157 return null;
158 } catch (IllegalAccessException iae) {
159 return null;
160 }
161 }
162
163 @Override
164 public String obtainMROutputDirProp() {
165
166
167 return "mapreduce.output.fileoutputformat.outputdir";
168 }
169 };
170
171 }