/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import com.google.common.collect.Lists;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.RegionServerCallable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.RpcRetryingCaller;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
import org.apache.hadoop.hbase.regionserver.wal.TestWALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALKey;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;

/**
 * Tests bulk loading of HFiles and exercises the atomicity (or lack thereof)
 * of the region server's bulkLoad functionality.
 */
@Category(LargeTests.class)
public class TestHRegionServerBulkLoad {
  private static final Log LOG = LogFactory.getLog(TestHRegionServerBulkLoad.class);
  private static HBaseTestingUtility UTIL = new HBaseTestingUtility();
  private final static Configuration conf = UTIL.getConfiguration();
  private final static byte[] QUAL = Bytes.toBytes("qual");
  private final static int NUM_CFS = 10;
  public static int BLOCKSIZE = 64 * 1024;
  public static Algorithm COMPRESSION = Compression.Algorithm.NONE;

  private final static byte[][] families = new byte[NUM_CFS][];
  static {
    for (int i = 0; i < NUM_CFS; i++) {
      families[i] = Bytes.toBytes(family(i));
    }
  }

  /**
   * Create a rowkey compatible with
   * {@link #createHFile(FileSystem, Path, byte[], byte[], byte[], int)}.
   */
  public static byte[] rowkey(int i) {
    return Bytes.toBytes(String.format("row_%08d", i));
  }

  static String family(int i) {
    return String.format("family_%04d", i);
  }

  /**
   * Create an HFile with the given number of rows with a specified value.
   */
  public static void createHFile(FileSystem fs, Path path, byte[] family,
      byte[] qualifier, byte[] value, int numRows) throws IOException {
    HFileContext context = new HFileContextBuilder().withBlockSize(BLOCKSIZE)
                            .withCompression(COMPRESSION)
                            .build();
    HFile.Writer writer = HFile
        .getWriterFactory(conf, new CacheConfig(conf))
        .withPath(fs, path)
        .withFileContext(context)
        .create();
    long now = System.currentTimeMillis();
    try {
      // write one cell per row, all with the same timestamp and value
      for (int i = 0; i < numRows; i++) {
        KeyValue kv = new KeyValue(rowkey(i), family, qualifier, now, value);
        writer.append(kv);
      }
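      // Record the bulk load time in the file info; the presence of
      // BULKLOAD_TIME_KEY is what marks the file as bulk loaded (rather than
      // flushed) when the store later opens it.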
      writer.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, Bytes.toBytes(now));
    } finally {
      writer.close();
    }
  }

  /**
   * Thread that repeatedly bulk loads HFiles into the table.
   *
   * Each iteration loads one HFile per column family (NUM_CFS = 10 files), so
   * open file handles accumulate quickly. Every 10 iterations the thread
   * requests a region compaction to reduce the number of open file handles.
   */
  public static class AtomicHFileLoader extends RepeatingTestThread {
    final AtomicLong numBulkLoads = new AtomicLong();
    final AtomicLong numCompactions = new AtomicLong();
    private TableName tableName;

    public AtomicHFileLoader(TableName tableName, TestContext ctx,
        byte targetFamilies[][]) throws IOException {
      super(ctx);
      this.tableName = tableName;
    }

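    // One iteration: write NUM_CFS HFiles (one per column family) to the test
    // filesystem, then bulk load all of them in a single request.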
    public void doAnAction() throws Exception {
      long iteration = numBulkLoads.getAndIncrement();
      Path dir = UTIL.getDataTestDirOnTestFS(String.format("bulkLoad_%08d",
          iteration));

      // create HFiles for different column families
      FileSystem fs = UTIL.getTestFileSystem();
      byte[] val = Bytes.toBytes(String.format("%010d", iteration));
      final List<Pair<byte[], String>> famPaths = new ArrayList<Pair<byte[], String>>(
          NUM_CFS);
      for (int i = 0; i < NUM_CFS; i++) {
        Path hfile = new Path(dir, family(i));
        byte[] fam = Bytes.toBytes(family(i));
        createHFile(fs, hfile, fam, QUAL, val, 1000);
        famPaths.add(new Pair<byte[], String>(fam, hfile.toString()));
      }

      // bulk load HFiles
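      // All family/path pairs travel in one BulkLoadHFileRequest, so the
      // region server applies them as a unit; the scanner threads verify that
      // this is in fact atomic across families.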
      final HConnection conn = UTIL.getHBaseAdmin().getConnection();
      RegionServerCallable<Void> callable =
          new RegionServerCallable<Void>(conn, tableName, Bytes.toBytes("aaa")) {
        @Override
        public Void call(int callTimeout) throws Exception {
          LOG.debug("Going to connect to server " + getLocation() + " for row "
              + Bytes.toStringBinary(getRow()));
          byte[] regionName = getLocation().getRegionInfo().getRegionName();
          BulkLoadHFileRequest request =
            RequestConverter.buildBulkLoadHFileRequest(famPaths, regionName, true);
          getStub().bulkLoadHFile(null, request);
          return null;
        }
      };
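      // Keep retrying the RPC until it succeeds; Integer.MAX_VALUE as the
      // call timeout effectively means "no timeout" for this test.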
      RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(conf);
      RpcRetryingCaller<Void> caller = factory.<Void> newCaller();
      caller.callWithRetries(callable, Integer.MAX_VALUE);

      // Periodically do compaction to reduce the number of open file handles.
      if (numBulkLoads.get() % 10 == 0) {
        // every 10 bulk loads we have written another 100 HFiles (10 families
        // x 10 iterations), so compact to keep open file handles in check
        callable = new RegionServerCallable<Void>(conn, tableName, Bytes.toBytes("aaa")) {
          @Override
          public Void call(int callTimeout) throws Exception {
            LOG.debug("compacting " + getLocation() + " for row "
                + Bytes.toStringBinary(getRow()));
            AdminProtos.AdminService.BlockingInterface server =
              conn.getAdmin(getLocation().getServerName());
            CompactRegionRequest request =
              RequestConverter.buildCompactRegionRequest(
                getLocation().getRegionInfo().getRegionName(), true, null);
            server.compactRegion(null, request);
            numCompactions.incrementAndGet();
            return null;
          }
        };
        caller.callWithRetries(callable, Integer.MAX_VALUE);
      }
    }
  }


  /**
   * Thread that does full scans of the table looking for any partially
   * completed rows.
   */
  public static class AtomicScanReader extends RepeatingTestThread {
    byte targetFamilies[][];
    HTable table;
    AtomicLong numScans = new AtomicLong();
    AtomicLong numRowsScanned = new AtomicLong();
    TableName TABLE_NAME;

    public AtomicScanReader(TableName TABLE_NAME, TestContext ctx,
        byte targetFamilies[][]) throws IOException {
      super(ctx);
      this.TABLE_NAME = TABLE_NAME;
      this.targetFamilies = targetFamilies;
      table = new HTable(conf, TABLE_NAME);
    }

    public void doAnAction() throws Exception {
      Scan s = new Scan();
      for (byte[] family : targetFamilies) {
        s.addFamily(family);
      }
      ResultScanner scanner = table.getScanner(s);

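      // Every cell in a row is written by a single bulk load iteration, so all
      // target families of a row must hold the same value; differing values
      // within one row indicate a torn (non-atomic) load.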
      for (Result res : scanner) {
        byte[] lastRow = null, lastFam = null, lastQual = null;
        byte[] gotValue = null;
        for (byte[] family : targetFamilies) {
          byte qualifier[] = QUAL;
          byte thisValue[] = res.getValue(family, qualifier);
          if (gotValue != null && thisValue != null
              && !Bytes.equals(gotValue, thisValue)) {
            StringBuilder msg = new StringBuilder();
            msg.append("Failed on scan ").append(numScans)
                .append(" after scanning ").append(numRowsScanned)
                .append(" rows!\n");
            msg.append("Current was " + Bytes.toString(res.getRow()) + "/"
                + Bytes.toString(family) + ":" + Bytes.toString(qualifier)
                + " = " + Bytes.toString(thisValue) + "\n");
            msg.append("Previous was " + Bytes.toString(lastRow) + "/"
                + Bytes.toString(lastFam) + ":" + Bytes.toString(lastQual)
                + " = " + Bytes.toString(gotValue));
            throw new RuntimeException(msg.toString());
          }

          lastFam = family;
          lastQual = qualifier;
          lastRow = res.getRow();
          gotValue = thisValue;
        }
        numRowsScanned.getAndIncrement();
      }
      numScans.getAndIncrement();
    }
  }


  /**
   * Creates a table with the given table name and the specified number of
   * column families if the table does not already exist.
   */
  private void setupTable(TableName table, int cfs) throws IOException {
    try {
      LOG.info("Creating table " + table);
      HTableDescriptor htd = new HTableDescriptor(table);
      for (int i = 0; i < cfs; i++) {
        htd.addFamily(new HColumnDescriptor(family(i)));
      }

      UTIL.getHBaseAdmin().createTable(htd);
    } catch (TableExistsException tee) {
      LOG.info("Table " + table + " already exists");
    }
  }


  /**
   * Atomic bulk load.
   */
  @Test
  public void testAtomicBulkLoad() throws Exception {
    TableName TABLE_NAME = TableName.valueOf("atomicBulkLoad");

    int millisToRun = 30000;
    int numScanners = 50;

    UTIL.startMiniCluster(1);
    try {
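      // Attach a WAL listener before running the workload so we can assert
      // afterwards that a bulk load marker edit (WALEdit.BULK_LOAD) was
      // written to the WAL.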
      WAL log = UTIL.getHBaseCluster().getRegionServer(0).getWAL(null);
      FindBulkHBaseListener listener = new FindBulkHBaseListener();
      log.registerWALActionsListener(listener);
      runAtomicBulkloadTest(TABLE_NAME, millisToRun, numScanners);
      assertThat(listener.isFound(), is(true));
    } finally {
      UTIL.shutdownMiniCluster();
    }
  }

  void runAtomicBulkloadTest(TableName tableName, int millisToRun, int numScanners)
      throws Exception {
    setupTable(tableName, 10);

    TestContext ctx = new TestContext(UTIL.getConfiguration());

    AtomicHFileLoader loader = new AtomicHFileLoader(tableName, ctx, null);
    ctx.addThread(loader);

    List<AtomicScanReader> scanners = Lists.newArrayList();
    for (int i = 0; i < numScanners; i++) {
      AtomicScanReader scanner = new AtomicScanReader(tableName, ctx, families);
      scanners.add(scanner);
      ctx.addThread(scanner);
    }

    ctx.startThreads();
    ctx.waitFor(millisToRun);
    ctx.stop();

    LOG.info("Loaders:");
    LOG.info("  loaded " + loader.numBulkLoads.get());
    LOG.info("  compactions " + loader.numCompactions.get());

    LOG.info("Scanners:");
    for (AtomicScanReader scanner : scanners) {
      LOG.info("  scanned " + scanner.numScans.get());
      LOG.info("  verified " + scanner.numRowsScanned.get() + " rows");
    }
  }


  /**
   * Run the test on an HBase instance for 5 minutes. This assumes that the
   * table under test only has a single region.
   */
  public static void main(String args[]) throws Exception {
    try {
      Configuration c = HBaseConfiguration.create();
      TestHRegionServerBulkLoad test = new TestHRegionServerBulkLoad();
      test.setConf(c);
      test.runAtomicBulkloadTest(TableName.valueOf("atomicTableTest"), 5 * 60 * 1000, 50);
    } finally {
      System.exit(0); // something hangs on shutdown (we believe it is the LRU threadpool)
    }
  }

  private void setConf(Configuration c) {
    UTIL = new HBaseTestingUtility(c);
  }

  static class FindBulkHBaseListener extends TestWALActionsListener.DummyWALActionsListener {
    private boolean found = false;

    @Override
    public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit) {
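      // Inspect every cell of every WAL edit: a cell whose toStringMap
      // rendering contains the WALEdit.BULK_LOAD marker means the region
      // server logged the bulk load event.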
      for (Cell cell : logEdit.getCells()) {
        KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
        for (Map.Entry<String, Object> entry : kv.toStringMap().entrySet()) {
          if (entry.getValue().equals(Bytes.toString(WALEdit.BULK_LOAD))) {
            found = true;
          }
        }
      }
    }

    public boolean isFound() {
      return found;
    }
  }
}