1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import java.io.IOException;
22 import java.io.InterruptedIOException;
23 import java.lang.Thread.UncaughtExceptionHandler;
24 import java.lang.management.ManagementFactory;
25 import java.lang.management.MemoryUsage;
26 import java.lang.reflect.Constructor;
27 import java.net.BindException;
28 import java.net.InetAddress;
29 import java.net.InetSocketAddress;
30 import java.util.ArrayList;
31 import java.util.Collection;
32 import java.util.Collections;
33 import java.util.Comparator;
34 import java.util.HashMap;
35 import java.util.HashSet;
36 import java.util.Iterator;
37 import java.util.List;
38 import java.util.Map;
39 import java.util.Map.Entry;
40 import java.util.Set;
41 import java.util.SortedMap;
42 import java.util.TreeMap;
43 import java.util.TreeSet;
44 import java.util.concurrent.ConcurrentHashMap;
45 import java.util.concurrent.ConcurrentMap;
46 import java.util.concurrent.ConcurrentSkipListMap;
47 import java.util.concurrent.atomic.AtomicBoolean;
48 import java.util.concurrent.atomic.AtomicReference;
49 import java.util.concurrent.locks.ReentrantReadWriteLock;
50
51 import javax.management.MalformedObjectNameException;
52 import javax.management.ObjectName;
53 import javax.servlet.http.HttpServlet;
54
55 import org.apache.commons.lang.SystemUtils;
56 import org.apache.commons.lang.math.RandomUtils;
57 import org.apache.commons.logging.Log;
58 import org.apache.commons.logging.LogFactory;
59 import org.apache.hadoop.conf.Configuration;
60 import org.apache.hadoop.fs.FileSystem;
61 import org.apache.hadoop.fs.Path;
62 import org.apache.hadoop.hbase.ChoreService;
63 import org.apache.hadoop.hbase.ClockOutOfSyncException;
64 import org.apache.hadoop.hbase.CoordinatedStateManager;
65 import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
66 import org.apache.hadoop.hbase.HBaseConfiguration;
67 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
68 import org.apache.hadoop.hbase.HConstants;
69 import org.apache.hadoop.hbase.HRegionInfo;
70 import org.apache.hadoop.hbase.HealthCheckChore;
71 import org.apache.hadoop.hbase.MetaTableAccessor;
72 import org.apache.hadoop.hbase.NotServingRegionException;
73 import org.apache.hadoop.hbase.RemoteExceptionHandler;
74 import org.apache.hadoop.hbase.ScheduledChore;
75 import org.apache.hadoop.hbase.ServerName;
76 import org.apache.hadoop.hbase.Stoppable;
77 import org.apache.hadoop.hbase.TableDescriptors;
78 import org.apache.hadoop.hbase.TableName;
79 import org.apache.hadoop.hbase.YouAreDeadException;
80 import org.apache.hadoop.hbase.ZNodeClearer;
81 import org.apache.hadoop.hbase.classification.InterfaceAudience;
82 import org.apache.hadoop.hbase.client.ClusterConnection;
83 import org.apache.hadoop.hbase.client.ConnectionUtils;
84 import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
85 import org.apache.hadoop.hbase.conf.ConfigurationManager;
86 import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
87 import org.apache.hadoop.hbase.coordination.CloseRegionCoordination;
88 import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
89 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
90 import org.apache.hadoop.hbase.exceptions.RegionMovedException;
91 import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
92 import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
93 import org.apache.hadoop.hbase.executor.ExecutorService;
94 import org.apache.hadoop.hbase.executor.ExecutorType;
95 import org.apache.hadoop.hbase.fs.HFileSystem;
96 import org.apache.hadoop.hbase.http.InfoServer;
97 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
98 import org.apache.hadoop.hbase.ipc.RpcClient;
99 import org.apache.hadoop.hbase.ipc.RpcClientFactory;
100 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
101 import org.apache.hadoop.hbase.ipc.RpcServerInterface;
102 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
103 import org.apache.hadoop.hbase.ipc.ServerRpcController;
104 import org.apache.hadoop.hbase.master.HMaster;
105 import org.apache.hadoop.hbase.master.RegionState.State;
106 import org.apache.hadoop.hbase.master.TableLockManager;
107 import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost;
108 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
109 import org.apache.hadoop.hbase.protobuf.RequestConverter;
110 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
111 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
112 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall;
113 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
114 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
115 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
116 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionLoad;
117 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
118 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.Coprocessor;
119 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.Coprocessor.Builder;
120 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
121 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionServerInfo;
122 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
123 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
124 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
125 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
126 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
127 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
128 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
129 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;
130 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
131 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
132 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
133 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
134 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
135 import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
136 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
137 import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
138 import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
139 import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler;
140 import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
141 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
142 import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
143 import org.apache.hadoop.hbase.security.Superusers;
144 import org.apache.hadoop.hbase.security.UserProvider;
145 import org.apache.hadoop.hbase.trace.SpanReceiverHost;
146 import org.apache.hadoop.hbase.util.Addressing;
147 import org.apache.hadoop.hbase.util.ByteStringer;
148 import org.apache.hadoop.hbase.util.Bytes;
149 import org.apache.hadoop.hbase.util.CompressionTest;
150 import org.apache.hadoop.hbase.util.ConfigUtil;
151 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
152 import org.apache.hadoop.hbase.util.FSTableDescriptors;
153 import org.apache.hadoop.hbase.util.FSUtils;
154 import org.apache.hadoop.hbase.util.HasThread;
155 import org.apache.hadoop.hbase.util.JSONBean;
156 import org.apache.hadoop.hbase.util.JvmPauseMonitor;
157 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
158 import org.apache.hadoop.hbase.util.Sleeper;
159 import org.apache.hadoop.hbase.util.Threads;
160 import org.apache.hadoop.hbase.util.VersionInfo;
161 import org.apache.hadoop.hbase.wal.DefaultWALProvider;
162 import org.apache.hadoop.hbase.wal.WAL;
163 import org.apache.hadoop.hbase.wal.WALFactory;
164 import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
165 import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
166 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
167 import org.apache.hadoop.hbase.zookeeper.RecoveringRegionWatcher;
168 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
169 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
170 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
171 import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
172 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
173 import org.apache.hadoop.ipc.RemoteException;
174 import org.apache.hadoop.metrics.util.MBeanUtil;
175 import org.apache.hadoop.util.ReflectionUtils;
176 import org.apache.hadoop.util.StringUtils;
177 import org.apache.zookeeper.KeeperException;
178 import org.apache.zookeeper.KeeperException.NoNodeException;
179 import org.apache.zookeeper.data.Stat;
180
181 import com.google.common.annotations.VisibleForTesting;
182 import com.google.common.base.Preconditions;
183 import com.google.common.collect.Maps;
184 import com.google.protobuf.BlockingRpcChannel;
185 import com.google.protobuf.Descriptors;
186 import com.google.protobuf.Message;
187 import com.google.protobuf.RpcCallback;
188 import com.google.protobuf.RpcController;
189 import com.google.protobuf.Service;
190 import com.google.protobuf.ServiceException;
191 import sun.misc.Signal;
192 import sun.misc.SignalHandler;
193
194
195
196
197
198 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
199 @SuppressWarnings("deprecation")
200 public class HRegionServer extends HasThread implements
201 RegionServerServices, LastSequenceId {
202
203 private static final Log LOG = LogFactory.getLog(HRegionServer.class);
204
205
206
207
208
209 protected static final String OPEN = "OPEN";
210 protected static final String CLOSE = "CLOSE";
211
212
213
214
215 protected final ConcurrentMap<byte[], Boolean> regionsInTransitionInRS =
216 new ConcurrentSkipListMap<byte[], Boolean>(Bytes.BYTES_COMPARATOR);
217
218
219 protected MemStoreFlusher cacheFlusher;
220
221 protected HeapMemoryManager hMemManager;
222
223
224
225
226
227
228 protected ClusterConnection clusterConnection;
229
230
231
232
233
234
235
236 protected MetaTableLocator metaTableLocator;
237
238
239 @SuppressWarnings("unused")
240 private RecoveringRegionWatcher recoveringRegionWatcher;
241
242
243
244
245 protected TableDescriptors tableDescriptors;
246
247
248 protected ReplicationSourceService replicationSourceHandler;
249 protected ReplicationSinkService replicationSinkHandler;
250
251
252 public CompactSplitThread compactSplitThread;
253
254
255
256
257
258 protected final Map<String, Region> onlineRegions = new ConcurrentHashMap<String, Region>();
259
260
261
262
263
264
265
266
267
268
269 protected final Map<String, InetSocketAddress[]> regionFavoredNodesMap =
270 new ConcurrentHashMap<String, InetSocketAddress[]>();
271
272
273
274
275
276 protected final Map<String, Region> recoveringRegions = Collections
277 .synchronizedMap(new HashMap<String, Region>());
278
279
280 protected Leases leases;
281
282
283 protected ExecutorService service;
284
285
286 protected volatile boolean fsOk;
287 protected HFileSystem fs;
288
289
290
291
292 private volatile boolean stopped = false;
293
294
295
296 private volatile boolean abortRequested;
297
298 ConcurrentMap<String, Integer> rowlocks = new ConcurrentHashMap<String, Integer>();
299
300
301
302 private boolean stopping = false;
303
304 private volatile boolean killed = false;
305
306 protected final Configuration conf;
307
308 private Path rootDir;
309
310 protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
311
312 final int numRetries;
313 protected final int threadWakeFrequency;
314 protected final int msgInterval;
315
316 protected final int numRegionsToReport;
317
318
319 private volatile RegionServerStatusService.BlockingInterface rssStub;
320
321 RpcClient rpcClient;
322
323 private RpcRetryingCallerFactory rpcRetryingCallerFactory;
324 private RpcControllerFactory rpcControllerFactory;
325
326 private UncaughtExceptionHandler uncaughtExceptionHandler;
327
328
329
330
331 protected InfoServer infoServer;
332 private JvmPauseMonitor pauseMonitor;
333
334
335 public static final String REGIONSERVER = "regionserver";
336
337 MetricsRegionServer metricsRegionServer;
338 private SpanReceiverHost spanReceiverHost;
339
340
341
342
343 private final ChoreService choreService;
344
345
346
347
348 ScheduledChore compactionChecker;
349
350
351
352
353 ScheduledChore periodicFlusher;
354
355 protected volatile WALFactory walFactory;
356
357
358
359 final LogRoller walRoller;
360
361 final AtomicReference<LogRoller> metawalRoller = new AtomicReference<LogRoller>();
362
363
364 final AtomicBoolean online = new AtomicBoolean(false);
365
366
367 protected ZooKeeperWatcher zooKeeper;
368
369
370 private MasterAddressTracker masterAddressTracker;
371
372
373 protected ClusterStatusTracker clusterStatusTracker;
374
375
376 private SplitLogWorker splitLogWorker;
377
378
379 protected final Sleeper sleeper;
380
381 private final int operationTimeout;
382 private final int shortOperationTimeout;
383
384 private final RegionServerAccounting regionServerAccounting;
385
386
387 protected CacheConfig cacheConfig;
388
389
390 private HealthCheckChore healthCheckChore;
391
392
393 private ScheduledChore nonceManagerChore;
394
395 private Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();
396
397
398
399
400
401
402 protected ServerName serverName;
403
404
405
406
407 private String useThisHostnameInstead;
408
409
410
411
412 final static String RS_HOSTNAME_KEY = "hbase.regionserver.hostname";
413
414 final static String MASTER_HOSTNAME_KEY = "hbase.master.hostname";
415
416
417
418
419 protected final long startcode;
420
421
422
423
424 private String clusterId;
425
426
427
428
429 private ObjectName mxBean = null;
430
431
432
433
434 private MovedRegionsCleaner movedRegionsCleaner;
435
436
437 private StorefileRefresherChore storefileRefresher;
438
439 private RegionServerCoprocessorHost rsHost;
440
441 private RegionServerProcedureManagerHost rspmHost;
442
443 private RegionServerQuotaManager rsQuotaManager;
444
445
446 protected TableLockManager tableLockManager;
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466 final ServerNonceManager nonceManager;
467
468 private UserProvider userProvider;
469
470 protected final RSRpcServices rpcServices;
471
472 protected BaseCoordinatedStateManager csm;
473
474 private final boolean useZKForAssignment;
475
476
477
478
479
480 protected final ConfigurationManager configurationManager;
481
482
483
484
485
486
487
/**
 * Creates a region server whose coordinated state manager is selected by
 * {@link CoordinatedStateManagerFactory} from the same configuration. All remaining setup is
 * delegated to {@link #HRegionServer(Configuration, CoordinatedStateManager)}.
 *
 * @param conf server configuration
 * @throws IOException if server construction fails
 * @throws InterruptedException if construction is interrupted
 */
public HRegionServer(Configuration conf)
    throws IOException, InterruptedException {
  this(conf,
      CoordinatedStateManagerFactory.getCoordinatedStateManager(conf));
}
491
492
493
494
495
496
497
498
/**
 * Creates a region server (or, through the {@link HMaster} subclass, a master).
 * <p>
 * Construction does real work. It:
 * <ul>
 * <li>validates the configured compression codecs;</li>
 * <li>logs in the ZooKeeper client and this process;</li>
 * <li>opens the HBase filesystem;</li>
 * <li>connects to ZooKeeper and starts the master-address and cluster-status trackers, unless
 * {@code hbase.testing.nocluster} is set;</li>
 * <li>starts the RPC services and calls {@code putUpWebUI()}.</li>
 * </ul>
 * Several overridable hooks are invoked from here: {@link #createRpcServices()},
 * {@link #canUpdateTableDescriptor()}, {@link #getProcessName()}, {@link #canCreateBaseZNode()}
 * and {@link #login}. Overrides therefore run before the subclass's own field initializers.
 *
 * @param conf server configuration; this object is mutated
 *          ({@link HConstants#USE_META_REPLICAS} is forced to {@code false})
 * @param csm coordinated state manager. When a cluster is in use it is cast unconditionally
 *          to {@link BaseCoordinatedStateManager}.
 * @throws IOException if codec validation, login or filesystem setup fails
 * @throws InterruptedException if construction is interrupted
 */
public HRegionServer(Configuration conf, CoordinatedStateManager csm)
    throws IOException, InterruptedException {
  this.fsOk = true;
  this.conf = conf;
  // Fail fast if any codec listed under hbase.regionserver.codecs is unusable.
  checkCodecs(this.conf);
  this.userProvider = UserProvider.instantiate(conf);
  Superusers.initialize(conf);
  FSUtils.setupShortCircuitRead(this.conf);
  // Meta replicas are switched off; note this mutates the caller's Configuration object.
  this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);

  // Tunables read from configuration.
  this.numRetries = this.conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
      HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
  this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
  this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000);

  // Paces the main loop in run() and the reportForDuty retry loop.
  this.sleeper = new Sleeper(this.msgInterval, this);

  // Nonce tracking is optional; a null manager means nonces are disabled.
  boolean isNoncesEnabled = conf.getBoolean(HConstants.HBASE_RS_NONCES_ENABLED, true);
  this.nonceManager = isNoncesEnabled ? new ServerNonceManager(this.conf) : null;

  this.numRegionsToReport = conf.getInt(
      "hbase.regionserver.numregionstoreport", 10);

  this.operationTimeout = conf.getInt(
      HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
      HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);

  this.shortOperationTimeout = conf.getInt(
      HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY,
      HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT);

  this.abortRequested = false;
  this.stopped = false;

  // createRpcServices() is an overridable hook; its bind address (isa) is used below.
  rpcServices = createRpcServices();
  this.startcode = System.currentTimeMillis();
  // Masters and region servers read different hostname-override keys.
  if (this instanceof HMaster) {
    useThisHostnameInstead = conf.get(MASTER_HOSTNAME_KEY);
  } else {
    useThisHostnameInstead = conf.get(RS_HOSTNAME_KEY);
  }
  String hostName = shouldUseThisHostnameInstead() ? useThisHostnameInstead :
      rpcServices.isa.getHostName();
  // Server identity = (host, RPC port, start code).
  serverName = ServerName.valueOf(hostName, rpcServices.isa.getPort(), startcode);

  rpcControllerFactory = RpcControllerFactory.instantiate(this.conf);
  rpcRetryingCallerFactory = RpcRetryingCallerFactory.instantiate(this.conf);

  // Log in the ZooKeeper client, then this process, using the resolved hostname.
  ZKUtil.loginClient(this.conf, HConstants.ZK_CLIENT_KEYTAB_FILE,
      HConstants.ZK_CLIENT_KERBEROS_PRINCIPAL, hostName);

  login(userProvider, hostName);

  regionServerAccounting = new RegionServerAccounting();
  // A service thread dying with an uncaught exception aborts the whole server.
  uncaughtExceptionHandler = new UncaughtExceptionHandler() {
    @Override
    public void uncaughtException(Thread t, Throwable e) {
      abort("Uncaught exception in service thread " + t.getName(), e);
    }
  };

  useZKForAssignment = ConfigUtil.useZKForAssignment(conf);

  // NOTE(review): presumably makes the root dir's filesystem the default FS in conf —
  // confirm against FSUtils.setFsDefault.
  FSUtils.setFsDefault(this.conf, FSUtils.getRootDir(this.conf));

  // HBase-level checksum verification defaults to on.
  boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
  this.fs = new HFileSystem(this.conf, useHBaseChecksum);
  this.rootDir = FSUtils.getRootDir(this.conf);
  // NOTE(review): the 4th argument (!canUpdateTableDescriptor()) presumably makes the
  // descriptors read-only for region servers — confirm against FSTableDescriptors.
  this.tableDescriptors = new FSTableDescriptors(
      this.conf, this.fs, this.rootDir, !canUpdateTableDescriptor(), false);

  service = new ExecutorService(getServerName().toShortString());
  spanReceiverHost = SpanReceiverHost.getInstance(getConfiguration());

  // ZooKeeper, coordination and trackers are skipped entirely in no-cluster test mode.
  if (!conf.getBoolean("hbase.testing.nocluster", false)) {
    // The watcher's second argument is built as "<process name>:<rpc port>".
    zooKeeper = new ZooKeeperWatcher(conf, getProcessName() + ":" +
        rpcServices.isa.getPort(), this, canCreateBaseZNode());

    // Unchecked cast: any other CoordinatedStateManager fails here with ClassCastException.
    this.csm = (BaseCoordinatedStateManager) csm;
    this.csm.initialize(this);
    this.csm.start();

    tableLockManager = TableLockManager.createTableLockManager(
        conf, zooKeeper, serverName);

    masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this);
    masterAddressTracker.start();

    clusterStatusTracker = new ClusterStatusTracker(zooKeeper, this);
    clusterStatusTracker.start();
  }
  this.configurationManager = new ConfigurationManager();

  rpcServices.start();
  putUpWebUI();
  this.walRoller = new LogRoller(this, this);
  this.choreService = new ChoreService(getServerName().toString(), true);

  // On non-Windows platforms, SIGHUP reloads the configuration and notifies registered
  // configuration observers.
  if (!SystemUtils.IS_OS_WINDOWS) {
    Signal.handle(new Signal("HUP"), new SignalHandler() {
      public void handle(Signal signal) {
        getConfiguration().reloadConfiguration();
        configurationManager.notifyAllObservers(getConfiguration());
      }
    });
  }
}
615
616
617
618
619 protected boolean shouldUseThisHostnameInstead() {
620 return useThisHostnameInstead != null && !useThisHostnameInstead.isEmpty();
621 }
622
623 protected void login(UserProvider user, String host) throws IOException {
624 user.login("hbase.regionserver.keytab.file",
625 "hbase.regionserver.kerberos.principal", host);
626 }
627
628 protected void waitForMasterActive(){
629 }
630
631 protected String getProcessName() {
632 return REGIONSERVER;
633 }
634
635 protected boolean canCreateBaseZNode() {
636 return false;
637 }
638
639 protected boolean canUpdateTableDescriptor() {
640 return false;
641 }
642
643 protected RSRpcServices createRpcServices() throws IOException {
644 return new RSRpcServices(this);
645 }
646
647 protected void configureInfoServer() {
648 infoServer.addServlet("rs-status", "/rs-status", RSStatusServlet.class);
649 infoServer.setAttribute(REGIONSERVER, this);
650 }
651
652 protected Class<? extends HttpServlet> getDumpServlet() {
653 return RSDumpServlet.class;
654 }
655
656 protected void doMetrics() {
657 }
658
659 @Override
660 public boolean registerService(Service instance) {
661
662
663
664 Descriptors.ServiceDescriptor serviceDesc = instance.getDescriptorForType();
665 if (coprocessorServiceHandlers.containsKey(serviceDesc.getFullName())) {
666 LOG.error("Coprocessor service " + serviceDesc.getFullName()
667 + " already registered, rejecting request from " + instance);
668 return false;
669 }
670
671 coprocessorServiceHandlers.put(serviceDesc.getFullName(), instance);
672 if (LOG.isDebugEnabled()) {
673 LOG.debug("Registered regionserver coprocessor service: service="+serviceDesc.getFullName());
674 }
675 return true;
676 }
677
678
679
680
681
682
683
684 @VisibleForTesting
685 protected ClusterConnection createClusterConnection() throws IOException {
686
687
688
689 return ConnectionUtils.createShortCircuitConnection(conf, null, userProvider.getCurrent(),
690 serverName, rpcServices, rpcServices);
691 }
692
693
694
695
696
697
698 private static void checkCodecs(final Configuration c) throws IOException {
699
700 String [] codecs = c.getStrings("hbase.regionserver.codecs", (String[])null);
701 if (codecs == null) return;
702 for (String codec : codecs) {
703 if (!CompressionTest.testCompression(codec)) {
704 throw new IOException("Compression codec " + codec +
705 " not supported, aborting RS construction");
706 }
707 }
708 }
709
710 public String getClusterId() {
711 return this.clusterId;
712 }
713
714
715
716
717
718 protected synchronized void setupClusterConnection() throws IOException {
719 if (clusterConnection == null) {
720 clusterConnection = createClusterConnection();
721 metaTableLocator = new MetaTableLocator();
722 }
723 }
724
725
726
727
728
729
730
731 private void preRegistrationInitialization(){
732 try {
733 setupClusterConnection();
734
735
736 if (isHealthCheckerConfigured()) {
737 int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ,
738 HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
739 healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration());
740 }
741 this.pauseMonitor = new JvmPauseMonitor(conf);
742 pauseMonitor.start();
743
744 initializeZooKeeper();
745 if (!isStopped() && !isAborted()) {
746 initializeThreads();
747 }
748 } catch (Throwable t) {
749
750
751 this.rpcServices.stop();
752 abort("Initialization of RS failed. Hence aborting RS.", t);
753 }
754 }
755
756
757
758
759
760
761
762
763
764 private void initializeZooKeeper() throws IOException, InterruptedException {
765
766
767
768 blockAndCheckIfStopped(this.masterAddressTracker);
769
770
771
772 blockAndCheckIfStopped(this.clusterStatusTracker);
773
774
775
776
777 try {
778 clusterId = ZKClusterId.readClusterIdZNode(this.zooKeeper);
779 if (clusterId == null) {
780 this.abort("Cluster ID has not been set");
781 }
782 LOG.info("ClusterId : "+clusterId);
783 } catch (KeeperException e) {
784 this.abort("Failed to retrieve Cluster ID",e);
785 }
786
787
788
789
790
791 waitForMasterActive();
792 if (isStopped() || isAborted()) {
793 return;
794 }
795
796
797 try {
798 rspmHost = new RegionServerProcedureManagerHost();
799 rspmHost.loadProcedures(conf);
800 rspmHost.initialize(this);
801 } catch (KeeperException e) {
802 this.abort("Failed to reach zk cluster when creating procedure handler.", e);
803 }
804
805 this.recoveringRegionWatcher = new RecoveringRegionWatcher(this.zooKeeper, this);
806 }
807
808
809
810
811
812
813
814
815 private void blockAndCheckIfStopped(ZooKeeperNodeTracker tracker)
816 throws IOException, InterruptedException {
817 while (tracker.blockUntilAvailable(this.msgInterval, false) == null) {
818 if (this.stopped) {
819 throw new IOException("Received the shutdown message while waiting.");
820 }
821 }
822 }
823
824
825
826
827 private boolean isClusterUp() {
828 return clusterStatusTracker != null && clusterStatusTracker.isClusterUp();
829 }
830
831 private void initializeThreads() throws IOException {
832
833 this.cacheFlusher = new MemStoreFlusher(conf, this);
834
835
836 this.compactSplitThread = new CompactSplitThread(this);
837
838
839
840 this.compactionChecker = new CompactionChecker(this, this.threadWakeFrequency, this);
841 this.periodicFlusher = new PeriodicMemstoreFlusher(this.threadWakeFrequency, this);
842 this.leases = new Leases(this.threadWakeFrequency);
843
844
845 movedRegionsCleaner = MovedRegionsCleaner.create(this);
846
847 if (this.nonceManager != null) {
848
849 nonceManagerChore = this.nonceManager.createCleanupScheduledChore(this);
850 }
851
852
853 rsQuotaManager = new RegionServerQuotaManager(this);
854
855
856 rpcClient = RpcClientFactory.createClient(conf, clusterId, new InetSocketAddress(
857 rpcServices.isa.getAddress(), 0), clusterConnection.getConnectionMetrics());
858
859 boolean onlyMetaRefresh = false;
860 int storefileRefreshPeriod = conf.getInt(
861 StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD
862 , StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
863 if (storefileRefreshPeriod == 0) {
864 storefileRefreshPeriod = conf.getInt(
865 StorefileRefresherChore.REGIONSERVER_META_STOREFILE_REFRESH_PERIOD,
866 StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
867 onlyMetaRefresh = true;
868 }
869 if (storefileRefreshPeriod > 0) {
870 this.storefileRefresher = new StorefileRefresherChore(storefileRefreshPeriod,
871 onlyMetaRefresh, this, this);
872 }
873 registerConfigurationObservers();
874 }
875
876 private void registerConfigurationObservers() {
877
878 configurationManager.registerObserver(this.compactSplitThread);
879 configurationManager.registerObserver(this.rpcServices);
880 }
881
882
883
884
/**
 * The region server's main thread body. The phases, in order, are:
 * <ol>
 * <li>Pre-registration initialization (connection, ZooKeeper state, worker threads).</li>
 * <li>If still running: install the shutdown hook, create this server's ephemeral node and
 * create the region-server coprocessor host.</li>
 * <li>Report for duty to the master. Failed attempts sleep and retry while
 * {@code keepLooping()} holds.</li>
 * <li>Start the procedure manager host and the quota manager.</li>
 * <li>Main loop: send a load report every {@code msgInterval} ms. Once the cluster is flagged
 * down, close user regions. Stop when only catalog regions remain and the write-request count
 * is unchanged between iterations.</li>
 * <li>Shutdown:
 * <ul>
 * <li>stop chores and helper services;</li>
 * <li>close regions (skipped when killed);</li>
 * <li>close the cluster connection and wait for regions to close;</li>
 * <li>shut down the WAL and stop RPC;</li>
 * <li>delete the ephemeral node and close ZooKeeper.</li>
 * </ul>
 * </li>
 * </ol>
 * Throwables escaping phases 2-5 abort the server unless {@code rpcServices.checkOOME(t)}
 * returns {@code true}.
 */
@Override
public void run() {
  try {
    // Connection, ZooKeeper and worker-thread setup before registering with the master.
    preRegistrationInitialization();
  } catch (Throwable e) {
    abort("Fatal exception during initialization", e);
  }

  try {
    // Skip registration setup if initialization stopped or aborted the server.
    if (!isStopped() && !isAborted()) {
      ShutdownHook.install(conf, fs, this, Thread.currentThread());
      // Publish this server's ephemeral node (deleted again near the end of run()).
      createMyEphemeralNode();
      this.rsHost = new RegionServerCoprocessorHost(this, this.conf);
    }

    // Register with the master, sleeping between failed attempts.
    while (keepLooping()) {
      RegionServerStartupResponse w = reportForDuty();
      if (w == null) {
        LOG.warn("reportForDuty failed; sleeping and then retrying.");
        this.sleeper.sleep();
      } else {
        handleReportForDutyResponse(w);
        break;
      }
    }

    if (!isStopped() && isHealthy()){
      // rspmHost is created in initializeZooKeeper().
      rspmHost.start();
    }

    // The quota manager is started with the RPC server's scheduler.
    if (this.rsQuotaManager != null) {
      rsQuotaManager.start(getRpcServer().getScheduler());
    }

    // Main loop: runs until the server is stopped or becomes unhealthy.
    long lastMsg = System.currentTimeMillis();
    long oldRequestCount = -1;

    while (!isStopped() && isHealthy()) {
      if (!isClusterUp()) {
        if (isOnlineRegionsEmpty()) {
          stop("Exiting; cluster shutdown set and not carrying any regions");
        } else if (!this.stopping) {
          // First pass after cluster shutdown: start closing user regions.
          this.stopping = true;
          LOG.info("Closing user regions");
          closeUserRegions(this.abortRequested);
        } else if (this.stopping) {
          // Always true here; the previous branch handles !stopping.
          boolean allUserRegionsOffline = areAllUserRegionsOffline();
          if (allUserRegionsOffline) {
            // Only catalog regions remain. Stop once the write count is unchanged across
            // consecutive iterations.
            if (oldRequestCount == getWriteRequestCount()) {
              stop("Stopped; only catalog regions remaining online");
              break;
            }
            oldRequestCount = getWriteRequestCount();
          } else {
            // User regions are still online; try closing them again.
            closeUserRegions(this.abortRequested);
          }
          LOG.debug("Waiting on " + getOnlineRegionsAsPrintableString());
        }
      }
      long now = System.currentTimeMillis();
      if ((now - lastMsg) >= msgInterval) {
        tryRegionServerReport(lastMsg, now);
        lastMsg = System.currentTimeMillis();
        doMetrics();
      }
      if (!isStopped() && !isAborted()) {
        this.sleeper.sleep();
      }
    }
  } catch (Throwable t) {
    // YouAreDeadException is reported without the "Unhandled: " prefix.
    if (!rpcServices.checkOOME(t)) {
      String prefix = t instanceof YouAreDeadException? "": "Unhandled: ";
      abort(prefix + t.getMessage(), t);
    }
  }

  // ---- Shutdown sequence ----
  if (mxBean != null) {
    MBeanUtil.unregisterMBean(mxBean);
    mxBean = null;
  }
  if (this.leases != null) this.leases.closeAfterLeasesExpire();
  if (this.splitLogWorker != null) {
    splitLogWorker.stop();
  }
  if (this.infoServer != null) {
    LOG.info("Stopping infoServer");
    try {
      this.infoServer.stop();
    } catch (Exception e) {
      // Best effort: a failure to stop the info server does not block shutdown.
      LOG.error("Failed to stop infoServer", e);
    }
  }
  // Release the block cache.
  if (cacheConfig != null && cacheConfig.isBlockCacheEnabled()) {
    cacheConfig.getBlockCache().shutdown();
  }

  if (movedRegionsCleaner != null) {
    movedRegionsCleaner.stop("Region Server stopping");
  }

  // Stop the heap manager and chores, and interrupt worker threads.
  if (this.hMemManager != null) this.hMemManager.stop();
  if (this.cacheFlusher != null) this.cacheFlusher.interruptIfNecessary();
  if (this.compactSplitThread != null) this.compactSplitThread.interruptIfNecessary();
  if (this.compactionChecker != null) this.compactionChecker.cancel(true);
  if (this.healthCheckChore != null) this.healthCheckChore.cancel(true);
  if (this.nonceManagerChore != null) this.nonceManagerChore.cancel(true);
  if (this.storefileRefresher != null) this.storefileRefresher.cancel(true);
  sendShutdownInterrupt();

  if (rsQuotaManager != null) {
    rsQuotaManager.stop();
  }

  // The procedure manager is told whether this is an abort or kill (true) or a clean stop.
  if (rspmHost != null) {
    rspmHost.stop(this.abortRequested || this.killed);
  }

  if (this.killed) {
    // Killed: do not close user regions.
  } else if (abortRequested) {
    // On abort, user regions are only closed if the filesystem is still healthy.
    if (this.fsOk) {
      closeUserRegions(abortRequested);
    }
    LOG.info("aborting server " + this.serverName);
  } else {
    closeUserRegions(abortRequested);
    LOG.info("stopping server " + this.serverName);
  }

  // Close the meta locator and the short-circuit cluster connection.
  if (this.metaTableLocator != null) this.metaTableLocator.stop();
  if (this.clusterConnection != null && !clusterConnection.isClosed()) {
    try {
      this.clusterConnection.close();
    } catch (IOException e) {
      // Best effort: log and continue shutting down.
      LOG.warn("Attempt to close server's short circuit HConnection failed.", e);
    }
  }

  // Meta regions are closed after user regions, once join() on the compact/split thread
  // returns.
  if (!this.killed && containsMetaTableRegions()) {
    if (!abortRequested || this.fsOk) {
      if (this.compactSplitThread != null) {
        this.compactSplitThread.join();
        this.compactSplitThread = null;
      }
      closeMetaTableRegions(abortRequested);
    }
  }

  // Wait for region closes, unless killed or the filesystem is unhealthy.
  if (!this.killed && this.fsOk) {
    waitOnAllRegionsToClose(abortRequested);
    LOG.info("stopping server " + this.serverName +
      "; all regions closed.");
  }

  // The WAL is only shut down on a healthy filesystem; the flag is true for a clean stop.
  if (this.fsOk) {
    shutdownWAL(!abortRequested);
  }

  // Drop the master stub and release RPC, lease and pause-monitor resources.
  if (this.rssStub != null) {
    this.rssStub = null;
  }
  if (this.rpcClient != null) {
    this.rpcClient.close();
  }
  if (this.leases != null) {
    this.leases.close();
  }
  if (this.pauseMonitor != null) {
    this.pauseMonitor.stop();
  }

  if (!killed) {
    stopServiceThreads();
  }

  if (this.rpcServices != null) {
    this.rpcServices.stop();
  }

  try {
    deleteMyEphemeralNode();
  } catch (KeeperException.NoNodeException nn) {
    // The node is already gone; nothing to delete.
  } catch (KeeperException e) {
    LOG.warn("Failed deleting my ephemeral node", e);
  }
  // Also clear the on-disk copy handled by ZNodeClearer.
  ZNodeClearer.deleteMyEphemeralNodeOnDisk();

  if (this.zooKeeper != null) {
    this.zooKeeper.close();
  }
  LOG.info("stopping server " + this.serverName +
    "; zookeeper connection closed.");

  LOG.info(Thread.currentThread().getName() + " exiting");
}
1109
1110 private boolean containsMetaTableRegions() {
1111 return onlineRegions.containsKey(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
1112 }
1113
1114 private boolean areAllUserRegionsOffline() {
1115 if (getNumberOfOnlineRegions() > 2) return false;
1116 boolean allUserRegionsOffline = true;
1117 for (Map.Entry<String, Region> e: this.onlineRegions.entrySet()) {
1118 if (!e.getValue().getRegionInfo().isMetaTable()) {
1119 allUserRegionsOffline = false;
1120 break;
1121 }
1122 }
1123 return allUserRegionsOffline;
1124 }
1125
1126
1127
1128
1129 private long getWriteRequestCount() {
1130 long writeCount = 0;
1131 for (Map.Entry<String, Region> e: this.onlineRegions.entrySet()) {
1132 writeCount += e.getValue().getWriteRequestsCount();
1133 }
1134 return writeCount;
1135 }
1136
1137 @VisibleForTesting
1138 protected void tryRegionServerReport(long reportStartTime, long reportEndTime)
1139 throws IOException {
1140 RegionServerStatusService.BlockingInterface rss = rssStub;
1141 if (rss == null) {
1142
1143 return;
1144 }
1145 ClusterStatusProtos.ServerLoad sl = buildServerLoad(reportStartTime, reportEndTime);
1146 try {
1147 RegionServerReportRequest.Builder request = RegionServerReportRequest.newBuilder();
1148 ServerName sn = ServerName.parseVersionedServerName(
1149 this.serverName.getVersionedBytes());
1150 request.setServer(ProtobufUtil.toServerName(sn));
1151 request.setLoad(sl);
1152 rss.regionServerReport(null, request.build());
1153 } catch (ServiceException se) {
1154 IOException ioe = ProtobufUtil.getRemoteException(se);
1155 if (ioe instanceof YouAreDeadException) {
1156
1157 throw ioe;
1158 }
1159 if (rssStub == rss) {
1160 rssStub = null;
1161 }
1162
1163
1164 createRegionServerStatusStub();
1165 }
1166 }
1167
1168 ClusterStatusProtos.ServerLoad buildServerLoad(long reportStartTime, long reportEndTime)
1169 throws IOException {
1170
1171
1172
1173
1174
1175
1176
1177 MetricsRegionServerWrapper regionServerWrapper = metricsRegionServer.getRegionServerWrapper();
1178 Collection<Region> regions = getOnlineRegionsLocalContext();
1179 MemoryUsage memory = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
1180
1181 ClusterStatusProtos.ServerLoad.Builder serverLoad =
1182 ClusterStatusProtos.ServerLoad.newBuilder();
1183 serverLoad.setNumberOfRequests((int) regionServerWrapper.getRequestsPerSecond());
1184 serverLoad.setTotalNumberOfRequests((int) regionServerWrapper.getTotalRequestCount());
1185 serverLoad.setUsedHeapMB((int)(memory.getUsed() / 1024 / 1024));
1186 serverLoad.setMaxHeapMB((int) (memory.getMax() / 1024 / 1024));
1187 Set<String> coprocessors = getWAL(null).getCoprocessorHost().getCoprocessors();
1188 Builder coprocessorBuilder = Coprocessor.newBuilder();
1189 for (String coprocessor : coprocessors) {
1190 serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build());
1191 }
1192 RegionLoad.Builder regionLoadBldr = RegionLoad.newBuilder();
1193 RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder();
1194 for (Region region : regions) {
1195 if (region.getCoprocessorHost() != null) {
1196 Set<String> regionCoprocessors = region.getCoprocessorHost().getCoprocessors();
1197 Iterator<String> iterator = regionCoprocessors.iterator();
1198 while (iterator.hasNext()) {
1199 serverLoad.addCoprocessors(coprocessorBuilder.setName(iterator.next()).build());
1200 }
1201 }
1202 serverLoad.addRegionLoads(createRegionLoad(region, regionLoadBldr, regionSpecifier));
1203 for (String coprocessor : getWAL(region.getRegionInfo()).getCoprocessorHost()
1204 .getCoprocessors()) {
1205 serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build());
1206 }
1207 }
1208 serverLoad.setReportStartTime(reportStartTime);
1209 serverLoad.setReportEndTime(reportEndTime);
1210 if (this.infoServer != null) {
1211 serverLoad.setInfoServerPort(this.infoServer.getPort());
1212 } else {
1213 serverLoad.setInfoServerPort(-1);
1214 }
1215
1216
1217
1218 ReplicationSourceService rsources = getReplicationSourceService();
1219
1220 if (rsources != null) {
1221
1222 ReplicationLoad rLoad = rsources.refreshAndGetReplicationLoad();
1223 if (rLoad != null) {
1224 serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink());
1225 for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad.getReplicationLoadSourceList()) {
1226 serverLoad.addReplLoadSource(rLS);
1227 }
1228 }
1229 }
1230
1231 return serverLoad.build();
1232 }
1233
1234 String getOnlineRegionsAsPrintableString() {
1235 StringBuilder sb = new StringBuilder();
1236 for (Region r: this.onlineRegions.values()) {
1237 if (sb.length() > 0) sb.append(", ");
1238 sb.append(r.getRegionInfo().getEncodedName());
1239 }
1240 return sb.toString();
1241 }
1242
1243
1244
1245
1246 private void waitOnAllRegionsToClose(final boolean abort) {
1247
1248 int lastCount = -1;
1249 long previousLogTime = 0;
1250 Set<String> closedRegions = new HashSet<String>();
1251 boolean interrupted = false;
1252 try {
1253 while (!isOnlineRegionsEmpty()) {
1254 int count = getNumberOfOnlineRegions();
1255
1256 if (count != lastCount) {
1257
1258 if (System.currentTimeMillis() > (previousLogTime + 1000)) {
1259 previousLogTime = System.currentTimeMillis();
1260 lastCount = count;
1261 LOG.info("Waiting on " + count + " regions to close");
1262
1263
1264 if (count < 10 && LOG.isDebugEnabled()) {
1265 LOG.debug(this.onlineRegions);
1266 }
1267 }
1268 }
1269
1270
1271
1272 for (Map.Entry<String, Region> e : this.onlineRegions.entrySet()) {
1273 HRegionInfo hri = e.getValue().getRegionInfo();
1274 if (!this.regionsInTransitionInRS.containsKey(hri.getEncodedNameAsBytes())
1275 && !closedRegions.contains(hri.getEncodedName())) {
1276 closedRegions.add(hri.getEncodedName());
1277
1278 closeRegionIgnoreErrors(hri, abort);
1279 }
1280 }
1281
1282 if (this.regionsInTransitionInRS.isEmpty()) {
1283 if (!isOnlineRegionsEmpty()) {
1284 LOG.info("We were exiting though online regions are not empty," +
1285 " because some regions failed closing");
1286 }
1287 break;
1288 }
1289 if (sleep(200)) {
1290 interrupted = true;
1291 }
1292 }
1293 } finally {
1294 if (interrupted) {
1295 Thread.currentThread().interrupt();
1296 }
1297 }
1298 }
1299
1300 private boolean sleep(long millis) {
1301 boolean interrupted = false;
1302 try {
1303 Thread.sleep(millis);
1304 } catch (InterruptedException e) {
1305 LOG.warn("Interrupted while sleeping");
1306 interrupted = true;
1307 }
1308 return interrupted;
1309 }
1310
1311 private void shutdownWAL(final boolean close) {
1312 if (this.walFactory != null) {
1313 try {
1314 if (close) {
1315 walFactory.close();
1316 } else {
1317 walFactory.shutdown();
1318 }
1319 } catch (Throwable e) {
1320 e = RemoteExceptionHandler.checkThrowable(e);
1321 LOG.error("Shutdown / close of WAL failed: " + e);
1322 LOG.debug("Shutdown / close exception details:", e);
1323 }
1324 }
1325 }
1326
1327
1328
1329
1330
1331
1332 protected void handleReportForDutyResponse(final RegionServerStartupResponse c)
1333 throws IOException {
1334 try {
1335 for (NameStringPair e : c.getMapEntriesList()) {
1336 String key = e.getName();
1337
1338 if (key.equals(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)) {
1339 String hostnameFromMasterPOV = e.getValue();
1340 this.serverName = ServerName.valueOf(hostnameFromMasterPOV,
1341 rpcServices.isa.getPort(), this.startcode);
1342 if (shouldUseThisHostnameInstead() &&
1343 !hostnameFromMasterPOV.equals(useThisHostnameInstead)) {
1344 String msg = "Master passed us a different hostname to use; was=" +
1345 this.useThisHostnameInstead + ", but now=" + hostnameFromMasterPOV;
1346 LOG.error(msg);
1347 throw new IOException(msg);
1348 }
1349 if (!shouldUseThisHostnameInstead() &&
1350 !hostnameFromMasterPOV.equals(rpcServices.isa.getHostName())) {
1351 String msg = "Master passed us a different hostname to use; was=" +
1352 rpcServices.isa.getHostName() + ", but now=" + hostnameFromMasterPOV;
1353 LOG.error(msg);
1354 }
1355 continue;
1356 }
1357 String value = e.getValue();
1358 if (LOG.isDebugEnabled()) {
1359 LOG.info("Config from master: " + key + "=" + value);
1360 }
1361 this.conf.set(key, value);
1362 }
1363
1364
1365
1366 if (this.conf.get("mapreduce.task.attempt.id") == null) {
1367 this.conf.set("mapreduce.task.attempt.id", "hb_rs_" +
1368 this.serverName.toString());
1369 }
1370
1371
1372 ZNodeClearer.writeMyEphemeralNodeOnDisk(getMyEphemeralNodePath());
1373
1374 this.cacheConfig = new CacheConfig(conf);
1375 this.walFactory = setupWALAndReplication();
1376
1377 this.metricsRegionServer = new MetricsRegionServer(new MetricsRegionServerWrapperImpl(this));
1378
1379 startServiceThreads();
1380 startHeapMemoryManager();
1381 LOG.info("Serving as " + this.serverName +
1382 ", RpcServer on " + rpcServices.isa +
1383 ", sessionid=0x" +
1384 Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()));
1385
1386
1387 synchronized (online) {
1388 online.set(true);
1389 online.notifyAll();
1390 }
1391 } catch (Throwable e) {
1392 stop("Failed initialization");
1393 throw convertThrowableToIOE(cleanup(e, "Failed init"),
1394 "Region server startup failed");
1395 } finally {
1396 sleeper.skipSleepCycle();
1397 }
1398 }
1399
1400 private void startHeapMemoryManager() {
1401 this.hMemManager = HeapMemoryManager.create(this.conf, this.cacheFlusher,
1402 this, this.regionServerAccounting);
1403 if (this.hMemManager != null) {
1404 this.hMemManager.start(getChoreService());
1405 }
1406 }
1407
1408 private void createMyEphemeralNode() throws KeeperException, IOException {
1409 RegionServerInfo.Builder rsInfo = RegionServerInfo.newBuilder();
1410 rsInfo.setInfoPort(infoServer != null ? infoServer.getPort() : -1);
1411 rsInfo.setVersionInfo(ProtobufUtil.getVersionInfo());
1412 byte[] data = ProtobufUtil.prependPBMagic(rsInfo.build().toByteArray());
1413 ZKUtil.createEphemeralNodeAndWatch(this.zooKeeper,
1414 getMyEphemeralNodePath(), data);
1415 }
1416
1417 private void deleteMyEphemeralNode() throws KeeperException {
1418 ZKUtil.deleteNode(this.zooKeeper, getMyEphemeralNodePath());
1419 }
1420
1421 @Override
1422 public RegionServerAccounting getRegionServerAccounting() {
1423 return regionServerAccounting;
1424 }
1425
1426 @Override
1427 public TableLockManager getTableLockManager() {
1428 return tableLockManager;
1429 }
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439 private RegionLoad createRegionLoad(final Region r, RegionLoad.Builder regionLoadBldr,
1440 RegionSpecifier.Builder regionSpecifier) throws IOException {
1441 byte[] name = r.getRegionInfo().getRegionName();
1442 int stores = 0;
1443 int storefiles = 0;
1444 int storeUncompressedSizeMB = 0;
1445 int storefileSizeMB = 0;
1446 int memstoreSizeMB = (int) (r.getMemstoreSize() / 1024 / 1024);
1447 int storefileIndexSizeMB = 0;
1448 int rootIndexSizeKB = 0;
1449 int totalStaticIndexSizeKB = 0;
1450 int totalStaticBloomSizeKB = 0;
1451 long totalCompactingKVs = 0;
1452 long currentCompactedKVs = 0;
1453 List<Store> storeList = r.getStores();
1454 stores += storeList.size();
1455 for (Store store : storeList) {
1456 storefiles += store.getStorefilesCount();
1457 storeUncompressedSizeMB += (int) (store.getStoreSizeUncompressed() / 1024 / 1024);
1458 storefileSizeMB += (int) (store.getStorefilesSize() / 1024 / 1024);
1459 storefileIndexSizeMB += (int) (store.getStorefilesIndexSize() / 1024 / 1024);
1460 CompactionProgress progress = store.getCompactionProgress();
1461 if (progress != null) {
1462 totalCompactingKVs += progress.totalCompactingKVs;
1463 currentCompactedKVs += progress.currentCompactedKVs;
1464 }
1465 rootIndexSizeKB += (int) (store.getStorefilesIndexSize() / 1024);
1466 totalStaticIndexSizeKB += (int) (store.getTotalStaticIndexSize() / 1024);
1467 totalStaticBloomSizeKB += (int) (store.getTotalStaticBloomSize() / 1024);
1468 }
1469
1470 float dataLocality =
1471 r.getHDFSBlocksDistribution().getBlockLocalityIndex(serverName.getHostname());
1472 if (regionLoadBldr == null) {
1473 regionLoadBldr = RegionLoad.newBuilder();
1474 }
1475 if (regionSpecifier == null) {
1476 regionSpecifier = RegionSpecifier.newBuilder();
1477 }
1478 regionSpecifier.setType(RegionSpecifierType.REGION_NAME);
1479 regionSpecifier.setValue(ByteStringer.wrap(name));
1480 regionLoadBldr.setRegionSpecifier(regionSpecifier.build())
1481 .setStores(stores)
1482 .setStorefiles(storefiles)
1483 .setStoreUncompressedSizeMB(storeUncompressedSizeMB)
1484 .setStorefileSizeMB(storefileSizeMB)
1485 .setMemstoreSizeMB(memstoreSizeMB)
1486 .setStorefileIndexSizeMB(storefileIndexSizeMB)
1487 .setRootIndexSizeKB(rootIndexSizeKB)
1488 .setTotalStaticIndexSizeKB(totalStaticIndexSizeKB)
1489 .setTotalStaticBloomSizeKB(totalStaticBloomSizeKB)
1490 .setReadRequestsCount(r.getReadRequestsCount())
1491 .setWriteRequestsCount(r.getWriteRequestsCount())
1492 .setTotalCompactingKVs(totalCompactingKVs)
1493 .setCurrentCompactedKVs(currentCompactedKVs)
1494 .setDataLocality(dataLocality)
1495 .setLastMajorCompactionTs(r.getOldestHfileTs(true));
1496 ((HRegion)r).setCompleteSequenceId(regionLoadBldr);
1497
1498 return regionLoadBldr.build();
1499 }
1500
1501
1502
1503
1504
1505 public RegionLoad createRegionLoad(final String encodedRegionName) throws IOException {
1506 Region r = onlineRegions.get(encodedRegionName);
1507 return r != null ? createRegionLoad(r, null, null) : null;
1508 }
1509
1510
1511
1512
1513 private static class CompactionChecker extends ScheduledChore {
1514 private final HRegionServer instance;
1515 private final int majorCompactPriority;
1516 private final static int DEFAULT_PRIORITY = Integer.MAX_VALUE;
1517 private long iteration = 0;
1518
1519 CompactionChecker(final HRegionServer h, final int sleepTime,
1520 final Stoppable stopper) {
1521 super("CompactionChecker", stopper, sleepTime);
1522 this.instance = h;
1523 LOG.info(this.getName() + " runs every " + StringUtils.formatTime(sleepTime));
1524
1525
1526
1527
1528 this.majorCompactPriority = this.instance.conf.
1529 getInt("hbase.regionserver.compactionChecker.majorCompactPriority",
1530 DEFAULT_PRIORITY);
1531 }
1532
1533 @Override
1534 protected void chore() {
1535 for (Region r : this.instance.onlineRegions.values()) {
1536 if (r == null)
1537 continue;
1538 for (Store s : r.getStores()) {
1539 try {
1540 long multiplier = s.getCompactionCheckMultiplier();
1541 assert multiplier > 0;
1542 if (iteration % multiplier != 0) continue;
1543 if (s.needsCompaction()) {
1544
1545 this.instance.compactSplitThread.requestSystemCompaction(r, s, getName()
1546 + " requests compaction");
1547 } else if (s.isMajorCompaction()) {
1548 if (majorCompactPriority == DEFAULT_PRIORITY
1549 || majorCompactPriority > ((HRegion)r).getCompactPriority()) {
1550 this.instance.compactSplitThread.requestCompaction(r, s, getName()
1551 + " requests major compaction; use default priority", null);
1552 } else {
1553 this.instance.compactSplitThread.requestCompaction(r, s, getName()
1554 + " requests major compaction; use configured priority",
1555 this.majorCompactPriority, null, null);
1556 }
1557 }
1558 } catch (IOException e) {
1559 LOG.warn("Failed major compaction check on " + r, e);
1560 }
1561 }
1562 }
1563 iteration = (iteration == Long.MAX_VALUE) ? 0 : (iteration + 1);
1564 }
1565 }
1566
1567 static class PeriodicMemstoreFlusher extends ScheduledChore {
1568 final HRegionServer server;
1569 final static int RANGE_OF_DELAY = 5 * 60 * 1000;
1570 final static int MIN_DELAY_TIME = 0;
1571 public PeriodicMemstoreFlusher(int cacheFlushInterval, final HRegionServer server) {
1572 super(server.getServerName() + "-MemstoreFlusherChore", server, cacheFlushInterval);
1573 this.server = server;
1574 }
1575
1576 @Override
1577 protected void chore() {
1578 final StringBuffer whyFlush = new StringBuffer();
1579 for (Region r : this.server.onlineRegions.values()) {
1580 if (r == null) continue;
1581 if (((HRegion)r).shouldFlush(whyFlush)) {
1582 FlushRequester requester = server.getFlushRequester();
1583 if (requester != null) {
1584 long randomDelay = RandomUtils.nextInt(RANGE_OF_DELAY) + MIN_DELAY_TIME;
1585 LOG.info(getName() + " requesting flush of " +
1586 r.getRegionInfo().getRegionNameAsString() + " because " +
1587 whyFlush.toString() +
1588 " after random delay " + randomDelay + "ms");
1589
1590
1591
1592 requester.requestDelayedFlush(r, randomDelay, false);
1593 }
1594 }
1595 }
1596 }
1597 }
1598
1599
1600
1601
1602
1603
1604
1605
1606 public boolean isOnline() {
1607 return online.get();
1608 }
1609
1610
1611
1612
1613
1614
1615
1616 private WALFactory setupWALAndReplication() throws IOException {
1617
1618 final Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
1619 final String logName = DefaultWALProvider.getWALDirectoryName(this.serverName.toString());
1620
1621 Path logdir = new Path(rootDir, logName);
1622 if (LOG.isDebugEnabled()) LOG.debug("logdir=" + logdir);
1623 if (this.fs.exists(logdir)) {
1624 throw new RegionServerRunningException("Region server has already " +
1625 "created directory at " + this.serverName.toString());
1626 }
1627
1628
1629
1630 createNewReplicationInstance(conf, this, this.fs, logdir, oldLogDir);
1631
1632
1633 final List<WALActionsListener> listeners = new ArrayList<WALActionsListener>();
1634 listeners.add(new MetricsWAL());
1635 if (this.replicationSourceHandler != null &&
1636 this.replicationSourceHandler.getWALActionsListener() != null) {
1637
1638 listeners.add(this.replicationSourceHandler.getWALActionsListener());
1639 }
1640
1641 return new WALFactory(conf, listeners, serverName.toString());
1642 }
1643
1644
1645
1646
1647
1648
1649
1650
1651
1652 protected LogRoller ensureMetaWALRoller() {
1653
1654
1655 LogRoller roller = metawalRoller.get();
1656 if (null == roller) {
1657 LogRoller tmpLogRoller = new LogRoller(this, this);
1658 String n = Thread.currentThread().getName();
1659 Threads.setDaemonThreadRunning(tmpLogRoller.getThread(),
1660 n + "-MetaLogRoller", uncaughtExceptionHandler);
1661 if (metawalRoller.compareAndSet(null, tmpLogRoller)) {
1662 roller = tmpLogRoller;
1663 } else {
1664
1665 Threads.shutdown(tmpLogRoller.getThread());
1666 roller = metawalRoller.get();
1667 }
1668 }
1669 return roller;
1670 }
1671
1672 public MetricsRegionServer getRegionServerMetrics() {
1673 return this.metricsRegionServer;
1674 }
1675
1676
1677
1678
1679 public MasterAddressTracker getMasterAddressTracker() {
1680 return this.masterAddressTracker;
1681 }
1682
1683
1684
1685
1686
1687
1688
1689
1690
1691
1692
1693
1694
/**
 * Starts the pieces the region server runs in the background:
 * <ul>
 *   <li>executor pools;</li>
 *   <li>the WAL roller, memstore flusher and lease checker threads;</li>
 *   <li>the scheduled chores that exist;</li>
 *   <li>the replication services;</li>
 *   <li>the split-log worker.</li>
 * </ul>
 * The split-log worker is handed {@code walFactory}, so this must run after the WAL factory
 * has been set up.
 *
 * @throws IOException NOTE(review): presumably from starting the replication services;
 *     confirm
 */
