1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.replication;
20
21 import java.util.ArrayList;
22 import java.util.List;
23 import java.util.SortedMap;
24 import java.util.SortedSet;
25 import java.util.TreeMap;
26 import java.util.TreeSet;
27
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.apache.hadoop.hbase.classification.InterfaceAudience;
31 import org.apache.hadoop.conf.Configuration;
32 import org.apache.hadoop.hbase.Abortable;
33 import org.apache.hadoop.hbase.HConstants;
34 import org.apache.hadoop.hbase.exceptions.DeserializationException;
35 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
36 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
37 import org.apache.hadoop.hbase.util.Bytes;
38 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
39 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
40 import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
41 import org.apache.zookeeper.KeeperException;
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64 @InterfaceAudience.Private
65 public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements ReplicationQueues {
66
67
68 private String myQueuesZnode;
69
70 private final static String RS_LOCK_ZNODE = "lock";
71
72 private static final Log LOG = LogFactory.getLog(ReplicationQueuesZKImpl.class);
73
74 public ReplicationQueuesZKImpl(final ZooKeeperWatcher zk, Configuration conf,
75 Abortable abortable) {
76 super(zk, conf, abortable);
77 }
78
79 @Override
80 public void init(String serverName) throws ReplicationException {
81 this.myQueuesZnode = ZKUtil.joinZNode(this.queuesZNode, serverName);
82 try {
83 ZKUtil.createWithParents(this.zookeeper, this.myQueuesZnode);
84 } catch (KeeperException e) {
85 throw new ReplicationException("Could not initialize replication queues.", e);
86 }
87 }
88
89 @Override
90 public void removeQueue(String queueId) {
91 try {
92 ZKUtil.deleteNodeRecursively(this.zookeeper, ZKUtil.joinZNode(this.myQueuesZnode, queueId));
93 } catch (KeeperException e) {
94 this.abortable.abort("Failed to delete queue (queueId=" + queueId + ")", e);
95 }
96 }
97
98 @Override
99 public void addLog(String queueId, String filename) throws ReplicationException {
100 String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId);
101 znode = ZKUtil.joinZNode(znode, filename);
102 try {
103 ZKUtil.createWithParents(this.zookeeper, znode);
104 } catch (KeeperException e) {
105 throw new ReplicationException(
106 "Could not add log because znode could not be created. queueId=" + queueId
107 + ", filename=" + filename);
108 }
109 }
110
111 @Override
112 public void removeLog(String queueId, String filename) {
113 try {
114 String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId);
115 znode = ZKUtil.joinZNode(znode, filename);
116 ZKUtil.deleteNode(this.zookeeper, znode);
117 } catch (KeeperException e) {
118 this.abortable.abort("Failed to remove wal from queue (queueId=" + queueId + ", filename="
119 + filename + ")", e);
120 }
121 }
122
123 @Override
124 public void setLogPosition(String queueId, String filename, long position) {
125 try {
126 String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId);
127 znode = ZKUtil.joinZNode(znode, filename);
128
129 ZKUtil.setData(this.zookeeper, znode, ZKUtil.positionToByteArray(position));
130 } catch (KeeperException e) {
131 this.abortable.abort("Failed to write replication wal position (filename=" + filename
132 + ", position=" + position + ")", e);
133 }
134 }
135
136 @Override
137 public long getLogPosition(String queueId, String filename) throws ReplicationException {
138 String clusterZnode = ZKUtil.joinZNode(this.myQueuesZnode, queueId);
139 String znode = ZKUtil.joinZNode(clusterZnode, filename);
140 byte[] bytes = null;
141 try {
142 bytes = ZKUtil.getData(this.zookeeper, znode);
143 } catch (KeeperException e) {
144 throw new ReplicationException("Internal Error: could not get position in log for queueId="
145 + queueId + ", filename=" + filename, e);
146 } catch (InterruptedException e) {
147 Thread.currentThread().interrupt();
148 return 0;
149 }
150 try {
151 return ZKUtil.parseWALPositionFrom(bytes);
152 } catch (DeserializationException de) {
153 LOG.warn("Failed to parse WALPosition for queueId=" + queueId + " and wal=" + filename
154 + "znode content, continuing.");
155 }
156
157
158 return 0;
159 }
160
161 @Override
162 public boolean isThisOurZnode(String znode) {
163 return ZKUtil.joinZNode(this.queuesZNode, znode).equals(this.myQueuesZnode);
164 }
165
166 @Override
167 public SortedMap<String, SortedSet<String>> claimQueues(String regionserverZnode) {
168 SortedMap<String, SortedSet<String>> newQueues = new TreeMap<String, SortedSet<String>>();
169
170 if (conf.getBoolean(HConstants.ZOOKEEPER_USEMULTI, true)) {
171 LOG.info("Atomically moving " + regionserverZnode + "'s WALs to my queue");
172 newQueues = copyQueuesFromRSUsingMulti(regionserverZnode);
173 } else {
174 LOG.info("Moving " + regionserverZnode + "'s wals to my queue");
175 if (!lockOtherRS(regionserverZnode)) {
176 return newQueues;
177 }
178 newQueues = copyQueuesFromRS(regionserverZnode);
179 deleteAnotherRSQueues(regionserverZnode);
180 }
181 return newQueues;
182 }
183
184 @Override
185 public void removeAllQueues() {
186 try {
187 ZKUtil.deleteNodeRecursively(this.zookeeper, this.myQueuesZnode);
188 } catch (KeeperException e) {
189
190 if (e instanceof KeeperException.SessionExpiredException) {
191 return;
192 }
193 this.abortable.abort("Failed to delete replication queues for region server: "
194 + this.myQueuesZnode, e);
195 }
196 }
197
198 @Override
199 public List<String> getLogsInQueue(String queueId) {
200 String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId);
201 List<String> result = null;
202 try {
203 result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode);
204 } catch (KeeperException e) {
205 this.abortable.abort("Failed to get list of wals for queueId=" + queueId, e);
206 }
207 return result;
208 }
209
210 @Override
211 public List<String> getAllQueues() {
212 List<String> listOfQueues = null;
213 try {
214 listOfQueues = ZKUtil.listChildrenNoWatch(this.zookeeper, this.myQueuesZnode);
215 } catch (KeeperException e) {
216 this.abortable.abort("Failed to get a list of queues for region server: "
217 + this.myQueuesZnode, e);
218 }
219 return listOfQueues;
220 }
221
222
223
224
225
226
227 private boolean lockOtherRS(String znode) {
228 try {
229 String parent = ZKUtil.joinZNode(this.queuesZNode, znode);
230 if (parent.equals(this.myQueuesZnode)) {
231 LOG.warn("Won't lock because this is us, we're dead!");
232 return false;
233 }
234 String p = ZKUtil.joinZNode(parent, RS_LOCK_ZNODE);
235 ZKUtil.createAndWatch(this.zookeeper, p, lockToByteArray(this.myQueuesZnode));
236 } catch (KeeperException e) {
237
238
239
240
241
242 if (e instanceof KeeperException.NoNodeException
243 || e instanceof KeeperException.NodeExistsException) {
244 LOG.info("Won't transfer the queue," + " another RS took care of it because of: "
245 + e.getMessage());
246 } else {
247 LOG.info("Failed lock other rs", e);
248 }
249 return false;
250 }
251 return true;
252 }
253
254
255
256
257
258 private void deleteAnotherRSQueues(String regionserverZnode) {
259 String fullpath = ZKUtil.joinZNode(this.queuesZNode, regionserverZnode);
260 try {
261 List<String> clusters = ZKUtil.listChildrenNoWatch(this.zookeeper, fullpath);
262 for (String cluster : clusters) {
263
264 if (cluster.equals(RS_LOCK_ZNODE)) {
265 continue;
266 }
267 String fullClusterPath = ZKUtil.joinZNode(fullpath, cluster);
268 ZKUtil.deleteNodeRecursively(this.zookeeper, fullClusterPath);
269 }
270
271 ZKUtil.deleteNodeRecursively(this.zookeeper, fullpath);
272 } catch (KeeperException e) {
273 if (e instanceof KeeperException.NoNodeException
274 || e instanceof KeeperException.NotEmptyException) {
275
276
277
278 if (e.getPath().equals(fullpath)) {
279 return;
280 }
281 }
282 this.abortable.abort("Failed to delete replication queues for region server: "
283 + regionserverZnode, e);
284 }
285 }
286
287
288
289
290
291
292
293 private SortedMap<String, SortedSet<String>> copyQueuesFromRSUsingMulti(String znode) {
294 SortedMap<String, SortedSet<String>> queues = new TreeMap<String, SortedSet<String>>();
295
296 String deadRSZnodePath = ZKUtil.joinZNode(this.queuesZNode, znode);
297 List<String> peerIdsToProcess = null;
298 List<ZKUtilOp> listOfOps = new ArrayList<ZKUtil.ZKUtilOp>();
299 try {
300 peerIdsToProcess = ZKUtil.listChildrenNoWatch(this.zookeeper, deadRSZnodePath);
301 if (peerIdsToProcess == null) return queues;
302 for (String peerId : peerIdsToProcess) {
303 ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
304 if (!peerExists(replicationQueueInfo.getPeerId())) {
305 LOG.warn("Peer " + peerId + " didn't exist, skipping the replay");
306
307 continue;
308 }
309 String newPeerId = peerId + "-" + znode;
310 String newPeerZnode = ZKUtil.joinZNode(this.myQueuesZnode, newPeerId);
311
312 String oldClusterZnode = ZKUtil.joinZNode(deadRSZnodePath, peerId);
313 List<String> wals = ZKUtil.listChildrenNoWatch(this.zookeeper, oldClusterZnode);
314 if (wals == null || wals.size() == 0) {
315 listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
316 continue;
317 }
318
319 SortedSet<String> logQueue = new TreeSet<String>();
320 queues.put(newPeerId, logQueue);
321 ZKUtilOp op = ZKUtilOp.createAndFailSilent(newPeerZnode, HConstants.EMPTY_BYTE_ARRAY);
322 listOfOps.add(op);
323
324 for (String wal : wals) {
325 String oldWalZnode = ZKUtil.joinZNode(oldClusterZnode, wal);
326 byte[] logOffset = ZKUtil.getData(this.zookeeper, oldWalZnode);
327 LOG.debug("Creating " + wal + " with data " + Bytes.toString(logOffset));
328 String newLogZnode = ZKUtil.joinZNode(newPeerZnode, wal);
329 listOfOps.add(ZKUtilOp.createAndFailSilent(newLogZnode, logOffset));
330
331 listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldWalZnode));
332 logQueue.add(wal);
333 }
334
335 listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
336 }
337
338
339
340 listOfOps.add(ZKUtilOp.deleteNodeFailSilent(deadRSZnodePath));
341 if (LOG.isTraceEnabled()) LOG.trace(" The multi list size is: " + listOfOps.size());
342 ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);
343 if (LOG.isTraceEnabled()) LOG.trace("Atomically moved the dead regionserver logs. ");
344 } catch (KeeperException e) {
345
346 LOG.warn("Got exception in copyQueuesFromRSUsingMulti: ", e);
347 queues.clear();
348 } catch (InterruptedException e) {
349 LOG.warn("Got exception in copyQueuesFromRSUsingMulti: ", e);
350 queues.clear();
351 Thread.currentThread().interrupt();
352 }
353 return queues;
354 }
355
356
357
358
359
360
361
362 private SortedMap<String, SortedSet<String>> copyQueuesFromRS(String znode) {
363
364
365 SortedMap<String, SortedSet<String>> queues = new TreeMap<String, SortedSet<String>>();
366 try {
367 String nodePath = ZKUtil.joinZNode(this.queuesZNode, znode);
368 List<String> clusters = ZKUtil.listChildrenNoWatch(this.zookeeper, nodePath);
369
370 if (clusters == null || clusters.size() <= 1) {
371 return queues;
372 }
373
374 clusters.remove(RS_LOCK_ZNODE);
375 for (String cluster : clusters) {
376 ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(cluster);
377 if (!peerExists(replicationQueueInfo.getPeerId())) {
378 LOG.warn("Peer " + cluster + " didn't exist, skipping the replay");
379
380 continue;
381 }
382
383
384
385 String newCluster = cluster + "-" + znode;
386 String newClusterZnode = ZKUtil.joinZNode(this.myQueuesZnode, newCluster);
387 String clusterPath = ZKUtil.joinZNode(nodePath, cluster);
388 List<String> wals = ZKUtil.listChildrenNoWatch(this.zookeeper, clusterPath);
389
390 if (wals == null || wals.size() == 0) {
391 continue;
392 }
393 ZKUtil.createNodeIfNotExistsAndWatch(this.zookeeper, newClusterZnode,
394 HConstants.EMPTY_BYTE_ARRAY);
395 SortedSet<String> logQueue = new TreeSet<String>();
396 queues.put(newCluster, logQueue);
397 for (String wal : wals) {
398 String z = ZKUtil.joinZNode(clusterPath, wal);
399 byte[] positionBytes = ZKUtil.getData(this.zookeeper, z);
400 long position = 0;
401 try {
402 position = ZKUtil.parseWALPositionFrom(positionBytes);
403 } catch (DeserializationException e) {
404 LOG.warn("Failed parse of wal position from the following znode: " + z
405 + ", Exception: " + e);
406 }
407 LOG.debug("Creating " + wal + " with data " + position);
408 String child = ZKUtil.joinZNode(newClusterZnode, wal);
409
410
411 ZKUtil.createAndWatch(this.zookeeper, child, positionBytes);
412 logQueue.add(wal);
413 }
414 }
415 } catch (KeeperException e) {
416 this.abortable.abort("Copy queues from rs", e);
417 } catch (InterruptedException e) {
418 LOG.warn(e);
419 Thread.currentThread().interrupt();
420 }
421 return queues;
422 }
423
424
425
426
427
428
429 static byte[] lockToByteArray(final String lockOwner) {
430 byte[] bytes =
431 ZooKeeperProtos.ReplicationLock.newBuilder().setLockOwner(lockOwner).build().toByteArray();
432 return ProtobufUtil.prependPBMagic(bytes);
433 }
434 }