/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationListener;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.wal.DefaultWALProvider;

import com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * This class is responsible for managing all the replication sources. There are
 * two classes of sources:
 * <ul>
 * <li>Normal sources are persistent and one per peer cluster</li>
 * <li>Old sources are recovered from a failed region server; their only goal is
 * to finish replicating the WAL queue that server had up in ZK</li>
 * </ul>
 *
 * When a region server dies, this class uses a watcher to get notified and it
 * tries to grab a lock in order to transfer all the queues to local old
 * sources.
 *
 * This class implements the ReplicationListener interface so that it can track
 * changes in replication state.
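 *
 * <p>A minimal usage sketch, hedged: the real wiring is done by the region
 * server's replication service at startup, and every argument below is assumed
 * to be initialized elsewhere.
 * <pre>{@code
 * ReplicationSourceManager manager = new ReplicationSourceManager(
 *     replicationQueues, replicationPeers, replicationTracker,
 *     conf, server, fs, logDir, oldLogDir, clusterId);
 * manager.init(); // one source per peer, plus failover of dead RS queues
 * }</pre>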
 */
@InterfaceAudience.Private
public class ReplicationSourceManager implements ReplicationListener {
  private static final Log LOG =
      LogFactory.getLog(ReplicationSourceManager.class);
  // List of all the sources that read this RS's logs
  private final List<ReplicationSourceInterface> sources;
  // List of all the sources we got from dead RSs
  private final List<ReplicationSourceInterface> oldsources;
  private final ReplicationQueues replicationQueues;
  private final ReplicationTracker replicationTracker;
  private final ReplicationPeers replicationPeers;
  // UUID for this cluster
  private final UUID clusterId;
  // All about stopping
  private final Server server;
  // All logs we are currently tracking
  // Index structure of the map is: peer_id->logPrefix/logGroup->logs
  private final Map<String, Map<String, SortedSet<String>>> walsById;
  // Logs for recovered sources we are currently tracking
  private final Map<String, Map<String, SortedSet<String>>> walsByIdRecoveredQueues;
  private final Configuration conf;
  private final FileSystem fs;
  // The paths to the latest log of each wal group, for newly added peers
  private Set<Path> latestPaths;
  // Path to the wals directories
  private final Path logDir;
  // Path to the wal archive
  private final Path oldLogDir;
  // The number of ms that we wait before moving znodes, HBASE-3596
  private final long sleepBeforeFailover;
  // Homemade executor service for replication
  private final ThreadPoolExecutor executor;

  private final Random rand;

  /**
   * Creates a replication manager and sets the watch on all the other registered region servers
   * @param replicationQueues the interface for manipulating replication queues
   * @param replicationPeers the interface for manipulating replication peers
   * @param replicationTracker the tracker through which we learn of replication state changes
   * @param conf the configuration to use
   * @param server the server for this region server
   * @param fs the file system to use
   * @param logDir the directory that contains all wal directories of live RSs
   * @param oldLogDir the directory where old logs are archived
   * @param clusterId the UUID of this cluster
   */
  public ReplicationSourceManager(final ReplicationQueues replicationQueues,
      final ReplicationPeers replicationPeers, final ReplicationTracker replicationTracker,
      final Configuration conf, final Server server, final FileSystem fs, final Path logDir,
      final Path oldLogDir, final UUID clusterId) {
    // CopyOnWriteArrayList is thread-safe.
    // Generally, reading is more frequent than modifying.
    this.sources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
    this.replicationQueues = replicationQueues;
    this.replicationPeers = replicationPeers;
    this.replicationTracker = replicationTracker;
    this.server = server;
    this.walsById = new HashMap<String, Map<String, SortedSet<String>>>();
    this.walsByIdRecoveredQueues = new ConcurrentHashMap<String, Map<String, SortedSet<String>>>();
    this.oldsources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
    this.conf = conf;
    this.fs = fs;
    this.logDir = logDir;
    this.oldLogDir = oldLogDir;
    this.sleepBeforeFailover =
        conf.getLong("replication.sleep.before.failover", 30000); // 30 seconds
    this.clusterId = clusterId;
    this.replicationTracker.registerListener(this);
    this.replicationPeers.getAllPeerIds();
    // It's preferable to fail over one RS at a time, but with good zk servers
    // more could be processed at the same time.
    int nbWorkers = conf.getInt("replication.executor.workers", 1);
    // use a short 100ms sleep since this could be done inline with a RS startup
    // even if we fail, other region servers can take care of it
    this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers,
        100, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<Runnable>());
    ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
    tfb.setNameFormat("ReplicationExecutor-%d");
    tfb.setDaemon(true);
    this.executor.setThreadFactory(tfb.build());
    this.rand = new Random();
    this.latestPaths = Collections.synchronizedSet(new HashSet<Path>());
  }

  /**
   * Provide the id of the peer and a log key, and this method will figure out
   * which wal the key belongs to and will record, for this region server, the
   * current replication position in zookeeper. It will also clean old logs
   * from the queue, deleting older entries.
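   *
   * <p>A hedged example; the wal path, peer id and position are illustrative:
   * <pre>{@code
   * // after shipping edits up to byte offset 4096 of the current wal to peer "1":
   * manager.logPositionAndCleanOldLogs(currentWalPath, "1", 4096L, false, false);
   * }</pre>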
   * @param log Path to the log currently being replicated
   * @param id id of the peer cluster
   * @param position current location in the log
   * @param queueRecovered indicates if this queue comes from another region server
   * @param holdLogInZK if true then the log is retained in ZK
   */
  public void logPositionAndCleanOldLogs(Path log, String id, long position,
      boolean queueRecovered, boolean holdLogInZK) {
    String fileName = log.getName();
    this.replicationQueues.setLogPosition(id, fileName, position);
    if (holdLogInZK) {
      return;
    }
    cleanOldLogs(fileName, id, queueRecovered);
  }

  /**
   * Cleans a log file and all older files from ZK. Called when we are sure that a
   * log file is closed and has no more entries.
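   *
   * <p>Wal names are grouped by prefix. A hedged illustration (the wal name is made
   * up, and assumes {@code DefaultWALProvider.getWALPrefixFromWALName} strips the
   * trailing timestamp suffix):
   * <pre>{@code
   * // "myhost%2C16020%2C1472637423.1472640000000" falls in wal group
   * // "myhost%2C16020%2C1472637423"; everything older in that group is removed
   * manager.cleanOldLogs("myhost%2C16020%2C1472637423.1472640000000", "1", false);
   * }</pre>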
   * @param key Path to the log
   * @param id id of the peer cluster
   * @param queueRecovered Whether this is a recovered queue
   */
  public void cleanOldLogs(String key, String id, boolean queueRecovered) {
    String logPrefix = DefaultWALProvider.getWALPrefixFromWALName(key);
    if (queueRecovered) {
      SortedSet<String> wals = walsByIdRecoveredQueues.get(id).get(logPrefix);
      if (wals != null && !wals.isEmpty() && !wals.first().equals(key)) {
        cleanOldLogs(wals, key, id);
      }
    } else {
      synchronized (this.walsById) {
        SortedSet<String> wals = walsById.get(id).get(logPrefix);
        if (wals != null && !wals.isEmpty() && !wals.first().equals(key)) {
          cleanOldLogs(wals, key, id);
        }
      }
    }
  }

  private void cleanOldLogs(SortedSet<String> wals, String key, String id) {
    // headSet(key) excludes key itself, so the wal currently being replicated is kept
    SortedSet<String> walSet = wals.headSet(key);
    LOG.debug("Removing " + walSet.size() + " logs in the list: " + walSet);
    for (String wal : walSet) {
      this.replicationQueues.removeLog(id, wal);
    }
    // headSet is a view; clearing it removes the old wals from the backing set
    walSet.clear();
  }

  /**
   * Adds a normal source per registered peer cluster and tries to process all
   * old region server wal queues
   */
  protected void init() throws IOException, ReplicationException {
    for (String id : this.replicationPeers.getPeerIds()) {
      addSource(id);
    }
    List<String> currentReplicators = this.replicationQueues.getListOfReplicators();
    if (currentReplicators == null || currentReplicators.size() == 0) {
      return;
    }
    List<String> otherRegionServers = replicationTracker.getListOfRegionServers();
    LOG.info("Current list of replicators: " + currentReplicators + " other RSs: "
        + otherRegionServers);

    // Look if there's anything to process after a restart
    for (String rs : currentReplicators) {
      if (!otherRegionServers.contains(rs)) {
        transferQueues(rs);
      }
    }
  }

  /**
   * Add sources for the given peer cluster on this region server. For a newly added peer, we
   * only need to enqueue the latest log of each wal group and do replication.
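   *
   * <p>A hedged sketch of the triggering flow; this is normally reached from
   * {@link #peerListChanged(List)} once a new peer znode appears:
   * <pre>{@code
   * if (replicationPeers.peerAdded("2")) {
   *   addSource("2"); // enqueues the latest wal of each group for peer "2"
   * }
   * }</pre>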
   * @param id the id of the peer cluster
   * @return the source that was created
   * @throws IOException
   */
  protected ReplicationSourceInterface addSource(String id) throws IOException,
      ReplicationException {
    ReplicationPeerConfig peerConfig = replicationPeers.getReplicationPeerConfig(id);
    ReplicationPeer peer = replicationPeers.getPeer(id);
    ReplicationSourceInterface src =
        getReplicationSource(this.conf, this.fs, this, this.replicationQueues,
          this.replicationPeers, server, id, this.clusterId, peerConfig, peer);
    synchronized (this.walsById) {
      this.sources.add(src);
      Map<String, SortedSet<String>> walsByGroup = new HashMap<String, SortedSet<String>>();
      this.walsById.put(id, walsByGroup);
      // Add the latest wal to that source's queue
      synchronized (latestPaths) {
        if (this.latestPaths.size() > 0) {
          for (Path logPath : latestPaths) {
            String name = logPath.getName();
            String walPrefix = DefaultWALProvider.getWALPrefixFromWALName(name);
            SortedSet<String> logs = new TreeSet<String>();
            logs.add(name);
            walsByGroup.put(walPrefix, logs);
            try {
              this.replicationQueues.addLog(id, name);
            } catch (ReplicationException e) {
              String message =
                  "Cannot add log to queue when creating a new source, queueId=" + id
                      + ", filename=" + name;
              server.stop(message);
              throw e;
            }
            src.enqueueLog(logPath);
          }
        }
      }
    }
    src.startup();
    return src;
  }

  /**
   * Delete a complete queue of wals associated with a peer cluster
   * @param peerId Id of the peer cluster whose queue of wals will be deleted
   * @param closeConnection if true, also remove the peer, closing the connection to it
   */
  public void deleteSource(String peerId, boolean closeConnection) {
    this.replicationQueues.removeQueue(peerId);
    if (closeConnection) {
      this.replicationPeers.peerRemoved(peerId);
    }
  }

  /**
   * Terminate the replication on this region server
   */
  public void join() {
    this.executor.shutdown();
    if (this.sources.size() == 0) {
      this.replicationQueues.removeAllQueues();
    }
    for (ReplicationSourceInterface source : this.sources) {
      source.terminate("Region server is closing");
    }
  }

  /**
   * Get a copy of the wals of the normal sources on this rs
   * @return a map of peer id to wal group to the sorted set of wal names
   */
  protected Map<String, Map<String, SortedSet<String>>> getWALs() {
    return Collections.unmodifiableMap(walsById);
  }

  /**
   * Get a copy of the wals of the recovered sources on this rs
   * @return a map of recovered queue id to wal group to the sorted set of wal names
   */
  protected Map<String, Map<String, SortedSet<String>>> getWalsByIdRecoveredQueues() {
    return Collections.unmodifiableMap(walsByIdRecoveredQueues);
  }

  /**
   * Get a list of all the normal sources of this rs
   * @return list of all normal sources
   */
  public List<ReplicationSourceInterface> getSources() {
    return this.sources;
  }

  /**
   * Get a list of all the old sources of this rs
   * @return list of all old sources
   */
  public List<ReplicationSourceInterface> getOldSources() {
    return this.oldsources;
  }

  void preLogRoll(Path newLog) throws IOException {
    recordLog(newLog);
    String logName = newLog.getName();
    String logPrefix = DefaultWALProvider.getWALPrefixFromWALName(logName);
    synchronized (latestPaths) {
      // Replace any previous latest path that belongs to the same wal group
      Iterator<Path> iterator = latestPaths.iterator();
      while (iterator.hasNext()) {
        Path path = iterator.next();
        if (path.getName().contains(logPrefix)) {
          iterator.remove();
          break;
        }
      }
      this.latestPaths.add(newLog);
    }
  }

  /**
   * Check and enqueue the given log to the correct source. If there's still no source for the
   * group to which the given log belongs, create one
   * @param logPath the log path to check and enqueue
   * @throws IOException
   */
  private void recordLog(Path logPath) throws IOException {
    String logName = logPath.getName();
    String logPrefix = DefaultWALProvider.getWALPrefixFromWALName(logName);
    // update replication queues on ZK
    // synchronize on replicationPeers to avoid adding a source for a to-be-removed peer
    synchronized (replicationPeers) {
      for (String id : replicationPeers.getPeerIds()) {
        try {
          this.replicationQueues.addLog(id, logName);
        } catch (ReplicationException e) {
          throw new IOException("Cannot add log to replication queue"
              + " when creating a new source, queueId=" + id + ", filename=" + logName, e);
        }
      }
    }
    // update walsById map
    synchronized (walsById) {
      for (Map.Entry<String, Map<String, SortedSet<String>>> entry : this.walsById.entrySet()) {
        String peerId = entry.getKey();
        Map<String, SortedSet<String>> walsByPrefix = entry.getValue();
        boolean existingPrefix = false;
        for (Map.Entry<String, SortedSet<String>> walsEntry : walsByPrefix.entrySet()) {
          SortedSet<String> wals = walsEntry.getValue();
          if (this.sources.isEmpty()) {
            // If there are no slaves, don't keep the old wals since
            // we only consider the last one when a new slave comes in
            wals.clear();
          }
          if (logPrefix.equals(walsEntry.getKey())) {
            wals.add(logName);
            existingPrefix = true;
          }
        }
        if (!existingPrefix) {
          // The new log belongs to a new group, add it into this peer
          LOG.debug("Start tracking logs for wal group " + logPrefix + " for peer " + peerId);
          SortedSet<String> wals = new TreeSet<String>();
          wals.add(logName);
          walsByPrefix.put(logPrefix, wals);
        }
      }
    }
  }

  void postLogRoll(Path newLog) throws IOException {
    // This only updates the sources we own, not the recovered ones
    for (ReplicationSourceInterface source : this.sources) {
      source.enqueueLog(newLog);
    }
  }

  /**
   * Factory method to create a replication source
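   *
   * <p>The source implementation is pluggable. A hedged configuration sketch, where
   * {@code MyReplicationSource} is hypothetical and must implement
   * {@code ReplicationSourceInterface}:
   * <pre>{@code
   * conf.set("replication.replicationsource.implementation",
   *     MyReplicationSource.class.getName());
   * }</pre>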
   * @param conf the configuration to use
   * @param fs the file system to use
   * @param manager the manager to use
   * @param replicationQueues the interface for manipulating replication queues
   * @param replicationPeers the interface for manipulating replication peers
   * @param server the server object for this region server
   * @param peerId the id of the peer cluster
   * @param clusterId the UUID of the local cluster
   * @param peerConfig the configuration of the replication peer
   * @param replicationPeer the peer to create a source for
   * @return the created source
   * @throws IOException
   */
  protected ReplicationSourceInterface getReplicationSource(final Configuration conf,
      final FileSystem fs, final ReplicationSourceManager manager,
      final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers,
      final Server server, final String peerId, final UUID clusterId,
      final ReplicationPeerConfig peerConfig, final ReplicationPeer replicationPeer)
      throws IOException {
    RegionServerCoprocessorHost rsServerHost = null;
    TableDescriptors tableDescriptors = null;
    if (server instanceof HRegionServer) {
      rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost();
      tableDescriptors = ((HRegionServer) server).getTableDescriptors();
    }
    ReplicationSourceInterface src;
    try {
      @SuppressWarnings("rawtypes")
      Class c = Class.forName(conf.get("replication.replicationsource.implementation",
          ReplicationSource.class.getCanonicalName()));
      src = (ReplicationSourceInterface) c.newInstance();
    } catch (Exception e) {
      LOG.warn("Passed replication source implementation throws errors, " +
          "defaulting to ReplicationSource", e);
      src = new ReplicationSource();
    }

    ReplicationEndpoint replicationEndpoint = null;
    try {
      String replicationEndpointImpl = peerConfig.getReplicationEndpointImpl();
      if (replicationEndpointImpl == null) {
        // Default to HBase inter-cluster replication endpoint
        replicationEndpointImpl = HBaseInterClusterReplicationEndpoint.class.getName();
      }
      @SuppressWarnings("rawtypes")
      Class c = Class.forName(replicationEndpointImpl);
      replicationEndpoint = (ReplicationEndpoint) c.newInstance();
      if (rsServerHost != null) {
        ReplicationEndpoint newReplicationEndPoint = rsServerHost
            .postCreateReplicationEndPoint(replicationEndpoint);
        if (newReplicationEndPoint != null) {
          // Override the configured endpoint with the one returned by the coprocessor hook
          replicationEndpoint = newReplicationEndPoint;
        }
      }
    } catch (Exception e) {
      LOG.warn("Passed replication endpoint implementation throws errors"
          + " while initializing ReplicationSource for peer: " + peerId, e);
      throw new IOException(e);
    }

    MetricsSource metrics = new MetricsSource(peerId);
    // init replication source
    src.init(conf, fs, manager, replicationQueues, replicationPeers, server, peerId,
      clusterId, replicationEndpoint, metrics);

    // init replication endpoint
    replicationEndpoint.init(new ReplicationEndpoint.Context(replicationPeer.getConfiguration(),
      fs, peerConfig, peerId, clusterId, replicationPeer, metrics, tableDescriptors));

    return src;
  }

  /**
   * Transfer all the queues of the specified region server to this region server.
   * First it tries to grab a lock and if it works it will move the
   * znodes and finally will delete the old znodes.
   *
   * It creates one old source for any type of source of the old rs.
   * @param rsZnode znode of the dead region server whose queues should be claimed
   */
  private void transferQueues(String rsZnode) {
    NodeFailoverWorker transfer =
        new NodeFailoverWorker(rsZnode, this.replicationQueues, this.replicationPeers,
            this.clusterId);
    try {
      this.executor.execute(transfer);
    } catch (RejectedExecutionException ex) {
      LOG.info("Cancelling the transfer of " + rsZnode + " because of " + ex.getMessage());
    }
  }

  /**
   * Clear the references to the specified old source
   * @param src source to clear
   */
  public void closeRecoveredQueue(ReplicationSourceInterface src) {
    LOG.info("Done with the recovered queue " + src.getPeerClusterZnode());
    this.oldsources.remove(src);
    deleteSource(src.getPeerClusterZnode(), false);
    this.walsByIdRecoveredQueues.remove(src.getPeerClusterZnode());
  }

  /**
   * This method first deletes all the recovered sources for the specified
   * id, then deletes the normal source (deleting all related data in ZK).
   * @param id The id of the peer cluster
   */
  public void removePeer(String id) {
    LOG.info("Closing the following queue " + id + ", currently have "
        + sources.size() + " and another "
        + oldsources.size() + " that were recovered");
    String terminateMessage = "Replication stream was removed by a user";
    List<ReplicationSourceInterface> srcToRemove = new ArrayList<ReplicationSourceInterface>();
    List<ReplicationSourceInterface> oldSourcesToDelete =
        new ArrayList<ReplicationSourceInterface>();
    // First close all the recovered sources for this peer
    for (ReplicationSourceInterface src : oldsources) {
      if (id.equals(src.getPeerClusterId())) {
        oldSourcesToDelete.add(src);
      }
    }
    for (ReplicationSourceInterface src : oldSourcesToDelete) {
      src.terminate(terminateMessage);
      closeRecoveredQueue(src);
    }
    LOG.info("Number of deleted recovered sources for " + id + ": "
        + oldSourcesToDelete.size());
    // Now look for the one on this cluster
    // synchronize on replicationPeers to avoid adding a source for the to-be-removed peer
    synchronized (this.replicationPeers) {
      for (ReplicationSourceInterface src : this.sources) {
        if (id.equals(src.getPeerClusterId())) {
          srcToRemove.add(src);
        }
      }
      if (srcToRemove.size() == 0) {
        LOG.error("The queue we wanted to close is missing " + id);
        return;
      }
      for (ReplicationSourceInterface toRemove : srcToRemove) {
        toRemove.terminate(terminateMessage);
        this.sources.remove(toRemove);
      }
      deleteSource(id, true);
    }
  }

  @Override
  public void regionServerRemoved(String regionserver) {
    transferQueues(regionserver);
  }

  @Override
  public void peerRemoved(String peerId) {
    removePeer(peerId);
  }

  @Override
  public void peerListChanged(List<String> peerIds) {
    for (String id : peerIds) {
      try {
        boolean added = this.replicationPeers.peerAdded(id);
        if (added) {
          addSource(id);
        }
      } catch (Exception e) {
        LOG.error("Error while adding a new peer", e);
      }
    }
  }

  /**
   * Class responsible for setting up new ReplicationSources to take care of the
   * queues from dead region servers.
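   *
   * <p>A recovered queue's id encodes its origin; a hedged sample (host name and
   * timestamp are made up):
   * <pre>{@code
   * ReplicationQueueInfo info = new ReplicationQueueInfo("1-myhost,16020,1472637423");
   * info.getPeerId();        // "1" -- the actual peer to ship edits to
   * info.isQueueRecovered(); // true
   * }</pre>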
   */
  class NodeFailoverWorker extends Thread {

    private String rsZnode;
    private final ReplicationQueues rq;
    private final ReplicationPeers rp;
    private final UUID clusterId;

    /**
     * @param rsZnode znode of the dead region server whose queues will be claimed
     * @param replicationQueues the interface for manipulating replication queues
     * @param replicationPeers the interface for manipulating replication peers
     * @param clusterId the UUID of this cluster
     */
    public NodeFailoverWorker(String rsZnode, final ReplicationQueues replicationQueues,
        final ReplicationPeers replicationPeers, final UUID clusterId) {
      super("Failover-for-" + rsZnode);
      this.rsZnode = rsZnode;
      this.rq = replicationQueues;
      this.rp = replicationPeers;
      this.clusterId = clusterId;
    }

    @Override
    public void run() {
      if (this.rq.isThisOurZnode(rsZnode)) {
        return;
      }
      // Wait a bit before transferring the queues, we may be shutting down.
      // This sleep may not be enough in some cases.
      try {
        Thread.sleep(sleepBeforeFailover + (long) (rand.nextFloat() * sleepBeforeFailover));
      } catch (InterruptedException e) {
        LOG.warn("Interrupted while waiting before transferring a queue.");
        Thread.currentThread().interrupt();
      }
      // We try to lock that rs' queue directory
      if (server.isStopped()) {
        LOG.info("Not transferring queue since we are shutting down");
        return;
      }
      // Claim the queues of the dead RS; this completes the copy of the failed queues
      SortedMap<String, SortedSet<String>> newQueues = this.rq.claimQueues(rsZnode);
      if (newQueues.isEmpty()) {
        // We either didn't get the lock or the failed region server didn't have any outstanding
        // WALs to replicate, so we are done.
        return;
      }

      for (Map.Entry<String, SortedSet<String>> entry : newQueues.entrySet()) {
        String peerId = entry.getKey();
        SortedSet<String> walsSet = entry.getValue();
        try {
          // there may not be an actual peer defined corresponding to peerId for the failover
          ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
          String actualPeerId = replicationQueueInfo.getPeerId();
          ReplicationPeer peer = replicationPeers.getPeer(actualPeerId);
          ReplicationPeerConfig peerConfig = null;
          try {
            peerConfig = replicationPeers.getReplicationPeerConfig(actualPeerId);
          } catch (ReplicationException ex) {
            LOG.warn("Received exception while getting replication peer config, skipping replay",
                ex);
          }
          if (peer == null || peerConfig == null) {
            LOG.warn("Skipping failover for peer: " + actualPeerId + " of node " + rsZnode);
            continue;
          }
          // track sources in walsByIdRecoveredQueues
          Map<String, SortedSet<String>> walsByGroup = new HashMap<String, SortedSet<String>>();
          walsByIdRecoveredQueues.put(peerId, walsByGroup);
          for (String wal : walsSet) {
            String walPrefix = DefaultWALProvider.getWALPrefixFromWALName(wal);
            SortedSet<String> wals = walsByGroup.get(walPrefix);
            if (wals == null) {
              wals = new TreeSet<String>();
              walsByGroup.put(walPrefix, wals);
            }
            wals.add(wal);
          }

          // enqueue sources
          ReplicationSourceInterface src =
              getReplicationSource(conf, fs, ReplicationSourceManager.this, this.rq, this.rp,
                server, peerId, this.clusterId, peerConfig, peer);
          if (!this.rp.getPeerIds().contains(src.getPeerClusterId())) {
            src.terminate("Recovered queue doesn't belong to any current peer");
            break;
          }
          oldsources.add(src);
          for (String wal : walsSet) {
            src.enqueueLog(new Path(oldLogDir, wal));
          }
          src.startup();
        } catch (IOException e) {
          // TODO manage it
          LOG.error("Failed creating a source", e);
        }
      }
    }
  }

  /**
   * Get the directory where wals are archived
   * @return the directory where wals are archived
   */
  public Path getOldLogDir() {
    return this.oldLogDir;
  }

  /**
   * Get the directory where wals are stored by their RSs
   * @return the directory where wals are stored by their RSs
   */
  public Path getLogDir() {
    return this.logDir;
  }

  /**
   * Get the handle on the local file system
   * @return Handle on the local file system
   */
  public FileSystem getFs() {
    return this.fs;
  }

  /**
   * Get a string representation of all the sources' metrics
   */
  public String getStats() {
    StringBuilder stats = new StringBuilder();
    for (ReplicationSourceInterface source : sources) {
      stats.append("Normal source for cluster " + source.getPeerClusterId() + ": ");
      stats.append(source.getStats() + "\n");
    }
    for (ReplicationSourceInterface oldSource : oldsources) {
      stats.append("Recovered source for cluster/machine(s) " + oldSource.getPeerClusterId() + ": ");
      stats.append(oldSource.getStats() + "\n");
    }
    return stats.toString();
  }
}