1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.client;
19
20 import org.apache.hadoop.hbase.classification.InterfaceAudience;
21 import org.apache.hadoop.conf.Configuration;
22 import org.apache.hadoop.hbase.HConstants;
23 import org.apache.hadoop.hbase.util.ReflectionUtils;
24
25
26
27
28 @InterfaceAudience.Private
29 public class RpcRetryingCallerFactory {
30
31
32 public static final String CUSTOM_CALLER_CONF_KEY = "hbase.rpc.callerfactory.class";
33 protected final Configuration conf;
34 private final long pause;
35 private final int retries;
36 private final RetryingCallerInterceptor interceptor;
37 private final int startLogErrorsCnt;
38 private final boolean enableBackPressure;
39 private ServerStatisticTracker stats;
40
41 public RpcRetryingCallerFactory(Configuration conf) {
42 this(conf, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR);
43 }
44
45 public RpcRetryingCallerFactory(Configuration conf, RetryingCallerInterceptor interceptor) {
46 this.conf = conf;
47 pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
48 HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
49 retries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
50 HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
51 startLogErrorsCnt = conf.getInt(AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY,
52 AsyncProcess.DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
53 this.interceptor = interceptor;
54 enableBackPressure = conf.getBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE,
55 HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE);
56 }
57
58
59
60
61 public void setStatisticTracker(ServerStatisticTracker statisticTracker) {
62 this.stats = statisticTracker;
63 }
64
65 public <T> RpcRetryingCaller<T> newCaller() {
66
67
68 RpcRetryingCaller<T> caller = new RpcRetryingCaller<T>(pause, retries, interceptor,
69 startLogErrorsCnt);
70
71
72 if (enableBackPressure && this.stats != null) {
73 caller = new StatsTrackingRpcRetryingCaller<T>(pause, retries, interceptor,
74 startLogErrorsCnt, stats);
75 }
76
77 return caller;
78 }
79
80 public static RpcRetryingCallerFactory instantiate(Configuration configuration) {
81 return instantiate(configuration, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null);
82 }
83
84 public static RpcRetryingCallerFactory instantiate(Configuration configuration,
85 ServerStatisticTracker stats) {
86 return instantiate(configuration, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, stats);
87 }
88
89 public static RpcRetryingCallerFactory instantiate(Configuration configuration,
90 RetryingCallerInterceptor interceptor, ServerStatisticTracker stats) {
91 String clazzName = RpcRetryingCallerFactory.class.getName();
92 String rpcCallerFactoryClazz =
93 configuration.get(RpcRetryingCallerFactory.CUSTOM_CALLER_CONF_KEY, clazzName);
94 RpcRetryingCallerFactory factory;
95 if (rpcCallerFactoryClazz.equals(clazzName)) {
96 factory = new RpcRetryingCallerFactory(configuration, interceptor);
97 } else {
98 factory = ReflectionUtils.instantiateWithCustomCtor(
99 rpcCallerFactoryClazz, new Class[] { Configuration.class },
100 new Object[] { configuration });
101 }
102
103
104 factory.setStatisticTracker(stats);
105 return factory;
106 }
107 }