
/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.util.ByteRange;
import org.apache.hadoop.hbase.util.SimpleMutableByteRange;

import com.google.common.base.Preconditions;
/**
 * A memstore-local allocation buffer.
 * <p>
 * The MemStoreLAB is basically a bump-the-pointer allocator that allocates
 * big (2MB) byte[] chunks and then doles out slices of those chunks to
 * threads that request them.
 * <p>
 * The purpose of this class is to combat heap fragmentation in the
 * regionserver. By ensuring that all KeyValues in a given memstore refer
 * only to large chunks of contiguous memory, we ensure that large blocks
 * get freed up when the memstore is flushed.
 * <p>
 * Without the MSLAB, the byte arrays allocated during insertion end up
 * interleaved throughout the heap, and the old generation gets progressively
 * more fragmented until a stop-the-world compacting collection occurs.
 * <p>
 * TODO: we should probably benchmark whether word-aligning the allocations
 * would provide a performance improvement - probably would speed up the
 * Bytes.toLong/Bytes.toInt calls in KeyValue, but some of those are cached
 * anyway.
 */
@InterfaceAudience.Private
public class HeapMemStoreLAB implements MemStoreLAB {
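  // A minimal usage sketch (hypothetical caller; the real callers live in the
  // MemStore implementation, not here). allocateBytes() hands out slices of
  // the current chunk, and close() releases the chunks once no scanner still
  // needs them:
  //
  //   MemStoreLAB lab = new HeapMemStoreLAB(conf);
  //   ByteRange slice = lab.allocateBytes(len);
  //   if (slice != null) {
  //     System.arraycopy(src, srcOff, slice.getBytes(), slice.getOffset(), len);
  //   }
  //   lab.close(); // e.g. when the memstore snapshot is cleared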

  static final String CHUNK_SIZE_KEY = "hbase.hregion.memstore.mslab.chunksize";
  static final int CHUNK_SIZE_DEFAULT = 2048 * 1024;
  static final String MAX_ALLOC_KEY = "hbase.hregion.memstore.mslab.max.allocation";
  static final int MAX_ALLOC_DEFAULT = 256 * 1024; // allocs bigger than this don't go through
                                                   // allocator

  private AtomicReference<Chunk> curChunk = new AtomicReference<Chunk>();
  // A queue of chunks contained by this memstore
  private BlockingQueue<Chunk> chunkQueue = new LinkedBlockingQueue<Chunk>();
  final int chunkSize;
  final int maxAlloc;
  private final MemStoreChunkPool chunkPool;

  // This flag is for closing this instance. It's set when clearing the
  // snapshot of the memstore.
  private volatile boolean closed = false;
  // This flag is for reclaiming chunks. It's set when putting chunks back
  // into the pool.
  private AtomicBoolean reclaimed = new AtomicBoolean(false);
  // Current count of open scanners which are reading data from this MemStoreLAB
  private final AtomicInteger openScannerCount = new AtomicInteger();

  // Used in testing
  public HeapMemStoreLAB() {
    this(new Configuration());
  }

  public HeapMemStoreLAB(Configuration conf) {
    chunkSize = conf.getInt(CHUNK_SIZE_KEY, CHUNK_SIZE_DEFAULT);
    maxAlloc = conf.getInt(MAX_ALLOC_KEY, MAX_ALLOC_DEFAULT);
    this.chunkPool = MemStoreChunkPool.getPool(conf);

    // if we don't exclude allocations > chunk size, we'd infinite-loop on one!
    Preconditions.checkArgument(
      maxAlloc <= chunkSize,
      MAX_ALLOC_KEY + " must be less than or equal to " + CHUNK_SIZE_KEY);
  }
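
  // A tuning sketch (values are illustrative assumptions, not recommendations):
  // with the defaults above, each LAB carves 2MB chunks and sends any request
  // larger than 256KB straight to the JVM heap. Both knobs are ordinary
  // Configuration integers:
  //
  //   Configuration conf = new Configuration();
  //   conf.setInt(CHUNK_SIZE_KEY, 4 * 1024 * 1024); // 4MB chunks
  //   conf.setInt(MAX_ALLOC_KEY, 512 * 1024);       // 512KB cutoff
  //   MemStoreLAB lab = new HeapMemStoreLAB(conf);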

  /**
   * Allocate a slice of the given length.
   *
   * If the size is larger than the maximum size specified for this
   * allocator, returns null.
   */
  @Override
  public ByteRange allocateBytes(int size) {
    Preconditions.checkArgument(size >= 0, "negative size");

    // Callers should satisfy large allocations directly from JVM since they
    // don't cause fragmentation as badly.
    if (size > maxAlloc) {
      return null;
    }

    while (true) {
      Chunk c = getOrMakeChunk();

      // Try to allocate from this chunk
      int allocOffset = c.alloc(size);
      if (allocOffset != -1) {
        // We succeeded - this is the common case - small alloc
        // from a big buffer
        return new SimpleMutableByteRange(c.data, allocOffset, size);
      }

      // not enough space!
      // try to retire this chunk
      tryRetireChunk(c);
    }
  }
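
  // Caller-side sketch (hypothetical; illustrates the contract rather than
  // actual MemStore code): a null return means "too big for the LAB" and the
  // caller should fall back to an ordinary heap allocation:
  //
  //   ByteRange alloc = lab.allocateBytes(len);
  //   byte[] buf = (alloc == null) ? new byte[len] : alloc.getBytes();
  //   int off = (alloc == null) ? 0 : alloc.getOffset();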

  /**
   * Close this instance since it won't be used any more; try to put the
   * chunks back into the pool.
   */
  @Override
  public void close() {
    this.closed = true;
    // We can only put the chunks back into the pool for reuse when there is
    // no open scanner that will read their data
    if (chunkPool != null && openScannerCount.get() == 0
        && reclaimed.compareAndSet(false, true)) {
      chunkPool.putbackChunks(this.chunkQueue);
    }
  }

  /**
   * Called when opening a scanner on the data of this MemStoreLAB
   */
  @Override
  public void incScannerCount() {
    this.openScannerCount.incrementAndGet();
  }

  /**
   * Called when closing a scanner on the data of this MemStoreLAB
   */
  @Override
  public void decScannerCount() {
    int count = this.openScannerCount.decrementAndGet();
    if (chunkPool != null && count == 0 && this.closed
        && reclaimed.compareAndSet(false, true)) {
      chunkPool.putbackChunks(this.chunkQueue);
    }
  }
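
  // Reclamation protocol sketch: chunks go back to the pool exactly once, on
  // whichever of close() or the final decScannerCount() happens last; the
  // reclaimed AtomicBoolean makes the hand-off idempotent. For example:
  //
  //   lab.incScannerCount(); // scanner opens
  //   lab.close();           // snapshot cleared; scanner still open, so no putback yet
  //   lab.decScannerCount(); // count reaches 0 with closed == true -> chunks returned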

  /**
   * Try to retire the current chunk if it is still
   * <code>c</code>. Postcondition is that curChunk.get()
   * != c
   */
  private void tryRetireChunk(Chunk c) {
    curChunk.compareAndSet(c, null);
    // If the CAS succeeds, that means that we won the race
    // to retire the chunk. We could use this opportunity to
    // update metrics on external fragmentation.
    //
    // If the CAS fails, that means that someone else already
    // retired the chunk for us.
  }

  /**
   * Get the current chunk, or, if there is no current chunk,
   * allocate a new one from the JVM.
   */
  private Chunk getOrMakeChunk() {
    while (true) {
      // Try to get the chunk
      Chunk c = curChunk.get();
      if (c != null) {
        return c;
      }

      // No current chunk, so we want to allocate one. We race
      // against other allocators to CAS in an uninitialized chunk
      // (which is cheap to allocate)
      c = (chunkPool != null) ? chunkPool.getChunk() : new Chunk(chunkSize);
      if (curChunk.compareAndSet(null, c)) {
        // we won race - now we need to actually do the expensive
        // allocation step
        c.init();
        this.chunkQueue.add(c);
        return c;
      } else if (chunkPool != null) {
        chunkPool.putbackChunk(c);
      }
      // someone else won race - that's fine, we'll try to grab theirs
      // in the next iteration of the loop.
    }
  }

  /**
   * A chunk of memory out of which allocations are sliced.
   */
  static class Chunk {
    /** Actual underlying data */
    private byte[] data;

    private static final int UNINITIALIZED = -1;
    private static final int OOM = -2;
    /**
     * Offset for the next allocation, or the sentinel value UNINITIALIZED
     * (-1), which implies that the chunk is still uninitialized.
     */
    private AtomicInteger nextFreeOffset = new AtomicInteger(UNINITIALIZED);

    /** Total number of allocations satisfied from this buffer */
    private AtomicInteger allocCount = new AtomicInteger();

    /** Size of chunk in bytes */
    private final int size;

    /**
     * Create an uninitialized chunk. Note that memory is not allocated yet, so
     * this is cheap.
     * @param size in bytes
     */
    Chunk(int size) {
      this.size = size;
    }

    /**
     * Actually claim the memory for this chunk. This should only be called from
     * the thread that constructed the chunk. It is thread-safe against other
     * threads calling alloc(), who will block until the allocation is complete.
     */
    public void init() {
      assert nextFreeOffset.get() == UNINITIALIZED;
      try {
        if (data == null) {
          data = new byte[size];
        }
      } catch (OutOfMemoryError e) {
        boolean failInit = nextFreeOffset.compareAndSet(UNINITIALIZED, OOM);
        assert failInit; // should be true.
        throw e;
      }
      // Mark that it's ready for use
      boolean initted = nextFreeOffset.compareAndSet(
          UNINITIALIZED, 0);
      // We should always succeed the above CAS since only one thread
      // calls init()!
      Preconditions.checkState(initted,
          "Multiple threads tried to init same chunk");
    }

    /**
     * Reset the offset to UNINITIALIZED before reusing an old chunk
     */
    void reset() {
      if (nextFreeOffset.get() != UNINITIALIZED) {
        nextFreeOffset.set(UNINITIALIZED);
        allocCount.set(0);
      }
    }

    /**
     * Try to allocate <code>size</code> bytes from the chunk.
     * @return the offset of the successful allocation, or -1 to indicate not-enough-space
     */
    public int alloc(int size) {
      while (true) {
        int oldOffset = nextFreeOffset.get();
        if (oldOffset == UNINITIALIZED) {
          // The chunk doesn't have its data allocated yet.
          // Since we found this in curChunk, we know that whoever
          // CAS-ed it there is allocating it right now. So spin-loop
          // shouldn't spin long!
          Thread.yield();
          continue;
        }
        if (oldOffset == OOM) {
          // doh we ran out of ram. return -1 to chuck this away.
          return -1;
        }

        if (oldOffset + size > data.length) {
          return -1; // alloc doesn't fit
        }

        // Try to atomically claim this slice of the chunk
        if (nextFreeOffset.compareAndSet(oldOffset, oldOffset + size)) {
          // we got the alloc
          allocCount.incrementAndGet();
          return oldOffset;
        }
        // we raced and lost alloc, try again
      }
    }
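
    // A worked example of the bump-the-pointer arithmetic on a fresh chunk
    // (offsets are illustrative):
    //
    //   int off1 = chunk.alloc(100); // returns 0,   nextFreeOffset becomes 100
    //   int off2 = chunk.alloc(50);  // returns 100, nextFreeOffset becomes 150
    //
    // Slices never overlap because each successful CAS claims exactly
    // [oldOffset, oldOffset + size).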

    @Override
    public String toString() {
      return "Chunk@" + System.identityHashCode(this) +
        " allocs=" + allocCount.get() + " waste=" +
        (data.length - nextFreeOffset.get());
    }
  }
}