View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.io;
19  
20  import static org.junit.Assert.assertEquals;
21  
22  import java.nio.ByteBuffer;
23  import java.util.concurrent.ConcurrentLinkedDeque;
24  
25  import org.apache.hadoop.hbase.testclassification.SmallTests;
26  import org.junit.After;
27  import org.junit.Before;
28  import org.junit.Test;
29  import org.junit.experimental.categories.Category;
30  
31  @Category({ SmallTests.class })
32  public class TestBoundedByteBufferPool {
33    final int maxByteBufferSizeToCache = 10;
34    final int initialByteBufferSize = 1;
35    final int maxToCache = 10;
36    BoundedByteBufferPool reservoir;
37  
38    @Before
39    public void before() {
40      this.reservoir =
41        new BoundedByteBufferPool(maxByteBufferSizeToCache, initialByteBufferSize, maxToCache);
42    }
43  
44    @After
45    public void after() {
46      this.reservoir = null;
47    }
48  
49    @Test
50    public void testEquivalence() {
51      ByteBuffer bb = ByteBuffer.allocate(1);
52      this.reservoir.putBuffer(bb);
53      this.reservoir.putBuffer(bb);
54      this.reservoir.putBuffer(bb);
55      assertEquals(3, this.reservoir.buffers.size());
56    }
57  
58    @Test
59    public void testGetPut() {
60      ByteBuffer bb = this.reservoir.getBuffer();
61      assertEquals(initialByteBufferSize, bb.capacity());
62      assertEquals(0, this.reservoir.buffers.size());
63      this.reservoir.putBuffer(bb);
64      assertEquals(1, this.reservoir.buffers.size());
65      // Now remove a buffer and don't put it back so reservoir is empty.
66      this.reservoir.getBuffer();
67      assertEquals(0, this.reservoir.buffers.size());
68      // Try adding in a buffer with a bigger-than-initial size and see if our runningAverage works.
69      // Need to add then remove, then get a new bytebuffer so reservoir internally is doing
70      // allocation
71      final int newCapacity = 2;
72      this.reservoir.putBuffer(ByteBuffer.allocate(newCapacity));
73      assertEquals(1, reservoir.buffers.size());
74      this.reservoir.getBuffer();
75      assertEquals(0, this.reservoir.buffers.size());
76      bb = this.reservoir.getBuffer();
77      assertEquals(newCapacity, bb.capacity());
78      // Assert that adding a too-big buffer won't happen
79      assertEquals(0, this.reservoir.buffers.size());
80      this.reservoir.putBuffer(ByteBuffer.allocate(maxByteBufferSizeToCache * 2));
81      assertEquals(0, this.reservoir.buffers.size());
82      // Assert we can't add more than max allowed instances.
83      for (int i = 0; i < maxToCache; i++) {
84        this.reservoir.putBuffer(ByteBuffer.allocate(initialByteBufferSize));
85      }
86      assertEquals(maxToCache, this.reservoir.buffers.size());
87    }
88  
89    @Test
90    public void testBufferSizeGrowWithMultiThread() throws Exception {
91      final ConcurrentLinkedDeque<ByteBuffer> bufferQueue = new ConcurrentLinkedDeque<ByteBuffer>();
92      int takeBufferThreadsCount = 30;
93      int putBufferThreadsCount = 1;
94      Thread takeBufferThreads[] = new Thread[takeBufferThreadsCount];
95      for (int i = 0; i < takeBufferThreadsCount; i++) {
96        takeBufferThreads[i] = new Thread(new Runnable() {
97          @Override
98          public void run() {
99            while (true) {
100             ByteBuffer buffer = reservoir.getBuffer();
101             try {
102               Thread.sleep(5);
103             } catch (InterruptedException e) {
104               break;
105             }
106             bufferQueue.offer(buffer);
107             if (Thread.currentThread().isInterrupted()) break;
108           }
109         }
110       });
111     }
112 
113     Thread putBufferThread[] = new Thread[putBufferThreadsCount];
114     for (int i = 0; i < putBufferThreadsCount; i++) {
115       putBufferThread[i] = new Thread(new Runnable() {
116         @Override
117         public void run() {
118           while (true) {
119             ByteBuffer buffer = bufferQueue.poll();
120             if (buffer != null) {
121               reservoir.putBuffer(buffer);
122             }
123             if (Thread.currentThread().isInterrupted()) break;
124           }
125         }
126       });
127     }
128 
129     for (int i = 0; i < takeBufferThreadsCount; i++) {
130       takeBufferThreads[i].start();
131     }
132     for (int i = 0; i < putBufferThreadsCount; i++) {
133       putBufferThread[i].start();
134     }
135     Thread.sleep(2 * 1000);// Let the threads run for 2 secs
136     for (int i = 0; i < takeBufferThreadsCount; i++) {
137       takeBufferThreads[i].interrupt();
138       takeBufferThreads[i].join();
139     }
140     for (int i = 0; i < putBufferThreadsCount; i++) {
141       putBufferThread[i].interrupt();
142       putBufferThread[i].join();
143     }
144     // None of the BBs we got from pool is growing while in use. So we should not change the
145     // runningAverage in pool
146     assertEquals(initialByteBufferSize, this.reservoir.runningAverage);
147   }
148 }