View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.regionserver.wal;
21  
22  import java.io.EOFException;
23  import java.io.IOException;
24  import java.nio.ByteBuffer;
25  import java.util.ArrayList;
26  import java.util.Arrays;
27  import java.util.List;
28  
29  import org.apache.commons.logging.Log;
30  import org.apache.commons.logging.LogFactory;
31  import org.apache.hadoop.hbase.classification.InterfaceAudience;
32  import org.apache.hadoop.conf.Configuration;
33  import org.apache.hadoop.fs.Path;
34  import org.apache.hadoop.fs.FileSystem;
35  import org.apache.hadoop.fs.FSDataInputStream;
36  import org.apache.hadoop.hbase.codec.Codec;
37  import org.apache.hadoop.hbase.io.LimitInputStream;
38  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
39  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
40  import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
41  import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALHeader.Builder;
42  import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey;
43  import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer;
44  import org.apache.hadoop.hbase.util.Bytes;
45  import org.apache.hadoop.hbase.wal.WAL.Entry;
46  
47  import com.google.protobuf.CodedInputStream;
48  import com.google.protobuf.InvalidProtocolBufferException;
49  
50  /**
51   * A Protobuf based WAL has the following structure:
52   * <p>
53   * &lt;PB_WAL_MAGIC&gt;&lt;WALHeader&gt;&lt;WALEdits&gt;...&lt;WALEdits&gt;&lt;Trailer&gt;
54   * &lt;TrailerSize&gt; &lt;PB_WAL_COMPLETE_MAGIC&gt;
55   * </p>
56   * The Reader reads meta information (WAL Compression state, WALTrailer, etc) in
57   * ProtobufLogReader#initReader(FSDataInputStream). A WALTrailer is an extensible structure
58   * which is appended at the end of the WAL. This is empty for now; it can contain some meta
59   * information such as Region level stats, etc in future.
60   */
61  @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX,
62    HBaseInterfaceAudience.CONFIG})
63  public class ProtobufLogReader extends ReaderBase {
64    private static final Log LOG = LogFactory.getLog(ProtobufLogReader.class);
65    // public for WALFactory until we move everything to o.a.h.h.wal
66    @InterfaceAudience.Private
67    public static final byte[] PB_WAL_MAGIC = Bytes.toBytes("PWAL");
68    // public for TestWALSplit
69    @InterfaceAudience.Private
70    public static final byte[] PB_WAL_COMPLETE_MAGIC = Bytes.toBytes("LAWP");
71    /**
72     * Configuration name of WAL Trailer's warning size. If a waltrailer's size is greater than the
73     * configured size, providers should log a warning. e.g. this is used with Protobuf reader/writer.
74     */
75    static final String WAL_TRAILER_WARN_SIZE = "hbase.regionserver.waltrailer.warn.size";
76    static final int DEFAULT_WAL_TRAILER_WARN_SIZE = 1024 * 1024; // 1MB
77  
78    protected FSDataInputStream inputStream;
79    protected Codec.Decoder cellDecoder;
80    protected WALCellCodec.ByteStringUncompressor byteStringUncompressor;
81    protected boolean hasCompression = false;
82    protected boolean hasTagCompression = false;
83    // walEditsStopOffset is the position of the last byte to read. After reading the last WALEdit
84    // entry in the wal, the inputstream's position is equal to walEditsStopOffset.
85    private long walEditsStopOffset;
86    private boolean trailerPresent;
87    protected WALTrailer trailer;
88    // maximum size of the wal Trailer in bytes. If a user writes/reads a trailer with size larger
89    // than this size, it is written/read respectively, with a WARN message in the log.
90    protected int trailerWarnSize;
91    private static List<String> writerClsNames = new ArrayList<String>();
92    static {
93      writerClsNames.add(ProtobufLogWriter.class.getSimpleName());
94    }
95    
96    // cell codec classname
97    private String codecClsName = null;
98  
99    enum WALHdrResult {
100     EOF,                   // stream is at EOF when method starts
101     SUCCESS,
102     UNKNOWN_WRITER_CLS     // name of writer class isn't recognized
103   }
104   
105   // context for WALHdr carrying information such as Cell Codec classname
106   static class WALHdrContext {
107     WALHdrResult result;
108     String cellCodecClsName;
109     
110     WALHdrContext(WALHdrResult result, String cellCodecClsName) {
111       this.result = result;
112       this.cellCodecClsName = cellCodecClsName;
113     }
114     WALHdrResult getResult() {
115       return result;
116     }
117     String getCellCodecClsName() {
118       return cellCodecClsName;
119     }
120   }
121 
122   public ProtobufLogReader() {
123     super();
124   }
125 
126   @Override
127   public void close() throws IOException {
128     if (this.inputStream != null) {
129       this.inputStream.close();
130       this.inputStream = null;
131     }
132   }
133 
134   @Override
135   public long getPosition() throws IOException {
136     return inputStream.getPos();
137   }
138 
139   @Override
140   public void reset() throws IOException {
141     String clsName = initInternal(null, false);
142     initAfterCompression(clsName); // We need a new decoder (at least).
143   }
144 
145   @Override
146   public void init(FileSystem fs, Path path, Configuration conf, FSDataInputStream stream)
147       throws IOException {
148     this.trailerWarnSize = conf.getInt(WAL_TRAILER_WARN_SIZE, DEFAULT_WAL_TRAILER_WARN_SIZE);
149     super.init(fs, path, conf, stream);
150   }
151 
152   @Override
153   protected String initReader(FSDataInputStream stream) throws IOException {
154     return initInternal(stream, true);
155   }
156 
157   /*
158    * Returns names of the accepted writer classes
159    */
160   public List<String> getWriterClsNames() {
161     return writerClsNames;
162   }
163   
164   /*
165    * Returns the cell codec classname
166    */
167   public String getCodecClsName() {
168       return codecClsName;
169   }
170 
171   protected WALHdrContext readHeader(Builder builder, FSDataInputStream stream)
172       throws IOException {
173      boolean res = builder.mergeDelimitedFrom(stream);
174      if (!res) return new WALHdrContext(WALHdrResult.EOF, null);
175      if (builder.hasWriterClsName() &&
176          !getWriterClsNames().contains(builder.getWriterClsName())) {
177        return new WALHdrContext(WALHdrResult.UNKNOWN_WRITER_CLS, null);
178      }
179      String clsName = null;
180      if (builder.hasCellCodecClsName()) {
181        clsName = builder.getCellCodecClsName();
182      }
183      return new WALHdrContext(WALHdrResult.SUCCESS, clsName);
184   }
185 
186   private String initInternal(FSDataInputStream stream, boolean isFirst)
187       throws IOException {
188     close();
189     long expectedPos = PB_WAL_MAGIC.length;
190     if (stream == null) {
191       stream = fs.open(path);
192       stream.seek(expectedPos);
193     }
194     if (stream.getPos() != expectedPos) {
195       throw new IOException("The stream is at invalid position: " + stream.getPos());
196     }
197     // Initialize metadata or, when we reset, just skip the header.
198     WALProtos.WALHeader.Builder builder = WALProtos.WALHeader.newBuilder();
199     WALHdrContext hdrCtxt = readHeader(builder, stream);
200     WALHdrResult walHdrRes = hdrCtxt.getResult();
201     if (walHdrRes == WALHdrResult.EOF) {
202       throw new EOFException("Couldn't read WAL PB header");
203     }
204     if (walHdrRes == WALHdrResult.UNKNOWN_WRITER_CLS) {
205       throw new IOException("Got unknown writer class: " + builder.getWriterClsName());
206     }
207     if (isFirst) {
208       WALProtos.WALHeader header = builder.build();
209       this.hasCompression = header.hasHasCompression() && header.getHasCompression();
210       this.hasTagCompression = header.hasHasTagCompression() && header.getHasTagCompression();
211     }
212     this.inputStream = stream;
213     this.walEditsStopOffset = this.fileLength;
214     long currentPosition = stream.getPos();
215     trailerPresent = setTrailerIfPresent();
216     this.seekOnFs(currentPosition);
217     if (LOG.isTraceEnabled()) {
218       LOG.trace("After reading the trailer: walEditsStopOffset: " + this.walEditsStopOffset
219           + ", fileLength: " + this.fileLength + ", " + "trailerPresent: " + trailerPresent);
220     }
221     
222     codecClsName = hdrCtxt.getCellCodecClsName();
223     
224     return hdrCtxt.getCellCodecClsName();
225   }
226 
227   /**
228    * To check whether a trailer is present in a WAL, it seeks to position (fileLength -
229    * PB_WAL_COMPLETE_MAGIC.size() - Bytes.SIZEOF_INT). It reads the int value to know the size of
230    * the trailer, and checks whether the trailer is present at the end or not by comparing the last
231    * PB_WAL_COMPLETE_MAGIC.size() bytes. In case trailer is not present, it returns false;
232    * otherwise, sets the trailer and sets this.walEditsStopOffset variable up to the point just
233    * before the trailer.
234    * <ul>
235    * The trailer is ignored in case:
236    * <li>fileLength is 0 or not correct (when file is under recovery, etc).
237    * <li>the trailer size is negative.
238    * </ul>
239    * <p>
240    * In case the trailer size > this.trailerMaxSize, it is read after a WARN message.
241    * @return true if a valid trailer is present
242    * @throws IOException
243    */
244   private boolean setTrailerIfPresent() {
245     try {
246       long trailerSizeOffset = this.fileLength - (PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT);
247       if (trailerSizeOffset <= 0) return false;// no trailer possible.
248       this.seekOnFs(trailerSizeOffset);
249       // read the int as trailer size.
250       int trailerSize = this.inputStream.readInt();
251       ByteBuffer buf = ByteBuffer.allocate(ProtobufLogReader.PB_WAL_COMPLETE_MAGIC.length);
252       this.inputStream.readFully(buf.array(), buf.arrayOffset(), buf.capacity());
253       if (!Arrays.equals(buf.array(), PB_WAL_COMPLETE_MAGIC)) {
254         LOG.trace("No trailer found.");
255         return false;
256       }
257       if (trailerSize < 0) {
258         LOG.warn("Invalid trailer Size " + trailerSize + ", ignoring the trailer");
259         return false;
260       } else if (trailerSize > this.trailerWarnSize) {
261         // continue reading after warning the user.
262         LOG.warn("Please investigate WALTrailer usage. Trailer size > maximum configured size : "
263           + trailerSize + " > " + this.trailerWarnSize);
264       }
265       // seek to the position where trailer starts.
266       long positionOfTrailer = trailerSizeOffset - trailerSize;
267       this.seekOnFs(positionOfTrailer);
268       // read the trailer.
269       buf = ByteBuffer.allocate(trailerSize);// for trailer.
270       this.inputStream.readFully(buf.array(), buf.arrayOffset(), buf.capacity());
271       trailer = WALTrailer.parseFrom(buf.array());
272       this.walEditsStopOffset = positionOfTrailer;
273       return true;
274     } catch (IOException ioe) {
275       LOG.warn("Got IOE while reading the trailer. Continuing as if no trailer is present.", ioe);
276     }
277     return false;
278   }
279 
280   protected WALCellCodec getCodec(Configuration conf, String cellCodecClsName,
281       CompressionContext compressionContext) throws IOException {
282     return WALCellCodec.create(conf, cellCodecClsName, compressionContext);
283   }
284 
285   @Override
286   protected void initAfterCompression() throws IOException {
287     initAfterCompression(null);
288   }
289   
290   @Override
291   protected void initAfterCompression(String cellCodecClsName) throws IOException {
292     WALCellCodec codec = getCodec(this.conf, cellCodecClsName, this.compressionContext);
293     this.cellDecoder = codec.getDecoder(this.inputStream);
294     if (this.hasCompression) {
295       this.byteStringUncompressor = codec.getByteStringUncompressor();
296     }
297   }
298 
299   @Override
300   protected boolean hasCompression() {
301     return this.hasCompression;
302   }
303 
304   @Override
305   protected boolean hasTagCompression() {
306     return this.hasTagCompression;
307   }
308 
309   @Override
310   protected boolean readNext(Entry entry) throws IOException {
311     while (true) {
312       // OriginalPosition might be < 0 on local fs; if so, it is useless to us.
313       long originalPosition = this.inputStream.getPos();
314       if (trailerPresent && originalPosition > 0 && originalPosition == this.walEditsStopOffset) {
315         return false;
316       }
317       WALKey.Builder builder = WALKey.newBuilder();
318       long size = 0;
319       try {
320         long available = -1;
321         try {
322           int firstByte = this.inputStream.read();
323           if (firstByte == -1) {
324             throw new EOFException("First byte is negative");
325           }
326           size = CodedInputStream.readRawVarint32(firstByte, this.inputStream);
327           // available may be < 0 on local fs for instance.  If so, can't depend on it.
328           available = this.inputStream.available();
329           if (available > 0 && available < size) {
330             throw new EOFException("Available stream not enough for edit, " +
331                 "inputStream.available()= " + this.inputStream.available() + ", " +
332                 "entry size= " + size);
333           }
334           ProtobufUtil.mergeFrom(builder, new LimitInputStream(this.inputStream, size),
335             (int)size);
336         } catch (InvalidProtocolBufferException ipbe) {
337           throw (EOFException) new EOFException("Invalid PB, EOF? Ignoring; originalPosition=" +
338             originalPosition + ", currentPosition=" + this.inputStream.getPos() +
339             ", messageSize=" + size + ", currentAvailable=" + available).initCause(ipbe);
340         }
341         if (!builder.isInitialized()) {
342           // TODO: not clear if we should try to recover from corrupt PB that looks semi-legit.
343           //       If we can get the KV count, we could, theoretically, try to get next record.
344           throw new EOFException("Partial PB while reading WAL, " +
345               "probably an unexpected EOF, ignoring");
346         }
347         WALKey walKey = builder.build();
348         entry.getKey().readFieldsFromPb(walKey, this.byteStringUncompressor);
349         if (!walKey.hasFollowingKvCount() || 0 == walKey.getFollowingKvCount()) {
350           LOG.trace("WALKey has no KVs that follow it; trying the next one");
351           continue;
352         }
353         int expectedCells = walKey.getFollowingKvCount();
354         long posBefore = this.inputStream.getPos();
355         try {
356           int actualCells = entry.getEdit().readFromCells(cellDecoder, expectedCells);
357           if (expectedCells != actualCells) {
358             throw new EOFException("Only read " + actualCells); // other info added in catch
359           }
360         } catch (Exception ex) {
361           String posAfterStr = "<unknown>";
362           try {
363             posAfterStr = this.inputStream.getPos() + "";
364           } catch (Throwable t) {
365             LOG.trace("Error getting pos for error message - ignoring", t);
366           }
367           String message = " while reading " + expectedCells + " WAL KVs; started reading at "
368               + posBefore + " and read up to " + posAfterStr;
369           IOException realEofEx = extractHiddenEof(ex);
370           throw (EOFException) new EOFException("EOF " + message).
371               initCause(realEofEx != null ? realEofEx : ex);
372         }
373         if (trailerPresent && this.inputStream.getPos() > this.walEditsStopOffset) {
374           LOG.error("Read WALTrailer while reading WALEdits. wal: " + this.path
375               + ", inputStream.getPos(): " + this.inputStream.getPos() + ", walEditsStopOffset: "
376               + this.walEditsStopOffset);
377           throw new EOFException("Read WALTrailer while reading WALEdits");
378         }
379       } catch (EOFException eof) {
380         LOG.trace("Encountered a malformed edit, seeking back to last good position in file", eof);
381         // If originalPosition is < 0, it is rubbish and we cannot use it (probably local fs)
382         if (originalPosition < 0) throw eof;
383         // Else restore our position to original location in hope that next time through we will
384         // read successfully.
385         seekOnFs(originalPosition);
386         return false;
387       }
388       return true;
389     }
390   }
391 
392   private IOException extractHiddenEof(Exception ex) {
393     // There are two problems we are dealing with here. Hadoop stream throws generic exception
394     // for EOF, not EOFException; and scanner further hides it inside RuntimeException.
395     IOException ioEx = null;
396     if (ex instanceof EOFException) {
397       return (EOFException)ex;
398     } else if (ex instanceof IOException) {
399       ioEx = (IOException)ex;
400     } else if (ex instanceof RuntimeException
401         && ex.getCause() != null && ex.getCause() instanceof IOException) {
402       ioEx = (IOException)ex.getCause();
403     }
404     if (ioEx != null) {
405       if (ioEx.getMessage().contains("EOF")) return ioEx;
406       return null;
407     }
408     return null;
409   }
410 
411   @Override
412   protected void seekOnFs(long pos) throws IOException {
413     this.inputStream.seek(pos);
414   }
415 }