/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.ipc;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import com.google.common.collect.Lists;
import com.google.protobuf.BlockingService;
import com.google.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.Message;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;

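/**
 * Integration test for the HBase RPC client implementations. It starts a set of in-process
 * {@link TestRpcServer} instances, runs many {@link SimpleClient} threads that issue echo calls
 * through a single shared client, and lets a {@link MiniChaosMonkey} randomly start and stop
 * servers while the calls are in flight.
 */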
@Category(IntegrationTests.class)
public class IntegrationTestRpcClient {

  private static final Log LOG = LogFactory.getLog(IntegrationTestRpcClient.class);

  private final Configuration conf;

  private int numIterations = 10;

  public IntegrationTestRpcClient() {
    conf = HBaseConfiguration.create();
  }

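  /**
   * A minimal {@link RpcServer} that listens on an ephemeral localhost port and serves the test
   * protobuf service, backed by a single-handler FIFO scheduler by default.
   */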
  static class TestRpcServer extends RpcServer {

    TestRpcServer(Configuration conf) throws IOException {
      this(new FifoRpcScheduler(conf, 1), conf);
    }

    TestRpcServer(RpcScheduler scheduler, Configuration conf) throws IOException {
      super(null, "testRpcServer",
          Lists.newArrayList(new BlockingServiceAndInterface(SERVICE, null)),
          new InetSocketAddress("localhost", 0), conf, scheduler);
    }

    @Override
    public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
        Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
        throws IOException {
      return super.call(service, md, param, cellScanner, receiveTime, status);
    }
  }

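  /**
   * Reflective blocking service used by the test servers: {@code echo} returns the request
   * message unchanged, while {@code ping} and {@code error} are unused stubs returning null.
   */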
  static final BlockingService SERVICE =
      TestRpcServiceProtos.TestProtobufRpcProto.newReflectiveBlockingService(
          new TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface() {

        @Override
        public EmptyResponseProto ping(RpcController controller, EmptyRequestProto request)
            throws ServiceException {
          return null;
        }

        @Override
        public EmptyResponseProto error(RpcController controller, EmptyRequestProto request)
            throws ServiceException {
          return null;
        }

        @Override
        public EchoResponseProto echo(RpcController controller, EchoRequestProto request)
            throws ServiceException {
          return EchoResponseProto.newBuilder().setMessage(request.getMessage()).build();
        }
      });

  protected AbstractRpcClient createRpcClient(Configuration conf, boolean isSyncClient) {
    return isSyncClient ?
        new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT) :
        new AsyncRpcClient(conf) {
          @Override
          Codec getCodec() {
            return null;
          }
        };
  }

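  /** A roughly one-million-character payload used to exercise large RPC messages. */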
  static String BIG_PAYLOAD;

  static {
    StringBuilder builder = new StringBuilder();

    while (builder.length() < 1024 * 1024) { // 1M chars, about 2 MB as UTF-16
      builder.append("big.payload.");
    }

    BIG_PAYLOAD = builder.toString();
  }

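  /**
   * A collection of in-process {@link TestRpcServer}s. Servers can be started and stopped at
   * random, but the total count is kept between {@code minServers} and {@code maxServers}.
   */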
  class Cluster {
    Random random = new Random();
    ReadWriteLock lock = new ReentrantReadWriteLock();
    HashMap<InetSocketAddress, TestRpcServer> rpcServers = new HashMap<>();
    List<TestRpcServer> serverList = new ArrayList<>();
    int maxServers;
    int minServers;

    Cluster(int minServers, int maxServers) {
      this.minServers = minServers;
      this.maxServers = maxServers;
    }

    TestRpcServer startServer() throws IOException {
      lock.writeLock().lock();
      try {
        if (rpcServers.size() >= maxServers) {
          return null;
        }

        TestRpcServer rpcServer = new TestRpcServer(conf);
        rpcServer.start();
        InetSocketAddress address = rpcServer.getListenerAddress();
        if (address == null) {
          throw new IOException("Listener channel is closed");
        }
        rpcServers.put(address, rpcServer);
        serverList.add(rpcServer);
        LOG.info("Started server: " + address);
        return rpcServer;
      } finally {
        lock.writeLock().unlock();
      }
    }

    void stopRandomServer() throws Exception {
      lock.writeLock().lock();
      try {
        if (rpcServers.size() <= minServers) {
          return;
        }
        int size = rpcServers.size();
        int rand = random.nextInt(size);
        TestRpcServer rpcServer = serverList.remove(rand);
        InetSocketAddress address = rpcServer.getListenerAddress();
        if (address == null) {
          // Throw exception here. We can't remove this instance from the server map because
          // we no longer have access to its map key.
          throw new IOException("Listener channel is closed");
        }
        rpcServers.remove(address);
        stopServer(rpcServer);
      } finally {
        lock.writeLock().unlock();
      }
    }

    void stopServer(TestRpcServer rpcServer) throws InterruptedException {
      InetSocketAddress address = rpcServer.getListenerAddress();
      LOG.info("Stopping server: " + address);
      rpcServer.stop();
      rpcServer.join();
      LOG.info("Stopped server: " + address);
    }

    void stopRunning() throws InterruptedException {
      lock.writeLock().lock();
      try {
        for (TestRpcServer rpcServer : serverList) {
          stopServer(rpcServer);
        }
      } finally {
        lock.writeLock().unlock();
      }
    }

    TestRpcServer getRandomServer() {
      lock.readLock().lock();
      try {
        int size = rpcServers.size();
        int rand = random.nextInt(size);
        return serverList.get(rand);
      } finally {
        lock.readLock().unlock();
      }
    }
  }

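  /**
   * Background thread that, until stopped, randomly starts or stops a server roughly every
   * 100 ms and records the first exception it encounters.
   */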
  static class MiniChaosMonkey extends Thread {
    AtomicBoolean running = new AtomicBoolean(true);
    Random random = new Random();
    AtomicReference<Exception> exception = new AtomicReference<>(null);
    Cluster cluster;

    public MiniChaosMonkey(Cluster cluster) {
      this.cluster = cluster;
    }

    @Override
    public void run() {
      while (running.get()) {
        switch (random.nextInt(2)) {
        case 0: // start a server
          try {
            cluster.startServer();
          } catch (Exception e) {
            LOG.warn(e);
            exception.compareAndSet(null, e);
          }
          break;

        case 1: // stop a server
          try {
            cluster.stopRandomServer();
          } catch (Exception e) {
            LOG.warn(e);
            exception.compareAndSet(null, e);
          }
          break;

        default:
        }

        Threads.sleep(100);
      }
    }

    void stopRunning() {
      running.set(false);
    }

    void rethrowException() throws Exception {
      if (exception.get() != null) {
        throw exception.get();
      }
    }
  }

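  /**
   * Client thread that repeatedly sends echo requests (alternating small and big payloads) to a
   * random server through the shared RPC client and verifies the echoed message, recording the
   * first assertion failure. Call failures are tolerated since servers may be shutting down.
   */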
  static class SimpleClient extends Thread {
    AbstractRpcClient rpcClient;
    AtomicBoolean running = new AtomicBoolean(true);
    AtomicReference<Throwable> exception = new AtomicReference<>(null);
    Cluster cluster;
    String id;
    long numCalls = 0;
    Random random = new Random();

    public SimpleClient(Cluster cluster, AbstractRpcClient rpcClient, String id) {
      this.cluster = cluster;
      this.rpcClient = rpcClient;
      this.id = id;
    }

    @Override
    public void run() {
      MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");

      while (running.get()) {
        boolean isBigPayload = random.nextBoolean();
        String message = isBigPayload ? BIG_PAYLOAD : id + numCalls;
        EchoRequestProto param = EchoRequestProto.newBuilder().setMessage(message).build();
        EchoResponseProto ret = EchoResponseProto.newBuilder().setMessage("foo").build();

        TestRpcServer server = cluster.getRandomServer();
        try {
          User user = User.getCurrent();
          InetSocketAddress address = server.getListenerAddress();
          if (address == null) {
            throw new IOException("Listener channel is closed");
          }
          ret = (EchoResponseProto)
              rpcClient.callBlockingMethod(md, null, param, ret, user, address);
        } catch (Exception e) {
          LOG.warn(e);
          continue; // expected in case connection is closing or closed
        }

        try {
          assertNotNull(ret);
          assertEquals(message, ret.getMessage());
        } catch (Throwable t) {
          exception.compareAndSet(null, t);
        }

        numCalls++;
      }
    }

    void stopRunning() {
      running.set(false);
    }

    void rethrowException() throws Throwable {
      if (exception.get() != null) {
        throw exception.get();
      }
    }
  }

  @Test(timeout = 900000)
  public void testRpcWithChaosMonkeyWithSyncClient() throws Throwable {
    for (int i = 0; i < numIterations; i++) {
      TimeoutThread.runWithTimeout(new Callable<Void>() {
        @Override
        public Void call() throws Exception {
          try {
            testRpcWithChaosMonkey(true);
          } catch (Throwable e) {
            if (e instanceof Exception) {
              throw (Exception) e;
            } else {
              throw new Exception(e);
            }
          }
          return null;
        }
      }, 90000);
    }
  }

  @Test(timeout = 900000)
  @Ignore // TODO: test fails with async client
  public void testRpcWithChaosMonkeyWithAsyncClient() throws Throwable {
    for (int i = 0; i < numIterations; i++) {
      TimeoutThread.runWithTimeout(new Callable<Void>() {
        @Override
        public Void call() throws Exception {
          try {
            testRpcWithChaosMonkey(false);
          } catch (Throwable e) {
            if (e instanceof Exception) {
              throw (Exception) e;
            } else {
              throw new Exception(e);
            }
          }
          return null;
        }
      }, 90000);
    }
  }

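  /**
   * Watchdog thread that dumps all thread stacks and exits the JVM if it is not interrupted
   * before the given timeout elapses.
   */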
  static class TimeoutThread extends Thread {
    long timeout;

    public TimeoutThread(long timeout) {
      this.timeout = timeout;
    }

    @Override
    public void run() {
      try {
        Thread.sleep(timeout);
        Threads.printThreadInfo(System.err, "TEST TIMEOUT STACK DUMP");
        System.exit(1); // a timeout happened
      } catch (InterruptedException e) {
        // this is what we want
      }
    }

    // runs in the same thread context but injects a timeout thread which will exit the JVM on
    // timeout
    static void runWithTimeout(Callable<?> callable, long timeout) throws Exception {
      TimeoutThread thread = new TimeoutThread(timeout);
      thread.start();
      callable.call();
      thread.interrupt();
    }
  }

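  /**
   * Runs one round of the test: brings up ten servers, starts 30 client threads sharing one RPC
   * client, lets the chaos monkey churn servers for 30 seconds, then verifies that neither the
   * chaos monkey nor any client failed and that every client completed more than ten calls.
   */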
  public void testRpcWithChaosMonkey(boolean isSyncClient) throws Throwable {
    LOG.info("Starting test");
    Cluster cluster = new Cluster(10, 100);
    for (int i = 0; i < 10; i++) {
      cluster.startServer();
    }

    ArrayList<SimpleClient> clients = new ArrayList<>();

    // all threads should share the same rpc client
    AbstractRpcClient rpcClient = createRpcClient(conf, isSyncClient);

    for (int i = 0; i < 30; i++) {
      String clientId = "client_" + i + "_";
      LOG.info("Starting client: " + clientId);
      SimpleClient client = new SimpleClient(cluster, rpcClient, clientId);
      client.start();
      clients.add(client);
    }

    LOG.info("Starting MiniChaosMonkey");
    MiniChaosMonkey cm = new MiniChaosMonkey(cluster);
    cm.start();

    Threads.sleep(30000);

    LOG.info("Stopping MiniChaosMonkey");
    cm.stopRunning();
    cm.join();
    cm.rethrowException();

    LOG.info("Stopping clients");
    for (SimpleClient client : clients) {
      LOG.info("Stopping client: " + client.id);
      LOG.info(client.id + " numCalls:" + client.numCalls);
      client.stopRunning();
      client.join();
      client.rethrowException();
      assertTrue(client.numCalls > 10);
    }

    LOG.info("Stopping RpcClient");
    rpcClient.close();

    LOG.info("Stopping Cluster");
    cluster.stopRunning();
  }
}