1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.client;
21
22 import java.io.IOException;
23 import java.io.InterruptedIOException;
24 import java.lang.reflect.UndeclaredThrowableException;
25 import java.net.SocketTimeoutException;
26 import java.util.ArrayList;
27 import java.util.List;
28 import java.util.concurrent.atomic.AtomicBoolean;
29
30 import org.apache.commons.logging.Log;
31 import org.apache.commons.logging.LogFactory;
32 import org.apache.hadoop.hbase.classification.InterfaceAudience;
33 import org.apache.hadoop.hbase.DoNotRetryIOException;
34 import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException;
35 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
36 import org.apache.hadoop.hbase.util.ExceptionUtil;
37 import org.apache.hadoop.ipc.RemoteException;
38
39 import com.google.protobuf.ServiceException;
40
41
42
43
44
45
46
47
48
49
50 @InterfaceAudience.Private
51 public class RpcRetryingCaller<T> {
52 public static final Log LOG = LogFactory.getLog(RpcRetryingCaller.class);
53
54
55
56 private long globalStartTime;
57
58
59
60 private final static int MIN_RPC_TIMEOUT = 2000;
61
62 private final int startLogErrorsCnt;
63
64 private final long pause;
65 private final int retries;
66 private final AtomicBoolean cancelled = new AtomicBoolean(false);
67 private final RetryingCallerInterceptor interceptor;
68 private final RetryingCallerInterceptorContext context;
69
70 public RpcRetryingCaller(long pause, int retries, int startLogErrorsCnt) {
71 this(pause, retries, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, startLogErrorsCnt);
72 }
73
74 public RpcRetryingCaller(long pause, int retries,
75 RetryingCallerInterceptor interceptor, int startLogErrorsCnt) {
76 this.pause = pause;
77 this.retries = retries;
78 this.interceptor = interceptor;
79 context = interceptor.createEmptyContext();
80 this.startLogErrorsCnt = startLogErrorsCnt;
81 }
82
83 private int getRemainingTime(int callTimeout) {
84 if (callTimeout <= 0) {
85 return 0;
86 } else {
87 if (callTimeout == Integer.MAX_VALUE) return Integer.MAX_VALUE;
88 int remainingTime = (int) (callTimeout -
89 (EnvironmentEdgeManager.currentTime() - this.globalStartTime));
90 if (remainingTime < MIN_RPC_TIMEOUT) {
91
92
93
94 remainingTime = MIN_RPC_TIMEOUT;
95 }
96 return remainingTime;
97 }
98 }
99
100 public void cancel(){
101 synchronized (cancelled){
102 cancelled.set(true);
103 cancelled.notifyAll();
104 }
105 }
106
107
108
109
110
111
112
113
114
115 public T callWithRetries(RetryingCallable<T> callable, int callTimeout)
116 throws IOException, RuntimeException {
117 List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions =
118 new ArrayList<RetriesExhaustedException.ThrowableWithExtraContext>();
119 this.globalStartTime = EnvironmentEdgeManager.currentTime();
120 context.clear();
121 for (int tries = 0;; tries++) {
122 long expectedSleep;
123 try {
124 callable.prepare(tries != 0);
125 interceptor.intercept(context.prepare(callable, tries));
126 return callable.call(getRemainingTime(callTimeout));
127 } catch (PreemptiveFastFailException e) {
128 throw e;
129 } catch (Throwable t) {
130 ExceptionUtil.rethrowIfInterrupt(t);
131 if (tries > startLogErrorsCnt) {
132 LOG.info("Call exception, tries=" + tries + ", retries=" + retries + ", started=" +
133 (EnvironmentEdgeManager.currentTime() - this.globalStartTime) + " ms ago, "
134 + "cancelled=" + cancelled.get() + ", msg="
135 + callable.getExceptionMessageAdditionalDetail());
136 }
137
138
139 interceptor.handleFailure(context, t);
140 t = translateException(t);
141 callable.throwable(t, retries != 1);
142 RetriesExhaustedException.ThrowableWithExtraContext qt =
143 new RetriesExhaustedException.ThrowableWithExtraContext(t,
144 EnvironmentEdgeManager.currentTime(), toString());
145 exceptions.add(qt);
146 if (tries >= retries - 1) {
147 throw new RetriesExhaustedException(tries, exceptions);
148 }
149
150
151
152 expectedSleep = callable.sleep(pause, tries + 1);
153
154
155 long duration = singleCallDuration(expectedSleep);
156 if (duration > callTimeout) {
157 String msg = "callTimeout=" + callTimeout + ", callDuration=" + duration +
158 ": " + callable.getExceptionMessageAdditionalDetail();
159 throw (SocketTimeoutException)(new SocketTimeoutException(msg).initCause(t));
160 }
161 } finally {
162 interceptor.updateFailureInfo(context);
163 }
164 try {
165 if (expectedSleep > 0) {
166 synchronized (cancelled) {
167 if (cancelled.get()) return null;
168 cancelled.wait(expectedSleep);
169 }
170 }
171 if (cancelled.get()) return null;
172 } catch (InterruptedException e) {
173 throw new InterruptedIOException("Interrupted after " + tries + " tries on " + retries);
174 }
175 }
176 }
177
178
179
180
181 private long singleCallDuration(final long expectedSleep) {
182 return (EnvironmentEdgeManager.currentTime() - this.globalStartTime) + expectedSleep;
183 }
184
185
186
187
188
189
190
191
192
193
194 public T callWithoutRetries(RetryingCallable<T> callable, int callTimeout)
195 throws IOException, RuntimeException {
196
197 this.globalStartTime = EnvironmentEdgeManager.currentTime();
198 try {
199 callable.prepare(false);
200 return callable.call(callTimeout);
201 } catch (Throwable t) {
202 Throwable t2 = translateException(t);
203 ExceptionUtil.rethrowIfInterrupt(t2);
204
205 if (t2 instanceof IOException) {
206 throw (IOException)t2;
207 } else {
208 throw new RuntimeException(t2);
209 }
210 }
211 }
212
213
214
215
216
217
218
219 static Throwable translateException(Throwable t) throws DoNotRetryIOException {
220 if (t instanceof UndeclaredThrowableException) {
221 if (t.getCause() != null) {
222 t = t.getCause();
223 }
224 }
225 if (t instanceof RemoteException) {
226 t = ((RemoteException)t).unwrapRemoteException();
227 }
228 if (t instanceof LinkageError) {
229 throw new DoNotRetryIOException(t);
230 }
231 if (t instanceof ServiceException) {
232 ServiceException se = (ServiceException)t;
233 Throwable cause = se.getCause();
234 if (cause != null) {
235 if (cause instanceof DoNotRetryIOException) {
236 throw (DoNotRetryIOException)cause;
237 } else if (cause instanceof NeedUnmanagedConnectionException) {
238 throw new DoNotRetryIOException(cause);
239 }
240 }
241
242 t = cause;
243
244 translateException(t);
245 } else if (t instanceof DoNotRetryIOException) {
246 throw (DoNotRetryIOException)t;
247 } else if (t instanceof NeedUnmanagedConnectionException) {
248 throw new DoNotRetryIOException(t);
249 }
250 return t;
251 }
252
253 @Override
254 public String toString() {
255 return "RpcRetryingCaller{" + "globalStartTime=" + globalStartTime +
256 ", pause=" + pause + ", retries=" + retries + '}';
257 }
258 }