View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with this
4    * work for additional information regarding copyright ownership. The ASF
5    * licenses this file to you under the Apache License, Version 2.0 (the
6    * "License"); you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14   * License for the specific language governing permissions and limitations
15   * under the License.
16   */
17  package org.apache.hadoop.hbase.util;
18  
19  import java.io.IOException;
20  import java.io.InterruptedIOException;
21  import java.lang.reflect.Constructor;
22  import java.net.InetAddress;
23  import java.security.SecureRandom;
24  import java.util.ArrayList;
25  import java.util.Arrays;
26  import java.util.List;
27  import java.util.Properties;
28  import java.util.Random;
29  import java.util.concurrent.atomic.AtomicReference;
30  
31  import javax.crypto.spec.SecretKeySpec;
32  
33  import org.apache.commons.cli.CommandLine;
34  import org.apache.commons.logging.Log;
35  import org.apache.commons.logging.LogFactory;
36  import org.apache.hadoop.conf.Configuration;
37  import org.apache.hadoop.hbase.HBaseConfiguration;
38  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
39  import org.apache.hadoop.hbase.HBaseTestingUtility;
40  import org.apache.hadoop.hbase.HColumnDescriptor;
41  import org.apache.hadoop.hbase.HConstants;
42  import org.apache.hadoop.hbase.HTableDescriptor;
43  import org.apache.hadoop.hbase.TableName;
44  import org.apache.hadoop.hbase.classification.InterfaceAudience;
45  import org.apache.hadoop.hbase.client.Admin;
46  import org.apache.hadoop.hbase.client.ConnectionFactory;
47  import org.apache.hadoop.hbase.client.Durability;
48  import org.apache.hadoop.hbase.client.HBaseAdmin;
49  import org.apache.hadoop.hbase.io.compress.Compression;
50  import org.apache.hadoop.hbase.io.crypto.Cipher;
51  import org.apache.hadoop.hbase.io.crypto.Encryption;
52  import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
53  import org.apache.hadoop.hbase.regionserver.BloomType;
54  import org.apache.hadoop.hbase.security.EncryptionUtil;
55  import org.apache.hadoop.hbase.security.User;
56  import org.apache.hadoop.hbase.security.access.AccessControlClient;
57  import org.apache.hadoop.hbase.security.access.Permission;
58  import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
59  import org.apache.hadoop.hbase.util.test.LoadTestDataGeneratorWithACL;
60  import org.apache.hadoop.security.SecurityUtil;
61  import org.apache.hadoop.security.UserGroupInformation;
62  import org.apache.hadoop.util.ToolRunner;
63  
64  /**
65   * A command-line utility that reads, writes, and verifies data. Unlike
66   * {@link PerformanceEvaluation}, this tool validates the data written,
67   * and supports simultaneously writing and reading the same set of keys.
68   */
69  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
70  public class LoadTestTool extends AbstractHBaseTool {
71  
72    private static final Log LOG = LogFactory.getLog(LoadTestTool.class);
73    private static final String COLON = ":";
74  
75    /** Table name for the test */
76    private TableName tableName;
77  
78    /** Column families for the test */
79    private byte[][] families;
80  
81    /** Table name to use if not overridden on the command line */
82    protected static final String DEFAULT_TABLE_NAME = "cluster_test";
83  
84    /** Column family used by the test */
85    public static byte[] DEFAULT_COLUMN_FAMILY = Bytes.toBytes("test_cf");
86  
87    /** Column families used by the test */
88    public static final byte[][] DEFAULT_COLUMN_FAMILIES = { DEFAULT_COLUMN_FAMILY };
89  
90    /** The default data size if not specified */
91    protected static final int DEFAULT_DATA_SIZE = 64;
92  
93    /** The number of reader/writer threads if not specified */
94    protected static final int DEFAULT_NUM_THREADS = 20;
95  
96    /** Usage string for the load option */
97    protected static final String OPT_USAGE_LOAD =
98        "<avg_cols_per_key>:<avg_data_size>" +
99        "[:<#threads=" + DEFAULT_NUM_THREADS + ">]";
100 
101   /** Usage string for the read option */
102   protected static final String OPT_USAGE_READ =
103       "<verify_percent>[:<#threads=" + DEFAULT_NUM_THREADS + ">]";
104 
105   /** Usage string for the update option */
106   protected static final String OPT_USAGE_UPDATE =
107       "<update_percent>[:<#threads=" + DEFAULT_NUM_THREADS
108       + ">][:<#whether to ignore nonce collisions=0>]";
109 
110   protected static final String OPT_USAGE_BLOOM = "Bloom filter type, one of " +
111       Arrays.toString(BloomType.values());
112 
113   protected static final String OPT_USAGE_COMPRESSION = "Compression type, " +
114       "one of " + Arrays.toString(Compression.Algorithm.values());
115 
116   public static final String OPT_DATA_BLOCK_ENCODING_USAGE =
117     "Encoding algorithm (e.g. prefix "
118         + "compression) to use for data blocks in the test column family, "
119         + "one of " + Arrays.toString(DataBlockEncoding.values()) + ".";
120 
121   public static final String OPT_BLOOM = "bloom";
122   public static final String OPT_COMPRESSION = "compression";
123   public static final String OPT_DEFERRED_LOG_FLUSH = "deferredlogflush";
124   public static final String OPT_DEFERRED_LOG_FLUSH_USAGE = "Enable deferred log flush.";
125 
126   public static final String OPT_DATA_BLOCK_ENCODING =
127       HColumnDescriptor.DATA_BLOCK_ENCODING.toLowerCase();
128 
129   public static final String OPT_INMEMORY = "in_memory";
130   public static final String OPT_USAGE_IN_MEMORY = "Tries to keep the HFiles of the CF " +
131       "inmemory as far as possible.  Not guaranteed that reads are always served from inmemory";
132 
133   public static final String OPT_GENERATOR = "generator";
134   public static final String OPT_GENERATOR_USAGE = "The class which generates load for the tool."
135       + " Any args for this class can be passed as colon separated after class name";
136 
137   public static final String OPT_WRITER = "writer";
138   public static final String OPT_WRITER_USAGE = "The class for executing the write requests";
139 
140   public static final String OPT_UPDATER = "updater";
141   public static final String OPT_UPDATER_USAGE = "The class for executing the update requests";
142 
143   public static final String OPT_READER = "reader";
144   public static final String OPT_READER_USAGE = "The class for executing the read requests";
145 
146   protected static final String OPT_KEY_WINDOW = "key_window";
147   protected static final String OPT_WRITE = "write";
148   protected static final String OPT_MAX_READ_ERRORS = "max_read_errors";
149   public static final String OPT_MULTIPUT = "multiput";
150   public static final String OPT_MULTIGET = "multiget_batchsize";
151   protected static final String OPT_NUM_KEYS = "num_keys";
152   protected static final String OPT_READ = "read";
153   protected static final String OPT_START_KEY = "start_key";
154   public static final String OPT_TABLE_NAME = "tn";
155   public static final String OPT_COLUMN_FAMILIES = "families";
156   protected static final String OPT_ZK_QUORUM = "zk";
157   protected static final String OPT_ZK_PARENT_NODE = "zk_root";
158   protected static final String OPT_SKIP_INIT = "skip_init";
159   protected static final String OPT_INIT_ONLY = "init_only";
160   protected static final String NUM_TABLES = "num_tables";
161   protected static final String OPT_REGIONS_PER_SERVER = "regions_per_server";
162   protected static final String OPT_BATCHUPDATE = "batchupdate";
163   protected static final String OPT_UPDATE = "update";
164 
165   public static final String OPT_ENCRYPTION = "encryption";
166   protected static final String OPT_ENCRYPTION_USAGE =
167     "Enables transparent encryption on the test table, one of " +
168     Arrays.toString(Encryption.getSupportedCiphers());
169 
170   public static final String OPT_NUM_REGIONS_PER_SERVER = "num_regions_per_server";
171   protected static final String OPT_NUM_REGIONS_PER_SERVER_USAGE
172     = "Desired number of regions per region server. Defaults to 5.";
173   public static int DEFAULT_NUM_REGIONS_PER_SERVER = 5;
174 
175   public static final String OPT_REGION_REPLICATION = "region_replication";
176   protected static final String OPT_REGION_REPLICATION_USAGE =
177       "Desired number of replicas per region";
178 
179   public static final String OPT_REGION_REPLICA_ID = "region_replica_id";
180   protected static final String OPT_REGION_REPLICA_ID_USAGE =
181       "Region replica id to do the reads from";
182 
183   protected static final long DEFAULT_START_KEY = 0;
184 
185   /** This will be removed as we factor out the dependency on command line */
186   protected CommandLine cmd;
187 
188   protected MultiThreadedWriter writerThreads = null;
189   protected MultiThreadedReader readerThreads = null;
190   protected MultiThreadedUpdater updaterThreads = null;
191 
192   protected long startKey, endKey;
193 
194   protected boolean isWrite, isRead, isUpdate;
195   protected boolean deferredLogFlush;
196 
197   // Column family options
198   protected DataBlockEncoding dataBlockEncodingAlgo;
199   protected Compression.Algorithm compressAlgo;
200   protected BloomType bloomType;
201   private boolean inMemoryCF;
202 
203   private User userOwner;
204   // Writer options
205   protected int numWriterThreads = DEFAULT_NUM_THREADS;
206   protected int minColsPerKey, maxColsPerKey;
207   protected int minColDataSize = DEFAULT_DATA_SIZE, maxColDataSize = DEFAULT_DATA_SIZE;
208   protected boolean isMultiPut;
209 
210   // Updater options
211   protected int numUpdaterThreads = DEFAULT_NUM_THREADS;
212   protected int updatePercent;
213   protected boolean ignoreConflicts = false;
214   protected boolean isBatchUpdate;
215 
216   // Reader options
217   private int numReaderThreads = DEFAULT_NUM_THREADS;
218   private int keyWindow = MultiThreadedReader.DEFAULT_KEY_WINDOW;
219   private int multiGetBatchSize = MultiThreadedReader.DEFAULT_BATCH_SIZE;
220   private int maxReadErrors = MultiThreadedReader.DEFAULT_MAX_ERRORS;
221   private int verifyPercent;
222 
223   private int numTables = 1;
224 
225   private String superUser;
226 
227   private String userNames;
228   //This file is used to read authentication information in secure clusters.
229   private String authnFileName;
230 
231   private int numRegionsPerServer = DEFAULT_NUM_REGIONS_PER_SERVER;
232   private int regionReplication = -1; // not set
233   private int regionReplicaId = -1; // not set
234 
235   // TODO: refactor LoadTestToolImpl somewhere to make the usage from tests less bad,
236   //       console tool itself should only be used from console.
237   protected boolean isSkipInit = false;
238   protected boolean isInitOnly = false;
239 
240   protected Cipher cipher = null;
241 
242   protected String[] splitColonSeparated(String option,
243       int minNumCols, int maxNumCols) {
244     String optVal = cmd.getOptionValue(option);
245     String[] cols = optVal.split(COLON);
246     if (cols.length < minNumCols || cols.length > maxNumCols) {
247       throw new IllegalArgumentException("Expected at least "
248           + minNumCols + " columns but no more than " + maxNumCols +
249           " in the colon-separated value '" + optVal + "' of the " +
250           "-" + option + " option");
251     }
252     return cols;
253   }
254 
255   protected int getNumThreads(String numThreadsStr) {
256     return parseInt(numThreadsStr, 1, Short.MAX_VALUE);
257   }
258 
259   public byte[][] getColumnFamilies() {
260     return families;
261   }
262 
263   /**
264    * Apply column family options such as Bloom filters, compression, and data
265    * block encoding.
266    */
267   protected void applyColumnFamilyOptions(TableName tableName,
268       byte[][] columnFamilies) throws IOException {
269     Admin admin = new HBaseAdmin(conf);
270     HTableDescriptor tableDesc = admin.getTableDescriptor(tableName);
271     LOG.info("Disabling table " + tableName);
272     admin.disableTable(tableName);
273     for (byte[] cf : columnFamilies) {
274       HColumnDescriptor columnDesc = tableDesc.getFamily(cf);
275       boolean isNewCf = columnDesc == null;
276       if (isNewCf) {
277         columnDesc = new HColumnDescriptor(cf);
278       }
279       if (bloomType != null) {
280         columnDesc.setBloomFilterType(bloomType);
281       }
282       if (compressAlgo != null) {
283         columnDesc.setCompressionType(compressAlgo);
284       }
285       if (dataBlockEncodingAlgo != null) {
286         columnDesc.setDataBlockEncoding(dataBlockEncodingAlgo);
287       }
288       if (inMemoryCF) {
289         columnDesc.setInMemory(inMemoryCF);
290       }
291       if (cipher != null) {
292         byte[] keyBytes = new byte[cipher.getKeyLength()];
293         new SecureRandom().nextBytes(keyBytes);
294         columnDesc.setEncryptionType(cipher.getName());
295         columnDesc.setEncryptionKey(EncryptionUtil.wrapKey(conf,
296           User.getCurrent().getShortName(),
297           new SecretKeySpec(keyBytes, cipher.getName())));
298       }
299       if (isNewCf) {
300         admin.addColumn(tableName, columnDesc);
301       } else {
302         admin.modifyColumn(tableName, columnDesc);
303       }
304     }
305     LOG.info("Enabling table " + tableName);
306     admin.enableTable(tableName);
307     admin.close();
308   }
309 
310   @Override
311   protected void addOptions() {
312     addOptWithArg(OPT_ZK_QUORUM, "ZK quorum as comma-separated host names " +
313         "without port numbers");
314     addOptWithArg(OPT_ZK_PARENT_NODE, "name of parent znode in zookeeper");
315     addOptWithArg(OPT_TABLE_NAME, "The name of the table to read or write");
316     addOptWithArg(OPT_COLUMN_FAMILIES, "The name of the column families to use separated by comma");
317     addOptWithArg(OPT_WRITE, OPT_USAGE_LOAD);
318     addOptWithArg(OPT_READ, OPT_USAGE_READ);
319     addOptWithArg(OPT_UPDATE, OPT_USAGE_UPDATE);
320     addOptNoArg(OPT_INIT_ONLY, "Initialize the test table only, don't do any loading");
321     addOptWithArg(OPT_BLOOM, OPT_USAGE_BLOOM);
322     addOptWithArg(OPT_COMPRESSION, OPT_USAGE_COMPRESSION);
323     addOptWithArg(OPT_DATA_BLOCK_ENCODING, OPT_DATA_BLOCK_ENCODING_USAGE);
324     addOptWithArg(OPT_MAX_READ_ERRORS, "The maximum number of read errors " +
325         "to tolerate before terminating all reader threads. The default is " +
326         MultiThreadedReader.DEFAULT_MAX_ERRORS + ".");
327     addOptWithArg(OPT_MULTIGET, "Whether to use multi-gets as opposed to " +
328         "separate gets for every column in a row");
329     addOptWithArg(OPT_KEY_WINDOW, "The 'key window' to maintain between " +
330         "reads and writes for concurrent write/read workload. The default " +
331         "is " + MultiThreadedReader.DEFAULT_KEY_WINDOW + ".");
332 
333     addOptNoArg(OPT_MULTIPUT, "Whether to use multi-puts as opposed to " +
334         "separate puts for every column in a row");
335     addOptNoArg(OPT_BATCHUPDATE, "Whether to use batch as opposed to " +
336         "separate updates for every column in a row");
337     addOptNoArg(OPT_INMEMORY, OPT_USAGE_IN_MEMORY);
338     addOptWithArg(OPT_GENERATOR, OPT_GENERATOR_USAGE);
339     addOptWithArg(OPT_WRITER, OPT_WRITER_USAGE);
340     addOptWithArg(OPT_UPDATER, OPT_UPDATER_USAGE);
341     addOptWithArg(OPT_READER, OPT_READER_USAGE);
342 
343     addOptWithArg(OPT_NUM_KEYS, "The number of keys to read/write");
344     addOptWithArg(OPT_START_KEY, "The first key to read/write " +
345         "(a 0-based index). The default value is " +
346         DEFAULT_START_KEY + ".");
347     addOptNoArg(OPT_SKIP_INIT, "Skip the initialization; assume test table "
348         + "already exists");
349 
350     addOptWithArg(NUM_TABLES,
351       "A positive integer number. When a number n is speicfied, load test "
352           + "tool  will load n table parallely. -tn parameter value becomes "
353           + "table name prefix. Each table name is in format <tn>_1...<tn>_n");
354 
355     addOptWithArg(OPT_REGIONS_PER_SERVER,
356       "A positive integer number. When a number n is specified, load test "
357           + "tool will create the test table with n regions per server");
358 
359     addOptWithArg(OPT_ENCRYPTION, OPT_ENCRYPTION_USAGE);
360     addOptNoArg(OPT_DEFERRED_LOG_FLUSH, OPT_DEFERRED_LOG_FLUSH_USAGE);
361     addOptWithArg(OPT_NUM_REGIONS_PER_SERVER, OPT_NUM_REGIONS_PER_SERVER_USAGE);
362     addOptWithArg(OPT_REGION_REPLICATION, OPT_REGION_REPLICATION_USAGE);
363     addOptWithArg(OPT_REGION_REPLICA_ID, OPT_REGION_REPLICA_ID_USAGE);
364   }
365 
366   @Override
367   protected void processOptions(CommandLine cmd) {
368     this.cmd = cmd;
369 
370     tableName = TableName.valueOf(cmd.getOptionValue(OPT_TABLE_NAME,
371         DEFAULT_TABLE_NAME));
372 
373     if (cmd.hasOption(OPT_COLUMN_FAMILIES)) {
374       String[] list = cmd.getOptionValue(OPT_COLUMN_FAMILIES).split(",");
375       families = new byte[list.length][];
376       for (int i = 0; i < list.length; i++) {
377         families[i] = Bytes.toBytes(list[i]);
378       }
379     } else {
380       families = DEFAULT_COLUMN_FAMILIES;
381     }
382 
383     isWrite = cmd.hasOption(OPT_WRITE);
384     isRead = cmd.hasOption(OPT_READ);
385     isUpdate = cmd.hasOption(OPT_UPDATE);
386     isInitOnly = cmd.hasOption(OPT_INIT_ONLY);
387     deferredLogFlush = cmd.hasOption(OPT_DEFERRED_LOG_FLUSH);
388 
389     if (!isWrite && !isRead && !isUpdate && !isInitOnly) {
390       throw new IllegalArgumentException("Either -" + OPT_WRITE + " or " +
391         "-" + OPT_UPDATE + " or -" + OPT_READ + " has to be specified");
392     }
393 
394     if (isInitOnly && (isRead || isWrite || isUpdate)) {
395       throw new IllegalArgumentException(OPT_INIT_ONLY + " cannot be specified with"
396           + " either -" + OPT_WRITE + " or -" + OPT_UPDATE + " or -" + OPT_READ);
397     }
398 
399     if (!isInitOnly) {
400       if (!cmd.hasOption(OPT_NUM_KEYS)) {
401         throw new IllegalArgumentException(OPT_NUM_KEYS + " must be specified in "
402             + "read or write mode");
403       }
404       startKey = parseLong(cmd.getOptionValue(OPT_START_KEY,
405           String.valueOf(DEFAULT_START_KEY)), 0, Long.MAX_VALUE);
406       long numKeys = parseLong(cmd.getOptionValue(OPT_NUM_KEYS), 1,
407           Long.MAX_VALUE - startKey);
408       endKey = startKey + numKeys;
409       isSkipInit = cmd.hasOption(OPT_SKIP_INIT);
410       System.out.println("Key range: [" + startKey + ".." + (endKey - 1) + "]");
411     }
412 
413     parseColumnFamilyOptions(cmd);
414 
415     if (isWrite) {
416       String[] writeOpts = splitColonSeparated(OPT_WRITE, 2, 3);
417 
418       int colIndex = 0;
419       minColsPerKey = 1;
420       maxColsPerKey = 2 * Integer.parseInt(writeOpts[colIndex++]);
421       int avgColDataSize =
422           parseInt(writeOpts[colIndex++], 1, Integer.MAX_VALUE);
423       minColDataSize = avgColDataSize / 2;
424       maxColDataSize = avgColDataSize * 3 / 2;
425 
426       if (colIndex < writeOpts.length) {
427         numWriterThreads = getNumThreads(writeOpts[colIndex++]);
428       }
429 
430       isMultiPut = cmd.hasOption(OPT_MULTIPUT);
431 
432       System.out.println("Multi-puts: " + isMultiPut);
433       System.out.println("Columns per key: " + minColsPerKey + ".."
434           + maxColsPerKey);
435       System.out.println("Data size per column: " + minColDataSize + ".."
436           + maxColDataSize);
437     }
438 
439     if (isUpdate) {
440       String[] mutateOpts = splitColonSeparated(OPT_UPDATE, 1, 3);
441       int colIndex = 0;
442       updatePercent = parseInt(mutateOpts[colIndex++], 0, 100);
443       if (colIndex < mutateOpts.length) {
444         numUpdaterThreads = getNumThreads(mutateOpts[colIndex++]);
445       }
446       if (colIndex < mutateOpts.length) {
447         ignoreConflicts = parseInt(mutateOpts[colIndex++], 0, 1) == 1;
448       }
449 
450       isBatchUpdate = cmd.hasOption(OPT_BATCHUPDATE);
451 
452       System.out.println("Batch updates: " + isBatchUpdate);
453       System.out.println("Percent of keys to update: " + updatePercent);
454       System.out.println("Updater threads: " + numUpdaterThreads);
455       System.out.println("Ignore nonce conflicts: " + ignoreConflicts);
456     }
457 
458     if (isRead) {
459       String[] readOpts = splitColonSeparated(OPT_READ, 1, 2);
460       int colIndex = 0;
461       verifyPercent = parseInt(readOpts[colIndex++], 0, 100);
462       if (colIndex < readOpts.length) {
463         numReaderThreads = getNumThreads(readOpts[colIndex++]);
464       }
465 
466       if (cmd.hasOption(OPT_MAX_READ_ERRORS)) {
467         maxReadErrors = parseInt(cmd.getOptionValue(OPT_MAX_READ_ERRORS),
468             0, Integer.MAX_VALUE);
469       }
470 
471       if (cmd.hasOption(OPT_KEY_WINDOW)) {
472         keyWindow = parseInt(cmd.getOptionValue(OPT_KEY_WINDOW),
473             0, Integer.MAX_VALUE);
474       }
475 
476       if (cmd.hasOption(OPT_MULTIGET)) {
477         multiGetBatchSize = parseInt(cmd.getOptionValue(OPT_MULTIGET),
478             0, Integer.MAX_VALUE);
479       }
480 
481       System.out.println("Multi-gets (value of 1 means no multigets): " + multiGetBatchSize);
482       System.out.println("Percent of keys to verify: " + verifyPercent);
483       System.out.println("Reader threads: " + numReaderThreads);
484     }
485 
486     numTables = 1;
487     if (cmd.hasOption(NUM_TABLES)) {
488       numTables = parseInt(cmd.getOptionValue(NUM_TABLES), 1, Short.MAX_VALUE);
489     }
490 
491     numRegionsPerServer = DEFAULT_NUM_REGIONS_PER_SERVER;
492     if (cmd.hasOption(OPT_NUM_REGIONS_PER_SERVER)) {
493       numRegionsPerServer = Integer.parseInt(cmd.getOptionValue(OPT_NUM_REGIONS_PER_SERVER));
494     }
495 
496     regionReplication = 1;
497     if (cmd.hasOption(OPT_REGION_REPLICATION)) {
498       regionReplication = Integer.parseInt(cmd.getOptionValue(OPT_REGION_REPLICATION));
499     }
500 
501     regionReplicaId = -1;
502     if (cmd.hasOption(OPT_REGION_REPLICA_ID)) {
503       regionReplicaId = Integer.parseInt(cmd.getOptionValue(OPT_REGION_REPLICA_ID));
504     }
505   }
506 
507   private void parseColumnFamilyOptions(CommandLine cmd) {
508     String dataBlockEncodingStr = cmd.getOptionValue(OPT_DATA_BLOCK_ENCODING);
509     dataBlockEncodingAlgo = dataBlockEncodingStr == null ? null :
510         DataBlockEncoding.valueOf(dataBlockEncodingStr);
511 
512     String compressStr = cmd.getOptionValue(OPT_COMPRESSION);
513     compressAlgo = compressStr == null ? Compression.Algorithm.NONE :
514         Compression.Algorithm.valueOf(compressStr);
515 
516     String bloomStr = cmd.getOptionValue(OPT_BLOOM);
517     bloomType = bloomStr == null ? BloomType.ROW :
518         BloomType.valueOf(bloomStr);
519 
520     inMemoryCF = cmd.hasOption(OPT_INMEMORY);
521     if (cmd.hasOption(OPT_ENCRYPTION)) {
522       cipher = Encryption.getCipher(conf, cmd.getOptionValue(OPT_ENCRYPTION));
523     }
524 
525   }
526 
527   public void initTestTable() throws IOException {
528     Durability durability = Durability.USE_DEFAULT;
529     if (deferredLogFlush) {
530       durability = Durability.ASYNC_WAL;
531     }
532 
533     HBaseTestingUtility.createPreSplitLoadTestTable(conf, tableName,
534       getColumnFamilies(), compressAlgo, dataBlockEncodingAlgo, numRegionsPerServer,
535         regionReplication, durability);
536     applyColumnFamilyOptions(tableName, getColumnFamilies());
537   }
538 
539   @Override
540   protected int doWork() throws IOException {
541     if (numTables > 1) {
542       return parallelLoadTables();
543     } else {
544       return loadTable();
545     }
546   }
547 
548   protected int loadTable() throws IOException {
549     if (cmd.hasOption(OPT_ZK_QUORUM)) {
550       conf.set(HConstants.ZOOKEEPER_QUORUM, cmd.getOptionValue(OPT_ZK_QUORUM));
551     }
552     if (cmd.hasOption(OPT_ZK_PARENT_NODE)) {
553       conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, cmd.getOptionValue(OPT_ZK_PARENT_NODE));
554     }
555 
556     if (isInitOnly) {
557       LOG.info("Initializing only; no reads or writes");
558       initTestTable();
559       return 0;
560     }
561 
562     if (!isSkipInit) {
563       initTestTable();
564     }
565     LoadTestDataGenerator dataGen = null;
566     if (cmd.hasOption(OPT_GENERATOR)) {
567       String[] clazzAndArgs = cmd.getOptionValue(OPT_GENERATOR).split(COLON);
568       dataGen = getLoadGeneratorInstance(clazzAndArgs[0]);
569       String[] args;
570       if (dataGen instanceof LoadTestDataGeneratorWithACL) {
571         LOG.info("Using LoadTestDataGeneratorWithACL");
572         if (User.isHBaseSecurityEnabled(conf)) {
573           LOG.info("Security is enabled");
574           authnFileName = clazzAndArgs[1];
575           superUser = clazzAndArgs[2];
576           userNames = clazzAndArgs[3];
577           args = Arrays.copyOfRange(clazzAndArgs, 2, clazzAndArgs.length);
578           Properties authConfig = new Properties();
579           authConfig.load(this.getClass().getClassLoader().getResourceAsStream(authnFileName));
580           try {
581             addAuthInfoToConf(authConfig, conf, superUser, userNames);
582           } catch (IOException exp) {
583             LOG.error(exp);
584             return EXIT_FAILURE;
585           }
586           userOwner = User.create(loginAndReturnUGI(conf, superUser));
587         } else {
588           superUser = clazzAndArgs[1];
589           userNames = clazzAndArgs[2];
590           args = Arrays.copyOfRange(clazzAndArgs, 1, clazzAndArgs.length);
591           userOwner = User.createUserForTesting(conf, superUser, new String[0]);
592         }
593       } else {
594         args = clazzAndArgs.length == 1 ? new String[0] : Arrays.copyOfRange(clazzAndArgs, 1,
595             clazzAndArgs.length);
596       }
597       dataGen.initialize(args);
598     } else {
599       // Default DataGenerator is MultiThreadedAction.DefaultDataGenerator
600       dataGen = new MultiThreadedAction.DefaultDataGenerator(minColDataSize, maxColDataSize,
601           minColsPerKey, maxColsPerKey, families);
602     }
603 
604     if (userOwner != null) {
605       LOG.info("Granting permissions for user " + userOwner.getShortName());
606       Permission.Action[] actions = {
607         Permission.Action.ADMIN, Permission.Action.CREATE,
608         Permission.Action.READ, Permission.Action.WRITE };
609       try {
610         AccessControlClient.grant(ConnectionFactory.createConnection(conf),
611             tableName, userOwner.getShortName(), null, null, actions);
612       } catch (Throwable e) {
613         LOG.fatal("Error in granting permission for the user " + userOwner.getShortName(), e);
614         return EXIT_FAILURE;
615       }
616     }
617 
618     if (userNames != null) {
619       // This will be comma separated list of expressions.
620       String users[] = userNames.split(",");
621       User user = null;
622       for (String userStr : users) {
623         if (User.isHBaseSecurityEnabled(conf)) {
624           user = User.create(loginAndReturnUGI(conf, userStr));
625         } else {
626           user = User.createUserForTesting(conf, userStr, new String[0]);
627         }
628       }
629     }
630 
631     if (isWrite) {
632       if (userOwner != null) {
633         writerThreads = new MultiThreadedWriterWithACL(dataGen, conf, tableName, userOwner);
634       } else {
635         String writerClass = null;
636         if (cmd.hasOption(OPT_WRITER)) {
637           writerClass = cmd.getOptionValue(OPT_WRITER);
638         } else {
639           writerClass = MultiThreadedWriter.class.getCanonicalName();
640         }
641 
642         writerThreads = getMultiThreadedWriterInstance(writerClass, dataGen);
643       }
644       writerThreads.setMultiPut(isMultiPut);
645     }
646 
647     if (isUpdate) {
648       if (userOwner != null) {
649         updaterThreads = new MultiThreadedUpdaterWithACL(dataGen, conf, tableName, updatePercent,
650             userOwner, userNames);
651       } else {
652         String updaterClass = null;
653         if (cmd.hasOption(OPT_UPDATER)) {
654           updaterClass = cmd.getOptionValue(OPT_UPDATER);
655         } else {
656           updaterClass = MultiThreadedUpdater.class.getCanonicalName();
657         }
658         updaterThreads = getMultiThreadedUpdaterInstance(updaterClass, dataGen);
659       }
660       updaterThreads.setBatchUpdate(isBatchUpdate);
661       updaterThreads.setIgnoreNonceConflicts(ignoreConflicts);
662     }
663 
664     if (isRead) {
665       if (userOwner != null) {
666         readerThreads = new MultiThreadedReaderWithACL(dataGen, conf, tableName, verifyPercent,
667             userNames);
668       } else {
669         String readerClass = null;
670         if (cmd.hasOption(OPT_READER)) {
671           readerClass = cmd.getOptionValue(OPT_READER);
672         } else {
673           readerClass = MultiThreadedReader.class.getCanonicalName();
674         }
675         readerThreads = getMultiThreadedReaderInstance(readerClass, dataGen);
676       }
677       readerThreads.setMaxErrors(maxReadErrors);
678       readerThreads.setKeyWindow(keyWindow);
679       readerThreads.setMultiGetBatchSize(multiGetBatchSize);
680       readerThreads.setRegionReplicaId(regionReplicaId);
681     }
682 
683     if (isUpdate && isWrite) {
684       LOG.info("Concurrent write/update workload: making updaters aware of the " +
685         "write point");
686       updaterThreads.linkToWriter(writerThreads);
687     }
688 
689     if (isRead && (isUpdate || isWrite)) {
690       LOG.info("Concurrent write/read workload: making readers aware of the " +
691         "write point");
692       readerThreads.linkToWriter(isUpdate ? updaterThreads : writerThreads);
693     }
694 
695     if (isWrite) {
696       System.out.println("Starting to write data...");
697       writerThreads.start(startKey, endKey, numWriterThreads);
698     }
699 
700     if (isUpdate) {
701       LOG.info("Starting to mutate data...");
702       System.out.println("Starting to mutate data...");
703       // TODO : currently append and increment operations not tested with tags
704       // Will update this aftet it is done
705       updaterThreads.start(startKey, endKey, numUpdaterThreads);
706     }
707 
708     if (isRead) {
709       System.out.println("Starting to read data...");
710       readerThreads.start(startKey, endKey, numReaderThreads);
711     }
712 
713     if (isWrite) {
714       writerThreads.waitForFinish();
715     }
716 
717     if (isUpdate) {
718       updaterThreads.waitForFinish();
719     }
720 
721     if (isRead) {
722       readerThreads.waitForFinish();
723     }
724 
725     boolean success = true;
726     if (isWrite) {
727       success = success && writerThreads.getNumWriteFailures() == 0;
728     }
729     if (isUpdate) {
730       success = success && updaterThreads.getNumWriteFailures() == 0;
731     }
732     if (isRead) {
733       success = success && readerThreads.getNumReadErrors() == 0
734           && readerThreads.getNumReadFailures() == 0;
735     }
736     return success ? EXIT_SUCCESS : EXIT_FAILURE;
737   }
738 
739   private LoadTestDataGenerator getLoadGeneratorInstance(String clazzName) throws IOException {
740     try {
741       Class<?> clazz = Class.forName(clazzName);
742       Constructor<?> constructor = clazz.getConstructor(int.class, int.class, int.class, int.class,
743           byte[][].class);
744       return (LoadTestDataGenerator) constructor.newInstance(minColDataSize, maxColDataSize,
745           minColsPerKey, maxColsPerKey, families);
746     } catch (Exception e) {
747       throw new IOException(e);
748     }
749   }
750 
751   private MultiThreadedWriter getMultiThreadedWriterInstance(String clazzName
752       , LoadTestDataGenerator dataGen) throws IOException {
753     try {
754       Class<?> clazz = Class.forName(clazzName);
755       Constructor<?> constructor = clazz.getConstructor(
756         LoadTestDataGenerator.class, Configuration.class, TableName.class);
757       return (MultiThreadedWriter) constructor.newInstance(dataGen, conf, tableName);
758     } catch (Exception e) {
759       throw new IOException(e);
760     }
761   }
762 
763   private MultiThreadedUpdater getMultiThreadedUpdaterInstance(String clazzName
764       , LoadTestDataGenerator dataGen) throws IOException {
765     try {
766       Class<?> clazz = Class.forName(clazzName);
767       Constructor<?> constructor = clazz.getConstructor(
768         LoadTestDataGenerator.class, Configuration.class, TableName.class, double.class);
769       return (MultiThreadedUpdater) constructor.newInstance(
770         dataGen, conf, tableName, updatePercent);
771     } catch (Exception e) {
772       throw new IOException(e);
773     }
774   }
775 
776   private MultiThreadedReader getMultiThreadedReaderInstance(String clazzName
777       , LoadTestDataGenerator dataGen) throws IOException {
778     try {
779       Class<?> clazz = Class.forName(clazzName);
780       Constructor<?> constructor = clazz.getConstructor(
781         LoadTestDataGenerator.class, Configuration.class, TableName.class, double.class);
782       return (MultiThreadedReader) constructor.newInstance(dataGen, conf, tableName, verifyPercent);
783     } catch (Exception e) {
784       throw new IOException(e);
785     }
786   }
787 
788   public static byte[] generateData(final Random r, int length) {
789     byte [] b = new byte [length];
790     int i = 0;
791 
792     for(i = 0; i < (length-8); i += 8) {
793       b[i] = (byte) (65 + r.nextInt(26));
794       b[i+1] = b[i];
795       b[i+2] = b[i];
796       b[i+3] = b[i];
797       b[i+4] = b[i];
798       b[i+5] = b[i];
799       b[i+6] = b[i];
800       b[i+7] = b[i];
801     }
802 
803     byte a = (byte) (65 + r.nextInt(26));
804     for(; i < length; i++) {
805       b[i] = a;
806     }
807     return b;
808   }
809   public static void main(String[] args) {
810     new LoadTestTool().doStaticMain(args);
811   }
812 
813   /**
814    * When NUM_TABLES is specified, the function starts multiple worker threads
815    * which individually start a LoadTestTool instance to load a table. Each
816    * table name is in format <tn>_<index>. For example, "-tn test -num_tables 2"
817    * , table names will be "test_1", "test_2"
818    *
819    * @throws IOException
820    */
821   private int parallelLoadTables()
822       throws IOException {
823     // create new command args
824     String tableName = cmd.getOptionValue(OPT_TABLE_NAME, DEFAULT_TABLE_NAME);
825     String[] newArgs = null;
826     if (!cmd.hasOption(LoadTestTool.OPT_TABLE_NAME)) {
827       newArgs = new String[cmdLineArgs.length + 2];
828       newArgs[0] = "-" + LoadTestTool.OPT_TABLE_NAME;
829       newArgs[1] = LoadTestTool.DEFAULT_TABLE_NAME;
830       System.arraycopy(cmdLineArgs, 0, newArgs, 2, cmdLineArgs.length);
831     } else {
832       newArgs = cmdLineArgs;
833     }
834 
835     int tableNameValueIndex = -1;
836     for (int j = 0; j < newArgs.length; j++) {
837       if (newArgs[j].endsWith(OPT_TABLE_NAME)) {
838         tableNameValueIndex = j + 1;
839       } else if (newArgs[j].endsWith(NUM_TABLES)) {
840         // change NUM_TABLES to 1 so that each worker loads one table
841         newArgs[j + 1] = "1";
842       }
843     }
844 
845     // starting to load multiple tables
846     List<WorkerThread> workers = new ArrayList<WorkerThread>();
847     for (int i = 0; i < numTables; i++) {
848       String[] workerArgs = newArgs.clone();
849       workerArgs[tableNameValueIndex] = tableName + "_" + (i+1);
850       WorkerThread worker = new WorkerThread(i, workerArgs);
851       workers.add(worker);
852       LOG.info(worker + " starting");
853       worker.start();
854     }
855 
856     // wait for all workers finish
857     LOG.info("Waiting for worker threads to finish");
858     for (WorkerThread t : workers) {
859       try {
860         t.join();
861       } catch (InterruptedException ie) {
862         IOException iie = new InterruptedIOException();
863         iie.initCause(ie);
864         throw iie;
865       }
866       checkForErrors();
867     }
868 
869     return EXIT_SUCCESS;
870   }
871 
872   // If an exception is thrown by one of worker threads, it will be
873   // stored here.
874   protected AtomicReference<Throwable> thrown = new AtomicReference<Throwable>();
875 
876   private void workerThreadError(Throwable t) {
877     thrown.compareAndSet(null, t);
878   }
879 
880   /**
881    * Check for errors in the writer threads. If any is found, rethrow it.
882    */
883   private void checkForErrors() throws IOException {
884     Throwable thrown = this.thrown.get();
885     if (thrown == null) return;
886     if (thrown instanceof IOException) {
887       throw (IOException) thrown;
888     } else {
889       throw new RuntimeException(thrown);
890     }
891   }
892 
893   class WorkerThread extends Thread {
894     private String[] workerArgs;
895 
896     WorkerThread(int i, String[] args) {
897       super("WorkerThread-" + i);
898       workerArgs = args;
899     }
900 
901     @Override
902     public void run() {
903       try {
904         int ret = ToolRunner.run(HBaseConfiguration.create(), new LoadTestTool(), workerArgs);
905         if (ret != 0) {
906           throw new RuntimeException("LoadTestTool exit with non-zero return code.");
907         }
908       } catch (Exception ex) {
909         LOG.error("Error in worker thread", ex);
910         workerThreadError(ex);
911       }
912     }
913   }
914 
915   private void addAuthInfoToConf(Properties authConfig, Configuration conf, String owner,
916       String userList) throws IOException {
917     List<String> users = new ArrayList(Arrays.asList(userList.split(",")));
918     users.add(owner);
919     for (String user : users) {
920       String keyTabFileConfKey = "hbase." + user + ".keytab.file";
921       String principalConfKey = "hbase." + user + ".kerberos.principal";
922       if (!authConfig.containsKey(keyTabFileConfKey) || !authConfig.containsKey(principalConfKey)) {
923         throw new IOException("Authentication configs missing for user : " + user);
924       }
925     }
926     for (String key : authConfig.stringPropertyNames()) {
927       conf.set(key, authConfig.getProperty(key));
928     }
929     LOG.debug("Added authentication properties to config successfully.");
930   }
931 
932   public static UserGroupInformation loginAndReturnUGI(Configuration conf, String username)
933       throws IOException {
934     String hostname = InetAddress.getLocalHost().getHostName();
935     String keyTabFileConfKey = "hbase." + username + ".keytab.file";
936     String keyTabFileLocation = conf.get(keyTabFileConfKey);
937     String principalConfKey = "hbase." + username + ".kerberos.principal";
938     String principal = SecurityUtil.getServerPrincipal(conf.get(principalConfKey), hostname);
939     if (keyTabFileLocation == null || principal == null) {
940       LOG.warn("Principal or key tab file null for : " + principalConfKey + ", "
941           + keyTabFileConfKey);
942     }
943     UserGroupInformation ugi =
944         UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keyTabFileLocation);
945     return ugi;
946   }
947 }