
/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.DefaultWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Test log deletion as logs are rolled.
 */
@Category(LargeTests.class)
public class TestLogRolling {
  private static final Log LOG = LogFactory.getLog(TestLogRolling.class);
  private HRegionServer server;
  private String tableName;
  private byte[] value;
  private FileSystem fs;
  private MiniDFSCluster dfsCluster;
  private Admin admin;
  private MiniHBaseCluster cluster;
  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  public TestLogRolling() {
    this.server = null;
    this.tableName = null;

    String className = this.getClass().getName();
    StringBuilder v = new StringBuilder(className);
    while (v.length() < 1000) {
      v.append(className);
    }
    this.value = Bytes.toBytes(v.toString());
  }

  // Need to override this setup so we can edit the config before it gets sent
  // to the HDFS & HBase cluster startup.
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    // TODO: testLogRollOnDatanodeDeath fails if short circuit reads are on under the hadoop2
    // profile. See HBASE-9337 for related issues.
    System.setProperty("hbase.tests.use.shortcircuit.reads", "false");

    /**** configuration for testLogRolling ****/
    // Force a region split after every 768KB
    TEST_UTIL.getConfiguration().setLong(HConstants.HREGION_MAX_FILESIZE, 768L * 1024L);

    // We roll the log after every 32 writes
    TEST_UTIL.getConfiguration().setInt("hbase.regionserver.maxlogentries", 32);

    TEST_UTIL.getConfiguration().setInt("hbase.regionserver.logroll.errors.tolerated", 2);
    TEST_UTIL.getConfiguration().setInt("hbase.rpc.timeout", 10 * 1000);

    // For less frequently updated regions flush after every 2 flushes
    TEST_UTIL.getConfiguration().setInt("hbase.hregion.memstore.optionalflushcount", 2);

    // We flush the cache after every 8192 bytes
    TEST_UTIL.getConfiguration().setInt(
        HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 8192);

    // Increase the amount of time between client retries
    TEST_UTIL.getConfiguration().setLong("hbase.client.pause", 10 * 1000);

    // Reduce thread wake frequency so that other threads can get
    // a chance to run.
    TEST_UTIL.getConfiguration().setInt(HConstants.THREAD_WAKE_FREQUENCY, 2 * 1000);

    /**** configuration for testLogRollOnDatanodeDeath ****/
    // make sure log.hflush() calls syncFs() to open a pipeline
    TEST_UTIL.getConfiguration().setBoolean("dfs.support.append", true);
    // lower the namenode & datanode heartbeat so the namenode
    // quickly detects datanode failures
    TEST_UTIL.getConfiguration().setInt("dfs.namenode.heartbeat.recheck-interval", 5000);
    TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1);
    // the namenode might still try to choose the recently-dead datanode
    // for a pipeline, so retry creating a new pipeline multiple times
    TEST_UTIL.getConfiguration().setInt("dfs.client.block.write.retries", 30);
    TEST_UTIL.getConfiguration().setInt("hbase.regionserver.hlog.tolerable.lowreplication", 2);
    TEST_UTIL.getConfiguration().setInt("hbase.regionserver.hlog.lowreplication.rolllimit", 3);
  }

  @Before
  public void setUp() throws Exception {
    TEST_UTIL.startMiniCluster(1, 1, 2);

    cluster = TEST_UTIL.getHBaseCluster();
    dfsCluster = TEST_UTIL.getDFSCluster();
    fs = TEST_UTIL.getTestFileSystem();
    admin = TEST_UTIL.getHBaseAdmin();

    // disable region rebalancing (interferes with log watching)
    cluster.getMaster().balanceSwitch(false);
  }

  @After
  public void tearDown() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

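  /**
   * Creates the test table and writes 256 rows to it, sleeping after every 32 writes so the
   * log roller gets a chance to run.
   */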
  private void startAndWriteData() throws IOException, InterruptedException {
    // When the hbase:meta table can be opened, the region servers are running
    new HTable(TEST_UTIL.getConfiguration(), TableName.META_TABLE_NAME);
    this.server = cluster.getRegionServerThreads().get(0).getRegionServer();

    Table table = createTestTable(this.tableName);

    server = TEST_UTIL.getRSForFirstRegionInTable(table.getName());
    for (int i = 1; i <= 256; i++) {    // 256 writes should cause 8 log rolls
      doPut(table, i);
      if (i % 32 == 0) {
        // After every 32 writes sleep to let the log roller run
        try {
          Thread.sleep(2000);
        } catch (InterruptedException e) {
          // continue
        }
      }
    }
  }

  /**
   * Tests that log rolling doesn't hang when no data is written.
   */
  @Test(timeout=120000)
  public void testLogRollOnNothingWritten() throws Exception {
    final Configuration conf = TEST_UTIL.getConfiguration();
    final WALFactory wals = new WALFactory(conf, null,
        ServerName.valueOf("test.com", 8080, 1).toString());
    final WAL newLog = wals.getWAL(new byte[]{});
    try {
      // Now roll the log before we write anything.
      newLog.rollWriter(true);
    } finally {
      wals.close();
    }
  }

  /**
   * Tests that old WAL files are deleted once every region has been flushed and the log has
   * been rolled.
   * @throws IOException
   * @throws org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException
   */
  @Test
  public void testLogRolling() throws Exception {
    this.tableName = getName();
    // TODO: Why does writing this data take forever?
    startAndWriteData();
    final WAL log = server.getWAL(null);
    LOG.info("after writing there are " + DefaultWALProvider.getNumRolledLogFiles(log) +
        " log files");

    // flush all regions
    for (Region r : server.getOnlineRegionsLocalContext()) {
      r.flush(true);
    }

    // Now roll the log
    log.rollWriter();

    int count = DefaultWALProvider.getNumRolledLogFiles(log);
    LOG.info("after flushing all regions and rolling logs there are " + count + " log files");
    assertTrue("actual count: " + count, count <= 2);
  }

  private static String getName() {
    return "TestLogRolling";
  }

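  /**
   * Writes a single row and then sleeps for two seconds so the log roller can run if it needs to.
   */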
  void writeData(Table table, int rownum) throws IOException {
    doPut(table, rownum);

    // sleep to let the log roller run (if it needs to)
    try {
      Thread.sleep(2000);
    } catch (InterruptedException e) {
      // continue
    }
  }

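  /**
   * Reads the given row back from the table and asserts that it holds exactly the value
   * written by {@link #doPut(Table, int)}.
   */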
  void validateData(Table table, int rownum) throws IOException {
    String row = "row" + String.format("%1$04d", rownum);
    Get get = new Get(Bytes.toBytes(row));
    get.addFamily(HConstants.CATALOG_FAMILY);
    Result result = table.get(get);
    assertTrue(result.size() == 1);
    assertTrue(Bytes.equals(value,
        result.getValue(HConstants.CATALOG_FAMILY, null)));
    LOG.info("Validated row " + row);
  }

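  /**
   * Writes a batch of ten rows, then keeps writing a temporary row until the WAL's
   * low-replication-roll flag matches {@code expect} or {@code timeout} milliseconds elapse.
   */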
  void batchWriteAndWait(Table table, final FSHLog log, int start, boolean expect, int timeout)
      throws IOException {
    for (int i = 0; i < 10; i++) {
      Put put = new Put(Bytes.toBytes("row"
          + String.format("%1$04d", (start + i))));
      put.add(HConstants.CATALOG_FAMILY, null, value);
      table.put(put);
    }
    Put tmpPut = new Put(Bytes.toBytes("tmprow"));
    tmpPut.add(HConstants.CATALOG_FAMILY, null, value);
    long startTime = System.currentTimeMillis();
    long remaining = timeout;
    while (remaining > 0) {
      if (log.isLowReplicationRollEnabled() == expect) {
        break;
      } else {
        // Trigger calling FSHLog#checkLowReplication()
        table.put(tmpPut);
        try {
          Thread.sleep(200);
        } catch (InterruptedException e) {
          // continue
        }
        remaining = timeout - (System.currentTimeMillis() - startTime);
      }
    }
  }

  /**
   * Tests that logs are rolled upon detecting datanode death.
   * Requires an HDFS jar with HDFS-826 & syncFs() support (HDFS-200).
   */
  @Test
  public void testLogRollOnDatanodeDeath() throws Exception {
    TEST_UTIL.ensureSomeRegionServersAvailable(2);
    assertTrue("This test requires WAL file replication set to 2.",
      fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()) == 2);
    LOG.info("Replication=" +
      fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()));

    this.server = cluster.getRegionServer(0);

    // Create the test table and open it
    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(getName()));
    desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));

    admin.createTable(desc);
    Table table = TEST_UTIL.getConnection().getTable(desc.getTableName());
    assertTrue(((HTable) table).isAutoFlush());

    server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName());
    final FSHLog log = (FSHLog) server.getWAL(null);
    final AtomicBoolean lowReplicationHookCalled = new AtomicBoolean(false);

    log.registerWALActionsListener(new WALActionsListener.Base() {
      @Override
      public void logRollRequested(boolean lowReplication) {
        if (lowReplication) {
          lowReplicationHookCalled.lazySet(true);
        }
      }
    });

    // don't run this test without append support (HDFS-200 & HDFS-142)
    assertTrue("Need append support for this test", FSUtils
        .isAppendSupported(TEST_UTIL.getConfiguration()));

    // add up the datanode count, to ensure proper replication when we kill 1
    // This function is synchronous; when it returns, the dfs cluster is active
    // We start 3 servers and then stop 2 to avoid a directory naming conflict
    // when we stop/start a namenode later, as mentioned in HBASE-5163
    List<DataNode> existingNodes = dfsCluster.getDataNodes();
    int numDataNodes = 3;
    dfsCluster.startDataNodes(TEST_UTIL.getConfiguration(), numDataNodes, true,
        null, null);
    List<DataNode> allNodes = dfsCluster.getDataNodes();
    for (int i = allNodes.size() - 1; i >= 0; i--) {
      if (existingNodes.contains(allNodes.get(i))) {
        dfsCluster.stopDataNode(i);
      }
    }

    assertTrue("DataNodes " + dfsCluster.getDataNodes().size() +
        " default replication " +
        fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()),
        dfsCluster.getDataNodes().size() >=
            fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()) + 1);

    writeData(table, 2);

    long curTime = System.currentTimeMillis();
    LOG.info("log.getCurrentFileName(): " + log.getCurrentFileName());
    long oldFilenum = DefaultWALProvider.extractFileNumFromWAL(log);
    assertTrue("Log should have a timestamp older than now",
        curTime > oldFilenum && oldFilenum != -1);

    assertTrue("The log shouldn't have rolled yet",
        oldFilenum == DefaultWALProvider.extractFileNumFromWAL(log));
    final DatanodeInfo[] pipeline = log.getPipeLine();
    assertTrue(pipeline.length ==
        fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()));

    // kill a datanode in the pipeline to force a log roll on the next sync()
    // This call is synchronous; when it returns, the node is killed.
    assertTrue(dfsCluster.stopDataNode(pipeline[0].getName()) != null);

    // this write should succeed, but trigger a log roll
    writeData(table, 2);
    long newFilenum = DefaultWALProvider.extractFileNumFromWAL(log);

    assertTrue("Missing datanode should've triggered a log roll",
        newFilenum > oldFilenum && newFilenum > curTime);

    assertTrue("The log rolling hook should have been called with the low replication flag",
        lowReplicationHookCalled.get());

    // write some more log data (this should use a new hdfs_out)
    writeData(table, 3);
    assertTrue("The log should not roll again.",
        DefaultWALProvider.extractFileNumFromWAL(log) == newFilenum);
    // kill another datanode in the pipeline, so the replica count drops below
    // the configured value of 2.
    assertTrue(dfsCluster.stopDataNode(pipeline[1].getName()) != null);

    batchWriteAndWait(table, log, 3, false, 14000);
    int replication = log.getLogReplication();
    assertTrue("LowReplication Roller should've been disabled, current replication="
        + replication, !log.isLowReplicationRollEnabled());

    dfsCluster.startDataNodes(TEST_UTIL.getConfiguration(), 1, true, null, null);

    // Force roll writer. The new log file will have the default replication,
    // and the LowReplication Roller will be enabled.
    log.rollWriter(true);
    batchWriteAndWait(table, log, 13, true, 10000);
    replication = log.getLogReplication();
    assertTrue("New log file should have the default replication instead of " + replication,
        replication == fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()));
    assertTrue("LowReplication Roller should've been enabled", log.isLowReplicationRollEnabled());
  }

  /**
   * Tests that the WAL is rolled when all datanodes in the pipeline have been
   * restarted.
   * @throws Exception
   */
  @Test
  public void testLogRollOnPipelineRestart() throws Exception {
    LOG.info("Starting testLogRollOnPipelineRestart");
    assertTrue("This test requires WAL file replication.",
      fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()) > 1);
    LOG.info("Replication=" +
      fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()));
    // When the hbase:meta table can be opened, the region servers are running
    Table t = new HTable(TEST_UTIL.getConfiguration(), TableName.META_TABLE_NAME);
    try {
      this.server = cluster.getRegionServer(0);

      // Create the test table and open it
      HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(getName()));
      desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));

      admin.createTable(desc);
      Table table = new HTable(TEST_UTIL.getConfiguration(), desc.getTableName());

      server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName());
      final WAL log = server.getWAL(null);
      final List<Path> paths = new ArrayList<Path>();
      final List<Integer> preLogRolledCalled = new ArrayList<Integer>();

      paths.add(DefaultWALProvider.getCurrentFileName(log));
      log.registerWALActionsListener(new WALActionsListener.Base() {

        @Override
        public void preLogRoll(Path oldFile, Path newFile) {
          LOG.debug("preLogRoll: oldFile=" + oldFile + " newFile=" + newFile);
          preLogRolledCalled.add(new Integer(1));
        }

        @Override
        public void postLogRoll(Path oldFile, Path newFile) {
          paths.add(newFile);
        }
      });

      // don't run this test without append support (HDFS-200 & HDFS-142)
      assertTrue("Need append support for this test", FSUtils
          .isAppendSupported(TEST_UTIL.getConfiguration()));

      writeData(table, 1002);

      long curTime = System.currentTimeMillis();
      LOG.info("log.getCurrentFileName(): " + DefaultWALProvider.getCurrentFileName(log));
      long oldFilenum = DefaultWALProvider.extractFileNumFromWAL(log);
      assertTrue("Log should have a timestamp older than now",
          curTime > oldFilenum && oldFilenum != -1);

      assertTrue("The log shouldn't have rolled yet", oldFilenum ==
          DefaultWALProvider.extractFileNumFromWAL(log));

      // roll all datanodes in the pipeline
      dfsCluster.restartDataNodes();
      Thread.sleep(1000);
      dfsCluster.waitActive();
      LOG.info("Data Nodes restarted");
      validateData(table, 1002);

      // this write should succeed, but trigger a log roll
      writeData(table, 1003);
      long newFilenum = DefaultWALProvider.extractFileNumFromWAL(log);

      assertTrue("Missing datanode should've triggered a log roll",
          newFilenum > oldFilenum && newFilenum > curTime);
      validateData(table, 1003);

      writeData(table, 1004);

      // roll all datanodes again
      dfsCluster.restartDataNodes();
      Thread.sleep(1000);
      dfsCluster.waitActive();
      LOG.info("Data Nodes restarted");
      validateData(table, 1004);

      // this write should succeed, but trigger a log roll
      writeData(table, 1005);

      // force a log roll to read back and verify previously written logs
      log.rollWriter(true);
      assertTrue("preLogRolledCalled has size of " + preLogRolledCalled.size(),
          preLogRolledCalled.size() >= 1);

      // read back the data written
      Set<String> loggedRows = new HashSet<String>();
      FSUtils fsUtils = FSUtils.getInstance(fs, TEST_UTIL.getConfiguration());
      for (Path p : paths) {
        LOG.debug("recovering lease for " + p);
        fsUtils.recoverFileLease(((HFileSystem) fs).getBackingFs(), p,
          TEST_UTIL.getConfiguration(), null);

        LOG.debug("Reading WAL " + FSUtils.getPath(p));
        WAL.Reader reader = null;
        try {
          reader = WALFactory.createReader(fs, p, TEST_UTIL.getConfiguration());
          WAL.Entry entry;
          while ((entry = reader.next()) != null) {
            LOG.debug("#" + entry.getKey().getLogSeqNum() + ": " + entry.getEdit().getCells());
            for (Cell cell : entry.getEdit().getCells()) {
              loggedRows.add(Bytes.toStringBinary(cell.getRow()));
            }
          }
        } catch (EOFException e) {
          LOG.debug("EOF reading file " + FSUtils.getPath(p));
        } finally {
          if (reader != null) reader.close();
        }
      }

      // verify the written rows are there
      assertTrue(loggedRows.contains("row1002"));
      assertTrue(loggedRows.contains("row1003"));
      assertTrue(loggedRows.contains("row1004"));
      assertTrue(loggedRows.contains("row1005"));

      // flush all regions
      for (Region r : server.getOnlineRegionsLocalContext()) {
        try {
          r.flush(true);
        } catch (Exception e) {
          // This try/catch was added by HBASE-14317, which tightened the
          // semantics so that a failed append can no longer be followed by
          // a successful sync. What surfaces here is a failed sync that
          // used to 'pass'.
          LOG.info(e);
        }
      }

      ResultScanner scanner = table.getScanner(new Scan());
      try {
        for (int i = 2; i <= 5; i++) {
          Result r = scanner.next();
          assertNotNull(r);
          assertFalse(r.isEmpty());
          assertEquals("row100" + i, Bytes.toString(r.getRow()));
        }
      } finally {
        scanner.close();
      }

      // verify that no region servers aborted
      for (JVMClusterUtil.RegionServerThread rsThread :
          TEST_UTIL.getHBaseCluster().getRegionServerThreads()) {
        assertFalse(rsThread.getRegionServer().isAborted());
      }
    } finally {
      if (t != null) t.close();
    }
  }

  /**
   * Tests that logs are deleted when some region has a compaction
   * record in WAL and no other records. See HBASE-8597.
   */
  @Test
  public void testCompactionRecordDoesntBlockRolling() throws Exception {
    Table table = null;
    Table table2 = null;

    // When the hbase:meta table can be opened, the region servers are running
    Table t = new HTable(TEST_UTIL.getConfiguration(), TableName.META_TABLE_NAME);
    try {
      table = createTestTable(getName());
      table2 = createTestTable(getName() + "1");

      server = TEST_UTIL.getRSForFirstRegionInTable(table.getName());
      final WAL log = server.getWAL(null);
      Region region = server.getOnlineRegions(table2.getName()).get(0);
      Store s = region.getStore(HConstants.CATALOG_FAMILY);

      // have to flush the namespace table to ensure it doesn't affect the WAL tests
      admin.flush(TableName.NAMESPACE_TABLE_NAME);

      // Put some stuff into table2, to make sure we have some files to compact.
      for (int i = 1; i <= 2; ++i) {
        doPut(table2, i);
        admin.flush(table2.getName());
      }
      doPut(table2, 3); // don't flush yet, or compaction might trigger before we roll WAL
      assertEquals("Should have no WAL after initial writes", 0,
          DefaultWALProvider.getNumRolledLogFiles(log));
      assertEquals(2, s.getStorefilesCount());

      // Roll the log and compact table2, to have compaction record in the 2nd WAL.
      log.rollWriter();
      assertEquals("Should have WAL; one table is not flushed", 1,
          DefaultWALProvider.getNumRolledLogFiles(log));
      admin.flush(table2.getName());
      region.compact(false);
      // Wait for compaction in case the flush triggered it before us.
      Assert.assertNotNull(s);
      for (int waitTime = 3000; s.getStorefilesCount() > 1 && waitTime > 0; waitTime -= 200) {
        Threads.sleepWithoutInterrupt(200);
      }
      assertEquals("Compaction didn't happen", 1, s.getStorefilesCount());

      // Write some value to the table so the WAL cannot be deleted until table is flushed.
      doPut(table, 0); // Now 2nd WAL will have compaction record for table2 and put for table.
      log.rollWriter(); // 1st WAL deleted, 2nd not deleted yet.
      assertEquals("Should have WAL; one table is not flushed", 1,
          DefaultWALProvider.getNumRolledLogFiles(log));

      // Flush table to make latest WAL obsolete; write another record, and roll again.
      admin.flush(table.getName());
      doPut(table, 1);
      log.rollWriter(); // Now 2nd WAL is deleted and 3rd is added.
      assertEquals("Should have 1 WAL at the end", 1,
          DefaultWALProvider.getNumRolledLogFiles(log));
    } finally {
      if (t != null) t.close();
      if (table != null) table.close();
      if (table2 != null) table2.close();
    }
  }

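  /**
   * Puts {@link #value} into the catalog family of the row named "row" plus the zero-padded
   * index {@code i}.
   */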
  private void doPut(Table table, int i) throws IOException {
    Put put = new Put(Bytes.toBytes("row" + String.format("%1$04d", i)));
    put.add(HConstants.CATALOG_FAMILY, null, value);
    table.put(put);
  }

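  /**
   * Creates a table with a single {@link HConstants#CATALOG_FAMILY} column family and returns
   * an open handle to it.
   */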
  private Table createTestTable(String tableName) throws IOException {
    // Create the test table and open it
    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
    desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
    admin.createTable(desc);
    return new HTable(TEST_UTIL.getConfiguration(), desc.getTableName());
  }
}