1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  import java.lang.Thread.UncaughtExceptionHandler;
24  import java.lang.management.ManagementFactory;
25  import java.lang.management.MemoryUsage;
26  import java.lang.reflect.Constructor;
27  import java.net.BindException;
28  import java.net.InetAddress;
29  import java.net.InetSocketAddress;
30  import java.util.ArrayList;
31  import java.util.Collection;
32  import java.util.Collections;
33  import java.util.Comparator;
34  import java.util.HashMap;
35  import java.util.HashSet;
36  import java.util.Iterator;
37  import java.util.List;
38  import java.util.Map;
39  import java.util.Map.Entry;
40  import java.util.Set;
41  import java.util.SortedMap;
42  import java.util.TreeMap;
43  import java.util.TreeSet;
44  import java.util.concurrent.ConcurrentHashMap;
45  import java.util.concurrent.ConcurrentMap;
46  import java.util.concurrent.ConcurrentSkipListMap;
47  import java.util.concurrent.atomic.AtomicBoolean;
48  import java.util.concurrent.atomic.AtomicReference;
49  import java.util.concurrent.locks.ReentrantReadWriteLock;
50  
51  import javax.management.MalformedObjectNameException;
52  import javax.management.ObjectName;
53  import javax.servlet.http.HttpServlet;
54  
55  import org.apache.commons.lang.SystemUtils;
56  import org.apache.commons.lang.math.RandomUtils;
57  import org.apache.commons.logging.Log;
58  import org.apache.commons.logging.LogFactory;
59  import org.apache.hadoop.conf.Configuration;
60  import org.apache.hadoop.fs.FileSystem;
61  import org.apache.hadoop.fs.Path;
62  import org.apache.hadoop.hbase.ChoreService;
63  import org.apache.hadoop.hbase.ClockOutOfSyncException;
64  import org.apache.hadoop.hbase.CoordinatedStateManager;
65  import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
66  import org.apache.hadoop.hbase.HBaseConfiguration;
67  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
68  import org.apache.hadoop.hbase.HConstants;
69  import org.apache.hadoop.hbase.HRegionInfo;
70  import org.apache.hadoop.hbase.HealthCheckChore;
71  import org.apache.hadoop.hbase.MetaTableAccessor;
72  import org.apache.hadoop.hbase.NotServingRegionException;
73  import org.apache.hadoop.hbase.RemoteExceptionHandler;
74  import org.apache.hadoop.hbase.ScheduledChore;
75  import org.apache.hadoop.hbase.ServerName;
76  import org.apache.hadoop.hbase.Stoppable;
77  import org.apache.hadoop.hbase.TableDescriptors;
78  import org.apache.hadoop.hbase.TableName;
79  import org.apache.hadoop.hbase.YouAreDeadException;
80  import org.apache.hadoop.hbase.ZNodeClearer;
81  import org.apache.hadoop.hbase.classification.InterfaceAudience;
82  import org.apache.hadoop.hbase.client.ClusterConnection;
83  import org.apache.hadoop.hbase.client.ConnectionUtils;
84  import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
85  import org.apache.hadoop.hbase.conf.ConfigurationManager;
86  import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
87  import org.apache.hadoop.hbase.coordination.CloseRegionCoordination;
88  import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
89  import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
90  import org.apache.hadoop.hbase.exceptions.RegionMovedException;
91  import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
92  import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
93  import org.apache.hadoop.hbase.executor.ExecutorService;
94  import org.apache.hadoop.hbase.executor.ExecutorType;
95  import org.apache.hadoop.hbase.fs.HFileSystem;
96  import org.apache.hadoop.hbase.http.InfoServer;
97  import org.apache.hadoop.hbase.io.hfile.CacheConfig;
98  import org.apache.hadoop.hbase.ipc.RpcClient;
99  import org.apache.hadoop.hbase.ipc.RpcClientFactory;
100 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
101 import org.apache.hadoop.hbase.ipc.RpcServerInterface;
102 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
103 import org.apache.hadoop.hbase.ipc.ServerRpcController;
104 import org.apache.hadoop.hbase.master.HMaster;
105 import org.apache.hadoop.hbase.master.RegionState.State;
106 import org.apache.hadoop.hbase.master.TableLockManager;
107 import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost;
108 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
109 import org.apache.hadoop.hbase.protobuf.RequestConverter;
110 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
111 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
112 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall;
113 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
114 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
115 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
116 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionLoad;
117 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
118 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.Coprocessor;
119 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.Coprocessor.Builder;
120 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
121 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionServerInfo;
122 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
123 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
124 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
125 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
126 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
127 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
128 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
129 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;
130 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
131 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
132 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
133 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
134 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
135 import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
136 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
137 import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
138 import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
139 import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler;
140 import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
141 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
142 import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
143 import org.apache.hadoop.hbase.security.Superusers;
144 import org.apache.hadoop.hbase.security.UserProvider;
145 import org.apache.hadoop.hbase.trace.SpanReceiverHost;
146 import org.apache.hadoop.hbase.util.Addressing;
147 import org.apache.hadoop.hbase.util.ByteStringer;
148 import org.apache.hadoop.hbase.util.Bytes;
149 import org.apache.hadoop.hbase.util.CompressionTest;
150 import org.apache.hadoop.hbase.util.ConfigUtil;
151 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
152 import org.apache.hadoop.hbase.util.FSTableDescriptors;
153 import org.apache.hadoop.hbase.util.FSUtils;
154 import org.apache.hadoop.hbase.util.HasThread;
155 import org.apache.hadoop.hbase.util.JSONBean;
156 import org.apache.hadoop.hbase.util.JvmPauseMonitor;
157 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
158 import org.apache.hadoop.hbase.util.Sleeper;
159 import org.apache.hadoop.hbase.util.Threads;
160 import org.apache.hadoop.hbase.util.VersionInfo;
161 import org.apache.hadoop.hbase.wal.DefaultWALProvider;
162 import org.apache.hadoop.hbase.wal.WAL;
163 import org.apache.hadoop.hbase.wal.WALFactory;
164 import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
165 import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
166 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
167 import org.apache.hadoop.hbase.zookeeper.RecoveringRegionWatcher;
168 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
169 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
170 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
171 import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
172 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
173 import org.apache.hadoop.ipc.RemoteException;
174 import org.apache.hadoop.metrics.util.MBeanUtil;
175 import org.apache.hadoop.util.ReflectionUtils;
176 import org.apache.hadoop.util.StringUtils;
177 import org.apache.zookeeper.KeeperException;
178 import org.apache.zookeeper.KeeperException.NoNodeException;
179 import org.apache.zookeeper.data.Stat;
180 
181 import com.google.common.annotations.VisibleForTesting;
182 import com.google.common.base.Preconditions;
183 import com.google.common.collect.Maps;
184 import com.google.protobuf.BlockingRpcChannel;
185 import com.google.protobuf.Descriptors;
186 import com.google.protobuf.Message;
187 import com.google.protobuf.RpcCallback;
188 import com.google.protobuf.RpcController;
189 import com.google.protobuf.Service;
190 import com.google.protobuf.ServiceException;
191 import sun.misc.Signal;
192 import sun.misc.SignalHandler;
193 
194 /**
195  * HRegionServer makes a set of HRegions available to clients. It checks in with
196  * the HMaster. There are many HRegionServers in a single HBase deployment.
197  */
198 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
199 @SuppressWarnings("deprecation")
200 public class HRegionServer extends HasThread implements
201     RegionServerServices, LastSequenceId {
202 
203   private static final Log LOG = LogFactory.getLog(HRegionServer.class);
204 
205   /*
206    * Strings to be used in forming the exception message for
207    * RegionsAlreadyInTransitionException.
208    */
209   protected static final String OPEN = "OPEN";
210   protected static final String CLOSE = "CLOSE";
211 
212   // RegionName vs current action in progress
213   // true - if open region action in progress
214   // false - if close region action in progress
215   protected final ConcurrentMap<byte[], Boolean> regionsInTransitionInRS =
216     new ConcurrentSkipListMap<byte[], Boolean>(Bytes.BYTES_COMPARATOR);
217 
218   // Cache flushing
219   protected MemStoreFlusher cacheFlusher;
220 
221   protected HeapMemoryManager hMemManager;
222 
223   /**
224    * Cluster connection to be shared by services.
225    * Initialized at server startup and closed when server shuts down.
226    * Clients must never close it explicitly.
227    */
228   protected ClusterConnection clusterConnection;
229 
230   /*
231    * Long-lived meta table locator, created when the server is started and stopped when the
232    * server shuts down. References to this locator should be used to perform the corresponding
233    * operations in EventHandlers. The primary reason for this design is to make it mockable
234    * for tests.
235    */
236   protected MetaTableLocator metaTableLocator;
237 
238   // Watch if a region is out of recovering state from ZooKeeper
239   @SuppressWarnings("unused")
240   private RecoveringRegionWatcher recoveringRegionWatcher;
241 
242   /**
243    * Go here to get table descriptors.
244    */
245   protected TableDescriptors tableDescriptors;
246 
247   // Replication services. If no replication, this handler will be null.
248   protected ReplicationSourceService replicationSourceHandler;
249   protected ReplicationSinkService replicationSinkHandler;
250 
251   // Compactions
252   public CompactSplitThread compactSplitThread;
253 
254   /**
255    * Map of regions currently being served by this region server. Key is the
256    * encoded region name.  All access should be synchronized.
257    */
258   protected final Map<String, Region> onlineRegions = new ConcurrentHashMap<String, Region>();
259 
260   /**
261    * Map of encoded region names to the DataNode locations they should be hosted on.
262    * We store the value as InetSocketAddress since this is used only in the HDFS
263    * API (create() that takes favored nodes as hints for placing file blocks).
264    * We could have used ServerName here as the value class, but we'd need to
265    * convert it to InetSocketAddress at some point before the HDFS API call, and
266    * it seems a bit weird to store ServerName since ServerName refers to RegionServers
267    * and here we really mean DataNode locations.
268    */
269   protected final Map<String, InetSocketAddress[]> regionFavoredNodesMap =
270       new ConcurrentHashMap<String, InetSocketAddress[]>();
271 
272   /**
273    * Set of regions currently in recovering state, which means they can accept writes (edits from
274    * a previously failed region server) but not reads. A recovering region is also an online region.
275    */
276   protected final Map<String, Region> recoveringRegions = Collections
277       .synchronizedMap(new HashMap<String, Region>());
278 
279   // Leases
280   protected Leases leases;
281 
282   // Instance of the hbase executor service.
283   protected ExecutorService service;
284 
285   // If false, the file system has become unavailable
286   protected volatile boolean fsOk;
287   protected HFileSystem fs;
288 
289   // Set when a report to the master comes back with a message asking us to
290   // shutdown. Also set by call to stop when debugging or running unit tests
291   // of HRegionServer in isolation.
292   private volatile boolean stopped = false;
293 
294   // Go down hard. Used if file system becomes unavailable and also in
295   // debugging and unit tests.
296   private volatile boolean abortRequested;
297 
298   ConcurrentMap<String, Integer> rowlocks = new ConcurrentHashMap<String, Integer>();
299 
300   // A state before we go into stopped state.  At this stage we're closing user
301   // space regions.
302   private boolean stopping = false;
303 
304   private volatile boolean killed = false;
305 
306   protected final Configuration conf;
307 
308   private Path rootDir;
309 
310   protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
311 
312   final int numRetries;
313   protected final int threadWakeFrequency;
314   protected final int msgInterval;
315 
316   protected final int numRegionsToReport;
317 
318   // Stub to do region server status calls against the master.
319   private volatile RegionServerStatusService.BlockingInterface rssStub;
320   // RPC client. Used to make the stub above that does region server status checking.
321   RpcClient rpcClient;
322 
323   private RpcRetryingCallerFactory rpcRetryingCallerFactory;
324   private RpcControllerFactory rpcControllerFactory;
325 
326   private UncaughtExceptionHandler uncaughtExceptionHandler;
327 
328   // Info server. Default access so it can be used by unit tests. REGIONSERVER
329   // is the name of the webapp and the attribute name used when stuffing this
330   // instance into the web context.
331   protected InfoServer infoServer;
332   private JvmPauseMonitor pauseMonitor;
333 
334   /** region server process name */
335   public static final String REGIONSERVER = "regionserver";
336 
337   MetricsRegionServer metricsRegionServer;
338   private SpanReceiverHost spanReceiverHost;
339 
340   /**
341    * ChoreService used to schedule tasks that we want to run periodically
342    */
343   private final ChoreService choreService;
344 
345   /*
346    * Check for compaction requests.
347    */
348   ScheduledChore compactionChecker;
349 
350   /*
351    * Check for flushes
352    */
353   ScheduledChore periodicFlusher;
354 
355   protected volatile WALFactory walFactory;
356 
357   // WAL roller. log is protected rather than private to avoid
358   // eclipse warning when accessed by inner classes
359   final LogRoller walRoller;
360   // Lazily initialized if this RegionServer hosts a meta table.
361   final AtomicReference<LogRoller> metawalRoller = new AtomicReference<LogRoller>();
362 
363   // flag set after we're done setting up server threads
364   final AtomicBoolean online = new AtomicBoolean(false);
365 
366   // zookeeper connection and watcher
367   protected ZooKeeperWatcher zooKeeper;
368 
369   // master address tracker
370   private MasterAddressTracker masterAddressTracker;
371 
372   // Cluster Status Tracker
373   protected ClusterStatusTracker clusterStatusTracker;
374 
375   // Log Splitting Worker
376   private SplitLogWorker splitLogWorker;
377 
378   // A sleeper that sleeps for msgInterval.
379   protected final Sleeper sleeper;
380 
381   private final int operationTimeout;
382   private final int shortOperationTimeout;
383 
384   private final RegionServerAccounting regionServerAccounting;
385 
386   // Cache configuration and block cache reference
387   protected CacheConfig cacheConfig;
388 
389   /** The health check chore. */
390   private HealthCheckChore healthCheckChore;
391 
392   /** The nonce manager chore. */
393   private ScheduledChore nonceManagerChore;
394 
395   private Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();
396 
397   /**
398    * The server name the Master sees us as.  It's made from the hostname the
399    * master passes us, the port, and the server startcode. It is set after
400    * registration with the Master.
401    */
402   protected ServerName serverName;
403 
404   /*
405    * hostname specified by hostname config
406    */
407   private String useThisHostnameInstead;
408 
409   // Key for the config parameter that specifies the server hostname.
410   // Specifying the server hostname is optional; the hostname should be resolvable from
411   // both the master and the region servers.
412   final static String RS_HOSTNAME_KEY = "hbase.regionserver.hostname";
413 
414   final static String MASTER_HOSTNAME_KEY = "hbase.master.hostname";
415 
416   /**
417    * This server's startcode.
418    */
419   protected final long startcode;
420 
421   /**
422    * Unique identifier for the cluster we are a part of.
423    */
424   private String clusterId;
425 
426   /**
427    * MX Bean for RegionServerInfo
428    */
429   private ObjectName mxBean = null;
430 
431   /**
432    * Chore to clean periodically the moved region list
433    */
434   private MovedRegionsCleaner movedRegionsCleaner;
435 
436   // chore for refreshing store files for secondary regions
437   private StorefileRefresherChore storefileRefresher;
438 
439   private RegionServerCoprocessorHost rsHost;
440 
441   private RegionServerProcedureManagerHost rspmHost;
442   
443   private RegionServerQuotaManager rsQuotaManager;
444 
445   // Table level lock manager for locking for region operations
446   protected TableLockManager tableLockManager;
447 
448   /**
449    * Nonce manager. Nonces are used to make operations like increment and append idempotent
450    * in the case where the client doesn't receive the response from a successful operation and
451    * retries. We track the successful ops for some time via a nonce sent by the client and handle
452    * duplicate operations (currently by failing them; in the future we might use MVCC to return
453    * the result). Nonces are also recovered from the WAL during recovery; however, the caveats
454    * (from HBASE-3787) are:
455    * - WAL recovery is optimized, and under high load we won't read nearly nonce-timeout worth
456    *   of past records. If we don't read the records, we don't read and recover the nonces.
457    *   Some WALs within nonce-timeout at recovery may not even be present due to rolling/cleanup.
458    * - There's no WAL recovery during a normal region move, so nonces will not be transferred.
459    * We could have a separate, additional "nonce WAL". It would just contain a bunch of numbers
460    * and would not be flushed on the main path - because the WAL itself also contains nonces, if
461    * we only flush it before the memstore flush, for a given nonce we will either see it in the
462    * WAL (if it was never flushed to disk, it will be part of recovery), or we'll see it as part
463    * of the nonce log (or both occasionally, which doesn't matter). The nonce log file could be
464    * deleted after the latest nonce in it has expired, and could also be recovered during a move.
465    */
466   final ServerNonceManager nonceManager;
467 
468   private UserProvider userProvider;
469 
470   protected final RSRpcServices rpcServices;
471 
472   protected BaseCoordinatedStateManager csm;
473 
474   private final boolean useZKForAssignment;
475 
476   /**
477    * The configuration manager is used to register/deregister and notify the configuration
478    * observers when the regionserver is notified that there was a change in the on-disk configs.
479    */
480   protected final ConfigurationManager configurationManager;
481 
482   /**
483    * Starts an HRegionServer at the default location.
484    * @param conf the server configuration
485    * @throws IOException
486    * @throws InterruptedException
487    */
488   public HRegionServer(Configuration conf) throws IOException, InterruptedException {
489     this(conf, CoordinatedStateManagerFactory.getCoordinatedStateManager(conf));
490   }
491 
492   /**
493    * Starts an HRegionServer at the default location.
494    * @param conf the server configuration
495    * @param csm implementation of CoordinatedStateManager to be used
496    * @throws IOException
497    * @throws InterruptedException
498    */
499   public HRegionServer(Configuration conf, CoordinatedStateManager csm)
500       throws IOException, InterruptedException {
501     this.fsOk = true;
502     this.conf = conf;
503     checkCodecs(this.conf);
504     this.userProvider = UserProvider.instantiate(conf);
505     Superusers.initialize(conf);
506     FSUtils.setupShortCircuitRead(this.conf);
507     // Disable usage of meta replicas in the regionserver
508     this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);
509 
510     // Config'ed params
511     this.numRetries = this.conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
512         HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
513     this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
514     this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000);
515 
516     this.sleeper = new Sleeper(this.msgInterval, this);
517 
518     boolean isNoncesEnabled = conf.getBoolean(HConstants.HBASE_RS_NONCES_ENABLED, true);
519     this.nonceManager = isNoncesEnabled ? new ServerNonceManager(this.conf) : null;
520 
521     this.numRegionsToReport = conf.getInt(
522       "hbase.regionserver.numregionstoreport", 10);
523 
524     this.operationTimeout = conf.getInt(
525       HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
526       HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
527 
528     this.shortOperationTimeout = conf.getInt(
529       HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY,
530       HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT);
531 
532     this.abortRequested = false;
533     this.stopped = false;
534 
535     rpcServices = createRpcServices();
536     this.startcode = System.currentTimeMillis();
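        // Pick the hostname to report to the outside world: masters and regionservers each have
        // their own optional override key; otherwise fall back to the RPC service's bind address.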
537     if (this instanceof HMaster) {
538       useThisHostnameInstead = conf.get(MASTER_HOSTNAME_KEY);
539     } else {
540       useThisHostnameInstead = conf.get(RS_HOSTNAME_KEY);
541     }
542     String hostName = shouldUseThisHostnameInstead() ? useThisHostnameInstead :
543       rpcServices.isa.getHostName();
544     serverName = ServerName.valueOf(hostName, rpcServices.isa.getPort(), startcode);
545 
546     rpcControllerFactory = RpcControllerFactory.instantiate(this.conf);
547     rpcRetryingCallerFactory = RpcRetryingCallerFactory.instantiate(this.conf);
548 
549     // login the zookeeper client principal (if using security)
550     ZKUtil.loginClient(this.conf, HConstants.ZK_CLIENT_KEYTAB_FILE,
551       HConstants.ZK_CLIENT_KERBEROS_PRINCIPAL, hostName);
552     // login the server principal (if using secure Hadoop)
553     login(userProvider, hostName);
554 
555     regionServerAccounting = new RegionServerAccounting();
556     uncaughtExceptionHandler = new UncaughtExceptionHandler() {
557       @Override
558       public void uncaughtException(Thread t, Throwable e) {
559         abort("Uncaught exception in service thread " + t.getName(), e);
560       }
561     };
562 
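        // Whether region assignment state transitions still go through ZooKeeper, as opposed to
        // being reported directly to the master over RPC.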
563     useZKForAssignment = ConfigUtil.useZKForAssignment(conf);
564 
565     // Set 'fs.defaultFS' to match the filesystem on hbase.rootdir, else the
566     // underlying hadoop hdfs accessors will go against the wrong filesystem
567     // (unless all is set to defaults).
568     FSUtils.setFsDefault(this.conf, FSUtils.getRootDir(this.conf));
569     // Get the fs instance used by this RS.  Do we use checksum verification in hbase? If hbase
570     // checksum verification is enabled, then automatically switch off hdfs checksum verification.
571     boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
572     this.fs = new HFileSystem(this.conf, useHBaseChecksum);
573     this.rootDir = FSUtils.getRootDir(this.conf);
574     this.tableDescriptors = new FSTableDescriptors(
575       this.conf, this.fs, this.rootDir, !canUpdateTableDescriptor(), false);
576 
577     service = new ExecutorService(getServerName().toShortString());
578     spanReceiverHost = SpanReceiverHost.getInstance(getConfiguration());
579 
580     // Some unit tests don't need a cluster, so no zookeeper at all
581     if (!conf.getBoolean("hbase.testing.nocluster", false)) {
582       // Open connection to zookeeper and set primary watcher
583       zooKeeper = new ZooKeeperWatcher(conf, getProcessName() + ":" +
584         rpcServices.isa.getPort(), this, canCreateBaseZNode());
585 
586       this.csm = (BaseCoordinatedStateManager) csm;
587       this.csm.initialize(this);
588       this.csm.start();
589 
590       tableLockManager = TableLockManager.createTableLockManager(
591         conf, zooKeeper, serverName);
592 
593       masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this);
594       masterAddressTracker.start();
595 
596       clusterStatusTracker = new ClusterStatusTracker(zooKeeper, this);
597       clusterStatusTracker.start();
598     }
599     this.configurationManager = new ConfigurationManager();
600 
601     rpcServices.start();
602     putUpWebUI();
603     this.walRoller = new LogRoller(this, this);
604     this.choreService = new ChoreService(getServerName().toString(), true);
605 
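        // On non-Windows platforms, reload the on-disk configuration and notify all registered
        // observers when the process receives a SIGHUP signal.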
606     if (!SystemUtils.IS_OS_WINDOWS) {
607       Signal.handle(new Signal("HUP"), new SignalHandler() {
608         public void handle(Signal signal) {
609           getConfiguration().reloadConfiguration();
610           configurationManager.notifyAllObservers(getConfiguration());
611         }
612       });
613     }
614   }
615 
616   /*
617    * Returns true if the configured hostname should be used.
618    */
619   protected boolean shouldUseThisHostnameInstead() {
620     return useThisHostnameInstead != null && !useThisHostnameInstead.isEmpty();
621   }
622 
623   protected void login(UserProvider user, String host) throws IOException {
624     user.login("hbase.regionserver.keytab.file",
625       "hbase.regionserver.kerberos.principal", host);
626   }
627 
628   protected void waitForMasterActive(){
629   }
630 
631   protected String getProcessName() {
632     return REGIONSERVER;
633   }
634 
635   protected boolean canCreateBaseZNode() {
636     return false;
637   }
638 
639   protected boolean canUpdateTableDescriptor() {
640     return false;
641   }
642 
643   protected RSRpcServices createRpcServices() throws IOException {
644     return new RSRpcServices(this);
645   }
646 
647   protected void configureInfoServer() {
648     infoServer.addServlet("rs-status", "/rs-status", RSStatusServlet.class);
649     infoServer.setAttribute(REGIONSERVER, this);
650   }
651 
652   protected Class<? extends HttpServlet> getDumpServlet() {
653     return RSDumpServlet.class;
654   }
655 
656   protected void doMetrics() {
657   }
658 
659   @Override
660   public boolean registerService(Service instance) {
661     /*
662      * No stacking of instances is allowed for a single service name
663      */
664     Descriptors.ServiceDescriptor serviceDesc = instance.getDescriptorForType();
665     if (coprocessorServiceHandlers.containsKey(serviceDesc.getFullName())) {
666       LOG.error("Coprocessor service " + serviceDesc.getFullName()
667           + " already registered, rejecting request from " + instance);
668       return false;
669     }
670 
671     coprocessorServiceHandlers.put(serviceDesc.getFullName(), instance);
672     if (LOG.isDebugEnabled()) {
673       LOG.debug("Registered regionserver coprocessor service: service="+serviceDesc.getFullName());
674     }
675     return true;
676   }
677 
678   /**
679    * Create a 'smarter' HConnection, one that is capable of bypassing RPC if the request is to
680    * the local server. Safe to use going to local or remote server.
681    * Creating this instance in a method allows it to be intercepted and mocked in tests.
682    * @throws IOException
683    */
684   @VisibleForTesting
685   protected ClusterConnection createClusterConnection() throws IOException {
686     // Create a cluster connection that when appropriate, can short-circuit and go directly to the
687     // local server if the request is to the local server bypassing RPC. Can be used for both local
688     // and remote invocations.
689     return ConnectionUtils.createShortCircuitConnection(conf, null, userProvider.getCurrent(),
690       serverName, rpcServices, rpcServices);
691   }
692 
693   /**
694    * Run test on configured codecs to make sure supporting libs are in place.
695    * @param c the configuration to check
696    * @throws IOException
697    */
698   private static void checkCodecs(final Configuration c) throws IOException {
699     // check to see if the codec list is available:
700     String [] codecs = c.getStrings("hbase.regionserver.codecs", (String[])null);
701     if (codecs == null) return;
702     for (String codec : codecs) {
703       if (!CompressionTest.testCompression(codec)) {
704         throw new IOException("Compression codec " + codec +
705           " not supported, aborting RS construction");
706       }
707     }
708   }
709 
710   public String getClusterId() {
711     return this.clusterId;
712   }
713 
714   /**
715    * Set up our cluster connection if it is not already initialized.
716    * @throws IOException
717    */
718   protected synchronized void setupClusterConnection() throws IOException {
719     if (clusterConnection == null) {
720       clusterConnection = createClusterConnection();
721       metaTableLocator = new MetaTableLocator();
722     }
723   }
724 
725   /**
726    * All initialization needed before we register with the Master.
727    *
728    * @throws IOException
729    * @throws InterruptedException
730    */
731   private void preRegistrationInitialization(){
732     try {
733       setupClusterConnection();
734 
735       // Health checker thread.
736       if (isHealthCheckerConfigured()) {
737         int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ,
738           HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
739         healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration());
740       }
741       this.pauseMonitor = new JvmPauseMonitor(conf);
742       pauseMonitor.start();
743 
744       initializeZooKeeper();
745       if (!isStopped() && !isAborted()) {
746         initializeThreads();
747       }
748     } catch (Throwable t) {
749       // Call stop on error, or the process will stick around forever since the server
750       // puts up non-daemon threads.
751       this.rpcServices.stop();
752       abort("Initialization of RS failed.  Hence aborting RS.", t);
753     }
754   }
755 
756   /**
757    * Bring up the connection to the zk ensemble, then wait until a master is available
758    * for this cluster, and after that wait until the cluster 'up' flag has been set.
759    * This is the order in which the master does things.
760    * Finally, open the long-lived server short-circuit connection.
761    * @throws IOException
762    * @throws InterruptedException
763    */
764   private void initializeZooKeeper() throws IOException, InterruptedException {
765     // Create the master address tracker, register with zk, and start it.  Then
766     // block until a master is available.  No point in starting up if no master
767     // running.
768     blockAndCheckIfStopped(this.masterAddressTracker);
769 
770     // Wait on cluster being up.  Master will set this flag up in zookeeper
771     // when ready.
772     blockAndCheckIfStopped(this.clusterStatusTracker);
773 
774     // Retrieve clusterId
775     // Since cluster status is now up
776     // ID should have already been set by HMaster
777     try {
778       clusterId = ZKClusterId.readClusterIdZNode(this.zooKeeper);
779       if (clusterId == null) {
780         this.abort("Cluster ID has not been set");
781       }
782       LOG.info("ClusterId : "+clusterId);
783     } catch (KeeperException e) {
784       this.abort("Failed to retrieve Cluster ID",e);
785     }
786 
787     // In case of a colocated master, wait here until it is active,
788     // so that backup masters won't start as regionservers.
789     // This avoids showing backup masters as regionservers
790     // in the master web UI, or assigning any regions to them.
791     waitForMasterActive();
792     if (isStopped() || isAborted()) {
793       return; // No need for further initialization
794     }
795 
796     // watch for snapshots and other procedures
797     try {
798       rspmHost = new RegionServerProcedureManagerHost();
799       rspmHost.loadProcedures(conf);
800       rspmHost.initialize(this);
801     } catch (KeeperException e) {
802       this.abort("Failed to reach zk cluster when creating procedure handler.", e);
803     }
804     // register watcher for recovering regions
805     this.recoveringRegionWatcher = new RecoveringRegionWatcher(this.zooKeeper, this);
806   }
807 
808   /**
809    * Utility method to wait indefinitely on znode availability while checking
810    * whether the region server is shut down.
811    * @param tracker znode tracker to use
812    * @throws IOException any IO exception, plus if the RS is stopped
813    * @throws InterruptedException
814    */
815   private void blockAndCheckIfStopped(ZooKeeperNodeTracker tracker)
816       throws IOException, InterruptedException {
817     while (tracker.blockUntilAvailable(this.msgInterval, false) == null) {
818       if (this.stopped) {
819         throw new IOException("Received the shutdown message while waiting.");
820       }
821     }
822   }
823 
824   /**
825    * @return False if cluster shutdown in progress
826    */
827   private boolean isClusterUp() {
828     return clusterStatusTracker != null && clusterStatusTracker.isClusterUp();
829   }
830 
831   private void initializeThreads() throws IOException {
832     // Cache flushing thread.
833     this.cacheFlusher = new MemStoreFlusher(conf, this);
834 
835     // Compaction thread
836     this.compactSplitThread = new CompactSplitThread(this);
837 
838     // Background thread to check for compactions; needed if a region has not gotten updates
839     // in a while. It takes care of not checking too frequently on a store-by-store basis.
840     this.compactionChecker = new CompactionChecker(this, this.threadWakeFrequency, this);
841     this.periodicFlusher = new PeriodicMemstoreFlusher(this.threadWakeFrequency, this);
842     this.leases = new Leases(this.threadWakeFrequency);
843 
844     // Create the thread to clean the moved regions list
845     movedRegionsCleaner = MovedRegionsCleaner.create(this);
846 
847     if (this.nonceManager != null) {
848       // Create the scheduled chore that cleans up nonces.
849       nonceManagerChore = this.nonceManager.createCleanupScheduledChore(this);
850     }
851 
852     // Setup the Quota Manager
853     rsQuotaManager = new RegionServerQuotaManager(this);
854     
855     // Setup RPC client for master communication
856     rpcClient = RpcClientFactory.createClient(conf, clusterId, new InetSocketAddress(
857         rpcServices.isa.getAddress(), 0), clusterConnection.getConnectionMetrics());
858 
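        // Store file refresh chore for secondary region replicas: if the general refresh period
        // is 0 (disabled), fall back to the meta-specific key and refresh only meta store files.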
859     boolean onlyMetaRefresh = false;
860     int storefileRefreshPeriod = conf.getInt(
861         StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD
862       , StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
863     if (storefileRefreshPeriod == 0) {
864       storefileRefreshPeriod = conf.getInt(
865           StorefileRefresherChore.REGIONSERVER_META_STOREFILE_REFRESH_PERIOD,
866           StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
867       onlyMetaRefresh = true;
868     }
869     if (storefileRefreshPeriod > 0) {
870       this.storefileRefresher = new StorefileRefresherChore(storefileRefreshPeriod,
871           onlyMetaRefresh, this, this);
872     }
873     registerConfigurationObservers();
874   }
875 
876   private void registerConfigurationObservers() {
877     // Registering the compactSplitThread object with the ConfigurationManager.
878     configurationManager.registerObserver(this.compactSplitThread);
879     configurationManager.registerObserver(this.rpcServices);
880   }
881 
882   /**
883    * The HRegionServer sticks in this loop until closed.
884    */
885   @Override
886   public void run() {
887     try {
888       // Do pre-registration initializations; zookeeper, lease threads, etc.
889       preRegistrationInitialization();
890     } catch (Throwable e) {
891       abort("Fatal exception during initialization", e);
892     }
893 
894     try {
895       if (!isStopped() && !isAborted()) {
896         ShutdownHook.install(conf, fs, this, Thread.currentThread());
897         // Set our ephemeral znode up in zookeeper now that we have a name.
898         createMyEphemeralNode();
899         // Initialize the RegionServerCoprocessorHost now that our ephemeral
900         // node was created, in case any coprocessors want to use ZooKeeper
901         this.rsHost = new RegionServerCoprocessorHost(this, this.conf);
902       }
903 
904       // Try to register with the Master; tell it we are here.  Break if the
905       // server is stopped, the cluster-up flag is down, or hdfs went wacky.
906       while (keepLooping()) {
907         RegionServerStartupResponse w = reportForDuty();
908         if (w == null) {
909           LOG.warn("reportForDuty failed; sleeping and then retrying.");
910           this.sleeper.sleep();
911         } else {
912           handleReportForDutyResponse(w);
913           break;
914         }
915       }
916 
917       if (!isStopped() && isHealthy()){
918         // start the snapshot handler and other procedure handlers,
919         // since the server is ready to run
920         rspmHost.start();
921       }
922       
923       // Start the Quota Manager
924       if (this.rsQuotaManager != null) {
925         rsQuotaManager.start(getRpcServer().getScheduler());
926       }
927 
928       // We registered with the Master.  Go into run mode.
929       long lastMsg = System.currentTimeMillis();
930       long oldRequestCount = -1;
931       // The main run loop.
932       while (!isStopped() && isHealthy()) {
933         if (!isClusterUp()) {
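              // Cluster shutdown has been requested: close user regions first, then keep looping
              // until only catalog regions remain and their write traffic has quiesced.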
934           if (isOnlineRegionsEmpty()) {
935             stop("Exiting; cluster shutdown set and not carrying any regions");
936           } else if (!this.stopping) {
937             this.stopping = true;
938             LOG.info("Closing user regions");
939             closeUserRegions(this.abortRequested);
940           } else if (this.stopping) {
941             boolean allUserRegionsOffline = areAllUserRegionsOffline();
942             if (allUserRegionsOffline) {
943               // Set stopped if no more write requests to meta tables
944               // since last time we went around the loop.  Any open
945               // meta regions will be closed on our way out.
946               if (oldRequestCount == getWriteRequestCount()) {
947                 stop("Stopped; only catalog regions remaining online");
948                 break;
949               }
950               oldRequestCount = getWriteRequestCount();
951             } else {
952               // Make sure all regions have been closed -- some regions may
953               // not have gotten it because we were splitting at the time of
954               // the call to closeUserRegions.
955               closeUserRegions(this.abortRequested);
956             }
957             LOG.debug("Waiting on " + getOnlineRegionsAsPrintableString());
958           }
959         }
960         long now = System.currentTimeMillis();
961         if ((now - lastMsg) >= msgInterval) {
962           tryRegionServerReport(lastMsg, now);
963           lastMsg = System.currentTimeMillis();
964           doMetrics();
965         }
966         if (!isStopped() && !isAborted()) {
967           this.sleeper.sleep();
968         }
969       } // while
970     } catch (Throwable t) {
971       if (!rpcServices.checkOOME(t)) {
972         String prefix = t instanceof YouAreDeadException? "": "Unhandled: ";
973         abort(prefix + t.getMessage(), t);
974       }
975     }
976     // Run shutdown.
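        // The sequence below unregisters the MBean, stops chores and worker threads, closes regions,
        // shuts down the WAL, closes connections, deletes our ephemeral znode, and closes ZooKeeper.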
977     if (mxBean != null) {
978       MBeanUtil.unregisterMBean(mxBean);
979       mxBean = null;
980     }
981     if (this.leases != null) this.leases.closeAfterLeasesExpire();
982     if (this.splitLogWorker != null) {
983       splitLogWorker.stop();
984     }
985     if (this.infoServer != null) {
986       LOG.info("Stopping infoServer");
987       try {
988         this.infoServer.stop();
989       } catch (Exception e) {
990         LOG.error("Failed to stop infoServer", e);
991       }
992     }
993     // Send cache a shutdown.
994     if (cacheConfig != null && cacheConfig.isBlockCacheEnabled()) {
995       cacheConfig.getBlockCache().shutdown();
996     }
997 
998     if (movedRegionsCleaner != null) {
999       movedRegionsCleaner.stop("Region Server stopping");
1000     }
1001 
1002     // Send interrupts to wake up threads if sleeping so they notice shutdown.
1003     // TODO: Should we check they are alive? If an OOME occurred, they could have exited already.
1004     if (this.hMemManager != null) this.hMemManager.stop();
1005     if (this.cacheFlusher != null) this.cacheFlusher.interruptIfNecessary();
1006     if (this.compactSplitThread != null) this.compactSplitThread.interruptIfNecessary();
1007     if (this.compactionChecker != null) this.compactionChecker.cancel(true);
1008     if (this.healthCheckChore != null) this.healthCheckChore.cancel(true);
1009     if (this.nonceManagerChore != null) this.nonceManagerChore.cancel(true);
1010     if (this.storefileRefresher != null) this.storefileRefresher.cancel(true);
1011     sendShutdownInterrupt();
1012 
1013     // Stop the quota manager
1014     if (rsQuotaManager != null) {
1015       rsQuotaManager.stop();
1016     }
1017     
1018     // Stop the snapshot and other procedure handlers, forcefully killing all running tasks
1019     if (rspmHost != null) {
1020       rspmHost.stop(this.abortRequested || this.killed);
1021     }
1022 
1023     if (this.killed) {
1024       // Just skip out w/o closing regions.  Used when testing.
1025     } else if (abortRequested) {
1026       if (this.fsOk) {
1027         closeUserRegions(abortRequested); // Don't leave any open file handles
1028       }
1029       LOG.info("aborting server " + this.serverName);
1030     } else {
1031       closeUserRegions(abortRequested);
1032       LOG.info("stopping server " + this.serverName);
1033     }
1034 
1035     // so callers waiting for meta without timeout can stop
1036     if (this.metaTableLocator != null) this.metaTableLocator.stop();
1037     if (this.clusterConnection != null && !clusterConnection.isClosed()) {
1038       try {
1039         this.clusterConnection.close();
1040       } catch (IOException e) {
1041         // Although the {@link Closeable} interface throws an {@link
1042         // IOException}, in reality, the implementation would never do that.
1043         LOG.warn("Attempt to close server's short circuit HConnection failed.", e);
1044       }
1045     }
1046 
1047     // Closing the compactSplit thread before closing meta regions
1048     if (!this.killed && containsMetaTableRegions()) {
1049       if (!abortRequested || this.fsOk) {
1050         if (this.compactSplitThread != null) {
1051           this.compactSplitThread.join();
1052           this.compactSplitThread = null;
1053         }
1054         closeMetaTableRegions(abortRequested);
1055       }
1056     }
1057 
1058     if (!this.killed && this.fsOk) {
1059       waitOnAllRegionsToClose(abortRequested);
1060       LOG.info("stopping server " + this.serverName +
1061         "; all regions closed.");
1062     }
1063 
1064     // The fsOk flag may be changed when closing regions throws an exception.
1065     if (this.fsOk) {
1066       shutdownWAL(!abortRequested);
1067     }
1068 
1069     // Make sure the proxy is down.
1070     if (this.rssStub != null) {
1071       this.rssStub = null;
1072     }
1073     if (this.rpcClient != null) {
1074       this.rpcClient.close();
1075     }
1076     if (this.leases != null) {
1077       this.leases.close();
1078     }
1079     if (this.pauseMonitor != null) {
1080       this.pauseMonitor.stop();
1081     }
1082 
1083     if (!killed) {
1084       stopServiceThreads();
1085     }
1086 
1087     if (this.rpcServices != null) {
1088       this.rpcServices.stop();
1089     }
1090 
1091     try {
1092       deleteMyEphemeralNode();
1093     } catch (KeeperException.NoNodeException nn) {
1094     } catch (KeeperException e) {
1095       LOG.warn("Failed deleting my ephemeral node", e);
1096     }
1097     // We may have failed to delete the znode at the previous step, but
1098     //  we delete the file anyway: a second attempt to delete the znode is likely to fail again.
1099     ZNodeClearer.deleteMyEphemeralNodeOnDisk();
1100 
1101     if (this.zooKeeper != null) {
1102       this.zooKeeper.close();
1103     }
1104     LOG.info("stopping server " + this.serverName +
1105       "; zookeeper connection closed.");
1106 
1107     LOG.info(Thread.currentThread().getName() + " exiting");
1108   }
1109 
1110   private boolean containsMetaTableRegions() {
1111     return onlineRegions.containsKey(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
1112   }
1113 
1114   private boolean areAllUserRegionsOffline() {
1115     if (getNumberOfOnlineRegions() > 2) return false;
1116     boolean allUserRegionsOffline = true;
1117     for (Map.Entry<String, Region> e: this.onlineRegions.entrySet()) {
1118       if (!e.getValue().getRegionInfo().isMetaTable()) {
1119         allUserRegionsOffline = false;
1120         break;
1121       }
1122     }
1123     return allUserRegionsOffline;
1124   }
1125 
1126   /**
1127    * @return Current write count for all online regions.
1128    */
1129   private long getWriteRequestCount() {
1130     long writeCount = 0;
1131     for (Map.Entry<String, Region> e: this.onlineRegions.entrySet()) {
1132       writeCount += e.getValue().getWriteRequestsCount();
1133     }
1134     return writeCount;
1135   }
1136 
1137   @VisibleForTesting
1138   protected void tryRegionServerReport(long reportStartTime, long reportEndTime)
1139   throws IOException {
1140     RegionServerStatusService.BlockingInterface rss = rssStub;
1141     if (rss == null) {
1142       // the current server could be stopping.
1143       return;
1144     }
1145     ClusterStatusProtos.ServerLoad sl = buildServerLoad(reportStartTime, reportEndTime);
1146     try {
1147       RegionServerReportRequest.Builder request = RegionServerReportRequest.newBuilder();
1148       ServerName sn = ServerName.parseVersionedServerName(
1149         this.serverName.getVersionedBytes());
1150       request.setServer(ProtobufUtil.toServerName(sn));
1151       request.setLoad(sl);
1152       rss.regionServerReport(null, request.build());
1153     } catch (ServiceException se) {
1154       IOException ioe = ProtobufUtil.getRemoteException(se);
1155       if (ioe instanceof YouAreDeadException) {
1156         // This will be caught and handled as a fatal error in run()
1157         throw ioe;
1158       }
1159       if (rssStub == rss) {
1160         rssStub = null;
1161       }
1162       // Couldn't connect to the master, get location from zk and reconnect
1163       // Method blocks until new master is found or we are stopped
1164       createRegionServerStatusStub();
1165     }
1166   }
1167 
1168   ClusterStatusProtos.ServerLoad buildServerLoad(long reportStartTime, long reportEndTime)
1169       throws IOException {
1170     // We're getting the MetricsRegionServerWrapper here because the wrapper computes requests
1171     // per second, and other metrics.  As long as metrics are part of ServerLoad it's best to use
1172     // the wrapper to compute those numbers in one place.
1173     // In the long term most of these should be moved off of ServerLoad and the heart beat.
1174     // Instead they should be stored in an HBase table so that external visibility into HBase is
1175     // improved. Additionally, the load balancer will be able to take advantage of a more complete
1176     // history.
1177     MetricsRegionServerWrapper regionServerWrapper = metricsRegionServer.getRegionServerWrapper();
1178     Collection<Region> regions = getOnlineRegionsLocalContext();
1179     MemoryUsage memory = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
1180 
1181     ClusterStatusProtos.ServerLoad.Builder serverLoad =
1182       ClusterStatusProtos.ServerLoad.newBuilder();
1183     serverLoad.setNumberOfRequests((int) regionServerWrapper.getRequestsPerSecond());
1184     serverLoad.setTotalNumberOfRequests((int) regionServerWrapper.getTotalRequestCount());
1185     serverLoad.setUsedHeapMB((int)(memory.getUsed() / 1024 / 1024));
1186     serverLoad.setMaxHeapMB((int) (memory.getMax() / 1024 / 1024));
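         // Aggregate the coprocessors loaded on the default WAL, on each online region, and on
         // each region's WAL into the reported server load.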
1187     Set<String> coprocessors = getWAL(null).getCoprocessorHost().getCoprocessors();
1188     Builder coprocessorBuilder = Coprocessor.newBuilder();
1189     for (String coprocessor : coprocessors) {
1190       serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build());
1191     }
1192     RegionLoad.Builder regionLoadBldr = RegionLoad.newBuilder();
1193     RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder();
1194     for (Region region : regions) {
1195       if (region.getCoprocessorHost() != null) {
1196         Set<String> regionCoprocessors = region.getCoprocessorHost().getCoprocessors();
1197         Iterator<String> iterator = regionCoprocessors.iterator();
1198         while (iterator.hasNext()) {
1199           serverLoad.addCoprocessors(coprocessorBuilder.setName(iterator.next()).build());
1200         }
1201       }
1202       serverLoad.addRegionLoads(createRegionLoad(region, regionLoadBldr, regionSpecifier));
1203       for (String coprocessor : getWAL(region.getRegionInfo()).getCoprocessorHost()
1204           .getCoprocessors()) {
1205         serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build());
1206       }
1207     }
1208     serverLoad.setReportStartTime(reportStartTime);
1209     serverLoad.setReportEndTime(reportEndTime);
1210     if (this.infoServer != null) {
1211       serverLoad.setInfoServerPort(this.infoServer.getPort());
1212     } else {
1213       serverLoad.setInfoServerPort(-1);
1214     }
1215 
1216     // For the replicationLoad purpose, we only need to get it from one service;
1217     // either source or sink will give the same info.
1218     ReplicationSourceService rsources = getReplicationSourceService();
1219 
1220     if (rsources != null) {
1221       // always refresh first to get the latest value
1222       ReplicationLoad rLoad = rsources.refreshAndGetReplicationLoad();
1223       if (rLoad != null) {
1224         serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink());
1225         for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad.getReplicationLoadSourceList()) {
1226           serverLoad.addReplLoadSource(rLS);
1227         }
1228       }
1229     }
1230 
1231     return serverLoad.build();
1232   }
1233 
1234   String getOnlineRegionsAsPrintableString() {
1235     StringBuilder sb = new StringBuilder();
1236     for (Region r: this.onlineRegions.values()) {
1237       if (sb.length() > 0) sb.append(", ");
1238       sb.append(r.getRegionInfo().getEncodedName());
1239     }
1240     return sb.toString();
1241   }
1242 
1243   /**
1244    * Wait on regions to close.
1245    */
1246   private void waitOnAllRegionsToClose(final boolean abort) {
1247     // Wait till all regions are closed before going out.
1248     int lastCount = -1;
1249     long previousLogTime = 0;
1250     Set<String> closedRegions = new HashSet<String>();
1251     boolean interrupted = false;
1252     try {
1253       while (!isOnlineRegionsEmpty()) {
1254         int count = getNumberOfOnlineRegions();
1255         // Only print a message if the count of regions has changed.
1256         if (count != lastCount) {
1257           // Log every second at most
1258           if (System.currentTimeMillis() > (previousLogTime + 1000)) {
1259             previousLogTime = System.currentTimeMillis();
1260             lastCount = count;
1261             LOG.info("Waiting on " + count + " regions to close");
1262             // Only print out regions still closing if there are just a few,
1263             // else we will swamp the log.
1264             if (count < 10 && LOG.isDebugEnabled()) {
1265               LOG.debug(this.onlineRegions);
1266             }
1267           }
1268         }
1269         // Ensure all user regions have been sent a close. Use this to
1270         // protect against the case where an open comes in after we start the
1271         // iterator of onlineRegions to close all user regions.
1272         for (Map.Entry<String, Region> e : this.onlineRegions.entrySet()) {
1273           HRegionInfo hri = e.getValue().getRegionInfo();
1274           if (!this.regionsInTransitionInRS.containsKey(hri.getEncodedNameAsBytes())
1275               && !closedRegions.contains(hri.getEncodedName())) {
1276             closedRegions.add(hri.getEncodedName());
1277             // Don't update zk with this close transition; pass false.
1278             closeRegionIgnoreErrors(hri, abort);
1279           }
1280         }
1281         // No regions in RIT, we could stop waiting now.
1282         if (this.regionsInTransitionInRS.isEmpty()) {
1283           if (!isOnlineRegionsEmpty()) {
1284             LOG.info("We were exiting though online regions are not empty," +
1285                 " because some regions failed closing");
1286           }
1287           break;
1288         }
1289         if (sleep(200)) {
1290           interrupted = true;
1291         }
1292       }
1293     } finally {
1294       if (interrupted) {
1295         Thread.currentThread().interrupt();
1296       }
1297     }
1298   }
1299 
1300   private boolean sleep(long millis) {
1301     boolean interrupted = false;
1302     try {
1303       Thread.sleep(millis);
1304     } catch (InterruptedException e) {
1305       LOG.warn("Interrupted while sleeping");
1306       interrupted = true;
1307     }
1308     return interrupted;
1309   }
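
  // Illustrative note (not from the original source): sleep() swallows the InterruptedException and
  // reports it through its return value so that loops such as waitOnAllRegionsToClose() and
  // createRegionServerStatusStub() can keep running and restore the thread's interrupt status once,
  // in a finally block. A minimal caller-side sketch of that pattern:
  //
  //   boolean interrupted = false;
  //   try {
  //     while (moreWorkToDo()) {     // hypothetical loop condition
  //       if (sleep(200)) {
  //         interrupted = true;      // remember the interrupt, but keep looping
  //       }
  //     }
  //   } finally {
  //     if (interrupted) {
  //       Thread.currentThread().interrupt();  // restore interrupt status exactly once
  //     }
  //   }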
1310 
1311   private void shutdownWAL(final boolean close) {
1312     if (this.walFactory != null) {
1313       try {
1314         if (close) {
1315           walFactory.close();
1316         } else {
1317           walFactory.shutdown();
1318         }
1319       } catch (Throwable e) {
1320         e = RemoteExceptionHandler.checkThrowable(e);
1321         LOG.error("Shutdown / close of WAL failed: " + e);
1322         LOG.debug("Shutdown / close exception details:", e);
1323       }
1324     }
1325   }
1326 
1327   /*
1328    * Run init. Sets up wal and starts up all server threads.
1329    *
1330    * @param c The RegionServerStartupResponse from the master, carrying extra configuration.
1331    */
1332   protected void handleReportForDutyResponse(final RegionServerStartupResponse c)
1333   throws IOException {
1334     try {
1335       for (NameStringPair e : c.getMapEntriesList()) {
1336         String key = e.getName();
1337         // The hostname the master sees us as.
1338         if (key.equals(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)) {
1339           String hostnameFromMasterPOV = e.getValue();
1340           this.serverName = ServerName.valueOf(hostnameFromMasterPOV,
1341             rpcServices.isa.getPort(), this.startcode);
1342           if (shouldUseThisHostnameInstead() &&
1343               !hostnameFromMasterPOV.equals(useThisHostnameInstead)) {
1344             String msg = "Master passed us a different hostname to use; was=" +
1345                 this.useThisHostnameInstead + ", but now=" + hostnameFromMasterPOV;
1346             LOG.error(msg);
1347             throw new IOException(msg);
1348           }
1349           if (!shouldUseThisHostnameInstead() &&
1350               !hostnameFromMasterPOV.equals(rpcServices.isa.getHostName())) {
1351             String msg = "Master passed us a different hostname to use; was=" +
1352                 rpcServices.isa.getHostName() + ", but now=" + hostnameFromMasterPOV;
1353             LOG.error(msg);
1354           }
1355           continue;
1356         }
1357         String value = e.getValue();
1358         if (LOG.isDebugEnabled()) {
1359           LOG.info("Config from master: " + key + "=" + value);
1360         }
1361         this.conf.set(key, value);
1362       }
1363 
1364       // hack! Maps DFSClient => RegionServer for logs.  HDFS made this
1365       // config param for task trackers, but we can piggyback off of it.
1366       if (this.conf.get("mapreduce.task.attempt.id") == null) {
1367         this.conf.set("mapreduce.task.attempt.id", "hb_rs_" +
1368           this.serverName.toString());
1369       }
1370 
1371       // Save it in a file; this lets us see later whether we crashed.
1372       ZNodeClearer.writeMyEphemeralNodeOnDisk(getMyEphemeralNodePath());
1373 
1374       this.cacheConfig = new CacheConfig(conf);
1375       this.walFactory = setupWALAndReplication();
1376       // Init in here rather than in constructor after thread name has been set
1377       this.metricsRegionServer = new MetricsRegionServer(new MetricsRegionServerWrapperImpl(this));
1378 
1379       startServiceThreads();
1380       startHeapMemoryManager();
1381       LOG.info("Serving as " + this.serverName +
1382         ", RpcServer on " + rpcServices.isa +
1383         ", sessionid=0x" +
1384         Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()));
1385 
1386       // Wake up anyone waiting for this server to come online
1387       synchronized (online) {
1388         online.set(true);
1389         online.notifyAll();
1390       }
1391     } catch (Throwable e) {
1392       stop("Failed initialization");
1393       throw convertThrowableToIOE(cleanup(e, "Failed init"),
1394           "Region server startup failed");
1395     } finally {
1396       sleeper.skipSleepCycle();
1397     }
1398   }
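
  // Illustrative note (not from the original source): the synchronized notifyAll() on "online" above
  // pairs with waitForServerOnline() further down, which waits on the same monitor. A caller-side
  // sketch, assuming a startup or test thread holds a reference "rs" to this HRegionServer:
  //
  //   rs.waitForServerOnline();   // blocks until online.set(true) + notifyAll(), or until stopped
  //   assert rs.isOnline();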
1399 
1400   private void startHeapMemoryManager() {
1401     this.hMemManager = HeapMemoryManager.create(this.conf, this.cacheFlusher,
1402         this, this.regionServerAccounting);
1403     if (this.hMemManager != null) {
1404       this.hMemManager.start(getChoreService());
1405     }
1406   }
1407 
1408   private void createMyEphemeralNode() throws KeeperException, IOException {
1409     RegionServerInfo.Builder rsInfo = RegionServerInfo.newBuilder();
1410     rsInfo.setInfoPort(infoServer != null ? infoServer.getPort() : -1);
1411     rsInfo.setVersionInfo(ProtobufUtil.getVersionInfo());
1412     byte[] data = ProtobufUtil.prependPBMagic(rsInfo.build().toByteArray());
1413     ZKUtil.createEphemeralNodeAndWatch(this.zooKeeper,
1414       getMyEphemeralNodePath(), data);
1415   }
1416 
1417   private void deleteMyEphemeralNode() throws KeeperException {
1418     ZKUtil.deleteNode(this.zooKeeper, getMyEphemeralNodePath());
1419   }
1420 
1421   @Override
1422   public RegionServerAccounting getRegionServerAccounting() {
1423     return regionServerAccounting;
1424   }
1425 
1426   @Override
1427   public TableLockManager getTableLockManager() {
1428     return tableLockManager;
1429   }
1430 
1431   /*
1432    * @param r Region to get RegionLoad for.
1433    * @param regionLoadBldr the RegionLoad.Builder, can be null
1434    * @param regionSpecifier the RegionSpecifier.Builder, can be null
1435    * @return RegionLoad instance.
1436    *
1437    * @throws IOException
1438    */
1439   private RegionLoad createRegionLoad(final Region r, RegionLoad.Builder regionLoadBldr,
1440       RegionSpecifier.Builder regionSpecifier) throws IOException {
1441     byte[] name = r.getRegionInfo().getRegionName();
1442     int stores = 0;
1443     int storefiles = 0;
1444     int storeUncompressedSizeMB = 0;
1445     int storefileSizeMB = 0;
1446     int memstoreSizeMB = (int) (r.getMemstoreSize() / 1024 / 1024);
1447     int storefileIndexSizeMB = 0;
1448     int rootIndexSizeKB = 0;
1449     int totalStaticIndexSizeKB = 0;
1450     int totalStaticBloomSizeKB = 0;
1451     long totalCompactingKVs = 0;
1452     long currentCompactedKVs = 0;
1453     List<Store> storeList = r.getStores();
1454     stores += storeList.size();
1455     for (Store store : storeList) {
1456       storefiles += store.getStorefilesCount();
1457       storeUncompressedSizeMB += (int) (store.getStoreSizeUncompressed() / 1024 / 1024);
1458       storefileSizeMB += (int) (store.getStorefilesSize() / 1024 / 1024);
1459       storefileIndexSizeMB += (int) (store.getStorefilesIndexSize() / 1024 / 1024);
1460       CompactionProgress progress = store.getCompactionProgress();
1461       if (progress != null) {
1462         totalCompactingKVs += progress.totalCompactingKVs;
1463         currentCompactedKVs += progress.currentCompactedKVs;
1464       }
1465       rootIndexSizeKB += (int) (store.getStorefilesIndexSize() / 1024);
1466       totalStaticIndexSizeKB += (int) (store.getTotalStaticIndexSize() / 1024);
1467       totalStaticBloomSizeKB += (int) (store.getTotalStaticBloomSize() / 1024);
1468     }
1469 
1470     float dataLocality =
1471         r.getHDFSBlocksDistribution().getBlockLocalityIndex(serverName.getHostname());
1472     if (regionLoadBldr == null) {
1473       regionLoadBldr = RegionLoad.newBuilder();
1474     }
1475     if (regionSpecifier == null) {
1476       regionSpecifier = RegionSpecifier.newBuilder();
1477     }
1478     regionSpecifier.setType(RegionSpecifierType.REGION_NAME);
1479     regionSpecifier.setValue(ByteStringer.wrap(name));
1480     regionLoadBldr.setRegionSpecifier(regionSpecifier.build())
1481       .setStores(stores)
1482       .setStorefiles(storefiles)
1483       .setStoreUncompressedSizeMB(storeUncompressedSizeMB)
1484       .setStorefileSizeMB(storefileSizeMB)
1485       .setMemstoreSizeMB(memstoreSizeMB)
1486       .setStorefileIndexSizeMB(storefileIndexSizeMB)
1487       .setRootIndexSizeKB(rootIndexSizeKB)
1488       .setTotalStaticIndexSizeKB(totalStaticIndexSizeKB)
1489       .setTotalStaticBloomSizeKB(totalStaticBloomSizeKB)
1490       .setReadRequestsCount(r.getReadRequestsCount())
1491       .setWriteRequestsCount(r.getWriteRequestsCount())
1492       .setTotalCompactingKVs(totalCompactingKVs)
1493       .setCurrentCompactedKVs(currentCompactedKVs)
1494       .setDataLocality(dataLocality)
1495       .setLastMajorCompactionTs(r.getOldestHfileTs(true));
1496     ((HRegion)r).setCompleteSequenceId(regionLoadBldr);
1497 
1498     return regionLoadBldr.build();
1499   }
1500 
1501   /**
1502    * @param encodedRegionName
1503    * @return An instance of RegionLoad.
1504    */
1505   public RegionLoad createRegionLoad(final String encodedRegionName) throws IOException {
1506     Region r = onlineRegions.get(encodedRegionName);
1507     return r != null ? createRegionLoad(r, null, null) : null;
1508   }
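
  // Illustrative note (not from the original source): createRegionLoad() reports sizes in coarse
  // units -- store/memstore sizes in MB, index and bloom sizes in KB -- via integer division, so
  // anything below one unit rounds down to zero. A minimal sketch of the same conversion with a
  // hypothetical byte count:
  //
  //   long storefileBytes = 5L * 1024 * 1024 + 300L * 1024;        // hypothetical: ~5.3 MB
  //   int storefileSizeMB = (int) (storefileBytes / 1024 / 1024);  // -> 5; the fraction is lost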
1509 
1510   /*
1511    * Inner class that runs periodically, checking whether regions need compaction.
1512    */
1513   private static class CompactionChecker extends ScheduledChore {
1514     private final HRegionServer instance;
1515     private final int majorCompactPriority;
1516     private final static int DEFAULT_PRIORITY = Integer.MAX_VALUE;
1517     private long iteration = 0;
1518 
1519     CompactionChecker(final HRegionServer h, final int sleepTime,
1520         final Stoppable stopper) {
1521       super("CompactionChecker", stopper, sleepTime);
1522       this.instance = h;
1523       LOG.info(this.getName() + " runs every " + StringUtils.formatTime(sleepTime));
1524 
1525       /* MajorCompactPriority is configurable.
1526        * If not set, the compaction will use default priority.
1527        */
1528       this.majorCompactPriority = this.instance.conf.
1529         getInt("hbase.regionserver.compactionChecker.majorCompactPriority",
1530         DEFAULT_PRIORITY);
1531     }
1532 
1533     @Override
1534     protected void chore() {
1535       for (Region r : this.instance.onlineRegions.values()) {
1536         if (r == null)
1537           continue;
1538         for (Store s : r.getStores()) {
1539           try {
1540             long multiplier = s.getCompactionCheckMultiplier();
1541             assert multiplier > 0;
1542             if (iteration % multiplier != 0) continue;
1543             if (s.needsCompaction()) {
1544               // Queue a compaction. Will recognize if major is needed.
1545               this.instance.compactSplitThread.requestSystemCompaction(r, s, getName()
1546                   + " requests compaction");
1547             } else if (s.isMajorCompaction()) {
1548               if (majorCompactPriority == DEFAULT_PRIORITY
1549                   || majorCompactPriority > ((HRegion)r).getCompactPriority()) {
1550                 this.instance.compactSplitThread.requestCompaction(r, s, getName()
1551                     + " requests major compaction; use default priority", null);
1552               } else {
1553                 this.instance.compactSplitThread.requestCompaction(r, s, getName()
1554                     + " requests major compaction; use configured priority",
1555                   this.majorCompactPriority, null, null);
1556               }
1557             }
1558           } catch (IOException e) {
1559             LOG.warn("Failed major compaction check on " + r, e);
1560           }
1561         }
1562       }
1563       iteration = (iteration == Long.MAX_VALUE) ? 0 : (iteration + 1);
1564     }
1565   }
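
  // Illustrative note (not from the original source): the "iteration % multiplier" test in chore()
  // means a store whose getCompactionCheckMultiplier() returns k is only examined on every k-th run
  // of this chore, so its effective check interval is roughly k times the chore period ("sleepTime"
  // passed to the constructor). A small worked sketch with hypothetical values:
  //
  //   long sleepTime = 10_000L;      // hypothetical chore period in ms
  //   long multiplier = 6L;          // hypothetical store.getCompactionCheckMultiplier()
  //   long effectiveCheckIntervalMs = sleepTime * multiplier;  // ~60s between checks for that store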
1566 
1567   static class PeriodicMemstoreFlusher extends ScheduledChore {
1568     final HRegionServer server;
1569     final static int RANGE_OF_DELAY = 5 * 60 * 1000; // 5 min in milliseconds
1570     final static int MIN_DELAY_TIME = 0; // millisec
1571     public PeriodicMemstoreFlusher(int cacheFlushInterval, final HRegionServer server) {
1572       super(server.getServerName() + "-MemstoreFlusherChore", server, cacheFlushInterval);
1573       this.server = server;
1574     }
1575 
1576     @Override
1577     protected void chore() {
1578       final StringBuffer whyFlush = new StringBuffer();
1579       for (Region r : this.server.onlineRegions.values()) {
1580         if (r == null) continue;
1581         if (((HRegion)r).shouldFlush(whyFlush)) {
1582           FlushRequester requester = server.getFlushRequester();
1583           if (requester != null) {
1584             long randomDelay = RandomUtils.nextInt(RANGE_OF_DELAY) + MIN_DELAY_TIME;
1585             LOG.info(getName() + " requesting flush of " +
1586               r.getRegionInfo().getRegionNameAsString() + " because " +
1587               whyFlush.toString() +
1588               " after random delay " + randomDelay + "ms");
1589             // Throttle the flushes by putting a delay. If we don't throttle, and there
1590             // is a balanced write-load on the regions in a table, we might end up
1591             // overwhelming the filesystem with too many flushes at once.
1592             requester.requestDelayedFlush(r, randomDelay, false);
1593           }
1594         }
1595       }
1596     }
1597   }
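
  // Illustrative note (not from the original source): the flusher chore above spreads flushes out
  // with a random delay drawn from [MIN_DELAY_TIME, MIN_DELAY_TIME + RANGE_OF_DELAY), i.e. up to
  // five minutes, so regions that become flush-eligible at the same moment do not all hit the
  // filesystem at once. The delay computation, as used in chore():
  //
  //   long randomDelay = RandomUtils.nextInt(RANGE_OF_DELAY) + MIN_DELAY_TIME;  // 0..299_999 ms
  //   requester.requestDelayedFlush(r, randomDelay, false);                     // same call as above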
1598 
1599   /**
1600    * Report the status of the server. A server is online once all the startup is
1601    * completed (setting up filesystem, starting service threads, etc.). This
1602    * method is designed mostly to be useful in tests.
1603    *
1604    * @return true if online, false if not.
1605    */
1606   public boolean isOnline() {
1607     return online.get();
1608   }
1609 
1610   /**
1611    * Setup WAL log and replication if enabled.
1612    * Replication setup is done in here because it wants to be hooked up to WAL.
1613    * @return A WAL instance.
1614    * @throws IOException
1615    */
1616   private WALFactory setupWALAndReplication() throws IOException {
1617     // TODO: Replication makes assumptions here based on the default filesystem impl
1618     final Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
1619     final String logName = DefaultWALProvider.getWALDirectoryName(this.serverName.toString());
1620 
1621     Path logdir = new Path(rootDir, logName);
1622     if (LOG.isDebugEnabled()) LOG.debug("logdir=" + logdir);
1623     if (this.fs.exists(logdir)) {
1624       throw new RegionServerRunningException("Region server has already " +
1625         "created directory at " + this.serverName.toString());
1626     }
1627 
1628     // Instantiate replication manager if replication enabled.  Pass it the
1629     // log directories.
1630     createNewReplicationInstance(conf, this, this.fs, logdir, oldLogDir);
1631 
1632     // listeners the wal factory will add to wals it creates.
1633     final List<WALActionsListener> listeners = new ArrayList<WALActionsListener>();
1634     listeners.add(new MetricsWAL());
1635     if (this.replicationSourceHandler != null &&
1636         this.replicationSourceHandler.getWALActionsListener() != null) {
1637       // Replication handler is an implementation of WALActionsListener.
1638       listeners.add(this.replicationSourceHandler.getWALActionsListener());
1639     }
1640 
1641     return new WALFactory(conf, listeners, serverName.toString());
1642   }
1643 
1644   /**
1645    * We initialize the roller for the wal that handles meta lazily
1646    * since we don't know if this regionserver will handle it. All calls to
1647    * this method return a reference to that same roller. As newly referenced
1648    * meta regions are brought online, they will be offered to the roller for maintenance.
1649    * As a part of that registration process, the roller will add itself as a
1650    * listener on the wal.
1651    */
1652   protected LogRoller ensureMetaWALRoller() {
1653     // Using a tmp log roller to ensure metaLogRoller is alive once it is not
1654     // null
1655     LogRoller roller = metawalRoller.get();
1656     if (null == roller) {
1657       LogRoller tmpLogRoller = new LogRoller(this, this);
1658       String n = Thread.currentThread().getName();
1659       Threads.setDaemonThreadRunning(tmpLogRoller.getThread(),
1660           n + "-MetaLogRoller", uncaughtExceptionHandler);
1661       if (metawalRoller.compareAndSet(null, tmpLogRoller)) {
1662         roller = tmpLogRoller;
1663       } else {
1664         // Another thread won starting the roller
1665         Threads.shutdown(tmpLogRoller.getThread());
1666         roller = metawalRoller.get();
1667       }
1668     }
1669     return roller;
1670   }
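
  // Illustrative note (not from the original source): ensureMetaWALRoller() uses the standard
  // lazy-init-by-CAS idiom on the AtomicReference "metawalRoller": racing threads may each build a
  // candidate roller, but only the thread whose compareAndSet from null succeeds publishes its
  // instance; the loser shuts its candidate down and adopts the published one. Generic sketch with
  // a hypothetical AtomicReference<T> ref and hypothetical helpers:
  //
  //   T candidate = makeCandidate();                // hypothetical factory
  //   if (!ref.compareAndSet(null, candidate)) {
  //     discard(candidate);                         // hypothetical cleanup, cf. Threads.shutdown
  //   }
  //   T value = ref.get();                          // never null after this point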
1671 
1672   public MetricsRegionServer getRegionServerMetrics() {
1673     return this.metricsRegionServer;
1674   }
1675 
1676   /**
1677    * @return Master address tracker instance.
1678    */
1679   public MasterAddressTracker getMasterAddressTracker() {
1680     return this.masterAddressTracker;
1681   }
1682 
1683   /*
1684    * Start maintenance Threads, Server, Worker and lease checker threads.
1685    * Install an UncaughtExceptionHandler that calls abort on the RegionServer if we
1686    * get an unhandled exception. We cannot set the handler on all threads.
1687    * Server's internal Listener thread is off limits. For Server, if it hits an OOME, it
1688    * waits a while then retries. Meantime, a flush or a compaction that tries to
1689    * run should trigger the same critical condition and the shutdown will run. On
1690    * its way out, this server will shut down Server. Leases is somewhere in
1691    * between: it runs an internal thread, and while it inherits from Chore, it
1692    * keeps its own stop mechanism and so needs to be stopped by this
1693    * hosting server. Worker logs the exception and exits.
1694    */
1695   private void startServiceThreads() throws IOException {
1696     // Start executor services
1697     this.service.startExecutorService(ExecutorType.RS_OPEN_REGION,
1698       conf.getInt("hbase.regionserver.executor.openregion.threads", 3));
1699     this.service.startExecutorService(ExecutorType.RS_OPEN_META,
1700       conf.getInt("hbase.regionserver.executor.openmeta.threads", 1));
1701     this.service.startExecutorService(ExecutorType.RS_CLOSE_REGION,
1702       conf.getInt("hbase.regionserver.executor.closeregion.threads", 3));
1703     this.service.startExecutorService(ExecutorType.RS_CLOSE_META,
1704       conf.getInt("hbase.regionserver.executor.closemeta.threads", 1));
1705     if (conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false)) {
1706       this.service.startExecutorService(ExecutorType.RS_PARALLEL_SEEK,
1707         conf.getInt("hbase.storescanner.parallel.seek.threads", 10));
1708     }
1709     this.service.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, conf.getInt(
1710        "hbase.regionserver.wal.max.splitters", SplitLogWorkerCoordination.DEFAULT_MAX_SPLITTERS));
1711 
1712     if (ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(conf)) {
1713       this.service.startExecutorService(ExecutorType.RS_REGION_REPLICA_FLUSH_OPS,
1714         conf.getInt("hbase.regionserver.region.replica.flusher.threads",
1715           conf.getInt("hbase.regionserver.executor.openregion.threads", 3)));
1716     }
1717 
1718     Threads.setDaemonThreadRunning(this.walRoller.getThread(), getName() + ".logRoller",
1719         uncaughtExceptionHandler);
1720     this.cacheFlusher.start(uncaughtExceptionHandler);
1721 
1722     if (this.compactionChecker != null) choreService.scheduleChore(compactionChecker);
1723     if (this.periodicFlusher != null) choreService.scheduleChore(periodicFlusher);
1724     if (this.healthCheckChore != null) choreService.scheduleChore(healthCheckChore);
1725     if (this.nonceManagerChore != null) choreService.scheduleChore(nonceManagerChore);
1726     if (this.storefileRefresher != null) choreService.scheduleChore(storefileRefresher);
1727     if (this.movedRegionsCleaner != null) choreService.scheduleChore(movedRegionsCleaner);
1728 
1729     // Leases is not a Thread. Internally it runs a daemon thread. If it gets
1730     // an unhandled exception, it will just exit.
1731     Threads.setDaemonThreadRunning(this.leases.getThread(), getName() + ".leaseChecker",
1732       uncaughtExceptionHandler);
1733 
1734     if (this.replicationSourceHandler == this.replicationSinkHandler &&
1735         this.replicationSourceHandler != null) {
1736       this.replicationSourceHandler.startReplicationService();
1737     } else {
1738       if (this.replicationSourceHandler != null) {
1739         this.replicationSourceHandler.startReplicationService();
1740       }
1741       if (this.replicationSinkHandler != null) {
1742         this.replicationSinkHandler.startReplicationService();
1743       }
1744     }
1745 
1746     // Create the log splitting worker and start it
1747     // set a smaller retries to fast fail otherwise splitlogworker could be blocked for
1748     // quite a while inside HConnection layer. The worker won't be available for other
1749     // tasks even after current task is preempted after a split task times out.
1750     Configuration sinkConf = HBaseConfiguration.create(conf);
1751     sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
1752       conf.getInt("hbase.log.replay.retries.number", 8)); // 8 retries take about 23 seconds
1753     sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
1754       conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 seconds
1755     sinkConf.setInt("hbase.client.serverside.retries.multiplier", 1);
1756     this.splitLogWorker = new SplitLogWorker(this, sinkConf, this, this, walFactory);
1757     splitLogWorker.start();
1758   }
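
  // Illustrative note (not from the original source): the SplitLogWorker above gets its own copy of
  // the configuration ("sinkConf") with deliberately tight client settings so a dead target server
  // fails fast rather than blocking the worker inside the connection layer. With the default values
  // read above, the overrides amount to:
  //
  //   sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 8);       // ~23 seconds of retries
  //   sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 30000);         // 30 second rpc timeout
  //   sinkConf.setInt("hbase.client.serverside.retries.multiplier", 1); // no server-side multiplier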
1759 
1760   /**
1761    * Puts up the webui.
1762    * @return The final port -- maybe different from what we started with.
1763    * @throws IOException
1764    */
1765   private int putUpWebUI() throws IOException {
1766     int port = this.conf.getInt(HConstants.REGIONSERVER_INFO_PORT,
1767       HConstants.DEFAULT_REGIONSERVER_INFOPORT);
1768     String addr = this.conf.get("hbase.regionserver.info.bindAddress", "0.0.0.0");
1769 
1770     if(this instanceof HMaster) {
1771       port = conf.getInt(HConstants.MASTER_INFO_PORT,
1772           HConstants.DEFAULT_MASTER_INFOPORT);
1773       addr = this.conf.get("hbase.master.info.bindAddress", "0.0.0.0");
1774     }
1775     // -1 is for disabling info server
1776     if (port < 0) return port;
1777 
1778     if (!Addressing.isLocalAddress(InetAddress.getByName(addr))) {
1779       String msg =
1780           "Failed to start http info server. Address " + addr
1781               + " does not belong to this host. Correct configuration parameter: "
1782               + "hbase.regionserver.info.bindAddress";
1783       LOG.error(msg);
1784       throw new IOException(msg);
1785     }
1786     // check if auto port bind enabled
1787     boolean auto = this.conf.getBoolean(HConstants.REGIONSERVER_INFO_PORT_AUTO,
1788         false);
1789     while (true) {
1790       try {
1791         this.infoServer = new InfoServer(getProcessName(), addr, port, false, this.conf);
1792         infoServer.addServlet("dump", "/dump", getDumpServlet());
1793         configureInfoServer();
1794         this.infoServer.start();
1795         break;
1796       } catch (BindException e) {
1797         if (!auto) {
1798           // auto bind disabled; rethrow the BindException
1799           LOG.error("Failed binding http info server to port: " + port);
1800           throw e;
1801         }
1802         // auto bind enabled, try to use another port
1803         LOG.info("Failed binding http info server to port: " + port);
1804         port++;
1805       }
1806     }
1807     port = this.infoServer.getPort();
1808     conf.setInt(HConstants.REGIONSERVER_INFO_PORT, port);
1809     int masterInfoPort = conf.getInt(HConstants.MASTER_INFO_PORT,
1810       HConstants.DEFAULT_MASTER_INFOPORT);
1811     conf.setInt("hbase.master.info.port.orig", masterInfoPort);
1812     conf.setInt(HConstants.MASTER_INFO_PORT, port);
1813     return port;
1814   }
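
  // Illustrative note (not from the original source): if a fixed info port is not required (e.g. in
  // tests), the auto-bind flag read above lets putUpWebUI() walk forward from the configured port
  // until the bind succeeds, then write the chosen port back into the Configuration:
  //
  //   conf.setBoolean(HConstants.REGIONSERVER_INFO_PORT_AUTO, true);  // retry on BindException
  //   int boundPort = putUpWebUI();  // may differ from HConstants.REGIONSERVER_INFO_PORT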
1815 
1816   /*
1817    * Verify that server is healthy
1818    */
1819   private boolean isHealthy() {
1820     if (!fsOk) {
1821       // File system problem
1822       return false;
1823     }
1824     // Verify that all threads are alive
1825     if (!(leases.isAlive()
1826         && cacheFlusher.isAlive() && walRoller.isAlive()
1827         && this.compactionChecker.isScheduled()
1828         && this.periodicFlusher.isScheduled())) {
1829       stop("One or more threads are no longer alive -- stop");
1830       return false;
1831     }
1832     final LogRoller metawalRoller = this.metawalRoller.get();
1833     if (metawalRoller != null && !metawalRoller.isAlive()) {
1834       stop("Meta WAL roller thread is no longer alive -- stop");
1835       return false;
1836     }
1837     return true;
1838   }
1839 
1840   private static final byte[] UNSPECIFIED_REGION = new byte[]{};
1841 
1842   @Override
1843   public WAL getWAL(HRegionInfo regionInfo) throws IOException {
1844     WAL wal;
1845     LogRoller roller = walRoller;
1846     //_ROOT_ and hbase:meta regions have separate WAL.
1847     if (regionInfo != null && regionInfo.isMetaTable() &&
1848         regionInfo.getReplicaId() == HRegionInfo.DEFAULT_REPLICA_ID) {
1849       roller = ensureMetaWALRoller();
1850       wal = walFactory.getMetaWAL(regionInfo.getEncodedNameAsBytes());
1851     } else if (regionInfo == null) {
1852       wal = walFactory.getWAL(UNSPECIFIED_REGION);
1853     } else {
1854       wal = walFactory.getWAL(regionInfo.getEncodedNameAsBytes());
1855     }
1856     roller.addWAL(wal);
1857     return wal;
1858   }
1859 
1860   @Override
1861   public ClusterConnection getConnection() {
1862     return this.clusterConnection;
1863   }
1864 
1865   @Override
1866   public MetaTableLocator getMetaTableLocator() {
1867     return this.metaTableLocator;
1868   }
1869 
1870   @Override
1871   public void stop(final String msg) {
1872     if (!this.stopped) {
1873       try {
1874         if (this.rsHost != null) {
1875           this.rsHost.preStop(msg);
1876         }
1877         this.stopped = true;
1878         LOG.info("STOPPED: " + msg);
1879         // Wakes run() if it is sleeping
1880         sleeper.skipSleepCycle();
1881       } catch (IOException exp) {
1882         LOG.warn("The region server did not stop", exp);
1883       }
1884     }
1885   }
1886 
1887   public void waitForServerOnline(){
1888     while (!isStopped() && !isOnline()) {
1889       synchronized (online) {
1890         try {
1891           online.wait(msgInterval);
1892         } catch (InterruptedException ie) {
1893           Thread.currentThread().interrupt();
1894           break;
1895         }
1896       }
1897     }
1898   }
1899 
1900   @Override
1901   public void postOpenDeployTasks(final Region r) throws KeeperException, IOException {
1902     postOpenDeployTasks(new PostOpenDeployContext(r, -1));
1903   }
1904 
1905   @Override
1906   public void postOpenDeployTasks(final PostOpenDeployContext context)
1907       throws KeeperException, IOException {
1908     Region r = context.getRegion();
1909     long masterSystemTime = context.getMasterSystemTime();
1910     Preconditions.checkArgument(r instanceof HRegion, "r must be an HRegion");
1911     rpcServices.checkOpen();
1912     LOG.info("Post open deploy tasks for " + r.getRegionInfo().getRegionNameAsString());
1913     // Do checks to see if we need to compact (references or too many files)
1914     for (Store s : r.getStores()) {
1915       if (s.hasReferences() || s.needsCompaction()) {
1916        this.compactSplitThread.requestSystemCompaction(r, s, "Opening Region");
1917       }
1918     }
1919     long openSeqNum = r.getOpenSeqNum();
1920     if (openSeqNum == HConstants.NO_SEQNUM) {
1921       // If we opened a region, we should have read some sequence number from it.
1922       LOG.error("No sequence number found when opening " +
1923         r.getRegionInfo().getRegionNameAsString());
1924       openSeqNum = 0;
1925     }
1926 
1927     // Update flushed sequence id of a recovering region in ZK
1928     updateRecoveringRegionLastFlushedSequenceId(r);
1929 
1930     // Update ZK, or META
1931     if (r.getRegionInfo().isMetaRegion()) {
1932       MetaTableLocator.setMetaLocation(getZooKeeper(), serverName, r.getRegionInfo().getReplicaId(),
1933          State.OPEN);
1934     } else if (useZKForAssignment) {
1935       MetaTableAccessor.updateRegionLocation(getConnection(), r.getRegionInfo(),
1936         this.serverName, openSeqNum, masterSystemTime);
1937     }
1938     if (!useZKForAssignment && !reportRegionStateTransition(new RegionStateTransitionContext(
1939         TransitionCode.OPENED, openSeqNum, masterSystemTime, r.getRegionInfo()))) {
1940       throw new IOException("Failed to report opened region to master: "
1941         + r.getRegionInfo().getRegionNameAsString());
1942     }
1943 
1944     triggerFlushInPrimaryRegion((HRegion)r);
1945 
1946     LOG.debug("Finished post open deploy task for " + r.getRegionInfo().getRegionNameAsString());
1947   }
1948 
1949   @Override
1950   public boolean reportRegionStateTransition(TransitionCode code, HRegionInfo... hris) {
1951     return reportRegionStateTransition(code, HConstants.NO_SEQNUM, hris);
1952   }
1953 
1954   @Override
1955   public boolean reportRegionStateTransition(
1956       TransitionCode code, long openSeqNum, HRegionInfo... hris) {
1957     return reportRegionStateTransition(
1958       new RegionStateTransitionContext(code, openSeqNum, -1, hris));
1959   }
1960 
1961   @Override
1962   public boolean reportRegionStateTransition(final RegionStateTransitionContext context) {
1963     TransitionCode code = context.getCode();
1964     long openSeqNum = context.getOpenSeqNum();
1965     HRegionInfo[] hris = context.getHris();
1966 
1967     ReportRegionStateTransitionRequest.Builder builder =
1968       ReportRegionStateTransitionRequest.newBuilder();
1969     builder.setServer(ProtobufUtil.toServerName(serverName));
1970     RegionStateTransition.Builder transition = builder.addTransitionBuilder();
1971     transition.setTransitionCode(code);
1972     if (code == TransitionCode.OPENED && openSeqNum >= 0) {
1973       transition.setOpenSeqNum(openSeqNum);
1974     }
1975     for (HRegionInfo hri: hris) {
1976       transition.addRegionInfo(HRegionInfo.convert(hri));
1977     }
1978     ReportRegionStateTransitionRequest request = builder.build();
1979     while (keepLooping()) {
1980       RegionServerStatusService.BlockingInterface rss = rssStub;
1981       try {
1982         if (rss == null) {
1983           createRegionServerStatusStub();
1984           continue;
1985         }
1986         ReportRegionStateTransitionResponse response =
1987           rss.reportRegionStateTransition(null, request);
1988         if (response.hasErrorMessage()) {
1989           LOG.info("Failed to transition " + hris[0]
1990             + " to " + code + ": " + response.getErrorMessage());
1991           return false;
1992         }
1993         return true;
1994       } catch (ServiceException se) {
1995         IOException ioe = ProtobufUtil.getRemoteException(se);
1996         LOG.info("Failed to report region transition, will retry", ioe);
1997         if (rssStub == rss) {
1998           rssStub = null;
1999         }
2000       }
2001     }
2002     return false;
2003   }
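
  // Illustrative note (not from the original source): the retry loop above follows the stub-recovery
  // pattern used throughout this class -- snapshot rssStub into a local, lazily (re)create it when
  // null, and on a ServiceException clear the shared stub only if no other thread has already
  // replaced it:
  //
  //   RegionServerStatusService.BlockingInterface rss = rssStub;  // local snapshot
  //   ...                                                         // rpc via rss
  //   if (rssStub == rss) {
  //     rssStub = null;   // only invalidate the stub instance that actually failed
  //   }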
2004 
2005   /**
2006    * Trigger a flush in the primary region replica if this region is a secondary replica. Does not
2007    * block this thread. See RegionReplicaFlushHandler for details.
2008    */
2009   void triggerFlushInPrimaryRegion(final HRegion region) {
2010     if (ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
2011       return;
2012     }
2013     if (!ServerRegionReplicaUtil.isRegionReplicaReplicationEnabled(region.conf) ||
2014         !ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(
2015           region.conf)) {
2016       region.setReadsEnabled(true);
2017       return;
2018     }
2019 
2020     region.setReadsEnabled(false); // disable reads before marking the region as opened.
2021     // RegionReplicaFlushHandler might reset this.
2022 
2023     // submit it to be handled by one of the handlers so that we do not block OpenRegionHandler
2024     this.service.submit(
2025       new RegionReplicaFlushHandler(this, clusterConnection,
2026         rpcRetryingCallerFactory, rpcControllerFactory, operationTimeout, region));
2027   }
2028 
2029   @Override
2030   public RpcServerInterface getRpcServer() {
2031     return rpcServices.rpcServer;
2032   }
2033 
2034   @VisibleForTesting
2035   public RSRpcServices getRSRpcServices() {
2036     return rpcServices;
2037   }
2038 
2039   /**
2040    * Cause the server to exit without closing the regions it is serving, the log
2041    * it is using and without notifying the master. Used for unit testing and on
2042    * catastrophic events such as HDFS being yanked out from under hbase, or an OOME.
2043    *
2044    * @param reason
2045    *          the reason we are aborting
2046    * @param cause
2047    *          the exception that caused the abort, or null
2048    */
2049   @Override
2050   public void abort(String reason, Throwable cause) {
2051     String msg = "ABORTING region server " + this + ": " + reason;
2052     if (cause != null) {
2053       LOG.fatal(msg, cause);
2054     } else {
2055       LOG.fatal(msg);
2056     }
2057     this.abortRequested = true;
2058     // HBASE-4014: show list of coprocessors that were loaded to help debug
2059     // regionserver crashes. Note that we're implicitly using
2060     // java.util.HashSet's toString() method to print the coprocessor names.
2061     LOG.fatal("RegionServer abort: loaded coprocessors are: " +
2062         CoprocessorHost.getLoadedCoprocessors());
2063     // Try and dump metrics if abort -- might give clue as to how fatal came about....
2064     try {
2065       LOG.info("Dump of metrics as JSON on abort: " + JSONBean.dumpRegionServerMetrics());
2066     } catch (MalformedObjectNameException | IOException e) {
2067       LOG.warn("Failed dumping metrics", e);
2068     }
2069 
2070     // Do our best to report our abort to the master, but this may not work
2071     try {
2072       if (cause != null) {
2073         msg += "\nCause:\n" + StringUtils.stringifyException(cause);
2074       }
2075       // Report to the master but only if we have already registered with the master.
2076       if (rssStub != null && this.serverName != null) {
2077         ReportRSFatalErrorRequest.Builder builder =
2078           ReportRSFatalErrorRequest.newBuilder();
2079         ServerName sn =
2080           ServerName.parseVersionedServerName(this.serverName.getVersionedBytes());
2081         builder.setServer(ProtobufUtil.toServerName(sn));
2082         builder.setErrorMessage(msg);
2083         rssStub.reportRSFatalError(null, builder.build());
2084       }
2085     } catch (Throwable t) {
2086       LOG.warn("Unable to report fatal error to master", t);
2087     }
2088     stop(reason);
2089   }
2090 
2091   /**
2092    * @see HRegionServer#abort(String, Throwable)
2093    */
2094   public void abort(String reason) {
2095     abort(reason, null);
2096   }
2097 
2098   @Override
2099   public boolean isAborted() {
2100     return this.abortRequested;
2101   }
2102 
2103   /*
2104    * Simulate a kill -9 of this server. Exits w/o closing regions or cleaning up
2105    * logs, but it does close the socket in case we want to bring up a server on the
2106    * old hostname+port immediately.
2107    */
2108   protected void kill() {
2109     this.killed = true;
2110     abort("Simulated kill");
2111   }
2112 
2113   /**
2114    * Called on stop/abort before closing the cluster connection and meta locator.
2115    */
2116   protected void sendShutdownInterrupt() {
2117   }
2118 
2119   /**
2120    * Wait on all threads to finish. Presumption is that all closes and stops
2121    * have already been called.
2122    */
2123   protected void stopServiceThreads() {
2124     // clean up the scheduled chores
2125     if (this.choreService != null) choreService.shutdown();
2126     if (this.nonceManagerChore != null) nonceManagerChore.cancel(true);
2127     if (this.compactionChecker != null) compactionChecker.cancel(true);
2128     if (this.periodicFlusher != null) periodicFlusher.cancel(true);
2129     if (this.healthCheckChore != null) healthCheckChore.cancel(true);
2130     if (this.storefileRefresher != null) storefileRefresher.cancel(true);
2131     if (this.movedRegionsCleaner != null) movedRegionsCleaner.cancel(true);
2132 
2133     if (this.cacheFlusher != null) {
2134       this.cacheFlusher.join();
2135     }
2136 
2137     if (this.spanReceiverHost != null) {
2138       this.spanReceiverHost.closeReceivers();
2139     }
2140     if (this.walRoller != null) {
2141       Threads.shutdown(this.walRoller.getThread());
2142     }
2143     final LogRoller metawalRoller = this.metawalRoller.get();
2144     if (metawalRoller != null) {
2145       Threads.shutdown(metawalRoller.getThread());
2146     }
2147     if (this.compactSplitThread != null) {
2148       this.compactSplitThread.join();
2149     }
2150     if (this.service != null) this.service.shutdown();
2151     if (this.replicationSourceHandler != null &&
2152         this.replicationSourceHandler == this.replicationSinkHandler) {
2153       this.replicationSourceHandler.stopReplicationService();
2154     } else {
2155       if (this.replicationSourceHandler != null) {
2156         this.replicationSourceHandler.stopReplicationService();
2157       }
2158       if (this.replicationSinkHandler != null) {
2159         this.replicationSinkHandler.stopReplicationService();
2160       }
2161     }
2162   }
2163 
2164   /**
2165    * @return Return the object that implements the replication
2166    * source service.
2167    */
2168   ReplicationSourceService getReplicationSourceService() {
2169     return replicationSourceHandler;
2170   }
2171 
2172   /**
2173    * @return Return the object that implements the replication
2174    * sink service.
2175    */
2176   ReplicationSinkService getReplicationSinkService() {
2177     return replicationSinkHandler;
2178   }
2179 
2180   /**
2181    * Get the current master from ZooKeeper and open the RPC connection to it.
2182    * To get a fresh connection, the current rssStub must be null.
2183    * Method will block until a master is available. You can break from this
2184    * block by requesting the server stop.
2185    *
2186    * @return master + port, or null if server has been stopped
2187    */
2188   @VisibleForTesting
2189   protected synchronized ServerName createRegionServerStatusStub() {
2190     if (rssStub != null) {
2191       return masterAddressTracker.getMasterAddress();
2192     }
2193     ServerName sn = null;
2194     long previousLogTime = 0;
2195     boolean refresh = false; // for the first time, use cached data
2196     RegionServerStatusService.BlockingInterface intf = null;
2197     boolean interrupted = false;
2198     try {
2199       while (keepLooping()) {
2200         sn = this.masterAddressTracker.getMasterAddress(refresh);
2201         if (sn == null) {
2202           if (!keepLooping()) {
2203             // give up with no connection.
2204             LOG.debug("No master found and cluster is stopped; bailing out");
2205             return null;
2206           }
2207           if (System.currentTimeMillis() > (previousLogTime + 1000)) {
2208             LOG.debug("No master found; retry");
2209             previousLogTime = System.currentTimeMillis();
2210           }
2211           refresh = true; // let's try to pull it from ZK directly
2212           if (sleep(200)) {
2213             interrupted = true;
2214           }
2215           continue;
2216         }
2217 
2218         // If we are on the active master, use the shortcut
2219         if (this instanceof HMaster && sn.equals(getServerName())) {
2220           intf = ((HMaster)this).getMasterRpcServices();
2221           break;
2222         }
2223         try {
2224           BlockingRpcChannel channel =
2225             this.rpcClient.createBlockingRpcChannel(sn, userProvider.getCurrent(),
2226               shortOperationTimeout);
2227           intf = RegionServerStatusService.newBlockingStub(channel);
2228           break;
2229         } catch (IOException e) {
2230           if (System.currentTimeMillis() > (previousLogTime + 1000)) {
2231             e = e instanceof RemoteException ?
2232               ((RemoteException)e).unwrapRemoteException() : e;
2233             if (e instanceof ServerNotRunningYetException) {
2234               LOG.info("Master isn't available yet, retrying");
2235             } else {
2236               LOG.warn("Unable to connect to master. Retrying. Error was:", e);
2237             }
2238             previousLogTime = System.currentTimeMillis();
2239           }
2240           if (sleep(200)) {
2241             interrupted = true;
2242           }
2243         }
2244       }
2245     } finally {
2246       if (interrupted) {
2247         Thread.currentThread().interrupt();
2248       }
2249     }
2250     rssStub = intf;
2251     return sn;
2252   }
2253 
2254   /**
2255    * @return True if we should break loop because cluster is going down or
2256    * this server has been stopped or hdfs has gone bad.
2257    */
2258   private boolean keepLooping() {
2259     return !this.stopped && isClusterUp();
2260   }
2261 
2262   /*
2263    * Let the master know we're here. Run initialization using parameters passed
2264    * to us by the master.
2265    * @return The startup response from the Master (carrying key/value configurations),
2266    * else null if we failed to register.
2267    * @throws IOException
2268    */
2269   private RegionServerStartupResponse reportForDuty() throws IOException {
2270     ServerName masterServerName = createRegionServerStatusStub();
2271     if (masterServerName == null) return null;
2272     RegionServerStartupResponse result = null;
2273     try {
2274       rpcServices.requestCount.set(0);
2275       LOG.info("reportForDuty to master=" + masterServerName + " with port="
2276         + rpcServices.isa.getPort() + ", startcode=" + this.startcode);
2277       long now = EnvironmentEdgeManager.currentTime();
2278       int port = rpcServices.isa.getPort();
2279       RegionServerStartupRequest.Builder request = RegionServerStartupRequest.newBuilder();
2280       if (shouldUseThisHostnameInstead()) {
2281         request.setUseThisHostnameInstead(useThisHostnameInstead);
2282       }
2283       request.setPort(port);
2284       request.setServerStartCode(this.startcode);
2285       request.setServerCurrentTime(now);
2286       result = this.rssStub.regionServerStartup(null, request.build());
2287     } catch (ServiceException se) {
2288       IOException ioe = ProtobufUtil.getRemoteException(se);
2289       if (ioe instanceof ClockOutOfSyncException) {
2290         LOG.fatal("Master rejected startup because clock is out of sync", ioe);
2291         // Re-throw IOE will cause RS to abort
2292         throw ioe;
2293       } else if (ioe instanceof ServerNotRunningYetException) {
2294         LOG.debug("Master is not running yet");
2295       } else {
2296         LOG.warn("error telling master we are up", se);
2297       }
2298       rssStub = null;
2299     }
2300     return result;
2301   }
2302 
2303   @Override
2304   public RegionStoreSequenceIds getLastSequenceId(byte[] encodedRegionName) {
2305     try {
2306       GetLastFlushedSequenceIdRequest req =
2307           RequestConverter.buildGetLastFlushedSequenceIdRequest(encodedRegionName);
2308       RegionServerStatusService.BlockingInterface rss = rssStub;
2309       if (rss == null) { // Try to connect one more time
2310         createRegionServerStatusStub();
2311         rss = rssStub;
2312         if (rss == null) {
2313           // Still no luck, we tried
2314           LOG.warn("Unable to connect to the master to check " + "the last flushed sequence id");
2315           return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM)
2316               .build();
2317         }
2318       }
2319       GetLastFlushedSequenceIdResponse resp = rss.getLastFlushedSequenceId(null, req);
2320       return RegionStoreSequenceIds.newBuilder()
2321           .setLastFlushedSequenceId(resp.getLastFlushedSequenceId())
2322           .addAllStoreSequenceId(resp.getStoreLastFlushedSequenceIdList()).build();
2323     } catch (ServiceException e) {
2324       LOG.warn("Unable to connect to the master to check the last flushed sequence id", e);
2325       return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM)
2326           .build();
2327     }
2328   }
2329 
2330   /**
2331    * Closes all regions.  Called on our way out.
2332    * Assumes that it's not possible for new regions to be added to onlineRegions
2333    * while this method runs.
2334    */
2335   protected void closeAllRegions(final boolean abort) {
2336     closeUserRegions(abort);
2337     closeMetaTableRegions(abort);
2338   }
2339 
2340   /**
2341    * Close meta region if we carry it
2342    * @param abort Whether we're running an abort.
2343    */
2344   void closeMetaTableRegions(final boolean abort) {
2345     Region meta = null;
2346     this.lock.writeLock().lock();
2347     try {
2348       for (Map.Entry<String, Region> e: onlineRegions.entrySet()) {
2349         HRegionInfo hri = e.getValue().getRegionInfo();
2350         if (hri.isMetaRegion()) {
2351           meta = e.getValue();
2352         }
2353         if (meta != null) break;
2354       }
2355     } finally {
2356       this.lock.writeLock().unlock();
2357     }
2358     if (meta != null) closeRegionIgnoreErrors(meta.getRegionInfo(), abort);
2359   }
2360 
2361   /**
2362    * Schedule closes on all user regions.
2363    * Should be safe to call multiple times because it won't close regions
2364    * that are already closed or that are closing.
2365    * @param abort Whether we're running an abort.
2366    */
2367   void closeUserRegions(final boolean abort) {
2368     this.lock.writeLock().lock();
2369     try {
2370       for (Map.Entry<String, Region> e: this.onlineRegions.entrySet()) {
2371         Region r = e.getValue();
2372         if (!r.getRegionInfo().isMetaTable() && r.isAvailable()) {
2373           // Don't update zk with this close transition; pass false.
2374           closeRegionIgnoreErrors(r.getRegionInfo(), abort);
2375         }
2376       }
2377     } finally {
2378       this.lock.writeLock().unlock();
2379     }
2380   }
2381 
2382   /** @return the info server */
2383   public InfoServer getInfoServer() {
2384     return infoServer;
2385   }
2386 
2387   /**
2388    * @return true if a stop has been requested.
2389    */
2390   @Override
2391   public boolean isStopped() {
2392     return this.stopped;
2393   }
2394 
2395   @Override
2396   public boolean isStopping() {
2397     return this.stopping;
2398   }
2399 
2400   @Override
2401   public Map<String, Region> getRecoveringRegions() {
2402     return this.recoveringRegions;
2403   }
2404 
2405   /**
2406    *
2407    * @return the configuration
2408    */
2409   @Override
2410   public Configuration getConfiguration() {
2411     return conf;
2412   }
2413 
2414   /** @return the write lock for the server */
2415   ReentrantReadWriteLock.WriteLock getWriteLock() {
2416     return lock.writeLock();
2417   }
2418 
2419   public int getNumberOfOnlineRegions() {
2420     return this.onlineRegions.size();
2421   }
2422 
2423   boolean isOnlineRegionsEmpty() {
2424     return this.onlineRegions.isEmpty();
2425   }
2426 
2427   /**
2428    * For tests, web ui and metrics.
2429    * This method will only work if HRegionServer is in the same JVM as client;
2430    * HRegion cannot be serialized to cross an rpc.
2431    */
2432   public Collection<Region> getOnlineRegionsLocalContext() {
2433     Collection<Region> regions = this.onlineRegions.values();
2434     return Collections.unmodifiableCollection(regions);
2435   }
2436 
2437   @Override
2438   public void addToOnlineRegions(Region region) {
2439     this.onlineRegions.put(region.getRegionInfo().getEncodedName(), region);
2440     configurationManager.registerObserver(region);
2441   }
2442 
2443   /**
2444    * @return A new Map of online regions sorted by region size with the first entry being the
2445    * biggest.  If two regions are the same size, then the last one found wins; i.e. this method
2446    * may NOT return all regions.
2447    */
2448   SortedMap<Long, Region> getCopyOfOnlineRegionsSortedBySize() {
2449     // we'll sort the regions in reverse
2450     SortedMap<Long, Region> sortedRegions = new TreeMap<Long, Region>(
2451         new Comparator<Long>() {
2452           @Override
2453           public int compare(Long a, Long b) {
2454             return -1 * a.compareTo(b);
2455           }
2456         });
2457     // Copy over all regions. Regions are sorted by size with biggest first.
2458     for (Region region : this.onlineRegions.values()) {
2459       sortedRegions.put(region.getMemstoreSize(), region);
2460     }
2461     return sortedRegions;
2462   }
2463 
2464   /**
2465    * @return time stamp in millis of when this region server was started
2466    */
2467   public long getStartcode() {
2468     return this.startcode;
2469   }
2470 
2471   /** @return reference to FlushRequester */
2472   @Override
2473   public FlushRequester getFlushRequester() {
2474     return this.cacheFlusher;
2475   }
2476 
2477   /**
2478    * Get the top N most loaded regions this server is serving so we can tell the
2479    * master which regions it can reallocate if we're overloaded. TODO: actually
2480    * calculate which regions are most loaded. (Right now, we're just grabbing
2481    * the first N regions being served regardless of load.)
2482    */
2483   protected HRegionInfo[] getMostLoadedRegions() {
2484     ArrayList<HRegionInfo> regions = new ArrayList<HRegionInfo>();
2485     for (Region r : onlineRegions.values()) {
2486       if (!r.isAvailable()) {
2487         continue;
2488       }
2489       if (regions.size() < numRegionsToReport) {
2490         regions.add(r.getRegionInfo());
2491       } else {
2492         break;
2493       }
2494     }
2495     return regions.toArray(new HRegionInfo[regions.size()]);
2496   }
2497 
2498   @Override
2499   public Leases getLeases() {
2500     return leases;
2501   }
2502 
2503   /**
2504    * @return Return the rootDir.
2505    */
2506   protected Path getRootDir() {
2507     return rootDir;
2508   }
2509 
2510   /**
2511    * @return Return the fs.
2512    */
2513   @Override
2514   public FileSystem getFileSystem() {
2515     return fs;
2516   }
2517 
2518   @Override
2519   public String toString() {
2520     return getServerName().toString();
2521   }
2522 
2523   /**
2524    * Interval at which threads should run
2525    *
2526    * @return the interval
2527    */
2528   public int getThreadWakeFrequency() {
2529     return threadWakeFrequency;
2530   }
2531 
2532   @Override
2533   public ZooKeeperWatcher getZooKeeper() {
2534     return zooKeeper;
2535   }
2536 
2537   @Override
2538   public BaseCoordinatedStateManager getCoordinatedStateManager() {
2539     return csm;
2540   }
2541 
2542   @Override
2543   public ServerName getServerName() {
2544     return serverName;
2545   }
2546 
2547   @Override
2548   public CompactionRequestor getCompactionRequester() {
2549     return this.compactSplitThread;
2550   }
2551 
2552   public RegionServerCoprocessorHost getRegionServerCoprocessorHost(){
2553     return this.rsHost;
2554   }
2555 
2556   @Override
2557   public ConcurrentMap<byte[], Boolean> getRegionsInTransitionInRS() {
2558     return this.regionsInTransitionInRS;
2559   }
2560 
2561   @Override
2562   public ExecutorService getExecutorService() {
2563     return service;
2564   }
2565 
2566   @Override
2567   public ChoreService getChoreService() {
2568     return choreService;
2569   }
2570   
2571   @Override
2572   public RegionServerQuotaManager getRegionServerQuotaManager() {
2573     return rsQuotaManager;
2574   }
2575 
2576   //
2577   // Main program and support routines
2578   //
2579 
2580   /**
2581    * Load the replication service objects, if any
2582    */
2583   static private void createNewReplicationInstance(Configuration conf,
2584     HRegionServer server, FileSystem fs, Path logDir, Path oldLogDir) throws IOException{
2585 
2586     // If replication is not enabled, then return immediately.
2587     if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY,
2588         HConstants.REPLICATION_ENABLE_DEFAULT)) {
2589       return;
2590     }
2591 
2592     // read in the name of the source replication class from the config file.
2593     String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME,
2594                                HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
2595 
2596     // read in the name of the sink replication class from the config file.
2597     String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME,
2598                              HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
2599 
2600     // If both the sink and the source class names are the same, then instantiate
2601     // only one object.
2602     if (sourceClassname.equals(sinkClassname)) {
2603       server.replicationSourceHandler = (ReplicationSourceService)
2604                                          newReplicationInstance(sourceClassname,
2605                                          conf, server, fs, logDir, oldLogDir);
2606       server.replicationSinkHandler = (ReplicationSinkService)
2607                                          server.replicationSourceHandler;
2608     } else {
2609       server.replicationSourceHandler = (ReplicationSourceService)
2610                                          newReplicationInstance(sourceClassname,
2611                                          conf, server, fs, logDir, oldLogDir);
2612       server.replicationSinkHandler = (ReplicationSinkService)
2613                                          newReplicationInstance(sinkClassname,
2614                                          conf, server, fs, logDir, oldLogDir);
2615     }
2616   }
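
  // Illustrative note (not from the original source): when the configured source and sink class
  // names are equal (the default, HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT), a single
  // object serves as both handlers, and startServiceThreads()/stopServiceThreads() rely on that
  // identity (replicationSourceHandler == replicationSinkHandler) to start and stop the shared
  // service exactly once. A hypothetical configuration that splits them into two instances:
  //
  //   conf.set(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME, "my.CustomSourceService");  // hypothetical class
  //   conf.set(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME, "my.CustomSinkService");      // hypothetical class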
2617 
2618   static private ReplicationService newReplicationInstance(String classname,
2619     Configuration conf, HRegionServer server, FileSystem fs, Path logDir,
2620     Path oldLogDir) throws IOException{
2621 
2622     Class<?> clazz = null;
2623     try {
2624       ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
2625       clazz = Class.forName(classname, true, classLoader);
2626     } catch (java.lang.ClassNotFoundException nfe) {
2627       throw new IOException("Could not find class for " + classname);
2628     }
2629 
2630     // create an instance of the replication object.
2631     ReplicationService service = (ReplicationService)
2632                               ReflectionUtils.newInstance(clazz, conf);
2633     service.initialize(server, fs, logDir, oldLogDir);
2634     return service;
2635   }
2636 
2637   /**
2638    * Utility for constructing an instance of the passed HRegionServer class.
2639    *
2640    * @param regionServerClass
2641    * @param conf2
2642    * @return HRegionServer instance.
2643    */
2644   public static HRegionServer constructRegionServer(
2645       Class<? extends HRegionServer> regionServerClass,
2646       final Configuration conf2, CoordinatedStateManager cp) {
2647     try {
2648       Constructor<? extends HRegionServer> c = regionServerClass
2649           .getConstructor(Configuration.class, CoordinatedStateManager.class);
2650       return c.newInstance(conf2, cp);
2651     } catch (Exception e) {
2652       throw new RuntimeException("Failed construction of " + "Regionserver: "
2653           + regionServerClass.toString(), e);
2654     }
2655   }
2656 
2657   /**
2658    * @see org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine
2659    */
2660   public static void main(String[] args) throws Exception {
2661     VersionInfo.logVersion();
2662     Configuration conf = HBaseConfiguration.create();
2663     @SuppressWarnings("unchecked")
2664     Class<? extends HRegionServer> regionServerClass = (Class<? extends HRegionServer>) conf
2665         .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);
2666 
2667     new HRegionServerCommandLine(regionServerClass).doMain(args);
2668   }
2669 
2670   /**
2671    * Gets the online regions of the specified table.
2672    * This method looks at the in-memory onlineRegions.  It does not go to <code>hbase:meta</code>.
2673    * Only returns <em>online</em> regions.  If a region on this table has been
2674    * closed during a disable, etc., it will not be included in the returned list.
2675    * So, the returned list is not necessarily ALL the regions in this table; it is
2676    * only the ONLINE regions in the table.
2677    * @param tableName
2678    * @return Online regions from <code>tableName</code>
2679    */
2680   @Override
2681   public List<Region> getOnlineRegions(TableName tableName) {
2682     List<Region> tableRegions = new ArrayList<Region>();
2683     synchronized (this.onlineRegions) {
2684       for (Region region: this.onlineRegions.values()) {
2685         HRegionInfo regionInfo = region.getRegionInfo();
2686         if (regionInfo.getTable().equals(tableName)) {
2687           tableRegions.add(region);
2688         }
2689       }
2690     }
2691     return tableRegions;
2692   }
2693
2694   /**
2695    * Gets the online tables in this RS.
2696    * This method looks at the in-memory onlineRegions.
2697    * @return all the online tables in this RS
2698    */
2699   @Override
2700   public Set<TableName> getOnlineTables() {
2701     Set<TableName> tables = new HashSet<TableName>();
2702     synchronized (this.onlineRegions) {
2703       for (Region region: this.onlineRegions.values()) {
2704         tables.add(region.getTableDesc().getTableName());
2705       }
2706     }
2707     return tables;
2708   }
2709 
2710   // used by org/apache/hbase/tmpl/regionserver/RSStatusTmpl.jamon (HBASE-4070).
2711   public String[] getRegionServerCoprocessors() {
2712     TreeSet<String> coprocessors = new TreeSet<String>();
2713     try {
2714       coprocessors.addAll(getWAL(null).getCoprocessorHost().getCoprocessors());
2715     } catch (IOException exception) {
2716       LOG.warn("Exception attempting to fetch wal coprocessor information for the common wal; " +
2717           "skipping.");
2718       LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
2719     }
2720     Collection<Region> regions = getOnlineRegionsLocalContext();
2721     for (Region region: regions) {
2722       coprocessors.addAll(region.getCoprocessorHost().getCoprocessors());
2723       try {
2724         coprocessors.addAll(getWAL(region.getRegionInfo()).getCoprocessorHost().getCoprocessors());
2725       } catch (IOException exception) {
2726         LOG.warn("Exception attempting to fetch wal coprocessor information for region " + region +
2727             "; skipping.");
2728         LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
2729       }
2730     }
2731     return coprocessors.toArray(new String[coprocessors.size()]);
2732   }
2733 
2734   /**
2735    * Tries to close the region; logs a warning on failure but continues.
2736    * @param region Region to close
2737    */
2738   private void closeRegionIgnoreErrors(HRegionInfo region, final boolean abort) {
2739     try {
2740       CloseRegionCoordination.CloseRegionDetails details =
2741         csm.getCloseRegionCoordination().getDetaultDetails();
2742       if (!closeRegion(region.getEncodedName(), abort, details, null)) {
2743         LOG.warn("Failed to close " + region.getRegionNameAsString() +
2744             " - ignoring and continuing");
2745       }
2746     } catch (IOException e) {
2747       LOG.warn("Failed to close " + region.getRegionNameAsString() +
2748           " - ignoring and continuing", e);
2749     }
2750   }
2751 
2752   /**
2753    * Closes a region asynchronously; can be called from the master or internally by the regionserver
2754    * when stopping. If called from the master, the region will update the znode status.
2755    *
2756    * <p>
2757    * If an opening was in progress, this method will cancel it, but will not start a new close. The
2758    * coprocessors are not called in this case. A RegionAlreadyInTransitionException is thrown.
2759    * </p>
2760    *
2761    * <p>
2762    *   If a close was in progress, this new request will be ignored, and an exception thrown.
2763    * </p>
2764    *
2765    * @param encodedName Region to close
2766    * @param abort True if we are aborting
2767    * @param crd details for the coordinated close-region task
2768    * @return True if closed a region.
2769    * @throws NotServingRegionException if the region is not online
2770    * @throws RegionAlreadyInTransitionException if the region is already closing
2771    */
2772   protected boolean closeRegion(String encodedName, final boolean abort,
2773       CloseRegionCoordination.CloseRegionDetails crd, final ServerName sn)
2774       throws NotServingRegionException, RegionAlreadyInTransitionException {
2775     // Check for permissions to close.
2776     Region actualRegion = this.getFromOnlineRegions(encodedName);
2777     if ((actualRegion != null) && (actualRegion.getCoprocessorHost() != null)) {
2778       try {
2779         actualRegion.getCoprocessorHost().preClose(false);
2780       } catch (IOException exp) {
2781         LOG.warn("Unable to close region: the coprocessor threw an error", exp);
2782         return false;
2783       }
2784     }
2785 
2786     final Boolean previous = this.regionsInTransitionInRS.putIfAbsent(encodedName.getBytes(),
2787         Boolean.FALSE);
2788 
2789     if (Boolean.TRUE.equals(previous)) {
2790       LOG.info("Received CLOSE for the region: " + encodedName + ", which we are already " +
2791           "trying to OPEN. Cancelling OPENING.");
2792       if (!regionsInTransitionInRS.replace(encodedName.getBytes(), previous, Boolean.FALSE)){
2793         // The replace failed. That should be an exceptional case, but theoretically it can happen.
2794         // We're going to try to do a standard close then.
2795         LOG.warn("The opening for region " + encodedName + " was done before we could cancel it." +
2796             " Doing a standard close now");
2797         return closeRegion(encodedName, abort, crd, sn);
2798       }
2799       // Let's get the region from the online region list again
2800       actualRegion = this.getFromOnlineRegions(encodedName);
2801       if (actualRegion == null) { // If the open already completed, we fall through and close it below.
2802         LOG.info("The opening previously in progress has been cancelled by a CLOSE request.");
2803         // The master deletes the znode when it receives this exception.
2804         throw new RegionAlreadyInTransitionException("The region " + encodedName +
2805           " was opening but not yet served. Opening is cancelled.");
2806       }
2807     } else if (Boolean.FALSE.equals(previous)) {
2808       LOG.info("Received CLOSE for the region: " + encodedName +
2809         ", which we are already trying to CLOSE, but not completed yet");
2810       // The master will retry till the region is closed. We need to do this since
2811       // the region could fail to close somehow. If we mark the region closed in master
2812       // while it is not, there could be data loss.
2813       // If the region stuck in closing for a while, and master runs out of retries,
2814       // master will move the region to failed_to_close. Later on, if the region
2815       // is indeed closed, master can properly re-assign it.
2816       throw new RegionAlreadyInTransitionException("The region " + encodedName +
2817         " was already closing. New CLOSE request is ignored.");
2818     }
2819 
2820     if (actualRegion == null) {
2821       LOG.error("Received CLOSE for a region which is not online, and we're not opening.");
2822       this.regionsInTransitionInRS.remove(encodedName.getBytes());
2823       // The master deletes the znode when it receives this exception.
2824       throw new NotServingRegionException("The region " + encodedName +
2825           " is not online, and is not opening.");
2826     }
2827 
2828     CloseRegionHandler crh;
2829     final HRegionInfo hri = actualRegion.getRegionInfo();
2830     if (hri.isMetaRegion()) {
2831       crh = new CloseMetaHandler(this, this, hri, abort,
2832         csm.getCloseRegionCoordination(), crd);
2833     } else {
2834       crh = new CloseRegionHandler(this, this, hri, abort,
2835         csm.getCloseRegionCoordination(), crd, sn);
2836     }
2837     this.service.submit(crh);
2838     return true;
2839   }
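  // Note on the bookkeeping above: regionsInTransitionInRS maps a region's encoded-name bytes to
  // Boolean.TRUE while the region is opening and Boolean.FALSE while it is closing. The
  // putIfAbsent/replace calls are what let a CLOSE cancel an in-flight OPEN, and what turn a
  // duplicate CLOSE into a RegionAlreadyInTransitionException instead of a second close attempt.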
2840 
2841   /**
2842    * @param regionName
2843    * @return HRegion for the passed binary <code>regionName</code> or null if the
2844    *         named region is not a member of the online regions.
2845    */
2846   public Region getOnlineRegion(final byte[] regionName) {
2847     String encodedRegionName = HRegionInfo.encodeRegionName(regionName);
2848     return this.onlineRegions.get(encodedRegionName);
2849   }
2850 
2851   public InetSocketAddress[] getRegionBlockLocations(final String encodedRegionName) {
2852     return this.regionFavoredNodesMap.get(encodedRegionName);
2853   }
2854 
2855   @Override
2856   public Region getFromOnlineRegions(final String encodedRegionName) {
2857     return this.onlineRegions.get(encodedRegionName);
2858   }
2859 
2860 
2861   @Override
2862   public boolean removeFromOnlineRegions(final Region r, ServerName destination) {
2863     Region toReturn = this.onlineRegions.remove(r.getRegionInfo().getEncodedName());
2864     if (destination != null) {
2865       long closeSeqNum = r.getMaxFlushedSeqId();
2866       if (closeSeqNum == HConstants.NO_SEQNUM) {
2867         // No edits in WAL for this region; get the sequence number when the region was opened.
2868         closeSeqNum = r.getOpenSeqNum();
2869         if (closeSeqNum == HConstants.NO_SEQNUM) closeSeqNum = 0;
2870       }
2871       addToMovedRegions(r.getRegionInfo().getEncodedName(), destination, closeSeqNum);
2872     }
2873     this.regionFavoredNodesMap.remove(r.getRegionInfo().getEncodedName());
2874     return toReturn != null;
2875   }
2876 
2877   /**
2878    * Protected utility method for safely obtaining an HRegion handle.
2879    *
2880    * @param regionName
2881    *          Name of online {@link Region} to return
2882    * @return {@link Region} for <code>regionName</code>
2883    * @throws NotServingRegionException
2884    */
2885   protected Region getRegion(final byte[] regionName)
2886       throws NotServingRegionException {
2887     String encodedRegionName = HRegionInfo.encodeRegionName(regionName);
2888     return getRegionByEncodedName(regionName, encodedRegionName);
2889   }
2890 
2891   public Region getRegionByEncodedName(String encodedRegionName)
2892       throws NotServingRegionException {
2893     return getRegionByEncodedName(null, encodedRegionName);
2894   }
2895 
2896   protected Region getRegionByEncodedName(byte[] regionName, String encodedRegionName)
2897     throws NotServingRegionException {
2898     Region region = this.onlineRegions.get(encodedRegionName);
2899     if (region == null) {
2900       MovedRegionInfo moveInfo = getMovedRegion(encodedRegionName);
2901       if (moveInfo != null) {
2902         throw new RegionMovedException(moveInfo.getServerName(), moveInfo.getSeqNum());
2903       }
2904       Boolean isOpening = this.regionsInTransitionInRS.get(Bytes.toBytes(encodedRegionName));
2905       String regionNameStr = regionName == null?
2906         encodedRegionName: Bytes.toStringBinary(regionName);
2907       if (isOpening != null && isOpening.booleanValue()) {
2908         throw new RegionOpeningException("Region " + regionNameStr +
2909           " is opening on " + this.serverName);
2910       }
2911       throw new NotServingRegionException("Region " + regionNameStr +
2912         " is not online on " + this.serverName);
2913     }
2914     return region;
2915   }
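  // Lookup order in getRegionByEncodedName above: the online-regions map is consulted first; a
  // recently moved region yields a RegionMovedException carrying the destination server and close
  // sequence number so clients can update their location cache; a region still opening yields a
  // RegionOpeningException; anything else results in a NotServingRegionException.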
2916 
2917   /*
2918    * Cleanup after a Throwable is caught invoking a method. The Throwable is returned
2919    * so the caller can convert it to an IOE if it isn't one already.
2920    *
2921    * @param t Throwable
2922    *
2923    * @param msg Message to log in error. Can be null.
2924    *
2925    * @return The passed Throwable; methods can only let out IOEs.
2926    */
2927   private Throwable cleanup(final Throwable t, final String msg) {
2928     // Don't log as error if NSRE; NSRE is 'normal' operation.
2929     if (t instanceof NotServingRegionException) {
2930       LOG.debug("NotServingRegionException; " + t.getMessage());
2931       return t;
2932     }
2933     if (msg == null) {
2934       LOG.error("", RemoteExceptionHandler.checkThrowable(t));
2935     } else {
2936       LOG.error(msg, RemoteExceptionHandler.checkThrowable(t));
2937     }
2938     if (!rpcServices.checkOOME(t)) {
2939       checkFileSystem();
2940     }
2941     return t;
2942   }
2943 
2944   /*
2945    * @param t
2946    *
2947    * @param msg Message to put in new IOE if passed <code>t</code> is not an IOE
2948    *
2949    * @return <code>t</code> as an IOE if it isn't one already.
2950    */
2951   protected IOException convertThrowableToIOE(final Throwable t, final String msg) {
2952     return (t instanceof IOException ? (IOException) t : msg == null
2953         || msg.length() == 0 ? new IOException(t) : new IOException(msg, t));
2954   }
2955 
2956   /**
2957    * Checks to see if the file system is still accessible. If not, sets
2958    * abortRequested and stopRequested
2959    *
2960    * @return false if file system is not available
2961    */
2962   public boolean checkFileSystem() {
2963     if (this.fsOk && this.fs != null) {
2964       try {
2965         FSUtils.checkFileSystemAvailable(this.fs);
2966       } catch (IOException e) {
2967         abort("File System not available", e);
2968         this.fsOk = false;
2969       }
2970     }
2971     return this.fsOk;
2972   }
2973 
2974   @Override
2975   public void updateRegionFavoredNodesMapping(String encodedRegionName,
2976       List<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName> favoredNodes) {
2977     InetSocketAddress[] addr = new InetSocketAddress[favoredNodes.size()];
2978     // Refer to the comment on the declaration of regionFavoredNodesMap on why
2979     // it is a map of region name to InetSocketAddress[]
2980     for (int i = 0; i < favoredNodes.size(); i++) {
2981       addr[i] = InetSocketAddress.createUnresolved(favoredNodes.get(i).getHostName(),
2982           favoredNodes.get(i).getPort());
2983     }
2984     regionFavoredNodesMap.put(encodedRegionName, addr);
2985   }
2986 
2987   /**
2988    * Return the favored nodes for a region given its encoded name. See the
2989    * comment around {@link #regionFavoredNodesMap} on why it is an InetSocketAddress[].
2990    * @param encodedRegionName
2991    * @return array of favored locations
2992    */
2993   @Override
2994   public InetSocketAddress[] getFavoredNodesForRegion(String encodedRegionName) {
2995     return regionFavoredNodesMap.get(encodedRegionName);
2996   }
2997 
2998   @Override
2999   public ServerNonceManager getNonceManager() {
3000     return this.nonceManager;
3001   }
3002 
3003   private static class MovedRegionInfo {
3004     private final ServerName serverName;
3005     private final long seqNum;
3006     private final long ts;
3007 
3008     public MovedRegionInfo(ServerName serverName, long closeSeqNum) {
3009       this.serverName = serverName;
3010       this.seqNum = closeSeqNum;
3011       ts = EnvironmentEdgeManager.currentTime();
3012      }
3013 
3014     public ServerName getServerName() {
3015       return serverName;
3016     }
3017 
3018     public long getSeqNum() {
3019       return seqNum;
3020     }
3021 
3022     public long getMoveTime() {
3023       return ts;
3024     }
3025   }
3026 
3027   // This map contains all the regions that we closed for a move.
3028   // We record the time of the move because we don't want to keep stale entries around.
3029   protected Map<String, MovedRegionInfo> movedRegions =
3030       new ConcurrentHashMap<String, MovedRegionInfo>(3000);
3031 
3032   // We need a timeout. Without one there is a risk of returning stale information, which would
3033   // double the number of network calls instead of reducing them.
3034   private static final int TIMEOUT_REGION_MOVED = (2 * 60 * 1000);
3035 
3036   protected void addToMovedRegions(String encodedName, ServerName destination, long closeSeqNum) {
3037     if (ServerName.isSameHostnameAndPort(destination, this.getServerName())) {
3038       LOG.warn("Not adding moved region record: " + encodedName + " to self.");
3039       return;
3040     }
3041     LOG.info("Adding moved region record: "
3042       + encodedName + " to " + destination + " as of " + closeSeqNum);
3043     movedRegions.put(encodedName, new MovedRegionInfo(destination, closeSeqNum));
3044   }
3045 
3046   void removeFromMovedRegions(String encodedName) {
3047     movedRegions.remove(encodedName);
3048   }
3049 
3050   private MovedRegionInfo getMovedRegion(final String encodedRegionName) {
3051     MovedRegionInfo dest = movedRegions.get(encodedRegionName);
3052 
3053     long now = EnvironmentEdgeManager.currentTime();
3054     if (dest != null) {
3055       if (dest.getMoveTime() > (now - TIMEOUT_REGION_MOVED)) {
3056         return dest;
3057       } else {
3058         movedRegions.remove(encodedRegionName);
3059       }
3060     }
3061 
3062     return null;
3063   }
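  // Lifecycle of the moved-regions cache: removeFromOnlineRegions records an entry when a region
  // is closed for a move to another server, getRegionByEncodedName uses it to answer clients with
  // a RegionMovedException, and entries expire after TIMEOUT_REGION_MOVED (two minutes), either
  // lazily in getMovedRegion above or via the MovedRegionsCleaner chore below.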
3064 
3065   /**
3066    * Remove the expired entries from the moved regions list.
3067    */
3068   protected void cleanMovedRegions() {
3069     final long cutOff = EnvironmentEdgeManager.currentTime() - TIMEOUT_REGION_MOVED;
3070     Iterator<Entry<String, MovedRegionInfo>> it = movedRegions.entrySet().iterator();
3071 
3072     while (it.hasNext()){
3073       Map.Entry<String, MovedRegionInfo> e = it.next();
3074       if (e.getValue().getMoveTime() < cutOff) {
3075         it.remove();
3076       }
3077     }
3078   }
3079 
3080   /*
3081    * Use this to allow tests to override and schedule more frequently.
3082    */
3083 
3084   protected int movedRegionCleanerPeriod() {
3085     return TIMEOUT_REGION_MOVED;
3086   }
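  // Test-override sketch (hypothetical, for illustration only): a test subclass can shorten the
  // period so the MovedRegionsCleaner chore purges expired entries quickly, e.g.
  //
  //   class FastMovedRegionsCleanupRegionServer extends HRegionServer {
  //     // constructor omitted
  //     @Override
  //     protected int movedRegionCleanerPeriod() {
  //       return 500; // run the cleaner every 500 ms instead of every TIMEOUT_REGION_MOVED ms
  //     }
  //   }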
3087 
3088   /**
3089    * Creates a Chore thread to clean the moved region cache.
3090    */
3091 
3092   protected final static class MovedRegionsCleaner extends ScheduledChore implements Stoppable {
3093     private HRegionServer regionServer;
3094     Stoppable stoppable;
3095 
3096     private MovedRegionsCleaner(
3097       HRegionServer regionServer, Stoppable stoppable){
3098       super("MovedRegionsCleaner for region server " + regionServer, stoppable,
3099           regionServer.movedRegionCleanerPeriod());
3100       this.regionServer = regionServer;
3101       this.stoppable = stoppable;
3102     }
3103 
3104     static MovedRegionsCleaner create(HRegionServer rs){
3105       Stoppable stoppable = new Stoppable() {
3106         private volatile boolean isStopped = false;
3107         @Override public void stop(String why) { isStopped = true;}
3108         @Override public boolean isStopped() {return isStopped;}
3109       };
3110 
3111       return new MovedRegionsCleaner(rs, stoppable);
3112     }
3113 
3114     @Override
3115     protected void chore() {
3116       regionServer.cleanMovedRegions();
3117     }
3118 
3119     @Override
3120     public void stop(String why) {
3121       stoppable.stop(why);
3122     }
3123 
3124     @Override
3125     public boolean isStopped() {
3126       return stoppable.isStopped();
3127     }
3128   }
3129 
3130   private String getMyEphemeralNodePath() {
3131     return ZKUtil.joinZNode(this.zooKeeper.rsZNode, getServerName().toString());
3132   }
3133 
3134   private boolean isHealthCheckerConfigured() {
3135     String healthScriptLocation = this.conf.get(HConstants.HEALTH_SCRIPT_LOC);
3136     return org.apache.commons.lang.StringUtils.isNotBlank(healthScriptLocation);
3137   }
3138 
3139   /**
3140    * @return the underlying {@link CompactSplitThread} for this server
3141    */
3142   public CompactSplitThread getCompactSplitThread() {
3143     return this.compactSplitThread;
3144   }
3145 
3146   /**
3147    * A helper function to store the last flushed sequence id with the previously failed RS for a
3148    * recovering region. The id is used to skip WAL edits which have already been flushed. Since a
3149    * flushed sequence id is only valid per RS, we associate the id with the corresponding failed RS.
3150    * @throws KeeperException
3151    * @throws IOException
3152    */
3153   private void updateRecoveringRegionLastFlushedSequenceId(Region r) throws KeeperException,
3154       IOException {
3155     if (!r.isRecovering()) {
3156       // return immediately for non-recovering regions
3157       return;
3158     }
3159 
3160     HRegionInfo regionInfo = r.getRegionInfo();
3161     ZooKeeperWatcher zkw = getZooKeeper();
3162     String previousRSName = this.getLastFailedRSFromZK(regionInfo.getEncodedName());
3163     Map<byte[], Long> maxSeqIdInStores = r.getMaxStoreSeqId();
3164     long minSeqIdForLogReplay = -1;
3165     for (Long storeSeqIdForReplay : maxSeqIdInStores.values()) {
3166       if (minSeqIdForLogReplay == -1 || storeSeqIdForReplay < minSeqIdForLogReplay) {
3167         minSeqIdForLogReplay = storeSeqIdForReplay;
3168       }
3169     }
3170 
3171     try {
3172       long lastRecordedFlushedSequenceId = -1;
3173       String nodePath = ZKUtil.joinZNode(this.zooKeeper.recoveringRegionsZNode,
3174         regionInfo.getEncodedName());
3175       // recovering-region level
3176       byte[] data;
3177       try {
3178         data = ZKUtil.getData(zkw, nodePath);
3179       } catch (InterruptedException e) {
3180         throw new InterruptedIOException();
3181       }
3182       if (data != null) {
3183         lastRecordedFlushedSequenceId = ZKSplitLog.parseLastFlushedSequenceIdFrom(data);
3184       }
3185       if (data == null || lastRecordedFlushedSequenceId < minSeqIdForLogReplay) {
3186         ZKUtil.setData(zkw, nodePath, ZKUtil.positionToByteArray(minSeqIdForLogReplay));
3187       }
3188       if (previousRSName != null) {
3189         // one level deeper for the failed RS
3190         nodePath = ZKUtil.joinZNode(nodePath, previousRSName);
3191         ZKUtil.setData(zkw, nodePath,
3192           ZKUtil.regionSequenceIdsToByteArray(minSeqIdForLogReplay, maxSeqIdInStores));
3193         LOG.debug("Update last flushed sequence id of region " + regionInfo.getEncodedName() +
3194           " for " + previousRSName);
3195       } else {
3196         LOG.warn("Can't find failed region server for recovering region " +
3197             regionInfo.getEncodedName());
3198       }
3199     } catch (NoNodeException ignore) {
3200       LOG.debug("Region " + regionInfo.getEncodedName() +
3201         " must have completed recovery because its recovery znode has been removed", ignore);
3202     }
3203   }
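  // ZooKeeper layout used above, as built by the ZKUtil.joinZNode calls: the znode
  // <recoveringRegionsZNode>/<encodedRegionName> stores the minimum flushed sequence id for the
  // region, and a child znode named after the previously failed region server stores the
  // per-store sequence ids, so log replay can skip edits that server had already flushed.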
3204 
3205   /**
3206    * Return the last failed RS name under /hbase/recovering-regions/encodedRegionName
3207    * @param encodedRegionName
3208    * @throws KeeperException
3209    */
3210   private String getLastFailedRSFromZK(String encodedRegionName) throws KeeperException {
3211     String result = null;
3212     long maxZxid = 0;
3213     ZooKeeperWatcher zkw = this.getZooKeeper();
3214     String nodePath = ZKUtil.joinZNode(zkw.recoveringRegionsZNode, encodedRegionName);
3215     List<String> failedServers = ZKUtil.listChildrenNoWatch(zkw, nodePath);
3216     if (failedServers == null || failedServers.isEmpty()) {
3217       return result;
3218     }
3219     for (String failedServer : failedServers) {
3220       String rsPath = ZKUtil.joinZNode(nodePath, failedServer);
3221       Stat stat = new Stat();
3222       ZKUtil.getDataNoWatch(zkw, rsPath, stat);
3223       if (maxZxid < stat.getCzxid()) {
3224         maxZxid = stat.getCzxid();
3225         result = failedServer;
3226       }
3227     }
3228     return result;
3229   }
3230 
3231   public CoprocessorServiceResponse execRegionServerService(
3232       @SuppressWarnings("UnusedParameters") final RpcController controller,
3233       final CoprocessorServiceRequest serviceRequest) throws ServiceException {
3234     try {
3235       ServerRpcController serviceController = new ServerRpcController();
3236       CoprocessorServiceCall call = serviceRequest.getCall();
3237       String serviceName = call.getServiceName();
3238       String methodName = call.getMethodName();
3239       if (!coprocessorServiceHandlers.containsKey(serviceName)) {
3240         throw new UnknownProtocolException(null,
3241             "No registered coprocessor service found for name " + serviceName);
3242       }
3243       Service service = coprocessorServiceHandlers.get(serviceName);
3244       Descriptors.ServiceDescriptor serviceDesc = service.getDescriptorForType();
3245       Descriptors.MethodDescriptor methodDesc = serviceDesc.findMethodByName(methodName);
3246       if (methodDesc == null) {
3247         throw new UnknownProtocolException(service.getClass(), "Unknown method " + methodName
3248             + " called on service " + serviceName);
3249       }
3250       Message.Builder builderForType = service.getRequestPrototype(methodDesc).newBuilderForType();
3251       ProtobufUtil.mergeFrom(builderForType, call.getRequest());
3252       Message request = builderForType.build();
3253       final Message.Builder responseBuilder =
3254           service.getResponsePrototype(methodDesc).newBuilderForType();
3255       service.callMethod(methodDesc, serviceController, request, new RpcCallback<Message>() {
3256         @Override
3257         public void run(Message message) {
3258           if (message != null) {
3259             responseBuilder.mergeFrom(message);
3260           }
3261         }
3262       });
3263       IOException exception = ResponseConverter.getControllerException(serviceController);
3264       if (exception != null) {
3265         throw exception;
3266       }
3267       Message execResult = responseBuilder.build();
3268       ClientProtos.CoprocessorServiceResponse.Builder builder =
3269           ClientProtos.CoprocessorServiceResponse.newBuilder();
3270       builder.setRegion(RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME,
3271         HConstants.EMPTY_BYTE_ARRAY));
3272       builder.setValue(builder.getValueBuilder().setName(execResult.getClass().getName())
3273           .setValue(execResult.toByteString()));
3274       return builder.build();
3275     } catch (IOException ie) {
3276       throw new ServiceException(ie);
3277     }
3278   }
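  // Dispatch note for execRegionServerService above: region-server-scoped coprocessor endpoints
  // are looked up by service name in coprocessorServiceHandlers, the request message is rebuilt
  // from the call's serialized bytes, and the response (or the controller's IOException) is
  // wrapped in a CoprocessorServiceResponse with an empty region specifier, since the call is not
  // bound to any particular region.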
3279 
3280   /**
3281    * @return The cache config instance used by the regionserver.
3282    */
3283   public CacheConfig getCacheConfig() {
3284     return this.cacheConfig;
3285   }
3286 
3287   /**
3288    * @return the ConfigurationManager object, exposed for testing purposes.
3289    */
3290   protected ConfigurationManager getConfigurationManager() {
3291     return configurationManager;
3292   }
3293 
3294   /**
3295    * @return the table descriptors implementation.
3296    */
3297   public TableDescriptors getTableDescriptors() {
3298     return this.tableDescriptors;
3299   }
3300 
3301   /**
3302    * Reload the configuration from disk.
3303    */
3304   public void updateConfiguration() {
3305     LOG.info("Reloading the configuration from disk.");
3306     // Reload the configuration from disk.
3307     conf.reloadConfiguration();
3308     configurationManager.notifyAllObservers(conf);
3309   }
3310 
3311   @Override
3312   public HeapMemoryManager getHeapMemoryManager() {
3313     return hMemManager;
3314   }
3315 
3316   @Override
3317   public double getCompactionPressure() {
3318     double max = 0;
3319     for (Region region : onlineRegions.values()) {
3320       for (Store store : region.getStores()) {
3321         double normCount = store.getCompactionPressure();
3322         if (normCount > max) {
3323           max = normCount;
3324         }
3325       }
3326     }
3327     return max;
3328   }
3329 
3330   /**
3331    * For testing.
3332    * @return whether all WAL roll requests have finished for this regionserver
3333    */
3334   @VisibleForTesting
3335   public boolean walRollRequestFinished() {
3336     return this.walRoller.walRollFinished();
3337   }
3338 }