1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.mapreduce.replication;
20
21 import java.io.IOException;
22
23 import org.apache.commons.logging.Log;
24 import org.apache.commons.logging.LogFactory;
25 import org.apache.hadoop.conf.Configuration;
26 import org.apache.hadoop.conf.Configured;
27 import org.apache.hadoop.hbase.*;
28 import org.apache.hadoop.hbase.client.HConnectable;
29 import org.apache.hadoop.hbase.client.HConnection;
30 import org.apache.hadoop.hbase.client.HConnectionManager;
31 import org.apache.hadoop.hbase.client.HTable;
32 import org.apache.hadoop.hbase.client.Put;
33 import org.apache.hadoop.hbase.client.Result;
34 import org.apache.hadoop.hbase.client.ResultScanner;
35 import org.apache.hadoop.hbase.client.Scan;
36 import org.apache.hadoop.hbase.client.Table;
37 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
38 import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
39 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
40 import org.apache.hadoop.hbase.mapreduce.TableMapper;
41 import org.apache.hadoop.hbase.mapreduce.TableSplit;
42 import org.apache.hadoop.hbase.replication.ReplicationException;
43 import org.apache.hadoop.hbase.replication.ReplicationFactory;
44 import org.apache.hadoop.hbase.replication.ReplicationPeerZKImpl;
45 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
46 import org.apache.hadoop.hbase.replication.ReplicationPeers;
47 import org.apache.hadoop.hbase.util.Bytes;
48 import org.apache.hadoop.hbase.util.Pair;
49 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
50 import org.apache.hadoop.mapreduce.Job;
51 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
52 import org.apache.hadoop.util.Tool;
53 import org.apache.hadoop.util.ToolRunner;
54
55
56
57
58
59
60
61
62
63
64
65 public class VerifyReplication extends Configured implements Tool {
66
67 private static final Log LOG =
68 LogFactory.getLog(VerifyReplication.class);
69
70 public final static String NAME = "verifyrep";
71 private final static String PEER_CONFIG_PREFIX = NAME + ".peer.";
72 static long startTime = 0;
73 static long endTime = Long.MAX_VALUE;
74 static int versions = -1;
75 static String tableName = null;
76 static String families = null;
77 static String peerId = null;
78
79
80
81
82 public static class Verifier
83 extends TableMapper<ImmutableBytesWritable, Put> {
84
85 public static enum Counters {
86 GOODROWS, BADROWS, ONLY_IN_SOURCE_TABLE_ROWS, ONLY_IN_PEER_TABLE_ROWS, CONTENT_DIFFERENT_ROWS}
87
88 private ResultScanner replicatedScanner;
89 private Result currentCompareRowInPeerTable;
90 private Table replicatedTable;
91
92
93
94
95
96
97
98
99
100 @Override
101 public void map(ImmutableBytesWritable row, final Result value,
102 Context context)
103 throws IOException {
104 if (replicatedScanner == null) {
105 Configuration conf = context.getConfiguration();
106 final Scan scan = new Scan();
107 scan.setCaching(conf.getInt(TableInputFormat.SCAN_CACHEDROWS, 1));
108 long startTime = conf.getLong(NAME + ".startTime", 0);
109 long endTime = conf.getLong(NAME + ".endTime", Long.MAX_VALUE);
110 String families = conf.get(NAME + ".families", null);
111 if(families != null) {
112 String[] fams = families.split(",");
113 for(String fam : fams) {
114 scan.addFamily(Bytes.toBytes(fam));
115 }
116 }
117 scan.setTimeRange(startTime, endTime);
118 int versions = conf.getInt(NAME+".versions", -1);
119 LOG.info("Setting number of version inside map as: " + versions);
120 if (versions >= 0) {
121 scan.setMaxVersions(versions);
122 }
123
124 final TableSplit tableSplit = (TableSplit)(context.getInputSplit());
125 HConnectionManager.execute(new HConnectable<Void>(conf) {
126 @Override
127 public Void connect(HConnection conn) throws IOException {
128 String zkClusterKey = conf.get(NAME + ".peerQuorumAddress");
129 Configuration peerConf = HBaseConfiguration.createClusterConf(conf,
130 zkClusterKey, PEER_CONFIG_PREFIX);
131
132 TableName tableName = TableName.valueOf(conf.get(NAME + ".tableName"));
133 replicatedTable = new HTable(peerConf, tableName);
134 scan.setStartRow(value.getRow());
135 scan.setStopRow(tableSplit.getEndRow());
136 replicatedScanner = replicatedTable.getScanner(scan);
137 return null;
138 }
139 });
140 currentCompareRowInPeerTable = replicatedScanner.next();
141 }
142 while (true) {
143 if (currentCompareRowInPeerTable == null) {
144
145 logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value);
146 break;
147 }
148 int rowCmpRet = Bytes.compareTo(value.getRow(), currentCompareRowInPeerTable.getRow());
149 if (rowCmpRet == 0) {
150
151 try {
152 Result.compareResults(value, currentCompareRowInPeerTable);
153 context.getCounter(Counters.GOODROWS).increment(1);
154 } catch (Exception e) {
155 logFailRowAndIncreaseCounter(context, Counters.CONTENT_DIFFERENT_ROWS, value);
156 LOG.error("Exception while comparing row : " + e);
157 }
158 currentCompareRowInPeerTable = replicatedScanner.next();
159 break;
160 } else if (rowCmpRet < 0) {
161
162 logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value);
163 break;
164 } else {
165
166 logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS,
167 currentCompareRowInPeerTable);
168 currentCompareRowInPeerTable = replicatedScanner.next();
169 }
170 }
171 }
172
173 private void logFailRowAndIncreaseCounter(Context context, Counters counter, Result row) {
174 context.getCounter(counter).increment(1);
175 context.getCounter(Counters.BADROWS).increment(1);
176 LOG.error(counter.toString() + ", rowkey=" + Bytes.toString(row.getRow()));
177 }
178
179 @Override
180 protected void cleanup(Context context) {
181 if (replicatedScanner != null) {
182 try {
183 while (currentCompareRowInPeerTable != null) {
184 logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS,
185 currentCompareRowInPeerTable);
186 currentCompareRowInPeerTable = replicatedScanner.next();
187 }
188 } catch (Exception e) {
189 LOG.error("fail to scan peer table in cleanup", e);
190 } finally {
191 replicatedScanner.close();
192 replicatedScanner = null;
193 }
194 }
195 if (replicatedTable != null) {
196 TableName tableName = replicatedTable.getName();
197 try {
198 replicatedTable.close();
199 } catch (IOException ioe) {
200 LOG.warn("Exception closing " + tableName, ioe);
201 }
202 }
203 }
204 }
205
206 private static Pair<ReplicationPeerConfig, Configuration> getPeerQuorumConfig(
207 final Configuration conf) throws IOException {
208 ZooKeeperWatcher localZKW = null;
209 ReplicationPeerZKImpl peer = null;
210 try {
211 localZKW = new ZooKeeperWatcher(conf, "VerifyReplication",
212 new Abortable() {
213 @Override public void abort(String why, Throwable e) {}
214 @Override public boolean isAborted() {return false;}
215 });
216
217 ReplicationPeers rp = ReplicationFactory.getReplicationPeers(localZKW, conf, localZKW);
218 rp.init();
219
220 Pair<ReplicationPeerConfig, Configuration> pair = rp.getPeerConf(peerId);
221 if (pair == null) {
222 throw new IOException("Couldn't get peer conf!");
223 }
224
225 return pair;
226 } catch (ReplicationException e) {
227 throw new IOException(
228 "An error occured while trying to connect to the remove peer cluster", e);
229 } finally {
230 if (peer != null) {
231 peer.close();
232 }
233 if (localZKW != null) {
234 localZKW.close();
235 }
236 }
237 }
238
239
240
241
242
243
244
245
246
247 public static Job createSubmittableJob(Configuration conf, String[] args)
248 throws IOException {
249 if (!doCommandLine(args)) {
250 return null;
251 }
252 if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY,
253 HConstants.REPLICATION_ENABLE_DEFAULT)) {
254 throw new IOException("Replication needs to be enabled to verify it.");
255 }
256 conf.set(NAME+".peerId", peerId);
257 conf.set(NAME+".tableName", tableName);
258 conf.setLong(NAME+".startTime", startTime);
259 conf.setLong(NAME+".endTime", endTime);
260 if (families != null) {
261 conf.set(NAME+".families", families);
262 }
263
264 Pair<ReplicationPeerConfig, Configuration> peerConfigPair = getPeerQuorumConfig(conf);
265 ReplicationPeerConfig peerConfig = peerConfigPair.getFirst();
266 String peerQuorumAddress = peerConfig.getClusterKey();
267 LOG.info("Peer Quorum Address: " + peerQuorumAddress + ", Peer Configuration: " +
268 peerConfig.getConfiguration());
269 conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress);
270 HBaseConfiguration.setWithPrefix(conf, PEER_CONFIG_PREFIX,
271 peerConfig.getConfiguration().entrySet());
272
273 conf.setInt(NAME + ".versions", versions);
274 LOG.info("Number of version: " + versions);
275
276 Job job = new Job(conf, NAME + "_" + tableName);
277 job.setJarByClass(VerifyReplication.class);
278
279 Scan scan = new Scan();
280 scan.setTimeRange(startTime, endTime);
281 if (versions >= 0) {
282 scan.setMaxVersions(versions);
283 LOG.info("Number of versions set to " + versions);
284 }
285 if(families != null) {
286 String[] fams = families.split(",");
287 for(String fam : fams) {
288 scan.addFamily(Bytes.toBytes(fam));
289 }
290 }
291 TableMapReduceUtil.initTableMapperJob(tableName, scan,
292 Verifier.class, null, null, job);
293
294 Configuration peerClusterConf = peerConfigPair.getSecond();
295
296 TableMapReduceUtil.initCredentialsForCluster(job, peerClusterConf);
297
298 job.setOutputFormatClass(NullOutputFormat.class);
299 job.setNumReduceTasks(0);
300 return job;
301 }
302
303 private static boolean doCommandLine(final String[] args) {
304 if (args.length < 2) {
305 printUsage(null);
306 return false;
307 }
308 try {
309 for (int i = 0; i < args.length; i++) {
310 String cmd = args[i];
311 if (cmd.equals("-h") || cmd.startsWith("--h")) {
312 printUsage(null);
313 return false;
314 }
315
316 final String startTimeArgKey = "--starttime=";
317 if (cmd.startsWith(startTimeArgKey)) {
318 startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
319 continue;
320 }
321
322 final String endTimeArgKey = "--endtime=";
323 if (cmd.startsWith(endTimeArgKey)) {
324 endTime = Long.parseLong(cmd.substring(endTimeArgKey.length()));
325 continue;
326 }
327
328 final String versionsArgKey = "--versions=";
329 if (cmd.startsWith(versionsArgKey)) {
330 versions = Integer.parseInt(cmd.substring(versionsArgKey.length()));
331 continue;
332 }
333
334 final String familiesArgKey = "--families=";
335 if (cmd.startsWith(familiesArgKey)) {
336 families = cmd.substring(familiesArgKey.length());
337 continue;
338 }
339
340 if (i == args.length-2) {
341 peerId = cmd;
342 }
343
344 if (i == args.length-1) {
345 tableName = cmd;
346 }
347 }
348 } catch (Exception e) {
349 e.printStackTrace();
350 printUsage("Can't start because " + e.getMessage());
351 return false;
352 }
353 return true;
354 }
355
356
357
358
359 private static void printUsage(final String errorMsg) {
360 if (errorMsg != null && errorMsg.length() > 0) {
361 System.err.println("ERROR: " + errorMsg);
362 }
363 System.err.println("Usage: verifyrep [--starttime=X]" +
364 " [--stoptime=Y] [--families=A] <peerid> <tablename>");
365 System.err.println();
366 System.err.println("Options:");
367 System.err.println(" starttime beginning of the time range");
368 System.err.println(" without endtime means from starttime to forever");
369 System.err.println(" endtime end of the time range");
370 System.err.println(" versions number of cell versions to verify");
371 System.err.println(" families comma-separated list of families to copy");
372 System.err.println();
373 System.err.println("Args:");
374 System.err.println(" peerid Id of the peer used for verification, must match the one given for replication");
375 System.err.println(" tablename Name of the table to verify");
376 System.err.println();
377 System.err.println("Examples:");
378 System.err.println(" To verify the data replicated from TestTable for a 1 hour window with peer #5 ");
379 System.err.println(" $ bin/hbase " +
380 "org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication" +
381 " --starttime=1265875194289 --endtime=1265878794289 5 TestTable ");
382 }
383
384 @Override
385 public int run(String[] args) throws Exception {
386 Configuration conf = this.getConf();
387 Job job = createSubmittableJob(conf, args);
388 if (job != null) {
389 return job.waitForCompletion(true) ? 0 : 1;
390 }
391 return 1;
392 }
393
394
395
396
397
398
399
400 public static void main(String[] args) throws Exception {
401 int res = ToolRunner.run(HBaseConfiguration.create(), new VerifyReplication(), args);
402 System.exit(res);
403 }
404 }