private void startServiceThreads() throws IOException {
  // Executor pools for region/meta open and close handlers; thread counts are configurable.
  this.service.startExecutorService(ExecutorType.RS_OPEN_REGION,
      conf.getInt("hbase.regionserver.executor.openregion.threads", 3));
  this.service.startExecutorService(ExecutorType.RS_OPEN_META,
      conf.getInt("hbase.regionserver.executor.openmeta.threads", 1));
  this.service.startExecutorService(ExecutorType.RS_CLOSE_REGION,
      conf.getInt("hbase.regionserver.executor.closeregion.threads", 3));
  this.service.startExecutorService(ExecutorType.RS_CLOSE_META,
      conf.getInt("hbase.regionserver.executor.closemeta.threads", 1));
  // The parallel-seek pool is only started when store scanner parallel seek is enabled.
  if (conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false)) {
    this.service.startExecutorService(ExecutorType.RS_PARALLEL_SEEK,
        conf.getInt("hbase.storescanner.parallel.seek.threads", 10));
  }
  // Log replay pool, sized by the max WAL splitters setting.
  this.service.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, conf.getInt(
      "hbase.regionserver.wal.max.splitters", SplitLogWorkerCoordination.DEFAULT_MAX_SPLITTERS));

  // Region replica flush pool; its thread count defaults to the open-region thread count.
  if (ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(conf)) {
    this.service.startExecutorService(ExecutorType.RS_REGION_REPLICA_FLUSH_OPS,
        conf.getInt("hbase.regionserver.region.replica.flusher.threads",
            conf.getInt("hbase.regionserver.executor.openregion.threads", 3)));
  }

  // Background daemon threads: WAL roller and memstore flusher.
  Threads.setDaemonThreadRunning(this.walRoller.getThread(), getName() + ".logRoller",
      uncaughtExceptionHandler);
  this.cacheFlusher.start(uncaughtExceptionHandler);

  // Schedule whichever periodic chores were created (null ones are skipped).
  if (this.compactionChecker != null) choreService.scheduleChore(compactionChecker);
  if (this.periodicFlusher != null) choreService.scheduleChore(periodicFlusher);
  if (this.healthCheckChore != null) choreService.scheduleChore(healthCheckChore);
  if (this.nonceManagerChore != null) choreService.scheduleChore(nonceManagerChore);
  if (this.storefileRefresher != null) choreService.scheduleChore(storefileRefresher);
  if (this.movedRegionsCleaner != null) choreService.scheduleChore(movedRegionsCleaner);

  // Lease checker thread.
  Threads.setDaemonThreadRunning(this.leases.getThread(), getName() + ".leaseChecker",
      uncaughtExceptionHandler);

  // When one object serves as both replication source and sink, start it only once.
  if (this.replicationSourceHandler == this.replicationSinkHandler &&
      this.replicationSourceHandler != null) {
    this.replicationSourceHandler.startReplicationService();
  } else {
    if (this.replicationSourceHandler != null) {
      this.replicationSourceHandler.startReplicationService();
    }
    if (this.replicationSinkHandler != null) {
      this.replicationSinkHandler.startReplicationService();
    }
  }

  // The split-log worker gets its own copy of the configuration. In that copy, client
  // retries and RPC timeout come from the hbase.log.replay.* settings, and the server-side
  // retry multiplier is forced to 1.
  Configuration sinkConf = HBaseConfiguration.create(conf);
  sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
      conf.getInt("hbase.log.replay.retries.number", 8));
  sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
      conf.getInt("hbase.log.replay.rpc.timeout", 30000));
  sinkConf.setInt("hbase.client.serverside.retries.multiplier", 1);
  this.splitLogWorker = new SplitLogWorker(this, sinkConf, this, this, walFactory);
  splitLogWorker.start();
}
1759
1760
1761
1762
1763
1764
1765 private int putUpWebUI() throws IOException {
1766 int port = this.conf.getInt(HConstants.REGIONSERVER_INFO_PORT,
1767 HConstants.DEFAULT_REGIONSERVER_INFOPORT);
1768 String addr = this.conf.get("hbase.regionserver.info.bindAddress", "0.0.0.0");
1769
1770 if(this instanceof HMaster) {
1771 port = conf.getInt(HConstants.MASTER_INFO_PORT,
1772 HConstants.DEFAULT_MASTER_INFOPORT);
1773 addr = this.conf.get("hbase.master.info.bindAddress", "0.0.0.0");
1774 }
1775
1776 if (port < 0) return port;
1777
1778 if (!Addressing.isLocalAddress(InetAddress.getByName(addr))) {
1779 String msg =
1780 "Failed to start http info server. Address " + addr
1781 + " does not belong to this host. Correct configuration parameter: "
1782 + "hbase.regionserver.info.bindAddress";
1783 LOG.error(msg);
1784 throw new IOException(msg);
1785 }
1786
1787 boolean auto = this.conf.getBoolean(HConstants.REGIONSERVER_INFO_PORT_AUTO,
1788 false);
1789 while (true) {
1790 try {
1791 this.infoServer = new InfoServer(getProcessName(), addr, port, false, this.conf);
1792 infoServer.addServlet("dump", "/dump", getDumpServlet());
1793 configureInfoServer();
1794 this.infoServer.start();
1795 break;
1796 } catch (BindException e) {
1797 if (!auto) {
1798
1799 LOG.error("Failed binding http info server to port: " + port);
1800 throw e;
1801 }
1802
1803 LOG.info("Failed binding http info server to port: " + port);
1804 port++;
1805 }
1806 }
1807 port = this.infoServer.getPort();
1808 conf.setInt(HConstants.REGIONSERVER_INFO_PORT, port);
1809 int masterInfoPort = conf.getInt(HConstants.MASTER_INFO_PORT,
1810 HConstants.DEFAULT_MASTER_INFOPORT);
1811 conf.setInt("hbase.master.info.port.orig", masterInfoPort);
1812 conf.setInt(HConstants.MASTER_INFO_PORT, port);
1813 return port;
1814 }
1815
1816
1817
1818
1819 private boolean isHealthy() {
1820 if (!fsOk) {
1821
1822 return false;
1823 }
1824
1825 if (!(leases.isAlive()
1826 && cacheFlusher.isAlive() && walRoller.isAlive()
1827 && this.compactionChecker.isScheduled()
1828 && this.periodicFlusher.isScheduled())) {
1829 stop("One or more threads are no longer alive -- stop");
1830 return false;
1831 }
1832 final LogRoller metawalRoller = this.metawalRoller.get();
1833 if (metawalRoller != null && !metawalRoller.isAlive()) {
1834 stop("Meta WAL roller thread is no longer alive -- stop");
1835 return false;
1836 }
1837 return true;
1838 }
1839
1840 private static final byte[] UNSPECIFIED_REGION = new byte[]{};
1841
1842 @Override
1843 public WAL getWAL(HRegionInfo regionInfo) throws IOException {
1844 WAL wal;
1845 LogRoller roller = walRoller;
1846
1847 if (regionInfo != null && regionInfo.isMetaTable() &&
1848 regionInfo.getReplicaId() == HRegionInfo.DEFAULT_REPLICA_ID) {
1849 roller = ensureMetaWALRoller();
1850 wal = walFactory.getMetaWAL(regionInfo.getEncodedNameAsBytes());
1851 } else if (regionInfo == null) {
1852 wal = walFactory.getWAL(UNSPECIFIED_REGION);
1853 } else {
1854 wal = walFactory.getWAL(regionInfo.getEncodedNameAsBytes());
1855 }
1856 roller.addWAL(wal);
1857 return wal;
1858 }
1859
1860 @Override
1861 public ClusterConnection getConnection() {
1862 return this.clusterConnection;
1863 }
1864
1865 @Override
1866 public MetaTableLocator getMetaTableLocator() {
1867 return this.metaTableLocator;
1868 }
1869
1870 @Override
1871 public void stop(final String msg) {
1872 if (!this.stopped) {
1873 try {
1874 if (this.rsHost != null) {
1875 this.rsHost.preStop(msg);
1876 }
1877 this.stopped = true;
1878 LOG.info("STOPPED: " + msg);
1879
1880 sleeper.skipSleepCycle();
1881 } catch (IOException exp) {
1882 LOG.warn("The region server did not stop", exp);
1883 }
1884 }
1885 }
1886
1887 public void waitForServerOnline(){
1888 while (!isStopped() && !isOnline()) {
1889 synchronized (online) {
1890 try {
1891 online.wait(msgInterval);
1892 } catch (InterruptedException ie) {
1893 Thread.currentThread().interrupt();
1894 break;
1895 }
1896 }
1897 }
1898 }
1899
1900 @Override
1901 public void postOpenDeployTasks(final Region r) throws KeeperException, IOException {
1902 postOpenDeployTasks(new PostOpenDeployContext(r, -1));
1903 }
1904
/**
 * Finishes deploying a freshly opened region. Steps, in order:
 * <ol>
 *   <li>Request compactions for stores that have references or need compacting.</li>
 *   <li>Resolve the open sequence number.</li>
 *   <li>Update the recovering-region sequence id bookkeeping.</li>
 *   <li>Publish the new location: ZooKeeper for meta regions; the meta table for other
 *       regions when ZK-based assignment is used.</li>
 *   <li>Report OPENED to the master when ZK-based assignment is not used.</li>
 *   <li>Trigger a flush in the primary region, for secondary replicas.</li>
 * </ol>
 *
 * @param context the opened region and the master system time
 * @throws KeeperException on ZooKeeper errors
 * @throws IOException if the RPC services are not open, or the master rejects the OPENED
 *     report, or other I/O fails
 * @throws IllegalArgumentException if the region is not an {@code HRegion}
 */
