View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.io.hfile;
20  
21  import static org.junit.Assert.assertArrayEquals;
22  import static org.junit.Assert.assertEquals;
23  import static org.junit.Assert.assertNull;
24  import static org.junit.Assert.assertTrue;
25  import static org.junit.Assert.fail;
26  
27  import java.io.IOException;
28  import java.nio.ByteBuffer;
29  import java.util.Arrays;
30  import java.util.HashSet;
31  import java.util.Random;
32  import java.util.concurrent.ConcurrentLinkedQueue;
33  import java.util.concurrent.atomic.AtomicInteger;
34  
35  import org.apache.hadoop.conf.Configuration;
36  import org.apache.hadoop.hbase.HConstants;
37  import org.apache.hadoop.hbase.MultithreadedTestUtil;
38  import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
39  import org.apache.hadoop.hbase.io.HeapSize;
40  import org.apache.hadoop.hbase.io.compress.Compression;
41  import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
42  import org.apache.hadoop.hbase.util.ChecksumType;
43  
44  public class CacheTestUtils {
45  
46    private static final boolean includesMemstoreTS = true;
47  
48    /**
49     * Just checks if heapsize grows when something is cached, and gets smaller
50     * when the same object is evicted
51     */
52  
53    public static void testHeapSizeChanges(final BlockCache toBeTested,
54        final int blockSize) {
55      HFileBlockPair[] blocks = generateHFileBlocks(blockSize, 1);
56      long heapSize = ((HeapSize) toBeTested).heapSize();
57      toBeTested.cacheBlock(blocks[0].blockName, blocks[0].block);
58  
59      /*When we cache something HeapSize should always increase */
60      assertTrue(heapSize < ((HeapSize) toBeTested).heapSize());
61  
62      toBeTested.evictBlock(blocks[0].blockName);
63  
64      /*Post eviction, heapsize should be the same */
65      assertEquals(heapSize, ((HeapSize) toBeTested).heapSize());
66    }
67    public static void testCacheMultiThreaded(final BlockCache toBeTested,
68        final int blockSize, final int numThreads, final int numQueries,
69        final double passingScore) throws Exception {
70  
71      Configuration conf = new Configuration();
72      MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(
73          conf);
74  
75      final AtomicInteger totalQueries = new AtomicInteger();
76      final ConcurrentLinkedQueue<HFileBlockPair> blocksToTest = new ConcurrentLinkedQueue<HFileBlockPair>();
77      final AtomicInteger hits = new AtomicInteger();
78      final AtomicInteger miss = new AtomicInteger();
79  
80      HFileBlockPair[] blocks = generateHFileBlocks(numQueries, blockSize);
81      blocksToTest.addAll(Arrays.asList(blocks));
82  
83      for (int i = 0; i < numThreads; i++) {
84        TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) {
85          @Override
86          public void doAnAction() throws Exception {
87            if (!blocksToTest.isEmpty()) {
88              HFileBlockPair ourBlock = blocksToTest.poll();
89              // if we run out of blocks to test, then we should stop the tests.
90              if (ourBlock == null) {
91                ctx.setStopFlag(true);
92                return;
93              }
94              toBeTested.cacheBlock(ourBlock.blockName, ourBlock.block);
95              Cacheable retrievedBlock = toBeTested.getBlock(ourBlock.blockName,
96                  false, false, true);
97              if (retrievedBlock != null) {
98                assertEquals(ourBlock.block, retrievedBlock);
99                toBeTested.evictBlock(ourBlock.blockName);
100               hits.incrementAndGet();
101               assertNull(toBeTested.getBlock(ourBlock.blockName, false, false, true));
102             } else {
103               miss.incrementAndGet();
104             }
105             totalQueries.incrementAndGet();
106           }
107         }
108       };
109       t.setDaemon(true);
110       ctx.addThread(t);
111     }
112     ctx.startThreads();
113     while (!blocksToTest.isEmpty() && ctx.shouldRun()) {
114       Thread.sleep(10);
115     }
116     ctx.stop();
117     if (hits.get() / ((double) hits.get() + (double) miss.get()) < passingScore) {
118       fail("Too many nulls returned. Hits: " + hits.get() + " Misses: "
119           + miss.get());
120     }
121   }
122 
123   public static void testCacheSimple(BlockCache toBeTested, int blockSize,
124       int numBlocks) throws Exception {
125 
126     HFileBlockPair[] blocks = generateHFileBlocks(numBlocks, blockSize);
127     // Confirm empty
128     for (HFileBlockPair block : blocks) {
129       assertNull(toBeTested.getBlock(block.blockName, true, false, true));
130     }
131 
132     // Add blocks
133     for (HFileBlockPair block : blocks) {
134       toBeTested.cacheBlock(block.blockName, block.block);
135     }
136 
137     // Check if all blocks are properly cached and contain the right
138     // information, or the blocks are null.
139     // MapMaker makes no guarantees when it will evict, so neither can we.
140 
141     for (HFileBlockPair block : blocks) {
142       HFileBlock buf = (HFileBlock) toBeTested.getBlock(block.blockName, true, false, true);
143       if (buf != null) {
144         assertEquals(block.block, buf);
145       }
146 
147     }
148 
149     // Re-add some duplicate blocks. Hope nothing breaks.
150 
151     for (HFileBlockPair block : blocks) {
152       try {
153         if (toBeTested.getBlock(block.blockName, true, false, true) != null) {
154           toBeTested.cacheBlock(block.blockName, block.block);
155           if (!(toBeTested instanceof BucketCache)) {
156             // BucketCache won't throw exception when caching already cached
157             // block
158             fail("Cache should not allow re-caching a block");
159           }
160         }
161       } catch (RuntimeException re) {
162         // expected
163       }
164     }
165 
166   }
167 
168   public static void hammerSingleKey(final BlockCache toBeTested,
169       int BlockSize, int numThreads, int numQueries) throws Exception {
170     final BlockCacheKey key = new BlockCacheKey("key", 0);
171     final byte[] buf = new byte[5 * 1024];
172     Arrays.fill(buf, (byte) 5);
173 
174     final ByteArrayCacheable bac = new ByteArrayCacheable(buf);
175     Configuration conf = new Configuration();
176     MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(
177         conf);
178 
179     final AtomicInteger totalQueries = new AtomicInteger();
180     toBeTested.cacheBlock(key, bac);
181 
182     for (int i = 0; i < numThreads; i++) {
183       TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) {
184         @Override
185         public void doAnAction() throws Exception {
186           ByteArrayCacheable returned = (ByteArrayCacheable) toBeTested
187               .getBlock(key, false, false, true);
188           if (returned != null) {
189             assertArrayEquals(buf, returned.buf);
190           } else {
191             Thread.sleep(10);
192           }
193           totalQueries.incrementAndGet();
194         }
195       };
196 
197       t.setDaemon(true);
198       ctx.addThread(t);
199     }
200 
201     // add a thread to periodically evict and re-cache the block
202     final long blockEvictPeriod = 50;
203     TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) {
204       @Override
205       public void doAnAction() throws Exception {
206         toBeTested.evictBlock(key);
207         toBeTested.cacheBlock(key, bac);
208         Thread.sleep(blockEvictPeriod);
209       }
210     };
211     t.setDaemon(true);
212     ctx.addThread(t);
213 
214     ctx.startThreads();
215     while (totalQueries.get() < numQueries && ctx.shouldRun()) {
216       Thread.sleep(10);
217     }
218     ctx.stop();
219   }
220 
221   public static void hammerEviction(final BlockCache toBeTested, int BlockSize,
222       int numThreads, int numQueries) throws Exception {
223 
224     Configuration conf = new Configuration();
225     MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(
226         conf);
227 
228     final AtomicInteger totalQueries = new AtomicInteger();
229 
230     for (int i = 0; i < numThreads; i++) {
231       final int finalI = i;
232 
233       final byte[] buf = new byte[5 * 1024];
234       TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) {
235         @Override
236         public void doAnAction() throws Exception {
237           for (int j = 0; j < 100; j++) {
238             BlockCacheKey key = new BlockCacheKey("key_" + finalI + "_" + j, 0);
239             Arrays.fill(buf, (byte) (finalI * j));
240             final ByteArrayCacheable bac = new ByteArrayCacheable(buf);
241 
242             ByteArrayCacheable gotBack = (ByteArrayCacheable) toBeTested
243                 .getBlock(key, true, false, true);
244             if (gotBack != null) {
245               assertArrayEquals(gotBack.buf, bac.buf);
246             } else {
247               toBeTested.cacheBlock(key, bac);
248             }
249           }
250           totalQueries.incrementAndGet();
251         }
252       };
253 
254       t.setDaemon(true);
255       ctx.addThread(t);
256     }
257 
258     ctx.startThreads();
259     while (totalQueries.get() < numQueries && ctx.shouldRun()) {
260       Thread.sleep(10);
261     }
262     ctx.stop();
263 
264     assertTrue(toBeTested.getStats().getEvictedCount() > 0);
265   }
266 
267   public static class ByteArrayCacheable implements Cacheable {
268 
269     static final CacheableDeserializer<Cacheable> blockDeserializer =
270       new CacheableDeserializer<Cacheable>() {
271 
272       @Override
273       public Cacheable deserialize(ByteBuffer b) throws IOException {
274         int len = b.getInt();
275         Thread.yield();
276         byte buf[] = new byte[len];
277         b.get(buf);
278         return new ByteArrayCacheable(buf);
279       }
280 
281       @Override
282       public int getDeserialiserIdentifier() {
283         return deserializerIdentifier;
284       }
285 
286       @Override
287       public Cacheable deserialize(ByteBuffer b, boolean reuse)
288           throws IOException {
289         return deserialize(b);
290       }
291     };
292 
293     final byte[] buf;
294 
295     public ByteArrayCacheable(byte[] buf) {
296       this.buf = buf;
297     }
298 
299     @Override
300     public long heapSize() {
301       return 4 + buf.length;
302     }
303 
304     @Override
305     public int getSerializedLength() {
306       return 4 + buf.length;
307     }
308 
309     @Override
310     public void serialize(ByteBuffer destination) {
311       destination.putInt(buf.length);
312       Thread.yield();
313       destination.put(buf);
314       destination.rewind();
315     }
316 
317     @Override
318     public CacheableDeserializer<Cacheable> getDeserializer() {
319       return blockDeserializer;
320     }
321 
322     private static final int deserializerIdentifier;
323     static {
324       deserializerIdentifier = CacheableDeserializerIdManager
325           .registerDeserializer(blockDeserializer);
326     }
327 
328     @Override
329     public BlockType getBlockType() {
330       return BlockType.DATA;
331     }
332   }
333 
334 
335   private static HFileBlockPair[] generateHFileBlocks(int blockSize,
336       int numBlocks) {
337     HFileBlockPair[] returnedBlocks = new HFileBlockPair[numBlocks];
338     Random rand = new Random();
339     HashSet<String> usedStrings = new HashSet<String>();
340     for (int i = 0; i < numBlocks; i++) {
341 
342       // The buffer serialized size needs to match the size of BlockSize. So we
343       // declare our data size to be smaller than it by the serialization space
344       // required.
345 
346       ByteBuffer cachedBuffer = ByteBuffer.allocate(blockSize
347           - HFileBlock.EXTRA_SERIALIZATION_SPACE);
348       rand.nextBytes(cachedBuffer.array());
349       cachedBuffer.rewind();
350       int onDiskSizeWithoutHeader = blockSize
351           - HFileBlock.EXTRA_SERIALIZATION_SPACE;
352       int uncompressedSizeWithoutHeader = blockSize
353           - HFileBlock.EXTRA_SERIALIZATION_SPACE;
354       long prevBlockOffset = rand.nextLong();
355       BlockType.DATA.write(cachedBuffer);
356       cachedBuffer.putInt(onDiskSizeWithoutHeader);
357       cachedBuffer.putInt(uncompressedSizeWithoutHeader);
358       cachedBuffer.putLong(prevBlockOffset);
359       cachedBuffer.rewind();
360       HFileContext meta = new HFileContextBuilder()
361                           .withHBaseCheckSum(false)
362                           .withIncludesMvcc(includesMemstoreTS)
363                           .withIncludesTags(false)
364                           .withCompression(Compression.Algorithm.NONE)
365                           .withBytesPerCheckSum(0)
366                           .withChecksumType(ChecksumType.NULL)
367                           .build();
368       HFileBlock generated = new HFileBlock(BlockType.DATA,
369           onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader,
370           prevBlockOffset, cachedBuffer, HFileBlock.DONT_FILL_HEADER,
371           blockSize,
372           onDiskSizeWithoutHeader + HConstants.HFILEBLOCK_HEADER_SIZE, meta);
373 
374       String strKey;
375       /* No conflicting keys */
376       for (strKey = new Long(rand.nextLong()).toString(); !usedStrings
377           .add(strKey); strKey = new Long(rand.nextLong()).toString())
378         ;
379 
380       returnedBlocks[i] = new HFileBlockPair();
381       returnedBlocks[i].blockName = new BlockCacheKey(strKey, 0);
382       returnedBlocks[i].block = generated;
383     }
384     return returnedBlocks;
385   }
386 
387   private static class HFileBlockPair {
388     BlockCacheKey blockName;
389     HFileBlock block;
390   }
391 }