1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.coprocessor;
20
21 import static org.junit.Assert.assertEquals;
22 import static org.junit.Assert.assertTrue;
23
24 import java.util.Collections;
25 import java.util.Map;
26 import java.util.TreeMap;
27
28 import org.apache.hadoop.hbase.client.Admin;
29 import org.apache.hadoop.hbase.client.Table;
30 import org.apache.hadoop.hbase.util.ByteStringer;
31 import org.apache.commons.logging.Log;
32 import org.apache.commons.logging.LogFactory;
33 import org.apache.hadoop.conf.Configuration;
34 import org.apache.hadoop.hbase.HBaseTestingUtility;
35 import org.apache.hadoop.hbase.HColumnDescriptor;
36 import org.apache.hadoop.hbase.HTableDescriptor;
37 import org.apache.hadoop.hbase.testclassification.MediumTests;
38 import org.apache.hadoop.hbase.TableName;
39 import org.apache.hadoop.hbase.client.HBaseAdmin;
40 import org.apache.hadoop.hbase.client.HTable;
41 import org.apache.hadoop.hbase.client.Put;
42 import org.apache.hadoop.hbase.client.coprocessor.Batch;
43 import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationProtos;
44 import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationProtos.SumResponse;
45 import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithErrorsProtos;
46 import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithNullResponseProtos;
47 import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithNullResponseProtos.ColumnAggregationServiceNullResponse;
48 import org.apache.hadoop.hbase.util.Bytes;
49 import org.junit.AfterClass;
50 import org.junit.BeforeClass;
51 import org.junit.Test;
52 import org.junit.experimental.categories.Category;
53
54 import com.google.protobuf.ServiceException;
55
56
57
58
59 @Category(MediumTests.class)
60 public class TestBatchCoprocessorEndpoint {
61 private static final Log LOG = LogFactory.getLog(TestBatchCoprocessorEndpoint.class);
62
63 private static final TableName TEST_TABLE =
64 TableName.valueOf("TestTable");
65 private static final byte[] TEST_FAMILY = Bytes.toBytes("TestFamily");
66 private static final byte[] TEST_QUALIFIER = Bytes.toBytes("TestQualifier");
67 private static byte[] ROW = Bytes.toBytes("testRow");
68
69 private static final int ROWSIZE = 20;
70 private static final int rowSeperator1 = 5;
71 private static final int rowSeperator2 = 12;
72 private static byte[][] ROWS = makeN(ROW, ROWSIZE);
73
74 private static HBaseTestingUtility util = new HBaseTestingUtility();
75
76 @BeforeClass
77 public static void setupBeforeClass() throws Exception {
78
79 Configuration conf = util.getConfiguration();
80 conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
81 org.apache.hadoop.hbase.coprocessor.ColumnAggregationEndpoint.class.getName(),
82 ProtobufCoprocessorService.class.getName(),
83 ColumnAggregationEndpointWithErrors.class.getName(),
84 ColumnAggregationEndpointNullResponse.class.getName());
85 conf.setStrings(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
86 ProtobufCoprocessorService.class.getName());
87 util.startMiniCluster(2);
88 Admin admin = new HBaseAdmin(conf);
89 HTableDescriptor desc = new HTableDescriptor(TEST_TABLE);
90 desc.addFamily(new HColumnDescriptor(TEST_FAMILY));
91 admin.createTable(desc, new byte[][]{ROWS[rowSeperator1], ROWS[rowSeperator2]});
92 util.waitUntilAllRegionsAssigned(TEST_TABLE);
93 admin.close();
94
95 Table table = new HTable(conf, TEST_TABLE);
96 for (int i = 0; i < ROWSIZE; i++) {
97 Put put = new Put(ROWS[i]);
98 put.add(TEST_FAMILY, TEST_QUALIFIER, Bytes.toBytes(i));
99 table.put(put);
100 }
101 table.close();
102 }
103
104 @AfterClass
105 public static void tearDownAfterClass() throws Exception {
106 util.shutdownMiniCluster();
107 }
108
109 @Test
110 public void testAggregationNullResponse() throws Throwable {
111 Table table = new HTable(util.getConfiguration(), TEST_TABLE);
112 ColumnAggregationWithNullResponseProtos.SumRequest.Builder builder =
113 ColumnAggregationWithNullResponseProtos.SumRequest
114 .newBuilder();
115 builder.setFamily(ByteStringer.wrap(TEST_FAMILY));
116 if (TEST_QUALIFIER != null && TEST_QUALIFIER.length > 0) {
117 builder.setQualifier(ByteStringer.wrap(TEST_QUALIFIER));
118 }
119 Map<byte[], ColumnAggregationWithNullResponseProtos.SumResponse> results =
120 table.batchCoprocessorService(
121 ColumnAggregationServiceNullResponse.getDescriptor().findMethodByName("sum"),
122 builder.build(), ROWS[0], ROWS[ROWS.length - 1],
123 ColumnAggregationWithNullResponseProtos.SumResponse.getDefaultInstance());
124
125 int sumResult = 0;
126 int expectedResult = 0;
127 for (Map.Entry<byte[], ColumnAggregationWithNullResponseProtos.SumResponse> e :
128 results.entrySet()) {
129 LOG.info("Got value " + e.getValue().getSum() + " for region "
130 + Bytes.toStringBinary(e.getKey()));
131 sumResult += e.getValue().getSum();
132 }
133 for (int i = 0; i < rowSeperator2; i++) {
134 expectedResult += i;
135 }
136 assertEquals("Invalid result", expectedResult, sumResult);
137 table.close();
138 }
139
140 private static byte[][] makeN(byte[] base, int n) {
141 byte[][] ret = new byte[n][];
142 for (int i = 0; i < n; i++) {
143 ret[i] = Bytes.add(base, Bytes.toBytes(String.format("%02d", i)));
144 }
145 return ret;
146 }
147
148 private Map<byte[], SumResponse> sum(final Table table, final byte[] family,
149 final byte[] qualifier, final byte[] start, final byte[] end) throws ServiceException,
150 Throwable {
151 ColumnAggregationProtos.SumRequest.Builder builder = ColumnAggregationProtos.SumRequest
152 .newBuilder();
153 builder.setFamily(ByteStringer.wrap(family));
154 if (qualifier != null && qualifier.length > 0) {
155 builder.setQualifier(ByteStringer.wrap(qualifier));
156 }
157 return table.batchCoprocessorService(
158 ColumnAggregationProtos.ColumnAggregationService.getDescriptor().findMethodByName("sum"),
159 builder.build(), start, end, ColumnAggregationProtos.SumResponse.getDefaultInstance());
160 }
161
162 @Test
163 public void testAggregationWithReturnValue() throws Throwable {
164 Table table = new HTable(util.getConfiguration(), TEST_TABLE);
165 Map<byte[], SumResponse> results = sum(table, TEST_FAMILY, TEST_QUALIFIER, ROWS[0],
166 ROWS[ROWS.length - 1]);
167 int sumResult = 0;
168 int expectedResult = 0;
169 for (Map.Entry<byte[], SumResponse> e : results.entrySet()) {
170 LOG.info("Got value " + e.getValue().getSum() + " for region "
171 + Bytes.toStringBinary(e.getKey()));
172 sumResult += e.getValue().getSum();
173 }
174 for (int i = 0; i < ROWSIZE; i++) {
175 expectedResult += i;
176 }
177 assertEquals("Invalid result", expectedResult, sumResult);
178
179 results.clear();
180
181
182 results = sum(table, TEST_FAMILY, TEST_QUALIFIER, ROWS[rowSeperator1],
183 ROWS[ROWS.length - 1]);
184 sumResult = 0;
185 expectedResult = 0;
186 for (Map.Entry<byte[], SumResponse> e : results.entrySet()) {
187 LOG.info("Got value " + e.getValue().getSum() + " for region "
188 + Bytes.toStringBinary(e.getKey()));
189 sumResult += e.getValue().getSum();
190 }
191 for (int i = rowSeperator1; i < ROWSIZE; i++) {
192 expectedResult += i;
193 }
194 assertEquals("Invalid result", expectedResult, sumResult);
195 table.close();
196 }
197
198 @Test
199 public void testAggregation() throws Throwable {
200 Table table = new HTable(util.getConfiguration(), TEST_TABLE);
201 Map<byte[], SumResponse> results = sum(table, TEST_FAMILY, TEST_QUALIFIER,
202 ROWS[0], ROWS[ROWS.length - 1]);
203 int sumResult = 0;
204 int expectedResult = 0;
205 for (Map.Entry<byte[], SumResponse> e : results.entrySet()) {
206 LOG.info("Got value " + e.getValue().getSum() + " for region "
207 + Bytes.toStringBinary(e.getKey()));
208 sumResult += e.getValue().getSum();
209 }
210 for (int i = 0; i < ROWSIZE; i++) {
211 expectedResult += i;
212 }
213 assertEquals("Invalid result", expectedResult, sumResult);
214
215
216 results = sum(table, TEST_FAMILY, TEST_QUALIFIER, ROWS[rowSeperator1], ROWS[ROWS.length - 1]);
217 sumResult = 0;
218 expectedResult = 0;
219 for (Map.Entry<byte[], SumResponse> e : results.entrySet()) {
220 LOG.info("Got value " + e.getValue().getSum() + " for region "
221 + Bytes.toStringBinary(e.getKey()));
222 sumResult += e.getValue().getSum();
223 }
224 for (int i = rowSeperator1; i < ROWSIZE; i++) {
225 expectedResult += i;
226 }
227 assertEquals("Invalid result", expectedResult, sumResult);
228 table.close();
229 }
230
231 @Test
232 public void testAggregationWithErrors() throws Throwable {
233 Table table = new HTable(util.getConfiguration(), TEST_TABLE);
234 final Map<byte[], ColumnAggregationWithErrorsProtos.SumResponse> results =
235 Collections.synchronizedMap(
236 new TreeMap<byte[], ColumnAggregationWithErrorsProtos.SumResponse>(
237 Bytes.BYTES_COMPARATOR
238 ));
239 ColumnAggregationWithErrorsProtos.SumRequest.Builder builder =
240 ColumnAggregationWithErrorsProtos.SumRequest
241 .newBuilder();
242 builder.setFamily(ByteStringer.wrap(TEST_FAMILY));
243 if (TEST_QUALIFIER != null && TEST_QUALIFIER.length > 0) {
244 builder.setQualifier(ByteStringer.wrap(TEST_QUALIFIER));
245 }
246
247 boolean hasError = false;
248 try {
249 table.batchCoprocessorService(
250 ColumnAggregationWithErrorsProtos.ColumnAggregationServiceWithErrors.getDescriptor()
251 .findMethodByName("sum"),
252 builder.build(), ROWS[0], ROWS[ROWS.length - 1],
253 ColumnAggregationWithErrorsProtos.SumResponse.getDefaultInstance(),
254 new Batch.Callback<ColumnAggregationWithErrorsProtos.SumResponse>() {
255
256 @Override
257 public void update(byte[] region, byte[] row,
258 ColumnAggregationWithErrorsProtos.SumResponse result) {
259 results.put(region, result);
260 }
261 });
262 } catch (Throwable t) {
263 LOG.info("Exceptions in coprocessor service", t);
264 hasError = true;
265 }
266
267 int sumResult = 0;
268 int expectedResult = 0;
269 for (Map.Entry<byte[], ColumnAggregationWithErrorsProtos.SumResponse> e : results.entrySet()) {
270 LOG.info("Got value " + e.getValue().getSum() + " for region "
271 + Bytes.toStringBinary(e.getKey()));
272 sumResult += e.getValue().getSum();
273 }
274 for (int i = 0; i < rowSeperator2; i++) {
275 expectedResult += i;
276 }
277 assertEquals("Invalid result", expectedResult, sumResult);
278 assertTrue(hasError);
279 table.close();
280 }
281 }