View Javadoc

1   /**
2    * Copyright The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one or more
5    * contributor license agreements. See the NOTICE file distributed with this
6    * work for additional information regarding copyright ownership. The ASF
7    * licenses this file to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance with the License.
9    * You may obtain a copy of the License at
10   *
11   * http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16   * License for the specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.apache.hadoop.hbase.io.hfile.bucket;
20  
21  import org.apache.hadoop.hbase.testclassification.SmallTests;
22  import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
23  import org.apache.hadoop.hbase.io.hfile.Cacheable;
24  import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BucketEntry;
25  import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry;
26  import org.junit.After;
27  import org.junit.Before;
28  import org.junit.Test;
29  import org.junit.experimental.categories.Category;
30  import org.mockito.Mockito;
31  
32  import java.io.FileNotFoundException;
33  import java.io.IOException;
34  import java.util.ArrayList;
35  import java.util.List;
36  import java.util.concurrent.BlockingQueue;
37  import java.util.concurrent.atomic.AtomicLong;
38  
39  import static org.hamcrest.CoreMatchers.is;
40  import static org.junit.Assert.assertEquals;
41  import static org.junit.Assert.assertThat;
42  import static org.junit.Assert.assertTrue;
43  
44  @Category(SmallTests.class)
45  public class TestBucketWriterThread {
46    private BucketCache bc;
47    private BucketCache.WriterThread wt;
48    private BlockingQueue<RAMQueueEntry> q;
49    private Cacheable plainCacheable;
50    private BlockCacheKey plainKey;
51  
52    /** A BucketCache that does not start its writer threads. */
53    private static class MockBucketCache extends BucketCache {
54  
55      public MockBucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
56        int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration)
57        throws FileNotFoundException, IOException {
58        super(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen,
59          persistencePath, ioErrorsTolerationDuration);
60      }
61  
62      @Override
63      protected void startWriterThreads() {
64        // intentional noop
65      }
66    }
67  
68    /**
69     * Set up variables and get BucketCache and WriterThread into state where tests can  manually
70     * control the running of WriterThread and BucketCache is empty.
71     * @throws Exception
72     */
73    @Before
74    public void setUp() throws Exception {
75      // Arbitrary capacity.
76      final int capacity = 16;
77      // Run with one writer thread only. Means there will be one writer queue only too.  We depend
78      // on this in below.
79      final int writerThreadsCount = 1;
80      this.bc = new MockBucketCache("heap", capacity, 1, new int [] {1}, writerThreadsCount,
81        capacity, null, 100/*Tolerate ioerrors for 100ms*/);
82      assertEquals(writerThreadsCount, bc.writerThreads.length);
83      assertEquals(writerThreadsCount, bc.writerQueues.size());
84      // Get reference to our single WriterThread instance.
85      this.wt = bc.writerThreads[0];
86      this.q = bc.writerQueues.get(0);
87  
88      wt.disableWriter();
89      this.plainKey = new BlockCacheKey("f", 0);
90      this.plainCacheable = Mockito.mock(Cacheable.class);
91  
92      assertThat(bc.ramCache.isEmpty(), is(true));
93      assertTrue(q.isEmpty());
94    }
95  
96    @After
97    public void tearDown() throws Exception {
98      if (this.bc != null) this.bc.shutdown();
99    }
100 
101   /**
102    * Test non-error case just works.
103    * @throws FileNotFoundException
104    * @throws IOException
105    * @throws InterruptedException
106    */
107   @Test (timeout=30000)
108   public void testNonErrorCase() throws IOException, InterruptedException {
109     bc.cacheBlock(this.plainKey, this.plainCacheable);
110     doDrainOfOneEntry(this.bc, this.wt, this.q);
111   }
112 
113   /**
114    * Pass through a too big entry and ensure it is cleared from queues and ramCache.
115    * Manually run the WriterThread.
116    * @throws InterruptedException
117    */
118   @Test
119   public void testTooBigEntry() throws InterruptedException {
120     Cacheable tooBigCacheable = Mockito.mock(Cacheable.class);
121     Mockito.when(tooBigCacheable.getSerializedLength()).thenReturn(Integer.MAX_VALUE);
122     this.bc.cacheBlock(this.plainKey, tooBigCacheable);
123     doDrainOfOneEntry(this.bc, this.wt, this.q);
124   }
125 
126   /**
127    * Do IOE. Take the RAMQueueEntry that was on the queue, doctor it to throw exception, then
128    * put it back and process it.
129    * @throws IOException
130    * @throws InterruptedException
131    */
132   @SuppressWarnings("unchecked")
133   @Test (timeout=30000)
134   public void testIOE() throws IOException, InterruptedException {
135     this.bc.cacheBlock(this.plainKey, plainCacheable);
136     RAMQueueEntry rqe = q.remove();
137     RAMQueueEntry spiedRqe = Mockito.spy(rqe);
138     Mockito.doThrow(new IOException("Mocked!")).when(spiedRqe).
139       writeToCache((IOEngine)Mockito.any(), (BucketAllocator)Mockito.any(),
140         (UniqueIndexMap<Integer>)Mockito.any(), (AtomicLong)Mockito.any());
141     this.q.add(spiedRqe);
142     doDrainOfOneEntry(bc, wt, q);
143     // Cache disabled when ioes w/o ever healing.
144     assertTrue(!bc.isCacheEnabled());
145   }
146 
147   /**
148    * Do Cache full exception
149    * @throws IOException
150    * @throws InterruptedException
151    */
152   @Test (timeout=30000)
153   public void testCacheFullException()
154       throws IOException, InterruptedException {
155     this.bc.cacheBlock(this.plainKey, plainCacheable);
156     RAMQueueEntry rqe = q.remove();
157     RAMQueueEntry spiedRqe = Mockito.spy(rqe);
158     final CacheFullException cfe = new CacheFullException(0, 0);
159     BucketEntry mockedBucketEntry = Mockito.mock(BucketEntry.class);
160     Mockito.doThrow(cfe).
161       doReturn(mockedBucketEntry).
162       when(spiedRqe).writeToCache((IOEngine)Mockito.any(), (BucketAllocator)Mockito.any(),
163         (UniqueIndexMap<Integer>)Mockito.any(), (AtomicLong)Mockito.any());
164     this.q.add(spiedRqe);
165     doDrainOfOneEntry(bc, wt, q);
166   }
167 
168   private static void doDrainOfOneEntry(final BucketCache bc, final BucketCache.WriterThread wt,
169       final BlockingQueue<RAMQueueEntry> q)
170   throws InterruptedException {
171     List<RAMQueueEntry> rqes = BucketCache.getRAMQueueEntries(q, new ArrayList<RAMQueueEntry>(1));
172     wt.doDrain(rqes);
173     assertTrue(q.isEmpty());
174     assertTrue(bc.ramCache.isEmpty());
175     assertEquals(0, bc.heapSize());
176   }
177 }