1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.master;
20
21 import static org.junit.Assert.assertEquals;
22
23 import java.io.IOException;
24 import java.util.List;
25 import java.util.NavigableSet;
26 import java.util.Set;
27 import java.util.TreeSet;
28
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.apache.hadoop.conf.Configuration;
32 import org.apache.hadoop.hbase.HBaseConfiguration;
33 import org.apache.hadoop.hbase.HBaseTestingUtility;
34 import org.apache.hadoop.hbase.HConstants;
35 import org.apache.hadoop.hbase.HRegionInfo;
36 import org.apache.hadoop.hbase.testclassification.LargeTests;
37 import org.apache.hadoop.hbase.MiniHBaseCluster;
38 import org.apache.hadoop.hbase.ServerName;
39 import org.apache.hadoop.hbase.TableName;
40 import org.apache.hadoop.hbase.client.HTable;
41 import org.apache.hadoop.hbase.client.RegionLocator;
42 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
43 import org.apache.hadoop.hbase.util.Bytes;
44 import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
45 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
46 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
47 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
48 import org.apache.zookeeper.KeeperException;
49 import org.junit.Test;
50 import org.junit.experimental.categories.Category;
51
52
53
54
55 @Category(LargeTests.class)
56 public class TestRollingRestart {
57 private static final Log LOG = LogFactory.getLog(TestRollingRestart.class);
58
59 @Test (timeout=500000)
60 public void testBasicRollingRestart() throws Exception {
61
62
63 final int NUM_MASTERS = 2;
64 final int NUM_RS = 3;
65 final int NUM_REGIONS_TO_CREATE = 20;
66
67 int expectedNumRS = 3;
68
69
70 log("Starting cluster");
71 Configuration conf = HBaseConfiguration.create();
72 HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(conf);
73 TEST_UTIL.startMiniCluster(NUM_MASTERS, NUM_RS);
74 MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
75 log("Waiting for active/ready master");
76 cluster.waitForActiveAndReadyMaster();
77 ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "testRollingRestart",
78 null);
79 HMaster master = cluster.getMaster();
80
81
82 TableName table = TableName.valueOf("tableRestart");
83 byte [] family = Bytes.toBytes("family");
84 log("Creating table with " + NUM_REGIONS_TO_CREATE + " regions");
85 HTable ht = TEST_UTIL.createMultiRegionTable(table, family, NUM_REGIONS_TO_CREATE);
86 int numRegions = -1;
87 try (RegionLocator r = ht.getRegionLocator()) {
88 numRegions = r.getStartKeys().length;
89 }
90 numRegions += 1;
91 log("Waiting for no more RIT\n");
92 blockUntilNoRIT(zkw, master);
93 log("Disabling table\n");
94 TEST_UTIL.getHBaseAdmin().disableTable(table);
95 log("Waiting for no more RIT\n");
96 blockUntilNoRIT(zkw, master);
97 NavigableSet<String> regions = HBaseTestingUtility.getAllOnlineRegions(cluster);
98 log("Verifying only catalog and namespace regions are assigned\n");
99 if (regions.size() != 2) {
100 for (String oregion : regions) log("Region still online: " + oregion);
101 }
102 assertEquals(2, regions.size());
103 log("Enabling table\n");
104 TEST_UTIL.getHBaseAdmin().enableTable(table);
105 log("Waiting for no more RIT\n");
106 blockUntilNoRIT(zkw, master);
107 log("Verifying there are " + numRegions + " assigned on cluster\n");
108 regions = HBaseTestingUtility.getAllOnlineRegions(cluster);
109 assertRegionsAssigned(cluster, regions);
110 assertEquals(expectedNumRS, cluster.getRegionServerThreads().size());
111
112
113 log("Adding a fourth RS");
114 RegionServerThread restarted = cluster.startRegionServer();
115 expectedNumRS++;
116 restarted.waitForServerOnline();
117 log("Additional RS is online");
118 log("Waiting for no more RIT");
119 blockUntilNoRIT(zkw, master);
120 log("Verifying there are " + numRegions + " assigned on cluster");
121 assertRegionsAssigned(cluster, regions);
122 assertEquals(expectedNumRS, cluster.getRegionServerThreads().size());
123
124
125 List<MasterThread> masterThreads = cluster.getMasterThreads();
126 MasterThread activeMaster = null;
127 MasterThread backupMaster = null;
128 assertEquals(2, masterThreads.size());
129 if (masterThreads.get(0).getMaster().isActiveMaster()) {
130 activeMaster = masterThreads.get(0);
131 backupMaster = masterThreads.get(1);
132 } else {
133 activeMaster = masterThreads.get(1);
134 backupMaster = masterThreads.get(0);
135 }
136
137
138 log("Stopping backup master\n\n");
139 backupMaster.getMaster().stop("Stop of backup during rolling restart");
140 cluster.hbaseCluster.waitOnMaster(backupMaster);
141
142
143 log("Stopping primary master\n\n");
144 activeMaster.getMaster().stop("Stop of active during rolling restart");
145 cluster.hbaseCluster.waitOnMaster(activeMaster);
146
147
148 log("Restarting primary master\n\n");
149 activeMaster = cluster.startMaster();
150 cluster.waitForActiveAndReadyMaster();
151 master = activeMaster.getMaster();
152
153
154 log("Restarting backup master\n\n");
155 backupMaster = cluster.startMaster();
156
157 assertEquals(expectedNumRS, cluster.getRegionServerThreads().size());
158
159
160
161
162 List<RegionServerThread> regionServers =
163 cluster.getLiveRegionServerThreads();
164 int num = 1;
165 int total = regionServers.size();
166 for (RegionServerThread rst : regionServers) {
167 ServerName serverName = rst.getRegionServer().getServerName();
168 log("Stopping region server " + num + " of " + total + " [ " +
169 serverName + "]");
170 rst.getRegionServer().stop("Stopping RS during rolling restart");
171 cluster.hbaseCluster.waitOnRegionServer(rst);
172 log("Waiting for RS shutdown to be handled by master");
173 waitForRSShutdownToStartAndFinish(activeMaster, serverName);
174 log("RS shutdown done, waiting for no more RIT");
175 blockUntilNoRIT(zkw, master);
176 log("Verifying there are " + numRegions + " assigned on cluster");
177 assertRegionsAssigned(cluster, regions);
178 expectedNumRS--;
179 assertEquals(expectedNumRS, cluster.getRegionServerThreads().size());
180 log("Restarting region server " + num + " of " + total);
181 restarted = cluster.startRegionServer();
182 restarted.waitForServerOnline();
183 expectedNumRS++;
184 log("Region server " + num + " is back online");
185 log("Waiting for no more RIT");
186 blockUntilNoRIT(zkw, master);
187 log("Verifying there are " + numRegions + " assigned on cluster");
188 assertRegionsAssigned(cluster, regions);
189 assertEquals(expectedNumRS, cluster.getRegionServerThreads().size());
190 num++;
191 }
192 Thread.sleep(1000);
193 assertRegionsAssigned(cluster, regions);
194
195
196
197 ht.close();
198
199 TEST_UTIL.shutdownMiniCluster();
200 }
201
202 private void blockUntilNoRIT(ZooKeeperWatcher zkw, HMaster master)
203 throws KeeperException, InterruptedException {
204 ZKAssign.blockUntilNoRIT(zkw);
205 master.assignmentManager.waitUntilNoRegionsInTransition(60000);
206 }
207
208 private void waitForRSShutdownToStartAndFinish(MasterThread activeMaster,
209 ServerName serverName) throws InterruptedException {
210 ServerManager sm = activeMaster.getMaster().getServerManager();
211
212 while (!sm.getDeadServers().isDeadServer(serverName)) {
213 log("Waiting for [" + serverName + "] to be listed as dead in master");
214 Thread.sleep(1);
215 }
216 log("Server [" + serverName + "] marked as dead, waiting for it to " +
217 "finish dead processing");
218 while (sm.areDeadServersInProgress()) {
219 log("Server [" + serverName + "] still being processed, waiting");
220 Thread.sleep(100);
221 }
222 log("Server [" + serverName + "] done with server shutdown processing");
223 }
224
225 private void log(String msg) {
226 LOG.debug("\n\nTRR: " + msg + "\n");
227 }
228
229 private int getNumberOfOnlineRegions(MiniHBaseCluster cluster) {
230 int numFound = 0;
231 for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
232 numFound += rst.getRegionServer().getNumberOfOnlineRegions();
233 }
234 for (MasterThread mt : cluster.getMasterThreads()) {
235 numFound += mt.getMaster().getNumberOfOnlineRegions();
236 }
237 return numFound;
238 }
239
240 private void assertRegionsAssigned(MiniHBaseCluster cluster,
241 Set<String> expectedRegions) throws IOException {
242 int numFound = getNumberOfOnlineRegions(cluster);
243 if (expectedRegions.size() > numFound) {
244 log("Expected to find " + expectedRegions.size() + " but only found"
245 + " " + numFound);
246 NavigableSet<String> foundRegions =
247 HBaseTestingUtility.getAllOnlineRegions(cluster);
248 for (String region : expectedRegions) {
249 if (!foundRegions.contains(region)) {
250 log("Missing region: " + region);
251 }
252 }
253 assertEquals(expectedRegions.size(), numFound);
254 } else if (expectedRegions.size() < numFound) {
255 int doubled = numFound - expectedRegions.size();
256 log("Expected to find " + expectedRegions.size() + " but found"
257 + " " + numFound + " (" + doubled + " double assignments?)");
258 NavigableSet<String> doubleRegions = getDoubleAssignedRegions(cluster);
259 for (String region : doubleRegions) {
260 log("Region is double assigned: " + region);
261 }
262 assertEquals(expectedRegions.size(), numFound);
263 } else {
264 log("Success! Found expected number of " + numFound + " regions");
265 }
266 }
267
268 private NavigableSet<String> getDoubleAssignedRegions(
269 MiniHBaseCluster cluster) throws IOException {
270 NavigableSet<String> online = new TreeSet<String>();
271 NavigableSet<String> doubled = new TreeSet<String>();
272 for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
273 for (HRegionInfo region : ProtobufUtil.getOnlineRegions(
274 rst.getRegionServer().getRSRpcServices())) {
275 if(!online.add(region.getRegionNameAsString())) {
276 doubled.add(region.getRegionNameAsString());
277 }
278 }
279 }
280 return doubled;
281 }
282
283
284 }
285