View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import static org.hamcrest.CoreMatchers.is;
22  import static org.hamcrest.CoreMatchers.not;
23  import static org.junit.Assert.assertEquals;
24  import static org.junit.Assert.assertThat;
25  import static org.junit.Assert.assertTrue;
26  import static org.mockito.Mockito.mock;
27  import static org.mockito.Mockito.when;
28  
29  import java.util.List;
30  import java.util.concurrent.atomic.AtomicLong;
31  
32  import org.apache.commons.logging.Log;
33  import org.apache.commons.logging.LogFactory;
34  import org.apache.hadoop.conf.Configuration;
35  import org.apache.hadoop.hbase.ChoreService;
36  import org.apache.hadoop.hbase.CoordinatedStateManager;
37  import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
38  import org.apache.hadoop.hbase.HBaseConfiguration;
39  import org.apache.hadoop.hbase.HBaseTestingUtility;
40  import org.apache.hadoop.hbase.HConstants;
41  import org.apache.hadoop.hbase.testclassification.MediumTests;
42  import org.apache.hadoop.hbase.Server;
43  import org.apache.hadoop.hbase.ServerName;
44  import org.apache.hadoop.hbase.SplitLogCounters;
45  import org.apache.hadoop.hbase.SplitLogTask;
46  import org.apache.hadoop.hbase.Waiter;
47  import org.apache.hadoop.hbase.client.ClusterConnection;
48  import org.apache.hadoop.hbase.executor.ExecutorService;
49  import org.apache.hadoop.hbase.executor.ExecutorType;
50  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
51  import org.apache.hadoop.hbase.util.CancelableProgressable;
52  import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
53  import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
54  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
55  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
56  import org.apache.log4j.Level;
57  import org.apache.log4j.Logger;
58  import org.apache.zookeeper.CreateMode;
59  import org.apache.zookeeper.ZooDefs.Ids;
60  import org.junit.After;
61  import org.junit.Before;
62  import org.junit.Test;
63  import org.junit.experimental.categories.Category;
64  
65  @Category(MediumTests.class)
66  public class TestSplitLogWorker {
67    private static final Log LOG = LogFactory.getLog(TestSplitLogWorker.class);
68    private static final int WAIT_TIME = 15000;
69    private final ServerName MANAGER = ServerName.valueOf("manager,1,1");
70    static {
71      Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG);
72    }
73    private final static HBaseTestingUtility TEST_UTIL =
74      new HBaseTestingUtility();
75    private DummyServer ds;
76    private ZooKeeperWatcher zkw;
77    private SplitLogWorker slw;
78    private ExecutorService executorService;
79    private RecoveryMode mode;
80  
81    class DummyServer implements Server {
82      private ZooKeeperWatcher zkw;
83      private Configuration conf;
84      private CoordinatedStateManager cm;
85  
86      public DummyServer(ZooKeeperWatcher zkw, Configuration conf) {
87        this.zkw = zkw;
88        this.conf = conf;
89        cm = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf);
90        cm.initialize(this);
91      }
92  
93      @Override
94      public void abort(String why, Throwable e) {
95      }
96  
97      @Override
98      public boolean isAborted() {
99        return false;
100     }
101 
102     @Override
103     public void stop(String why) {
104     }
105 
106     @Override
107     public boolean isStopped() {
108       return false;
109     }
110 
111     @Override
112     public Configuration getConfiguration() {
113       return conf;
114     }
115 
116     @Override
117     public ZooKeeperWatcher getZooKeeper() {
118       return zkw;
119     }
120 
121     @Override
122     public ServerName getServerName() {
123       return null;
124     }
125 
126     @Override
127     public CoordinatedStateManager getCoordinatedStateManager() {
128       return cm;
129     }
130 
131     @Override
132     public ClusterConnection getConnection() {
133       return null;
134     }
135 
136     @Override
137     public MetaTableLocator getMetaTableLocator() {
138       return null;
139     }
140 
141     @Override
142     public ChoreService getChoreService() {
143       return null;
144     }
145   }
146 
147   private void waitForCounter(AtomicLong ctr, long oldval, long newval, long timems)
148       throws Exception {
149     assertTrue("ctr=" + ctr.get() + ", oldval=" + oldval + ", newval=" + newval,
150       waitForCounterBoolean(ctr, oldval, newval, timems));
151   }
152 
153   private boolean waitForCounterBoolean(final AtomicLong ctr, final long oldval, long newval,
154       long timems) throws Exception {
155 
156     return waitForCounterBoolean(ctr, oldval, newval, timems, true);
157   }
158 
159   private boolean waitForCounterBoolean(final AtomicLong ctr, final long oldval, final long newval,
160       long timems, boolean failIfTimeout) throws Exception {
161 
162     long timeWaited = TEST_UTIL.waitFor(timems, 10, failIfTimeout,
163       new Waiter.Predicate<Exception>() {
164       @Override
165       public boolean evaluate() throws Exception {
166             return (ctr.get() >= newval);
167       }
168     });
169 
170     if( timeWaited > 0) {
171       // when not timed out
172       assertEquals(newval, ctr.get());
173     }
174     return true;
175   }
176 
177   @Before
178   public void setup() throws Exception {
179     TEST_UTIL.startMiniZKCluster();
180     Configuration conf = TEST_UTIL.getConfiguration();
181     zkw = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(),
182         "split-log-worker-tests", null);
183     ds = new DummyServer(zkw, conf);
184     ZKUtil.deleteChildrenRecursively(zkw, zkw.baseZNode);
185     ZKUtil.createAndFailSilent(zkw, zkw.baseZNode);
186     assertThat(ZKUtil.checkExists(zkw, zkw.baseZNode), not (is(-1)));
187     LOG.debug(zkw.baseZNode + " created");
188     ZKUtil.createAndFailSilent(zkw, zkw.splitLogZNode);
189     assertThat(ZKUtil.checkExists(zkw, zkw.splitLogZNode), not (is(-1)));
190 
191     LOG.debug(zkw.splitLogZNode + " created");
192     ZKUtil.createAndFailSilent(zkw, zkw.rsZNode);
193     assertThat(ZKUtil.checkExists(zkw, zkw.rsZNode), not (is(-1)));
194 
195     SplitLogCounters.resetCounters();
196     executorService = new ExecutorService("TestSplitLogWorker");
197     executorService.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, 10);
198     this.mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ?
199         RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
200   }
201 
202   @After
203   public void teardown() throws Exception {
204     if (executorService != null) {
205       executorService.shutdown();
206     }
207     TEST_UTIL.shutdownMiniZKCluster();
208   }
209 
210   SplitLogWorker.TaskExecutor neverEndingTask =
211     new SplitLogWorker.TaskExecutor() {
212 
213       @Override
214       public Status exec(String name, RecoveryMode mode, CancelableProgressable p) {
215         while (true) {
216           try {
217             Thread.sleep(1000);
218           } catch (InterruptedException e) {
219             return Status.PREEMPTED;
220           }
221           if (!p.progress()) {
222             return Status.PREEMPTED;
223           }
224         }
225       }
226 
227   };
228 
229   @Test(timeout=60000)
230   public void testAcquireTaskAtStartup() throws Exception {
231     LOG.info("testAcquireTaskAtStartup");
232     SplitLogCounters.resetCounters();
233     final String TATAS = "tatas";
234     final ServerName RS = ServerName.valueOf("rs,1,1");
235     RegionServerServices mockedRS = getRegionServer(RS);
236     zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS),
237       new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1"), this.mode).toByteArray(),
238         Ids.OPEN_ACL_UNSAFE,
239         CreateMode.PERSISTENT);
240 
241     SplitLogWorker slw =
242         new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
243     slw.start();
244     try {
245       waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
246       byte [] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TATAS));
247       SplitLogTask slt = SplitLogTask.parseFrom(bytes);
248       assertTrue(slt.isOwned(RS));
249     } finally {
250      stopSplitLogWorker(slw);
251     }
252   }
253 
254   private void stopSplitLogWorker(final SplitLogWorker slw)
255   throws InterruptedException {
256     if (slw != null) {
257       slw.stop();
258       slw.worker.join(WAIT_TIME);
259       if (slw.worker.isAlive()) {
260         assertTrue(("Could not stop the worker thread slw=" + slw) == null);
261       }
262     }
263   }
264 
265   @Test(timeout=60000)
266   public void testRaceForTask() throws Exception {
267     LOG.info("testRaceForTask");
268     SplitLogCounters.resetCounters();
269     final String TRFT = "trft";
270     final ServerName SVR1 = ServerName.valueOf("svr1,1,1");
271     final ServerName SVR2 = ServerName.valueOf("svr2,1,1");
272     zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TRFT),
273       new SplitLogTask.Unassigned(MANAGER, this.mode).toByteArray(),
274         Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
275     RegionServerServices mockedRS1 = getRegionServer(SVR1);
276     RegionServerServices mockedRS2 = getRegionServer(SVR2);
277     SplitLogWorker slw1 =
278         new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS1, neverEndingTask);
279     SplitLogWorker slw2 =
280         new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS2, neverEndingTask);
281     slw1.start();
282     slw2.start();
283     try {
284       waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
285       // Assert that either the tot_wkr_failed_to_grab_task_owned count was set of if
286       // not it, that we fell through to the next counter in line and it was set.
287       assertTrue(waitForCounterBoolean(SplitLogCounters.tot_wkr_failed_to_grab_task_owned, 0, 1,
288           WAIT_TIME, false) ||
289         SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.get() == 1);
290       byte [] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TRFT));
291       SplitLogTask slt = SplitLogTask.parseFrom(bytes);
292       assertTrue(slt.isOwned(SVR1) || slt.isOwned(SVR2));
293     } finally {
294       stopSplitLogWorker(slw1);
295       stopSplitLogWorker(slw2);
296     }
297   }
298 
299   @Test(timeout=60000)
300   public void testPreemptTask() throws Exception {
301     LOG.info("testPreemptTask");
302     SplitLogCounters.resetCounters();
303     final ServerName SRV = ServerName.valueOf("tpt_svr,1,1");
304     final String PATH = ZKSplitLog.getEncodedNodeName(zkw, "tpt_task");
305     RegionServerServices mockedRS = getRegionServer(SRV);
306     SplitLogWorker slw =
307         new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
308     slw.start();
309     try {
310       Thread.yield(); // let the worker start
311       Thread.sleep(1000);
312       waitForCounter(SplitLogCounters.tot_wkr_task_grabing, 0, 1, WAIT_TIME);
313 
314       // this time create a task node after starting the splitLogWorker
315       zkw.getRecoverableZooKeeper().create(PATH,
316         new SplitLogTask.Unassigned(MANAGER, this.mode).toByteArray(),
317         Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
318 
319       waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
320       assertEquals(1, slw.getTaskReadySeq());
321       byte [] bytes = ZKUtil.getData(zkw, PATH);
322       SplitLogTask slt = SplitLogTask.parseFrom(bytes);
323       assertTrue(slt.isOwned(SRV));
324       slt = new SplitLogTask.Owned(MANAGER, this.mode);
325       ZKUtil.setData(zkw, PATH, slt.toByteArray());
326       waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, WAIT_TIME);
327     } finally {
328       stopSplitLogWorker(slw);
329     }
330   }
331 
332   @Test(timeout=60000)
333   public void testMultipleTasks() throws Exception {
334     LOG.info("testMultipleTasks");
335     SplitLogCounters.resetCounters();
336     final ServerName SRV = ServerName.valueOf("tmt_svr,1,1");
337     final String PATH1 = ZKSplitLog.getEncodedNodeName(zkw, "tmt_task");
338     RegionServerServices mockedRS = getRegionServer(SRV);
339     SplitLogWorker slw =
340         new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
341     slw.start();
342     try {
343       Thread.yield(); // let the worker start
344       Thread.sleep(100);
345       waitForCounter(SplitLogCounters.tot_wkr_task_grabing, 0, 1, WAIT_TIME);
346 
347       SplitLogTask unassignedManager =
348         new SplitLogTask.Unassigned(MANAGER, this.mode);
349       zkw.getRecoverableZooKeeper().create(PATH1, unassignedManager.toByteArray(),
350         Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
351 
352       waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
353       // now the worker is busy doing the above task
354 
355       // create another task
356       final String PATH2 = ZKSplitLog.getEncodedNodeName(zkw, "tmt_task_2");
357       zkw.getRecoverableZooKeeper().create(PATH2, unassignedManager.toByteArray(),
358         Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
359 
360       // preempt the first task, have it owned by another worker
361       final ServerName anotherWorker = ServerName.valueOf("another-worker,1,1");
362       SplitLogTask slt = new SplitLogTask.Owned(anotherWorker, this.mode);
363       ZKUtil.setData(zkw, PATH1, slt.toByteArray());
364       waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, WAIT_TIME);
365 
366       waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 1, 2, WAIT_TIME);
367       assertEquals(2, slw.getTaskReadySeq());
368       byte [] bytes = ZKUtil.getData(zkw, PATH2);
369       slt = SplitLogTask.parseFrom(bytes);
370       assertTrue(slt.isOwned(SRV));
371     } finally {
372       stopSplitLogWorker(slw);
373     }
374   }
375 
376   @Test(timeout=60000)
377   public void testRescan() throws Exception {
378     LOG.info("testRescan");
379     SplitLogCounters.resetCounters();
380     final ServerName SRV = ServerName.valueOf("svr,1,1");
381     RegionServerServices mockedRS = getRegionServer(SRV);
382     slw = new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
383     slw.start();
384     Thread.yield(); // let the worker start
385     Thread.sleep(100);
386 
387     String task = ZKSplitLog.getEncodedNodeName(zkw, "task");
388     SplitLogTask slt = new SplitLogTask.Unassigned(MANAGER, this.mode);
389     zkw.getRecoverableZooKeeper().create(task,slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
390       CreateMode.PERSISTENT);
391 
392     waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
393     // now the worker is busy doing the above task
394 
395     // preempt the task, have it owned by another worker
396     ZKUtil.setData(zkw, task, slt.toByteArray());
397     waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, WAIT_TIME);
398 
399     // create a RESCAN node
400     String rescan = ZKSplitLog.getEncodedNodeName(zkw, "RESCAN");
401     rescan = zkw.getRecoverableZooKeeper().create(rescan, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
402       CreateMode.PERSISTENT_SEQUENTIAL);
403 
404     waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 1, 2, WAIT_TIME);
405     // RESCAN node might not have been processed if the worker became busy
406     // with the above task. preempt the task again so that now the RESCAN
407     // node is processed
408     ZKUtil.setData(zkw, task, slt.toByteArray());
409     waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 1, 2, WAIT_TIME);
410     waitForCounter(SplitLogCounters.tot_wkr_task_acquired_rescan, 0, 1, WAIT_TIME);
411 
412     List<String> nodes = ZKUtil.listChildrenNoWatch(zkw, zkw.splitLogZNode);
413     LOG.debug(nodes);
414     int num = 0;
415     for (String node : nodes) {
416       num++;
417       if (node.startsWith("RESCAN")) {
418         String name = ZKSplitLog.getEncodedNodeName(zkw, node);
419         String fn = ZKSplitLog.getFileName(name);
420         byte [] data = ZKUtil.getData(zkw, ZKUtil.joinZNode(zkw.splitLogZNode, fn));
421         slt = SplitLogTask.parseFrom(data);
422         assertTrue(slt.toString(), slt.isDone(SRV));
423       }
424     }
425     assertEquals(2, num);
426   }
427 
428   @Test(timeout=60000)
429   public void testAcquireMultiTasks() throws Exception {
430     LOG.info("testAcquireMultiTasks");
431     SplitLogCounters.resetCounters();
432     final String TATAS = "tatas";
433     final ServerName RS = ServerName.valueOf("rs,1,1");
434     final int maxTasks = 3;
435     Configuration testConf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
436     testConf.setInt("hbase.regionserver.wal.max.splitters", maxTasks);
437     RegionServerServices mockedRS = getRegionServer(RS);
438     for (int i = 0; i < maxTasks; i++) {
439       zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS + i),
440         new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1"), this.mode).toByteArray(),
441           Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
442     }
443 
444     SplitLogWorker slw = new SplitLogWorker(ds, testConf, mockedRS, neverEndingTask);
445     slw.start();
446     try {
447       waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, maxTasks, WAIT_TIME);
448       for (int i = 0; i < maxTasks; i++) {
449         byte[] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TATAS + i));
450         SplitLogTask slt = SplitLogTask.parseFrom(bytes);
451         assertTrue(slt.isOwned(RS));
452       }
453     } finally {
454       stopSplitLogWorker(slw);
455     }
456   }
457 
458   /**
459    * The test checks SplitLogWorker should not spawn more splitters than expected num of tasks per
460    * RS
461    * @throws Exception
462    */
463   @Test(timeout=60000)
464   public void testAcquireMultiTasksByAvgTasksPerRS() throws Exception {
465     LOG.info("testAcquireMultiTasks");
466     SplitLogCounters.resetCounters();
467     final String TATAS = "tatas";
468     final ServerName RS = ServerName.valueOf("rs,1,1");
469     final ServerName RS2 = ServerName.valueOf("rs,1,2");
470     final int maxTasks = 3;
471     Configuration testConf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
472     testConf.setInt("hbase.regionserver.wal.max.splitters", maxTasks);
473     RegionServerServices mockedRS = getRegionServer(RS);
474 
475     // create two RS nodes
476     String rsPath = ZKUtil.joinZNode(zkw.rsZNode, RS.getServerName());
477     zkw.getRecoverableZooKeeper().create(rsPath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
478     rsPath = ZKUtil.joinZNode(zkw.rsZNode, RS2.getServerName());
479     zkw.getRecoverableZooKeeper().create(rsPath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
480 
481     for (int i = 0; i < maxTasks; i++) {
482       zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS + i),
483         new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1"), this.mode).toByteArray(),
484           Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
485     }
486 
487     SplitLogWorker slw = new SplitLogWorker(ds, testConf, mockedRS, neverEndingTask);
488     slw.start();
489     try {
490       int acquiredTasks = 0;
491       waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 2, WAIT_TIME);
492       for (int i = 0; i < maxTasks; i++) {
493         byte[] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TATAS + i));
494         SplitLogTask slt = SplitLogTask.parseFrom(bytes);
495         if (slt.isOwned(RS)) {
496           acquiredTasks++;
497         }
498       }
499       assertEquals(2, acquiredTasks);
500     } finally {
501       stopSplitLogWorker(slw);
502     }
503   }
504 
505   /**
506    * Create a mocked region server service instance
507    * @param server
508    * @return
509    */
510   private RegionServerServices getRegionServer(ServerName name) {
511 
512     RegionServerServices mockedServer = mock(RegionServerServices.class);
513     when(mockedServer.getConfiguration()).thenReturn(TEST_UTIL.getConfiguration());
514     when(mockedServer.getServerName()).thenReturn(name);
515     when(mockedServer.getZooKeeper()).thenReturn(zkw);
516     when(mockedServer.isStopped()).thenReturn(false);
517     when(mockedServer.getExecutorService()).thenReturn(executorService);
518 
519     return mockedServer;
520   }
521 
522 }