View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import static org.junit.Assert.assertEquals;
22  import static org.junit.Assert.assertFalse;
23  import static org.junit.Assert.assertTrue;
24  
25  import java.io.IOException;
26  import java.util.ArrayList;
27  import java.util.Collection;
28  import java.util.Collections;
29  import java.util.HashMap;
30  import java.util.HashSet;
31  import java.util.List;
32  import java.util.Map;
33  import java.util.Random;
34  import java.util.Set;
35  import java.util.TreeSet;
36  
37  import org.apache.commons.lang.ArrayUtils;
38  import org.apache.commons.logging.Log;
39  import org.apache.commons.logging.LogFactory;
40  import org.apache.hadoop.hbase.Cell;
41  import org.apache.hadoop.hbase.CellComparator;
42  import org.apache.hadoop.hbase.CellUtil;
43  import org.apache.hadoop.hbase.HBaseTestingUtility;
44  import org.apache.hadoop.hbase.HColumnDescriptor;
45  import org.apache.hadoop.hbase.KeyValue;
46  import org.apache.hadoop.hbase.KeyValueTestUtil;
47  import org.apache.hadoop.hbase.client.Delete;
48  import org.apache.hadoop.hbase.client.Put;
49  import org.apache.hadoop.hbase.client.Scan;
50  import org.apache.hadoop.hbase.io.compress.Compression;
51  import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
52  import org.apache.hadoop.hbase.testclassification.MediumTests;
53  import org.apache.hadoop.hbase.util.Bytes;
54  import org.junit.Test;
55  import org.junit.experimental.categories.Category;
56  import org.junit.runner.RunWith;
57  import org.junit.runners.Parameterized;
58  import org.junit.runners.Parameterized.Parameters;
59  
60  /**
61   * Tests optimized scanning of multiple columns.
62   */
63  @RunWith(Parameterized.class)
64  @Category(MediumTests.class)
65  public class TestMultiColumnScanner {
66  
67    private static final Log LOG = LogFactory.getLog(TestMultiColumnScanner.class);
68  
69    private static final String TABLE_NAME =
70        TestMultiColumnScanner.class.getSimpleName();
71  
72    static final int MAX_VERSIONS = 50;
73  
74    private static final String FAMILY = "CF";
75    private static final byte[] FAMILY_BYTES = Bytes.toBytes(FAMILY);
76  
77    /**
78     * The size of the column qualifier set used. Increasing this parameter
79     * exponentially increases test time.
80     */
81    private static final int NUM_COLUMNS = 8;
82  
83    private static final int MAX_COLUMN_BIT_MASK = 1 << NUM_COLUMNS - 1;
84    private static final int NUM_FLUSHES = 10;
85    private static final int NUM_ROWS = 20;
86  
87    /** A large value of type long for use as a timestamp */
88    private static final long BIG_LONG = 9111222333444555666L;
89  
90    /**
91     * Timestamps to test with. Cannot use {@link Long#MAX_VALUE} here, because
92     * it will be replaced by an timestamp auto-generated based on the time.
93     */
94    private static final long[] TIMESTAMPS = new long[] { 1, 3, 5,
95        Integer.MAX_VALUE, BIG_LONG, Long.MAX_VALUE - 1 };
96  
97    /** The probability that a column is skipped in a store file. */
98    private static final double COLUMN_SKIP_IN_STORE_FILE_PROB = 0.7;
99  
100   /** The probability of skipping a column in a single row */
101   private static final double COLUMN_SKIP_IN_ROW_PROB = 0.1;
102 
103   /** The probability of skipping a column everywhere */
104   private static final double COLUMN_SKIP_EVERYWHERE_PROB = 0.1;
105 
106   /** The probability to delete a row/column pair */
107   private static final double DELETE_PROBABILITY = 0.02;
108 
109   private final static HBaseTestingUtility TEST_UTIL = HBaseTestingUtility.createLocalHTU();
110 
111   private final Compression.Algorithm comprAlgo;
112   private final BloomType bloomType;
113   private final DataBlockEncoding dataBlockEncoding;
114 
115   // Some static sanity-checking.
116   static {
117     assertTrue(BIG_LONG > 0.9 * Long.MAX_VALUE); // Guard against typos.
118 
119     // Ensure TIMESTAMPS are sorted.
120     for (int i = 0; i < TIMESTAMPS.length - 1; ++i)
121       assertTrue(TIMESTAMPS[i] < TIMESTAMPS[i + 1]);
122   }
123 
124   @Parameters
125   public static final Collection<Object[]> parameters() {
126     List<Object[]> parameters = new ArrayList<Object[]>();
127     for (Object[] bloomAndCompressionParams :
128         HBaseTestingUtility.BLOOM_AND_COMPRESSION_COMBINATIONS) {
129       for (boolean useDataBlockEncoding : new boolean[]{false, true}) {
130         parameters.add(ArrayUtils.add(bloomAndCompressionParams,
131             useDataBlockEncoding));
132       }
133     }
134     return parameters;
135   }
136 
137   public TestMultiColumnScanner(Compression.Algorithm comprAlgo,
138       BloomType bloomType, boolean useDataBlockEncoding) {
139     this.comprAlgo = comprAlgo;
140     this.bloomType = bloomType;
141     this.dataBlockEncoding = useDataBlockEncoding ? DataBlockEncoding.PREFIX :
142         DataBlockEncoding.NONE;
143   }
144 
145   @Test
146   public void testMultiColumnScanner() throws IOException {
147     HRegion region = TEST_UTIL.createTestRegion(TABLE_NAME,
148         new HColumnDescriptor(FAMILY)
149             .setCompressionType(comprAlgo)
150             .setBloomFilterType(bloomType)
151             .setMaxVersions(MAX_VERSIONS)
152             .setDataBlockEncoding(dataBlockEncoding)
153     );
154     List<String> rows = sequentialStrings("row", NUM_ROWS);
155     List<String> qualifiers = sequentialStrings("qual", NUM_COLUMNS);
156     List<KeyValue> kvs = new ArrayList<KeyValue>();
157     Set<String> keySet = new HashSet<String>();
158 
159     // A map from <row>_<qualifier> to the most recent delete timestamp for
160     // that column.
161     Map<String, Long> lastDelTimeMap = new HashMap<String, Long>();
162 
163     Random rand = new Random(29372937L);
164     Set<String> rowQualSkip = new HashSet<String>();
165 
166     // Skip some columns in some rows. We need to test scanning over a set
167     // of columns when some of the columns are not there.
168     for (String row : rows)
169       for (String qual : qualifiers)
170         if (rand.nextDouble() < COLUMN_SKIP_IN_ROW_PROB) {
171           LOG.info("Skipping " + qual + " in row " + row);
172           rowQualSkip.add(rowQualKey(row, qual));
173         }
174 
175     // Also skip some columns in all rows.
176     for (String qual : qualifiers)
177       if (rand.nextDouble() < COLUMN_SKIP_EVERYWHERE_PROB) {
178         LOG.info("Skipping " + qual + " in all rows");
179         for (String row : rows)
180           rowQualSkip.add(rowQualKey(row, qual));
181       }
182 
183     for (int iFlush = 0; iFlush < NUM_FLUSHES; ++iFlush) {
184       for (String qual : qualifiers) {
185         // This is where we decide to include or not include this column into
186         // this store file, regardless of row and timestamp.
187         if (rand.nextDouble() < COLUMN_SKIP_IN_STORE_FILE_PROB)
188           continue;
189 
190         byte[] qualBytes = Bytes.toBytes(qual);
191         for (String row : rows) {
192           Put p = new Put(Bytes.toBytes(row));
193           for (long ts : TIMESTAMPS) {
194             String value = createValue(row, qual, ts);
195             KeyValue kv = KeyValueTestUtil.create(row, FAMILY, qual, ts,
196                 value);
197             assertEquals(kv.getTimestamp(), ts);
198             p.add(kv);
199             String keyAsString = kv.toString();
200             if (!keySet.contains(keyAsString)) {
201               keySet.add(keyAsString);
202               kvs.add(kv);
203             }
204           }
205           region.put(p);
206 
207           Delete d = new Delete(Bytes.toBytes(row));
208           boolean deletedSomething = false;
209           for (long ts : TIMESTAMPS)
210             if (rand.nextDouble() < DELETE_PROBABILITY) {
211               d.deleteColumns(FAMILY_BYTES, qualBytes, ts);
212               String rowAndQual = row + "_" + qual;
213               Long whenDeleted = lastDelTimeMap.get(rowAndQual);
214               lastDelTimeMap.put(rowAndQual, whenDeleted == null ? ts
215                   : Math.max(ts, whenDeleted));
216               deletedSomething = true;
217             }
218           if (deletedSomething)
219             region.delete(d);
220         }
221       }
222       region.flush(true);
223     }
224 
225     Collections.sort(kvs, KeyValue.COMPARATOR);
226     for (int maxVersions = 1; maxVersions <= TIMESTAMPS.length; ++maxVersions) {
227       for (int columnBitMask = 1; columnBitMask <= MAX_COLUMN_BIT_MASK; ++columnBitMask) {
228         Scan scan = new Scan();
229         scan.setMaxVersions(maxVersions);
230         Set<String> qualSet = new TreeSet<String>();
231         {
232           int columnMaskTmp = columnBitMask;
233           for (String qual : qualifiers) {
234             if ((columnMaskTmp & 1) != 0) {
235               scan.addColumn(FAMILY_BYTES, Bytes.toBytes(qual));
236               qualSet.add(qual);
237             }
238             columnMaskTmp >>= 1;
239           }
240           assertEquals(0, columnMaskTmp);
241         }
242 
243         InternalScanner scanner = region.getScanner(scan);
244         List<Cell> results = new ArrayList<Cell>();
245 
246         int kvPos = 0;
247         int numResults = 0;
248         String queryInfo = "columns queried: " + qualSet + " (columnBitMask="
249             + columnBitMask + "), maxVersions=" + maxVersions;
250 
251         while (scanner.next(results) || results.size() > 0) {
252           for (Cell kv : results) {
253             while (kvPos < kvs.size()
254                 && !matchesQuery(kvs.get(kvPos), qualSet, maxVersions,
255                     lastDelTimeMap)) {
256               ++kvPos;
257             }
258             String rowQual = getRowQualStr(kv);
259             String deleteInfo = "";
260             Long lastDelTS = lastDelTimeMap.get(rowQual);
261             if (lastDelTS != null) {
262               deleteInfo = "; last timestamp when row/column " + rowQual
263                   + " was deleted: " + lastDelTS;
264             }
265             assertTrue("Scanner returned additional key/value: " + kv + ", "
266                 + queryInfo + deleteInfo + ";", kvPos < kvs.size());
267             assertTrue("Scanner returned wrong key/value; " + queryInfo
268                 + deleteInfo + ";", CellComparator.equalsIgnoreMvccVersion(kvs.get(kvPos), (kv)));
269             ++kvPos;
270             ++numResults;
271           }
272           results.clear();
273         }
274         for (; kvPos < kvs.size(); ++kvPos) {
275           KeyValue remainingKV = kvs.get(kvPos);
276           assertFalse("Matching column not returned by scanner: "
277               + remainingKV + ", " + queryInfo + ", results returned: "
278               + numResults, matchesQuery(remainingKV, qualSet, maxVersions,
279               lastDelTimeMap));
280         }
281       }
282     }
283     assertTrue("This test is supposed to delete at least some row/column " +
284         "pairs", lastDelTimeMap.size() > 0);
285     LOG.info("Number of row/col pairs deleted at least once: " +
286        lastDelTimeMap.size());
287     HRegion.closeHRegion(region);
288   }
289 
290   private static String getRowQualStr(Cell kv) {
291     String rowStr = Bytes.toString(CellUtil.cloneRow(kv));
292     String qualStr = Bytes.toString(CellUtil.cloneQualifier(kv));
293     return rowStr + "_" + qualStr;
294   }
295 
296   private static boolean matchesQuery(KeyValue kv, Set<String> qualSet,
297       int maxVersions, Map<String, Long> lastDelTimeMap) {
298     Long lastDelTS = lastDelTimeMap.get(getRowQualStr(kv));
299     long ts = kv.getTimestamp();
300     return qualSet.contains(qualStr(kv))
301         && ts >= TIMESTAMPS[TIMESTAMPS.length - maxVersions]
302         && (lastDelTS == null || ts > lastDelTS);
303   }
304 
305   private static String qualStr(KeyValue kv) {
306     return Bytes.toString(kv.getQualifierArray(), kv.getQualifierOffset(),
307         kv.getQualifierLength());
308   }
309 
310   private static String rowQualKey(String row, String qual) {
311     return row + "_" + qual;
312   }
313 
314   static String createValue(String row, String qual, long ts) {
315     return "value_for_" + row + "_" + qual + "_" + ts;
316   }
317 
318   private static List<String> sequentialStrings(String prefix, int n) {
319     List<String> lst = new ArrayList<String>();
320     for (int i = 0; i < n; ++i) {
321       StringBuilder sb = new StringBuilder();
322       sb.append(prefix + i);
323 
324       // Make column length depend on i.
325       int iBitShifted = i;
326       while (iBitShifted != 0) {
327         sb.append((iBitShifted & 1) == 0 ? 'a' : 'b');
328         iBitShifted >>= 1;
329       }
330 
331       lst.add(sb.toString());
332     }
333 
334     return lst;
335   }
336 
337 
338 }
339