View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.replication.regionserver;
20  
21  import static org.junit.Assert.assertEquals;
22  import static org.junit.Assert.assertNotEquals;
23  import static org.junit.Assert.assertNotNull;
24  import static org.junit.Assert.assertNull;
25  import static org.junit.Assert.fail;
26  
27  import java.io.IOException;
28  import java.util.ArrayList;
29  import java.util.Collection;
30  import java.util.List;
31  
32  import org.apache.hadoop.conf.Configuration;
33  import org.apache.hadoop.fs.FileSystem;
34  import org.apache.hadoop.fs.Path;
35  import org.apache.hadoop.hbase.HBaseTestingUtility;
36  import org.apache.hadoop.hbase.HConstants;
37  import org.apache.hadoop.hbase.HRegionInfo;
38  import org.apache.hadoop.hbase.HTableDescriptor;
39  import org.apache.hadoop.hbase.KeyValue;
40  import org.apache.hadoop.hbase.TableName;
41  import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
42  import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
43  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
44  import org.apache.hadoop.hbase.testclassification.LargeTests;
45  import org.apache.hadoop.hbase.util.Bytes;
46  import org.apache.hadoop.hbase.wal.WAL;
47  import org.apache.hadoop.hbase.wal.WALFactory;
48  import org.apache.hadoop.hbase.wal.WALKey;
49  import org.apache.hadoop.hdfs.MiniDFSCluster;
50  import org.junit.After;
51  import org.junit.AfterClass;
52  import org.junit.Before;
53  import org.junit.BeforeClass;
54  import org.junit.Rule;
55  import org.junit.Test;
56  import org.junit.experimental.categories.Category;
57  import org.junit.rules.TestName;
58  import org.junit.runner.RunWith;
59  import org.junit.runners.Parameterized;
60  import org.junit.runners.Parameterized.Parameters;
61  @Category({LargeTests.class})
62  @RunWith(Parameterized.class)
63  public class TestReplicationWALReaderManager {
64  
65    private static HBaseTestingUtility TEST_UTIL;
66    private static Configuration conf;
67    private static FileSystem fs;
68    private static MiniDFSCluster cluster;
69    private static final TableName tableName = TableName.valueOf("tablename");
70    private static final byte [] family = Bytes.toBytes("column");
71    private static final byte [] qualifier = Bytes.toBytes("qualifier");
72    private static final HRegionInfo info = new HRegionInfo(tableName,
73        HConstants.EMPTY_START_ROW, HConstants.LAST_ROW, false);
74    private static final HTableDescriptor htd = new HTableDescriptor(tableName);
75  
76    private WAL log;
77    private ReplicationWALReaderManager logManager;
78    private PathWatcher pathWatcher;
79    private int nbRows;
80    private int walEditKVs;
81    @Rule public TestName tn = new TestName();
82    private final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
83  
84    @Parameters
85    public static Collection<Object[]> parameters() {
86      // Try out different combinations of row count and KeyValue count
87      int[] NB_ROWS = { 1500, 60000 };
88      int[] NB_KVS = { 1, 100 };
89      // whether compression is used
90      Boolean[] BOOL_VALS = { false, true };
91      List<Object[]> parameters = new ArrayList<Object[]>();
92      for (int nbRows : NB_ROWS) {
93        for (int walEditKVs : NB_KVS) {
94          for (boolean b : BOOL_VALS) {
95            Object[] arr = new Object[3];
96            arr[0] = nbRows;
97            arr[1] = walEditKVs;
98            arr[2] = b;
99            parameters.add(arr);
100         }
101       }
102     }
103     return parameters;
104   }
105 
106   public TestReplicationWALReaderManager(int nbRows, int walEditKVs, boolean enableCompression) {
107     this.nbRows = nbRows;
108     this.walEditKVs = walEditKVs;
109     TEST_UTIL.getConfiguration().setBoolean(HConstants.ENABLE_WAL_COMPRESSION,
110       enableCompression);
111     mvcc.advanceTo(1);
112   }
113   
114   @BeforeClass
115   public static void setUpBeforeClass() throws Exception {
116     TEST_UTIL = new HBaseTestingUtility();
117     conf = TEST_UTIL.getConfiguration();
118     TEST_UTIL.startMiniDFSCluster(3);
119 
120     cluster = TEST_UTIL.getDFSCluster();
121     fs = cluster.getFileSystem();
122   }
123 
124   @AfterClass
125   public static void tearDownAfterClass() throws Exception {
126     TEST_UTIL.shutdownMiniCluster();
127   }
128 
129   @Before
130   public void setUp() throws Exception {
131     logManager = new ReplicationWALReaderManager(fs, conf);
132     List<WALActionsListener> listeners = new ArrayList<WALActionsListener>();
133     pathWatcher = new PathWatcher();
134     listeners.add(pathWatcher);
135     final WALFactory wals = new WALFactory(conf, listeners, tn.getMethodName());
136     log = wals.getWAL(info.getEncodedNameAsBytes());
137   }
138 
139   @After
140   public void tearDown() throws Exception {
141     log.close();
142   }
143 
144   @Test
145   public void test() throws Exception {
146     // Grab the path that was generated when the log rolled as part of its creation
147     Path path = pathWatcher.currentPath;
148 
149     assertEquals(0, logManager.getPosition());
150 
151     appendToLog();
152 
153     // There's one edit in the log, read it. Reading past it needs to return nulls
154     assertNotNull(logManager.openReader(path));
155     logManager.seek();
156     WAL.Entry entry = logManager.readNextAndSetPosition();
157     assertNotNull(entry);
158     entry = logManager.readNextAndSetPosition();
159     assertNull(entry);
160     logManager.closeReader();
161     long oldPos = logManager.getPosition();
162 
163     appendToLog();
164 
165     // Read the newly added entry, make sure we made progress
166     assertNotNull(logManager.openReader(path));
167     logManager.seek();
168     entry = logManager.readNextAndSetPosition();
169     assertNotEquals(oldPos, logManager.getPosition());
170     assertNotNull(entry);
171     logManager.closeReader();
172     oldPos = logManager.getPosition();
173 
174     log.rollWriter();
175 
176     // We rolled but we still should see the end of the first log and not get data
177     assertNotNull(logManager.openReader(path));
178     logManager.seek();
179     entry = logManager.readNextAndSetPosition();
180     assertEquals(oldPos, logManager.getPosition());
181     assertNull(entry);
182     logManager.finishCurrentFile();
183 
184     path = pathWatcher.currentPath;
185 
186     for (int i = 0; i < nbRows; i++) { appendToLogPlus(walEditKVs); }
187     log.rollWriter();
188     logManager.openReader(path);
189     logManager.seek();
190     for (int i = 0; i < nbRows; i++) {
191       WAL.Entry e = logManager.readNextAndSetPosition();
192       if (e == null) {
193         fail("Should have enough entries");
194       }
195     }
196   }
197 
198   private void appendToLog() throws IOException {
199     appendToLogPlus(1);
200   }
201 
202   private void appendToLogPlus(int count) throws IOException {
203     final long txid = log.append(htd, info,
204         new WALKey(info.getEncodedNameAsBytes(), tableName, System.currentTimeMillis(), mvcc),
205         getWALEdits(count), true);
206     log.sync(txid);
207   }
208 
209   private WALEdit getWALEdits(int count) {
210     WALEdit edit = new WALEdit();
211     for (int i = 0; i < count; i++) {
212       edit.add(new KeyValue(Bytes.toBytes(System.currentTimeMillis()), family, qualifier,
213         System.currentTimeMillis(), qualifier));
214     }
215     return edit;
216   }
217 
218   class PathWatcher extends WALActionsListener.Base {
219 
220     Path currentPath;
221 
222     @Override
223     public void preLogRoll(Path oldPath, Path newPath) throws IOException {
224       currentPath = newPath;
225     }
226   }
227 }