1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.wal;
20
21 import static org.junit.Assert.assertEquals;
22
23 import java.io.IOException;
24 import java.util.List;
25 import java.util.Random;
26 import java.util.SortedMap;
27 import java.util.TreeMap;
28
29 import org.apache.hadoop.hbase.HBaseTestingUtility;
30 import org.apache.hadoop.hbase.testclassification.MediumTests;
31 import org.apache.hadoop.hbase.TableName;
32 import org.apache.hadoop.hbase.client.Delete;
33 import org.apache.hadoop.hbase.client.Put;
34 import org.apache.hadoop.hbase.client.Table;
35 import org.apache.hadoop.hbase.master.HMaster;
36 import org.apache.hadoop.hbase.protobuf.RequestConverter;
37 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionRequest;
38 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
39 import org.apache.hadoop.hbase.regionserver.HRegionServer;
40 import org.apache.hadoop.hbase.regionserver.Region;
41 import org.apache.hadoop.hbase.util.Bytes;
42 import org.junit.After;
43 import org.junit.Before;
44 import org.junit.Test;
45 import org.junit.experimental.categories.Category;
46
47 import com.google.common.collect.Lists;
48 import com.google.protobuf.ServiceException;
49
50 @Category(MediumTests.class)
51 public class TestWALFiltering {
52 private static final int NUM_MASTERS = 1;
53 private static final int NUM_RS = 4;
54
55 private static final TableName TABLE_NAME =
56 TableName.valueOf("TestWALFiltering");
57 private static final byte[] CF1 = Bytes.toBytes("MyCF1");
58 private static final byte[] CF2 = Bytes.toBytes("MyCF2");
59 private static final byte[][] FAMILIES = { CF1, CF2 };
60
61 private HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
62
63 @Before
64 public void setUp() throws Exception {
65 TEST_UTIL.startMiniCluster(NUM_MASTERS, NUM_RS);
66 fillTable();
67 }
68
69 @After
70 public void tearDown() throws Exception {
71 TEST_UTIL.shutdownMiniCluster();
72 }
73
74 private void fillTable() throws IOException, InterruptedException {
75 Table table = TEST_UTIL.createTable(TABLE_NAME, FAMILIES, 3,
76 Bytes.toBytes("row0"), Bytes.toBytes("row99"), NUM_RS);
77 Random rand = new Random(19387129L);
78 for (int iStoreFile = 0; iStoreFile < 4; ++iStoreFile) {
79 for (int iRow = 0; iRow < 100; ++iRow) {
80 final byte[] row = Bytes.toBytes("row" + iRow);
81 Put put = new Put(row);
82 Delete del = new Delete(row);
83 for (int iCol = 0; iCol < 10; ++iCol) {
84 final byte[] cf = rand.nextBoolean() ? CF1 : CF2;
85 final long ts = Math.abs(rand.nextInt());
86 final byte[] qual = Bytes.toBytes("col" + iCol);
87 if (rand.nextBoolean()) {
88 final byte[] value = Bytes.toBytes("value_for_row_" + iRow +
89 "_cf_" + Bytes.toStringBinary(cf) + "_col_" + iCol + "_ts_" +
90 ts + "_random_" + rand.nextLong());
91 put.add(cf, qual, ts, value);
92 } else if (rand.nextDouble() < 0.8) {
93 del.addColumn(cf, qual, ts);
94 } else {
95 del.addColumn(cf, qual, ts);
96 }
97 }
98 table.put(put);
99 table.delete(del);
100 }
101 }
102 TEST_UTIL.waitUntilAllRegionsAssigned(TABLE_NAME);
103 }
104
105 @Test
106 public void testFlushedSequenceIdsSentToHMaster()
107 throws IOException, InterruptedException, ServiceException {
108 SortedMap<byte[], Long> allFlushedSequenceIds =
109 new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
110 for (int i = 0; i < NUM_RS; ++i) {
111 flushAllRegions(i);
112 }
113 Thread.sleep(10000);
114 HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster();
115 for (int i = 0; i < NUM_RS; ++i) {
116 for (byte[] regionName : getRegionsByServer(i)) {
117 if (allFlushedSequenceIds.containsKey(regionName)) {
118 GetLastFlushedSequenceIdRequest req =
119 RequestConverter.buildGetLastFlushedSequenceIdRequest(regionName);
120
121 assertEquals((long)allFlushedSequenceIds.get(regionName),
122 master.getMasterRpcServices().getLastFlushedSequenceId(
123 null, req).getLastFlushedSequenceId());
124 }
125 }
126 }
127 }
128
129 private List<byte[]> getRegionsByServer(int rsId) throws IOException {
130 List<byte[]> regionNames = Lists.newArrayList();
131 HRegionServer hrs = getRegionServer(rsId);
132 for (Region r : hrs.getOnlineRegions(TABLE_NAME)) {
133 regionNames.add(r.getRegionInfo().getRegionName());
134 }
135 return regionNames;
136 }
137
138 private HRegionServer getRegionServer(int rsId) {
139 return TEST_UTIL.getMiniHBaseCluster().getRegionServer(rsId);
140 }
141
142 private void flushAllRegions(int rsId)
143 throws ServiceException, IOException {
144 HRegionServer hrs = getRegionServer(rsId);
145 for (byte[] regionName : getRegionsByServer(rsId)) {
146 FlushRegionRequest request =
147 RequestConverter.buildFlushRegionRequest(regionName);
148 hrs.getRSRpcServices().flushRegion(null, request);
149 }
150 }
151
152 }