View Javadoc

1   /**
2    * Copyright The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.client;
21  
22  import static org.junit.Assert.assertEquals;
23  import static org.junit.Assert.assertTrue;
24  
25  import org.apache.commons.logging.Log;
26  import org.apache.commons.logging.LogFactory;
27  import org.apache.hadoop.hbase.HBaseTestingUtility;
28  import org.apache.hadoop.hbase.HRegionLocation;
29  import org.apache.hadoop.hbase.MiniHBaseCluster;
30  import org.apache.hadoop.hbase.ServerName;
31  import org.apache.hadoop.hbase.TableName;
32  import org.apache.hadoop.hbase.regionserver.HRegionServer;
33  import org.apache.hadoop.hbase.testclassification.ClientTests;
34  import org.apache.hadoop.hbase.testclassification.LargeTests;
35  import org.apache.hadoop.hbase.util.Bytes;
36  import org.apache.hadoop.hbase.util.Pair;
37  import org.junit.AfterClass;
38  import org.junit.BeforeClass;
39  import org.junit.Test;
40  import org.junit.experimental.categories.Category;
41  
42  import static org.junit.Assert.assertEquals;
43  import static org.junit.Assert.assertNotNull;
44  import static org.junit.Assert.assertTrue;
45  
46  @Category({ LargeTests.class, ClientTests.class })
47  public class TestHTableMultiplexerFlushCache {
48    private static final Log LOG = LogFactory.getLog(TestHTableMultiplexerFlushCache.class);
49    private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
50    private static byte[] FAMILY = Bytes.toBytes("testFamily");
51    private static byte[] QUALIFIER1 = Bytes.toBytes("testQualifier_1");
52    private static byte[] QUALIFIER2 = Bytes.toBytes("testQualifier_2");
53    private static byte[] VALUE1 = Bytes.toBytes("testValue1");
54    private static byte[] VALUE2 = Bytes.toBytes("testValue2");
55    private static int SLAVES = 3;
56    private static int PER_REGIONSERVER_QUEUE_SIZE = 100000;
57  
58    /**
59     * @throws java.lang.Exception
60     */
61    @BeforeClass
62    public static void setUpBeforeClass() throws Exception {
63      TEST_UTIL.startMiniCluster(SLAVES);
64    }
65  
66    /**
67     * @throws java.lang.Exception
68     */
69    @AfterClass
70    public static void tearDownAfterClass() throws Exception {
71      TEST_UTIL.shutdownMiniCluster();
72    }
73  
74    private static void checkExistence(HTable htable, byte[] row, byte[] family, byte[] quality,
75        byte[] value) throws Exception {
76      // verify that the Get returns the correct result
77      Result r;
78      Get get = new Get(row);
79      get.addColumn(family, quality);
80      int nbTry = 0;
81      do {
82        assertTrue("Fail to get from " + htable.getName() + " after " + nbTry + " tries", nbTry < 50);
83        nbTry++;
84        Thread.sleep(100);
85        r = htable.get(get);
86      } while (r == null || r.getValue(family, quality) == null);
87      assertEquals("value", Bytes.toStringBinary(value),
88        Bytes.toStringBinary(r.getValue(family, quality)));
89    }
90  
91    @Test
92    public void testOnRegionChange() throws Exception {
93      TableName TABLE = TableName.valueOf("testOnRegionChange");
94      final int NUM_REGIONS = 10;
95      HTable htable = TEST_UTIL.createTable(TABLE, new byte[][] { FAMILY }, 3,
96        Bytes.toBytes("aaaaa"), Bytes.toBytes("zzzzz"), NUM_REGIONS);
97  
98      HTableMultiplexer multiplexer = new HTableMultiplexer(TEST_UTIL.getConfiguration(), 
99        PER_REGIONSERVER_QUEUE_SIZE);
100     
101     byte[][] startRows = htable.getStartKeys();
102     byte[] row = startRows[1];
103     assertTrue("2nd region should not start with empty row", row != null && row.length > 0);
104 
105     Put put = new Put(row).add(FAMILY, QUALIFIER1, VALUE1);
106     assertTrue("multiplexer.put returns", multiplexer.put(TABLE, put));
107     
108     checkExistence(htable, row, FAMILY, QUALIFIER1, VALUE1);
109 
110     // Now let's shutdown the regionserver and let regions moved to other servers.
111     HRegionLocation loc = htable.getRegionLocation(row);
112     MiniHBaseCluster hbaseCluster = TEST_UTIL.getHBaseCluster(); 
113     hbaseCluster.stopRegionServer(loc.getServerName());
114     TEST_UTIL.waitUntilAllRegionsAssigned(TABLE);
115 
116     // put with multiplexer.
117     put = new Put(row).add(FAMILY, QUALIFIER2, VALUE2);
118     assertTrue("multiplexer.put returns", multiplexer.put(TABLE, put));
119 
120     checkExistence(htable, row, FAMILY, QUALIFIER2, VALUE2);
121   }
122 
123   @Test
124   public void testOnRegionMove() throws Exception {
125     // This test is doing near exactly the same thing that testOnRegionChange but avoiding the
126     // potential to get a ConnectionClosingException. By moving the region, we can be certain that
127     // the connection is still valid and that the implementation is correctly handling an invalid
128     // Region cache (and not just tearing down the entire connection).
129     TableName TABLE = TableName.valueOf("testOnRegionMove");
130     final int NUM_REGIONS = 10;
131     HTable htable = TEST_UTIL.createTable(TABLE, new byte[][] { FAMILY }, 3,
132       Bytes.toBytes("aaaaa"), Bytes.toBytes("zzzzz"), NUM_REGIONS);
133 
134     HTableMultiplexer multiplexer = new HTableMultiplexer(TEST_UTIL.getConfiguration(),
135       PER_REGIONSERVER_QUEUE_SIZE);
136 
137     final RegionLocator regionLocator = TEST_UTIL.getConnection().getRegionLocator(TABLE);
138     Pair<byte[][],byte[][]> startEndRows = regionLocator.getStartEndKeys();
139     byte[] row = startEndRows.getFirst()[1];
140     assertTrue("2nd region should not start with empty row", row != null && row.length > 0);
141 
142     Put put = new Put(row).addColumn(FAMILY, QUALIFIER1, VALUE1);
143     assertTrue("multiplexer.put returns", multiplexer.put(TABLE, put));
144 
145     checkExistence(htable, row, FAMILY, QUALIFIER1, VALUE1);
146 
147     final HRegionLocation loc = regionLocator.getRegionLocation(row);
148     final MiniHBaseCluster hbaseCluster = TEST_UTIL.getHBaseCluster();
149     // The current server for the region we're writing to
150     final ServerName originalServer = loc.getServerName();
151     ServerName newServer = null;
152     // Find a new server to move that region to
153     for (int i = 0; i < SLAVES; i++) {
154       HRegionServer rs = hbaseCluster.getRegionServer(0);
155       if (!rs.getServerName().equals(originalServer.getServerName())) {
156         newServer = rs.getServerName();
157         break;
158       }
159     }
160     assertNotNull("Did not find a new RegionServer to use", newServer);
161 
162     // Move the region
163     LOG.info("Moving " + loc.getRegionInfo().getEncodedName() + " from " + originalServer
164         +  " to " + newServer);
165     TEST_UTIL.getHBaseAdmin().move(loc.getRegionInfo().getEncodedNameAsBytes(),
166         Bytes.toBytes(newServer.getServerName()));
167 
168     TEST_UTIL.waitUntilAllRegionsAssigned(TABLE);
169 
170     // Send a new Put
171     put = new Put(row).addColumn(FAMILY, QUALIFIER2, VALUE2);
172     assertTrue("multiplexer.put returns", multiplexer.put(TABLE, put));
173 
174     // We should see the update make it to the new server eventually
175     checkExistence(htable, row, FAMILY, QUALIFIER2, VALUE2);
176   }
177 }