1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.util;
20
21 import java.io.IOException;
22 import java.util.Map;
23 import java.util.concurrent.ConcurrentHashMap;
24 import java.util.concurrent.locks.Lock;
25
26 import org.apache.hadoop.conf.Configuration;
27 import org.apache.hadoop.hbase.ChoreService;
28 import org.apache.hadoop.hbase.ScheduledChore;
29 import org.apache.hadoop.hbase.Stoppable;
30 import org.apache.hadoop.hbase.classification.InterfaceAudience;
31 import org.apache.hadoop.hbase.TableName;
32 import org.apache.hadoop.hbase.client.Admin;
33 import org.apache.hadoop.hbase.client.Connection;
34 import org.apache.hadoop.hbase.client.ConnectionFactory;
35 import org.apache.hadoop.hbase.client.RegionLocator;
36 import org.apache.hadoop.hbase.client.Table;
37 import org.apache.hadoop.hbase.security.User;
38 import org.apache.hadoop.hbase.security.UserProvider;
39 import org.apache.hadoop.security.UserGroupInformation;
40 import org.apache.log4j.Logger;
41
42
43
44
45
46
47
48 @InterfaceAudience.Private
49 public class ConnectionCache {
50 private static final Logger LOG = Logger.getLogger(ConnectionCache.class);
51
52 private final Map<String, ConnectionInfo>
53 connections = new ConcurrentHashMap<String, ConnectionInfo>();
54 private final KeyLocker<String> locker = new KeyLocker<String>();
55 private final String realUserName;
56 private final UserGroupInformation realUser;
57 private final UserProvider userProvider;
58 private final Configuration conf;
59 private final ChoreService choreService;
60
61 private final ThreadLocal<String> effectiveUserNames =
62 new ThreadLocal<String>() {
63 protected String initialValue() {
64 return realUserName;
65 }
66 };
67
68 public ConnectionCache(final Configuration conf,
69 final UserProvider userProvider,
70 final int cleanInterval, final int maxIdleTime) throws IOException {
71 Stoppable stoppable = new Stoppable() {
72 private volatile boolean isStopped = false;
73 @Override public void stop(String why) { isStopped = true;}
74 @Override public boolean isStopped() {return isStopped;}
75 };
76 this.choreService = new ChoreService("ConnectionCache");
77 ScheduledChore cleaner = new ScheduledChore("ConnectionCleaner", stoppable, cleanInterval) {
78 @Override
79 protected void chore() {
80 for (Map.Entry<String, ConnectionInfo> entry: connections.entrySet()) {
81 ConnectionInfo connInfo = entry.getValue();
82 if (connInfo.timedOut(maxIdleTime)) {
83 if (connInfo.admin != null) {
84 try {
85 connInfo.admin.close();
86 } catch (Throwable t) {
87 LOG.info("Got exception in closing idle admin", t);
88 }
89 }
90 try {
91 connInfo.connection.close();
92 } catch (Throwable t) {
93 LOG.info("Got exception in closing idle connection", t);
94 }
95 }
96 }
97 }
98 };
99
100 choreService.scheduleChore(cleaner);
101 this.realUser = userProvider.getCurrent().getUGI();
102 this.realUserName = realUser.getShortUserName();
103 this.userProvider = userProvider;
104 this.conf = conf;
105 }
106
107
108
109
110 public void setEffectiveUser(String user) {
111 effectiveUserNames.set(user);
112 }
113
114
115
116
117 public String getEffectiveUser() {
118 return effectiveUserNames.get();
119 }
120
121
122
123
124 public void shutdown() {
125 if (choreService != null) choreService.shutdown();
126 }
127
128
129
130
131
132 public Admin getAdmin() throws IOException {
133 ConnectionInfo connInfo = getCurrentConnection();
134 if (connInfo.admin == null) {
135 Lock lock = locker.acquireLock(getEffectiveUser());
136 try {
137 if (connInfo.admin == null) {
138 connInfo.admin = connInfo.connection.getAdmin();
139 }
140 } finally {
141 lock.unlock();
142 }
143 }
144 return connInfo.admin;
145 }
146
147
148
149
150 public Table getTable(String tableName) throws IOException {
151 ConnectionInfo connInfo = getCurrentConnection();
152 return connInfo.connection.getTable(TableName.valueOf(tableName));
153 }
154
155
156
157
158 public RegionLocator getRegionLocator(byte[] tableName) throws IOException {
159 return getCurrentConnection().connection.getRegionLocator(TableName.valueOf(tableName));
160 }
161
162
163
164
165
166 ConnectionInfo getCurrentConnection() throws IOException {
167 String userName = getEffectiveUser();
168 ConnectionInfo connInfo = connections.get(userName);
169 if (connInfo == null || !connInfo.updateAccessTime()) {
170 Lock lock = locker.acquireLock(userName);
171 try {
172 connInfo = connections.get(userName);
173 if (connInfo == null) {
174 UserGroupInformation ugi = realUser;
175 if (!userName.equals(realUserName)) {
176 ugi = UserGroupInformation.createProxyUser(userName, realUser);
177 }
178 User user = userProvider.create(ugi);
179 Connection conn = ConnectionFactory.createConnection(conf, user);
180 connInfo = new ConnectionInfo(conn, userName);
181 connections.put(userName, connInfo);
182 }
183 } finally {
184 lock.unlock();
185 }
186 }
187 return connInfo;
188 }
189
190 class ConnectionInfo {
191 final Connection connection;
192 final String userName;
193
194 volatile Admin admin;
195 private long lastAccessTime;
196 private boolean closed;
197
198 ConnectionInfo(Connection conn, String user) {
199 lastAccessTime = EnvironmentEdgeManager.currentTime();
200 connection = conn;
201 closed = false;
202 userName = user;
203 }
204
205 synchronized boolean updateAccessTime() {
206 if (closed) {
207 return false;
208 }
209 if (connection.isAborted() || connection.isClosed()) {
210 LOG.info("Unexpected: cached HConnection is aborted/closed, removed from cache");
211 connections.remove(userName);
212 return false;
213 }
214 lastAccessTime = EnvironmentEdgeManager.currentTime();
215 return true;
216 }
217
218 synchronized boolean timedOut(int maxIdleTime) {
219 long timeoutTime = lastAccessTime + maxIdleTime;
220 if (EnvironmentEdgeManager.currentTime() > timeoutTime) {
221 connections.remove(userName);
222 closed = true;
223 return true;
224 }
225 return false;
226 }
227 }
228 }