1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.replication;
20
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.List;
24 import java.util.UUID;
25 import java.util.concurrent.atomic.AtomicBoolean;
26 import java.util.concurrent.atomic.AtomicInteger;
27 import java.util.concurrent.atomic.AtomicReference;
28
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.apache.hadoop.hbase.Cell;
32 import org.apache.hadoop.hbase.testclassification.MediumTests;
33 import org.apache.hadoop.hbase.HRegionInfo;
34 import org.apache.hadoop.hbase.Waiter;
35 import org.apache.hadoop.hbase.client.Connection;
36 import org.apache.hadoop.hbase.client.ConnectionFactory;
37 import org.apache.hadoop.hbase.client.Put;
38 import org.apache.hadoop.hbase.client.Table;
39 import org.apache.hadoop.hbase.wal.WAL.Entry;
40 import org.apache.hadoop.hbase.regionserver.HRegion;
41 import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
42 import org.apache.hadoop.hbase.util.Bytes;
43 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
44 import org.apache.hadoop.hbase.util.Threads;
45 import org.apache.hadoop.hbase.zookeeper.ZKConfig;
46 import org.junit.AfterClass;
47 import org.junit.Assert;
48 import org.junit.Before;
49 import org.junit.BeforeClass;
50 import org.junit.Test;
51 import org.junit.experimental.categories.Category;
52
53
54
55
56 @Category(MediumTests.class)
57 public class TestReplicationEndpoint extends TestReplicationBase {
58 private static final Log LOG = LogFactory.getLog(TestReplicationEndpoint.class);
59
60 static int numRegionServers;
61
62 @BeforeClass
63 public static void setUpBeforeClass() throws Exception {
64 TestReplicationBase.setUpBeforeClass();
65 admin.removePeer("2");
66 numRegionServers = utility1.getHBaseCluster().getRegionServerThreads().size();
67 }
68
69 @AfterClass
70 public static void tearDownAfterClass() throws Exception {
71 TestReplicationBase.tearDownAfterClass();
72
73 Assert.assertTrue(ReplicationEndpointForTest.stoppedCount.get() > 0);
74 }
75
76 @Before
77 public void setup() throws Exception {
78 ReplicationEndpointForTest.contructedCount.set(0);
79 ReplicationEndpointForTest.startedCount.set(0);
80 ReplicationEndpointForTest.replicateCount.set(0);
81 ReplicationEndpointReturningFalse.replicated.set(false);
82 ReplicationEndpointForTest.lastEntries = null;
83 final List<RegionServerThread> rsThreads =
84 utility1.getMiniHBaseCluster().getRegionServerThreads();
85 for (RegionServerThread rs : rsThreads) {
86 utility1.getHBaseAdmin().rollWALWriter(rs.getRegionServer().getServerName());
87 }
88
89 utility1.waitFor(3000, new Waiter.ExplainingPredicate<Exception>() {
90 @Override
91 public boolean evaluate() throws Exception {
92 for (RegionServerThread rs : rsThreads) {
93 if (!rs.getRegionServer().walRollRequestFinished()) {
94 return false;
95 }
96 }
97 return true;
98 }
99
100 @Override
101 public String explainFailure() throws Exception {
102 List<String> logRollInProgressRsList = new ArrayList<String>();
103 for (RegionServerThread rs : rsThreads) {
104 if (!rs.getRegionServer().walRollRequestFinished()) {
105 logRollInProgressRsList.add(rs.getRegionServer().toString());
106 }
107 }
108 return "Still waiting for log roll on regionservers: " + logRollInProgressRsList;
109 }
110 });
111 }
112
113 @Test (timeout=120000)
114 public void testCustomReplicationEndpoint() throws Exception {
115
116 admin.addPeer("testCustomReplicationEndpoint",
117 new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
118 .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()), null);
119
120
121 Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
122 @Override
123 public boolean evaluate() throws Exception {
124 return ReplicationEndpointForTest.contructedCount.get() >= numRegionServers;
125 }
126 });
127
128 Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
129 @Override
130 public boolean evaluate() throws Exception {
131 return ReplicationEndpointForTest.startedCount.get() >= numRegionServers;
132 }
133 });
134
135 Assert.assertEquals(0, ReplicationEndpointForTest.replicateCount.get());
136
137
138 doPut(Bytes.toBytes("row42"));
139
140 Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
141 @Override
142 public boolean evaluate() throws Exception {
143 return ReplicationEndpointForTest.replicateCount.get() >= 1;
144 }
145 });
146
147 doAssert(Bytes.toBytes("row42"));
148
149 admin.removePeer("testCustomReplicationEndpoint");
150 }
151
152 @Test (timeout=120000)
153 public void testReplicationEndpointReturnsFalseOnReplicate() throws Exception {
154 Assert.assertEquals(0, ReplicationEndpointForTest.replicateCount.get());
155 Assert.assertTrue(!ReplicationEndpointReturningFalse.replicated.get());
156 int peerCount = admin.getPeersCount();
157 final String id = "testReplicationEndpointReturnsFalseOnReplicate";
158 admin.addPeer(id,
159 new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
160 .setReplicationEndpointImpl(ReplicationEndpointReturningFalse.class.getName()), null);
161
162
163
164 if (admin.getPeersCount() <= peerCount) {
165 LOG.info("Waiting on peercount to go up from " + peerCount);
166 Threads.sleep(100);
167 }
168
169 doPut(row);
170
171 Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
172 @Override
173 public boolean evaluate() throws Exception {
174
175
176 int count = ReplicationEndpointForTest.replicateCount.get();
177 LOG.info("count=" + count);
178 return ReplicationEndpointReturningFalse.replicated.get();
179 }
180 });
181 if (ReplicationEndpointReturningFalse.ex.get() != null) {
182 throw ReplicationEndpointReturningFalse.ex.get();
183 }
184
185 admin.removePeer("testReplicationEndpointReturnsFalseOnReplicate");
186 }
187
188 @Test (timeout=120000)
189 public void testInterClusterReplication() throws Exception {
190 final String id = "testInterClusterReplication";
191
192 List<HRegion> regions = utility1.getHBaseCluster().getRegions(tableName);
193 int totEdits = 0;
194
195
196
197 for(HRegion region: regions) {
198 HRegionInfo hri = region.getRegionInfo();
199 byte[] row = hri.getStartKey();
200 for (int i = 0; i < 100; i++) {
201 if (row.length > 0) {
202 Put put = new Put(row);
203 put.addColumn(famName, row, row);
204 region.put(put);
205 totEdits++;
206 }
207 }
208 }
209
210 admin.addPeer(id,
211 new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf2))
212 .setReplicationEndpointImpl(InterClusterReplicationEndpointForTest.class.getName()),
213 null);
214
215 final int numEdits = totEdits;
216 Waiter.waitFor(conf1, 30000, new Waiter.ExplainingPredicate<Exception>() {
217 @Override
218 public boolean evaluate() throws Exception {
219 return InterClusterReplicationEndpointForTest.replicateCount.get() == numEdits;
220 }
221 @Override
222 public String explainFailure() throws Exception {
223 String failure = "Failed to replicate all edits, expected = " + numEdits
224 + " replicated = " + InterClusterReplicationEndpointForTest.replicateCount.get();
225 return failure;
226 }
227 });
228
229 admin.removePeer("testInterClusterReplication");
230 utility1.deleteTableData(tableName);
231 }
232
233 @Test (timeout=120000)
234 public void testWALEntryFilterFromReplicationEndpoint() throws Exception {
235 admin.addPeer("testWALEntryFilterFromReplicationEndpoint",
236 new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
237 .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName()), null);
238
239 try (Connection connection = ConnectionFactory.createConnection(conf1)) {
240 doPut(connection, Bytes.toBytes("row1"));
241 doPut(connection, row);
242 doPut(connection, Bytes.toBytes("row2"));
243 }
244
245 Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
246 @Override
247 public boolean evaluate() throws Exception {
248 return ReplicationEndpointForTest.replicateCount.get() >= 1;
249 }
250 });
251
252 Assert.assertNull(ReplicationEndpointWithWALEntryFilter.ex.get());
253 admin.removePeer("testWALEntryFilterFromReplicationEndpoint");
254 }
255
256
257 private void doPut(byte[] row) throws IOException {
258 try (Connection connection = ConnectionFactory.createConnection(conf1)) {
259 doPut(connection, row);
260 }
261 }
262
263 private void doPut(final Connection connection, final byte [] row) throws IOException {
264 try (Table t = connection.getTable(tableName)) {
265 Put put = new Put(row);
266 put.add(famName, row, row);
267 t.put(put);
268 }
269 }
270
271 private static void doAssert(byte[] row) throws Exception {
272 if (ReplicationEndpointForTest.lastEntries == null) {
273 return;
274 }
275 Assert.assertEquals(1, ReplicationEndpointForTest.lastEntries.size());
276 List<Cell> cells = ReplicationEndpointForTest.lastEntries.get(0).getEdit().getCells();
277 Assert.assertEquals(1, cells.size());
278 Assert.assertTrue(Bytes.equals(cells.get(0).getRowArray(), cells.get(0).getRowOffset(),
279 cells.get(0).getRowLength(), row, 0, row.length));
280 }
281
282 public static class ReplicationEndpointForTest extends BaseReplicationEndpoint {
283 static UUID uuid = UUID.randomUUID();
284 static AtomicInteger contructedCount = new AtomicInteger();
285 static AtomicInteger startedCount = new AtomicInteger();
286 static AtomicInteger stoppedCount = new AtomicInteger();
287 static AtomicInteger replicateCount = new AtomicInteger();
288 static volatile List<Entry> lastEntries = null;
289
290 public ReplicationEndpointForTest() {
291 contructedCount.incrementAndGet();
292 }
293
294 @Override
295 public UUID getPeerUUID() {
296 return uuid;
297 }
298
299 @Override
300 public boolean replicate(ReplicateContext replicateContext) {
301 replicateCount.incrementAndGet();
302 lastEntries = replicateContext.entries;
303 return true;
304 }
305
306 @Override
307 protected void doStart() {
308 startedCount.incrementAndGet();
309 notifyStarted();
310 }
311
312 @Override
313 protected void doStop() {
314 stoppedCount.incrementAndGet();
315 notifyStopped();
316 }
317 }
318
319 public static class InterClusterReplicationEndpointForTest
320 extends HBaseInterClusterReplicationEndpoint {
321
322 static AtomicInteger replicateCount = new AtomicInteger();
323 static boolean failedOnce;
324
325 @Override
326 public boolean replicate(ReplicateContext replicateContext) {
327 boolean success = super.replicate(replicateContext);
328 if (success) {
329 replicateCount.addAndGet(replicateContext.entries.size());
330 }
331 return success;
332 }
333
334 @Override
335 protected Replicator createReplicator(List<Entry> entries, int ordinal) {
336
337 if (failedOnce) {
338 return new DummyReplicator(entries, ordinal);
339 } else {
340 failedOnce = true;
341 return new FailingDummyReplicator(entries, ordinal);
342 }
343 }
344
345 protected class DummyReplicator extends Replicator {
346
347 private int ordinal;
348
349 public DummyReplicator(List<Entry> entries, int ordinal) {
350 super(entries, ordinal);
351 this.ordinal = ordinal;
352 }
353
354 @Override
355 public Integer call() throws IOException {
356 return ordinal;
357 }
358 }
359
360 protected class FailingDummyReplicator extends DummyReplicator {
361
362 public FailingDummyReplicator(List<Entry> entries, int ordinal) {
363 super(entries, ordinal);
364 }
365
366 @Override
367 public Integer call() throws IOException {
368 throw new IOException("Sample Exception: Failed to replicate.");
369 }
370 }
371 }
372
373 public static class ReplicationEndpointReturningFalse extends ReplicationEndpointForTest {
374 static int COUNT = 10;
375 static AtomicReference<Exception> ex = new AtomicReference<Exception>(null);
376 static AtomicBoolean replicated = new AtomicBoolean(false);
377 @Override
378 public boolean replicate(ReplicateContext replicateContext) {
379 try {
380
381 doAssert(row);
382 } catch (Exception e) {
383 ex.set(e);
384 }
385
386 super.replicate(replicateContext);
387 LOG.info("Replicated " + row + ", count=" + replicateCount.get());
388
389 replicated.set(replicateCount.get() > COUNT);
390 return replicated.get();
391 }
392 }
393
394
395 public static class ReplicationEndpointWithWALEntryFilter extends ReplicationEndpointForTest {
396 static AtomicReference<Exception> ex = new AtomicReference<Exception>(null);
397
398 @Override
399 public boolean replicate(ReplicateContext replicateContext) {
400 try {
401 super.replicate(replicateContext);
402 doAssert(row);
403 } catch (Exception e) {
404 ex.set(e);
405 }
406 return true;
407 }
408
409 @Override
410 public WALEntryFilter getWALEntryfilter() {
411 return new ChainWALEntryFilter(super.getWALEntryfilter(), new WALEntryFilter() {
412 @Override
413 public Entry filter(Entry entry) {
414 ArrayList<Cell> cells = entry.getEdit().getCells();
415 int size = cells.size();
416 for (int i = size-1; i >= 0; i--) {
417 Cell cell = cells.get(i);
418 if (!Bytes.equals(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
419 row, 0, row.length)) {
420 cells.remove(i);
421 }
422 }
423 return entry;
424 }
425 });
426 }
427 }
428 }