1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase;
20
21 import static org.codehaus.jackson.map.SerializationConfig.Feature.SORT_PROPERTIES_ALPHABETICALLY;
22
23 import java.io.IOException;
24 import java.io.PrintStream;
25 import java.lang.reflect.Constructor;
26 import java.math.BigDecimal;
27 import java.math.MathContext;
28 import java.text.DecimalFormat;
29 import java.text.SimpleDateFormat;
30 import java.util.ArrayList;
31 import java.util.Arrays;
32 import java.util.Date;
33 import java.util.LinkedList;
34 import java.util.Map;
35 import java.util.Queue;
36 import java.util.Random;
37 import java.util.TreeMap;
38 import java.util.concurrent.Callable;
39 import java.util.concurrent.ExecutionException;
40 import java.util.concurrent.ExecutorService;
41 import java.util.concurrent.Executors;
42 import java.util.concurrent.Future;
43
44 import com.google.common.base.Objects;
45 import com.google.common.util.concurrent.ThreadFactoryBuilder;
46
47 import org.apache.commons.logging.Log;
48 import org.apache.commons.logging.LogFactory;
49 import org.apache.hadoop.conf.Configuration;
50 import org.apache.hadoop.conf.Configured;
51 import org.apache.hadoop.fs.FileSystem;
52 import org.apache.hadoop.fs.Path;
53 import org.apache.hadoop.hbase.classification.InterfaceAudience;
54 import org.apache.hadoop.hbase.client.Admin;
55 import org.apache.hadoop.hbase.client.Append;
56 import org.apache.hadoop.hbase.client.BufferedMutator;
57 import org.apache.hadoop.hbase.client.Connection;
58 import org.apache.hadoop.hbase.client.ConnectionFactory;
59 import org.apache.hadoop.hbase.client.Consistency;
60 import org.apache.hadoop.hbase.client.Delete;
61 import org.apache.hadoop.hbase.client.Durability;
62 import org.apache.hadoop.hbase.client.Get;
63 import org.apache.hadoop.hbase.client.Increment;
64 import org.apache.hadoop.hbase.client.Put;
65 import org.apache.hadoop.hbase.client.Result;
66 import org.apache.hadoop.hbase.client.ResultScanner;
67 import org.apache.hadoop.hbase.client.RowMutations;
68 import org.apache.hadoop.hbase.client.Scan;
69 import org.apache.hadoop.hbase.client.Table;
70 import org.apache.hadoop.hbase.filter.BinaryComparator;
71 import org.apache.hadoop.hbase.filter.CompareFilter;
72 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
73 import org.apache.hadoop.hbase.filter.Filter;
74 import org.apache.hadoop.hbase.filter.FilterAllFilter;
75 import org.apache.hadoop.hbase.filter.FilterList;
76 import org.apache.hadoop.hbase.filter.PageFilter;
77 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
78 import org.apache.hadoop.hbase.filter.WhileMatchFilter;
79 import org.apache.hadoop.hbase.io.compress.Compression;
80 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
81 import org.apache.hadoop.hbase.io.hfile.RandomDistribution;
82 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
83 import org.apache.hadoop.hbase.regionserver.BloomType;
84 import org.apache.hadoop.hbase.trace.HBaseHTraceConfiguration;
85 import org.apache.hadoop.hbase.trace.SpanReceiverHost;
86 import org.apache.hadoop.hbase.util.*;
87 import org.apache.hadoop.io.LongWritable;
88 import org.apache.hadoop.io.Text;
89 import org.apache.hadoop.mapreduce.Job;
90 import org.apache.hadoop.mapreduce.Mapper;
91 import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
92 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
93 import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
94 import org.apache.hadoop.util.Tool;
95 import org.apache.hadoop.util.ToolRunner;
96 import org.codehaus.jackson.map.ObjectMapper;
97
98 import com.yammer.metrics.core.Histogram;
99 import com.yammer.metrics.stats.UniformSample;
100 import com.yammer.metrics.stats.Snapshot;
101
102 import org.apache.htrace.Sampler;
103 import org.apache.htrace.Trace;
104 import org.apache.htrace.TraceScope;
105 import org.apache.htrace.impl.ProbabilitySampler;
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
125 public class PerformanceEvaluation extends Configured implements Tool {
private static final Log LOG = LogFactory.getLog(PerformanceEvaluation.class);

/**
 * Shared JSON mapper used to serialize {@link TestOptions} into MapReduce input lines.
 * Properties are emitted in alphabetical order so the serialized form does not depend on
 * the order in which accessors are declared.
 */
private static final ObjectMapper MAPPER = new ObjectMapper();

static {
  MAPPER.configure(SORT_PROPERTIES_ALPHABETICALLY, true);
}
131
132 public static final String TABLE_NAME = "TestTable";
133 public static final byte[] FAMILY_NAME = Bytes.toBytes("info");
134 public static final byte [] COLUMN_ZERO = Bytes.toBytes("" + 0);
135 public static final byte [] QUALIFIER_NAME = COLUMN_ZERO;
136 public static final int DEFAULT_VALUE_LENGTH = 1000;
137 public static final int ROW_LENGTH = 26;
138
139 private static final int ONE_GB = 1024 * 1024 * 1000;
140 private static final int DEFAULT_ROWS_PER_GB = ONE_GB / DEFAULT_VALUE_LENGTH;
141
142 private static final int TAG_LENGTH = 256;
143 private static final DecimalFormat FMT = new DecimalFormat("0.##");
144 private static final MathContext CXT = MathContext.DECIMAL64;
145 private static final BigDecimal MS_PER_SEC = BigDecimal.valueOf(1000);
146 private static final BigDecimal BYTES_PER_MB = BigDecimal.valueOf(1024 * 1024);
147 private static final TestOptions DEFAULT_OPTS = new TestOptions();
148
149 private static Map<String, CmdDescriptor> COMMANDS = new TreeMap<String, CmdDescriptor>();
150 private static final Path PERF_EVAL_DIR = new Path("performance_evaluation");
151
// Registers the built-in commands. COMMANDS is a TreeMap keyed by command name, so the
// registration order below does not affect lookup or iteration order. The descriptions are
// presumably printed in the usage text (the printer is not visible here).
static {
  addCommandDescriptor(RandomReadTest.class, "randomRead",
      "Run random read test");
  addCommandDescriptor(RandomSeekScanTest.class, "randomSeekScan",
      "Run random seek and scan 100 test");
  addCommandDescriptor(RandomScanWithRange10Test.class, "scanRange10",
      "Run random seek scan with both start and stop row (max 10 rows)");
  addCommandDescriptor(RandomScanWithRange100Test.class, "scanRange100",
      "Run random seek scan with both start and stop row (max 100 rows)");
  addCommandDescriptor(RandomScanWithRange1000Test.class, "scanRange1000",
      "Run random seek scan with both start and stop row (max 1000 rows)");
  addCommandDescriptor(RandomScanWithRange10000Test.class, "scanRange10000",
      "Run random seek scan with both start and stop row (max 10000 rows)");
  addCommandDescriptor(RandomWriteTest.class, "randomWrite",
      "Run random write test");
  addCommandDescriptor(SequentialReadTest.class, "sequentialRead",
      "Run sequential read test");
  addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite",
      "Run sequential write test");
  addCommandDescriptor(ScanTest.class, "scan",
      "Run scan test (read every row)");
  addCommandDescriptor(FilteredScanTest.class, "filterScan",
      "Run scan test using a filter to find a specific row based on its value " +
      "(make sure to use --rows=20)");
  addCommandDescriptor(IncrementTest.class, "increment",
      "Increment on each row; clients overlap on keyspace so some concurrent operations");
  addCommandDescriptor(AppendTest.class, "append",
      "Append on each row; clients overlap on keyspace so some concurrent operations");
  addCommandDescriptor(CheckAndMutateTest.class, "checkAndMutate",
      "CheckAndMutate on each row; clients overlap on keyspace so some concurrent operations");
  addCommandDescriptor(CheckAndPutTest.class, "checkAndPut",
      "CheckAndPut on each row; clients overlap on keyspace so some concurrent operations");
  addCommandDescriptor(CheckAndDeleteTest.class, "checkAndDelete",
      "CheckAndDelete on each row; clients overlap on keyspace so some concurrent operations");
}
187
188
189
190
191
192 protected static enum Counter {
193
194 ELAPSED_TIME,
195
196 ROWS
197 }
198
199 protected static class RunResult implements Comparable<RunResult> {
200 public RunResult(long duration, Histogram hist) {
201 this.duration = duration;
202 this.hist = hist;
203 }
204
205 public final long duration;
206 public final Histogram hist;
207
208 @Override
209 public String toString() {
210 return Long.toString(duration);
211 }
212
213 @Override public int compareTo(RunResult o) {
214 return Long.compare(this.duration, o.duration);
215 }
216 }
217
218
219
220
221
/**
 * Creates the tool bound to the given configuration.
 *
 * @param conf configuration handed to {@link Configured}
 */
public PerformanceEvaluation(final Configuration conf) {
  super(conf);
}
225
226 protected static void addCommandDescriptor(Class<? extends Test> cmdClass,
227 String name, String description) {
228 CmdDescriptor cmdDescriptor = new CmdDescriptor(cmdClass, name, description);
229 COMMANDS.put(name, cmdDescriptor);
230 }
231
232
233
234
/**
 * Callback through which a running test publishes progress messages. The implementations in
 * this file either log the message or forward it to the MapReduce task status.
 */
