View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.regionserver;
21  
22  import static org.junit.Assert.*;
23  import static org.mockito.Matchers.any;
24  import static org.mockito.Mockito.spy;
25  import static org.mockito.Mockito.times;
26  import static org.mockito.Mockito.verify;
27  
28  import java.io.IOException;
29  import java.lang.ref.SoftReference;
30  import java.security.PrivilegedExceptionAction;
31  import java.util.ArrayList;
32  import java.util.Collection;
33  import java.util.Collections;
34  import java.util.Iterator;
35  import java.util.List;
36  import java.util.NavigableSet;
37  import java.util.concurrent.ConcurrentSkipListSet;
38  import java.util.concurrent.atomic.AtomicBoolean;
39  
40  import org.apache.commons.logging.Log;
41  import org.apache.commons.logging.LogFactory;
42  import org.apache.hadoop.conf.Configuration;
43  import org.apache.hadoop.fs.FSDataOutputStream;
44  import org.apache.hadoop.fs.FileStatus;
45  import org.apache.hadoop.fs.FileSystem;
46  import org.apache.hadoop.fs.FilterFileSystem;
47  import org.apache.hadoop.fs.LocalFileSystem;
48  import org.apache.hadoop.fs.Path;
49  import org.apache.hadoop.fs.permission.FsPermission;
50  import org.apache.hadoop.hbase.Cell;
51  import org.apache.hadoop.hbase.CellUtil;
52  import org.apache.hadoop.hbase.HBaseConfiguration;
53  import org.apache.hadoop.hbase.HBaseTestingUtility;
54  import org.apache.hadoop.hbase.HColumnDescriptor;
55  import org.apache.hadoop.hbase.HRegionInfo;
56  import org.apache.hadoop.hbase.HTableDescriptor;
57  import org.apache.hadoop.hbase.KeyValue;
58  import org.apache.hadoop.hbase.KeyValue.KVComparator;
59  import org.apache.hadoop.hbase.KeyValueUtil;
60  import org.apache.hadoop.hbase.testclassification.MediumTests;
61  import org.apache.hadoop.hbase.TableName;
62  import org.apache.hadoop.hbase.client.Get;
63  import org.apache.hadoop.hbase.io.compress.Compression;
64  import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
65  import org.apache.hadoop.hbase.io.hfile.CacheConfig;
66  import org.apache.hadoop.hbase.io.hfile.HFile;
67  import org.apache.hadoop.hbase.io.hfile.HFileContext;
68  import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
69  import org.apache.hadoop.hbase.monitoring.MonitoredTask;
70  import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
71  import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
72  import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
73  import org.apache.hadoop.hbase.wal.DefaultWALProvider;
74  import org.apache.hadoop.hbase.wal.WALFactory;
75  import org.apache.hadoop.hbase.security.User;
76  import org.apache.hadoop.hbase.util.Bytes;
77  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
78  import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
79  import org.apache.hadoop.hbase.util.FSUtils;
80  import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
81  import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
82  import org.apache.hadoop.util.Progressable;
83  import org.junit.After;
84  import org.junit.Assert;
85  import org.junit.Before;
86  import org.junit.Rule;
87  import org.junit.Test;
88  import org.junit.experimental.categories.Category;
89  import org.junit.rules.TestName;
90  import org.mockito.Mockito;
91  
92  import com.google.common.collect.Lists;
93  
94  /**
95   * Test class for the Store
96   */
97  @Category(MediumTests.class)
98  public class TestStore {
  private static final Log LOG = LogFactory.getLog(TestStore.class);
  // Supplies the name of the running test method; tests pass it to init(...) and log it.
  @Rule public TestName name = new TestName();

  // Store under test; assigned by the init(...) helpers.
  HStore store;
  byte [] table = Bytes.toBytes("table");
  byte [] family = Bytes.toBytes("family");

  // Row keys and column qualifiers shared by the tests.
  byte [] row = Bytes.toBytes("row");
  byte [] row2 = Bytes.toBytes("row2");
  byte [] qf1 = Bytes.toBytes("qf1");
  byte [] qf2 = Bytes.toBytes("qf2");
  byte [] qf3 = Bytes.toBytes("qf3");
  byte [] qf4 = Bytes.toBytes("qf4");
  byte [] qf5 = Bytes.toBytes("qf5");
  byte [] qf6 = Bytes.toBytes("qf6");

  // Qualifiers requested by the Get tests; filled in setUp().
  NavigableSet<byte[]> qualifiers =
    new ConcurrentSkipListSet<byte[]>(Bytes.BYTES_COMPARATOR);

  // Cells the Get tests expect (built in setUp()) and the cells actually read back;
  // the two lists are compared element by element in assertCheck().
  List<Cell> expected = new ArrayList<Cell>();
  List<Cell> result = new ArrayList<Cell>();

  // Passed to flushStore(...) and incremented after each use.
  long id = System.currentTimeMillis();
  // Get on 'row'; setUp() adds one column per entry in 'qualifiers'.
  Get get = new Get(row);

  private HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  // Path prefix for per-test data; init(...) appends the method name to it.
  private final String DIR = TEST_UTIL.getDataTestDir("TestStore").toString();
126 
127 
128   /**
129    * Setup
130    * @throws IOException
131    */
132   @Before
133   public void setUp() throws IOException {
134     qualifiers.add(qf1);
135     qualifiers.add(qf3);
136     qualifiers.add(qf5);
137 
138     Iterator<byte[]> iter = qualifiers.iterator();
139     while(iter.hasNext()){
140       byte [] next = iter.next();
141       expected.add(new KeyValue(row, family, next, 1, (byte[])null));
142       get.addColumn(family, next);
143     }
144   }
145 
146   private void init(String methodName) throws IOException {
147     init(methodName, TEST_UTIL.getConfiguration());
148   }
149 
150   private void init(String methodName, Configuration conf)
151   throws IOException {
152     HColumnDescriptor hcd = new HColumnDescriptor(family);
153     // some of the tests write 4 versions and then flush
154     // (with HBASE-4241, lower versions are collected on flush)
155     hcd.setMaxVersions(4);
156     init(methodName, conf, hcd);
157   }
158 
159   private void init(String methodName, Configuration conf,
160       HColumnDescriptor hcd) throws IOException {
161     HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table));
162     init(methodName, conf, htd, hcd);
163   }
164 
165   @SuppressWarnings("deprecation")
166   private Store init(String methodName, Configuration conf, HTableDescriptor htd,
167       HColumnDescriptor hcd) throws IOException {
168     //Setting up a Store
169     Path basedir = new Path(DIR+methodName);
170     Path tableDir = FSUtils.getTableDir(basedir, htd.getTableName());
171     final Path logdir = new Path(basedir, DefaultWALProvider.getWALDirectoryName(methodName));
172 
173     FileSystem fs = FileSystem.get(conf);
174 
175     fs.delete(logdir, true);
176 
177     if (htd.hasFamily(hcd.getName())) {
178       htd.modifyFamily(hcd);
179     } else {
180       htd.addFamily(hcd);
181     }
182     HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
183     final Configuration walConf = new Configuration(conf);
184     FSUtils.setRootDir(walConf, basedir);
185     final WALFactory wals = new WALFactory(walConf, null, methodName);
186     HRegion region = new HRegion(tableDir, wals.getWAL(info.getEncodedNameAsBytes()), fs, conf,
187         info, htd, null);
188 
189     store = new HStore(region, hcd, conf);
190     return store;
191   }
192 
193   /**
194    * Test we do not lose data if we fail a flush and then close.
195    * Part of HBase-10466
196    * @throws Exception
197    */
198   @Test
199   public void testFlushSizeAccounting() throws Exception {
200     LOG.info("Setting up a faulty file system that cannot write in " +
201       this.name.getMethodName());
202     final Configuration conf = HBaseConfiguration.create();
203     // Only retry once.
204     conf.setInt("hbase.hstore.flush.retries.number", 1);
205     User user = User.createUserForTesting(conf, this.name.getMethodName(),
206       new String[]{"foo"});
207     // Inject our faulty LocalFileSystem
208     conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class);
209     user.runAs(new PrivilegedExceptionAction<Object>() {
210       @Override
211       public Object run() throws Exception {
212         // Make sure it worked (above is sensitive to caching details in hadoop core)
213         FileSystem fs = FileSystem.get(conf);
214         Assert.assertEquals(FaultyFileSystem.class, fs.getClass());
215         FaultyFileSystem ffs = (FaultyFileSystem)fs;
216 
217         // Initialize region
218         init(name.getMethodName(), conf);
219 
220         long size = store.memstore.getFlushableSize();
221         Assert.assertEquals(0, size);
222         LOG.info("Adding some data");
223         long kvSize = store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
224         size = store.memstore.getFlushableSize();
225         Assert.assertEquals(kvSize, size);
226         // Flush.  Bug #1 from HBASE-10466.  Make sure size calculation on failed flush is right.
227         try {
228           LOG.info("Flushing");
229           flushStore(store, id++);
230           Assert.fail("Didn't bubble up IOE!");
231         } catch (IOException ioe) {
232           Assert.assertTrue(ioe.getMessage().contains("Fault injected"));
233         }
234         size = store.memstore.getFlushableSize();
235         Assert.assertEquals(kvSize, size);
236         store.add(new KeyValue(row, family, qf2, 2, (byte[])null));
237         // Even though we add a new kv, we expect the flushable size to be 'same' since we have
238         // not yet cleared the snapshot -- the above flush failed.
239         Assert.assertEquals(kvSize, size);
240         ffs.fault.set(false);
241         flushStore(store, id++);
242         size = store.memstore.getFlushableSize();
243         // Size should be the foreground kv size.
244         Assert.assertEquals(kvSize, size);
245         flushStore(store, id++);
246         size = store.memstore.getFlushableSize();
247         Assert.assertEquals(0, size);
248         return null;
249       }
250     });
251   }
252 
253   /**
254    * Verify that compression and data block encoding are respected by the
255    * Store.createWriterInTmp() method, used on store flush.
256    */
257   @Test
258   public void testCreateWriter() throws Exception {
259     Configuration conf = HBaseConfiguration.create();
260     FileSystem fs = FileSystem.get(conf);
261 
262     HColumnDescriptor hcd = new HColumnDescriptor(family);
263     hcd.setCompressionType(Compression.Algorithm.GZ);
264     hcd.setDataBlockEncoding(DataBlockEncoding.DIFF);
265     init(name.getMethodName(), conf, hcd);
266 
267     // Test createWriterInTmp()
268     StoreFile.Writer writer = store.createWriterInTmp(4, hcd.getCompression(), false, true, false);
269     Path path = writer.getPath();
270     writer.append(new KeyValue(row, family, qf1, Bytes.toBytes(1)));
271     writer.append(new KeyValue(row, family, qf2, Bytes.toBytes(2)));
272     writer.append(new KeyValue(row2, family, qf1, Bytes.toBytes(3)));
273     writer.append(new KeyValue(row2, family, qf2, Bytes.toBytes(4)));
274     writer.close();
275 
276     // Verify that compression and encoding settings are respected
277     HFile.Reader reader = HFile.createReader(fs, path, new CacheConfig(conf), conf);
278     Assert.assertEquals(hcd.getCompressionType(), reader.getCompressionAlgorithm());
279     Assert.assertEquals(hcd.getDataBlockEncoding(), reader.getDataBlockEncoding());
280     reader.close();
281   }
282 
283   @Test
284   public void testDeleteExpiredStoreFiles() throws Exception {
285     testDeleteExpiredStoreFiles(0);
286     testDeleteExpiredStoreFiles(1);
287   }
288 
289   /*
290    * @param minVersions the MIN_VERSIONS for the column family
291    */
292   public void testDeleteExpiredStoreFiles(int minVersions) throws Exception {
293     int storeFileNum = 4;
294     int ttl = 4;
295     IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge();
296     EnvironmentEdgeManagerTestHelper.injectEdge(edge);
297 
298     Configuration conf = HBaseConfiguration.create();
299     // Enable the expired store file deletion
300     conf.setBoolean("hbase.store.delete.expired.storefile", true);
301     // Set the compaction threshold higher to avoid normal compactions.
302     conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 5);
303 
304     HColumnDescriptor hcd = new HColumnDescriptor(family);
305     hcd.setMinVersions(minVersions);
306     hcd.setTimeToLive(ttl);
307     init(name.getMethodName() + "-" + minVersions, conf, hcd);
308 
309     long storeTtl = this.store.getScanInfo().getTtl();
310     long sleepTime = storeTtl / storeFileNum;
311     long timeStamp;
312     // There are 4 store files and the max time stamp difference among these
313     // store files will be (this.store.ttl / storeFileNum)
314     for (int i = 1; i <= storeFileNum; i++) {
315       LOG.info("Adding some data for the store file #" + i);
316       timeStamp = EnvironmentEdgeManager.currentTime();
317       this.store.add(new KeyValue(row, family, qf1, timeStamp, (byte[]) null));
318       this.store.add(new KeyValue(row, family, qf2, timeStamp, (byte[]) null));
319       this.store.add(new KeyValue(row, family, qf3, timeStamp, (byte[]) null));
320       flush(i);
321       edge.incrementTime(sleepTime);
322     }
323 
324     // Verify the total number of store files
325     Assert.assertEquals(storeFileNum, this.store.getStorefiles().size());
326 
327      // Each call will find one expired store file and delete it before compaction happens.
328      // There will be no compaction due to threshold above. Last file will not be replaced.
329     for (int i = 1; i <= storeFileNum - 1; i++) {
330       // verify the expired store file.
331       assertNull(this.store.requestCompaction());
332       Collection<StoreFile> sfs = this.store.getStorefiles();
333       // Ensure i files are gone.
334       if (minVersions == 0) {
335         assertEquals(storeFileNum - i, sfs.size());
336         // Ensure only non-expired files remain.
337         for (StoreFile sf : sfs) {
338           assertTrue(sf.getReader().getMaxTimestamp() >= (edge.currentTime() - storeTtl));
339         }
340       } else {
341         assertEquals(storeFileNum, sfs.size());
342       }
343       // Let the next store file expired.
344       edge.incrementTime(sleepTime);
345     }
346     assertNull(this.store.requestCompaction());
347 
348     Collection<StoreFile> sfs = this.store.getStorefiles();
349     // Assert the last expired file is not removed.
350     if (minVersions == 0) {
351       assertEquals(1, sfs.size());
352     }
353     long ts = sfs.iterator().next().getReader().getMaxTimestamp();
354     assertTrue(ts < (edge.currentTime() - storeTtl));
355 
356     for (StoreFile sf : sfs) {
357       sf.closeReader(true);
358     }
359   }
360 
361   @Test
362   public void testLowestModificationTime() throws Exception {
363     Configuration conf = HBaseConfiguration.create();
364     FileSystem fs = FileSystem.get(conf);
365     // Initialize region
366     init(name.getMethodName(), conf);
367 
368     int storeFileNum = 4;
369     for (int i = 1; i <= storeFileNum; i++) {
370       LOG.info("Adding some data for the store file #"+i);
371       this.store.add(new KeyValue(row, family, qf1, i, (byte[])null));
372       this.store.add(new KeyValue(row, family, qf2, i, (byte[])null));
373       this.store.add(new KeyValue(row, family, qf3, i, (byte[])null));
374       flush(i);
375     }
376     // after flush; check the lowest time stamp
377     long lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles());
378     long lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles());
379     Assert.assertEquals(lowestTimeStampFromManager,lowestTimeStampFromFS);
380 
381     // after compact; check the lowest time stamp
382     store.compact(store.requestCompaction(), NoLimitCompactionThroughputController.INSTANCE);
383     lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles());
384     lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles());
385     Assert.assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS);
386   }
387 
388   private static long getLowestTimeStampFromFS(FileSystem fs,
389       final Collection<StoreFile> candidates) throws IOException {
390     long minTs = Long.MAX_VALUE;
391     if (candidates.isEmpty()) {
392       return minTs;
393     }
394     Path[] p = new Path[candidates.size()];
395     int i = 0;
396     for (StoreFile sf : candidates) {
397       p[i] = sf.getPath();
398       ++i;
399     }
400 
401     FileStatus[] stats = fs.listStatus(p);
402     if (stats == null || stats.length == 0) {
403       return minTs;
404     }
405     for (FileStatus s : stats) {
406       minTs = Math.min(minTs, s.getModificationTime());
407     }
408     return minTs;
409   }
410 
411   //////////////////////////////////////////////////////////////////////////////
412   // Get tests
413   //////////////////////////////////////////////////////////////////////////////
414 
415   private static final int BLOCKSIZE_SMALL = 8192;
416   /**
417    * Test for hbase-1686.
418    * @throws IOException
419    */
420   @Test
421   public void testEmptyStoreFile() throws IOException {
422     init(this.name.getMethodName());
423     // Write a store file.
424     this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
425     this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null));
426     flush(1);
427     // Now put in place an empty store file.  Its a little tricky.  Have to
428     // do manually with hacked in sequence id.
429     StoreFile f = this.store.getStorefiles().iterator().next();
430     Path storedir = f.getPath().getParent();
431     long seqid = f.getMaxSequenceId();
432     Configuration c = HBaseConfiguration.create();
433     FileSystem fs = FileSystem.get(c);
434     HFileContext meta = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build();
435     StoreFile.Writer w = new StoreFile.WriterBuilder(c, new CacheConfig(c),
436         fs)
437             .withOutputDir(storedir)
438             .withFileContext(meta)
439             .build();
440     w.appendMetadata(seqid + 1, false);
441     w.close();
442     this.store.close();
443     // Reopen it... should pick up two files
444     this.store = new HStore(this.store.getHRegion(), this.store.getFamily(), c);
445     Assert.assertEquals(2, this.store.getStorefilesCount());
446 
447     result = HBaseTestingUtility.getFromStoreFile(store,
448         get.getRow(),
449         qualifiers);
450     Assert.assertEquals(1, result.size());
451   }
452 
453   /**
454    * Getting data from memstore only
455    * @throws IOException
456    */
457   @Test
458   public void testGet_FromMemStoreOnly() throws IOException {
459     init(this.name.getMethodName());
460 
461     //Put data in memstore
462     this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
463     this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null));
464     this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null));
465     this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null));
466     this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null));
467     this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null));
468 
469     //Get
470     result = HBaseTestingUtility.getFromStoreFile(store,
471         get.getRow(), qualifiers);
472 
473     //Compare
474     assertCheck();
475   }
476 
477   /**
478    * Getting data from files only
479    * @throws IOException
480    */
481   @Test
482   public void testGet_FromFilesOnly() throws IOException {
483     init(this.name.getMethodName());
484 
485     //Put data in memstore
486     this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
487     this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null));
488     //flush
489     flush(1);
490 
491     //Add more data
492     this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null));
493     this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null));
494     //flush
495     flush(2);
496 
497     //Add more data
498     this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null));
499     this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null));
500     //flush
501     flush(3);
502 
503     //Get
504     result = HBaseTestingUtility.getFromStoreFile(store,
505         get.getRow(),
506         qualifiers);
507     //this.store.get(get, qualifiers, result);
508 
509     //Need to sort the result since multiple files
510     Collections.sort(result, KeyValue.COMPARATOR);
511 
512     //Compare
513     assertCheck();
514   }
515 
516   /**
517    * Getting data from memstore and files
518    * @throws IOException
519    */
520   @Test
521   public void testGet_FromMemStoreAndFiles() throws IOException {
522     init(this.name.getMethodName());
523 
524     //Put data in memstore
525     this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
526     this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null));
527     //flush
528     flush(1);
529 
530     //Add more data
531     this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null));
532     this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null));
533     //flush
534     flush(2);
535 
536     //Add more data
537     this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null));
538     this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null));
539 
540     //Get
541     result = HBaseTestingUtility.getFromStoreFile(store,
542         get.getRow(), qualifiers);
543 
544     //Need to sort the result since multiple files
545     Collections.sort(result, KeyValue.COMPARATOR);
546 
547     //Compare
548     assertCheck();
549   }
550 
551   private void flush(int storeFilessize) throws IOException{
552     this.store.snapshot();
553     flushStore(store, id++);
554     Assert.assertEquals(storeFilessize, this.store.getStorefiles().size());
555     Assert.assertEquals(0, ((DefaultMemStore)this.store.memstore).cellSet.size());
556   }
557 
558   private void assertCheck() {
559     Assert.assertEquals(expected.size(), result.size());
560     for(int i=0; i<expected.size(); i++) {
561       Assert.assertEquals(expected.get(i), result.get(i));
562     }
563   }
564 
565   //////////////////////////////////////////////////////////////////////////////
566   // IncrementColumnValue tests
567   //////////////////////////////////////////////////////////////////////////////
568   /*
569    * test the internal details of how ICV works, especially during a flush scenario.
570    */
571   @Test
572   public void testIncrementColumnValue_ICVDuringFlush()
573       throws IOException, InterruptedException {
574     init(this.name.getMethodName());
575 
576     long oldValue = 1L;
577     long newValue = 3L;
578     this.store.add(new KeyValue(row, family, qf1,
579         System.currentTimeMillis(),
580         Bytes.toBytes(oldValue)));
581 
582     // snapshot the store.
583     this.store.snapshot();
584 
585     // add other things:
586     this.store.add(new KeyValue(row, family, qf2,
587         System.currentTimeMillis(),
588         Bytes.toBytes(oldValue)));
589 
590     // update during the snapshot.
591     long ret = this.store.updateColumnValue(row, family, qf1, newValue);
592 
593     // memstore should have grown by some amount.
594     Assert.assertTrue(ret > 0);
595 
596     // then flush.
597     flushStore(store, id++);
598     Assert.assertEquals(1, this.store.getStorefiles().size());
599     // from the one we inserted up there, and a new one
600     Assert.assertEquals(2, ((DefaultMemStore)this.store.memstore).cellSet.size());
601 
602     // how many key/values for this row are there?
603     Get get = new Get(row);
604     get.addColumn(family, qf1);
605     get.setMaxVersions(); // all versions.
606     List<Cell> results = new ArrayList<Cell>();
607 
608     results = HBaseTestingUtility.getFromStoreFile(store, get);
609     Assert.assertEquals(2, results.size());
610 
611     long ts1 = results.get(0).getTimestamp();
612     long ts2 = results.get(1).getTimestamp();
613 
614     Assert.assertTrue(ts1 > ts2);
615 
616     Assert.assertEquals(newValue, Bytes.toLong(CellUtil.cloneValue(results.get(0))));
617     Assert.assertEquals(oldValue, Bytes.toLong(CellUtil.cloneValue(results.get(1))));
618   }
619 
  /**
   * Resets the environment edge after each test; several tests in this class inject their
   * own edge through {@code EnvironmentEdgeManagerTestHelper.injectEdge}.
   */
  @After
  public void tearDown() throws Exception {
    EnvironmentEdgeManagerTestHelper.reset();
  }
