1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.regionserver;
19
20 import static org.junit.Assert.assertEquals;
21 import static org.junit.Assert.assertTrue;
22
23 import java.io.IOException;
24
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27 import org.apache.hadoop.conf.Configuration;
28 import org.apache.hadoop.hbase.CoordinatedStateManager;
29 import org.apache.hadoop.hbase.HBaseTestingUtility;
30 import org.apache.hadoop.hbase.HConstants;
31 import org.apache.hadoop.hbase.LocalHBaseCluster;
32 import org.apache.hadoop.hbase.ServerName;
33 import org.apache.hadoop.hbase.MiniHBaseCluster.MiniHBaseClusterRegionServer;
34 import org.apache.hadoop.hbase.master.HMaster;
35 import org.apache.hadoop.hbase.master.ServerManager;
36 import org.apache.hadoop.hbase.testclassification.MediumTests;
37 import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
38 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
39 import org.apache.zookeeper.KeeperException;
40 import org.junit.After;
41 import org.junit.Before;
42 import org.junit.Test;
43 import org.junit.experimental.categories.Category;
44
45 @Category(MediumTests.class)
46 public class TestRegionServerReportForDuty {
47
48 private static final Log LOG = LogFactory.getLog(TestRegionServerReportForDuty.class);
49
50 private static final long SLEEP_INTERVAL = 500;
51
52 private HBaseTestingUtility testUtil;
53 private LocalHBaseCluster cluster;
54 private RegionServerThread rs;
55 private RegionServerThread rs2;
56 private MasterThread master;
57 private MasterThread backupMaster;
58
59 @Before
60 public void setUp() throws Exception {
61 testUtil = new HBaseTestingUtility();
62 testUtil.startMiniDFSCluster(1);
63 testUtil.startMiniZKCluster(1);
64 testUtil.createRootDir();
65 cluster = new LocalHBaseCluster(testUtil.getConfiguration(), 0, 0);
66 }
67
68 @After
69 public void tearDown() throws Exception {
70 cluster.shutdown();
71 cluster.join();
72 testUtil.shutdownMiniZKCluster();
73 testUtil.shutdownMiniDFSCluster();
74 }
75
76
77
78
79
80 @Test (timeout=180000)
81 public void testReportForDutyWithMasterChange() throws Exception {
82
83
84
85 cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtility.randomFreePort());
86 cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 1);
87 cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, 1);
88 master = cluster.addMaster();
89 rs = cluster.addRegionServer();
90 LOG.debug("Starting master: " + master.getMaster().getServerName());
91 master.start();
92 rs.start();
93
94 waitForClusterOnline(master);
95
96
97 cluster.getConfiguration().set(HConstants.REGION_SERVER_IMPL, MyRegionServer.class.getName());
98 rs2 = cluster.addRegionServer();
99
100
101
102 LOG.debug("Starting 2nd region server: " + rs2.getRegionServer().getServerName());
103 rs2.start();
104
105 waitForSecondRsStarted();
106
107
108 master.getMaster().stop("Stopping master");
109
110
111
112 cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtility.randomFreePort());
113 cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 2);
114 cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, 2);
115 backupMaster = cluster.addMaster();
116 LOG.debug("Starting new master: " + backupMaster.getMaster().getServerName());
117 backupMaster.start();
118
119 waitForClusterOnline(backupMaster);
120
121
122 assertTrue(backupMaster.getMaster().isActiveMaster());
123 assertTrue(backupMaster.getMaster().isInitialized());
124 assertEquals(backupMaster.getMaster().getServerManager().getOnlineServersList().size(), 2);
125
126 }
127
128 private void waitForClusterOnline(MasterThread master) throws InterruptedException {
129 while (true) {
130 if (master.getMaster().isInitialized()) {
131 break;
132 }
133 Thread.sleep(SLEEP_INTERVAL);
134 LOG.debug("Waiting for master to come online ...");
135 }
136 rs.waitForServerOnline();
137 }
138
139 private void waitForSecondRsStarted() throws InterruptedException {
140 while (true) {
141 if (((MyRegionServer) rs2.getRegionServer()).getRpcStubCreatedFlag() == true) {
142 break;
143 }
144 Thread.sleep(SLEEP_INTERVAL);
145 LOG.debug("Waiting 2nd RS to be started ...");
146 }
147 }
148
149
150
151
152
153
154 public static class MyRegionServer extends MiniHBaseClusterRegionServer {
155
156 private ServerName sn;
157
158
159 private boolean rpcStubCreatedFlag = false;
160 private boolean masterChanged = false;
161
162 public MyRegionServer(Configuration conf, CoordinatedStateManager cp)
163 throws IOException, KeeperException,
164 InterruptedException {
165 super(conf, cp);
166 }
167
168 @Override
169 protected synchronized ServerName createRegionServerStatusStub() {
170 sn = super.createRegionServerStatusStub();
171 rpcStubCreatedFlag = true;
172
173
174 while (!masterChanged) {
175 ServerName newSn = super.getMasterAddressTracker().getMasterAddress(true);
176 if (newSn != null && !newSn.equals(sn)) {
177 masterChanged = true;
178 break;
179 }
180 try {
181 Thread.sleep(SLEEP_INTERVAL);
182 } catch (InterruptedException e) {
183 return null;
184 }
185 LOG.debug("Waiting for master switch over ... ");
186 }
187 return sn;
188 }
189
190 public boolean getRpcStubCreatedFlag() {
191 return rpcStubCreatedFlag;
192 }
193 }
194 }