View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.ipc;
19  
20  import static org.junit.Assert.assertTrue;
21  import static org.junit.Assert.fail;
22  import io.netty.channel.ChannelHandlerContext;
23  import io.netty.channel.ChannelInitializer;
24  import io.netty.channel.ChannelOutboundHandlerAdapter;
25  import io.netty.channel.ChannelPromise;
26  import io.netty.channel.epoll.EpollEventLoopGroup;
27  import io.netty.channel.nio.NioEventLoopGroup;
28  import io.netty.channel.socket.SocketChannel;
29  
30  import java.io.IOException;
31  import java.net.InetSocketAddress;
32  import java.util.ArrayList;
33  import java.util.Collection;
34  import java.util.List;
35  import java.util.concurrent.atomic.AtomicBoolean;
36  
37  import org.apache.commons.logging.Log;
38  import org.apache.commons.logging.LogFactory;
39  import org.apache.hadoop.conf.Configuration;
40  import org.apache.hadoop.hbase.CellScannable;
41  import org.apache.hadoop.hbase.CellUtil;
42  import org.apache.hadoop.hbase.HBaseConfiguration;
43  import org.apache.hadoop.hbase.HBaseTestingUtility;
44  import org.apache.hadoop.hbase.HConstants;
45  import org.apache.hadoop.hbase.HRegionInfo;
46  import org.apache.hadoop.hbase.KeyValue;
47  import org.apache.hadoop.hbase.ServerName;
48  import org.apache.hadoop.hbase.Waiter;
49  import org.apache.hadoop.hbase.client.MetricsConnection;
50  import org.apache.hadoop.hbase.client.Put;
51  import org.apache.hadoop.hbase.client.RowMutations;
52  import org.apache.hadoop.hbase.codec.Codec;
53  import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
54  import org.apache.hadoop.hbase.protobuf.RequestConverter;
55  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
56  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
57  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
58  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
59  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
60  import org.apache.hadoop.hbase.security.User;
61  import org.apache.hadoop.hbase.testclassification.SmallTests;
62  import org.apache.hadoop.util.StringUtils;
63  import org.junit.Test;
64  import org.junit.experimental.categories.Category;
65  import org.junit.runner.RunWith;
66  import org.junit.runners.Parameterized;
67  import org.junit.runners.Parameterized.Parameters;
68  
69  import com.google.protobuf.ByteString;
70  import com.google.protobuf.Descriptors.MethodDescriptor;
71  import com.google.protobuf.Message;
72  import com.google.protobuf.RpcCallback;
73  import com.google.protobuf.RpcChannel;
74  
75  @RunWith(Parameterized.class)
76  @Category({ SmallTests.class })
77  public class TestAsyncIPC extends AbstractTestIPC {
78  
79    private static final Log LOG = LogFactory.getLog(TestAsyncIPC.class);
80  
81    private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
82  
83    @Parameters
84    public static Collection<Object[]> parameters() {
85      List<Object[]> paramList = new ArrayList<Object[]>();
86      paramList.add(new Object[] { false, false });
87      paramList.add(new Object[] { false, true });
88      paramList.add(new Object[] { true, false });
89      paramList.add(new Object[] { true, true });
90      return paramList;
91    }
92  
93    private final boolean useNativeTransport;
94  
95    private final boolean useGlobalEventLoopGroup;
96  
97    public TestAsyncIPC(boolean useNativeTransport, boolean useGlobalEventLoopGroup) {
98      this.useNativeTransport = useNativeTransport;
99      this.useGlobalEventLoopGroup = useGlobalEventLoopGroup;
100   }
101 
102   private void setConf(Configuration conf) {
103     conf.setBoolean(AsyncRpcClient.USE_NATIVE_TRANSPORT, useNativeTransport);
104     conf.setBoolean(AsyncRpcClient.USE_GLOBAL_EVENT_LOOP_GROUP, useGlobalEventLoopGroup);
105     if (useGlobalEventLoopGroup && AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP != null) {
106       if (useNativeTransport
107           && !(AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP.getFirst() instanceof EpollEventLoopGroup)
108           || (!useNativeTransport
109           && !(AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP.getFirst() instanceof NioEventLoopGroup))) {
110         AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP.getFirst().shutdownGracefully();
111         AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP = null;
112       }
113     }
114   }
115 
116   @Override
117   protected AsyncRpcClient createRpcClientNoCodec(Configuration conf) {
118     setConf(conf);
119     return new AsyncRpcClient(conf) {
120 
121       @Override
122       Codec getCodec() {
123         return null;
124       }
125 
126     };
127   }
128 
129   @Override
130   protected AsyncRpcClient createRpcClient(Configuration conf) {
131     setConf(conf);
132     return new AsyncRpcClient(conf);
133   }
134 
135   @Override
136   protected AsyncRpcClient createRpcClientRTEDuringConnectionSetup(Configuration conf) {
137     setConf(conf);
138     return new AsyncRpcClient(conf, new ChannelInitializer<SocketChannel>() {
139           @Override
140           protected void initChannel(SocketChannel ch) throws Exception {
141             ch.pipeline().addFirst(new ChannelOutboundHandlerAdapter() {
142               @Override
143               public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
144                   throws Exception {
145                 promise.setFailure(new RuntimeException("Injected fault"));
146               }
147             });
148           }
149         });
150   }
151 
152   @Test
153   public void testAsyncConnectionSetup() throws Exception {
154     TestRpcServer rpcServer = new TestRpcServer();
155     AsyncRpcClient client = createRpcClient(CONF);
156     try {
157       rpcServer.start();
158       InetSocketAddress address = rpcServer.getListenerAddress();
159       if (address == null) {
160         throw new IOException("Listener channel is closed");
161       }
162       MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
163       EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
164 
165       RpcChannel channel =
166           client.createRpcChannel(ServerName.valueOf(address.getHostName(), address.getPort(),
167             System.currentTimeMillis()), User.getCurrent(), 0);
168 
169       final AtomicBoolean done = new AtomicBoolean(false);
170 
171       channel.callMethod(md, new PayloadCarryingRpcController(), param, md.getOutputType()
172           .toProto(), new RpcCallback<Message>() {
173         @Override
174         public void run(Message parameter) {
175           done.set(true);
176         }
177       });
178 
179       TEST_UTIL.waitFor(1000, new Waiter.Predicate<Exception>() {
180         @Override
181         public boolean evaluate() throws Exception {
182           return done.get();
183         }
184       });
185     } finally {
186       client.close();
187       rpcServer.stop();
188     }
189   }
190 
191   @Test
192   public void testRTEDuringAsyncConnectionSetup() throws Exception {
193     TestRpcServer rpcServer = new TestRpcServer();
194     AsyncRpcClient client = createRpcClientRTEDuringConnectionSetup(CONF);
195     try {
196       rpcServer.start();
197       InetSocketAddress address = rpcServer.getListenerAddress();
198       if (address == null) {
199         throw new IOException("Listener channel is closed");
200       }
201       MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
202       EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
203 
204       RpcChannel channel =
205           client.createRpcChannel(ServerName.valueOf(address.getHostName(), address.getPort(),
206             System.currentTimeMillis()), User.getCurrent(), 0);
207 
208       final AtomicBoolean done = new AtomicBoolean(false);
209 
210       PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
211       controller.notifyOnFail(new RpcCallback<IOException>() {
212         @Override
213         public void run(IOException e) {
214           done.set(true);
215           LOG.info("Caught expected exception: " + e.toString());
216           assertTrue(StringUtils.stringifyException(e).contains("Injected fault"));
217         }
218       });
219 
220       channel.callMethod(md, controller, param, md.getOutputType().toProto(),
221         new RpcCallback<Message>() {
222           @Override
223           public void run(Message parameter) {
224             done.set(true);
225             fail("Expected an exception to have been thrown!");
226           }
227         });
228 
229       TEST_UTIL.waitFor(1000, new Waiter.Predicate<Exception>() {
230         @Override
231         public boolean evaluate() throws Exception {
232           return done.get();
233         }
234       });
235     } finally {
236       client.close();
237       rpcServer.stop();
238     }
239   }
240 
241   public static void main(String[] args) throws IOException, SecurityException,
242       NoSuchMethodException, InterruptedException {
243     if (args.length != 2) {
244       System.out.println("Usage: TestAsyncIPC <CYCLES> <CELLS_PER_CYCLE>");
245       return;
246     }
247     // ((Log4JLogger)HBaseServer.LOG).getLogger().setLevel(Level.INFO);
248     // ((Log4JLogger)HBaseClient.LOG).getLogger().setLevel(Level.INFO);
249     int cycles = Integer.parseInt(args[0]);
250     int cellcount = Integer.parseInt(args[1]);
251     Configuration conf = HBaseConfiguration.create();
252     TestRpcServer rpcServer = new TestRpcServer();
253     MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
254     EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
255     AsyncRpcClient client = new AsyncRpcClient(conf);
256     KeyValue kv = BIG_CELL;
257     Put p = new Put(CellUtil.cloneRow(kv));
258     for (int i = 0; i < cellcount; i++) {
259       p.add(kv);
260     }
261     RowMutations rm = new RowMutations(CellUtil.cloneRow(kv));
262     rm.add(p);
263     try {
264       rpcServer.start();
265       InetSocketAddress address = rpcServer.getListenerAddress();
266       if (address == null) {
267         throw new IOException("Listener channel is closed");
268       }
269       long startTime = System.currentTimeMillis();
270       User user = User.getCurrent();
271       for (int i = 0; i < cycles; i++) {
272         List<CellScannable> cells = new ArrayList<CellScannable>();
273         // Message param = RequestConverter.buildMultiRequest(HConstants.EMPTY_BYTE_ARRAY, rm);
274         ClientProtos.RegionAction.Builder builder =
275             RequestConverter.buildNoDataRegionAction(HConstants.EMPTY_BYTE_ARRAY, rm, cells,
276               RegionAction.newBuilder(), ClientProtos.Action.newBuilder(),
277               MutationProto.newBuilder());
278         builder.setRegion(RegionSpecifier
279             .newBuilder()
280             .setType(RegionSpecifierType.REGION_NAME)
281             .setValue(
282               ByteString.copyFrom(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes())));
283         if (i % 100000 == 0) {
284           LOG.info("" + i);
285           // Uncomment this for a thread dump every so often.
286           // ReflectionUtils.printThreadInfo(new PrintWriter(System.out),
287           // "Thread dump " + Thread.currentThread().getName());
288         }
289         PayloadCarryingRpcController pcrc =
290             new PayloadCarryingRpcController(CellUtil.createCellScanner(cells));
291         // Pair<Message, CellScanner> response =
292         client.call(pcrc, md, builder.build(), param, user, address,
293             new MetricsConnection.CallStats());
294         /*
295          * int count = 0; while (p.getSecond().advance()) { count++; } assertEquals(cells.size(),
296          * count);
297          */
298       }
299       LOG.info("Cycled " + cycles + " time(s) with " + cellcount + " cell(s) in "
300           + (System.currentTimeMillis() - startTime) + "ms");
301     } finally {
302       client.close();
303       rpcServer.stop();
304     }
305   }
306 }