1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.replication.master;
20
21 import org.apache.commons.logging.Log;
22 import org.apache.commons.logging.LogFactory;
23 import org.apache.hadoop.hbase.classification.InterfaceAudience;
24 import org.apache.hadoop.conf.Configuration;
25 import org.apache.hadoop.fs.FileStatus;
26 import org.apache.hadoop.hbase.Abortable;
27 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
28 import org.apache.hadoop.hbase.HConstants;
29 import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate;
30 import org.apache.hadoop.hbase.replication.ReplicationException;
31 import org.apache.hadoop.hbase.replication.ReplicationFactory;
32 import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
33 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
34 import java.io.IOException;
35 import java.util.Collections;
36 import java.util.List;
37 import java.util.Set;
38
39 import com.google.common.base.Predicate;
40 import com.google.common.collect.ImmutableSet;
41 import com.google.common.collect.Iterables;
42 import com.google.common.collect.Sets;
43 import org.apache.zookeeper.KeeperException;
44
45
46
47
48
49 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
50 public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abortable {
51 private static final Log LOG = LogFactory.getLog(ReplicationLogCleaner.class);
52 private ZooKeeperWatcher zkw;
53 private ReplicationQueuesClient replicationQueues;
54 private boolean stopped = false;
55 private boolean aborted;
56
57
58 @Override
59 public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
60
61
62 if (this.getConf() == null) {
63 return files;
64 }
65
66 final Set<String> wals;
67 try {
68
69
70 wals = loadWALsFromQueues();
71 } catch (KeeperException e) {
72 LOG.warn("Failed to read zookeeper, skipping checking deletable files");
73 return Collections.emptyList();
74 }
75 return Iterables.filter(files, new Predicate<FileStatus>() {
76 @Override
77 public boolean apply(FileStatus file) {
78 String wal = file.getPath().getName();
79 boolean logInReplicationQueue = wals.contains(wal);
80 if (LOG.isDebugEnabled()) {
81 if (logInReplicationQueue) {
82 LOG.debug("Found log in ZK, keeping: " + wal);
83 } else {
84 LOG.debug("Didn't find this log in ZK, deleting: " + wal);
85 }
86 }
87 return !logInReplicationQueue;
88 }});
89 }
90
91
92
93
94
95
96
97 private Set<String> loadWALsFromQueues() throws KeeperException {
98 for (int retry = 0; ; retry++) {
99 int v0 = replicationQueues.getQueuesZNodeCversion();
100 List<String> rss = replicationQueues.getListOfReplicators();
101 if (rss == null) {
102 LOG.debug("Didn't find any region server that replicates, won't prevent any deletions.");
103 return ImmutableSet.of();
104 }
105 Set<String> wals = Sets.newHashSet();
106 for (String rs : rss) {
107 List<String> listOfPeers = replicationQueues.getAllQueues(rs);
108
109 if (listOfPeers == null) {
110 continue;
111 }
112 for (String id : listOfPeers) {
113 List<String> peersWals = replicationQueues.getLogsInQueue(rs, id);
114 if (peersWals != null) {
115 wals.addAll(peersWals);
116 }
117 }
118 }
119 int v1 = replicationQueues.getQueuesZNodeCversion();
120 if (v0 == v1) {
121 return wals;
122 }
123 LOG.info(String.format("Replication queue node cversion changed from %d to %d, retry = %d",
124 v0, v1, retry));
125 }
126 }
127
128 @Override
129 public void setConf(Configuration config) {
130
131 if (!config.getBoolean(HConstants.REPLICATION_ENABLE_KEY,
132 HConstants.REPLICATION_ENABLE_DEFAULT)) {
133 LOG.warn("Not configured - allowing all wals to be deleted");
134 return;
135 }
136
137
138 Configuration conf = new Configuration(config);
139 super.setConf(conf);
140 try {
141 this.zkw = new ZooKeeperWatcher(conf, "replicationLogCleaner", null);
142 this.replicationQueues = ReplicationFactory.getReplicationQueuesClient(zkw, conf, this);
143 this.replicationQueues.init();
144 } catch (ReplicationException e) {
145 LOG.error("Error while configuring " + this.getClass().getName(), e);
146 } catch (IOException e) {
147 LOG.error("Error while configuring " + this.getClass().getName(), e);
148 }
149 }
150
151 @Override
152 public void stop(String why) {
153 if (this.stopped) return;
154 this.stopped = true;
155 if (this.zkw != null) {
156 LOG.info("Stopping " + this.zkw);
157 this.zkw.close();
158 }
159 }
160
161 @Override
162 public boolean isStopped() {
163 return this.stopped;
164 }
165
166 @Override
167 public void abort(String why, Throwable e) {
168 LOG.warn("Aborting ReplicationLogCleaner because " + why, e);
169 this.aborted = true;
170 stop(why);
171 }
172
173 @Override
174 public boolean isAborted() {
175 return this.aborted;
176 }
177 }