View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.ipc;
19  
20  import static org.junit.Assert.assertEquals;
21  import static org.junit.Assert.assertTrue;
22  import static org.junit.Assert.fail;
23  import static org.mockito.Matchers.anyObject;
24  import static org.mockito.Mockito.spy;
25  import static org.mockito.Mockito.verify;
26  import static org.mockito.internal.verification.VerificationModeFactory.times;
27  
28  import java.io.IOException;
29  import java.net.ConnectException;
30  import java.net.InetAddress;
31  import java.net.InetSocketAddress;
32  import java.net.SocketTimeoutException;
33  import java.util.ArrayList;
34  import java.util.List;
35  
36  import org.apache.commons.logging.Log;
37  import org.apache.commons.logging.LogFactory;
38  import org.apache.hadoop.conf.Configuration;
39  import org.apache.hadoop.hbase.Cell;
40  import org.apache.hadoop.hbase.CellScanner;
41  import org.apache.hadoop.hbase.CellUtil;
42  import org.apache.hadoop.hbase.HBaseConfiguration;
43  import org.apache.hadoop.hbase.HConstants;
44  import org.apache.hadoop.hbase.KeyValue;
45  import org.apache.hadoop.hbase.ServerName;
46  import org.apache.hadoop.hbase.client.MetricsConnection;
47  import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
48  import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
49  import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto;
50  import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
51  import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto;
52  import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
53  import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
54  import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
55  import org.apache.hadoop.hbase.security.User;
56  import org.apache.hadoop.hbase.util.Bytes;
57  import org.apache.hadoop.hbase.util.Pair;
58  import org.apache.hadoop.io.compress.GzipCodec;
59  import org.apache.hadoop.util.StringUtils;
60  import org.junit.Assert;
61  import org.junit.Test;
62  
63  import com.google.common.collect.ImmutableList;
64  import com.google.common.collect.Lists;
65  import com.google.protobuf.BlockingRpcChannel;
66  import com.google.protobuf.BlockingService;
67  import com.google.protobuf.Descriptors.MethodDescriptor;
68  import com.google.protobuf.Message;
69  import com.google.protobuf.RpcController;
70  import com.google.protobuf.ServiceException;
71  
72  /**
73   * Some basic ipc tests.
74   */
75  public abstract class AbstractTestIPC {
76  
77    private static final Log LOG = LogFactory.getLog(AbstractTestIPC.class);
78  
79    private static byte[] CELL_BYTES = Bytes.toBytes("xyz");
80    private static KeyValue CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, CELL_BYTES);
81    static byte[] BIG_CELL_BYTES = new byte[10 * 1024];
82    static KeyValue BIG_CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, BIG_CELL_BYTES);
83    static final Configuration CONF = HBaseConfiguration.create();
84    // We are using the test TestRpcServiceProtos generated classes and Service because they are
85    // available and basic with methods like 'echo', and ping. Below we make a blocking service
86    // by passing in implementation of blocking interface. We use this service in all tests that
87    // follow.
88    static final BlockingService SERVICE =
89        TestRpcServiceProtos.TestProtobufRpcProto
90            .newReflectiveBlockingService(new TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface() {
91  
92              @Override
93              public EmptyResponseProto ping(RpcController controller, EmptyRequestProto request)
94                  throws ServiceException {
95                return null;
96              }
97  
98              @Override
99              public EmptyResponseProto error(RpcController controller, EmptyRequestProto request)
100                 throws ServiceException {
101               return null;
102             }
103 
104             @Override
105             public EchoResponseProto echo(RpcController controller, EchoRequestProto request)
106                 throws ServiceException {
107               if (controller instanceof PayloadCarryingRpcController) {
108                 PayloadCarryingRpcController pcrc = (PayloadCarryingRpcController) controller;
109                 // If cells, scan them to check we are able to iterate what we were given and since
110                 // this is
111                 // an echo, just put them back on the controller creating a new block. Tests our
112                 // block
113                 // building.
114                 CellScanner cellScanner = pcrc.cellScanner();
115                 List<Cell> list = null;
116                 if (cellScanner != null) {
117                   list = new ArrayList<Cell>();
118                   try {
119                     while (cellScanner.advance()) {
120                       list.add(cellScanner.current());
121                     }
122                   } catch (IOException e) {
123                     throw new ServiceException(e);
124                   }
125                 }
126                 cellScanner = CellUtil.createCellScanner(list);
127                 ((PayloadCarryingRpcController) controller).setCellScanner(cellScanner);
128               }
129               return EchoResponseProto.newBuilder().setMessage(request.getMessage()).build();
130             }
131           });
132 
133   /**
134    * Instance of server. We actually don't do anything speical in here so could just use
135    * HBaseRpcServer directly.
136    */
137   static class TestRpcServer extends RpcServer {
138 
139     TestRpcServer() throws IOException {
140       this(new FifoRpcScheduler(CONF, 1));
141     }
142 
143     TestRpcServer(RpcScheduler scheduler) throws IOException {
144       super(null, "testRpcServer", Lists
145           .newArrayList(new BlockingServiceAndInterface(SERVICE, null)), new InetSocketAddress(
146           "localhost", 0), CONF, scheduler);
147     }
148 
149     @Override
150     public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
151         Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
152         throws IOException {
153       return super.call(service, md, param, cellScanner, receiveTime, status);
154     }
155   }
156 
157   protected abstract AbstractRpcClient createRpcClientNoCodec(Configuration conf);
158 
159   /**
160    * Ensure we do not HAVE TO HAVE a codec.
161    * @throws InterruptedException
162    * @throws IOException
163    */
164   @Test
165   public void testNoCodec() throws InterruptedException, IOException {
166     Configuration conf = HBaseConfiguration.create();
167     AbstractRpcClient client = createRpcClientNoCodec(conf);
168     TestRpcServer rpcServer = new TestRpcServer();
169     try {
170       rpcServer.start();
171       MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
172       final String message = "hello";
173       EchoRequestProto param = EchoRequestProto.newBuilder().setMessage(message).build();
174       InetSocketAddress address = rpcServer.getListenerAddress();
175       if (address == null) {
176         throw new IOException("Listener channel is closed");
177       }
178       Pair<Message, CellScanner> r =
179           client.call(null, md, param, md.getOutputType().toProto(), User.getCurrent(), address,
180               new MetricsConnection.CallStats());
181       assertTrue(r.getSecond() == null);
182       // Silly assertion that the message is in the returned pb.
183       assertTrue(r.getFirst().toString().contains(message));
184     } finally {
185       client.close();
186       rpcServer.stop();
187     }
188   }
189 
190   protected abstract AbstractRpcClient createRpcClient(Configuration conf);
191 
192   /**
193    * It is hard to verify the compression is actually happening under the wraps. Hope that if
194    * unsupported, we'll get an exception out of some time (meantime, have to trace it manually to
195    * confirm that compression is happening down in the client and server).
196    * @throws IOException
197    * @throws InterruptedException
198    * @throws SecurityException
199    * @throws NoSuchMethodException
200    */
201   @Test
202   public void testCompressCellBlock() throws IOException, InterruptedException, SecurityException,
203       NoSuchMethodException, ServiceException {
204     Configuration conf = new Configuration(HBaseConfiguration.create());
205     conf.set("hbase.client.rpc.compressor", GzipCodec.class.getCanonicalName());
206     List<Cell> cells = new ArrayList<Cell>();
207     int count = 3;
208     for (int i = 0; i < count; i++) {
209       cells.add(CELL);
210     }
211     AbstractRpcClient client = createRpcClient(conf);
212     TestRpcServer rpcServer = new TestRpcServer();
213     try {
214       rpcServer.start();
215       MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
216       EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
217       PayloadCarryingRpcController pcrc =
218           new PayloadCarryingRpcController(CellUtil.createCellScanner(cells));
219       InetSocketAddress address = rpcServer.getListenerAddress();
220       if (address == null) {
221         throw new IOException("Listener channel is closed");
222       }
223       Pair<Message, CellScanner> r =
224           client.call(pcrc, md, param, md.getOutputType().toProto(), User.getCurrent(), address,
225               new MetricsConnection.CallStats());
226       int index = 0;
227       while (r.getSecond().advance()) {
228         assertTrue(CELL.equals(r.getSecond().current()));
229         index++;
230       }
231       assertEquals(count, index);
232     } finally {
233       client.close();
234       rpcServer.stop();
235     }
236   }
237 
238   protected abstract AbstractRpcClient createRpcClientRTEDuringConnectionSetup(Configuration conf)
239       throws IOException;
240 
241   @Test
242   public void testRTEDuringConnectionSetup() throws Exception {
243     Configuration conf = HBaseConfiguration.create();
244     TestRpcServer rpcServer = new TestRpcServer();
245     AbstractRpcClient client = createRpcClientRTEDuringConnectionSetup(conf);
246     try {
247       rpcServer.start();
248       MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
249       EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
250       InetSocketAddress address = rpcServer.getListenerAddress();
251       if (address == null) {
252         throw new IOException("Listener channel is closed");
253       }
254       client.call(null, md, param, null, User.getCurrent(), address,
255           new MetricsConnection.CallStats());
256       fail("Expected an exception to have been thrown!");
257     } catch (Exception e) {
258       LOG.info("Caught expected exception: " + e.toString());
259       assertTrue(StringUtils.stringifyException(e).contains("Injected fault"));
260     } finally {
261       client.close();
262       rpcServer.stop();
263     }
264   }
265 
266   /** Tests that the rpc scheduler is called when requests arrive. */
267   @Test
268   public void testRpcScheduler() throws IOException, InterruptedException {
269     RpcScheduler scheduler = spy(new FifoRpcScheduler(CONF, 1));
270     RpcServer rpcServer = new TestRpcServer(scheduler);
271     verify(scheduler).init((RpcScheduler.Context) anyObject());
272     AbstractRpcClient client = createRpcClient(CONF);
273     try {
274       rpcServer.start();
275       verify(scheduler).start();
276       MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
277       EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
278       InetSocketAddress address = rpcServer.getListenerAddress();
279       if (address == null) {
280         throw new IOException("Listener channel is closed");
281       }
282       for (int i = 0; i < 10; i++) {
283         client.call(new PayloadCarryingRpcController(
284             CellUtil.createCellScanner(ImmutableList.<Cell> of(CELL))), md, param,
285             md.getOutputType().toProto(), User.getCurrent(), address,
286             new MetricsConnection.CallStats());
287       }
288       verify(scheduler, times(10)).dispatch((CallRunner) anyObject());
289     } finally {
290       rpcServer.stop();
291       verify(scheduler).stop();
292     }
293   }
294 
295   /**
296    * Instance of RpcServer that echoes client hostAddress back to client
297    */
298   static class TestRpcServer1 extends RpcServer {
299 
300     private static BlockingInterface SERVICE1 =
301         new TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface() {
302           @Override
303           public EmptyResponseProto ping(RpcController unused, EmptyRequestProto request)
304               throws ServiceException {
305             return EmptyResponseProto.newBuilder().build();
306           }
307 
308           @Override
309           public EchoResponseProto echo(RpcController unused, EchoRequestProto request)
310               throws ServiceException {
311             final InetAddress remoteAddr = TestRpcServer1.getRemoteAddress();
312             final String message = remoteAddr == null ? "NULL" : remoteAddr.getHostAddress();
313             return EchoResponseProto.newBuilder().setMessage(message).build();
314           }
315 
316           @Override
317           public EmptyResponseProto error(RpcController unused, EmptyRequestProto request)
318               throws ServiceException {
319             throw new ServiceException("error", new IOException("error"));
320           }
321         };
322 
323     TestRpcServer1() throws IOException {
324       this(new FifoRpcScheduler(CONF, 1));
325     }
326 
327     TestRpcServer1(RpcScheduler scheduler) throws IOException {
328       super(null, "testRemoteAddressInCallObject", Lists
329           .newArrayList(new BlockingServiceAndInterface(TestRpcServiceProtos.TestProtobufRpcProto
330               .newReflectiveBlockingService(SERVICE1), null)),
331           new InetSocketAddress("localhost", 0), CONF, scheduler);
332     }
333   }
334 
335   /**
336    * Tests that the RpcServer creates & dispatches CallRunner object to scheduler with non-null
337    * remoteAddress set to its Call Object
338    * @throws ServiceException
339    */
340   @Test
341   public void testRpcServerForNotNullRemoteAddressInCallObject() throws IOException,
342       ServiceException {
343     final RpcScheduler scheduler = new FifoRpcScheduler(CONF, 1);
344     final TestRpcServer1 rpcServer = new TestRpcServer1(scheduler);
345     final InetSocketAddress localAddr = new InetSocketAddress("localhost", 0);
346     final AbstractRpcClient client =
347         new RpcClientImpl(CONF, HConstants.CLUSTER_ID_DEFAULT, localAddr, null);
348     try {
349       rpcServer.start();
350       final InetSocketAddress isa = rpcServer.getListenerAddress();
351       if (isa == null) {
352         throw new IOException("Listener channel is closed");
353       }
354       final BlockingRpcChannel channel =
355           client.createBlockingRpcChannel(
356             ServerName.valueOf(isa.getHostName(), isa.getPort(), System.currentTimeMillis()),
357             User.getCurrent(), 0);
358       TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub =
359           TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(channel);
360       final EchoRequestProto echoRequest =
361           EchoRequestProto.newBuilder().setMessage("GetRemoteAddress").build();
362       final EchoResponseProto echoResponse = stub.echo(null, echoRequest);
363       Assert.assertEquals(localAddr.getAddress().getHostAddress(), echoResponse.getMessage());
364     } finally {
365       client.close();
366       rpcServer.stop();
367     }
368   }
369 
370   @Test
371   public void testWrapException() throws Exception {
372     AbstractRpcClient client =
373         (AbstractRpcClient) RpcClientFactory.createClient(CONF, "AbstractTestIPC");
374     final InetSocketAddress address = InetSocketAddress.createUnresolved("localhost", 0);
375     assertTrue(client.wrapException(address, new ConnectException()) instanceof ConnectException);
376     assertTrue(client.wrapException(address,
377       new SocketTimeoutException()) instanceof SocketTimeoutException);
378     assertTrue(client.wrapException(address, new ConnectionClosingException(
379         "Test AbstractRpcClient#wrapException")) instanceof ConnectionClosingException);
380     assertTrue(client
381         .wrapException(address, new CallTimeoutException("Test AbstractRpcClient#wrapException"))
382         .getCause() instanceof CallTimeoutException);
383   }
384 }