1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase;
20
21 import com.sun.jersey.api.client.Client;
22 import com.sun.jersey.api.client.ClientResponse;
23 import com.sun.jersey.api.client.filter.HTTPBasicAuthFilter;
24
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27 import org.apache.hadoop.conf.Configuration;
28 import org.apache.hadoop.conf.Configured;
29 import org.apache.hadoop.util.ReflectionUtils;
30 import org.codehaus.jackson.JsonNode;
31 import org.codehaus.jackson.map.ObjectMapper;
32
33 import javax.ws.rs.core.MediaType;
34 import javax.ws.rs.core.Response;
35 import javax.ws.rs.core.UriBuilder;
36 import javax.xml.ws.http.HTTPException;
37 import java.io.IOException;
38 import java.net.URI;
39 import java.util.HashMap;
40 import java.util.Map;
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63 public class RESTApiClusterManager extends Configured implements ClusterManager {
64
65
66 private static final String REST_API_CLUSTER_MANAGER_HOSTNAME =
67 "hbase.it.clustermanager.restapi.hostname";
68 private static final String REST_API_CLUSTER_MANAGER_USERNAME =
69 "hbase.it.clustermanager.restapi.username";
70 private static final String REST_API_CLUSTER_MANAGER_PASSWORD =
71 "hbase.it.clustermanager.restapi.password";
72 private static final String REST_API_CLUSTER_MANAGER_CLUSTER_NAME =
73 "hbase.it.clustermanager.restapi.clustername";
74
75
76 private static final String DEFAULT_SERVER_HOSTNAME = "http://localhost:7180";
77 private static final String DEFAULT_SERVER_USERNAME = "admin";
78 private static final String DEFAULT_SERVER_PASSWORD = "admin";
79 private static final String DEFAULT_CLUSTER_NAME = "Cluster 1";
80
81
82
83 private String serverHostname;
84 private String serverUsername;
85 private String serverPassword;
86 private String clusterName;
87
88
89
90 private static final String API_VERSION = "v6";
91
92
93 private Client client = Client.create();
94
95
96
97 private ClusterManager hBaseClusterManager;
98
99 private static final Log LOG = LogFactory.getLog(RESTApiClusterManager.class);
100
101 RESTApiClusterManager() {
102 hBaseClusterManager = ReflectionUtils.newInstance(HBaseClusterManager.class,
103 new IntegrationTestingUtility().getConfiguration());
104 }
105
106 @Override
107 public void setConf(Configuration conf) {
108 super.setConf(conf);
109 if (conf == null) {
110
111 return;
112 }
113 serverHostname = conf.get(REST_API_CLUSTER_MANAGER_HOSTNAME, DEFAULT_SERVER_HOSTNAME);
114 serverUsername = conf.get(REST_API_CLUSTER_MANAGER_USERNAME, DEFAULT_SERVER_USERNAME);
115 serverPassword = conf.get(REST_API_CLUSTER_MANAGER_PASSWORD, DEFAULT_SERVER_PASSWORD);
116 clusterName = conf.get(REST_API_CLUSTER_MANAGER_CLUSTER_NAME, DEFAULT_CLUSTER_NAME);
117
118
119 client.addFilter(new HTTPBasicAuthFilter(serverUsername, serverPassword));
120 }
121
122 @Override
123 public void start(ServiceType service, String hostname, int port) throws IOException {
124 performClusterManagerCommand(service, hostname, RoleCommand.START);
125 }
126
127 @Override
128 public void stop(ServiceType service, String hostname, int port) throws IOException {
129 performClusterManagerCommand(service, hostname, RoleCommand.STOP);
130 }
131
132 @Override
133 public void restart(ServiceType service, String hostname, int port) throws IOException {
134 performClusterManagerCommand(service, hostname, RoleCommand.RESTART);
135 }
136
137 @Override
138 public boolean isRunning(ServiceType service, String hostname, int port) throws IOException {
139 String serviceName = getServiceName(roleServiceType.get(service));
140 String hostId = getHostId(hostname);
141 String roleState = getRoleState(serviceName, service.toString(), hostId);
142 String healthSummary = getHealthSummary(serviceName, service.toString(), hostId);
143 boolean isRunning = false;
144
145
146
147 if ("STARTED".equals(roleState) && "GOOD".equals(healthSummary)) {
148 isRunning = true;
149 }
150
151 return isRunning;
152 }
153
154 @Override
155 public void kill(ServiceType service, String hostname, int port) throws IOException {
156 hBaseClusterManager.kill(service, hostname, port);
157 }
158
159 @Override
160 public void suspend(ServiceType service, String hostname, int port) throws IOException {
161 hBaseClusterManager.suspend(service, hostname, port);
162 }
163
164 @Override
165 public void resume(ServiceType service, String hostname, int port) throws IOException {
166 hBaseClusterManager.resume(service, hostname, port);
167 }
168
169
170
171
172 private void performClusterManagerCommand(ServiceType role, String hostname, RoleCommand command)
173 throws IOException {
174 LOG.info("Performing " + command + " command against " + role + " on " + hostname + "...");
175 String serviceName = getServiceName(roleServiceType.get(role));
176 String hostId = getHostId(hostname);
177 String roleName = getRoleName(serviceName, role.toString(), hostId);
178 doRoleCommand(serviceName, roleName, command);
179 }
180
181
182 private void doRoleCommand(String serviceName, String roleName, RoleCommand roleCommand) {
183 URI uri = UriBuilder.fromUri(serverHostname)
184 .path("api")
185 .path(API_VERSION)
186 .path("clusters")
187 .path(clusterName)
188 .path("services")
189 .path(serviceName)
190 .path("roleCommands")
191 .path(roleCommand.toString())
192 .build();
193 String body = "{ \"items\": [ \"" + roleName + "\" ] }";
194 LOG.info("Executing POST against " + uri + " with body " + body + "...");
195 ClientResponse response = client.resource(uri)
196 .type(MediaType.APPLICATION_JSON)
197 .post(ClientResponse.class, body);
198
199 int statusCode = response.getStatus();
200 if (statusCode != Response.Status.OK.getStatusCode()) {
201 throw new HTTPException(statusCode);
202 }
203 }
204
205
206 private String getHealthSummary(String serviceName, String roleType, String hostId)
207 throws IOException {
208 return getRolePropertyValue(serviceName, roleType, hostId, "healthSummary");
209 }
210
211
212 private String getHostId(String hostname) throws IOException {
213 String hostId = null;
214
215 URI uri = UriBuilder.fromUri(serverHostname)
216 .path("api")
217 .path(API_VERSION)
218 .path("hosts")
219 .build();
220 JsonNode hosts = getJsonNodeFromURIGet(uri);
221 if (hosts != null) {
222
223 for (JsonNode host : hosts) {
224 if (host.get("hostname").getTextValue().equals(hostname)) {
225 hostId = host.get("hostId").getTextValue();
226 break;
227 }
228 }
229 } else {
230 hostId = null;
231 }
232
233 return hostId;
234 }
235
236
237 private JsonNode getJsonNodeFromURIGet(URI uri) throws IOException {
238 LOG.info("Executing GET against " + uri + "...");
239 ClientResponse response = client.resource(uri)
240 .accept(MediaType.APPLICATION_JSON_TYPE)
241 .get(ClientResponse.class);
242
243 int statusCode = response.getStatus();
244 if (statusCode != Response.Status.OK.getStatusCode()) {
245 throw new HTTPException(statusCode);
246 }
247
248 return new ObjectMapper().readTree(response.getEntity(String.class)).get("items");
249 }
250
251
252 private String getRoleName(String serviceName, String roleType, String hostId)
253 throws IOException {
254 return getRolePropertyValue(serviceName, roleType, hostId, "name");
255 }
256
257
258 private String getRolePropertyValue(String serviceName, String roleType, String hostId,
259 String property) throws IOException {
260 String roleValue = null;
261 URI uri = UriBuilder.fromUri(serverHostname)
262 .path("api")
263 .path(API_VERSION)
264 .path("clusters")
265 .path(clusterName)
266 .path("services")
267 .path(serviceName)
268 .path("roles")
269 .build();
270 JsonNode roles = getJsonNodeFromURIGet(uri);
271 if (roles != null) {
272
273 for (JsonNode role : roles) {
274 if (role.get("hostRef").get("hostId").getTextValue().equals(hostId) &&
275 role.get("type")
276 .getTextValue()
277 .toLowerCase()
278 .equals(roleType.toLowerCase())) {
279 roleValue = role.get(property).getTextValue();
280 break;
281 }
282 }
283 }
284
285 return roleValue;
286 }
287
288
289 private String getRoleState(String serviceName, String roleType, String hostId)
290 throws IOException {
291 return getRolePropertyValue(serviceName, roleType, hostId, "roleState");
292 }
293
294
295 private String getServiceName(Service service) throws IOException {
296 String serviceName = null;
297 URI uri = UriBuilder.fromUri(serverHostname)
298 .path("api")
299 .path(API_VERSION)
300 .path("clusters")
301 .path(clusterName)
302 .path("services")
303 .build();
304 JsonNode services = getJsonNodeFromURIGet(uri);
305 if (services != null) {
306
307 for (JsonNode serviceEntry : services) {
308 if (serviceEntry.get("type").getTextValue().equals(service.toString())) {
309 serviceName = serviceEntry.get("name").getTextValue();
310 break;
311 }
312 }
313 }
314
315 return serviceName;
316 }
317
318
319
320
321
322
323
324
325 private enum RoleCommand {
326 START, STOP, RESTART;
327
328
329 @Override
330 public String toString() {
331 return name().toLowerCase();
332 }
333 }
334
335
336
337
338 private static Map<ServiceType, Service> roleServiceType = new HashMap<ServiceType, Service>();
339 static {
340 roleServiceType.put(ServiceType.HADOOP_NAMENODE, Service.HDFS);
341 roleServiceType.put(ServiceType.HADOOP_DATANODE, Service.HDFS);
342 roleServiceType.put(ServiceType.HADOOP_JOBTRACKER, Service.MAPREDUCE);
343 roleServiceType.put(ServiceType.HADOOP_TASKTRACKER, Service.MAPREDUCE);
344 roleServiceType.put(ServiceType.HBASE_MASTER, Service.HBASE);
345 roleServiceType.put(ServiceType.HBASE_REGIONSERVER, Service.HBASE);
346 }
347
348 private enum Service {
349 HBASE, HDFS, MAPREDUCE
350 }
351 }