View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.client;
19  
20  import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
21  import static org.junit.Assert.assertEquals;
22  import static org.junit.Assert.assertNotNull;
23  
24  import java.io.IOException;
25  import java.util.List;
26  import java.util.concurrent.atomic.AtomicInteger;
27  
28  import org.apache.hadoop.conf.Configuration;
29  import org.apache.hadoop.hbase.CellScannable;
30  import org.apache.hadoop.hbase.CellScanner;
31  import org.apache.hadoop.hbase.HBaseTestingUtility;
32  import org.apache.hadoop.hbase.HConstants;
33  import org.apache.hadoop.hbase.testclassification.MediumTests;
34  import org.apache.hadoop.hbase.TableName;
35  import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
36  import org.apache.hadoop.hbase.coprocessor.ProtobufCoprocessorService;
37  import org.apache.hadoop.hbase.ipc.DelegatingPayloadCarryingRpcController;
38  import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
39  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
40  import org.apache.hadoop.hbase.util.Bytes;
41  import org.junit.AfterClass;
42  import org.junit.BeforeClass;
43  import org.junit.Test;
44  import org.junit.experimental.categories.Category;
45  
46  import com.google.common.collect.Lists;
47  
48  @Category(MediumTests.class)
49  public class TestRpcControllerFactory {
50  
51    public static class StaticRpcControllerFactory extends RpcControllerFactory {
52  
53      public StaticRpcControllerFactory(Configuration conf) {
54        super(conf);
55      }
56  
57      @Override
58      public PayloadCarryingRpcController newController() {
59        return new CountingRpcController(super.newController());
60      }
61  
62      @Override
63      public PayloadCarryingRpcController newController(final CellScanner cellScanner) {
64        return new CountingRpcController(super.newController(cellScanner));
65      }
66  
67      @Override
68      public PayloadCarryingRpcController newController(final List<CellScannable> cellIterables) {
69        return new CountingRpcController(super.newController(cellIterables));
70      }
71    }
72  
73    public static class CountingRpcController extends DelegatingPayloadCarryingRpcController {
74  
75      private static AtomicInteger INT_PRIORITY = new AtomicInteger();
76      private static AtomicInteger TABLE_PRIORITY = new AtomicInteger();
77  
78      public CountingRpcController(PayloadCarryingRpcController delegate) {
79        super(delegate);
80      }
81  
82      @Override
83      public void setPriority(int priority) {
84        super.setPriority(priority);
85        INT_PRIORITY.incrementAndGet();
86      }
87  
88      @Override
89      public void setPriority(TableName tn) {
90        super.setPriority(tn);
91        // ignore counts for system tables - it could change and we really only want to check on what
92        // the client should change
93        if (!tn.isSystemTable()) {
94          TABLE_PRIORITY.incrementAndGet();
95        }
96  
97      }
98    }
99  
100   private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
101 
102   @BeforeClass
103   public static void setup() throws Exception {
104     // load an endpoint so we have an endpoint to test - it doesn't matter which one, but
105     // this is already in tests, so we can just use it.
106     Configuration conf = UTIL.getConfiguration();
107     conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
108       ProtobufCoprocessorService.class.getName());
109 
110     UTIL.startMiniCluster();
111   }
112 
113   @AfterClass
114   public static void teardown() throws Exception {
115     UTIL.shutdownMiniCluster();
116   }
117 
118   /**
119    * check some of the methods and make sure we are incrementing each time. Its a bit tediuous to
120    * cover all methods here and really is a bit brittle since we can always add new methods but
121    * won't be sure to add them here. So we just can cover the major ones.
122    * @throws Exception on failure
123    */
124   @Test
125   public void testCountController() throws Exception {
126     Configuration conf = new Configuration(UTIL.getConfiguration());
127     // setup our custom controller
128     conf.set(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY,
129       StaticRpcControllerFactory.class.getName());
130 
131     TableName name = TableName.valueOf("testcustomcontroller");
132     UTIL.createTable(name, fam1).close();
133 
134     // change one of the connection properties so we get a new HConnection with our configuration
135     conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT + 1);
136 
137     Connection connection = ConnectionFactory.createConnection(conf);
138     Table table = connection.getTable(name);
139     byte[] row = Bytes.toBytes("row");
140     Put p = new Put(row);
141     p.add(fam1, fam1, Bytes.toBytes("val0"));
142     table.put(p);
143 
144     Integer counter = 1;
145     counter = verifyCount(counter);
146 
147     Delete d = new Delete(row);
148     d.addColumn(fam1, fam1);
149     table.delete(d);
150     counter = verifyCount(counter);
151 
152     Put p2 = new Put(row);
153     p2.add(fam1, Bytes.toBytes("qual"), Bytes.toBytes("val1"));
154     table.batch(Lists.newArrayList(p, p2), new Object[2]);
155     // this only goes to a single server, so we don't need to change the count here
156     counter = verifyCount(counter);
157 
158     Append append = new Append(row);
159     append.add(fam1, fam1, Bytes.toBytes("val2"));
160     table.append(append);
161     counter = verifyCount(counter);
162 
163     // and check the major lookup calls as well
164     Get g = new Get(row);
165     table.get(g);
166     counter = verifyCount(counter);
167 
168     ResultScanner scan = table.getScanner(fam1);
169     scan.next();
170     scan.close();
171     counter = verifyCount(counter);
172 
173     Get g2 = new Get(row);
174     table.get(Lists.newArrayList(g, g2));
175     // same server, so same as above for not changing count
176     counter = verifyCount(counter);
177 
178     // make sure all the scanner types are covered
179     Scan scanInfo = new Scan(row);
180     // regular small
181     scanInfo.setSmall(true);
182     counter = doScan(table, scanInfo, counter);
183 
184     // reversed, small
185     scanInfo.setReversed(true);
186     counter = doScan(table, scanInfo, counter);
187 
188     // reversed, regular
189     scanInfo.setSmall(false);
190     counter = doScan(table, scanInfo, counter);
191 
192     table.close();
193   }
194 
195   int doScan(Table table, Scan scan, int expectedCount) throws IOException {
196     ResultScanner results = table.getScanner(scan);
197     results.next();
198     results.close();
199     return verifyCount(expectedCount);
200   }
201 
202   int verifyCount(Integer counter) {
203     assertEquals(counter.intValue(), CountingRpcController.TABLE_PRIORITY.get());
204     assertEquals(0, CountingRpcController.INT_PRIORITY.get());
205     return counter + 1;
206   }
207 
208   @Test
209   public void testFallbackToDefaultRpcControllerFactory() {
210     Configuration conf = new Configuration(UTIL.getConfiguration());
211     conf.set(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, "foo.bar.Baz");
212 
213     // Should not fail
214     RpcControllerFactory factory = RpcControllerFactory.instantiate(conf);
215     assertNotNull(factory);
216     assertEquals(factory.getClass(), RpcControllerFactory.class);
217   }
218 }