1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.mapreduce;
19
20 import static org.junit.Assert.assertEquals;
21
22 import java.io.IOException;
23 import java.util.Arrays;
24
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27 import org.apache.hadoop.fs.FileSystem;
28 import org.apache.hadoop.fs.Path;
29 import org.apache.hadoop.hbase.CategoryBasedTimeout;
30 import org.apache.hadoop.hbase.Cell;
31 import org.apache.hadoop.hbase.CellUtil;
32 import org.apache.hadoop.hbase.HBaseTestingUtility;
33 import org.apache.hadoop.hbase.TableName;
34 import org.apache.hadoop.hbase.client.HTable;
35 import org.apache.hadoop.hbase.client.Put;
36 import org.apache.hadoop.hbase.client.Result;
37 import org.apache.hadoop.hbase.client.ResultScanner;
38 import org.apache.hadoop.hbase.client.Scan;
39 import org.apache.hadoop.hbase.client.Table;
40 import org.apache.hadoop.hbase.mapreduce.SyncTable.SyncMapper.Counter;
41 import org.apache.hadoop.hbase.testclassification.LargeTests;
42 import org.apache.hadoop.hbase.util.Bytes;
43 import org.apache.hadoop.mapreduce.Counters;
44 import org.junit.AfterClass;
45 import org.junit.Assert;
46 import org.junit.BeforeClass;
47 import org.junit.Rule;
48 import org.junit.Test;
49 import org.junit.experimental.categories.Category;
50 import org.junit.rules.TestRule;
51
52 import com.google.common.base.Throwables;
53
54
55
56
57 @Category(LargeTests.class)
58 public class TestSyncTable {
59 @Rule public final TestRule timeout = CategoryBasedTimeout.builder().
60 withTimeout(this.getClass()).withLookingForStuckThread(true).build();
61 private static final Log LOG = LogFactory.getLog(TestSyncTable.class);
62
63 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
64
65 @BeforeClass
66 public static void beforeClass() throws Exception {
67 TEST_UTIL.setJobWithoutMRCluster();
68 TEST_UTIL.startMiniCluster(3);
69 }
70
71 @AfterClass
72 public static void afterClass() throws Exception {
73 TEST_UTIL.shutdownMiniCluster();
74 }
75
76 private static byte[][] generateSplits(int numRows, int numRegions) {
77 byte[][] splitRows = new byte[numRegions-1][];
78 for (int i = 1; i < numRegions; i++) {
79 splitRows[i-1] = Bytes.toBytes(numRows * i / numRegions);
80 }
81 return splitRows;
82 }
83
84 @Test
85 public void testSyncTable() throws Exception {
86 String sourceTableName = "testSourceTable";
87 String targetTableName = "testTargetTable";
88 Path testDir = TEST_UTIL.getDataTestDirOnTestFS("testSyncTable");
89
90 writeTestData(sourceTableName, targetTableName);
91 hashSourceTable(sourceTableName, testDir);
92 Counters syncCounters = syncTables(sourceTableName, targetTableName, testDir);
93 assertEqualTables(90, sourceTableName, targetTableName);
94
95 assertEquals(60, syncCounters.findCounter(Counter.ROWSWITHDIFFS).getValue());
96 assertEquals(10, syncCounters.findCounter(Counter.SOURCEMISSINGROWS).getValue());
97 assertEquals(10, syncCounters.findCounter(Counter.TARGETMISSINGROWS).getValue());
98 assertEquals(50, syncCounters.findCounter(Counter.SOURCEMISSINGCELLS).getValue());
99 assertEquals(50, syncCounters.findCounter(Counter.TARGETMISSINGCELLS).getValue());
100 assertEquals(20, syncCounters.findCounter(Counter.DIFFERENTCELLVALUES).getValue());
101
102 TEST_UTIL.deleteTable(sourceTableName);
103 TEST_UTIL.deleteTable(targetTableName);
104 TEST_UTIL.cleanupDataTestDirOnTestFS();
105 }
106
107 private void assertEqualTables(int expectedRows, String sourceTableName, String targetTableName)
108 throws Exception {
109 Table sourceTable = TEST_UTIL.getConnection().getTable(TableName.valueOf(sourceTableName));
110 Table targetTable = TEST_UTIL.getConnection().getTable(TableName.valueOf(targetTableName));
111
112 ResultScanner sourceScanner = sourceTable.getScanner(new Scan());
113 ResultScanner targetScanner = targetTable.getScanner(new Scan());
114
115 for (int i = 0; i < expectedRows; i++) {
116 Result sourceRow = sourceScanner.next();
117 Result targetRow = targetScanner.next();
118
119 LOG.debug("SOURCE row: " + (sourceRow == null ? "null" : Bytes.toInt(sourceRow.getRow()))
120 + " cells:" + sourceRow);
121 LOG.debug("TARGET row: " + (targetRow == null ? "null" : Bytes.toInt(targetRow.getRow()))
122 + " cells:" + targetRow);
123
124 if (sourceRow == null) {
125 Assert.fail("Expected " + expectedRows
126 + " source rows but only found " + i);
127 }
128 if (targetRow == null) {
129 Assert.fail("Expected " + expectedRows
130 + " target rows but only found " + i);
131 }
132 Cell[] sourceCells = sourceRow.rawCells();
133 Cell[] targetCells = targetRow.rawCells();
134 if (sourceCells.length != targetCells.length) {
135 LOG.debug("Source cells: " + Arrays.toString(sourceCells));
136 LOG.debug("Target cells: " + Arrays.toString(targetCells));
137 Assert.fail("Row " + Bytes.toInt(sourceRow.getRow())
138 + " has " + sourceCells.length
139 + " cells in source table but " + targetCells.length
140 + " cells in target table");
141 }
142 for (int j = 0; j < sourceCells.length; j++) {
143 Cell sourceCell = sourceCells[j];
144 Cell targetCell = targetCells[j];
145 try {
146 if (!CellUtil.matchingRow(sourceCell, targetCell)) {
147 Assert.fail("Rows don't match");
148 }
149 if (!CellUtil.matchingFamily(sourceCell, targetCell)) {
150 Assert.fail("Families don't match");
151 }
152 if (!CellUtil.matchingQualifier(sourceCell, targetCell)) {
153 Assert.fail("Qualifiers don't match");
154 }
155 if (!CellUtil.matchingTimestamp(sourceCell, targetCell)) {
156 Assert.fail("Timestamps don't match");
157 }
158 if (!CellUtil.matchingValue(sourceCell, targetCell)) {
159 Assert.fail("Values don't match");
160 }
161 } catch (Throwable t) {
162 LOG.debug("Source cell: " + sourceCell + " target cell: " + targetCell);
163 Throwables.propagate(t);
164 }
165 }
166 }
167 Result sourceRow = sourceScanner.next();
168 if (sourceRow != null) {
169 Assert.fail("Source table has more than " + expectedRows
170 + " rows. Next row: " + Bytes.toInt(sourceRow.getRow()));
171 }
172 Result targetRow = targetScanner.next();
173 if (targetRow != null) {
174 Assert.fail("Target table has more than " + expectedRows
175 + " rows. Next row: " + Bytes.toInt(targetRow.getRow()));
176 }
177 sourceScanner.close();
178 targetScanner.close();
179 sourceTable.close();
180 targetTable.close();
181 }
182
183 private Counters syncTables(String sourceTableName, String targetTableName,
184 Path testDir) throws Exception {
185 SyncTable syncTable = new SyncTable(TEST_UTIL.getConfiguration());
186 int code = syncTable.run(new String[] {
187 testDir.toString(),
188 sourceTableName,
189 targetTableName
190 });
191 assertEquals("sync table job failed", 0, code);
192
193 LOG.info("Sync tables completed");
194 return syncTable.counters;
195 }
196
197 private void hashSourceTable(String sourceTableName, Path testDir)
198 throws Exception, IOException {
199 int numHashFiles = 3;
200 long batchSize = 100;
201 int scanBatch = 1;
202 HashTable hashTable = new HashTable(TEST_UTIL.getConfiguration());
203 int code = hashTable.run(new String[] {
204 "--batchsize=" + batchSize,
205 "--numhashfiles=" + numHashFiles,
206 "--scanbatch=" + scanBatch,
207 sourceTableName,
208 testDir.toString()});
209 assertEquals("hash table job failed", 0, code);
210
211 FileSystem fs = TEST_UTIL.getTestFileSystem();
212
213 HashTable.TableHash tableHash = HashTable.TableHash.read(fs.getConf(), testDir);
214 assertEquals(sourceTableName, tableHash.tableName);
215 assertEquals(batchSize, tableHash.batchSize);
216 assertEquals(numHashFiles, tableHash.numHashFiles);
217 assertEquals(numHashFiles - 1, tableHash.partitions.size());
218
219 LOG.info("Hash table completed");
220 }
221
222 private void writeTestData(String sourceTableName, String targetTableName)
223 throws Exception {
224 final byte[] family = Bytes.toBytes("family");
225 final byte[] column1 = Bytes.toBytes("c1");
226 final byte[] column2 = Bytes.toBytes("c2");
227 final byte[] value1 = Bytes.toBytes("val1");
228 final byte[] value2 = Bytes.toBytes("val2");
229 final byte[] value3 = Bytes.toBytes("val3");
230
231 int numRows = 100;
232 int sourceRegions = 10;
233 int targetRegions = 6;
234
235 HTable sourceTable = TEST_UTIL.createTable(TableName.valueOf(sourceTableName),
236 family, generateSplits(numRows, sourceRegions));
237
238 HTable targetTable = TEST_UTIL.createTable(TableName.valueOf(targetTableName),
239 family, generateSplits(numRows, targetRegions));
240
241 long timestamp = 1430764183454L;
242
243 int rowIndex = 0;
244
245 for (; rowIndex < 40; rowIndex++) {
246 Put sourcePut = new Put(Bytes.toBytes(rowIndex));
247 sourcePut.addColumn(family, column1, timestamp, value1);
248 sourcePut.addColumn(family, column2, timestamp, value2);
249 sourceTable.put(sourcePut);
250
251 Put targetPut = new Put(Bytes.toBytes(rowIndex));
252 targetPut.addColumn(family, column1, timestamp, value1);
253 targetPut.addColumn(family, column2, timestamp, value2);
254 targetTable.put(targetPut);
255 }
256
257
258
259
260 for (; rowIndex < 50; rowIndex++) {
261 Put put = new Put(Bytes.toBytes(rowIndex));
262 put.addColumn(family, column1, timestamp, value1);
263 put.addColumn(family, column2, timestamp, value2);
264 sourceTable.put(put);
265 }
266
267
268
269
270 for (; rowIndex < 60; rowIndex++) {
271 Put put = new Put(Bytes.toBytes(rowIndex));
272 put.addColumn(family, column1, timestamp, value1);
273 put.addColumn(family, column2, timestamp, value2);
274 targetTable.put(put);
275 }
276
277
278
279 for (; rowIndex < 70; rowIndex++) {
280 Put sourcePut = new Put(Bytes.toBytes(rowIndex));
281 sourcePut.addColumn(family, column1, timestamp, value1);
282 sourcePut.addColumn(family, column2, timestamp, value2);
283 sourceTable.put(sourcePut);
284
285 Put targetPut = new Put(Bytes.toBytes(rowIndex));
286 targetPut.addColumn(family, column1, timestamp, value1);
287 targetTable.put(targetPut);
288 }
289
290
291
292 for (; rowIndex < 80; rowIndex++) {
293 Put sourcePut = new Put(Bytes.toBytes(rowIndex));
294 sourcePut.addColumn(family, column1, timestamp, value1);
295 sourceTable.put(sourcePut);
296
297 Put targetPut = new Put(Bytes.toBytes(rowIndex));
298 targetPut.addColumn(family, column1, timestamp, value1);
299 targetPut.addColumn(family, column2, timestamp, value2);
300 targetTable.put(targetPut);
301 }
302
303
304
305
306 for (; rowIndex < 90; rowIndex++) {
307 Put sourcePut = new Put(Bytes.toBytes(rowIndex));
308 sourcePut.addColumn(family, column1, timestamp, column1);
309 sourcePut.addColumn(family, column2, timestamp, value2);
310 sourceTable.put(sourcePut);
311
312 Put targetPut = new Put(Bytes.toBytes(rowIndex));
313 targetPut.addColumn(family, column1, timestamp+1, column1);
314 targetPut.addColumn(family, column2, timestamp-1, value2);
315 targetTable.put(targetPut);
316 }
317
318
319
320 for (; rowIndex < numRows; rowIndex++) {
321 Put sourcePut = new Put(Bytes.toBytes(rowIndex));
322 sourcePut.addColumn(family, column1, timestamp, value1);
323 sourcePut.addColumn(family, column2, timestamp, value2);
324 sourceTable.put(sourcePut);
325
326 Put targetPut = new Put(Bytes.toBytes(rowIndex));
327 targetPut.addColumn(family, column1, timestamp, value3);
328 targetPut.addColumn(family, column2, timestamp, value3);
329 targetTable.put(targetPut);
330 }
331
332 sourceTable.close();
333 targetTable.close();
334 }
335
336
337 }