1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.zookeeper;
20
21 import static org.junit.Assert.*;
22
23 import java.util.concurrent.atomic.AtomicBoolean;
24
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27 import org.apache.hadoop.conf.Configuration;
28 import org.apache.hadoop.hbase.*;
29 import org.apache.hadoop.hbase.testclassification.MediumTests;
30 import org.apache.hadoop.hbase.util.Bytes;
31 import org.junit.AfterClass;
32 import org.junit.BeforeClass;
33 import org.junit.Test;
34 import org.junit.experimental.categories.Category;
35
36
37
38 @Category(MediumTests.class)
39 public class TestZKLeaderManager {
40 private static final Log LOG = LogFactory.getLog(TestZKLeaderManager.class);
41
42 private static final String LEADER_ZNODE =
43 "/test/" + TestZKLeaderManager.class.getSimpleName();
44
45 private static class MockAbortable implements Abortable {
46 private boolean aborted;
47
48 @Override
49 public void abort(String why, Throwable e) {
50 aborted = true;
51 LOG.fatal("Aborting during test: "+why, e);
52 fail("Aborted during test: " + why);
53 }
54
55 @Override
56 public boolean isAborted() {
57 return aborted;
58 }
59 }
60
61 private static class MockLeader extends Thread implements Stoppable {
62 private boolean stopped;
63 private ZooKeeperWatcher watcher;
64 private ZKLeaderManager zkLeader;
65 private AtomicBoolean master = new AtomicBoolean(false);
66 private int index;
67
68 public MockLeader(ZooKeeperWatcher watcher, int index) {
69 setDaemon(true);
70 setName("TestZKLeaderManager-leader-" + index);
71 this.index = index;
72 this.watcher = watcher;
73 this.zkLeader = new ZKLeaderManager(watcher, LEADER_ZNODE,
74 Bytes.toBytes(index), this);
75 }
76
77 public boolean isMaster() {
78 return master.get();
79 }
80
81 public int getIndex() {
82 return index;
83 }
84
85 public ZooKeeperWatcher getWatcher() {
86 return watcher;
87 }
88
89 public void run() {
90 while (!stopped) {
91 zkLeader.start();
92 zkLeader.waitToBecomeLeader();
93 master.set(true);
94
95 while (master.get() && !stopped) {
96 try {
97 Thread.sleep(10);
98 } catch (InterruptedException ignored) {}
99 }
100 }
101 }
102
103 public void abdicate() {
104 zkLeader.stepDownAsLeader();
105 master.set(false);
106 }
107
108 @Override
109 public void stop(String why) {
110 stopped = true;
111 abdicate();
112 watcher.close();
113 }
114
115 @Override
116 public boolean isStopped() {
117 return stopped;
118 }
119 }
120
121 private static HBaseTestingUtility TEST_UTIL;
122 private static MockLeader[] CANDIDATES;
123
124 @BeforeClass
125 public static void setupBeforeClass() throws Exception {
126 TEST_UTIL = new HBaseTestingUtility();
127 TEST_UTIL.startMiniZKCluster();
128 Configuration conf = TEST_UTIL.getConfiguration();
129
130
131 MockAbortable abortable = new MockAbortable();
132 CANDIDATES = new MockLeader[3];
133 for (int i = 0; i < 3; i++) {
134 ZooKeeperWatcher watcher = newZK(conf, "server"+i, abortable);
135 CANDIDATES[i] = new MockLeader(watcher, i);
136 CANDIDATES[i].start();
137 }
138 }
139
140 @AfterClass
141 public static void tearDownAfterClass() throws Exception {
142 TEST_UTIL.shutdownMiniZKCluster();
143 }
144
145 @Test
146 public void testLeaderSelection() throws Exception {
147 MockLeader currentLeader = getCurrentLeader();
148
149 assertNotNull("Leader should exist", currentLeader);
150 LOG.debug("Current leader index is "+currentLeader.getIndex());
151
152 byte[] znodeData = ZKUtil.getData(currentLeader.getWatcher(), LEADER_ZNODE);
153 assertNotNull("Leader znode should contain leader index", znodeData);
154 assertTrue("Leader znode should not be empty", znodeData.length > 0);
155 int storedIndex = Bytes.toInt(znodeData);
156 LOG.debug("Stored leader index in ZK is "+storedIndex);
157 assertEquals("Leader znode should match leader index",
158 currentLeader.getIndex(), storedIndex);
159
160
161 currentLeader.abdicate();
162 assertFalse(currentLeader.isMaster());
163
164
165 currentLeader = getCurrentLeader();
166
167 assertNotNull("New leader should exist after abdication", currentLeader);
168 LOG.debug("New leader index is "+currentLeader.getIndex());
169
170 znodeData = ZKUtil.getData(currentLeader.getWatcher(), LEADER_ZNODE);
171 assertNotNull("Leader znode should contain leader index", znodeData);
172 assertTrue("Leader znode should not be empty", znodeData.length > 0);
173 storedIndex = Bytes.toInt(znodeData);
174 LOG.debug("Stored leader index in ZK is "+storedIndex);
175 assertEquals("Leader znode should match leader index",
176 currentLeader.getIndex(), storedIndex);
177
178
179 currentLeader.stop("Stopping for test");
180 assertFalse(currentLeader.isMaster());
181
182
183 currentLeader = getCurrentLeader();
184
185 assertNotNull("New leader should exist after stop", currentLeader);
186 LOG.debug("New leader index is "+currentLeader.getIndex());
187
188 znodeData = ZKUtil.getData(currentLeader.getWatcher(), LEADER_ZNODE);
189 assertNotNull("Leader znode should contain leader index", znodeData);
190 assertTrue("Leader znode should not be empty", znodeData.length > 0);
191 storedIndex = Bytes.toInt(znodeData);
192 LOG.debug("Stored leader index in ZK is "+storedIndex);
193 assertEquals("Leader znode should match leader index",
194 currentLeader.getIndex(), storedIndex);
195
196
197 currentLeader.stop("Stopping for test");
198 assertFalse(currentLeader.isMaster());
199
200
201 currentLeader = getCurrentLeader();
202 assertNotNull("New leader should exist", currentLeader);
203 }
204
205 private MockLeader getCurrentLeader() throws Exception {
206 MockLeader currentLeader = null;
207 outer:
208
209 for (int i = 0; i < 1000; i++) {
210 for (int j = 0; j < CANDIDATES.length; j++) {
211 if (CANDIDATES[j].isMaster()) {
212
213 if (currentLeader != null) {
214 fail("Both candidate "+currentLeader.getIndex()+" and "+j+" claim to be leader!");
215 }
216 currentLeader = CANDIDATES[j];
217 }
218 }
219 if (currentLeader != null) {
220 break outer;
221 }
222 Thread.sleep(10);
223 }
224 return currentLeader;
225 }
226
227 private static ZooKeeperWatcher newZK(Configuration conf, String name,
228 Abortable abort) throws Exception {
229 Configuration copy = HBaseConfiguration.create(conf);
230 ZooKeeperWatcher zk = new ZooKeeperWatcher(copy, name, abort);
231 return zk;
232 }
233
234 }
235