1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.replication;
20
21 import static org.junit.Assert.*;
22
23 import java.util.List;
24 import java.util.SortedMap;
25 import java.util.SortedSet;
26
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.hbase.ServerName;
30 import org.apache.hadoop.hbase.zookeeper.ZKConfig;
31 import org.apache.zookeeper.KeeperException;
32 import org.junit.Before;
33 import org.junit.Test;
34
35
36
37
38
39 public abstract class TestReplicationStateBasic {
40
41 protected ReplicationQueues rq1;
42 protected ReplicationQueues rq2;
43 protected ReplicationQueues rq3;
44 protected ReplicationQueuesClient rqc;
45 protected String server1 = ServerName.valueOf("hostname1.example.org", 1234, -1L).toString();
46 protected String server2 = ServerName.valueOf("hostname2.example.org", 1234, -1L).toString();
47 protected String server3 = ServerName.valueOf("hostname3.example.org", 1234, -1L).toString();
48 protected ReplicationPeers rp;
49 protected static final String ID_ONE = "1";
50 protected static final String ID_TWO = "2";
51 protected static String KEY_ONE;
52 protected static String KEY_TWO;
53
54
55 protected String OUR_ID = "3";
56 protected String OUR_KEY;
57
58 protected static int zkTimeoutCount;
59 protected static final int ZK_MAX_COUNT = 300;
60 protected static final int ZK_SLEEP_INTERVAL = 100;
61
62 private static final Log LOG = LogFactory.getLog(TestReplicationStateBasic.class);
63
64 @Before
65 public void setUp() {
66 zkTimeoutCount = 0;
67 }
68
69 @Test
70 public void testReplicationQueuesClient() throws ReplicationException, KeeperException {
71 rqc.init();
72
73 assertEquals(0, rqc.getListOfReplicators().size());
74 assertNull(rqc.getLogsInQueue(server1, "qId1"));
75 assertNull(rqc.getAllQueues(server1));
76
77
78
79
80
81 rq1.init(server1);
82 rq2.init(server2);
83 rq1.addLog("qId1", "trash");
84 rq1.removeLog("qId1", "trash");
85 rq1.addLog("qId2", "filename1");
86 rq1.addLog("qId3", "filename2");
87 rq1.addLog("qId3", "filename3");
88 rq2.addLog("trash", "trash");
89 rq2.removeQueue("trash");
90
91 List<String> reps = rqc.getListOfReplicators();
92 assertEquals(2, reps.size());
93 assertTrue(server1, reps.contains(server1));
94 assertTrue(server2, reps.contains(server2));
95
96 assertNull(rqc.getLogsInQueue("bogus", "bogus"));
97 assertNull(rqc.getLogsInQueue(server1, "bogus"));
98 assertEquals(0, rqc.getLogsInQueue(server1, "qId1").size());
99 assertEquals(1, rqc.getLogsInQueue(server1, "qId2").size());
100 assertEquals("filename1", rqc.getLogsInQueue(server1, "qId2").get(0));
101
102 assertNull(rqc.getAllQueues("bogus"));
103 assertEquals(0, rqc.getAllQueues(server2).size());
104 List<String> list = rqc.getAllQueues(server1);
105 assertEquals(3, list.size());
106 assertTrue(list.contains("qId2"));
107 assertTrue(list.contains("qId3"));
108 }
109
110 @Test
111 public void testReplicationQueues() throws ReplicationException {
112 rq1.init(server1);
113 rq2.init(server2);
114 rq3.init(server3);
115
116 rp.init();
117
118
119 assertEquals(3, rq1.getListOfReplicators().size());
120 rq1.removeQueue("bogus");
121 rq1.removeLog("bogus", "bogus");
122 rq1.removeAllQueues();
123 assertNull(rq1.getAllQueues());
124 assertEquals(0, rq1.getLogPosition("bogus", "bogus"));
125 assertNull(rq1.getLogsInQueue("bogus"));
126 assertEquals(0, rq1.claimQueues(ServerName.valueOf("bogus", 1234, -1L).toString()).size());
127
128 rq1.setLogPosition("bogus", "bogus", 5L);
129
130 populateQueues();
131
132 assertEquals(3, rq1.getListOfReplicators().size());
133 assertEquals(0, rq2.getLogsInQueue("qId1").size());
134 assertEquals(5, rq3.getLogsInQueue("qId5").size());
135 assertEquals(0, rq3.getLogPosition("qId1", "filename0"));
136 rq3.setLogPosition("qId5", "filename4", 354L);
137 assertEquals(354L, rq3.getLogPosition("qId5", "filename4"));
138
139 assertEquals(5, rq3.getLogsInQueue("qId5").size());
140 assertEquals(0, rq2.getLogsInQueue("qId1").size());
141 assertEquals(0, rq1.getAllQueues().size());
142 assertEquals(1, rq2.getAllQueues().size());
143 assertEquals(5, rq3.getAllQueues().size());
144
145 assertEquals(0, rq3.claimQueues(server1).size());
146 assertEquals(2, rq3.getListOfReplicators().size());
147
148 SortedMap<String, SortedSet<String>> queues = rq2.claimQueues(server3);
149 assertEquals(5, queues.size());
150 assertEquals(1, rq2.getListOfReplicators().size());
151
152
153 assertEquals(0, rq2.claimQueues(server2).size());
154
155 assertEquals(6, rq2.getAllQueues().size());
156
157 rq2.removeAllQueues();
158
159 assertEquals(0, rq2.getListOfReplicators().size());
160 }
161
162 @Test
163 public void testReplicationPeers() throws Exception {
164 rp.init();
165
166
167 try {
168 rp.removePeer("bogus");
169 fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
170 } catch (IllegalArgumentException e) {
171 }
172 try {
173 rp.enablePeer("bogus");
174 fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
175 } catch (IllegalArgumentException e) {
176 }
177 try {
178 rp.disablePeer("bogus");
179 fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
180 } catch (IllegalArgumentException e) {
181 }
182 try {
183 rp.getStatusOfPeer("bogus");
184 fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
185 } catch (IllegalArgumentException e) {
186 }
187 assertFalse(rp.peerAdded("bogus"));
188 rp.peerRemoved("bogus");
189
190 assertNull(rp.getPeerConf("bogus"));
191 assertNumberOfPeers(0);
192
193
194 rp.addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), null);
195 assertNumberOfPeers(1);
196 rp.addPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO), null);
197 assertNumberOfPeers(2);
198
199
200 try {
201 rp.getStatusOfPeer(ID_ONE);
202 fail("There are no connected peers, should have thrown an IllegalArgumentException");
203 } catch (IllegalArgumentException e) {
204 }
205 assertEquals(KEY_ONE, ZKConfig.getZooKeeperClusterKey(rp.getPeerConf(ID_ONE).getSecond()));
206 rp.removePeer(ID_ONE);
207 rp.peerRemoved(ID_ONE);
208 assertNumberOfPeers(1);
209
210
211 rp.addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), null);
212 rp.peerAdded(ID_ONE);
213 assertNumberOfPeers(2);
214 assertTrue(rp.getStatusOfPeer(ID_ONE));
215 rp.disablePeer(ID_ONE);
216 assertConnectedPeerStatus(false, ID_ONE);
217 rp.enablePeer(ID_ONE);
218 assertConnectedPeerStatus(true, ID_ONE);
219
220
221 rp.peerRemoved(ID_ONE);
222 assertNumberOfPeers(2);
223 try {
224 rp.getStatusOfPeer(ID_ONE);
225 fail("There are no connected peers, should have thrown an IllegalArgumentException");
226 } catch (IllegalArgumentException e) {
227 }
228 }
229
230 protected void assertConnectedPeerStatus(boolean status, String peerId) throws Exception {
231
232 if (status != rp.getStatusOfPeerFromBackingStore(peerId)) {
233 fail("ConnectedPeerStatus was " + !status + " but expected " + status + " in ZK");
234 }
235 while (true) {
236 if (status == rp.getStatusOfPeer(peerId)) {
237 return;
238 }
239 if (zkTimeoutCount < ZK_MAX_COUNT) {
240 LOG.debug("ConnectedPeerStatus was " + !status + " but expected " + status
241 + ", sleeping and trying again.");
242 Thread.sleep(ZK_SLEEP_INTERVAL);
243 } else {
244 fail("Timed out waiting for ConnectedPeerStatus to be " + status);
245 }
246 }
247 }
248
249 protected void assertNumberOfPeers(int total) {
250 assertEquals(total, rp.getAllPeerConfigs().size());
251 assertEquals(total, rp.getAllPeerIds().size());
252 assertEquals(total, rp.getAllPeerIds().size());
253 }
254
255
256
257
258
259 protected void populateQueues() throws ReplicationException {
260 rq1.addLog("trash", "trash");
261 rq1.removeQueue("trash");
262
263 rq2.addLog("qId1", "trash");
264 rq2.removeLog("qId1", "trash");
265
266 for (int i = 1; i < 6; i++) {
267 for (int j = 0; j < i; j++) {
268 rq3.addLog("qId" + i, "filename" + j);
269 }
270
271 rp.addPeer("qId" + i, new ReplicationPeerConfig().setClusterKey("bogus" + i), null);
272 }
273 }
274 }
275