/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY;

import java.io.Closeable;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotEnabledException;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase;
import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicyFactory;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AddColumnRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AddColumnResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AssignRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AssignRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.BalanceRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.BalanceResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateNamespaceRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateNamespaceResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateTableRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateTableResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteColumnRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteColumnResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteNamespaceRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteNamespaceResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteSnapshotRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteSnapshotResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteTableRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteTableResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DisableTableRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DisableTableResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DispatchMergingRegionsRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DispatchMergingRegionsResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableTableRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableTableResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ExecProcedureRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ExecProcedureResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetClusterStatusRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetClusterStatusResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetProcedureResultRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetProcedureResultResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetSchemaAlterStatusRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetSchemaAlterStatusResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableNamesRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableNamesResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsBalancerEnabledRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsBalancerEnabledResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsNormalizerEnabledRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsRestoreSnapshotDoneRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsRestoreSnapshotDoneResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableNamesByNamespaceResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MajorCompactionTimestampForRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MajorCompactionTimestampRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MajorCompactionTimestampResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MasterService;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyColumnRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyColumnResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyNamespaceRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyNamespaceResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyTableRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyTableResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MoveRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MoveRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.NormalizeRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.NormalizeResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.OfflineRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.OfflineRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RestoreSnapshotRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RestoreSnapshotResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RunCatalogScanRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RunCatalogScanResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SecurityCapabilitiesRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetBalancerRunningRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetBalancerRunningResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetNormalizerRunningRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetNormalizerRunningResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetQuotaRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetQuotaResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ShutdownRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ShutdownResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SnapshotRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SnapshotResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.StopMasterRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.StopMasterResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.TruncateTableRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.TruncateTableResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.UnassignRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.UnassignRegionResponse;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ExceptionUtil;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.zookeeper.KeeperException;

import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.BlockingRpcChannel;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;

/**
 * An internal, non-instantiable class that manages creation of {@link HConnection}s.
 */
@SuppressWarnings("serial")
@InterfaceAudience.Private
// NOTE: DO NOT make this class public. It was made package-private on purpose.
class ConnectionManager {
  static final Log LOG = LogFactory.getLog(ConnectionManager.class);

  public static final String RETRIES_BY_SERVER_KEY = "hbase.client.retries.by.server";
  private static final String CLIENT_NONCES_ENABLED_KEY = "hbase.client.nonces.enabled";
  private static final String RESOLVE_HOSTNAME_ON_FAIL_KEY = "hbase.resolve.hostnames.on.failure";

  // An LRU Map of HConnectionKey -> HConnection (TableServer).  All
  // access must be synchronized.  This map is not private because tests
  // need to be able to tinker with it.
  static final Map<HConnectionKey, HConnectionImplementation> CONNECTION_INSTANCES;

  public static final int MAX_CACHED_CONNECTION_INSTANCES;

  /**
   * Global nonceGenerator shared per client. Currently there's no reason to limit its scope.
   * Once it's set under nonceGeneratorCreateLock, it is never unset or changed.
   */
  private static volatile NonceGenerator nonceGenerator = null;
  /** The nonce generator lock. Only taken when creating HConnection, which gets a private copy. */
  private static Object nonceGeneratorCreateLock = new Object();

  static {
    // We set instances to one more than the value specified for {@link
    // HConstants#ZOOKEEPER_MAX_CLIENT_CNXNS}. By default, the zk default max
    // connections to the ensemble from the one client is 30, so in that case we
    // should run into zk issues before the LRU hits this value of 31.
    MAX_CACHED_CONNECTION_INSTANCES = HBaseConfiguration.create().getInt(
      HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS, HConstants.DEFAULT_ZOOKEPER_MAX_CLIENT_CNXNS) + 1;
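    // The LinkedHashMap is created in access order (third constructor argument = true),
    // so iteration order tracks recency of use and removeEldestEntry evicts the
    // least-recently-used connection once the cache grows past the maximum size.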
    CONNECTION_INSTANCES = new LinkedHashMap<HConnectionKey, HConnectionImplementation>(
        (int) (MAX_CACHED_CONNECTION_INSTANCES / 0.75F) + 1, 0.75F, true) {
      @Override
      protected boolean removeEldestEntry(
          Map.Entry<HConnectionKey, HConnectionImplementation> eldest) {
        return size() > MAX_CACHED_CONNECTION_INSTANCES;
      }
    };
  }

  /** Dummy nonce generator for disabled nonces. */
  static class NoNonceGenerator implements NonceGenerator {
    @Override
    public long getNonceGroup() {
      return HConstants.NO_NONCE;
    }
    @Override
    public long newNonce() {
      return HConstants.NO_NONCE;
    }
  }

  /*
   * Non-instantiable.
   */
  private ConnectionManager() {
    super();
  }

  /**
   * @param conn The connection for which to replace the generator.
   * @param cnm Replaces the nonce generator used, for testing.
   * @return old nonce generator.
   */
  @VisibleForTesting
  static NonceGenerator injectNonceGeneratorForTesting(
      ClusterConnection conn, NonceGenerator cnm) {
    HConnectionImplementation connImpl = (HConnectionImplementation)conn;
    NonceGenerator ng = connImpl.getNonceGenerator();
    LOG.warn("Nonce generator is being replaced by test code for " + cnm.getClass().getName());
    connImpl.nonceGenerator = cnm;
    return ng;
  }

  /**
   * Get the connection that goes with the passed <code>conf</code> configuration instance.
   * If no current connection exists, method creates a new connection and keys it using
   * connection-specific properties from the passed {@link Configuration}; see
   * {@link HConnectionKey}.
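   * <p>A minimal usage sketch (assumes <code>conf</code> already carries the cluster
   * settings; closing a shared connection decrements its reference count rather than
   * tearing it down while others still use it):
   * {@code
   * HConnection connection = ConnectionManager.getConnection(conf);
   * try {
   *   HTableInterface table = connection.getTable("mytable");
   *   // ... use the table ...
   *   table.close();
   * } finally {
   *   connection.close();
   * }
   * }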
   * @param conf configuration
   * @return HConnection object for <code>conf</code>
   * @throws ZooKeeperConnectionException
   */
  @Deprecated
  public static HConnection getConnection(final Configuration conf) throws IOException {
    return getConnectionInternal(conf);
  }


  static ClusterConnection getConnectionInternal(final Configuration conf)
    throws IOException {
    HConnectionKey connectionKey = new HConnectionKey(conf);
    synchronized (CONNECTION_INSTANCES) {
      HConnectionImplementation connection = CONNECTION_INSTANCES.get(connectionKey);
      if (connection == null) {
        connection = (HConnectionImplementation)createConnection(conf, true);
        CONNECTION_INSTANCES.put(connectionKey, connection);
      } else if (connection.isClosed()) {
        ConnectionManager.deleteConnection(connectionKey, true);
        connection = (HConnectionImplementation)createConnection(conf, true);
        CONNECTION_INSTANCES.put(connectionKey, connection);
      }
      connection.incCount();
      return connection;
    }
  }

  /**
   * Create a new HConnection instance using the passed <code>conf</code> instance.
   * <p>Note: This bypasses the usual HConnection life cycle management done by
   * {@link #getConnection(Configuration)}. The caller is responsible for
   * calling {@link HConnection#close()} on the returned connection instance.
   *
   * This is the recommended way to create HConnections.
   * {@code
   * HConnection connection = ConnectionManager.createConnection(conf);
   * HTableInterface table = connection.getTable("mytable");
   * table.get(...);
   * ...
   * table.close();
   * connection.close();
   * }
   *
   * @param conf configuration
   * @return HConnection object for <code>conf</code>
   * @throws ZooKeeperConnectionException
   */
  public static HConnection createConnection(Configuration conf) throws IOException {
    return createConnectionInternal(conf);
  }

  static ClusterConnection createConnectionInternal(Configuration conf) throws IOException {
    UserProvider provider = UserProvider.instantiate(conf);
    return createConnection(conf, false, null, provider.getCurrent());
  }

  /**
   * Create a new HConnection instance using the passed <code>conf</code> instance.
   * <p>Note: This bypasses the usual HConnection life cycle management done by
   * {@link #getConnection(Configuration)}. The caller is responsible for
   * calling {@link HConnection#close()} on the returned connection instance.
   * This is the recommended way to create HConnections.
   * {@code
   * ExecutorService pool = ...;
   * HConnection connection = HConnectionManager.createConnection(conf, pool);
   * HTableInterface table = connection.getTable("mytable");
   * table.get(...);
   * ...
   * table.close();
   * connection.close();
   * }
   * @param conf configuration
   * @param pool the thread pool to use for batch operations in HTables used via this HConnection
   * @return HConnection object for <code>conf</code>
   * @throws ZooKeeperConnectionException
   */
  public static HConnection createConnection(Configuration conf, ExecutorService pool)
  throws IOException {
    UserProvider provider = UserProvider.instantiate(conf);
    return createConnection(conf, false, pool, provider.getCurrent());
  }

  /**
   * Create a new HConnection instance using the passed <code>conf</code> instance.
   * <p>Note: This bypasses the usual HConnection life cycle management done by
   * {@link #getConnection(Configuration)}. The caller is responsible for
   * calling {@link HConnection#close()} on the returned connection instance.
   * This is the recommended way to create HConnections.
   * {@code
   * HConnection connection = HConnectionManager.createConnection(conf, user);
   * HTableInterface table = connection.getTable("mytable");
   * table.get(...);
   * ...
   * table.close();
   * connection.close();
   * }
   * @param conf configuration
   * @param user the user the connection is for
   * @return HConnection object for <code>conf</code>
   * @throws ZooKeeperConnectionException
   */
  public static HConnection createConnection(Configuration conf, User user)
  throws IOException {
    return createConnection(conf, false, null, user);
  }

  /**
   * Create a new HConnection instance using the passed <code>conf</code> instance.
   * <p>Note: This bypasses the usual HConnection life cycle management done by
   * {@link #getConnection(Configuration)}. The caller is responsible for
   * calling {@link HConnection#close()} on the returned connection instance.
   * This is the recommended way to create HConnections.
   * {@code
   * ExecutorService pool = ...;
   * HConnection connection = HConnectionManager.createConnection(conf, pool, user);
   * HTableInterface table = connection.getTable("mytable");
   * table.get(...);
   * ...
   * table.close();
   * connection.close();
   * }
   * @param conf configuration
   * @param pool the thread pool to use for batch operations in HTables used via this HConnection
   * @param user the user the connection is for
   * @return HConnection object for <code>conf</code>
   * @throws ZooKeeperConnectionException
   */
  public static HConnection createConnection(Configuration conf, ExecutorService pool, User user)
  throws IOException {
    return createConnection(conf, false, pool, user);
  }

  @Deprecated
  static HConnection createConnection(final Configuration conf, final boolean managed)
      throws IOException {
    UserProvider provider = UserProvider.instantiate(conf);
    return createConnection(conf, managed, null, provider.getCurrent());
  }

  @Deprecated
  static ClusterConnection createConnection(final Configuration conf, final boolean managed,
      final ExecutorService pool, final User user)
  throws IOException {
    return (ClusterConnection) ConnectionFactory.createConnection(conf, managed, pool, user);
  }

  /**
   * Delete connection information for the instance specified by passed configuration.
   * If there are no more references to the designated connection, this method will
   * then close the connection to the zookeeper ensemble and let go of all associated resources.
   *
   * @param conf configuration whose identity is used to find {@link HConnection} instance.
   * @deprecated
   */
  @Deprecated
  public static void deleteConnection(Configuration conf) {
    deleteConnection(new HConnectionKey(conf), false);
  }

  /**
   * Cleanup a known stale connection.
   * This will then close the connection to the zookeeper ensemble and let go of all resources.
   *
   * @param connection
   * @deprecated
   */
  @Deprecated
  public static void deleteStaleConnection(HConnection connection) {
    deleteConnection(connection, true);
  }

  /**
   * Delete information for all connections. Whether a connection is actually closed
   * depends on the staleConnection boolean and its reference count. By default, you
   * should use this with staleConnection set to true.
   * @deprecated
   */
  @Deprecated
  public static void deleteAllConnections(boolean staleConnection) {
    synchronized (CONNECTION_INSTANCES) {
      Set<HConnectionKey> connectionKeys = new HashSet<HConnectionKey>();
      connectionKeys.addAll(CONNECTION_INSTANCES.keySet());
      for (HConnectionKey connectionKey : connectionKeys) {
        deleteConnection(connectionKey, staleConnection);
      }
      CONNECTION_INSTANCES.clear();
    }
  }

  /**
   * Delete information for all connections.
   * @deprecated kept for backward compatibility, but the behavior is broken. HBASE-8983
   */
  @Deprecated
  public static void deleteAllConnections() {
    deleteAllConnections(false);
  }


  @Deprecated
  private static void deleteConnection(HConnection connection, boolean staleConnection) {
    synchronized (CONNECTION_INSTANCES) {
      for (Entry<HConnectionKey, HConnectionImplementation> e: CONNECTION_INSTANCES.entrySet()) {
        if (e.getValue() == connection) {
          deleteConnection(e.getKey(), staleConnection);
          break;
        }
      }
    }
  }

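  /**
   * Decrements the reference count of the keyed connection and closes it once the count
   * reaches zero, or immediately when staleConnection is true.
   */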
  @Deprecated
  private static void deleteConnection(HConnectionKey connectionKey, boolean staleConnection) {
    synchronized (CONNECTION_INSTANCES) {
      HConnectionImplementation connection = CONNECTION_INSTANCES.get(connectionKey);
      if (connection != null) {
        connection.decCount();
        if (connection.isZeroReference() || staleConnection) {
          CONNECTION_INSTANCES.remove(connectionKey);
          connection.internalClose();
        }
      } else {
        LOG.error("Connection not found in the list, can't delete it "+
          "(connection key=" + connectionKey + "). Maybe the key was modified?", new Exception());
      }
    }
  }


  /**
   * This convenience method invokes the given {@link HConnectable#connect}
   * implementation using a {@link HConnection} instance that lasts just for the
   * duration of the invocation.
   *
   * @param <T> the return type of the connect method
   * @param connectable the {@link HConnectable} instance
   * @return the value returned by the connect method
   * @throws IOException
   */
  @InterfaceAudience.Private
  public static <T> T execute(HConnectable<T> connectable) throws IOException {
    if (connectable == null || connectable.conf == null) {
      return null;
    }
    Configuration conf = connectable.conf;
    HConnection connection = getConnection(conf);
    boolean connectSucceeded = false;
    try {
      T returnValue = connectable.connect(connection);
      connectSucceeded = true;
      return returnValue;
    } finally {
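      // If connect() itself failed, that exception is already propagating; suppress any
      // close() failure here so it does not mask the original error (interrupts are
      // still rethrown).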
      try {
        connection.close();
      } catch (Exception e) {
        ExceptionUtil.rethrowIfInterrupt(e);
        if (connectSucceeded) {
          throw new IOException("The connection to " + connection
              + " could not be deleted.", e);
        }
      }
    }
  }

  /** Encapsulates connection to zookeeper and regionservers.*/
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(
      value="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION",
      justification="Access to the concurrent hash map is under a lock, so it should be fine.")
  static class HConnectionImplementation implements ClusterConnection, Closeable {
    static final Log LOG = LogFactory.getLog(HConnectionImplementation.class);
    private final boolean hostnamesCanChange;
    private final long pause;
    private final boolean useMetaReplicas;
    private final int numTries;
    final int rpcTimeout;
    private NonceGenerator nonceGenerator = null;
    private final AsyncProcess asyncProcess;
    // single tracker per connection
    private final ServerStatisticTracker stats;

    private volatile boolean closed;
    private volatile boolean aborted;

    // package protected for the tests
    ClusterStatusListener clusterStatusListener;


    private final Object metaRegionLock = new Object();

    // We have a single lock for master & zk to prevent deadlocks. Having
    //  one lock for ZK and one lock for master is not possible:
    //  When creating a connection to master, we need a connection to ZK to get
    //  its address. But another thread could have taken the ZK lock, and could
    //  be waiting for the master lock => deadlock.
    private final Object masterAndZKLock = new Object();

    private long keepZooKeeperWatcherAliveUntil = Long.MAX_VALUE;

    // thread executor shared by all HTableInterface instances created
    // by this connection
    private volatile ExecutorService batchPool = null;
    // meta thread executor shared by all HTableInterface instances created
    // by this connection
    private volatile ExecutorService metaLookupPool = null;
    private volatile boolean cleanupPool = false;

    private final Configuration conf;

    // cache the configuration value for tables so that we can avoid calling
    // the expensive Configuration to fetch the value multiple times.
    private final ConnectionConfiguration connectionConfig;

    // Client rpc instance.
    private RpcClient rpcClient;

    private final MetaCache metaCache;
    private final MetricsConnection metrics;

    private int refCount;

    // indicates whether this connection's life cycle is managed (by us)
    private boolean managed;

    protected User user;

    private RpcRetryingCallerFactory rpcCallerFactory;

    private RpcControllerFactory rpcControllerFactory;

    private final RetryingCallerInterceptor interceptor;

    /**
     * Cluster registry of basic info such as clusterid and meta region location.
     */
    Registry registry;

    private final ClientBackoffPolicy backoffPolicy;

    HConnectionImplementation(Configuration conf, boolean managed) throws IOException {
      this(conf, managed, null, null);
    }

    /**
     * constructor
     * @param conf Configuration object
     * @param managed If true, does not do full shutdown on close; i.e. cleanup of connection
     * to zk and shutdown of all services; we just close down the resources this connection was
     * responsible for and decrement usage counters.  It is up to the caller to do the full
     * cleanup.  It is set when we want connection sharing going on -- reuse of zk connection,
     * and cached region locations, established regionserver connections, etc.  When connections
     * are shared, we have reference counting going on and will only do full cleanup when there
     * are no more users of an HConnectionImplementation instance.
     */
    HConnectionImplementation(Configuration conf, boolean managed,
        ExecutorService pool, User user) throws IOException {
      this(conf);
      this.user = user;
      this.batchPool = pool;
      this.managed = managed;
      this.registry = setupRegistry();
      retrieveClusterId();

      this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId, this.metrics);
      this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);

      // Do we publish the status?
      boolean shouldListen = conf.getBoolean(HConstants.STATUS_PUBLISHED,
          HConstants.STATUS_PUBLISHED_DEFAULT);
      Class<? extends ClusterStatusListener.Listener> listenerClass =
          conf.getClass(ClusterStatusListener.STATUS_LISTENER_CLASS,
              ClusterStatusListener.DEFAULT_STATUS_LISTENER_CLASS,
              ClusterStatusListener.Listener.class);
      if (shouldListen) {
        if (listenerClass == null) {
          LOG.warn(HConstants.STATUS_PUBLISHED + " is true, but " +
              ClusterStatusListener.STATUS_LISTENER_CLASS + " is not set - not listening status");
        } else {
          clusterStatusListener = new ClusterStatusListener(
              new ClusterStatusListener.DeadServerHandler() {
                @Override
                public void newDead(ServerName sn) {
                  clearCaches(sn);
                  rpcClient.cancelConnections(sn);
                }
              }, conf, listenerClass);
        }
      }
    }

    /**
     * For tests.
     */
    protected HConnectionImplementation(Configuration conf) {
      this.conf = conf;
      this.connectionConfig = new ConnectionConfiguration(conf);
      this.closed = false;
      this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
          HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
      this.useMetaReplicas = conf.getBoolean(HConstants.USE_META_REPLICAS,
          HConstants.DEFAULT_USE_META_REPLICAS);
      this.numTries = connectionConfig.getRetriesNumber();
      this.rpcTimeout = conf.getInt(
          HConstants.HBASE_RPC_TIMEOUT_KEY,
          HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
      if (conf.getBoolean(CLIENT_NONCES_ENABLED_KEY, true)) {
        synchronized (nonceGeneratorCreateLock) {
          if (ConnectionManager.nonceGenerator == null) {
            ConnectionManager.nonceGenerator = new PerClientRandomNonceGenerator();
          }
          this.nonceGenerator = ConnectionManager.nonceGenerator;
        }
      } else {
        this.nonceGenerator = new NoNonceGenerator();
      }
      stats = ServerStatisticTracker.create(conf);
      this.asyncProcess = createAsyncProcess(this.conf);
      this.interceptor = (new RetryingCallerInterceptorFactory(conf)).build();
      this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats);
      this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);
      if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) {
        this.metrics = new MetricsConnection(this);
      } else {
        this.metrics = null;
      }

      this.hostnamesCanChange = conf.getBoolean(RESOLVE_HOSTNAME_ON_FAIL_KEY, true);
      this.metaCache = new MetaCache(this.metrics);
    }

    @Override
    public HTableInterface getTable(String tableName) throws IOException {
      return getTable(TableName.valueOf(tableName));
    }

    @Override
    public HTableInterface getTable(byte[] tableName) throws IOException {
      return getTable(TableName.valueOf(tableName));
    }

    @Override
    public HTableInterface getTable(TableName tableName) throws IOException {
      return getTable(tableName, getBatchPool());
    }

    @Override
    public HTableInterface getTable(String tableName, ExecutorService pool) throws IOException {
      return getTable(TableName.valueOf(tableName), pool);
    }

    @Override
    public HTableInterface getTable(byte[] tableName, ExecutorService pool) throws IOException {
      return getTable(TableName.valueOf(tableName), pool);
    }

    @Override
    public HTableInterface getTable(TableName tableName, ExecutorService pool) throws IOException {
      if (managed) {
        throw new NeedUnmanagedConnectionException();
      }
      return new HTable(tableName, this, connectionConfig, rpcCallerFactory, rpcControllerFactory,
          pool);
    }

    @Override
    public BufferedMutator getBufferedMutator(BufferedMutatorParams params) {
      if (params.getTableName() == null) {
        throw new IllegalArgumentException("TableName cannot be null.");
      }
      if (params.getPool() == null) {
        params.pool(HTable.getDefaultExecutor(getConfiguration()));
      }
      if (params.getWriteBufferSize() == BufferedMutatorParams.UNSET) {
        params.writeBufferSize(connectionConfig.getWriteBufferSize());
      }
      if (params.getMaxKeyValueSize() == BufferedMutatorParams.UNSET) {
        params.maxKeyValueSize(connectionConfig.getMaxKeyValueSize());
      }
      return new BufferedMutatorImpl(this, rpcCallerFactory, rpcControllerFactory, params);
    }

    @Override
    public BufferedMutator getBufferedMutator(TableName tableName) {
      return getBufferedMutator(new BufferedMutatorParams(tableName));
    }

    @Override
    public RegionLocator getRegionLocator(TableName tableName) throws IOException {
      return new HRegionLocator(tableName, this);
    }

    @Override
    public Admin getAdmin() throws IOException {
      if (managed) {
        throw new NeedUnmanagedConnectionException();
      }
      return new HBaseAdmin(this);
    }

    @Override
    public MetricsConnection getConnectionMetrics() {
      return this.metrics;
    }

    private ExecutorService getBatchPool() {
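      // Lazy init of the shared pool. Double-checked locking is safe here because
      // batchPool is declared volatile.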
      if (batchPool == null) {
        synchronized (this) {
          if (batchPool == null) {
            this.batchPool = getThreadPool(conf.getInt("hbase.hconnection.threads.max", 256),
                conf.getInt("hbase.hconnection.threads.core", 256), "-shared-", null);
            this.cleanupPool = true;
          }
        }
      }
      return this.batchPool;
    }

    private ExecutorService getThreadPool(int maxThreads, int coreThreads, String nameHint,
        BlockingQueue<Runnable> passedWorkQueue) {
      // shared HTable thread executor not yet initialized
      if (maxThreads == 0) {
        maxThreads = Runtime.getRuntime().availableProcessors() * 8;
      }
      if (coreThreads == 0) {
        coreThreads = Runtime.getRuntime().availableProcessors() * 8;
      }
      long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60);
      BlockingQueue<Runnable> workQueue = passedWorkQueue;
      if (workQueue == null) {
        workQueue =
          new LinkedBlockingQueue<Runnable>(maxThreads *
              conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
                  HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS));
      }
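      // Daemon threads so an unclosed connection cannot block JVM shutdown;
      // allowCoreThreadTimeOut(true) below lets even core threads die after the
      // keep-alive period when the pool is idle.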
      ThreadPoolExecutor tpe = new ThreadPoolExecutor(
          coreThreads,
          maxThreads,
          keepAliveTime,
          TimeUnit.SECONDS,
          workQueue,
          Threads.newDaemonThreadFactory(toString() + nameHint));
      tpe.allowCoreThreadTimeOut(true);
      return tpe;
    }

    private ExecutorService getMetaLookupPool() {
      if (this.metaLookupPool == null) {
        synchronized (this) {
          if (this.metaLookupPool == null) {
            // Some of the threads would be used for meta replicas.
            // To start with, up to the configured core number of threads can hit the meta
            // (including replicas). After that, requests will get queued up in the passed
            // queue, and only after the queue is full will a new thread be started.
            this.metaLookupPool = getThreadPool(
               conf.getInt("hbase.hconnection.meta.lookup.threads.max", 128),
               conf.getInt("hbase.hconnection.meta.lookup.threads.core", 10),
             "-metaLookup-shared-", new LinkedBlockingQueue<Runnable>());
          }
        }
      }
      return this.metaLookupPool;
    }

    protected ExecutorService getCurrentMetaLookupPool() {
      return metaLookupPool;
    }

    protected ExecutorService getCurrentBatchPool() {
      return batchPool;
    }

    private void shutdownPools() {
      if (this.cleanupPool && this.batchPool != null && !this.batchPool.isShutdown()) {
        shutdownBatchPool(this.batchPool);
      }
      if (this.metaLookupPool != null && !this.metaLookupPool.isShutdown()) {
        shutdownBatchPool(this.metaLookupPool);
      }
    }

    private void shutdownBatchPool(ExecutorService pool) {
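      // Two-phase shutdown: stop accepting new tasks, wait briefly for in-flight
      // tasks to finish, then force-cancel whatever remains.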
867       pool.shutdown();
868       try {
869         if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
870           pool.shutdownNow();
871         }
872       } catch (InterruptedException e) {
873         pool.shutdownNow();
874       }
875     }
876 
877     /**
878      * @return The cluster registry implementation to use.
879      * @throws IOException
880      */
881     private Registry setupRegistry() throws IOException {
882       return RegistryFactory.getRegistry(this);
883     }
884 
885     /**
886      * For tests only.
887      */
888     @VisibleForTesting
889     RpcClient getRpcClient() {
890       return rpcClient;
891     }
892 
893     /**
894      * An identifier that will remain the same for a given connection.
895      */
896     @Override
897     public String toString(){
898       return "hconnection-0x" + Integer.toHexString(hashCode());
899     }
900 
901     protected String clusterId = null;
902 
903     void retrieveClusterId() {
904       if (clusterId != null) return;
905       this.clusterId = this.registry.getClusterId();
906       if (clusterId == null) {
907         clusterId = HConstants.CLUSTER_ID_DEFAULT;
908         LOG.debug("clusterid came back null, using default " + clusterId);
909       }
910     }
911 
912     @Override
913     public Configuration getConfiguration() {
914       return this.conf;
915     }
916 
917     private void checkIfBaseNodeAvailable(ZooKeeperWatcher zkw)
918       throws MasterNotRunningException {
919       String errorMsg;
920       try {
921         if (ZKUtil.checkExists(zkw, zkw.baseZNode) == -1) {
922           errorMsg = "The node " + zkw.baseZNode+" is not in ZooKeeper. "
923             + "It should have been written by the master. "
924             + "Check the value configured in 'zookeeper.znode.parent'. "
925             + "There could be a mismatch with the one configured in the master.";
926           LOG.error(errorMsg);
927           throw new MasterNotRunningException(errorMsg);
928         }
929       } catch (KeeperException e) {
930         errorMsg = "Can't get connection to ZooKeeper: " + e.getMessage();
931         LOG.error(errorMsg);
932         throw new MasterNotRunningException(errorMsg, e);
933       }
934     }
935 
936     /**
937      * @return true if the master is running, throws an exception otherwise
938      * @throws MasterNotRunningException - if the master is not running
939      * @throws ZooKeeperConnectionException
940      */
941     @Deprecated
942     @Override
943     public boolean isMasterRunning()
944     throws MasterNotRunningException, ZooKeeperConnectionException {
945       // When getting the master connection, we check it's running,
946       // so if there is no exception, it means we've been able to get a
947       // connection on a running master
948       MasterKeepAliveConnection m = getKeepAliveMasterService();
949       m.close();
950       return true;
951     }
952 
953     @Override
954     public HRegionLocation getRegionLocation(final TableName tableName,
955         final byte [] row, boolean reload)
956     throws IOException {
957       return reload? relocateRegion(tableName, row): locateRegion(tableName, row);
958     }
959 
960     @Override
961     public HRegionLocation getRegionLocation(final byte[] tableName,
962         final byte [] row, boolean reload)
963     throws IOException {
964       return getRegionLocation(TableName.valueOf(tableName), row, reload);
965     }
966 
967     @Override
968     public boolean isTableEnabled(TableName tableName) throws IOException {
969       return this.registry.isTableOnlineState(tableName, true);
970     }
971 
972     @Override
973     public boolean isTableEnabled(byte[] tableName) throws IOException {
974       return isTableEnabled(TableName.valueOf(tableName));
975     }
976 
977     @Override
978     public boolean isTableDisabled(TableName tableName) throws IOException {
979       return this.registry.isTableOnlineState(tableName, false);
980     }
981 
982     @Override
983     public boolean isTableDisabled(byte[] tableName) throws IOException {
984       return isTableDisabled(TableName.valueOf(tableName));
985     }
986 
987     @Override
988     public boolean isTableAvailable(final TableName tableName) throws IOException {
989       final AtomicBoolean available = new AtomicBoolean(true);
990       final AtomicInteger regionCount = new AtomicInteger(0);
991       MetaScannerVisitor visitor = new MetaScannerVisitorBase() {
992         @Override
993         public boolean processRow(Result row) throws IOException {
994           HRegionInfo info = MetaScanner.getHRegionInfo(row);
995           if (info != null && !info.isSplitParent()) {
996             if (tableName.equals(info.getTable())) {
997               ServerName server = HRegionInfo.getServerName(row);
998               if (server == null) {
999                 available.set(false);
1000                 return false;
1001               }
1002               regionCount.incrementAndGet();
1003             } else if (tableName.compareTo(info.getTable()) < 0) {
1004               // Return if we are done with the current table
1005               return false;
1006             }
1007           }
1008           return true;
1009         }
1010       };
1011       MetaScanner.metaScan(this, visitor, tableName);
1012       return available.get() && (regionCount.get() > 0);
1013     }
1014 
1015     @Override
1016     public boolean isTableAvailable(final byte[] tableName) throws IOException {
1017       return isTableAvailable(TableName.valueOf(tableName));
1018     }
1019 
1020     @Override
1021     public boolean isTableAvailable(final TableName tableName, final byte[][] splitKeys)
1022         throws IOException {
1023       final AtomicBoolean available = new AtomicBoolean(true);
1024       final AtomicInteger regionCount = new AtomicInteger(0);
1025       MetaScannerVisitor visitor = new MetaScannerVisitorBase() {
1026         @Override
1027         public boolean processRow(Result row) throws IOException {
1028           HRegionInfo info = MetaScanner.getHRegionInfo(row);
1029           if (info != null && !info.isSplitParent()) {
1030             if (tableName.equals(info.getTable())) {
1031               ServerName server = HRegionInfo.getServerName(row);
1032               if (server == null) {
1033                 available.set(false);
1034                 return false;
1035               }
1036               if (!Bytes.equals(info.getStartKey(), HConstants.EMPTY_BYTE_ARRAY)) {
1037                 for (byte[] splitKey : splitKeys) {
1038                   // Just check if the splitkey is available
1039                   if (Bytes.equals(info.getStartKey(), splitKey)) {
1040                     regionCount.incrementAndGet();
1041                     break;
1042                   }
1043                 }
1044               } else {
1045                 // Always empty start row should be counted
1046                 regionCount.incrementAndGet();
1047               }
1048             } else if (tableName.compareTo(info.getTable()) < 0) {
1049               // Return if we are done with the current table
1050               return false;
1051             }
1052           }
1053           return true;
1054         }
1055       };
1056       MetaScanner.metaScan(this, visitor, tableName);
1057       // +1 needs to be added so that the empty start row is also taken into account
1058       return available.get() && (regionCount.get() == splitKeys.length + 1);
1059     }
1060 
1061     @Override
1062     public boolean isTableAvailable(final byte[] tableName, final byte[][] splitKeys)
1063         throws IOException {
1064       return isTableAvailable(TableName.valueOf(tableName), splitKeys);
1065     }
1066 
1067     @Override
1068     public HRegionLocation locateRegion(final byte[] regionName) throws IOException {
1069       RegionLocations locations = locateRegion(HRegionInfo.getTable(regionName),
1070         HRegionInfo.getStartKey(regionName), false, true);
1071       return locations == null ? null : locations.getRegionLocation();
1072     }
1073 
1074     @Override
1075     public boolean isDeadServer(ServerName sn) {
1076       if (clusterStatusListener == null) {
1077         return false;
1078       } else {
1079         return clusterStatusListener.isDeadServer(sn);
1080       }
1081     }
1082 
1083     @Override
1084     public List<HRegionLocation> locateRegions(final TableName tableName)
1085     throws IOException {
1086       return locateRegions (tableName, false, true);
1087     }
1088 
1089     @Override
1090     public List<HRegionLocation> locateRegions(final byte[] tableName)
1091     throws IOException {
1092       return locateRegions(TableName.valueOf(tableName));
1093     }
1094 
1095     @Override
1096     public List<HRegionLocation> locateRegions(final TableName tableName,
1097         final boolean useCache, final boolean offlined) throws IOException {
1098       NavigableMap<HRegionInfo, ServerName> regions = MetaScanner.allTableRegions(this, tableName);
1099       final List<HRegionLocation> locations = new ArrayList<HRegionLocation>();
1100       for (HRegionInfo regionInfo : regions.keySet()) {
1101         RegionLocations list = locateRegion(tableName, regionInfo.getStartKey(), useCache, true);
1102         if (list != null) {
1103           for (HRegionLocation loc : list.getRegionLocations()) {
1104             if (loc != null) {
1105               locations.add(loc);
1106             }
1107           }
1108         }
1109       }
1110       return locations;
1111     }
1112 
1113     @Override
1114     public List<HRegionLocation> locateRegions(final byte[] tableName,
1115        final boolean useCache, final boolean offlined) throws IOException {
1116       return locateRegions(TableName.valueOf(tableName), useCache, offlined);
1117     }
1118 
1119     @Override
1120     public HRegionLocation locateRegion(
1121         final TableName tableName, final byte[] row) throws IOException{
1122       RegionLocations locations = locateRegion(tableName, row, true, true);
1123       return locations == null ? null : locations.getRegionLocation();
1124     }
1125 
1126     @Override
1127     public HRegionLocation locateRegion(final byte[] tableName,
1128         final byte [] row)
1129     throws IOException{
1130       return locateRegion(TableName.valueOf(tableName), row);
1131     }
1132 
1133     @Override
1134     public HRegionLocation relocateRegion(final TableName tableName,
1135         final byte [] row) throws IOException{
1136       RegionLocations locations =  relocateRegion(tableName, row,
1137         RegionReplicaUtil.DEFAULT_REPLICA_ID);
1138       return locations == null ? null :
1139         locations.getRegionLocation(RegionReplicaUtil.DEFAULT_REPLICA_ID);
1140     }
1141 
1142     @Override
1143     public RegionLocations relocateRegion(final TableName tableName,
1144         final byte [] row, int replicaId) throws IOException{
1145       // Since this is an explicit request not to use any caching, finding
1146       // disabled tables should not be desirable.  This will ensure that an exception is thrown when
1147       // the first time a disabled table is interacted with.
1148       if (!tableName.equals(TableName.META_TABLE_NAME) && isTableDisabled(tableName)) {
1149         throw new TableNotEnabledException(tableName.getNameAsString() + " is disabled.");
1150       }
1151 
1152       return locateRegion(tableName, row, false, true, replicaId);
1153     }
1154 
1155     @Override
1156     public HRegionLocation relocateRegion(final byte[] tableName,
1157         final byte [] row) throws IOException {
1158       return relocateRegion(TableName.valueOf(tableName), row);
1159     }
1160 
1161     @Override
1162     public RegionLocations locateRegion(final TableName tableName,
1163       final byte [] row, boolean useCache, boolean retry)
1164     throws IOException {
1165       return locateRegion(tableName, row, useCache, retry, RegionReplicaUtil.DEFAULT_REPLICA_ID);
1166     }
1167 
1168     @Override
1169     public RegionLocations locateRegion(final TableName tableName,
1170       final byte [] row, boolean useCache, boolean retry, int replicaId)
1171     throws IOException {
1172       if (this.closed) throw new IOException(toString() + " closed");
1173       if (tableName== null || tableName.getName().length == 0) {
1174         throw new IllegalArgumentException(
1175             "table name cannot be null or zero length");
1176       }
1177       if (tableName.equals(TableName.META_TABLE_NAME)) {
1178         return locateMeta(tableName, useCache, replicaId);
1179       } else {
1180         // Region not in the cache - have to go to the meta RS
1181         return locateRegionInMeta(tableName, row, useCache, retry, replicaId);
1182       }
1183     }
1184 
1185     private RegionLocations locateMeta(final TableName tableName,
1186         boolean useCache, int replicaId) throws IOException {
1187       // HBASE-10785: We cache the location of the META itself, so that we are not overloading
1188       // zookeeper with one request for every region lookup. We cache the META with empty row
1189       // key in MetaCache.
1190       byte[] metaCacheKey = HConstants.EMPTY_START_ROW; // use byte[0] as the row for meta
1191       RegionLocations locations = null;
1192       if (useCache) {
1193         locations = getCachedLocation(tableName, metaCacheKey);
1194         if (locations != null && locations.getRegionLocation(replicaId) != null) {
1195           return locations;
1196         }
1197       }
1198 
1199       // only one thread should do the lookup.
1200       synchronized (metaRegionLock) {
1201         // Check the cache again for a hit in case some other thread made the
1202         // same query while we were waiting on the lock.
1203         if (useCache) {
1204           locations = getCachedLocation(tableName, metaCacheKey);
1205           if (locations != null && locations.getRegionLocation(replicaId) != null) {
1206             return locations;
1207           }
1208         }
1209 
1210         // Look up from zookeeper
1211         locations = this.registry.getMetaRegionLocation();
1212         if (locations != null) {
1213           cacheLocation(tableName, locations);
1214         }
1215       }
1216       return locations;
1217     }
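         // A minimal usage sketch (illustrative, not part of this class): callers reach
         // this double-checked cache through locateRegion, so a repeat lookup of the meta
         // location is served from MetaCache without another zookeeper round trip.
         // Assuming a connection named "conn":
         //
         //   RegionLocations metaLocs = conn.locateRegion(TableName.META_TABLE_NAME,
         //       HConstants.EMPTY_START_ROW, true /* useCache */, true /* retry */);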
1218 
1219     /*
1220      * Search the hbase:meta table for the HRegionLocation
1221      * info that contains the table and row we're seeking.
1222      */
1223     private RegionLocations locateRegionInMeta(TableName tableName, byte[] row,
1224                    boolean useCache, boolean retry, int replicaId) throws IOException {
1225 
1226       // If we are supposed to be using the cache, look in the cache to see if
1227       // we already have the region.
1228       if (useCache) {
1229         RegionLocations locations = getCachedLocation(tableName, row);
1230         if (locations != null && locations.getRegionLocation(replicaId) != null) {
1231           return locations;
1232         }
1233       }
1234 
1235       // build the key of the meta region we should be looking for.
1236       // the extra 9's on the end are necessary to allow "exact" matches
1237       // without knowing the precise region names.
1238       byte[] metaKey = HRegionInfo.createRegionName(tableName, row, HConstants.NINES, false);
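           // For example (illustrative; the exact name layout is an HBase-internal detail):
           // for table "t" and row "r" this builds a probe key of roughly the form
           // "t,r,99999999999999", which sorts just after any real meta row
           // "t,<startKey>,<regionId>" whose start key is <= "r".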
1239 
1240       Scan s = new Scan();
1241       s.setReversed(true);
1242       s.setStartRow(metaKey);
1243       s.setSmall(true);
1244       s.setCaching(1);
1245       if (this.useMetaReplicas) {
1246         s.setConsistency(Consistency.TIMELINE);
1247       }
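           // The probe below is, in effect, this hand-rolled reversed lookup of hbase:meta
           // (a sketch assuming a Table handle named "meta"; the real code uses
           // ClientSmallReversedScanner so it can share this connection's meta lookup pool):
           //
           //   Scan probe = new Scan();
           //   probe.setReversed(true);
           //   probe.setStartRow(metaKey);
           //   probe.setSmall(true);
           //   Result first = meta.getScanner(probe).next(); // meta row of the region
           //                                                 // containing 'row', if any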
1248 
1249       int localNumRetries = (retry ? numTries : 1);
1250 
1251       for (int tries = 0; true; tries++) {
1252         if (tries >= localNumRetries) {
1253           throw new NoServerForRegionException("Unable to find region for "
1254               + Bytes.toStringBinary(row) + " in " + tableName +
1255               " after " + localNumRetries + " tries.");
1256         }
1257         if (useCache) {
1258           RegionLocations locations = getCachedLocation(tableName, row);
1259           if (locations != null && locations.getRegionLocation(replicaId) != null) {
1260             return locations;
1261           }
1262         } else {
1263           // If we are not supposed to be using the cache, delete any existing cached location
1264           // so it won't interfere.
1265           metaCache.clearCache(tableName, row);
1266         }
1267 
1268         // Query the meta region
1269         try {
1270           Result regionInfoRow = null;
1271           ReversedClientScanner rcs = null;
1272           try {
1273             rcs = new ClientSmallReversedScanner(conf, s, TableName.META_TABLE_NAME, this,
1274               rpcCallerFactory, rpcControllerFactory, getMetaLookupPool(), 0);
1275             regionInfoRow = rcs.next();
1276           } finally {
1277             if (rcs != null) {
1278               rcs.close();
1279             }
1280           }
1281 
1282           if (regionInfoRow == null) {
1283             throw new TableNotFoundException(tableName);
1284           }
1285 
1286           // convert the row result into the HRegionLocation we need!
1287           RegionLocations locations = MetaTableAccessor.getRegionLocations(regionInfoRow);
1288           if (locations == null || locations.getRegionLocation(replicaId) == null) {
1289             throw new IOException("HRegionInfo was null in " +
1290               tableName + ", row=" + regionInfoRow);
1291           }
1292           HRegionInfo regionInfo = locations.getRegionLocation(replicaId).getRegionInfo();
1293           if (regionInfo == null) {
1294             throw new IOException("HRegionInfo was null or empty in " +
1295               TableName.META_TABLE_NAME + ", row=" + regionInfoRow);
1296           }
1297 
1298           // possible we got a region of a different table...
1299           if (!regionInfo.getTable().equals(tableName)) {
1300             throw new TableNotFoundException(
1301                   "Table '" + tableName + "' was not found, got: " +
1302                   regionInfo.getTable() + ".");
1303           }
1304           if (regionInfo.isSplit()) {
1305             throw new RegionOfflineException("the only available region for" +
1306               " the required row is a split parent," +
1307               " the daughters should be online soon: " +
1308               regionInfo.getRegionNameAsString());
1309           }
1310           if (regionInfo.isOffline()) {
1311             throw new RegionOfflineException("the region is offline, could" +
1312               " be caused by a disable table call: " +
1313               regionInfo.getRegionNameAsString());
1314           }
1315 
1316           ServerName serverName = locations.getRegionLocation(replicaId).getServerName();
1317           if (serverName == null) {
1318             throw new NoServerForRegionException("No server address listed " +
1319               "in " + TableName.META_TABLE_NAME + " for region " +
1320               regionInfo.getRegionNameAsString() + " containing row " +
1321               Bytes.toStringBinary(row));
1322           }
1323 
1324           if (isDeadServer(serverName)) {
1325             throw new RegionServerStoppedException("hbase:meta says the region " +
1326                 regionInfo.getRegionNameAsString() + " is managed by the server " + serverName +
1327                 ", but it is dead.");
1328           }
1329           // Instantiate the location
1330           cacheLocation(tableName, locations);
1331           return locations;
1332         } catch (TableNotFoundException e) {
1333           // if we got this error, probably means the table just plain doesn't
1334           // exist. rethrow the error immediately. this should always be coming
1335           // from the HTable constructor.
1336           throw e;
1337         } catch (IOException e) {
1338           ExceptionUtil.rethrowIfInterrupt(e);
1339 
1340           if (e instanceof RemoteException) {
1341             e = ((RemoteException)e).unwrapRemoteException();
1342           }
1343           if (tries < localNumRetries - 1) {
1344             if (LOG.isDebugEnabled()) {
1345             LOG.debug("locateRegionInMeta parentTable=" +
1346                 TableName.META_TABLE_NAME +
1347                 ", attempt=" + tries + " of " +
1348                 localNumRetries + " failed; retrying after sleep of " +
1349                 ConnectionUtils.getPauseTime(this.pause, tries) + " because: " + e.getMessage());
1350             }
1351           } else {
1352             throw e;
1353           }
1354           // Only relocate the parent region if necessary
1355           if(!(e instanceof RegionOfflineException ||
1356               e instanceof NoServerForRegionException)) {
1357             relocateRegion(TableName.META_TABLE_NAME, metaKey, replicaId);
1358           }
1359         }
1360         try {
1361           Thread.sleep(ConnectionUtils.getPauseTime(this.pause, tries));
1362         } catch (InterruptedException e) {
1363           throw new InterruptedIOException("Giving up trying to locate region in " +
1364             "meta: thread is interrupted.");
1365         }
1366       }
1367     }
1368 
1369     /**
1370      * Put a newly discovered HRegionLocation into the cache.
1371      * @param tableName The table name.
1372      * @param location the new location
1373      */
1374     private void cacheLocation(final TableName tableName, final RegionLocations location) {
1375       metaCache.cacheLocation(tableName, location);
1376     }
1377 
1378     /**
1379      * Search the cache for a location that fits our table and row key.
1380      * Return null if no suitable region is located.
1381      *
1382      * @param tableName
1383      * @param row
1384      * @return Null or region location found in cache.
1385      */
1386     RegionLocations getCachedLocation(final TableName tableName,
1387         final byte [] row) {
1388       return metaCache.getCachedLocation(tableName, row);
1389     }
1390 
1391     public void clearRegionCache(final TableName tableName, byte[] row) {
1392       metaCache.clearCache(tableName, row);
1393     }
1394 
1395     /*
1396      * Delete all cached entries that map to a specific server.
1397      */
1398     @Override
1399     public void clearCaches(final ServerName serverName) {
1400       metaCache.clearCache(serverName);
1401     }
1402 
1403     @Override
1404     public void clearRegionCache() {
1405       metaCache.clearCache();
1406     }
1407 
1408     @Override
1409     public void clearRegionCache(final TableName tableName) {
1410       metaCache.clearCache(tableName);
1411     }
1412 
1413     @Override
1414     public void clearRegionCache(final byte[] tableName) {
1415       clearRegionCache(TableName.valueOf(tableName));
1416     }
1417 
1418     /**
1419      * Put a newly discovered HRegionLocation into the cache.
1420      * @param tableName The table name.
1421      * @param source the source of the new location, if it's not coming from meta
1422      * @param location the new location
1423      */
1424     private void cacheLocation(final TableName tableName, final ServerName source,
1425         final HRegionLocation location) {
1426       metaCache.cacheLocation(tableName, source, location);
1427     }
1428 
1429     // Map keyed by service name + regionserver to service stub implementation
1430     private final ConcurrentHashMap<String, Object> stubs =
1431       new ConcurrentHashMap<String, Object>();
1432     // Map of locks used when creating service stubs per regionserver.
1433     private final ConcurrentHashMap<String, String> connectionLock =
1434       new ConcurrentHashMap<String, String>();
1435 
1436     /**
1437      * State of the MasterService connection/setup.
1438      */
1439     static class MasterServiceState {
1440       HConnection connection;
1441       MasterService.BlockingInterface stub;
1442       int userCount;
1443 
1444       MasterServiceState (final HConnection connection) {
1445         super();
1446         this.connection = connection;
1447       }
1448 
1449       @Override
1450       public String toString() {
1451         return "MasterService";
1452       }
1453 
1454       Object getStub() {
1455         return this.stub;
1456       }
1457 
1458       void clearStub() {
1459         this.stub = null;
1460       }
1461 
1462       boolean isMasterRunning() throws ServiceException {
1463         IsMasterRunningResponse response =
1464           this.stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest());
1465         return response != null ? response.getIsMasterRunning() : false;
1466       }
1467     }
1468 
1469     /**
1470      * Makes a client-side stub for master services. Sub-class to specialize.
1471      * Depends on hosting class so not static.  Exists so we avoid duplicating a bunch of code
1472      * when setting up the MasterMonitorService and MasterAdminService.
1473      */
1474     abstract class StubMaker {
1475       /**
1476        * Returns the name of the service stub being created.
1477        */
1478       protected abstract String getServiceName();
1479 
1480       /**
1481        * Make the stub and cache it internally so it can be used later for the isMasterRunning call.
1482        * @param channel
1483        */
1484       protected abstract Object makeStub(final BlockingRpcChannel channel);
1485 
1486       /**
1487        * Once set up, verify it works by making an isMasterRunning call.
1488        * @throws ServiceException
1489        */
1490       protected abstract void isMasterRunning() throws ServiceException;
1491 
1492       /**
1493        * Create a stub. Try once only.  It is not typed because there is no common type for
1494        * protobuf services or their interfaces.  Let the caller do the appropriate casting.
1495        * @return A stub for master services.
1496        * @throws IOException
1497        * @throws KeeperException
1498        * @throws ServiceException
1499        */
1500       private Object makeStubNoRetries() throws IOException, KeeperException, ServiceException {
1501         ZooKeeperKeepAliveConnection zkw;
1502         try {
1503           zkw = getKeepAliveZooKeeperWatcher();
1504         } catch (IOException e) {
1505           ExceptionUtil.rethrowIfInterrupt(e);
1506           throw new ZooKeeperConnectionException("Can't connect to ZooKeeper", e);
1507         }
1508         try {
1509           checkIfBaseNodeAvailable(zkw);
1510           ServerName sn = MasterAddressTracker.getMasterAddress(zkw);
1511           if (sn == null) {
1512             String msg = "ZooKeeper available but no active master location found";
1513             LOG.info(msg);
1514             throw new MasterNotRunningException(msg);
1515           }
1516           if (isDeadServer(sn)) {
1517             throw new MasterNotRunningException(sn + " is dead.");
1518           }
1519           // Use the security info interface name as our stub key
1520           String key = getStubKey(getServiceName(),
1521               sn.getHostname(), sn.getPort(), hostnamesCanChange);
1522           connectionLock.putIfAbsent(key, key);
1523           Object stub = null;
1524           synchronized (connectionLock.get(key)) {
1525             stub = stubs.get(key);
1526             if (stub == null) {
1527               BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sn, user, rpcTimeout);
1528               stub = makeStub(channel);
1529               isMasterRunning();
1530               stubs.put(key, stub);
1531             }
1532           }
1533           return stub;
1534         } finally {
1535           zkw.close();
1536         }
1537       }
1538 
1539       /**
1540        * Create a stub against the master.  Retry if necessary.
1541        * @return A stub for making service calls against the master
1542        * @throws MasterNotRunningException
1543        */
1544       Object makeStub() throws IOException {
1545         // The lock must be at the beginning to prevent multiple master creations
1546         //  (and leaks) in a multithreaded context.
1547         synchronized (masterAndZKLock) {
1548           Exception exceptionCaught = null;
1549           if (!closed) {
1550             try {
1551               return makeStubNoRetries();
1552             } catch (IOException e) {
1553               exceptionCaught = e;
1554             } catch (KeeperException e) {
1555               exceptionCaught = e;
1556             } catch (ServiceException e) {
1557               exceptionCaught = e;
1558             }
1559 
1560             throw new MasterNotRunningException(exceptionCaught);
1561           } else {
1562             throw new DoNotRetryIOException("Connection was closed while trying to get master");
1563           }
1564         }
1565       }
1566     }
1567 
1568     /**
1569      * Class to make a MasterServiceStubMaker stub.
1570      */
1571     class MasterServiceStubMaker extends StubMaker {
1572       private MasterService.BlockingInterface stub;
1573       @Override
1574       protected String getServiceName() {
1575         return MasterService.getDescriptor().getName();
1576       }
1577 
1578       @Override
1579       MasterService.BlockingInterface makeStub() throws IOException {
1580         return (MasterService.BlockingInterface)super.makeStub();
1581       }
1582 
1583       @Override
1584       protected Object makeStub(BlockingRpcChannel channel) {
1585         this.stub = MasterService.newBlockingStub(channel);
1586         return this.stub;
1587       }
1588 
1589       @Override
1590       protected void isMasterRunning() throws ServiceException {
1591         this.stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest());
1592       }
1593     }
1594 
1595     @Override
1596     public AdminService.BlockingInterface getAdmin(final ServerName serverName)
1597         throws IOException {
1598       return getAdmin(serverName, false);
1599     }
1600 
1601     @Override
1602     // Nothing is done w/ the 'master' parameter.  It is ignored.
1603     public AdminService.BlockingInterface getAdmin(final ServerName serverName,
1604       final boolean master)
1605     throws IOException {
1606       if (isDeadServer(serverName)) {
1607         throw new RegionServerStoppedException(serverName + " is dead.");
1608       }
1609       String key = getStubKey(AdminService.BlockingInterface.class.getName(),
1610           serverName.getHostname(), serverName.getPort(), this.hostnamesCanChange);
1611       this.connectionLock.putIfAbsent(key, key);
1612       AdminService.BlockingInterface stub = null;
1613       synchronized (this.connectionLock.get(key)) {
1614         stub = (AdminService.BlockingInterface)this.stubs.get(key);
1615         if (stub == null) {
1616           BlockingRpcChannel channel =
1617               this.rpcClient.createBlockingRpcChannel(serverName, user, rpcTimeout);
1618           stub = AdminService.newBlockingStub(channel);
1619           this.stubs.put(key, stub);
1620         }
1621       }
1622       return stub;
1623     }
1624 
1625     @Override
1626     public ClientService.BlockingInterface getClient(final ServerName sn)
1627     throws IOException {
1628       if (isDeadServer(sn)) {
1629         throw new RegionServerStoppedException(sn + " is dead.");
1630       }
1631       String key = getStubKey(ClientService.BlockingInterface.class.getName(), sn.getHostname(),
1632           sn.getPort(), this.hostnamesCanChange);
1633       this.connectionLock.putIfAbsent(key, key);
1634       ClientService.BlockingInterface stub = null;
1635       synchronized (this.connectionLock.get(key)) {
1636         stub = (ClientService.BlockingInterface)this.stubs.get(key);
1637         if (stub == null) {
1638           BlockingRpcChannel channel =
1639               this.rpcClient.createBlockingRpcChannel(sn, user, rpcTimeout);
1640           stub = ClientService.newBlockingStub(channel);
1641           // In the old days, after getting the stub/proxy, we'd make a call.  We are not doing
1642           // that here; just fail on the first actual call rather than here during setup.
1643           this.stubs.put(key, stub);
1644         }
1645       }
1646       return stub;
1647     }
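         // Both getAdmin and getClient above use the same stub-caching idiom: publish a
         // canonical lock object per key via putIfAbsent, then synchronize on it so at most
         // one thread builds a given stub while the rest wait and reuse it. A minimal sketch
         // of the pattern in isolation (hypothetical names, for illustration only):
         //
         //   locks.putIfAbsent(key, key);        // the key doubles as the lock object
         //   synchronized (locks.get(key)) {
         //     Object stub = stubs.get(key);
         //     if (stub == null) {
         //       stub = buildStub(key);          // expensive; done at most once per key
         //       stubs.put(key, stub);
         //     }
         //   }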
1648 
1649     static String getStubKey(final String serviceName,
1650                              final String rsHostname,
1651                              int port,
1652                              boolean resolveHostnames) {
1653 
1654       // Sometimes, servers go down and they come back up with the same hostname but a different
1655       // IP address. Force a resolution of the rsHostname by trying to instantiate an
1656       // InetSocketAddress, and this way we will rightfully get a new stubKey.
1657       // Also, include the hostname in the key so as to take care of those cases where the
1658       // DNS name is different but IP address remains the same.
1659       String address = rsHostname;
1660       if (resolveHostnames) {
1661         InetAddress i = new InetSocketAddress(rsHostname, port).getAddress();
1662         if (i != null) {
1663           address = i.getHostAddress() + "-" + rsHostname;
1664         }
1665       }
1666       return serviceName + "@" + address + ":" + port;
1667     }
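         // For example (illustrative values only): getStubKey("ClientService",
         // "rs1.example.com", 16020, true) yields something like
         // "ClientService@10.0.0.5-rs1.example.com:16020" when the hostname resolves,
         // and "ClientService@rs1.example.com:16020" when resolveHostnames is false
         // or resolution fails.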
1668 
1669     private ZooKeeperKeepAliveConnection keepAliveZookeeper;
1670     private AtomicInteger keepAliveZookeeperUserCount = new AtomicInteger(0);
1671     private boolean canCloseZKW = true;
1672 
1673     // keepAlive time, in ms. No reason to make it configurable.
1674     private static final long keepAlive = 5 * 60 * 1000;
1675 
1676     /**
1677      * Retrieve a shared ZooKeeperWatcher. You must close it once you have finished with it.
1678      * @return The shared instance. Never returns null.
1679      */
1680     ZooKeeperKeepAliveConnection getKeepAliveZooKeeperWatcher()
1681       throws IOException {
1682       synchronized (masterAndZKLock) {
1683         if (keepAliveZookeeper == null) {
1684           if (this.closed) {
1685             throw new IOException(toString() + " closed");
1686           }
1687           // We don't check that our link to ZooKeeper is still valid,
1688           // but there is a retry mechanism in the ZooKeeperWatcher itself.
1689           keepAliveZookeeper = new ZooKeeperKeepAliveConnection(conf, this.toString(), this);
1690         }
1691         keepAliveZookeeperUserCount.addAndGet(1);
1692         keepZooKeeperWatcherAliveUntil = Long.MAX_VALUE;
1693         return keepAliveZookeeper;
1694       }
1695     }
1696 
1697     void releaseZooKeeperWatcher(final ZooKeeperWatcher zkw) {
1698       if (zkw == null) {
1699         return;
1700       }
1701       if (keepAliveZookeeperUserCount.addAndGet(-1) <= 0) {
1702         keepZooKeeperWatcherAliveUntil = System.currentTimeMillis() + keepAlive;
1703       }
1704     }
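         // A typical acquire/release cycle (sketch; see makeStubNoRetries above for a real
         // caller). The watcher is shared and reference-counted, so every get must be
         // paired with a close, or the keep-alive countdown never starts:
         //
         //   ZooKeeperKeepAliveConnection zkw = getKeepAliveZooKeeperWatcher();
         //   try {
         //     // ... read the master address, check the base node, etc. ...
         //   } finally {
         //     zkw.close(); // decrements the user count via releaseZooKeeperWatcher
         //   }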
1705 
1706     private void closeZooKeeperWatcher() {
1707       synchronized (masterAndZKLock) {
1708         if (keepAliveZookeeper != null) {
1709           LOG.info("Closing zookeeper sessionid=0x" +
1710             Long.toHexString(
1711               keepAliveZookeeper.getRecoverableZooKeeper().getSessionId()));
1712           keepAliveZookeeper.internalClose();
1713           keepAliveZookeeper = null;
1714         }
1715         keepAliveZookeeperUserCount.set(0);
1716       }
1717     }
1718 
1719     final MasterServiceState masterServiceState = new MasterServiceState(this);
1720 
1721     @Override
1722     public MasterService.BlockingInterface getMaster() throws MasterNotRunningException {
1723       return getKeepAliveMasterService();
1724     }
1725 
1726     private void resetMasterServiceState(final MasterServiceState mss) {
1727       mss.userCount++;
1728     }
1729 
1730     @Override
1731     public MasterKeepAliveConnection getKeepAliveMasterService()
1732     throws MasterNotRunningException {
1733       synchronized (masterAndZKLock) {
1734         if (!isKeepAliveMasterConnectedAndRunning(this.masterServiceState)) {
1735           MasterServiceStubMaker stubMaker = new MasterServiceStubMaker();
1736           try {
1737             this.masterServiceState.stub = stubMaker.makeStub();
1738           } catch (MasterNotRunningException ex) {
1739             throw ex;
1740           } catch (IOException e) {
1741             // rethrow as MasterNotRunningException so that we can keep the method sig
1742             throw new MasterNotRunningException(e);
1743           }
1744         }
1745         resetMasterServiceState(this.masterServiceState);
1746       }
1747       // Ugly delegation just so we can add in a Close method.
1748       final MasterService.BlockingInterface stub = this.masterServiceState.stub;
1749       return new MasterKeepAliveConnection() {
1750         MasterServiceState mss = masterServiceState;
1751         @Override
1752         public MasterProtos.AbortProcedureResponse abortProcedure(
1753           RpcController controller,
1754           MasterProtos.AbortProcedureRequest request) throws ServiceException {
1755           return stub.abortProcedure(controller, request);
1756         }
1757         @Override
1758         public MasterProtos.ListProceduresResponse listProcedures(
1759             RpcController controller,
1760             MasterProtos.ListProceduresRequest request) throws ServiceException {
1761           return stub.listProcedures(controller, request);
1762         }
1763         @Override
1764         public AddColumnResponse addColumn(RpcController controller, AddColumnRequest request)
1765         throws ServiceException {
1766           return stub.addColumn(controller, request);
1767         }
1768 
1769         @Override
1770         public DeleteColumnResponse deleteColumn(RpcController controller,
1771             DeleteColumnRequest request)
1772         throws ServiceException {
1773           return stub.deleteColumn(controller, request);
1774         }
1775 
1776         @Override
1777         public ModifyColumnResponse modifyColumn(RpcController controller,
1778             ModifyColumnRequest request)
1779         throws ServiceException {
1780           return stub.modifyColumn(controller, request);
1781         }
1782 
1783         @Override
1784         public MoveRegionResponse moveRegion(RpcController controller,
1785             MoveRegionRequest request) throws ServiceException {
1786           return stub.moveRegion(controller, request);
1787         }
1788 
1789         @Override
1790         public DispatchMergingRegionsResponse dispatchMergingRegions(
1791             RpcController controller, DispatchMergingRegionsRequest request)
1792             throws ServiceException {
1793           return stub.dispatchMergingRegions(controller, request);
1794         }
1795 
1796         @Override
1797         public AssignRegionResponse assignRegion(RpcController controller,
1798             AssignRegionRequest request) throws ServiceException {
1799           return stub.assignRegion(controller, request);
1800         }
1801 
1802         @Override
1803         public UnassignRegionResponse unassignRegion(RpcController controller,
1804             UnassignRegionRequest request) throws ServiceException {
1805           return stub.unassignRegion(controller, request);
1806         }
1807 
1808         @Override
1809         public OfflineRegionResponse offlineRegion(RpcController controller,
1810             OfflineRegionRequest request) throws ServiceException {
1811           return stub.offlineRegion(controller, request);
1812         }
1813 
1814         @Override
1815         public DeleteTableResponse deleteTable(RpcController controller,
1816             DeleteTableRequest request) throws ServiceException {
1817           return stub.deleteTable(controller, request);
1818         }
1819 
1820         @Override
1821         public TruncateTableResponse truncateTable(RpcController controller,
1822             TruncateTableRequest request) throws ServiceException {
1823           return stub.truncateTable(controller, request);
1824         }
1825 
1826         @Override
1827         public EnableTableResponse enableTable(RpcController controller,
1828             EnableTableRequest request) throws ServiceException {
1829           return stub.enableTable(controller, request);
1830         }
1831 
1832         @Override
1833         public DisableTableResponse disableTable(RpcController controller,
1834             DisableTableRequest request) throws ServiceException {
1835           return stub.disableTable(controller, request);
1836         }
1837 
1838         @Override
1839         public ModifyTableResponse modifyTable(RpcController controller,
1840             ModifyTableRequest request) throws ServiceException {
1841           return stub.modifyTable(controller, request);
1842         }
1843 
1844         @Override
1845         public CreateTableResponse createTable(RpcController controller,
1846             CreateTableRequest request) throws ServiceException {
1847           return stub.createTable(controller, request);
1848         }
1849 
1850         @Override
1851         public ShutdownResponse shutdown(RpcController controller,
1852             ShutdownRequest request) throws ServiceException {
1853           return stub.shutdown(controller, request);
1854         }
1855 
1856         @Override
1857         public StopMasterResponse stopMaster(RpcController controller,
1858             StopMasterRequest request) throws ServiceException {
1859           return stub.stopMaster(controller, request);
1860         }
1861 
1862         @Override
1863         public BalanceResponse balance(RpcController controller,
1864             BalanceRequest request) throws ServiceException {
1865           return stub.balance(controller, request);
1866         }
1867 
1868         @Override
1869         public SetBalancerRunningResponse setBalancerRunning(
1870             RpcController controller, SetBalancerRunningRequest request)
1871             throws ServiceException {
1872           return stub.setBalancerRunning(controller, request);
1873         }
1874 
1875         @Override
1876         public NormalizeResponse normalize(RpcController controller,
1877                                        NormalizeRequest request) throws ServiceException {
1878           return stub.normalize(controller, request);
1879         }
1880 
1881         @Override
1882         public SetNormalizerRunningResponse setNormalizerRunning(
1883           RpcController controller, SetNormalizerRunningRequest request)
1884           throws ServiceException {
1885           return stub.setNormalizerRunning(controller, request);
1886         }
1887 
1888         @Override
1889         public RunCatalogScanResponse runCatalogScan(RpcController controller,
1890             RunCatalogScanRequest request) throws ServiceException {
1891           return stub.runCatalogScan(controller, request);
1892         }
1893 
1894         @Override
1895         public EnableCatalogJanitorResponse enableCatalogJanitor(
1896             RpcController controller, EnableCatalogJanitorRequest request)
1897             throws ServiceException {
1898           return stub.enableCatalogJanitor(controller, request);
1899         }
1900 
1901         @Override
1902         public IsCatalogJanitorEnabledResponse isCatalogJanitorEnabled(
1903             RpcController controller, IsCatalogJanitorEnabledRequest request)
1904             throws ServiceException {
1905           return stub.isCatalogJanitorEnabled(controller, request);
1906         }
1907 
1908         @Override
1909         public CoprocessorServiceResponse execMasterService(
1910             RpcController controller, CoprocessorServiceRequest request)
1911             throws ServiceException {
1912           return stub.execMasterService(controller, request);
1913         }
1914 
1915         @Override
1916         public SnapshotResponse snapshot(RpcController controller,
1917             SnapshotRequest request) throws ServiceException {
1918           return stub.snapshot(controller, request);
1919         }
1920 
1921         @Override
1922         public GetCompletedSnapshotsResponse getCompletedSnapshots(
1923             RpcController controller, GetCompletedSnapshotsRequest request)
1924             throws ServiceException {
1925           return stub.getCompletedSnapshots(controller, request);
1926         }
1927 
1928         @Override
1929         public DeleteSnapshotResponse deleteSnapshot(RpcController controller,
1930             DeleteSnapshotRequest request) throws ServiceException {
1931           return stub.deleteSnapshot(controller, request);
1932         }
1933 
1934         @Override
1935         public IsSnapshotDoneResponse isSnapshotDone(RpcController controller,
1936             IsSnapshotDoneRequest request) throws ServiceException {
1937           return stub.isSnapshotDone(controller, request);
1938         }
1939 
1940         @Override
1941         public RestoreSnapshotResponse restoreSnapshot(
1942             RpcController controller, RestoreSnapshotRequest request)
1943             throws ServiceException {
1944           return stub.restoreSnapshot(controller, request);
1945         }
1946 
1947         @Override
1948         public IsRestoreSnapshotDoneResponse isRestoreSnapshotDone(
1949             RpcController controller, IsRestoreSnapshotDoneRequest request)
1950             throws ServiceException {
1951           return stub.isRestoreSnapshotDone(controller, request);
1952         }
1953 
1954         @Override
1955         public ExecProcedureResponse execProcedure(
1956             RpcController controller, ExecProcedureRequest request)
1957             throws ServiceException {
1958           return stub.execProcedure(controller, request);
1959         }
1960 
1961         @Override
1962         public ExecProcedureResponse execProcedureWithRet(
1963             RpcController controller, ExecProcedureRequest request)
1964             throws ServiceException {
1965           return stub.execProcedureWithRet(controller, request);
1966         }
1967 
1968         @Override
1969         public IsProcedureDoneResponse isProcedureDone(RpcController controller,
1970             IsProcedureDoneRequest request) throws ServiceException {
1971           return stub.isProcedureDone(controller, request);
1972         }
1973 
1974         @Override
1975         public GetProcedureResultResponse getProcedureResult(RpcController controller,
1976             GetProcedureResultRequest request) throws ServiceException {
1977           return stub.getProcedureResult(controller, request);
1978         }
1979 
1980         @Override
1981         public IsMasterRunningResponse isMasterRunning(
1982             RpcController controller, IsMasterRunningRequest request)
1983             throws ServiceException {
1984           return stub.isMasterRunning(controller, request);
1985         }
1986 
1987         @Override
1988         public ModifyNamespaceResponse modifyNamespace(RpcController controller,
1989             ModifyNamespaceRequest request)
1990         throws ServiceException {
1991           return stub.modifyNamespace(controller, request);
1992         }
1993 
1994         @Override
1995         public CreateNamespaceResponse createNamespace(
1996             RpcController controller, CreateNamespaceRequest request) throws ServiceException {
1997           return stub.createNamespace(controller, request);
1998         }
1999 
2000         @Override
2001         public DeleteNamespaceResponse deleteNamespace(
2002             RpcController controller, DeleteNamespaceRequest request) throws ServiceException {
2003           return stub.deleteNamespace(controller, request);
2004         }
2005 
2006         @Override
2007         public GetNamespaceDescriptorResponse getNamespaceDescriptor(RpcController controller,
2008             GetNamespaceDescriptorRequest request) throws ServiceException {
2009           return stub.getNamespaceDescriptor(controller, request);
2010         }
2011 
2012         @Override
2013         public ListNamespaceDescriptorsResponse listNamespaceDescriptors(RpcController controller,
2014             ListNamespaceDescriptorsRequest request) throws ServiceException {
2015           return stub.listNamespaceDescriptors(controller, request);
2016         }
2017 
2018         @Override
2019         public ListTableDescriptorsByNamespaceResponse listTableDescriptorsByNamespace(
2020             RpcController controller, ListTableDescriptorsByNamespaceRequest request)
2021                 throws ServiceException {
2022           return stub.listTableDescriptorsByNamespace(controller, request);
2023         }
2024 
2025         @Override
2026         public ListTableNamesByNamespaceResponse listTableNamesByNamespace(
2027             RpcController controller, ListTableNamesByNamespaceRequest request)
2028                 throws ServiceException {
2029           return stub.listTableNamesByNamespace(controller, request);
2030         }
2031 
2032         @Override
2033         public void close() {
2034           release(this.mss);
2035         }
2036 
2037         @Override
2038         public GetSchemaAlterStatusResponse getSchemaAlterStatus(
2039             RpcController controller, GetSchemaAlterStatusRequest request)
2040             throws ServiceException {
2041           return stub.getSchemaAlterStatus(controller, request);
2042         }
2043 
2044         @Override
2045         public GetTableDescriptorsResponse getTableDescriptors(
2046             RpcController controller, GetTableDescriptorsRequest request)
2047             throws ServiceException {
2048           return stub.getTableDescriptors(controller, request);
2049         }
2050 
2051         @Override
2052         public GetTableNamesResponse getTableNames(
2053             RpcController controller, GetTableNamesRequest request)
2054             throws ServiceException {
2055           return stub.getTableNames(controller, request);
2056         }
2057 
2058         @Override
2059         public GetClusterStatusResponse getClusterStatus(
2060             RpcController controller, GetClusterStatusRequest request)
2061             throws ServiceException {
2062           return stub.getClusterStatus(controller, request);
2063         }
2064 
2065         @Override
2066         public SetQuotaResponse setQuota(RpcController controller, SetQuotaRequest request)
2067             throws ServiceException {
2068           return stub.setQuota(controller, request);
2069         }
2070 
2071         @Override
2072         public MajorCompactionTimestampResponse getLastMajorCompactionTimestamp(
2073             RpcController controller, MajorCompactionTimestampRequest request)
2074             throws ServiceException {
2075           return stub.getLastMajorCompactionTimestamp(controller, request);
2076         }
2077 
2078         @Override
2079         public MajorCompactionTimestampResponse getLastMajorCompactionTimestampForRegion(
2080             RpcController controller, MajorCompactionTimestampForRegionRequest request)
2081             throws ServiceException {
2082           return stub.getLastMajorCompactionTimestampForRegion(controller, request);
2083         }
2084 
2085         @Override
2086         public IsBalancerEnabledResponse isBalancerEnabled(RpcController controller,
2087             IsBalancerEnabledRequest request) throws ServiceException {
2088           return stub.isBalancerEnabled(controller, request);
2089         }
2090 
2091         @Override
2092         public IsNormalizerEnabledResponse isNormalizerEnabled(RpcController controller,
2093             IsNormalizerEnabledRequest request) throws ServiceException {
2094           return stub.isNormalizerEnabled(controller, request);
2095         }
2096 
2097         @Override
2098         public SecurityCapabilitiesResponse getSecurityCapabilities(RpcController controller,
2099             SecurityCapabilitiesRequest request) throws ServiceException {
2100           return stub.getSecurityCapabilities(controller, request);
2101         }
2102       };
2103     }
2104 
2105 
2106     private static void release(MasterServiceState mss) {
2107       if (mss != null && mss.connection != null) {
2108         ((HConnectionImplementation)mss.connection).releaseMaster(mss);
2109       }
2110     }
2111 
2112     private boolean isKeepAliveMasterConnectedAndRunning(MasterServiceState mss) {
2113       if (mss.getStub() == null){
2114         return false;
2115       }
2116       try {
2117         return mss.isMasterRunning();
2118       } catch (UndeclaredThrowableException e) {
2119         // It's somewhat messy, but we can receive exceptions such as
2120         //  java.net.ConnectException that are not declared, so we catch them here.
2121         LOG.info("Master connection is not running anymore", e.getUndeclaredThrowable());
2122         return false;
2123       } catch (ServiceException se) {
2124         LOG.warn("Checking master connection", se);
2125         return false;
2126       }
2127     }
2128 
2129     void releaseMaster(MasterServiceState mss) {
2130       if (mss.getStub() == null) return;
2131       synchronized (masterAndZKLock) {
2132         --mss.userCount;
2133       }
2134     }
2135 
2136     private void closeMasterService(MasterServiceState mss) {
2137       if (mss.getStub() != null) {
2138         LOG.info("Closing master protocol: " + mss);
2139         mss.clearStub();
2140       }
2141       mss.userCount = 0;
2142     }
2143 
2144     /**
2145      * Immediate close of the shared master. Can be triggered by the delayed close or when
2146      * closing the connection itself.
2147      */
2148     private void closeMaster() {
2149       synchronized (masterAndZKLock) {
2150         closeMasterService(masterServiceState);
2151       }
2152     }
2153 
2154     void updateCachedLocation(HRegionInfo hri, ServerName source,
2155                               ServerName serverName, long seqNum) {
2156       HRegionLocation newHrl = new HRegionLocation(hri, serverName, seqNum);
2157       cacheLocation(hri.getTable(), source, newHrl);
2158     }
2159 
2160     @Override
2161     public void deleteCachedRegionLocation(final HRegionLocation location) {
2162       metaCache.clearCache(location);
2163     }
2164 
2165     @Override
2166     public void updateCachedLocations(final TableName tableName, byte[] rowkey,
2167         final Object exception, final HRegionLocation source) {
2168       assert source != null;
2169       updateCachedLocations(tableName, source.getRegionInfo().getRegionName(),
2170           rowkey, exception, source.getServerName());
2171     }
2172 
2173     /**
2174      * Update the location with the new value (if the exception is a RegionMovedException)
2175      * or delete it from the cache. Does nothing if we can be sure from the exception that
2176      * the location is still accurate, or if the cache has already been updated.
2177      * @param exception an object (to simplify user code) in which we will look for a nested
2178      *                  or wrapped RegionMovedException (or both)
2179      * @param source server that is the source of the location update.
2180      */
2181     @Override
2182     public void updateCachedLocations(final TableName tableName, byte[] regionName, byte[] rowkey,
2183       final Object exception, final ServerName source) {
2184       if (rowkey == null || tableName == null) {
2185         LOG.warn("Coding error, see method javadoc. row=" + (rowkey == null ? "null" :
2186             Bytes.toStringBinary(rowkey)) + ", tableName=" + (tableName == null ? "null" : tableName));
2187         return;
2188       }
2189 
2190       if (source == null) {
2191         // This should not happen, but let's guard against it anyway.
2192         return;
2193       }
2194 
2195       if (regionName == null) {
2196         // we do not know which region, so just remove the cache entry for the row and server
2197         metaCache.clearCache(tableName, rowkey, source);
2198         return;
2199       }
2200 
2201       // Is it something we have already updated?
2202       final RegionLocations oldLocations = getCachedLocation(tableName, rowkey);
2203       HRegionLocation oldLocation = null;
2204       if (oldLocations != null) {
2205         oldLocation = oldLocations.getRegionLocationByRegionName(regionName);
2206       }
2207       if (oldLocation == null || !source.equals(oldLocation.getServerName())) {
2208         // There is no such location in the cache (it's been removed already) or
2209         // the cache has already been refreshed with a different location.  => nothing to do
2210         return;
2211       }
2212 
2213       HRegionInfo regionInfo = oldLocation.getRegionInfo();
2214       Throwable cause = ClientExceptionsUtil.findException(exception);
2215       if (cause != null) {
2216         if (!ClientExceptionsUtil.isMetaClearingException(cause)) {
2217           // We know that the region is still on this region server
2218           return;
2219         }
2220 
2221         if (cause instanceof RegionMovedException) {
2222           RegionMovedException rme = (RegionMovedException) cause;
2223           if (LOG.isTraceEnabled()) {
2224             LOG.trace("Region " + regionInfo.getRegionNameAsString() + " moved to " +
2225                 rme.getHostname() + ":" + rme.getPort() +
2226                 " according to " + source.getHostAndPort());
2227           }
2228           // We know that the region is no longer on this region server, but we
2229           //  know the new location.
2230           updateCachedLocation(
2231               regionInfo, source, rme.getServerName(), rme.getLocationSeqNum());
2232           return;
2233         }
2234       }
2235 
2236       // If we're here, it means that we cannot be sure about the location, so we remove it
2237       // from the cache. Do not pass the source, because it could be a new server at the same host:port.
2238       metaCache.clearCache(regionInfo);
2239     }
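         // In practice this is driven by the retry machinery after a failed call. A sketch
         // of the flow (hypothetical variables): on an IOException "ioe" received from
         // server "sn" while operating on row "row" of region "regionName", a caller does
         //
         //   connection.updateCachedLocations(tableName, regionName, row, ioe, sn);
         //
         // and the cached entry is either rewritten in place (a RegionMovedException
         // carries the new location) or dropped so the next call re-reads hbase:meta.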
2240 
2241     @Override
2242     public void updateCachedLocations(final byte[] tableName, byte[] rowkey,
2243       final Object exception, final HRegionLocation source) {
2244       updateCachedLocations(TableName.valueOf(tableName), rowkey, exception, source);
2245     }
2246 
2247     @Override
2248     @Deprecated
2249     public void processBatch(List<? extends Row> list,
2250         final TableName tableName,
2251         ExecutorService pool,
2252         Object[] results) throws IOException, InterruptedException {
2253       // This belongs in HTable!!! Not in here.  St.Ack
2254 
2255       // results must be the same size as list
2256       if (results.length != list.size()) {
2257         throw new IllegalArgumentException(
2258           "argument results must be the same size as argument list");
2259       }
2260       processBatchCallback(list, tableName, pool, results, null);
2261     }
2262 
2263     @Override
2264     @Deprecated
2265     public void processBatch(List<? extends Row> list,
2266         final byte[] tableName,
2267         ExecutorService pool,
2268         Object[] results) throws IOException, InterruptedException {
2269       processBatch(list, TableName.valueOf(tableName), pool, results);
2270     }
2271 
2272     /**
2273      * Send the queries in parallel on the different region servers. Retries on failures.
2274      * If the method returns, there was no error and the 'results' array will
2275      * contain no exceptions. On error, an exception is thrown and the 'results' array will
2276      * contain a mix of results and exceptions.
2277      * @deprecated since 0.96 - Use {@link HTable#processBatchCallback} instead
2278      */
2279     @Override
2280     @Deprecated
2281     public <R> void processBatchCallback(
2282       List<? extends Row> list,
2283       TableName tableName,
2284       ExecutorService pool,
2285       Object[] results,
2286       Batch.Callback<R> callback)
2287       throws IOException, InterruptedException {
2288 
2289       AsyncRequestFuture ars = this.asyncProcess.submitAll(
2290           pool, tableName, list, callback, results);
2291       ars.waitUntilDone();
2292       if (ars.hasError()) {
2293         throw ars.getErrors();
2294       }
2295     }
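         // A minimal usage sketch of this (deprecated) batch path, assuming a List<Put>
         // named "puts" and an ExecutorService named "pool" (hypothetical variables,
         // for illustration only):
         //
         //   Object[] results = new Object[puts.size()];
         //   connection.processBatchCallback(puts, tableName, pool, results,
         //       new Batch.Callback<Object>() {
         //         @Override
         //         public void update(byte[] region, byte[] row, Object result) {
         //           // invoked once per operation as results come back
         //         }
         //       });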
2296 
2297     @Override
2298     @Deprecated
2299     public <R> void processBatchCallback(
2300       List<? extends Row> list,
2301       byte[] tableName,
2302       ExecutorService pool,
2303       Object[] results,
2304       Batch.Callback<R> callback)
2305       throws IOException, InterruptedException {
2306       processBatchCallback(list, TableName.valueOf(tableName), pool, results, callback);
2307     }
2308 
2309     // For tests to override.
2310     protected AsyncProcess createAsyncProcess(Configuration conf) {
2311       // No default pool available.
2312       return new AsyncProcess(this, conf, this.batchPool,
2313           RpcRetryingCallerFactory.instantiate(conf, this.getStatisticsTracker()), false,
2314           RpcControllerFactory.instantiate(conf));
2315     }
2316 
2317     @Override
2318     public AsyncProcess getAsyncProcess() {
2319       return asyncProcess;
2320     }
2321 
2322     @Override
2323     public ServerStatisticTracker getStatisticsTracker() {
2324       return this.stats;
2325     }
2326 
2327     @Override
2328     public ClientBackoffPolicy getBackoffPolicy() {
2329       return this.backoffPolicy;
2330     }
2331 
2332     /*
2333      * Return the number of cached region locations for a table. It will only be called
2334      * from a unit test.
2335      */
2336     @VisibleForTesting
2337     int getNumberOfCachedRegionLocations(final TableName tableName) {
2338       return metaCache.getNumberOfCachedRegionLocations(tableName);
2339     }
2340 
2341     @Override
2342     @Deprecated
2343     public void setRegionCachePrefetch(final TableName tableName, final boolean enable) {
2344     }
2345 
2346     @Override
2347     @Deprecated
2348     public void setRegionCachePrefetch(final byte[] tableName,
2349         final boolean enable) {
2350     }
2351 
2352     @Override
2353     @Deprecated
2354     public boolean getRegionCachePrefetch(TableName tableName) {
2355       return false;
2356     }
2357 
2358     @Override
2359     @Deprecated
2360     public boolean getRegionCachePrefetch(byte[] tableName) {
2361       return false;
2362     }
2363 
2364     @Override
2365     public void abort(final String msg, Throwable t) {
2366       if (t instanceof KeeperException.SessionExpiredException
2367         && keepAliveZookeeper != null) {
2368         synchronized (masterAndZKLock) {
2369           if (keepAliveZookeeper != null) {
2370             LOG.warn("This client just lost its session with ZooKeeper," +
2371               " closing it." +
2372               " It will be recreated next time someone needs it", t);
2373             closeZooKeeperWatcher();
2374           }
2375         }
2376       } else {
2377         if (t != null) {
2378           LOG.fatal(msg, t);
2379         } else {
2380           LOG.fatal(msg);
2381         }
2382         this.aborted = true;
2383         close();
2384         this.closed = true;
2385       }
2386     }
2387 
2388     @Override
2389     public boolean isClosed() {
2390       return this.closed;
2391     }
2392 
2393     @Override
2394     public boolean isAborted() {
2395       return this.aborted;
2396     }
2397 
2398     @Override
2399     public int getCurrentNrHRS() throws IOException {
2400       return this.registry.getCurrentNrHRS();
2401     }
2402 
2403     /**
2404      * Increment this client's reference count.
2405      */
2406     void incCount() {
2407       ++refCount;
2408     }
2409 
2410     /**
2411      * Decrement this client's reference count.
2412      */
2413     void decCount() {
2414       if (refCount > 0) {
2415         --refCount;
2416       }
2417     }
2418 
2419     /**
2420      * Return whether this client has no references.
2421      *
2422      * @return true if this client has no references; false otherwise
2423      */
2424     boolean isZeroReference() {
2425       return refCount == 0;
2426     }
2427 
2428     void internalClose() {
2429       if (this.closed) {
2430         return;
2431       }
2432       closeMaster();
2433       shutdownPools();
2434       if (this.metrics != null) {
2435         this.metrics.shutdown();
2436       }
2437       this.closed = true;
2438       closeZooKeeperWatcher();
2439       this.stubs.clear();
2440       if (clusterStatusListener != null) {
2441         clusterStatusListener.close();
2442       }
2443       if (rpcClient != null) {
2444         rpcClient.close();
2445       }
2446     }
2447 
2448     @Override
2449     public void close() {
2450       if (managed) {
2451         if (aborted) {
2452           ConnectionManager.deleteStaleConnection(this);
2453         } else {
2454           ConnectionManager.deleteConnection(this, false);
2455         }
2456       } else {
2457         internalClose();
2458       }
2459     }
2460 
2461     /**
2462      * Close the connection for good, regardless of what the current value of
2463      * {@link #refCount} is. Ideally, {@link #refCount} should be zero at this
2464      * point, which would be the case if all of its consumers close the
2465      * connection. However, on the off chance that someone is unable to close
2466      * the connection, perhaps because it bailed out prematurely, the method
2467      * below will ensure that this {@link HConnection} instance is cleaned up.
2468      * Caveat: The JVM may take an unknown amount of time to call finalize on an
2469      * unreachable object, so our hope is that every consumer cleans up after
2470      * itself, like any good citizen.
2471      */
2472     @Override
2473     protected void finalize() throws Throwable {
2474       super.finalize();
2475       // Pretend as if we are about to release the last remaining reference
2476       refCount = 1;
2477       close();
2478     }
2479 
2480     /**
2481      * @deprecated Use {@link Admin#listTables()} instead
2482      */
2483     @Deprecated
2484     @Override
2485     public HTableDescriptor[] listTables() throws IOException {
2486       MasterKeepAliveConnection master = getKeepAliveMasterService();
2487       try {
2488         GetTableDescriptorsRequest req =
2489           RequestConverter.buildGetTableDescriptorsRequest((List<TableName>)null);
2490         return ProtobufUtil.getHTableDescriptorArray(master.getTableDescriptors(null, req));
2491       } catch (ServiceException se) {
2492         throw ProtobufUtil.getRemoteException(se);
2493       } finally {
2494         master.close();
2495       }
2496     }
2497 
2498     /**
2499      * @deprecated Use {@link Admin#listTableNames()} instead
2500      */
2501     @Deprecated
2502     @Override
2503     public String[] getTableNames() throws IOException {
2504       TableName[] tableNames = listTableNames();
2505       String[] result = new String[tableNames.length];
2506       for (int i = 0; i < tableNames.length; i++) {
2507         result[i] = tableNames[i].getNameAsString();
2508       }
2509       return result;
2510     }
2511 
2512     /**
2513      * @deprecated Use {@link Admin#listTableNames()} instead
2514      */
2515     @Deprecated
2516     @Override
2517     public TableName[] listTableNames() throws IOException {
2518       MasterKeepAliveConnection master = getKeepAliveMasterService();
2519       try {
2520         return ProtobufUtil.getTableNameArray(master.getTableNames(null,
2521             GetTableNamesRequest.newBuilder().build())
2522           .getTableNamesList());
2523       } catch (ServiceException se) {
2524         throw ProtobufUtil.getRemoteException(se);
2525       } finally {
2526         master.close();
2527       }
2528     }
2529 
2530     /**
2531      * @deprecated Use {@link Admin#getTableDescriptorsByTableName(List)} instead
2532      */
2533     @Deprecated
2534     @Override
2535     public HTableDescriptor[] getHTableDescriptorsByTableName(
2536         List<TableName> tableNames) throws IOException {
2537       if (tableNames == null || tableNames.isEmpty()) return new HTableDescriptor[0];
2538       MasterKeepAliveConnection master = getKeepAliveMasterService();
2539       try {
2540         GetTableDescriptorsRequest req =
2541           RequestConverter.buildGetTableDescriptorsRequest(tableNames);
2542         return ProtobufUtil.getHTableDescriptorArray(master.getTableDescriptors(null, req));
2543       } catch (ServiceException se) {
2544         throw ProtobufUtil.getRemoteException(se);
2545       } finally {
2546         master.close();
2547       }
2548     }
2549 
2550     /**
2551      * @deprecated Use {@link Admin#getTableDescriptorsByTableName(List)} instead
2552      */
2553     @Deprecated
2554     @Override
2555     public HTableDescriptor[] getHTableDescriptors(
2556         List<String> names) throws IOException {
2557       List<TableName> tableNames = new ArrayList<TableName>(names.size());
2558       for (String name : names) {
2559         tableNames.add(TableName.valueOf(name));
2560       }
2561 
2562       return getHTableDescriptorsByTableName(tableNames);
2563     }
2564 
2565     @Override
2566     public NonceGenerator getNonceGenerator() {
2567       return this.nonceGenerator;
2568     }
2569 
2570     /**
2571      * Connects to the master to get the table descriptor.
2572      * @param tableName table name
2573      * @throws IOException if the connection to master fails or if the table
2574      *  is not found.
2575      * @deprecated Use {@link Admin#getTableDescriptor(TableName)} instead
2576      */
2577     @Deprecated
2578     @Override
2579     public HTableDescriptor getHTableDescriptor(final TableName tableName)
2580     throws IOException {
2581       if (tableName == null) return null;
2582       MasterKeepAliveConnection master = getKeepAliveMasterService();
2583       GetTableDescriptorsResponse htds;
2584       try {
2585         GetTableDescriptorsRequest req =
2586           RequestConverter.buildGetTableDescriptorsRequest(tableName);
2587         htds = master.getTableDescriptors(null, req);
2588       } catch (ServiceException se) {
2589         throw ProtobufUtil.getRemoteException(se);
2590       } finally {
2591         master.close();
2592       }
2593       if (!htds.getTableSchemaList().isEmpty()) {
2594         return HTableDescriptor.convert(htds.getTableSchemaList().get(0));
2595       }
2596       throw new TableNotFoundException(tableName.getNameAsString());
2597     }

    /**
     * @deprecated Use {@link Admin#getTableDescriptor(TableName)} instead
     */
    @Deprecated
    @Override
    public HTableDescriptor getHTableDescriptor(final byte[] tableName)
        throws IOException {
      return getHTableDescriptor(TableName.valueOf(tableName));
    }

    @Override
    public RpcRetryingCallerFactory getNewRpcRetryingCallerFactory(Configuration conf) {
      return RpcRetryingCallerFactory
          .instantiate(conf, this.interceptor, this.getStatisticsTracker());
    }

    @Override
    public boolean isManaged() {
      return managed;
    }

    @Override
    public boolean hasCellBlockSupport() {
      return this.rpcClient.hasCellBlockSupport();
    }

    @Override
    public ConnectionConfiguration getConnectionConfiguration() {
      return this.connectionConfig;
    }

    @Override
    public RpcRetryingCallerFactory getRpcRetryingCallerFactory() {
      return this.rpcCallerFactory;
    }

    @Override
    public RpcControllerFactory getRpcControllerFactory() {
      return this.rpcControllerFactory;
    }
  }

  /**
   * The record of errors for servers.
   */
  static class ServerErrorTracker {
    // We need a concurrent map here, as we could have multiple threads updating it in parallel.
    private final ConcurrentMap<ServerName, ServerErrors> errorsByServer =
        new ConcurrentHashMap<ServerName, ServerErrors>();
    private final long canRetryUntil;
    private final int maxRetries;
    private final long startTrackingTime;

    public ServerErrorTracker(long timeout, int maxRetries) {
      this.maxRetries = maxRetries;
      this.canRetryUntil = EnvironmentEdgeManager.currentTime() + timeout;
      this.startTrackingTime = new Date().getTime();
    }

    /**
     * We stop retrying when we have exhausted BOTH the number of retries and the time allotted.
     */
    boolean canRetryMore(int numRetry) {
      // If only a single try is configured, the time allotted must not be taken into account.
      return numRetry < maxRetries || (maxRetries > 1 &&
          EnvironmentEdgeManager.currentTime() < this.canRetryUntil);
    }
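
    // Worked example (illustrative): with maxRetries = 5 and timeout = 60000 ms,
    // any numRetry below 5 permits a retry regardless of the clock; once the count
    // is used up, a retry is still permitted while the 60 s deadline (canRetryUntil)
    // has not passed. With maxRetries = 1 the time clause is disabled, so exactly
    // one try is ever made.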

    /**
     * Calculates the back-off time for a retrying request to a particular server.
     *
     * @param server    The server in question.
     * @param basePause The connection's default pause time.
     * @return The time to wait before sending the next request.
     */
    long calculateBackoffTime(ServerName server, long basePause) {
      long result;
      ServerErrors errorStats = errorsByServer.get(server);
      if (errorStats != null) {
        result = ConnectionUtils.getPauseTime(basePause, errorStats.retries.get());
      } else {
        result = 0; // yes, if the server is not in our list we don't wait before retrying.
      }
      return result;
    }
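
    // Worked example (illustrative, assuming the usual HBase backoff table):
    // ConnectionUtils.getPauseTime() scales the base pause by HConstants.RETRY_BACKOFF
    // (roughly {1, 2, 3, 5, 10, 20, 40, 100, ...}) plus a small jitter. So with
    // basePause = 100 ms, a server whose stored retry counter is 3 waits about
    // 100 * 5 = 500 ms, while a server with no recorded errors retries immediately
    // (result = 0).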

    /**
     * Reports that there was an error on the server, so that whatever bean-counting
     * is necessary can be done.
     *
     * @param server The server in question.
     */
    void reportServerError(ServerName server) {
      ServerErrors errors = errorsByServer.get(server);
      if (errors != null) {
        errors.addError();
      } else {
        // Two threads may race to insert the first ServerErrors for this server.
        // putIfAbsent returns the winner's instance if we lost, in which case the
        // error is counted on that instance; if we won, the freshly inserted
        // instance (counter at zero) stands for this first error.
        errors = errorsByServer.putIfAbsent(server, new ServerErrors());
        if (errors != null) {
          errors.addError();
        }
      }
    }

    long getStartTrackingTime() {
      return startTrackingTime;
    }

    /**
     * The record of errors for a server.
     */
    private static class ServerErrors {
      public final AtomicInteger retries = new AtomicInteger(0);

      public void addError() {
        retries.incrementAndGet();
      }
    }
  }
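
  // Illustrative usage sketch (not part of the original source): how a caller such
  // as AsyncProcess might drive the tracker; "server", "pause" and the call logic
  // are placeholders.
  //
  //   ServerErrorTracker tracker = new ServerErrorTracker(60000, 10);
  //   int numRetry = 0;
  //   while (tracker.canRetryMore(numRetry)) {
  //     Thread.sleep(tracker.calculateBackoffTime(server, pause));
  //     try {
  //       // ... issue the call; on success, break out of the loop ...
  //       break;
  //     } catch (IOException e) {
  //       tracker.reportServerError(server);
  //       numRetry++;
  //     }
  //   }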
}