@Override
public void postOpenDeployTasks(final PostOpenDeployContext context)
    throws KeeperException, IOException {
  Region r = context.getRegion();
  long masterSystemTime = context.getMasterSystemTime();
  Preconditions.checkArgument(r instanceof HRegion, "r must be an HRegion");
  rpcServices.checkOpen();
  LOG.info("Post open deploy tasks for " + r.getRegionInfo().getRegionNameAsString());
  // Request compactions for stores that have reference files or otherwise need one.
  for (Store s : r.getStores()) {
    if (s.hasReferences() || s.needsCompaction()) {
      this.compactSplitThread.requestSystemCompaction(r, s, "Opening Region");
    }
  }
  long openSeqNum = r.getOpenSeqNum();
  if (openSeqNum == HConstants.NO_SEQNUM) {
    // Unexpected; log it and fall back to 0 so the deploy can continue.
    LOG.error("No sequence number found when opening " +
      r.getRegionInfo().getRegionNameAsString());
    openSeqNum = 0;
  }

  // Update the last-flushed sequence id bookkeeping if this region is recovering
  // (NOTE(review): inferred from the method name; confirm).
  updateRecoveringRegionLastFlushedSequenceId(r);

  // Meta region locations live in ZooKeeper. Other regions are written to the meta table
  // only when ZK-based assignment is used.
  if (r.getRegionInfo().isMetaRegion()) {
    MetaTableLocator.setMetaLocation(getZooKeeper(), serverName, r.getRegionInfo().getReplicaId(),
       State.OPEN);
  } else if (useZKForAssignment) {
    MetaTableAccessor.updateRegionLocation(getConnection(), r.getRegionInfo(),
      this.serverName, openSeqNum, masterSystemTime);
  }
  // Without ZK-based assignment, the master must be told directly; failure is fatal here.
  if (!useZKForAssignment && !reportRegionStateTransition(new RegionStateTransitionContext(
      TransitionCode.OPENED, openSeqNum, masterSystemTime, r.getRegionInfo()))) {
    throw new IOException("Failed to report opened region to master: "
      + r.getRegionInfo().getRegionNameAsString());
  }

  triggerFlushInPrimaryRegion((HRegion)r);

  LOG.debug("Finished post open deploy task for " + r.getRegionInfo().getRegionNameAsString());
}
1948
1949 @Override
1950 public boolean reportRegionStateTransition(TransitionCode code, HRegionInfo... hris) {
1951 return reportRegionStateTransition(code, HConstants.NO_SEQNUM, hris);
1952 }
1953
1954 @Override
1955 public boolean reportRegionStateTransition(
1956 TransitionCode code, long openSeqNum, HRegionInfo... hris) {
1957 return reportRegionStateTransition(
1958 new RegionStateTransitionContext(code, HConstants.NO_SEQNUM, -1, hris));
1959 }
1960
1961 @Override
1962 public boolean reportRegionStateTransition(final RegionStateTransitionContext context) {
1963 TransitionCode code = context.getCode();
1964 long openSeqNum = context.getOpenSeqNum();
1965 HRegionInfo[] hris = context.getHris();
1966
1967 ReportRegionStateTransitionRequest.Builder builder =
1968 ReportRegionStateTransitionRequest.newBuilder();
1969 builder.setServer(ProtobufUtil.toServerName(serverName));
1970 RegionStateTransition.Builder transition = builder.addTransitionBuilder();
1971 transition.setTransitionCode(code);
1972 if (code == TransitionCode.OPENED && openSeqNum >= 0) {
1973 transition.setOpenSeqNum(openSeqNum);
1974 }
1975 for (HRegionInfo hri: hris) {
1976 transition.addRegionInfo(HRegionInfo.convert(hri));
1977 }
1978 ReportRegionStateTransitionRequest request = builder.build();
1979 while (keepLooping()) {
1980 RegionServerStatusService.BlockingInterface rss = rssStub;
1981 try {
1982 if (rss == null) {
1983 createRegionServerStatusStub();
1984 continue;
1985 }
1986 ReportRegionStateTransitionResponse response =
1987 rss.reportRegionStateTransition(null, request);
1988 if (response.hasErrorMessage()) {
1989 LOG.info("Failed to transition " + hris[0]
1990 + " to " + code + ": " + response.getErrorMessage());
1991 return false;
1992 }
1993 return true;
1994 } catch (ServiceException se) {
1995 IOException ioe = ProtobufUtil.getRemoteException(se);
1996 LOG.info("Failed to report region transition, will retry", ioe);
1997 if (rssStub == rss) {
1998 rssStub = null;
1999 }
2000 }
2001 }
2002 return false;
2003 }
2004
2005
2006
2007
2008
2009 void triggerFlushInPrimaryRegion(final HRegion region) {
2010 if (ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
2011 return;
2012 }
2013 if (!ServerRegionReplicaUtil.isRegionReplicaReplicationEnabled(region.conf) ||
2014 !ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(
2015 region.conf)) {
2016 region.setReadsEnabled(true);
2017 return;
2018 }
2019
2020 region.setReadsEnabled(false);
2021
2022
2023
2024 this.service.submit(
2025 new RegionReplicaFlushHandler(this, clusterConnection,
2026 rpcRetryingCallerFactory, rpcControllerFactory, operationTimeout, region));
2027 }
2028
2029 @Override
2030 public RpcServerInterface getRpcServer() {
2031 return rpcServices.rpcServer;
2032 }
2033
2034 @VisibleForTesting
2035 public RSRpcServices getRSRpcServices() {
2036 return rpcServices;
2037 }
2038
2039
2040
2041
2042
2043
2044
2045
2046
2047
2048
2049 @Override
2050 public void abort(String reason, Throwable cause) {
2051 String msg = "ABORTING region server " + this + ": " + reason;
2052 if (cause != null) {
2053 LOG.fatal(msg, cause);
2054 } else {
2055 LOG.fatal(msg);
2056 }
2057 this.abortRequested = true;
2058
2059
2060
2061 LOG.fatal("RegionServer abort: loaded coprocessors are: " +
2062 CoprocessorHost.getLoadedCoprocessors());
2063
2064 try {
2065 LOG.info("Dump of metrics as JSON on abort: " + JSONBean.dumpRegionServerMetrics());
2066 } catch (MalformedObjectNameException | IOException e) {
2067 LOG.warn("Failed dumping metrics", e);
2068 }
2069
2070
2071 try {
2072 if (cause != null) {
2073 msg += "\nCause:\n" + StringUtils.stringifyException(cause);
2074 }
2075
2076 if (rssStub != null && this.serverName != null) {
2077 ReportRSFatalErrorRequest.Builder builder =
2078 ReportRSFatalErrorRequest.newBuilder();
2079 ServerName sn =
2080 ServerName.parseVersionedServerName(this.serverName.getVersionedBytes());
2081 builder.setServer(ProtobufUtil.toServerName(sn));
2082 builder.setErrorMessage(msg);
2083 rssStub.reportRSFatalError(null, builder.build());
2084 }
2085 } catch (Throwable t) {
2086 LOG.warn("Unable to report fatal error to master", t);
2087 }
2088 stop(reason);
2089 }
2090
2091
2092
2093
2094 public void abort(String reason) {
2095 abort(reason, null);
2096 }
2097
2098 @Override
2099 public boolean isAborted() {
2100 return this.abortRequested;
2101 }
2102
2103
2104
2105
2106
2107
2108 protected void kill() {
2109 this.killed = true;
2110 abort("Simulated kill");
2111 }
2112
2113
2114
2115
2116 protected void sendShutdownInterrupt() {
2117 }
2118
2119
2120
2121
2122
2123 protected void stopServiceThreads() {
2124
2125 if (this.choreService != null) choreService.shutdown();
2126 if (this.nonceManagerChore != null) nonceManagerChore.cancel(true);
2127 if (this.compactionChecker != null) compactionChecker.cancel(true);
2128 if (this.periodicFlusher != null) periodicFlusher.cancel(true);
2129 if (this.healthCheckChore != null) healthCheckChore.cancel(true);
2130 if (this.storefileRefresher != null) storefileRefresher.cancel(true);
2131 if (this.movedRegionsCleaner != null) movedRegionsCleaner.cancel(true);
2132
2133 if (this.cacheFlusher != null) {
2134 this.cacheFlusher.join();
2135 }
2136
2137 if (this.spanReceiverHost != null) {
2138 this.spanReceiverHost.closeReceivers();
2139 }
2140 if (this.walRoller != null) {
2141 Threads.shutdown(this.walRoller.getThread());
2142 }
2143 final LogRoller metawalRoller = this.metawalRoller.get();
2144 if (metawalRoller != null) {
2145 Threads.shutdown(metawalRoller.getThread());
2146 }
2147 if (this.compactSplitThread != null) {
2148 this.compactSplitThread.join();
2149 }
2150 if (this.service != null) this.service.shutdown();
2151 if (this.replicationSourceHandler != null &&
2152 this.replicationSourceHandler == this.replicationSinkHandler) {
2153 this.replicationSourceHandler.stopReplicationService();
2154 } else {
2155 if (this.replicationSourceHandler != null) {
2156 this.replicationSourceHandler.stopReplicationService();
2157 }
2158 if (this.replicationSinkHandler != null) {
2159 this.replicationSinkHandler.stopReplicationService();
2160 }
2161 }
2162 }
2163
2164
2165
2166
2167
2168 ReplicationSourceService getReplicationSourceService() {
2169 return replicationSourceHandler;
2170 }
2171
2172
2173
2174
2175
2176 ReplicationSinkService getReplicationSinkService() {
2177 return replicationSinkHandler;
2178 }
2179
2180
2181
2182
2183
2184
2185
2186
2187
/**
 * Create the blocking stub used to talk to the master's RegionServerStatusService.
 *
 * <p>If a stub already exists, nothing is created and the currently tracked master address is
 * returned. Otherwise this loops while {@link #keepLooping()} is true:
 *
 * <ul>
 *   <li>it looks up the master address, refreshing it after the first miss;</li>
 *   <li>it tries to create an RPC channel to that master;</li>
 *   <li>it sleeps 200ms between failed attempts.</li>
 * </ul>
 *
 * <p>Log messages are rate-limited to roughly one per second. If {@code sleep(...)} returns
 * true, that is treated as an interrupt; the thread's interrupt flag is restored before
 * returning.
 *
 * <p>NOTE: if the loop ends because {@link #keepLooping()} became false after a failed
 * connection attempt, the last master address seen is returned while {@code rssStub} is left
 * null. Callers must therefore check {@code rssStub} as well as the return value.
 *
 * @return the master address used, or null if no master was found before the server stopped
 *     or the cluster went down
 */
