View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.replication.regionserver;
20  
21  import static org.junit.Assert.assertEquals;
22  import static org.junit.Assert.assertTrue;
23  
24  import java.net.URLEncoder;
25  import java.util.ArrayList;
26  import java.util.Collection;
27  import java.util.List;
28  import java.util.Map;
29  import java.util.SortedMap;
30  import java.util.SortedSet;
31  import java.util.TreeSet;
32  import java.util.UUID;
33  import java.util.concurrent.CountDownLatch;
34  
35  import org.apache.commons.logging.Log;
36  import org.apache.commons.logging.LogFactory;
37  import org.apache.hadoop.conf.Configuration;
38  import org.apache.hadoop.fs.FileSystem;
39  import org.apache.hadoop.fs.Path;
40  import org.apache.hadoop.hbase.ChoreService;
41  import org.apache.hadoop.hbase.ClusterId;
42  import org.apache.hadoop.hbase.CoordinatedStateManager;
43  import org.apache.hadoop.hbase.HBaseConfiguration;
44  import org.apache.hadoop.hbase.HBaseTestingUtility;
45  import org.apache.hadoop.hbase.HColumnDescriptor;
46  import org.apache.hadoop.hbase.HConstants;
47  import org.apache.hadoop.hbase.HRegionInfo;
48  import org.apache.hadoop.hbase.HTableDescriptor;
49  import org.apache.hadoop.hbase.KeyValue;
50  import org.apache.hadoop.hbase.Server;
51  import org.apache.hadoop.hbase.ServerName;
52  import org.apache.hadoop.hbase.TableName;
53  import org.apache.hadoop.hbase.client.ClusterConnection;
54  import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
55  import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
56  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
57  import org.apache.hadoop.hbase.replication.ReplicationFactory;
58  import org.apache.hadoop.hbase.replication.ReplicationPeers;
59  import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
60  import org.apache.hadoop.hbase.replication.ReplicationQueues;
61  import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
62  import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
63  import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
64  import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker;
65  import org.apache.hadoop.hbase.testclassification.MediumTests;
66  import org.apache.hadoop.hbase.util.Bytes;
67  import org.apache.hadoop.hbase.util.FSUtils;
68  import org.apache.hadoop.hbase.wal.WAL;
69  import org.apache.hadoop.hbase.wal.WALFactory;
70  import org.apache.hadoop.hbase.wal.WALKey;
71  import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
72  import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
73  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
74  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
75  import org.junit.After;
76  import org.junit.AfterClass;
77  import org.junit.Before;
78  import org.junit.BeforeClass;
79  import org.junit.Test;
80  import org.junit.experimental.categories.Category;
81  
82  import com.google.common.collect.Sets;
83  
84  @Category(MediumTests.class)
85  public class TestReplicationSourceManager {
86  
87    private static final Log LOG =
88        LogFactory.getLog(TestReplicationSourceManager.class);
89  
90    private static Configuration conf;
91  
92    private static HBaseTestingUtility utility;
93  
94    private static Replication replication;
95  
96    private static ReplicationSourceManager manager;
97  
98    private static ZooKeeperWatcher zkw;
99  
100   private static HTableDescriptor htd;
101 
102   private static HRegionInfo hri;
103 
104   private static final byte[] r1 = Bytes.toBytes("r1");
105 
106   private static final byte[] r2 = Bytes.toBytes("r2");
107 
108   private static final byte[] f1 = Bytes.toBytes("f1");
109 
110   private static final TableName test =
111       TableName.valueOf("test");
112 
113   private static final String slaveId = "1";
114 
115   private static FileSystem fs;
116 
117   private static Path oldLogDir;
118 
119   private static Path logDir;
120 
121   private static CountDownLatch latch;
122 
123   private static List<String> files = new ArrayList<String>();
124 
125   @BeforeClass
126   public static void setUpBeforeClass() throws Exception {
127 
128     conf = HBaseConfiguration.create();
129     conf.set("replication.replicationsource.implementation",
130         ReplicationSourceDummy.class.getCanonicalName());
131     conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY,
132         HConstants.REPLICATION_ENABLE_DEFAULT);
133     conf.setLong("replication.sleep.before.failover", 2000);
134     conf.setInt("replication.source.maxretriesmultiplier", 10);
135     utility = new HBaseTestingUtility(conf);
136     utility.startMiniZKCluster();
137 
138     zkw = new ZooKeeperWatcher(conf, "test", null);
139     ZKUtil.createWithParents(zkw, "/hbase/replication");
140     ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1");
141     ZKUtil.setData(zkw, "/hbase/replication/peers/1",
142         Bytes.toBytes(conf.get(HConstants.ZOOKEEPER_QUORUM) + ":"
143             + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + ":/1"));
144     ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/peer-state");
145     ZKUtil.setData(zkw, "/hbase/replication/peers/1/peer-state",
146       ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
147     ZKUtil.createWithParents(zkw, "/hbase/replication/state");
148     ZKUtil.setData(zkw, "/hbase/replication/state", ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
149 
150     ZKClusterId.setClusterId(zkw, new ClusterId());
151     FSUtils.setRootDir(utility.getConfiguration(), utility.getDataTestDir());
152     fs = FileSystem.get(conf);
153     oldLogDir = new Path(utility.getDataTestDir(),
154         HConstants.HREGION_OLDLOGDIR_NAME);
155     logDir = new Path(utility.getDataTestDir(),
156         HConstants.HREGION_LOGDIR_NAME);
157     replication = new Replication(new DummyServer(), fs, logDir, oldLogDir);
158     manager = replication.getReplicationManager();
159 
160     manager.addSource(slaveId);
161 
162     htd = new HTableDescriptor(test);
163     HColumnDescriptor col = new HColumnDescriptor("f1");
164     col.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
165     htd.addFamily(col);
166     col = new HColumnDescriptor("f2");
167     col.setScope(HConstants.REPLICATION_SCOPE_LOCAL);
168     htd.addFamily(col);
169 
170     hri = new HRegionInfo(htd.getTableName(), r1, r2);
171   }
172 
173   @AfterClass
174   public static void tearDownAfterClass() throws Exception {
175     manager.join();
176     utility.shutdownMiniCluster();
177   }
178 
179   @Before
180   public void setUp() throws Exception {
181     fs.delete(logDir, true);
182     fs.delete(oldLogDir, true);
183   }
184 
185   @After
186   public void tearDown() throws Exception {
187     setUp();
188   }
189 
190   @Test
191   public void testLogRoll() throws Exception {
192     long baseline = 1000;
193     long time = baseline;
194     MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
195     KeyValue kv = new KeyValue(r1, f1, r1);
196     WALEdit edit = new WALEdit();
197     edit.add(kv);
198 
199     List<WALActionsListener> listeners = new ArrayList<WALActionsListener>();
200     listeners.add(replication);
201     final WALFactory wals = new WALFactory(utility.getConfiguration(), listeners,
202         URLEncoder.encode("regionserver:60020", "UTF8"));
203     final WAL wal = wals.getWAL(hri.getEncodedNameAsBytes());
204     manager.init();
205     HTableDescriptor htd = new HTableDescriptor();
206     htd.addFamily(new HColumnDescriptor(f1));
207     // Testing normal log rolling every 20
208     for(long i = 1; i < 101; i++) {
209       if(i > 1 && i % 20 == 0) {
210         wal.rollWriter();
211       }
212       LOG.info(i);
213       final long txid = wal.append(htd,
214           hri,
215           new WALKey(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc),
216           edit,
217           true);
218       wal.sync(txid);
219     }
220 
221     // Simulate a rapid insert that's followed
222     // by a report that's still not totally complete (missing last one)
223     LOG.info(baseline + " and " + time);
224     baseline += 101;
225     time = baseline;
226     LOG.info(baseline + " and " + time);
227 
228     for (int i = 0; i < 3; i++) {
229       wal.append(htd, hri,
230           new WALKey(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc),
231           edit,
232           true);
233     }
234     wal.sync();
235 
236     int logNumber = 0;
237     for (Map.Entry<String, SortedSet<String>> entry : manager.getWALs().get(slaveId).entrySet()) {
238       logNumber += entry.getValue().size();
239     }
240     assertEquals(6, logNumber);
241 
242     wal.rollWriter();
243 
244     manager.logPositionAndCleanOldLogs(manager.getSources().get(0).getCurrentPath(),
245         "1", 0, false, false);
246 
247     wal.append(htd, hri,
248         new WALKey(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc),
249         edit,
250         true);
251     wal.sync();
252 
253     assertEquals(1, manager.getWALs().size());
254 
255 
256     // TODO Need a case with only 2 WALs and we only want to delete the first one
257   }
258 
259   @Test
260   public void testClaimQueues() throws Exception {
261     LOG.debug("testNodeFailoverWorkerCopyQueuesFromRSUsingMulti");
262     conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true);
263     final Server server = new DummyServer("hostname0.example.org");
264     ReplicationQueues rq =
265         ReplicationFactory.getReplicationQueues(server.getZooKeeper(), server.getConfiguration(),
266           server);
267     rq.init(server.getServerName().toString());
268     // populate some znodes in the peer znode
269     files.add("log1");
270     files.add("log2");
271     for (String file : files) {
272       rq.addLog("1", file);
273     }
274     // create 3 DummyServers
275     Server s1 = new DummyServer("dummyserver1.example.org");
276     Server s2 = new DummyServer("dummyserver2.example.org");
277     Server s3 = new DummyServer("dummyserver3.example.org");
278 
279     // create 3 DummyNodeFailoverWorkers
280     DummyNodeFailoverWorker w1 = new DummyNodeFailoverWorker(
281         server.getServerName().getServerName(), s1);
282     DummyNodeFailoverWorker w2 = new DummyNodeFailoverWorker(
283         server.getServerName().getServerName(), s2);
284     DummyNodeFailoverWorker w3 = new DummyNodeFailoverWorker(
285         server.getServerName().getServerName(), s3);
286 
287     latch = new CountDownLatch(3);
288     // start the threads
289     w1.start();
290     w2.start();
291     w3.start();
292     // make sure only one is successful
293     int populatedMap = 0;
294     // wait for result now... till all the workers are done.
295     latch.await();
296     populatedMap += w1.isLogZnodesMapPopulated() + w2.isLogZnodesMapPopulated()
297         + w3.isLogZnodesMapPopulated();
298     assertEquals(1, populatedMap);
299     server.abort("", null);
300   }
301 
302   @Test
303   public void testCleanupFailoverQueues() throws Exception {
304     final Server server = new DummyServer("hostname1.example.org");
305     ReplicationQueues rq =
306         ReplicationFactory.getReplicationQueues(server.getZooKeeper(), server.getConfiguration(),
307           server);
308     rq.init(server.getServerName().toString());
309     // populate some znodes in the peer znode
310     SortedSet<String> files = new TreeSet<String>();
311     String group = "testgroup";
312     String file1 = group + ".log1";
313     String file2 = group + ".log2";
314     files.add(file1);
315     files.add(file2);
316     for (String file : files) {
317       rq.addLog("1", file);
318     }
319     Server s1 = new DummyServer("dummyserver1.example.org");
320     ReplicationQueues rq1 =
321         ReplicationFactory.getReplicationQueues(s1.getZooKeeper(), s1.getConfiguration(), s1);
322     rq1.init(s1.getServerName().toString());
323     ReplicationPeers rp1 =
324         ReplicationFactory.getReplicationPeers(s1.getZooKeeper(), s1.getConfiguration(), s1);
325     rp1.init();
326     NodeFailoverWorker w1 =
327         manager.new NodeFailoverWorker(server.getServerName().getServerName(), rq1, rp1, new UUID(
328             new Long(1), new Long(2)));
329     w1.start();
330     w1.join(5000);
331     assertEquals(1, manager.getWalsByIdRecoveredQueues().size());
332     String id = "1-" + server.getServerName().getServerName();
333     assertEquals(files, manager.getWalsByIdRecoveredQueues().get(id).get(group));
334     manager.cleanOldLogs(file2, id, true);
335     // log1 should be deleted
336     assertEquals(Sets.newHashSet(file2), manager.getWalsByIdRecoveredQueues().get(id).get(group));
337   }
338 
339   @Test
340   public void testNodeFailoverDeadServerParsing() throws Exception {
341     LOG.debug("testNodeFailoverDeadServerParsing");
342     conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true);
343     final Server server = new DummyServer("ec2-54-234-230-108.compute-1.amazonaws.com");
344     ReplicationQueues repQueues =
345         ReplicationFactory.getReplicationQueues(server.getZooKeeper(), conf, server);
346     repQueues.init(server.getServerName().toString());
347     // populate some znodes in the peer znode
348     files.add("log1");
349     files.add("log2");
350     for (String file : files) {
351       repQueues.addLog("1", file);
352     }
353 
354     // create 3 DummyServers
355     Server s1 = new DummyServer("ip-10-8-101-114.ec2.internal");
356     Server s2 = new DummyServer("ec2-107-20-52-47.compute-1.amazonaws.com");
357     Server s3 = new DummyServer("ec2-23-20-187-167.compute-1.amazonaws.com");
358 
359     // simulate three servers fail sequentially
360     ReplicationQueues rq1 =
361         ReplicationFactory.getReplicationQueues(s1.getZooKeeper(), s1.getConfiguration(), s1);
362     rq1.init(s1.getServerName().toString());
363     SortedMap<String, SortedSet<String>> testMap =
364         rq1.claimQueues(server.getServerName().getServerName());
365     ReplicationQueues rq2 =
366         ReplicationFactory.getReplicationQueues(s2.getZooKeeper(), s2.getConfiguration(), s2);
367     rq2.init(s2.getServerName().toString());
368     testMap = rq2.claimQueues(s1.getServerName().getServerName());
369     ReplicationQueues rq3 =
370         ReplicationFactory.getReplicationQueues(s3.getZooKeeper(), s3.getConfiguration(), s3);
371     rq3.init(s3.getServerName().toString());
372     testMap = rq3.claimQueues(s2.getServerName().getServerName());
373 
374     ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(testMap.firstKey());
375     List<String> result = replicationQueueInfo.getDeadRegionServers();
376 
377     // verify
378     assertTrue(result.contains(server.getServerName().getServerName()));
379     assertTrue(result.contains(s1.getServerName().getServerName()));
380     assertTrue(result.contains(s2.getServerName().getServerName()));
381 
382     server.abort("", null);
383   }
384 
385   @Test
386   public void testFailoverDeadServerCversionChange() throws Exception {
387     LOG.debug("testFailoverDeadServerCversionChange");
388 
389     conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true);
390     final Server s0 = new DummyServer("cversion-change0.example.org");
391     ReplicationQueues repQueues =
392         ReplicationFactory.getReplicationQueues(s0.getZooKeeper(), conf, s0);
393     repQueues.init(s0.getServerName().toString());
394     // populate some znodes in the peer znode
395     files.add("log1");
396     files.add("log2");
397     for (String file : files) {
398       repQueues.addLog("1", file);
399     }
400     // simulate queue transfer
401     Server s1 = new DummyServer("cversion-change1.example.org");
402     ReplicationQueues rq1 =
403         ReplicationFactory.getReplicationQueues(s1.getZooKeeper(), s1.getConfiguration(), s1);
404     rq1.init(s1.getServerName().toString());
405 
406     ReplicationQueuesClient client =
407         ReplicationFactory.getReplicationQueuesClient(s1.getZooKeeper(), s1.getConfiguration(), s1);
408 
409     int v0 = client.getQueuesZNodeCversion();
410     rq1.claimQueues(s0.getServerName().getServerName());
411     int v1 = client.getQueuesZNodeCversion();
412     // cversion should increased by 1 since a child node is deleted
413     assertEquals(v0 + 1, v1);
414 
415     s0.abort("", null);
416   }
417 
418   static class DummyNodeFailoverWorker extends Thread {
419     private SortedMap<String, SortedSet<String>> logZnodesMap;
420     Server server;
421     private String deadRsZnode;
422     ReplicationQueues rq;
423 
424     public DummyNodeFailoverWorker(String znode, Server s) throws Exception {
425       this.deadRsZnode = znode;
426       this.server = s;
427       this.rq =
428           ReplicationFactory.getReplicationQueues(server.getZooKeeper(), server.getConfiguration(),
429             server);
430       this.rq.init(this.server.getServerName().toString());
431     }
432 
433     @Override
434     public void run() {
435       try {
436         logZnodesMap = rq.claimQueues(deadRsZnode);
437         server.abort("Done with testing", null);
438       } catch (Exception e) {
439         LOG.error("Got exception while running NodeFailoverWorker", e);
440       } finally {
441         latch.countDown();
442       }
443     }
444 
445     /**
446      * @return 1 when the map is not empty.
447      */
448     private int isLogZnodesMapPopulated() {
449       Collection<SortedSet<String>> sets = logZnodesMap.values();
450       if (sets.size() > 1) {
451         throw new RuntimeException("unexpected size of logZnodesMap: " + sets.size());
452       }
453       if (sets.size() == 1) {
454         SortedSet<String> s = sets.iterator().next();
455         for (String file : files) {
456           // at least one file was missing
457           if (!s.contains(file)) {
458             return 0;
459           }
460         }
461         return 1; // we found all the files
462       }
463       return 0;
464     }
465   }
466 
467   static class DummyServer implements Server {
468     String hostname;
469 
470     DummyServer() {
471       hostname = "hostname.example.org";
472     }
473 
474     DummyServer(String hostname) {
475       this.hostname = hostname;
476     }
477 
478     @Override
479     public Configuration getConfiguration() {
480       return conf;
481     }
482 
483     @Override
484     public ZooKeeperWatcher getZooKeeper() {
485       return zkw;
486     }
487 
488     @Override
489     public CoordinatedStateManager getCoordinatedStateManager() {
490       return null;
491     }
492     @Override
493     public ClusterConnection getConnection() {
494       return null;
495     }
496 
497     @Override
498     public MetaTableLocator getMetaTableLocator() {
499       return null;
500     }
501 
502     @Override
503     public ServerName getServerName() {
504       return ServerName.valueOf(hostname, 1234, 1L);
505     }
506 
507     @Override
508     public void abort(String why, Throwable e) {
509       // To change body of implemented methods use File | Settings | File Templates.
510     }
511 
512     @Override
513     public boolean isAborted() {
514       return false;
515     }
516 
517     @Override
518     public void stop(String why) {
519       // To change body of implemented methods use File | Settings | File Templates.
520     }
521 
522     @Override
523     public boolean isStopped() {
524       return false; // To change body of implemented methods use File | Settings | File Templates.
525     }
526 
527     @Override
528     public ChoreService getChoreService() {
529       return null;
530     }
531   }
532 }