View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.security;
20  
21  import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getKeytabFileForTesting;
22  import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getPrincipalForTesting;
23  import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getSecuredConfiguration;
24  import static org.junit.Assert.assertEquals;
25  import static org.junit.Assert.assertNotSame;
26  import static org.junit.Assert.assertSame;
27  
28  import java.io.File;
29  import java.io.IOException;
30  import java.net.InetSocketAddress;
31  import java.util.ArrayList;
32  import java.util.List;
33  import java.util.Properties;
34  import java.util.concurrent.ThreadLocalRandom;
35  
36  import com.google.protobuf.RpcController;
37  import com.google.protobuf.ServiceException;
38  import org.apache.hadoop.conf.Configuration;
39  import org.apache.hadoop.fs.CommonConfigurationKeys;
40  import org.apache.hadoop.hbase.Cell;
41  import org.apache.hadoop.hbase.CellScanner;
42  import org.apache.hadoop.hbase.CellUtil;
43  import org.apache.hadoop.hbase.HBaseTestingUtility;
44  import org.apache.hadoop.hbase.HConstants;
45  import org.apache.hadoop.hbase.ServerName;
46  import org.apache.hadoop.hbase.ipc.AsyncRpcClient;
47  import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
48  import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
49  import org.apache.hadoop.hbase.ipc.RpcClient;
50  import org.apache.hadoop.hbase.ipc.RpcClientFactory;
51  import org.apache.hadoop.hbase.ipc.RpcClientImpl;
52  import org.apache.hadoop.hbase.ipc.RpcServer;
53  import org.apache.hadoop.hbase.ipc.RpcServerInterface;
54  import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos;
55  import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
56  import org.apache.hadoop.hbase.testclassification.SmallTests;
57  import org.apache.hadoop.minikdc.MiniKdc;
58  import org.apache.hadoop.security.UserGroupInformation;
59  import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
60  import org.junit.AfterClass;
61  import org.junit.BeforeClass;
62  import org.junit.Test;
63  import org.junit.experimental.categories.Category;
64  import org.mockito.Mockito;
65  
66  import com.google.common.collect.Lists;
67  import com.google.protobuf.BlockingRpcChannel;
68  import com.google.protobuf.BlockingService;
69  
70  @Category(SmallTests.class)
71  public class TestSecureRPC {
72  
73    private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
74  
75    private static final File KEYTAB_FILE = new File(TEST_UTIL.getDataTestDir("keytab").toUri()
76        .getPath());
77  
78    static final BlockingService SERVICE =
79        TestRpcServiceProtos.TestProtobufRpcProto.newReflectiveBlockingService(
80            new TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface() {
81  
82              @Override
83              public TestProtos.EmptyResponseProto ping(RpcController controller,
84                                                        TestProtos.EmptyRequestProto request)
85                  throws ServiceException {
86                return null;
87              }
88  
89              @Override
90              public TestProtos.EmptyResponseProto error(RpcController controller,
91                                                         TestProtos.EmptyRequestProto request)
92                  throws ServiceException {
93                return null;
94              }
95  
96              @Override
97              public TestProtos.EchoResponseProto echo(RpcController controller,
98                                                       TestProtos.EchoRequestProto request)
99                  throws ServiceException {
100               if (controller instanceof PayloadCarryingRpcController) {
101                 PayloadCarryingRpcController pcrc = (PayloadCarryingRpcController) controller;
102                 // If cells, scan them to check we are able to iterate what we were given and since
103                 // this is
104                 // an echo, just put them back on the controller creating a new block. Tests our
105                 // block
106                 // building.
107                 CellScanner cellScanner = pcrc.cellScanner();
108                 List<Cell> list = null;
109                 if (cellScanner != null) {
110                   list = new ArrayList<Cell>();
111                   try {
112                     while (cellScanner.advance()) {
113                       list.add(cellScanner.current());
114                     }
115                   } catch (IOException e) {
116                     throw new ServiceException(e);
117                   }
118                 }
119                 cellScanner = CellUtil.createCellScanner(list);
120                 ((PayloadCarryingRpcController) controller).setCellScanner(cellScanner);
121               }
122               return TestProtos.EchoResponseProto.newBuilder()
123                   .setMessage(request.getMessage()).build();
124             }
125           });
126 
127   private static MiniKdc KDC;
128 
129   private static String HOST = "localhost";
130 
131   private static String PRINCIPAL;
132 
133   @BeforeClass
134   public static void setUp() throws Exception {
135     Properties conf = MiniKdc.createConf();
136     conf.put(MiniKdc.DEBUG, true);
137     KDC = new MiniKdc(conf, new File(TEST_UTIL.getDataTestDir("kdc").toUri().getPath()));
138     KDC.start();
139     PRINCIPAL = "hbase/" + HOST;
140     KDC.createPrincipal(KEYTAB_FILE, PRINCIPAL);
141     HBaseKerberosUtils.setKeytabFileForTesting(KEYTAB_FILE.getAbsolutePath());
142     HBaseKerberosUtils.setPrincipalForTesting(PRINCIPAL + "@" + KDC.getRealm());
143   }
144 
145   @AfterClass
146   public static void tearDown() throws IOException {
147     if (KDC != null) {
148       KDC.stop();
149     }
150     TEST_UTIL.cleanupTestDir();
151   }
152 
153   @Test
154   public void testRpc() throws Exception {
155     testRpcCallWithEnabledKerberosSaslAuth(RpcClientImpl.class);
156   }
157 
158   @Test
159   public void testRpcWithInsecureFallback() throws Exception {
160     testRpcFallbackToSimpleAuth(RpcClientImpl.class);
161   }
162 
163   @Test
164   public void testAsyncRpc() throws Exception {
165     testRpcCallWithEnabledKerberosSaslAuth(AsyncRpcClient.class);
166   }
167 
168   @Test
169   public void testAsyncRpcWithInsecureFallback() throws Exception {
170     testRpcFallbackToSimpleAuth(AsyncRpcClient.class);
171   }
172 
173   private void testRpcCallWithEnabledKerberosSaslAuth(Class<? extends RpcClient> rpcImplClass)
174       throws Exception {
175     String krbKeytab = getKeytabFileForTesting();
176     String krbPrincipal = getPrincipalForTesting();
177 
178     UserGroupInformation ugi = loginKerberosPrincipal(krbKeytab, krbPrincipal);
179     UserGroupInformation ugi2 = UserGroupInformation.getCurrentUser();
180 
181     // check that the login user is okay:
182     assertSame(ugi, ugi2);
183     assertEquals(AuthenticationMethod.KERBEROS, ugi.getAuthenticationMethod());
184     assertEquals(krbPrincipal, ugi.getUserName());
185 
186     Configuration clientConf = getSecuredConfiguration();
187     callRpcService(rpcImplClass, User.create(ugi2), clientConf, false);
188   }
189 
190   private UserGroupInformation loginKerberosPrincipal(String krbKeytab, String krbPrincipal)
191       throws Exception {
192     Configuration cnf = new Configuration();
193     cnf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
194     UserGroupInformation.setConfiguration(cnf);
195     UserGroupInformation.loginUserFromKeytab(krbPrincipal, krbKeytab);
196     return UserGroupInformation.getLoginUser();
197   }
198 
199   private void callRpcService(Class<? extends RpcClient> rpcImplClass, User clientUser,
200                               Configuration clientConf, boolean allowInsecureFallback)
201       throws Exception {
202     Configuration clientConfCopy = new Configuration(clientConf);
203     clientConfCopy.set(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, rpcImplClass.getName());
204 
205     Configuration conf = getSecuredConfiguration();
206     conf.setBoolean(RpcServer.FALLBACK_TO_INSECURE_CLIENT_AUTH, allowInsecureFallback);
207 
208     SecurityInfo securityInfoMock = Mockito.mock(SecurityInfo.class);
209     Mockito.when(securityInfoMock.getServerPrincipal())
210         .thenReturn(HBaseKerberosUtils.KRB_PRINCIPAL);
211     SecurityInfo.addInfo("TestProtobufRpcProto", securityInfoMock);
212 
213     InetSocketAddress isa = new InetSocketAddress(HOST, 0);
214 
215     RpcServerInterface rpcServer =
216         new RpcServer(null, "AbstractTestSecureIPC",
217             Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)), isa,
218             conf, new FifoRpcScheduler(conf, 1));
219     rpcServer.start();
220     try (RpcClient rpcClient = RpcClientFactory.createClient(clientConf,
221         HConstants.DEFAULT_CLUSTER_ID.toString())) {
222       InetSocketAddress address = rpcServer.getListenerAddress();
223       if (address == null) {
224         throw new IOException("Listener channel is closed");
225       }
226       BlockingRpcChannel channel =
227           rpcClient.createBlockingRpcChannel(
228 
229             ServerName.valueOf(address.getHostName(), address.getPort(),
230             System.currentTimeMillis()), clientUser, 5000);
231       TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub =
232           TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(channel);
233       List<String> results = new ArrayList<String>();
234       TestThread th1 = new TestThread(stub, results);
235       th1.start();
236       th1.join();
237 
238     } finally {
239       rpcServer.stop();
240     }
241   }
242 
243   public void testRpcFallbackToSimpleAuth(Class<? extends RpcClient> rpcImplClass) throws Exception {
244     String krbKeytab = getKeytabFileForTesting();
245     String krbPrincipal = getPrincipalForTesting();
246 
247     UserGroupInformation ugi = loginKerberosPrincipal(krbKeytab, krbPrincipal);
248     assertEquals(AuthenticationMethod.KERBEROS, ugi.getAuthenticationMethod());
249     assertEquals(krbPrincipal, ugi.getUserName());
250 
251     String clientUsername = "testuser";
252     UserGroupInformation clientUgi = UserGroupInformation.createUserForTesting(clientUsername,
253         new String[]{clientUsername});
254 
255     // check that the client user is insecure
256     assertNotSame(ugi, clientUgi);
257     assertEquals(AuthenticationMethod.SIMPLE, clientUgi.getAuthenticationMethod());
258     assertEquals(clientUsername, clientUgi.getUserName());
259 
260     Configuration clientConf = new Configuration();
261     clientConf.set(User.HBASE_SECURITY_CONF_KEY, "simple");
262     callRpcService(rpcImplClass, User.create(clientUgi), clientConf, true);
263   }
264 
265   public static class TestThread extends Thread {
266       private final TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub;
267 
268       private final List<String> results;
269 
270           public TestThread(TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub, List<String> results) {
271           this.stub = stub;
272           this.results = results;
273         }
274 
275           @Override
276       public void run() {
277           String result;
278           try {
279               result = stub.echo(null, TestProtos.EchoRequestProto.newBuilder().setMessage(String.valueOf(
280                   ThreadLocalRandom.current().nextInt())).build()).getMessage();
281             } catch (ServiceException e) {
282               throw new RuntimeException(e);
283             }
284           if (results != null) {
285               synchronized (results) {
286                   results.add(result);
287                 }
288             }
289         }
290     }
291 }