1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.master;
20
21 import static org.junit.Assert.assertEquals;
22
23 import java.io.IOException;
24 import java.util.List;
25 import java.util.concurrent.atomic.AtomicInteger;
26
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.hbase.HBaseTestingUtility;
30 import org.apache.hadoop.hbase.HRegionInfo;
31 import org.apache.hadoop.hbase.testclassification.MediumTests;
32 import org.apache.hadoop.hbase.MiniHBaseCluster;
33 import org.apache.hadoop.hbase.TableName;
34 import org.apache.hadoop.hbase.ServerName;
35 import org.apache.hadoop.hbase.client.Admin;
36 import org.apache.hadoop.hbase.client.HTable;
37 import org.apache.hadoop.hbase.client.Put;
38 import org.apache.hadoop.hbase.client.Table;
39 import org.apache.hadoop.hbase.regionserver.HRegion;
40 import org.apache.hadoop.hbase.regionserver.Region;
41 import org.apache.hadoop.hbase.util.Bytes;
42 import org.apache.hadoop.hbase.util.JVMClusterUtil;
43 import org.junit.AfterClass;
44 import org.junit.BeforeClass;
45 import org.junit.Test;
46 import org.junit.experimental.categories.Category;
47
48 @Category(MediumTests.class)
49 public class TestAssignmentListener {
50 private static final Log LOG = LogFactory.getLog(TestAssignmentListener.class);
51
52 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
53
54 static class DummyListener {
55 protected AtomicInteger modified = new AtomicInteger(0);
56
57 public void awaitModifications(int count) throws InterruptedException {
58 while (!modified.compareAndSet(count, 0)) {
59 Thread.sleep(100);
60 }
61 }
62 }
63
64 static class DummyAssignmentListener extends DummyListener implements AssignmentListener {
65 private AtomicInteger closeCount = new AtomicInteger(0);
66 private AtomicInteger openCount = new AtomicInteger(0);
67
68 public DummyAssignmentListener() {
69 }
70
71 @Override
72 public void regionOpened(final HRegionInfo regionInfo, final ServerName serverName) {
73 LOG.info("Assignment open region=" + regionInfo + " server=" + serverName);
74 openCount.incrementAndGet();
75 modified.incrementAndGet();
76 }
77
78 @Override
79 public void regionClosed(final HRegionInfo regionInfo) {
80 LOG.info("Assignment close region=" + regionInfo);
81 closeCount.incrementAndGet();
82 modified.incrementAndGet();
83 }
84
85 public void reset() {
86 openCount.set(0);
87 closeCount.set(0);
88 }
89
90 public int getLoadCount() {
91 return openCount.get();
92 }
93
94 public int getCloseCount() {
95 return closeCount.get();
96 }
97 }
98
99 static class DummyServerListener extends DummyListener implements ServerListener {
100 private AtomicInteger removedCount = new AtomicInteger(0);
101 private AtomicInteger addedCount = new AtomicInteger(0);
102
103 public DummyServerListener() {
104 }
105
106 @Override
107 public void serverAdded(final ServerName serverName) {
108 LOG.info("Server added " + serverName);
109 addedCount.incrementAndGet();
110 modified.incrementAndGet();
111 }
112
113 @Override
114 public void serverRemoved(final ServerName serverName) {
115 LOG.info("Server removed " + serverName);
116 removedCount.incrementAndGet();
117 modified.incrementAndGet();
118 }
119
120 public void reset() {
121 addedCount.set(0);
122 removedCount.set(0);
123 }
124
125 public int getAddedCount() {
126 return addedCount.get();
127 }
128
129 public int getRemovedCount() {
130 return removedCount.get();
131 }
132 }
133
134 @BeforeClass
135 public static void beforeAllTests() throws Exception {
136 TEST_UTIL.startMiniCluster(2);
137 }
138
139 @AfterClass
140 public static void afterAllTests() throws Exception {
141 TEST_UTIL.shutdownMiniCluster();
142 }
143
144 @Test(timeout=60000)
145 public void testServerListener() throws IOException, InterruptedException {
146 ServerManager serverManager = TEST_UTIL.getHBaseCluster().getMaster().getServerManager();
147
148 DummyServerListener listener = new DummyServerListener();
149 serverManager.registerListener(listener);
150 try {
151 MiniHBaseCluster miniCluster = TEST_UTIL.getMiniHBaseCluster();
152
153
154 miniCluster.startRegionServer();
155 listener.awaitModifications(1);
156 assertEquals(1, listener.getAddedCount());
157 assertEquals(0, listener.getRemovedCount());
158
159
160 listener.reset();
161 miniCluster.startRegionServer();
162 listener.awaitModifications(1);
163 assertEquals(1, listener.getAddedCount());
164 assertEquals(0, listener.getRemovedCount());
165
166 int nrs = miniCluster.getRegionServerThreads().size();
167
168
169 listener.reset();
170 miniCluster.stopRegionServer(nrs - 1);
171 listener.awaitModifications(1);
172 assertEquals(0, listener.getAddedCount());
173 assertEquals(1, listener.getRemovedCount());
174
175
176 listener.reset();
177 miniCluster.stopRegionServer(nrs - 2);
178 listener.awaitModifications(1);
179 assertEquals(0, listener.getAddedCount());
180 assertEquals(1, listener.getRemovedCount());
181 } finally {
182 serverManager.unregisterListener(listener);
183 }
184 }
185
186 @Test(timeout=60000)
187 public void testAssignmentListener() throws IOException, InterruptedException {
188 AssignmentManager am = TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager();
189 Admin admin = TEST_UTIL.getHBaseAdmin();
190
191 DummyAssignmentListener listener = new DummyAssignmentListener();
192 am.registerListener(listener);
193 try {
194 final String TABLE_NAME_STR = "testtb";
195 final TableName TABLE_NAME = TableName.valueOf(TABLE_NAME_STR);
196 final byte[] FAMILY = Bytes.toBytes("cf");
197
198
199 LOG.info("Create Table");
200 TEST_UTIL.createTable(TABLE_NAME, FAMILY);
201 listener.awaitModifications(1);
202 assertEquals(1, listener.getLoadCount());
203 assertEquals(0, listener.getCloseCount());
204
205
206 Table table = new HTable(TEST_UTIL.getConfiguration(), TABLE_NAME);
207 try {
208 for (int i = 0; i < 10; ++i) {
209 byte[] key = Bytes.toBytes("row-" + i);
210 Put put = new Put(key);
211 put.add(FAMILY, null, key);
212 table.put(put);
213 }
214 } finally {
215 table.close();
216 }
217
218
219 LOG.info("Split Table");
220 listener.reset();
221 admin.split(TABLE_NAME, Bytes.toBytes("row-3"));
222 listener.awaitModifications(3);
223 assertEquals(2, listener.getLoadCount());
224 assertEquals(1, listener.getCloseCount());
225
226
227 MiniHBaseCluster miniCluster = TEST_UTIL.getMiniHBaseCluster();
228 int mergeable = 0;
229 while (mergeable < 2) {
230 Thread.sleep(100);
231 admin.majorCompact(TABLE_NAME);
232 mergeable = 0;
233 for (JVMClusterUtil.RegionServerThread regionThread: miniCluster.getRegionServerThreads()) {
234 for (Region region: regionThread.getRegionServer().getOnlineRegions(TABLE_NAME)) {
235 mergeable += ((HRegion)region).isMergeable() ? 1 : 0;
236 }
237 }
238 }
239
240
241 LOG.info("Merge Regions");
242 listener.reset();
243 List<HRegionInfo> regions = admin.getTableRegions(TABLE_NAME);
244 assertEquals(2, regions.size());
245 admin.mergeRegions(regions.get(0).getEncodedNameAsBytes(),
246 regions.get(1).getEncodedNameAsBytes(), true);
247 listener.awaitModifications(3);
248 assertEquals(1, admin.getTableRegions(TABLE_NAME).size());
249 assertEquals(1, listener.getLoadCount());
250 assertEquals(2, listener.getCloseCount());
251
252
253 LOG.info("Drop Table");
254 listener.reset();
255 TEST_UTIL.deleteTable(TABLE_NAME);
256 listener.awaitModifications(1);
257 assertEquals(0, listener.getLoadCount());
258 assertEquals(1, listener.getCloseCount());
259 } finally {
260 am.unregisterListener(listener);
261 }
262 }
263 }