interface Status {
  /**
   * Publishes one progress message.
   *
   * @param msg the message to report
   * @throws IOException if the message cannot be reported
   */
  void setStatus(String msg) throws IOException;
}
243
244
245
246
247 public static class EvaluationMapTask
248 extends Mapper<LongWritable, Text, LongWritable, LongWritable> {
249
250
251 public final static String CMD_KEY = "EvaluationMapTask.command";
252
253 public static final String PE_KEY = "EvaluationMapTask.performanceEvalImpl";
254
255 private Class<? extends Test> cmd;
256
257 @Override
258 protected void setup(Context context) throws IOException, InterruptedException {
259 this.cmd = forName(context.getConfiguration().get(CMD_KEY), Test.class);
260
261
262
263 Class<? extends PerformanceEvaluation> peClass =
264 forName(context.getConfiguration().get(PE_KEY), PerformanceEvaluation.class);
265 try {
266 peClass.getConstructor(Configuration.class).newInstance(context.getConfiguration());
267 } catch (Exception e) {
268 throw new IllegalStateException("Could not instantiate PE instance", e);
269 }
270 }
271
272 private <Type> Class<? extends Type> forName(String className, Class<Type> type) {
273 try {
274 return Class.forName(className).asSubclass(type);
275 } catch (ClassNotFoundException e) {
276 throw new IllegalStateException("Could not find class for name: " + className, e);
277 }
278 }
279
280 @Override
281 protected void map(LongWritable key, Text value, final Context context)
282 throws IOException, InterruptedException {
283
284 Status status = new Status() {
285 @Override
286 public void setStatus(String msg) {
287 context.setStatus(msg);
288 }
289 };
290
291 ObjectMapper mapper = new ObjectMapper();
292 TestOptions opts = mapper.readValue(value.toString(), TestOptions.class);
293 Configuration conf = HBaseConfiguration.create(context.getConfiguration());
294 final Connection con = ConnectionFactory.createConnection(conf);
295
296
297 RunResult result = PerformanceEvaluation.runOneClient(this.cmd, conf, con, opts, status);
298
299
300 context.getCounter(Counter.ELAPSED_TIME).increment(result.duration);
301 context.getCounter(Counter.ROWS).increment(opts.perClientRunRows);
302 context.write(new LongWritable(opts.startRow), new LongWritable(result.duration));
303 context.progress();
304 }
305 }
306
307
308
309
310
311
312 static boolean checkTable(Admin admin, TestOptions opts) throws IOException {
313 TableName tableName = TableName.valueOf(opts.tableName);
314 boolean needsDelete = false, exists = admin.tableExists(tableName);
315 boolean isReadCmd = opts.cmdName.toLowerCase().contains("read")
316 || opts.cmdName.toLowerCase().contains("scan");
317 if (!exists && isReadCmd) {
318 throw new IllegalStateException(
319 "Must specify an existing table for read commands. Run a write command first.");
320 }
321 HTableDescriptor desc =
322 exists ? admin.getTableDescriptor(TableName.valueOf(opts.tableName)) : null;
323 byte[][] splits = getSplits(opts);
324
325
326
327 if ((exists && opts.presplitRegions != DEFAULT_OPTS.presplitRegions)
328 || (!isReadCmd && desc != null && desc.getRegionSplitPolicyClassName() != opts.splitPolicy)
329 || (!isReadCmd && desc != null && desc.getRegionReplication() != opts.replicas)) {
330 needsDelete = true;
331
332 LOG.debug(Objects.toStringHelper("needsDelete")
333 .add("needsDelete", needsDelete)
334 .add("isReadCmd", isReadCmd)
335 .add("exists", exists)
336 .add("desc", desc)
337 .add("presplit", opts.presplitRegions)
338 .add("splitPolicy", opts.splitPolicy)
339 .add("replicas", opts.replicas));
340 }
341
342
343 if (needsDelete) {
344 if (admin.isTableEnabled(tableName)) {
345 admin.disableTable(tableName);
346 }
347 admin.deleteTable(tableName);
348 }
349
350
351 if (!exists || needsDelete) {
352 desc = getTableDescriptor(opts);
353 if (splits != null) {
354 if (LOG.isDebugEnabled()) {
355 for (int i = 0; i < splits.length; i++) {
356 LOG.debug(" split " + i + ": " + Bytes.toStringBinary(splits[i]));
357 }
358 }
359 }
360 admin.createTable(desc, splits);
361 LOG.info("Table " + desc + " created");
362 }
363 return admin.tableExists(tableName);
364 }
365
366
367
368
369 protected static HTableDescriptor getTableDescriptor(TestOptions opts) {
370 HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(opts.tableName));
371 HColumnDescriptor family = new HColumnDescriptor(FAMILY_NAME);
372 family.setDataBlockEncoding(opts.blockEncoding);
373 family.setCompressionType(opts.compression);
374 family.setBloomFilterType(opts.bloomType);
375 if (opts.inMemoryCF) {
376 family.setInMemory(true);
377 }
378 desc.addFamily(family);
379 if (opts.replicas != DEFAULT_OPTS.replicas) {
380 desc.setRegionReplication(opts.replicas);
381 }
382 if (opts.splitPolicy != DEFAULT_OPTS.splitPolicy) {
383 desc.setRegionSplitPolicyClassName(opts.splitPolicy);
384 }
385 return desc;
386 }
387
388
389
390
391 protected static byte[][] getSplits(TestOptions opts) {
392 if (opts.presplitRegions == DEFAULT_OPTS.presplitRegions)
393 return null;
394
395 int numSplitPoints = opts.presplitRegions - 1;
396 byte[][] splits = new byte[numSplitPoints][];
397 int jump = opts.totalRows / opts.presplitRegions;
398 for (int i = 0; i < numSplitPoints; i++) {
399 int rowkey = jump * (1 + i);
400 splits[i] = format(rowkey);
401 }
402 return splits;
403 }
404
405
406
407
408 static RunResult[] doLocalClients(final TestOptions opts, final Configuration conf)
409 throws IOException, InterruptedException {
410 final Class<? extends Test> cmd = determineCommandClass(opts.cmdName);
411 assert cmd != null;
412 @SuppressWarnings("unchecked")
413 Future<RunResult>[] threads = new Future[opts.numClientThreads];
414 RunResult[] results = new RunResult[opts.numClientThreads];
415 ExecutorService pool = Executors.newFixedThreadPool(opts.numClientThreads,
416 new ThreadFactoryBuilder().setNameFormat("TestClient-%s").build());
417 final Connection con = ConnectionFactory.createConnection(conf);
418 for (int i = 0; i < threads.length; i++) {
419 final int index = i;
420 threads[i] = pool.submit(new Callable<RunResult>() {
421 @Override
422 public RunResult call() throws Exception {
423 TestOptions threadOpts = new TestOptions(opts);
424 if (threadOpts.startRow == 0) threadOpts.startRow = index * threadOpts.perClientRunRows;
425 RunResult run = runOneClient(cmd, conf, con, threadOpts, new Status() {
426 @Override
427 public void setStatus(final String msg) throws IOException {
428 LOG.info(msg);
429 }
430 });
431 LOG.info("Finished " + Thread.currentThread().getName() + " in " + run.duration +
432 "ms over " + threadOpts.perClientRunRows + " rows");
433 return run;
434 }
435 });
436 }
437 pool.shutdown();
438
439 for (int i = 0; i < threads.length; i++) {
440 try {
441 results[i] = threads[i].get();
442 } catch (ExecutionException e) {
443 throw new IOException(e.getCause());
444 }
445 }
446 final String test = cmd.getSimpleName();
447 LOG.info("[" + test + "] Summary of timings (ms): "
448 + Arrays.toString(results));
449 Arrays.sort(results);
450 long total = 0;
451 for (RunResult result : results) {
452 total += result.duration;
453 }
454 LOG.info("[" + test + "]"
455 + "\tMin: " + results[0] + "ms"
456 + "\tMax: " + results[results.length - 1] + "ms"
457 + "\tAvg: " + (total / results.length) + "ms");
458
459 con.close();
460
461 return results;
462 }
463
464
465
466
467
468
469
470
471 static Job doMapReduce(TestOptions opts, final Configuration conf)
472 throws IOException, InterruptedException, ClassNotFoundException {
473 final Class<? extends Test> cmd = determineCommandClass(opts.cmdName);
474 assert cmd != null;
475 Path inputDir = writeInputFile(conf, opts);
476 conf.set(EvaluationMapTask.CMD_KEY, cmd.getName());
477 conf.set(EvaluationMapTask.PE_KEY, PerformanceEvaluation.class.getName());
478 Job job = Job.getInstance(conf);
479 job.setJarByClass(PerformanceEvaluation.class);
480 job.setJobName("HBase Performance Evaluation - " + opts.cmdName);
481
482 job.setInputFormatClass(NLineInputFormat.class);
483 NLineInputFormat.setInputPaths(job, inputDir);
484
485 NLineInputFormat.setNumLinesPerSplit(job, 1);
486
487 job.setOutputKeyClass(LongWritable.class);
488 job.setOutputValueClass(LongWritable.class);
489
490 job.setMapperClass(EvaluationMapTask.class);
491 job.setReducerClass(LongSumReducer.class);
492
493 job.setNumReduceTasks(1);
494
495 job.setOutputFormatClass(TextOutputFormat.class);
496 TextOutputFormat.setOutputPath(job, new Path(inputDir.getParent(), "outputs"));
497
498 TableMapReduceUtil.addDependencyJars(job);
499 TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
500 Histogram.class,
501 ObjectMapper.class);
502
503 TableMapReduceUtil.initCredentials(job);
504
505 job.waitForCompletion(true);
506 return job;
507 }
508
509
510
511
512
513
514
515 private static Path writeInputFile(final Configuration c, final TestOptions opts) throws IOException {
516 SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmmss");
517 Path jobdir = new Path(PERF_EVAL_DIR, formatter.format(new Date()));
518 Path inputDir = new Path(jobdir, "inputs");
519
520 FileSystem fs = FileSystem.get(c);
521 fs.mkdirs(inputDir);
522
523 Path inputFile = new Path(inputDir, "input.txt");
524 PrintStream out = new PrintStream(fs.create(inputFile));
525
526 Map<Integer, String> m = new TreeMap<Integer, String>();
527 Hash h = MurmurHash.getInstance();
528 int perClientRows = (opts.totalRows / opts.numClientThreads);
529 try {
530 for (int i = 0; i < 10; i++) {
531 for (int j = 0; j < opts.numClientThreads; j++) {
532 TestOptions next = new TestOptions(opts);
533 next.startRow = (j * perClientRows) + (i * (perClientRows/10));
534 next.perClientRunRows = perClientRows / 10;
535 String s = MAPPER.writeValueAsString(next);
536 LOG.info("maptask input=" + s);
537 int hash = h.hash(Bytes.toBytes(s));
538 m.put(hash, s);
539 }
540 }
541 for (Map.Entry<Integer, String> e: m.entrySet()) {
542 out.println(e.getValue());
543 }
544 } finally {
545 out.close();
546 }
547 return inputDir;
548 }
549
550
551
552
553 static class CmdDescriptor {
554 private Class<? extends Test> cmdClass;
555 private String name;
556 private String description;
557
558 CmdDescriptor(Class<? extends Test> cmdClass, String name, String description) {
559 this.cmdClass = cmdClass;
560 this.name = name;
561 this.description = description;
562 }
563
564 public Class<? extends Test> getCmdClass() {
565 return cmdClass;
566 }
567
568 public String getName() {
569 return name;
570 }
571
572 public String getDescription() {
573 return description;
574 }
575 }
576
577
578
579
580
581
582
583
584 static class TestOptions {
585 String cmdName = null;
586 boolean nomapred = false;
587 boolean filterAll = false;
588 int startRow = 0;
589 float size = 1.0f;
590 int perClientRunRows = DEFAULT_ROWS_PER_GB;
591 int numClientThreads = 1;
592 int totalRows = DEFAULT_ROWS_PER_GB;
593 float sampleRate = 1.0f;
594 double traceRate = 0.0;
595 String tableName = TABLE_NAME;
596 boolean flushCommits = true;
597 boolean writeToWAL = true;
598 boolean autoFlush = false;
599 boolean oneCon = false;
600 boolean useTags = false;
601 int noOfTags = 1;
602 boolean reportLatency = false;
603 int multiGet = 0;
604 int randomSleep = 0;
605 boolean inMemoryCF = false;
606 int presplitRegions = 0;
607 int replicas = HTableDescriptor.DEFAULT_REGION_REPLICATION;
608 String splitPolicy = null;
609 Compression.Algorithm compression = Compression.Algorithm.NONE;
610 BloomType bloomType = BloomType.ROW;
611 DataBlockEncoding blockEncoding = DataBlockEncoding.NONE;
612 boolean valueRandom = false;
613 boolean valueZipf = false;
614 int valueSize = DEFAULT_VALUE_LENGTH;
615 int period = (this.perClientRunRows / 10) == 0? perClientRunRows: perClientRunRows / 10;
616 int columns = 1;
617 int caching = 30;
618 boolean addColumns = true;
619
620 public TestOptions() {}
621
622
623
624
625
626 public TestOptions(TestOptions that) {
627 this.cmdName = that.cmdName;
628 this.nomapred = that.nomapred;
629 this.startRow = that.startRow;
630 this.size = that.size;
631 this.perClientRunRows = that.perClientRunRows;
632 this.numClientThreads = that.numClientThreads;
633 this.totalRows = that.totalRows;
634 this.sampleRate = that.sampleRate;
635 this.traceRate = that.traceRate;
636 this.tableName = that.tableName;
637 this.flushCommits = that.flushCommits;
638 this.writeToWAL = that.writeToWAL;
639 this.autoFlush = that.autoFlush;
640 this.oneCon = that.oneCon;
641 this.useTags = that.useTags;
642 this.noOfTags = that.noOfTags;
643 this.reportLatency = that.reportLatency;
644 this.multiGet = that.multiGet;
645 this.inMemoryCF = that.inMemoryCF;
646 this.presplitRegions = that.presplitRegions;
647 this.replicas = that.replicas;
648 this.splitPolicy = that.splitPolicy;
649 this.compression = that.compression;
650 this.blockEncoding = that.blockEncoding;
651 this.filterAll = that.filterAll;
652 this.bloomType = that.bloomType;
653 this.valueRandom = that.valueRandom;
654 this.valueZipf = that.valueZipf;
655 this.valueSize = that.valueSize;
656 this.period = that.period;
657 this.randomSleep = that.randomSleep;
658 this.addColumns = that.addColumns;
659 this.columns = that.columns;
660 this.caching = that.caching;
661 }
662
663 public int getCaching() {
664 return this.caching;
665 }
666
667 public void setCaching(final int caching) {
668 this.caching = caching;
669 }
670
671 public int getColumns() {
672 return this.columns;
673 }
674
675 public void setColumns(final int columns) {
676 this.columns = columns;
677 }
678
679 public boolean isValueZipf() {
680 return valueZipf;
681 }
682
683 public void setValueZipf(boolean valueZipf) {
684 this.valueZipf = valueZipf;
685 }
686
687 public String getCmdName() {
688 return cmdName;
689 }
690
691 public void setCmdName(String cmdName) {
692 this.cmdName = cmdName;
693 }
694
695 public int getRandomSleep() {
696 return randomSleep;
697 }
698
699 public void setRandomSleep(int randomSleep) {
700 this.randomSleep = randomSleep;
701 }
702
703 public int getReplicas() {
704 return replicas;
705 }
706
707 public void setReplicas(int replicas) {
708 this.replicas = replicas;
709 }
710
711 public String getSplitPolicy() {
712 return splitPolicy;
713 }
714
715 public void setSplitPolicy(String splitPolicy) {
716 this.splitPolicy = splitPolicy;
717 }
718
719 public void setNomapred(boolean nomapred) {
720 this.nomapred = nomapred;
721 }
722
723 public void setFilterAll(boolean filterAll) {
724 this.filterAll = filterAll;
725 }
726
727 public void setStartRow(int startRow) {
728 this.startRow = startRow;
729 }
730
731 public void setSize(float size) {
732 this.size = size;
733 }
734
735 public void setPerClientRunRows(int perClientRunRows) {
736 this.perClientRunRows = perClientRunRows;
737 }
738
739 public void setNumClientThreads(int numClientThreads) {
740 this.numClientThreads = numClientThreads;
741 }
742
743 public void setTotalRows(int totalRows) {
744 this.totalRows = totalRows;
745 }
746
747 public void setSampleRate(float sampleRate) {
748 this.sampleRate = sampleRate;
749 }
750
751 public void setTraceRate(double traceRate) {
752 this.traceRate = traceRate;
753 }
754
755 public void setTableName(String tableName) {
756 this.tableName = tableName;
757 }
758
759 public void setFlushCommits(boolean flushCommits) {
760 this.flushCommits = flushCommits;
761 }
762
763 public void setWriteToWAL(boolean writeToWAL) {
764 this.writeToWAL = writeToWAL;
765 }
766
767 public void setAutoFlush(boolean autoFlush) {
768 this.autoFlush = autoFlush;
769 }
770
771 public void setOneCon(boolean oneCon) {
772 this.oneCon = oneCon;
773 }
774
775 public void setUseTags(boolean useTags) {
776 this.useTags = useTags;
777 }
778
779 public void setNoOfTags(int noOfTags) {
780 this.noOfTags = noOfTags;
781 }
782
783 public void setReportLatency(boolean reportLatency) {
784 this.reportLatency = reportLatency;
785 }
786
787 public void setMultiGet(int multiGet) {
788 this.multiGet = multiGet;
789 }
790
791 public void setInMemoryCF(boolean inMemoryCF) {
792 this.inMemoryCF = inMemoryCF;
793 }
794
795 public void setPresplitRegions(int presplitRegions) {
796 this.presplitRegions = presplitRegions;
797 }
798
799 public void setCompression(Compression.Algorithm compression) {
800 this.compression = compression;
801 }
802
803 public void setBloomType(BloomType bloomType) {
804 this.bloomType = bloomType;
805 }
806
807 public void setBlockEncoding(DataBlockEncoding blockEncoding) {
808 this.blockEncoding = blockEncoding;
809 }
810
811 public void setValueRandom(boolean valueRandom) {
812 this.valueRandom = valueRandom;
813 }
814
815 public void setValueSize(int valueSize) {
816 this.valueSize = valueSize;
817 }
818
819 public void setPeriod(int period) {
820 this.period = period;
821 }
822
823 public boolean isNomapred() {
824 return nomapred;
825 }
826
827 public boolean isFilterAll() {
828 return filterAll;
829 }
830
831 public int getStartRow() {
832 return startRow;
833 }
834
835 public float getSize() {
836 return size;
837 }
838
839 public int getPerClientRunRows() {
840 return perClientRunRows;
841 }
842
843 public int getNumClientThreads() {
844 return numClientThreads;
845 }
846
847 public int getTotalRows() {
848 return totalRows;
849 }
850
851 public float getSampleRate() {
852 return sampleRate;
853 }
854
855 public double getTraceRate() {
856 return traceRate;
857 }
858
859 public String getTableName() {
860 return tableName;
861 }
862
863 public boolean isFlushCommits() {
864 return flushCommits;
865 }
866
867 public boolean isWriteToWAL() {
868 return writeToWAL;
869 }
870
871 public boolean isAutoFlush() {
872 return autoFlush;
873 }
874
875 public boolean isUseTags() {
876 return useTags;
877 }
878
879 public int getNoOfTags() {
880 return noOfTags;
881 }
882
883 public boolean isReportLatency() {
884 return reportLatency;
885 }
886
887 public int getMultiGet() {
888 return multiGet;
889 }
890
891 public boolean isInMemoryCF() {
892 return inMemoryCF;
893 }
894
895 public int getPresplitRegions() {
896 return presplitRegions;
897 }
898
899 public Compression.Algorithm getCompression() {
900 return compression;
901 }
902
903 public DataBlockEncoding getBlockEncoding() {
904 return blockEncoding;
905 }
906
907 public boolean isValueRandom() {
908 return valueRandom;
909 }
910
911 public int getValueSize() {
912 return valueSize;
913 }
914
915 public int getPeriod() {
916 return period;
917 }
918
919 public BloomType getBloomType() {
920 return bloomType;
921 }
922
923 public boolean isOneCon() {
924 return oneCon;
925 }
926
927 public boolean getAddColumns() {
928 return addColumns;
929 }
930
931 public void setAddColumns(boolean addColumns) {
932 this.addColumns = addColumns;
933 }
934 }
935
936
937
938
939
940 static abstract class Test {
941
942
943 private static final Random randomSeed = new Random(System.currentTimeMillis());
944
945 private static long nextRandomSeed() {
946 return randomSeed.nextLong();
947 }
948 private final int everyN;
949
950 protected final Random rand = new Random(nextRandomSeed());
951 protected final Configuration conf;
952 protected final TestOptions opts;
953
954 private final Status status;
955 private final Sampler<?> traceSampler;
956 private final SpanReceiverHost receiverHost;
957 protected Connection connection;
958
959 private String testName;
960 private Histogram latency;
961 private Histogram valueSize;
962 private RandomDistribution.Zipf zipf;
963
964
965
966
967
968 Test(final Connection con, final TestOptions options, final Status status) {
969 this.connection = con;
970 this.conf = con == null ? HBaseConfiguration.create() : this.connection.getConfiguration();
971 this.opts = options;
972 this.status = status;
973 this.testName = this.getClass().getSimpleName();
974 receiverHost = SpanReceiverHost.getInstance(conf);
975 if (options.traceRate >= 1.0) {
976 this.traceSampler = Sampler.ALWAYS;
977 } else if (options.traceRate > 0.0) {
978 conf.setDouble("hbase.sampler.fraction", options.traceRate);
979 this.traceSampler = new ProbabilitySampler(new HBaseHTraceConfiguration(conf));
980 } else {
981 this.traceSampler = Sampler.NEVER;
982 }
983 everyN = (int) (opts.totalRows / (opts.totalRows * opts.sampleRate));
984 if (options.isValueZipf()) {
985 this.zipf = new RandomDistribution.Zipf(this.rand, 1, options.getValueSize(), 1.1);
986 }
987 LOG.info("Sampling 1 every " + everyN + " out of " + opts.perClientRunRows + " total rows.");
988 }
989
990 int getValueLength(final Random r) {
991 if (this.opts.isValueRandom()) return Math.abs(r.nextInt() % opts.valueSize);
992 else if (this.opts.isValueZipf()) return Math.abs(this.zipf.nextInt());
993 else return opts.valueSize;
994 }
995
996 void updateValueSize(final Result [] rs) throws IOException {
997 if (rs == null || !isRandomValueSize()) return;
998 for (Result r: rs) updateValueSize(r);
999 }
1000
1001 void updateValueSize(final Result r) throws IOException {
1002 if (r == null || !isRandomValueSize()) return;
1003 int size = 0;
1004 for (CellScanner scanner = r.cellScanner(); scanner.advance();) {
1005 size += scanner.current().getValueLength();
1006 }
1007 updateValueSize(size);
1008 }
1009
1010 void updateValueSize(final int valueSize) {
1011 if (!isRandomValueSize()) return;
1012 this.valueSize.update(valueSize);
1013 }
1014
1015 String generateStatus(final int sr, final int i, final int lr) {
1016 return sr + "/" + i + "/" + lr + ", latency " + getShortLatencyReport() +
1017 (!isRandomValueSize()? "": ", value size " + getShortValueSizeReport());
1018 }
1019
1020 boolean isRandomValueSize() {
1021 return opts.valueRandom;
1022 }
1023
1024 protected int getReportingPeriod() {
1025 return opts.period;
1026 }
1027
1028
1029
1030
1031 public Histogram getLatency() {
1032 return latency;
1033 }
1034
1035 void testSetup() throws IOException {
1036 if (!opts.oneCon) {
1037 this.connection = ConnectionFactory.createConnection(conf);
1038 }
1039 onStartup();
1040 latency = YammerHistogramUtils.newHistogram(new UniformSample(1024 * 500));
1041 valueSize = YammerHistogramUtils.newHistogram(new UniformSample(1024 * 500));
1042 }
1043
1044 abstract void onStartup() throws IOException;
1045
1046 void testTakedown() throws IOException {
1047 reportLatency();
1048 reportValueSize();
1049 onTakedown();
1050 if (!opts.oneCon) {
1051 connection.close();
1052 }
1053 receiverHost.closeReceivers();
1054 }
1055
1056 abstract void onTakedown() throws IOException;
1057
1058
1059
1060
1061
1062
1063 long test() throws IOException, InterruptedException {
1064 testSetup();
1065 LOG.info("Timed test starting in thread " + Thread.currentThread().getName());
1066 final long startTime = System.nanoTime();
1067 try {
1068 testTimed();
1069 } finally {
1070 testTakedown();
1071 }
1072 return (System.nanoTime() - startTime) / 1000000;
1073 }
1074
1075 int getStartRow() {
1076 return opts.startRow;
1077 }
1078
1079 int getLastRow() {
1080 return getStartRow() + opts.perClientRunRows;
1081 }
1082
1083
1084
1085
1086 void testTimed() throws IOException, InterruptedException {
1087 int startRow = getStartRow();
1088 int lastRow = getLastRow();
1089
1090 for (int i = startRow; i < lastRow; i++) {
1091 if (i % everyN != 0) continue;
1092 long startTime = System.nanoTime();
1093 TraceScope scope = Trace.startSpan("test row", traceSampler);
1094 try {
1095 testRow(i);
1096 } finally {
1097 scope.close();
1098 }
1099 latency.update((System.nanoTime() - startTime) / 1000);
1100 if (status != null && i > 0 && (i % getReportingPeriod()) == 0) {
1101 status.setStatus(generateStatus(startRow, i, lastRow));
1102 }
1103 }
1104 }
1105
1106
1107
1108
1109
1110 private void reportLatency() throws IOException {
1111 status.setStatus(testName + " latency log (microseconds), on " +
1112 latency.count() + " measures");
1113 reportHistogram(this.latency);
1114 }
1115
1116 private void reportValueSize() throws IOException {
1117 status.setStatus(testName + " valueSize after " +
1118 valueSize.count() + " measures");
1119 reportHistogram(this.valueSize);
1120 }
1121
1122 private void reportHistogram(final Histogram h) throws IOException {
1123 Snapshot sn = h.getSnapshot();
1124 status.setStatus(testName + " Min = " + h.min());
1125 status.setStatus(testName + " Avg = " + h.mean());
1126 status.setStatus(testName + " StdDev = " + h.stdDev());
1127 status.setStatus(testName + " 50th = " + sn.getMedian());
1128 status.setStatus(testName + " 75th = " + sn.get75thPercentile());
1129 status.setStatus(testName + " 95th = " + sn.get95thPercentile());
1130 status.setStatus(testName + " 99th = " + sn.get99thPercentile());
1131 status.setStatus(testName + " 99.9th = " + sn.get999thPercentile());
1132 status.setStatus(testName + " 99.99th = " + sn.getValue(0.9999));
1133 status.setStatus(testName + " 99.999th = " + sn.getValue(0.99999));
1134 status.setStatus(testName + " Max = " + h.max());
1135 }
1136
1137
1138
1139
1140 public String getShortLatencyReport() {
1141 return YammerHistogramUtils.getShortHistogramReport(this.latency);
1142 }
1143
1144
1145
1146
1147 public String getShortValueSizeReport() {
1148 return YammerHistogramUtils.getShortHistogramReport(this.valueSize);
1149 }
1150
1151
1152
1153
1154
/**
 * Performs the operation under test for a single row. testTimed() times each call and
 * records the result in the latency histogram.
 *
 * @param i row index taken from [getStartRow(), getLastRow()); testTimed() only passes
 *          indices that are multiples of everyN
 */
abstract void testRow(final int i) throws IOException, InterruptedException;
1156 }
1157
1158 static abstract class TableTest extends Test {
1159 protected Table table;
1160
1161 TableTest(Connection con, TestOptions options, Status status) {
1162 super(con, options, status);
1163 }
1164
1165 @Override
1166 void onStartup() throws IOException {
1167 this.table = connection.getTable(TableName.valueOf(opts.tableName));
1168 }
1169
1170 @Override
1171 void onTakedown() throws IOException {
1172 table.close();
1173 }
1174 }
1175
1176 static abstract class BufferedMutatorTest extends Test {
1177 protected BufferedMutator mutator;
1178
1179 BufferedMutatorTest(Connection con, TestOptions options, Status status) {
1180 super(con, options, status);
1181 }
1182
1183 @Override
1184 void onStartup() throws IOException {
1185 this.mutator = connection.getBufferedMutator(TableName.valueOf(opts.tableName));
1186 }
1187
1188 @Override
1189 void onTakedown() throws IOException {
1190 mutator.close();
1191 }
1192 }
1193
1194 static class RandomSeekScanTest extends TableTest {
1195 RandomSeekScanTest(Connection con, TestOptions options, Status status) {
1196 super(con, options, status);
1197 }
1198
1199 @Override
1200 void testRow(final int i) throws IOException {
1201 Scan scan = new Scan(getRandomRow(this.rand, opts.totalRows));
1202 scan.setCaching(opts.caching);
1203 FilterList list = new FilterList();
1204 if (opts.addColumns) {
1205 scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
1206 } else {
1207 scan.addFamily(FAMILY_NAME);
1208 }
1209 if (opts.filterAll) {
1210 list.addFilter(new FilterAllFilter());
1211 }
1212 list.addFilter(new WhileMatchFilter(new PageFilter(120)));
1213 scan.setFilter(list);
1214 ResultScanner s = this.table.getScanner(scan);
1215 for (Result rr; (rr = s.next()) != null;) {
1216 updateValueSize(rr);
1217 }
1218 s.close();
1219 }
1220
1221 @Override
1222 protected int getReportingPeriod() {
1223 int period = opts.perClientRunRows / 100;
1224 return period == 0 ? opts.perClientRunRows : period;
1225 }
1226
1227 }
1228
1229 static abstract class RandomScanWithRangeTest extends TableTest {
1230 RandomScanWithRangeTest(Connection con, TestOptions options, Status status) {
1231 super(con, options, status);
1232 }
1233
1234 @Override
1235 void testRow(final int i) throws IOException {
1236 Pair<byte[], byte[]> startAndStopRow = getStartAndStopRow();
1237 Scan scan = new Scan(startAndStopRow.getFirst(), startAndStopRow.getSecond());
1238 scan.setCaching(opts.caching);
1239 if (opts.filterAll) {
1240 scan.setFilter(new FilterAllFilter());
1241 }
1242 if (opts.addColumns) {
1243 scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
1244 } else {
1245 scan.addFamily(FAMILY_NAME);
1246 }
1247 Result r = null;
1248 int count = 0;
1249 ResultScanner s = this.table.getScanner(scan);
1250 for (; (r = s.next()) != null;) {
1251 updateValueSize(r);
1252 count++;
1253 }
1254 if (i % 100 == 0) {
1255 LOG.info(String.format("Scan for key range %s - %s returned %s rows",
1256 Bytes.toString(startAndStopRow.getFirst()),
1257 Bytes.toString(startAndStopRow.getSecond()), count));
1258 }
1259
1260 s.close();
1261 }
1262
1263 protected abstract Pair<byte[],byte[]> getStartAndStopRow();
1264
1265 protected Pair<byte[], byte[]> generateStartAndStopRows(int maxRange) {
1266 int start = this.rand.nextInt(Integer.MAX_VALUE) % opts.totalRows;
1267 int stop = start + maxRange;
1268 return new Pair<byte[],byte[]>(format(start), format(stop));
1269 }
1270
1271 @Override
1272 protected int getReportingPeriod() {
1273 int period = opts.perClientRunRows / 100;
1274 return period == 0? opts.perClientRunRows: period;
1275 }
1276 }
1277
1278 static class RandomScanWithRange10Test extends RandomScanWithRangeTest {
1279 RandomScanWithRange10Test(Connection con, TestOptions options, Status status) {
1280 super(con, options, status);
1281 }
1282
1283 @Override
1284 protected Pair<byte[], byte[]> getStartAndStopRow() {
1285 return generateStartAndStopRows(10);
1286 }
1287 }
1288
1289 static class RandomScanWithRange100Test extends RandomScanWithRangeTest {
1290 RandomScanWithRange100Test(Connection con, TestOptions options, Status status) {
1291 super(con, options, status);
1292 }
1293
1294 @Override
1295 protected Pair<byte[], byte[]> getStartAndStopRow() {
1296 return generateStartAndStopRows(100);
1297 }
1298 }
1299
1300 static class RandomScanWithRange1000Test extends RandomScanWithRangeTest {
1301 RandomScanWithRange1000Test(Connection con, TestOptions options, Status status) {
1302 super(con, options, status);
1303 }
1304
1305 @Override
1306 protected Pair<byte[], byte[]> getStartAndStopRow() {
1307 return generateStartAndStopRows(1000);
1308 }
1309 }
1310
1311 static class RandomScanWithRange10000Test extends RandomScanWithRangeTest {
1312 RandomScanWithRange10000Test(Connection con, TestOptions options, Status status) {
1313 super(con, options, status);
1314 }
1315
1316 @Override
1317 protected Pair<byte[], byte[]> getStartAndStopRow() {
1318 return generateStartAndStopRows(10000);
1319 }
1320 }
1321
1322 static class RandomReadTest extends TableTest {
1323 private final Consistency consistency;
1324 private ArrayList<Get> gets;
1325 private Random rd = new Random();
1326
1327 RandomReadTest(Connection con, TestOptions options, Status status) {
1328 super(con, options, status);
1329 consistency = options.replicas == DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE;
1330 if (opts.multiGet > 0) {
1331 LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + ".");
1332 this.gets = new ArrayList<Get>(opts.multiGet);
1333 }
1334 }
1335
1336 @Override
1337 void testRow(final int i) throws IOException, InterruptedException {
1338 if (opts.randomSleep > 0) {
1339 Thread.sleep(rd.nextInt(opts.randomSleep));
1340 }
1341 Get get = new Get(getRandomRow(this.rand, opts.totalRows));
1342 if (opts.addColumns) {
1343 get.addColumn(FAMILY_NAME, QUALIFIER_NAME);
1344 } else {
1345 get.addFamily(FAMILY_NAME);
1346 }
1347 if (opts.filterAll) {
1348 get.setFilter(new FilterAllFilter());
1349 }
1350 get.setConsistency(consistency);
1351 if (LOG.isTraceEnabled()) LOG.trace(get.toString());
1352 if (opts.multiGet > 0) {
1353 this.gets.add(get);
1354 if (this.gets.size() == opts.multiGet) {
1355 Result [] rs = this.table.get(this.gets);
1356 updateValueSize(rs);
1357 this.gets.clear();
1358 }
1359 } else {
1360 updateValueSize(this.table.get(get));
1361 }
1362 }
1363
1364 @Override
1365 protected int getReportingPeriod() {
1366 int period = opts.perClientRunRows / 10;
1367 return period == 0 ? opts.perClientRunRows : period;
1368 }
1369
1370 @Override
1371 protected void testTakedown() throws IOException {
1372 if (this.gets != null && this.gets.size() > 0) {
1373 this.table.get(gets);
1374 this.gets.clear();
1375 }
1376 super.testTakedown();
1377 }
1378 }
1379
1380 static class RandomWriteTest extends BufferedMutatorTest {
1381 RandomWriteTest(Connection con, TestOptions options, Status status) {
1382 super(con, options, status);
1383 }
1384
1385 @Override
1386 void testRow(final int i) throws IOException {
1387 byte[] row = getRandomRow(this.rand, opts.totalRows);
1388 Put put = new Put(row);
1389 for (int column = 0; column < opts.columns; column++) {
1390 byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
1391 byte[] value = generateData(this.rand, getValueLength(this.rand));
1392 if (opts.useTags) {
1393 byte[] tag = generateData(this.rand, TAG_LENGTH);
1394 Tag[] tags = new Tag[opts.noOfTags];
1395 for (int n = 0; n < opts.noOfTags; n++) {
1396 Tag t = new Tag((byte) n, tag);
1397 tags[n] = t;
1398 }
1399 KeyValue kv = new KeyValue(row, FAMILY_NAME, qualifier, HConstants.LATEST_TIMESTAMP,
1400 value, tags);
1401 put.add(kv);
1402 updateValueSize(kv.getValueLength());
1403 } else {
1404 put.add(FAMILY_NAME, qualifier, value);
1405 updateValueSize(value.length);
1406 }
1407 }
1408 put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
1409 mutator.mutate(put);
1410 }
1411 }
1412
1413 static class ScanTest extends TableTest {
1414 private ResultScanner testScanner;
1415
1416 ScanTest(Connection con, TestOptions options, Status status) {
1417 super(con, options, status);
1418 }
1419
1420 @Override
1421 void testTakedown() throws IOException {
1422 if (this.testScanner != null) {
1423 this.testScanner.close();
1424 }
1425 super.testTakedown();
1426 }
1427
1428
1429 @Override
1430 void testRow(final int i) throws IOException {
1431 if (this.testScanner == null) {
1432 Scan scan = new Scan(format(opts.startRow));
1433 scan.setCaching(opts.caching);
1434 if (opts.addColumns) {
1435 scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
1436 } else {
1437 scan.addFamily(FAMILY_NAME);
1438 }
1439 if (opts.filterAll) {
1440 scan.setFilter(new FilterAllFilter());
1441 }
1442 this.testScanner = table.getScanner(scan);
1443 }
1444 Result r = testScanner.next();
1445 updateValueSize(r);
1446 }
1447 }
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457 static abstract class CASTableTest extends TableTest {
1458 private final byte [] qualifier;
1459 CASTableTest(Connection con, TestOptions options, Status status) {
1460 super(con, options, status);
1461 qualifier = Bytes.toBytes(this.getClass().getSimpleName());
1462 }
1463
1464 byte [] getQualifier() {
1465 return this.qualifier;
1466 }
1467
1468 @Override
1469 int getStartRow() {
1470 return 0;
1471 }
1472
1473 @Override
1474 int getLastRow() {
1475 return opts.perClientRunRows;
1476 }
1477 }
1478
1479 static class IncrementTest extends CASTableTest {
1480 IncrementTest(Connection con, TestOptions options, Status status) {
1481 super(con, options, status);
1482 }
1483
1484 @Override
1485 void testRow(final int i) throws IOException {
1486 Increment increment = new Increment(format(i));
1487 increment.addColumn(FAMILY_NAME, getQualifier(), 1l);
1488 updateValueSize(this.table.increment(increment));
1489 }
1490 }
1491
1492 static class AppendTest extends CASTableTest {
1493 AppendTest(Connection con, TestOptions options, Status status) {
1494 super(con, options, status);
1495 }
1496
1497 @Override
1498 void testRow(final int i) throws IOException {
1499 byte [] bytes = format(i);
1500 Append append = new Append(bytes);
1501 append.add(FAMILY_NAME, getQualifier(), bytes);
1502 updateValueSize(this.table.append(append));
1503 }
1504 }
1505
1506 static class CheckAndMutateTest extends CASTableTest {
1507 CheckAndMutateTest(Connection con, TestOptions options, Status status) {
1508 super(con, options, status);
1509 }
1510
1511 @Override
1512 void testRow(final int i) throws IOException {
1513 byte [] bytes = format(i);
1514
1515 Put put = new Put(bytes);
1516 put.addColumn(FAMILY_NAME, getQualifier(), bytes);
1517 this.table.put(put);
1518 RowMutations mutations = new RowMutations(bytes);
1519 mutations.add(put);
1520 this.table.checkAndMutate(bytes, FAMILY_NAME, getQualifier(), CompareOp.EQUAL, bytes,
1521 mutations);
1522 }
1523 }
1524
1525 static class CheckAndPutTest extends CASTableTest {
1526 CheckAndPutTest(Connection con, TestOptions options, Status status) {
1527 super(con, options, status);
1528 }
1529
1530 @Override
1531 void testRow(final int i) throws IOException {
1532 byte [] bytes = format(i);
1533
1534 Put put = new Put(bytes);
1535 put.addColumn(FAMILY_NAME, getQualifier(), bytes);
1536 this.table.put(put);
1537 this.table.checkAndPut(bytes, FAMILY_NAME, getQualifier(), CompareOp.EQUAL, bytes, put);
1538 }
1539 }
1540
1541 static class CheckAndDeleteTest extends CASTableTest {
1542 CheckAndDeleteTest(Connection con, TestOptions options, Status status) {
1543 super(con, options, status);
1544 }
1545
1546 @Override
1547 void testRow(final int i) throws IOException {
1548 byte [] bytes = format(i);
1549
1550 Put put = new Put(bytes);
1551 put.addColumn(FAMILY_NAME, getQualifier(), bytes);
1552 this.table.put(put);
1553 Delete delete = new Delete(put.getRow());
1554 delete.addColumn(FAMILY_NAME, getQualifier());
1555 this.table.checkAndDelete(bytes, FAMILY_NAME, getQualifier(), CompareOp.EQUAL, bytes, delete);
1556 }
1557 }
1558
1559 static class SequentialReadTest extends TableTest {
1560 SequentialReadTest(Connection con, TestOptions options, Status status) {
1561 super(con, options, status);
1562 }
1563
1564 @Override
1565 void testRow(final int i) throws IOException {
1566 Get get = new Get(format(i));
1567 if (opts.addColumns) {
1568 get.addColumn(FAMILY_NAME, QUALIFIER_NAME);
1569 }
1570 if (opts.filterAll) {
1571 get.setFilter(new FilterAllFilter());
1572 }
1573 updateValueSize(table.get(get));
1574 }
1575 }
1576
1577 static class SequentialWriteTest extends BufferedMutatorTest {
1578 SequentialWriteTest(Connection con, TestOptions options, Status status) {
1579 super(con, options, status);
1580 }
1581
1582 @Override
1583 void testRow(final int i) throws IOException {
1584 byte[] row = format(i);
1585 Put put = new Put(row);
1586 for (int column = 0; column < opts.columns; column++) {
1587 byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
1588 byte[] value = generateData(this.rand, getValueLength(this.rand));
1589 if (opts.useTags) {
1590 byte[] tag = generateData(this.rand, TAG_LENGTH);
1591 Tag[] tags = new Tag[opts.noOfTags];
1592 for (int n = 0; n < opts.noOfTags; n++) {
1593 Tag t = new Tag((byte) n, tag);
1594 tags[n] = t;
1595 }
1596 KeyValue kv = new KeyValue(row, FAMILY_NAME, qualifier, HConstants.LATEST_TIMESTAMP,
1597 value, tags);
1598 put.add(kv);
1599 updateValueSize(kv.getValueLength());
1600 } else {
1601 put.add(FAMILY_NAME, qualifier, value);
1602 updateValueSize(value.length);
1603 }
1604 }
1605 put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
1606 mutator.mutate(put);
1607 }
1608 }
1609
1610 static class FilteredScanTest extends TableTest {
1611 protected static final Log LOG = LogFactory.getLog(FilteredScanTest.class.getName());
1612
1613 FilteredScanTest(Connection con, TestOptions options, Status status) {
1614 super(con, options, status);
1615 }
1616
1617 @Override
1618 void testRow(int i) throws IOException {
1619 byte[] value = generateData(this.rand, getValueLength(this.rand));
1620 Scan scan = constructScan(value);
1621 ResultScanner scanner = null;
1622 try {
1623 scanner = this.table.getScanner(scan);
1624 for (Result r = null; (r = scanner.next()) != null;) {
1625 updateValueSize(r);
1626 }
1627 } finally {
1628 if (scanner != null) scanner.close();
1629 }
1630 }
1631
1632 protected Scan constructScan(byte[] valuePrefix) throws IOException {
1633 FilterList list = new FilterList();
1634 Filter filter = new SingleColumnValueFilter(
1635 FAMILY_NAME, COLUMN_ZERO, CompareFilter.CompareOp.EQUAL,
1636 new BinaryComparator(valuePrefix)
1637 );
1638 list.addFilter(filter);
1639 if(opts.filterAll) {
1640 list.addFilter(new FilterAllFilter());
1641 }
1642 Scan scan = new Scan();
1643 scan.setCaching(opts.caching);
1644 if (opts.addColumns) {
1645 scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
1646 } else {
1647 scan.addFamily(FAMILY_NAME);
1648 }
1649 scan.setFilter(list);
1650 return scan;
1651 }
1652 }
1653
1654
1655
1656
1657
1658
1659
1660 private static String calculateMbps(int rows, long timeMs, final int valueSize, int columns) {
1661 BigDecimal rowSize = BigDecimal.valueOf(ROW_LENGTH +
1662 ((valueSize + FAMILY_NAME.length + COLUMN_ZERO.length) * columns));
1663 BigDecimal mbps = BigDecimal.valueOf(rows).multiply(rowSize, CXT)
1664 .divide(BigDecimal.valueOf(timeMs), CXT).multiply(MS_PER_SEC, CXT)
1665 .divide(BYTES_PER_MB, CXT);
1666 return FMT.format(mbps) + " MB/s";
1667 }
1668
1669
1670
1671
1672
1673
1674
1675 public static byte [] format(final int number) {
1676 byte [] b = new byte[ROW_LENGTH];
1677 int d = Math.abs(number);
1678 for (int i = b.length - 1; i >= 0; i--) {
1679 b[i] = (byte)((d % 10) + '0');
1680 d /= 10;
1681 }
1682 return b;
1683 }
1684
1685
1686
1687
1688
1689
1690
1691 public static byte[] generateData(final Random r, int length) {
1692 byte [] b = new byte [length];
1693 int i;
1694
1695 for(i = 0; i < (length-8); i += 8) {
1696 b[i] = (byte) (65 + r.nextInt(26));
1697 b[i+1] = b[i];
1698 b[i+2] = b[i];
1699 b[i+3] = b[i];
1700 b[i+4] = b[i];
1701 b[i+5] = b[i];
1702 b[i+6] = b[i];
1703 b[i+7] = b[i];
1704 }
1705
1706 byte a = (byte) (65 + r.nextInt(26));
1707 for(; i < length; i++) {
1708 b[i] = a;
1709 }
1710 return b;
1711 }
1712
1713
1714
1715
1716
1717 @Deprecated
1718 public static byte[] generateValue(final Random r) {
1719 return generateData(r, DEFAULT_VALUE_LENGTH);
1720 }
1721
1722 static byte [] getRandomRow(final Random random, final int totalRows) {
1723 return format(random.nextInt(Integer.MAX_VALUE) % totalRows);
1724 }
1725
1726 static RunResult runOneClient(final Class<? extends Test> cmd, Configuration conf, Connection con,
1727 TestOptions opts, final Status status)
1728 throws IOException, InterruptedException {
1729 status.setStatus("Start " + cmd + " at offset " + opts.startRow + " for " +
1730 opts.perClientRunRows + " rows");
1731 long totalElapsedTime;
1732
1733 final Test t;
1734 try {
1735 Constructor<? extends Test> constructor =
1736 cmd.getDeclaredConstructor(Connection.class, TestOptions.class, Status.class);
1737 t = constructor.newInstance(con, opts, status);
1738 } catch (NoSuchMethodException e) {
1739 throw new IllegalArgumentException("Invalid command class: " +
1740 cmd.getName() + ". It does not provide a constructor as described by " +
1741 "the javadoc comment. Available constructors are: " +
1742 Arrays.toString(cmd.getConstructors()));
1743 } catch (Exception e) {
1744 throw new IllegalStateException("Failed to construct command class", e);
1745 }
1746 totalElapsedTime = t.test();
1747
1748 status.setStatus("Finished " + cmd + " in " + totalElapsedTime +
1749 "ms at offset " + opts.startRow + " for " + opts.perClientRunRows + " rows" +
1750 " (" + calculateMbps((int)(opts.perClientRunRows * opts.sampleRate), totalElapsedTime,
1751 getAverageValueLength(opts), opts.columns) + ")");
1752
1753 return new RunResult(totalElapsedTime, t.getLatency());
1754 }
1755
1756 private static int getAverageValueLength(final TestOptions opts) {
1757 return opts.valueRandom? opts.valueSize/2: opts.valueSize;
1758 }
1759
1760 private void runTest(final Class<? extends Test> cmd, TestOptions opts) throws IOException,
1761 InterruptedException, ClassNotFoundException {
1762
1763
1764 LOG.info(cmd.getSimpleName() + " test run options=" + MAPPER.writeValueAsString(opts));
1765 try(Connection conn = ConnectionFactory.createConnection(getConf());
1766 Admin admin = conn.getAdmin()) {
1767 checkTable(admin, opts);
1768 }
1769 if (opts.nomapred) {
1770 doLocalClients(opts, getConf());
1771 } else {
1772 doMapReduce(opts, getConf());
1773 }
1774 }
1775
1776 protected void printUsage() {
1777 printUsage(this.getClass().getName(), null);
1778 }
1779
1780 protected static void printUsage(final String message) {
1781 printUsage(PerformanceEvaluation.class.getName(), message);
1782 }
1783
1784 protected static void printUsageAndExit(final String message, final int exitCode) {
1785 printUsage(message);
1786 System.exit(exitCode);
1787 }
1788
1789 protected static void printUsage(final String className, final String message) {
1790 if (message != null && message.length() > 0) {
1791 System.err.println(message);
1792 }
1793 System.err.println("Usage: java " + className + " \\");
1794 System.err.println(" <OPTIONS> [-D<property=value>]* <command> <nclients>");
1795 System.err.println();
1796 System.err.println("Options:");
1797 System.err.println(" nomapred Run multiple clients using threads " +
1798 "(rather than use mapreduce)");
1799 System.err.println(" rows Rows each client runs. Default: One million");
1800 System.err.println(" size Total size in GiB. Mutually exclusive with --rows. " +
1801 "Default: 1.0.");
1802 System.err.println(" sampleRate Execute test on a sample of total " +
1803 "rows. Only supported by randomRead. Default: 1.0");
1804 System.err.println(" traceRate Enable HTrace spans. Initiate tracing every N rows. " +
1805 "Default: 0");
1806 System.err.println(" table Alternate table name. Default: 'TestTable'");
1807 System.err.println(" multiGet If >0, when doing RandomRead, perform multiple gets " +
1808 "instead of single gets. Default: 0");
1809 System.err.println(" compress Compression type to use (GZ, LZO, ...). Default: 'NONE'");
1810 System.err.println(" flushCommits Used to determine if the test should flush the table. " +
1811 "Default: false");
1812 System.err.println(" writeToWAL Set writeToWAL on puts. Default: True");
1813 System.err.println(" autoFlush Set autoFlush on htable. Default: False");
1814 System.err.println(" oneCon all the threads share the same connection. Default: False");
1815 System.err.println(" presplit Create presplit table. Recommended for accurate perf " +
1816 "analysis (see guide). Default: disabled");
1817 System.err.println(" inmemory Tries to keep the HFiles of the CF " +
1818 "inmemory as far as possible. Not guaranteed that reads are always served " +
1819 "from memory. Default: false");
1820 System.err.println(" usetags Writes tags along with KVs. Use with HFile V3. " +
1821 "Default: false");
1822 System.err.println(" numoftags Specify the no of tags that would be needed. " +
1823 "This works only if usetags is true.");
1824 System.err.println(" filterAll Helps to filter out all the rows on the server side"
1825 + " there by not returning any thing back to the client. Helps to check the server side"
1826 + " performance. Uses FilterAllFilter internally. ");
1827 System.err.println(" latency Set to report operation latencies. Default: False");
1828 System.err.println(" bloomFilter Bloom filter type, one of " + Arrays.toString(BloomType.values()));
1829 System.err.println(" valueSize Pass value size to use: Default: 1024");
1830 System.err.println(" valueRandom Set if we should vary value size between 0 and " +
1831 "'valueSize'; set on read for stats on size: Default: Not set.");
1832 System.err.println(" valueZipf Set if we should vary value size between 0 and " +
1833 "'valueSize' in zipf form: Default: Not set.");
1834 System.err.println(" period Report every 'period' rows: " +
1835 "Default: opts.perClientRunRows / 10");
1836 System.err.println(" multiGet Batch gets together into groups of N. Only supported " +
1837 "by randomRead. Default: disabled");
1838 System.err.println(" addColumns Adds columns to scans/gets explicitly. Default: true");
1839 System.err.println(" replicas Enable region replica testing. Defaults: 1.");
1840 System.err.println(" splitPolicy Specify a custom RegionSplitPolicy for the table.");
1841 System.err.println(" randomSleep Do a random sleep before each get between 0 and entered value. Defaults: 0");
1842 System.err.println(" columns Columns to write per row. Default: 1");
1843 System.err.println(" caching Scan caching to use. Default: 30");
1844 System.err.println();
1845 System.err.println(" Note: -D properties will be applied to the conf used. ");
1846 System.err.println(" For example: ");
1847 System.err.println(" -Dmapreduce.output.fileoutputformat.compress=true");
1848 System.err.println(" -Dmapreduce.task.timeout=60000");
1849 System.err.println();
1850 System.err.println("Command:");
1851 for (CmdDescriptor command : COMMANDS.values()) {
1852 System.err.println(String.format(" %-15s %s", command.getName(), command.getDescription()));
1853 }
1854 System.err.println();
1855 System.err.println("Args:");
1856 System.err.println(" nclients Integer. Required. Total number of " +
1857 "clients (and HRegionServers)");
1858 System.err.println(" running: 1 <= value <= 500");
1859 System.err.println("Examples:");
1860 System.err.println(" To run a single client doing the default 1M sequentialWrites:");
1861 System.err.println(" $ bin/hbase " + className + " sequentialWrite 1");
1862 System.err.println(" To run 10 clients doing increments over ten rows:");
1863 System.err.println(" $ bin/hbase " + className + " --rows=10 --nomapred increment 10");
1864 }
1865
1866
1867
1868
1869
1870
1871
1872 static TestOptions parseOpts(Queue<String> args) {
1873 TestOptions opts = new TestOptions();
1874
1875 String cmd = null;
1876 while ((cmd = args.poll()) != null) {
1877 if (cmd.equals("-h") || cmd.startsWith("--h")) {
1878
1879 args.add(cmd);
1880 break;
1881 }
1882
1883 final String nmr = "--nomapred";
1884 if (cmd.startsWith(nmr)) {
1885 opts.nomapred = true;
1886 continue;
1887 }
1888
1889 final String rows = "--rows=";
1890 if (cmd.startsWith(rows)) {
1891 opts.perClientRunRows = Integer.parseInt(cmd.substring(rows.length()));
1892 continue;
1893 }
1894
1895 final String sampleRate = "--sampleRate=";
1896 if (cmd.startsWith(sampleRate)) {
1897 opts.sampleRate = Float.parseFloat(cmd.substring(sampleRate.length()));
1898 continue;
1899 }
1900
1901 final String table = "--table=";
1902 if (cmd.startsWith(table)) {
1903 opts.tableName = cmd.substring(table.length());
1904 continue;
1905 }
1906
1907 final String startRow = "--startRow=";
1908 if (cmd.startsWith(startRow)) {
1909 opts.startRow = Integer.parseInt(cmd.substring(startRow.length()));
1910 continue;
1911 }
1912
1913 final String compress = "--compress=";
1914 if (cmd.startsWith(compress)) {
1915 opts.compression = Compression.Algorithm.valueOf(cmd.substring(compress.length()));
1916 continue;
1917 }
1918
1919 final String traceRate = "--traceRate=";
1920 if (cmd.startsWith(traceRate)) {
1921 opts.traceRate = Double.parseDouble(cmd.substring(traceRate.length()));
1922 continue;
1923 }
1924
1925 final String blockEncoding = "--blockEncoding=";
1926 if (cmd.startsWith(blockEncoding)) {
1927 opts.blockEncoding = DataBlockEncoding.valueOf(cmd.substring(blockEncoding.length()));
1928 continue;
1929 }
1930
1931 final String flushCommits = "--flushCommits=";
1932 if (cmd.startsWith(flushCommits)) {
1933 opts.flushCommits = Boolean.parseBoolean(cmd.substring(flushCommits.length()));
1934 continue;
1935 }
1936
1937 final String writeToWAL = "--writeToWAL=";
1938 if (cmd.startsWith(writeToWAL)) {
1939 opts.writeToWAL = Boolean.parseBoolean(cmd.substring(writeToWAL.length()));
1940 continue;
1941 }
1942
1943 final String presplit = "--presplit=";
1944 if (cmd.startsWith(presplit)) {
1945 opts.presplitRegions = Integer.parseInt(cmd.substring(presplit.length()));
1946 continue;
1947 }
1948
1949 final String inMemory = "--inmemory=";
1950 if (cmd.startsWith(inMemory)) {
1951 opts.inMemoryCF = Boolean.parseBoolean(cmd.substring(inMemory.length()));
1952 continue;
1953 }
1954
1955 final String autoFlush = "--autoFlush=";
1956 if (cmd.startsWith(autoFlush)) {
1957 opts.autoFlush = Boolean.parseBoolean(cmd.substring(autoFlush.length()));
1958 continue;
1959 }
1960
1961 final String onceCon = "--oneCon=";
1962 if (cmd.startsWith(onceCon)) {
1963 opts.oneCon = Boolean.parseBoolean(cmd.substring(onceCon.length()));
1964 continue;
1965 }
1966
1967 final String latency = "--latency";
1968 if (cmd.startsWith(latency)) {
1969 opts.reportLatency = true;
1970 continue;
1971 }
1972
1973 final String multiGet = "--multiGet=";
1974 if (cmd.startsWith(multiGet)) {
1975 opts.multiGet = Integer.parseInt(cmd.substring(multiGet.length()));
1976 continue;
1977 }
1978
1979 final String useTags = "--usetags=";
1980 if (cmd.startsWith(useTags)) {
1981 opts.useTags = Boolean.parseBoolean(cmd.substring(useTags.length()));
1982 continue;
1983 }
1984
1985 final String noOfTags = "--numoftags=";
1986 if (cmd.startsWith(noOfTags)) {
1987 opts.noOfTags = Integer.parseInt(cmd.substring(noOfTags.length()));
1988 continue;
1989 }
1990
1991 final String replicas = "--replicas=";
1992 if (cmd.startsWith(replicas)) {
1993 opts.replicas = Integer.parseInt(cmd.substring(replicas.length()));
1994 continue;
1995 }
1996
1997 final String filterOutAll = "--filterAll";
1998 if (cmd.startsWith(filterOutAll)) {
1999 opts.filterAll = true;
2000 continue;
2001 }
2002
2003 final String size = "--size=";
2004 if (cmd.startsWith(size)) {
2005 opts.size = Float.parseFloat(cmd.substring(size.length()));
2006 continue;
2007 }
2008
2009 final String splitPolicy = "--splitPolicy=";
2010 if (cmd.startsWith(splitPolicy)) {
2011 opts.splitPolicy = cmd.substring(splitPolicy.length());
2012 continue;
2013 }
2014
2015 final String randomSleep = "--randomSleep=";
2016 if (cmd.startsWith(randomSleep)) {
2017 opts.randomSleep = Integer.parseInt(cmd.substring(randomSleep.length()));
2018 continue;
2019 }
2020
2021 final String bloomFilter = "--bloomFilter=";
2022 if (cmd.startsWith(bloomFilter)) {
2023 opts.bloomType = BloomType.valueOf(cmd.substring(bloomFilter.length()));
2024 continue;
2025 }
2026
2027 final String valueSize = "--valueSize=";
2028 if (cmd.startsWith(valueSize)) {
2029 opts.valueSize = Integer.parseInt(cmd.substring(valueSize.length()));
2030 continue;
2031 }
2032
2033 final String valueRandom = "--valueRandom";
2034 if (cmd.startsWith(valueRandom)) {
2035 opts.valueRandom = true;
2036 if (opts.valueZipf) {
2037 throw new IllegalStateException("Either valueZipf or valueRandom but not both");
2038 }
2039 continue;
2040 }
2041
2042 final String valueZipf = "--valueZipf";
2043 if (cmd.startsWith(valueZipf)) {
2044 opts.valueZipf = true;
2045 if (opts.valueRandom) {
2046 throw new IllegalStateException("Either valueZipf or valueRandom but not both");
2047 }
2048 continue;
2049 }
2050
2051 final String period = "--period=";
2052 if (cmd.startsWith(period)) {
2053 opts.period = Integer.parseInt(cmd.substring(period.length()));
2054 continue;
2055 }
2056
2057 final String addColumns = "--addColumns=";
2058 if (cmd.startsWith(addColumns)) {
2059 opts.addColumns = Boolean.parseBoolean(cmd.substring(addColumns.length()));
2060 continue;
2061 }
2062
2063 final String columns = "--columns=";
2064 if (cmd.startsWith(columns)) {
2065 opts.columns = Integer.parseInt(cmd.substring(columns.length()));
2066 continue;
2067 }
2068
2069 final String caching = "--caching=";
2070 if (cmd.startsWith(caching)) {
2071 opts.caching = Integer.parseInt(cmd.substring(caching.length()));
2072 continue;
2073 }
2074
2075 if (isCommandClass(cmd)) {
2076 opts.cmdName = cmd;
2077 opts.numClientThreads = Integer.parseInt(args.remove());
2078 int rowsPerGB = getRowsPerGB(opts);
2079 if (opts.size != DEFAULT_OPTS.size &&
2080 opts.perClientRunRows != DEFAULT_OPTS.perClientRunRows) {
2081 throw new IllegalArgumentException(rows + " and " + size + " are mutually exclusive arguments.");
2082 }
2083 if (opts.size != DEFAULT_OPTS.size) {
2084
2085 opts.totalRows = (int) opts.size * rowsPerGB;
2086 opts.perClientRunRows = opts.totalRows / opts.numClientThreads;
2087 } else if (opts.perClientRunRows != DEFAULT_OPTS.perClientRunRows) {
2088
2089 opts.totalRows = opts.perClientRunRows * opts.numClientThreads;
2090 opts.size = opts.totalRows / rowsPerGB;
2091 }
2092 break;
2093 } else {
2094 printUsageAndExit("ERROR: Unrecognized option/command: " + cmd, -1);
2095 }
2096
2097
2098 System.err.println("Error: Wrong option or command: " + cmd);
2099 args.add(cmd);
2100 break;
2101 }
2102 return opts;
2103 }
2104
2105 static int getRowsPerGB(final TestOptions opts) {
2106 return ONE_GB / ((opts.valueRandom? opts.valueSize/2: opts.valueSize) * opts.getColumns());
2107 }
2108
2109 @Override
2110 public int run(String[] args) throws Exception {
2111
2112
2113 int errCode = -1;
2114 if (args.length < 1) {
2115 printUsage();
2116 return errCode;
2117 }
2118
2119 try {
2120 LinkedList<String> argv = new LinkedList<String>();
2121 argv.addAll(Arrays.asList(args));
2122 TestOptions opts = parseOpts(argv);
2123
2124
2125 if (!argv.isEmpty()) {
2126 errCode = 0;
2127 printUsage();
2128 return errCode;
2129 }
2130
2131
2132 if (opts.numClientThreads <= 0) {
2133 throw new IllegalArgumentException("Number of clients must be > 0");
2134 }
2135
2136 Class<? extends Test> cmdClass = determineCommandClass(opts.cmdName);
2137 if (cmdClass != null) {
2138 runTest(cmdClass, opts);
2139 errCode = 0;
2140 }
2141
2142 } catch (Exception e) {
2143 e.printStackTrace();
2144 }
2145
2146 return errCode;
2147 }
2148
2149 private static boolean isCommandClass(String cmd) {
2150 return COMMANDS.containsKey(cmd);
2151 }
2152
2153 private static Class<? extends Test> determineCommandClass(String cmd) {
2154 CmdDescriptor descriptor = COMMANDS.get(cmd);
2155 return descriptor != null ? descriptor.getCmdClass() : null;
2156 }
2157
2158 public static void main(final String[] args) throws Exception {
2159 int res = ToolRunner.run(new PerformanceEvaluation(HBaseConfiguration.create()), args);
2160 System.exit(res);
2161 }
2162 }