/**
 * Copyright The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
19  package org.apache.hadoop.hbase.io.hfile.bucket;
20  
21  import static org.junit.Assert.assertEquals;
22  import static org.junit.Assert.assertTrue;
23  
24  import java.io.FileNotFoundException;
25  import java.io.IOException;
26  import java.util.ArrayList;
27  import java.util.Arrays;
28  import java.util.List;
29  import java.util.Random;
30  import java.util.concurrent.locks.ReentrantReadWriteLock;
31  
32  import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
33  import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
34  import org.apache.hadoop.hbase.io.hfile.Cacheable;
35  import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.BucketSizeInfo;
36  import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.IndexStatistics;
37  import org.apache.hadoop.hbase.testclassification.SmallTests;
38  import org.junit.After;
39  import org.junit.Before;
40  import org.junit.Test;
41  import org.junit.experimental.categories.Category;
42  import org.junit.runner.RunWith;
43  import org.junit.runners.Parameterized;
44  
45  /**
46   * Basic test of BucketCache.Puts and gets.
47   * <p>
48   * Tests will ensure that blocks' data correctness under several threads concurrency
49   */
50  @RunWith(Parameterized.class)
51  @Category(SmallTests.class)
52  public class TestBucketCache {
53  
54    private static final Random RAND = new Random();
55  
56    @Parameterized.Parameters(name = "{index}: blockSize={0}, bucketSizes={1}")
57    public static Iterable<Object[]> data() {
58      return Arrays.asList(new Object[][] {
59          { 8192, null }, // TODO: why is 8k the default blocksize for these tests?
60          {
61              16 * 1024,
62              new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024, 8 * 1024 + 1024, 16 * 1024 + 1024,
63                  28 * 1024 + 1024, 32 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024,
64                  128 * 1024 + 1024 } } });
65    }
66  
67    @Parameterized.Parameter(0)
68    public int constructedBlockSize;
69  
70    @Parameterized.Parameter(1)
71    public int[] constructedBlockSizes;
72  
73    BucketCache cache;
74    final int CACHE_SIZE = 1000000;
75    final int NUM_BLOCKS = 100;
76    final int BLOCK_SIZE = CACHE_SIZE / NUM_BLOCKS;
77    final int NUM_THREADS = 100;
78    final int NUM_QUERIES = 10000;
79  
80    final long capacitySize = 32 * 1024 * 1024;
81    final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS;
82    final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS;
83    String ioEngineName = "heap";
84    String persistencePath = null;
85  
86    private class MockedBucketCache extends BucketCache {
87  
88      public MockedBucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
89          int writerThreads, int writerQLen, String persistencePath) throws FileNotFoundException,
90          IOException {
91        super(ioEngineName, capacity, blockSize, bucketSizes, writerThreads, writerQLen,
92            persistencePath);
93        super.wait_when_cache = true;
94      }
95  
96      @Override
97      public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory,
98          boolean cacheDataInL1) {
99        if (super.getBlock(cacheKey, true, false, true) != null) {
100         throw new RuntimeException("Cached an already cached block");
101       }
102       super.cacheBlock(cacheKey, buf, inMemory, cacheDataInL1);
103     }
104 
105     @Override
106     public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
107       if (super.getBlock(cacheKey, true, false, true) != null) {
108         throw new RuntimeException("Cached an already cached block");
109       }
110       super.cacheBlock(cacheKey, buf);
111     }
112   }
113 
114   @Before
115   public void setup() throws FileNotFoundException, IOException {
116     cache =
117         new MockedBucketCache(ioEngineName, capacitySize, constructedBlockSize,
118             constructedBlockSizes, writeThreads, writerQLen, persistencePath);
119   }
120 
121   @After
122   public void tearDown() {
123     cache.shutdown();
124   }
125 
126   /**
127    * Return a random element from {@code a}.
128    */
129   private static <T> T randFrom(List<T> a) {
130     return a.get(RAND.nextInt(a.size()));
131   }
132 
133   @Test
134   public void testBucketAllocator() throws BucketAllocatorException {
135     BucketAllocator mAllocator = cache.getAllocator();
136     /*
137      * Test the allocator first
138      */
139     final List<Integer> BLOCKSIZES = Arrays.asList(4 * 1024, 8 * 1024, 64 * 1024, 96 * 1024);
140 
141     boolean full = false;
142     ArrayList<Long> allocations = new ArrayList<Long>();
143     // Fill the allocated extents by choosing a random blocksize. Continues selecting blocks until
144     // the cache is completely filled.
145     List<Integer> tmp = new ArrayList<Integer>(BLOCKSIZES);
146     while (!full) {
147       Integer blockSize = null;
148       try {
149         blockSize = randFrom(tmp);
150         allocations.add(mAllocator.allocateBlock(blockSize));
151       } catch (CacheFullException cfe) {
152         tmp.remove(blockSize);
153         if (tmp.isEmpty()) full = true;
154       }
155     }
156 
157     for (Integer blockSize : BLOCKSIZES) {
158       BucketSizeInfo bucketSizeInfo = mAllocator.roundUpToBucketSizeInfo(blockSize);
159       IndexStatistics indexStatistics = bucketSizeInfo.statistics();
160       assertEquals("unexpected freeCount for " + bucketSizeInfo, 0, indexStatistics.freeCount());
161     }
162 
163     for (long offset : allocations) {
164       assertEquals(mAllocator.sizeOfAllocation(offset), mAllocator.freeBlock(offset));
165     }
166     assertEquals(0, mAllocator.getUsedSize());
167   }
168 
169   @Test
170   public void testCacheSimple() throws Exception {
171     CacheTestUtils.testCacheSimple(cache, BLOCK_SIZE, NUM_QUERIES);
172   }
173 
174   @Test
175   public void testCacheMultiThreadedSingleKey() throws Exception {
176     CacheTestUtils.hammerSingleKey(cache, BLOCK_SIZE, 2 * NUM_THREADS, 2 * NUM_QUERIES);
177   }
178 
179   @Test
180   public void testHeapSizeChanges() throws Exception {
181     cache.stopWriterThreads();
182     CacheTestUtils.testHeapSizeChanges(cache, BLOCK_SIZE);
183   }
184 
185   // BucketCache.cacheBlock is async, it first adds block to ramCache and writeQueue, then writer
186   // threads will flush it to the bucket and put reference entry in backingMap.
187   private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey,
188       Cacheable block) throws InterruptedException {
189     cache.cacheBlock(cacheKey, block);
190     while (!cache.backingMap.containsKey(cacheKey)) {
191       Thread.sleep(100);
192     }
193   }
194 
195   @Test
196   public void testMemoryLeak() throws Exception {
197     final BlockCacheKey cacheKey = new BlockCacheKey("dummy", 1L);
198     cacheAndWaitUntilFlushedToBucket(cache, cacheKey, new CacheTestUtils.ByteArrayCacheable(
199         new byte[10]));
200     long lockId = cache.backingMap.get(cacheKey).offset();
201     ReentrantReadWriteLock lock = cache.offsetLock.getLock(lockId);
202     lock.writeLock().lock();
203     Thread evictThread = new Thread("evict-block") {
204 
205       @Override
206       public void run() {
207         cache.evictBlock(cacheKey);
208       }
209 
210     };
211     evictThread.start();
212     cache.offsetLock.waitForWaiters(lockId, 1);
213     cache.blockEvicted(cacheKey, cache.backingMap.remove(cacheKey), true);
214     cacheAndWaitUntilFlushedToBucket(cache, cacheKey, new CacheTestUtils.ByteArrayCacheable(
215         new byte[10]));
216     lock.writeLock().unlock();
217     evictThread.join();
218     assertEquals(1L, cache.getBlockCount());
219     assertTrue(cache.getCurrentSize() > 0L);
220     assertTrue("We should have a block!", cache.iterator().hasNext());
221   }
222 }