View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.replication.regionserver;
20  
21  import static org.junit.Assert.assertEquals;
22  import static org.junit.Assert.assertTrue;
23  
24  import java.util.ArrayList;
25  import java.util.List;
26  import java.util.concurrent.atomic.AtomicBoolean;
27  
28  import org.apache.hadoop.hbase.client.Table;
29  import org.apache.hadoop.hbase.util.ByteStringer;
30  import org.apache.commons.logging.Log;
31  import org.apache.commons.logging.LogFactory;
32  import org.apache.hadoop.conf.Configuration;
33  import org.apache.hadoop.hbase.Cell;
34  import org.apache.hadoop.hbase.CellUtil;
35  import org.apache.hadoop.hbase.HBaseTestingUtility;
36  import org.apache.hadoop.hbase.HConstants;
37  import org.apache.hadoop.hbase.KeyValue;
38  import org.apache.hadoop.hbase.testclassification.MediumTests;
39  import org.apache.hadoop.hbase.Stoppable;
40  import org.apache.hadoop.hbase.TableName;
41  import org.apache.hadoop.hbase.client.Get;
42  import org.apache.hadoop.hbase.client.Result;
43  import org.apache.hadoop.hbase.client.ResultScanner;
44  import org.apache.hadoop.hbase.client.Scan;
45  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
46  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID;
47  import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey;
48  import org.apache.hadoop.hbase.util.Bytes;
49  import org.junit.AfterClass;
50  import org.junit.Before;
51  import org.junit.BeforeClass;
52  import org.junit.Test;
53  import org.junit.experimental.categories.Category;
54  
55  @Category(MediumTests.class)
56  public class TestReplicationSink {
57    private static final Log LOG = LogFactory.getLog(TestReplicationSink.class);
58    private static final int BATCH_SIZE = 10;
59  
60    private final static HBaseTestingUtility TEST_UTIL =
61        new HBaseTestingUtility();
62  
63    private static ReplicationSink SINK;
64  
65    private static final TableName TABLE_NAME1 =
66        TableName.valueOf("table1");
67    private static final TableName TABLE_NAME2 =
68        TableName.valueOf("table2");
69  
70    private static final byte[] FAM_NAME1 = Bytes.toBytes("info1");
71    private static final byte[] FAM_NAME2 = Bytes.toBytes("info2");
72  
73    private static Table table1;
74    private static Stoppable STOPPABLE = new Stoppable() {
75      final AtomicBoolean stop = new AtomicBoolean(false);
76  
77      @Override
78      public boolean isStopped() {
79        return this.stop.get();
80      }
81  
82      @Override
83      public void stop(String why) {
84        LOG.info("STOPPING BECAUSE: " + why);
85        this.stop.set(true);
86      }
87      
88    };
89  
90    private static Table table2;
91  
92     /**
93     * @throws java.lang.Exception
94     */
95    @BeforeClass
96    public static void setUpBeforeClass() throws Exception {
97      TEST_UTIL.getConfiguration().setBoolean("dfs.support.append", true);
98      TEST_UTIL.getConfiguration().setBoolean(HConstants.REPLICATION_ENABLE_KEY,
99          HConstants.REPLICATION_ENABLE_DEFAULT);
100     TEST_UTIL.startMiniCluster(3);
101     SINK =
102       new ReplicationSink(new Configuration(TEST_UTIL.getConfiguration()), STOPPABLE);
103     table1 = TEST_UTIL.createTable(TABLE_NAME1, FAM_NAME1);
104     table2 = TEST_UTIL.createTable(TABLE_NAME2, FAM_NAME2);
105   }
106 
107   /**
108    * @throws java.lang.Exception
109    */
110   @AfterClass
111   public static void tearDownAfterClass() throws Exception {
112     STOPPABLE.stop("Shutting down");
113     TEST_UTIL.shutdownMiniCluster();
114   }
115 
116   /**
117    * @throws java.lang.Exception
118    */
119   @Before
120   public void setUp() throws Exception {
121     table1 = TEST_UTIL.deleteTableData(TABLE_NAME1);
122     table2 = TEST_UTIL.deleteTableData(TABLE_NAME2);
123   }
124 
125   /**
126    * Insert a whole batch of entries
127    * @throws Exception
128    */
129   @Test
130   public void testBatchSink() throws Exception {
131     List<WALEntry> entries = new ArrayList<WALEntry>(BATCH_SIZE);
132     List<Cell> cells = new ArrayList<Cell>();
133     for(int i = 0; i < BATCH_SIZE; i++) {
134       entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
135     }
136     SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()));
137     Scan scan = new Scan();
138     ResultScanner scanRes = table1.getScanner(scan);
139     assertEquals(BATCH_SIZE, scanRes.next(BATCH_SIZE).length);
140   }
141 
142   /**
143    * Insert a mix of puts and deletes
144    * @throws Exception
145    */
146   @Test
147   public void testMixedPutDelete() throws Exception {
148     List<WALEntry> entries = new ArrayList<WALEntry>(BATCH_SIZE/2);
149     List<Cell> cells = new ArrayList<Cell>();
150     for(int i = 0; i < BATCH_SIZE/2; i++) {
151       entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
152     }
153     SINK.replicateEntries(entries, CellUtil.createCellScanner(cells));
154 
155     entries = new ArrayList<WALEntry>(BATCH_SIZE);
156     cells = new ArrayList<Cell>();
157     for(int i = 0; i < BATCH_SIZE; i++) {
158       entries.add(createEntry(TABLE_NAME1, i,
159           i % 2 != 0 ? KeyValue.Type.Put: KeyValue.Type.DeleteColumn, cells));
160     }
161 
162     SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()));
163     Scan scan = new Scan();
164     ResultScanner scanRes = table1.getScanner(scan);
165     assertEquals(BATCH_SIZE/2, scanRes.next(BATCH_SIZE).length);
166   }
167 
168   /**
169    * Insert to 2 different tables
170    * @throws Exception
171    */
172   @Test
173   public void testMixedPutTables() throws Exception {
174     List<WALEntry> entries = new ArrayList<WALEntry>(BATCH_SIZE/2);
175     List<Cell> cells = new ArrayList<Cell>();
176     for(int i = 0; i < BATCH_SIZE; i++) {
177       entries.add(createEntry( i % 2 == 0 ? TABLE_NAME2 : TABLE_NAME1,
178               i, KeyValue.Type.Put, cells));
179     }
180 
181     SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()));
182     Scan scan = new Scan();
183     ResultScanner scanRes = table2.getScanner(scan);
184     for(Result res : scanRes) {
185       assertTrue(Bytes.toInt(res.getRow()) % 2 == 0);
186     }
187   }
188 
189   /**
190    * Insert then do different types of deletes
191    * @throws Exception
192    */
193   @Test
194   public void testMixedDeletes() throws Exception {
195     List<WALEntry> entries = new ArrayList<WALEntry>(3);
196     List<Cell> cells = new ArrayList<Cell>();
197     for(int i = 0; i < 3; i++) {
198       entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
199     }
200     SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()));
201     entries = new ArrayList<WALEntry>(3);
202     cells = new ArrayList<Cell>();
203     entries.add(createEntry(TABLE_NAME1, 0, KeyValue.Type.DeleteColumn, cells));
204     entries.add(createEntry(TABLE_NAME1, 1, KeyValue.Type.DeleteFamily, cells));
205     entries.add(createEntry(TABLE_NAME1, 2, KeyValue.Type.DeleteColumn, cells));
206 
207     SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()));
208 
209     Scan scan = new Scan();
210     ResultScanner scanRes = table1.getScanner(scan);
211     assertEquals(0, scanRes.next(3).length);
212   }
213 
214   /**
215    * Puts are buffered, but this tests when a delete (not-buffered) is applied
216    * before the actual Put that creates it.
217    * @throws Exception
218    */
219   @Test
220   public void testApplyDeleteBeforePut() throws Exception {
221     List<WALEntry> entries = new ArrayList<WALEntry>(5);
222     List<Cell> cells = new ArrayList<Cell>();
223     for(int i = 0; i < 2; i++) {
224       entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
225     }
226     entries.add(createEntry(TABLE_NAME1, 1, KeyValue.Type.DeleteFamily, cells));
227     for(int i = 3; i < 5; i++) {
228       entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
229     }
230     SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()));
231     Get get = new Get(Bytes.toBytes(1));
232     Result res = table1.get(get);
233     assertEquals(0, res.size());
234   }
235 
236   private WALEntry createEntry(TableName table, int row,  KeyValue.Type type, List<Cell> cells) {
237     byte[] fam = table.equals(TABLE_NAME1) ? FAM_NAME1 : FAM_NAME2;
238     byte[] rowBytes = Bytes.toBytes(row);
239     // Just make sure we don't get the same ts for two consecutive rows with
240     // same key
241     try {
242       Thread.sleep(1);
243     } catch (InterruptedException e) {
244       LOG.info("Was interrupted while sleep, meh", e);
245     }
246     final long now = System.currentTimeMillis();
247     KeyValue kv = null;
248     if(type.getCode() == KeyValue.Type.Put.getCode()) {
249       kv = new KeyValue(rowBytes, fam, fam, now,
250           KeyValue.Type.Put, Bytes.toBytes(row));
251     } else if (type.getCode() == KeyValue.Type.DeleteColumn.getCode()) {
252         kv = new KeyValue(rowBytes, fam, fam,
253             now, KeyValue.Type.DeleteColumn);
254     } else if (type.getCode() == KeyValue.Type.DeleteFamily.getCode()) {
255         kv = new KeyValue(rowBytes, fam, null,
256             now, KeyValue.Type.DeleteFamily);
257     }
258     WALEntry.Builder builder = WALEntry.newBuilder();
259     builder.setAssociatedCellCount(1);
260     WALKey.Builder keyBuilder = WALKey.newBuilder();
261     UUID.Builder uuidBuilder = UUID.newBuilder();
262     uuidBuilder.setLeastSigBits(HConstants.DEFAULT_CLUSTER_ID.getLeastSignificantBits());
263     uuidBuilder.setMostSigBits(HConstants.DEFAULT_CLUSTER_ID.getMostSignificantBits());
264     keyBuilder.setClusterId(uuidBuilder.build());
265     keyBuilder.setTableName(ByteStringer.wrap(table.getName()));
266     keyBuilder.setWriteTime(now);
267     keyBuilder.setEncodedRegionName(ByteStringer.wrap(HConstants.EMPTY_BYTE_ARRAY));
268     keyBuilder.setLogSequenceNumber(-1);
269     builder.setKey(keyBuilder.build());
270     cells.add(kv);
271 
272     return builder.build();
273   }
274 
275 }