View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.mapred;
20  
21  import static org.junit.Assert.assertEquals;
22  import static org.junit.Assert.assertTrue;
23  
24  import java.io.File;
25  import java.io.IOException;
26  import java.util.ArrayList;
27  import java.util.Iterator;
28  import java.util.List;
29  import java.util.Set;
30  
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  import org.apache.hadoop.conf.Configuration;
34  import org.apache.hadoop.fs.FileUtil;
35  import org.apache.hadoop.hbase.HBaseTestingUtility;
36  import org.apache.hadoop.hbase.testclassification.LargeTests;
37  import org.apache.hadoop.hbase.TableName;
38  import org.apache.hadoop.hbase.client.Put;
39  import org.apache.hadoop.hbase.client.Result;
40  import org.apache.hadoop.hbase.client.Table;
41  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
42  import org.apache.hadoop.hbase.util.Bytes;
43  import org.apache.hadoop.mapred.JobClient;
44  import org.apache.hadoop.mapred.JobConf;
45  import org.apache.hadoop.mapred.MapReduceBase;
46  import org.apache.hadoop.mapred.OutputCollector;
47  import org.apache.hadoop.mapred.Reporter;
48  import org.apache.hadoop.mapred.RunningJob;
49  import org.junit.AfterClass;
50  import org.junit.Assert;
51  import org.junit.Before;
52  import org.junit.BeforeClass;
53  import org.junit.Test;
54  import org.junit.experimental.categories.Category;
55  
56  import com.google.common.collect.ImmutableMap;
57  import com.google.common.collect.ImmutableSet;
58  
59  @Category(LargeTests.class)
60  public class TestTableMapReduceUtil {
61  
62    private static final Log LOG = LogFactory
63        .getLog(TestTableMapReduceUtil.class);
64  
65    private static Table presidentsTable;
66    private static final String TABLE_NAME = "People";
67  
68    private static final byte[] COLUMN_FAMILY = Bytes.toBytes("info");
69    private static final byte[] COLUMN_QUALIFIER = Bytes.toBytes("name");
70  
71    private static ImmutableSet<String> presidentsRowKeys = ImmutableSet.of(
72        "president1", "president2", "president3");
73    private static Iterator<String> presidentNames = ImmutableSet.of(
74        "John F. Kennedy", "George W. Bush", "Barack Obama").iterator();
75  
76    private static ImmutableSet<String> actorsRowKeys = ImmutableSet.of("actor1",
77        "actor2");
78    private static Iterator<String> actorNames = ImmutableSet.of(
79        "Jack Nicholson", "Martin Freeman").iterator();
80  
81    private static String PRESIDENT_PATTERN = "president";
82    private static String ACTOR_PATTERN = "actor";
83    private static ImmutableMap<String, ImmutableSet<String>> relation = ImmutableMap
84        .of(PRESIDENT_PATTERN, presidentsRowKeys, ACTOR_PATTERN, actorsRowKeys);
85  
86    private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
87  
88    @BeforeClass
89    public static void beforeClass() throws Exception {
90      UTIL.startMiniCluster();
91      presidentsTable = createAndFillTable(TableName.valueOf(TABLE_NAME));
92      UTIL.startMiniMapReduceCluster();
93    }
94  
95    @AfterClass
96    public static void afterClass() throws Exception {
97      UTIL.shutdownMiniMapReduceCluster();
98      UTIL.shutdownMiniCluster();
99    }
100 
101   @Before
102   public void before() throws IOException {
103     LOG.info("before");
104     UTIL.ensureSomeRegionServersAvailable(1);
105     LOG.info("before done");
106   }
107 
108   public static Table createAndFillTable(TableName tableName) throws IOException {
109     Table table = UTIL.createTable(tableName, COLUMN_FAMILY);
110     createPutCommand(table);
111     return table;
112   }
113 
114   private static void createPutCommand(Table table) throws IOException {
115     for (String president : presidentsRowKeys) {
116       if (presidentNames.hasNext()) {
117         Put p = new Put(Bytes.toBytes(president));
118         p.add(COLUMN_FAMILY, COLUMN_QUALIFIER,
119             Bytes.toBytes(presidentNames.next()));
120         table.put(p);
121       }
122     }
123 
124     for (String actor : actorsRowKeys) {
125       if (actorNames.hasNext()) {
126         Put p = new Put(Bytes.toBytes(actor));
127         p.add(COLUMN_FAMILY, COLUMN_QUALIFIER, Bytes.toBytes(actorNames.next()));
128         table.put(p);
129       }
130     }
131   }
132 
133   /**
134    * Check what the given number of reduce tasks for the given job configuration
135    * does not exceed the number of regions for the given table.
136    */
137   @Test
138   public void shouldNumberOfReduceTaskNotExceedNumberOfRegionsForGivenTable()
139       throws IOException {
140     Assert.assertNotNull(presidentsTable);
141     Configuration cfg = UTIL.getConfiguration();
142     JobConf jobConf = new JobConf(cfg);
143     TableMapReduceUtil.setNumReduceTasks(TABLE_NAME, jobConf);
144     TableMapReduceUtil.limitNumReduceTasks(TABLE_NAME, jobConf);
145     TableMapReduceUtil.setScannerCaching(jobConf, 100);
146     assertEquals(1, jobConf.getNumReduceTasks());
147     assertEquals(100, jobConf.getInt("hbase.client.scanner.caching", 0));
148 
149     jobConf.setNumReduceTasks(10);
150     TableMapReduceUtil.setNumMapTasks(TABLE_NAME, jobConf);
151     TableMapReduceUtil.limitNumReduceTasks(TABLE_NAME, jobConf);
152     assertEquals(1, jobConf.getNumReduceTasks());
153   }
154 
155   @Test
156   public void shouldNumberOfMapTaskNotExceedNumberOfRegionsForGivenTable()
157       throws IOException {
158     Configuration cfg = UTIL.getConfiguration();
159     JobConf jobConf = new JobConf(cfg);
160     TableMapReduceUtil.setNumReduceTasks(TABLE_NAME, jobConf);
161     TableMapReduceUtil.limitNumMapTasks(TABLE_NAME, jobConf);
162     assertEquals(1, jobConf.getNumMapTasks());
163 
164     jobConf.setNumMapTasks(10);
165     TableMapReduceUtil.setNumMapTasks(TABLE_NAME, jobConf);
166     TableMapReduceUtil.limitNumMapTasks(TABLE_NAME, jobConf);
167     assertEquals(1, jobConf.getNumMapTasks());
168   }
169 
170   @Test
171   @SuppressWarnings("deprecation")
172   public void shoudBeValidMapReduceEvaluation() throws Exception {
173     Configuration cfg = UTIL.getConfiguration();
174     JobConf jobConf = new JobConf(cfg);
175     try {
176       jobConf.setJobName("process row task");
177       jobConf.setNumReduceTasks(1);
178       TableMapReduceUtil.initTableMapJob(TABLE_NAME, new String(COLUMN_FAMILY),
179           ClassificatorMapper.class, ImmutableBytesWritable.class, Put.class,
180           jobConf);
181       TableMapReduceUtil.initTableReduceJob(TABLE_NAME,
182           ClassificatorRowReduce.class, jobConf);
183       RunningJob job = JobClient.runJob(jobConf);
184       assertTrue(job.isSuccessful());
185     } finally {
186       if (jobConf != null)
187         FileUtil.fullyDelete(new File(jobConf.get("hadoop.tmp.dir")));
188     }
189   }
190 
191   @Test
192   @SuppressWarnings("deprecation")
193   public void shoudBeValidMapReduceWithPartitionerEvaluation()
194       throws IOException {
195     Configuration cfg = UTIL.getConfiguration();
196     JobConf jobConf = new JobConf(cfg);
197     try {
198       jobConf.setJobName("process row task");
199       jobConf.setNumReduceTasks(2);
200       TableMapReduceUtil.initTableMapJob(TABLE_NAME, new String(COLUMN_FAMILY),
201           ClassificatorMapper.class, ImmutableBytesWritable.class, Put.class,
202           jobConf);
203 
204       TableMapReduceUtil.initTableReduceJob(TABLE_NAME,
205           ClassificatorRowReduce.class, jobConf, HRegionPartitioner.class);
206       RunningJob job = JobClient.runJob(jobConf);
207       assertTrue(job.isSuccessful());
208     } finally {
209       if (jobConf != null)
210         FileUtil.fullyDelete(new File(jobConf.get("hadoop.tmp.dir")));
211     }
212   }
213 
214   @SuppressWarnings("deprecation")
215   static class ClassificatorRowReduce extends MapReduceBase implements
216       TableReduce<ImmutableBytesWritable, Put> {
217 
218     @Override
219     public void reduce(ImmutableBytesWritable key, Iterator<Put> values,
220         OutputCollector<ImmutableBytesWritable, Put> output, Reporter reporter)
221         throws IOException {
222       String strKey = Bytes.toString(key.get());
223       List<Put> result = new ArrayList<Put>();
224       while (values.hasNext())
225         result.add(values.next());
226 
227       if (relation.keySet().contains(strKey)) {
228         Set<String> set = relation.get(strKey);
229         if (set != null) {
230           assertEquals(set.size(), result.size());
231         } else {
232           throwAccertionError("Test infrastructure error: set is null");
233         }
234       } else {
235         throwAccertionError("Test infrastructure error: key not found in map");
236       }
237     }
238 
239     private void throwAccertionError(String errorMessage) throws AssertionError {
240       throw new AssertionError(errorMessage);
241     }
242   }
243 
244   @SuppressWarnings("deprecation")
245   static class ClassificatorMapper extends MapReduceBase implements
246       TableMap<ImmutableBytesWritable, Put> {
247 
248     @Override
249     public void map(ImmutableBytesWritable row, Result result,
250         OutputCollector<ImmutableBytesWritable, Put> outCollector,
251         Reporter reporter) throws IOException {
252       String rowKey = Bytes.toString(result.getRow());
253       final ImmutableBytesWritable pKey = new ImmutableBytesWritable(
254           Bytes.toBytes(PRESIDENT_PATTERN));
255       final ImmutableBytesWritable aKey = new ImmutableBytesWritable(
256           Bytes.toBytes(ACTOR_PATTERN));
257       ImmutableBytesWritable outKey = null;
258 
259       if (rowKey.startsWith(PRESIDENT_PATTERN)) {
260         outKey = pKey;
261       } else if (rowKey.startsWith(ACTOR_PATTERN)) {
262         outKey = aKey;
263       } else {
264         throw new AssertionError("unexpected rowKey");
265       }
266 
267       String name = Bytes.toString(result.getValue(COLUMN_FAMILY,
268           COLUMN_QUALIFIER));
269       outCollector.collect(outKey, new Put(Bytes.toBytes("rowKey2")).add(
270           COLUMN_FAMILY, COLUMN_QUALIFIER, Bytes.toBytes(name)));
271     }
272   }
273 }