View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.coprocessor;
21  
22  import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
23  import static org.apache.hadoop.hbase.HBaseTestingUtility.fam2;
24  import static org.apache.hadoop.hbase.HBaseTestingUtility.fam3;
25  import static org.junit.Assert.assertFalse;
26  import static org.junit.Assert.assertNotNull;
27  import static org.junit.Assert.assertNull;
28  import static org.junit.Assert.assertTrue;
29  import static org.junit.Assert.fail;
30  import static org.mockito.Mockito.when;
31  
32  import java.io.IOException;
33  import java.util.ArrayList;
34  import java.util.List;
35  import java.util.Map;
36  import java.util.concurrent.ConcurrentMap;
37  
38  import org.apache.commons.logging.Log;
39  import org.apache.commons.logging.LogFactory;
40  import org.apache.hadoop.conf.Configuration;
41  import org.apache.hadoop.fs.Path;
42  import org.apache.hadoop.hbase.Cell;
43  import org.apache.hadoop.hbase.Coprocessor;
44  import org.apache.hadoop.hbase.CoprocessorEnvironment;
45  import org.apache.hadoop.hbase.HBaseTestCase;
46  import org.apache.hadoop.hbase.HBaseTestingUtility;
47  import org.apache.hadoop.hbase.HColumnDescriptor;
48  import org.apache.hadoop.hbase.HConstants;
49  import org.apache.hadoop.hbase.HRegionInfo;
50  import org.apache.hadoop.hbase.HTableDescriptor;
51  import org.apache.hadoop.hbase.Server;
52  import org.apache.hadoop.hbase.testclassification.SmallTests;
53  import org.apache.hadoop.hbase.TableName;
54  import org.apache.hadoop.hbase.client.Get;
55  import org.apache.hadoop.hbase.client.Scan;
56  import org.apache.hadoop.hbase.regionserver.HRegion;
57  import org.apache.hadoop.hbase.regionserver.InternalScanner;
58  import org.apache.hadoop.hbase.regionserver.Region;
59  import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
60  import org.apache.hadoop.hbase.regionserver.RegionScanner;
61  import org.apache.hadoop.hbase.regionserver.ScanType;
62  import org.apache.hadoop.hbase.regionserver.ScannerContext;
63  import org.apache.hadoop.hbase.regionserver.SplitTransaction;
64  import org.apache.hadoop.hbase.regionserver.SplitTransactionFactory;
65  import org.apache.hadoop.hbase.regionserver.Store;
66  import org.apache.hadoop.hbase.regionserver.StoreFile;
67  import org.apache.hadoop.hbase.util.PairOfSameType;
68  import org.junit.Rule;
69  import org.junit.Test;
70  import org.junit.experimental.categories.Category;
71  import org.junit.rules.TestName;
72  import org.mockito.Mockito;
73  
74  @Category(SmallTests.class)
75  public class TestCoprocessorInterface {
76    @Rule public TestName name = new TestName();
77    private static final Log LOG = LogFactory.getLog(TestCoprocessorInterface.class);
78    private static final HBaseTestingUtility TEST_UTIL = HBaseTestingUtility.createLocalHTU();
79    static final Path DIR = TEST_UTIL.getDataTestDir();
80  
81    private static class CustomScanner implements RegionScanner {
82  
83      private RegionScanner delegate;
84  
85      public CustomScanner(RegionScanner delegate) {
86        this.delegate = delegate;
87      }
88  
89      @Override
90      public boolean next(List<Cell> results) throws IOException {
91        return delegate.next(results);
92      }
93  
94      @Override
95      public boolean next(List<Cell> result, ScannerContext scannerContext)
96          throws IOException {
97        return delegate.next(result, scannerContext);
98      }
99  
100     @Override
101     public boolean nextRaw(List<Cell> result)
102         throws IOException {
103       return delegate.nextRaw(result);
104     }
105 
106     @Override
107     public boolean nextRaw(List<Cell> result, ScannerContext context)
108         throws IOException {
109       return delegate.nextRaw(result, context);
110     }
111 
112     @Override
113     public void close() throws IOException {
114       delegate.close();
115     }
116 
117     @Override
118     public HRegionInfo getRegionInfo() {
119       return delegate.getRegionInfo();
120     }
121 
122     @Override
123     public boolean isFilterDone() throws IOException {
124       return delegate.isFilterDone();
125     }
126 
127     @Override
128     public boolean reseek(byte[] row) throws IOException {
129       return false;
130     }
131 
132     @Override
133     public long getMaxResultSize() {
134       return delegate.getMaxResultSize();
135     }
136 
137     @Override
138     public long getMvccReadPoint() {
139       return delegate.getMvccReadPoint();
140     }
141 
142     @Override
143     public int getBatch() {
144       return delegate.getBatch();
145     }
146   }
147 
148   public static class CoprocessorImpl extends BaseRegionObserver {
149 
150     private boolean startCalled;
151     private boolean stopCalled;
152     private boolean preOpenCalled;
153     private boolean postOpenCalled;
154     private boolean preCloseCalled;
155     private boolean postCloseCalled;
156     private boolean preCompactCalled;
157     private boolean postCompactCalled;
158     private boolean preFlushCalled;
159     private boolean postFlushCalled;
160     private boolean preSplitCalled;
161     private boolean postSplitCalled;
162     private boolean preSplitWithSplitRowCalled;
163     private ConcurrentMap<String, Object> sharedData;
164 
165     @Override
166     public void start(CoprocessorEnvironment e) {
167       sharedData = ((RegionCoprocessorEnvironment)e).getSharedData();
168       // using new String here, so that there will be new object on each invocation
169       sharedData.putIfAbsent("test1", new Object());
170       startCalled = true;
171     }
172 
173     @Override
174     public void stop(CoprocessorEnvironment e) {
175       sharedData = null;
176       stopCalled = true;
177     }
178 
179     @Override
180     public void preOpen(ObserverContext<RegionCoprocessorEnvironment> e) {
181       preOpenCalled = true;
182     }
183     @Override
184     public void postOpen(ObserverContext<RegionCoprocessorEnvironment> e) {
185       postOpenCalled = true;
186     }
187     @Override
188     public void preClose(ObserverContext<RegionCoprocessorEnvironment> e, boolean abortRequested) {
189       preCloseCalled = true;
190     }
191     @Override
192     public void postClose(ObserverContext<RegionCoprocessorEnvironment> e, boolean abortRequested) {
193       postCloseCalled = true;
194     }
195     @Override
196     public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e,
197         Store store, InternalScanner scanner, ScanType scanType) {
198       preCompactCalled = true;
199       return scanner;
200     }
201     @Override
202     public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e,
203         Store store, StoreFile resultFile) {
204       postCompactCalled = true;
205     }
206     @Override
207     public void preFlush(ObserverContext<RegionCoprocessorEnvironment> e) {
208       preFlushCalled = true;
209     }
210     @Override
211     public void postFlush(ObserverContext<RegionCoprocessorEnvironment> e) {
212       postFlushCalled = true;
213     }
214     @Override
215     public void preSplit(ObserverContext<RegionCoprocessorEnvironment> e) {
216       preSplitCalled = true;
217     }
218     
219     @Override
220     public void preSplit(ObserverContext<RegionCoprocessorEnvironment> c,
221         byte[] splitRow) throws IOException {
222       preSplitWithSplitRowCalled = true;
223     }
224     @Override
225     public void postSplit(ObserverContext<RegionCoprocessorEnvironment> e, Region l, Region r) {
226       postSplitCalled = true;
227     }
228 
229     @Override
230     public RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
231         final Scan scan, final RegionScanner s) throws IOException {
232       return new CustomScanner(s);
233     }
234 
235     boolean wasStarted() {
236       return startCalled;
237     }
238     boolean wasStopped() {
239       return stopCalled;
240     }
241     boolean wasOpened() {
242       return (preOpenCalled && postOpenCalled);
243     }
244     boolean wasClosed() {
245       return (preCloseCalled && postCloseCalled);
246     }
247     boolean wasFlushed() {
248       return (preFlushCalled && postFlushCalled);
249     }
250     boolean wasCompacted() {
251       return (preCompactCalled && postCompactCalled);
252     }
253     boolean wasSplit() {
254       return (preSplitCalled && postSplitCalled && preSplitWithSplitRowCalled);
255     }
256     Map<String, Object> getSharedData() {
257       return sharedData;
258     }
259   }
260 
261   public static class CoprocessorII extends BaseRegionObserver {
262     private ConcurrentMap<String, Object> sharedData;
263     @Override
264     public void start(CoprocessorEnvironment e) {
265       sharedData = ((RegionCoprocessorEnvironment)e).getSharedData();
266       sharedData.putIfAbsent("test2", new Object());
267     }
268     @Override
269     public void stop(CoprocessorEnvironment e) {
270       sharedData = null;
271     }
272     @Override
273     public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
274         final Get get, final List<Cell> results) throws IOException {
275       if (1/0 == 1) {
276         e.complete();
277       }
278     }
279 
280     Map<String, Object> getSharedData() {
281       return sharedData;
282     }
283   }
284 
285   @Test
286   public void testSharedData() throws IOException {
287     TableName tableName = TableName.valueOf(name.getMethodName());
288     byte [][] families = { fam1, fam2, fam3 };
289 
290     Configuration hc = initSplit();
291     Region region = initHRegion(tableName, name.getMethodName(), hc,
292       new Class<?>[]{}, families);
293 
294     for (int i = 0; i < 3; i++) {
295       HBaseTestCase.addContent(region, fam3);
296       region.flush(true);
297     }
298 
299     region.compact(false);
300 
301     byte [] splitRow = ((HRegion)region).checkSplit();
302     assertNotNull(splitRow);
303     Region [] regions = split(region, splitRow);
304     for (int i = 0; i < regions.length; i++) {
305       regions[i] = reopenRegion(regions[i], CoprocessorImpl.class, CoprocessorII.class);
306     }
307     Coprocessor c = regions[0].getCoprocessorHost().
308         findCoprocessor(CoprocessorImpl.class.getName());
309     Coprocessor c2 = regions[0].getCoprocessorHost().
310         findCoprocessor(CoprocessorII.class.getName());
311     Object o = ((CoprocessorImpl)c).getSharedData().get("test1");
312     Object o2 = ((CoprocessorII)c2).getSharedData().get("test2");
313     assertNotNull(o);
314     assertNotNull(o2);
315     // to coprocessors get different sharedDatas
316     assertFalse(((CoprocessorImpl)c).getSharedData() == ((CoprocessorII)c2).getSharedData());
317     for (int i = 1; i < regions.length; i++) {
318       c = regions[i].getCoprocessorHost().
319           findCoprocessor(CoprocessorImpl.class.getName());
320       c2 = regions[i].getCoprocessorHost().
321           findCoprocessor(CoprocessorII.class.getName());
322       // make sure that all coprocessor of a class have identical sharedDatas
323       assertTrue(((CoprocessorImpl)c).getSharedData().get("test1") == o);
324       assertTrue(((CoprocessorII)c2).getSharedData().get("test2") == o2);
325     }
326     // now have all Environments fail
327     for (int i = 0; i < regions.length; i++) {
328       try {
329         byte [] r = regions[i].getRegionInfo().getStartKey();
330         if (r == null || r.length <= 0) {
331           // Its the start row.  Can't ask for null.  Ask for minimal key instead.
332           r = new byte [] {0};
333         }
334         Get g = new Get(r);
335         regions[i].get(g);
336         fail();
337       } catch (org.apache.hadoop.hbase.DoNotRetryIOException xc) {
338       }
339       assertNull(regions[i].getCoprocessorHost().
340           findCoprocessor(CoprocessorII.class.getName()));
341     }
342     c = regions[0].getCoprocessorHost().
343         findCoprocessor(CoprocessorImpl.class.getName());
344     assertTrue(((CoprocessorImpl)c).getSharedData().get("test1") == o);
345     c = c2 = null;
346     // perform a GC
347     System.gc();
348     // reopen the region
349     region = reopenRegion(regions[0], CoprocessorImpl.class, CoprocessorII.class);
350     c = region.getCoprocessorHost().
351         findCoprocessor(CoprocessorImpl.class.getName());
352     // CPimpl is unaffected, still the same reference
353     assertTrue(((CoprocessorImpl)c).getSharedData().get("test1") == o);
354     c2 = region.getCoprocessorHost().
355         findCoprocessor(CoprocessorII.class.getName());
356     // new map and object created, hence the reference is different
357     // hence the old entry was indeed removed by the GC and new one has been created
358     Object o3 = ((CoprocessorII)c2).getSharedData().get("test2");
359     assertFalse(o3 == o2);
360   }
361 
362   @Test
363   public void testCoprocessorInterface() throws IOException {
364     TableName tableName = TableName.valueOf(name.getMethodName());
365     byte [][] families = { fam1, fam2, fam3 };
366 
367     Configuration hc = initSplit();
368     Region region = initHRegion(tableName, name.getMethodName(), hc,
369       new Class<?>[]{CoprocessorImpl.class}, families);
370     for (int i = 0; i < 3; i++) {
371       HBaseTestCase.addContent(region, fam3);
372       region.flush(true);
373     }
374 
375     region.compact(false);
376 
377     byte [] splitRow = ((HRegion)region).checkSplit();
378 
379     assertNotNull(splitRow);
380     Region [] regions = split(region, splitRow);
381     for (int i = 0; i < regions.length; i++) {
382       regions[i] = reopenRegion(regions[i], CoprocessorImpl.class);
383     }
384     HRegion.closeHRegion((HRegion)region);
385     Coprocessor c = region.getCoprocessorHost().
386       findCoprocessor(CoprocessorImpl.class.getName());
387 
388     // HBASE-4197
389     Scan s = new Scan();
390     RegionScanner scanner = regions[0].getCoprocessorHost().postScannerOpen(s, regions[0].getScanner(s));
391     assertTrue(scanner instanceof CustomScanner);
392     // this would throw an exception before HBASE-4197
393     scanner.next(new ArrayList<Cell>());
394 
395     assertTrue("Coprocessor not started", ((CoprocessorImpl)c).wasStarted());
396     assertTrue("Coprocessor not stopped", ((CoprocessorImpl)c).wasStopped());
397     assertTrue(((CoprocessorImpl)c).wasOpened());
398     assertTrue(((CoprocessorImpl)c).wasClosed());
399     assertTrue(((CoprocessorImpl)c).wasFlushed());
400     assertTrue(((CoprocessorImpl)c).wasCompacted());
401     assertTrue(((CoprocessorImpl)c).wasSplit());
402 
403     for (int i = 0; i < regions.length; i++) {
404       HRegion.closeHRegion((HRegion)regions[i]);
405       c = region.getCoprocessorHost()
406             .findCoprocessor(CoprocessorImpl.class.getName());
407       assertTrue("Coprocessor not started", ((CoprocessorImpl)c).wasStarted());
408       assertTrue("Coprocessor not stopped", ((CoprocessorImpl)c).wasStopped());
409       assertTrue(((CoprocessorImpl)c).wasOpened());
410       assertTrue(((CoprocessorImpl)c).wasClosed());
411       assertTrue(((CoprocessorImpl)c).wasCompacted());
412     }
413   }
414 
415   Region reopenRegion(final Region closedRegion, Class<?> ... implClasses)
416       throws IOException {
417     //HRegionInfo info = new HRegionInfo(tableName, null, null, false);
418     Region r = HRegion.openHRegion(closedRegion, null);
419 
420     // this following piece is a hack. currently a coprocessorHost
421     // is secretly loaded at OpenRegionHandler. we don't really
422     // start a region server here, so just manually create cphost
423     // and set it to region.
424     Configuration conf = TEST_UTIL.getConfiguration();
425     RegionCoprocessorHost host = new RegionCoprocessorHost(r, null, conf);
426     ((HRegion)r).setCoprocessorHost(host);
427 
428     for (Class<?> implClass : implClasses) {
429       host.load(implClass, Coprocessor.PRIORITY_USER, conf);
430     }
431     // we need to manually call pre- and postOpen here since the
432     // above load() is not the real case for CP loading. A CP is
433     // expected to be loaded by default from 1) configuration; or 2)
434     // HTableDescriptor. If it's loaded after HRegion initialized,
435     // the pre- and postOpen() won't be triggered automatically.
436     // Here we have to call pre and postOpen explicitly.
437     host.preOpen();
438     host.postOpen();
439     return r;
440   }
441 
442   Region initHRegion (TableName tableName, String callingMethod,
443       Configuration conf, Class<?> [] implClasses, byte [][] families)
444       throws IOException {
445     HTableDescriptor htd = new HTableDescriptor(tableName);
446     for(byte [] family : families) {
447       htd.addFamily(new HColumnDescriptor(family));
448     }
449     HRegionInfo info = new HRegionInfo(tableName, null, null, false);
450     Path path = new Path(DIR + callingMethod);
451     HRegion r = HRegion.createHRegion(info, path, conf, htd);
452 
453     // this following piece is a hack.
454     RegionCoprocessorHost host = new RegionCoprocessorHost(r, null, conf);
455     r.setCoprocessorHost(host);
456 
457     for (Class<?> implClass : implClasses) {
458       host.load(implClass, Coprocessor.PRIORITY_USER, conf);
459       Coprocessor c = host.findCoprocessor(implClass.getName());
460       assertNotNull(c);
461     }
462 
463     // Here we have to call pre and postOpen explicitly.
464     host.preOpen();
465     host.postOpen();
466     return r;
467   }
468 
469   Configuration initSplit() {
470     // Always compact if there is more than one store file.
471     TEST_UTIL.getConfiguration().setInt("hbase.hstore.compactionThreshold", 2);
472     // Make lease timeout longer, lease checks less frequent
473     TEST_UTIL.getConfiguration().setInt(
474         "hbase.master.lease.thread.wakefrequency", 5 * 1000);
475     TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 10 * 1000);
476     // Increase the amount of time between client retries
477     TEST_UTIL.getConfiguration().setLong("hbase.client.pause", 15 * 1000);
478     // This size should make it so we always split using the addContent
479     // below.  After adding all data, the first region is 1.3M
480     TEST_UTIL.getConfiguration().setLong(HConstants.HREGION_MAX_FILESIZE,
481         1024 * 128);
482     TEST_UTIL.getConfiguration().setBoolean("hbase.testing.nocluster",
483         true);
484     TEST_UTIL.getConfiguration().setBoolean(CoprocessorHost.ABORT_ON_ERROR_KEY, false);
485 
486     return TEST_UTIL.getConfiguration();
487   }
488 
489   private Region [] split(final Region r, final byte [] splitRow) throws IOException {
490     Region[] regions = new Region[2];
491 
492     SplitTransaction st = new SplitTransactionFactory(TEST_UTIL.getConfiguration())
493       .create(r, splitRow);
494     int i = 0;
495 
496     if (!st.prepare()) {
497       // test fails.
498       assertTrue(false);
499     }
500     try {
501       Server mockServer = Mockito.mock(Server.class);
502       when(mockServer.getConfiguration()).thenReturn(TEST_UTIL.getConfiguration());
503       PairOfSameType<Region> daughters = st.execute(mockServer, null);
504       for (Region each_daughter: daughters) {
505         regions[i] = each_daughter;
506         i++;
507       }
508     } catch (IOException ioe) {
509       LOG.info("Split transaction of " + r.getRegionInfo().getRegionNameAsString() +
510           " failed:" + ioe.getMessage());
511       assertTrue(false);
512     } catch (RuntimeException e) {
513       LOG.info("Failed rollback of failed split of " +
514           r.getRegionInfo().getRegionNameAsString() + e.getMessage());
515     }
516 
517     assertTrue(i == 2);
518     return regions;
519   }
520 
521 }