/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.fs;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.BindException;
import java.net.ServerSocket;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.wal.DefaultWALProvider;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Tests for the HDFS block reordering fix from HBASE-6435.
 */
@Category(LargeTests.class)
public class TestBlockReorder {
  private static final Log LOG = LogFactory.getLog(TestBlockReorder.class);

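  // Log everything from the DFS client and from HFileSystem (where the reorder hook lives);
  // this makes the block ordering decisions visible in the test output.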
  static {
    ((Log4JLogger) DFSClient.LOG).getLogger().setLevel(Level.ALL);
    ((Log4JLogger) HFileSystem.LOG).getLogger().setLevel(Level.ALL);
  }

  private Configuration conf;
  private MiniDFSCluster cluster;
  private HBaseTestingUtility htu;
  private DistributedFileSystem dfs;
  private static final String host1 = "host1";
  private static final String host2 = "host2";
  private static final String host3 = "host3";

  @Before
  public void setUp() throws Exception {
    htu = new HBaseTestingUtility();
    htu.getConfiguration().setInt("dfs.blocksize", 1024); // For the test with multiple blocks
    htu.getConfiguration().setBoolean("dfs.support.append", true);
    htu.getConfiguration().setInt("dfs.replication", 3);
    htu.startMiniDFSCluster(3,
        new String[]{"/r1", "/r2", "/r3"}, new String[]{host1, host2, host3});

    conf = htu.getConfiguration();
    cluster = htu.getDFSCluster();
    dfs = (DistributedFileSystem) FileSystem.get(conf);
  }

  @After
  public void tearDown() throws Exception {
    htu.shutdownMiniCluster();
  }

  /**
   * Test that we can add a hook, and that this hook is used when we read a file from HDFS.
   */
  @Test
  public void testBlockLocationReorder() throws Exception {
    Path p = new Path("hello");

    Assert.assertTrue((short) cluster.getDataNodes().size() > 1);
    final int repCount = 2;

    // Let's write the file
    FSDataOutputStream fop = dfs.create(p, (short) repCount);
    final double toWrite = 875.5613;
    fop.writeDouble(toWrite);
    fop.close();

    // Let's check we can read it when everybody's there
    long start = System.currentTimeMillis();
    FSDataInputStream fin = dfs.open(p);
    Assert.assertTrue(toWrite == fin.readDouble());
    long end = System.currentTimeMillis();
    LOG.info("readtime= " + (end - start));
    fin.close();
    Assert.assertTrue((end - start) < 30 * 1000);

    // Let's kill the first location. But the location returned first can change between calls,
    // so the first thing to do is to get the location, then the port.
    FileStatus f = dfs.getFileStatus(p);
    BlockLocation[] lbs;
    // Loop until the single block reports all of its repCount replicas.
    do {
      lbs = dfs.getFileBlockLocations(f, 0, 1);
    } while (lbs.length != 1 || lbs[0].getHosts().length != repCount);
    final String name = lbs[0].getNames()[0];
    Assert.assertTrue(name.indexOf(':') > 0);
    String portS = name.substring(name.indexOf(':') + 1);
    final int port = Integer.parseInt(portS);
    LOG.info("port= " + port);
    int ipcPort = -1;

    // Let's find the DN to kill. The datanode returned by cluster.getDataNodes(int) is not
    // necessarily the one listening on that port, so we need to iterate ourselves.
    boolean ok = false;
    final String lookup = lbs[0].getHosts()[0];
    StringBuilder sb = new StringBuilder();
    for (DataNode dn : cluster.getDataNodes()) {
      final String dnName = getHostName(dn);
      sb.append(dnName).append(' ');
      if (lookup.equals(dnName)) {
        ok = true;
        LOG.info("killing datanode " + name + " / " + lookup);
        ipcPort = dn.ipcServer.getListenerAddress().getPort();
        dn.shutdown();
        LOG.info("killed datanode " + name + " / " + lookup);
        break;
      }
    }
    Assert.assertTrue(
        "didn't find the server to kill, was looking for " + lookup + " found " + sb, ok);
    LOG.info("ipc port= " + ipcPort);

    // Add the hook, with an implementation checking that we don't use the port we've just killed.
    Assert.assertTrue(HFileSystem.addLocationsOrderInterceptor(conf,
        new HFileSystem.ReorderBlocks() {
          @Override
          public void reorderBlocks(Configuration c, LocatedBlocks lbs, String src) {
            for (LocatedBlock lb : lbs.getLocatedBlocks()) {
              if (lb.getLocations().length > 1) {
                DatanodeInfo[] infos = lb.getLocations();
                if (infos[0].getHostName().equals(lookup)) {
                  LOG.info("HFileSystem bad host, inverting");
                  DatanodeInfo tmp = infos[0];
                  infos[0] = infos[1];
                  infos[1] = tmp;
                }
              }
            }
          }
        }));

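    // With the interceptor registered, every read through this conf should see the killed
    // datanode's replica swapped out of the first position. Below we take over the dead
    // datanode's ports ourselves, so a client that still tried that replica first would hang
    // until a timeout instead of failing fast, and the timing assertions in the retry loop
    // would catch it.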
    final int retries = 10;
    ServerSocket ss = null;
    ServerSocket ssI;
    try {
      ss = new ServerSocket(port); // We're taking the port to have a timeout issue later.
      ssI = new ServerSocket(ipcPort);
    } catch (BindException be) {
      LOG.warn("Got bind exception trying to set up socket on " + port + " or " + ipcPort +
          ", this means that the datanode has not closed the socket or" +
          " someone else took it. This can happen, so we skip this test for this time.", be);
      if (ss != null) {
        ss.close();
      }
      return;
    }

    // Now a read would fail with a timeout if it picked the dead location first. Unfortunately it
    // does not always pick the same box, so we try retries times; with the reorder in place it
    // should never take more than a few milliseconds.
    for (int i = 0; i < retries; i++) {
      start = System.currentTimeMillis();

      fin = dfs.open(p);
      Assert.assertTrue(toWrite == fin.readDouble());
      fin.close();
      end = System.currentTimeMillis();
      LOG.info("HFileSystem readtime= " + (end - start));
      Assert.assertFalse("We took too much time to read", (end - start) > 60000);
    }

    ss.close();
    ssI.close();
  }

  /**
   * Returns the hostname of a datanode, using getDisplayName (Hadoop 2) or getHostName (Hadoop 1).
   */
  private String getHostName(DataNode dn) throws InvocationTargetException, IllegalAccessException {
    Method m;
    try {
      m = DataNode.class.getMethod("getDisplayName");
    } catch (NoSuchMethodException e) {
      try {
        m = DataNode.class.getMethod("getHostName");
      } catch (NoSuchMethodException e1) {
        throw new RuntimeException(e1);
      }
    }

    String res = (String) m.invoke(dn);
    if (res.contains(":")) {
      return res.split(":")[0];
    } else {
      return res;
    }
  }

  /**
   * Test that the hook works within HBase, including when there are multiple blocks.
   */
  @Test
  public void testHBaseCluster() throws Exception {
    byte[] sb = "sb".getBytes();
    htu.startMiniZKCluster();

    MiniHBaseCluster hbm = htu.startMiniHBaseCluster(1, 1);
    hbm.waitForActiveAndReadyMaster();
    hbm.getRegionServer(0).waitForServerOnline();
    HRegionServer targetRs = hbm.getRegionServer(0);

    // We want to have a datanode with the same name as the region server, so
    // we're going to get the region server name, and start a new datanode with this name.
    String host4 = targetRs.getServerName().getHostname();
    LOG.info("Starting a new datanode with the name=" + host4);
    cluster.startDataNodes(conf, 1, true, null, new String[]{"/r4"}, new String[]{host4}, null);
    cluster.waitClusterUp();

    final int repCount = 3;

    // We use the region server's file system & conf as we expect them to have the hook.
    conf = targetRs.getConfiguration();
    HFileSystem rfs = (HFileSystem) targetRs.getFileSystem();
    Table h = htu.createTable(TableName.valueOf("table"), sb);

    // Now, we have 4 datanodes and a replication count of 3, so we don't know whether the
    // datanode with the same name as the region server will be used. We can't really stop an
    // existing datanode, as that would expose us to nasty HDFS bugs/issues, so we're going to
    // try multiple times.

    // Now we need to find the log file, its locations, and look at it

    String rootDir = new Path(FSUtils.getRootDir(conf) + "/" + HConstants.HREGION_LOGDIR_NAME +
            "/" + targetRs.getServerName().toString()).toUri().getPath();

    DistributedFileSystem mdfs = (DistributedFileSystem)
        hbm.getMaster().getMasterFileSystem().getFileSystem();

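    // Each iteration below rolls the WAL, writes a put, and inspects the block locations of the
    // resulting log files through the region server's HFileSystem. An iteration only counts
    // (nbTest++) when a log file has a replica on the datanode sharing the region server's
    // hostname, i.e. when the reorder hook had a local replica to push to the end of the list;
    // we stop after ten such observations.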
    int nbTest = 0;
    while (nbTest < 10) {
      final List<Region> regions = targetRs.getOnlineRegions(h.getName());
      final CountDownLatch latch = new CountDownLatch(regions.size());
      // listen for successful log rolls
      final WALActionsListener listener = new WALActionsListener.Base() {
            @Override
            public void postLogRoll(final Path oldPath, final Path newPath) throws IOException {
              latch.countDown();
            }
          };
      for (Region region : regions) {
        ((HRegion)region).getWAL().registerWALActionsListener(listener);
      }

      htu.getHBaseAdmin().rollWALWriter(targetRs.getServerName());

      // wait
      try {
        latch.await();
      } catch (InterruptedException exception) {
        LOG.warn("Interrupted while waiting for the wal of '" + targetRs + "' to roll. If later " +
            "tests fail, it's probably because we should still be waiting.");
        Thread.currentThread().interrupt();
      }
      for (Region region : regions) {
        ((HRegion)region).getWAL().unregisterWALActionsListener(listener);
      }

      // We need a sleep as the namenode is informed asynchronously
      Thread.sleep(100);

      // insert one put to ensure a minimal size
      Put p = new Put(sb);
      p.add(sb, sb, sb);
      h.put(p);

      DirectoryListing dl = dfs.getClient().listPaths(rootDir, HdfsFileStatus.EMPTY_NAME);
      HdfsFileStatus[] hfs = dl.getPartialListing();

      // As we wrote a put, we should have at least one log file.
      Assert.assertTrue(hfs.length >= 1);
      for (HdfsFileStatus hf : hfs) {
        // Because this is a live cluster, log files might get archived while we're processing
        try {
          LOG.info("Log file found: " + hf.getLocalName() + " in " + rootDir);
          String logFile = rootDir + "/" + hf.getLocalName();
          FileStatus fsLog = rfs.getFileStatus(new Path(logFile));

          LOG.info("Checking log file: " + logFile);
          // Now checking that the hook is up and running
          // We can't call getBlockLocations directly, as it's not available in HFileSystem,
          // and we try multiple times to be sure, as the order is random.

          BlockLocation[] bls = rfs.getFileBlockLocations(fsLog, 0, 1);
          if (bls.length > 0) {
            BlockLocation bl = bls[0];

            LOG.info(bl.getHosts().length + " replicas for block 0 in " + logFile + " ");
            for (int i = 0; i < bl.getHosts().length - 1; i++) {
              LOG.info(bl.getHosts()[i] + "    " + logFile);
              // None of the leading replicas should be on the region server's host.
              Assert.assertNotEquals(host4, bl.getHosts()[i]);
            }
            String last = bl.getHosts()[bl.getHosts().length - 1];
            LOG.info(last + "    " + logFile);
            if (host4.equals(last)) {
              nbTest++;
              LOG.info(logFile + " is on the new datanode and is ok");
              if (bl.getHosts().length == 3) {
                // We can test this case from the file system as well.
                // Checking the underlying file system, multiple times as the order is random.
                testFromDFS(dfs, logFile, repCount, host4);

                // now from the master
                testFromDFS(mdfs, logFile, repCount, host4);
              }
            }
          }
        } catch (FileNotFoundException exception) {
          LOG.debug("Failed to find log file '" + hf.getLocalName() + "'; it probably was " +
              "archived out from under us so we'll ignore and retry. If this test hangs " +
              "indefinitely you should treat this failure as a symptom.", exception);
        } catch (RemoteException exception) {
          if (exception.unwrapRemoteException() instanceof FileNotFoundException) {
            LOG.debug("Failed to find log file '" + hf.getLocalName() + "'; it probably was " +
                "archived out from under us so we'll ignore and retry. If this test hangs " +
                "indefinitely you should treat this failure as a symptom.", exception);
          } else {
            throw exception;
          }
        }
      }
    }
  }

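  /**
   * Asks the namenode directly (through the reflection helper below) for the block locations of
   * {@code src}, waits until every block reports {@code repCount} replicas, and then checks that
   * the replica on {@code localhost} sits in the last position for every block.
   */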
  private void testFromDFS(DistributedFileSystem dfs, String src, int repCount, String localhost)
      throws Exception {
    // Multiple times as the order is random
    for (int i = 0; i < 10; i++) {
      LocatedBlocks l;
      // The NN gets the block list asynchronously, so we may need multiple tries to get the list
      final long max = System.currentTimeMillis() + 10000;
      boolean done;
      do {
        Assert.assertTrue("Can't get enough replicas.", System.currentTimeMillis() < max);
        l = getNamenode(dfs.getClient()).getBlockLocations(src, 0, 1);
        Assert.assertNotNull("Can't get block locations for " + src, l);
        Assert.assertNotNull(l.getLocatedBlocks());
        Assert.assertTrue(l.getLocatedBlocks().size() > 0);

        done = true;
        for (int y = 0; y < l.getLocatedBlocks().size() && done; y++) {
          done = (l.get(y).getLocations().length == repCount);
        }
      } while (!done);

      for (int y = 0; y < l.getLocatedBlocks().size() && done; y++) {
        Assert.assertEquals(localhost, l.get(y).getLocations()[repCount - 1].getHostName());
      }
    }
  }

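  /**
   * The DFSClient does not expose its namenode proxy, so grab the field by reflection;
   * the tests use it to ask the namenode for block locations without going through the
   * FileSystem API.
   */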
  private static ClientProtocol getNamenode(DFSClient dfsc) throws Exception {
    Field nf = DFSClient.class.getDeclaredField("namenode");
    nf.setAccessible(true);
    return (ClientProtocol) nf.get(dfsc);
  }

  /**
   * Test that the reorder algo works as we expect.
   */
  @Test
  public void testBlockLocation() throws Exception {
    // We need to start HBase to get HConstants.HBASE_DIR set in conf
    htu.startMiniZKCluster();
    MiniHBaseCluster hbm = htu.startMiniHBaseCluster(1, 1);
    conf = hbm.getConfiguration();

    // The leading "/" is mandatory; without it we get a NullPointerException on the namenode.
    final String fileName = "/helloWorld";
    Path p = new Path(fileName);

    final int repCount = 3;
    Assert.assertTrue((short) cluster.getDataNodes().size() >= repCount);

    // Let's write the file
    FSDataOutputStream fop = dfs.create(p, (short) repCount);
    final double toWrite = 875.5613;
    fop.writeDouble(toWrite);
    fop.close();

    for (int i = 0; i < 10; i++) {
      // The interceptor is not set in this test, so we get the raw list at this point
      LocatedBlocks l;
      final long max = System.currentTimeMillis() + 10000;
      do {
        l = getNamenode(dfs.getClient()).getBlockLocations(fileName, 0, 1);
        Assert.assertNotNull(l.getLocatedBlocks());
        Assert.assertEquals(1, l.getLocatedBlocks().size());
        Assert.assertTrue("Expecting " + repCount + ", got " + l.get(0).getLocations().length,
            System.currentTimeMillis() < max);
      } while (l.get(0).getLocations().length != repCount);

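      // ReorderWALBlocks only rewrites the location list for paths that look like WAL files,
      // i.e. paths under the HBase log directory whose parent directory encodes a ServerName
      // (host,port,startcode); anything else must be left untouched. Both cases are exercised
      // below.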
      // Should be filtered: the name is not a WAL path, so the order won't change.
      Object[] originalList = l.getLocatedBlocks().toArray();
      HFileSystem.ReorderWALBlocks lrb = new HFileSystem.ReorderWALBlocks();
      lrb.reorderBlocks(conf, l, fileName);
      Assert.assertArrayEquals(originalList, l.getLocatedBlocks().toArray());

      // Should be reordered, as we pretend the path is a WAL file with a compliant name.
      Assert.assertNotNull(conf.get(HConstants.HBASE_DIR));
      Assert.assertFalse(conf.get(HConstants.HBASE_DIR).isEmpty());
      String pseudoLogFile = conf.get(HConstants.HBASE_DIR) + "/" +
          HConstants.HREGION_LOGDIR_NAME + "/" + host1 + ",6977,6576" + "/mylogfile";

      // Check that it will be possible to extract a ServerName from our construction
      Assert.assertNotNull("log= " + pseudoLogFile,
          DefaultWALProvider.getServerNameFromWALDirectoryName(dfs.getConf(), pseudoLogFile));

      // And check we're doing the right reorder.
      lrb.reorderBlocks(conf, l, pseudoLogFile);
      Assert.assertEquals(host1, l.get(0).getLocations()[2].getHostName());

      // Check again, it should remain the same.
      lrb.reorderBlocks(conf, l, pseudoLogFile);
      Assert.assertEquals(host1, l.get(0).getLocations()[2].getHostName());
    }
  }

}