1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.ipc;
19
20 import com.google.common.collect.ImmutableList;
21 import com.google.common.collect.ImmutableMap;
22 import com.google.common.collect.ImmutableSet;
23 import com.google.common.collect.Maps;
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import com.google.protobuf.Message;
27 import org.apache.hadoop.conf.Configuration;
28 import org.apache.hadoop.hbase.HBaseConfiguration;
29 import org.apache.hadoop.hbase.HConstants;
30 import org.apache.hadoop.hbase.client.Put;
31 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandlerImpl;
32 import org.apache.hadoop.hbase.security.User;
33 import org.apache.hadoop.hbase.testclassification.SmallTests;
34 import org.apache.hadoop.hbase.ipc.RpcServer.Call;
35 import org.apache.hadoop.hbase.protobuf.RequestConverter;
36 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
37 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
38 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
39 import org.apache.hadoop.hbase.util.Bytes;
40 import org.apache.hadoop.hbase.util.Threads;
41 import org.junit.Before;
42 import org.junit.Test;
43 import org.junit.experimental.categories.Category;
44 import org.mockito.invocation.InvocationOnMock;
45 import org.mockito.stubbing.Answer;
46
47 import java.io.IOException;
48 import java.net.InetSocketAddress;
49 import java.util.ArrayList;
50 import java.util.List;
51 import java.util.Map;
52 import java.util.concurrent.CountDownLatch;
53
54 import static org.junit.Assert.assertEquals;
55 import static org.junit.Assert.assertNotEquals;
56 import static org.mockito.Matchers.any;
57 import static org.mockito.Matchers.anyObject;
58 import static org.mockito.Matchers.eq;
59 import static org.mockito.Mockito.doAnswer;
60 import static org.mockito.Mockito.mock;
61 import static org.mockito.Mockito.timeout;
62 import static org.mockito.Mockito.verify;
63 import static org.mockito.Mockito.when;
64
65 @Category(SmallTests.class)
66 public class TestSimpleRpcScheduler {
67 private static final Log LOG = LogFactory.getLog(TestSimpleRpcScheduler.class);
68
69 private final RpcScheduler.Context CONTEXT = new RpcScheduler.Context() {
70 @Override
71 public InetSocketAddress getListenerAddress() {
72 return InetSocketAddress.createUnresolved("127.0.0.1", 1000);
73 }
74 };
75 private Configuration conf;
76
77 @Before
78 public void setUp() {
79 conf = HBaseConfiguration.create();
80 }
81
82 @Test
83 public void testBasic() throws IOException, InterruptedException {
84 PriorityFunction qosFunction = mock(PriorityFunction.class);
85 RpcScheduler scheduler = new SimpleRpcScheduler(
86 conf, 10, 0, 0, qosFunction, 0);
87 scheduler.init(CONTEXT);
88 scheduler.start();
89 CallRunner task = createMockTask();
90 task.setStatus(new MonitoredRPCHandlerImpl());
91 scheduler.dispatch(task);
92 verify(task, timeout(1000)).run();
93 scheduler.stop();
94 }
95
96 @Test
97 public void testHandlerIsolation() throws IOException, InterruptedException {
98 CallRunner generalTask = createMockTask();
99 CallRunner priorityTask = createMockTask();
100 CallRunner replicationTask = createMockTask();
101 List<CallRunner> tasks = ImmutableList.of(
102 generalTask,
103 priorityTask,
104 replicationTask);
105 Map<CallRunner, Integer> qos = ImmutableMap.of(
106 generalTask, 0,
107 priorityTask, HConstants.HIGH_QOS + 1,
108 replicationTask, HConstants.REPLICATION_QOS);
109 PriorityFunction qosFunction = mock(PriorityFunction.class);
110 final Map<CallRunner, Thread> handlerThreads = Maps.newHashMap();
111 final CountDownLatch countDownLatch = new CountDownLatch(tasks.size());
112 Answer<Void> answerToRun = new Answer<Void>() {
113 @Override
114 public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
115 synchronized (handlerThreads) {
116 handlerThreads.put(
117 (CallRunner) invocationOnMock.getMock(),
118 Thread.currentThread());
119 }
120 countDownLatch.countDown();
121 return null;
122 }
123 };
124 for (CallRunner task : tasks) {
125 task.setStatus(new MonitoredRPCHandlerImpl());
126 doAnswer(answerToRun).when(task).run();
127 }
128
129 RpcScheduler scheduler = new SimpleRpcScheduler(
130 conf, 1, 1 ,1, qosFunction, HConstants.HIGH_QOS);
131 scheduler.init(CONTEXT);
132 scheduler.start();
133 for (CallRunner task : tasks) {
134 when(qosFunction.getPriority((RPCProtos.RequestHeader) anyObject(),
135 (Message) anyObject(), (User) anyObject()))
136 .thenReturn(qos.get(task));
137 scheduler.dispatch(task);
138 }
139 for (CallRunner task : tasks) {
140 verify(task, timeout(1000)).run();
141 }
142 scheduler.stop();
143
144
145 countDownLatch.await();
146 assertEquals(3, ImmutableSet.copyOf(handlerThreads.values()).size());
147 }
148
149 private CallRunner createMockTask() {
150 Call call = mock(Call.class);
151 CallRunner task = mock(CallRunner.class);
152 when(task.getCall()).thenReturn(call);
153 return task;
154 }
155
156 @Test
157 public void testRpcScheduler() throws Exception {
158 testRpcScheduler(SimpleRpcScheduler.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
159 testRpcScheduler(SimpleRpcScheduler.CALL_QUEUE_TYPE_FIFO_CONF_VALUE);
160 }
161
162 private void testRpcScheduler(final String queueType) throws Exception {
163 Configuration schedConf = HBaseConfiguration.create();
164 schedConf.set(SimpleRpcScheduler.CALL_QUEUE_TYPE_CONF_KEY, queueType);
165
166 PriorityFunction priority = mock(PriorityFunction.class);
167 when(priority.getPriority(any(RequestHeader.class),
168 any(Message.class), any(User.class)))
169 .thenReturn(HConstants.NORMAL_QOS);
170
171 RpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 1, 1, 1, priority,
172 HConstants.QOS_THRESHOLD);
173 try {
174 scheduler.start();
175
176 CallRunner smallCallTask = mock(CallRunner.class);
177 RpcServer.Call smallCall = mock(RpcServer.Call.class);
178 RequestHeader smallHead = RequestHeader.newBuilder().setCallId(1).build();
179 when(smallCallTask.getCall()).thenReturn(smallCall);
180 when(smallCall.getHeader()).thenReturn(smallHead);
181
182 CallRunner largeCallTask = mock(CallRunner.class);
183 RpcServer.Call largeCall = mock(RpcServer.Call.class);
184 RequestHeader largeHead = RequestHeader.newBuilder().setCallId(50).build();
185 when(largeCallTask.getCall()).thenReturn(largeCall);
186 when(largeCall.getHeader()).thenReturn(largeHead);
187
188 CallRunner hugeCallTask = mock(CallRunner.class);
189 RpcServer.Call hugeCall = mock(RpcServer.Call.class);
190 RequestHeader hugeHead = RequestHeader.newBuilder().setCallId(100).build();
191 when(hugeCallTask.getCall()).thenReturn(hugeCall);
192 when(hugeCall.getHeader()).thenReturn(hugeHead);
193
194 when(priority.getDeadline(eq(smallHead), any(Message.class))).thenReturn(0L);
195 when(priority.getDeadline(eq(largeHead), any(Message.class))).thenReturn(50L);
196 when(priority.getDeadline(eq(hugeHead), any(Message.class))).thenReturn(100L);
197
198 final ArrayList<Integer> work = new ArrayList<Integer>();
199 doAnswerTaskExecution(smallCallTask, work, 10, 250);
200 doAnswerTaskExecution(largeCallTask, work, 50, 250);
201 doAnswerTaskExecution(hugeCallTask, work, 100, 250);
202
203 scheduler.dispatch(smallCallTask);
204 scheduler.dispatch(smallCallTask);
205 scheduler.dispatch(smallCallTask);
206 scheduler.dispatch(hugeCallTask);
207 scheduler.dispatch(smallCallTask);
208 scheduler.dispatch(largeCallTask);
209 scheduler.dispatch(smallCallTask);
210 scheduler.dispatch(smallCallTask);
211
212 while (work.size() < 8) {
213 Threads.sleepWithoutInterrupt(100);
214 }
215
216 int seqSum = 0;
217 int totalTime = 0;
218 for (int i = 0; i < work.size(); ++i) {
219 LOG.debug("Request i=" + i + " value=" + work.get(i));
220 seqSum += work.get(i);
221 totalTime += seqSum;
222 }
223 LOG.debug("Total Time: " + totalTime);
224
225
226
227
228 if (queueType.equals(SimpleRpcScheduler.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
229 assertEquals(530, totalTime);
230 } else
231 assertEquals(930, totalTime);
232 }
233 } finally {
234 scheduler.stop();
235 }
236 }
237
238 @Test
239 public void testScanQueues() throws Exception {
240 Configuration schedConf = HBaseConfiguration.create();
241 schedConf.setFloat(SimpleRpcScheduler.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f);
242 schedConf.setFloat(SimpleRpcScheduler.CALL_QUEUE_READ_SHARE_CONF_KEY, 0.7f);
243 schedConf.setFloat(SimpleRpcScheduler.CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.5f);
244
245 PriorityFunction priority = mock(PriorityFunction.class);
246 when(priority.getPriority(any(RequestHeader.class), any(Message.class),
247 any(User.class))).thenReturn(HConstants.NORMAL_QOS);
248
249 RpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 3, 1, 1, priority,
250 HConstants.QOS_THRESHOLD);
251 try {
252 scheduler.start();
253
254 CallRunner putCallTask = mock(CallRunner.class);
255 RpcServer.Call putCall = mock(RpcServer.Call.class);
256 putCall.param = RequestConverter.buildMutateRequest(
257 Bytes.toBytes("abc"), new Put(Bytes.toBytes("row")));
258 RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build();
259 when(putCallTask.getCall()).thenReturn(putCall);
260 when(putCall.getHeader()).thenReturn(putHead);
261
262 CallRunner getCallTask = mock(CallRunner.class);
263 RpcServer.Call getCall = mock(RpcServer.Call.class);
264 RequestHeader getHead = RequestHeader.newBuilder().setMethodName("get").build();
265 when(getCallTask.getCall()).thenReturn(getCall);
266 when(getCall.getHeader()).thenReturn(getHead);
267
268 CallRunner scanCallTask = mock(CallRunner.class);
269 RpcServer.Call scanCall = mock(RpcServer.Call.class);
270 scanCall.param = ScanRequest.newBuilder().setScannerId(1).build();
271 RequestHeader scanHead = RequestHeader.newBuilder().setMethodName("scan").build();
272 when(scanCallTask.getCall()).thenReturn(scanCall);
273 when(scanCall.getHeader()).thenReturn(scanHead);
274
275 ArrayList<Integer> work = new ArrayList<Integer>();
276 doAnswerTaskExecution(putCallTask, work, 1, 1000);
277 doAnswerTaskExecution(getCallTask, work, 2, 1000);
278 doAnswerTaskExecution(scanCallTask, work, 3, 1000);
279
280
281
282 scheduler.dispatch(putCallTask);
283 scheduler.dispatch(putCallTask);
284 scheduler.dispatch(putCallTask);
285 scheduler.dispatch(getCallTask);
286 scheduler.dispatch(getCallTask);
287 scheduler.dispatch(getCallTask);
288 scheduler.dispatch(scanCallTask);
289 scheduler.dispatch(scanCallTask);
290 scheduler.dispatch(scanCallTask);
291
292 while (work.size() < 6) {
293 Threads.sleepWithoutInterrupt(100);
294 }
295
296 for (int i = 0; i < work.size() - 2; i += 3) {
297 assertNotEquals(work.get(i + 0), work.get(i + 1));
298 assertNotEquals(work.get(i + 0), work.get(i + 2));
299 assertNotEquals(work.get(i + 1), work.get(i + 2));
300 }
301 } finally {
302 scheduler.stop();
303 }
304 }
305
306 private void doAnswerTaskExecution(final CallRunner callTask,
307 final ArrayList<Integer> results, final int value, final int sleepInterval) {
308 callTask.setStatus(new MonitoredRPCHandlerImpl());
309 doAnswer(new Answer<Object>() {
310 @Override
311 public Object answer(InvocationOnMock invocation) {
312 synchronized (results) {
313 results.add(value);
314 }
315 Threads.sleepWithoutInterrupt(sleepInterval);
316 return null;
317 }
318 }).when(callTask).run();
319 }
320 }