
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.regionserver;

import static org.junit.Assert.*;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter.Predicate;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.RpcRetryingCaller;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;

/**
 * Tests failover of secondary region replicas.
 */
@RunWith(Parameterized.class)
@Category(LargeTests.class)
public class TestRegionReplicaFailover {

  private static final Log LOG = LogFactory.getLog(TestRegionReplicaFailover.class);

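  // Log RPC retries at full verbosity so that replica read retries show up in the test output.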
  static {
    ((Log4JLogger)RpcRetryingCaller.LOG).getLogger().setLevel(Level.ALL);
  }

  private static final HBaseTestingUtility HTU = new HBaseTestingUtility();

  private static final int NB_SERVERS = 3;

  protected final byte[][] families = new byte[][] {HBaseTestingUtility.fam1,
      HBaseTestingUtility.fam2, HBaseTestingUtility.fam3};
  protected final byte[] fam = HBaseTestingUtility.fam1;
  protected final byte[] qual1 = Bytes.toBytes("qual1");
  protected final byte[] value1 = Bytes.toBytes("value1");
  protected final byte[] row = Bytes.toBytes("rowA");
  protected final byte[] row2 = Bytes.toBytes("rowB");

  @Rule public TestName name = new TestName();

  private HTableDescriptor htd;

  /*
   * We are testing with dist log split and dist log replay separately
   */
  @Parameters
  public static Collection<Object[]> getParameters() {
    Object[][] params =
        new Boolean[][] { /*{true}, Disable DLR!!! It is going to be removed*/ {false} };
    return Arrays.asList(params);
  }

  @Parameterized.Parameter(0)
  public boolean distributedLogReplay;

  @Before
  public void before() throws Exception {
    Configuration conf = HTU.getConfiguration();
    // Up the handlers; this test needs more than usual.
    conf.setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
    conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_WAIT_FOR_PRIMARY_FLUSH_CONF_KEY, true);
    conf.setInt("replication.stats.thread.period.seconds", 5);
    conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
    conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, distributedLogReplay);

    HTU.startMiniCluster(NB_SERVERS);
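    // The Parameterized runner appends an index suffix such as "[0]" to the method name;
    // strip it here because brackets are not legal characters in a table name.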
    htd = HTU.createTableDescriptor(
      name.getMethodName().substring(0, name.getMethodName().length()-3));
    htd.setRegionReplication(3);
    HTU.getHBaseAdmin().createTable(htd);
  }

  @After
  public void after() throws Exception {
    HTU.deleteTableIfAny(htd.getTableName());
    HTU.shutdownMiniCluster();
  }

  /**
   * Tests that for a newly created table with region replicas and no data, the secondary
   * region replicas are available to read immediately.
   */
  @Test(timeout = 60000)
  public void testSecondaryRegionWithEmptyRegion() throws IOException {
    // Create a new table with region replication and don't put any data. Test that the secondary
    // region replica is available to read.
    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
        Table table = connection.getTable(htd.getTableName())) {

      Get get = new Get(row);
      get.setConsistency(Consistency.TIMELINE);
      get.setReplicaId(1);
      table.get(get); // this should not block
    }
  }

  /**
   * Tests that if there is some data in the primary region, reopening the region replicas
   * (e.g. by disabling and enabling the table) makes the region replicas readable.
   * @throws IOException
   */
  @Test(timeout = 60000)
  public void testSecondaryRegionWithNonEmptyRegion() throws IOException {
    // Create a new table with region replication and load some data,
    // then disable and enable the table again and verify the data from the secondary
    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
        Table table = connection.getTable(htd.getTableName())) {

      HTU.loadNumericRows(table, fam, 0, 1000);

      HTU.getHBaseAdmin().disableTable(htd.getTableName());
      HTU.getHBaseAdmin().enableTable(htd.getTableName());

      HTU.verifyNumericRows(table, fam, 0, 1000, 1);
    }
  }

  /**
   * Tests that the data stays readable after the region server hosting the primary region
   * replica is killed while the data is still unflushed.
   */
  @Test (timeout = 120000)
  public void testPrimaryRegionKill() throws Exception {
    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
        Table table = connection.getTable(htd.getTableName())) {

      HTU.loadNumericRows(table, fam, 0, 1000);

      // WAL replication is async; we have to wait until the replication catches up, or we time out
      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 1, 30000);
      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 2, 30000);

      // There should be no flushed files yet; the data is only in the memstores of the primary
      // and the secondaries. Kill the primary region replica now, and ensure that when it comes
      // back up, the same data can still be read from the primary and the secondaries.
      boolean aborted = false;
      for (RegionServerThread rs : HTU.getMiniHBaseCluster().getRegionServerThreads()) {
        for (Region r : rs.getRegionServer().getOnlineRegions(htd.getTableName())) {
          if (r.getRegionInfo().getReplicaId() == 0) {
            LOG.info("Aborting region server hosting primary region replica");
            rs.getRegionServer().abort("for test");
            aborted = true;
          }
        }
      }
      assertTrue(aborted);

      // WAL replication is async; we have to wait until the replication catches up, or we time out
      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 0, 30000);
      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 1, 30000);
      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 2, 30000);
    }

    // restart the region server
    HTU.getMiniHBaseCluster().startRegionServer();
  }

  /**
   * WAL replication is async; keep retrying the verification until it succeeds or the given
   * timeout expires.
   */
  private void verifyNumericRowsWithTimeout(final Table table, final byte[] f, final int startRow,
      final int endRow, final int replicaId, final long timeout) throws Exception {
    try {
      HTU.waitFor(timeout, new Predicate<Exception>() {
        @Override
        public boolean evaluate() throws Exception {
          try {
            HTU.verifyNumericRows(table, f, startRow, endRow, replicaId);
            return true;
          } catch (AssertionError ae) {
            return false;
          }
        }
      });
    } catch (Throwable t) {
      // ignore this, but redo the verify to get the actual exception
      HTU.verifyNumericRows(table, f, startRow, endRow, replicaId);
    }
  }

  /**
   * Tests that killing a secondary region replica with unflushed data recovers, and the replica
   * becomes available to read again shortly after.
   */
  @Test (timeout = 120000)
  public void testSecondaryRegionKill() throws Exception {
    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
        Table table = connection.getTable(htd.getTableName())) {
      HTU.loadNumericRows(table, fam, 0, 1000);

      // wait for some time to ensure that async WAL replication does its magic
      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 1, 30000);
      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 2, 30000);

      // There should be no flushed files yet; the data is only in the memstores of the primary
      // and the secondaries. Kill the secondary region replica now, and ensure that when it comes
      // back up, the same data can still be read from it.
      boolean aborted = false;
      for (RegionServerThread rs : HTU.getMiniHBaseCluster().getRegionServerThreads()) {
        for (Region r : rs.getRegionServer().getOnlineRegions(htd.getTableName())) {
          if (r.getRegionInfo().getReplicaId() == 1) {
            LOG.info("Aborting region server hosting secondary region replica");
            rs.getRegionServer().abort("for test");
            aborted = true;
          }
        }
      }
      assertTrue(aborted);

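      // Give the cluster a few seconds to reassign and reopen the killed secondary replica
      // on another region server before reading from it again.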
      Threads.sleep(5000);

      HTU.verifyNumericRows(table, fam, 0, 1000, 1);
      HTU.verifyNumericRows(table, fam, 0, 1000, 2);
    }

    // restart the region server
    HTU.getMiniHBaseCluster().startRegionServer();
  }

  /**
   * Tests the case where there are 3 region replicas and the primary is continuously accepting
   * new writes while one of the secondaries is killed. Verification is done for both of the
   * secondary replicas.
   */
  @Test (timeout = 120000)
  public void testSecondaryRegionKillWhilePrimaryIsAcceptingWrites() throws Exception {
    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
        Table table = connection.getTable(htd.getTableName());
        Admin admin = connection.getAdmin()) {
      // load some base data into the primary, then keep loading from a background thread
      HTU.loadNumericRows(table, fam, 0, 1000); // start with some base
      admin.flush(table.getName());
      HTU.loadNumericRows(table, fam, 1000, 2000);

      final AtomicReference<Throwable> ex = new AtomicReference<Throwable>(null);
      final AtomicBoolean done = new AtomicBoolean(false);
      final AtomicInteger key = new AtomicInteger(2000);

      Thread loader = new Thread() {
        @Override
        public void run() {
          while (!done.get()) {
            try {
              HTU.loadNumericRows(table, fam, key.get(), key.get() + 1000);
              key.addAndGet(1000);
            } catch (Throwable e) {
              ex.compareAndSet(null, e);
            }
          }
        }
      };
      loader.start();

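      // Concurrently abort the region server hosting the secondary replica (replicaId == 1)
      // while the loader thread keeps writing to the primary.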
      Thread aborter = new Thread() {
        @Override
        public void run() {
          try {
            boolean aborted = false;
            for (RegionServerThread rs : HTU.getMiniHBaseCluster().getRegionServerThreads()) {
              for (Region r : rs.getRegionServer().getOnlineRegions(htd.getTableName())) {
                if (r.getRegionInfo().getReplicaId() == 1) {
                  LOG.info("Aborting region server hosting secondary region replica");
                  rs.getRegionServer().abort("for test");
                  aborted = true;
                }
              }
            }
            assertTrue(aborted);
          } catch (Throwable e) {
            ex.compareAndSet(null, e);
          }
        }
      };

      aborter.start();
      aborter.join();
      done.set(true);
      loader.join();

      assertNull(ex.get());

      assertTrue(key.get() > 1000); // assert that the test is working as designed
      LOG.info("Loaded up to key: " + key.get());
      verifyNumericRowsWithTimeout(table, fam, 0, key.get(), 0, 30000);
      verifyNumericRowsWithTimeout(table, fam, 0, key.get(), 1, 30000);
      verifyNumericRowsWithTimeout(table, fam, 0, key.get(), 2, 30000);
    }

    // restart the region server
    HTU.getMiniHBaseCluster().startRegionServer();
  }

  /**
   * Tests creating a table with a large number of regions and replicas. Opening the region
   * replicas should not block the handlers on the region servers indefinitely.
   */
  @Test (timeout = 120000)
  public void testLotsOfRegionReplicas() throws IOException {
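    // 60 regions x 10 replicas = 600 region replicas hosted across the 3 region servers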
    int numRegions = NB_SERVERS * 20;
    int regionReplication = 10;
    String tableName = htd.getTableName().getNameAsString() + "2";
    htd = HTU.createTableDescriptor(tableName);
    htd.setRegionReplication(regionReplication);

    // don't care too much about the exact split points
    byte[] startKey = Bytes.toBytes("aaa");
    byte[] endKey = Bytes.toBytes("zzz");
    byte[][] splits = HTU.getRegionSplitStartKeys(startKey, endKey, numRegions);
    HTU.getHBaseAdmin().createTable(htd, startKey, endKey, numRegions);

    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
        Table table = connection.getTable(htd.getTableName())) {

      for (int i = 1; i < splits.length; i++) {
        for (int j = 0; j < regionReplication; j++) {
          Get get = new Get(splits[i]);
          get.setConsistency(Consistency.TIMELINE);
          get.setReplicaId(j);
          table.get(get); // this should not block. Regions should be coming online
        }
      }
    }

    HTU.deleteTableIfAny(TableName.valueOf(tableName));
  }
}