1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.ipc;
19
20 import static org.junit.Assert.assertTrue;
21 import static org.junit.Assert.fail;
22 import io.netty.channel.ChannelHandlerContext;
23 import io.netty.channel.ChannelInitializer;
24 import io.netty.channel.ChannelOutboundHandlerAdapter;
25 import io.netty.channel.ChannelPromise;
26 import io.netty.channel.epoll.EpollEventLoopGroup;
27 import io.netty.channel.nio.NioEventLoopGroup;
28 import io.netty.channel.socket.SocketChannel;
29
30 import java.io.IOException;
31 import java.net.InetSocketAddress;
32 import java.util.ArrayList;
33 import java.util.Collection;
34 import java.util.List;
35 import java.util.concurrent.atomic.AtomicBoolean;
36
37 import org.apache.commons.logging.Log;
38 import org.apache.commons.logging.LogFactory;
39 import org.apache.hadoop.conf.Configuration;
40 import org.apache.hadoop.hbase.CellScannable;
41 import org.apache.hadoop.hbase.CellUtil;
42 import org.apache.hadoop.hbase.HBaseConfiguration;
43 import org.apache.hadoop.hbase.HBaseTestingUtility;
44 import org.apache.hadoop.hbase.HConstants;
45 import org.apache.hadoop.hbase.HRegionInfo;
46 import org.apache.hadoop.hbase.KeyValue;
47 import org.apache.hadoop.hbase.ServerName;
48 import org.apache.hadoop.hbase.Waiter;
49 import org.apache.hadoop.hbase.client.MetricsConnection;
50 import org.apache.hadoop.hbase.client.Put;
51 import org.apache.hadoop.hbase.client.RowMutations;
52 import org.apache.hadoop.hbase.codec.Codec;
53 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
54 import org.apache.hadoop.hbase.protobuf.RequestConverter;
55 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
56 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
57 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
58 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
59 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
60 import org.apache.hadoop.hbase.security.User;
61 import org.apache.hadoop.hbase.testclassification.SmallTests;
62 import org.apache.hadoop.util.StringUtils;
63 import org.junit.Test;
64 import org.junit.experimental.categories.Category;
65 import org.junit.runner.RunWith;
66 import org.junit.runners.Parameterized;
67 import org.junit.runners.Parameterized.Parameters;
68
69 import com.google.protobuf.ByteString;
70 import com.google.protobuf.Descriptors.MethodDescriptor;
71 import com.google.protobuf.Message;
72 import com.google.protobuf.RpcCallback;
73 import com.google.protobuf.RpcChannel;
74
75 @RunWith(Parameterized.class)
76 @Category({ SmallTests.class })
77 public class TestAsyncIPC extends AbstractTestIPC {
78
79 private static final Log LOG = LogFactory.getLog(TestAsyncIPC.class);
80
81 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
82
83 @Parameters
84 public static Collection<Object[]> parameters() {
85 List<Object[]> paramList = new ArrayList<Object[]>();
86 paramList.add(new Object[] { false, false });
87 paramList.add(new Object[] { false, true });
88 paramList.add(new Object[] { true, false });
89 paramList.add(new Object[] { true, true });
90 return paramList;
91 }
92
93 private final boolean useNativeTransport;
94
95 private final boolean useGlobalEventLoopGroup;
96
97 public TestAsyncIPC(boolean useNativeTransport, boolean useGlobalEventLoopGroup) {
98 this.useNativeTransport = useNativeTransport;
99 this.useGlobalEventLoopGroup = useGlobalEventLoopGroup;
100 }
101
102 private void setConf(Configuration conf) {
103 conf.setBoolean(AsyncRpcClient.USE_NATIVE_TRANSPORT, useNativeTransport);
104 conf.setBoolean(AsyncRpcClient.USE_GLOBAL_EVENT_LOOP_GROUP, useGlobalEventLoopGroup);
105 if (useGlobalEventLoopGroup && AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP != null) {
106 if (useNativeTransport
107 && !(AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP.getFirst() instanceof EpollEventLoopGroup)
108 || (!useNativeTransport
109 && !(AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP.getFirst() instanceof NioEventLoopGroup))) {
110 AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP.getFirst().shutdownGracefully();
111 AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP = null;
112 }
113 }
114 }
115
116 @Override
117 protected AsyncRpcClient createRpcClientNoCodec(Configuration conf) {
118 setConf(conf);
119 return new AsyncRpcClient(conf) {
120
121 @Override
122 Codec getCodec() {
123 return null;
124 }
125
126 };
127 }
128
129 @Override
130 protected AsyncRpcClient createRpcClient(Configuration conf) {
131 setConf(conf);
132 return new AsyncRpcClient(conf);
133 }
134
135 @Override
136 protected AsyncRpcClient createRpcClientRTEDuringConnectionSetup(Configuration conf) {
137 setConf(conf);
138 return new AsyncRpcClient(conf, new ChannelInitializer<SocketChannel>() {
139 @Override
140 protected void initChannel(SocketChannel ch) throws Exception {
141 ch.pipeline().addFirst(new ChannelOutboundHandlerAdapter() {
142 @Override
143 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
144 throws Exception {
145 promise.setFailure(new RuntimeException("Injected fault"));
146 }
147 });
148 }
149 });
150 }
151
152 @Test
153 public void testAsyncConnectionSetup() throws Exception {
154 TestRpcServer rpcServer = new TestRpcServer();
155 AsyncRpcClient client = createRpcClient(CONF);
156 try {
157 rpcServer.start();
158 InetSocketAddress address = rpcServer.getListenerAddress();
159 if (address == null) {
160 throw new IOException("Listener channel is closed");
161 }
162 MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
163 EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
164
165 RpcChannel channel =
166 client.createRpcChannel(ServerName.valueOf(address.getHostName(), address.getPort(),
167 System.currentTimeMillis()), User.getCurrent(), 0);
168
169 final AtomicBoolean done = new AtomicBoolean(false);
170
171 channel.callMethod(md, new PayloadCarryingRpcController(), param, md.getOutputType()
172 .toProto(), new RpcCallback<Message>() {
173 @Override
174 public void run(Message parameter) {
175 done.set(true);
176 }
177 });
178
179 TEST_UTIL.waitFor(1000, new Waiter.Predicate<Exception>() {
180 @Override
181 public boolean evaluate() throws Exception {
182 return done.get();
183 }
184 });
185 } finally {
186 client.close();
187 rpcServer.stop();
188 }
189 }
190
191 @Test
192 public void testRTEDuringAsyncConnectionSetup() throws Exception {
193 TestRpcServer rpcServer = new TestRpcServer();
194 AsyncRpcClient client = createRpcClientRTEDuringConnectionSetup(CONF);
195 try {
196 rpcServer.start();
197 InetSocketAddress address = rpcServer.getListenerAddress();
198 if (address == null) {
199 throw new IOException("Listener channel is closed");
200 }
201 MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
202 EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
203
204 RpcChannel channel =
205 client.createRpcChannel(ServerName.valueOf(address.getHostName(), address.getPort(),
206 System.currentTimeMillis()), User.getCurrent(), 0);
207
208 final AtomicBoolean done = new AtomicBoolean(false);
209
210 PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
211 controller.notifyOnFail(new RpcCallback<IOException>() {
212 @Override
213 public void run(IOException e) {
214 done.set(true);
215 LOG.info("Caught expected exception: " + e.toString());
216 assertTrue(StringUtils.stringifyException(e).contains("Injected fault"));
217 }
218 });
219
220 channel.callMethod(md, controller, param, md.getOutputType().toProto(),
221 new RpcCallback<Message>() {
222 @Override
223 public void run(Message parameter) {
224 done.set(true);
225 fail("Expected an exception to have been thrown!");
226 }
227 });
228
229 TEST_UTIL.waitFor(1000, new Waiter.Predicate<Exception>() {
230 @Override
231 public boolean evaluate() throws Exception {
232 return done.get();
233 }
234 });
235 } finally {
236 client.close();
237 rpcServer.stop();
238 }
239 }
240
241 public static void main(String[] args) throws IOException, SecurityException,
242 NoSuchMethodException, InterruptedException {
243 if (args.length != 2) {
244 System.out.println("Usage: TestAsyncIPC <CYCLES> <CELLS_PER_CYCLE>");
245 return;
246 }
247
248
249 int cycles = Integer.parseInt(args[0]);
250 int cellcount = Integer.parseInt(args[1]);
251 Configuration conf = HBaseConfiguration.create();
252 TestRpcServer rpcServer = new TestRpcServer();
253 MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
254 EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
255 AsyncRpcClient client = new AsyncRpcClient(conf);
256 KeyValue kv = BIG_CELL;
257 Put p = new Put(CellUtil.cloneRow(kv));
258 for (int i = 0; i < cellcount; i++) {
259 p.add(kv);
260 }
261 RowMutations rm = new RowMutations(CellUtil.cloneRow(kv));
262 rm.add(p);
263 try {
264 rpcServer.start();
265 InetSocketAddress address = rpcServer.getListenerAddress();
266 if (address == null) {
267 throw new IOException("Listener channel is closed");
268 }
269 long startTime = System.currentTimeMillis();
270 User user = User.getCurrent();
271 for (int i = 0; i < cycles; i++) {
272 List<CellScannable> cells = new ArrayList<CellScannable>();
273
274 ClientProtos.RegionAction.Builder builder =
275 RequestConverter.buildNoDataRegionAction(HConstants.EMPTY_BYTE_ARRAY, rm, cells,
276 RegionAction.newBuilder(), ClientProtos.Action.newBuilder(),
277 MutationProto.newBuilder());
278 builder.setRegion(RegionSpecifier
279 .newBuilder()
280 .setType(RegionSpecifierType.REGION_NAME)
281 .setValue(
282 ByteString.copyFrom(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes())));
283 if (i % 100000 == 0) {
284 LOG.info("" + i);
285
286
287
288 }
289 PayloadCarryingRpcController pcrc =
290 new PayloadCarryingRpcController(CellUtil.createCellScanner(cells));
291
292 client.call(pcrc, md, builder.build(), param, user, address,
293 new MetricsConnection.CallStats());
294
295
296
297
298 }
299 LOG.info("Cycled " + cycles + " time(s) with " + cellcount + " cell(s) in "
300 + (System.currentTimeMillis() - startTime) + "ms");
301 } finally {
302 client.close();
303 rpcServer.stop();
304 }
305 }
306 }