View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase;
20  
21  import java.io.IOException;
22  import java.util.List;
23  import java.util.Random;
24  import java.util.concurrent.atomic.AtomicLong;
25  
26  import org.apache.commons.logging.Log;
27  import org.apache.commons.logging.LogFactory;
28  import org.apache.hadoop.conf.Configuration;
29  import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
30  import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
31  import org.apache.hadoop.hbase.client.Get;
32  import org.apache.hadoop.hbase.client.HBaseAdmin;
33  import org.apache.hadoop.hbase.client.HTable;
34  import org.apache.hadoop.hbase.client.Put;
35  import org.apache.hadoop.hbase.client.Result;
36  import org.apache.hadoop.hbase.client.ResultScanner;
37  import org.apache.hadoop.hbase.client.Scan;
38  import org.apache.hadoop.hbase.client.Table;
39  import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
40  import org.apache.hadoop.hbase.testclassification.MediumTests;
41  import org.apache.hadoop.hbase.util.Bytes;
42  import org.apache.hadoop.util.StringUtils;
43  import org.apache.hadoop.util.Tool;
44  import org.apache.hadoop.util.ToolRunner;
45  import org.junit.Test;
46  import org.junit.experimental.categories.Category;
47  
48  import com.google.common.collect.Lists;
49  
50  /**
51   * Test case that uses multiple threads to read and write multifamily rows
52   * into a table, verifying that reads never see partially-complete writes.
53   *
54   * This can run as a junit test, or with a main() function which runs against
55   * a real cluster (eg for testing with failures, region movement, etc)
56   */
57  @Category(MediumTests.class)
58  public class TestAcidGuarantees implements Tool {
59    protected static final Log LOG = LogFactory.getLog(TestAcidGuarantees.class);
60    public static final TableName TABLE_NAME = TableName.valueOf("TestAcidGuarantees");
61    public static final byte [] FAMILY_A = Bytes.toBytes("A");
62    public static final byte [] FAMILY_B = Bytes.toBytes("B");
63    public static final byte [] FAMILY_C = Bytes.toBytes("C");
64    public static final byte [] QUALIFIER_NAME = Bytes.toBytes("data");
65  
66    public static final byte[][] FAMILIES = new byte[][] {
67      FAMILY_A, FAMILY_B, FAMILY_C };
68  
69    private HBaseTestingUtility util;
70  
71    public static int NUM_COLS_TO_CHECK = 50;
72  
73    // when run as main
74    private Configuration conf;
75  
76    private void createTableIfMissing()
77      throws IOException {
78      try {
79        util.createTable(TABLE_NAME, FAMILIES);
80      } catch (TableExistsException tee) {
81      }
82    }
83  
84    public TestAcidGuarantees() {
85      // Set small flush size for minicluster so we exercise reseeking scanners
86      Configuration conf = HBaseConfiguration.create();
87      conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(128*1024));
88      // prevent aggressive region split
89      conf.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
90              ConstantSizeRegionSplitPolicy.class.getName());
91      util = new HBaseTestingUtility(conf);
92    }
93  
94    public void setHBaseTestingUtil(HBaseTestingUtility util) {
95      this.util = util;
96    }
97  
98    /**
99     * Thread that does random full-row writes into a table.
100    */
101   public static class AtomicityWriter extends RepeatingTestThread {
102     Random rand = new Random();
103     byte data[] = new byte[10];
104     byte targetRows[][];
105     byte targetFamilies[][];
106     Table table;
107     AtomicLong numWritten = new AtomicLong();
108 
109     public AtomicityWriter(TestContext ctx, byte targetRows[][],
110                            byte targetFamilies[][]) throws IOException {
111       super(ctx);
112       this.targetRows = targetRows;
113       this.targetFamilies = targetFamilies;
114       table = new HTable(ctx.getConf(), TABLE_NAME);
115     }
116     public void doAnAction() throws Exception {
117       // Pick a random row to write into
118       byte[] targetRow = targetRows[rand.nextInt(targetRows.length)];
119       Put p = new Put(targetRow);
120       rand.nextBytes(data);
121 
122       for (byte[] family : targetFamilies) {
123         for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
124           byte qualifier[] = Bytes.toBytes("col" + i);
125           p.add(family, qualifier, data);
126         }
127       }
128       table.put(p);
129       numWritten.getAndIncrement();
130     }
131   }
132 
133   /**
134    * Thread that does single-row reads in a table, looking for partially
135    * completed rows.
136    */
137   public static class AtomicGetReader extends RepeatingTestThread {
138     byte targetRow[];
139     byte targetFamilies[][];
140     Table table;
141     int numVerified = 0;
142     AtomicLong numRead = new AtomicLong();
143 
144     public AtomicGetReader(TestContext ctx, byte targetRow[],
145                            byte targetFamilies[][]) throws IOException {
146       super(ctx);
147       this.targetRow = targetRow;
148       this.targetFamilies = targetFamilies;
149       table = new HTable(ctx.getConf(), TABLE_NAME);
150     }
151 
152     public void doAnAction() throws Exception {
153       Get g = new Get(targetRow);
154       Result res = table.get(g);
155       byte[] gotValue = null;
156       if (res.getRow() == null) {
157         // Trying to verify but we didn't find the row - the writing
158         // thread probably just hasn't started writing yet, so we can
159         // ignore this action
160         return;
161       }
162 
163       for (byte[] family : targetFamilies) {
164         for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
165           byte qualifier[] = Bytes.toBytes("col" + i);
166           byte thisValue[] = res.getValue(family, qualifier);
167           if (gotValue != null && !Bytes.equals(gotValue, thisValue)) {
168             gotFailure(gotValue, res);
169           }
170           numVerified++;
171           gotValue = thisValue;
172         }
173       }
174       numRead.getAndIncrement();
175     }
176 
177     private void gotFailure(byte[] expected, Result res) {
178       StringBuilder msg = new StringBuilder();
179       msg.append("Failed after ").append(numVerified).append("!");
180       msg.append("Expected=").append(Bytes.toStringBinary(expected));
181       msg.append("Got:\n");
182       for (Cell kv : res.listCells()) {
183         msg.append(kv.toString());
184         msg.append(" val= ");
185         msg.append(Bytes.toStringBinary(CellUtil.cloneValue(kv)));
186         msg.append("\n");
187       }
188       throw new RuntimeException(msg.toString());
189     }
190   }
191 
192   /**
193    * Thread that does full scans of the table looking for any partially completed
194    * rows.
195    */
196   public static class AtomicScanReader extends RepeatingTestThread {
197     byte targetFamilies[][];
198     Table table;
199     AtomicLong numScans = new AtomicLong();
200     AtomicLong numRowsScanned = new AtomicLong();
201 
202     public AtomicScanReader(TestContext ctx,
203                            byte targetFamilies[][]) throws IOException {
204       super(ctx);
205       this.targetFamilies = targetFamilies;
206       table = new HTable(ctx.getConf(), TABLE_NAME);
207     }
208 
209     public void doAnAction() throws Exception {
210       Scan s = new Scan();
211       for (byte[] family : targetFamilies) {
212         s.addFamily(family);
213       }
214       ResultScanner scanner = table.getScanner(s);
215 
216       for (Result res : scanner) {
217         byte[] gotValue = null;
218 
219         for (byte[] family : targetFamilies) {
220           for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
221             byte qualifier[] = Bytes.toBytes("col" + i);
222             byte thisValue[] = res.getValue(family, qualifier);
223             if (gotValue != null && !Bytes.equals(gotValue, thisValue)) {
224               gotFailure(gotValue, res);
225             }
226             gotValue = thisValue;
227           }
228         }
229         numRowsScanned.getAndIncrement();
230       }
231       numScans.getAndIncrement();
232     }
233 
234     private void gotFailure(byte[] expected, Result res) {
235       StringBuilder msg = new StringBuilder();
236       msg.append("Failed after ").append(numRowsScanned).append("!");
237       msg.append("Expected=").append(Bytes.toStringBinary(expected));
238       msg.append("Got:\n");
239       for (Cell kv : res.listCells()) {
240         msg.append(kv.toString());
241         msg.append(" val= ");
242         msg.append(Bytes.toStringBinary(CellUtil.cloneValue(kv)));
243         msg.append("\n");
244       }
245       throw new RuntimeException(msg.toString());
246     }
247   }
248 
249   public void runTestAtomicity(long millisToRun,
250       int numWriters,
251       int numGetters,
252       int numScanners,
253       int numUniqueRows) throws Exception {
254     runTestAtomicity(millisToRun, numWriters, numGetters, numScanners, numUniqueRows, false);
255   }
256 
257   public void runTestAtomicity(long millisToRun,
258       int numWriters,
259       int numGetters,
260       int numScanners,
261       int numUniqueRows,
262       final boolean systemTest) throws Exception {
263     createTableIfMissing();
264     TestContext ctx = new TestContext(util.getConfiguration());
265 
266     byte rows[][] = new byte[numUniqueRows][];
267     for (int i = 0; i < numUniqueRows; i++) {
268       rows[i] = Bytes.toBytes("test_row_" + i);
269     }
270 
271     List<AtomicityWriter> writers = Lists.newArrayList();
272     for (int i = 0; i < numWriters; i++) {
273       AtomicityWriter writer = new AtomicityWriter(
274           ctx, rows, FAMILIES);
275       writers.add(writer);
276       ctx.addThread(writer);
277     }
278     // Add a flusher
279     ctx.addThread(new RepeatingTestThread(ctx) {
280       HBaseAdmin admin = util.getHBaseAdmin();
281       public void doAnAction() throws Exception {
282         try {
283           admin.flush(TABLE_NAME);
284         } catch(IOException ioe) {
285           LOG.warn("Ignoring exception while flushing: " + StringUtils.stringifyException(ioe));
286         }
287         // Flushing has been a source of ACID violations previously (see HBASE-2856), so ideally,
288         // we would flush as often as possible.  On a running cluster, this isn't practical:
289         // (1) we will cause a lot of load due to all the flushing and compacting
290         // (2) we cannot change the flushing/compacting related Configuration options to try to
291         // alleviate this
292         // (3) it is an unrealistic workload, since no one would actually flush that often.
293         // Therefore, let's flush every minute to have more flushes than usual, but not overload
294         // the running cluster.
295         if (systemTest) Thread.sleep(60000);
296       }
297     });
298 
299     List<AtomicGetReader> getters = Lists.newArrayList();
300     for (int i = 0; i < numGetters; i++) {
301       AtomicGetReader getter = new AtomicGetReader(
302           ctx, rows[i % numUniqueRows], FAMILIES);
303       getters.add(getter);
304       ctx.addThread(getter);
305     }
306 
307     List<AtomicScanReader> scanners = Lists.newArrayList();
308     for (int i = 0; i < numScanners; i++) {
309       AtomicScanReader scanner = new AtomicScanReader(ctx, FAMILIES);
310       scanners.add(scanner);
311       ctx.addThread(scanner);
312     }
313 
314     ctx.startThreads();
315     ctx.waitFor(millisToRun);
316     ctx.stop();
317 
318     LOG.info("Finished test. Writers:");
319     for (AtomicityWriter writer : writers) {
320       LOG.info("  wrote " + writer.numWritten.get());
321     }
322     LOG.info("Readers:");
323     for (AtomicGetReader reader : getters) {
324       LOG.info("  read " + reader.numRead.get());
325     }
326     LOG.info("Scanners:");
327     for (AtomicScanReader scanner : scanners) {
328       LOG.info("  scanned " + scanner.numScans.get());
329       LOG.info("  verified " + scanner.numRowsScanned.get() + " rows");
330     }
331   }
332 
333   @Test
334   public void testGetAtomicity() throws Exception {
335     util.startMiniCluster(1);
336     try {
337       runTestAtomicity(20000, 5, 5, 0, 3);
338     } finally {
339       util.shutdownMiniCluster();
340     }
341   }
342 
343   @Test
344   public void testScanAtomicity() throws Exception {
345     util.startMiniCluster(1);
346     try {
347       runTestAtomicity(20000, 5, 0, 5, 3);
348     } finally {
349       util.shutdownMiniCluster();
350     }
351   }
352 
353   @Test
354   public void testMixedAtomicity() throws Exception {
355     util.startMiniCluster(1);
356     try {
357       runTestAtomicity(20000, 5, 2, 2, 3);
358     } finally {
359       util.shutdownMiniCluster();
360     }
361   }
362 
363   ////////////////////////////////////////////////////////////////////////////
364   // Tool interface
365   ////////////////////////////////////////////////////////////////////////////
366   @Override
367   public Configuration getConf() {
368     return conf;
369   }
370 
371   @Override
372   public void setConf(Configuration c) {
373     this.conf = c;
374     this.util = new HBaseTestingUtility(c);
375   }
376 
377   @Override
378   public int run(String[] arg0) throws Exception {
379     Configuration c = getConf();
380     int millis = c.getInt("millis", 5000);
381     int numWriters = c.getInt("numWriters", 50);
382     int numGetters = c.getInt("numGetters", 2);
383     int numScanners = c.getInt("numScanners", 2);
384     int numUniqueRows = c.getInt("numUniqueRows", 3);
385     runTestAtomicity(millis, numWriters, numGetters, numScanners, numUniqueRows, true);
386     return 0;
387   }
388 
389   public static void main(String args[]) throws Exception {
390     Configuration c = HBaseConfiguration.create();
391     int status;
392     try {
393       TestAcidGuarantees test = new TestAcidGuarantees();
394       status = ToolRunner.run(c, test, args);
395     } catch (Exception e) {
396       LOG.error("Exiting due to error", e);
397       status = -1;
398     }
399     System.exit(status);
400   }
401 
402 
403 }
404