View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase;
20  
21  import java.io.IOException;
22  import java.util.concurrent.TimeUnit;
23  
24  import org.apache.commons.cli.CommandLine;
25  import org.apache.hadoop.conf.Configuration;
26  import org.apache.hadoop.fs.FSDataInputStream;
27  import org.apache.hadoop.fs.FileSystem;
28  import org.apache.hadoop.fs.Path;
29  import org.apache.hadoop.hbase.classification.InterfaceAudience;
30  import org.apache.hadoop.hbase.client.HTable;
31  import org.apache.hadoop.hbase.client.Connection;
32  import org.apache.hadoop.hbase.client.ConnectionFactory;
33  import org.apache.hadoop.hbase.client.Result;
34  import org.apache.hadoop.hbase.client.ResultScanner;
35  import org.apache.hadoop.hbase.client.Scan;
36  import org.apache.hadoop.hbase.client.Table;
37  import org.apache.hadoop.hbase.client.TableSnapshotScanner;
38  import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
39  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
40  import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
41  import org.apache.hadoop.hbase.mapreduce.TableMapper;
42  import org.apache.hadoop.hbase.util.AbstractHBaseTool;
43  import org.apache.hadoop.hbase.util.FSUtils;
44  import org.apache.hadoop.io.NullWritable;
45  import org.apache.hadoop.mapreduce.Counters;
46  import org.apache.hadoop.mapreduce.Job;
47  import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
48  import org.apache.hadoop.util.StringUtils;
49  import org.apache.hadoop.util.ToolRunner;
50  
51  import com.google.common.base.Stopwatch;
52  
53  /**
54   * A simple performance evaluation tool for single client and MR scans
55   * and snapshot scans.
56   */
57  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
58  public class ScanPerformanceEvaluation extends AbstractHBaseTool {
59  
60    private static final String HBASE_COUNTER_GROUP_NAME = "HBase Counters";
61  
62    private String type;
63    private String file;
64    private String tablename;
65    private String snapshotName;
66    private String restoreDir;
67    private String caching;
68  
69    @Override
70    public void setConf(Configuration conf) {
71      super.setConf(conf);
72      Path rootDir;
73      try {
74        rootDir = FSUtils.getRootDir(conf);
75        rootDir.getFileSystem(conf);
76      } catch (IOException ex) {
77        throw new RuntimeException(ex);
78      }
79    }
80  
81    @Override
82    protected void addOptions() {
83      this.addRequiredOptWithArg("t", "type", "the type of the test. One of the following: streaming|scan|snapshotscan|scanmapreduce|snapshotscanmapreduce");
84      this.addOptWithArg("f", "file", "the filename to read from");
85      this.addOptWithArg("tn", "table", "the tablename to read from");
86      this.addOptWithArg("sn", "snapshot", "the snapshot name to read from");
87      this.addOptWithArg("rs", "restoredir", "the directory to restore the snapshot");
88      this.addOptWithArg("ch", "caching", "scanner caching value");
89    }
90  
91    @Override
92    protected void processOptions(CommandLine cmd) {
93      type = cmd.getOptionValue("type");
94      file = cmd.getOptionValue("file");
95      tablename = cmd.getOptionValue("table");
96      snapshotName = cmd.getOptionValue("snapshot");
97      restoreDir = cmd.getOptionValue("restoredir");
98      caching = cmd.getOptionValue("caching");
99    }
100 
101   protected void testHdfsStreaming(Path filename) throws IOException {
102     byte[] buf = new byte[1024];
103     FileSystem fs = filename.getFileSystem(getConf());
104 
105     // read the file from start to finish
106     Stopwatch fileOpenTimer = new Stopwatch();
107     Stopwatch streamTimer = new Stopwatch();
108 
109     fileOpenTimer.start();
110     FSDataInputStream in = fs.open(filename);
111     fileOpenTimer.stop();
112 
113     long totalBytes = 0;
114     streamTimer.start();
115     while (true) {
116       int read = in.read(buf);
117       if (read < 0) {
118         break;
119       }
120       totalBytes += read;
121     }
122     streamTimer.stop();
123 
124     double throughput = (double)totalBytes / streamTimer.elapsedTime(TimeUnit.SECONDS);
125 
126     System.out.println("HDFS streaming: ");
127     System.out.println("total time to open: " + fileOpenTimer.elapsedMillis() + " ms");
128     System.out.println("total time to read: " + streamTimer.elapsedMillis() + " ms");
129     System.out.println("total bytes: " + totalBytes + " bytes ("
130         + StringUtils.humanReadableInt(totalBytes) + ")");
131     System.out.println("throghput  : " + StringUtils.humanReadableInt((long)throughput) + "B/s");
132   }
133 
134   private Scan getScan() {
135     Scan scan = new Scan(); // default scan settings
136     scan.setCacheBlocks(false);
137     scan.setMaxVersions(1);
138     scan.setScanMetricsEnabled(true);
139     if (caching != null) {
140       scan.setCaching(Integer.parseInt(caching));
141     }
142 
143     return scan;
144   }
145 
146   public void testScan() throws IOException {
147     Stopwatch tableOpenTimer = new Stopwatch();
148     Stopwatch scanOpenTimer = new Stopwatch();
149     Stopwatch scanTimer = new Stopwatch();
150 
151     tableOpenTimer.start();
152     Table table = new HTable(getConf(), TableName.valueOf(tablename));
153     tableOpenTimer.stop();
154 
155     Scan scan = getScan();
156     scanOpenTimer.start();
157     ResultScanner scanner = table.getScanner(scan);
158     scanOpenTimer.stop();
159 
160     long numRows = 0;
161     long numCells = 0;
162     scanTimer.start();
163     while (true) {
164       Result result = scanner.next();
165       if (result == null) {
166         break;
167       }
168       numRows++;
169 
170       numCells += result.rawCells().length;
171     }
172     scanTimer.stop();
173     scanner.close();
174     table.close();
175 
176     ScanMetrics metrics = scan.getScanMetrics();
177     long totalBytes = metrics.countOfBytesInResults.get();
178     double throughput = (double)totalBytes / scanTimer.elapsedTime(TimeUnit.SECONDS);
179     double throughputRows = (double)numRows / scanTimer.elapsedTime(TimeUnit.SECONDS);
180     double throughputCells = (double)numCells / scanTimer.elapsedTime(TimeUnit.SECONDS);
181 
182     System.out.println("HBase scan: ");
183     System.out.println("total time to open table: " + tableOpenTimer.elapsedMillis() + " ms");
184     System.out.println("total time to open scanner: " + scanOpenTimer.elapsedMillis() + " ms");
185     System.out.println("total time to scan: " + scanTimer.elapsedMillis() + " ms");
186 
187     System.out.println("Scan metrics:\n" + metrics.getMetricsMap());
188 
189     System.out.println("total bytes: " + totalBytes + " bytes ("
190         + StringUtils.humanReadableInt(totalBytes) + ")");
191     System.out.println("throughput  : " + StringUtils.humanReadableInt((long)throughput) + "B/s");
192     System.out.println("total rows  : " + numRows);
193     System.out.println("throughput  : " + StringUtils.humanReadableInt((long)throughputRows) + " rows/s");
194     System.out.println("total cells : " + numCells);
195     System.out.println("throughput  : " + StringUtils.humanReadableInt((long)throughputCells) + " cells/s");
196   }
197 
198 
199   public void testSnapshotScan() throws IOException {
200     Stopwatch snapshotRestoreTimer = new Stopwatch();
201     Stopwatch scanOpenTimer = new Stopwatch();
202     Stopwatch scanTimer = new Stopwatch();
203 
204     Path restoreDir = new Path(this.restoreDir);
205 
206     snapshotRestoreTimer.start();
207     restoreDir.getFileSystem(conf).delete(restoreDir, true);
208     snapshotRestoreTimer.stop();
209 
210     Scan scan = getScan();
211     scanOpenTimer.start();
212     TableSnapshotScanner scanner = new TableSnapshotScanner(conf, restoreDir, snapshotName, scan);
213     scanOpenTimer.stop();
214 
215     long numRows = 0;
216     long numCells = 0;
217     scanTimer.start();
218     while (true) {
219       Result result = scanner.next();
220       if (result == null) {
221         break;
222       }
223       numRows++;
224 
225       numCells += result.rawCells().length;
226     }
227     scanTimer.stop();
228     scanner.close();
229 
230     ScanMetrics metrics = scanner.getScanMetrics();
231     long totalBytes = metrics.countOfBytesInResults.get();
232     double throughput = (double)totalBytes / scanTimer.elapsedTime(TimeUnit.SECONDS);
233     double throughputRows = (double)numRows / scanTimer.elapsedTime(TimeUnit.SECONDS);
234     double throughputCells = (double)numCells / scanTimer.elapsedTime(TimeUnit.SECONDS);
235 
236     System.out.println("HBase scan snapshot: ");
237     System.out.println("total time to restore snapshot: " + snapshotRestoreTimer.elapsedMillis() + " ms");
238     System.out.println("total time to open scanner: " + scanOpenTimer.elapsedMillis() + " ms");
239     System.out.println("total time to scan: " + scanTimer.elapsedMillis() + " ms");
240 
241     System.out.println("Scan metrics:\n" + metrics.getMetricsMap());
242 
243     System.out.println("total bytes: " + totalBytes + " bytes ("
244         + StringUtils.humanReadableInt(totalBytes) + ")");
245     System.out.println("throughput  : " + StringUtils.humanReadableInt((long)throughput) + "B/s");
246     System.out.println("total rows  : " + numRows);
247     System.out.println("throughput  : " + StringUtils.humanReadableInt((long)throughputRows) + " rows/s");
248     System.out.println("total cells : " + numCells);
249     System.out.println("throughput  : " + StringUtils.humanReadableInt((long)throughputCells) + " cells/s");
250 
251   }
252 
253   public static enum ScanCounter {
254     NUM_ROWS,
255     NUM_CELLS,
256   }
257 
258   public static class MyMapper<KEYOUT, VALUEOUT> extends TableMapper<KEYOUT, VALUEOUT> {
259     @Override
260     protected void map(ImmutableBytesWritable key, Result value,
261         Context context) throws IOException,
262         InterruptedException {
263       context.getCounter(ScanCounter.NUM_ROWS).increment(1);
264       context.getCounter(ScanCounter.NUM_CELLS).increment(value.rawCells().length);
265     }
266   }
267 
268   public void testScanMapReduce() throws IOException, InterruptedException, ClassNotFoundException {
269     Stopwatch scanOpenTimer = new Stopwatch();
270     Stopwatch scanTimer = new Stopwatch();
271 
272     Scan scan = getScan();
273 
274     String jobName = "testScanMapReduce";
275 
276     Job job = new Job(conf);
277     job.setJobName(jobName);
278 
279     job.setJarByClass(getClass());
280 
281     TableMapReduceUtil.initTableMapperJob(
282         this.tablename,
283         scan,
284         MyMapper.class,
285         NullWritable.class,
286         NullWritable.class,
287         job
288     );
289 
290     job.setNumReduceTasks(0);
291     job.setOutputKeyClass(NullWritable.class);
292     job.setOutputValueClass(NullWritable.class);
293     job.setOutputFormatClass(NullOutputFormat.class);
294 
295     scanTimer.start();
296     job.waitForCompletion(true);
297     scanTimer.stop();
298 
299     Counters counters = job.getCounters();
300     long numRows = counters.findCounter(ScanCounter.NUM_ROWS).getValue();
301     long numCells = counters.findCounter(ScanCounter.NUM_CELLS).getValue();
302 
303     long totalBytes = counters.findCounter(HBASE_COUNTER_GROUP_NAME, "BYTES_IN_RESULTS").getValue();
304     double throughput = (double)totalBytes / scanTimer.elapsedTime(TimeUnit.SECONDS);
305     double throughputRows = (double)numRows / scanTimer.elapsedTime(TimeUnit.SECONDS);
306     double throughputCells = (double)numCells / scanTimer.elapsedTime(TimeUnit.SECONDS);
307 
308     System.out.println("HBase scan mapreduce: ");
309     System.out.println("total time to open scanner: " + scanOpenTimer.elapsedMillis() + " ms");
310     System.out.println("total time to scan: " + scanTimer.elapsedMillis() + " ms");
311 
312     System.out.println("total bytes: " + totalBytes + " bytes ("
313         + StringUtils.humanReadableInt(totalBytes) + ")");
314     System.out.println("throughput  : " + StringUtils.humanReadableInt((long)throughput) + "B/s");
315     System.out.println("total rows  : " + numRows);
316     System.out.println("throughput  : " + StringUtils.humanReadableInt((long)throughputRows) + " rows/s");
317     System.out.println("total cells : " + numCells);
318     System.out.println("throughput  : " + StringUtils.humanReadableInt((long)throughputCells) + " cells/s");
319   }
320 
321   public void testSnapshotScanMapReduce() throws IOException, InterruptedException, ClassNotFoundException {
322     Stopwatch scanOpenTimer = new Stopwatch();
323     Stopwatch scanTimer = new Stopwatch();
324 
325     Scan scan = getScan();
326 
327     String jobName = "testSnapshotScanMapReduce";
328 
329     Job job = new Job(conf);
330     job.setJobName(jobName);
331 
332     job.setJarByClass(getClass());
333 
334     TableMapReduceUtil.initTableSnapshotMapperJob(
335         this.snapshotName,
336         scan,
337         MyMapper.class,
338         NullWritable.class,
339         NullWritable.class,
340         job,
341         true,
342         new Path(restoreDir)
343     );
344 
345     job.setNumReduceTasks(0);
346     job.setOutputKeyClass(NullWritable.class);
347     job.setOutputValueClass(NullWritable.class);
348     job.setOutputFormatClass(NullOutputFormat.class);
349 
350     scanTimer.start();
351     job.waitForCompletion(true);
352     scanTimer.stop();
353 
354     Counters counters = job.getCounters();
355     long numRows = counters.findCounter(ScanCounter.NUM_ROWS).getValue();
356     long numCells = counters.findCounter(ScanCounter.NUM_CELLS).getValue();
357 
358     long totalBytes = counters.findCounter(HBASE_COUNTER_GROUP_NAME, "BYTES_IN_RESULTS").getValue();
359     double throughput = (double)totalBytes / scanTimer.elapsedTime(TimeUnit.SECONDS);
360     double throughputRows = (double)numRows / scanTimer.elapsedTime(TimeUnit.SECONDS);
361     double throughputCells = (double)numCells / scanTimer.elapsedTime(TimeUnit.SECONDS);
362 
363     System.out.println("HBase scan mapreduce: ");
364     System.out.println("total time to open scanner: " + scanOpenTimer.elapsedMillis() + " ms");
365     System.out.println("total time to scan: " + scanTimer.elapsedMillis() + " ms");
366 
367     System.out.println("total bytes: " + totalBytes + " bytes ("
368         + StringUtils.humanReadableInt(totalBytes) + ")");
369     System.out.println("throughput  : " + StringUtils.humanReadableInt((long)throughput) + "B/s");
370     System.out.println("total rows  : " + numRows);
371     System.out.println("throughput  : " + StringUtils.humanReadableInt((long)throughputRows) + " rows/s");
372     System.out.println("total cells : " + numCells);
373     System.out.println("throughput  : " + StringUtils.humanReadableInt((long)throughputCells) + " cells/s");
374   }
375 
376   @Override
377   protected int doWork() throws Exception {
378     if (type.equals("streaming")) {
379       testHdfsStreaming(new Path(file));
380     } else if (type.equals("scan")){
381       testScan();
382     } else if (type.equals("snapshotscan")) {
383       testSnapshotScan();
384     } else if (type.equals("scanmapreduce")) {
385       testScanMapReduce();
386     } else if (type.equals("snapshotscanmapreduce")) {
387       testSnapshotScanMapReduce();
388     }
389     return 0;
390   }
391 
392   public static void main (String[] args) throws Exception {
393     int ret = ToolRunner.run(HBaseConfiguration.create(), new ScanPerformanceEvaluation(), args);
394     System.exit(ret);
395   }
396 }