624 
625   @Test
626   public void testICV_negMemstoreSize()  throws IOException {
627       init(this.name.getMethodName());
628 
629     long time = 100;
630     ManualEnvironmentEdge ee = new ManualEnvironmentEdge();
631     ee.setValue(time);
632     EnvironmentEdgeManagerTestHelper.injectEdge(ee);
633     long newValue = 3L;
634     long size = 0;
635 
636 
637     size += this.store.add(new KeyValue(Bytes.toBytes("200909091000"), family, qf1,
638         System.currentTimeMillis(), Bytes.toBytes(newValue)));
639     size += this.store.add(new KeyValue(Bytes.toBytes("200909091200"), family, qf1,
640         System.currentTimeMillis(), Bytes.toBytes(newValue)));
641     size += this.store.add(new KeyValue(Bytes.toBytes("200909091300"), family, qf1,
642         System.currentTimeMillis(), Bytes.toBytes(newValue)));
643     size += this.store.add(new KeyValue(Bytes.toBytes("200909091400"), family, qf1,
644         System.currentTimeMillis(), Bytes.toBytes(newValue)));
645     size += this.store.add(new KeyValue(Bytes.toBytes("200909091500"), family, qf1,
646         System.currentTimeMillis(), Bytes.toBytes(newValue)));
647 
648 
649     for ( int i = 0 ; i < 10000 ; ++i) {
650       newValue++;
651 
652       long ret = this.store.updateColumnValue(row, family, qf1, newValue);
653       long ret2 = this.store.updateColumnValue(row2, family, qf1, newValue);
654 
655       if (ret != 0) System.out.println("ret: " + ret);
656       if (ret2 != 0) System.out.println("ret2: " + ret2);
657 
658       Assert.assertTrue("ret: " + ret, ret >= 0);
659       size += ret;
660       Assert.assertTrue("ret2: " + ret2, ret2 >= 0);
661       size += ret2;
662 
663 
664       if (i % 1000 == 0)
665         ee.setValue(++time);
666     }
667 
668     long computedSize=0;
669     for (Cell cell : ((DefaultMemStore)this.store.memstore).cellSet) {
670       long kvsize = DefaultMemStore.heapSizeChange(cell, true);
671       //System.out.println(kv + " size= " + kvsize + " kvsize= " + kv.heapSize());
672       computedSize += kvsize;
673     }
674     Assert.assertEquals(computedSize, size);
675   }
676 
677   @Test
678   public void testIncrementColumnValue_SnapshotFlushCombo() throws Exception {
679     ManualEnvironmentEdge mee = new ManualEnvironmentEdge();
680     EnvironmentEdgeManagerTestHelper.injectEdge(mee);
681     init(this.name.getMethodName());
682 
683     long oldValue = 1L;
684     long newValue = 3L;
685     this.store.add(new KeyValue(row, family, qf1,
686         EnvironmentEdgeManager.currentTime(),
687         Bytes.toBytes(oldValue)));
688 
689     // snapshot the store.
690     this.store.snapshot();
691 
692     // update during the snapshot, the exact same TS as the Put (lololol)
693     long ret = this.store.updateColumnValue(row, family, qf1, newValue);
694 
695     // memstore should have grown by some amount.
696     Assert.assertTrue(ret > 0);
697 
698     // then flush.
699     flushStore(store, id++);
700     Assert.assertEquals(1, this.store.getStorefiles().size());
701     Assert.assertEquals(1, ((DefaultMemStore)this.store.memstore).cellSet.size());
702 
703     // now increment again:
704     newValue += 1;
705     this.store.updateColumnValue(row, family, qf1, newValue);
706 
707     // at this point we have a TS=1 in snapshot, and a TS=2 in kvset, so increment again:
708     newValue += 1;
709     this.store.updateColumnValue(row, family, qf1, newValue);
710 
711     // the second TS should be TS=2 or higher., even though 'time=1' right now.
712 
713 
714     // how many key/values for this row are there?
715     Get get = new Get(row);
716     get.addColumn(family, qf1);
717     get.setMaxVersions(); // all versions.
718     List<Cell> results = new ArrayList<Cell>();
719 
720     results = HBaseTestingUtility.getFromStoreFile(store, get);
721     Assert.assertEquals(2, results.size());
722 
723     long ts1 = results.get(0).getTimestamp();
724     long ts2 = results.get(1).getTimestamp();
725 
726     Assert.assertTrue(ts1 > ts2);
727     Assert.assertEquals(newValue, Bytes.toLong(CellUtil.cloneValue(results.get(0))));
728     Assert.assertEquals(oldValue, Bytes.toLong(CellUtil.cloneValue(results.get(1))));
729 
730     mee.setValue(2); // time goes up slightly
731     newValue += 1;
732     this.store.updateColumnValue(row, family, qf1, newValue);
733 
734     results = HBaseTestingUtility.getFromStoreFile(store, get);
735     Assert.assertEquals(2, results.size());
736 
737     ts1 = results.get(0).getTimestamp();
738     ts2 = results.get(1).getTimestamp();
739 
740     Assert.assertTrue(ts1 > ts2);
741     Assert.assertEquals(newValue, Bytes.toLong(CellUtil.cloneValue(results.get(0))));
742     Assert.assertEquals(oldValue, Bytes.toLong(CellUtil.cloneValue(results.get(1))));
743   }
744 
745   @Test
746   public void testHandleErrorsInFlush() throws Exception {
747     LOG.info("Setting up a faulty file system that cannot write");
748 
749     final Configuration conf = HBaseConfiguration.create();
750     User user = User.createUserForTesting(conf,
751         "testhandleerrorsinflush", new String[]{"foo"});
752     // Inject our faulty LocalFileSystem
753     conf.setClass("fs.file.impl", FaultyFileSystem.class,
754         FileSystem.class);
755     user.runAs(new PrivilegedExceptionAction<Object>() {
756       @Override
757       public Object run() throws Exception {
758         // Make sure it worked (above is sensitive to caching details in hadoop core)
759         FileSystem fs = FileSystem.get(conf);
760         Assert.assertEquals(FaultyFileSystem.class, fs.getClass());
761 
762         // Initialize region
763         init(name.getMethodName(), conf);
764 
765         LOG.info("Adding some data");
766         store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
767         store.add(new KeyValue(row, family, qf2, 1, (byte[])null));
768         store.add(new KeyValue(row, family, qf3, 1, (byte[])null));
769 
770         LOG.info("Before flush, we should have no files");
771 
772         Collection<StoreFileInfo> files =
773           store.getRegionFileSystem().getStoreFiles(store.getColumnFamilyName());
774         Assert.assertEquals(0, files != null ? files.size() : 0);
775 
776         //flush
777         try {
778           LOG.info("Flushing");
779           flush(1);
780           Assert.fail("Didn't bubble up IOE!");
781         } catch (IOException ioe) {
782           Assert.assertTrue(ioe.getMessage().contains("Fault injected"));
783         }
784 
785         LOG.info("After failed flush, we should still have no files!");
786         files = store.getRegionFileSystem().getStoreFiles(store.getColumnFamilyName());
787         Assert.assertEquals(0, files != null ? files.size() : 0);
788         store.getHRegion().getWAL().close();
789         return null;
790       }
791     });
792     FileSystem.closeAllForUGI(user.getUGI());
793   }
794 
795   /**
796    * Faulty file system that will fail if you write past its fault position the FIRST TIME
797    * only; thereafter it will succeed.  Used by {@link TestHRegion} too.
798    */
799   static class FaultyFileSystem extends FilterFileSystem {
800     List<SoftReference<FaultyOutputStream>> outStreams =
801       new ArrayList<SoftReference<FaultyOutputStream>>();
802     private long faultPos = 200;
803     AtomicBoolean fault = new AtomicBoolean(true);
804 
805     public FaultyFileSystem() {
806       super(new LocalFileSystem());
807       System.err.println("Creating faulty!");
808     }
809 
810     @Override
811     public FSDataOutputStream create(Path p) throws IOException {
812       return new FaultyOutputStream(super.create(p), faultPos, fault);
813     }
814 
815     @Override
816     public FSDataOutputStream create(Path f, FsPermission permission,
817         boolean overwrite, int bufferSize, short replication, long blockSize,
818         Progressable progress) throws IOException {
819       return new FaultyOutputStream(super.create(f, permission,
820           overwrite, bufferSize, replication, blockSize, progress), faultPos, fault);
821     }
822 
823     @Override
824     public FSDataOutputStream createNonRecursive(Path f, boolean overwrite,
825         int bufferSize, short replication, long blockSize, Progressable progress)
826     throws IOException {
827       // Fake it.  Call create instead.  The default implementation throws an IOE
828       // that this is not supported.
829       return create(f, overwrite, bufferSize, replication, blockSize, progress);
830     }
831   }
832 
833   static class FaultyOutputStream extends FSDataOutputStream {
834     volatile long faultPos = Long.MAX_VALUE;
835     private final AtomicBoolean fault;
836 
837     public FaultyOutputStream(FSDataOutputStream out, long faultPos, final AtomicBoolean fault)
838     throws IOException {
839       super(out, null);
840       this.faultPos = faultPos;
841       this.fault = fault;
842     }
843 
844     @Override
845     public void write(byte[] buf, int offset, int length) throws IOException {
846       System.err.println("faulty stream write at pos " + getPos());
847       injectFault();
848       super.write(buf, offset, length);
849     }
850 
851     private void injectFault() throws IOException {
852       if (this.fault.get() && getPos() >= faultPos) {
853         throw new IOException("Fault injected");
854       }
855     }
856   }
857 
858   private static void flushStore(HStore store, long id) throws IOException {
859     StoreFlushContext storeFlushCtx = store.createFlushContext(id);
860     storeFlushCtx.prepare();
861     storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
862     storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
863   }
864 
865   /**
866    * Generate a list of KeyValues for testing based on given parameters
867    * @param timestamps
868    * @param numRows
869    * @param qualifier
870    * @param family
871    * @return
872    */
873   List<Cell> getKeyValueSet(long[] timestamps, int numRows,
874       byte[] qualifier, byte[] family) {
875     List<Cell> kvList = new ArrayList<Cell>();
876     for (int i=1;i<=numRows;i++) {
877       byte[] b = Bytes.toBytes(i);
878       for (long timestamp: timestamps) {
879         kvList.add(new KeyValue(b, family, qualifier, timestamp, b));
880       }
881     }
882     return kvList;
883   }
884 
885   /**
886    * Test to ensure correctness when using Stores with multiple timestamps
887    * @throws IOException
888    */
889   @Test
890   public void testMultipleTimestamps() throws IOException {
891     int numRows = 1;
892     long[] timestamps1 = new long[] {1,5,10,20};
893     long[] timestamps2 = new long[] {30,80};
894 
895     init(this.name.getMethodName());
896 
897     List<Cell> kvList1 = getKeyValueSet(timestamps1,numRows, qf1, family);
898     for (Cell kv : kvList1) {
899       this.store.add(KeyValueUtil.ensureKeyValue(kv));
900     }
901 
902     this.store.snapshot();
903     flushStore(store, id++);
904 
905     List<Cell> kvList2 = getKeyValueSet(timestamps2,numRows, qf1, family);
906     for(Cell kv : kvList2) {
907       this.store.add(KeyValueUtil.ensureKeyValue(kv));
908     }
909 
910     List<Cell> result;
911     Get get = new Get(Bytes.toBytes(1));
912     get.addColumn(family,qf1);
913 
914     get.setTimeRange(0,15);
915     result = HBaseTestingUtility.getFromStoreFile(store, get);
916     Assert.assertTrue(result.size()>0);
917 
918     get.setTimeRange(40,90);
919     result = HBaseTestingUtility.getFromStoreFile(store, get);
920     Assert.assertTrue(result.size()>0);
921 
922     get.setTimeRange(10,45);
923     result = HBaseTestingUtility.getFromStoreFile(store, get);
924     Assert.assertTrue(result.size()>0);
925 
926     get.setTimeRange(80,145);
927     result = HBaseTestingUtility.getFromStoreFile(store, get);
928     Assert.assertTrue(result.size()>0);
929 
930     get.setTimeRange(1,2);
931     result = HBaseTestingUtility.getFromStoreFile(store, get);
932     Assert.assertTrue(result.size()>0);
933 
934     get.setTimeRange(90,200);
935     result = HBaseTestingUtility.getFromStoreFile(store, get);
936     Assert.assertTrue(result.size()==0);
937   }
938 
939   /**
940    * Test for HBASE-3492 - Test split on empty colfam (no store files).
941    *
942    * @throws IOException When the IO operations fail.
943    */
944   @Test
945   public void testSplitWithEmptyColFam() throws IOException {
946     init(this.name.getMethodName());
947     Assert.assertNull(store.getSplitPoint());
948     store.getHRegion().forceSplit(null);
949     Assert.assertNull(store.getSplitPoint());
950     store.getHRegion().clearSplit();
951   }
952 
953   @Test
954   public void testStoreUsesConfigurationFromHcdAndHtd() throws Exception {
955     final String CONFIG_KEY = "hbase.regionserver.thread.compaction.throttle";
956     long anyValue = 10;
957 
958     // We'll check that it uses correct config and propagates it appropriately by going thru
959     // the simplest "real" path I can find - "throttleCompaction", which just checks whether
960     // a number we pass in is higher than some config value, inside compactionPolicy.
961     Configuration conf = HBaseConfiguration.create();
962     conf.setLong(CONFIG_KEY, anyValue);
963     init(name.getMethodName() + "-xml", conf);
964     Assert.assertTrue(store.throttleCompaction(anyValue + 1));
965     Assert.assertFalse(store.throttleCompaction(anyValue));
966 
967     // HTD overrides XML.
968     --anyValue;
969     HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table));
970     HColumnDescriptor hcd = new HColumnDescriptor(family);
971     htd.setConfiguration(CONFIG_KEY, Long.toString(anyValue));
972     init(name.getMethodName() + "-htd", conf, htd, hcd);
973     Assert.assertTrue(store.throttleCompaction(anyValue + 1));
974     Assert.assertFalse(store.throttleCompaction(anyValue));
975 
976     // HCD overrides them both.
977     --anyValue;
978     hcd.setConfiguration(CONFIG_KEY, Long.toString(anyValue));
979     init(name.getMethodName() + "-hcd", conf, htd, hcd);
980     Assert.assertTrue(store.throttleCompaction(anyValue + 1));
981     Assert.assertFalse(store.throttleCompaction(anyValue));
982   }
983 
984   public static class DummyStoreEngine extends DefaultStoreEngine {
985     public static DefaultCompactor lastCreatedCompactor = null;
986     @Override
987     protected void createComponents(
988         Configuration conf, Store store, KVComparator comparator) throws IOException {
989       super.createComponents(conf, store, comparator);
990       lastCreatedCompactor = this.compactor;
991     }
992   }
993 
994   @Test
995   public void testStoreUsesSearchEngineOverride() throws Exception {
996     Configuration conf = HBaseConfiguration.create();
997     conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DummyStoreEngine.class.getName());
998     init(this.name.getMethodName(), conf);
999     Assert.assertEquals(DummyStoreEngine.lastCreatedCompactor,
1000       this.store.storeEngine.getCompactor());
1001   }
1002 
1003   private void addStoreFile() throws IOException {
1004     StoreFile f = this.store.getStorefiles().iterator().next();
1005     Path storedir = f.getPath().getParent();
1006     long seqid = this.store.getMaxSequenceId();
1007     Configuration c = TEST_UTIL.getConfiguration();
1008     FileSystem fs = FileSystem.get(c);
1009     HFileContext fileContext = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build();
1010     StoreFile.Writer w = new StoreFile.WriterBuilder(c, new CacheConfig(c),
1011         fs)
1012             .withOutputDir(storedir)
1013             .withFileContext(fileContext)
1014             .build();
1015     w.appendMetadata(seqid + 1, false);
1016     w.close();
1017     LOG.info("Added store file:" + w.getPath());
1018   }
1019 
1020   private void archiveStoreFile(int index) throws IOException {
1021     Collection<StoreFile> files = this.store.getStorefiles();
1022     StoreFile sf = null;
1023     Iterator<StoreFile> it = files.iterator();
1024     for (int i = 0; i <= index; i++) {
1025       sf = it.next();
1026     }
1027     store.getRegionFileSystem().removeStoreFiles(store.getColumnFamilyName(), Lists.newArrayList(sf));
1028   }
1029 
1030   @Test
1031   public void testRefreshStoreFiles() throws Exception {
1032     init(name.getMethodName());
1033 
1034     assertEquals(0, this.store.getStorefilesCount());
1035 
1036     // add some data, flush
1037     this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
1038     flush(1);
1039     assertEquals(1, this.store.getStorefilesCount());
1040 
1041     // add one more file
1042     addStoreFile();
1043 
1044     assertEquals(1, this.store.getStorefilesCount());
1045     store.refreshStoreFiles();
1046     assertEquals(2, this.store.getStorefilesCount());
1047 
1048     // add three more files
1049     addStoreFile();
1050     addStoreFile();
1051     addStoreFile();
1052 
1053     assertEquals(2, this.store.getStorefilesCount());
1054     store.refreshStoreFiles();
1055     assertEquals(5, this.store.getStorefilesCount());
1056 
1057     archiveStoreFile(0);
1058 
1059     assertEquals(5, this.store.getStorefilesCount());
1060     store.refreshStoreFiles();
1061     assertEquals(4, this.store.getStorefilesCount());
1062 
1063     archiveStoreFile(0);
1064     archiveStoreFile(1);
1065     archiveStoreFile(2);
1066 
1067     assertEquals(4, this.store.getStorefilesCount());
1068     store.refreshStoreFiles();
1069     assertEquals(1, this.store.getStorefilesCount());
1070 
1071     archiveStoreFile(0);
1072     store.refreshStoreFiles();
1073     assertEquals(0, this.store.getStorefilesCount());
1074   }
1075 
1076   @SuppressWarnings("unchecked")
1077   @Test
1078   public void testRefreshStoreFilesNotChanged() throws IOException {
1079     init(name.getMethodName());
1080 
1081     assertEquals(0, this.store.getStorefilesCount());
1082 
1083     // add some data, flush
1084     this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
1085     flush(1);
1086     // add one more file
1087     addStoreFile();
1088 
1089     HStore spiedStore = spy(store);
1090 
1091     // call first time after files changed
1092     spiedStore.refreshStoreFiles();
1093     assertEquals(2, this.store.getStorefilesCount());
1094     verify(spiedStore, times(1)).replaceStoreFiles(any(Collection.class), any(Collection.class));
1095 
1096     // call second time
1097     spiedStore.refreshStoreFiles();
1098 
1099     //ensure that replaceStoreFiles is not called if files are not refreshed
1100     verify(spiedStore, times(0)).replaceStoreFiles(null, null);
1101   }
1102 }