1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.client;
20
21 import static org.junit.Assert.*;
22
23 import java.io.IOException;
24 import java.util.ArrayList;
25 import java.util.Arrays;
26 import java.util.List;
27
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.apache.hadoop.hbase.*;
31 import org.apache.hadoop.hbase.filter.Filter;
32 import org.apache.hadoop.hbase.filter.TimestampsFilter;
33 import org.apache.hadoop.hbase.testclassification.MediumTests;
34 import org.apache.hadoop.hbase.util.Bytes;
35 import org.junit.After;
36 import org.junit.AfterClass;
37 import org.junit.Before;
38 import org.junit.BeforeClass;
39 import org.junit.Test;
40 import org.junit.experimental.categories.Category;
41
42
43
44
45
46
47 @Category(MediumTests.class)
48 public class TestTimestampsFilter {
49 private static final Log LOG = LogFactory.getLog(TestTimestampsFilter.class);
50 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
51
52
53
54
55 @BeforeClass
56 public static void setUpBeforeClass() throws Exception {
57 TEST_UTIL.startMiniCluster();
58 }
59
60
61
62
63 @AfterClass
64 public static void tearDownAfterClass() throws Exception {
65 TEST_UTIL.shutdownMiniCluster();
66 }
67
68
69
70
71 @Before
72 public void setUp() throws Exception {
73
74 }
75
76
77
78
79 @After
80 public void tearDown() throws Exception {
81
82 }
83
84
85
86
87
88
89
90
91
92 @Test
93 public void testTimestampsFilter() throws Exception {
94 byte [] TABLE = Bytes.toBytes("testTimestampsFilter");
95 byte [] FAMILY = Bytes.toBytes("event_log");
96 byte [][] FAMILIES = new byte[][] { FAMILY };
97 Cell kvs[];
98
99
100 Table ht = TEST_UTIL.createTable(TableName.valueOf(TABLE), FAMILIES, Integer.MAX_VALUE);
101
102 for (int rowIdx = 0; rowIdx < 5; rowIdx++) {
103 for (int colIdx = 0; colIdx < 5; colIdx++) {
104
105 putNVersions(ht, FAMILY, rowIdx, colIdx, 201, 300);
106
107 putNVersions(ht, FAMILY, rowIdx, colIdx, 1, 100);
108 }
109 }
110
111
112 verifyInsertedValues(ht, FAMILY);
113
114 TEST_UTIL.flush();
115
116
117 verifyInsertedValues(ht, FAMILY);
118
119
120
121 for (int rowIdx = 0; rowIdx < 5; rowIdx++) {
122 for (int colIdx = 0; colIdx < 5; colIdx++) {
123 putNVersions(ht, FAMILY, rowIdx, colIdx, 301, 400);
124 putNVersions(ht, FAMILY, rowIdx, colIdx, 101, 200);
125 }
126 }
127
128 for (int rowIdx = 0; rowIdx < 5; rowIdx++) {
129 for (int colIdx = 0; colIdx < 5; colIdx++) {
130 kvs = getNVersions(ht, FAMILY, rowIdx, colIdx,
131 Arrays.asList(505L, 5L, 105L, 305L, 205L));
132 assertEquals(4, kvs.length);
133 checkOneCell(kvs[0], FAMILY, rowIdx, colIdx, 305);
134 checkOneCell(kvs[1], FAMILY, rowIdx, colIdx, 205);
135 checkOneCell(kvs[2], FAMILY, rowIdx, colIdx, 105);
136 checkOneCell(kvs[3], FAMILY, rowIdx, colIdx, 5);
137 }
138 }
139
140
141
142 kvs = getNVersions(ht, FAMILY, 2, 2, new ArrayList<Long>());
143 assertEquals(0, kvs == null? 0: kvs.length);
144
145
146
147
148
149 Result[] results = scanNVersions(ht, FAMILY, 0, 4,
150 Arrays.asList(6L, 106L, 306L));
151 assertEquals("# of rows returned from scan", 5, results.length);
152 for (int rowIdx = 0; rowIdx < 5; rowIdx++) {
153 kvs = results[rowIdx].rawCells();
154
155
156 assertEquals("Number of KeyValues in result for row:" + rowIdx,
157 3*5, kvs.length);
158 for (int colIdx = 0; colIdx < 5; colIdx++) {
159 int offset = colIdx * 3;
160 checkOneCell(kvs[offset + 0], FAMILY, rowIdx, colIdx, 306);
161 checkOneCell(kvs[offset + 1], FAMILY, rowIdx, colIdx, 106);
162 checkOneCell(kvs[offset + 2], FAMILY, rowIdx, colIdx, 6);
163 }
164 }
165 ht.close();
166 }
167
168 @Test
169 public void testMultiColumns() throws Exception {
170 byte [] TABLE = Bytes.toBytes("testTimestampsFilterMultiColumns");
171 byte [] FAMILY = Bytes.toBytes("event_log");
172 byte [][] FAMILIES = new byte[][] { FAMILY };
173
174
175 Table ht = TEST_UTIL.createTable(TableName.valueOf(TABLE), FAMILIES, Integer.MAX_VALUE);
176
177 Put p = new Put(Bytes.toBytes("row"));
178 p.add(FAMILY, Bytes.toBytes("column0"), 3, Bytes.toBytes("value0-3"));
179 p.add(FAMILY, Bytes.toBytes("column1"), 3, Bytes.toBytes("value1-3"));
180 p.add(FAMILY, Bytes.toBytes("column2"), 1, Bytes.toBytes("value2-1"));
181 p.add(FAMILY, Bytes.toBytes("column2"), 2, Bytes.toBytes("value2-2"));
182 p.add(FAMILY, Bytes.toBytes("column2"), 3, Bytes.toBytes("value2-3"));
183 p.add(FAMILY, Bytes.toBytes("column3"), 2, Bytes.toBytes("value3-2"));
184 p.add(FAMILY, Bytes.toBytes("column4"), 1, Bytes.toBytes("value4-1"));
185 p.add(FAMILY, Bytes.toBytes("column4"), 2, Bytes.toBytes("value4-2"));
186 p.add(FAMILY, Bytes.toBytes("column4"), 3, Bytes.toBytes("value4-3"));
187 ht.put(p);
188
189 ArrayList<Long> timestamps = new ArrayList<Long>();
190 timestamps.add(new Long(3));
191 TimestampsFilter filter = new TimestampsFilter(timestamps);
192
193 Get g = new Get(Bytes.toBytes("row"));
194 g.setFilter(filter);
195 g.setMaxVersions();
196 g.addColumn(FAMILY, Bytes.toBytes("column2"));
197 g.addColumn(FAMILY, Bytes.toBytes("column4"));
198
199 Result result = ht.get(g);
200 for (Cell kv : result.listCells()) {
201 System.out.println("found row " + Bytes.toString(CellUtil.cloneRow(kv)) +
202 ", column " + Bytes.toString(CellUtil.cloneQualifier(kv)) + ", value "
203 + Bytes.toString(CellUtil.cloneValue(kv)));
204 }
205
206 assertEquals(result.listCells().size(), 2);
207 assertTrue(CellUtil.matchingValue(result.listCells().get(0), Bytes.toBytes("value2-3")));
208 assertTrue(CellUtil.matchingValue(result.listCells().get(1), Bytes.toBytes("value4-3")));
209
210 ht.close();
211 }
212
213
214
215
216
217
218 @Test
219 public void testWithVersionDeletes() throws Exception {
220
221
222 testWithVersionDeletes(false);
223
224
225 testWithVersionDeletes(true);
226 }
227
228 private void testWithVersionDeletes(boolean flushTables) throws IOException {
229 byte [] TABLE = Bytes.toBytes("testWithVersionDeletes_" +
230 (flushTables ? "flush" : "noflush"));
231 byte [] FAMILY = Bytes.toBytes("event_log");
232 byte [][] FAMILIES = new byte[][] { FAMILY };
233
234
235 Table ht = TEST_UTIL.createTable(TableName.valueOf(TABLE), FAMILIES, Integer.MAX_VALUE);
236
237
238 putNVersions(ht, FAMILY, 0, 0, 1, 5);
239
240
241 deleteOneVersion(ht, FAMILY, 0, 0, 4);
242
243 if (flushTables) {
244 TEST_UTIL.flush();
245 }
246
247
248
249 Cell kvs[] = getNVersions(ht, FAMILY, 0, 0, Arrays.asList(2L, 3L, 4L, 5L));
250 assertEquals(3, kvs.length);
251 checkOneCell(kvs[0], FAMILY, 0, 0, 5);
252 checkOneCell(kvs[1], FAMILY, 0, 0, 3);
253 checkOneCell(kvs[2], FAMILY, 0, 0, 2);
254
255 ht.close();
256 }
257
258 private void verifyInsertedValues(Table ht, byte[] cf) throws IOException {
259 for (int rowIdx = 0; rowIdx < 5; rowIdx++) {
260 for (int colIdx = 0; colIdx < 5; colIdx++) {
261
262 Cell[] kvs = getNVersions(ht, cf, rowIdx, colIdx,
263 Arrays.asList(5L, 300L, 6L, 80L));
264 assertEquals(4, kvs.length);
265 checkOneCell(kvs[0], cf, rowIdx, colIdx, 300);
266 checkOneCell(kvs[1], cf, rowIdx, colIdx, 80);
267 checkOneCell(kvs[2], cf, rowIdx, colIdx, 6);
268 checkOneCell(kvs[3], cf, rowIdx, colIdx, 5);
269
270
271 kvs = getNVersions(ht, cf, rowIdx, colIdx,
272 Arrays.asList(101L, 102L));
273 assertEquals(0, kvs == null? 0: kvs.length);
274
275
276 kvs = getNVersions(ht, cf, rowIdx, colIdx,
277 Arrays.asList(1L, 300L, 105L, 70L, 115L));
278 assertEquals(3, kvs.length);
279 checkOneCell(kvs[0], cf, rowIdx, colIdx, 300);
280 checkOneCell(kvs[1], cf, rowIdx, colIdx, 70);
281 checkOneCell(kvs[2], cf, rowIdx, colIdx, 1);
282 }
283 }
284 }
285
286
287
288
289
290 private void checkOneCell(Cell kv, byte[] cf,
291 int rowIdx, int colIdx, long ts) {
292
293 String ctx = "rowIdx=" + rowIdx + "; colIdx=" + colIdx + "; ts=" + ts;
294
295 assertEquals("Row mismatch which checking: " + ctx,
296 "row:"+ rowIdx, Bytes.toString(CellUtil.cloneRow(kv)));
297
298 assertEquals("ColumnFamily mismatch while checking: " + ctx,
299 Bytes.toString(cf), Bytes.toString(CellUtil.cloneFamily(kv)));
300
301 assertEquals("Column qualifier mismatch while checking: " + ctx,
302 "column:" + colIdx,
303 Bytes.toString(CellUtil.cloneQualifier(kv)));
304
305 assertEquals("Timestamp mismatch while checking: " + ctx,
306 ts, kv.getTimestamp());
307
308 assertEquals("Value mismatch while checking: " + ctx,
309 "value-version-" + ts, Bytes.toString(CellUtil.cloneValue(kv)));
310 }
311
312
313
314
315
316
317 private Cell[] getNVersions(Table ht, byte[] cf, int rowIdx,
318 int colIdx, List<Long> versions)
319 throws IOException {
320 byte row[] = Bytes.toBytes("row:" + rowIdx);
321 byte column[] = Bytes.toBytes("column:" + colIdx);
322 Filter filter = new TimestampsFilter(versions);
323 Get get = new Get(row);
324 get.addColumn(cf, column);
325 get.setFilter(filter);
326 get.setMaxVersions();
327 Result result = ht.get(get);
328
329 return result.rawCells();
330 }
331
332
333
334
335
336 private Result[] scanNVersions(Table ht, byte[] cf, int startRowIdx,
337 int endRowIdx, List<Long> versions)
338 throws IOException {
339 byte startRow[] = Bytes.toBytes("row:" + startRowIdx);
340 byte endRow[] = Bytes.toBytes("row:" + endRowIdx + 1);
341 Filter filter = new TimestampsFilter(versions);
342 Scan scan = new Scan(startRow, endRow);
343 scan.setFilter(filter);
344 scan.setMaxVersions();
345 ResultScanner scanner = ht.getScanner(scan);
346 return scanner.next(endRowIdx - startRowIdx + 1);
347 }
348
349
350
351
352
353 private void putNVersions(Table ht, byte[] cf, int rowIdx, int colIdx,
354 long versionStart, long versionEnd)
355 throws IOException {
356 byte row[] = Bytes.toBytes("row:" + rowIdx);
357 byte column[] = Bytes.toBytes("column:" + colIdx);
358 Put put = new Put(row);
359 put.setDurability(Durability.SKIP_WAL);
360
361 for (long idx = versionStart; idx <= versionEnd; idx++) {
362 put.add(cf, column, idx, Bytes.toBytes("value-version-" + idx));
363 }
364
365 ht.put(put);
366 }
367
368
369
370
371
372 private void deleteOneVersion(Table ht, byte[] cf, int rowIdx,
373 int colIdx, long version)
374 throws IOException {
375 byte row[] = Bytes.toBytes("row:" + rowIdx);
376 byte column[] = Bytes.toBytes("column:" + colIdx);
377 Delete del = new Delete(row);
378 del.deleteColumn(cf, column, version);
379 ht.delete(del);
380 }
381
382 }
383
384