1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.coprocessor;
21
22 import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
23 import static org.apache.hadoop.hbase.HBaseTestingUtility.fam2;
24 import static org.apache.hadoop.hbase.HBaseTestingUtility.fam3;
25 import static org.junit.Assert.assertFalse;
26 import static org.junit.Assert.assertNotNull;
27 import static org.junit.Assert.assertNull;
28 import static org.junit.Assert.assertTrue;
29 import static org.junit.Assert.fail;
30 import static org.mockito.Mockito.when;
31
32 import java.io.IOException;
33 import java.util.ArrayList;
34 import java.util.List;
35 import java.util.Map;
36 import java.util.concurrent.ConcurrentMap;
37
38 import org.apache.commons.logging.Log;
39 import org.apache.commons.logging.LogFactory;
40 import org.apache.hadoop.conf.Configuration;
41 import org.apache.hadoop.fs.Path;
42 import org.apache.hadoop.hbase.Cell;
43 import org.apache.hadoop.hbase.Coprocessor;
44 import org.apache.hadoop.hbase.CoprocessorEnvironment;
45 import org.apache.hadoop.hbase.HBaseTestCase;
46 import org.apache.hadoop.hbase.HBaseTestingUtility;
47 import org.apache.hadoop.hbase.HColumnDescriptor;
48 import org.apache.hadoop.hbase.HConstants;
49 import org.apache.hadoop.hbase.HRegionInfo;
50 import org.apache.hadoop.hbase.HTableDescriptor;
51 import org.apache.hadoop.hbase.Server;
52 import org.apache.hadoop.hbase.testclassification.SmallTests;
53 import org.apache.hadoop.hbase.TableName;
54 import org.apache.hadoop.hbase.client.Get;
55 import org.apache.hadoop.hbase.client.Scan;
56 import org.apache.hadoop.hbase.regionserver.HRegion;
57 import org.apache.hadoop.hbase.regionserver.InternalScanner;
58 import org.apache.hadoop.hbase.regionserver.Region;
59 import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
60 import org.apache.hadoop.hbase.regionserver.RegionScanner;
61 import org.apache.hadoop.hbase.regionserver.ScanType;
62 import org.apache.hadoop.hbase.regionserver.ScannerContext;
63 import org.apache.hadoop.hbase.regionserver.SplitTransaction;
64 import org.apache.hadoop.hbase.regionserver.SplitTransactionFactory;
65 import org.apache.hadoop.hbase.regionserver.Store;
66 import org.apache.hadoop.hbase.regionserver.StoreFile;
67 import org.apache.hadoop.hbase.util.PairOfSameType;
68 import org.junit.Rule;
69 import org.junit.Test;
70 import org.junit.experimental.categories.Category;
71 import org.junit.rules.TestName;
72 import org.mockito.Mockito;
73
74 @Category(SmallTests.class)
75 public class TestCoprocessorInterface {
76 @Rule public TestName name = new TestName();
77 private static final Log LOG = LogFactory.getLog(TestCoprocessorInterface.class);
78 private static final HBaseTestingUtility TEST_UTIL = HBaseTestingUtility.createLocalHTU();
79 static final Path DIR = TEST_UTIL.getDataTestDir();
80
81 private static class CustomScanner implements RegionScanner {
82
83 private RegionScanner delegate;
84
85 public CustomScanner(RegionScanner delegate) {
86 this.delegate = delegate;
87 }
88
89 @Override
90 public boolean next(List<Cell> results) throws IOException {
91 return delegate.next(results);
92 }
93
94 @Override
95 public boolean next(List<Cell> result, ScannerContext scannerContext)
96 throws IOException {
97 return delegate.next(result, scannerContext);
98 }
99
100 @Override
101 public boolean nextRaw(List<Cell> result)
102 throws IOException {
103 return delegate.nextRaw(result);
104 }
105
106 @Override
107 public boolean nextRaw(List<Cell> result, ScannerContext context)
108 throws IOException {
109 return delegate.nextRaw(result, context);
110 }
111
112 @Override
113 public void close() throws IOException {
114 delegate.close();
115 }
116
117 @Override
118 public HRegionInfo getRegionInfo() {
119 return delegate.getRegionInfo();
120 }
121
122 @Override
123 public boolean isFilterDone() throws IOException {
124 return delegate.isFilterDone();
125 }
126
127 @Override
128 public boolean reseek(byte[] row) throws IOException {
129 return false;
130 }
131
132 @Override
133 public long getMaxResultSize() {
134 return delegate.getMaxResultSize();
135 }
136
137 @Override
138 public long getMvccReadPoint() {
139 return delegate.getMvccReadPoint();
140 }
141
142 @Override
143 public int getBatch() {
144 return delegate.getBatch();
145 }
146 }
147
148 public static class CoprocessorImpl extends BaseRegionObserver {
149
150 private boolean startCalled;
151 private boolean stopCalled;
152 private boolean preOpenCalled;
153 private boolean postOpenCalled;
154 private boolean preCloseCalled;
155 private boolean postCloseCalled;
156 private boolean preCompactCalled;
157 private boolean postCompactCalled;
158 private boolean preFlushCalled;
159 private boolean postFlushCalled;
160 private boolean preSplitCalled;
161 private boolean postSplitCalled;
162 private boolean preSplitWithSplitRowCalled;
163 private ConcurrentMap<String, Object> sharedData;
164
165 @Override
166 public void start(CoprocessorEnvironment e) {
167 sharedData = ((RegionCoprocessorEnvironment)e).getSharedData();
168
169 sharedData.putIfAbsent("test1", new Object());
170 startCalled = true;
171 }
172
173 @Override
174 public void stop(CoprocessorEnvironment e) {
175 sharedData = null;
176 stopCalled = true;
177 }
178
179 @Override
180 public void preOpen(ObserverContext<RegionCoprocessorEnvironment> e) {
181 preOpenCalled = true;
182 }
183 @Override
184 public void postOpen(ObserverContext<RegionCoprocessorEnvironment> e) {
185 postOpenCalled = true;
186 }
187 @Override
188 public void preClose(ObserverContext<RegionCoprocessorEnvironment> e, boolean abortRequested) {
189 preCloseCalled = true;
190 }
191 @Override
192 public void postClose(ObserverContext<RegionCoprocessorEnvironment> e, boolean abortRequested) {
193 postCloseCalled = true;
194 }
195 @Override
196 public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e,
197 Store store, InternalScanner scanner, ScanType scanType) {
198 preCompactCalled = true;
199 return scanner;
200 }
201 @Override
202 public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e,
203 Store store, StoreFile resultFile) {
204 postCompactCalled = true;
205 }
206 @Override
207 public void preFlush(ObserverContext<RegionCoprocessorEnvironment> e) {
208 preFlushCalled = true;
209 }
210 @Override
211 public void postFlush(ObserverContext<RegionCoprocessorEnvironment> e) {
212 postFlushCalled = true;
213 }
214 @Override
215 public void preSplit(ObserverContext<RegionCoprocessorEnvironment> e) {
216 preSplitCalled = true;
217 }
218
219 @Override
220 public void preSplit(ObserverContext<RegionCoprocessorEnvironment> c,
221 byte[] splitRow) throws IOException {
222 preSplitWithSplitRowCalled = true;
223 }
224 @Override
225 public void postSplit(ObserverContext<RegionCoprocessorEnvironment> e, Region l, Region r) {
226 postSplitCalled = true;
227 }
228
229 @Override
230 public RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
231 final Scan scan, final RegionScanner s) throws IOException {
232 return new CustomScanner(s);
233 }
234
235 boolean wasStarted() {
236 return startCalled;
237 }
238 boolean wasStopped() {
239 return stopCalled;
240 }
241 boolean wasOpened() {
242 return (preOpenCalled && postOpenCalled);
243 }
244 boolean wasClosed() {
245 return (preCloseCalled && postCloseCalled);
246 }
247 boolean wasFlushed() {
248 return (preFlushCalled && postFlushCalled);
249 }
250 boolean wasCompacted() {
251 return (preCompactCalled && postCompactCalled);
252 }
253 boolean wasSplit() {
254 return (preSplitCalled && postSplitCalled && preSplitWithSplitRowCalled);
255 }
256 Map<String, Object> getSharedData() {
257 return sharedData;
258 }
259 }
260
261 public static class CoprocessorII extends BaseRegionObserver {
262 private ConcurrentMap<String, Object> sharedData;
263 @Override
264 public void start(CoprocessorEnvironment e) {
265 sharedData = ((RegionCoprocessorEnvironment)e).getSharedData();
266 sharedData.putIfAbsent("test2", new Object());
267 }
268 @Override
269 public void stop(CoprocessorEnvironment e) {
270 sharedData = null;
271 }
272 @Override
273 public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
274 final Get get, final List<Cell> results) throws IOException {
275 if (1/0 == 1) {
276 e.complete();
277 }
278 }
279
280 Map<String, Object> getSharedData() {
281 return sharedData;
282 }
283 }
284
285 @Test
286 public void testSharedData() throws IOException {
287 TableName tableName = TableName.valueOf(name.getMethodName());
288 byte [][] families = { fam1, fam2, fam3 };
289
290 Configuration hc = initSplit();
291 Region region = initHRegion(tableName, name.getMethodName(), hc,
292 new Class<?>[]{}, families);
293
294 for (int i = 0; i < 3; i++) {
295 HBaseTestCase.addContent(region, fam3);
296 region.flush(true);
297 }
298
299 region.compact(false);
300
301 byte [] splitRow = ((HRegion)region).checkSplit();
302 assertNotNull(splitRow);
303 Region [] regions = split(region, splitRow);
304 for (int i = 0; i < regions.length; i++) {
305 regions[i] = reopenRegion(regions[i], CoprocessorImpl.class, CoprocessorII.class);
306 }
307 Coprocessor c = regions[0].getCoprocessorHost().
308 findCoprocessor(CoprocessorImpl.class.getName());
309 Coprocessor c2 = regions[0].getCoprocessorHost().
310 findCoprocessor(CoprocessorII.class.getName());
311 Object o = ((CoprocessorImpl)c).getSharedData().get("test1");
312 Object o2 = ((CoprocessorII)c2).getSharedData().get("test2");
313 assertNotNull(o);
314 assertNotNull(o2);
315
316 assertFalse(((CoprocessorImpl)c).getSharedData() == ((CoprocessorII)c2).getSharedData());
317 for (int i = 1; i < regions.length; i++) {
318 c = regions[i].getCoprocessorHost().
319 findCoprocessor(CoprocessorImpl.class.getName());
320 c2 = regions[i].getCoprocessorHost().
321 findCoprocessor(CoprocessorII.class.getName());
322
323 assertTrue(((CoprocessorImpl)c).getSharedData().get("test1") == o);
324 assertTrue(((CoprocessorII)c2).getSharedData().get("test2") == o2);
325 }
326
327 for (int i = 0; i < regions.length; i++) {
328 try {
329 byte [] r = regions[i].getRegionInfo().getStartKey();
330 if (r == null || r.length <= 0) {
331
332 r = new byte [] {0};
333 }
334 Get g = new Get(r);
335 regions[i].get(g);
336 fail();
337 } catch (org.apache.hadoop.hbase.DoNotRetryIOException xc) {
338 }
339 assertNull(regions[i].getCoprocessorHost().
340 findCoprocessor(CoprocessorII.class.getName()));
341 }
342 c = regions[0].getCoprocessorHost().
343 findCoprocessor(CoprocessorImpl.class.getName());
344 assertTrue(((CoprocessorImpl)c).getSharedData().get("test1") == o);
345 c = c2 = null;
346
347 System.gc();
348
349 region = reopenRegion(regions[0], CoprocessorImpl.class, CoprocessorII.class);
350 c = region.getCoprocessorHost().
351 findCoprocessor(CoprocessorImpl.class.getName());
352
353 assertTrue(((CoprocessorImpl)c).getSharedData().get("test1") == o);
354 c2 = region.getCoprocessorHost().
355 findCoprocessor(CoprocessorII.class.getName());
356
357
358 Object o3 = ((CoprocessorII)c2).getSharedData().get("test2");
359 assertFalse(o3 == o2);
360 }
361
362 @Test
363 public void testCoprocessorInterface() throws IOException {
364 TableName tableName = TableName.valueOf(name.getMethodName());
365 byte [][] families = { fam1, fam2, fam3 };
366
367 Configuration hc = initSplit();
368 Region region = initHRegion(tableName, name.getMethodName(), hc,
369 new Class<?>[]{CoprocessorImpl.class}, families);
370 for (int i = 0; i < 3; i++) {
371 HBaseTestCase.addContent(region, fam3);
372 region.flush(true);
373 }
374
375 region.compact(false);
376
377 byte [] splitRow = ((HRegion)region).checkSplit();
378
379 assertNotNull(splitRow);
380 Region [] regions = split(region, splitRow);
381 for (int i = 0; i < regions.length; i++) {
382 regions[i] = reopenRegion(regions[i], CoprocessorImpl.class);
383 }
384 HRegion.closeHRegion((HRegion)region);
385 Coprocessor c = region.getCoprocessorHost().
386 findCoprocessor(CoprocessorImpl.class.getName());
387
388
389 Scan s = new Scan();
390 RegionScanner scanner = regions[0].getCoprocessorHost().postScannerOpen(s, regions[0].getScanner(s));
391 assertTrue(scanner instanceof CustomScanner);
392
393 scanner.next(new ArrayList<Cell>());
394
395 assertTrue("Coprocessor not started", ((CoprocessorImpl)c).wasStarted());
396 assertTrue("Coprocessor not stopped", ((CoprocessorImpl)c).wasStopped());
397 assertTrue(((CoprocessorImpl)c).wasOpened());
398 assertTrue(((CoprocessorImpl)c).wasClosed());
399 assertTrue(((CoprocessorImpl)c).wasFlushed());
400 assertTrue(((CoprocessorImpl)c).wasCompacted());
401 assertTrue(((CoprocessorImpl)c).wasSplit());
402
403 for (int i = 0; i < regions.length; i++) {
404 HRegion.closeHRegion((HRegion)regions[i]);
405 c = region.getCoprocessorHost()
406 .findCoprocessor(CoprocessorImpl.class.getName());
407 assertTrue("Coprocessor not started", ((CoprocessorImpl)c).wasStarted());
408 assertTrue("Coprocessor not stopped", ((CoprocessorImpl)c).wasStopped());
409 assertTrue(((CoprocessorImpl)c).wasOpened());
410 assertTrue(((CoprocessorImpl)c).wasClosed());
411 assertTrue(((CoprocessorImpl)c).wasCompacted());
412 }
413 }
414
415 Region reopenRegion(final Region closedRegion, Class<?> ... implClasses)
416 throws IOException {
417
418 Region r = HRegion.openHRegion(closedRegion, null);
419
420
421
422
423
424 Configuration conf = TEST_UTIL.getConfiguration();
425 RegionCoprocessorHost host = new RegionCoprocessorHost(r, null, conf);
426 ((HRegion)r).setCoprocessorHost(host);
427
428 for (Class<?> implClass : implClasses) {
429 host.load(implClass, Coprocessor.PRIORITY_USER, conf);
430 }
431
432
433
434
435
436
437 host.preOpen();
438 host.postOpen();
439 return r;
440 }
441
442 Region initHRegion (TableName tableName, String callingMethod,
443 Configuration conf, Class<?> [] implClasses, byte [][] families)
444 throws IOException {
445 HTableDescriptor htd = new HTableDescriptor(tableName);
446 for(byte [] family : families) {
447 htd.addFamily(new HColumnDescriptor(family));
448 }
449 HRegionInfo info = new HRegionInfo(tableName, null, null, false);
450 Path path = new Path(DIR + callingMethod);
451 HRegion r = HRegion.createHRegion(info, path, conf, htd);
452
453
454 RegionCoprocessorHost host = new RegionCoprocessorHost(r, null, conf);
455 r.setCoprocessorHost(host);
456
457 for (Class<?> implClass : implClasses) {
458 host.load(implClass, Coprocessor.PRIORITY_USER, conf);
459 Coprocessor c = host.findCoprocessor(implClass.getName());
460 assertNotNull(c);
461 }
462
463
464 host.preOpen();
465 host.postOpen();
466 return r;
467 }
468
469 Configuration initSplit() {
470
471 TEST_UTIL.getConfiguration().setInt("hbase.hstore.compactionThreshold", 2);
472
473 TEST_UTIL.getConfiguration().setInt(
474 "hbase.master.lease.thread.wakefrequency", 5 * 1000);
475 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 10 * 1000);
476
477 TEST_UTIL.getConfiguration().setLong("hbase.client.pause", 15 * 1000);
478
479
480 TEST_UTIL.getConfiguration().setLong(HConstants.HREGION_MAX_FILESIZE,
481 1024 * 128);
482 TEST_UTIL.getConfiguration().setBoolean("hbase.testing.nocluster",
483 true);
484 TEST_UTIL.getConfiguration().setBoolean(CoprocessorHost.ABORT_ON_ERROR_KEY, false);
485
486 return TEST_UTIL.getConfiguration();
487 }
488
489 private Region [] split(final Region r, final byte [] splitRow) throws IOException {
490 Region[] regions = new Region[2];
491
492 SplitTransaction st = new SplitTransactionFactory(TEST_UTIL.getConfiguration())
493 .create(r, splitRow);
494 int i = 0;
495
496 if (!st.prepare()) {
497
498 assertTrue(false);
499 }
500 try {
501 Server mockServer = Mockito.mock(Server.class);
502 when(mockServer.getConfiguration()).thenReturn(TEST_UTIL.getConfiguration());
503 PairOfSameType<Region> daughters = st.execute(mockServer, null);
504 for (Region each_daughter: daughters) {
505 regions[i] = each_daughter;
506 i++;
507 }
508 } catch (IOException ioe) {
509 LOG.info("Split transaction of " + r.getRegionInfo().getRegionNameAsString() +
510 " failed:" + ioe.getMessage());
511 assertTrue(false);
512 } catch (RuntimeException e) {
513 LOG.info("Failed rollback of failed split of " +
514 r.getRegionInfo().getRegionNameAsString() + e.getMessage());
515 }
516
517 assertTrue(i == 2);
518 return regions;
519 }
520
521 }