@VisibleForTesting
protected synchronized ServerName createRegionServerStatusStub() {
  if (rssStub != null) {
    return masterAddressTracker.getMasterAddress();
  }
  ServerName sn = null;
  long previousLogTime = 0;
  boolean refresh = false;
  RegionServerStatusService.BlockingInterface intf = null;
  boolean interrupted = false;
  try {
    while (keepLooping()) {
      sn = this.masterAddressTracker.getMasterAddress(refresh);
      if (sn == null) {
        if (!keepLooping()) {
          // The server was stopped or the cluster went down while we waited for a master.
          LOG.debug("No master found and cluster is stopped; bailing out");
          return null;
        }
        if (System.currentTimeMillis() > (previousLogTime + 1000)) {
          LOG.debug("No master found; retry");
          previousLogTime = System.currentTimeMillis();
        }
        refresh = true;
        if (sleep(200)) {
          interrupted = true;
        }
        continue;
      }

      // If this process is itself the master at that address, use its RPC services directly.
      if (this instanceof HMaster && sn.equals(getServerName())) {
        intf = ((HMaster)this).getMasterRpcServices();
        break;
      }
      try {
        BlockingRpcChannel channel =
          this.rpcClient.createBlockingRpcChannel(sn, userProvider.getCurrent(),
            shortOperationTimeout);
        intf = RegionServerStatusService.newBlockingStub(channel);
        break;
      } catch (IOException e) {
        if (System.currentTimeMillis() > (previousLogTime + 1000)) {
          e = e instanceof RemoteException ?
            ((RemoteException)e).unwrapRemoteException() : e;
          if (e instanceof ServerNotRunningYetException) {
            LOG.info("Master isn't available yet, retrying");
          } else {
            LOG.warn("Unable to connect to master. Retrying. Error was:", e);
          }
          previousLogTime = System.currentTimeMillis();
        }
        if (sleep(200)) {
          interrupted = true;
        }
      }
    }
  } finally {
    // Restore the interrupt status swallowed by sleep().
    if (interrupted) {
      Thread.currentThread().interrupt();
    }
  }
  // intf is null here if the loop ended because keepLooping() became false.
  rssStub = intf;
  return sn;
}
2253
2254
2255
2256
2257
2258 private boolean keepLooping() {
2259 return !this.stopped && isClusterUp();
2260 }
2261
2262
2263
2264
2265
2266
2267
2268
2269 private RegionServerStartupResponse reportForDuty() throws IOException {
2270 ServerName masterServerName = createRegionServerStatusStub();
2271 if (masterServerName == null) return null;
2272 RegionServerStartupResponse result = null;
2273 try {
2274 rpcServices.requestCount.set(0);
2275 LOG.info("reportForDuty to master=" + masterServerName + " with port="
2276 + rpcServices.isa.getPort() + ", startcode=" + this.startcode);
2277 long now = EnvironmentEdgeManager.currentTime();
2278 int port = rpcServices.isa.getPort();
2279 RegionServerStartupRequest.Builder request = RegionServerStartupRequest.newBuilder();
2280 if (shouldUseThisHostnameInstead()) {
2281 request.setUseThisHostnameInstead(useThisHostnameInstead);
2282 }
2283 request.setPort(port);
2284 request.setServerStartCode(this.startcode);
2285 request.setServerCurrentTime(now);
2286 result = this.rssStub.regionServerStartup(null, request.build());
2287 } catch (ServiceException se) {
2288 IOException ioe = ProtobufUtil.getRemoteException(se);
2289 if (ioe instanceof ClockOutOfSyncException) {
2290 LOG.fatal("Master rejected startup because clock is out of sync", ioe);
2291
2292 throw ioe;
2293 } else if (ioe instanceof ServerNotRunningYetException) {
2294 LOG.debug("Master is not running yet");
2295 } else {
2296 LOG.warn("error telling master we are up", se);
2297 }
2298 rssStub = null;
2299 }
2300 return result;
2301 }
2302
2303 @Override
2304 public RegionStoreSequenceIds getLastSequenceId(byte[] encodedRegionName) {
2305 try {
2306 GetLastFlushedSequenceIdRequest req =
2307 RequestConverter.buildGetLastFlushedSequenceIdRequest(encodedRegionName);
2308 RegionServerStatusService.BlockingInterface rss = rssStub;
2309 if (rss == null) {
2310 createRegionServerStatusStub();
2311 rss = rssStub;
2312 if (rss == null) {
2313
2314 LOG.warn("Unable to connect to the master to check " + "the last flushed sequence id");
2315 return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM)
2316 .build();
2317 }
2318 }
2319 GetLastFlushedSequenceIdResponse resp = rss.getLastFlushedSequenceId(null, req);
2320 return RegionStoreSequenceIds.newBuilder()
2321 .setLastFlushedSequenceId(resp.getLastFlushedSequenceId())
2322 .addAllStoreSequenceId(resp.getStoreLastFlushedSequenceIdList()).build();
2323 } catch (ServiceException e) {
2324 LOG.warn("Unable to connect to the master to check the last flushed sequence id", e);
2325 return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM)
2326 .build();
2327 }
2328 }
2329
2330
2331
2332
2333
2334
2335 protected void closeAllRegions(final boolean abort) {
2336 closeUserRegions(abort);
2337 closeMetaTableRegions(abort);
2338 }
2339
2340
2341
2342
2343
2344 void closeMetaTableRegions(final boolean abort) {
2345 Region meta = null;
2346 this.lock.writeLock().lock();
2347 try {
2348 for (Map.Entry<String, Region> e: onlineRegions.entrySet()) {
2349 HRegionInfo hri = e.getValue().getRegionInfo();
2350 if (hri.isMetaRegion()) {
2351 meta = e.getValue();
2352 }
2353 if (meta != null) break;
2354 }
2355 } finally {
2356 this.lock.writeLock().unlock();
2357 }
2358 if (meta != null) closeRegionIgnoreErrors(meta.getRegionInfo(), abort);
2359 }
2360
2361
2362
2363
2364
2365
2366
2367 void closeUserRegions(final boolean abort) {
2368 this.lock.writeLock().lock();
2369 try {
2370 for (Map.Entry<String, Region> e: this.onlineRegions.entrySet()) {
2371 Region r = e.getValue();
2372 if (!r.getRegionInfo().isMetaTable() && r.isAvailable()) {
2373
2374 closeRegionIgnoreErrors(r.getRegionInfo(), abort);
2375 }
2376 }
2377 } finally {
2378 this.lock.writeLock().unlock();
2379 }
2380 }
2381
2382
2383 public InfoServer getInfoServer() {
2384 return infoServer;
2385 }
2386
2387
2388
2389
2390 @Override
2391 public boolean isStopped() {
2392 return this.stopped;
2393 }
2394
2395 @Override
2396 public boolean isStopping() {
2397 return this.stopping;
2398 }
2399
2400 @Override
2401 public Map<String, Region> getRecoveringRegions() {
2402 return this.recoveringRegions;
2403 }
2404
2405
2406
2407
2408
2409 @Override
2410 public Configuration getConfiguration() {
2411 return conf;
2412 }
2413
2414
2415 ReentrantReadWriteLock.WriteLock getWriteLock() {
2416 return lock.writeLock();
2417 }
2418
2419 public int getNumberOfOnlineRegions() {
2420 return this.onlineRegions.size();
2421 }
2422
2423 boolean isOnlineRegionsEmpty() {
2424 return this.onlineRegions.isEmpty();
2425 }
2426
2427
2428
2429
2430
2431
2432 public Collection<Region> getOnlineRegionsLocalContext() {
2433 Collection<Region> regions = this.onlineRegions.values();
2434 return Collections.unmodifiableCollection(regions);
2435 }
2436
2437 @Override
2438 public void addToOnlineRegions(Region region) {
2439 this.onlineRegions.put(region.getRegionInfo().getEncodedName(), region);
2440 configurationManager.registerObserver(region);
2441 }
2442
2443
2444
2445
2446
2447
2448 SortedMap<Long, Region> getCopyOfOnlineRegionsSortedBySize() {
2449
2450 SortedMap<Long, Region> sortedRegions = new TreeMap<Long, Region>(
2451 new Comparator<Long>() {
2452 @Override
2453 public int compare(Long a, Long b) {
2454 return -1 * a.compareTo(b);
2455 }
2456 });
2457
2458 for (Region region : this.onlineRegions.values()) {
2459 sortedRegions.put(region.getMemstoreSize(), region);
2460 }
2461 return sortedRegions;
2462 }
2463
2464
2465
2466
2467 public long getStartcode() {
2468 return this.startcode;
2469 }
2470
2471
2472 @Override
2473 public FlushRequester getFlushRequester() {
2474 return this.cacheFlusher;
2475 }
2476
2477
2478
2479
2480
2481
2482
2483 protected HRegionInfo[] getMostLoadedRegions() {
2484 ArrayList<HRegionInfo> regions = new ArrayList<HRegionInfo>();
2485 for (Region r : onlineRegions.values()) {
2486 if (!r.isAvailable()) {
2487 continue;
2488 }
2489 if (regions.size() < numRegionsToReport) {
2490 regions.add(r.getRegionInfo());
2491 } else {
2492 break;
2493 }
2494 }
2495 return regions.toArray(new HRegionInfo[regions.size()]);
2496 }
2497
2498 @Override
2499 public Leases getLeases() {
2500 return leases;
2501 }
2502
2503
2504
2505
2506 protected Path getRootDir() {
2507 return rootDir;
2508 }
2509
2510
2511
2512
2513 @Override
2514 public FileSystem getFileSystem() {
2515 return fs;
2516 }
2517
2518 @Override
2519 public String toString() {
2520 return getServerName().toString();
2521 }
2522
2523
2524
2525
2526
2527
2528 public int getThreadWakeFrequency() {
2529 return threadWakeFrequency;
2530 }
2531
2532 @Override
2533 public ZooKeeperWatcher getZooKeeper() {
2534 return zooKeeper;
2535 }
2536
2537 @Override
2538 public BaseCoordinatedStateManager getCoordinatedStateManager() {
2539 return csm;
2540 }
2541
2542 @Override
2543 public ServerName getServerName() {
2544 return serverName;
2545 }
2546
2547 @Override
2548 public CompactionRequestor getCompactionRequester() {
2549 return this.compactSplitThread;
2550 }
2551
2552 public RegionServerCoprocessorHost getRegionServerCoprocessorHost(){
2553 return this.rsHost;
2554 }
2555
2556 @Override
2557 public ConcurrentMap<byte[], Boolean> getRegionsInTransitionInRS() {
2558 return this.regionsInTransitionInRS;
2559 }
2560
2561 @Override
2562 public ExecutorService getExecutorService() {
2563 return service;
2564 }
2565
2566 @Override
2567 public ChoreService getChoreService() {
2568 return choreService;
2569 }
2570
2571 @Override
2572 public RegionServerQuotaManager getRegionServerQuotaManager() {
2573 return rsQuotaManager;
2574 }
2575
2576
2577
2578
2579
2580
2581
2582
2583 static private void createNewReplicationInstance(Configuration conf,
2584 HRegionServer server, FileSystem fs, Path logDir, Path oldLogDir) throws IOException{
2585
2586
2587 if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY,
2588 HConstants.REPLICATION_ENABLE_DEFAULT)) {
2589 return;
2590 }
2591
2592
2593 String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME,
2594 HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
2595
2596
2597 String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME,
2598 HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
2599
2600
2601
2602 if (sourceClassname.equals(sinkClassname)) {
2603 server.replicationSourceHandler = (ReplicationSourceService)
2604 newReplicationInstance(sourceClassname,
2605 conf, server, fs, logDir, oldLogDir);
2606 server.replicationSinkHandler = (ReplicationSinkService)
2607 server.replicationSourceHandler;
2608 } else {
2609 server.replicationSourceHandler = (ReplicationSourceService)
2610 newReplicationInstance(sourceClassname,
2611 conf, server, fs, logDir, oldLogDir);
2612 server.replicationSinkHandler = (ReplicationSinkService)
2613 newReplicationInstance(sinkClassname,
2614 conf, server, fs, logDir, oldLogDir);
2615 }
2616 }
2617
2618 static private ReplicationService newReplicationInstance(String classname,
2619 Configuration conf, HRegionServer server, FileSystem fs, Path logDir,
2620 Path oldLogDir) throws IOException{
2621
2622 Class<?> clazz = null;
2623 try {
2624 ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
2625 clazz = Class.forName(classname, true, classLoader);
2626 } catch (java.lang.ClassNotFoundException nfe) {
2627 throw new IOException("Could not find class for " + classname);
2628 }
2629
2630
2631 ReplicationService service = (ReplicationService)
2632 ReflectionUtils.newInstance(clazz, conf);
2633 service.initialize(server, fs, logDir, oldLogDir);
2634 return service;
2635 }
2636
2637
2638
2639
2640
2641
2642
2643
2644 public static HRegionServer constructRegionServer(
2645 Class<? extends HRegionServer> regionServerClass,
2646 final Configuration conf2, CoordinatedStateManager cp) {
2647 try {
2648 Constructor<? extends HRegionServer> c = regionServerClass
2649 .getConstructor(Configuration.class, CoordinatedStateManager.class);
2650 return c.newInstance(conf2, cp);
2651 } catch (Exception e) {
2652 throw new RuntimeException("Failed construction of " + "Regionserver: "
2653 + regionServerClass.toString(), e);
2654 }
2655 }
2656
2657
2658
2659
2660 public static void main(String[] args) throws Exception {
2661 VersionInfo.logVersion();
2662 Configuration conf = HBaseConfiguration.create();
2663 @SuppressWarnings("unchecked")
2664 Class<? extends HRegionServer> regionServerClass = (Class<? extends HRegionServer>) conf
2665 .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);
2666
2667 new HRegionServerCommandLine(regionServerClass).doMain(args);
2668 }
2669
2670
2671
2672
2673
2674
2675
2676
2677
2678
2679
2680 @Override
2681 public List<Region> getOnlineRegions(TableName tableName) {
2682 List<Region> tableRegions = new ArrayList<Region>();
2683 synchronized (this.onlineRegions) {
2684 for (Region region: this.onlineRegions.values()) {
2685 HRegionInfo regionInfo = region.getRegionInfo();
2686 if(regionInfo.getTable().equals(tableName)) {
2687 tableRegions.add(region);
2688 }
2689 }
2690 }
2691 return tableRegions;
2692 }
2693
2694
2695
2696
2697
2698
2699 @Override
2700 public Set<TableName> getOnlineTables() {
2701 Set<TableName> tables = new HashSet<TableName>();
2702 synchronized (this.onlineRegions) {
2703 for (Region region: this.onlineRegions.values()) {
2704 tables.add(region.getTableDesc().getTableName());
2705 }
2706 }
2707 return tables;
2708 }
2709
2710
2711 public String[] getRegionServerCoprocessors() {
2712 TreeSet<String> coprocessors = new TreeSet<String>();
2713 try {
2714 coprocessors.addAll(getWAL(null).getCoprocessorHost().getCoprocessors());
2715 } catch (IOException exception) {
2716 LOG.warn("Exception attempting to fetch wal coprocessor information for the common wal; " +
2717 "skipping.");
2718 LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
2719 }
2720 Collection<Region> regions = getOnlineRegionsLocalContext();
2721 for (Region region: regions) {
2722 coprocessors.addAll(region.getCoprocessorHost().getCoprocessors());
2723 try {
2724 coprocessors.addAll(getWAL(region.getRegionInfo()).getCoprocessorHost().getCoprocessors());
2725 } catch (IOException exception) {
2726 LOG.warn("Exception attempting to fetch wal coprocessor information for region " + region +
2727 "; skipping.");
2728 LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
2729 }
2730 }
2731 return coprocessors.toArray(new String[coprocessors.size()]);
2732 }
2733
2734
2735
2736
2737
2738 private void closeRegionIgnoreErrors(HRegionInfo region, final boolean abort) {
2739 try {
2740 CloseRegionCoordination.CloseRegionDetails details =
2741 csm.getCloseRegionCoordination().getDetaultDetails();
2742 if (!closeRegion(region.getEncodedName(), abort, details, null)) {
2743 LOG.warn("Failed to close " + region.getRegionNameAsString() +
2744 " - ignoring and continuing");
2745 }
2746 } catch (IOException e) {
2747 LOG.warn("Failed to close " + region.getRegionNameAsString() +
2748 " - ignoring and continuing", e);
2749 }
2750 }
2751
2752
2753
2754
2755
2756
2757
2758
2759
2760
2761
2762
2763
2764
2765
2766
2767
2768
2769
2770
2771
/**
 * Start closing a region by submitting a close handler to the executor service.
 *
 * <p>Steps:
 *
 * <ol>
 *   <li>If the region is online and has a coprocessor host, {@code preClose(false)} is called.
 *       If it throws, the close is refused and false is returned.</li>
 *   <li>The region is marked FALSE (closing) in {@code regionsInTransitionInRS}, unless an
 *       entry already exists.</li>
 *   <li>If the existing entry is TRUE (opening), this tries to cancel the open by swapping it
 *       to FALSE. If the swap fails, this method retries itself.</li>
 *   <li>If the existing entry is FALSE (already closing), the request is rejected.</li>
 * </ol>
 *
 * @param encodedName encoded name of the region to close
 * @param abort whether the close is part of an abort
 * @param crd close-region coordination details passed to the handler
 * @param sn passed to {@link CloseRegionHandler}; not used for the meta region
 * @return true if a close handler was submitted; false if a coprocessor refused the close
 * @throws RegionAlreadyInTransitionException if the region is already closing, or was opening
 *     and not yet online when the open was cancelled
 * @throws NotServingRegionException if the region is neither online nor opening
 */
