View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.replication;
20  
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.List;
24  import java.util.UUID;
25  import java.util.concurrent.atomic.AtomicBoolean;
26  import java.util.concurrent.atomic.AtomicInteger;
27  import java.util.concurrent.atomic.AtomicReference;
28  
29  import org.apache.commons.logging.Log;
30  import org.apache.commons.logging.LogFactory;
31  import org.apache.hadoop.hbase.Cell;
32  import org.apache.hadoop.hbase.testclassification.MediumTests;
33  import org.apache.hadoop.hbase.HRegionInfo;
34  import org.apache.hadoop.hbase.Waiter;
35  import org.apache.hadoop.hbase.client.Connection;
36  import org.apache.hadoop.hbase.client.ConnectionFactory;
37  import org.apache.hadoop.hbase.client.Put;
38  import org.apache.hadoop.hbase.client.Table;
39  import org.apache.hadoop.hbase.wal.WAL.Entry;
40  import org.apache.hadoop.hbase.regionserver.HRegion;
41  import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
42  import org.apache.hadoop.hbase.util.Bytes;
43  import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
44  import org.apache.hadoop.hbase.util.Threads;
45  import org.apache.hadoop.hbase.zookeeper.ZKConfig;
46  import org.junit.AfterClass;
47  import org.junit.Assert;
48  import org.junit.Before;
49  import org.junit.BeforeClass;
50  import org.junit.Test;
51  import org.junit.experimental.categories.Category;
52  
53  /**
54   * Tests ReplicationSource and ReplicationEndpoint interactions
55   */
56  @Category(MediumTests.class)
57  public class TestReplicationEndpoint extends TestReplicationBase {
58    private static final Log LOG = LogFactory.getLog(TestReplicationEndpoint.class);
59  
60    static int numRegionServers;
61  
62    @BeforeClass
63    public static void setUpBeforeClass() throws Exception {
64      TestReplicationBase.setUpBeforeClass();
65      admin.removePeer("2");
66      numRegionServers = utility1.getHBaseCluster().getRegionServerThreads().size();
67    }
68  
69    @AfterClass
70    public static void tearDownAfterClass() throws Exception {
71      TestReplicationBase.tearDownAfterClass();
72      // check stop is called
73      Assert.assertTrue(ReplicationEndpointForTest.stoppedCount.get() > 0);
74    }
75  
76    @Before
77    public void setup() throws Exception {
78      ReplicationEndpointForTest.contructedCount.set(0);
79      ReplicationEndpointForTest.startedCount.set(0);
80      ReplicationEndpointForTest.replicateCount.set(0);
81      ReplicationEndpointReturningFalse.replicated.set(false);
82      ReplicationEndpointForTest.lastEntries = null;
83      final List<RegionServerThread> rsThreads =
84          utility1.getMiniHBaseCluster().getRegionServerThreads();
85      for (RegionServerThread rs : rsThreads) {
86        utility1.getHBaseAdmin().rollWALWriter(rs.getRegionServer().getServerName());
87      }
88      // Wait for  all log roll to finish
89      utility1.waitFor(3000, new Waiter.ExplainingPredicate<Exception>() {
90        @Override
91        public boolean evaluate() throws Exception {
92          for (RegionServerThread rs : rsThreads) {
93            if (!rs.getRegionServer().walRollRequestFinished()) {
94              return false;
95            }
96          }
97          return true;
98        }
99  
100       @Override
101       public String explainFailure() throws Exception {
102         List<String> logRollInProgressRsList = new ArrayList<String>();
103         for (RegionServerThread rs : rsThreads) {
104           if (!rs.getRegionServer().walRollRequestFinished()) {
105             logRollInProgressRsList.add(rs.getRegionServer().toString());
106           }
107         }
108         return "Still waiting for log roll on regionservers: " + logRollInProgressRsList;
109       }
110     });
111   }
112 
113   @Test (timeout=120000)
114   public void testCustomReplicationEndpoint() throws Exception {
115     // test installing a custom replication endpoint other than the default one.
116     admin.addPeer("testCustomReplicationEndpoint",
117       new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
118         .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()), null);
119 
120     // check whether the class has been constructed and started
121     Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
122       @Override
123       public boolean evaluate() throws Exception {
124         return ReplicationEndpointForTest.contructedCount.get() >= numRegionServers;
125       }
126     });
127 
128     Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
129       @Override
130       public boolean evaluate() throws Exception {
131         return ReplicationEndpointForTest.startedCount.get() >= numRegionServers;
132       }
133     });
134 
135     Assert.assertEquals(0, ReplicationEndpointForTest.replicateCount.get());
136 
137     // now replicate some data.
138     doPut(Bytes.toBytes("row42"));
139 
140     Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
141       @Override
142       public boolean evaluate() throws Exception {
143         return ReplicationEndpointForTest.replicateCount.get() >= 1;
144       }
145     });
146 
147     doAssert(Bytes.toBytes("row42"));
148 
149     admin.removePeer("testCustomReplicationEndpoint");
150   }
151 
152   @Test (timeout=120000)
153   public void testReplicationEndpointReturnsFalseOnReplicate() throws Exception {
154     Assert.assertEquals(0, ReplicationEndpointForTest.replicateCount.get());
155     Assert.assertTrue(!ReplicationEndpointReturningFalse.replicated.get());
156     int peerCount = admin.getPeersCount();
157     final String id = "testReplicationEndpointReturnsFalseOnReplicate";
158     admin.addPeer(id,
159       new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
160         .setReplicationEndpointImpl(ReplicationEndpointReturningFalse.class.getName()), null);
161     // This test is flakey and then there is so much stuff flying around in here its, hard to
162     // debug.  Peer needs to be up for the edit to make it across. This wait on
163     // peer count seems to be a hack that has us not progress till peer is up.
164     if (admin.getPeersCount() <= peerCount) {
165       LOG.info("Waiting on peercount to go up from " + peerCount);
166       Threads.sleep(100);
167     }
168     // now replicate some data
169     doPut(row);
170 
171     Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
172       @Override
173       public boolean evaluate() throws Exception {
174         // Looks like replication endpoint returns false unless we put more than 10 edits. We
175         // only send over one edit.
176         int count = ReplicationEndpointForTest.replicateCount.get();
177         LOG.info("count=" + count);
178         return ReplicationEndpointReturningFalse.replicated.get();
179       }
180     });
181     if (ReplicationEndpointReturningFalse.ex.get() != null) {
182       throw ReplicationEndpointReturningFalse.ex.get();
183     }
184 
185     admin.removePeer("testReplicationEndpointReturnsFalseOnReplicate");
186   }
187 
188   @Test (timeout=120000)
189   public void testInterClusterReplication() throws Exception {
190     final String id = "testInterClusterReplication";
191 
192     List<HRegion> regions = utility1.getHBaseCluster().getRegions(tableName);
193     int totEdits = 0;
194 
195     // Make sure edits are spread across regions because we do region based batching
196     // before shipping edits.
197     for(HRegion region: regions) {
198       HRegionInfo hri = region.getRegionInfo();
199       byte[] row = hri.getStartKey();
200       for (int i = 0; i < 100; i++) {
201         if (row.length > 0) {
202           Put put = new Put(row);
203           put.addColumn(famName, row, row);
204           region.put(put);
205           totEdits++;
206         }
207       }
208     }
209 
210     admin.addPeer(id,
211         new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf2))
212             .setReplicationEndpointImpl(InterClusterReplicationEndpointForTest.class.getName()),
213         null);
214 
215     final int numEdits = totEdits;
216     Waiter.waitFor(conf1, 30000, new Waiter.ExplainingPredicate<Exception>() {
217       @Override
218       public boolean evaluate() throws Exception {
219         return InterClusterReplicationEndpointForTest.replicateCount.get() == numEdits;
220       }
221       @Override
222       public String explainFailure() throws Exception {
223         String failure = "Failed to replicate all edits, expected = " + numEdits
224             + " replicated = " + InterClusterReplicationEndpointForTest.replicateCount.get();
225         return failure;
226       }
227     });
228 
229     admin.removePeer("testInterClusterReplication");
230     utility1.deleteTableData(tableName);
231   }
232 
233   @Test (timeout=120000)
234   public void testWALEntryFilterFromReplicationEndpoint() throws Exception {
235     admin.addPeer("testWALEntryFilterFromReplicationEndpoint",
236       new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
237         .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName()), null);
238     // now replicate some data.
239     try (Connection connection = ConnectionFactory.createConnection(conf1)) {
240       doPut(connection, Bytes.toBytes("row1"));
241       doPut(connection, row);
242       doPut(connection, Bytes.toBytes("row2"));
243     }
244 
245     Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
246       @Override
247       public boolean evaluate() throws Exception {
248         return ReplicationEndpointForTest.replicateCount.get() >= 1;
249       }
250     });
251 
252     Assert.assertNull(ReplicationEndpointWithWALEntryFilter.ex.get());
253     admin.removePeer("testWALEntryFilterFromReplicationEndpoint");
254   }
255 
256 
257   private void doPut(byte[] row) throws IOException {
258     try (Connection connection = ConnectionFactory.createConnection(conf1)) {
259       doPut(connection, row);
260     }
261   }
262 
263   private void doPut(final Connection connection, final byte [] row) throws IOException {
264     try (Table t = connection.getTable(tableName)) {
265       Put put = new Put(row);
266       put.add(famName, row, row);
267       t.put(put);
268     }
269   }
270 
271   private static void doAssert(byte[] row) throws Exception {
272     if (ReplicationEndpointForTest.lastEntries == null) {
273       return; // first call
274     }
275     Assert.assertEquals(1, ReplicationEndpointForTest.lastEntries.size());
276     List<Cell> cells = ReplicationEndpointForTest.lastEntries.get(0).getEdit().getCells();
277     Assert.assertEquals(1, cells.size());
278     Assert.assertTrue(Bytes.equals(cells.get(0).getRowArray(), cells.get(0).getRowOffset(),
279       cells.get(0).getRowLength(), row, 0, row.length));
280   }
281 
282   public static class ReplicationEndpointForTest extends BaseReplicationEndpoint {
283     static UUID uuid = UUID.randomUUID();
284     static AtomicInteger contructedCount = new AtomicInteger();
285     static AtomicInteger startedCount = new AtomicInteger();
286     static AtomicInteger stoppedCount = new AtomicInteger();
287     static AtomicInteger replicateCount = new AtomicInteger();
288     static volatile List<Entry> lastEntries = null;
289 
290     public ReplicationEndpointForTest() {
291       contructedCount.incrementAndGet();
292     }
293 
294     @Override
295     public UUID getPeerUUID() {
296       return uuid;
297     }
298 
299     @Override
300     public boolean replicate(ReplicateContext replicateContext) {
301       replicateCount.incrementAndGet();
302       lastEntries = replicateContext.entries;
303       return true;
304     }
305 
306     @Override
307     protected void doStart() {
308       startedCount.incrementAndGet();
309       notifyStarted();
310     }
311 
312     @Override
313     protected void doStop() {
314       stoppedCount.incrementAndGet();
315       notifyStopped();
316     }
317   }
318 
319   public static class InterClusterReplicationEndpointForTest
320       extends HBaseInterClusterReplicationEndpoint {
321 
322     static AtomicInteger replicateCount = new AtomicInteger();
323     static boolean failedOnce;
324 
325     @Override
326     public boolean replicate(ReplicateContext replicateContext) {
327       boolean success = super.replicate(replicateContext);
328       if (success) {
329         replicateCount.addAndGet(replicateContext.entries.size());
330       }
331       return success;
332     }
333 
334     @Override
335     protected Replicator createReplicator(List<Entry> entries, int ordinal) {
336       // Fail only once, we don't want to slow down the test.
337       if (failedOnce) {
338         return new DummyReplicator(entries, ordinal);
339       } else {
340         failedOnce = true;
341         return new FailingDummyReplicator(entries, ordinal);
342       }
343     }
344 
345     protected class DummyReplicator extends Replicator {
346 
347       private int ordinal;
348 
349       public DummyReplicator(List<Entry> entries, int ordinal) {
350         super(entries, ordinal);
351         this.ordinal = ordinal;
352       }
353 
354       @Override
355       public Integer call() throws IOException {
356         return ordinal;
357       }
358     }
359 
360     protected class FailingDummyReplicator extends DummyReplicator {
361 
362       public FailingDummyReplicator(List<Entry> entries, int ordinal) {
363         super(entries, ordinal);
364       }
365 
366       @Override
367       public Integer call() throws IOException {
368         throw new IOException("Sample Exception: Failed to replicate.");
369       }
370     }
371   }
372 
373   public static class ReplicationEndpointReturningFalse extends ReplicationEndpointForTest {
374     static int COUNT = 10;
375     static AtomicReference<Exception> ex = new AtomicReference<Exception>(null);
376     static AtomicBoolean replicated = new AtomicBoolean(false);
377     @Override
378     public boolean replicate(ReplicateContext replicateContext) {
379       try {
380         // check row
381         doAssert(row);
382       } catch (Exception e) {
383         ex.set(e);
384       }
385 
386       super.replicate(replicateContext);
387       LOG.info("Replicated " + row + ", count=" + replicateCount.get());
388 
389       replicated.set(replicateCount.get() > COUNT); // first 10 times, we return false
390       return replicated.get();
391     }
392   }
393 
394   // return a WALEntry filter which only accepts "row", but not other rows
395   public static class ReplicationEndpointWithWALEntryFilter extends ReplicationEndpointForTest {
396     static AtomicReference<Exception> ex = new AtomicReference<Exception>(null);
397 
398     @Override
399     public boolean replicate(ReplicateContext replicateContext) {
400       try {
401         super.replicate(replicateContext);
402         doAssert(row);
403       } catch (Exception e) {
404         ex.set(e);
405       }
406       return true;
407     }
408 
409     @Override
410     public WALEntryFilter getWALEntryfilter() {
411       return new ChainWALEntryFilter(super.getWALEntryfilter(), new WALEntryFilter() {
412         @Override
413         public Entry filter(Entry entry) {
414           ArrayList<Cell> cells = entry.getEdit().getCells();
415           int size = cells.size();
416           for (int i = size-1; i >= 0; i--) {
417             Cell cell = cells.get(i);
418             if (!Bytes.equals(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
419               row, 0, row.length)) {
420               cells.remove(i);
421             }
422           }
423           return entry;
424         }
425       });
426     }
427   }
428 }