1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.io.encoding;
20
21 import static org.junit.Assert.assertArrayEquals;
22 import static org.junit.Assert.assertEquals;
23 import static org.junit.Assert.assertFalse;
24 import static org.junit.Assert.assertNotNull;
25 import static org.junit.Assert.fail;
26
27 import java.io.ByteArrayOutputStream;
28 import java.io.DataOutputStream;
29 import java.nio.ByteBuffer;
30 import java.util.ArrayList;
31 import java.util.Collection;
32 import java.util.List;
33 import java.util.Random;
34 import java.util.concurrent.ConcurrentSkipListSet;
35
36 import org.apache.commons.logging.Log;
37 import org.apache.commons.logging.LogFactory;
38 import org.apache.hadoop.hbase.Cell;
39 import org.apache.hadoop.hbase.KeyValue;
40 import org.apache.hadoop.hbase.KeyValueUtil;
41 import org.apache.hadoop.hbase.testclassification.SmallTests;
42 import org.apache.hadoop.hbase.Tag;
43 import org.apache.hadoop.hbase.codec.prefixtree.PrefixTreeCodec;
44 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
45 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder.EncodedSeeker;
46 import org.apache.hadoop.hbase.io.hfile.HFileContext;
47 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
48 import org.apache.hadoop.hbase.util.Bytes;
49 import org.apache.hadoop.hbase.util.CollectionBackedScanner;
50 import org.junit.Assert;
51 import org.junit.Before;
52 import org.junit.Test;
53 import org.junit.experimental.categories.Category;
54 import org.junit.runner.RunWith;
55 import org.junit.runners.Parameterized;
56 import org.junit.runners.Parameterized.Parameters;
57
58
59
60
61 @RunWith(Parameterized.class)
62 @Category(SmallTests.class)
63 public class TestPrefixTreeEncoding {
64 private static final Log LOG = LogFactory.getLog(TestPrefixTreeEncoding.class);
65 private static final String CF = "EncodingTestCF";
66 private static final byte[] CF_BYTES = Bytes.toBytes(CF);
67 private static final int NUM_ROWS_PER_BATCH = 50;
68 private static final int NUM_COLS_PER_ROW = 20;
69
70 private int numBatchesWritten = 0;
71 private ConcurrentSkipListSet<Cell> kvset = new ConcurrentSkipListSet<Cell>(
72 KeyValue.COMPARATOR);
73
74 private static boolean formatRowNum = false;
75
76 @Parameters
77 public static Collection<Object[]> parameters() {
78 List<Object[]> paramList = new ArrayList<Object[]>();
79 {
80 paramList.add(new Object[] { false });
81 paramList.add(new Object[] { true });
82 }
83 return paramList;
84 }
/** Whether the generated cells and the HFileContext of this run include tags. */
private final boolean includesTag;

/**
 * Creates a test instance for one parameter set.
 *
 * @param includesTag stored and later passed to the HFileContext builder and data generators
 */
public TestPrefixTreeEncoding(boolean includesTag) {
  this.includesTag = includesTag;
}
89
90 @Before
91 public void setUp() throws Exception {
92 kvset.clear();
93 formatRowNum = false;
94 }
95
96 @Test
97 public void testSeekBeforeWithFixedData() throws Exception {
98 formatRowNum = true;
99 PrefixTreeCodec encoder = new PrefixTreeCodec();
100 int batchId = numBatchesWritten++;
101 HFileContext meta = new HFileContextBuilder()
102 .withHBaseCheckSum(false)
103 .withIncludesMvcc(false)
104 .withIncludesTags(includesTag)
105 .withCompression(Algorithm.NONE).build();
106 HFileBlockEncodingContext blkEncodingCtx = new HFileBlockDefaultEncodingContext(
107 DataBlockEncoding.PREFIX_TREE, new byte[0], meta);
108 ByteArrayOutputStream baosInMemory = new ByteArrayOutputStream();
109 DataOutputStream userDataStream = new DataOutputStream(baosInMemory);
110 generateFixedTestData(kvset, batchId, false, includesTag, encoder, blkEncodingCtx,
111 userDataStream);
112 EncodedSeeker seeker = encoder.createSeeker(KeyValue.COMPARATOR,
113 encoder.newDataBlockDecodingContext(meta));
114 byte[] onDiskBytes = baosInMemory.toByteArray();
115 ByteBuffer readBuffer = ByteBuffer.wrap(onDiskBytes, DataBlockEncoding.ID_SIZE,
116 onDiskBytes.length - DataBlockEncoding.ID_SIZE);
117 seeker.setCurrentBuffer(readBuffer);
118
119
120 KeyValue seekKey = KeyValueUtil.createFirstDeleteFamilyOnRow(getRowKey(batchId, 0), CF_BYTES);
121 seeker.seekToKeyInBlock(
122 new KeyValue.KeyOnlyKeyValue(seekKey.getBuffer(), seekKey.getKeyOffset(), seekKey
123 .getKeyLength()), true);
124 assertEquals(null, seeker.getKeyValue());
125
126
127 seekKey = KeyValueUtil.createFirstDeleteFamilyOnRow(getRowKey(batchId, NUM_ROWS_PER_BATCH / 3),
128 CF_BYTES);
129 seeker.seekToKeyInBlock(
130 new KeyValue.KeyOnlyKeyValue(seekKey.getBuffer(), seekKey.getKeyOffset(), seekKey
131 .getKeyLength()), true);
132 assertNotNull(seeker.getKeyValue());
133 assertArrayEquals(getRowKey(batchId, NUM_ROWS_PER_BATCH / 3 - 1), seeker.getKeyValue().getRow());
134
135
136 seekKey = KeyValueUtil.createFirstDeleteFamilyOnRow(Bytes.toBytes("zzzz"), CF_BYTES);
137 seeker.seekToKeyInBlock(
138 new KeyValue.KeyOnlyKeyValue(seekKey.getBuffer(), seekKey.getKeyOffset(), seekKey
139 .getKeyLength()), true);
140 assertNotNull(seeker.getKeyValue());
141 assertArrayEquals(getRowKey(batchId, NUM_ROWS_PER_BATCH - 1), seeker.getKeyValue().getRow());
142 }
143
144 @Test
145 public void testScanWithRandomData() throws Exception {
146 PrefixTreeCodec encoder = new PrefixTreeCodec();
147 ByteArrayOutputStream baosInMemory = new ByteArrayOutputStream();
148 DataOutputStream userDataStream = new DataOutputStream(baosInMemory);
149 HFileContext meta = new HFileContextBuilder()
150 .withHBaseCheckSum(false)
151 .withIncludesMvcc(false)
152 .withIncludesTags(includesTag)
153 .withCompression(Algorithm.NONE)
154 .build();
155 HFileBlockEncodingContext blkEncodingCtx = new HFileBlockDefaultEncodingContext(
156 DataBlockEncoding.PREFIX_TREE, new byte[0], meta);
157 generateRandomTestData(kvset, numBatchesWritten++, includesTag, encoder, blkEncodingCtx,
158 userDataStream);
159 EncodedSeeker seeker = encoder.createSeeker(KeyValue.COMPARATOR,
160 encoder.newDataBlockDecodingContext(meta));
161 byte[] onDiskBytes = baosInMemory.toByteArray();
162 ByteBuffer readBuffer = ByteBuffer.wrap(onDiskBytes, DataBlockEncoding.ID_SIZE,
163 onDiskBytes.length - DataBlockEncoding.ID_SIZE);
164 seeker.setCurrentBuffer(readBuffer);
165 Cell previousKV = null;
166 do {
167 Cell currentKV = seeker.getKeyValue();
168 System.out.println(currentKV);
169 if (previousKV != null && KeyValue.COMPARATOR.compare(currentKV, previousKV) < 0) {
170 dumpInputKVSet();
171 fail("Current kv " + currentKV + " is smaller than previous keyvalue " + previousKV);
172 }
173 if (!includesTag) {
174 assertFalse(currentKV.getTagsLength() > 0);
175 } else {
176 Assert.assertTrue(currentKV.getTagsLength() > 0);
177 }
178 previousKV = currentKV;
179 } while (seeker.next());
180 }
181
182 @Test
183 public void testSeekWithRandomData() throws Exception {
184 PrefixTreeCodec encoder = new PrefixTreeCodec();
185 ByteArrayOutputStream baosInMemory = new ByteArrayOutputStream();
186 DataOutputStream userDataStream = new DataOutputStream(baosInMemory);
187 int batchId = numBatchesWritten++;
188 HFileContext meta = new HFileContextBuilder()
189 .withHBaseCheckSum(false)
190 .withIncludesMvcc(false)
191 .withIncludesTags(includesTag)
192 .withCompression(Algorithm.NONE)
193 .build();
194 HFileBlockEncodingContext blkEncodingCtx = new HFileBlockDefaultEncodingContext(
195 DataBlockEncoding.PREFIX_TREE, new byte[0], meta);
196 generateRandomTestData(kvset, batchId, includesTag, encoder, blkEncodingCtx, userDataStream);
197 EncodedSeeker seeker = encoder.createSeeker(KeyValue.COMPARATOR,
198 encoder.newDataBlockDecodingContext(meta));
199 byte[] onDiskBytes = baosInMemory.toByteArray();
200 ByteBuffer readBuffer = ByteBuffer.wrap(onDiskBytes, DataBlockEncoding.ID_SIZE,
201 onDiskBytes.length - DataBlockEncoding.ID_SIZE);
202 verifySeeking(seeker, readBuffer, batchId);
203 }
204
205 @Test
206 public void testSeekWithFixedData() throws Exception {
207 PrefixTreeCodec encoder = new PrefixTreeCodec();
208 int batchId = numBatchesWritten++;
209 HFileContext meta = new HFileContextBuilder()
210 .withHBaseCheckSum(false)
211 .withIncludesMvcc(false)
212 .withIncludesTags(includesTag)
213 .withCompression(Algorithm.NONE)
214 .build();
215 HFileBlockEncodingContext blkEncodingCtx = new HFileBlockDefaultEncodingContext(
216 DataBlockEncoding.PREFIX_TREE, new byte[0], meta);
217 ByteArrayOutputStream baosInMemory = new ByteArrayOutputStream();
218 DataOutputStream userDataStream = new DataOutputStream(baosInMemory);
219 generateFixedTestData(kvset, batchId, includesTag, encoder, blkEncodingCtx, userDataStream);
220 EncodedSeeker seeker = encoder.createSeeker(KeyValue.COMPARATOR,
221 encoder.newDataBlockDecodingContext(meta));
222 byte[] onDiskBytes = baosInMemory.toByteArray();
223 ByteBuffer readBuffer = ByteBuffer.wrap(onDiskBytes, DataBlockEncoding.ID_SIZE,
224 onDiskBytes.length - DataBlockEncoding.ID_SIZE);
225 verifySeeking(seeker, readBuffer, batchId);
226 }
227
228 private void verifySeeking(EncodedSeeker encodeSeeker,
229 ByteBuffer encodedData, int batchId) {
230 List<KeyValue> kvList = new ArrayList<KeyValue>();
231 for (int i = 0; i < NUM_ROWS_PER_BATCH; ++i) {
232 kvList.clear();
233 encodeSeeker.setCurrentBuffer(encodedData);
234 KeyValue firstOnRow = KeyValueUtil.createFirstOnRow(getRowKey(batchId, i));
235 encodeSeeker.seekToKeyInBlock(
236 new KeyValue.KeyOnlyKeyValue(firstOnRow.getBuffer(), firstOnRow.getKeyOffset(),
237 firstOnRow.getKeyLength()), false);
238 boolean hasMoreOfEncodeScanner = encodeSeeker.next();
239 CollectionBackedScanner collectionScanner = new CollectionBackedScanner(
240 this.kvset);
241 boolean hasMoreOfCollectionScanner = collectionScanner.seek(firstOnRow);
242 if (hasMoreOfEncodeScanner != hasMoreOfCollectionScanner) {
243 dumpInputKVSet();
244 fail("Get error result after seeking " + firstOnRow);
245 }
246 if (hasMoreOfEncodeScanner) {
247 if (KeyValue.COMPARATOR.compare(encodeSeeker.getKeyValue(),
248 collectionScanner.peek()) != 0) {
249 dumpInputKVSet();
250 fail("Expected " + collectionScanner.peek() + " actual "
251 + encodeSeeker.getKeyValue() + ", after seeking " + firstOnRow);
252 }
253 }
254 }
255 }
256
257 private void dumpInputKVSet() {
258 LOG.info("Dumping input keyvalue set in error case:");
259 for (Cell kv : kvset) {
260 System.out.println(kv);
261 }
262 }
263
264 private static void generateFixedTestData(ConcurrentSkipListSet<Cell> kvset, int batchId,
265 boolean useTags, PrefixTreeCodec encoder, HFileBlockEncodingContext blkEncodingCtx,
266 DataOutputStream userDataStream) throws Exception {
267 generateFixedTestData(kvset, batchId, true, useTags, encoder, blkEncodingCtx, userDataStream);
268 }
269
270 private static void generateFixedTestData(ConcurrentSkipListSet<Cell> kvset,
271 int batchId, boolean partial, boolean useTags, PrefixTreeCodec encoder,
272 HFileBlockEncodingContext blkEncodingCtx, DataOutputStream userDataStream) throws Exception {
273 for (int i = 0; i < NUM_ROWS_PER_BATCH; ++i) {
274 if (partial && i / 10 % 2 == 1)
275 continue;
276 for (int j = 0; j < NUM_COLS_PER_ROW; ++j) {
277 if (!useTags) {
278 KeyValue kv = new KeyValue(getRowKey(batchId, i), CF_BYTES, getQualifier(j), getValue(
279 batchId, i, j));
280 kvset.add(kv);
281 } else {
282 KeyValue kv = new KeyValue(getRowKey(batchId, i), CF_BYTES, getQualifier(j), 0l,
283 getValue(batchId, i, j), new Tag[] { new Tag((byte) 1, "metaValue1") });
284 kvset.add(kv);
285 }
286 }
287 }
288 encoder.startBlockEncoding(blkEncodingCtx, userDataStream);
289 for (Cell kv : kvset) {
290 encoder.encode(kv, blkEncodingCtx, userDataStream);
291 }
292 encoder.endBlockEncoding(blkEncodingCtx, userDataStream, null);
293 }
294
295 private static void generateRandomTestData(ConcurrentSkipListSet<Cell> kvset,
296 int batchId, boolean useTags, PrefixTreeCodec encoder,
297 HFileBlockEncodingContext blkEncodingCtx, DataOutputStream userDataStream) throws Exception {
298 Random random = new Random();
299 for (int i = 0; i < NUM_ROWS_PER_BATCH; ++i) {
300 if (random.nextInt(100) < 50)
301 continue;
302 for (int j = 0; j < NUM_COLS_PER_ROW; ++j) {
303 if (random.nextInt(100) < 50)
304 continue;
305 if (!useTags) {
306 KeyValue kv = new KeyValue(getRowKey(batchId, i), CF_BYTES, getQualifier(j), getValue(
307 batchId, i, j));
308 kvset.add(kv);
309 } else {
310 KeyValue kv = new KeyValue(getRowKey(batchId, i), CF_BYTES, getQualifier(j), 0l,
311 getValue(batchId, i, j), new Tag[] { new Tag((byte) 1, "metaValue1") });
312 kvset.add(kv);
313 }
314 }
315 }
316 encoder.startBlockEncoding(blkEncodingCtx, userDataStream);
317 for (Cell kv : kvset) {
318 encoder.encode(kv, blkEncodingCtx, userDataStream);
319 }
320 encoder.endBlockEncoding(blkEncodingCtx, userDataStream, null);
321 }
322
323 private static byte[] getRowKey(int batchId, int i) {
324 return Bytes
325 .toBytes("batch" + batchId + "_row" + (formatRowNum ? String.format("%04d", i) : i));
326 }
327
328 private static byte[] getQualifier(int j) {
329 return Bytes.toBytes("colfdfafhfhsdfhsdfh" + j);
330 }
331
332 private static byte[] getValue(int batchId, int i, int j) {
333 return Bytes.toBytes("value_for_" + Bytes.toString(getRowKey(batchId, i)) + "_col" + j);
334 }
335
336 }