protected boolean closeRegion(String encodedName, final boolean abort,
    CloseRegionCoordination.CloseRegionDetails crd, final ServerName sn)
    throws NotServingRegionException, RegionAlreadyInTransitionException {
  // Give region coprocessors a chance to veto the close.
  Region actualRegion = this.getFromOnlineRegions(encodedName);
  if ((actualRegion != null) && (actualRegion.getCoprocessorHost() != null)) {
    try {
      actualRegion.getCoprocessorHost().preClose(false);
    } catch (IOException exp) {
      LOG.warn("Unable to close region: the coprocessor launched an error ", exp);
      return false;
    }
  }

  // Atomically mark the region as closing (FALSE) unless it is already in transition.
  final Boolean previous = this.regionsInTransitionInRS.putIfAbsent(encodedName.getBytes(),
      Boolean.FALSE);

  if (Boolean.TRUE.equals(previous)) {
    LOG.info("Received CLOSE for the region:" + encodedName + " , which we are already " +
        "trying to OPEN. Cancelling OPENING.");
    if (!regionsInTransitionInRS.replace(encodedName.getBytes(), previous, Boolean.FALSE)){
      // The TRUE entry changed before we could swap it (the open finished or was removed).
      // Retry from the start as a normal close.
      LOG.warn("The opening for region " + encodedName + " was done before we could cancel it." +
          " Doing a standard close now");
      return closeRegion(encodedName, abort, crd, sn);
    }
    // The open was cancelled; if the region never came online there is nothing to close.
    actualRegion = this.getFromOnlineRegions(encodedName);
    if (actualRegion == null) {
      LOG.info("The opening previously in progress has been cancelled by a CLOSE request.");
      // NOTE(review): the FALSE marker set above is left in regionsInTransitionInRS here;
      // presumably the cancelled open path clears it — confirm.
      throw new RegionAlreadyInTransitionException("The region " + encodedName +
          " was opening but not yet served. Opening is cancelled.");
    }
  } else if (Boolean.FALSE.equals(previous)) {
    LOG.info("Received CLOSE for the region: " + encodedName +
        ", which we are already trying to CLOSE, but not completed yet");
    // A close is already in progress; reject this duplicate request.
    throw new RegionAlreadyInTransitionException("The region " + encodedName +
        " was already closing. New CLOSE request is ignored.");
  }

  if (actualRegion == null) {
    LOG.error("Received CLOSE for a region which is not online, and we're not opening.");
    // Undo the FALSE marker we inserted above.
    this.regionsInTransitionInRS.remove(encodedName.getBytes());
    throw new NotServingRegionException("The region " + encodedName +
        " is not online, and is not opening.");
  }

  // Meta regions get a dedicated handler type; everything runs on the executor service.
  CloseRegionHandler crh;
  final HRegionInfo hri = actualRegion.getRegionInfo();
  if (hri.isMetaRegion()) {
    crh = new CloseMetaHandler(this, this, hri, abort,
        csm.getCloseRegionCoordination(), crd);
  } else {
    crh = new CloseRegionHandler(this, this, hri, abort,
        csm.getCloseRegionCoordination(), crd, sn);
  }
  this.service.submit(crh);
  return true;
}
2840
2841
2842
2843
2844
2845
2846 public Region getOnlineRegion(final byte[] regionName) {
2847 String encodedRegionName = HRegionInfo.encodeRegionName(regionName);
2848 return this.onlineRegions.get(encodedRegionName);
2849 }
2850
2851 public InetSocketAddress[] getRegionBlockLocations(final String encodedRegionName) {
2852 return this.regionFavoredNodesMap.get(encodedRegionName);
2853 }
2854
2855 @Override
2856 public Region getFromOnlineRegions(final String encodedRegionName) {
2857 return this.onlineRegions.get(encodedRegionName);
2858 }
2859
2860
2861 @Override
2862 public boolean removeFromOnlineRegions(final Region r, ServerName destination) {
2863 Region toReturn = this.onlineRegions.remove(r.getRegionInfo().getEncodedName());
2864 if (destination != null) {
2865 long closeSeqNum = r.getMaxFlushedSeqId();
2866 if (closeSeqNum == HConstants.NO_SEQNUM) {
2867
2868 closeSeqNum = r.getOpenSeqNum();
2869 if (closeSeqNum == HConstants.NO_SEQNUM) closeSeqNum = 0;
2870 }
2871 addToMovedRegions(r.getRegionInfo().getEncodedName(), destination, closeSeqNum);
2872 }
2873 this.regionFavoredNodesMap.remove(r.getRegionInfo().getEncodedName());
2874 return toReturn != null;
2875 }
2876
2877
2878
2879
2880
2881
2882
2883
2884
2885 protected Region getRegion(final byte[] regionName)
2886 throws NotServingRegionException {
2887 String encodedRegionName = HRegionInfo.encodeRegionName(regionName);
2888 return getRegionByEncodedName(regionName, encodedRegionName);
2889 }
2890
2891 public Region getRegionByEncodedName(String encodedRegionName)
2892 throws NotServingRegionException {
2893 return getRegionByEncodedName(null, encodedRegionName);
2894 }
2895
2896 protected Region getRegionByEncodedName(byte[] regionName, String encodedRegionName)
2897 throws NotServingRegionException {
2898 Region region = this.onlineRegions.get(encodedRegionName);
2899 if (region == null) {
2900 MovedRegionInfo moveInfo = getMovedRegion(encodedRegionName);
2901 if (moveInfo != null) {
2902 throw new RegionMovedException(moveInfo.getServerName(), moveInfo.getSeqNum());
2903 }
2904 Boolean isOpening = this.regionsInTransitionInRS.get(Bytes.toBytes(encodedRegionName));
2905 String regionNameStr = regionName == null?
2906 encodedRegionName: Bytes.toStringBinary(regionName);
2907 if (isOpening != null && isOpening.booleanValue()) {
2908 throw new RegionOpeningException("Region " + regionNameStr +
2909 " is opening on " + this.serverName);
2910 }
2911 throw new NotServingRegionException("Region " + regionNameStr +
2912 " is not online on " + this.serverName);
2913 }
2914 return region;
2915 }
2916
2917
2918
2919
2920
2921
2922
2923
2924
2925
2926
2927 private Throwable cleanup(final Throwable t, final String msg) {
2928
2929 if (t instanceof NotServingRegionException) {
2930 LOG.debug("NotServingRegionException; " + t.getMessage());
2931 return t;
2932 }
2933 if (msg == null) {
2934 LOG.error("", RemoteExceptionHandler.checkThrowable(t));
2935 } else {
2936 LOG.error(msg, RemoteExceptionHandler.checkThrowable(t));
2937 }
2938 if (!rpcServices.checkOOME(t)) {
2939 checkFileSystem();
2940 }
2941 return t;
2942 }
2943
2944
2945
2946
2947
2948
2949
2950
2951 protected IOException convertThrowableToIOE(final Throwable t, final String msg) {
2952 return (t instanceof IOException ? (IOException) t : msg == null
2953 || msg.length() == 0 ? new IOException(t) : new IOException(msg, t));
2954 }
2955
2956
2957
2958
2959
2960
2961
2962 public boolean checkFileSystem() {
2963 if (this.fsOk && this.fs != null) {
2964 try {
2965 FSUtils.checkFileSystemAvailable(this.fs);
2966 } catch (IOException e) {
2967 abort("File System not available", e);
2968 this.fsOk = false;
2969 }
2970 }
2971 return this.fsOk;
2972 }
2973
2974 @Override
2975 public void updateRegionFavoredNodesMapping(String encodedRegionName,
2976 List<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName> favoredNodes) {
2977 InetSocketAddress[] addr = new InetSocketAddress[favoredNodes.size()];
2978
2979
2980 for (int i = 0; i < favoredNodes.size(); i++) {
2981 addr[i] = InetSocketAddress.createUnresolved(favoredNodes.get(i).getHostName(),
2982 favoredNodes.get(i).getPort());
2983 }
2984 regionFavoredNodesMap.put(encodedRegionName, addr);
2985 }
2986
2987
2988
2989
2990
2991
2992
2993 @Override
2994 public InetSocketAddress[] getFavoredNodesForRegion(String encodedRegionName) {
2995 return regionFavoredNodesMap.get(encodedRegionName);
2996 }
2997
2998 @Override
2999 public ServerNonceManager getNonceManager() {
3000 return this.nonceManager;
3001 }
3002
3003 private static class MovedRegionInfo {
3004 private final ServerName serverName;
3005 private final long seqNum;
3006 private final long ts;
3007
3008 public MovedRegionInfo(ServerName serverName, long closeSeqNum) {
3009 this.serverName = serverName;
3010 this.seqNum = closeSeqNum;
3011 ts = EnvironmentEdgeManager.currentTime();
3012 }
3013
3014 public ServerName getServerName() {
3015 return serverName;
3016 }
3017
3018 public long getSeqNum() {
3019 return seqNum;
3020 }
3021
3022 public long getMoveTime() {
3023 return ts;
3024 }
3025 }
3026
3027
3028
3029 protected Map<String, MovedRegionInfo> movedRegions =
3030 new ConcurrentHashMap<String, MovedRegionInfo>(3000);
3031
3032
3033
3034 private static final int TIMEOUT_REGION_MOVED = (2 * 60 * 1000);
3035
3036 protected void addToMovedRegions(String encodedName, ServerName destination, long closeSeqNum) {
3037 if (ServerName.isSameHostnameAndPort(destination, this.getServerName())) {
3038 LOG.warn("Not adding moved region record: " + encodedName + " to self.");
3039 return;
3040 }
3041 LOG.info("Adding moved region record: "
3042 + encodedName + " to " + destination + " as of " + closeSeqNum);
3043 movedRegions.put(encodedName, new MovedRegionInfo(destination, closeSeqNum));
3044 }
3045
3046 void removeFromMovedRegions(String encodedName) {
3047 movedRegions.remove(encodedName);
3048 }
3049
3050 private MovedRegionInfo getMovedRegion(final String encodedRegionName) {
3051 MovedRegionInfo dest = movedRegions.get(encodedRegionName);
3052
3053 long now = EnvironmentEdgeManager.currentTime();
3054 if (dest != null) {
3055 if (dest.getMoveTime() > (now - TIMEOUT_REGION_MOVED)) {
3056 return dest;
3057 } else {
3058 movedRegions.remove(encodedRegionName);
3059 }
3060 }
3061
3062 return null;
3063 }
3064
3065
3066
3067
3068 protected void cleanMovedRegions() {
3069 final long cutOff = System.currentTimeMillis() - TIMEOUT_REGION_MOVED;
3070 Iterator<Entry<String, MovedRegionInfo>> it = movedRegions.entrySet().iterator();
3071
3072 while (it.hasNext()){
3073 Map.Entry<String, MovedRegionInfo> e = it.next();
3074 if (e.getValue().getMoveTime() < cutOff) {
3075 it.remove();
3076 }
3077 }
3078 }
3079
3080
3081
3082
3083
3084 protected int movedRegionCleanerPeriod() {
3085 return TIMEOUT_REGION_MOVED;
3086 }
3087
3088
3089
3090
3091
3092 protected final static class MovedRegionsCleaner extends ScheduledChore implements Stoppable {
3093 private HRegionServer regionServer;
3094 Stoppable stoppable;
3095
3096 private MovedRegionsCleaner(
3097 HRegionServer regionServer, Stoppable stoppable){
3098 super("MovedRegionsCleaner for region " + regionServer, stoppable,
3099 regionServer.movedRegionCleanerPeriod());
3100 this.regionServer = regionServer;
3101 this.stoppable = stoppable;
3102 }
3103
3104 static MovedRegionsCleaner create(HRegionServer rs){
3105 Stoppable stoppable = new Stoppable() {
3106 private volatile boolean isStopped = false;
3107 @Override public void stop(String why) { isStopped = true;}
3108 @Override public boolean isStopped() {return isStopped;}
3109 };
3110
3111 return new MovedRegionsCleaner(rs, stoppable);
3112 }
3113
3114 @Override
3115 protected void chore() {
3116 regionServer.cleanMovedRegions();
3117 }
3118
3119 @Override
3120 public void stop(String why) {
3121 stoppable.stop(why);
3122 }
3123
3124 @Override
3125 public boolean isStopped() {
3126 return stoppable.isStopped();
3127 }
3128 }
3129
3130 private String getMyEphemeralNodePath() {
3131 return ZKUtil.joinZNode(this.zooKeeper.rsZNode, getServerName().toString());
3132 }
3133
3134 private boolean isHealthCheckerConfigured() {
3135 String healthScriptLocation = this.conf.get(HConstants.HEALTH_SCRIPT_LOC);
3136 return org.apache.commons.lang.StringUtils.isNotBlank(healthScriptLocation);
3137 }
3138
3139
3140
3141
3142 public CompactSplitThread getCompactSplitThread() {
3143 return this.compactSplitThread;
3144 }
3145
3146
3147
3148
3149
3150
3151
3152
3153 private void updateRecoveringRegionLastFlushedSequenceId(Region r) throws KeeperException,
3154 IOException {
3155 if (!r.isRecovering()) {
3156
3157 return;
3158 }
3159
3160 HRegionInfo regionInfo = r.getRegionInfo();
3161 ZooKeeperWatcher zkw = getZooKeeper();
3162 String previousRSName = this.getLastFailedRSFromZK(regionInfo.getEncodedName());
3163 Map<byte[], Long> maxSeqIdInStores = r.getMaxStoreSeqId();
3164 long minSeqIdForLogReplay = -1;
3165 for (Long storeSeqIdForReplay : maxSeqIdInStores.values()) {
3166 if (minSeqIdForLogReplay == -1 || storeSeqIdForReplay < minSeqIdForLogReplay) {
3167 minSeqIdForLogReplay = storeSeqIdForReplay;
3168 }
3169 }
3170
3171 try {
3172 long lastRecordedFlushedSequenceId = -1;
3173 String nodePath = ZKUtil.joinZNode(this.zooKeeper.recoveringRegionsZNode,
3174 regionInfo.getEncodedName());
3175
3176 byte[] data;
3177 try {
3178 data = ZKUtil.getData(zkw, nodePath);
3179 } catch (InterruptedException e) {
3180 throw new InterruptedIOException();
3181 }
3182 if (data != null) {
3183 lastRecordedFlushedSequenceId = ZKSplitLog.parseLastFlushedSequenceIdFrom(data);
3184 }
3185 if (data == null || lastRecordedFlushedSequenceId < minSeqIdForLogReplay) {
3186 ZKUtil.setData(zkw, nodePath, ZKUtil.positionToByteArray(minSeqIdForLogReplay));
3187 }
3188 if (previousRSName != null) {
3189
3190 nodePath = ZKUtil.joinZNode(nodePath, previousRSName);
3191 ZKUtil.setData(zkw, nodePath,
3192 ZKUtil.regionSequenceIdsToByteArray(minSeqIdForLogReplay, maxSeqIdInStores));
3193 LOG.debug("Update last flushed sequence id of region " + regionInfo.getEncodedName() +
3194 " for " + previousRSName);
3195 } else {
3196 LOG.warn("Can't find failed region server for recovering region " +
3197 regionInfo.getEncodedName());
3198 }
3199 } catch (NoNodeException ignore) {
3200 LOG.debug("Region " + regionInfo.getEncodedName() +
3201 " must have completed recovery because its recovery znode has been removed", ignore);
3202 }
3203 }
3204
3205
3206
3207
3208
3209
3210 private String getLastFailedRSFromZK(String encodedRegionName) throws KeeperException {
3211 String result = null;
3212 long maxZxid = 0;
3213 ZooKeeperWatcher zkw = this.getZooKeeper();
3214 String nodePath = ZKUtil.joinZNode(zkw.recoveringRegionsZNode, encodedRegionName);
3215 List<String> failedServers = ZKUtil.listChildrenNoWatch(zkw, nodePath);
3216 if (failedServers == null || failedServers.isEmpty()) {
3217 return result;
3218 }
3219 for (String failedServer : failedServers) {
3220 String rsPath = ZKUtil.joinZNode(nodePath, failedServer);
3221 Stat stat = new Stat();
3222 ZKUtil.getDataNoWatch(zkw, rsPath, stat);
3223 if (maxZxid < stat.getCzxid()) {
3224 maxZxid = stat.getCzxid();
3225 result = failedServer;
3226 }
3227 }
3228 return result;
3229 }
3230
/**
 * Dispatch a coprocessor endpoint call to a region-server-level coprocessor service.
 *
 * <p>Steps:
 *
 * <ol>
 *   <li>Look up the service registered under the call's service name.</li>
 *   <li>Resolve the method by name and decode the request into that method's request
 *       type.</li>
 *   <li>Invoke the method and merge any non-null response message.</li>
 *   <li>Wrap the result in a {@link CoprocessorServiceResponse} whose region specifier is an
 *       empty region name.</li>
 * </ol>
 *
 * @param controller unused
 * @param serviceRequest the coprocessor call to execute
 * @return the wrapped result
 * @throws ServiceException wrapping any IOException. This includes
 *     {@link UnknownProtocolException} for an unknown service or method, and any exception the
 *     service reported through its controller.
 */
