View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.regionserver.wal;
19  
20  import static org.junit.Assert.assertFalse;
21  
22  import java.io.IOException;
23  
24  import org.apache.commons.logging.Log;
25  import org.apache.commons.logging.LogFactory;
26  import org.apache.hadoop.conf.Configuration;
27  import org.apache.hadoop.fs.Path;
28  import org.apache.hadoop.hbase.HBaseTestingUtility;
29  import org.apache.hadoop.hbase.HConstants;
30  import org.apache.hadoop.hbase.HRegionInfo;
31  import org.apache.hadoop.hbase.HTableDescriptor;
32  import org.apache.hadoop.hbase.KeyValue;
33  import org.apache.hadoop.hbase.TableName;
34  import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
35  import org.apache.hadoop.hbase.testclassification.SmallTests;
36  import org.apache.hadoop.hbase.util.Bytes;
37  import org.apache.hadoop.hbase.util.FSTableDescriptors;
38  import org.apache.hadoop.hbase.util.FSUtils;
39  import org.apache.hadoop.hbase.wal.WAL;
40  import org.apache.hadoop.hbase.wal.WALFactory;
41  import org.apache.hadoop.hbase.wal.WALKey;
42  import org.junit.Test;
43  import org.junit.experimental.categories.Category;
44  
45  /**
46   * Test many concurrent appenders to an WAL while rolling the log.
47   */
48  @Category(SmallTests.class)
49  public class TestLogRollingNoCluster {
50    private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
51    private final static byte [] EMPTY_1K_ARRAY = new byte[1024];
52    private static final int THREAD_COUNT = 100; // Spin up this many threads
53  
54    /**
55     * Spin up a bunch of threads and have them all append to a WAL.  Roll the
56     * WAL frequently to try and trigger NPE.
57     * @throws IOException
58     * @throws InterruptedException
59     */
60    @Test
61    public void testContendedLogRolling() throws IOException, InterruptedException {
62      Path dir = TEST_UTIL.getDataTestDir();
63      // The implementation needs to know the 'handler' count.
64      TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, THREAD_COUNT);
65      final Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
66      FSUtils.setRootDir(conf, dir);
67      final WALFactory wals = new WALFactory(conf, null, TestLogRollingNoCluster.class.getName());
68      final WAL wal = wals.getWAL(new byte[]{});
69      
70      Appender [] appenders = null;
71  
72      final int count = THREAD_COUNT;
73      appenders = new Appender[count];
74      try {
75        for (int i = 0; i < count; i++) {
76          // Have each appending thread write 'count' entries
77          appenders[i] = new Appender(wal, i, count);
78        }
79        for (int i = 0; i < count; i++) {
80          appenders[i].start();
81        }
82        for (int i = 0; i < count; i++) {
83          //ensure that all threads are joined before closing the wal
84          appenders[i].join();
85        }
86      } finally {
87        wals.close();
88      }
89      for (int i = 0; i < count; i++) {
90        assertFalse(appenders[i].isException());
91      }
92    }
93  
94    /**
95     * Appender thread.  Appends to passed wal file.
96     */
97    static class Appender extends Thread {
98      private final Log log;
99      private final WAL wal;
100     private final int count;
101     private Exception e = null;
102 
103     Appender(final WAL wal, final int index, final int count) {
104       super("" + index);
105       this.wal = wal;
106       this.count = count;
107       this.log = LogFactory.getLog("Appender:" + getName());
108     }
109 
110     /**
111      * @return Call when the thread is done.
112      */
113     boolean isException() {
114       return !isAlive() && this.e != null;
115     }
116 
117     Exception getException() {
118       return this.e;
119     }
120 
121     @Override
122     public void run() {
123       this.log.info(getName() +" started");
124       final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
125       try {
126         for (int i = 0; i < this.count; i++) {
127           long now = System.currentTimeMillis();
128           // Roll every ten edits
129           if (i % 10 == 0) {
130             this.wal.rollWriter();
131           }
132           WALEdit edit = new WALEdit();
133           byte[] bytes = Bytes.toBytes(i);
134           edit.add(new KeyValue(bytes, bytes, bytes, now, EMPTY_1K_ARRAY));
135           final HRegionInfo hri = HRegionInfo.FIRST_META_REGIONINFO;
136           final FSTableDescriptors fts = new FSTableDescriptors(TEST_UTIL.getConfiguration());
137           final HTableDescriptor htd = fts.get(TableName.META_TABLE_NAME);
138           final long txid = wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(),
139               TableName.META_TABLE_NAME, now, mvcc), edit, true);
140           wal.sync(txid);
141         }
142         String msg = getName() + " finished";
143         if (isException())
144           this.log.info(msg, getException());
145         else
146           this.log.info(msg);
147       } catch (Exception e) {
148         this.e = e;
149         log.info("Caught exception from Appender:" + getName(), e);
150       } finally {
151         // Call sync on our log.else threads just hang out.
152         try {
153           this.wal.sync();
154         } catch (IOException e) {
155           throw new RuntimeException(e);
156         }
157       }
158     }
159   }
160 
161   //@org.junit.Rule
162   //public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
163   //  new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
164 }