1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import java.io.IOException;
22 import java.io.PrintWriter;
23 import java.io.StringWriter;
24 import java.util.ArrayList;
25 import java.util.Iterator;
26 import java.util.List;
27 import java.util.concurrent.BlockingQueue;
28 import java.util.concurrent.Executors;
29 import java.util.concurrent.RejectedExecutionException;
30 import java.util.concurrent.RejectedExecutionHandler;
31 import java.util.concurrent.ThreadFactory;
32 import java.util.concurrent.ThreadPoolExecutor;
33 import java.util.concurrent.TimeUnit;
34
35 import org.apache.commons.logging.Log;
36 import org.apache.commons.logging.LogFactory;
37 import org.apache.hadoop.conf.Configuration;
38 import org.apache.hadoop.hbase.RemoteExceptionHandler;
39 import org.apache.hadoop.hbase.classification.InterfaceAudience;
40 import org.apache.hadoop.hbase.conf.ConfigurationManager;
41 import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
42 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
43 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
44 import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
45 import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputControllerFactory;
46 import org.apache.hadoop.hbase.security.User;
47 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
48 import org.apache.hadoop.hbase.util.Pair;
49 import org.apache.hadoop.hbase.util.StealJobQueue;
50 import org.apache.hadoop.util.StringUtils;
51
52 import com.google.common.annotations.VisibleForTesting;
53 import com.google.common.base.Preconditions;
54
55
56
57
58 @InterfaceAudience.Private
59 public class CompactSplitThread implements CompactionRequestor, PropagatingConfigurationObserver {
60 private static final Log LOG = LogFactory.getLog(CompactSplitThread.class);
61
62
63 public final static String LARGE_COMPACTION_THREADS =
64 "hbase.regionserver.thread.compaction.large";
65 public final static int LARGE_COMPACTION_THREADS_DEFAULT = 1;
66
67
68 public final static String SMALL_COMPACTION_THREADS =
69 "hbase.regionserver.thread.compaction.small";
70 public final static int SMALL_COMPACTION_THREADS_DEFAULT = 1;
71
72
73 public final static String SPLIT_THREADS = "hbase.regionserver.thread.split";
74 public final static int SPLIT_THREADS_DEFAULT = 1;
75
76
77 public final static String MERGE_THREADS = "hbase.regionserver.thread.merge";
78 public final static int MERGE_THREADS_DEFAULT = 1;
79
80 public static final String REGION_SERVER_REGION_SPLIT_LIMIT =
81 "hbase.regionserver.regionSplitLimit";
82 public static final int DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT= 1000;
83
84 private final HRegionServer server;
85 private final Configuration conf;
86
87 private final ThreadPoolExecutor longCompactions;
88 private final ThreadPoolExecutor shortCompactions;
89 private final ThreadPoolExecutor splits;
90 private final ThreadPoolExecutor mergePool;
91
92 private volatile CompactionThroughputController compactionThroughputController;
93
94
95
96
97
98
99 private int regionSplitLimit;
100
101
/**
 * Reads the pool sizes and the region split limit from the server's
 * configuration and creates the large/small compaction, split and merge pools
 * plus the compaction throughput controller.
 *
 * @param server the region server these pools work for
 */
CompactSplitThread(HRegionServer server) {
  super();
  this.server = server;
  this.conf = server.getConfiguration();
  this.regionSplitLimit = conf.getInt(REGION_SERVER_REGION_SPLIT_LIMIT,
      DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT);

  // The large pool is clamped to at least one thread; the small pool is only validated.
  final int largePoolSize = Math.max(1, conf.getInt(
      LARGE_COMPACTION_THREADS, LARGE_COMPACTION_THREADS_DEFAULT));
  final int smallPoolSize = conf.getInt(
      SMALL_COMPACTION_THREADS, SMALL_COMPACTION_THREADS_DEFAULT);
  final int splitPoolSize = conf.getInt(SPLIT_THREADS, SPLIT_THREADS_DEFAULT);

  Preconditions.checkArgument(largePoolSize > 0 && smallPoolSize > 0);

  // Worker threads are named after the thread that constructs this object.
  final String creatorName = Thread.currentThread().getName();

  // Both compaction pools are wired to the same StealJobQueue: the large pool
  // consumes the queue itself, the small pool consumes its steal-from queue.
  StealJobQueue<Runnable> sharedQueue = new StealJobQueue<>();
  this.longCompactions = new ThreadPoolExecutor(largePoolSize, largePoolSize,
      60, TimeUnit.SECONDS, sharedQueue,
      timestampedThreadFactory(creatorName + "-longCompactions-"));
  this.longCompactions.setRejectedExecutionHandler(new Rejection());
  this.longCompactions.prestartAllCoreThreads();

  this.shortCompactions = new ThreadPoolExecutor(smallPoolSize, smallPoolSize,
      60, TimeUnit.SECONDS, sharedQueue.getStealFromQueue(),
      timestampedThreadFactory(creatorName + "-shortCompactions-"));
  this.shortCompactions.setRejectedExecutionHandler(new Rejection());

  this.splits = (ThreadPoolExecutor) Executors.newFixedThreadPool(
      splitPoolSize, timestampedThreadFactory(creatorName + "-splits-"));

  final int mergePoolSize = conf.getInt(MERGE_THREADS, MERGE_THREADS_DEFAULT);
  this.mergePool = (ThreadPoolExecutor) Executors.newFixedThreadPool(
      mergePoolSize, timestampedThreadFactory(creatorName + "-merges-"));

  this.compactionThroughputController =
      CompactionThroughputControllerFactory.create(server, conf);
}

/**
 * Creates a factory whose threads are named {@code namePrefix} followed by the
 * wall-clock millisecond at which each thread is created.
 */
private static ThreadFactory timestampedThreadFactory(final String namePrefix) {
  return new ThreadFactory() {
    @Override
    public Thread newThread(Runnable r) {
      Thread worker = new Thread(r);
      worker.setName(namePrefix + System.currentTimeMillis());
      return worker;
    }
  };
}
171
172 @Override
173 public String toString() {
174 return "compaction_queue=("
175 + longCompactions.getQueue().size() + ":"
176 + shortCompactions.getQueue().size() + ")"
177 + ", split_queue=" + splits.getQueue().size()
178 + ", merge_queue=" + mergePool.getQueue().size();
179 }
180
181 public String dumpQueue() {
182 StringBuffer queueLists = new StringBuffer();
183 queueLists.append("Compaction/Split Queue dump:\n");
184 queueLists.append(" LargeCompation Queue:\n");
185 BlockingQueue<Runnable> lq = longCompactions.getQueue();
186 Iterator<Runnable> it = lq.iterator();
187 while (it.hasNext()) {
188 queueLists.append(" " + it.next().toString());
189 queueLists.append("\n");
190 }
191
192 if (shortCompactions != null) {
193 queueLists.append("\n");
194 queueLists.append(" SmallCompation Queue:\n");
195 lq = shortCompactions.getQueue();
196 it = lq.iterator();
197 while (it.hasNext()) {
198 queueLists.append(" " + it.next().toString());
199 queueLists.append("\n");
200 }
201 }
202
203 queueLists.append("\n");
204 queueLists.append(" Split Queue:\n");
205 lq = splits.getQueue();
206 it = lq.iterator();
207 while (it.hasNext()) {
208 queueLists.append(" " + it.next().toString());
209 queueLists.append("\n");
210 }
211
212 queueLists.append("\n");
213 queueLists.append(" Region Merge Queue:\n");
214 lq = mergePool.getQueue();
215 it = lq.iterator();
216 while (it.hasNext()) {
217 queueLists.append(" " + it.next().toString());
218 queueLists.append("\n");
219 }
220
221 return queueLists.toString();
222 }
223
224 public synchronized void requestRegionsMerge(final Region a,
225 final Region b, final boolean forcible, long masterSystemTime, User user) {
226 try {
227 mergePool.execute(new RegionMergeRequest(a, b, this.server, forcible, masterSystemTime,user));
228 if (LOG.isDebugEnabled()) {
229 LOG.debug("Region merge requested for " + a + "," + b + ", forcible="
230 + forcible + ". " + this);
231 }
232 } catch (RejectedExecutionException ree) {
233 LOG.warn("Could not execute merge for " + a + "," + b + ", forcible="
234 + forcible, ree);
235 }
236 }
237
238 public synchronized boolean requestSplit(final Region r) {
239
240 if (shouldSplitRegion() && ((HRegion)r).getCompactPriority() >= Store.PRIORITY_USER) {
241 byte[] midKey = ((HRegion)r).checkSplit();
242 if (midKey != null) {
243 requestSplit(r, midKey);
244 return true;
245 }
246 }
247 return false;
248 }
249
250 public synchronized void requestSplit(final Region r, byte[] midKey) {
251 requestSplit(r, midKey, null);
252 }
253
254
255
256
257 public synchronized void requestSplit(final Region r, byte[] midKey, User user) {
258 if (midKey == null) {
259 LOG.debug("Region " + r.getRegionInfo().getRegionNameAsString() +
260 " not splittable because midkey=null");
261 if (((HRegion)r).shouldForceSplit()) {
262 ((HRegion)r).clearSplit();
263 }
264 return;
265 }
266 try {
267 this.splits.execute(new SplitRequest(r, midKey, this.server, user));
268 if (LOG.isDebugEnabled()) {
269 LOG.debug("Split requested for " + r + ". " + this);
270 }
271 } catch (RejectedExecutionException ree) {
272 LOG.info("Could not execute split for " + r, ree);
273 }
274 }
275
276 @Override
277 public synchronized List<CompactionRequest> requestCompaction(final Region r, final String why)
278 throws IOException {
279 return requestCompaction(r, why, null);
280 }
281
282 @Override
283 public synchronized List<CompactionRequest> requestCompaction(final Region r, final String why,
284 List<Pair<CompactionRequest, Store>> requests) throws IOException {
285 return requestCompaction(r, why, Store.NO_PRIORITY, requests, null);
286 }
287
288 @Override
289 public synchronized CompactionRequest requestCompaction(final Region r, final Store s,
290 final String why, CompactionRequest request) throws IOException {
291 return requestCompaction(r, s, why, Store.NO_PRIORITY, request, null);
292 }
293
294 @Override
295 public synchronized List<CompactionRequest> requestCompaction(final Region r, final String why,
296 int p, List<Pair<CompactionRequest, Store>> requests, User user) throws IOException {
297 return requestCompactionInternal(r, why, p, requests, true, user);
298 }
299
300 private List<CompactionRequest> requestCompactionInternal(final Region r, final String why,
301 int p, List<Pair<CompactionRequest, Store>> requests, boolean selectNow, User user)
302 throws IOException {
303
304 List<CompactionRequest> ret = null;
305 if (requests == null) {
306 ret = selectNow ? new ArrayList<CompactionRequest>(r.getStores().size()) : null;
307 for (Store s : r.getStores()) {
308 CompactionRequest cr = requestCompactionInternal(r, s, why, p, null, selectNow, user);
309 if (selectNow) ret.add(cr);
310 }
311 } else {
312 Preconditions.checkArgument(selectNow);
313 ret = new ArrayList<CompactionRequest>(requests.size());
314 for (Pair<CompactionRequest, Store> pair : requests) {
315 ret.add(requestCompaction(r, pair.getSecond(), why, p, pair.getFirst(), user));
316 }
317 }
318 return ret;
319 }
320
321 public CompactionRequest requestCompaction(final Region r, final Store s,
322 final String why, int priority, CompactionRequest request, User user) throws IOException {
323 return requestCompactionInternal(r, s, why, priority, request, true, user);
324 }
325
326 public synchronized void requestSystemCompaction(
327 final Region r, final String why) throws IOException {
328 requestCompactionInternal(r, why, Store.NO_PRIORITY, null, false, null);
329 }
330
331 public void requestSystemCompaction(
332 final Region r, final Store s, final String why) throws IOException {
333 requestCompactionInternal(r, s, why, Store.NO_PRIORITY, null, false, null);
334 }
335
336
337
338
339
340
341
342
343
344 private synchronized CompactionRequest requestCompactionInternal(final Region r, final Store s,
345 final String why, int priority, CompactionRequest request, boolean selectNow, User user)
346 throws IOException {
347 if (this.server.isStopped()
348 || (r.getTableDesc() != null && !r.getTableDesc().isCompactionEnabled())) {
349 return null;
350 }
351
352 CompactionContext compaction = null;
353 if (selectNow) {
354 compaction = selectCompaction(r, s, priority, request, user);
355 if (compaction == null) return null;
356 }
357
358
359
360 ThreadPoolExecutor pool = (selectNow && s.throttleCompaction(compaction.getRequest().getSize()))
361 ? longCompactions : shortCompactions;
362 pool.execute(new CompactionRunner(s, r, compaction, pool, user));
363 if (LOG.isDebugEnabled()) {
364 String type = (pool == shortCompactions) ? "Small " : "Large ";
365 LOG.debug(type + "Compaction requested: " + (selectNow ? compaction.toString() : "system")
366 + (why != null && !why.isEmpty() ? "; Because: " + why : "") + "; " + this);
367 }
368 return selectNow ? compaction.getRequest() : null;
369 }
370
371 private CompactionContext selectCompaction(final Region r, final Store s,
372 int priority, CompactionRequest request, User user) throws IOException {
373 CompactionContext compaction = s.requestCompaction(priority, request, user);
374 if (compaction == null) {
375 if(LOG.isDebugEnabled() && r.getRegionInfo() != null) {
376 LOG.debug("Not compacting " + r.getRegionInfo().getRegionNameAsString() +
377 " because compaction request was cancelled");
378 }
379 return null;
380 }
381 assert compaction.hasSelection();
382 if (priority != Store.NO_PRIORITY) {
383 compaction.getRequest().setPriority(priority);
384 }
385 return compaction;
386 }
387
388
389
390
391 void interruptIfNecessary() {
392 splits.shutdown();
393 mergePool.shutdown();
394 longCompactions.shutdown();
395 shortCompactions.shutdown();
396 }
397
398 private void waitFor(ThreadPoolExecutor t, String name) {
399 boolean done = false;
400 while (!done) {
401 try {
402 done = t.awaitTermination(60, TimeUnit.SECONDS);
403 LOG.info("Waiting for " + name + " to finish...");
404 if (!done) {
405 t.shutdownNow();
406 }
407 } catch (InterruptedException ie) {
408 LOG.warn("Interrupted waiting for " + name + " to finish...");
409 }
410 }
411 }
412
413 void join() {
414 waitFor(splits, "Split Thread");
415 waitFor(mergePool, "Merge Thread");
416 waitFor(longCompactions, "Large Compaction Thread");
417 waitFor(shortCompactions, "Small Compaction Thread");
418 }
419
420
421
422
423
424
425
426 public int getCompactionQueueSize() {
427 return longCompactions.getQueue().size() + shortCompactions.getQueue().size();
428 }
429
430 public int getLargeCompactionQueueSize() {
431 return longCompactions.getQueue().size();
432 }
433
434
435 public int getSmallCompactionQueueSize() {
436 return shortCompactions.getQueue().size();
437 }
438
439 public int getSplitQueueSize() {
440 return splits.getQueue().size();
441 }
442
443 private boolean shouldSplitRegion() {
444 if(server.getNumberOfOnlineRegions() > 0.9*regionSplitLimit) {
445 LOG.warn("Total number of regions is approaching the upper limit " + regionSplitLimit + ". "
446 + "Please consider taking a look at http://hbase.apache.org/book.html#ops.regionmgt");
447 }
448 return (regionSplitLimit > server.getNumberOfOnlineRegions());
449 }
450
451
452
453
454 public int getRegionSplitLimit() {
455 return this.regionSplitLimit;
456 }
457
458 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="EQ_COMPARETO_USE_OBJECT_EQUALS",
459 justification="Contrived use of compareTo")
460 private class CompactionRunner implements Runnable, Comparable<CompactionRunner> {
461 private final Store store;
462 private final HRegion region;
463 private CompactionContext compaction;
464 private int queuedPriority;
465 private ThreadPoolExecutor parent;
466 private User user;
467
468 public CompactionRunner(Store store, Region region,
469 CompactionContext compaction, ThreadPoolExecutor parent, User user) {
470 super();
471 this.store = store;
472 this.region = (HRegion)region;
473 this.compaction = compaction;
474 this.queuedPriority = (this.compaction == null)
475 ? store.getCompactPriority() : compaction.getRequest().getPriority();
476 this.parent = parent;
477 this.user = user;
478 }
479
480 @Override
481 public String toString() {
482 return (this.compaction != null) ? ("Request = " + compaction.getRequest())
483 : ("Store = " + store.toString() + ", pri = " + queuedPriority);
484 }
485
486 private void doCompaction(User user) {
487
488 if (this.compaction == null) {
489 int oldPriority = this.queuedPriority;
490 this.queuedPriority = this.store.getCompactPriority();
491 if (this.queuedPriority > oldPriority) {
492
493
494 this.parent.execute(this);
495 return;
496 }
497 try {
498 this.compaction = selectCompaction(this.region, this.store, queuedPriority, null, user);
499 } catch (IOException ex) {
500 LOG.error("Compaction selection failed " + this, ex);
501 server.checkFileSystem();
502 return;
503 }
504 if (this.compaction == null) return;
505
506
507 assert this.compaction.hasSelection();
508 ThreadPoolExecutor pool = store.throttleCompaction(
509 compaction.getRequest().getSize()) ? longCompactions : shortCompactions;
510
511
512
513 if (this.parent == shortCompactions && pool == longCompactions) {
514 this.store.cancelRequestedCompaction(this.compaction);
515 this.compaction = null;
516 this.parent = pool;
517 this.parent.execute(this);
518 return;
519 }
520 }
521
522 assert this.compaction != null;
523
524 this.compaction.getRequest().beforeExecute();
525 try {
526
527
528 long start = EnvironmentEdgeManager.currentTime();
529 boolean completed =
530 region.compact(compaction, store, compactionThroughputController, user);
531 long now = EnvironmentEdgeManager.currentTime();
532 LOG.info(((completed) ? "Completed" : "Aborted") + " compaction: " +
533 this + "; duration=" + StringUtils.formatTimeDiff(now, start));
534 if (completed) {
535
536 if (store.getCompactPriority() <= 0) {
537 requestSystemCompaction(region, store, "Recursive enqueue");
538 } else {
539
540 requestSplit(region);
541 }
542 }
543 } catch (IOException ex) {
544 IOException remoteEx = RemoteExceptionHandler.checkIOException(ex);
545 LOG.error("Compaction failed " + this, remoteEx);
546 if (remoteEx != ex) {
547 LOG.info("Compaction failed at original callstack: " + formatStackTrace(ex));
548 }
549 server.checkFileSystem();
550 } catch (Exception ex) {
551 LOG.error("Compaction failed " + this, ex);
552 server.checkFileSystem();
553 } finally {
554 LOG.debug("CompactSplitThread Status: " + CompactSplitThread.this);
555 }
556 this.compaction.getRequest().afterExecute();
557 }
558
559 @Override
560 public void run() {
561 Preconditions.checkNotNull(server);
562 if (server.isStopped()
563 || (region.getTableDesc() != null && !region.getTableDesc().isCompactionEnabled())) {
564 return;
565 }
566 doCompaction(user);
567 }
568
569 private String formatStackTrace(Exception ex) {
570 StringWriter sw = new StringWriter();
571 PrintWriter pw = new PrintWriter(sw);
572 ex.printStackTrace(pw);
573 pw.flush();
574 return sw.toString();
575 }
576
577 @Override
578 public int compareTo(CompactionRunner o) {
579
580 int compareVal = queuedPriority - o.queuedPriority;
581 if (compareVal != 0) return compareVal;
582 CompactionContext tc = this.compaction, oc = o.compaction;
583
584 return (tc == null) ? ((oc == null) ? 0 : 1)
585 : ((oc == null) ? -1 : tc.getRequest().compareTo(oc.getRequest()));
586 }
587 }
588
589
590
591
592 private static class Rejection implements RejectedExecutionHandler {
593 @Override
594 public void rejectedExecution(Runnable runnable, ThreadPoolExecutor pool) {
595 if (runnable instanceof CompactionRunner) {
596 CompactionRunner runner = (CompactionRunner)runnable;
597 LOG.debug("Compaction Rejected: " + runner);
598 runner.store.cancelRequestedCompaction(runner.compaction);
599 }
600 }
601 }
602
603
604
605
606 @Override
607 public void onConfigurationChange(Configuration newConf) {
608
609
610
611
612
613 int largeThreads = Math.max(1, newConf.getInt(
614 LARGE_COMPACTION_THREADS,
615 LARGE_COMPACTION_THREADS_DEFAULT));
616 if (this.longCompactions.getCorePoolSize() != largeThreads) {
617 LOG.info("Changing the value of " + LARGE_COMPACTION_THREADS +
618 " from " + this.longCompactions.getCorePoolSize() + " to " +
619 largeThreads);
620 if(this.longCompactions.getCorePoolSize() < largeThreads) {
621 this.longCompactions.setMaximumPoolSize(largeThreads);
622 this.longCompactions.setCorePoolSize(largeThreads);
623 } else {
624 this.longCompactions.setCorePoolSize(largeThreads);
625 this.longCompactions.setMaximumPoolSize(largeThreads);
626 }
627 }
628
629 int smallThreads = newConf.getInt(SMALL_COMPACTION_THREADS,
630 SMALL_COMPACTION_THREADS_DEFAULT);
631 if (this.shortCompactions.getCorePoolSize() != smallThreads) {
632 LOG.info("Changing the value of " + SMALL_COMPACTION_THREADS +
633 " from " + this.shortCompactions.getCorePoolSize() + " to " +
634 smallThreads);
635 if(this.shortCompactions.getCorePoolSize() < smallThreads) {
636 this.shortCompactions.setMaximumPoolSize(smallThreads);
637 this.shortCompactions.setCorePoolSize(smallThreads);
638 } else {
639 this.shortCompactions.setCorePoolSize(smallThreads);
640 this.shortCompactions.setMaximumPoolSize(smallThreads);
641 }
642 }
643
644 int splitThreads = newConf.getInt(SPLIT_THREADS,
645 SPLIT_THREADS_DEFAULT);
646 if (this.splits.getCorePoolSize() != splitThreads) {
647 LOG.info("Changing the value of " + SPLIT_THREADS +
648 " from " + this.splits.getCorePoolSize() + " to " +
649 splitThreads);
650 if(this.splits.getCorePoolSize() < splitThreads) {
651 this.splits.setMaximumPoolSize(splitThreads);
652 this.splits.setCorePoolSize(splitThreads);
653 } else {
654 this.splits.setCorePoolSize(splitThreads);
655 this.splits.setMaximumPoolSize(splitThreads);
656 }
657 }
658
659 int mergeThreads = newConf.getInt(MERGE_THREADS,
660 MERGE_THREADS_DEFAULT);
661 if (this.mergePool.getCorePoolSize() != mergeThreads) {
662 LOG.info("Changing the value of " + MERGE_THREADS +
663 " from " + this.mergePool.getCorePoolSize() + " to " +
664 mergeThreads);
665 if(this.mergePool.getCorePoolSize() < mergeThreads) {
666 this.mergePool.setMaximumPoolSize(mergeThreads);
667 this.mergePool.setCorePoolSize(mergeThreads);
668 } else {
669 this.mergePool.setCorePoolSize(mergeThreads);
670 this.mergePool.setMaximumPoolSize(mergeThreads);
671 }
672 }
673
674 CompactionThroughputController old = this.compactionThroughputController;
675 if (old != null) {
676 old.stop("configuration change");
677 }
678 this.compactionThroughputController =
679 CompactionThroughputControllerFactory.create(server, newConf);
680
681
682
683 this.conf.reloadConfiguration();
684 }
685
686 protected int getSmallCompactionThreadNum() {
687 return this.shortCompactions.getCorePoolSize();
688 }
689
690 protected int getLargeCompactionThreadNum() {
691 return this.longCompactions.getCorePoolSize();
692 }
693
694 protected int getSplitThreadNum() {
695 return this.splits.getCorePoolSize();
696 }
697
698 protected int getMergeThreadNum() {
699 return this.mergePool.getCorePoolSize();
700 }
701
702
703
704
705 @Override
706 public void registerChildren(ConfigurationManager manager) {
707
708 }
709
710
711
712
713 @Override
714 public void deregisterChildren(ConfigurationManager manager) {
715
716 }
717
718 @VisibleForTesting
719 public CompactionThroughputController getCompactionThroughputController() {
720 return compactionThroughputController;
721 }
722
723 @VisibleForTesting
724
725
726
727
728
729 void shutdownLongCompactions(){
730 this.longCompactions.shutdown();
731 }
732 }