View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with this
4    * work for additional information regarding copyright ownership. The ASF
5    * licenses this file to you under the Apache License, Version 2.0 (the
6    * "License"); you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14   * License for the specific language governing permissions and limitations
15   * under the License.
16   */
17  package org.apache.hadoop.hbase.util;
18  
19  import java.io.IOException;
20  import java.util.Arrays;
21  import java.util.HashSet;
22  import java.util.Set;
23  import java.util.concurrent.atomic.AtomicLong;
24  
25  import org.apache.commons.lang.math.RandomUtils;
26  import org.apache.commons.logging.Log;
27  import org.apache.commons.logging.LogFactory;
28  import org.apache.hadoop.conf.Configuration;
29  import org.apache.hadoop.hbase.HRegionLocation;
30  import org.apache.hadoop.hbase.TableName;
31  import org.apache.hadoop.hbase.client.Get;
32  
33  import org.apache.hadoop.hbase.client.Consistency;
34  import org.apache.hadoop.hbase.client.HTableInterface;
35  import org.apache.hadoop.hbase.client.Result;
36  import org.apache.hadoop.hbase.client.Table;
37  import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
38  
39  /** Creates multiple threads that read and verify previously written data */
40  public class MultiThreadedReader extends MultiThreadedAction
41  {
42    private static final Log LOG = LogFactory.getLog(MultiThreadedReader.class);
43  
44    protected Set<HBaseReaderThread> readers = new HashSet<HBaseReaderThread>();
45    private final double verifyPercent;
46    protected volatile boolean aborted;
47  
48    protected MultiThreadedWriterBase writer = null;
49  
50    /**
51     * The number of keys verified in a sequence. This will never be larger than
52     * the total number of keys in the range. The reader might also verify
53     * random keys when it catches up with the writer.
54     */
55    private final AtomicLong numUniqueKeysVerified = new AtomicLong();
56  
57    /**
58     * Default maximum number of read errors to tolerate before shutting down all
59     * readers.
60     */
61    public static final int DEFAULT_MAX_ERRORS = 10;
62  
63    /**
64     * Default "window" size between the last key written by the writer and the
65     * key that we attempt to read. The lower this number, the stricter our
66     * testing is. If this is zero, we always attempt to read the highest key
67     * in the contiguous sequence of keys written by the writers.
68     */
69    public static final int DEFAULT_KEY_WINDOW = 0;
70  
71    /**
72     * Default batch size for multigets
73     */
74    public static final int DEFAULT_BATCH_SIZE = 1; //translates to simple GET (no multi GET)
75  
76    protected AtomicLong numKeysVerified = new AtomicLong(0);
77    protected AtomicLong numReadErrors = new AtomicLong(0);
78    protected AtomicLong numReadFailures = new AtomicLong(0);
79    protected AtomicLong nullResult = new AtomicLong(0);
80    private int maxErrors = DEFAULT_MAX_ERRORS;
81    private int keyWindow = DEFAULT_KEY_WINDOW;
82    private int batchSize = DEFAULT_BATCH_SIZE;
83    private int regionReplicaId = -1; // particular region replica id to do reads against if set
84  
85    public MultiThreadedReader(LoadTestDataGenerator dataGen, Configuration conf,
86        TableName tableName, double verifyPercent) throws IOException {
87      super(dataGen, conf, tableName, "R");
88      this.verifyPercent = verifyPercent;
89    }
90  
91    public void linkToWriter(MultiThreadedWriterBase writer) {
92      this.writer = writer;
93      writer.setTrackWroteKeys(true);
94    }
95  
96    public void setMaxErrors(int maxErrors) {
97      this.maxErrors = maxErrors;
98    }
99  
100   public void setKeyWindow(int keyWindow) {
101     this.keyWindow = keyWindow;
102   }
103 
104   public void setMultiGetBatchSize(int batchSize) {
105     this.batchSize = batchSize;
106   }
107 
108   public void setRegionReplicaId(int regionReplicaId) {
109     this.regionReplicaId = regionReplicaId;
110   }
111 
112   @Override
113   public void start(long startKey, long endKey, int numThreads) throws IOException {
114     super.start(startKey, endKey, numThreads);
115     if (verbose) {
116       LOG.debug("Reading keys [" + startKey + ", " + endKey + ")");
117     }
118 
119     addReaderThreads(numThreads);
120     startThreads(readers);
121   }
122 
123   protected void addReaderThreads(int numThreads) throws IOException {
124     for (int i = 0; i < numThreads; ++i) {
125       HBaseReaderThread reader = createReaderThread(i);
126       readers.add(reader);
127     }
128   }
129 
130   protected HBaseReaderThread createReaderThread(int readerId) throws IOException {
131     HBaseReaderThread reader = new HBaseReaderThread(readerId);
132     Threads.setLoggingUncaughtExceptionHandler(reader);
133     return reader;
134   }
135 
136   public class HBaseReaderThread extends Thread {
137     protected final int readerId;
138     protected final Table table;
139 
140     /** The "current" key being read. Increases from startKey to endKey. */
141     private long curKey;
142 
143     /** Time when the thread started */
144     protected long startTimeMs;
145 
146     /** If we are ahead of the writer and reading a random key. */
147     private boolean readingRandomKey;
148 
149     private boolean printExceptionTrace = true;
150 
151     /**
152      * @param readerId only the keys with this remainder from division by
153      *          {@link #numThreads} will be read by this thread
154      */
155     public HBaseReaderThread(int readerId) throws IOException {
156       this.readerId = readerId;
157       table = createTable();
158       setName(getClass().getSimpleName() + "_" + readerId);
159     }
160 
161     protected HTableInterface createTable() throws IOException {
162       return connection.getTable(tableName);
163     }
164 
165     @Override
166     public void run() {
167       try {
168         runReader();
169       } finally {
170         closeTable();
171         numThreadsWorking.decrementAndGet();
172       }
173     }
174 
175     protected void closeTable() {
176       try {
177         if (table != null) {
178           table.close();
179         }
180       } catch (IOException e) {
181         LOG.error("Error closing table", e);
182       }
183     }
184 
185     private void runReader() {
186       if (verbose) {
187         LOG.info("Started thread #" + readerId + " for reads...");
188       }
189 
190       startTimeMs = System.currentTimeMillis();
191       curKey = startKey;
192       long [] keysForThisReader = new long[batchSize];
193       while (curKey < endKey && !aborted) {
194         int readingRandomKeyStartIndex = -1;
195         int numKeys = 0;
196         // if multiGet, loop until we have the number of keys equal to the batch size
197         do {
198           long k = getNextKeyToRead();
199           if (k < startKey || k >= endKey) {
200             numReadErrors.incrementAndGet();
201             throw new AssertionError("Load tester logic error: proposed key " +
202                 "to read " + k + " is out of range (startKey=" + startKey +
203                 ", endKey=" + endKey + ")");
204           }
205           if (k % numThreads != readerId ||
206               writer != null && writer.failedToWriteKey(k)) {
207             // Skip keys that this thread should not read, as well as the keys
208             // that we know the writer failed to write.
209             continue;
210           }
211           keysForThisReader[numKeys] = k;
212           if (readingRandomKey && readingRandomKeyStartIndex == -1) {
213             //store the first index of a random read
214             readingRandomKeyStartIndex = numKeys;
215           }
216           numKeys++;
217         } while (numKeys < batchSize && curKey < endKey && !aborted);
218 
219         if (numKeys > 0) { //meaning there is some key to read
220           readKey(keysForThisReader);
221           // We have verified some unique key(s).
222           numUniqueKeysVerified.getAndAdd(readingRandomKeyStartIndex == -1 ?
223               numKeys : readingRandomKeyStartIndex);
224         }
225       }
226     }
227 
228     /**
229      * Should only be used for the concurrent writer/reader workload. The
230      * maximum key we are allowed to read, subject to the "key window"
231      * constraint.
232      */
233     private long maxKeyWeCanRead() {
234       long insertedUpToKey = writer.wroteUpToKey();
235       if (insertedUpToKey >= endKey - 1) {
236         // The writer has finished writing our range, so we can read any
237         // key in the range.
238         return endKey - 1;
239       }
240       return Math.min(endKey - 1, writer.wroteUpToKey() - keyWindow);
241     }
242 
243     protected long getNextKeyToRead() {
244       readingRandomKey = false;
245       if (writer == null || curKey <= maxKeyWeCanRead()) {
246         return curKey++;
247       }
248 
249       // We caught up with the writer. See if we can read any keys at all.
250       long maxKeyToRead;
251       while ((maxKeyToRead = maxKeyWeCanRead()) < startKey) {
252         // The writer has not written sufficient keys for us to be able to read
253         // anything at all. Sleep a bit. This should only happen in the
254         // beginning of a load test run.
255         Threads.sleepWithoutInterrupt(50);
256       }
257 
258       if (curKey <= maxKeyToRead) {
259         // The writer wrote some keys, and we are now allowed to read our
260         // current key.
261         return curKey++;
262       }
263 
264       // startKey <= maxKeyToRead <= curKey - 1. Read one of the previous keys.
265       // Don't increment the current key -- we still have to try reading it
266       // later. Set a flag to make sure that we don't count this key towards
267       // the set of unique keys we have verified.
268       readingRandomKey = true;
269       return startKey + Math.abs(RandomUtils.nextLong())
270           % (maxKeyToRead - startKey + 1);
271     }
272 
273     private Get[] readKey(long[] keysToRead) {
274       Get [] gets = new Get[keysToRead.length];
275       int i = 0;
276       for (long keyToRead : keysToRead) {
277         try {
278           gets[i] = createGet(keyToRead);
279           if (keysToRead.length == 1) {
280             queryKey(gets[i], RandomUtils.nextInt(100) < verifyPercent, keyToRead);
281           }
282           i++;
283         } catch (IOException e) {
284           numReadFailures.addAndGet(1);
285           LOG.debug("[" + readerId + "] FAILED read, key = " + (keyToRead + "")
286               + ", time from start: "
287               + (System.currentTimeMillis() - startTimeMs) + " ms");
288           if (printExceptionTrace) {
289             LOG.warn(e);
290             printExceptionTrace = false;
291           }
292         }
293       }
294       if (keysToRead.length > 1) {
295         try {
296           queryKey(gets, RandomUtils.nextInt(100) < verifyPercent, keysToRead);
297         } catch (IOException e) {
298           numReadFailures.addAndGet(gets.length);
299           for (long keyToRead : keysToRead) {
300             LOG.debug("[" + readerId + "] FAILED read, key = " + (keyToRead + "")
301                 + ", time from start: "
302                 + (System.currentTimeMillis() - startTimeMs) + " ms");
303           }
304           if (printExceptionTrace) {
305             LOG.warn(e);
306             printExceptionTrace = false;
307           }
308         }
309       }
310       return gets;
311     }
312 
313     protected Get createGet(long keyToRead) throws IOException {
314       Get get = new Get(dataGenerator.getDeterministicUniqueKey(keyToRead));
315       String cfsString = "";
316       byte[][] columnFamilies = dataGenerator.getColumnFamilies();
317       for (byte[] cf : columnFamilies) {
318         get.addFamily(cf);
319         if (verbose) {
320           if (cfsString.length() > 0) {
321             cfsString += ", ";
322           }
323           cfsString += "[" + Bytes.toStringBinary(cf) + "]";
324         }
325       }
326       get = dataGenerator.beforeGet(keyToRead, get);
327       if (regionReplicaId > 0) {
328         get.setReplicaId(regionReplicaId);
329         get.setConsistency(Consistency.TIMELINE);
330       }
331       if (verbose) {
332         LOG.info("[" + readerId + "] " + "Querying key " + keyToRead + ", cfs " + cfsString);
333       }
334       return get;
335     }
336 
337     public void queryKey(Get[] gets, boolean verify, long[] keysToRead) throws IOException {
338       // read the data
339       long start = System.nanoTime();
340       // Uses multi/batch gets
341       Result[] results = table.get(Arrays.asList(gets));
342       long end = System.nanoTime();
343       verifyResultsAndUpdateMetrics(verify, gets, end - start, results, table, false);
344     }
345 
346     public void queryKey(Get get, boolean verify, long keyToRead) throws IOException {
347       // read the data
348 
349       long start = System.nanoTime();
350       // Uses simple get
351       Result result = table.get(get);
352       long end = System.nanoTime();
353       verifyResultsAndUpdateMetrics(verify, get, end - start, result, table, false);
354     }
355 
356     protected void verifyResultsAndUpdateMetrics(boolean verify, Get[] gets, long elapsedNano,
357         Result[] results, Table table, boolean isNullExpected)
358         throws IOException {
359       totalOpTimeMs.addAndGet(elapsedNano / 1000000);
360       numKeys.addAndGet(gets.length);
361       int i = 0;
362       for (Result result : results) {
363         verifyResultsAndUpdateMetricsOnAPerGetBasis(verify, gets[i++], result, table,
364             isNullExpected);
365       }
366     }
367 
368     protected void verifyResultsAndUpdateMetrics(boolean verify, Get get, long elapsedNano,
369         Result result, Table table, boolean isNullExpected)
370         throws IOException {
371       verifyResultsAndUpdateMetrics(verify, new Get[]{get}, elapsedNano,
372           new Result[]{result}, table, isNullExpected);
373     }
374 
375     private void verifyResultsAndUpdateMetricsOnAPerGetBasis(boolean verify, Get get,
376         Result result, Table table, boolean isNullExpected) throws IOException {
377       if (!result.isEmpty()) {
378         if (verify) {
379           numKeysVerified.incrementAndGet();
380         }
381       } else {
382         HRegionLocation hloc = connection.getRegionLocation(tableName,
383           get.getRow(), false);
384         String rowKey = Bytes.toString(get.getRow());
385         LOG.info("Key = " + rowKey + ", Region location: " + hloc);
386         if(isNullExpected) {
387           nullResult.incrementAndGet();
388           LOG.debug("Null result obtained for the key ="+rowKey);
389           return;
390         }
391       }
392       boolean isOk = verifyResultAgainstDataGenerator(result, verify, false);
393       long numErrorsAfterThis = 0;
394       if (isOk) {
395         long cols = 0;
396         // Count the columns for reporting purposes.
397         for (byte[] cf : result.getMap().keySet()) {
398           cols += result.getFamilyMap(cf).size();
399         }
400         numCols.addAndGet(cols);
401       } else {
402         if (writer != null) {
403           LOG.error("At the time of failure, writer wrote " + writer.numKeys.get() + " keys");
404         }
405         numErrorsAfterThis = numReadErrors.incrementAndGet();
406       }
407 
408       if (numErrorsAfterThis > maxErrors) {
409         LOG.error("Aborting readers -- found more than " + maxErrors + " errors");
410         aborted = true;
411       }
412     }
413   }
414 
415   public long getNumReadFailures() {
416     return numReadFailures.get();
417   }
418 
419   public long getNumReadErrors() {
420     return numReadErrors.get();
421   }
422 
423   public long getNumKeysVerified() {
424     return numKeysVerified.get();
425   }
426 
427   public long getNumUniqueKeysVerified() {
428     return numUniqueKeysVerified.get();
429   }
430 
431   public long getNullResultsCount() {
432     return nullResult.get();
433   }
434 
435   @Override
436   protected String progressInfo() {
437     StringBuilder sb = new StringBuilder();
438     appendToStatus(sb, "verified", numKeysVerified.get());
439     appendToStatus(sb, "READ FAILURES", numReadFailures.get());
440     appendToStatus(sb, "READ ERRORS", numReadErrors.get());
441     appendToStatus(sb, "NULL RESULT", nullResult.get());
442     return sb.toString();
443   }
444 }