View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.procedure;
19  
20  import java.io.Closeable;
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.List;
24  import java.util.concurrent.Callable;
25  import java.util.concurrent.ExecutionException;
26  import java.util.concurrent.ExecutorCompletionService;
27  import java.util.concurrent.Future;
28  import java.util.concurrent.LinkedBlockingQueue;
29  import java.util.concurrent.ThreadPoolExecutor;
30  import java.util.concurrent.TimeUnit;
31  
32  import org.apache.commons.logging.Log;
33  import org.apache.commons.logging.LogFactory;
34  import org.apache.hadoop.conf.Configuration;
35  import org.apache.hadoop.hbase.Abortable;
36  import org.apache.hadoop.hbase.DaemonThreadFactory;
37  import org.apache.hadoop.hbase.regionserver.RegionServerServices;
38  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
39  import org.apache.hadoop.hbase.errorhandling.ForeignException;
40  import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
41  import org.apache.zookeeper.KeeperException;
42  
43  public class SimpleRSProcedureManager extends RegionServerProcedureManager {
44  
45    private static final Log LOG = LogFactory.getLog(SimpleRSProcedureManager.class);
46  
47    private RegionServerServices rss;
48    private ProcedureMemberRpcs memberRpcs;
49    private ProcedureMember member;
50  
51    @Override
52    public void initialize(RegionServerServices rss) throws KeeperException {
53      this.rss = rss;
54      ZooKeeperWatcher zkw = rss.getZooKeeper();
55      this.memberRpcs = new ZKProcedureMemberRpcs(zkw, getProcedureSignature());
56  
57      ThreadPoolExecutor pool =
58          ProcedureMember.defaultPool(rss.getServerName().toString(), 1);
59      this.member = new ProcedureMember(memberRpcs, pool, new SimleSubprocedureBuilder());
60      LOG.info("Initialized: " + rss.getServerName().toString());
61    }
62  
63    @Override
64    public void start() {
65      this.memberRpcs.start(rss.getServerName().toString(), member);
66      LOG.info("Started.");
67    }
68  
69    @Override
70    public void stop(boolean force) throws IOException {
71      LOG.info("stop: " + force);
72      try {
73        this.member.close();
74      } finally {
75        this.memberRpcs.close();
76      }
77    }
78  
79    @Override
80    public String getProcedureSignature() {
81      return SimpleMasterProcedureManager.SIMPLE_SIGNATURE;
82    }
83  
84    /**
85     * If in a running state, creates the specified subprocedure for handling a procedure.
86     * @return Subprocedure to submit to the ProcedureMemeber.
87     */
88    public Subprocedure buildSubprocedure(String name) {
89  
90      // don't run a procedure if the parent is stop(ping)
91      if (rss.isStopping() || rss.isStopped()) {
92        throw new IllegalStateException("Can't start procedure on RS: " + rss.getServerName()
93            + ", because stopping/stopped!");
94      }
95  
96      LOG.info("Attempting to run a procedure.");
97      ForeignExceptionDispatcher errorDispatcher = new ForeignExceptionDispatcher();
98      Configuration conf = rss.getConfiguration();
99  
100     SimpleSubprocedurePool taskManager =
101         new SimpleSubprocedurePool(rss.getServerName().toString(), conf);
102     return new SimpleSubprocedure(rss, member, errorDispatcher, taskManager, name);
103   }
104 
105   /**
106    * Build the actual procedure runner that will do all the 'hard' work
107    */
108   public class SimleSubprocedureBuilder implements SubprocedureFactory {
109 
110     @Override
111     public Subprocedure buildSubprocedure(String name, byte[] data) {
112       LOG.info("Building procedure: " + name);
113       return SimpleRSProcedureManager.this.buildSubprocedure(name);
114     }
115   }
116 
117   public class SimpleSubprocedurePool implements Closeable, Abortable {
118 
119     private final ExecutorCompletionService<Void> taskPool;
120     private final ThreadPoolExecutor executor;
121     private volatile boolean aborted;
122     private final List<Future<Void>> futures = new ArrayList<Future<Void>>();
123     private final String name;
124 
125     public SimpleSubprocedurePool(String name, Configuration conf) {
126       this.name = name;
127       executor = new ThreadPoolExecutor(1, 1, 500, TimeUnit.SECONDS,
128           new LinkedBlockingQueue<Runnable>(),
129           new DaemonThreadFactory("rs(" + name + ")-procedure-pool"));
130       taskPool = new ExecutorCompletionService<Void>(executor);
131     }
132 
133     /**
134      * Submit a task to the pool.
135      */
136     public void submitTask(final Callable<Void> task) {
137       Future<Void> f = this.taskPool.submit(task);
138       futures.add(f);
139     }
140 
141     /**
142      * Wait for all of the currently outstanding tasks submitted via {@link #submitTask(Callable)}
143      *
144      * @return <tt>true</tt> on success, <tt>false</tt> otherwise
145      * @throws ForeignException
146      */
147     public boolean waitForOutstandingTasks() throws ForeignException {
148       LOG.debug("Waiting for procedure to finish.");
149 
150       try {
151         for (Future<Void> f: futures) {
152           f.get();
153         }
154         return true;
155       } catch (InterruptedException e) {
156         if (aborted) throw new ForeignException(
157             "Interrupted and found to be aborted while waiting for tasks!", e);
158         Thread.currentThread().interrupt();
159       } catch (ExecutionException e) {
160         if (e.getCause() instanceof ForeignException) {
161           throw (ForeignException) e.getCause();
162         }
163         throw new ForeignException(name, e.getCause());
164       } finally {
165         // close off remaining tasks
166         for (Future<Void> f: futures) {
167           if (!f.isDone()) {
168             f.cancel(true);
169           }
170         }
171       }
172       return false;
173     }
174 
175     /**
176      * Attempt to cleanly shutdown any running tasks - allows currently running tasks to cleanly
177      * finish
178      */
179     @Override
180     public void close() {
181       executor.shutdown();
182     }
183 
184     @Override
185     public void abort(String why, Throwable e) {
186       if (this.aborted) return;
187 
188       this.aborted = true;
189       LOG.warn("Aborting because: " + why, e);
190       this.executor.shutdownNow();
191     }
192 
193     @Override
194     public boolean isAborted() {
195       return this.aborted;
196     }
197   }
198 
199   public class SimpleSubprocedure extends Subprocedure {
200     private final RegionServerServices rss;
201     private final SimpleSubprocedurePool taskManager;
202 
203     public SimpleSubprocedure(RegionServerServices rss, ProcedureMember member,
204         ForeignExceptionDispatcher errorListener, SimpleSubprocedurePool taskManager, String name) {
205       super(member, name, errorListener, 500, 60000);
206       LOG.info("Constructing a SimpleSubprocedure.");
207       this.rss = rss;
208       this.taskManager = taskManager;
209     }
210 
211     /**
212      * Callable task.
213      * TODO. We don't need a thread pool to execute roll log. This can be simplified
214      * with no use of subprocedurepool.
215      */
216     class RSSimpleTask implements Callable<Void> {
217       RSSimpleTask() {}
218 
219       @Override
220       public Void call() throws Exception {
221         LOG.info("Execute subprocedure on " + rss.getServerName().toString());
222         return null;
223       }
224 
225     }
226 
227     private void execute() throws ForeignException {
228 
229       monitor.rethrowException();
230 
231       // running a task (e.g., roll log, flush table) on region server
232       taskManager.submitTask(new RSSimpleTask());
233       monitor.rethrowException();
234 
235       // wait for everything to complete.
236       taskManager.waitForOutstandingTasks();
237       monitor.rethrowException();
238 
239     }
240 
241     @Override
242     public void acquireBarrier() throws ForeignException {
243       // do nothing, executing in inside barrier step.
244     }
245 
246     /**
247      * do a log roll.
248      */
249     @Override
250     public byte[] insideBarrier() throws ForeignException {
251       execute();
252       return SimpleMasterProcedureManager.SIMPLE_DATA.getBytes();
253     }
254 
255     /**
256      * Cancel threads if they haven't finished.
257      */
258     @Override
259     public void cleanup(Exception e) {
260       taskManager.abort("Aborting simple subprocedure tasks due to error", e);
261     }
262   }
263 
264 }