1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.master.cleaner;
19
20 import static org.junit.Assert.assertEquals;
21
22 import java.io.IOException;
23 import java.lang.reflect.Field;
24 import java.net.URLEncoder;
25 import java.util.LinkedList;
26
27 import org.apache.hadoop.conf.Configuration;
28 import org.apache.hadoop.fs.FileStatus;
29 import org.apache.hadoop.fs.FileSystem;
30 import org.apache.hadoop.fs.Path;
31 import org.apache.hadoop.hbase.ChoreService;
32 import org.apache.hadoop.hbase.CoordinatedStateManager;
33 import org.apache.hadoop.hbase.HBaseTestingUtility;
34 import org.apache.hadoop.hbase.HConstants;
35 import org.apache.hadoop.hbase.testclassification.MediumTests;
36 import org.apache.hadoop.hbase.Server;
37 import org.apache.hadoop.hbase.ServerName;
38 import org.apache.hadoop.hbase.Waiter;
39 import org.apache.hadoop.hbase.client.ClusterConnection;
40 import org.apache.hadoop.hbase.replication.ReplicationFactory;
41 import org.apache.hadoop.hbase.replication.ReplicationQueues;
42 import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
43 import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
44 import org.apache.hadoop.hbase.replication.regionserver.Replication;
45 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
46 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
47 import org.junit.AfterClass;
48 import org.junit.BeforeClass;
49 import org.junit.Test;
50 import org.junit.experimental.categories.Category;
51 import org.mockito.Mockito;
52
53 @Category(MediumTests.class)
54 public class TestLogsCleaner {
55
56 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
57
58
59
60
61 @BeforeClass
62 public static void setUpBeforeClass() throws Exception {
63 TEST_UTIL.startMiniZKCluster();
64 }
65
66
67
68
69 @AfterClass
70 public static void tearDownAfterClass() throws Exception {
71 TEST_UTIL.shutdownMiniZKCluster();
72 }
73
74 @Test
75 public void testLogCleaning() throws Exception{
76 Configuration conf = TEST_UTIL.getConfiguration();
77
78 long ttl = 10000;
79 conf.setLong("hbase.master.logcleaner.ttl", ttl);
80 conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT);
81 Replication.decorateMasterConfiguration(conf);
82 Server server = new DummyServer();
83 ReplicationQueues repQueues =
84 ReplicationFactory.getReplicationQueues(server.getZooKeeper(), conf, server);
85 repQueues.init(server.getServerName().toString());
86 final Path oldLogDir = new Path(TEST_UTIL.getDataTestDir(),
87 HConstants.HREGION_OLDLOGDIR_NAME);
88 String fakeMachineName =
89 URLEncoder.encode(server.getServerName().toString(), "UTF8");
90
91 final FileSystem fs = FileSystem.get(conf);
92
93
94 long now = System.currentTimeMillis();
95 fs.delete(oldLogDir, true);
96 fs.mkdirs(oldLogDir);
97
98 fs.createNewFile(new Path(oldLogDir, "a"));
99 fs.createNewFile(new Path(oldLogDir, fakeMachineName + "." + "a"));
100
101
102 System.out.println("Now is: " + now);
103 for (int i = 1; i < 31; i++) {
104
105
106 Path fileName = new Path(oldLogDir, fakeMachineName + "." + (now - i) );
107 fs.createNewFile(fileName);
108
109
110
111
112 if (i % (30/3) == 1) {
113 repQueues.addLog(fakeMachineName, fileName.getName());
114 System.out.println("Replication log file: " + fileName);
115 }
116 }
117
118
119 Thread.sleep(ttl);
120 fs.createNewFile(new Path(oldLogDir, fakeMachineName + "." + now));
121
122
123
124 fs.createNewFile(new Path(oldLogDir, fakeMachineName + "." + (now + 10000) ));
125
126 for (FileStatus stat : fs.listStatus(oldLogDir)) {
127 System.out.println(stat.getPath().toString());
128 }
129
130 assertEquals(34, fs.listStatus(oldLogDir).length);
131
132 LogCleaner cleaner = new LogCleaner(1000, server, conf, fs, oldLogDir);
133
134 cleaner.chore();
135
136
137
138 TEST_UTIL.waitFor(1000, new Waiter.Predicate<Exception>() {
139 @Override
140 public boolean evaluate() throws Exception {
141 return 5 == fs.listStatus(oldLogDir).length;
142 }
143 });
144
145 for (FileStatus file : fs.listStatus(oldLogDir)) {
146 System.out.println("Kept log files: " + file.getPath().getName());
147 }
148 }
149
150 @Test(timeout=5000)
151 public void testZnodeCversionChange() throws Exception {
152 Configuration conf = TEST_UTIL.getConfiguration();
153 ReplicationLogCleaner cleaner = new ReplicationLogCleaner();
154 cleaner.setConf(conf);
155
156 ReplicationQueuesClient rqcMock = Mockito.mock(ReplicationQueuesClient.class);
157 Mockito.when(rqcMock.getQueuesZNodeCversion()).thenReturn(1, 2, 3, 4);
158
159 Field rqc = ReplicationLogCleaner.class.getDeclaredField("replicationQueues");
160 rqc.setAccessible(true);
161
162 rqc.set(cleaner, rqcMock);
163
164
165 cleaner.getDeletableFiles(new LinkedList<FileStatus>());
166 }
167
168 static class DummyServer implements Server {
169
170 @Override
171 public Configuration getConfiguration() {
172 return TEST_UTIL.getConfiguration();
173 }
174
175 @Override
176 public ZooKeeperWatcher getZooKeeper() {
177 try {
178 return new ZooKeeperWatcher(getConfiguration(), "dummy server", this);
179 } catch (IOException e) {
180 e.printStackTrace();
181 }
182 return null;
183 }
184
185 @Override
186 public CoordinatedStateManager getCoordinatedStateManager() {
187 return null;
188 }
189
190 @Override
191 public ClusterConnection getConnection() {
192 return null;
193 }
194
195 @Override
196 public MetaTableLocator getMetaTableLocator() {
197 return null;
198 }
199
200 @Override
201 public ServerName getServerName() {
202 return ServerName.valueOf("regionserver,60020,000000");
203 }
204
205 @Override
206 public void abort(String why, Throwable e) {}
207
208 @Override
209 public boolean isAborted() {
210 return false;
211 }
212
213 @Override
214 public void stop(String why) {}
215
216 @Override
217 public boolean isStopped() {
218 return false;
219 }
220
221 @Override
222 public ChoreService getChoreService() {
223 return null;
224 }
225 }
226 }