public CoprocessorServiceResponse execRegionServerService(
    @SuppressWarnings("UnusedParameters") final RpcController controller,
    final CoprocessorServiceRequest serviceRequest) throws ServiceException {
  try {
    ServerRpcController serviceController = new ServerRpcController();
    CoprocessorServiceCall call = serviceRequest.getCall();
    String serviceName = call.getServiceName();
    String methodName = call.getMethodName();
    if (!coprocessorServiceHandlers.containsKey(serviceName)) {
      throw new UnknownProtocolException(null,
          "No registered coprocessor service found for name " + serviceName);
    }
    Service service = coprocessorServiceHandlers.get(serviceName);
    Descriptors.ServiceDescriptor serviceDesc = service.getDescriptorForType();
    Descriptors.MethodDescriptor methodDesc = serviceDesc.findMethodByName(methodName);
    if (methodDesc == null) {
      throw new UnknownProtocolException(service.getClass(), "Unknown method " + methodName
          + " called on service " + serviceName);
    }
    // Decode the serialized request into the method's request message type.
    Message.Builder builderForType = service.getRequestPrototype(methodDesc).newBuilderForType();
    ProtobufUtil.mergeFrom(builderForType, call.getRequest());
    Message request = builderForType.build();
    final Message.Builder responseBuilder =
        service.getResponsePrototype(methodDesc).newBuilderForType();
    // The callback merges the response into responseBuilder. If it is not invoked, or is
    // invoked with null, the default (empty) response is returned.
    service.callMethod(methodDesc, serviceController, request, new RpcCallback<Message>() {
      @Override
      public void run(Message message) {
        if (message != null) {
          responseBuilder.mergeFrom(message);
        }
      }
    });
    // Errors reported by the service through the controller are rethrown here.
    IOException exception = ResponseConverter.getControllerException(serviceController);
    if (exception != null) {
      throw exception;
    }
    Message execResult = responseBuilder.build();
    ClientProtos.CoprocessorServiceResponse.Builder builder =
        ClientProtos.CoprocessorServiceResponse.newBuilder();
    // There is no region for a server-level call, so an empty region name is used.
    builder.setRegion(RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME,
        HConstants.EMPTY_BYTE_ARRAY));
    builder.setValue(builder.getValueBuilder().setName(execResult.getClass().getName())
        .setValue(execResult.toByteString()));
    return builder.build();
  } catch (IOException ie) {
    throw new ServiceException(ie);
  }
}
3279
3280
3281
3282
3283 public CacheConfig getCacheConfig() {
3284 return this.cacheConfig;
3285 }
3286
3287
3288
3289
3290 protected ConfigurationManager getConfigurationManager() {
3291 return configurationManager;
3292 }
3293
3294
3295
3296
3297 public TableDescriptors getTableDescriptors() {
3298 return this.tableDescriptors;
3299 }
3300
3301
3302
3303
3304 public void updateConfiguration() {
3305 LOG.info("Reloading the configuration from disk.");
3306
3307 conf.reloadConfiguration();
3308 configurationManager.notifyAllObservers(conf);
3309 }
3310
3311 @Override
3312 public HeapMemoryManager getHeapMemoryManager() {
3313 return hMemManager;
3314 }
3315
3316 @Override
3317 public double getCompactionPressure() {
3318 double max = 0;
3319 for (Region region : onlineRegions.values()) {
3320 for (Store store : region.getStores()) {
3321 double normCount = store.getCompactionPressure();
3322 if (normCount > max) {
3323 max = normCount;
3324 }
3325 }
3326 }
3327 return max;
3328 }
3329
3330
3331
3332
3333
3334 @VisibleForTesting
3335 public boolean walRollRequestFinished() {
3336 return this.walRoller.walRollFinished();
3337 }
3338 }