View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.mapreduce;
20  
21  import static org.junit.Assert.assertFalse;
22  import static org.junit.Assert.assertTrue;
23  import static org.junit.Assert.fail;
24  
25  import java.io.File;
26  import java.io.IOException;
27  import java.util.Map;
28  import java.util.NavigableMap;
29  
30  import org.apache.commons.logging.Log;
31  import org.apache.commons.logging.LogFactory;
32  import org.apache.hadoop.fs.FileUtil;
33  import org.apache.hadoop.fs.Path;
34  import org.apache.hadoop.hbase.testclassification.LargeTests;
35  import org.apache.hadoop.hbase.client.HTable;
36  import org.apache.hadoop.hbase.client.Put;
37  import org.apache.hadoop.hbase.client.Result;
38  import org.apache.hadoop.hbase.client.Scan;
39  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
40  import org.apache.hadoop.hbase.util.Bytes;
41  import org.apache.hadoop.mapreduce.Job;
42  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
43  import org.junit.experimental.categories.Category;
44  
45  /**
46   * Test Map/Reduce job over HBase tables. The map/reduce process we're testing
47   * on our tables is simple - take every row in the table, reverse the value of
48   * a particular cell, and write it back to the table.
49   */
50  @Category(LargeTests.class)
51  public class TestTableMapReduce extends TestTableMapReduceBase {
52    private static final Log LOG = LogFactory.getLog(TestTableMapReduce.class);
53  
54    protected Log getLog() { return LOG; }
55  
56    /**
57     * Pass the given key and processed record reduce
58     */
59    static class ProcessContentsMapper extends TableMapper<ImmutableBytesWritable, Put> {
60  
61      /**
62       * Pass the key, and reversed value to reduce
63       *
64       * @param key
65       * @param value
66       * @param context
67       * @throws IOException
68       */
69      public void map(ImmutableBytesWritable key, Result value,
70        Context context)
71      throws IOException, InterruptedException {
72        if (value.size() != 1) {
73          throw new IOException("There should only be one input column");
74        }
75        Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>
76          cf = value.getMap();
77        if(!cf.containsKey(INPUT_FAMILY)) {
78          throw new IOException("Wrong input columns. Missing: '" +
79            Bytes.toString(INPUT_FAMILY) + "'.");
80        }
81  
82        // Get the original value and reverse it
83        String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, null));
84        StringBuilder newValue = new StringBuilder(originalValue);
85        newValue.reverse();
86        // Now set the value to be collected
87        Put outval = new Put(key.get());
88        outval.add(OUTPUT_FAMILY, null, Bytes.toBytes(newValue.toString()));
89        context.write(key, outval);
90      }
91    }
92  
93    protected void runTestOnTable(HTable table) throws IOException {
94      Job job = null;
95      try {
96        LOG.info("Before map/reduce startup");
97        job = new Job(table.getConfiguration(), "process column contents");
98        job.setNumReduceTasks(1);
99        Scan scan = new Scan();
100       scan.addFamily(INPUT_FAMILY);
101       TableMapReduceUtil.initTableMapperJob(
102         Bytes.toString(table.getTableName()), scan,
103         ProcessContentsMapper.class, ImmutableBytesWritable.class,
104         Put.class, job);
105       TableMapReduceUtil.initTableReducerJob(
106         Bytes.toString(table.getTableName()),
107         IdentityTableReducer.class, job);
108       FileOutputFormat.setOutputPath(job, new Path("test"));
109       LOG.info("Started " + Bytes.toString(table.getTableName()));
110       assertTrue(job.waitForCompletion(true));
111       LOG.info("After map/reduce completion");
112 
113       // verify map-reduce results
114       verify(table.getName());
115     } catch (InterruptedException e) {
116       throw new IOException(e);
117     } catch (ClassNotFoundException e) {
118       throw new IOException(e);
119     } finally {
120       table.close();
121       if (job != null) {
122         FileUtil.fullyDelete(
123           new File(job.getConfiguration().get("hadoop.tmp.dir")));
124       }
125     }
126   }
127 }