1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase;
19
20 import java.io.IOException;
21 import java.util.ArrayList;
22 import java.util.Comparator;
23 import java.util.List;
24 import java.util.Set;
25 import java.util.TreeSet;
26
27 import org.apache.hadoop.conf.Configuration;
28 import org.apache.hadoop.hbase.ClusterManager.ServiceType;
29 import org.apache.hadoop.hbase.classification.InterfaceAudience;
30 import org.apache.hadoop.hbase.client.Admin;
31 import org.apache.hadoop.hbase.client.ClusterConnection;
32 import org.apache.hadoop.hbase.client.Connection;
33 import org.apache.hadoop.hbase.client.ConnectionFactory;
34 import org.apache.hadoop.hbase.client.RegionLocator;
35 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
36 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
37 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ServerInfo;
38 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
39 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MasterService;
40 import org.apache.hadoop.hbase.util.Bytes;
41 import org.apache.hadoop.hbase.util.Threads;
42
43
44
45
46
47 @InterfaceAudience.Private
48 public class DistributedHBaseCluster extends HBaseCluster {
49 private Admin admin;
50 private final Connection connection;
51
52 private ClusterManager clusterManager;
53
54 public DistributedHBaseCluster(Configuration conf, ClusterManager clusterManager)
55 throws IOException {
56 super(conf);
57 this.clusterManager = clusterManager;
58 this.connection = ConnectionFactory.createConnection(conf);
59 this.admin = this.connection.getAdmin();
60 this.initialClusterStatus = getClusterStatus();
61 }
62
63 public void setClusterManager(ClusterManager clusterManager) {
64 this.clusterManager = clusterManager;
65 }
66
67 public ClusterManager getClusterManager() {
68 return clusterManager;
69 }
70
71
72
73
74
75 @Override
76 public ClusterStatus getClusterStatus() throws IOException {
77 return admin.getClusterStatus();
78 }
79
80 @Override
81 public ClusterStatus getInitialClusterStatus() throws IOException {
82 return initialClusterStatus;
83 }
84
85 @Override
86 public void close() throws IOException {
87 if (this.admin != null) {
88 admin.close();
89 }
90 if (this.connection != null && !this.connection.isClosed()) {
91 this.connection.close();
92 }
93 }
94
95 @Override
96 public AdminProtos.AdminService.BlockingInterface getAdminProtocol(ServerName serverName)
97 throws IOException {
98 return ((ClusterConnection)this.connection).getAdmin(serverName);
99 }
100
101 @Override
102 public ClientProtos.ClientService.BlockingInterface getClientProtocol(ServerName serverName)
103 throws IOException {
104 return ((ClusterConnection)this.connection).getClient(serverName);
105 }
106
107 @Override
108 public void startRegionServer(String hostname, int port) throws IOException {
109 LOG.info("Starting RS on: " + hostname);
110 clusterManager.start(ServiceType.HBASE_REGIONSERVER, hostname, port);
111 }
112
113 @Override
114 public void killRegionServer(ServerName serverName) throws IOException {
115 LOG.info("Aborting RS: " + serverName.getServerName());
116 clusterManager.kill(ServiceType.HBASE_REGIONSERVER,
117 serverName.getHostname(), serverName.getPort());
118 }
119
120 @Override
121 public void stopRegionServer(ServerName serverName) throws IOException {
122 LOG.info("Stopping RS: " + serverName.getServerName());
123 clusterManager.stop(ServiceType.HBASE_REGIONSERVER,
124 serverName.getHostname(), serverName.getPort());
125 }
126
127 @Override
128 public void waitForRegionServerToStop(ServerName serverName, long timeout) throws IOException {
129 waitForServiceToStop(ServiceType.HBASE_REGIONSERVER, serverName, timeout);
130 }
131
132 @Override
133 public void startZkNode(String hostname, int port) throws IOException {
134 LOG.info("Starting Zookeeper node on: " + hostname);
135 clusterManager.start(ServiceType.ZOOKEEPER_SERVER, hostname, port);
136 }
137
138 @Override
139 public void killZkNode(ServerName serverName) throws IOException {
140 LOG.info("Aborting Zookeeper node on: " + serverName.getServerName());
141 clusterManager.kill(ServiceType.ZOOKEEPER_SERVER,
142 serverName.getHostname(), serverName.getPort());
143 }
144
145 @Override
146 public void stopZkNode(ServerName serverName) throws IOException {
147 LOG.info("Stopping Zookeeper node: " + serverName.getServerName());
148 clusterManager.stop(ServiceType.ZOOKEEPER_SERVER,
149 serverName.getHostname(), serverName.getPort());
150 }
151
152 @Override
153 public void waitForZkNodeToStart(ServerName serverName, long timeout) throws IOException {
154 waitForServiceToStart(ServiceType.ZOOKEEPER_SERVER, serverName, timeout);
155 }
156
157 @Override
158 public void waitForZkNodeToStop(ServerName serverName, long timeout) throws IOException {
159 waitForServiceToStop(ServiceType.ZOOKEEPER_SERVER, serverName, timeout);
160 }
161
162 @Override
163 public void startDataNode(ServerName serverName) throws IOException {
164 LOG.info("Starting data node on: " + serverName.getServerName());
165 clusterManager.start(ServiceType.HADOOP_DATANODE,
166 serverName.getHostname(), serverName.getPort());
167 }
168
169 @Override
170 public void killDataNode(ServerName serverName) throws IOException {
171 LOG.info("Aborting data node on: " + serverName.getServerName());
172 clusterManager.kill(ServiceType.HADOOP_DATANODE,
173 serverName.getHostname(), serverName.getPort());
174 }
175
176 @Override
177 public void stopDataNode(ServerName serverName) throws IOException {
178 LOG.info("Stopping data node on: " + serverName.getServerName());
179 clusterManager.stop(ServiceType.HADOOP_DATANODE,
180 serverName.getHostname(), serverName.getPort());
181 }
182
183 @Override
184 public void waitForDataNodeToStart(ServerName serverName, long timeout) throws IOException {
185 waitForServiceToStart(ServiceType.HADOOP_DATANODE, serverName, timeout);
186 }
187
188 @Override
189 public void waitForDataNodeToStop(ServerName serverName, long timeout) throws IOException {
190 waitForServiceToStop(ServiceType.HADOOP_DATANODE, serverName, timeout);
191 }
192
193 private void waitForServiceToStop(ServiceType service, ServerName serverName, long timeout)
194 throws IOException {
195 LOG.info("Waiting for service: " + service + " to stop: " + serverName.getServerName());
196 long start = System.currentTimeMillis();
197
198 while ((System.currentTimeMillis() - start) < timeout) {
199 if (!clusterManager.isRunning(service, serverName.getHostname(), serverName.getPort())) {
200 return;
201 }
202 Threads.sleep(100);
203 }
204 throw new IOException("did timeout waiting for service to stop:" + serverName);
205 }
206
207 private void waitForServiceToStart(ServiceType service, ServerName serverName, long timeout)
208 throws IOException {
209 LOG.info("Waiting for service: " + service + " to start: " + serverName.getServerName());
210 long start = System.currentTimeMillis();
211
212 while ((System.currentTimeMillis() - start) < timeout) {
213 if (clusterManager.isRunning(service, serverName.getHostname(), serverName.getPort())) {
214 return;
215 }
216 Threads.sleep(100);
217 }
218 throw new IOException("did timeout waiting for service to start:" + serverName);
219 }
220
221
222 @Override
223 public MasterService.BlockingInterface getMasterAdminService()
224 throws IOException {
225 return ((ClusterConnection)this.connection).getMaster();
226 }
227
228 @Override
229 public void startMaster(String hostname, int port) throws IOException {
230 LOG.info("Starting Master on: " + hostname + ":" + port);
231 clusterManager.start(ServiceType.HBASE_MASTER, hostname, port);
232 }
233
234 @Override
235 public void killMaster(ServerName serverName) throws IOException {
236 LOG.info("Aborting Master: " + serverName.getServerName());
237 clusterManager.kill(ServiceType.HBASE_MASTER, serverName.getHostname(), serverName.getPort());
238 }
239
240 @Override
241 public void stopMaster(ServerName serverName) throws IOException {
242 LOG.info("Stopping Master: " + serverName.getServerName());
243 clusterManager.stop(ServiceType.HBASE_MASTER, serverName.getHostname(), serverName.getPort());
244 }
245
246 @Override
247 public void waitForMasterToStop(ServerName serverName, long timeout) throws IOException {
248 waitForServiceToStop(ServiceType.HBASE_MASTER, serverName, timeout);
249 }
250
251 @Override
252 public boolean waitForActiveAndReadyMaster(long timeout) throws IOException {
253 long start = System.currentTimeMillis();
254 while (System.currentTimeMillis() - start < timeout) {
255 try {
256 getMasterAdminService();
257 return true;
258 } catch (MasterNotRunningException m) {
259 LOG.warn("Master not started yet " + m);
260 } catch (ZooKeeperConnectionException e) {
261 LOG.warn("Failed to connect to ZK " + e);
262 }
263 Threads.sleep(1000);
264 }
265 return false;
266 }
267
268 @Override
269 public ServerName getServerHoldingRegion(TableName tn, byte[] regionName) throws IOException {
270 HRegionLocation regionLoc = null;
271 try (RegionLocator locator = connection.getRegionLocator(tn)) {
272 regionLoc = locator.getRegionLocation(regionName);
273 }
274 if (regionLoc == null) {
275 LOG.warn("Cannot find region server holding region " + Bytes.toString(regionName) +
276 ", start key [" + Bytes.toString(HRegionInfo.getStartKey(regionName)) + "]");
277 return null;
278 }
279
280 AdminProtos.AdminService.BlockingInterface client =
281 ((ClusterConnection)this.connection).getAdmin(regionLoc.getServerName());
282 ServerInfo info = ProtobufUtil.getServerInfo(null, client);
283 return ProtobufUtil.toServerName(info.getServerName());
284 }
285
286 @Override
287 public void waitUntilShutDown() {
288
289 throw new RuntimeException("Not implemented yet");
290 }
291
292 @Override
293 public void shutdown() throws IOException {
294
295 throw new RuntimeException("Not implemented yet");
296 }
297
298 @Override
299 public boolean isDistributedCluster() {
300 return true;
301 }
302
303 @Override
304 public boolean restoreClusterStatus(ClusterStatus initial) throws IOException {
305 ClusterStatus current = getClusterStatus();
306
307 LOG.info("Restoring cluster - started");
308
309
310 boolean success = true;
311 success = restoreMasters(initial, current) & success;
312 success = restoreRegionServers(initial, current) & success;
313 success = restoreAdmin() & success;
314
315 LOG.info("Restoring cluster - done");
316 return success;
317 }
318
319 protected boolean restoreMasters(ClusterStatus initial, ClusterStatus current) {
320 List<IOException> deferred = new ArrayList<IOException>();
321
322 final ServerName initMaster = initial.getMaster();
323 if (!ServerName.isSameHostnameAndPort(initMaster, current.getMaster())) {
324 LOG.info("Restoring cluster - Initial active master : "
325 + initMaster.getHostAndPort()
326 + " has changed to : "
327 + current.getMaster().getHostAndPort());
328
329
330 try {
331 if (!clusterManager.isRunning(ServiceType.HBASE_MASTER,
332 initMaster.getHostname(), initMaster.getPort())) {
333 LOG.info("Restoring cluster - starting initial active master at:"
334 + initMaster.getHostAndPort());
335 startMaster(initMaster.getHostname(), initMaster.getPort());
336 }
337
338
339
340
341
342 for (ServerName currentBackup : current.getBackupMasters()) {
343 if (!ServerName.isSameHostnameAndPort(currentBackup, initMaster)) {
344 LOG.info("Restoring cluster - stopping backup master: " + currentBackup);
345 stopMaster(currentBackup);
346 }
347 }
348 LOG.info("Restoring cluster - stopping active master: " + current.getMaster());
349 stopMaster(current.getMaster());
350 waitForActiveAndReadyMaster();
351 } catch (IOException ex) {
352
353
354 deferred.add(ex);
355 }
356
357
358 for (ServerName backup : initial.getBackupMasters()) {
359 try {
360
361 if (!clusterManager.isRunning(ServiceType.HBASE_MASTER,
362 backup.getHostname(),
363 backup.getPort())) {
364 LOG.info("Restoring cluster - starting initial backup master: "
365 + backup.getHostAndPort());
366 startMaster(backup.getHostname(), backup.getPort());
367 }
368 } catch (IOException ex) {
369 deferred.add(ex);
370 }
371 }
372 } else {
373
374 Set<ServerName> toStart = new TreeSet<ServerName>(new ServerNameIgnoreStartCodeComparator());
375 Set<ServerName> toKill = new TreeSet<ServerName>(new ServerNameIgnoreStartCodeComparator());
376 toStart.addAll(initial.getBackupMasters());
377 toKill.addAll(current.getBackupMasters());
378
379 for (ServerName server : current.getBackupMasters()) {
380 toStart.remove(server);
381 }
382 for (ServerName server: initial.getBackupMasters()) {
383 toKill.remove(server);
384 }
385
386 for (ServerName sn:toStart) {
387 try {
388 if(!clusterManager.isRunning(ServiceType.HBASE_MASTER, sn.getHostname(), sn.getPort())) {
389 LOG.info("Restoring cluster - starting initial backup master: " + sn.getHostAndPort());
390 startMaster(sn.getHostname(), sn.getPort());
391 }
392 } catch (IOException ex) {
393 deferred.add(ex);
394 }
395 }
396
397 for (ServerName sn:toKill) {
398 try {
399 if(clusterManager.isRunning(ServiceType.HBASE_MASTER, sn.getHostname(), sn.getPort())) {
400 LOG.info("Restoring cluster - stopping backup master: " + sn.getHostAndPort());
401 stopMaster(sn);
402 }
403 } catch (IOException ex) {
404 deferred.add(ex);
405 }
406 }
407 }
408 if (!deferred.isEmpty()) {
409 LOG.warn("Restoring cluster - restoring region servers reported "
410 + deferred.size() + " errors:");
411 for (int i=0; i<deferred.size() && i < 3; i++) {
412 LOG.warn(deferred.get(i));
413 }
414 }
415
416 return deferred.isEmpty();
417 }
418
419
420 private static class ServerNameIgnoreStartCodeComparator implements Comparator<ServerName> {
421 @Override
422 public int compare(ServerName o1, ServerName o2) {
423 int compare = o1.getHostname().compareToIgnoreCase(o2.getHostname());
424 if (compare != 0) return compare;
425 compare = o1.getPort() - o2.getPort();
426 if (compare != 0) return compare;
427 return 0;
428 }
429 }
430
431 protected boolean restoreRegionServers(ClusterStatus initial, ClusterStatus current) {
432 Set<ServerName> toStart = new TreeSet<ServerName>(new ServerNameIgnoreStartCodeComparator());
433 Set<ServerName> toKill = new TreeSet<ServerName>(new ServerNameIgnoreStartCodeComparator());
434 toStart.addAll(initial.getServers());
435 toKill.addAll(current.getServers());
436
437 for (ServerName server : current.getServers()) {
438 toStart.remove(server);
439 }
440 for (ServerName server: initial.getServers()) {
441 toKill.remove(server);
442 }
443
444 List<IOException> deferred = new ArrayList<IOException>();
445
446 for(ServerName sn:toStart) {
447 try {
448 if (!clusterManager.isRunning(ServiceType.HBASE_REGIONSERVER,
449 sn.getHostname(),
450 sn.getPort())) {
451 LOG.info("Restoring cluster - starting initial region server: " + sn.getHostAndPort());
452 startRegionServer(sn.getHostname(), sn.getPort());
453 }
454 } catch (IOException ex) {
455 deferred.add(ex);
456 }
457 }
458
459 for(ServerName sn:toKill) {
460 try {
461 if (clusterManager.isRunning(ServiceType.HBASE_REGIONSERVER,
462 sn.getHostname(),
463 sn.getPort())) {
464 LOG.info("Restoring cluster - stopping initial region server: " + sn.getHostAndPort());
465 stopRegionServer(sn);
466 }
467 } catch (IOException ex) {
468 deferred.add(ex);
469 }
470 }
471 if (!deferred.isEmpty()) {
472 LOG.warn("Restoring cluster - restoring region servers reported "
473 + deferred.size() + " errors:");
474 for (int i=0; i<deferred.size() && i < 3; i++) {
475 LOG.warn(deferred.get(i));
476 }
477 }
478
479 return deferred.isEmpty();
480 }
481
482 protected boolean restoreAdmin() throws IOException {
483
484
485
486
487 try {
488 admin.close();
489 } catch (IOException ioe) {
490 LOG.warn("While closing the old connection", ioe);
491 }
492 this.admin = this.connection.getAdmin();
493 LOG.info("Added new HBaseAdmin");
494 return true;
495 }
496 }