1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.ipc;
20
21 import com.google.common.annotations.VisibleForTesting;
22 import com.google.protobuf.BlockingRpcChannel;
23 import com.google.protobuf.Descriptors;
24 import com.google.protobuf.Message;
25 import com.google.protobuf.RpcController;
26 import com.google.protobuf.ServiceException;
27
28 import java.io.IOException;
29 import java.net.ConnectException;
30 import java.net.InetSocketAddress;
31 import java.net.SocketAddress;
32 import java.net.SocketTimeoutException;
33
34 import org.apache.commons.logging.Log;
35 import org.apache.commons.logging.LogFactory;
36 import org.apache.hadoop.conf.Configuration;
37 import org.apache.hadoop.hbase.CellScanner;
38 import org.apache.hadoop.hbase.HConstants;
39 import org.apache.hadoop.hbase.ServerName;
40 import org.apache.hadoop.hbase.classification.InterfaceAudience;
41 import org.apache.hadoop.hbase.client.MetricsConnection;
42 import org.apache.hadoop.hbase.codec.Codec;
43 import org.apache.hadoop.hbase.codec.KeyValueCodec;
44 import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
45 import org.apache.hadoop.hbase.security.User;
46 import org.apache.hadoop.hbase.security.UserProvider;
47 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
48 import org.apache.hadoop.hbase.util.Pair;
49 import org.apache.hadoop.hbase.util.PoolMap;
50 import org.apache.hadoop.io.compress.CompressionCodec;
51
52
53
54
55 @InterfaceAudience.Private
56 public abstract class AbstractRpcClient implements RpcClient {
57
58 public static final Log LOG = LogFactory.getLog(AbstractRpcClient.class);
59
60 protected final Configuration conf;
61 protected String clusterId;
62 protected final SocketAddress localAddr;
63 protected final MetricsConnection metrics;
64
65 protected UserProvider userProvider;
66 protected final IPCUtil ipcUtil;
67
68 protected final int minIdleTimeBeforeClose;
69
70 protected final int maxRetries;
71 protected final long failureSleep;
72 protected final boolean tcpNoDelay;
73 protected final boolean tcpKeepAlive;
74 protected final Codec codec;
75 protected final CompressionCodec compressor;
76 protected final boolean fallbackAllowed;
77
78 protected final int connectTO;
79 protected final int readTO;
80 protected final int writeTO;
81
82
83
84
85
86
87
88
89
90 public AbstractRpcClient(Configuration conf, String clusterId, SocketAddress localAddr,
91 MetricsConnection metrics) {
92 this.userProvider = UserProvider.instantiate(conf);
93 this.localAddr = localAddr;
94 this.tcpKeepAlive = conf.getBoolean("hbase.ipc.client.tcpkeepalive", true);
95 this.clusterId = clusterId != null ? clusterId : HConstants.CLUSTER_ID_DEFAULT;
96 this.failureSleep = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
97 HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
98 this.maxRetries = conf.getInt("hbase.ipc.client.connect.max.retries", 0);
99 this.tcpNoDelay = conf.getBoolean("hbase.ipc.client.tcpnodelay", true);
100 this.ipcUtil = new IPCUtil(conf);
101
102 this.minIdleTimeBeforeClose = conf.getInt(IDLE_TIME, 120000);
103 this.conf = conf;
104 this.codec = getCodec();
105 this.compressor = getCompressor(conf);
106 this.fallbackAllowed = conf.getBoolean(IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
107 IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
108 this.connectTO = conf.getInt(SOCKET_TIMEOUT_CONNECT, DEFAULT_SOCKET_TIMEOUT_CONNECT);
109 this.readTO = conf.getInt(SOCKET_TIMEOUT_READ, DEFAULT_SOCKET_TIMEOUT_READ);
110 this.writeTO = conf.getInt(SOCKET_TIMEOUT_WRITE, DEFAULT_SOCKET_TIMEOUT_WRITE);
111 this.metrics = metrics;
112
113
114 if (LOG.isDebugEnabled()) {
115 LOG.debug("Codec=" + this.codec + ", compressor=" + this.compressor +
116 ", tcpKeepAlive=" + this.tcpKeepAlive +
117 ", tcpNoDelay=" + this.tcpNoDelay +
118 ", connectTO=" + this.connectTO +
119 ", readTO=" + this.readTO +
120 ", writeTO=" + this.writeTO +
121 ", minIdleTimeBeforeClose=" + this.minIdleTimeBeforeClose +
122 ", maxRetries=" + this.maxRetries +
123 ", fallbackAllowed=" + this.fallbackAllowed +
124 ", bind address=" + (this.localAddr != null ? this.localAddr : "null"));
125 }
126 }
127
128 @VisibleForTesting
129 public static String getDefaultCodec(final Configuration c) {
130
131
132
133 return c.get(DEFAULT_CODEC_CLASS, KeyValueCodec.class.getCanonicalName());
134 }
135
136
137
138
139
140 Codec getCodec() {
141
142
143 String className = conf.get(HConstants.RPC_CODEC_CONF_KEY, getDefaultCodec(this.conf));
144 if (className == null || className.length() == 0) return null;
145 try {
146 return (Codec)Class.forName(className).newInstance();
147 } catch (Exception e) {
148 throw new RuntimeException("Failed getting codec " + className, e);
149 }
150 }
151
152 @Override
153 public boolean hasCellBlockSupport() {
154 return this.codec != null;
155 }
156
157
158
159
160
161
162 private static CompressionCodec getCompressor(final Configuration conf) {
163 String className = conf.get("hbase.client.rpc.compressor", null);
164 if (className == null || className.isEmpty()) return null;
165 try {
166 return (CompressionCodec)Class.forName(className).newInstance();
167 } catch (Exception e) {
168 throw new RuntimeException("Failed getting compressor " + className, e);
169 }
170 }
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188 protected static PoolMap.PoolType getPoolType(Configuration config) {
189 return PoolMap.PoolType
190 .valueOf(config.get(HConstants.HBASE_CLIENT_IPC_POOL_TYPE), PoolMap.PoolType.RoundRobin,
191 PoolMap.PoolType.ThreadLocal);
192 }
193
194
195
196
197
198
199
200
201 protected static int getPoolSize(Configuration config) {
202 return config.getInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, 1);
203 }
204
205
206
207
208
209
210
211
212
213
214
215 Message callBlockingMethod(Descriptors.MethodDescriptor md, PayloadCarryingRpcController pcrc,
216 Message param, Message returnType, final User ticket, final InetSocketAddress isa)
217 throws ServiceException {
218 if (pcrc == null) {
219 pcrc = new PayloadCarryingRpcController();
220 }
221
222 Pair<Message, CellScanner> val;
223 try {
224 final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
225 cs.setStartTime(EnvironmentEdgeManager.currentTime());
226 val = call(pcrc, md, param, returnType, ticket, isa, cs);
227
228 pcrc.setCellScanner(val.getSecond());
229
230 cs.setCallTimeMs(EnvironmentEdgeManager.currentTime() - cs.getStartTime());
231 if (metrics != null) {
232 metrics.updateRpc(md, param, cs);
233 }
234 if (LOG.isTraceEnabled()) {
235 LOG.trace("Call: " + md.getName() + ", callTime: " + cs.getCallTimeMs() + "ms");
236 }
237 return val.getFirst();
238 } catch (Throwable e) {
239 throw new ServiceException(e);
240 }
241 }
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258 protected abstract Pair<Message, CellScanner> call(PayloadCarryingRpcController pcrc,
259 Descriptors.MethodDescriptor md, Message param, Message returnType, User ticket,
260 InetSocketAddress isa, MetricsConnection.CallStats callStats)
261 throws IOException, InterruptedException;
262
263 @Override
264 public BlockingRpcChannel createBlockingRpcChannel(final ServerName sn, final User ticket,
265 int defaultOperationTimeout) {
266 return new BlockingRpcChannelImplementation(this, sn, ticket, defaultOperationTimeout);
267 }
268
269
270
271
272
273
274
275
276
277
278
279 protected IOException wrapException(InetSocketAddress addr, Exception exception) {
280 if (exception instanceof ConnectException) {
281
282 return (ConnectException) new ConnectException("Call to " + addr
283 + " failed on connection exception: " + exception).initCause(exception);
284 } else if (exception instanceof SocketTimeoutException) {
285 return (SocketTimeoutException) new SocketTimeoutException("Call to " + addr
286 + " failed because " + exception).initCause(exception);
287 } else if (exception instanceof ConnectionClosingException) {
288 return (ConnectionClosingException) new ConnectionClosingException("Call to " + addr
289 + " failed on local exception: " + exception).initCause(exception);
290 } else {
291 return (IOException) new IOException("Call to " + addr + " failed on local exception: "
292 + exception).initCause(exception);
293 }
294 }
295
296
297
298
299 @VisibleForTesting
300 public static class BlockingRpcChannelImplementation implements BlockingRpcChannel {
301 private final InetSocketAddress isa;
302 private final AbstractRpcClient rpcClient;
303 private final User ticket;
304 private final int channelOperationTimeout;
305
306
307
308
309 protected BlockingRpcChannelImplementation(final AbstractRpcClient rpcClient,
310 final ServerName sn, final User ticket, int channelOperationTimeout) {
311 this.isa = new InetSocketAddress(sn.getHostname(), sn.getPort());
312 this.rpcClient = rpcClient;
313 this.ticket = ticket;
314 this.channelOperationTimeout = channelOperationTimeout;
315 }
316
317 @Override
318 public Message callBlockingMethod(Descriptors.MethodDescriptor md, RpcController controller,
319 Message param, Message returnType) throws ServiceException {
320 PayloadCarryingRpcController pcrc;
321 if (controller != null && controller instanceof PayloadCarryingRpcController) {
322 pcrc = (PayloadCarryingRpcController) controller;
323 if (!pcrc.hasCallTimeout()) {
324 pcrc.setCallTimeout(channelOperationTimeout);
325 }
326 } else {
327 pcrc = new PayloadCarryingRpcController();
328 pcrc.setCallTimeout(channelOperationTimeout);
329 }
330
331 return this.rpcClient.callBlockingMethod(md, pcrc, param, returnType, this.ticket, this.isa);
332 }
333 }
334 }