/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.UUID;

import org.apache.commons.lang.mutable.MutableBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.SampleRegionWALObserver;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdge;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.DefaultWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALKey;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

/**
 * Provides FSHLog test cases.
 */
@Category(MediumTests.class)
public class TestFSHLog {
  private static final Log LOG = LogFactory.getLog(TestFSHLog.class);

  protected static Configuration conf;
  protected static FileSystem fs;
  protected static Path dir;
  protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  @Rule
  public final TestName currentTest = new TestName();

  @Before
  public void setUp() throws Exception {
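    // Wipe everything under the mini DFS root so each test method starts from a clean
    // filesystem; earlier test methods may have left WAL and root directories behind.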
    FileStatus[] entries = fs.listStatus(new Path("/"));
    for (FileStatus entry : entries) {
      fs.delete(entry.getPath(), true);
    }
    final Path hbaseDir = TEST_UTIL.createRootDir();
    dir = new Path(hbaseDir, currentTest.getMethodName());
  }

  @After
  public void tearDown() throws Exception {
  }

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    // Make block sizes small.
    TEST_UTIL.getConfiguration().setInt("dfs.blocksize", 1024 * 1024);
    // quicker heartbeat interval for faster DN death notification
    TEST_UTIL.getConfiguration().setInt("dfs.namenode.heartbeat.recheck-interval", 5000);
    TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1);
    TEST_UTIL.getConfiguration().setInt("dfs.client.socket-timeout", 5000);

    // faster failover with cluster.shutdown(); fs.close() idiom
    TEST_UTIL.getConfiguration().setInt("hbase.ipc.client.connect.max.retries", 1);
    TEST_UTIL.getConfiguration().setInt("dfs.client.block.recovery.retries", 1);
    TEST_UTIL.getConfiguration().setInt("hbase.ipc.client.connection.maxidletime", 500);
    TEST_UTIL.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
        SampleRegionWALObserver.class.getName());
    TEST_UTIL.startMiniDFSCluster(3);

    conf = TEST_UTIL.getConfiguration();
    fs = TEST_UTIL.getDFSCluster().getFileSystem();
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  /**
   * A loaded WAL coprocessor won't break existing WAL test cases.
   */
  @Test
  public void testWALCoprocessorLoaded() throws Exception {
    // Test whether the coprocessor is loaded or not.
    FSHLog log = null;
    try {
      log = new FSHLog(fs, FSUtils.getRootDir(conf), dir.toString(),
          HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null);
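      // SampleRegionWALObserver was registered via WAL_COPROCESSOR_CONF_KEY in
      // setUpBeforeClass(), so the WAL's coprocessor host should have loaded it.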
      WALCoprocessorHost host = log.getCoprocessorHost();
      Coprocessor c = host.findCoprocessor(SampleRegionWALObserver.class.getName());
      assertNotNull(c);
    } finally {
      if (log != null) {
        log.close();
      }
    }
  }

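  /**
   * Appends the given number of edits to the WAL and syncs it. Each edit carries a single
   * KeyValue whose row, family, qualifier, and value are all set to "row".
   */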
  protected void addEdits(WAL log, HRegionInfo hri, HTableDescriptor htd, int times,
      MultiVersionConcurrencyControl mvcc) throws IOException {
    final byte[] row = Bytes.toBytes("row");
    for (int i = 0; i < times; i++) {
      long timestamp = System.currentTimeMillis();
      WALEdit cols = new WALEdit();
      cols.add(new KeyValue(row, row, row, timestamp, row));
      WALKey key = new WALKey(hri.getEncodedNameAsBytes(), htd.getTableName(),
          WALKey.NO_SEQUENCE_ID, timestamp, WALKey.EMPTY_UUIDS, HConstants.NO_NONCE,
          HConstants.NO_NONCE, mvcc);
      log.append(htd, hri, key, cols, true);
    }
    log.sync();
  }

  /**
   * Helper method to simulate a region flush for a WAL.
   * @param wal the WAL to flush against
   * @param regionEncodedName encoded name of the region being flushed
   * @param flushedFamilyNames column families covered by the flush
   */
  protected void flushRegion(WAL wal, byte[] regionEncodedName, Set<byte[]> flushedFamilyNames) {
    wal.startCacheFlush(regionEncodedName, flushedFamilyNames);
    wal.completeCacheFlush(regionEncodedName);
  }

  /**
   * Tests the log comparator. Ensures that we are not mixing meta logs with non-meta logs (throws
   * an exception if we do). Comparison is based on the timestamp present in the WAL name.
   * @throws Exception
   */
  @Test
  public void testWALComparator() throws Exception {
    FSHLog wal1 = null;
    FSHLog walMeta = null;
    try {
      wal1 = new FSHLog(fs, FSUtils.getRootDir(conf), dir.toString(),
          HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null);
      LOG.debug("Log obtained is: " + wal1);
      Comparator<Path> comp = wal1.LOG_NAME_COMPARATOR;
      Path p1 = wal1.computeFilename(11);
      Path p2 = wal1.computeFilename(12);
      // comparing with itself returns 0
      assertTrue(comp.compare(p1, p1) == 0);
      // comparing with different filenum.
      assertTrue(comp.compare(p1, p2) < 0);
      walMeta = new FSHLog(fs, FSUtils.getRootDir(conf), dir.toString(),
          HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null,
          DefaultWALProvider.META_WAL_PROVIDER_ID);
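      // A WAL created with META_WAL_PROVIDER_ID uses meta file naming; its comparator should
      // only accept other meta WAL paths (mixing throws, as asserted below).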
      Comparator<Path> compMeta = walMeta.LOG_NAME_COMPARATOR;

      Path p1WithMeta = walMeta.computeFilename(11);
      Path p2WithMeta = walMeta.computeFilename(12);
      assertTrue(compMeta.compare(p1WithMeta, p1WithMeta) == 0);
      assertTrue(compMeta.compare(p1WithMeta, p2WithMeta) < 0);
      // mixing meta and non-meta logs throws IllegalArgumentException
      boolean ex = false;
      try {
        comp.compare(p1WithMeta, p2);
      } catch (IllegalArgumentException e) {
        ex = true;
      }
      assertTrue("Comparator doesn't complain while checking meta log files", ex);
      boolean exMeta = false;
      try {
        compMeta.compare(p1WithMeta, p2);
      } catch (IllegalArgumentException e) {
        exMeta = true;
      }
      assertTrue("Meta comparator doesn't complain while checking log files", exMeta);
    } finally {
      if (wal1 != null) {
        wal1.close();
      }
      if (walMeta != null) {
        walMeta.close();
      }
    }
  }

  /**
   * On rolling a WAL after reaching the threshold, {@link WAL#rollWriter()} returns the
   * list of regions which should be flushed in order to archive the oldest WAL file.
   * <p>
   * This method tests this behavior by inserting edits and rolling the WAL enough times to reach
   * the max number of logs threshold. It checks whether we get the "right regions" for flush on
   * rolling the WAL.
   * @throws Exception
   */
  @Test
  public void testFindMemStoresEligibleForFlush() throws Exception {
    LOG.debug("testFindMemStoresEligibleForFlush");
    Configuration conf1 = HBaseConfiguration.create(conf);
    conf1.setInt("hbase.regionserver.maxlogs", 1);
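    // With maxlogs set to 1, more than one rolled WAL file is over the limit, so the WAL will
    // nominate regions holding the oldest unflushed edits for a forced flush.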
    FSHLog wal = new FSHLog(fs, FSUtils.getRootDir(conf1), dir.toString(),
        HConstants.HREGION_OLDLOGDIR_NAME, conf1, null, true, null, null);
    HTableDescriptor t1 =
        new HTableDescriptor(TableName.valueOf("t1")).addFamily(new HColumnDescriptor("row"));
    HTableDescriptor t2 =
        new HTableDescriptor(TableName.valueOf("t2")).addFamily(new HColumnDescriptor("row"));
    HRegionInfo hri1 =
        new HRegionInfo(t1.getTableName(), HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
    HRegionInfo hri2 =
        new HRegionInfo(t2.getTableName(), HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
    // add edits and roll the wal
    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
    try {
      addEdits(wal, hri1, t1, 2, mvcc);
      wal.rollWriter();
      // add some more edits and roll the wal. This would reach the log number threshold
      addEdits(wal, hri1, t1, 2, mvcc);
      wal.rollWriter();
      // with above rollWriter call, the max logs limit is reached.
      assertEquals(2, wal.getNumRolledLogFiles());

      // get the regions to flush; since there is only one region in the oldest wal, it should
      // return only one region.
      byte[][] regionsToFlush = wal.findRegionsToForceFlush();
      assertEquals(1, regionsToFlush.length);
      assertEquals(hri1.getEncodedNameAsBytes(), regionsToFlush[0]);
      // insert edits in second region
      addEdits(wal, hri2, t2, 2, mvcc);
      // get the regions to flush, it should still read region1.
      regionsToFlush = wal.findRegionsToForceFlush();
      assertEquals(1, regionsToFlush.length);
      assertEquals(hri1.getEncodedNameAsBytes(), regionsToFlush[0]);
      // flush region 1, and roll the wal file. Only the last wal, which has entries for
      // region2, should remain.
      flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getFamiliesKeys());
      wal.rollWriter();
      // only one wal should remain now (that is for the second region).
      assertEquals(1, wal.getNumRolledLogFiles());
      // flush the second region
      flushRegion(wal, hri2.getEncodedNameAsBytes(), t2.getFamiliesKeys());
      wal.rollWriter(true);
      // no wal should remain now.
      assertEquals(0, wal.getNumRolledLogFiles());
      // add edits both to region 1 and region 2, and roll.
      addEdits(wal, hri1, t1, 2, mvcc);
      addEdits(wal, hri2, t2, 2, mvcc);
      wal.rollWriter();
      assertEquals(1, wal.getNumRolledLogFiles());
      // add edits and roll the writer again, to reach the max logs limit.
      addEdits(wal, hri1, t1, 2, mvcc);
      wal.rollWriter();
      // it should return two regions to flush, as the oldest wal file has entries
      // for both regions.
      regionsToFlush = wal.findRegionsToForceFlush();
      assertEquals(2, regionsToFlush.length);
      // flush both regions
      flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getFamiliesKeys());
      flushRegion(wal, hri2.getEncodedNameAsBytes(), t2.getFamiliesKeys());
      wal.rollWriter(true);
      assertEquals(0, wal.getNumRolledLogFiles());
      // Add edits to region1, and roll the wal.
      addEdits(wal, hri1, t1, 2, mvcc);
      // tests partial flush: roll on a partial flush, and ensure that wal is not archived.
      wal.startCacheFlush(hri1.getEncodedNameAsBytes(), t1.getFamiliesKeys());
      wal.rollWriter();
      wal.completeCacheFlush(hri1.getEncodedNameAsBytes());
      assertEquals(1, wal.getNumRolledLogFiles());
    } finally {
      if (wal != null) {
        wal.close();
      }
    }
  }

  @Test(expected=IOException.class)
  public void testFailedToCreateWALIfParentRenamed() throws IOException {
    final String name = "testFailedToCreateWALIfParentRenamed";
    FSHLog log = new FSHLog(fs, FSUtils.getRootDir(conf), name, HConstants.HREGION_OLDLOGDIR_NAME,
        conf, null, true, null, null);
    long filenum = System.currentTimeMillis();
    Path path = log.computeFilename(filenum);
    log.createWriterInstance(path);
    Path parent = path.getParent();
    path = log.computeFilename(filenum + 1);
    Path newPath = new Path(parent.getParent(), parent.getName() + "-splitting");
    fs.rename(parent, newPath);
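    // The "-splitting" rename mirrors what the master does when it starts splitting a dead
    // server's logs; creating a writer under the old path must now fail.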
    log.createWriterInstance(path);
    fail("It should fail to create the new WAL");
  }

  /**
   * Tests that a flush is guaranteed to have a sequence id beyond the last edit appended. We do
   * this by slowing appends in the background ring buffer thread while calling flush in the
   * foreground. The addition of the sync over HRegion in flush should fix an issue where flush
   * was returning before all of its appends had made it out to the WAL (HBASE-11109).
   * @throws IOException
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-11109">HBASE-11109</a>
   */
  @Test
  public void testFlushSequenceIdIsGreaterThanAllEditsInHFile() throws IOException {
    String testName = "testFlushSequenceIdIsGreaterThanAllEditsInHFile";
    final TableName tableName = TableName.valueOf(testName);
    final HRegionInfo hri = new HRegionInfo(tableName);
    final byte[] rowName = tableName.getName();
    final HTableDescriptor htd = new HTableDescriptor(tableName);
    htd.addFamily(new HColumnDescriptor("f"));
    HRegion r = HRegion.createHRegion(hri, TEST_UTIL.getDefaultRootDirPath(),
        TEST_UTIL.getConfiguration(), htd);
    HRegion.closeHRegion(r);
    final int countPerFamily = 10;
    final MutableBoolean goslow = new MutableBoolean(false);
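    // While this flag is true, the doctored WAL below sleeps at the head of every ring buffer
    // append, so a correct flush must wait on the in-flight edits.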
    // Subclass FSHLog and override the ring buffer append hook so appends can be stalled.
    FSHLog wal = new FSHLog(FileSystem.get(conf), TEST_UTIL.getDefaultRootDirPath(),
        testName, conf) {
      @Override
      void atHeadOfRingBufferEventHandlerAppend() {
        if (goslow.isTrue()) {
          LOG.debug("Sleeping 100ms before append");
          Threads.sleep(100);
        }
        super.atHeadOfRingBufferEventHandlerAppend();
      }
    };
    HRegion region = HRegion.openHRegion(TEST_UTIL.getConfiguration(),
        TEST_UTIL.getTestFileSystem(), TEST_UTIL.getDefaultRootDirPath(), hri, htd, wal);
    EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate();
    try {
      List<Put> puts = null;
      for (HColumnDescriptor hcd : htd.getFamilies()) {
        puts =
            TestWALReplay.addRegionEdits(rowName, hcd.getName(), countPerFamily, ee, region, "x");
      }

      // Now assert edits made it in.
      final Get g = new Get(rowName);
      Result result = region.get(g);
      assertEquals(countPerFamily * htd.getFamilies().size(), result.size());

      // Construct a WALEdit and add it a few times to the WAL.
      WALEdit edits = new WALEdit();
      for (Put p : puts) {
        CellScanner cs = p.cellScanner();
        while (cs.advance()) {
          edits.add(cs.current());
        }
      }
      // Add any old cluster id.
      List<UUID> clusterIds = new ArrayList<UUID>();
      clusterIds.add(UUID.randomUUID());
      // Now make appends run slow.
      goslow.setValue(true);
      for (int i = 0; i < countPerFamily; i++) {
        final HRegionInfo info = region.getRegionInfo();
        final WALKey logkey = new WALKey(info.getEncodedNameAsBytes(), tableName,
            System.currentTimeMillis(), clusterIds, -1, -1, region.getMVCC());
        wal.append(htd, info, logkey, edits, true);
      }
      region.flush(true);
      // FlushResult.flushSequenceId is not visible here so go get the current sequence id.
      long currentSequenceId = region.getSequenceId();
      // Now release the appends
      goslow.setValue(false);
      synchronized (goslow) {
        goslow.notifyAll();
      }
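      // No appends are issued after the flush, so the region's sequence id must not advance
      // past the value captured right after flush; it could if flush returned early.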
      assertTrue(currentSequenceId >= region.getSequenceId());
    } finally {
      region.close(true);
      wal.close();
    }
  }

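  /**
   * Reflectively pushes the ring buffer handler's syncRunnerIndex to the edge of
   * Integer.MAX_VALUE and then appends edits; the index must wrap around without throwing.
   */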
  @Test
  public void testSyncRunnerIndexOverflow() throws IOException, NoSuchFieldException,
      SecurityException, IllegalArgumentException, IllegalAccessException {
    final String name = "testSyncRunnerIndexOverflow";
    FSHLog log =
        new FSHLog(fs, FSUtils.getRootDir(conf), name, HConstants.HREGION_OLDLOGDIR_NAME, conf,
            null, true, null, null);
    try {
      Field ringBufferEventHandlerField = FSHLog.class.getDeclaredField("ringBufferEventHandler");
      ringBufferEventHandlerField.setAccessible(true);
      FSHLog.RingBufferEventHandler ringBufferEventHandler =
          (FSHLog.RingBufferEventHandler) ringBufferEventHandlerField.get(log);
      Field syncRunnerIndexField =
          FSHLog.RingBufferEventHandler.class.getDeclaredField("syncRunnerIndex");
      syncRunnerIndexField.setAccessible(true);
      syncRunnerIndexField.set(ringBufferEventHandler, Integer.MAX_VALUE - 1);
      HTableDescriptor htd =
          new HTableDescriptor(TableName.valueOf("t1")).addFamily(new HColumnDescriptor("row"));
      HRegionInfo hri =
          new HRegionInfo(htd.getTableName(), HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
      MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
      for (int i = 0; i < 10; i++) {
        addEdits(log, hri, htd, 1, mvcc);
      }
    } finally {
      log.close();
    }
  }
}