1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import static org.junit.Assert.assertEquals;
22 import static org.junit.Assert.assertFalse;
23 import static org.junit.Assert.assertTrue;
24
25 import java.io.IOException;
26 import java.util.ArrayList;
27 import java.util.Collection;
28 import java.util.Collections;
29 import java.util.HashMap;
30 import java.util.HashSet;
31 import java.util.List;
32 import java.util.Map;
33 import java.util.Random;
34 import java.util.Set;
35 import java.util.TreeSet;
36
37 import org.apache.commons.lang.ArrayUtils;
38 import org.apache.commons.logging.Log;
39 import org.apache.commons.logging.LogFactory;
40 import org.apache.hadoop.hbase.Cell;
41 import org.apache.hadoop.hbase.CellComparator;
42 import org.apache.hadoop.hbase.CellUtil;
43 import org.apache.hadoop.hbase.HBaseTestingUtility;
44 import org.apache.hadoop.hbase.HColumnDescriptor;
45 import org.apache.hadoop.hbase.KeyValue;
46 import org.apache.hadoop.hbase.KeyValueTestUtil;
47 import org.apache.hadoop.hbase.client.Delete;
48 import org.apache.hadoop.hbase.client.Put;
49 import org.apache.hadoop.hbase.client.Scan;
50 import org.apache.hadoop.hbase.io.compress.Compression;
51 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
52 import org.apache.hadoop.hbase.testclassification.MediumTests;
53 import org.apache.hadoop.hbase.util.Bytes;
54 import org.junit.Test;
55 import org.junit.experimental.categories.Category;
56 import org.junit.runner.RunWith;
57 import org.junit.runners.Parameterized;
58 import org.junit.runners.Parameterized.Parameters;
59
60
61
62
63 @RunWith(Parameterized.class)
64 @Category(MediumTests.class)
65 public class TestMultiColumnScanner {
66
67 private static final Log LOG = LogFactory.getLog(TestMultiColumnScanner.class);
68
69 private static final String TABLE_NAME =
70 TestMultiColumnScanner.class.getSimpleName();
71
72 static final int MAX_VERSIONS = 50;
73
74 private static final String FAMILY = "CF";
75 private static final byte[] FAMILY_BYTES = Bytes.toBytes(FAMILY);
76
77
78
79
80
81 private static final int NUM_COLUMNS = 8;
82
83 private static final int MAX_COLUMN_BIT_MASK = 1 << NUM_COLUMNS - 1;
84 private static final int NUM_FLUSHES = 10;
85 private static final int NUM_ROWS = 20;
86
87
88 private static final long BIG_LONG = 9111222333444555666L;
89
90
91
92
93
94 private static final long[] TIMESTAMPS = new long[] { 1, 3, 5,
95 Integer.MAX_VALUE, BIG_LONG, Long.MAX_VALUE - 1 };
96
97
98 private static final double COLUMN_SKIP_IN_STORE_FILE_PROB = 0.7;
99
100
101 private static final double COLUMN_SKIP_IN_ROW_PROB = 0.1;
102
103
104 private static final double COLUMN_SKIP_EVERYWHERE_PROB = 0.1;
105
106
107 private static final double DELETE_PROBABILITY = 0.02;
108
109 private final static HBaseTestingUtility TEST_UTIL = HBaseTestingUtility.createLocalHTU();
110
111 private final Compression.Algorithm comprAlgo;
112 private final BloomType bloomType;
113 private final DataBlockEncoding dataBlockEncoding;
114
115
// Sanity checks on the timestamp constants, run once when the class is loaded.
static {
  // BIG_LONG has to be close to the top of the long range.
  assertTrue(BIG_LONG > 0.9 * Long.MAX_VALUE);

  // Every timestamp must be strictly greater than its predecessor.
  for (int next = 1; next < TIMESTAMPS.length; ++next) {
    assertTrue(TIMESTAMPS[next - 1] < TIMESTAMPS[next]);
  }
}
123
124 @Parameters
125 public static final Collection<Object[]> parameters() {
126 List<Object[]> parameters = new ArrayList<Object[]>();
127 for (Object[] bloomAndCompressionParams :
128 HBaseTestingUtility.BLOOM_AND_COMPRESSION_COMBINATIONS) {
129 for (boolean useDataBlockEncoding : new boolean[]{false, true}) {
130 parameters.add(ArrayUtils.add(bloomAndCompressionParams,
131 useDataBlockEncoding));
132 }
133 }
134 return parameters;
135 }
136
/**
 * Creates one parameterized instance of the test.
 *
 * @param comprAlgo compression algorithm to set on the column family
 * @param bloomType bloom filter type to set on the column family
 * @param useDataBlockEncoding whether to use {@link DataBlockEncoding#PREFIX} rather than
 *          {@link DataBlockEncoding#NONE}
 */
public TestMultiColumnScanner(Compression.Algorithm comprAlgo,
    BloomType bloomType, boolean useDataBlockEncoding) {
  this.comprAlgo = comprAlgo;
  this.bloomType = bloomType;
  if (useDataBlockEncoding) {
    this.dataBlockEncoding = DataBlockEncoding.PREFIX;
  } else {
    this.dataBlockEncoding = DataBlockEncoding.NONE;
  }
}
144
145 @Test
146 public void testMultiColumnScanner() throws IOException {
147 HRegion region = TEST_UTIL.createTestRegion(TABLE_NAME,
148 new HColumnDescriptor(FAMILY)
149 .setCompressionType(comprAlgo)
150 .setBloomFilterType(bloomType)
151 .setMaxVersions(MAX_VERSIONS)
152 .setDataBlockEncoding(dataBlockEncoding)
153 );
154 List<String> rows = sequentialStrings("row", NUM_ROWS);
155 List<String> qualifiers = sequentialStrings("qual", NUM_COLUMNS);
156 List<KeyValue> kvs = new ArrayList<KeyValue>();
157 Set<String> keySet = new HashSet<String>();
158
159
160
161 Map<String, Long> lastDelTimeMap = new HashMap<String, Long>();
162
163 Random rand = new Random(29372937L);
164 Set<String> rowQualSkip = new HashSet<String>();
165
166
167
168 for (String row : rows)
169 for (String qual : qualifiers)
170 if (rand.nextDouble() < COLUMN_SKIP_IN_ROW_PROB) {
171 LOG.info("Skipping " + qual + " in row " + row);
172 rowQualSkip.add(rowQualKey(row, qual));
173 }
174
175
176 for (String qual : qualifiers)
177 if (rand.nextDouble() < COLUMN_SKIP_EVERYWHERE_PROB) {
178 LOG.info("Skipping " + qual + " in all rows");
179 for (String row : rows)
180 rowQualSkip.add(rowQualKey(row, qual));
181 }
182
183 for (int iFlush = 0; iFlush < NUM_FLUSHES; ++iFlush) {
184 for (String qual : qualifiers) {
185
186
187 if (rand.nextDouble() < COLUMN_SKIP_IN_STORE_FILE_PROB)
188 continue;
189
190 byte[] qualBytes = Bytes.toBytes(qual);
191 for (String row : rows) {
192 Put p = new Put(Bytes.toBytes(row));
193 for (long ts : TIMESTAMPS) {
194 String value = createValue(row, qual, ts);
195 KeyValue kv = KeyValueTestUtil.create(row, FAMILY, qual, ts,
196 value);
197 assertEquals(kv.getTimestamp(), ts);
198 p.add(kv);
199 String keyAsString = kv.toString();
200 if (!keySet.contains(keyAsString)) {
201 keySet.add(keyAsString);
202 kvs.add(kv);
203 }
204 }
205 region.put(p);
206
207 Delete d = new Delete(Bytes.toBytes(row));
208 boolean deletedSomething = false;
209 for (long ts : TIMESTAMPS)
210 if (rand.nextDouble() < DELETE_PROBABILITY) {
211 d.deleteColumns(FAMILY_BYTES, qualBytes, ts);
212 String rowAndQual = row + "_" + qual;
213 Long whenDeleted = lastDelTimeMap.get(rowAndQual);
214 lastDelTimeMap.put(rowAndQual, whenDeleted == null ? ts
215 : Math.max(ts, whenDeleted));
216 deletedSomething = true;
217 }
218 if (deletedSomething)
219 region.delete(d);
220 }
221 }
222 region.flush(true);
223 }
224
225 Collections.sort(kvs, KeyValue.COMPARATOR);
226 for (int maxVersions = 1; maxVersions <= TIMESTAMPS.length; ++maxVersions) {
227 for (int columnBitMask = 1; columnBitMask <= MAX_COLUMN_BIT_MASK; ++columnBitMask) {
228 Scan scan = new Scan();
229 scan.setMaxVersions(maxVersions);
230 Set<String> qualSet = new TreeSet<String>();
231 {
232 int columnMaskTmp = columnBitMask;
233 for (String qual : qualifiers) {
234 if ((columnMaskTmp & 1) != 0) {
235 scan.addColumn(FAMILY_BYTES, Bytes.toBytes(qual));
236 qualSet.add(qual);
237 }
238 columnMaskTmp >>= 1;
239 }
240 assertEquals(0, columnMaskTmp);
241 }
242
243 InternalScanner scanner = region.getScanner(scan);
244 List<Cell> results = new ArrayList<Cell>();
245
246 int kvPos = 0;
247 int numResults = 0;
248 String queryInfo = "columns queried: " + qualSet + " (columnBitMask="
249 + columnBitMask + "), maxVersions=" + maxVersions;
250
251 while (scanner.next(results) || results.size() > 0) {
252 for (Cell kv : results) {
253 while (kvPos < kvs.size()
254 && !matchesQuery(kvs.get(kvPos), qualSet, maxVersions,
255 lastDelTimeMap)) {
256 ++kvPos;
257 }
258 String rowQual = getRowQualStr(kv);
259 String deleteInfo = "";
260 Long lastDelTS = lastDelTimeMap.get(rowQual);
261 if (lastDelTS != null) {
262 deleteInfo = "; last timestamp when row/column " + rowQual
263 + " was deleted: " + lastDelTS;
264 }
265 assertTrue("Scanner returned additional key/value: " + kv + ", "
266 + queryInfo + deleteInfo + ";", kvPos < kvs.size());
267 assertTrue("Scanner returned wrong key/value; " + queryInfo
268 + deleteInfo + ";", CellComparator.equalsIgnoreMvccVersion(kvs.get(kvPos), (kv)));
269 ++kvPos;
270 ++numResults;
271 }
272 results.clear();
273 }
274 for (; kvPos < kvs.size(); ++kvPos) {
275 KeyValue remainingKV = kvs.get(kvPos);
276 assertFalse("Matching column not returned by scanner: "
277 + remainingKV + ", " + queryInfo + ", results returned: "
278 + numResults, matchesQuery(remainingKV, qualSet, maxVersions,
279 lastDelTimeMap));
280 }
281 }
282 }
283 assertTrue("This test is supposed to delete at least some row/column " +
284 "pairs", lastDelTimeMap.size() > 0);
285 LOG.info("Number of row/col pairs deleted at least once: " +
286 lastDelTimeMap.size());
287 HRegion.closeHRegion(region);
288 }
289
290 private static String getRowQualStr(Cell kv) {
291 String rowStr = Bytes.toString(CellUtil.cloneRow(kv));
292 String qualStr = Bytes.toString(CellUtil.cloneQualifier(kv));
293 return rowStr + "_" + qualStr;
294 }
295
296 private static boolean matchesQuery(KeyValue kv, Set<String> qualSet,
297 int maxVersions, Map<String, Long> lastDelTimeMap) {
298 Long lastDelTS = lastDelTimeMap.get(getRowQualStr(kv));
299 long ts = kv.getTimestamp();
300 return qualSet.contains(qualStr(kv))
301 && ts >= TIMESTAMPS[TIMESTAMPS.length - maxVersions]
302 && (lastDelTS == null || ts > lastDelTS);
303 }
304
305 private static String qualStr(KeyValue kv) {
306 return Bytes.toString(kv.getQualifierArray(), kv.getQualifierOffset(),
307 kv.getQualifierLength());
308 }
309
310 private static String rowQualKey(String row, String qual) {
311 return row + "_" + qual;
312 }
313
314 static String createValue(String row, String qual, long ts) {
315 return "value_for_" + row + "_" + qual + "_" + ts;
316 }
317
318 private static List<String> sequentialStrings(String prefix, int n) {
319 List<String> lst = new ArrayList<String>();
320 for (int i = 0; i < n; ++i) {
321 StringBuilder sb = new StringBuilder();
322 sb.append(prefix + i);
323
324
325 int iBitShifted = i;
326 while (iBitShifted != 0) {
327 sb.append((iBitShifted & 1) == 0 ? 'a' : 'b');
328 iBitShifted >>= 1;
329 }
330
331 lst.add(sb.toString());
332 }
333
334 return lst;
335 }
336
337
338 }
339