1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.regionserver;
19
20 import static org.junit.Assert.assertEquals;
21 import static org.junit.Assert.assertTrue;
22
23 import java.io.IOException;
24 import java.util.ArrayList;
25 import java.util.List;
26
27 import org.apache.hadoop.conf.Configuration;
28 import org.apache.hadoop.hbase.Cell;
29 import org.apache.hadoop.hbase.CellScanner;
30 import org.apache.hadoop.hbase.CellUtil;
31 import org.apache.hadoop.hbase.HBaseTestingUtility;
32 import org.apache.hadoop.hbase.HColumnDescriptor;
33 import org.apache.hadoop.hbase.HConstants;
34 import org.apache.hadoop.hbase.HTableDescriptor;
35 import org.apache.hadoop.hbase.KeyValue;
36 import org.apache.hadoop.hbase.KeyValueUtil;
37 import org.apache.hadoop.hbase.testclassification.MediumTests;
38 import org.apache.hadoop.hbase.TableName;
39 import org.apache.hadoop.hbase.Tag;
40 import org.apache.hadoop.hbase.client.Admin;
41 import org.apache.hadoop.hbase.client.Append;
42 import org.apache.hadoop.hbase.client.Durability;
43 import org.apache.hadoop.hbase.client.HTable;
44 import org.apache.hadoop.hbase.client.Increment;
45 import org.apache.hadoop.hbase.client.Mutation;
46 import org.apache.hadoop.hbase.client.Put;
47 import org.apache.hadoop.hbase.client.Result;
48 import org.apache.hadoop.hbase.client.ResultScanner;
49 import org.apache.hadoop.hbase.client.Scan;
50 import org.apache.hadoop.hbase.client.Table;
51 import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
52 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
53 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
54 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
55 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
56 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
57 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
58 import org.apache.hadoop.hbase.util.Bytes;
59 import org.junit.After;
60 import org.junit.AfterClass;
61 import org.junit.BeforeClass;
62 import org.junit.Rule;
63 import org.junit.Test;
64 import org.junit.experimental.categories.Category;
65 import org.junit.rules.TestName;
66
67
68
69
70 @Category(MediumTests.class)
71 public class TestTags {
72 static boolean useFilter = false;
73
74 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
75
76 @Rule
77 public final TestName TEST_NAME = new TestName();
78
79 @BeforeClass
80 public static void setUpBeforeClass() throws Exception {
81 Configuration conf = TEST_UTIL.getConfiguration();
82 conf.setInt("hfile.format.version", 3);
83 conf.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
84 TestCoprocessorForTags.class.getName());
85 TEST_UTIL.startMiniCluster(1, 2);
86 }
87
88 @AfterClass
89 public static void tearDownAfterClass() throws Exception {
90 TEST_UTIL.shutdownMiniCluster();
91 }
92
93 @After
94 public void tearDown() {
95 useFilter = false;
96 }
97
98 @Test
99 public void testTags() throws Exception {
100 Table table = null;
101 try {
102 TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
103 byte[] fam = Bytes.toBytes("info");
104 byte[] row = Bytes.toBytes("rowa");
105
106 byte[] qual = Bytes.toBytes("qual");
107
108 byte[] row1 = Bytes.toBytes("rowb");
109
110 byte[] row2 = Bytes.toBytes("rowc");
111
112 HTableDescriptor desc = new HTableDescriptor(tableName);
113 HColumnDescriptor colDesc = new HColumnDescriptor(fam);
114 colDesc.setBlockCacheEnabled(true);
115
116 colDesc.setDataBlockEncoding(DataBlockEncoding.PREFIX_TREE);
117 desc.addFamily(colDesc);
118 Admin admin = TEST_UTIL.getHBaseAdmin();
119 admin.createTable(desc);
120 byte[] value = Bytes.toBytes("value");
121 table = new HTable(TEST_UTIL.getConfiguration(), tableName);
122 Put put = new Put(row);
123 put.add(fam, qual, HConstants.LATEST_TIMESTAMP, value);
124 put.setAttribute("visibility", Bytes.toBytes("myTag"));
125 table.put(put);
126 admin.flush(tableName);
127
128
129
130
131
132 Thread.sleep(1000);
133
134 Put put1 = new Put(row1);
135 byte[] value1 = Bytes.toBytes("1000dfsdf");
136 put1.add(fam, qual, HConstants.LATEST_TIMESTAMP, value1);
137
138 table.put(put1);
139 admin.flush(tableName);
140 Thread.sleep(1000);
141
142 Put put2 = new Put(row2);
143 byte[] value2 = Bytes.toBytes("1000dfsdf");
144 put2.add(fam, qual, HConstants.LATEST_TIMESTAMP, value2);
145 put2.setAttribute("visibility", Bytes.toBytes("myTag3"));
146 table.put(put2);
147 admin.flush(tableName);
148 Thread.sleep(1000);
149
150 result(fam, row, qual, row2, table, value, value2, row1, value1);
151
152 admin.compact(tableName);
153 while (admin.getCompactionState(tableName) != CompactionState.NONE) {
154 Thread.sleep(10);
155 }
156 result(fam, row, qual, row2, table, value, value2, row1, value1);
157 } finally {
158 if (table != null) {
159 table.close();
160 }
161 }
162 }
163
164 @Test
165 public void testFlushAndCompactionWithoutTags() throws Exception {
166 Table table = null;
167 try {
168 TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
169 byte[] fam = Bytes.toBytes("info");
170 byte[] row = Bytes.toBytes("rowa");
171
172 byte[] qual = Bytes.toBytes("qual");
173
174 byte[] row1 = Bytes.toBytes("rowb");
175
176 byte[] row2 = Bytes.toBytes("rowc");
177
178 HTableDescriptor desc = new HTableDescriptor(tableName);
179 HColumnDescriptor colDesc = new HColumnDescriptor(fam);
180 colDesc.setBlockCacheEnabled(true);
181
182 colDesc.setDataBlockEncoding(DataBlockEncoding.PREFIX_TREE);
183 desc.addFamily(colDesc);
184 Admin admin = TEST_UTIL.getHBaseAdmin();
185 admin.createTable(desc);
186
187 table = new HTable(TEST_UTIL.getConfiguration(), tableName);
188 Put put = new Put(row);
189 byte[] value = Bytes.toBytes("value");
190 put.add(fam, qual, HConstants.LATEST_TIMESTAMP, value);
191 table.put(put);
192 admin.flush(tableName);
193
194
195
196
197
198 Thread.sleep(1000);
199
200 Put put1 = new Put(row1);
201 byte[] value1 = Bytes.toBytes("1000dfsdf");
202 put1.add(fam, qual, HConstants.LATEST_TIMESTAMP, value1);
203 table.put(put1);
204 admin.flush(tableName);
205 Thread.sleep(1000);
206
207 Put put2 = new Put(row2);
208 byte[] value2 = Bytes.toBytes("1000dfsdf");
209 put2.add(fam, qual, HConstants.LATEST_TIMESTAMP, value2);
210 table.put(put2);
211 admin.flush(tableName);
212 Thread.sleep(1000);
213
214 Scan s = new Scan(row);
215 ResultScanner scanner = table.getScanner(s);
216 try {
217 Result[] next = scanner.next(3);
218 for (Result result : next) {
219 CellScanner cellScanner = result.cellScanner();
220 cellScanner.advance();
221 KeyValue current = (KeyValue) cellScanner.current();
222 assertTrue(current.getValueOffset() + current.getValueLength() == current.getLength());
223 }
224 } finally {
225 if (scanner != null)
226 scanner.close();
227 }
228 admin.compact(tableName);
229 while (admin.getCompactionState(tableName) != CompactionState.NONE) {
230 Thread.sleep(10);
231 }
232 s = new Scan(row);
233 scanner = table.getScanner(s);
234 try {
235 Result[] next = scanner.next(3);
236 for (Result result : next) {
237 CellScanner cellScanner = result.cellScanner();
238 cellScanner.advance();
239 KeyValue current = (KeyValue) cellScanner.current();
240 assertTrue(current.getValueOffset() + current.getValueLength() == current.getLength());
241 }
242 } finally {
243 if (scanner != null) {
244 scanner.close();
245 }
246 }
247 } finally {
248 if (table != null) {
249 table.close();
250 }
251 }
252 }
253
254 @Test
255 public void testFlushAndCompactionwithCombinations() throws Exception {
256 TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
257 byte[] fam = Bytes.toBytes("info");
258 byte[] row = Bytes.toBytes("rowa");
259
260 byte[] qual = Bytes.toBytes("qual");
261
262 byte[] row1 = Bytes.toBytes("rowb");
263
264 byte[] row2 = Bytes.toBytes("rowc");
265 byte[] rowd = Bytes.toBytes("rowd");
266 byte[] rowe = Bytes.toBytes("rowe");
267 Table table = null;
268 for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
269 HTableDescriptor desc = new HTableDescriptor(tableName);
270 HColumnDescriptor colDesc = new HColumnDescriptor(fam);
271 colDesc.setBlockCacheEnabled(true);
272 colDesc.setDataBlockEncoding(encoding);
273 desc.addFamily(colDesc);
274 Admin admin = TEST_UTIL.getHBaseAdmin();
275 admin.createTable(desc);
276 try {
277 table = new HTable(TEST_UTIL.getConfiguration(), tableName);
278 Put put = new Put(row);
279 byte[] value = Bytes.toBytes("value");
280 put.add(fam, qual, HConstants.LATEST_TIMESTAMP, value);
281 int bigTagLen = Short.MAX_VALUE - 5;
282 put.setAttribute("visibility", new byte[bigTagLen]);
283 table.put(put);
284 Put put1 = new Put(row1);
285 byte[] value1 = Bytes.toBytes("1000dfsdf");
286 put1.add(fam, qual, HConstants.LATEST_TIMESTAMP, value1);
287 table.put(put1);
288 admin.flush(tableName);
289
290
291
292
293
294 Thread.sleep(1000);
295
296 put1 = new Put(row2);
297 value1 = Bytes.toBytes("1000dfsdf");
298 put1.add(fam, qual, HConstants.LATEST_TIMESTAMP, value1);
299 table.put(put1);
300 admin.flush(tableName);
301 Thread.sleep(1000);
302
303 Put put2 = new Put(rowd);
304 byte[] value2 = Bytes.toBytes("1000dfsdf");
305 put2.add(fam, qual, HConstants.LATEST_TIMESTAMP, value2);
306 table.put(put2);
307 put2 = new Put(rowe);
308 value2 = Bytes.toBytes("1000dfsddfdf");
309 put2.add(fam, qual, HConstants.LATEST_TIMESTAMP, value2);
310 put.setAttribute("visibility", Bytes.toBytes("ram"));
311 table.put(put2);
312 admin.flush(tableName);
313 Thread.sleep(1000);
314
315 TestCoprocessorForTags.checkTagPresence = true;
316 Scan s = new Scan(row);
317 s.setCaching(1);
318 ResultScanner scanner = table.getScanner(s);
319 try {
320 Result next = null;
321 while ((next = scanner.next()) != null) {
322 CellScanner cellScanner = next.cellScanner();
323 cellScanner.advance();
324 KeyValue current = (KeyValue) cellScanner.current();
325 if (CellUtil.matchingRow(current, row)) {
326 assertEquals(1, TestCoprocessorForTags.tags.size());
327 Tag tag = TestCoprocessorForTags.tags.get(0);
328 assertEquals(bigTagLen, tag.getTagLength());
329 } else {
330 assertEquals(0, TestCoprocessorForTags.tags.size());
331 }
332 }
333 } finally {
334 if (scanner != null) {
335 scanner.close();
336 }
337 TestCoprocessorForTags.checkTagPresence = false;
338 }
339 while (admin.getCompactionState(tableName) != CompactionState.NONE) {
340 Thread.sleep(10);
341 }
342 TestCoprocessorForTags.checkTagPresence = true;
343 scanner = table.getScanner(s);
344 try {
345 Result next = null;
346 while ((next = scanner.next()) != null) {
347 CellScanner cellScanner = next.cellScanner();
348 cellScanner.advance();
349 KeyValue current = (KeyValue) cellScanner.current();
350 if (CellUtil.matchingRow(current, row)) {
351 assertEquals(1, TestCoprocessorForTags.tags.size());
352 Tag tag = TestCoprocessorForTags.tags.get(0);
353 assertEquals(bigTagLen, tag.getTagLength());
354 } else {
355 assertEquals(0, TestCoprocessorForTags.tags.size());
356 }
357 }
358 } finally {
359 if (scanner != null) {
360 scanner.close();
361 }
362 TestCoprocessorForTags.checkTagPresence = false;
363 }
364 } finally {
365 if (table != null) {
366 table.close();
367 }
368
369 admin.disableTable(tableName);
370 admin.deleteTable(tableName);
371 }
372 }
373 }
374
375 @Test
376 public void testTagsWithAppendAndIncrement() throws Exception {
377 TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
378 byte[] f = Bytes.toBytes("f");
379 byte[] q = Bytes.toBytes("q");
380 byte[] row1 = Bytes.toBytes("r1");
381 byte[] row2 = Bytes.toBytes("r2");
382
383 HTableDescriptor desc = new HTableDescriptor(tableName);
384 HColumnDescriptor colDesc = new HColumnDescriptor(f);
385 desc.addFamily(colDesc);
386 TEST_UTIL.getHBaseAdmin().createTable(desc);
387
388 Table table = null;
389 try {
390 table = new HTable(TEST_UTIL.getConfiguration(), tableName);
391 Put put = new Put(row1);
392 byte[] v = Bytes.toBytes(2L);
393 put.add(f, q, v);
394 put.setAttribute("visibility", Bytes.toBytes("tag1"));
395 table.put(put);
396 Increment increment = new Increment(row1);
397 increment.addColumn(f, q, 1L);
398 table.increment(increment);
399 TestCoprocessorForTags.checkTagPresence = true;
400 ResultScanner scanner = table.getScanner(new Scan());
401 Result result = scanner.next();
402 KeyValue kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
403 List<Tag> tags = TestCoprocessorForTags.tags;
404 assertEquals(3L, Bytes.toLong(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
405 assertEquals(1, tags.size());
406 assertEquals("tag1", Bytes.toString(tags.get(0).getValue()));
407 TestCoprocessorForTags.checkTagPresence = false;
408 TestCoprocessorForTags.tags = null;
409
410 increment = new Increment(row1);
411 increment.add(new KeyValue(row1, f, q, 1234L, v));
412 increment.setAttribute("visibility", Bytes.toBytes("tag2"));
413 table.increment(increment);
414 TestCoprocessorForTags.checkTagPresence = true;
415 scanner = table.getScanner(new Scan());
416 result = scanner.next();
417 kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
418 tags = TestCoprocessorForTags.tags;
419 assertEquals(5L, Bytes.toLong(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
420 assertEquals(2, tags.size());
421
422 List<String> tagValues = new ArrayList<String>();
423 for (Tag tag: tags) {
424 tagValues.add(Bytes.toString(tag.getValue()));
425 }
426 assertTrue(tagValues.contains("tag1"));
427 assertTrue(tagValues.contains("tag2"));
428 TestCoprocessorForTags.checkTagPresence = false;
429 TestCoprocessorForTags.tags = null;
430
431 put = new Put(row2);
432 v = Bytes.toBytes(2L);
433 put.add(f, q, v);
434 table.put(put);
435 increment = new Increment(row2);
436 increment.add(new KeyValue(row2, f, q, 1234L, v));
437 increment.setAttribute("visibility", Bytes.toBytes("tag2"));
438 table.increment(increment);
439 Scan scan = new Scan();
440 scan.setStartRow(row2);
441 TestCoprocessorForTags.checkTagPresence = true;
442 scanner = table.getScanner(scan);
443 result = scanner.next();
444 kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
445 tags = TestCoprocessorForTags.tags;
446 assertEquals(4L, Bytes.toLong(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
447 assertEquals(1, tags.size());
448 assertEquals("tag2", Bytes.toString(tags.get(0).getValue()));
449 TestCoprocessorForTags.checkTagPresence = false;
450 TestCoprocessorForTags.tags = null;
451
452
453 byte[] row3 = Bytes.toBytes("r3");
454 put = new Put(row3);
455 put.add(f, q, Bytes.toBytes("a"));
456 put.setAttribute("visibility", Bytes.toBytes("tag1"));
457 table.put(put);
458 Append append = new Append(row3);
459 append.add(f, q, Bytes.toBytes("b"));
460 table.append(append);
461 scan = new Scan();
462 scan.setStartRow(row3);
463 TestCoprocessorForTags.checkTagPresence = true;
464 scanner = table.getScanner(scan);
465 result = scanner.next();
466 kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
467 tags = TestCoprocessorForTags.tags;
468 assertEquals(1, tags.size());
469 assertEquals("tag1", Bytes.toString(tags.get(0).getValue()));
470 TestCoprocessorForTags.checkTagPresence = false;
471 TestCoprocessorForTags.tags = null;
472
473 append = new Append(row3);
474 append.add(new KeyValue(row3, f, q, 1234L, v));
475 append.setAttribute("visibility", Bytes.toBytes("tag2"));
476 table.append(append);
477 TestCoprocessorForTags.checkTagPresence = true;
478 scanner = table.getScanner(scan);
479 result = scanner.next();
480 kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
481 tags = TestCoprocessorForTags.tags;
482 assertEquals(2, tags.size());
483
484 tagValues.clear();
485 for (Tag tag: tags) {
486 tagValues.add(Bytes.toString(tag.getValue()));
487 }
488 assertTrue(tagValues.contains("tag1"));
489 assertTrue(tagValues.contains("tag2"));
490 TestCoprocessorForTags.checkTagPresence = false;
491 TestCoprocessorForTags.tags = null;
492
493 byte[] row4 = Bytes.toBytes("r4");
494 put = new Put(row4);
495 put.add(f, q, Bytes.toBytes("a"));
496 table.put(put);
497 append = new Append(row4);
498 append.add(new KeyValue(row4, f, q, 1234L, v));
499 append.setAttribute("visibility", Bytes.toBytes("tag2"));
500 table.append(append);
501 scan = new Scan();
502 scan.setStartRow(row4);
503 TestCoprocessorForTags.checkTagPresence = true;
504 scanner = table.getScanner(scan);
505 result = scanner.next();
506 kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
507 tags = TestCoprocessorForTags.tags;
508 assertEquals(1, tags.size());
509 assertEquals("tag2", Bytes.toString(tags.get(0).getValue()));
510 } finally {
511 TestCoprocessorForTags.checkTagPresence = false;
512 TestCoprocessorForTags.tags = null;
513 if (table != null) {
514 table.close();
515 }
516 }
517 }
518
519 private void result(byte[] fam, byte[] row, byte[] qual, byte[] row2, Table table, byte[] value,
520 byte[] value2, byte[] row1, byte[] value1) throws IOException {
521 Scan s = new Scan(row);
522
523
524
525 s.setAttribute("visibility", Bytes.toBytes("myTag"));
526 ResultScanner scanner = null;
527 try {
528 scanner = table.getScanner(s);
529 Result next = scanner.next();
530
531 assertTrue(Bytes.equals(next.getRow(), row));
532 assertTrue(Bytes.equals(next.getValue(fam, qual), value));
533
534 Result next2 = scanner.next();
535 assertTrue(next2 != null);
536 assertTrue(Bytes.equals(next2.getRow(), row1));
537 assertTrue(Bytes.equals(next2.getValue(fam, qual), value1));
538
539 next2 = scanner.next();
540 assertTrue(next2 != null);
541 assertTrue(Bytes.equals(next2.getRow(), row2));
542 assertTrue(Bytes.equals(next2.getValue(fam, qual), value2));
543
544 } finally {
545 if (scanner != null)
546 scanner.close();
547 }
548 }
549
550 public static class TestCoprocessorForTags extends BaseRegionObserver {
551
552 public static volatile boolean checkTagPresence = false;
553 public static List<Tag> tags = null;
554
555 @Override
556 public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e, final Put put,
557 final WALEdit edit, final Durability durability) throws IOException {
558 updateMutationAddingTags(put);
559 }
560
561 private void updateMutationAddingTags(final Mutation m) {
562 byte[] attribute = m.getAttribute("visibility");
563 byte[] cf = null;
564 List<Cell> updatedCells = new ArrayList<Cell>();
565 if (attribute != null) {
566 for (List<? extends Cell> edits : m.getFamilyCellMap().values()) {
567 for (Cell cell : edits) {
568 KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
569 if (cf == null) {
570 cf = kv.getFamily();
571 }
572 Tag tag = new Tag((byte) 1, attribute);
573 List<Tag> tagList = new ArrayList<Tag>();
574 tagList.add(tag);
575
576 KeyValue newKV = new KeyValue(kv.getRow(), 0, kv.getRowLength(), kv.getFamily(), 0,
577 kv.getFamilyLength(), kv.getQualifier(), 0, kv.getQualifierLength(),
578 kv.getTimestamp(), KeyValue.Type.codeToType(kv.getType()), kv.getValue(), 0,
579 kv.getValueLength(), tagList);
580 ((List<Cell>) updatedCells).add(newKV);
581 }
582 }
583 m.getFamilyCellMap().remove(cf);
584
585 m.getFamilyCellMap().put(cf, updatedCells);
586 }
587 }
588
589 @Override
590 public Result preIncrement(ObserverContext<RegionCoprocessorEnvironment> e, Increment increment)
591 throws IOException {
592 updateMutationAddingTags(increment);
593 return super.preIncrement(e, increment);
594 }
595
596 @Override
597 public Result preAppend(ObserverContext<RegionCoprocessorEnvironment> e, Append append)
598 throws IOException {
599 updateMutationAddingTags(append);
600 return super.preAppend(e, append);
601 }
602
603 @Override
604 public boolean postScannerNext(ObserverContext<RegionCoprocessorEnvironment> e,
605 InternalScanner s, List<Result> results, int limit, boolean hasMore) throws IOException {
606 if (checkTagPresence) {
607 if (results.size() > 0) {
608
609 Result result = results.get(0);
610 CellScanner cellScanner = result.cellScanner();
611 if (cellScanner.advance()) {
612 Cell cell = cellScanner.current();
613 tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
614 cell.getTagsLength());
615 }
616 }
617 }
618 return hasMore;
619 }
620 }
621 }