1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase;
20
import java.io.IOException;
import java.util.Locale;
import java.util.Set;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreEngine;
import org.apache.hadoop.hbase.regionserver.StripeStoreConfig;
import org.apache.hadoop.hbase.regionserver.StripeStoreEngine;
import org.apache.hadoop.hbase.util.AbstractHBaseTool;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.MultiThreadedAction;
import org.apache.hadoop.hbase.util.MultiThreadedReader;
import org.apache.hadoop.hbase.util.MultiThreadedWriter;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
import org.apache.hadoop.hbase.util.test.LoadTestKVGenerator;
import org.junit.Assert;
44
45
46
47
48
49 @InterfaceAudience.Private
50 public class StripeCompactionsPerformanceEvaluation extends AbstractHBaseTool {
51 private static final Log LOG = LogFactory.getLog(StripeCompactionsPerformanceEvaluation.class);
52 private static final TableName TABLE_NAME =
53 TableName.valueOf(StripeCompactionsPerformanceEvaluation.class.getSimpleName());
54 private static final byte[] COLUMN_FAMILY = Bytes.toBytes("CF");
55 private static final int MIN_NUM_SERVERS = 1;
56
57
58 private static final String DATAGEN_KEY = "datagen";
59 private static final String ITERATIONS_KEY = "iters";
60 private static final String PRELOAD_COUNT_KEY = "pwk";
61 private static final String WRITE_COUNT_KEY = "wk";
62 private static final String WRITE_THREADS_KEY = "wt";
63 private static final String READ_THREADS_KEY = "rt";
64 private static final String INITIAL_STRIPE_COUNT_KEY = "initstripes";
65 private static final String SPLIT_SIZE_KEY = "splitsize";
66 private static final String SPLIT_PARTS_KEY = "splitparts";
67 private static final String VALUE_SIZE_KEY = "valsize";
68 private static final String SEQ_SHARDS_PER_SERVER_KEY = "seqshards";
69
70
71 private LoadTestDataGenerator dataGen;
72 private int iterationCount;
73 private long preloadKeys;
74 private long writeKeys;
75 private int writeThreads;
76 private int readThreads;
77 private Long initialStripeCount;
78 private Long splitSize;
79 private Long splitParts;
80
81 private static final String VALUE_SIZE_DEFAULT = "512:4096";
82
83 protected IntegrationTestingUtility util = new IntegrationTestingUtility();
84
85 @Override
86 protected void addOptions() {
87 addOptWithArg(DATAGEN_KEY, "Type of data generator to use (default or sequential)");
88 addOptWithArg(SEQ_SHARDS_PER_SERVER_KEY, "Sequential generator will shard the data into many"
89 + " sequences. The number of such shards per server is specified (default is 1).");
90 addOptWithArg(ITERATIONS_KEY, "Number of iterations to run to compare");
91 addOptWithArg(PRELOAD_COUNT_KEY, "Number of keys to preload, per server");
92 addOptWithArg(WRITE_COUNT_KEY, "Number of keys to write, per server");
93 addOptWithArg(WRITE_THREADS_KEY, "Number of threads to use for writing");
94 addOptWithArg(READ_THREADS_KEY, "Number of threads to use for reading");
95 addOptWithArg(INITIAL_STRIPE_COUNT_KEY, "Number of stripes to split regions into initially");
96 addOptWithArg(SPLIT_SIZE_KEY, "Size at which a stripe will split into more stripes");
97 addOptWithArg(SPLIT_PARTS_KEY, "Number of stripes to split a stripe into when it splits");
98 addOptWithArg(VALUE_SIZE_KEY, "Value size; either a number, or a colon-separated range;"
99 + " default " + VALUE_SIZE_DEFAULT);
100 }
101
102 @Override
103 protected void processOptions(CommandLine cmd) {
104 int minValueSize = 0, maxValueSize = 0;
105 String valueSize = cmd.getOptionValue(VALUE_SIZE_KEY, VALUE_SIZE_DEFAULT);
106 if (valueSize.contains(":")) {
107 String[] valueSizes = valueSize.split(":");
108 if (valueSize.length() != 2) throw new RuntimeException("Invalid value size: " + valueSize);
109 minValueSize = Integer.parseInt(valueSizes[0]);
110 maxValueSize = Integer.parseInt(valueSizes[1]);
111 } else {
112 minValueSize = maxValueSize = Integer.parseInt(valueSize);
113 }
114 String datagen = cmd.getOptionValue(DATAGEN_KEY, "default").toLowerCase();
115 if ("default".equals(datagen)) {
116 dataGen = new MultiThreadedAction.DefaultDataGenerator(
117 minValueSize, maxValueSize, 1, 1, new byte[][] { COLUMN_FAMILY });
118 } else if ("sequential".equals(datagen)) {
119 int shards = Integer.parseInt(cmd.getOptionValue(SEQ_SHARDS_PER_SERVER_KEY, "1"));
120 dataGen = new SeqShardedDataGenerator(minValueSize, maxValueSize, shards);
121 } else {
122 throw new RuntimeException("Unknown " + DATAGEN_KEY + ": " + datagen);
123 }
124 iterationCount = Integer.parseInt(cmd.getOptionValue(ITERATIONS_KEY, "1"));
125 preloadKeys = Long.parseLong(cmd.getOptionValue(PRELOAD_COUNT_KEY, "1000000"));
126 writeKeys = Long.parseLong(cmd.getOptionValue(WRITE_COUNT_KEY, "1000000"));
127 writeThreads = Integer.parseInt(cmd.getOptionValue(WRITE_THREADS_KEY, "10"));
128 readThreads = Integer.parseInt(cmd.getOptionValue(READ_THREADS_KEY, "20"));
129 initialStripeCount = getLongOrNull(cmd, INITIAL_STRIPE_COUNT_KEY);
130 splitSize = getLongOrNull(cmd, SPLIT_SIZE_KEY);
131 splitParts = getLongOrNull(cmd, SPLIT_PARTS_KEY);
132 }
133
134 private Long getLongOrNull(CommandLine cmd, String option) {
135 if (!cmd.hasOption(option)) return null;
136 return Long.parseLong(cmd.getOptionValue(option));
137 }
138
139 @Override
140 public Configuration getConf() {
141 Configuration c = super.getConf();
142 if (c == null && util != null) {
143 conf = util.getConfiguration();
144 c = conf;
145 }
146 return c;
147 }
148
149 @Override
150 protected int doWork() throws Exception {
151 setUp();
152 try {
153 boolean isStripe = true;
154 for (int i = 0; i < iterationCount * 2; ++i) {
155 createTable(isStripe);
156 runOneTest((isStripe ? "Stripe" : "Default") + i, conf);
157 isStripe = !isStripe;
158 }
159 return 0;
160 } finally {
161 tearDown();
162 }
163 }
164
165
166 private void setUp() throws Exception {
167 this.util = new IntegrationTestingUtility();
168 LOG.debug("Initializing/checking cluster has " + MIN_NUM_SERVERS + " servers");
169 util.initializeCluster(MIN_NUM_SERVERS);
170 LOG.debug("Done initializing/checking cluster");
171 }
172
173 protected void deleteTable() throws Exception {
174 if (util.getHBaseAdmin().tableExists(TABLE_NAME)) {
175 LOG.info("Deleting table");
176 if (!util.getHBaseAdmin().isTableDisabled(TABLE_NAME)) {
177 util.getHBaseAdmin().disableTable(TABLE_NAME);
178 }
179 util.getHBaseAdmin().deleteTable(TABLE_NAME);
180 LOG.info("Deleted table");
181 }
182 }
183
184 private void createTable(boolean isStripe) throws Exception {
185 createTable(createHtd(isStripe));
186 }
187
188 private void tearDown() throws Exception {
189 deleteTable();
190 LOG.info("Restoring the cluster");
191 util.restoreCluster();
192 LOG.info("Done restoring the cluster");
193 }
194
195 private void runOneTest(String description, Configuration conf) throws Exception {
196 int numServers = util.getHBaseClusterInterface().getClusterStatus().getServersSize();
197 long startKey = (long)preloadKeys * numServers;
198 long endKey = startKey + (long)writeKeys * numServers;
199 status(String.format("%s test starting on %d servers; preloading 0 to %d and writing to %d",
200 description, numServers, startKey, endKey));
201
202 if (preloadKeys > 0) {
203 MultiThreadedWriter preloader = new MultiThreadedWriter(dataGen, conf, TABLE_NAME);
204 long time = System.currentTimeMillis();
205 preloader.start(0, startKey, writeThreads);
206 preloader.waitForFinish();
207 if (preloader.getNumWriteFailures() > 0) {
208 throw new IOException("Preload failed");
209 }
210 int waitTime = (int)Math.min(preloadKeys / 100, 30000);
211 status(description + " preload took " + (System.currentTimeMillis()-time)/1000
212 + "sec; sleeping for " + waitTime/1000 + "sec for store to stabilize");
213 Thread.sleep(waitTime);
214 }
215
216 MultiThreadedWriter writer = new MultiThreadedWriter(dataGen, conf, TABLE_NAME);
217 MultiThreadedReader reader = new MultiThreadedReader(dataGen, conf, TABLE_NAME, 100);
218
219 reader.linkToWriter(writer);
220
221 long testStartTime = System.currentTimeMillis();
222 writer.start(startKey, endKey, writeThreads);
223 reader.start(startKey, endKey, readThreads);
224 writer.waitForFinish();
225 reader.waitForFinish();
226
227
228 status("Readers and writers stopped for test " + description);
229
230 boolean success = writer.getNumWriteFailures() == 0;
231 if (!success) {
232 LOG.error("Write failed");
233 } else {
234 success = reader.getNumReadErrors() == 0 && reader.getNumReadFailures() == 0;
235 if (!success) {
236 LOG.error("Read failed");
237 }
238 }
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255 status(description + " test took " + (System.currentTimeMillis()-testStartTime)/1000 + "sec");
256 Assert.assertTrue(success);
257 }
258
259 private static void status(String s) {
260 LOG.info("STATUS " + s);
261 System.out.println(s);
262 }
263
264 private HTableDescriptor createHtd(boolean isStripe) throws Exception {
265 HTableDescriptor htd = new HTableDescriptor(TABLE_NAME);
266 htd.addFamily(new HColumnDescriptor(COLUMN_FAMILY));
267 String noSplitsPolicy = DisabledRegionSplitPolicy.class.getName();
268 htd.setConfiguration(HConstants.HBASE_REGION_SPLIT_POLICY_KEY, noSplitsPolicy);
269 if (isStripe) {
270 htd.setConfiguration(StoreEngine.STORE_ENGINE_CLASS_KEY, StripeStoreEngine.class.getName());
271 if (initialStripeCount != null) {
272 htd.setConfiguration(
273 StripeStoreConfig.INITIAL_STRIPE_COUNT_KEY, initialStripeCount.toString());
274 htd.setConfiguration(
275 HStore.BLOCKING_STOREFILES_KEY, Long.toString(10 * initialStripeCount));
276 } else {
277 htd.setConfiguration(HStore.BLOCKING_STOREFILES_KEY, "500");
278 }
279 if (splitSize != null) {
280 htd.setConfiguration(StripeStoreConfig.SIZE_TO_SPLIT_KEY, splitSize.toString());
281 }
282 if (splitParts != null) {
283 htd.setConfiguration(StripeStoreConfig.SPLIT_PARTS_KEY, splitParts.toString());
284 }
285 } else {
286 htd.setConfiguration(HStore.BLOCKING_STOREFILES_KEY, "10");
287 }
288 return htd;
289 }
290
291 protected void createTable(HTableDescriptor htd) throws Exception {
292 deleteTable();
293 if (util.getHBaseClusterInterface() instanceof MiniHBaseCluster) {
294 LOG.warn("Test does not make a lot of sense for minicluster. Will set flush size low.");
295 htd.setConfiguration(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, "1048576");
296 }
297 byte[][] splits = new RegionSplitter.HexStringSplit().split(
298 util.getHBaseClusterInterface().getClusterStatus().getServersSize());
299 util.getHBaseAdmin().createTable(htd, splits);
300 }
301
302 public static class SeqShardedDataGenerator extends LoadTestDataGenerator {
303 private static final byte[][] COLUMN_NAMES = new byte[][] { Bytes.toBytes("col1") };
304 private static final int PAD_TO = 10;
305 private static final int PREFIX_PAD_TO = 7;
306
307 private final int numPartitions;
308
309 public SeqShardedDataGenerator(int minValueSize, int maxValueSize, int numPartitions) {
310 super(minValueSize, maxValueSize);
311 this.numPartitions = numPartitions;
312 }
313
314 @Override
315 public byte[] getDeterministicUniqueKey(long keyBase) {
316 String num = StringUtils.leftPad(String.valueOf(keyBase), PAD_TO, "0");
317 return Bytes.toBytes(getPrefix(keyBase) + num);
318 }
319
320 private String getPrefix(long i) {
321 return StringUtils.leftPad(String.valueOf((int)(i % numPartitions)), PREFIX_PAD_TO, "0");
322 }
323
324 @Override
325 public byte[][] getColumnFamilies() {
326 return new byte[][] { COLUMN_FAMILY };
327 }
328
329 @Override
330 public byte[][] generateColumnsForCf(byte[] rowKey, byte[] cf) {
331 return COLUMN_NAMES;
332 }
333
334 @Override
335 public byte[] generateValue(byte[] rowKey, byte[] cf, byte[] column) {
336 return kvGenerator.generateRandomSizeValue(rowKey, cf, column);
337 }
338
339 @Override
340 public boolean verify(byte[] rowKey, byte[] cf, byte[] column, byte[] value) {
341 return LoadTestKVGenerator.verify(value, rowKey, cf, column);
342 }
343
344 @Override
345 public boolean verify(byte[] rowKey, byte[] cf, Set<byte[]> columnSet) {
346 return true;
347 }
348 };
349 }