1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.io.hfile.bucket;
20
21 import static org.junit.Assert.assertEquals;
22 import static org.junit.Assert.assertTrue;
23
24 import java.io.FileNotFoundException;
25 import java.io.IOException;
26 import java.util.ArrayList;
27 import java.util.Arrays;
28 import java.util.List;
29 import java.util.Random;
30 import java.util.concurrent.locks.ReentrantReadWriteLock;
31
32 import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
33 import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
34 import org.apache.hadoop.hbase.io.hfile.Cacheable;
35 import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.BucketSizeInfo;
36 import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.IndexStatistics;
37 import org.apache.hadoop.hbase.testclassification.SmallTests;
38 import org.junit.After;
39 import org.junit.Before;
40 import org.junit.Test;
41 import org.junit.experimental.categories.Category;
42 import org.junit.runner.RunWith;
43 import org.junit.runners.Parameterized;
44
45
46
47
48
49
50 @RunWith(Parameterized.class)
51 @Category(SmallTests.class)
52 public class TestBucketCache {
53
54 private static final Random RAND = new Random();
55
56 @Parameterized.Parameters(name = "{index}: blockSize={0}, bucketSizes={1}")
57 public static Iterable<Object[]> data() {
58 return Arrays.asList(new Object[][] {
59 { 8192, null },
60 {
61 16 * 1024,
62 new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024, 8 * 1024 + 1024, 16 * 1024 + 1024,
63 28 * 1024 + 1024, 32 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024,
64 128 * 1024 + 1024 } } });
65 }
66
67 @Parameterized.Parameter(0)
68 public int constructedBlockSize;
69
70 @Parameterized.Parameter(1)
71 public int[] constructedBlockSizes;
72
73 BucketCache cache;
74 final int CACHE_SIZE = 1000000;
75 final int NUM_BLOCKS = 100;
76 final int BLOCK_SIZE = CACHE_SIZE / NUM_BLOCKS;
77 final int NUM_THREADS = 100;
78 final int NUM_QUERIES = 10000;
79
80 final long capacitySize = 32 * 1024 * 1024;
81 final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS;
82 final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS;
83 String ioEngineName = "heap";
84 String persistencePath = null;
85
86 private class MockedBucketCache extends BucketCache {
87
88 public MockedBucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
89 int writerThreads, int writerQLen, String persistencePath) throws FileNotFoundException,
90 IOException {
91 super(ioEngineName, capacity, blockSize, bucketSizes, writerThreads, writerQLen,
92 persistencePath);
93 super.wait_when_cache = true;
94 }
95
96 @Override
97 public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory,
98 boolean cacheDataInL1) {
99 if (super.getBlock(cacheKey, true, false, true) != null) {
100 throw new RuntimeException("Cached an already cached block");
101 }
102 super.cacheBlock(cacheKey, buf, inMemory, cacheDataInL1);
103 }
104
105 @Override
106 public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
107 if (super.getBlock(cacheKey, true, false, true) != null) {
108 throw new RuntimeException("Cached an already cached block");
109 }
110 super.cacheBlock(cacheKey, buf);
111 }
112 }
113
114 @Before
115 public void setup() throws FileNotFoundException, IOException {
116 cache =
117 new MockedBucketCache(ioEngineName, capacitySize, constructedBlockSize,
118 constructedBlockSizes, writeThreads, writerQLen, persistencePath);
119 }
120
121 @After
122 public void tearDown() {
123 cache.shutdown();
124 }
125
126
127
128
129 private static <T> T randFrom(List<T> a) {
130 return a.get(RAND.nextInt(a.size()));
131 }
132
133 @Test
134 public void testBucketAllocator() throws BucketAllocatorException {
135 BucketAllocator mAllocator = cache.getAllocator();
136
137
138
139 final List<Integer> BLOCKSIZES = Arrays.asList(4 * 1024, 8 * 1024, 64 * 1024, 96 * 1024);
140
141 boolean full = false;
142 ArrayList<Long> allocations = new ArrayList<Long>();
143
144
145 List<Integer> tmp = new ArrayList<Integer>(BLOCKSIZES);
146 while (!full) {
147 Integer blockSize = null;
148 try {
149 blockSize = randFrom(tmp);
150 allocations.add(mAllocator.allocateBlock(blockSize));
151 } catch (CacheFullException cfe) {
152 tmp.remove(blockSize);
153 if (tmp.isEmpty()) full = true;
154 }
155 }
156
157 for (Integer blockSize : BLOCKSIZES) {
158 BucketSizeInfo bucketSizeInfo = mAllocator.roundUpToBucketSizeInfo(blockSize);
159 IndexStatistics indexStatistics = bucketSizeInfo.statistics();
160 assertEquals("unexpected freeCount for " + bucketSizeInfo, 0, indexStatistics.freeCount());
161 }
162
163 for (long offset : allocations) {
164 assertEquals(mAllocator.sizeOfAllocation(offset), mAllocator.freeBlock(offset));
165 }
166 assertEquals(0, mAllocator.getUsedSize());
167 }
168
169 @Test
170 public void testCacheSimple() throws Exception {
171 CacheTestUtils.testCacheSimple(cache, BLOCK_SIZE, NUM_QUERIES);
172 }
173
174 @Test
175 public void testCacheMultiThreadedSingleKey() throws Exception {
176 CacheTestUtils.hammerSingleKey(cache, BLOCK_SIZE, 2 * NUM_THREADS, 2 * NUM_QUERIES);
177 }
178
179 @Test
180 public void testHeapSizeChanges() throws Exception {
181 cache.stopWriterThreads();
182 CacheTestUtils.testHeapSizeChanges(cache, BLOCK_SIZE);
183 }
184
185
186
187 private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey,
188 Cacheable block) throws InterruptedException {
189 cache.cacheBlock(cacheKey, block);
190 while (!cache.backingMap.containsKey(cacheKey)) {
191 Thread.sleep(100);
192 }
193 }
194
195 @Test
196 public void testMemoryLeak() throws Exception {
197 final BlockCacheKey cacheKey = new BlockCacheKey("dummy", 1L);
198 cacheAndWaitUntilFlushedToBucket(cache, cacheKey, new CacheTestUtils.ByteArrayCacheable(
199 new byte[10]));
200 long lockId = cache.backingMap.get(cacheKey).offset();
201 ReentrantReadWriteLock lock = cache.offsetLock.getLock(lockId);
202 lock.writeLock().lock();
203 Thread evictThread = new Thread("evict-block") {
204
205 @Override
206 public void run() {
207 cache.evictBlock(cacheKey);
208 }
209
210 };
211 evictThread.start();
212 cache.offsetLock.waitForWaiters(lockId, 1);
213 cache.blockEvicted(cacheKey, cache.backingMap.remove(cacheKey), true);
214 cacheAndWaitUntilFlushedToBucket(cache, cacheKey, new CacheTestUtils.ByteArrayCacheable(
215 new byte[10]));
216 lock.writeLock().unlock();
217 evictThread.join();
218 assertEquals(1L, cache.getBlockCount());
219 assertTrue(cache.getCurrentSize() > 0L);
220 assertTrue("We should have a block!", cache.iterator().hasNext());
221 }
222 }