View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import static org.junit.Assert.assertEquals;
22  import static org.junit.Assert.assertNull;
23  import static org.junit.Assert.assertTrue;
24  
25  import java.io.IOException;
26  import java.util.Map;
27  import java.util.Map.Entry;
28  
29  import org.apache.commons.logging.Log;
30  import org.apache.commons.logging.LogFactory;
31  import org.apache.hadoop.hbase.Coprocessor;
32  import org.apache.hadoop.hbase.CoprocessorEnvironment;
33  import org.apache.hadoop.hbase.HBaseTestingUtility;
34  import org.apache.hadoop.hbase.HRegionInfo;
35  import org.apache.hadoop.hbase.HRegionLocation;
36  import org.apache.hadoop.hbase.ServerName;
37  import org.apache.hadoop.hbase.TableName;
38  import org.apache.hadoop.hbase.client.HTable;
39  import org.apache.hadoop.hbase.client.Put;
40  import org.apache.hadoop.hbase.client.RegionLocator;
41  import org.apache.hadoop.hbase.client.Table;
42  import org.apache.hadoop.hbase.client.coprocessor.Batch;
43  import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
44  import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
45  import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
46  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
47  import org.apache.hadoop.hbase.coprocessor.protobuf.generated.PingProtos;
48  import org.apache.hadoop.hbase.coprocessor.protobuf.generated.PingProtos.CountRequest;
49  import org.apache.hadoop.hbase.coprocessor.protobuf.generated.PingProtos.CountResponse;
50  import org.apache.hadoop.hbase.coprocessor.protobuf.generated.PingProtos.HelloRequest;
51  import org.apache.hadoop.hbase.coprocessor.protobuf.generated.PingProtos.HelloResponse;
52  import org.apache.hadoop.hbase.coprocessor.protobuf.generated.PingProtos.IncrementCountRequest;
53  import org.apache.hadoop.hbase.coprocessor.protobuf.generated.PingProtos.IncrementCountResponse;
54  import org.apache.hadoop.hbase.coprocessor.protobuf.generated.PingProtos.NoopRequest;
55  import org.apache.hadoop.hbase.coprocessor.protobuf.generated.PingProtos.NoopResponse;
56  import org.apache.hadoop.hbase.coprocessor.protobuf.generated.PingProtos.PingRequest;
57  import org.apache.hadoop.hbase.coprocessor.protobuf.generated.PingProtos.PingResponse;
58  import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
59  import org.apache.hadoop.hbase.testclassification.MediumTests;
60  import org.apache.hadoop.hbase.util.Bytes;
61  import org.junit.After;
62  import org.junit.AfterClass;
63  import org.junit.Before;
64  import org.junit.BeforeClass;
65  import org.junit.Test;
66  import org.junit.experimental.categories.Category;
67  
68  import com.google.protobuf.RpcCallback;
69  import com.google.protobuf.RpcController;
70  import com.google.protobuf.Service;
71  import com.google.protobuf.ServiceException;
72  
73  @Category(MediumTests.class)
74  public class TestServerCustomProtocol {
75    private static final Log LOG = LogFactory.getLog(TestServerCustomProtocol.class);
76    static final String WHOAREYOU = "Who are you?";
77    static final String NOBODY = "nobody";
78    static final String HELLO = "Hello, ";
79  
80    /* Test protocol implementation */
81    public static class PingHandler extends PingProtos.PingService
82    implements Coprocessor, CoprocessorService {
83      private int counter = 0;
84  
85      @Override
86      public void start(CoprocessorEnvironment env) throws IOException {
87        if (env instanceof RegionCoprocessorEnvironment) return;
88        throw new CoprocessorException("Must be loaded on a table region!");
89      }
90  
91      @Override
92      public void stop(CoprocessorEnvironment env) throws IOException {
93        // Nothing to do.
94      }
95  
96      @Override
97      public void ping(RpcController controller, PingRequest request,
98          RpcCallback<PingResponse> done) {
99        this.counter++;
100       done.run(PingResponse.newBuilder().setPong("pong").build());
101     }
102 
103     @Override
104     public void count(RpcController controller, CountRequest request,
105         RpcCallback<CountResponse> done) {
106       done.run(CountResponse.newBuilder().setCount(this.counter).build());
107     }
108 
109     @Override
110     public void increment(RpcController controller,
111         IncrementCountRequest request, RpcCallback<IncrementCountResponse> done) {
112       this.counter += request.getDiff();
113       done.run(IncrementCountResponse.newBuilder().setCount(this.counter).build());
114     }
115 
116     @Override
117     public void hello(RpcController controller, HelloRequest request,
118         RpcCallback<HelloResponse> done) {
119       if (!request.hasName()) done.run(HelloResponse.newBuilder().setResponse(WHOAREYOU).build());
120       else if (request.getName().equals(NOBODY)) done.run(HelloResponse.newBuilder().build());
121       else done.run(HelloResponse.newBuilder().setResponse(HELLO + request.getName()).build());
122     }
123 
124     @Override
125     public void noop(RpcController controller, NoopRequest request,
126         RpcCallback<NoopResponse> done) {
127       done.run(NoopResponse.newBuilder().build());
128     }
129 
130     @Override
131     public Service getService() {
132       return this;
133     }
134   }
135 
136   private static final TableName TEST_TABLE = TableName.valueOf("test");
137   private static final byte[] TEST_FAMILY = Bytes.toBytes("f1");
138 
139   private static final byte[] ROW_A = Bytes.toBytes("aaa");
140   private static final byte[] ROW_B = Bytes.toBytes("bbb");
141   private static final byte[] ROW_C = Bytes.toBytes("ccc");
142 
143   private static final byte[] ROW_AB = Bytes.toBytes("abb");
144   private static final byte[] ROW_BC = Bytes.toBytes("bcc");
145 
146   private static HBaseTestingUtility util = new HBaseTestingUtility();
147 
148   @BeforeClass
149   public static void setupBeforeClass() throws Exception {
150     util.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
151       PingHandler.class.getName());
152     util.startMiniCluster();
153   }
154 
155   @Before
156   public void before()  throws Exception {
157     final byte[][] SPLIT_KEYS = new byte[][] { ROW_B, ROW_C };
158     HTable table = util.createTable(TEST_TABLE, TEST_FAMILY, SPLIT_KEYS);
159 
160     Put puta = new Put( ROW_A );
161     puta.add(TEST_FAMILY, Bytes.toBytes("col1"), Bytes.toBytes(1));
162     table.put(puta);
163 
164     Put putb = new Put( ROW_B );
165     putb.add(TEST_FAMILY, Bytes.toBytes("col1"), Bytes.toBytes(1));
166     table.put(putb);
167 
168     Put putc = new Put( ROW_C );
169     putc.add(TEST_FAMILY, Bytes.toBytes("col1"), Bytes.toBytes(1));
170     table.put(putc);
171   }
172 
173   @After
174   public void after() throws Exception {
175     util.deleteTable(TEST_TABLE);
176   }
177 
178   @AfterClass
179   public static void tearDownAfterClass() throws Exception {
180     util.shutdownMiniCluster();
181   }
182 
183   @Test
184   public void testSingleProxy() throws Throwable {
185     Table table = new HTable(util.getConfiguration(), TEST_TABLE);
186     Map<byte [], String> results = ping(table, null, null);
187     // There are three regions so should get back three results.
188     assertEquals(3, results.size());
189     for (Map.Entry<byte [], String> e: results.entrySet()) {
190       assertEquals("Invalid custom protocol response", "pong", e.getValue());
191     }
192     hello(table, "George", HELLO + "George");
193     LOG.info("Did george");
194     hello(table, null, "Who are you?");
195     LOG.info("Who are you");
196     hello(table, NOBODY, null);
197     LOG.info(NOBODY);
198     Map<byte [], Integer> intResults = table.coprocessorService(PingProtos.PingService.class,
199       null, null,
200       new Batch.Call<PingProtos.PingService, Integer>() {
201         @Override
202         public Integer call(PingProtos.PingService instance) throws IOException {
203           BlockingRpcCallback<PingProtos.CountResponse> rpcCallback =
204             new BlockingRpcCallback<PingProtos.CountResponse>();
205           instance.count(null, PingProtos.CountRequest.newBuilder().build(), rpcCallback);
206           return rpcCallback.get().getCount();
207         }
208       });
209     int count = -1;
210     for (Map.Entry<byte [], Integer> e: intResults.entrySet()) {
211       assertTrue(e.getValue() > 0);
212       count = e.getValue();
213     }
214     final int diff = 5;
215     intResults = table.coprocessorService(PingProtos.PingService.class,
216       null, null,
217       new Batch.Call<PingProtos.PingService, Integer>() {
218         @Override
219         public Integer call(PingProtos.PingService instance) throws IOException {
220           BlockingRpcCallback<PingProtos.IncrementCountResponse> rpcCallback =
221             new BlockingRpcCallback<PingProtos.IncrementCountResponse>();
222           instance.increment(null, PingProtos.IncrementCountRequest.newBuilder().setDiff(diff).build(),
223             rpcCallback);
224           return rpcCallback.get().getCount();
225         }
226       });
227     // There are three regions so should get back three results.
228     assertEquals(3, results.size());
229     for (Map.Entry<byte [], Integer> e: intResults.entrySet()) {
230       assertEquals(e.getValue().intValue(), count + diff);
231     }
232     table.close();
233   }
234 
235   private Map<byte [], String> hello(final Table table, final String send, final String response)
236   throws ServiceException, Throwable {
237     Map<byte [], String> results = hello(table, send);
238     for (Map.Entry<byte [], String> e: results.entrySet()) {
239       assertEquals("Invalid custom protocol response", response, e.getValue());
240     }
241     return results;
242   }
243 
244   private Map<byte [], String> hello(final Table table, final String send)
245   throws ServiceException, Throwable {
246     return hello(table, send, null, null);
247   }
248 
249   private Map<byte [], String> hello(final Table table, final String send, final byte [] start,
250       final byte [] end)
251   throws ServiceException, Throwable {
252     return table.coprocessorService(PingProtos.PingService.class,
253         start, end,
254         new Batch.Call<PingProtos.PingService, String>() {
255           @Override
256           public String call(PingProtos.PingService instance) throws IOException {
257             BlockingRpcCallback<PingProtos.HelloResponse> rpcCallback =
258               new BlockingRpcCallback<PingProtos.HelloResponse>();
259             PingProtos.HelloRequest.Builder builder = PingProtos.HelloRequest.newBuilder();
260             if (send != null) builder.setName(send);
261             instance.hello(null, builder.build(), rpcCallback);
262             PingProtos.HelloResponse r = rpcCallback.get();
263             return r != null && r.hasResponse()? r.getResponse(): null;
264           }
265         });
266   }
267 
268   private Map<byte [], String> compoundOfHelloAndPing(final Table table, final byte [] start,
269       final byte [] end)
270   throws ServiceException, Throwable {
271     return table.coprocessorService(PingProtos.PingService.class,
272         start, end,
273         new Batch.Call<PingProtos.PingService, String>() {
274           @Override
275           public String call(PingProtos.PingService instance) throws IOException {
276             BlockingRpcCallback<PingProtos.HelloResponse> rpcCallback =
277               new BlockingRpcCallback<PingProtos.HelloResponse>();
278             PingProtos.HelloRequest.Builder builder = PingProtos.HelloRequest.newBuilder();
279             // Call ping on same instance.  Use result calling hello on same instance.
280             builder.setName(doPing(instance));
281             instance.hello(null, builder.build(), rpcCallback);
282             PingProtos.HelloResponse r = rpcCallback.get();
283             return r != null && r.hasResponse()? r.getResponse(): null;
284           }
285         });
286   }
287 
288   private Map<byte [], String> noop(final Table table, final byte [] start,
289       final byte [] end)
290   throws ServiceException, Throwable {
291     return table.coprocessorService(PingProtos.PingService.class, start, end,
292         new Batch.Call<PingProtos.PingService, String>() {
293           @Override
294           public String call(PingProtos.PingService instance) throws IOException {
295             BlockingRpcCallback<PingProtos.NoopResponse> rpcCallback =
296               new BlockingRpcCallback<PingProtos.NoopResponse>();
297             PingProtos.NoopRequest.Builder builder = PingProtos.NoopRequest.newBuilder();
298             instance.noop(null, builder.build(), rpcCallback);
299             rpcCallback.get();
300             // Looks like null is expected when void.  That is what the test below is looking for
301             return null;
302           }
303         });
304   }
305 
306   @Test
307   public void testSingleMethod() throws Throwable {
308     try (HTable table = new HTable(util.getConfiguration(), TEST_TABLE)) {
309       RegionLocator locator = table.getRegionLocator();
310       Map<byte [], String> results = table.coprocessorService(PingProtos.PingService.class,
311         null, ROW_A,
312         new Batch.Call<PingProtos.PingService, String>() {
313           @Override
314           public String call(PingProtos.PingService instance) throws IOException {
315             BlockingRpcCallback<PingProtos.PingResponse> rpcCallback =
316               new BlockingRpcCallback<PingProtos.PingResponse>();
317             instance.ping(null, PingProtos.PingRequest.newBuilder().build(), rpcCallback);
318             return rpcCallback.get().getPong();
319           }
320         });
321       // Should have gotten results for 1 of the three regions only since we specified
322       // rows from 1 region
323       assertEquals(1, results.size());
324       verifyRegionResults(locator, results, ROW_A);
325   
326       final String name = "NAME";
327       results = hello(table, name, null, ROW_A);
328       // Should have gotten results for 1 of the three regions only since we specified
329       // rows from 1 region
330       assertEquals(1, results.size());
331       verifyRegionResults(locator, results, "Hello, NAME", ROW_A);
332     }
333   }
334 
335   @Test
336   public void testRowRange() throws Throwable {
337     try (HTable table = new HTable(util.getConfiguration(), TEST_TABLE)) {
338       RegionLocator locator = table.getRegionLocator();
339       for (Entry<HRegionInfo, ServerName> e: table.getRegionLocations().entrySet()) {
340         LOG.info("Region " + e.getKey().getRegionNameAsString() + ", servername=" + e.getValue());
341       }
342       // Here are what regions looked like on a run:
343       //
344       // test,,1355943549657.c65d4822d8bdecc033a96451f3a0f55d.
345       // test,bbb,1355943549661.110393b070dd1ed93441e0bc9b3ffb7e.
346       // test,ccc,1355943549665.c3d6d125141359cbbd2a43eaff3cdf74.
347   
348       Map<byte [], String> results = ping(table, null, ROW_A);
349       // Should contain first region only.
350       assertEquals(1, results.size());
351       verifyRegionResults(locator, results, ROW_A);
352   
353       // Test start row + empty end
354       results = ping(table, ROW_BC, null);
355       assertEquals(2, results.size());
356       // should contain last 2 regions
357       HRegionLocation loc = table.getRegionLocation(ROW_A, true);
358       assertNull("Should be missing region for row aaa (prior to start row)",
359         results.get(loc.getRegionInfo().getRegionName()));
360       verifyRegionResults(locator, results, ROW_B);
361       verifyRegionResults(locator, results, ROW_C);
362   
363       // test empty start + end
364       results = ping(table, null, ROW_BC);
365       // should contain the first 2 regions
366       assertEquals(2, results.size());
367       verifyRegionResults(locator, results, ROW_A);
368       verifyRegionResults(locator, results, ROW_B);
369       loc = table.getRegionLocation(ROW_C, true);
370       assertNull("Should be missing region for row ccc (past stop row)",
371           results.get(loc.getRegionInfo().getRegionName()));
372   
373       // test explicit start + end
374       results = ping(table, ROW_AB, ROW_BC);
375       // should contain first 2 regions
376       assertEquals(2, results.size());
377       verifyRegionResults(locator, results, ROW_A);
378       verifyRegionResults(locator, results, ROW_B);
379       loc = table.getRegionLocation(ROW_C, true);
380       assertNull("Should be missing region for row ccc (past stop row)",
381           results.get(loc.getRegionInfo().getRegionName()));
382   
383       // test single region
384       results = ping(table, ROW_B, ROW_BC);
385       // should only contain region bbb
386       assertEquals(1, results.size());
387       verifyRegionResults(locator, results, ROW_B);
388       loc = table.getRegionLocation(ROW_A, true);
389       assertNull("Should be missing region for row aaa (prior to start)",
390           results.get(loc.getRegionInfo().getRegionName()));
391       loc = table.getRegionLocation(ROW_C, true);
392       assertNull("Should be missing region for row ccc (past stop row)",
393           results.get(loc.getRegionInfo().getRegionName()));
394     }
395   }
396 
397   private Map<byte [], String> ping(final Table table, final byte [] start, final byte [] end)
398   throws ServiceException, Throwable {
399     return table.coprocessorService(PingProtos.PingService.class, start, end,
400       new Batch.Call<PingProtos.PingService, String>() {
401         @Override
402         public String call(PingProtos.PingService instance) throws IOException {
403           return doPing(instance);
404         }
405       });
406   }
407 
408   private static String doPing(PingProtos.PingService instance) throws IOException {
409     BlockingRpcCallback<PingProtos.PingResponse> rpcCallback =
410         new BlockingRpcCallback<PingProtos.PingResponse>();
411       instance.ping(null, PingProtos.PingRequest.newBuilder().build(), rpcCallback);
412       return rpcCallback.get().getPong();
413   }
414 
415   @Test
416   public void testCompoundCall() throws Throwable {
417     try (HTable table = new HTable(util.getConfiguration(), TEST_TABLE)) {
418       RegionLocator locator = table.getRegionLocator();
419       Map<byte [], String> results = compoundOfHelloAndPing(table, ROW_A, ROW_C);
420       verifyRegionResults(locator, results, "Hello, pong", ROW_A);
421       verifyRegionResults(locator, results, "Hello, pong", ROW_B);
422       verifyRegionResults(locator, results, "Hello, pong", ROW_C);
423     }
424   }
425 
426   @Test
427   public void testNullCall() throws Throwable {
428     try(HTable table = new HTable(util.getConfiguration(), TEST_TABLE)) {
429       RegionLocator locator = table.getRegionLocator();
430       Map<byte[],String> results = hello(table, null, ROW_A, ROW_C);
431       verifyRegionResults(locator, results, "Who are you?", ROW_A);
432       verifyRegionResults(locator, results, "Who are you?", ROW_B);
433       verifyRegionResults(locator, results, "Who are you?", ROW_C);
434     }
435   }
436 
437   @Test
438   public void testNullReturn() throws Throwable {
439     try (HTable table = new HTable(util.getConfiguration(), TEST_TABLE)) {
440       RegionLocator locator = table.getRegionLocator();
441       Map<byte[],String> results = hello(table, "nobody", ROW_A, ROW_C);
442       verifyRegionResults(locator, results, null, ROW_A);
443       verifyRegionResults(locator, results, null, ROW_B);
444       verifyRegionResults(locator, results, null, ROW_C);
445     }
446   }
447 
448   @Test
449   public void testEmptyReturnType() throws Throwable {
450     try (HTable table = new HTable(util.getConfiguration(), TEST_TABLE)) {
451       Map<byte[],String> results = noop(table, ROW_A, ROW_C);
452       assertEquals("Should have results from three regions", 3, results.size());
453       // all results should be null
454       for (Object v : results.values()) {
455         assertNull(v);
456       }
457     }
458   }
459 
460   private void verifyRegionResults(RegionLocator table,
461       Map<byte[],String> results, byte[] row) throws Exception {
462     verifyRegionResults(table, results, "pong", row);
463   }
464 
465   private void verifyRegionResults(RegionLocator regionLocator,
466       Map<byte[], String> results, String expected, byte[] row)
467   throws Exception {
468     for (Map.Entry<byte [], String> e: results.entrySet()) {
469       LOG.info("row=" + Bytes.toString(row) + ", expected=" + expected +
470        ", result key=" + Bytes.toString(e.getKey()) +
471        ", value=" + e.getValue());
472     }
473     HRegionLocation loc = regionLocator.getRegionLocation(row, true);
474     byte[] region = loc.getRegionInfo().getRegionName();
475     assertTrue("Results should contain region " +
476       Bytes.toStringBinary(region) + " for row '" + Bytes.toStringBinary(row)+ "'",
477       results.containsKey(region));
478     assertEquals("Invalid result for row '"+Bytes.toStringBinary(row)+"'",
479       expected, results.get(region));
480   }
481 }