View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.client;
20  
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  import java.net.UnknownHostException;
24  import java.util.Map;
25  import java.util.Map.Entry;
26  
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.apache.hadoop.hbase.classification.InterfaceAudience;
30  import org.apache.hadoop.conf.Configuration;
31  import org.apache.hadoop.hbase.Cell;
32  import org.apache.hadoop.hbase.CellScanner;
33  import org.apache.hadoop.hbase.CellUtil;
34  import org.apache.hadoop.hbase.DoNotRetryIOException;
35  import org.apache.hadoop.hbase.HBaseIOException;
36  import org.apache.hadoop.hbase.HRegionInfo;
37  import org.apache.hadoop.hbase.HRegionLocation;
38  import org.apache.hadoop.hbase.NotServingRegionException;
39  import org.apache.hadoop.hbase.RegionLocations;
40  import org.apache.hadoop.hbase.RemoteExceptionHandler;
41  import org.apache.hadoop.hbase.ServerName;
42  import org.apache.hadoop.hbase.TableName;
43  import org.apache.hadoop.hbase.UnknownScannerException;
44  import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
45  import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
46  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
47  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
48  import org.apache.hadoop.hbase.protobuf.RequestConverter;
49  import org.apache.hadoop.hbase.protobuf.ResponseConverter;
50  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
51  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
52  import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
53  import org.apache.hadoop.ipc.RemoteException;
54  import org.apache.hadoop.net.DNS;
55  
56  import com.google.protobuf.ServiceException;
57  import com.google.protobuf.TextFormat;
58  
59  /**
60   * Scanner operations such as create, next, etc.
61   * Used by {@link ResultScanner}s made by {@link HTable}. Passed to a retrying caller such as
62   * {@link RpcRetryingCaller} so fails are retried.
63   */
64  @InterfaceAudience.Private
65  public class ScannerCallable extends RegionServerCallable<Result[]> {
66    public static final String LOG_SCANNER_LATENCY_CUTOFF
67      = "hbase.client.log.scanner.latency.cutoff";
68    public static final String LOG_SCANNER_ACTIVITY = "hbase.client.log.scanner.activity";
69  
70    // Keeping LOG public as it is being used in TestScannerHeartbeatMessages
71    public static final Log LOG = LogFactory.getLog(ScannerCallable.class);
72    protected long scannerId = -1L;
73    protected boolean instantiated = false;
74    protected boolean closed = false;
75    protected boolean renew = false;
76    private Scan scan;
77    private int caching = 1;
78    protected final ClusterConnection cConnection;
79    protected ScanMetrics scanMetrics;
80    private boolean logScannerActivity = false;
81    private int logCutOffLatency = 1000;
82    private static String myAddress;
83    protected final int id;
84    protected boolean serverHasMoreResultsContext;
85    protected boolean serverHasMoreResults;
86  
87    /**
88     * Saves whether or not the most recent response from the server was a heartbeat message.
89     * Heartbeat messages are identified by the flag {@link ScanResponse#getHeartbeatMessage()}
90     */
91    protected boolean heartbeatMessage = false;
92    static {
93      try {
94        myAddress = DNS.getDefaultHost("default", "default");
95      } catch (UnknownHostException uhe) {
96        LOG.error("cannot determine my address", uhe);
97      }
98    }
99  
100   // indicate if it is a remote server call
101   protected boolean isRegionServerRemote = true;
102   private long nextCallSeq = 0;
103   protected RpcControllerFactory controllerFactory;
104   protected PayloadCarryingRpcController controller;
105 
106   /**
107    * @param connection which connection
108    * @param tableName table callable is on
109    * @param scan the scan to execute
110    * @param scanMetrics the ScanMetrics to used, if it is null, 
111    *        ScannerCallable won't collect metrics
112    * @param rpcControllerFactory factory to use when creating 
113    *        {@link com.google.protobuf.RpcController}
114    */
115   public ScannerCallable (ClusterConnection connection, TableName tableName, Scan scan,
116       ScanMetrics scanMetrics, RpcControllerFactory rpcControllerFactory) {
117     this(connection, tableName, scan, scanMetrics, rpcControllerFactory, 0);
118   }
119   /**
120    *
121    * @param connection
122    * @param tableName
123    * @param scan
124    * @param scanMetrics
125    * @param id the replicaId
126    */
127   public ScannerCallable (ClusterConnection connection, TableName tableName, Scan scan,
128       ScanMetrics scanMetrics, RpcControllerFactory rpcControllerFactory, int id) {
129     super(connection, tableName, scan.getStartRow());
130     this.id = id;
131     this.cConnection = connection;
132     this.scan = scan;
133     this.scanMetrics = scanMetrics;
134     Configuration conf = connection.getConfiguration();
135     logScannerActivity = conf.getBoolean(LOG_SCANNER_ACTIVITY, false);
136     logCutOffLatency = conf.getInt(LOG_SCANNER_LATENCY_CUTOFF, 1000);
137     this.controllerFactory = rpcControllerFactory;
138   }
139 
140   PayloadCarryingRpcController getController() {
141     return controller;
142   }
143 
144   /**
145    * @param reload force reload of server location
146    * @throws IOException
147    */
148   @Override
149   public void prepare(boolean reload) throws IOException {
150     if (Thread.interrupted()) {
151       throw new InterruptedIOException();
152     }
153     RegionLocations rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(!reload,
154         id, getConnection(), getTableName(), getRow());
155     location = id < rl.size() ? rl.getRegionLocation(id) : null;
156     if (location == null || location.getServerName() == null) {
157       // With this exception, there will be a retry. The location can be null for a replica
158       //  when the table is created or after a split.
159       throw new HBaseIOException("There is no location for replica id #" + id);
160     }
161     ServerName dest = location.getServerName();
162     setStub(super.getConnection().getClient(dest));
163     if (!instantiated || reload) {
164       checkIfRegionServerIsRemote();
165       instantiated = true;
166     }
167 
168     // check how often we retry.
169     // HConnectionManager will call instantiateServer with reload==true
170     // if and only if for retries.
171     if (reload && this.scanMetrics != null) {
172       this.scanMetrics.countOfRPCRetries.incrementAndGet();
173       if (isRegionServerRemote) {
174         this.scanMetrics.countOfRemoteRPCRetries.incrementAndGet();
175       }
176     }
177   }
178 
179   /**
180    * compare the local machine hostname with region server's hostname
181    * to decide if hbase client connects to a remote region server
182    */
183   protected void checkIfRegionServerIsRemote() {
184     if (getLocation().getHostname().equalsIgnoreCase(myAddress)) {
185       isRegionServerRemote = false;
186     } else {
187       isRegionServerRemote = true;
188     }
189   }
190 
191 
192   @Override
193   public Result [] call(int callTimeout) throws IOException {
194     if (Thread.interrupted()) {
195       throw new InterruptedIOException();
196     }
197     if (closed) {
198       if (scannerId != -1) {
199         close();
200       }
201     } else {
202       if (scannerId == -1L) {
203         this.scannerId = openScanner();
204       } else {
205         Result [] rrs = null;
206         ScanRequest request = null;
207         // Reset the heartbeat flag prior to each RPC in case an exception is thrown by the server
208         setHeartbeatMessage(false);
209         try {
210           incRPCcallsMetrics();
211           request =
212               RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq,
213                 this.scanMetrics != null, renew);
214           ScanResponse response = null;
215           controller = controllerFactory.newController();
216           controller.setPriority(getTableName());
217           controller.setCallTimeout(callTimeout);
218           try {
219             response = getStub().scan(controller, request);
220             // Client and RS maintain a nextCallSeq number during the scan. Every next() call
221             // from client to server will increment this number in both sides. Client passes this
222             // number along with the request and at RS side both the incoming nextCallSeq and its
223             // nextCallSeq will be matched. In case of a timeout this increment at the client side
224             // should not happen. If at the server side fetching of next batch of data was over,
225             // there will be mismatch in the nextCallSeq number. Server will throw
226             // OutOfOrderScannerNextException and then client will reopen the scanner with startrow
227             // as the last successfully retrieved row.
228             // See HBASE-5974
229             nextCallSeq++;
230             long timestamp = System.currentTimeMillis();
231             setHeartbeatMessage(response.hasHeartbeatMessage() && response.getHeartbeatMessage());
232             // Results are returned via controller
233             CellScanner cellScanner = controller.cellScanner();
234             rrs = ResponseConverter.getResults(cellScanner, response);
235             if (logScannerActivity) {
236               long now = System.currentTimeMillis();
237               if (now - timestamp > logCutOffLatency) {
238                 int rows = rrs == null ? 0 : rrs.length;
239                 LOG.info("Took " + (now-timestamp) + "ms to fetch "
240                   + rows + " rows from scanner=" + scannerId);
241               }
242             }
243             updateServerSideMetrics(response);
244             // moreResults is only used for the case where a filter exhausts all elements
245             if (response.hasMoreResults() && !response.getMoreResults()) {
246               scannerId = -1L;
247               closed = true;
248               // Implied that no results were returned back, either.
249               return null;
250             }
251             // moreResultsInRegion explicitly defines when a RS may choose to terminate a batch due
252             // to size or quantity of results in the response.
253             if (response.hasMoreResultsInRegion()) {
254               // Set what the RS said
255               setHasMoreResultsContext(true);
256               setServerHasMoreResults(response.getMoreResultsInRegion());
257             } else {
258               // Server didn't respond whether it has more results or not.
259               setHasMoreResultsContext(false);
260             }
261           } catch (ServiceException se) {
262             throw ProtobufUtil.getRemoteException(se);
263           }
264           updateResultsMetrics(rrs);
265         } catch (IOException e) {
266           if (logScannerActivity) {
267             LOG.info("Got exception making request " + TextFormat.shortDebugString(request)
268               + " to " + getLocation(), e);
269           }
270           IOException ioe = e;
271           if (e instanceof RemoteException) {
272             ioe = RemoteExceptionHandler.decodeRemoteException((RemoteException)e);
273           }
274           if (logScannerActivity && (ioe instanceof UnknownScannerException)) {
275             try {
276               HRegionLocation location =
277                 getConnection().relocateRegion(getTableName(), scan.getStartRow());
278               LOG.info("Scanner=" + scannerId
279                 + " expired, current region location is " + location.toString());
280             } catch (Throwable t) {
281               LOG.info("Failed to relocate region", t);
282             }
283           }
284           // The below convertion of exceptions into DoNotRetryExceptions is a little strange.
285           // Why not just have these exceptions implment DNRIOE you ask?  Well, usually we want
286           // ServerCallable#withRetries to just retry when it gets these exceptions.  In here in
287           // a scan when doing a next in particular, we want to break out and get the scanner to
288           // reset itself up again.  Throwing a DNRIOE is how we signal this to happen (its ugly,
289           // yeah and hard to follow and in need of a refactor).
290           if (ioe instanceof NotServingRegionException) {
291             // Throw a DNRE so that we break out of cycle of calling NSRE
292             // when what we need is to open scanner against new location.
293             // Attach NSRE to signal client that it needs to re-setup scanner.
294             if (this.scanMetrics != null) {
295               this.scanMetrics.countOfNSRE.incrementAndGet();
296             }
297             throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe);
298           } else if (ioe instanceof RegionServerStoppedException) {
299             // Throw a DNRE so that we break out of cycle of the retries and instead go and
300             // open scanner against new location.
301             throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe);
302           } else {
303             // The outer layers will retry
304             throw ioe;
305           }
306         }
307         return rrs;
308       }
309     }
310     return null;
311   }
312 
313   /**
314    * @return true when the most recent RPC response indicated that the response was a heartbeat
315    *         message. Heartbeat messages are sent back from the server when the processing of the
316    *         scan request exceeds a certain time threshold. Heartbeats allow the server to avoid
317    *         timeouts during long running scan operations.
318    */
319   protected boolean isHeartbeatMessage() {
320     return heartbeatMessage;
321   }
322 
323   protected void setHeartbeatMessage(boolean heartbeatMessage) {
324     this.heartbeatMessage = heartbeatMessage;
325   }
326 
327   private void incRPCcallsMetrics() {
328     if (this.scanMetrics == null) {
329       return;
330     }
331     this.scanMetrics.countOfRPCcalls.incrementAndGet();
332     if (isRegionServerRemote) {
333       this.scanMetrics.countOfRemoteRPCcalls.incrementAndGet();
334     }
335   }
336 
337   protected void updateResultsMetrics(Result[] rrs) {
338     if (this.scanMetrics == null || rrs == null || rrs.length == 0) {
339       return;
340     }
341     long resultSize = 0;
342     for (Result rr : rrs) {
343       for (Cell cell : rr.rawCells()) {
344         resultSize += CellUtil.estimatedSerializedSizeOf(cell);
345       }
346     }
347     this.scanMetrics.countOfBytesInResults.addAndGet(resultSize);
348     if (isRegionServerRemote) {
349       this.scanMetrics.countOfBytesInRemoteResults.addAndGet(resultSize);
350     }
351   }
352 
353   /**
354    * Use the scan metrics returned by the server to add to the identically named counters in the
355    * client side metrics. If a counter does not exist with the same name as the server side metric,
356    * the attempt to increase the counter will fail.
357    * @param response
358    */
359   private void updateServerSideMetrics(ScanResponse response) {
360     if (this.scanMetrics == null || response == null || !response.hasScanMetrics()) return;
361 
362     Map<String, Long> serverMetrics = ResponseConverter.getScanMetrics(response);
363     for (Entry<String, Long> entry : serverMetrics.entrySet()) {
364       this.scanMetrics.addToCounter(entry.getKey(), entry.getValue());
365     }
366   }
367 
368   private void close() {
369     if (this.scannerId == -1L) {
370       return;
371     }
372     try {
373       incRPCcallsMetrics();
374       ScanRequest request =
375           RequestConverter.buildScanRequest(this.scannerId, 0, true, this.scanMetrics != null);
376       try {
377         getStub().scan(null, request);
378       } catch (ServiceException se) {
379         throw ProtobufUtil.getRemoteException(se);
380       }
381     } catch (IOException e) {
382       LOG.warn("Ignore, probably already closed", e);
383     }
384     this.scannerId = -1L;
385   }
386 
387   protected long openScanner() throws IOException {
388     incRPCcallsMetrics();
389     ScanRequest request =
390       RequestConverter.buildScanRequest(
391         getLocation().getRegionInfo().getRegionName(),
392         this.scan, 0, false);
393     try {
394       ScanResponse response = getStub().scan(null, request);
395       long id = response.getScannerId();
396       if (logScannerActivity) {
397         LOG.info("Open scanner=" + id + " for scan=" + scan.toString()
398           + " on region " + getLocation().toString());
399       }
400       return id;
401     } catch (ServiceException se) {
402       throw ProtobufUtil.getRemoteException(se);
403     }
404   }
405 
406   protected Scan getScan() {
407     return scan;
408   }
409 
410   /**
411    * Call this when the next invocation of call should close the scanner
412    */
413   public void setClose() {
414     this.closed = true;
415   }
416 
417   /**
418    * Indicate whether we make a call only to renew the lease, but without affected the scanner in
419    * any other way.
420    * @param val true if only the lease should be renewed
421    */
422   public void setRenew(boolean val) {
423     this.renew = val;
424   }
425 
426   /**
427    * @return the HRegionInfo for the current region
428    */
429   @Override
430   public HRegionInfo getHRegionInfo() {
431     if (!instantiated) {
432       return null;
433     }
434     return getLocation().getRegionInfo();
435   }
436 
437   /**
438    * Get the number of rows that will be fetched on next
439    * @return the number of rows for caching
440    */
441   public int getCaching() {
442     return caching;
443   }
444 
445   @Override
446   public ClusterConnection getConnection() {
447     return cConnection;
448   }
449 
450   /**
451    * Set the number of rows that will be fetched on next
452    * @param caching the number of rows for caching
453    */
454   public void setCaching(int caching) {
455     this.caching = caching;
456   }
457 
458   public ScannerCallable getScannerCallableForReplica(int id) {
459     ScannerCallable s = new ScannerCallable(this.getConnection(), this.tableName,
460         this.getScan(), this.scanMetrics, controllerFactory, id);
461     s.setCaching(this.caching);
462     return s;
463   }
464 
465   /**
466    * Should the client attempt to fetch more results from this region
467    * @return True if the client should attempt to fetch more results, false otherwise.
468    */
469   protected boolean getServerHasMoreResults() {
470     assert serverHasMoreResultsContext;
471     return this.serverHasMoreResults;
472   }
473 
474   protected void setServerHasMoreResults(boolean serverHasMoreResults) {
475     this.serverHasMoreResults = serverHasMoreResults;
476   }
477 
478   /**
479    * Did the server respond with information about whether more results might exist.
480    * Not guaranteed to respond with older server versions
481    * @return True if the server responded with information about more results.
482    */
483   protected boolean hasMoreResultsContext() {
484     return serverHasMoreResultsContext;
485   }
486 
487   protected void setHasMoreResultsContext(boolean serverHasMoreResultsContext) {
488     this.serverHasMoreResultsContext = serverHasMoreResultsContext;
489   }
490 }