View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.mapreduce;
19  
20  import static org.junit.Assert.assertEquals;
21  
22  import java.io.IOException;
23  import java.util.Arrays;
24  
25  import org.apache.commons.logging.Log;
26  import org.apache.commons.logging.LogFactory;
27  import org.apache.hadoop.fs.FileSystem;
28  import org.apache.hadoop.fs.Path;
29  import org.apache.hadoop.hbase.CategoryBasedTimeout;
30  import org.apache.hadoop.hbase.Cell;
31  import org.apache.hadoop.hbase.CellUtil;
32  import org.apache.hadoop.hbase.HBaseTestingUtility;
33  import org.apache.hadoop.hbase.TableName;
34  import org.apache.hadoop.hbase.client.HTable;
35  import org.apache.hadoop.hbase.client.Put;
36  import org.apache.hadoop.hbase.client.Result;
37  import org.apache.hadoop.hbase.client.ResultScanner;
38  import org.apache.hadoop.hbase.client.Scan;
39  import org.apache.hadoop.hbase.client.Table;
40  import org.apache.hadoop.hbase.mapreduce.SyncTable.SyncMapper.Counter;
41  import org.apache.hadoop.hbase.testclassification.LargeTests;
42  import org.apache.hadoop.hbase.util.Bytes;
43  import org.apache.hadoop.mapreduce.Counters;
44  import org.junit.AfterClass;
45  import org.junit.Assert;
46  import org.junit.BeforeClass;
47  import org.junit.Rule;
48  import org.junit.Test;
49  import org.junit.experimental.categories.Category;
50  import org.junit.rules.TestRule;
51  
52  import com.google.common.base.Throwables;
53  
54  /**
55   * Basic test for the SyncTable M/R tool
56   */
57  @Category(LargeTests.class)
58  public class TestSyncTable {
59    @Rule public final TestRule timeout = CategoryBasedTimeout.builder().
60        withTimeout(this.getClass()).withLookingForStuckThread(true).build();
61    private static final Log LOG = LogFactory.getLog(TestSyncTable.class);
62    
63    private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();  
64    
65    @BeforeClass
66    public static void beforeClass() throws Exception {
67      TEST_UTIL.setJobWithoutMRCluster();
68      TEST_UTIL.startMiniCluster(3);
69    }
70    
71    @AfterClass
72    public static void afterClass() throws Exception {
73      TEST_UTIL.shutdownMiniCluster();
74    }
75    
76    private static byte[][] generateSplits(int numRows, int numRegions) {
77      byte[][] splitRows = new byte[numRegions-1][];
78      for (int i = 1; i < numRegions; i++) {
79        splitRows[i-1] = Bytes.toBytes(numRows * i / numRegions);
80      }
81      return splitRows;
82    }
83    
84    @Test
85    public void testSyncTable() throws Exception {
86      String sourceTableName = "testSourceTable";
87      String targetTableName = "testTargetTable";
88      Path testDir = TEST_UTIL.getDataTestDirOnTestFS("testSyncTable");
89      
90      writeTestData(sourceTableName, targetTableName);
91      hashSourceTable(sourceTableName, testDir);
92      Counters syncCounters = syncTables(sourceTableName, targetTableName, testDir);
93      assertEqualTables(90, sourceTableName, targetTableName);
94      
95      assertEquals(60, syncCounters.findCounter(Counter.ROWSWITHDIFFS).getValue());
96      assertEquals(10, syncCounters.findCounter(Counter.SOURCEMISSINGROWS).getValue());
97      assertEquals(10, syncCounters.findCounter(Counter.TARGETMISSINGROWS).getValue());
98      assertEquals(50, syncCounters.findCounter(Counter.SOURCEMISSINGCELLS).getValue());
99      assertEquals(50, syncCounters.findCounter(Counter.TARGETMISSINGCELLS).getValue());
100     assertEquals(20, syncCounters.findCounter(Counter.DIFFERENTCELLVALUES).getValue());
101     
102     TEST_UTIL.deleteTable(sourceTableName);
103     TEST_UTIL.deleteTable(targetTableName);
104     TEST_UTIL.cleanupDataTestDirOnTestFS();
105   }
106 
107   private void assertEqualTables(int expectedRows, String sourceTableName, String targetTableName) 
108       throws Exception {
109     Table sourceTable = TEST_UTIL.getConnection().getTable(TableName.valueOf(sourceTableName));
110     Table targetTable = TEST_UTIL.getConnection().getTable(TableName.valueOf(targetTableName));
111     
112     ResultScanner sourceScanner = sourceTable.getScanner(new Scan());
113     ResultScanner targetScanner = targetTable.getScanner(new Scan());
114     
115     for (int i = 0; i < expectedRows; i++) {
116       Result sourceRow = sourceScanner.next();
117       Result targetRow = targetScanner.next();
118       
119       LOG.debug("SOURCE row: " + (sourceRow == null ? "null" : Bytes.toInt(sourceRow.getRow()))
120           + " cells:" + sourceRow);
121       LOG.debug("TARGET row: " + (targetRow == null ? "null" : Bytes.toInt(targetRow.getRow()))
122           + " cells:" + targetRow);
123       
124       if (sourceRow == null) {
125         Assert.fail("Expected " + expectedRows
126             + " source rows but only found " + i); 
127       }
128       if (targetRow == null) {
129         Assert.fail("Expected " + expectedRows
130             + " target rows but only found " + i); 
131       }
132       Cell[] sourceCells = sourceRow.rawCells();
133       Cell[] targetCells = targetRow.rawCells();
134       if (sourceCells.length != targetCells.length) {
135         LOG.debug("Source cells: " + Arrays.toString(sourceCells));
136         LOG.debug("Target cells: " + Arrays.toString(targetCells));
137         Assert.fail("Row " + Bytes.toInt(sourceRow.getRow())
138             + " has " + sourceCells.length
139             + " cells in source table but " + targetCells.length
140             + " cells in target table");
141       }
142       for (int j = 0; j < sourceCells.length; j++) {
143         Cell sourceCell = sourceCells[j];
144         Cell targetCell = targetCells[j];
145         try {
146           if (!CellUtil.matchingRow(sourceCell, targetCell)) {
147             Assert.fail("Rows don't match");
148           }
149           if (!CellUtil.matchingFamily(sourceCell, targetCell)) {
150             Assert.fail("Families don't match");
151           }
152           if (!CellUtil.matchingQualifier(sourceCell, targetCell)) {
153             Assert.fail("Qualifiers don't match");
154           }
155           if (!CellUtil.matchingTimestamp(sourceCell, targetCell)) {
156             Assert.fail("Timestamps don't match");
157           }
158           if (!CellUtil.matchingValue(sourceCell, targetCell)) {
159             Assert.fail("Values don't match");
160           }
161         } catch (Throwable t) {
162           LOG.debug("Source cell: " + sourceCell + " target cell: " + targetCell);
163           Throwables.propagate(t);
164         }
165       }
166     }
167     Result sourceRow = sourceScanner.next();
168     if (sourceRow != null) {
169       Assert.fail("Source table has more than " + expectedRows
170           + " rows.  Next row: " + Bytes.toInt(sourceRow.getRow()));
171     }
172     Result targetRow = targetScanner.next();
173     if (targetRow != null) {
174       Assert.fail("Target table has more than " + expectedRows
175           + " rows.  Next row: " + Bytes.toInt(targetRow.getRow()));
176     }
177     sourceScanner.close();
178     targetScanner.close();
179     sourceTable.close();
180     targetTable.close();
181   }
182 
183   private Counters syncTables(String sourceTableName, String targetTableName,
184       Path testDir) throws Exception {
185     SyncTable syncTable = new SyncTable(TEST_UTIL.getConfiguration());
186     int code = syncTable.run(new String[] { 
187         testDir.toString(),
188         sourceTableName,
189         targetTableName
190         });
191     assertEquals("sync table job failed", 0, code);
192     
193     LOG.info("Sync tables completed");
194     return syncTable.counters;
195   }
196 
197   private void hashSourceTable(String sourceTableName, Path testDir)
198       throws Exception, IOException {
199     int numHashFiles = 3;
200     long batchSize = 100;  // should be 2 batches per region
201     int scanBatch = 1;
202     HashTable hashTable = new HashTable(TEST_UTIL.getConfiguration());
203     int code = hashTable.run(new String[] { 
204         "--batchsize=" + batchSize,
205         "--numhashfiles=" + numHashFiles,
206         "--scanbatch=" + scanBatch,
207         sourceTableName,
208         testDir.toString()});
209     assertEquals("hash table job failed", 0, code);
210     
211     FileSystem fs = TEST_UTIL.getTestFileSystem();
212     
213     HashTable.TableHash tableHash = HashTable.TableHash.read(fs.getConf(), testDir);
214     assertEquals(sourceTableName, tableHash.tableName);
215     assertEquals(batchSize, tableHash.batchSize);
216     assertEquals(numHashFiles, tableHash.numHashFiles);
217     assertEquals(numHashFiles - 1, tableHash.partitions.size());
218 
219     LOG.info("Hash table completed");
220   }
221 
222   private void writeTestData(String sourceTableName, String targetTableName)
223       throws Exception {
224     final byte[] family = Bytes.toBytes("family");
225     final byte[] column1 = Bytes.toBytes("c1");
226     final byte[] column2 = Bytes.toBytes("c2");
227     final byte[] value1 = Bytes.toBytes("val1");
228     final byte[] value2 = Bytes.toBytes("val2");
229     final byte[] value3 = Bytes.toBytes("val3");
230     
231     int numRows = 100;
232     int sourceRegions = 10;
233     int targetRegions = 6;
234     
235     HTable sourceTable = TEST_UTIL.createTable(TableName.valueOf(sourceTableName),
236         family, generateSplits(numRows, sourceRegions));
237 
238     HTable targetTable = TEST_UTIL.createTable(TableName.valueOf(targetTableName),
239         family, generateSplits(numRows, targetRegions));
240 
241     long timestamp = 1430764183454L;
242 
243     int rowIndex = 0;
244     // a bunch of identical rows
245     for (; rowIndex < 40; rowIndex++) {
246       Put sourcePut = new Put(Bytes.toBytes(rowIndex));
247       sourcePut.addColumn(family, column1, timestamp, value1);
248       sourcePut.addColumn(family, column2, timestamp, value2);
249       sourceTable.put(sourcePut);
250      
251       Put targetPut = new Put(Bytes.toBytes(rowIndex));
252       targetPut.addColumn(family, column1, timestamp, value1);
253       targetPut.addColumn(family, column2, timestamp, value2);
254       targetTable.put(targetPut);
255     }
256     // some rows only in the source table
257     // ROWSWITHDIFFS: 10
258     // TARGETMISSINGROWS: 10
259     // TARGETMISSINGCELLS: 20
260     for (; rowIndex < 50; rowIndex++) {
261       Put put = new Put(Bytes.toBytes(rowIndex));
262       put.addColumn(family, column1, timestamp, value1);
263       put.addColumn(family, column2, timestamp, value2);
264       sourceTable.put(put);
265     }
266     // some rows only in the target table
267     // ROWSWITHDIFFS: 10
268     // SOURCEMISSINGROWS: 10
269     // SOURCEMISSINGCELLS: 20
270     for (; rowIndex < 60; rowIndex++) {
271       Put put = new Put(Bytes.toBytes(rowIndex));
272       put.addColumn(family, column1, timestamp, value1);
273       put.addColumn(family, column2, timestamp, value2);
274       targetTable.put(put);
275     }
276     // some rows with 1 missing cell in target table
277     // ROWSWITHDIFFS: 10
278     // TARGETMISSINGCELLS: 10
279     for (; rowIndex < 70; rowIndex++) {
280       Put sourcePut = new Put(Bytes.toBytes(rowIndex));
281       sourcePut.addColumn(family, column1, timestamp, value1);
282       sourcePut.addColumn(family, column2, timestamp, value2);
283       sourceTable.put(sourcePut);
284 
285       Put targetPut = new Put(Bytes.toBytes(rowIndex));
286       targetPut.addColumn(family, column1, timestamp, value1);
287       targetTable.put(targetPut);
288     }
289     // some rows with 1 missing cell in source table
290     // ROWSWITHDIFFS: 10
291     // SOURCEMISSINGCELLS: 10
292     for (; rowIndex < 80; rowIndex++) {
293       Put sourcePut = new Put(Bytes.toBytes(rowIndex));
294       sourcePut.addColumn(family, column1, timestamp, value1);
295       sourceTable.put(sourcePut);
296 
297       Put targetPut = new Put(Bytes.toBytes(rowIndex));
298       targetPut.addColumn(family, column1, timestamp, value1);
299       targetPut.addColumn(family, column2, timestamp, value2);
300       targetTable.put(targetPut);
301     }
302     // some rows differing only in timestamp
303     // ROWSWITHDIFFS: 10
304     // SOURCEMISSINGCELLS: 20
305     // TARGETMISSINGCELLS: 20
306     for (; rowIndex < 90; rowIndex++) {
307       Put sourcePut = new Put(Bytes.toBytes(rowIndex));
308       sourcePut.addColumn(family, column1, timestamp, column1);
309       sourcePut.addColumn(family, column2, timestamp, value2);
310       sourceTable.put(sourcePut);
311 
312       Put targetPut = new Put(Bytes.toBytes(rowIndex));
313       targetPut.addColumn(family, column1, timestamp+1, column1);
314       targetPut.addColumn(family, column2, timestamp-1, value2);
315       targetTable.put(targetPut);
316     }
317     // some rows with different values
318     // ROWSWITHDIFFS: 10
319     // DIFFERENTCELLVALUES: 20
320     for (; rowIndex < numRows; rowIndex++) {
321       Put sourcePut = new Put(Bytes.toBytes(rowIndex));
322       sourcePut.addColumn(family, column1, timestamp, value1);
323       sourcePut.addColumn(family, column2, timestamp, value2);
324       sourceTable.put(sourcePut);
325       
326       Put targetPut = new Put(Bytes.toBytes(rowIndex));
327       targetPut.addColumn(family, column1, timestamp, value3);
328       targetPut.addColumn(family, column2, timestamp, value3);
329       targetTable.put(targetPut);
330     }
331     
332     sourceTable.close();
333     targetTable.close();
334   }
335   
336 
337 }