1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.hadoop.hbase.util;
18
19 import java.io.IOException;
20 import java.io.InterruptedIOException;
21 import java.lang.reflect.Constructor;
22 import java.net.InetAddress;
23 import java.security.SecureRandom;
24 import java.util.ArrayList;
25 import java.util.Arrays;
26 import java.util.List;
27 import java.util.Properties;
28 import java.util.Random;
29 import java.util.concurrent.atomic.AtomicReference;
30
31 import javax.crypto.spec.SecretKeySpec;
32
33 import org.apache.commons.cli.CommandLine;
34 import org.apache.commons.logging.Log;
35 import org.apache.commons.logging.LogFactory;
36 import org.apache.hadoop.conf.Configuration;
37 import org.apache.hadoop.hbase.HBaseConfiguration;
38 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
39 import org.apache.hadoop.hbase.HBaseTestingUtility;
40 import org.apache.hadoop.hbase.HColumnDescriptor;
41 import org.apache.hadoop.hbase.HConstants;
42 import org.apache.hadoop.hbase.HTableDescriptor;
43 import org.apache.hadoop.hbase.TableName;
44 import org.apache.hadoop.hbase.classification.InterfaceAudience;
45 import org.apache.hadoop.hbase.client.Admin;
46 import org.apache.hadoop.hbase.client.ConnectionFactory;
47 import org.apache.hadoop.hbase.client.Durability;
48 import org.apache.hadoop.hbase.client.HBaseAdmin;
49 import org.apache.hadoop.hbase.io.compress.Compression;
50 import org.apache.hadoop.hbase.io.crypto.Cipher;
51 import org.apache.hadoop.hbase.io.crypto.Encryption;
52 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
53 import org.apache.hadoop.hbase.regionserver.BloomType;
54 import org.apache.hadoop.hbase.security.EncryptionUtil;
55 import org.apache.hadoop.hbase.security.User;
56 import org.apache.hadoop.hbase.security.access.AccessControlClient;
57 import org.apache.hadoop.hbase.security.access.Permission;
58 import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
59 import org.apache.hadoop.hbase.util.test.LoadTestDataGeneratorWithACL;
60 import org.apache.hadoop.security.SecurityUtil;
61 import org.apache.hadoop.security.UserGroupInformation;
62 import org.apache.hadoop.util.ToolRunner;
63
64
65
66
67
68
69 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
70 public class LoadTestTool extends AbstractHBaseTool {
71
private static final Log LOG = LogFactory.getLog(LoadTestTool.class);
/** Separator between the parts of a multi-part option value (e.g. the -write value). */
private static final String COLON = ":";

/** Table the tool works on; assigned in processOptions from -tn or DEFAULT_TABLE_NAME. */
private TableName tableName;

/** Column families the tool works on; assigned in processOptions. */
private byte[][] families;

/** Table name used when the -tn option is not given. */
protected static final String DEFAULT_TABLE_NAME = "cluster_test";

/** Name of the default column family. */
public static byte[] DEFAULT_COLUMN_FAMILY = Bytes.toBytes("test_cf");

/** Column families used when the -families option is not given. */
public static final byte[][] DEFAULT_COLUMN_FAMILIES = { DEFAULT_COLUMN_FAMILY };

/** Initial value of both the minimum and the maximum column data size. */
protected static final int DEFAULT_DATA_SIZE = 64;

/** Initial number of writer, updater and reader threads. */
protected static final int DEFAULT_NUM_THREADS = 20;

/** Usage text for the -write option. */
protected static final String OPT_USAGE_LOAD =
    "<avg_cols_per_key>:<avg_data_size>" +
    "[:<#threads=" + DEFAULT_NUM_THREADS + ">]";

/** Usage text for the -read option. */
protected static final String OPT_USAGE_READ =
    "<verify_percent>[:<#threads=" + DEFAULT_NUM_THREADS + ">]";

/** Usage text for the -update option. */
protected static final String OPT_USAGE_UPDATE =
    "<update_percent>[:<#threads=" + DEFAULT_NUM_THREADS
    + ">][:<#whether to ignore nonce collisions=0>]";

/** Usage text for the -bloom option; lists every BloomType value. */
protected static final String OPT_USAGE_BLOOM = "Bloom filter type, one of " +
    Arrays.toString(BloomType.values());

/** Usage text for the -compression option; lists every Compression.Algorithm value. */
protected static final String OPT_USAGE_COMPRESSION = "Compression type, " +
    "one of " + Arrays.toString(Compression.Algorithm.values());

/** Usage text for the data block encoding option; lists every DataBlockEncoding value. */
public static final String OPT_DATA_BLOCK_ENCODING_USAGE =
    "Encoding algorithm (e.g. prefix "
    + "compression) to use for data blocks in the test column family, "
    + "one of " + Arrays.toString(DataBlockEncoding.values()) + ".";

// Names (and a few usage texts) of the command-line options registered in addOptions.
public static final String OPT_BLOOM = "bloom";
public static final String OPT_COMPRESSION = "compression";
public static final String OPT_DEFERRED_LOG_FLUSH = "deferredlogflush";
public static final String OPT_DEFERRED_LOG_FLUSH_USAGE = "Enable deferred log flush.";

/** Option name derived from the lower-cased HColumnDescriptor.DATA_BLOCK_ENCODING attribute. */
public static final String OPT_DATA_BLOCK_ENCODING =
    HColumnDescriptor.DATA_BLOCK_ENCODING.toLowerCase();

public static final String OPT_INMEMORY = "in_memory";
public static final String OPT_USAGE_IN_MEMORY = "Tries to keep the HFiles of the CF " +
    "inmemory as far as possible. Not guaranteed that reads are always served from inmemory";

public static final String OPT_GENERATOR = "generator";
public static final String OPT_GENERATOR_USAGE = "The class which generates load for the tool."
    + " Any args for this class can be passed as colon separated after class name";

public static final String OPT_WRITER = "writer";
public static final String OPT_WRITER_USAGE = "The class for executing the write requests";

public static final String OPT_UPDATER = "updater";
public static final String OPT_UPDATER_USAGE = "The class for executing the update requests";

public static final String OPT_READER = "reader";
public static final String OPT_READER_USAGE = "The class for executing the read requests";

protected static final String OPT_KEY_WINDOW = "key_window";
protected static final String OPT_WRITE = "write";
protected static final String OPT_MAX_READ_ERRORS = "max_read_errors";
public static final String OPT_MULTIPUT = "multiput";
public static final String OPT_MULTIGET = "multiget_batchsize";
protected static final String OPT_NUM_KEYS = "num_keys";
protected static final String OPT_READ = "read";
protected static final String OPT_START_KEY = "start_key";
public static final String OPT_TABLE_NAME = "tn";
public static final String OPT_COLUMN_FAMILIES = "families";
protected static final String OPT_ZK_QUORUM = "zk";
protected static final String OPT_ZK_PARENT_NODE = "zk_root";
protected static final String OPT_SKIP_INIT = "skip_init";
protected static final String OPT_INIT_ONLY = "init_only";
protected static final String NUM_TABLES = "num_tables";
protected static final String OPT_REGIONS_PER_SERVER = "regions_per_server";
protected static final String OPT_BATCHUPDATE = "batchupdate";
protected static final String OPT_UPDATE = "update";

public static final String OPT_ENCRYPTION = "encryption";
/** Usage text for the -encryption option; lists the ciphers reported by Encryption. */
protected static final String OPT_ENCRYPTION_USAGE =
    "Enables transparent encryption on the test table, one of " +
    Arrays.toString(Encryption.getSupportedCiphers());

public static final String OPT_NUM_REGIONS_PER_SERVER = "num_regions_per_server";
protected static final String OPT_NUM_REGIONS_PER_SERVER_USAGE
    = "Desired number of regions per region server. Defaults to 5.";
/** Regions per server used when -num_regions_per_server is not given. */
public static int DEFAULT_NUM_REGIONS_PER_SERVER = 5;

public static final String OPT_REGION_REPLICATION = "region_replication";
protected static final String OPT_REGION_REPLICATION_USAGE =
    "Desired number of replicas per region";

public static final String OPT_REGION_REPLICA_ID = "region_replica_id";
protected static final String OPT_REGION_REPLICA_ID_USAGE =
    "Region replica id to do the reads from";

/** First key used when the -start_key option is not given. */
protected static final long DEFAULT_START_KEY = 0;
184
185
/** The parsed command line; stored by processOptions. */
protected CommandLine cmd;

// Worker pools; each is created in loadTable only when the matching mode is enabled.
protected MultiThreadedWriter writerThreads = null;
protected MultiThreadedReader readerThreads = null;
protected MultiThreadedUpdater updaterThreads = null;

/** Key range to process; endKey is computed as startKey + num_keys. */
protected long startKey, endKey;

/** Which of -write, -read and -update were given on the command line. */
protected boolean isWrite, isRead, isUpdate;
/** True when -deferredlogflush was given; initTestTable then uses Durability.ASYNC_WAL. */
protected boolean deferredLogFlush;

// Column family options
protected DataBlockEncoding dataBlockEncodingAlgo;
protected Compression.Algorithm compressAlgo;
protected BloomType bloomType;
private boolean inMemoryCF;

/** Set in loadTable only when the generator is a LoadTestDataGeneratorWithACL. */
private User userOwner;

// Writer options
protected int numWriterThreads = DEFAULT_NUM_THREADS;
protected int minColsPerKey, maxColsPerKey;
protected int minColDataSize = DEFAULT_DATA_SIZE, maxColDataSize = DEFAULT_DATA_SIZE;
protected boolean isMultiPut;

// Updater options
protected int numUpdaterThreads = DEFAULT_NUM_THREADS;
protected int updatePercent;
protected boolean ignoreConflicts = false;
protected boolean isBatchUpdate;

// Reader options
private int numReaderThreads = DEFAULT_NUM_THREADS;
private int keyWindow = MultiThreadedReader.DEFAULT_KEY_WINDOW;
private int multiGetBatchSize = MultiThreadedReader.DEFAULT_BATCH_SIZE;
private int maxReadErrors = MultiThreadedReader.DEFAULT_MAX_ERRORS;
private int verifyPercent;

/** Number of tables to load; values above 1 make doWork use parallelLoadTables. */
private int numTables = 1;

// The three values below are taken from the colon-separated -generator arguments in loadTable.
private String superUser;

/** Comma-separated user names. */
private String userNames;

/** Name of the classpath resource holding the authentication properties. */
private String authnFileName;

private int numRegionsPerServer = DEFAULT_NUM_REGIONS_PER_SERVER;
private int regionReplication = -1;
private int regionReplicaId = -1;

// TODO: refactor LoadTestToolImpl somewhere to make the usage from tests less bad,
//       console tool itself should only be used from console.
protected boolean isSkipInit = false;
protected boolean isInitOnly = false;

/** Cipher chosen via -encryption; null means no encryption is configured on the families. */
protected Cipher cipher = null;
241
242 protected String[] splitColonSeparated(String option,
243 int minNumCols, int maxNumCols) {
244 String optVal = cmd.getOptionValue(option);
245 String[] cols = optVal.split(COLON);
246 if (cols.length < minNumCols || cols.length > maxNumCols) {
247 throw new IllegalArgumentException("Expected at least "
248 + minNumCols + " columns but no more than " + maxNumCols +
249 " in the colon-separated value '" + optVal + "' of the " +
250 "-" + option + " option");
251 }
252 return cols;
253 }
254
255 protected int getNumThreads(String numThreadsStr) {
256 return parseInt(numThreadsStr, 1, Short.MAX_VALUE);
257 }
258
259 public byte[][] getColumnFamilies() {
260 return families;
261 }
262
263
264
265
266
267 protected void applyColumnFamilyOptions(TableName tableName,
268 byte[][] columnFamilies) throws IOException {
269 Admin admin = new HBaseAdmin(conf);
270 HTableDescriptor tableDesc = admin.getTableDescriptor(tableName);
271 LOG.info("Disabling table " + tableName);
272 admin.disableTable(tableName);
273 for (byte[] cf : columnFamilies) {
274 HColumnDescriptor columnDesc = tableDesc.getFamily(cf);
275 boolean isNewCf = columnDesc == null;
276 if (isNewCf) {
277 columnDesc = new HColumnDescriptor(cf);
278 }
279 if (bloomType != null) {
280 columnDesc.setBloomFilterType(bloomType);
281 }
282 if (compressAlgo != null) {
283 columnDesc.setCompressionType(compressAlgo);
284 }
285 if (dataBlockEncodingAlgo != null) {
286 columnDesc.setDataBlockEncoding(dataBlockEncodingAlgo);
287 }
288 if (inMemoryCF) {
289 columnDesc.setInMemory(inMemoryCF);
290 }
291 if (cipher != null) {
292 byte[] keyBytes = new byte[cipher.getKeyLength()];
293 new SecureRandom().nextBytes(keyBytes);
294 columnDesc.setEncryptionType(cipher.getName());
295 columnDesc.setEncryptionKey(EncryptionUtil.wrapKey(conf,
296 User.getCurrent().getShortName(),
297 new SecretKeySpec(keyBytes, cipher.getName())));
298 }
299 if (isNewCf) {
300 admin.addColumn(tableName, columnDesc);
301 } else {
302 admin.modifyColumn(tableName, columnDesc);
303 }
304 }
305 LOG.info("Enabling table " + tableName);
306 admin.enableTable(tableName);
307 admin.close();
308 }
309
310 @Override
311 protected void addOptions() {
312 addOptWithArg(OPT_ZK_QUORUM, "ZK quorum as comma-separated host names " +
313 "without port numbers");
314 addOptWithArg(OPT_ZK_PARENT_NODE, "name of parent znode in zookeeper");
315 addOptWithArg(OPT_TABLE_NAME, "The name of the table to read or write");
316 addOptWithArg(OPT_COLUMN_FAMILIES, "The name of the column families to use separated by comma");
317 addOptWithArg(OPT_WRITE, OPT_USAGE_LOAD);
318 addOptWithArg(OPT_READ, OPT_USAGE_READ);
319 addOptWithArg(OPT_UPDATE, OPT_USAGE_UPDATE);
320 addOptNoArg(OPT_INIT_ONLY, "Initialize the test table only, don't do any loading");
321 addOptWithArg(OPT_BLOOM, OPT_USAGE_BLOOM);
322 addOptWithArg(OPT_COMPRESSION, OPT_USAGE_COMPRESSION);
323 addOptWithArg(OPT_DATA_BLOCK_ENCODING, OPT_DATA_BLOCK_ENCODING_USAGE);
324 addOptWithArg(OPT_MAX_READ_ERRORS, "The maximum number of read errors " +
325 "to tolerate before terminating all reader threads. The default is " +
326 MultiThreadedReader.DEFAULT_MAX_ERRORS + ".");
327 addOptWithArg(OPT_MULTIGET, "Whether to use multi-gets as opposed to " +
328 "separate gets for every column in a row");
329 addOptWithArg(OPT_KEY_WINDOW, "The 'key window' to maintain between " +
330 "reads and writes for concurrent write/read workload. The default " +
331 "is " + MultiThreadedReader.DEFAULT_KEY_WINDOW + ".");
332
333 addOptNoArg(OPT_MULTIPUT, "Whether to use multi-puts as opposed to " +
334 "separate puts for every column in a row");
335 addOptNoArg(OPT_BATCHUPDATE, "Whether to use batch as opposed to " +
336 "separate updates for every column in a row");
337 addOptNoArg(OPT_INMEMORY, OPT_USAGE_IN_MEMORY);
338 addOptWithArg(OPT_GENERATOR, OPT_GENERATOR_USAGE);
339 addOptWithArg(OPT_WRITER, OPT_WRITER_USAGE);
340 addOptWithArg(OPT_UPDATER, OPT_UPDATER_USAGE);
341 addOptWithArg(OPT_READER, OPT_READER_USAGE);
342
343 addOptWithArg(OPT_NUM_KEYS, "The number of keys to read/write");
344 addOptWithArg(OPT_START_KEY, "The first key to read/write " +
345 "(a 0-based index). The default value is " +
346 DEFAULT_START_KEY + ".");
347 addOptNoArg(OPT_SKIP_INIT, "Skip the initialization; assume test table "
348 + "already exists");
349
350 addOptWithArg(NUM_TABLES,
351 "A positive integer number. When a number n is speicfied, load test "
352 + "tool will load n table parallely. -tn parameter value becomes "
353 + "table name prefix. Each table name is in format <tn>_1...<tn>_n");
354
355 addOptWithArg(OPT_REGIONS_PER_SERVER,
356 "A positive integer number. When a number n is specified, load test "
357 + "tool will create the test table with n regions per server");
358
359 addOptWithArg(OPT_ENCRYPTION, OPT_ENCRYPTION_USAGE);
360 addOptNoArg(OPT_DEFERRED_LOG_FLUSH, OPT_DEFERRED_LOG_FLUSH_USAGE);
361 addOptWithArg(OPT_NUM_REGIONS_PER_SERVER, OPT_NUM_REGIONS_PER_SERVER_USAGE);
362 addOptWithArg(OPT_REGION_REPLICATION, OPT_REGION_REPLICATION_USAGE);
363 addOptWithArg(OPT_REGION_REPLICA_ID, OPT_REGION_REPLICA_ID_USAGE);
364 }
365
366 @Override
367 protected void processOptions(CommandLine cmd) {
368 this.cmd = cmd;
369
370 tableName = TableName.valueOf(cmd.getOptionValue(OPT_TABLE_NAME,
371 DEFAULT_TABLE_NAME));
372
373 if (cmd.hasOption(OPT_COLUMN_FAMILIES)) {
374 String[] list = cmd.getOptionValue(OPT_COLUMN_FAMILIES).split(",");
375 families = new byte[list.length][];
376 for (int i = 0; i < list.length; i++) {
377 families[i] = Bytes.toBytes(list[i]);
378 }
379 } else {
380 families = DEFAULT_COLUMN_FAMILIES;
381 }
382
383 isWrite = cmd.hasOption(OPT_WRITE);
384 isRead = cmd.hasOption(OPT_READ);
385 isUpdate = cmd.hasOption(OPT_UPDATE);
386 isInitOnly = cmd.hasOption(OPT_INIT_ONLY);
387 deferredLogFlush = cmd.hasOption(OPT_DEFERRED_LOG_FLUSH);
388
389 if (!isWrite && !isRead && !isUpdate && !isInitOnly) {
390 throw new IllegalArgumentException("Either -" + OPT_WRITE + " or " +
391 "-" + OPT_UPDATE + " or -" + OPT_READ + " has to be specified");
392 }
393
394 if (isInitOnly && (isRead || isWrite || isUpdate)) {
395 throw new IllegalArgumentException(OPT_INIT_ONLY + " cannot be specified with"
396 + " either -" + OPT_WRITE + " or -" + OPT_UPDATE + " or -" + OPT_READ);
397 }
398
399 if (!isInitOnly) {
400 if (!cmd.hasOption(OPT_NUM_KEYS)) {
401 throw new IllegalArgumentException(OPT_NUM_KEYS + " must be specified in "
402 + "read or write mode");
403 }
404 startKey = parseLong(cmd.getOptionValue(OPT_START_KEY,
405 String.valueOf(DEFAULT_START_KEY)), 0, Long.MAX_VALUE);
406 long numKeys = parseLong(cmd.getOptionValue(OPT_NUM_KEYS), 1,
407 Long.MAX_VALUE - startKey);
408 endKey = startKey + numKeys;
409 isSkipInit = cmd.hasOption(OPT_SKIP_INIT);
410 System.out.println("Key range: [" + startKey + ".." + (endKey - 1) + "]");
411 }
412
413 parseColumnFamilyOptions(cmd);
414
415 if (isWrite) {
416 String[] writeOpts = splitColonSeparated(OPT_WRITE, 2, 3);
417
418 int colIndex = 0;
419 minColsPerKey = 1;
420 maxColsPerKey = 2 * Integer.parseInt(writeOpts[colIndex++]);
421 int avgColDataSize =
422 parseInt(writeOpts[colIndex++], 1, Integer.MAX_VALUE);
423 minColDataSize = avgColDataSize / 2;
424 maxColDataSize = avgColDataSize * 3 / 2;
425
426 if (colIndex < writeOpts.length) {
427 numWriterThreads = getNumThreads(writeOpts[colIndex++]);
428 }
429
430 isMultiPut = cmd.hasOption(OPT_MULTIPUT);
431
432 System.out.println("Multi-puts: " + isMultiPut);
433 System.out.println("Columns per key: " + minColsPerKey + ".."
434 + maxColsPerKey);
435 System.out.println("Data size per column: " + minColDataSize + ".."
436 + maxColDataSize);
437 }
438
439 if (isUpdate) {
440 String[] mutateOpts = splitColonSeparated(OPT_UPDATE, 1, 3);
441 int colIndex = 0;
442 updatePercent = parseInt(mutateOpts[colIndex++], 0, 100);
443 if (colIndex < mutateOpts.length) {
444 numUpdaterThreads = getNumThreads(mutateOpts[colIndex++]);
445 }
446 if (colIndex < mutateOpts.length) {
447 ignoreConflicts = parseInt(mutateOpts[colIndex++], 0, 1) == 1;
448 }
449
450 isBatchUpdate = cmd.hasOption(OPT_BATCHUPDATE);
451
452 System.out.println("Batch updates: " + isBatchUpdate);
453 System.out.println("Percent of keys to update: " + updatePercent);
454 System.out.println("Updater threads: " + numUpdaterThreads);
455 System.out.println("Ignore nonce conflicts: " + ignoreConflicts);
456 }
457
458 if (isRead) {
459 String[] readOpts = splitColonSeparated(OPT_READ, 1, 2);
460 int colIndex = 0;
461 verifyPercent = parseInt(readOpts[colIndex++], 0, 100);
462 if (colIndex < readOpts.length) {
463 numReaderThreads = getNumThreads(readOpts[colIndex++]);
464 }
465
466 if (cmd.hasOption(OPT_MAX_READ_ERRORS)) {
467 maxReadErrors = parseInt(cmd.getOptionValue(OPT_MAX_READ_ERRORS),
468 0, Integer.MAX_VALUE);
469 }
470
471 if (cmd.hasOption(OPT_KEY_WINDOW)) {
472 keyWindow = parseInt(cmd.getOptionValue(OPT_KEY_WINDOW),
473 0, Integer.MAX_VALUE);
474 }
475
476 if (cmd.hasOption(OPT_MULTIGET)) {
477 multiGetBatchSize = parseInt(cmd.getOptionValue(OPT_MULTIGET),
478 0, Integer.MAX_VALUE);
479 }
480
481 System.out.println("Multi-gets (value of 1 means no multigets): " + multiGetBatchSize);
482 System.out.println("Percent of keys to verify: " + verifyPercent);
483 System.out.println("Reader threads: " + numReaderThreads);
484 }
485
486 numTables = 1;
487 if (cmd.hasOption(NUM_TABLES)) {
488 numTables = parseInt(cmd.getOptionValue(NUM_TABLES), 1, Short.MAX_VALUE);
489 }
490
491 numRegionsPerServer = DEFAULT_NUM_REGIONS_PER_SERVER;
492 if (cmd.hasOption(OPT_NUM_REGIONS_PER_SERVER)) {
493 numRegionsPerServer = Integer.parseInt(cmd.getOptionValue(OPT_NUM_REGIONS_PER_SERVER));
494 }
495
496 regionReplication = 1;
497 if (cmd.hasOption(OPT_REGION_REPLICATION)) {
498 regionReplication = Integer.parseInt(cmd.getOptionValue(OPT_REGION_REPLICATION));
499 }
500
501 regionReplicaId = -1;
502 if (cmd.hasOption(OPT_REGION_REPLICA_ID)) {
503 regionReplicaId = Integer.parseInt(cmd.getOptionValue(OPT_REGION_REPLICA_ID));
504 }
505 }
506
507 private void parseColumnFamilyOptions(CommandLine cmd) {
508 String dataBlockEncodingStr = cmd.getOptionValue(OPT_DATA_BLOCK_ENCODING);
509 dataBlockEncodingAlgo = dataBlockEncodingStr == null ? null :
510 DataBlockEncoding.valueOf(dataBlockEncodingStr);
511
512 String compressStr = cmd.getOptionValue(OPT_COMPRESSION);
513 compressAlgo = compressStr == null ? Compression.Algorithm.NONE :
514 Compression.Algorithm.valueOf(compressStr);
515
516 String bloomStr = cmd.getOptionValue(OPT_BLOOM);
517 bloomType = bloomStr == null ? BloomType.ROW :
518 BloomType.valueOf(bloomStr);
519
520 inMemoryCF = cmd.hasOption(OPT_INMEMORY);
521 if (cmd.hasOption(OPT_ENCRYPTION)) {
522 cipher = Encryption.getCipher(conf, cmd.getOptionValue(OPT_ENCRYPTION));
523 }
524
525 }
526
527 public void initTestTable() throws IOException {
528 Durability durability = Durability.USE_DEFAULT;
529 if (deferredLogFlush) {
530 durability = Durability.ASYNC_WAL;
531 }
532
533 HBaseTestingUtility.createPreSplitLoadTestTable(conf, tableName,
534 getColumnFamilies(), compressAlgo, dataBlockEncodingAlgo, numRegionsPerServer,
535 regionReplication, durability);
536 applyColumnFamilyOptions(tableName, getColumnFamilies());
537 }
538
539 @Override
540 protected int doWork() throws IOException {
541 if (numTables > 1) {
542 return parallelLoadTables();
543 } else {
544 return loadTable();
545 }
546 }
547
548 protected int loadTable() throws IOException {
549 if (cmd.hasOption(OPT_ZK_QUORUM)) {
550 conf.set(HConstants.ZOOKEEPER_QUORUM, cmd.getOptionValue(OPT_ZK_QUORUM));
551 }
552 if (cmd.hasOption(OPT_ZK_PARENT_NODE)) {
553 conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, cmd.getOptionValue(OPT_ZK_PARENT_NODE));
554 }
555
556 if (isInitOnly) {
557 LOG.info("Initializing only; no reads or writes");
558 initTestTable();
559 return 0;
560 }
561
562 if (!isSkipInit) {
563 initTestTable();
564 }
565 LoadTestDataGenerator dataGen = null;
566 if (cmd.hasOption(OPT_GENERATOR)) {
567 String[] clazzAndArgs = cmd.getOptionValue(OPT_GENERATOR).split(COLON);
568 dataGen = getLoadGeneratorInstance(clazzAndArgs[0]);
569 String[] args;
570 if (dataGen instanceof LoadTestDataGeneratorWithACL) {
571 LOG.info("Using LoadTestDataGeneratorWithACL");
572 if (User.isHBaseSecurityEnabled(conf)) {
573 LOG.info("Security is enabled");
574 authnFileName = clazzAndArgs[1];
575 superUser = clazzAndArgs[2];
576 userNames = clazzAndArgs[3];
577 args = Arrays.copyOfRange(clazzAndArgs, 2, clazzAndArgs.length);
578 Properties authConfig = new Properties();
579 authConfig.load(this.getClass().getClassLoader().getResourceAsStream(authnFileName));
580 try {
581 addAuthInfoToConf(authConfig, conf, superUser, userNames);
582 } catch (IOException exp) {
583 LOG.error(exp);
584 return EXIT_FAILURE;
585 }
586 userOwner = User.create(loginAndReturnUGI(conf, superUser));
587 } else {
588 superUser = clazzAndArgs[1];
589 userNames = clazzAndArgs[2];
590 args = Arrays.copyOfRange(clazzAndArgs, 1, clazzAndArgs.length);
591 userOwner = User.createUserForTesting(conf, superUser, new String[0]);
592 }
593 } else {
594 args = clazzAndArgs.length == 1 ? new String[0] : Arrays.copyOfRange(clazzAndArgs, 1,
595 clazzAndArgs.length);
596 }
597 dataGen.initialize(args);
598 } else {
599
600 dataGen = new MultiThreadedAction.DefaultDataGenerator(minColDataSize, maxColDataSize,
601 minColsPerKey, maxColsPerKey, families);
602 }
603
604 if (userOwner != null) {
605 LOG.info("Granting permissions for user " + userOwner.getShortName());
606 Permission.Action[] actions = {
607 Permission.Action.ADMIN, Permission.Action.CREATE,
608 Permission.Action.READ, Permission.Action.WRITE };
609 try {
610 AccessControlClient.grant(ConnectionFactory.createConnection(conf),
611 tableName, userOwner.getShortName(), null, null, actions);
612 } catch (Throwable e) {
613 LOG.fatal("Error in granting permission for the user " + userOwner.getShortName(), e);
614 return EXIT_FAILURE;
615 }
616 }
617
618 if (userNames != null) {
619
620 String users[] = userNames.split(",");
621 User user = null;
622 for (String userStr : users) {
623 if (User.isHBaseSecurityEnabled(conf)) {
624 user = User.create(loginAndReturnUGI(conf, userStr));
625 } else {
626 user = User.createUserForTesting(conf, userStr, new String[0]);
627 }
628 }
629 }
630
631 if (isWrite) {
632 if (userOwner != null) {
633 writerThreads = new MultiThreadedWriterWithACL(dataGen, conf, tableName, userOwner);
634 } else {
635 String writerClass = null;
636 if (cmd.hasOption(OPT_WRITER)) {
637 writerClass = cmd.getOptionValue(OPT_WRITER);
638 } else {
639 writerClass = MultiThreadedWriter.class.getCanonicalName();
640 }
641
642 writerThreads = getMultiThreadedWriterInstance(writerClass, dataGen);
643 }
644 writerThreads.setMultiPut(isMultiPut);
645 }
646
647 if (isUpdate) {
648 if (userOwner != null) {
649 updaterThreads = new MultiThreadedUpdaterWithACL(dataGen, conf, tableName, updatePercent,
650 userOwner, userNames);
651 } else {
652 String updaterClass = null;
653 if (cmd.hasOption(OPT_UPDATER)) {
654 updaterClass = cmd.getOptionValue(OPT_UPDATER);
655 } else {
656 updaterClass = MultiThreadedUpdater.class.getCanonicalName();
657 }
658 updaterThreads = getMultiThreadedUpdaterInstance(updaterClass, dataGen);
659 }
660 updaterThreads.setBatchUpdate(isBatchUpdate);
661 updaterThreads.setIgnoreNonceConflicts(ignoreConflicts);
662 }
663
664 if (isRead) {
665 if (userOwner != null) {
666 readerThreads = new MultiThreadedReaderWithACL(dataGen, conf, tableName, verifyPercent,
667 userNames);
668 } else {
669 String readerClass = null;
670 if (cmd.hasOption(OPT_READER)) {
671 readerClass = cmd.getOptionValue(OPT_READER);
672 } else {
673 readerClass = MultiThreadedReader.class.getCanonicalName();
674 }
675 readerThreads = getMultiThreadedReaderInstance(readerClass, dataGen);
676 }
677 readerThreads.setMaxErrors(maxReadErrors);
678 readerThreads.setKeyWindow(keyWindow);
679 readerThreads.setMultiGetBatchSize(multiGetBatchSize);
680 readerThreads.setRegionReplicaId(regionReplicaId);
681 }
682
683 if (isUpdate && isWrite) {
684 LOG.info("Concurrent write/update workload: making updaters aware of the " +
685 "write point");
686 updaterThreads.linkToWriter(writerThreads);
687 }
688
689 if (isRead && (isUpdate || isWrite)) {
690 LOG.info("Concurrent write/read workload: making readers aware of the " +
691 "write point");
692 readerThreads.linkToWriter(isUpdate ? updaterThreads : writerThreads);
693 }
694
695 if (isWrite) {
696 System.out.println("Starting to write data...");
697 writerThreads.start(startKey, endKey, numWriterThreads);
698 }
699
700 if (isUpdate) {
701 LOG.info("Starting to mutate data...");
702 System.out.println("Starting to mutate data...");
703
704
705 updaterThreads.start(startKey, endKey, numUpdaterThreads);
706 }
707
708 if (isRead) {
709 System.out.println("Starting to read data...");
710 readerThreads.start(startKey, endKey, numReaderThreads);
711 }
712
713 if (isWrite) {
714 writerThreads.waitForFinish();
715 }
716
717 if (isUpdate) {
718 updaterThreads.waitForFinish();
719 }
720
721 if (isRead) {
722 readerThreads.waitForFinish();
723 }
724
725 boolean success = true;
726 if (isWrite) {
727 success = success && writerThreads.getNumWriteFailures() == 0;
728 }
729 if (isUpdate) {
730 success = success && updaterThreads.getNumWriteFailures() == 0;
731 }
732 if (isRead) {
733 success = success && readerThreads.getNumReadErrors() == 0
734 && readerThreads.getNumReadFailures() == 0;
735 }
736 return success ? EXIT_SUCCESS : EXIT_FAILURE;
737 }
738
739 private LoadTestDataGenerator getLoadGeneratorInstance(String clazzName) throws IOException {
740 try {
741 Class<?> clazz = Class.forName(clazzName);
742 Constructor<?> constructor = clazz.getConstructor(int.class, int.class, int.class, int.class,
743 byte[][].class);
744 return (LoadTestDataGenerator) constructor.newInstance(minColDataSize, maxColDataSize,
745 minColsPerKey, maxColsPerKey, families);
746 } catch (Exception e) {
747 throw new IOException(e);
748 }
749 }
750
751 private MultiThreadedWriter getMultiThreadedWriterInstance(String clazzName
752 , LoadTestDataGenerator dataGen) throws IOException {
753 try {
754 Class<?> clazz = Class.forName(clazzName);
755 Constructor<?> constructor = clazz.getConstructor(
756 LoadTestDataGenerator.class, Configuration.class, TableName.class);
757 return (MultiThreadedWriter) constructor.newInstance(dataGen, conf, tableName);
758 } catch (Exception e) {
759 throw new IOException(e);
760 }
761 }
762
763 private MultiThreadedUpdater getMultiThreadedUpdaterInstance(String clazzName
764 , LoadTestDataGenerator dataGen) throws IOException {
765 try {
766 Class<?> clazz = Class.forName(clazzName);
767 Constructor<?> constructor = clazz.getConstructor(
768 LoadTestDataGenerator.class, Configuration.class, TableName.class, double.class);
769 return (MultiThreadedUpdater) constructor.newInstance(
770 dataGen, conf, tableName, updatePercent);
771 } catch (Exception e) {
772 throw new IOException(e);
773 }
774 }
775
776 private MultiThreadedReader getMultiThreadedReaderInstance(String clazzName
777 , LoadTestDataGenerator dataGen) throws IOException {
778 try {
779 Class<?> clazz = Class.forName(clazzName);
780 Constructor<?> constructor = clazz.getConstructor(
781 LoadTestDataGenerator.class, Configuration.class, TableName.class, double.class);
782 return (MultiThreadedReader) constructor.newInstance(dataGen, conf, tableName, verifyPercent);
783 } catch (Exception e) {
784 throw new IOException(e);
785 }
786 }
787
788 public static byte[] generateData(final Random r, int length) {
789 byte [] b = new byte [length];
790 int i = 0;
791
792 for(i = 0; i < (length-8); i += 8) {
793 b[i] = (byte) (65 + r.nextInt(26));
794 b[i+1] = b[i];
795 b[i+2] = b[i];
796 b[i+3] = b[i];
797 b[i+4] = b[i];
798 b[i+5] = b[i];
799 b[i+6] = b[i];
800 b[i+7] = b[i];
801 }
802
803 byte a = (byte) (65 + r.nextInt(26));
804 for(; i < length; i++) {
805 b[i] = a;
806 }
807 return b;
808 }
809 public static void main(String[] args) {
810 new LoadTestTool().doStaticMain(args);
811 }
812
813
814
815
816
817
818
819
820
821 private int parallelLoadTables()
822 throws IOException {
823
824 String tableName = cmd.getOptionValue(OPT_TABLE_NAME, DEFAULT_TABLE_NAME);
825 String[] newArgs = null;
826 if (!cmd.hasOption(LoadTestTool.OPT_TABLE_NAME)) {
827 newArgs = new String[cmdLineArgs.length + 2];
828 newArgs[0] = "-" + LoadTestTool.OPT_TABLE_NAME;
829 newArgs[1] = LoadTestTool.DEFAULT_TABLE_NAME;
830 System.arraycopy(cmdLineArgs, 0, newArgs, 2, cmdLineArgs.length);
831 } else {
832 newArgs = cmdLineArgs;
833 }
834
835 int tableNameValueIndex = -1;
836 for (int j = 0; j < newArgs.length; j++) {
837 if (newArgs[j].endsWith(OPT_TABLE_NAME)) {
838 tableNameValueIndex = j + 1;
839 } else if (newArgs[j].endsWith(NUM_TABLES)) {
840
841 newArgs[j + 1] = "1";
842 }
843 }
844
845
846 List<WorkerThread> workers = new ArrayList<WorkerThread>();
847 for (int i = 0; i < numTables; i++) {
848 String[] workerArgs = newArgs.clone();
849 workerArgs[tableNameValueIndex] = tableName + "_" + (i+1);
850 WorkerThread worker = new WorkerThread(i, workerArgs);
851 workers.add(worker);
852 LOG.info(worker + " starting");
853 worker.start();
854 }
855
856
857 LOG.info("Waiting for worker threads to finish");
858 for (WorkerThread t : workers) {
859 try {
860 t.join();
861 } catch (InterruptedException ie) {
862 IOException iie = new InterruptedIOException();
863 iie.initCause(ie);
864 throw iie;
865 }
866 checkForErrors();
867 }
868
869 return EXIT_SUCCESS;
870 }
871
872
873
874 protected AtomicReference<Throwable> thrown = new AtomicReference<Throwable>();
875
876 private void workerThreadError(Throwable t) {
877 thrown.compareAndSet(null, t);
878 }
879
880
881
882
883 private void checkForErrors() throws IOException {
884 Throwable thrown = this.thrown.get();
885 if (thrown == null) return;
886 if (thrown instanceof IOException) {
887 throw (IOException) thrown;
888 } else {
889 throw new RuntimeException(thrown);
890 }
891 }
892
893 class WorkerThread extends Thread {
894 private String[] workerArgs;
895
896 WorkerThread(int i, String[] args) {
897 super("WorkerThread-" + i);
898 workerArgs = args;
899 }
900
901 @Override
902 public void run() {
903 try {
904 int ret = ToolRunner.run(HBaseConfiguration.create(), new LoadTestTool(), workerArgs);
905 if (ret != 0) {
906 throw new RuntimeException("LoadTestTool exit with non-zero return code.");
907 }
908 } catch (Exception ex) {
909 LOG.error("Error in worker thread", ex);
910 workerThreadError(ex);
911 }
912 }
913 }
914
915 private void addAuthInfoToConf(Properties authConfig, Configuration conf, String owner,
916 String userList) throws IOException {
917 List<String> users = new ArrayList(Arrays.asList(userList.split(",")));
918 users.add(owner);
919 for (String user : users) {
920 String keyTabFileConfKey = "hbase." + user + ".keytab.file";
921 String principalConfKey = "hbase." + user + ".kerberos.principal";
922 if (!authConfig.containsKey(keyTabFileConfKey) || !authConfig.containsKey(principalConfKey)) {
923 throw new IOException("Authentication configs missing for user : " + user);
924 }
925 }
926 for (String key : authConfig.stringPropertyNames()) {
927 conf.set(key, authConfig.getProperty(key));
928 }
929 LOG.debug("Added authentication properties to config successfully.");
930 }
931
932 public static UserGroupInformation loginAndReturnUGI(Configuration conf, String username)
933 throws IOException {
934 String hostname = InetAddress.getLocalHost().getHostName();
935 String keyTabFileConfKey = "hbase." + username + ".keytab.file";
936 String keyTabFileLocation = conf.get(keyTabFileConfKey);
937 String principalConfKey = "hbase." + username + ".kerberos.principal";
938 String principal = SecurityUtil.getServerPrincipal(conf.get(principalConfKey), hostname);
939 if (keyTabFileLocation == null || principal == null) {
940 LOG.warn("Principal or key tab file null for : " + principalConfKey + ", "
941 + keyTabFileConfKey);
942 }
943 UserGroupInformation ugi =
944 UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keyTabFileLocation);
945 return ugi;
946 }
947 }