1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.coprocessor;
20
21 import static org.junit.Assert.assertEquals;
22 import static org.junit.Assert.assertNotNull;
23 import static org.junit.Assert.assertNull;
24 import static org.junit.Assert.assertTrue;
25 import static org.junit.Assert.fail;
26
27 import java.io.IOException;
28 import java.util.Collections;
29 import java.util.Map;
30 import java.util.NavigableMap;
31 import java.util.TreeMap;
32
33 import org.apache.hadoop.hbase.client.Admin;
34 import org.apache.hadoop.hbase.client.Table;
35 import org.apache.hadoop.hbase.util.ByteStringer;
36 import org.apache.commons.logging.Log;
37 import org.apache.commons.logging.LogFactory;
38 import org.apache.hadoop.conf.Configuration;
39 import org.apache.hadoop.hbase.HBaseTestingUtility;
40 import org.apache.hadoop.hbase.HColumnDescriptor;
41 import org.apache.hadoop.hbase.HConstants;
42 import org.apache.hadoop.hbase.HRegionInfo;
43 import org.apache.hadoop.hbase.HTableDescriptor;
44 import org.apache.hadoop.hbase.testclassification.MediumTests;
45 import org.apache.hadoop.hbase.ServerName;
46 import org.apache.hadoop.hbase.TableName;
47 import org.apache.hadoop.hbase.client.HTable;
48 import org.apache.hadoop.hbase.client.Put;
49 import org.apache.hadoop.hbase.client.coprocessor.Batch;
50 import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationProtos;
51 import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
52 import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
53 import org.apache.hadoop.hbase.ipc.ServerRpcController;
54 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos;
55 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
56 import org.apache.hadoop.hbase.util.Bytes;
57 import org.junit.AfterClass;
58 import org.junit.BeforeClass;
59 import org.junit.Test;
60 import org.junit.experimental.categories.Category;
61
62 import com.google.protobuf.RpcController;
63 import com.google.protobuf.ServiceException;
64
65
66
67
68 @Category(MediumTests.class)
69 public class TestCoprocessorEndpoint {
70 private static final Log LOG = LogFactory.getLog(TestCoprocessorEndpoint.class);
71
72 private static final TableName TEST_TABLE =
73 TableName.valueOf("TestCoprocessorEndpoint");
74 private static final byte[] TEST_FAMILY = Bytes.toBytes("TestFamily");
75 private static final byte[] TEST_QUALIFIER = Bytes.toBytes("TestQualifier");
76 private static byte[] ROW = Bytes.toBytes("testRow");
77
78 private static final int ROWSIZE = 20;
79 private static final int rowSeperator1 = 5;
80 private static final int rowSeperator2 = 12;
81 private static byte[][] ROWS = makeN(ROW, ROWSIZE);
82
83 private static HBaseTestingUtility util = new HBaseTestingUtility();
84
85 @BeforeClass
86 public static void setupBeforeClass() throws Exception {
87
88 Configuration conf = util.getConfiguration();
89 conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 5000);
90 conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
91 org.apache.hadoop.hbase.coprocessor.ColumnAggregationEndpoint.class.getName(),
92 ProtobufCoprocessorService.class.getName());
93 conf.setStrings(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
94 ProtobufCoprocessorService.class.getName());
95 util.startMiniCluster(2);
96
97 Admin admin = util.getHBaseAdmin();
98 HTableDescriptor desc = new HTableDescriptor(TEST_TABLE);
99 desc.addFamily(new HColumnDescriptor(TEST_FAMILY));
100 admin.createTable(desc, new byte[][]{ROWS[rowSeperator1], ROWS[rowSeperator2]});
101 util.waitUntilAllRegionsAssigned(TEST_TABLE);
102
103 Table table = new HTable(conf, TEST_TABLE);
104 for (int i = 0; i < ROWSIZE; i++) {
105 Put put = new Put(ROWS[i]);
106 put.addColumn(TEST_FAMILY, TEST_QUALIFIER, Bytes.toBytes(i));
107 table.put(put);
108 }
109 table.close();
110 }
111
112 @AfterClass
113 public static void tearDownAfterClass() throws Exception {
114 util.shutdownMiniCluster();
115 }
116
117 private Map<byte [], Long> sum(final Table table, final byte [] family,
118 final byte [] qualifier, final byte [] start, final byte [] end)
119 throws ServiceException, Throwable {
120 return table.coprocessorService(ColumnAggregationProtos.ColumnAggregationService.class,
121 start, end,
122 new Batch.Call<ColumnAggregationProtos.ColumnAggregationService, Long>() {
123 @Override
124 public Long call(ColumnAggregationProtos.ColumnAggregationService instance)
125 throws IOException {
126 BlockingRpcCallback<ColumnAggregationProtos.SumResponse> rpcCallback =
127 new BlockingRpcCallback<ColumnAggregationProtos.SumResponse>();
128 ColumnAggregationProtos.SumRequest.Builder builder =
129 ColumnAggregationProtos.SumRequest.newBuilder();
130 builder.setFamily(ByteStringer.wrap(family));
131 if (qualifier != null && qualifier.length > 0) {
132 builder.setQualifier(ByteStringer.wrap(qualifier));
133 }
134 instance.sum(null, builder.build(), rpcCallback);
135 return rpcCallback.get().getSum();
136 }
137 });
138 }
139
140 @Test
141 public void testAggregation() throws Throwable {
142 Table table = new HTable(util.getConfiguration(), TEST_TABLE);
143 Map<byte[], Long> results = sum(table, TEST_FAMILY, TEST_QUALIFIER,
144 ROWS[0], ROWS[ROWS.length-1]);
145 int sumResult = 0;
146 int expectedResult = 0;
147 for (Map.Entry<byte[], Long> e : results.entrySet()) {
148 LOG.info("Got value "+e.getValue()+" for region "+Bytes.toStringBinary(e.getKey()));
149 sumResult += e.getValue();
150 }
151 for (int i = 0; i < ROWSIZE; i++) {
152 expectedResult += i;
153 }
154 assertEquals("Invalid result", expectedResult, sumResult);
155
156 results.clear();
157
158
159 results = sum(table, TEST_FAMILY, TEST_QUALIFIER,
160 ROWS[rowSeperator1], ROWS[ROWS.length-1]);
161 sumResult = 0;
162 expectedResult = 0;
163 for (Map.Entry<byte[], Long> e : results.entrySet()) {
164 LOG.info("Got value "+e.getValue()+" for region "+Bytes.toStringBinary(e.getKey()));
165 sumResult += e.getValue();
166 }
167 for (int i = rowSeperator1; i < ROWSIZE; i++) {
168 expectedResult += i;
169 }
170 assertEquals("Invalid result", expectedResult, sumResult);
171 table.close();
172 }
173
174 @Test
175 public void testCoprocessorService() throws Throwable {
176 HTable table = new HTable(util.getConfiguration(), TEST_TABLE);
177 NavigableMap<HRegionInfo,ServerName> regions = table.getRegionLocations();
178
179 final TestProtos.EchoRequestProto request =
180 TestProtos.EchoRequestProto.newBuilder().setMessage("hello").build();
181 final Map<byte[], String> results = Collections.synchronizedMap(
182 new TreeMap<byte[], String>(Bytes.BYTES_COMPARATOR));
183 try {
184
185 final RpcController controller = new ServerRpcController();
186 table.coprocessorService(TestRpcServiceProtos.TestProtobufRpcProto.class,
187 ROWS[0], ROWS[ROWS.length - 1],
188 new Batch.Call<TestRpcServiceProtos.TestProtobufRpcProto, TestProtos.EchoResponseProto>() {
189 public TestProtos.EchoResponseProto call(TestRpcServiceProtos.TestProtobufRpcProto instance)
190 throws IOException {
191 LOG.debug("Default response is " + TestProtos.EchoRequestProto.getDefaultInstance());
192 BlockingRpcCallback<TestProtos.EchoResponseProto> callback = new BlockingRpcCallback<TestProtos.EchoResponseProto>();
193 instance.echo(controller, request, callback);
194 TestProtos.EchoResponseProto response = callback.get();
195 LOG.debug("Batch.Call returning result " + response);
196 return response;
197 }
198 },
199 new Batch.Callback<TestProtos.EchoResponseProto>() {
200 public void update(byte[] region, byte[] row, TestProtos.EchoResponseProto result) {
201 assertNotNull(result);
202 assertEquals("hello", result.getMessage());
203 results.put(region, result.getMessage());
204 }
205 }
206 );
207 for (Map.Entry<byte[], String> e : results.entrySet()) {
208 LOG.info("Got value "+e.getValue()+" for region "+Bytes.toStringBinary(e.getKey()));
209 }
210 assertEquals(3, results.size());
211 for (HRegionInfo info : regions.navigableKeySet()) {
212 LOG.info("Region info is "+info.getRegionNameAsString());
213 assertTrue(results.containsKey(info.getRegionName()));
214 }
215 results.clear();
216
217
218 table.coprocessorService(TestRpcServiceProtos.TestProtobufRpcProto.class,
219 ROWS[rowSeperator1], ROWS[ROWS.length - 1],
220 new Batch.Call<TestRpcServiceProtos.TestProtobufRpcProto, TestProtos.EchoResponseProto>() {
221 public TestProtos.EchoResponseProto call(TestRpcServiceProtos.TestProtobufRpcProto instance)
222 throws IOException {
223 LOG.debug("Default response is " + TestProtos.EchoRequestProto.getDefaultInstance());
224 BlockingRpcCallback<TestProtos.EchoResponseProto> callback = new BlockingRpcCallback<TestProtos.EchoResponseProto>();
225 instance.echo(controller, request, callback);
226 TestProtos.EchoResponseProto response = callback.get();
227 LOG.debug("Batch.Call returning result " + response);
228 return response;
229 }
230 },
231 new Batch.Callback<TestProtos.EchoResponseProto>() {
232 public void update(byte[] region, byte[] row, TestProtos.EchoResponseProto result) {
233 assertNotNull(result);
234 assertEquals("hello", result.getMessage());
235 results.put(region, result.getMessage());
236 }
237 }
238 );
239 for (Map.Entry<byte[], String> e : results.entrySet()) {
240 LOG.info("Got value "+e.getValue()+" for region "+Bytes.toStringBinary(e.getKey()));
241 }
242 assertEquals(2, results.size());
243 } finally {
244 table.close();
245 }
246 }
247
248 @Test
249 public void testCoprocessorServiceNullResponse() throws Throwable {
250 HTable table = new HTable(util.getConfiguration(), TEST_TABLE);
251 NavigableMap<HRegionInfo,ServerName> regions = table.getRegionLocations();
252
253 final TestProtos.EchoRequestProto request =
254 TestProtos.EchoRequestProto.newBuilder().setMessage("hello").build();
255 try {
256
257 final RpcController controller = new ServerRpcController();
258
259 Map<byte[], String> results = table.coprocessorService(TestRpcServiceProtos.TestProtobufRpcProto.class,
260 ROWS[0], ROWS[ROWS.length - 1],
261 new Batch.Call<TestRpcServiceProtos.TestProtobufRpcProto, String>() {
262 public String call(TestRpcServiceProtos.TestProtobufRpcProto instance)
263 throws IOException {
264 BlockingRpcCallback<TestProtos.EchoResponseProto> callback = new BlockingRpcCallback<TestProtos.EchoResponseProto>();
265 instance.echo(controller, request, callback);
266 TestProtos.EchoResponseProto response = callback.get();
267 LOG.debug("Batch.Call got result " + response);
268 return null;
269 }
270 }
271 );
272 for (Map.Entry<byte[], String> e : results.entrySet()) {
273 LOG.info("Got value "+e.getValue()+" for region "+Bytes.toStringBinary(e.getKey()));
274 }
275 assertEquals(3, results.size());
276 for (HRegionInfo info : regions.navigableKeySet()) {
277 LOG.info("Region info is "+info.getRegionNameAsString());
278 assertTrue(results.containsKey(info.getRegionName()));
279 assertNull(results.get(info.getRegionName()));
280 }
281 } finally {
282 table.close();
283 }
284 }
285
286 @Test
287 public void testMasterCoprocessorService() throws Throwable {
288 Admin admin = util.getHBaseAdmin();
289 final TestProtos.EchoRequestProto request =
290 TestProtos.EchoRequestProto.newBuilder().setMessage("hello").build();
291 TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface service =
292 TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(admin.coprocessorService());
293 assertEquals("hello", service.echo(null, request).getMessage());
294 }
295
296 @Test
297 public void testCoprocessorError() throws Exception {
298 Configuration configuration = new Configuration(util.getConfiguration());
299
300 configuration.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
301 Table table = new HTable(configuration, TEST_TABLE);
302
303 try {
304 CoprocessorRpcChannel protocol = table.coprocessorService(ROWS[0]);
305
306 TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface service =
307 TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(protocol);
308
309 service.error(null, TestProtos.EmptyRequestProto.getDefaultInstance());
310 fail("Should have thrown an exception");
311 } catch (ServiceException e) {
312 } finally {
313 table.close();
314 }
315 }
316
317 @Test
318 public void testMasterCoprocessorError() throws Throwable {
319 Admin admin = util.getHBaseAdmin();
320 TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface service =
321 TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(admin.coprocessorService());
322 try {
323 service.error(null, TestProtos.EmptyRequestProto.getDefaultInstance());
324 fail("Should have thrown an exception");
325 } catch (ServiceException e) {
326 }
327 }
328
329 private static byte[][] makeN(byte[] base, int n) {
330 byte[][] ret = new byte[n][];
331 for (int i = 0; i < n; i++) {
332 ret[i] = Bytes.add(base, Bytes.toBytes(String.format("%02d", i)));
333 }
334 return ret;
335 }
336
337 }
338