1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.master.balancer;
19
20 import java.util.ArrayDeque;
21 import java.util.Arrays;
22 import java.util.Collection;
23 import java.util.Deque;
24 import java.util.HashMap;
25 import java.util.LinkedList;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.Map.Entry;
29 import java.util.Random;
30
31 import org.apache.commons.logging.Log;
32 import org.apache.commons.logging.LogFactory;
33 import org.apache.hadoop.hbase.classification.InterfaceAudience;
34 import org.apache.hadoop.conf.Configuration;
35 import org.apache.hadoop.hbase.ClusterStatus;
36 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
37 import org.apache.hadoop.hbase.HRegionInfo;
38 import org.apache.hadoop.hbase.RegionLoad;
39 import org.apache.hadoop.hbase.ServerLoad;
40 import org.apache.hadoop.hbase.ServerName;
41 import org.apache.hadoop.hbase.master.MasterServices;
42 import org.apache.hadoop.hbase.master.RegionPlan;
43 import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action;
44 import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action.Type;
45 import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.AssignRegionAction;
46 import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.MoveRegionAction;
47 import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.SwapRegionsAction;
48 import org.apache.hadoop.hbase.util.Bytes;
49 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
95 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
96 justification="Complaint is about costFunctions not being synchronized; not end of the world")
97 public class StochasticLoadBalancer extends BaseLoadBalancer {
98
99 protected static final String STEPS_PER_REGION_KEY =
100 "hbase.master.balancer.stochastic.stepsPerRegion";
101 protected static final String MAX_STEPS_KEY =
102 "hbase.master.balancer.stochastic.maxSteps";
103 protected static final String MAX_RUNNING_TIME_KEY =
104 "hbase.master.balancer.stochastic.maxRunningTime";
105 protected static final String KEEP_REGION_LOADS =
106 "hbase.master.balancer.stochastic.numRegionLoadsToRemember";
107
108 private static final Random RANDOM = new Random(System.currentTimeMillis());
109 private static final Log LOG = LogFactory.getLog(StochasticLoadBalancer.class);
110
111 Map<String, Deque<RegionLoad>> loads = new HashMap<String, Deque<RegionLoad>>();
112
113
114 private int maxSteps = 1000000;
115 private int stepsPerRegion = 800;
116 private long maxRunningTime = 30 * 1000 * 1;
117 private int numRegionLoadsToRemember = 15;
118
119 private CandidateGenerator[] candidateGenerators;
120 private CostFromRegionLoadFunction[] regionLoadFunctions;
121
122 private CostFunction[] costFunctions;
123
124
125
126 private LocalityBasedCandidateGenerator localityCandidateGenerator;
127 private LocalityCostFunction localityCost;
128 private RegionReplicaHostCostFunction regionReplicaHostCostFunction;
129 private RegionReplicaRackCostFunction regionReplicaRackCostFunction;
130
131 @Override
132 public void onConfigurationChange(Configuration conf) {
133 setConf(conf);
134 }
135
136 @Override
137 public synchronized void setConf(Configuration conf) {
138 super.setConf(conf);
139 LOG.info("loading config");
140
141 maxSteps = conf.getInt(MAX_STEPS_KEY, maxSteps);
142
143 stepsPerRegion = conf.getInt(STEPS_PER_REGION_KEY, stepsPerRegion);
144 maxRunningTime = conf.getLong(MAX_RUNNING_TIME_KEY, maxRunningTime);
145
146 numRegionLoadsToRemember = conf.getInt(KEEP_REGION_LOADS, numRegionLoadsToRemember);
147
148 if (localityCandidateGenerator == null) {
149 localityCandidateGenerator = new LocalityBasedCandidateGenerator(services);
150 }
151 localityCost = new LocalityCostFunction(conf, services);
152
153 if (candidateGenerators == null) {
154 candidateGenerators = new CandidateGenerator[] {
155 new RandomCandidateGenerator(),
156 new LoadCandidateGenerator(),
157 localityCandidateGenerator,
158 new RegionReplicaRackCandidateGenerator(),
159 };
160 }
161
162 regionLoadFunctions = new CostFromRegionLoadFunction[] {
163 new ReadRequestCostFunction(conf),
164 new WriteRequestCostFunction(conf),
165 new MemstoreSizeCostFunction(conf),
166 new StoreFileCostFunction(conf)
167 };
168
169 regionReplicaHostCostFunction = new RegionReplicaHostCostFunction(conf);
170 regionReplicaRackCostFunction = new RegionReplicaRackCostFunction(conf);
171
172 costFunctions = new CostFunction[]{
173 new RegionCountSkewCostFunction(conf),
174 new PrimaryRegionCountSkewCostFunction(conf),
175 new MoveCostFunction(conf),
176 localityCost,
177 new TableSkewCostFunction(conf),
178 regionReplicaHostCostFunction,
179 regionReplicaRackCostFunction,
180 regionLoadFunctions[0],
181 regionLoadFunctions[1],
182 regionLoadFunctions[2],
183 regionLoadFunctions[3],
184 };
185 }
186
187 @Override
188 protected void setSlop(Configuration conf) {
189 this.slop = conf.getFloat("hbase.regions.slop", 0.001F);
190 }
191
192 @Override
193 public synchronized void setClusterStatus(ClusterStatus st) {
194 super.setClusterStatus(st);
195 updateRegionLoad();
196 for(CostFromRegionLoadFunction cost : regionLoadFunctions) {
197 cost.setClusterStatus(st);
198 }
199 }
200
201 @Override
202 public synchronized void setMasterServices(MasterServices masterServices) {
203 super.setMasterServices(masterServices);
204 this.localityCost.setServices(masterServices);
205 this.localityCandidateGenerator.setServices(masterServices);
206
207 }
208
209 @Override
210 protected synchronized boolean areSomeRegionReplicasColocated(Cluster c) {
211 regionReplicaHostCostFunction.init(c);
212 if (regionReplicaHostCostFunction.cost() > 0) return true;
213 regionReplicaRackCostFunction.init(c);
214 if (regionReplicaRackCostFunction.cost() > 0) return true;
215 return false;
216 }
217
218
219
220
221
222 @Override
223 public synchronized List<RegionPlan> balanceCluster(Map<ServerName,
224 List<HRegionInfo>> clusterState) {
225 List<RegionPlan> plans = balanceMasterRegions(clusterState);
226 if (plans != null || clusterState == null || clusterState.size() <= 1) {
227 return plans;
228 }
229 if (masterServerName != null && clusterState.containsKey(masterServerName)) {
230 if (clusterState.size() <= 2) {
231 return null;
232 }
233 clusterState = new HashMap<ServerName, List<HRegionInfo>>(clusterState);
234 clusterState.remove(masterServerName);
235 }
236
237
238
239
240
241 RegionLocationFinder finder = null;
242 if (this.localityCost != null && this.localityCost.getMultiplier() > 0) {
243 finder = this.regionFinder;
244 }
245
246
247
248
249 Cluster cluster = new Cluster(clusterState, loads, finder, rackManager);
250 if (!needsBalance(cluster)) {
251 return null;
252 }
253
254 long startTime = EnvironmentEdgeManager.currentTime();
255
256 initCosts(cluster);
257
258 double currentCost = computeCost(cluster, Double.MAX_VALUE);
259
260 double initCost = currentCost;
261 double newCost = currentCost;
262
263 long computedMaxSteps = Math.min(this.maxSteps,
264 ((long)cluster.numRegions * (long)this.stepsPerRegion * (long)cluster.numServers));
265
266 long step;
267
268 for (step = 0; step < computedMaxSteps; step++) {
269 int generatorIdx = RANDOM.nextInt(candidateGenerators.length);
270 CandidateGenerator p = candidateGenerators[generatorIdx];
271 Cluster.Action action = p.generate(cluster);
272
273 if (action.type == Type.NULL) {
274 continue;
275 }
276
277 cluster.doAction(action);
278 updateCostsWithAction(cluster, action);
279
280 newCost = computeCost(cluster, currentCost);
281
282
283 if (newCost < currentCost) {
284 currentCost = newCost;
285 } else {
286
287
288 Action undoAction = action.undoAction();
289 cluster.doAction(undoAction);
290 updateCostsWithAction(cluster, undoAction);
291 }
292
293 if (EnvironmentEdgeManager.currentTime() - startTime >
294 maxRunningTime) {
295 break;
296 }
297 }
298 long endTime = EnvironmentEdgeManager.currentTime();
299
300 metricsBalancer.balanceCluster(endTime - startTime);
301
302 if (initCost > currentCost) {
303 plans = createRegionPlans(cluster);
304 if (LOG.isDebugEnabled()) {
305 LOG.debug("Finished computing new load balance plan. Computation took "
306 + (endTime - startTime) + "ms to try " + step
307 + " different iterations. Found a solution that moves "
308 + plans.size() + " regions; Going from a computed cost of "
309 + initCost + " to a new cost of " + currentCost);
310 }
311 return plans;
312 }
313 if (LOG.isDebugEnabled()) {
314 LOG.debug("Could not find a better load balance plan. Tried "
315 + step + " different configurations in " + (endTime - startTime)
316 + "ms, and did not find anything with a computed cost less than " + initCost);
317 }
318 return null;
319 }
320
321
322
323
324
325
326
327
328 private List<RegionPlan> createRegionPlans(Cluster cluster) {
329 List<RegionPlan> plans = new LinkedList<RegionPlan>();
330 for (int regionIndex = 0;
331 regionIndex < cluster.regionIndexToServerIndex.length; regionIndex++) {
332 int initialServerIndex = cluster.initialRegionIndexToServerIndex[regionIndex];
333 int newServerIndex = cluster.regionIndexToServerIndex[regionIndex];
334
335 if (initialServerIndex != newServerIndex) {
336 HRegionInfo region = cluster.regions[regionIndex];
337 ServerName initialServer = cluster.servers[initialServerIndex];
338 ServerName newServer = cluster.servers[newServerIndex];
339
340 if (LOG.isTraceEnabled()) {
341 LOG.trace("Moving Region " + region.getEncodedName() + " from server "
342 + initialServer.getHostname() + " to " + newServer.getHostname());
343 }
344 RegionPlan rp = new RegionPlan(region, initialServer, newServer);
345 plans.add(rp);
346 }
347 }
348 return plans;
349 }
350
351
352
353
354 private synchronized void updateRegionLoad() {
355
356
357 Map<String, Deque<RegionLoad>> oldLoads = loads;
358 loads = new HashMap<String, Deque<RegionLoad>>();
359
360 for (ServerName sn : clusterStatus.getServers()) {
361 ServerLoad sl = clusterStatus.getLoad(sn);
362 if (sl == null) {
363 continue;
364 }
365 for (Entry<byte[], RegionLoad> entry : sl.getRegionsLoad().entrySet()) {
366 Deque<RegionLoad> rLoads = oldLoads.get(Bytes.toString(entry.getKey()));
367 if (rLoads == null) {
368
369 rLoads = new ArrayDeque<RegionLoad>();
370 } else if (rLoads.size() >= numRegionLoadsToRemember) {
371 rLoads.remove();
372 }
373 rLoads.add(entry.getValue());
374 loads.put(Bytes.toString(entry.getKey()), rLoads);
375
376 }
377 }
378
379 for(CostFromRegionLoadFunction cost : regionLoadFunctions) {
380 cost.setLoads(loads);
381 }
382 }
383
384 protected void initCosts(Cluster cluster) {
385 for (CostFunction c:costFunctions) {
386 c.init(cluster);
387 }
388 }
389
390 protected void updateCostsWithAction(Cluster cluster, Action action) {
391 for (CostFunction c : costFunctions) {
392 c.postAction(action);
393 }
394 }
395
396
397
398
399
400
401
402
403
404
405 protected double computeCost(Cluster cluster, double previousCost) {
406 double total = 0;
407
408 for (CostFunction c:costFunctions) {
409 if (c.getMultiplier() <= 0) {
410 continue;
411 }
412
413 total += c.getMultiplier() * c.cost();
414
415 if (total > previousCost) {
416 return total;
417 }
418 }
419 return total;
420 }
421
422
423 abstract static class CandidateGenerator {
424 abstract Cluster.Action generate(Cluster cluster);
425
426
427
428
429
430
431
432
433
434
435
436
437
438 protected int pickRandomRegion(Cluster cluster, int server, double chanceOfNoSwap) {
439
440 if (cluster.regionsPerServer[server].length == 0 || RANDOM.nextFloat() < chanceOfNoSwap) {
441
442 return -1;
443 }
444 int rand = RANDOM.nextInt(cluster.regionsPerServer[server].length);
445 return cluster.regionsPerServer[server][rand];
446
447 }
448 protected int pickRandomServer(Cluster cluster) {
449 if (cluster.numServers < 1) {
450 return -1;
451 }
452
453 return RANDOM.nextInt(cluster.numServers);
454 }
455
456 protected int pickRandomRack(Cluster cluster) {
457 if (cluster.numRacks < 1) {
458 return -1;
459 }
460
461 return RANDOM.nextInt(cluster.numRacks);
462 }
463
464 protected int pickOtherRandomServer(Cluster cluster, int serverIndex) {
465 if (cluster.numServers < 2) {
466 return -1;
467 }
468 while (true) {
469 int otherServerIndex = pickRandomServer(cluster);
470 if (otherServerIndex != serverIndex) {
471 return otherServerIndex;
472 }
473 }
474 }
475
476 protected int pickOtherRandomRack(Cluster cluster, int rackIndex) {
477 if (cluster.numRacks < 2) {
478 return -1;
479 }
480 while (true) {
481 int otherRackIndex = pickRandomRack(cluster);
482 if (otherRackIndex != rackIndex) {
483 return otherRackIndex;
484 }
485 }
486 }
487
488 protected Cluster.Action pickRandomRegions(Cluster cluster,
489 int thisServer,
490 int otherServer) {
491 if (thisServer < 0 || otherServer < 0) {
492 return Cluster.NullAction;
493 }
494
495
496 int thisRegionCount = cluster.getNumRegions(thisServer);
497 int otherRegionCount = cluster.getNumRegions(otherServer);
498
499
500 double thisChance = (thisRegionCount > otherRegionCount) ? 0 : 0.5;
501 double otherChance = (thisRegionCount <= otherRegionCount) ? 0 : 0.5;
502
503 int thisRegion = pickRandomRegion(cluster, thisServer, thisChance);
504 int otherRegion = pickRandomRegion(cluster, otherServer, otherChance);
505
506 return getAction(thisServer, thisRegion, otherServer, otherRegion);
507 }
508
509 protected Cluster.Action getAction (int fromServer, int fromRegion,
510 int toServer, int toRegion) {
511 if (fromServer < 0 || toServer < 0) {
512 return Cluster.NullAction;
513 }
514 if (fromRegion > 0 && toRegion > 0) {
515 return new Cluster.SwapRegionsAction(fromServer, fromRegion,
516 toServer, toRegion);
517 } else if (fromRegion > 0) {
518 return new Cluster.MoveRegionAction(fromRegion, fromServer, toServer);
519 } else if (toRegion > 0) {
520 return new Cluster.MoveRegionAction(toRegion, toServer, fromServer);
521 } else {
522 return Cluster.NullAction;
523 }
524 }
525 }
526
527 static class RandomCandidateGenerator extends CandidateGenerator {
528
529 @Override
530 Cluster.Action generate(Cluster cluster) {
531
532 int thisServer = pickRandomServer(cluster);
533
534
535 int otherServer = pickOtherRandomServer(cluster, thisServer);
536
537 return pickRandomRegions(cluster, thisServer, otherServer);
538 }
539 }
540
541 static class LoadCandidateGenerator extends CandidateGenerator {
542
543 @Override
544 Cluster.Action generate(Cluster cluster) {
545 cluster.sortServersByRegionCount();
546 int thisServer = pickMostLoadedServer(cluster, -1);
547 int otherServer = pickLeastLoadedServer(cluster, thisServer);
548
549 return pickRandomRegions(cluster, thisServer, otherServer);
550 }
551
552 private int pickLeastLoadedServer(final Cluster cluster, int thisServer) {
553 Integer[] servers = cluster.serverIndicesSortedByRegionCount;
554
555 int index = 0;
556 while (servers[index] == null || servers[index] == thisServer) {
557 index++;
558 if (index == servers.length) {
559 return -1;
560 }
561 }
562 return servers[index];
563 }
564
565 private int pickMostLoadedServer(final Cluster cluster, int thisServer) {
566 Integer[] servers = cluster.serverIndicesSortedByRegionCount;
567
568 int index = servers.length - 1;
569 while (servers[index] == null || servers[index] == thisServer) {
570 index--;
571 if (index < 0) {
572 return -1;
573 }
574 }
575 return servers[index];
576 }
577 }
578
579 static class LocalityBasedCandidateGenerator extends CandidateGenerator {
580
581 private MasterServices masterServices;
582
583 LocalityBasedCandidateGenerator(MasterServices masterServices) {
584 this.masterServices = masterServices;
585 }
586
587 @Override
588 Cluster.Action generate(Cluster cluster) {
589 if (this.masterServices == null) {
590 int thisServer = pickRandomServer(cluster);
591
592 int otherServer = pickOtherRandomServer(cluster, thisServer);
593 return pickRandomRegions(cluster, thisServer, otherServer);
594 }
595
596 cluster.calculateRegionServerLocalities();
597
598 int thisServer = pickLowestLocalityServer(cluster);
599 int thisRegion;
600 if (thisServer == -1) {
601 LOG.warn("Could not pick lowest locality region server");
602 return Cluster.NullAction;
603 } else {
604
605 thisRegion = pickLowestLocalityRegionOnServer(cluster, thisServer);
606 }
607
608 if (thisRegion == -1) {
609 return Cluster.NullAction;
610 }
611
612
613 int otherServer = cluster.getLeastLoadedTopServerForRegion(thisRegion);
614
615 if (otherServer == -1) {
616 return Cluster.NullAction;
617 }
618
619
620 int otherRegion = -1;
621
622 return getAction(thisServer, thisRegion, otherServer, otherRegion);
623 }
624
625 private int pickLowestLocalityServer(Cluster cluster) {
626 return cluster.getLowestLocalityRegionServer();
627 }
628
629 private int pickLowestLocalityRegionOnServer(Cluster cluster, int server) {
630 return cluster.getLowestLocalityRegionOnServer(server);
631 }
632
633 void setServices(MasterServices services) {
634 this.masterServices = services;
635 }
636 }
637
638
639
640
641
642 static class RegionReplicaCandidateGenerator extends CandidateGenerator {
643
644 RandomCandidateGenerator randomGenerator = new RandomCandidateGenerator();
645
646
647
648
649
650
651
652
653
654
655 int selectCoHostedRegionPerGroup(int[] primariesOfRegionsPerGroup, int[] regionsPerGroup
656 , int[] regionIndexToPrimaryIndex) {
657 int currentPrimary = -1;
658 int currentPrimaryIndex = -1;
659 int selectedPrimaryIndex = -1;
660 double currentLargestRandom = -1;
661
662
663
664 for (int j = 0; j <= primariesOfRegionsPerGroup.length; j++) {
665 int primary = j < primariesOfRegionsPerGroup.length
666 ? primariesOfRegionsPerGroup[j] : -1;
667 if (primary != currentPrimary) {
668 int numReplicas = j - currentPrimaryIndex;
669 if (numReplicas > 1) {
670
671 double currentRandom = RANDOM.nextDouble();
672
673
674 if (currentRandom > currentLargestRandom) {
675 selectedPrimaryIndex = currentPrimary;
676 currentLargestRandom = currentRandom;
677 }
678 }
679 currentPrimary = primary;
680 currentPrimaryIndex = j;
681 }
682 }
683
684
685
686 for (int j = 0; j < regionsPerGroup.length; j++) {
687 int regionIndex = regionsPerGroup[j];
688 if (selectedPrimaryIndex == regionIndexToPrimaryIndex[regionIndex]) {
689
690 if (selectedPrimaryIndex != regionIndex) {
691 return regionIndex;
692 }
693 }
694 }
695 return -1;
696 }
697
698 @Override
699 Cluster.Action generate(Cluster cluster) {
700 int serverIndex = pickRandomServer(cluster);
701 if (cluster.numServers <= 1 || serverIndex == -1) {
702 return Cluster.NullAction;
703 }
704
705 int regionIndex = selectCoHostedRegionPerGroup(
706 cluster.primariesOfRegionsPerServer[serverIndex],
707 cluster.regionsPerServer[serverIndex],
708 cluster.regionIndexToPrimaryIndex);
709
710
711 if (regionIndex == -1) {
712
713 return randomGenerator.generate(cluster);
714 }
715
716 int toServerIndex = pickOtherRandomServer(cluster, serverIndex);
717 int toRegionIndex = pickRandomRegion(cluster, toServerIndex, 0.9f);
718 return getAction (serverIndex, regionIndex, toServerIndex, toRegionIndex);
719 }
720 }
721
722
723
724
725
726 static class RegionReplicaRackCandidateGenerator extends RegionReplicaCandidateGenerator {
727 @Override
728 Cluster.Action generate(Cluster cluster) {
729 int rackIndex = pickRandomRack(cluster);
730 if (cluster.numRacks <= 1 || rackIndex == -1) {
731 return super.generate(cluster);
732 }
733
734 int regionIndex = selectCoHostedRegionPerGroup(
735 cluster.primariesOfRegionsPerRack[rackIndex],
736 cluster.regionsPerRack[rackIndex],
737 cluster.regionIndexToPrimaryIndex);
738
739
740 if (regionIndex == -1) {
741
742 return randomGenerator.generate(cluster);
743 }
744
745 int serverIndex = cluster.regionIndexToServerIndex[regionIndex];
746 int toRackIndex = pickOtherRandomRack(cluster, rackIndex);
747
748 int rand = RANDOM.nextInt(cluster.serversPerRack[toRackIndex].length);
749 int toServerIndex = cluster.serversPerRack[toRackIndex][rand];
750 int toRegionIndex = pickRandomRegion(cluster, toServerIndex, 0.9f);
751 return getAction (serverIndex, regionIndex, toServerIndex, toRegionIndex);
752 }
753 }
754
755
756
757
758 abstract static class CostFunction {
759
760 private float multiplier = 0;
761
762 protected Cluster cluster;
763
764 CostFunction(Configuration c) {
765
766 }
767
768 float getMultiplier() {
769 return multiplier;
770 }
771
772 void setMultiplier(float m) {
773 this.multiplier = m;
774 }
775
776
777
778
779 void init(Cluster cluster) {
780 this.cluster = cluster;
781 }
782
783
784
785
786
787 void postAction(Action action) {
788 switch (action.type) {
789 case NULL: break;
790 case ASSIGN_REGION:
791 AssignRegionAction ar = (AssignRegionAction) action;
792 regionMoved(ar.region, -1, ar.server);
793 break;
794 case MOVE_REGION:
795 MoveRegionAction mra = (MoveRegionAction) action;
796 regionMoved(mra.region, mra.fromServer, mra.toServer);
797 break;
798 case SWAP_REGIONS:
799 SwapRegionsAction a = (SwapRegionsAction) action;
800 regionMoved(a.fromRegion, a.fromServer, a.toServer);
801 regionMoved(a.toRegion, a.toServer, a.fromServer);
802 break;
803 default:
804 throw new RuntimeException("Uknown action:" + action.type);
805 }
806 }
807
808 protected void regionMoved(int region, int oldServer, int newServer) {
809 }
810
811 abstract double cost();
812
813
814
815
816
817
818
819
820
821 protected double costFromArray(double[] stats) {
822 double totalCost = 0;
823 double total = getSum(stats);
824
825 double count = stats.length;
826 double mean = total/count;
827
828
829
830 double max = ((count - 1) * mean) + (total - mean);
831
832
833 double min;
834 if (count > total) {
835 min = ((count - total) * mean) + ((1 - mean) * total);
836 } else {
837
838 int numHigh = (int) (total - (Math.floor(mean) * count));
839 int numLow = (int) (count - numHigh);
840
841 min = (numHigh * (Math.ceil(mean) - mean)) + (numLow * (mean - Math.floor(mean)));
842
843 }
844 min = Math.max(0, min);
845 for (int i=0; i<stats.length; i++) {
846 double n = stats[i];
847 double diff = Math.abs(mean - n);
848 totalCost += diff;
849 }
850
851 double scaled = scale(min, max, totalCost);
852 return scaled;
853 }
854
855 private double getSum(double[] stats) {
856 double total = 0;
857 for(double s:stats) {
858 total += s;
859 }
860 return total;
861 }
862
863
864
865
866
867
868
869
870
871 protected double scale(double min, double max, double value) {
872 if (max <= min || value <= min) {
873 return 0;
874 }
875 if ((max - min) == 0) return 0;
876
877 return Math.max(0d, Math.min(1d, (value - min) / (max - min)));
878 }
879 }
880
881
882
883
884
885 static class MoveCostFunction extends CostFunction {
886 private static final String MOVE_COST_KEY = "hbase.master.balancer.stochastic.moveCost";
887 private static final String MAX_MOVES_PERCENT_KEY =
888 "hbase.master.balancer.stochastic.maxMovePercent";
889 private static final float DEFAULT_MOVE_COST = 100;
890 private static final int DEFAULT_MAX_MOVES = 600;
891 private static final float DEFAULT_MAX_MOVE_PERCENT = 0.25f;
892
893 private final float maxMovesPercent;
894
895 MoveCostFunction(Configuration conf) {
896 super(conf);
897
898
899
900 this.setMultiplier(conf.getFloat(MOVE_COST_KEY, DEFAULT_MOVE_COST));
901
902 maxMovesPercent = conf.getFloat(MAX_MOVES_PERCENT_KEY, DEFAULT_MAX_MOVE_PERCENT);
903 }
904
905 @Override
906 double cost() {
907
908 int maxMoves = Math.max((int) (cluster.numRegions * maxMovesPercent),
909 DEFAULT_MAX_MOVES);
910
911 double moveCost = cluster.numMovedRegions;
912
913
914
915 if (moveCost > maxMoves) {
916 return 1000000;
917 }
918
919 return scale(0, cluster.numRegions, moveCost);
920 }
921 }
922
923
924
925
926
927 static class RegionCountSkewCostFunction extends CostFunction {
928 private static final String REGION_COUNT_SKEW_COST_KEY =
929 "hbase.master.balancer.stochastic.regionCountCost";
930 private static final float DEFAULT_REGION_COUNT_SKEW_COST = 500;
931
932 private double[] stats = null;
933
934 RegionCountSkewCostFunction(Configuration conf) {
935 super(conf);
936
937 this.setMultiplier(conf.getFloat(REGION_COUNT_SKEW_COST_KEY, DEFAULT_REGION_COUNT_SKEW_COST));
938 }
939
940 @Override
941 double cost() {
942 if (stats == null || stats.length != cluster.numServers) {
943 stats = new double[cluster.numServers];
944 }
945
946 for (int i =0; i < cluster.numServers; i++) {
947 stats[i] = cluster.regionsPerServer[i].length;
948 }
949
950 return costFromArray(stats);
951 }
952 }
953
954
955
956
957
958 static class PrimaryRegionCountSkewCostFunction extends CostFunction {
959 private static final String PRIMARY_REGION_COUNT_SKEW_COST_KEY =
960 "hbase.master.balancer.stochastic.primaryRegionCountCost";
961 private static final float DEFAULT_PRIMARY_REGION_COUNT_SKEW_COST = 500;
962
963 private double[] stats = null;
964
965 PrimaryRegionCountSkewCostFunction(Configuration conf) {
966 super(conf);
967
968 this.setMultiplier(conf.getFloat(PRIMARY_REGION_COUNT_SKEW_COST_KEY,
969 DEFAULT_PRIMARY_REGION_COUNT_SKEW_COST));
970 }
971
972 @Override
973 double cost() {
974 if (!cluster.hasRegionReplicas) {
975 return 0;
976 }
977 if (stats == null || stats.length != cluster.numServers) {
978 stats = new double[cluster.numServers];
979 }
980
981 for (int i =0; i < cluster.numServers; i++) {
982 stats[i] = 0;
983 for (int regionIdx : cluster.regionsPerServer[i]) {
984 if (regionIdx == cluster.regionIndexToPrimaryIndex[regionIdx]) {
985 stats[i] ++;
986 }
987 }
988 }
989
990 return costFromArray(stats);
991 }
992 }
993
994
995
996
997
998 static class TableSkewCostFunction extends CostFunction {
999
1000 private static final String TABLE_SKEW_COST_KEY =
1001 "hbase.master.balancer.stochastic.tableSkewCost";
1002 private static final float DEFAULT_TABLE_SKEW_COST = 35;
1003
1004 TableSkewCostFunction(Configuration conf) {
1005 super(conf);
1006 this.setMultiplier(conf.getFloat(TABLE_SKEW_COST_KEY, DEFAULT_TABLE_SKEW_COST));
1007 }
1008
1009 @Override
1010 double cost() {
1011 double max = cluster.numRegions;
1012 double min = ((double) cluster.numRegions) / cluster.numServers;
1013 double value = 0;
1014
1015 for (int i = 0; i < cluster.numMaxRegionsPerTable.length; i++) {
1016 value += cluster.numMaxRegionsPerTable[i];
1017 }
1018
1019 return scale(min, max, value);
1020 }
1021 }
1022
1023
1024
1025
1026
1027 static class LocalityCostFunction extends CostFunction {
1028
1029 private static final String LOCALITY_COST_KEY = "hbase.master.balancer.stochastic.localityCost";
1030 private static final float DEFAULT_LOCALITY_COST = 25;
1031
1032 private MasterServices services;
1033
1034 LocalityCostFunction(Configuration conf, MasterServices srv) {
1035 super(conf);
1036 this.setMultiplier(conf.getFloat(LOCALITY_COST_KEY, DEFAULT_LOCALITY_COST));
1037 this.services = srv;
1038 }
1039
1040 void setServices(MasterServices srvc) {
1041 this.services = srvc;
1042 }
1043
1044 @Override
1045 double cost() {
1046 double max = 0;
1047 double cost = 0;
1048
1049
1050 if (this.services == null) {
1051 return cost;
1052 }
1053
1054 for (int i = 0; i < cluster.regionLocations.length; i++) {
1055 max += 1;
1056 int serverIndex = cluster.regionIndexToServerIndex[i];
1057 int[] regionLocations = cluster.regionLocations[i];
1058
1059
1060
1061 if (regionLocations == null) {
1062 continue;
1063 }
1064
1065 int index = -1;
1066 for (int j = 0; j < regionLocations.length; j++) {
1067 if (regionLocations[j] >= 0 && regionLocations[j] == serverIndex) {
1068 index = j;
1069 break;
1070 }
1071 }
1072
1073 if (index < 0) {
1074 cost += 1;
1075 } else {
1076 cost += (1 - cluster.getLocalityOfRegion(i, index));
1077 }
1078 }
1079 return scale(0, max, cost);
1080 }
1081 }
1082
1083
1084
1085
1086
1087 abstract static class CostFromRegionLoadFunction extends CostFunction {
1088
1089 private ClusterStatus clusterStatus = null;
1090 private Map<String, Deque<RegionLoad>> loads = null;
1091 private double[] stats = null;
1092 CostFromRegionLoadFunction(Configuration conf) {
1093 super(conf);
1094 }
1095
1096 void setClusterStatus(ClusterStatus status) {
1097 this.clusterStatus = status;
1098 }
1099
1100 void setLoads(Map<String, Deque<RegionLoad>> l) {
1101 this.loads = l;
1102 }
1103
1104 @Override
1105 double cost() {
1106 if (clusterStatus == null || loads == null) {
1107 return 0;
1108 }
1109
1110 if (stats == null || stats.length != cluster.numServers) {
1111 stats = new double[cluster.numServers];
1112 }
1113
1114 for (int i =0; i < stats.length; i++) {
1115
1116 long cost = 0;
1117
1118
1119 for(int regionIndex:cluster.regionsPerServer[i]) {
1120 Collection<RegionLoad> regionLoadList = cluster.regionLoads[regionIndex];
1121
1122
1123 if (regionLoadList != null) {
1124 cost += getRegionLoadCost(regionLoadList);
1125 }
1126 }
1127
1128
1129 stats[i] = cost;
1130 }
1131
1132
1133 return costFromArray(stats);
1134 }
1135
1136 protected double getRegionLoadCost(Collection<RegionLoad> regionLoadList) {
1137 double cost = 0;
1138
1139 for (RegionLoad rl : regionLoadList) {
1140 double toAdd = getCostFromRl(rl);
1141
1142 if (cost == 0) {
1143 cost = toAdd;
1144 } else {
1145 cost = (.5 * cost) + (.5 * toAdd);
1146 }
1147 }
1148
1149 return cost;
1150 }
1151
1152 protected abstract double getCostFromRl(RegionLoad rl);
1153 }
1154
1155
1156
1157
1158
1159
1160 static class ReadRequestCostFunction extends CostFromRegionLoadFunction {
1161
1162 private static final String READ_REQUEST_COST_KEY =
1163 "hbase.master.balancer.stochastic.readRequestCost";
1164 private static final float DEFAULT_READ_REQUEST_COST = 5;
1165
1166 ReadRequestCostFunction(Configuration conf) {
1167 super(conf);
1168 this.setMultiplier(conf.getFloat(READ_REQUEST_COST_KEY, DEFAULT_READ_REQUEST_COST));
1169 }
1170
1171
1172 @Override
1173 protected double getCostFromRl(RegionLoad rl) {
1174 return rl.getReadRequestsCount();
1175 }
1176 }
1177
1178
1179
1180
1181
1182 static class WriteRequestCostFunction extends CostFromRegionLoadFunction {
1183
1184 private static final String WRITE_REQUEST_COST_KEY =
1185 "hbase.master.balancer.stochastic.writeRequestCost";
1186 private static final float DEFAULT_WRITE_REQUEST_COST = 5;
1187
1188 WriteRequestCostFunction(Configuration conf) {
1189 super(conf);
1190 this.setMultiplier(conf.getFloat(WRITE_REQUEST_COST_KEY, DEFAULT_WRITE_REQUEST_COST));
1191 }
1192
1193 @Override
1194 protected double getCostFromRl(RegionLoad rl) {
1195 return rl.getWriteRequestsCount();
1196 }
1197 }
1198
1199
1200
1201
1202
1203
1204
1205 static class RegionReplicaHostCostFunction extends CostFunction {
1206 private static final String REGION_REPLICA_HOST_COST_KEY =
1207 "hbase.master.balancer.stochastic.regionReplicaHostCostKey";
1208 private static final float DEFAULT_REGION_REPLICA_HOST_COST_KEY = 100000;
1209
1210 long maxCost = 0;
1211 long[] costsPerGroup;
1212 int[][] primariesOfRegionsPerGroup;
1213
1214 public RegionReplicaHostCostFunction(Configuration conf) {
1215 super(conf);
1216 this.setMultiplier(conf.getFloat(REGION_REPLICA_HOST_COST_KEY,
1217 DEFAULT_REGION_REPLICA_HOST_COST_KEY));
1218 }
1219
1220 @Override
1221 void init(Cluster cluster) {
1222 super.init(cluster);
1223
1224 maxCost = cluster.numHosts > 1 ? getMaxCost(cluster) : 0;
1225 costsPerGroup = new long[cluster.numHosts];
1226 primariesOfRegionsPerGroup = cluster.multiServersPerHost
1227 ? cluster.primariesOfRegionsPerHost
1228 : cluster.primariesOfRegionsPerServer;
1229 for (int i = 0 ; i < primariesOfRegionsPerGroup.length; i++) {
1230 costsPerGroup[i] = costPerGroup(primariesOfRegionsPerGroup[i]);
1231 }
1232 }
1233
1234 long getMaxCost(Cluster cluster) {
1235 if (!cluster.hasRegionReplicas) {
1236 return 0;
1237 }
1238
1239 int[] primariesOfRegions = new int[cluster.numRegions];
1240 System.arraycopy(cluster.regionIndexToPrimaryIndex, 0, primariesOfRegions, 0,
1241 cluster.regions.length);
1242
1243 Arrays.sort(primariesOfRegions);
1244
1245
1246 return costPerGroup(primariesOfRegions);
1247 }
1248
1249 @Override
1250 double cost() {
1251 if (maxCost <= 0) {
1252 return 0;
1253 }
1254
1255 long totalCost = 0;
1256 for (int i = 0 ; i < costsPerGroup.length; i++) {
1257 totalCost += costsPerGroup[i];
1258 }
1259 return scale(0, maxCost, totalCost);
1260 }
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270 protected long costPerGroup(int[] primariesOfRegions) {
1271 long cost = 0;
1272 int currentPrimary = -1;
1273 int currentPrimaryIndex = -1;
1274
1275
1276 for (int j = 0 ; j <= primariesOfRegions.length; j++) {
1277 int primary = j < primariesOfRegions.length ? primariesOfRegions[j] : -1;
1278 if (primary != currentPrimary) {
1279 int numReplicas = j - currentPrimaryIndex;
1280
1281 if (numReplicas > 1) {
1282 cost += (numReplicas - 1) * (numReplicas - 1);
1283 }
1284 currentPrimary = primary;
1285 currentPrimaryIndex = j;
1286 }
1287 }
1288
1289 return cost;
1290 }
1291
1292 @Override
1293 protected void regionMoved(int region, int oldServer, int newServer) {
1294 if (maxCost <= 0) {
1295 return;
1296 }
1297 if (cluster.multiServersPerHost) {
1298 int oldHost = cluster.serverIndexToHostIndex[oldServer];
1299 int newHost = cluster.serverIndexToHostIndex[newServer];
1300 if (newHost != oldHost) {
1301 costsPerGroup[oldHost] = costPerGroup(cluster.primariesOfRegionsPerHost[oldHost]);
1302 costsPerGroup[newHost] = costPerGroup(cluster.primariesOfRegionsPerHost[newHost]);
1303 }
1304 } else {
1305 costsPerGroup[oldServer] = costPerGroup(cluster.primariesOfRegionsPerServer[oldServer]);
1306 costsPerGroup[newServer] = costPerGroup(cluster.primariesOfRegionsPerServer[newServer]);
1307 }
1308 }
1309 }
1310
1311
1312
1313
1314
1315
1316 static class RegionReplicaRackCostFunction extends RegionReplicaHostCostFunction {
1317 private static final String REGION_REPLICA_RACK_COST_KEY =
1318 "hbase.master.balancer.stochastic.regionReplicaRackCostKey";
1319 private static final float DEFAULT_REGION_REPLICA_RACK_COST_KEY = 10000;
1320
1321 public RegionReplicaRackCostFunction(Configuration conf) {
1322 super(conf);
1323 this.setMultiplier(conf.getFloat(REGION_REPLICA_RACK_COST_KEY, DEFAULT_REGION_REPLICA_RACK_COST_KEY));
1324 }
1325
1326 @Override
1327 void init(Cluster cluster) {
1328 this.cluster = cluster;
1329 if (cluster.numRacks <= 1) {
1330 maxCost = 0;
1331 return;
1332 }
1333
1334 maxCost = getMaxCost(cluster);
1335 costsPerGroup = new long[cluster.numRacks];
1336 for (int i = 0 ; i < cluster.primariesOfRegionsPerRack.length; i++) {
1337 costsPerGroup[i] = costPerGroup(cluster.primariesOfRegionsPerRack[i]);
1338 }
1339 }
1340
1341 @Override
1342 protected void regionMoved(int region, int oldServer, int newServer) {
1343 if (maxCost <= 0) {
1344 return;
1345 }
1346 int oldRack = cluster.serverIndexToRackIndex[oldServer];
1347 int newRack = cluster.serverIndexToRackIndex[newServer];
1348 if (newRack != oldRack) {
1349 costsPerGroup[oldRack] = costPerGroup(cluster.primariesOfRegionsPerRack[oldRack]);
1350 costsPerGroup[newRack] = costPerGroup(cluster.primariesOfRegionsPerRack[newRack]);
1351 }
1352 }
1353 }
1354
1355
1356
1357
1358
1359 static class MemstoreSizeCostFunction extends CostFromRegionLoadFunction {
1360
1361 private static final String MEMSTORE_SIZE_COST_KEY =
1362 "hbase.master.balancer.stochastic.memstoreSizeCost";
1363 private static final float DEFAULT_MEMSTORE_SIZE_COST = 5;
1364
1365 MemstoreSizeCostFunction(Configuration conf) {
1366 super(conf);
1367 this.setMultiplier(conf.getFloat(MEMSTORE_SIZE_COST_KEY, DEFAULT_MEMSTORE_SIZE_COST));
1368 }
1369
1370 @Override
1371 protected double getCostFromRl(RegionLoad rl) {
1372 return rl.getMemStoreSizeMB();
1373 }
1374 }
1375
1376
1377
1378
1379 static class StoreFileCostFunction extends CostFromRegionLoadFunction {
1380
1381 private static final String STOREFILE_SIZE_COST_KEY =
1382 "hbase.master.balancer.stochastic.storefileSizeCost";
1383 private static final float DEFAULT_STOREFILE_SIZE_COST = 5;
1384
1385 StoreFileCostFunction(Configuration conf) {
1386 super(conf);
1387 this.setMultiplier(conf.getFloat(STOREFILE_SIZE_COST_KEY, DEFAULT_STOREFILE_SIZE_COST));
1388 }
1389
1390 @Override
1391 protected double getCostFromRl(RegionLoad rl) {
1392 return rl.getStorefileSizeMB();
1393 }
1394 }
1395 }