View Javadoc

1   /*
2    * Copyright The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.replication;
21  
22  import static org.junit.Assert.assertArrayEquals;
23  import static org.junit.Assert.assertEquals;
24  import static org.junit.Assert.assertNotNull;
25  import static org.junit.Assert.fail;
26  
27  import java.io.IOException;
28  import java.util.Arrays;
29  import java.util.concurrent.CountDownLatch;
30  
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  import org.apache.hadoop.conf.Configuration;
34  import org.apache.hadoop.fs.Path;
35  import org.apache.hadoop.hbase.*;
36  import org.apache.hadoop.hbase.client.Delete;
37  import org.apache.hadoop.hbase.client.Get;
38  import org.apache.hadoop.hbase.client.Admin;
39  import org.apache.hadoop.hbase.client.HBaseAdmin;
40  import org.apache.hadoop.hbase.client.HTable;
41  import org.apache.hadoop.hbase.client.Put;
42  import org.apache.hadoop.hbase.client.Result;
43  import org.apache.hadoop.hbase.client.Table;
44  import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
45  import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
46  import org.apache.hadoop.hbase.regionserver.HRegion;
47  import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
48  import org.apache.hadoop.hbase.testclassification.LargeTests;
49  import org.apache.hadoop.hbase.util.Bytes;
50  import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
51  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
52  import org.junit.BeforeClass;
53  import org.junit.Test;
54  import org.junit.experimental.categories.Category;
55  
56  @Category(LargeTests.class)
57  public class TestMultiSlaveReplication {
58  
59    private static final Log LOG = LogFactory.getLog(TestReplicationBase.class);
60  
61    private static Configuration conf1;
62    private static Configuration conf2;
63    private static Configuration conf3;
64  
65    private static HBaseTestingUtility utility1;
66    private static HBaseTestingUtility utility2;
67    private static HBaseTestingUtility utility3;
68    private static final long SLEEP_TIME = 500;
69    private static final int NB_RETRIES = 100;
70  
71    private static final TableName tableName = TableName.valueOf("test");
72    private static final byte[] famName = Bytes.toBytes("f");
73    private static final byte[] row = Bytes.toBytes("row");
74    private static final byte[] row1 = Bytes.toBytes("row1");
75    private static final byte[] row2 = Bytes.toBytes("row2");
76    private static final byte[] row3 = Bytes.toBytes("row3");
77    private static final byte[] noRepfamName = Bytes.toBytes("norep");
78  
79    private static HTableDescriptor table;
80  
81    @BeforeClass
82    public static void setUpBeforeClass() throws Exception {
83      conf1 = HBaseConfiguration.create();
84      conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
85      // smaller block size and capacity to trigger more operations
86      // and test them
87      conf1.setInt("hbase.regionserver.hlog.blocksize", 1024*20);
88      conf1.setInt("replication.source.size.capacity", 1024);
89      conf1.setLong("replication.source.sleepforretries", 100);
90      conf1.setInt("hbase.regionserver.maxlogs", 10);
91      conf1.setLong("hbase.master.logcleaner.ttl", 10);
92      conf1.setBoolean(HConstants.REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT);
93      conf1.setBoolean("dfs.support.append", true);
94      conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
95      conf1.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
96          "org.apache.hadoop.hbase.replication.TestMasterReplication$CoprocessorCounter");
97  
98      utility1 = new HBaseTestingUtility(conf1);
99      utility1.startMiniZKCluster();
100     MiniZooKeeperCluster miniZK = utility1.getZkCluster();
101     new ZooKeeperWatcher(conf1, "cluster1", null, true);
102 
103     conf2 = new Configuration(conf1);
104     conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
105 
106     conf3 = new Configuration(conf1);
107     conf3.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/3");
108 
109     utility2 = new HBaseTestingUtility(conf2);
110     utility2.setZkCluster(miniZK);
111     new ZooKeeperWatcher(conf2, "cluster2", null, true);
112 
113     utility3 = new HBaseTestingUtility(conf3);
114     utility3.setZkCluster(miniZK);
115     new ZooKeeperWatcher(conf3, "cluster3", null, true);
116 
117     table = new HTableDescriptor(tableName);
118     HColumnDescriptor fam = new HColumnDescriptor(famName);
119     fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
120     table.addFamily(fam);
121     fam = new HColumnDescriptor(noRepfamName);
122     table.addFamily(fam);
123   }
124 
125   @Test(timeout=300000)
126   public void testMultiSlaveReplication() throws Exception {
127     LOG.info("testCyclicReplication");
128     MiniHBaseCluster master = utility1.startMiniCluster();
129     utility2.startMiniCluster();
130     utility3.startMiniCluster();
131     ReplicationAdmin admin1 = new ReplicationAdmin(conf1);
132 
133     new HBaseAdmin(conf1).createTable(table);
134     new HBaseAdmin(conf2).createTable(table);
135     new HBaseAdmin(conf3).createTable(table);
136     Table htable1 = new HTable(conf1, tableName);
137     htable1.setWriteBufferSize(1024);
138     Table htable2 = new HTable(conf2, tableName);
139     htable2.setWriteBufferSize(1024);
140     Table htable3 = new HTable(conf3, tableName);
141     htable3.setWriteBufferSize(1024);
142 
143     admin1.addPeer("1", utility2.getClusterKey());
144 
145     // put "row" and wait 'til it got around, then delete
146     putAndWait(row, famName, htable1, htable2);
147     deleteAndWait(row, htable1, htable2);
148     // check it wasn't replication to cluster 3
149     checkRow(row,0,htable3);
150 
151     putAndWait(row2, famName, htable1, htable2);
152 
153     // now roll the region server's logs
154     rollWALAndWait(utility1, htable1.getName(), row2);
155 
156     // after the log was rolled put a new row
157     putAndWait(row3, famName, htable1, htable2);
158 
159     admin1.addPeer("2", utility3.getClusterKey());
160 
161     // put a row, check it was replicated to all clusters
162     putAndWait(row1, famName, htable1, htable2, htable3);
163     // delete and verify
164     deleteAndWait(row1, htable1, htable2, htable3);
165 
166     // make sure row2 did not get replicated after
167     // cluster 3 was added
168     checkRow(row2,0,htable3);
169 
170     // row3 will get replicated, because it was in the
171     // latest log
172     checkRow(row3,1,htable3);
173 
174     Put p = new Put(row);
175     p.add(famName, row, row);
176     htable1.put(p);
177     // now roll the logs again
178     rollWALAndWait(utility1, htable1.getName(), row);
179 
180     // cleanup "row2", also conveniently use this to wait replication
181     // to finish
182     deleteAndWait(row2, htable1, htable2, htable3);
183     // Even if the log was rolled in the middle of the replication
184     // "row" is still replication.
185     checkRow(row, 1, htable2);
186     // Replication thread of cluster 2 may be sleeping, and since row2 is not there in it,
187     // we should wait before checking.
188     checkWithWait(row, 1, htable3);
189 
190     // cleanup the rest
191     deleteAndWait(row, htable1, htable2, htable3);
192     deleteAndWait(row3, htable1, htable2, htable3);
193 
194     utility3.shutdownMiniCluster();
195     utility2.shutdownMiniCluster();
196     utility1.shutdownMiniCluster();
197   }
198 
199   private void rollWALAndWait(final HBaseTestingUtility utility, final TableName table,
200       final byte[] row) throws IOException {
201     final Admin admin = utility.getHBaseAdmin();
202     final MiniHBaseCluster cluster = utility.getMiniHBaseCluster();
203 
204     // find the region that corresponds to the given row.
205     HRegion region = null;
206     for (HRegion candidate : cluster.getRegions(table)) {
207       if (HRegion.rowIsInRange(candidate.getRegionInfo(), row)) {
208         region = candidate;
209         break;
210       }
211     }
212     assertNotNull("Couldn't find the region for row '" + Arrays.toString(row) + "'", region);
213 
214     final CountDownLatch latch = new CountDownLatch(1);
215 
216     // listen for successful log rolls
217     final WALActionsListener listener = new WALActionsListener.Base() {
218           @Override
219           public void postLogRoll(final Path oldPath, final Path newPath) throws IOException {
220             latch.countDown();
221           }
222         };
223     region.getWAL().registerWALActionsListener(listener);
224 
225     // request a roll
226     admin.rollWALWriter(cluster.getServerHoldingRegion(region.getTableDesc().getTableName(),
227       region.getRegionInfo().getRegionName()));
228 
229     // wait
230     try {
231       latch.await();
232     } catch (InterruptedException exception) {
233       LOG.warn("Interrupted while waiting for the wal of '" + region + "' to roll. If later " +
234           "replication tests fail, it's probably because we should still be waiting.");
235       Thread.currentThread().interrupt();
236     }
237     region.getWAL().unregisterWALActionsListener(listener);
238   }
239 
240  
241   private void checkWithWait(byte[] row, int count, Table table) throws Exception {
242     Get get = new Get(row);
243     for (int i = 0; i < NB_RETRIES; i++) {
244       if (i == NB_RETRIES - 1) {
245         fail("Waited too much time while getting the row.");
246       }
247       boolean rowReplicated = false;
248       Result res = table.get(get);
249       if (res.size() >= 1) {
250         LOG.info("Row is replicated");
251         rowReplicated = true;
252         assertEquals("Table '" + table + "' did not have the expected number of  results.",
253             count, res.size());
254         break;
255       }
256       if (rowReplicated) {
257         break;
258       } else {
259         Thread.sleep(SLEEP_TIME);
260       }
261     }
262   }
263 
264   private void checkRow(byte[] row, int count, Table... tables) throws IOException {
265     Get get = new Get(row);
266     for (Table table : tables) {
267       Result res = table.get(get);
268       assertEquals("Table '" + table + "' did not have the expected number of results.",
269           count, res.size());
270     }
271   }
272 
273   private void deleteAndWait(byte[] row, Table source, Table... targets)
274   throws Exception {
275     Delete del = new Delete(row);
276     source.delete(del);
277 
278     Get get = new Get(row);
279     for (int i = 0; i < NB_RETRIES; i++) {
280       if (i==NB_RETRIES-1) {
281         fail("Waited too much time for del replication");
282       }
283       boolean removedFromAll = true;
284       for (Table target : targets) {
285         Result res = target.get(get);
286         if (res.size() >= 1) {
287           LOG.info("Row not deleted");
288           removedFromAll = false;
289           break;
290         }
291       }
292       if (removedFromAll) {
293         break;
294       } else {
295         Thread.sleep(SLEEP_TIME);
296       }
297     }
298   }
299 
300   private void putAndWait(byte[] row, byte[] fam, Table source, Table... targets)
301   throws Exception {
302     Put put = new Put(row);
303     put.add(fam, row, row);
304     source.put(put);
305 
306     Get get = new Get(row);
307     for (int i = 0; i < NB_RETRIES; i++) {
308       if (i==NB_RETRIES-1) {
309         fail("Waited too much time for put replication");
310       }
311       boolean replicatedToAll = true;
312       for (Table target : targets) {
313         Result res = target.get(get);
314         if (res.size() == 0) {
315           LOG.info("Row not available");
316           replicatedToAll = false;
317           break;
318         } else {
319           assertArrayEquals(res.value(), row);
320         }
321       }
322       if (replicatedToAll) {
323         break;
324       } else {
325         Thread.sleep(SLEEP_TIME);
326       }
327     }
328   }
329 
330 }
331