/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.io.InputStream;
import java.lang.ref.SoftReference;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Assume;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Test cases that ensure that file system level errors are bubbled up
 * appropriately to clients, rather than swallowed.
 */
@Category(MediumTests.class)
public class TestFSErrorsExposed {
  private static final Log LOG = LogFactory.getLog(TestFSErrorsExposed.class);

  HBaseTestingUtility util = new HBaseTestingUtility();

  /**
   * Injects errors into the pread calls of an on-disk file, and makes
   * sure those bubble up to the HFile scanner
   */
  @Test
  public void testHFileScannerThrowsErrors() throws IOException {
    Path hfilePath = new Path(new Path(
        util.getDataTestDir("internalScannerExposesErrors"),
        "regionname"), "familyname");
    HFileSystem hfs = (HFileSystem) util.getTestFileSystem();
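    // Interpose a fault-injecting filesystem between the HFileSystem facade
    // and the real backing filesystem, so every stream opened on the read
    // path below is a FaultyInputStream we can later tell to start throwing.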
    FaultyFileSystem faultyfs = new FaultyFileSystem(hfs.getBackingFs());
    FileSystem fs = new HFileSystem(faultyfs);
    CacheConfig cacheConf = new CacheConfig(util.getConfiguration());
    HFileContext meta = new HFileContextBuilder().withBlockSize(2 * 1024).build();
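    // The writer is built over the plain hfs rather than the faulty fs:
    // FaultyFileSystem only intercepts open(), so faults apply to the read
    // path and writing the store file always succeeds.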
    StoreFile.Writer writer = new StoreFile.WriterBuilder(
        util.getConfiguration(), cacheConf, hfs)
            .withOutputDir(hfilePath)
            .withFileContext(meta)
            .build();
    TestStoreFile.writeStoreFile(
        writer, Bytes.toBytes("cf"), Bytes.toBytes("qual"));

    StoreFile sf = new StoreFile(fs, writer.getPath(),
        util.getConfiguration(), cacheConf, BloomType.NONE);

    StoreFile.Reader reader = sf.createReader();
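    // cacheBlocks=false, pread=true: positional reads are exactly what
    // FaultyInputStream overrides, so the injected faults will surface here.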
    HFileScanner scanner = reader.getScanner(false, true);

    FaultyInputStream inStream = faultyfs.inStreams.get(0).get();
    assertNotNull(inStream);

    scanner.seekTo();
    // Do at least one successful read
    assertTrue(scanner.next());

    faultyfs.startFaults();

    try {
      int scanned = 0;
      while (scanner.next()) {
        scanned++;
      }
      fail("Scanner didn't throw after faults injected; scanned " + scanned + " cells");
    } catch (IOException ioe) {
      LOG.info("Got expected exception", ioe);
      assertTrue(ioe.getMessage().contains("Fault"));
    }
    reader.close(true); // end of test so evictOnClose
  }

  /**
   * Injects errors into the pread calls of an on-disk file, and makes
   * sure those bubble up to the StoreFileScanner
   */
  @Test
  public void testStoreFileScannerThrowsErrors() throws IOException {
    Path hfilePath = new Path(new Path(
        util.getDataTestDir("internalScannerExposesErrors"),
        "regionname"), "familyname");
    HFileSystem hfs = (HFileSystem) util.getTestFileSystem();
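    // Same fault-injection layering as in testHFileScannerThrowsErrors above.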
    FaultyFileSystem faultyfs = new FaultyFileSystem(hfs.getBackingFs());
    HFileSystem fs = new HFileSystem(faultyfs);
    CacheConfig cacheConf = new CacheConfig(util.getConfiguration());
    HFileContext meta = new HFileContextBuilder().withBlockSize(2 * 1024).build();
    StoreFile.Writer writer = new StoreFile.WriterBuilder(
        util.getConfiguration(), cacheConf, hfs)
            .withOutputDir(hfilePath)
            .withFileContext(meta)
            .build();
    TestStoreFile.writeStoreFile(
        writer, Bytes.toBytes("cf"), Bytes.toBytes("qual"));

    StoreFile sf = new StoreFile(fs, writer.getPath(), util.getConfiguration(),
        cacheConf, BloomType.NONE);

    List<StoreFileScanner> scanners = StoreFileScanner.getScannersForStoreFiles(
        Collections.singletonList(sf), false, true, false, false,
        // 0 is passed as readpoint because this test operates on StoreFile directly
        0);
    KeyValueScanner scanner = scanners.get(0);

    FaultyInputStream inStream = faultyfs.inStreams.get(0).get();
    assertNotNull(inStream);

    scanner.seek(KeyValue.LOWESTKEY);
    // Do at least one successful read
    assertNotNull(scanner.next());
    faultyfs.startFaults();

    try {
      int scanned = 0;
      while (scanner.next() != null) {
        scanned++;
      }
      fail("Scanner didn't throw after faults injected; scanned " + scanned + " cells");
    } catch (IOException ioe) {
      LOG.info("Got expected exception", ioe);
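      // StoreFileScanner wraps the underlying IOException in its own
      // "Could not iterate ..." message, unlike the raw HFileScanner above.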
      assertTrue(ioe.getMessage().contains("Could not iterate"));
    }
    scanner.close();
  }

  /**
   * Cluster test which starts a region server with a region, then
   * removes the data from HDFS underneath it, and ensures that
   * errors are bubbled up to the client.
   */
  @Test(timeout = 5 * 60 * 1000)
  public void testFullSystemBubblesFSErrors() throws Exception {
    // With short-circuit reads enabled, a missing datanode does not surface
    // as an error -- a known 'feature' -- so skip this test in that case.
    Assume.assumeTrue(!util.isReadShortCircuitOn());

    try {
      // Make it fail faster: one client retry instead of the default.
      util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);

      util.startMiniCluster(1);
      TableName tableName = TableName.valueOf("table");
      byte[] fam = Bytes.toBytes("fam");

      Admin admin = new HBaseAdmin(util.getConfiguration());
      HTableDescriptor desc = new HTableDescriptor(tableName);
      desc.addFamily(new HColumnDescriptor(fam)
          .setMaxVersions(1)
          .setBlockCacheEnabled(false)
      );
      admin.createTable(desc);
      admin.close();

      // The connection from util was built from the configuration above, so
      // it carries the single-retry setting rather than the default of 10.
      try (Table table = util.getConnection().getTable(tableName)) {
        // Load some data
        util.loadTable(table, fam, false);
        util.flush();
        util.countRows(table);

        // Kill the DFS cluster
        util.getDFSCluster().shutdownDataNodes();
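        // With the datanodes down and the block cache disabled above, the
        // next read has to go to HDFS and must therefore fail.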

        try {
          util.countRows(table);
          fail("Did not fail to count after removing data");
        } catch (Exception e) {
          LOG.info("Got expected error", e);
          assertTrue(e.getMessage().contains("Could not seek"));
        }
      }

      // Restart data nodes so that HBase can shut down cleanly.
      util.getDFSCluster().restartDataNodes();

    } finally {
      MiniHBaseCluster cluster = util.getMiniHBaseCluster();
      if (cluster != null) cluster.killAll();
      util.shutdownMiniCluster();
    }
  }

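  /**
   * FilterFileSystem that hands out FaultyInputStreams and remembers them
   * via SoftReferences, so that faults can be switched on for every stream
   * opened so far without pinning the streams in memory.
   */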
  static class FaultyFileSystem extends FilterFileSystem {
    List<SoftReference<FaultyInputStream>> inStreams = new ArrayList<>();

    public FaultyFileSystem(FileSystem testFileSystem) {
      super(testFileSystem);
    }

    @Override
    public FSDataInputStream open(Path p, int bufferSize) throws IOException {
      FSDataInputStream orig = fs.open(p, bufferSize);
      FaultyInputStream faulty = new FaultyInputStream(orig);
      inStreams.add(new SoftReference<>(faulty));
      return faulty;
    }

    /**
     * Starts to simulate faults on all streams opened so far
     */
    public void startFaults() {
      for (SoftReference<FaultyInputStream> is : inStreams) {
        FaultyInputStream stream = is.get();
        // The references are soft, so a stream may already have been collected.
        if (stream != null) {
          stream.startFaults();
        }
      }
    }
  }

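  /**
   * FSDataInputStream wrapper whose positional (pread) reads start throwing
   * IOExceptions once startFaults() has been called.
   */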
  static class FaultyInputStream extends FSDataInputStream {
    boolean faultsStarted = false;

    public FaultyInputStream(InputStream in) throws IOException {
      super(in);
    }

    public void startFaults() {
      faultsStarted = true;
    }

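    /**
     * Only the positional read path is faulted; sequential reads pass
     * through untouched, which is why the tests above open their scanners
     * with pread=true.
     */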
    @Override
    public int read(long position, byte[] buffer, int offset, int length)
        throws IOException {
      injectFault();
      return ((PositionedReadable) in).read(position, buffer, offset, length);
    }

    private void injectFault() throws IOException {
      if (faultsStarted) {
        throw new IOException("Fault injected");
      }
    }
  }
}