View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.regionserver;
19  
20  import static org.junit.Assert.assertEquals;
21  
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.Arrays;
25  import java.util.Collection;
26  import java.util.List;
27  import java.util.concurrent.ThreadLocalRandom;
28  
29  import org.apache.commons.logging.Log;
30  import org.apache.commons.logging.LogFactory;
31  import org.apache.hadoop.conf.Configuration;
32  import org.apache.hadoop.fs.FileSystem;
33  import org.apache.hadoop.hbase.CategoryBasedTimeout;
34  import org.apache.hadoop.hbase.Cell;
35  import org.apache.hadoop.hbase.HBaseTestingUtility;
36  import org.apache.hadoop.hbase.HConstants;
37  import org.apache.hadoop.hbase.client.Durability;
38  import org.apache.hadoop.hbase.client.Increment;
39  import org.apache.hadoop.hbase.client.Scan;
40  import org.apache.hadoop.hbase.client.TestIncrementsFromClientSide;
41  import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
42  import org.apache.hadoop.hbase.testclassification.MediumTests;
43  import org.apache.hadoop.hbase.util.Bytes;
44  import org.apache.hadoop.hbase.wal.WAL;
45  import org.junit.After;
46  import org.junit.Rule;
47  import org.junit.Test;
48  import org.junit.experimental.categories.Category;
49  import org.junit.rules.TestName;
50  import org.junit.rules.TestRule;
51  
52  
53  /**
54   * Increments with some concurrency against a region to ensure we get the right answer.
55   * Test is parameterized to run the fast and slow path increments; if fast,
56   * HRegion.INCREMENT_FAST_BUT_NARROW_CONSISTENCY_KEY is true.
57   *
58   * <p>There is similar test up in TestAtomicOperation. It does a test where it has 100 threads
59   * doing increments across two column families all on one row and the increments are connected to
60   * prove atomicity on row.
61   */
62  @Category(MediumTests.class)
63  public class TestRegionIncrement {
64    private static final Log LOG = LogFactory.getLog(TestRegionIncrement.class);
65    @Rule public TestName name = new TestName();
66    @Rule public final TestRule timeout =
67        CategoryBasedTimeout.builder().withTimeout(this.getClass()).
68          withLookingForStuckThread(true).build();
69    private static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
70    private final static byte [] INCREMENT_BYTES = Bytes.toBytes("increment");
71    private static final int THREAD_COUNT = 10;
72    private static final int INCREMENT_COUNT = 10000;
73    public static Collection<Object []> data() {
74      return Arrays.asList(new Object[] {Boolean.FALSE}, new Object [] {Boolean.TRUE});
75    }
76  
77    @After
78    public void tearDown() throws Exception {
79      TEST_UTIL.cleanupTestDir();
80    }
81  
82    private HRegion getRegion(final Configuration conf, final String tableName) throws IOException {
83      WAL wal = new FSHLog(FileSystem.get(conf), TEST_UTIL.getDataTestDir(),
84        TEST_UTIL.getDataTestDir().toString(), conf);
85      return (HRegion)TEST_UTIL.createLocalHRegion(Bytes.toBytes(tableName),
86        HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, tableName, conf,
87        false, Durability.SKIP_WAL, wal, INCREMENT_BYTES);
88    }
89  
90    private void closeRegion(final HRegion region) throws IOException {
91      region.close();
92      region.getWAL().close();
93    }
94  
95    /**
96     * Increments a single cell a bunch of times.
97     */
98    private static class SingleCellIncrementer extends Thread {
99      private final int count;
100     private final HRegion region;
101     private final Increment increment;
102 
103     SingleCellIncrementer(final int i, final int count, final HRegion region,
104         final Increment increment) {
105       super("" + i);
106       setDaemon(true);
107       this.count = count;
108       this.region = region;
109       this.increment = increment;
110     }
111 
112     @Override
113     public void run() {
114       for (int i = 0; i < this.count; i++) {
115         try {
116           this.region.increment(this.increment);
117           // LOG.info(getName() + " " + i);
118         } catch (IOException e) {
119           throw new RuntimeException(e);
120         }
121       }
122     }
123   }
124 
125   /**
126    * Increments a random row's Cell <code>count</code> times.
127    */
128   private static class CrossRowCellIncrementer extends Thread {
129     private final int count;
130     private final HRegion region;
131     private final Increment [] increments;
132 
133     CrossRowCellIncrementer(final int i, final int count, final HRegion region, final int range) {
134       super("" + i);
135       setDaemon(true);
136       this.count = count;
137       this.region = region;
138       this.increments = new Increment[range];
139       for (int ii = 0; ii < range; ii++) {
140         this.increments[ii] = new Increment(Bytes.toBytes(i));
141         this.increments[ii].addColumn(INCREMENT_BYTES, INCREMENT_BYTES, 1);
142       }
143     }
144 
145     @Override
146     public void run() {
147       for (int i = 0; i < this.count; i++) {
148         try {
149           int index = ThreadLocalRandom.current().nextInt(0, this.increments.length);
150           this.region.increment(this.increments[index]);
151           // LOG.info(getName() + " " + index);
152         } catch (IOException e) {
153           throw new RuntimeException(e);
154         }
155       }
156     }
157   }
158 
159   /**
160    * Have each thread update its own Cell. Avoid contention with another thread.
161    * @throws IOException
162    * @throws InterruptedException
163    */
164   @Test
165   public void testUnContendedSingleCellIncrement()
166   throws IOException, InterruptedException {
167     final HRegion region = getRegion(TEST_UTIL.getConfiguration(),
168         TestIncrementsFromClientSide.filterStringSoTableNameSafe(this.name.getMethodName()));
169     long startTime = System.currentTimeMillis();
170     try {
171       SingleCellIncrementer [] threads = new SingleCellIncrementer[THREAD_COUNT];
172       for (int i = 0; i < threads.length; i++) {
173         byte [] rowBytes = Bytes.toBytes(i);
174         Increment increment = new Increment(rowBytes);
175         increment.addColumn(INCREMENT_BYTES, INCREMENT_BYTES, 1);
176         threads[i] = new SingleCellIncrementer(i, INCREMENT_COUNT, region, increment);
177       }
178       for (int i = 0; i < threads.length; i++) {
179         threads[i].start();
180       }
181       for (int i = 0; i < threads.length; i++) {
182         threads[i].join();
183       }
184       RegionScanner regionScanner = region.getScanner(new Scan());
185       List<Cell> cells = new ArrayList<Cell>(THREAD_COUNT);
186       while(regionScanner.next(cells)) continue;
187       assertEquals(THREAD_COUNT, cells.size());
188       long total = 0;
189       for (Cell cell: cells) total += Bytes.toLong(cell.getValue());
190       assertEquals(INCREMENT_COUNT * THREAD_COUNT, total);
191     } finally {
192       closeRegion(region);
193       LOG.info(this.name.getMethodName() + " " + (System.currentTimeMillis() - startTime) + "ms");
194     }
195   }
196 
197   /**
198    * Have each thread update its own Cell. Avoid contention with another thread.
199    * @throws IOException
200    * @throws InterruptedException
201    */
202   @Test
203   public void testContendedAcrossCellsIncrement()
204   throws IOException, InterruptedException {
205     final HRegion region = getRegion(TEST_UTIL.getConfiguration(),
206         TestIncrementsFromClientSide.filterStringSoTableNameSafe(this.name.getMethodName()));
207     long startTime = System.currentTimeMillis();
208     try {
209       CrossRowCellIncrementer [] threads = new CrossRowCellIncrementer[THREAD_COUNT];
210       for (int i = 0; i < threads.length; i++) {
211         threads[i] = new CrossRowCellIncrementer(i, INCREMENT_COUNT, region, THREAD_COUNT);
212       }
213       for (int i = 0; i < threads.length; i++) {
214         threads[i].start();
215       }
216       for (int i = 0; i < threads.length; i++) {
217         threads[i].join();
218       }
219       RegionScanner regionScanner = region.getScanner(new Scan());
220       List<Cell> cells = new ArrayList<Cell>(100);
221       while(regionScanner.next(cells)) continue;
222       assertEquals(THREAD_COUNT, cells.size());
223       long total = 0;
224       for (Cell cell: cells) total += Bytes.toLong(cell.getValue());
225       assertEquals(INCREMENT_COUNT * THREAD_COUNT, total);
226     } finally {
227       closeRegion(region);
228       LOG.info(this.name.getMethodName() + " " + (System.currentTimeMillis() - startTime) + "ms");
229     }
230   }
231 }