View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.coprocessor;
20  
21  import static org.junit.Assert.assertEquals;
22  import static org.junit.Assert.assertNotNull;
23  import static org.junit.Assert.assertNull;
24  import static org.junit.Assert.assertTrue;
25  import static org.junit.Assert.fail;
26  
27  import java.io.IOException;
28  import java.util.Collections;
29  import java.util.Map;
30  import java.util.NavigableMap;
31  import java.util.TreeMap;
32  
33  import org.apache.hadoop.hbase.client.Admin;
34  import org.apache.hadoop.hbase.client.Table;
35  import org.apache.hadoop.hbase.util.ByteStringer;
36  import org.apache.commons.logging.Log;
37  import org.apache.commons.logging.LogFactory;
38  import org.apache.hadoop.conf.Configuration;
39  import org.apache.hadoop.hbase.HBaseTestingUtility;
40  import org.apache.hadoop.hbase.HColumnDescriptor;
41  import org.apache.hadoop.hbase.HConstants;
42  import org.apache.hadoop.hbase.HRegionInfo;
43  import org.apache.hadoop.hbase.HTableDescriptor;
44  import org.apache.hadoop.hbase.testclassification.MediumTests;
45  import org.apache.hadoop.hbase.ServerName;
46  import org.apache.hadoop.hbase.TableName;
47  import org.apache.hadoop.hbase.client.HTable;
48  import org.apache.hadoop.hbase.client.Put;
49  import org.apache.hadoop.hbase.client.coprocessor.Batch;
50  import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationProtos;
51  import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
52  import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
53  import org.apache.hadoop.hbase.ipc.ServerRpcController;
54  import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos;
55  import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
56  import org.apache.hadoop.hbase.util.Bytes;
57  import org.junit.AfterClass;
58  import org.junit.BeforeClass;
59  import org.junit.Test;
60  import org.junit.experimental.categories.Category;
61  
62  import com.google.protobuf.RpcController;
63  import com.google.protobuf.ServiceException;
64  
65  /**
66   * TestEndpoint: test cases to verify coprocessor Endpoint
67   */
68  @Category(MediumTests.class)
69  public class TestCoprocessorEndpoint {
70    private static final Log LOG = LogFactory.getLog(TestCoprocessorEndpoint.class);
71  
72    private static final TableName TEST_TABLE =
73        TableName.valueOf("TestCoprocessorEndpoint");
74    private static final byte[] TEST_FAMILY = Bytes.toBytes("TestFamily");
75    private static final byte[] TEST_QUALIFIER = Bytes.toBytes("TestQualifier");
76    private static byte[] ROW = Bytes.toBytes("testRow");
77  
78    private static final int ROWSIZE = 20;
79    private static final int rowSeperator1 = 5;
80    private static final int rowSeperator2 = 12;
81    private static byte[][] ROWS = makeN(ROW, ROWSIZE);
82  
83    private static HBaseTestingUtility util = new HBaseTestingUtility();
84  
85    @BeforeClass
86    public static void setupBeforeClass() throws Exception {
87      // set configure to indicate which cp should be loaded
88      Configuration conf = util.getConfiguration();
89      conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 5000);
90      conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
91          org.apache.hadoop.hbase.coprocessor.ColumnAggregationEndpoint.class.getName(),
92          ProtobufCoprocessorService.class.getName());
93      conf.setStrings(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
94          ProtobufCoprocessorService.class.getName());
95      util.startMiniCluster(2);
96  
97      Admin admin = util.getHBaseAdmin();
98      HTableDescriptor desc = new HTableDescriptor(TEST_TABLE);
99      desc.addFamily(new HColumnDescriptor(TEST_FAMILY));
100     admin.createTable(desc, new byte[][]{ROWS[rowSeperator1], ROWS[rowSeperator2]});
101     util.waitUntilAllRegionsAssigned(TEST_TABLE);
102 
103     Table table = new HTable(conf, TEST_TABLE);
104     for (int i = 0; i < ROWSIZE; i++) {
105       Put put = new Put(ROWS[i]);
106       put.addColumn(TEST_FAMILY, TEST_QUALIFIER, Bytes.toBytes(i));
107       table.put(put);
108     }
109     table.close();
110   }
111 
112   @AfterClass
113   public static void tearDownAfterClass() throws Exception {
114     util.shutdownMiniCluster();
115   }
116 
117   private Map<byte [], Long> sum(final Table table, final byte [] family,
118       final byte [] qualifier, final byte [] start, final byte [] end)
119   throws ServiceException, Throwable {
120     return table.coprocessorService(ColumnAggregationProtos.ColumnAggregationService.class,
121         start, end,
122       new Batch.Call<ColumnAggregationProtos.ColumnAggregationService, Long>() {
123         @Override
124         public Long call(ColumnAggregationProtos.ColumnAggregationService instance)
125         throws IOException {
126           BlockingRpcCallback<ColumnAggregationProtos.SumResponse> rpcCallback =
127               new BlockingRpcCallback<ColumnAggregationProtos.SumResponse>();
128           ColumnAggregationProtos.SumRequest.Builder builder =
129             ColumnAggregationProtos.SumRequest.newBuilder();
130           builder.setFamily(ByteStringer.wrap(family));
131           if (qualifier != null && qualifier.length > 0) {
132             builder.setQualifier(ByteStringer.wrap(qualifier));
133           }
134           instance.sum(null, builder.build(), rpcCallback);
135           return rpcCallback.get().getSum();
136         }
137       });
138   }
139 
140   @Test
141   public void testAggregation() throws Throwable {
142     Table table = new HTable(util.getConfiguration(), TEST_TABLE);
143     Map<byte[], Long> results = sum(table, TEST_FAMILY, TEST_QUALIFIER,
144       ROWS[0], ROWS[ROWS.length-1]);
145     int sumResult = 0;
146     int expectedResult = 0;
147     for (Map.Entry<byte[], Long> e : results.entrySet()) {
148       LOG.info("Got value "+e.getValue()+" for region "+Bytes.toStringBinary(e.getKey()));
149       sumResult += e.getValue();
150     }
151     for (int i = 0; i < ROWSIZE; i++) {
152       expectedResult += i;
153     }
154     assertEquals("Invalid result", expectedResult, sumResult);
155 
156     results.clear();
157 
158     // scan: for region 2 and region 3
159     results = sum(table, TEST_FAMILY, TEST_QUALIFIER,
160       ROWS[rowSeperator1], ROWS[ROWS.length-1]);
161     sumResult = 0;
162     expectedResult = 0;
163     for (Map.Entry<byte[], Long> e : results.entrySet()) {
164       LOG.info("Got value "+e.getValue()+" for region "+Bytes.toStringBinary(e.getKey()));
165       sumResult += e.getValue();
166     }
167     for (int i = rowSeperator1; i < ROWSIZE; i++) {
168       expectedResult += i;
169     }
170     assertEquals("Invalid result", expectedResult, sumResult);
171     table.close();
172   }
173 
174   @Test
175   public void testCoprocessorService() throws Throwable {
176     HTable table = new HTable(util.getConfiguration(), TEST_TABLE);
177     NavigableMap<HRegionInfo,ServerName> regions = table.getRegionLocations();
178 
179     final TestProtos.EchoRequestProto request =
180         TestProtos.EchoRequestProto.newBuilder().setMessage("hello").build();
181     final Map<byte[], String> results = Collections.synchronizedMap(
182         new TreeMap<byte[], String>(Bytes.BYTES_COMPARATOR));
183     try {
184       // scan: for all regions
185       final RpcController controller = new ServerRpcController();
186       table.coprocessorService(TestRpcServiceProtos.TestProtobufRpcProto.class,
187           ROWS[0], ROWS[ROWS.length - 1],
188           new Batch.Call<TestRpcServiceProtos.TestProtobufRpcProto, TestProtos.EchoResponseProto>() {
189             public TestProtos.EchoResponseProto call(TestRpcServiceProtos.TestProtobufRpcProto instance)
190                 throws IOException {
191               LOG.debug("Default response is " + TestProtos.EchoRequestProto.getDefaultInstance());
192               BlockingRpcCallback<TestProtos.EchoResponseProto> callback = new BlockingRpcCallback<TestProtos.EchoResponseProto>();
193               instance.echo(controller, request, callback);
194               TestProtos.EchoResponseProto response = callback.get();
195               LOG.debug("Batch.Call returning result " + response);
196               return response;
197             }
198           },
199           new Batch.Callback<TestProtos.EchoResponseProto>() {
200             public void update(byte[] region, byte[] row, TestProtos.EchoResponseProto result) {
201               assertNotNull(result);
202               assertEquals("hello", result.getMessage());
203               results.put(region, result.getMessage());
204             }
205           }
206       );
207       for (Map.Entry<byte[], String> e : results.entrySet()) {
208         LOG.info("Got value "+e.getValue()+" for region "+Bytes.toStringBinary(e.getKey()));
209       }
210       assertEquals(3, results.size());
211       for (HRegionInfo info : regions.navigableKeySet()) {
212         LOG.info("Region info is "+info.getRegionNameAsString());
213         assertTrue(results.containsKey(info.getRegionName()));
214       }
215       results.clear();
216 
217       // scan: for region 2 and region 3
218       table.coprocessorService(TestRpcServiceProtos.TestProtobufRpcProto.class,
219           ROWS[rowSeperator1], ROWS[ROWS.length - 1],
220           new Batch.Call<TestRpcServiceProtos.TestProtobufRpcProto, TestProtos.EchoResponseProto>() {
221             public TestProtos.EchoResponseProto call(TestRpcServiceProtos.TestProtobufRpcProto instance)
222                 throws IOException {
223               LOG.debug("Default response is " + TestProtos.EchoRequestProto.getDefaultInstance());
224               BlockingRpcCallback<TestProtos.EchoResponseProto> callback = new BlockingRpcCallback<TestProtos.EchoResponseProto>();
225               instance.echo(controller, request, callback);
226               TestProtos.EchoResponseProto response = callback.get();
227               LOG.debug("Batch.Call returning result " + response);
228               return response;
229             }
230           },
231           new Batch.Callback<TestProtos.EchoResponseProto>() {
232             public void update(byte[] region, byte[] row, TestProtos.EchoResponseProto result) {
233               assertNotNull(result);
234               assertEquals("hello", result.getMessage());
235               results.put(region, result.getMessage());
236             }
237           }
238       );
239       for (Map.Entry<byte[], String> e : results.entrySet()) {
240         LOG.info("Got value "+e.getValue()+" for region "+Bytes.toStringBinary(e.getKey()));
241       }
242       assertEquals(2, results.size());
243     } finally {
244       table.close();
245     }
246   }
247 
248   @Test
249   public void testCoprocessorServiceNullResponse() throws Throwable {
250     HTable table = new HTable(util.getConfiguration(), TEST_TABLE);
251     NavigableMap<HRegionInfo,ServerName> regions = table.getRegionLocations();
252 
253     final TestProtos.EchoRequestProto request =
254         TestProtos.EchoRequestProto.newBuilder().setMessage("hello").build();
255     try {
256       // scan: for all regions
257       final RpcController controller = new ServerRpcController();
258       // test that null results are supported
259       Map<byte[], String> results = table.coprocessorService(TestRpcServiceProtos.TestProtobufRpcProto.class,
260           ROWS[0], ROWS[ROWS.length - 1],
261           new Batch.Call<TestRpcServiceProtos.TestProtobufRpcProto, String>() {
262             public String call(TestRpcServiceProtos.TestProtobufRpcProto instance)
263                 throws IOException {
264               BlockingRpcCallback<TestProtos.EchoResponseProto> callback = new BlockingRpcCallback<TestProtos.EchoResponseProto>();
265               instance.echo(controller, request, callback);
266               TestProtos.EchoResponseProto response = callback.get();
267               LOG.debug("Batch.Call got result " + response);
268               return null;
269             }
270           }
271       );
272       for (Map.Entry<byte[], String> e : results.entrySet()) {
273         LOG.info("Got value "+e.getValue()+" for region "+Bytes.toStringBinary(e.getKey()));
274       }
275       assertEquals(3, results.size());
276       for (HRegionInfo info : regions.navigableKeySet()) {
277         LOG.info("Region info is "+info.getRegionNameAsString());
278         assertTrue(results.containsKey(info.getRegionName()));
279         assertNull(results.get(info.getRegionName()));
280       }
281     } finally {
282       table.close();
283     }
284   }
285 
286   @Test
287   public void testMasterCoprocessorService() throws Throwable {
288     Admin admin = util.getHBaseAdmin();
289     final TestProtos.EchoRequestProto request =
290         TestProtos.EchoRequestProto.newBuilder().setMessage("hello").build();
291     TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface service =
292         TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(admin.coprocessorService());
293     assertEquals("hello", service.echo(null, request).getMessage());
294   }
295 
296   @Test
297   public void testCoprocessorError() throws Exception {
298     Configuration configuration = new Configuration(util.getConfiguration());
299     // Make it not retry forever
300     configuration.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
301     Table table = new HTable(configuration, TEST_TABLE);
302 
303     try {
304       CoprocessorRpcChannel protocol = table.coprocessorService(ROWS[0]);
305 
306       TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface service =
307           TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(protocol);
308 
309       service.error(null, TestProtos.EmptyRequestProto.getDefaultInstance());
310       fail("Should have thrown an exception");
311     } catch (ServiceException e) {
312     } finally {
313       table.close();
314     }
315   }
316 
317   @Test
318   public void testMasterCoprocessorError() throws Throwable {
319     Admin admin = util.getHBaseAdmin();
320     TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface service =
321         TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(admin.coprocessorService());
322     try {
323       service.error(null, TestProtos.EmptyRequestProto.getDefaultInstance());
324       fail("Should have thrown an exception");
325     } catch (ServiceException e) {
326     }
327   }
328 
329   private static byte[][] makeN(byte[] base, int n) {
330     byte[][] ret = new byte[n][];
331     for (int i = 0; i < n; i++) {
332       ret[i] = Bytes.add(base, Bytes.toBytes(String.format("%02d", i)));
333     }
334     return ret;
335   }
336 
337 }
338