
/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.wal;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.NavigableSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValueTestUtil;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.wal.WALSplitter.EntryBuffers;
import org.apache.hadoop.hbase.wal.WALSplitter.PipelineController;
import org.apache.hadoop.hbase.wal.WALSplitter.RegionEntryBuffer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.junit.Test;
import org.junit.experimental.categories.Category;

// imports for things that haven't moved from regionserver.wal yet.
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;

/**
 * Simple testing of a few WAL methods.
 */
@Category(SmallTests.class)
public class TestWALMethods {
  private static final byte[] TEST_REGION = Bytes.toBytes("test_region");
  private static final TableName TEST_TABLE = TableName.valueOf("test_table");

  private final HBaseTestingUtility util = new HBaseTestingUtility();

  /**
   * Assert that getSplitEditFilesSorted returns files in expected order and
   * that it skips moved-aside files.
   * @throws IOException
   */
  @Test
  public void testGetSplitEditFilesSorted() throws IOException {
    FileSystem fs = FileSystem.get(util.getConfiguration());
    Path regiondir = util.getDataTestDir("regiondir");
    fs.delete(regiondir, true);
    fs.mkdirs(regiondir);
    Path recoverededits = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
    String first = WALSplitter.formatRecoveredEditsFileName(-1);
    createFile(fs, recoverededits, first);
    createFile(fs, recoverededits, WALSplitter.formatRecoveredEditsFileName(0));
    createFile(fs, recoverededits, WALSplitter.formatRecoveredEditsFileName(1));
    createFile(fs, recoverededits, WALSplitter.formatRecoveredEditsFileName(11));
    createFile(fs, recoverededits, WALSplitter.formatRecoveredEditsFileName(2));
    createFile(fs, recoverededits, WALSplitter.formatRecoveredEditsFileName(50));
    String last = WALSplitter.formatRecoveredEditsFileName(Long.MAX_VALUE);
    createFile(fs, recoverededits, last);
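    // A file named with a trailing timestamp is an edits file that has been moved aside;
    // getSplitEditFilesSorted should skip it.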
    createFile(fs, recoverededits,
      Long.toString(Long.MAX_VALUE) + "." + System.currentTimeMillis());

    final Configuration walConf = new Configuration(util.getConfiguration());
    FSUtils.setRootDir(walConf, regiondir);
    (new WALFactory(walConf, null, "dummyLogName")).getWAL(new byte[]{});

    NavigableSet<Path> files = WALSplitter.getSplitEditFilesSorted(fs, regiondir);
    assertEquals(7, files.size());
    assertEquals(files.pollFirst().getName(), first);
    assertEquals(files.pollLast().getName(), last);
    assertEquals(files.pollFirst().getName(), WALSplitter.formatRecoveredEditsFileName(0));
    assertEquals(files.pollFirst().getName(), WALSplitter.formatRecoveredEditsFileName(1));
    assertEquals(files.pollFirst().getName(), WALSplitter.formatRecoveredEditsFileName(2));
    assertEquals(files.pollFirst().getName(), WALSplitter.formatRecoveredEditsFileName(11));
  }

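  /** Creates an empty file of the given name under {@code testdir}. */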
  private void createFile(final FileSystem fs, final Path testdir, final String name)
      throws IOException {
    FSDataOutputStream fdos = fs.create(new Path(testdir, name), true);
    fdos.close();
  }

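  // heapSize should be zero for an empty RegionEntryBuffer and positive once an entry is appended.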
  @Test
  public void testRegionEntryBuffer() throws Exception {
    WALSplitter.RegionEntryBuffer reb = new WALSplitter.RegionEntryBuffer(
        TEST_TABLE, TEST_REGION);
    assertEquals(0, reb.heapSize());

    reb.appendEntry(createTestLogEntry(1));
    assertTrue(reb.heapSize() > 0);
  }

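  // Exercises EntryBuffers: appended entries are buffered per region, only one chunk per
  // region is handed out for writing at a time, and totalBuffered drains back to zero once
  // every chunk has been written via doneWriting().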
  @Test
  public void testEntrySink() throws Exception {
    Configuration conf = new Configuration();
    RecoveryMode mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ?
      RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);

    EntryBuffers sink = new EntryBuffers(new PipelineController(), 1*1024*1024);
    for (int i = 0; i < 1000; i++) {
      WAL.Entry entry = createTestLogEntry(i);
      sink.appendEntry(entry);
    }

    assertTrue(sink.totalBuffered > 0);
    long amountInChunk = sink.totalBuffered;
    // Get a chunk
    RegionEntryBuffer chunk = sink.getChunkToWrite();
    assertEquals(chunk.heapSize(), amountInChunk);

    // Make sure it got marked that a thread is "working on this"
    assertTrue(sink.isRegionCurrentlyWriting(TEST_REGION));

    // Insert some more entries
    for (int i = 0; i < 500; i++) {
      WAL.Entry entry = createTestLogEntry(i);
      sink.appendEntry(entry);
    }
    // Asking for another chunk shouldn't work since the first one
    // is still writing
    assertNull(sink.getChunkToWrite());

    // If we say we're done writing the first chunk, then we should be able
    // to get the second
    sink.doneWriting(chunk);

    RegionEntryBuffer chunk2 = sink.getChunkToWrite();
    assertNotNull(chunk2);
    assertNotSame(chunk, chunk2);
    long amountInChunk2 = sink.totalBuffered;
    // The second chunk had fewer rows than the first
    assertTrue(amountInChunk2 < amountInChunk);

    sink.doneWriting(chunk2);
    assertEquals(0, sink.totalBuffered);
  }

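  /** Builds a WAL.Entry for TEST_TABLE/TEST_REGION containing a single test KeyValue. */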
  private WAL.Entry createTestLogEntry(int i) {
    long seq = i;
    long now = i * 1000;

    WALEdit edit = new WALEdit();
    edit.add(KeyValueTestUtil.create("row", "fam", "qual", 1234, "val"));
    WALKey key = new WALKey(TEST_REGION, TEST_TABLE, seq, now,
        HConstants.DEFAULT_CLUSTER_ID);
    WAL.Entry entry = new WAL.Entry(key, edit);
    return entry;
  }

}