View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.coprocessor.example;
19  
20  import static org.junit.Assert.assertEquals;
21  import static org.junit.Assert.assertTrue;
22  
23  import java.io.IOException;
24  import java.util.ArrayList;
25  import java.util.List;
26  import java.util.Map;
27  
28  import org.apache.hadoop.hbase.Cell;
29  import org.apache.hadoop.hbase.CellUtil;
30  import org.apache.hadoop.hbase.HBaseTestingUtility;
31  import org.apache.hadoop.hbase.HColumnDescriptor;
32  import org.apache.hadoop.hbase.HConstants;
33  import org.apache.hadoop.hbase.HTableDescriptor;
34  import org.apache.hadoop.hbase.testclassification.MediumTests;
35  import org.apache.hadoop.hbase.TableName;
36  import org.apache.hadoop.hbase.client.HTable;
37  import org.apache.hadoop.hbase.client.Put;
38  import org.apache.hadoop.hbase.client.Result;
39  import org.apache.hadoop.hbase.client.Scan;
40  import org.apache.hadoop.hbase.client.Table;
41  import org.apache.hadoop.hbase.client.coprocessor.Batch;
42  import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
43  import org.apache.hadoop.hbase.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteRequest;
44  import org.apache.hadoop.hbase.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteRequest.Builder;
45  import org.apache.hadoop.hbase.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteRequest.DeleteType;
46  import org.apache.hadoop.hbase.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteResponse;
47  import org.apache.hadoop.hbase.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteService;
48  import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
49  import org.apache.hadoop.hbase.filter.FilterList;
50  import org.apache.hadoop.hbase.filter.FilterList.Operator;
51  import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
52  import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
53  import org.apache.hadoop.hbase.ipc.ServerRpcController;
54  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
55  import org.apache.hadoop.hbase.util.Bytes;
56  import org.junit.experimental.categories.Category;
57  
58  @Category(MediumTests.class)
59  public class TestBulkDeleteProtocol {
60    private static final byte[] FAMILY1 = Bytes.toBytes("cf1");
61    private static final byte[] FAMILY2 = Bytes.toBytes("cf2");
62    private static final byte[] QUALIFIER1 = Bytes.toBytes("c1");
63    private static final byte[] QUALIFIER2 = Bytes.toBytes("c2");
64    private static final byte[] QUALIFIER3 = Bytes.toBytes("c3");
65    private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
66  
67    // @Ignore @BeforeClass
68    public static void setupBeforeClass() throws Exception {
69      TEST_UTIL.getConfiguration().set(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
70          BulkDeleteEndpoint.class.getName());
71      TEST_UTIL.startMiniCluster(2);
72    }
73  
74    // @Ignore @AfterClass
75    public static void tearDownAfterClass() throws Exception {
76      TEST_UTIL.shutdownMiniCluster();
77    }
78  
79    // @Ignore @Test
80    public void testBulkDeleteEndpoint() throws Throwable {
81      TableName tableName = TableName.valueOf("testBulkDeleteEndpoint");
82      Table ht = createTable(tableName);
83      List<Put> puts = new ArrayList<Put>(100);
84      for (int j = 0; j < 100; j++) {
85        byte[] rowkey = Bytes.toBytes(j);
86        puts.add(createPut(rowkey, "v1"));
87      }
88      ht.put(puts);
89      // Deleting all the rows.
90      long noOfRowsDeleted = invokeBulkDeleteProtocol(tableName, new Scan(), 5, DeleteType.ROW, null);
91      assertEquals(100, noOfRowsDeleted);
92  
93      int rows = 0;
94      for (Result result : ht.getScanner(new Scan())) {
95        rows++;
96      }
97      assertEquals(0, rows);
98      ht.close();
99    }
100 
101   // @Ignore @Test
102   public void testBulkDeleteEndpointWhenRowBatchSizeLessThanRowsToDeleteFromARegion()
103       throws Throwable {
104     TableName tableName = TableName
105         .valueOf("testBulkDeleteEndpointWhenRowBatchSizeLessThanRowsToDeleteFromARegion");
106     Table ht = createTable(tableName);
107     List<Put> puts = new ArrayList<Put>(100);
108     for (int j = 0; j < 100; j++) {
109       byte[] rowkey = Bytes.toBytes(j);
110       puts.add(createPut(rowkey, "v1"));
111     }
112     ht.put(puts);
113     // Deleting all the rows.
114     long noOfRowsDeleted = invokeBulkDeleteProtocol(tableName, new Scan(), 10, DeleteType.ROW, null);
115     assertEquals(100, noOfRowsDeleted);
116 
117     int rows = 0;
118     for (Result result : ht.getScanner(new Scan())) {
119       rows++;
120     }
121     assertEquals(0, rows);
122     ht.close();
123   }
124 
125   private long invokeBulkDeleteProtocol(TableName tableName, final Scan scan, final int rowBatchSize,
126       final DeleteType deleteType, final Long timeStamp) throws Throwable {
127     Table ht = new HTable(TEST_UTIL.getConfiguration(), tableName);
128     long noOfDeletedRows = 0L;
129     Batch.Call<BulkDeleteService, BulkDeleteResponse> callable =
130       new Batch.Call<BulkDeleteService, BulkDeleteResponse>() {
131       ServerRpcController controller = new ServerRpcController();
132       BlockingRpcCallback<BulkDeleteResponse> rpcCallback =
133         new BlockingRpcCallback<BulkDeleteResponse>();
134 
135       public BulkDeleteResponse call(BulkDeleteService service) throws IOException {
136         Builder builder = BulkDeleteRequest.newBuilder();
137         builder.setScan(ProtobufUtil.toScan(scan));
138         builder.setDeleteType(deleteType);
139         builder.setRowBatchSize(rowBatchSize);
140         if (timeStamp != null) {
141           builder.setTimestamp(timeStamp);
142         }
143         service.delete(controller, builder.build(), rpcCallback);
144         return rpcCallback.get();
145       }
146     };
147     Map<byte[], BulkDeleteResponse> result = ht.coprocessorService(BulkDeleteService.class, scan
148         .getStartRow(), scan.getStopRow(), callable);
149     for (BulkDeleteResponse response : result.values()) {
150       noOfDeletedRows += response.getRowsDeleted();
151     }
152     ht.close();
153     return noOfDeletedRows;
154   }
155 
156   // @Ignore @Test
157   public void testBulkDeleteWithConditionBasedDelete() throws Throwable {
158     TableName tableName = TableName.valueOf("testBulkDeleteWithConditionBasedDelete");
159     Table ht = createTable(tableName);
160     List<Put> puts = new ArrayList<Put>(100);
161     for (int j = 0; j < 100; j++) {
162       byte[] rowkey = Bytes.toBytes(j);
163       String value = (j % 10 == 0) ? "v1" : "v2";
164       puts.add(createPut(rowkey, value));
165     }
166     ht.put(puts);
167     Scan scan = new Scan();
168     FilterList fl = new FilterList(Operator.MUST_PASS_ALL);
169     SingleColumnValueFilter scvf = new SingleColumnValueFilter(FAMILY1, QUALIFIER3,
170         CompareOp.EQUAL, Bytes.toBytes("v1"));
171     // fl.addFilter(new FirstKeyOnlyFilter());
172     fl.addFilter(scvf);
173     scan.setFilter(fl);
174     // Deleting all the rows where cf1:c1=v1
175     long noOfRowsDeleted = invokeBulkDeleteProtocol(tableName, scan, 500, DeleteType.ROW, null);
176     assertEquals(10, noOfRowsDeleted);
177 
178     int rows = 0;
179     for (Result result : ht.getScanner(new Scan())) {
180       rows++;
181     }
182     assertEquals(90, rows);
183     ht.close();
184   }
185 
186   // @Ignore @Test
187   public void testBulkDeleteColumn() throws Throwable {
188     TableName tableName = TableName.valueOf("testBulkDeleteColumn");
189     Table ht = createTable(tableName);
190     List<Put> puts = new ArrayList<Put>(100);
191     for (int j = 0; j < 100; j++) {
192       byte[] rowkey = Bytes.toBytes(j);
193       String value = (j % 10 == 0) ? "v1" : "v2";
194       puts.add(createPut(rowkey, value));
195     }
196     ht.put(puts);
197     Scan scan = new Scan();
198     scan.addColumn(FAMILY1, QUALIFIER2);
199     // Delete the column cf1:col2
200     long noOfRowsDeleted = invokeBulkDeleteProtocol(tableName, scan, 500, DeleteType.COLUMN, null);
201     assertEquals(100, noOfRowsDeleted);
202 
203     int rows = 0;
204     for (Result result : ht.getScanner(new Scan())) {
205       assertEquals(2, result.getFamilyMap(FAMILY1).size());
206       assertTrue(result.getColumnCells(FAMILY1, QUALIFIER2).isEmpty());
207       assertEquals(1, result.getColumnCells(FAMILY1, QUALIFIER1).size());
208       assertEquals(1, result.getColumnCells(FAMILY1, QUALIFIER3).size());
209       rows++;
210     }
211     assertEquals(100, rows);
212     ht.close();
213   }
214 
215   // @Ignore @Test
216   public void testBulkDeleteFamily() throws Throwable {
217     TableName tableName = TableName.valueOf("testBulkDeleteFamily");
218     HTableDescriptor htd = new HTableDescriptor(tableName);
219     htd.addFamily(new HColumnDescriptor(FAMILY1));
220     htd.addFamily(new HColumnDescriptor(FAMILY2));
221     TEST_UTIL.getHBaseAdmin().createTable(htd, Bytes.toBytes(0), Bytes.toBytes(120), 5);
222     Table ht = new HTable(TEST_UTIL.getConfiguration(), tableName);
223     List<Put> puts = new ArrayList<Put>(100);
224     for (int j = 0; j < 100; j++) {
225       Put put = new Put(Bytes.toBytes(j));
226       put.add(FAMILY1, QUALIFIER1, "v1".getBytes());
227       put.add(FAMILY2, QUALIFIER2, "v2".getBytes());
228       puts.add(put);
229     }
230     ht.put(puts);
231     Scan scan = new Scan();
232     scan.addFamily(FAMILY1);
233     // Delete the column family cf1
234     long noOfRowsDeleted = invokeBulkDeleteProtocol(tableName, scan, 500, DeleteType.FAMILY, null);
235     assertEquals(100, noOfRowsDeleted);
236     int rows = 0;
237     for (Result result : ht.getScanner(new Scan())) {
238       assertTrue(result.getFamilyMap(FAMILY1).isEmpty());
239       assertEquals(1, result.getColumnCells(FAMILY2, QUALIFIER2).size());
240       rows++;
241     }
242     assertEquals(100, rows);
243     ht.close();
244   }
245 
246   // @Ignore @Test
247   public void testBulkDeleteColumnVersion() throws Throwable {
248     TableName tableName = TableName.valueOf("testBulkDeleteColumnVersion");
249     Table ht = createTable(tableName);
250     List<Put> puts = new ArrayList<Put>(100);
251     for (int j = 0; j < 100; j++) {
252       Put put = new Put(Bytes.toBytes(j));
253       byte[] value = "v1".getBytes();
254       put.add(FAMILY1, QUALIFIER1, 1234L, value);
255       put.add(FAMILY1, QUALIFIER2, 1234L, value);
256       put.add(FAMILY1, QUALIFIER3, 1234L, value);
257       // Latest version values
258       value = "v2".getBytes();
259       put.add(FAMILY1, QUALIFIER1, value);
260       put.add(FAMILY1, QUALIFIER2, value);
261       put.add(FAMILY1, QUALIFIER3, value);
262       put.add(FAMILY1, null, value);
263       puts.add(put);
264     }
265     ht.put(puts);
266     Scan scan = new Scan();
267     scan.addFamily(FAMILY1);
268     // Delete the latest version values of all the columns in family cf1.
269     long noOfRowsDeleted = invokeBulkDeleteProtocol(tableName, scan, 500, DeleteType.VERSION,
270         HConstants.LATEST_TIMESTAMP);
271     assertEquals(100, noOfRowsDeleted);
272     int rows = 0;
273     scan = new Scan();
274     scan.setMaxVersions();
275     for (Result result : ht.getScanner(scan)) {
276       assertEquals(3, result.getFamilyMap(FAMILY1).size());
277       List<Cell> column = result.getColumnCells(FAMILY1, QUALIFIER1);
278       assertEquals(1, column.size());
279       assertTrue(CellUtil.matchingValue(column.get(0), "v1".getBytes()));
280 
281       column = result.getColumnCells(FAMILY1, QUALIFIER2);
282       assertEquals(1, column.size());
283       assertTrue(CellUtil.matchingValue(column.get(0), "v1".getBytes()));
284 
285       column = result.getColumnCells(FAMILY1, QUALIFIER3);
286       assertEquals(1, column.size());
287       assertTrue(CellUtil.matchingValue(column.get(0), "v1".getBytes()));
288       rows++;
289     }
290     assertEquals(100, rows);
291     ht.close();
292   }
293 
294   // @Ignore @Test
295   public void testBulkDeleteColumnVersionBasedOnTS() throws Throwable {
296     TableName tableName = TableName.valueOf("testBulkDeleteColumnVersionBasedOnTS");
297     Table ht = createTable(tableName);
298     List<Put> puts = new ArrayList<Put>(100);
299     for (int j = 0; j < 100; j++) {
300       Put put = new Put(Bytes.toBytes(j));
301       // TS = 1000L
302       byte[] value = "v1".getBytes();
303       put.add(FAMILY1, QUALIFIER1, 1000L, value);
304       put.add(FAMILY1, QUALIFIER2, 1000L, value);
305       put.add(FAMILY1, QUALIFIER3, 1000L, value);
306       // TS = 1234L
307       value = "v2".getBytes();
308       put.add(FAMILY1, QUALIFIER1, 1234L, value);
309       put.add(FAMILY1, QUALIFIER2, 1234L, value);
310       put.add(FAMILY1, QUALIFIER3, 1234L, value);
311       // Latest version values
312       value = "v3".getBytes();
313       put.add(FAMILY1, QUALIFIER1, value);
314       put.add(FAMILY1, QUALIFIER2, value);
315       put.add(FAMILY1, QUALIFIER3, value);
316       puts.add(put);
317     }
318     ht.put(puts);
319     Scan scan = new Scan();
320     scan.addColumn(FAMILY1, QUALIFIER3);
321     // Delete the column cf1:c3's one version at TS=1234
322     long noOfRowsDeleted = invokeBulkDeleteProtocol(tableName, scan, 500, DeleteType.VERSION, 1234L);
323     assertEquals(100, noOfRowsDeleted);
324     int rows = 0;
325     scan = new Scan();
326     scan.setMaxVersions();
327     for (Result result : ht.getScanner(scan)) {
328       assertEquals(3, result.getFamilyMap(FAMILY1).size());
329       assertEquals(3, result.getColumnCells(FAMILY1, QUALIFIER1).size());
330       assertEquals(3, result.getColumnCells(FAMILY1, QUALIFIER2).size());
331       List<Cell> column = result.getColumnCells(FAMILY1, QUALIFIER3);
332       assertEquals(2, column.size());
333       assertTrue(CellUtil.matchingValue(column.get(0), "v3".getBytes()));
334       assertTrue(CellUtil.matchingValue(column.get(1), "v1".getBytes()));
335       rows++;
336     }
337     assertEquals(100, rows);
338     ht.close();
339   }
340 
341   // @Ignore @Test
342   public void testBulkDeleteWithNumberOfVersions() throws Throwable {
343     TableName tableName = TableName.valueOf("testBulkDeleteWithNumberOfVersions");
344     Table ht = createTable(tableName);
345     List<Put> puts = new ArrayList<Put>(100);
346     for (int j = 0; j < 100; j++) {
347       Put put = new Put(Bytes.toBytes(j));
348       // TS = 1000L
349       byte[] value = "v1".getBytes();
350       put.add(FAMILY1, QUALIFIER1, 1000L, value);
351       put.add(FAMILY1, QUALIFIER2, 1000L, value);
352       put.add(FAMILY1, QUALIFIER3, 1000L, value);
353       // TS = 1234L
354       value = "v2".getBytes();
355       put.add(FAMILY1, QUALIFIER1, 1234L, value);
356       put.add(FAMILY1, QUALIFIER2, 1234L, value);
357       put.add(FAMILY1, QUALIFIER3, 1234L, value);
358       // TS = 2000L
359       value = "v3".getBytes();
360       put.add(FAMILY1, QUALIFIER1, 2000L, value);
361       put.add(FAMILY1, QUALIFIER2, 2000L, value);
362       put.add(FAMILY1, QUALIFIER3, 2000L, value);
363       // Latest version values
364       value = "v4".getBytes();
365       put.add(FAMILY1, QUALIFIER1, value);
366       put.add(FAMILY1, QUALIFIER2, value);
367       put.add(FAMILY1, QUALIFIER3, value);
368       puts.add(put);
369     }
370     ht.put(puts);
371 
372     // Delete all the versions of columns cf1:c1 and cf1:c2 falling with the time range
373     // [1000,2000)
374     final Scan scan = new Scan();
375     scan.addColumn(FAMILY1, QUALIFIER1);
376     scan.addColumn(FAMILY1, QUALIFIER2);
377     scan.setTimeRange(1000L, 2000L);
378     scan.setMaxVersions();
379 
380     long noOfDeletedRows = 0L;
381     long noOfVersionsDeleted = 0L;
382     Batch.Call<BulkDeleteService, BulkDeleteResponse> callable =
383       new Batch.Call<BulkDeleteService, BulkDeleteResponse>() {
384       ServerRpcController controller = new ServerRpcController();
385       BlockingRpcCallback<BulkDeleteResponse> rpcCallback =
386         new BlockingRpcCallback<BulkDeleteResponse>();
387 
388       public BulkDeleteResponse call(BulkDeleteService service) throws IOException {
389         Builder builder = BulkDeleteRequest.newBuilder();
390         builder.setScan(ProtobufUtil.toScan(scan));
391         builder.setDeleteType(DeleteType.VERSION);
392         builder.setRowBatchSize(500);
393         service.delete(controller, builder.build(), rpcCallback);
394         return rpcCallback.get();
395       }
396     };
397     Map<byte[], BulkDeleteResponse> result = ht.coprocessorService(BulkDeleteService.class, scan
398         .getStartRow(), scan.getStopRow(), callable);
399     for (BulkDeleteResponse response : result.values()) {
400       noOfDeletedRows += response.getRowsDeleted();
401       noOfVersionsDeleted += response.getVersionsDeleted();
402     }
403     assertEquals(100, noOfDeletedRows);
404     assertEquals(400, noOfVersionsDeleted);
405 
406     int rows = 0;
407     Scan scan1 = new Scan();
408     scan1.setMaxVersions();
409     for (Result res : ht.getScanner(scan1)) {
410       assertEquals(3, res.getFamilyMap(FAMILY1).size());
411       List<Cell> column = res.getColumnCells(FAMILY1, QUALIFIER1);
412       assertEquals(2, column.size());
413       assertTrue(CellUtil.matchingValue(column.get(0), "v4".getBytes()));
414       assertTrue(CellUtil.matchingValue(column.get(1), "v3".getBytes()));
415       column = res.getColumnCells(FAMILY1, QUALIFIER2);
416       assertEquals(2, column.size());
417       assertTrue(CellUtil.matchingValue(column.get(0), "v4".getBytes()));
418       assertTrue(CellUtil.matchingValue(column.get(1), "v3".getBytes()));
419       assertEquals(4, res.getColumnCells(FAMILY1, QUALIFIER3).size());
420       rows++;
421     }
422     assertEquals(100, rows);
423     ht.close();
424   }
425 
426   private Table createTable(TableName tableName) throws IOException {
427     HTableDescriptor htd = new HTableDescriptor(tableName);
428     HColumnDescriptor hcd = new HColumnDescriptor(FAMILY1);
429     hcd.setMaxVersions(10);// Just setting 10 as I am not testing with more than 10 versions here
430     htd.addFamily(hcd);
431     TEST_UTIL.getHBaseAdmin().createTable(htd, Bytes.toBytes(0), Bytes.toBytes(120), 5);
432     Table ht = new HTable(TEST_UTIL.getConfiguration(), tableName);
433     return ht;
434   }
435 
436   private Put createPut(byte[] rowkey, String value) throws IOException {
437     Put put = new Put(rowkey);
438     put.add(FAMILY1, QUALIFIER1, value.getBytes());
439     put.add(FAMILY1, QUALIFIER2, value.getBytes());
440     put.add(FAMILY1, QUALIFIER3, value.getBytes());
441     return put;
442   }
443 }