1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.ipc;
20
21 import static org.mockito.Matchers.anyInt;
22 import static org.mockito.Mockito.doThrow;
23 import static org.mockito.Mockito.spy;
24
25 import java.io.IOException;
26 import java.net.InetSocketAddress;
27 import java.net.Socket;
28 import java.util.ArrayList;
29 import java.util.List;
30
31 import javax.net.SocketFactory;
32
33 import org.apache.commons.logging.Log;
34 import org.apache.commons.logging.LogFactory;
35 import org.apache.hadoop.conf.Configuration;
36 import org.apache.hadoop.hbase.CellScannable;
37 import org.apache.hadoop.hbase.CellUtil;
38 import org.apache.hadoop.hbase.HBaseConfiguration;
39 import org.apache.hadoop.hbase.HConstants;
40 import org.apache.hadoop.hbase.HRegionInfo;
41 import org.apache.hadoop.hbase.KeyValue;
42 import org.apache.hadoop.hbase.KeyValueUtil;
43 import org.apache.hadoop.hbase.ServerName;
44 import org.apache.hadoop.hbase.Waiter;
45 import org.apache.hadoop.hbase.testclassification.SmallTests;
46 import org.apache.hadoop.hbase.client.MetricsConnection;
47 import org.apache.hadoop.hbase.client.Put;
48 import org.apache.hadoop.hbase.client.RowMutations;
49 import org.apache.hadoop.hbase.codec.Codec;
50 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
51 import org.apache.hadoop.hbase.protobuf.RequestConverter;
52 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
53 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
54 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
55 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
56 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
57 import org.apache.hadoop.hbase.security.User;
58 import org.apache.hadoop.hbase.testclassification.SmallTests;
59 import org.apache.hadoop.net.NetUtils;
60 import org.junit.experimental.categories.Category;
61 import org.mockito.Mockito;
62 import org.mockito.invocation.InvocationOnMock;
63 import org.mockito.stubbing.Answer;
64
65 import com.google.protobuf.ByteString;
66 import com.google.protobuf.Descriptors.MethodDescriptor;
67
68 @Category({ SmallTests.class })
69 public class TestIPC extends AbstractTestIPC {
70
71 private static final Log LOG = LogFactory.getLog(TestIPC.class);
72
73 @Override
74 protected RpcClientImpl createRpcClientNoCodec(Configuration conf) {
75 return new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT) {
76 @Override
77 Codec getCodec() {
78 return null;
79 }
80 };
81 }
82
83 @Override
84 protected RpcClientImpl createRpcClient(Configuration conf) {
85 return new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT);
86 }
87
88 @Override
89 protected RpcClientImpl createRpcClientRTEDuringConnectionSetup(Configuration conf)
90 throws IOException {
91 SocketFactory spyFactory = spy(NetUtils.getDefaultSocketFactory(conf));
92 Mockito.doAnswer(new Answer<Socket>() {
93 @Override
94 public Socket answer(InvocationOnMock invocation) throws Throwable {
95 Socket s = spy((Socket) invocation.callRealMethod());
96 doThrow(new RuntimeException("Injected fault")).when(s).setSoTimeout(anyInt());
97 return s;
98 }
99 }).when(spyFactory).createSocket();
100
101 return new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT, spyFactory);
102 }
103
104 public static void main(String[] args) throws IOException, SecurityException,
105 NoSuchMethodException, InterruptedException {
106 if (args.length != 2) {
107 System.out.println("Usage: TestIPC <CYCLES> <CELLS_PER_CYCLE>");
108 return;
109 }
110
111
112 int cycles = Integer.parseInt(args[0]);
113 int cellcount = Integer.parseInt(args[1]);
114 Configuration conf = HBaseConfiguration.create();
115 TestRpcServer rpcServer = new TestRpcServer();
116 MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
117 EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
118 RpcClientImpl client = new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT);
119 KeyValue kv = BIG_CELL;
120 Put p = new Put(CellUtil.cloneRow(kv));
121 for (int i = 0; i < cellcount; i++) {
122 p.add(kv);
123 }
124 RowMutations rm = new RowMutations(CellUtil.cloneRow(kv));
125 rm.add(p);
126 try {
127 rpcServer.start();
128 long startTime = System.currentTimeMillis();
129 User user = User.getCurrent();
130 InetSocketAddress address = rpcServer.getListenerAddress();
131 if (address == null) {
132 throw new IOException("Listener channel is closed");
133 }
134 for (int i = 0; i < cycles; i++) {
135 List<CellScannable> cells = new ArrayList<CellScannable>();
136
137 ClientProtos.RegionAction.Builder builder =
138 RequestConverter.buildNoDataRegionAction(HConstants.EMPTY_BYTE_ARRAY, rm, cells,
139 RegionAction.newBuilder(), ClientProtos.Action.newBuilder(),
140 MutationProto.newBuilder());
141 builder.setRegion(RegionSpecifier
142 .newBuilder()
143 .setType(RegionSpecifierType.REGION_NAME)
144 .setValue(
145 ByteString.copyFrom(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes())));
146 if (i % 100000 == 0) {
147 LOG.info("" + i);
148
149
150
151 }
152 PayloadCarryingRpcController pcrc =
153 new PayloadCarryingRpcController(CellUtil.createCellScanner(cells));
154
155 client.call(pcrc, md, builder.build(), param, user, address,
156 new MetricsConnection.CallStats());
157
158
159
160
161 }
162 LOG.info("Cycled " + cycles + " time(s) with " + cellcount + " cell(s) in "
163 + (System.currentTimeMillis() - startTime) + "ms");
164 } finally {
165 client.close();
166 rpcServer.stop();
167 }
168 }
169
170 }