View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.replication;
20  
21  import java.util.ArrayList;
22  import java.util.List;
23  import java.util.SortedMap;
24  import java.util.SortedSet;
25  import java.util.TreeMap;
26  import java.util.TreeSet;
27  
28  import org.apache.commons.logging.Log;
29  import org.apache.commons.logging.LogFactory;
30  import org.apache.hadoop.hbase.classification.InterfaceAudience;
31  import org.apache.hadoop.conf.Configuration;
32  import org.apache.hadoop.hbase.Abortable;
33  import org.apache.hadoop.hbase.HConstants;
34  import org.apache.hadoop.hbase.exceptions.DeserializationException;
35  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
36  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
37  import org.apache.hadoop.hbase.util.Bytes;
38  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
39  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
40  import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
41  import org.apache.zookeeper.KeeperException;
42  
43  /**
44   * This class provides an implementation of the ReplicationQueues interface using Zookeeper. The
45   * base znode that this class works at is the myQueuesZnode. The myQueuesZnode contains a list of
46   * all outstanding WAL files on this region server that need to be replicated. The myQueuesZnode is
47   * the regionserver name (a concatenation of the region server’s hostname, client port and start
48   * code). For example:
49   *
50   * /hbase/replication/rs/hostname.example.org,6020,1234
51   *
52   * Within this znode, the region server maintains a set of WAL replication queues. These queues are
53   * represented by child znodes named using there give queue id. For example:
54   *
55   * /hbase/replication/rs/hostname.example.org,6020,1234/1
56   * /hbase/replication/rs/hostname.example.org,6020,1234/2
57   *
58   * Each queue has one child znode for every WAL that still needs to be replicated. The value of
59   * these WAL child znodes is the latest position that has been replicated. This position is updated
60   * every time a WAL entry is replicated. For example:
61   *
62   * /hbase/replication/rs/hostname.example.org,6020,1234/1/23522342.23422 [VALUE: 254]
63   */
64  @InterfaceAudience.Private
65  public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements ReplicationQueues {
66  
67    /** Znode containing all replication queues for this region server. */
68    private String myQueuesZnode;
69    /** Name of znode we use to lock during failover */
70    private final static String RS_LOCK_ZNODE = "lock";
71  
72    private static final Log LOG = LogFactory.getLog(ReplicationQueuesZKImpl.class);
73  
74    public ReplicationQueuesZKImpl(final ZooKeeperWatcher zk, Configuration conf,
75        Abortable abortable) {
76      super(zk, conf, abortable);
77    }
78  
79    @Override
80    public void init(String serverName) throws ReplicationException {
81      this.myQueuesZnode = ZKUtil.joinZNode(this.queuesZNode, serverName);
82      try {
83        ZKUtil.createWithParents(this.zookeeper, this.myQueuesZnode);
84      } catch (KeeperException e) {
85        throw new ReplicationException("Could not initialize replication queues.", e);
86      }
87    }
88  
89    @Override
90    public void removeQueue(String queueId) {
91      try {
92        ZKUtil.deleteNodeRecursively(this.zookeeper, ZKUtil.joinZNode(this.myQueuesZnode, queueId));
93      } catch (KeeperException e) {
94        this.abortable.abort("Failed to delete queue (queueId=" + queueId + ")", e);
95      }
96    }
97  
98    @Override
99    public void addLog(String queueId, String filename) throws ReplicationException {
100     String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId);
101     znode = ZKUtil.joinZNode(znode, filename);
102     try {
103       ZKUtil.createWithParents(this.zookeeper, znode);
104     } catch (KeeperException e) {
105       throw new ReplicationException(
106           "Could not add log because znode could not be created. queueId=" + queueId
107               + ", filename=" + filename);
108     }
109   }
110 
111   @Override
112   public void removeLog(String queueId, String filename) {
113     try {
114       String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId);
115       znode = ZKUtil.joinZNode(znode, filename);
116       ZKUtil.deleteNode(this.zookeeper, znode);
117     } catch (KeeperException e) {
118       this.abortable.abort("Failed to remove wal from queue (queueId=" + queueId + ", filename="
119           + filename + ")", e);
120     }
121   }
122 
123   @Override
124   public void setLogPosition(String queueId, String filename, long position) {
125     try {
126       String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId);
127       znode = ZKUtil.joinZNode(znode, filename);
128       // Why serialize String of Long and not Long as bytes?
129       ZKUtil.setData(this.zookeeper, znode, ZKUtil.positionToByteArray(position));
130     } catch (KeeperException e) {
131       this.abortable.abort("Failed to write replication wal position (filename=" + filename
132           + ", position=" + position + ")", e);
133     }
134   }
135 
136   @Override
137   public long getLogPosition(String queueId, String filename) throws ReplicationException {
138     String clusterZnode = ZKUtil.joinZNode(this.myQueuesZnode, queueId);
139     String znode = ZKUtil.joinZNode(clusterZnode, filename);
140     byte[] bytes = null;
141     try {
142       bytes = ZKUtil.getData(this.zookeeper, znode);
143     } catch (KeeperException e) {
144       throw new ReplicationException("Internal Error: could not get position in log for queueId="
145           + queueId + ", filename=" + filename, e);
146     } catch (InterruptedException e) {
147       Thread.currentThread().interrupt();
148       return 0;
149     }
150     try {
151       return ZKUtil.parseWALPositionFrom(bytes);
152     } catch (DeserializationException de) {
153       LOG.warn("Failed to parse WALPosition for queueId=" + queueId + " and wal=" + filename
154           + "znode content, continuing.");
155     }
156     // if we can not parse the position, start at the beginning of the wal file
157     // again
158     return 0;
159   }
160 
161   @Override
162   public boolean isThisOurZnode(String znode) {
163     return ZKUtil.joinZNode(this.queuesZNode, znode).equals(this.myQueuesZnode);
164   }
165 
166   @Override
167   public SortedMap<String, SortedSet<String>> claimQueues(String regionserverZnode) {
168     SortedMap<String, SortedSet<String>> newQueues = new TreeMap<String, SortedSet<String>>();
169     // check whether there is multi support. If yes, use it.
170     if (conf.getBoolean(HConstants.ZOOKEEPER_USEMULTI, true)) {
171       LOG.info("Atomically moving " + regionserverZnode + "'s WALs to my queue");
172       newQueues = copyQueuesFromRSUsingMulti(regionserverZnode);
173     } else {
174       LOG.info("Moving " + regionserverZnode + "'s wals to my queue");
175       if (!lockOtherRS(regionserverZnode)) {
176         return newQueues;
177       }
178       newQueues = copyQueuesFromRS(regionserverZnode);
179       deleteAnotherRSQueues(regionserverZnode);
180     }
181     return newQueues;
182   }
183 
184   @Override
185   public void removeAllQueues() {
186     try {
187       ZKUtil.deleteNodeRecursively(this.zookeeper, this.myQueuesZnode);
188     } catch (KeeperException e) {
189       // if the znode is already expired, don't bother going further
190       if (e instanceof KeeperException.SessionExpiredException) {
191         return;
192       }
193       this.abortable.abort("Failed to delete replication queues for region server: "
194           + this.myQueuesZnode, e);
195     }
196   }
197 
198   @Override
199   public List<String> getLogsInQueue(String queueId) {
200     String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId);
201     List<String> result = null;
202     try {
203       result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode);
204     } catch (KeeperException e) {
205       this.abortable.abort("Failed to get list of wals for queueId=" + queueId, e);
206     }
207     return result;
208   }
209 
210   @Override
211   public List<String> getAllQueues() {
212     List<String> listOfQueues = null;
213     try {
214       listOfQueues = ZKUtil.listChildrenNoWatch(this.zookeeper, this.myQueuesZnode);
215     } catch (KeeperException e) {
216       this.abortable.abort("Failed to get a list of queues for region server: "
217           + this.myQueuesZnode, e);
218     }
219     return listOfQueues;
220   }
221 
222   /**
223    * Try to set a lock in another region server's znode.
224    * @param znode the server names of the other server
225    * @return true if the lock was acquired, false in every other cases
226    */
227   private boolean lockOtherRS(String znode) {
228     try {
229       String parent = ZKUtil.joinZNode(this.queuesZNode, znode);
230       if (parent.equals(this.myQueuesZnode)) {
231         LOG.warn("Won't lock because this is us, we're dead!");
232         return false;
233       }
234       String p = ZKUtil.joinZNode(parent, RS_LOCK_ZNODE);
235       ZKUtil.createAndWatch(this.zookeeper, p, lockToByteArray(this.myQueuesZnode));
236     } catch (KeeperException e) {
237       // This exception will pop up if the znode under which we're trying to
238       // create the lock is already deleted by another region server, meaning
239       // that the transfer already occurred.
240       // NoNode => transfer is done and znodes are already deleted
241       // NodeExists => lock znode already created by another RS
242       if (e instanceof KeeperException.NoNodeException
243           || e instanceof KeeperException.NodeExistsException) {
244         LOG.info("Won't transfer the queue," + " another RS took care of it because of: "
245             + e.getMessage());
246       } else {
247         LOG.info("Failed lock other rs", e);
248       }
249       return false;
250     }
251     return true;
252   }
253 
254   /**
255    * Delete all the replication queues for a given region server.
256    * @param regionserverZnode The znode of the region server to delete.
257    */
258   private void deleteAnotherRSQueues(String regionserverZnode) {
259     String fullpath = ZKUtil.joinZNode(this.queuesZNode, regionserverZnode);
260     try {
261       List<String> clusters = ZKUtil.listChildrenNoWatch(this.zookeeper, fullpath);
262       for (String cluster : clusters) {
263         // No need to delete, it will be deleted later.
264         if (cluster.equals(RS_LOCK_ZNODE)) {
265           continue;
266         }
267         String fullClusterPath = ZKUtil.joinZNode(fullpath, cluster);
268         ZKUtil.deleteNodeRecursively(this.zookeeper, fullClusterPath);
269       }
270       // Finish cleaning up
271       ZKUtil.deleteNodeRecursively(this.zookeeper, fullpath);
272     } catch (KeeperException e) {
273       if (e instanceof KeeperException.NoNodeException
274           || e instanceof KeeperException.NotEmptyException) {
275         // Testing a special case where another region server was able to
276         // create a lock just after we deleted it, but then was also able to
277         // delete the RS znode before us or its lock znode is still there.
278         if (e.getPath().equals(fullpath)) {
279           return;
280         }
281       }
282       this.abortable.abort("Failed to delete replication queues for region server: "
283           + regionserverZnode, e);
284     }
285   }
286 
287   /**
288    * It "atomically" copies all the wals queues from another region server and returns them all
289    * sorted per peer cluster (appended with the dead server's znode).
290    * @param znode pertaining to the region server to copy the queues from
291    * @return WAL queues sorted per peer cluster
292    */
293   private SortedMap<String, SortedSet<String>> copyQueuesFromRSUsingMulti(String znode) {
294     SortedMap<String, SortedSet<String>> queues = new TreeMap<String, SortedSet<String>>();
295     // hbase/replication/rs/deadrs
296     String deadRSZnodePath = ZKUtil.joinZNode(this.queuesZNode, znode);
297     List<String> peerIdsToProcess = null;
298     List<ZKUtilOp> listOfOps = new ArrayList<ZKUtil.ZKUtilOp>();
299     try {
300       peerIdsToProcess = ZKUtil.listChildrenNoWatch(this.zookeeper, deadRSZnodePath);
301       if (peerIdsToProcess == null) return queues; // node already processed
302       for (String peerId : peerIdsToProcess) {
303         ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
304         if (!peerExists(replicationQueueInfo.getPeerId())) {
305           LOG.warn("Peer " + peerId + " didn't exist, skipping the replay");
306           // Protection against moving orphaned queues
307           continue;
308         }
309         String newPeerId = peerId + "-" + znode;
310         String newPeerZnode = ZKUtil.joinZNode(this.myQueuesZnode, newPeerId);
311         // check the logs queue for the old peer cluster
312         String oldClusterZnode = ZKUtil.joinZNode(deadRSZnodePath, peerId);
313         List<String> wals = ZKUtil.listChildrenNoWatch(this.zookeeper, oldClusterZnode);
314         if (wals == null || wals.size() == 0) {
315           listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
316           continue; // empty log queue.
317         }
318         // create the new cluster znode
319         SortedSet<String> logQueue = new TreeSet<String>();
320         queues.put(newPeerId, logQueue);
321         ZKUtilOp op = ZKUtilOp.createAndFailSilent(newPeerZnode, HConstants.EMPTY_BYTE_ARRAY);
322         listOfOps.add(op);
323         // get the offset of the logs and set it to new znodes
324         for (String wal : wals) {
325           String oldWalZnode = ZKUtil.joinZNode(oldClusterZnode, wal);
326           byte[] logOffset = ZKUtil.getData(this.zookeeper, oldWalZnode);
327           LOG.debug("Creating " + wal + " with data " + Bytes.toString(logOffset));
328           String newLogZnode = ZKUtil.joinZNode(newPeerZnode, wal);
329           listOfOps.add(ZKUtilOp.createAndFailSilent(newLogZnode, logOffset));
330           // add ops for deleting
331           listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldWalZnode));
332           logQueue.add(wal);
333         }
334         // add delete op for peer
335         listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
336       }
337       // add delete op for dead rs, this will update the cversion of the parent.
338       // The reader will make optimistic locking with this to get a consistent
339       // snapshot
340       listOfOps.add(ZKUtilOp.deleteNodeFailSilent(deadRSZnodePath));
341       if (LOG.isTraceEnabled()) LOG.trace(" The multi list size is: " + listOfOps.size());
342       ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);
343       if (LOG.isTraceEnabled()) LOG.trace("Atomically moved the dead regionserver logs. ");
344     } catch (KeeperException e) {
345       // Multi call failed; it looks like some other regionserver took away the logs.
346       LOG.warn("Got exception in copyQueuesFromRSUsingMulti: ", e);
347       queues.clear();
348     } catch (InterruptedException e) {
349       LOG.warn("Got exception in copyQueuesFromRSUsingMulti: ", e);
350       queues.clear();
351       Thread.currentThread().interrupt();
352     }
353     return queues;
354   }
355 
356   /**
357    * This methods copies all the wals queues from another region server and returns them all sorted
358    * per peer cluster (appended with the dead server's znode)
359    * @param znode server names to copy
360    * @return all wals for all peers of that cluster, null if an error occurred
361    */
362   private SortedMap<String, SortedSet<String>> copyQueuesFromRS(String znode) {
363     // TODO this method isn't atomic enough, we could start copying and then
364     // TODO fail for some reason and we would end up with znodes we don't want.
365     SortedMap<String, SortedSet<String>> queues = new TreeMap<String, SortedSet<String>>();
366     try {
367       String nodePath = ZKUtil.joinZNode(this.queuesZNode, znode);
368       List<String> clusters = ZKUtil.listChildrenNoWatch(this.zookeeper, nodePath);
369       // We have a lock znode in there, it will count as one.
370       if (clusters == null || clusters.size() <= 1) {
371         return queues;
372       }
373       // The lock isn't a peer cluster, remove it
374       clusters.remove(RS_LOCK_ZNODE);
375       for (String cluster : clusters) {
376         ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(cluster);
377         if (!peerExists(replicationQueueInfo.getPeerId())) {
378           LOG.warn("Peer " + cluster + " didn't exist, skipping the replay");
379           // Protection against moving orphaned queues
380           continue;
381         }
382         // We add the name of the recovered RS to the new znode, we can even
383         // do that for queues that were recovered 10 times giving a znode like
384         // number-startcode-number-otherstartcode-number-anotherstartcode-etc
385         String newCluster = cluster + "-" + znode;
386         String newClusterZnode = ZKUtil.joinZNode(this.myQueuesZnode, newCluster);
387         String clusterPath = ZKUtil.joinZNode(nodePath, cluster);
388         List<String> wals = ZKUtil.listChildrenNoWatch(this.zookeeper, clusterPath);
389         // That region server didn't have anything to replicate for this cluster
390         if (wals == null || wals.size() == 0) {
391           continue;
392         }
393         ZKUtil.createNodeIfNotExistsAndWatch(this.zookeeper, newClusterZnode,
394           HConstants.EMPTY_BYTE_ARRAY);
395         SortedSet<String> logQueue = new TreeSet<String>();
396         queues.put(newCluster, logQueue);
397         for (String wal : wals) {
398           String z = ZKUtil.joinZNode(clusterPath, wal);
399           byte[] positionBytes = ZKUtil.getData(this.zookeeper, z);
400           long position = 0;
401           try {
402             position = ZKUtil.parseWALPositionFrom(positionBytes);
403           } catch (DeserializationException e) {
404             LOG.warn("Failed parse of wal position from the following znode: " + z
405                 + ", Exception: " + e);
406           }
407           LOG.debug("Creating " + wal + " with data " + position);
408           String child = ZKUtil.joinZNode(newClusterZnode, wal);
409           // Position doesn't actually change, we are just deserializing it for
410           // logging, so just use the already serialized version
411           ZKUtil.createAndWatch(this.zookeeper, child, positionBytes);
412           logQueue.add(wal);
413         }
414       }
415     } catch (KeeperException e) {
416       this.abortable.abort("Copy queues from rs", e);
417     } catch (InterruptedException e) {
418       LOG.warn(e);
419       Thread.currentThread().interrupt();
420     }
421     return queues;
422   }
423 
424   /**
425    * @param lockOwner
426    * @return Serialized protobuf of <code>lockOwner</code> with pb magic prefix prepended suitable
427    *         for use as content of an replication lock during region server fail over.
428    */
429   static byte[] lockToByteArray(final String lockOwner) {
430     byte[] bytes =
431         ZooKeeperProtos.ReplicationLock.newBuilder().setLockOwner(lockOwner).build().toByteArray();
432     return ProtobufUtil.prependPBMagic(bytes);
433   }
434 }