View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.chaos.monkies;
20  
21  import java.util.ArrayList;
22  import java.util.Collection;
23  import java.util.List;
24  
25  import org.apache.commons.lang.math.RandomUtils;
26  import org.apache.commons.logging.Log;
27  import org.apache.commons.logging.LogFactory;
28  import org.apache.hadoop.hbase.IntegrationTestingUtility;
29  import org.apache.hadoop.hbase.chaos.policies.Policy;
30  import org.apache.hadoop.hbase.util.Pair;
31  
32  /**
33   * Chaos monkey that given multiple policies will run actions against the cluster.
34   */
35  public class PolicyBasedChaosMonkey extends ChaosMonkey {
36  
37    private static final Log LOG = LogFactory.getLog(PolicyBasedChaosMonkey.class);
38    private static final long ONE_SEC = 1000;
39    private static final long FIVE_SEC = 5 * ONE_SEC;
40    private static final long ONE_MIN = 60 * ONE_SEC;
41  
42    public static final long TIMEOUT = ONE_MIN;
43  
44    final IntegrationTestingUtility util;
45  
46    /**
47     * Construct a new ChaosMonkey
48     * @param util the HBaseIntegrationTestingUtility already configured
49     * @param policies custom policies to use
50     */
51    public PolicyBasedChaosMonkey(IntegrationTestingUtility util, Policy... policies) {
52      this.util = util;
53      this.policies = policies;
54    }
55  
56    public PolicyBasedChaosMonkey(IntegrationTestingUtility util, Collection<Policy> policies) {
57      this.util = util;
58      this.policies = policies.toArray(new Policy[policies.size()]);
59    }
60  
61  
62    /** Selects a random item from the given items */
63    public static <T> T selectRandomItem(T[] items) {
64      return items[RandomUtils.nextInt(items.length)];
65    }
66  
67    /** Selects a random item from the given items with weights*/
68    public static <T> T selectWeightedRandomItem(List<Pair<T, Integer>> items) {
69      int totalWeight = 0;
70      for (Pair<T, Integer> pair : items) {
71        totalWeight += pair.getSecond();
72      }
73  
74      int cutoff = RandomUtils.nextInt(totalWeight);
75      int cummulative = 0;
76      T item = null;
77  
78      //warn: O(n)
79      for (int i=0; i<items.size(); i++) {
80        int curWeight = items.get(i).getSecond();
81        if ( cutoff < cummulative + curWeight) {
82          item = items.get(i).getFirst();
83          break;
84        }
85        cummulative += curWeight;
86      }
87  
88      return item;
89    }
90  
91    /** Selects and returns ceil(ratio * items.length) random items from the given array */
92    public static <T> List<T> selectRandomItems(T[] items, float ratio) {
93      int remaining = (int)Math.ceil(items.length * ratio);
94  
95      List<T> selectedItems = new ArrayList<T>(remaining);
96  
97      for (int i=0; i<items.length && remaining > 0; i++) {
98        if (RandomUtils.nextFloat() < ((float)remaining/(items.length-i))) {
99          selectedItems.add(items[i]);
100         remaining--;
101       }
102     }
103 
104     return selectedItems;
105   }
106 
107   private Policy[] policies;
108   private Thread[] monkeyThreads;
109 
110   @Override
111   public void start() throws Exception {
112     monkeyThreads = new Thread[policies.length];
113 
114     for (int i=0; i<policies.length; i++) {
115       policies[i].init(new Policy.PolicyContext(this.util));
116       Thread monkeyThread = new Thread(policies[i]);
117       monkeyThread.start();
118       monkeyThreads[i] = monkeyThread;
119     }
120   }
121 
122   @Override
123   public void stop(String why) {
124     if (policies == null) {
125       return;
126     }
127 
128     for (Policy policy : policies) {
129       policy.stop(why);
130     }
131   }
132 
133   @Override
134   public boolean isStopped() {
135     return policies[0].isStopped();
136   }
137 
138   /**
139    * Wait for ChaosMonkey to stop.
140    * @throws InterruptedException
141    */
142   @Override
143   public void waitForStop() throws InterruptedException {
144     if (monkeyThreads == null) {
145       return;
146     }
147     for (Thread monkeyThread : monkeyThreads) {
148       // TODO: bound the wait time per policy
149       monkeyThread.join();
150     }
151   }
152 
153   @Override
154   public boolean isDestructive() {
155     // TODO: we can look at the actions, and decide to do the restore cluster or not based on them.
156     return true;
157   }
158 }