/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied.  See the License for the specific language governing
 * permissions and limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.GZ;
import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.NONE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category(SmallTests.class)
public class TestChecksum {
  private static final Log LOG = LogFactory.getLog(TestChecksum.class);

  static final Compression.Algorithm[] COMPRESSION_ALGORITHMS = { NONE, GZ };

  static final int[] BYTES_PER_CHECKSUM = {
      50, 500, 688, 16 * 1024, (16 * 1024 + 980), 64 * 1024 };

  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  private FileSystem fs;
  private HFileSystem hfs;

  @Before
  public void setUp() throws Exception {
    fs = HFileSystem.get(TEST_UTIL.getConfiguration());
    hfs = (HFileSystem) fs;
  }
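
  /**
   * Verifies that a freshly written block carries the default checksum type.
   */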
  @Test
  public void testNewBlocksHaveDefaultChecksum() throws IOException {
    Path path = new Path(TEST_UTIL.getDataTestDir(), "default_checksum");
    FSDataOutputStream os = fs.create(path);
    HFileContext meta = new HFileContextBuilder().build();
    HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);
    DataOutputStream dos = hbw.startWriting(BlockType.DATA);
    for (int i = 0; i < 1000; ++i) {
      dos.writeInt(i);
    }
    hbw.writeHeaderAndData(os);
    int totalSize = hbw.getOnDiskSizeWithHeader();
    os.close();

    // Verify that HBase-level checksums are in use.
    assertTrue(hfs.useHBaseChecksum());

    FSDataInputStreamWrapper is = new FSDataInputStreamWrapper(fs, path);
    meta = new HFileContextBuilder().withHBaseCheckSum(true).build();
    HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(
        is, totalSize, (HFileSystem) fs, path, meta);
    HFileBlock b = hbr.readBlockData(0, -1, -1, false);
    assertEquals(ChecksumType.getDefaultChecksumType().getCode(), b.getChecksumType());
  }
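
  /**
   * Test all checksum types by writing and reading back blocks.
   */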
  @Test
  public void testAllChecksumTypes() throws IOException {
    for (ChecksumType cktype : ChecksumType.values()) {
      Path path = new Path(TEST_UTIL.getDataTestDir(), "checksum" + cktype.getName());
      FSDataOutputStream os = fs.create(path);
      HFileContext meta = new HFileContextBuilder()
          .withChecksumType(cktype)
          .build();
      HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);
      DataOutputStream dos = hbw.startWriting(BlockType.DATA);
      for (int i = 0; i < 1000; ++i) {
        dos.writeInt(i);
      }
      hbw.writeHeaderAndData(os);
      int totalSize = hbw.getOnDiskSizeWithHeader();
      os.close();

      // Verify that HBase-level checksums are in use.
      assertTrue(hfs.useHBaseChecksum());

      FSDataInputStreamWrapper is = new FSDataInputStreamWrapper(fs, path);
      meta = new HFileContextBuilder().withHBaseCheckSum(true).build();
      HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(
          is, totalSize, (HFileSystem) fs, path, meta);
      HFileBlock b = hbr.readBlockData(0, -1, -1, false);

      // Verify that the data read back matches what was written and that
      // the buffer is fully exhausted afterwards.
      ByteBuffer data = b.getBufferWithoutHeader();
      for (int i = 0; i < 1000; i++) {
        assertEquals(i, data.getInt());
      }
      boolean exceptionThrown = false;
      try {
        data.getInt();
      } catch (BufferUnderflowException e) {
        exceptionThrown = true;
      }
      assertTrue(exceptionThrown);
      assertEquals(0, HFile.getChecksumFailuresCount());
    }
  }
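
  /**
   * Introduce checksum failures and check that we can still read the data.
   */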
  @Test
  public void testChecksumCorruption() throws IOException {
    testChecksumCorruptionInternals(false);
    testChecksumCorruptionInternals(true);
  }

  protected void testChecksumCorruptionInternals(boolean useTags) throws IOException {
    for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
      for (boolean pread : new boolean[] { false, true }) {
        LOG.info("testChecksumCorruption: Compression algorithm: " + algo +
            ", pread=" + pread);
        Path path = new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_" + algo);
        FSDataOutputStream os = fs.create(path);
        HFileContext meta = new HFileContextBuilder()
            .withCompression(algo)
            .withIncludesMvcc(true)
            .withIncludesTags(useTags)
            .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
            .build();
        HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);
        long totalSize = 0;
        for (int blockId = 0; blockId < 2; ++blockId) {
          DataOutputStream dos = hbw.startWriting(BlockType.DATA);
          for (int i = 0; i < 1234; ++i) {
            dos.writeInt(i);
          }
          hbw.writeHeaderAndData(os);
          totalSize += hbw.getOnDiskSizeWithHeader();
        }
        os.close();

        // Verify that HBase-level checksums are in use.
        assertTrue(hfs.useHBaseChecksum());

        // Do a read that purposely introduces checksum verification failures.
        FSDataInputStreamWrapper is = new FSDataInputStreamWrapper(fs, path);
        meta = new HFileContextBuilder()
            .withCompression(algo)
            .withIncludesMvcc(true)
            .withIncludesTags(useTags)
            .withHBaseCheckSum(true)
            .build();
        HFileBlock.FSReader hbr = new FSReaderImplTest(is, totalSize, fs, path, meta);
        HFileBlock b = hbr.readBlockData(0, -1, -1, pread);
        b.sanityCheck();
        assertEquals(4936, b.getUncompressedSizeWithoutHeader());
        assertEquals(algo == GZ ? 2173 : 4936,
            b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes());

        // Read the data back, excluding header and checksums.
        ByteBuffer bb = b.unpack(meta, hbr).getBufferWithoutHeader();
        DataInputStream in = new DataInputStream(
            new ByteArrayInputStream(bb.array(), bb.arrayOffset(), bb.limit()));

        // Assert that we encountered an HBase checksum verification failure
        // but still fell back to HDFS checksums and read the data successfully.
        assertEquals(1, HFile.getChecksumFailuresCount());
        validateData(in);

        // A single HBase checksum failure causes the reader to switch off
        // HBase checksum verification for the next
        // HFileBlock.CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD reads.
        // Verify that this is correct.
        for (int i = 0; i <
            HFileBlock.CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD + 1; i++) {
          b = hbr.readBlockData(0, -1, -1, pread);
          assertEquals(0, HFile.getChecksumFailuresCount());
        }
        // The next read should have HBase checksum verification re-enabled;
        // we verify this by asserting that there is another checksum failure.
        b = hbr.readBlockData(0, -1, -1, pread);
        assertEquals(1, HFile.getChecksumFailuresCount());

        // Since the above read encountered a checksum failure, we switch
        // back to not checking HBase checksums.
        b = hbr.readBlockData(0, -1, -1, pread);
        assertEquals(0, HFile.getChecksumFailuresCount());
        is.close();

        // Now use a completely new reader with HBase checksums switched off
        // in the configuration. In this case we should not detect any
        // retries within HBase.
        HFileSystem newfs = new HFileSystem(TEST_UTIL.getConfiguration(), false);
        assertFalse(newfs.useHBaseChecksum());
        is = new FSDataInputStreamWrapper(newfs, path);
        hbr = new FSReaderImplTest(is, totalSize, newfs, path, meta);
        b = hbr.readBlockData(0, -1, -1, pread);
        is.close();
        b.sanityCheck();
        b = b.unpack(meta, hbr);
        assertEquals(4936, b.getUncompressedSizeWithoutHeader());
        assertEquals(algo == GZ ? 2173 : 4936,
            b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes());

        bb = b.getBufferWithoutHeader();
        in = new DataInputStream(new ByteArrayInputStream(
            bb.array(), bb.arrayOffset(), bb.limit()));

        // Assert that we did not encounter HBase checksum verification
        // failures but still used HDFS checksums and read data successfully.
        assertEquals(0, HFile.getChecksumFailuresCount());
        validateData(in);
      }
    }
  }
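
  /**
   * Test different values of bytesPerChecksum.
   */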
  @Test
  public void testChecksumChunks() throws IOException {
    testChecksumInternals(false);
    testChecksumInternals(true);
  }

  protected void testChecksumInternals(boolean useTags) throws IOException {
    Compression.Algorithm algo = NONE;
    for (boolean pread : new boolean[] { false, true }) {
      for (int bytesPerChecksum : BYTES_PER_CHECKSUM) {
        Path path = new Path(TEST_UTIL.getDataTestDir(), "checksumChunk_" +
            algo + bytesPerChecksum);
        FSDataOutputStream os = fs.create(path);
        HFileContext meta = new HFileContextBuilder()
            .withCompression(algo)
            .withIncludesMvcc(true)
            .withIncludesTags(useTags)
            .withHBaseCheckSum(true)
            .withBytesPerCheckSum(bytesPerChecksum)
            .build();
        HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);

        // Write one block whose data size is at least six times the checksum
        // chunk size, so that multiple checksum chunks are created.
        long dataSize = 0;
        DataOutputStream dos = hbw.startWriting(BlockType.DATA);
        while (dataSize < 6 * bytesPerChecksum) {
          for (int i = 0; i < 1234; ++i) {
            dos.writeInt(i);
            dataSize += 4;
          }
        }
        hbw.writeHeaderAndData(os);
        long totalSize = hbw.getOnDiskSizeWithHeader();
        os.close();

        long expectedChunks = ChecksumUtil.numChunks(
            dataSize + HConstants.HFILEBLOCK_HEADER_SIZE,
            bytesPerChecksum);
        LOG.info("testChecksumChunks: pread=" + pread +
            ", bytesPerChecksum=" + bytesPerChecksum +
            ", fileSize=" + totalSize +
            ", dataSize=" + dataSize +
            ", expectedChunks=" + expectedChunks);

        // Verify that HBase-level checksums are in use.
        assertTrue(hfs.useHBaseChecksum());

        // Read the data back from the file.
        FSDataInputStream is = fs.open(path);
        FSDataInputStream nochecksum = hfs.getNoChecksumFs().open(path);
        meta = new HFileContextBuilder()
            .withCompression(algo)
            .withIncludesMvcc(true)
            .withIncludesTags(useTags)
            .withHBaseCheckSum(true)
            .withBytesPerCheckSum(bytesPerChecksum)
            .build();
        HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(
            new FSDataInputStreamWrapper(is, nochecksum), totalSize, hfs, path, meta);
        HFileBlock b = hbr.readBlockData(0, -1, -1, pread);
        is.close();
        b.sanityCheck();
        assertEquals(dataSize, b.getUncompressedSizeWithoutHeader());

        // Verify that the file contains the expected number of checksum chunks.
        assertEquals(totalSize, HConstants.HFILEBLOCK_HEADER_SIZE + dataSize +
            expectedChunks * HFileBlock.CHECKSUM_SIZE);

        // Assert that we did not encounter any HBase checksum failures.
        assertEquals(0, HFile.getChecksumFailuresCount());
      }
    }
  }
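
  /** Validates that the data read back matches what the tests wrote out. */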
  private void validateData(DataInputStream in) throws IOException {
    for (int i = 0; i < 1234; i++) {
      int val = in.readInt();
      assertEquals("testChecksumCorruption: data mismatch at index " + i, i, val);
    }
  }
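
  /**
   * A reader that deliberately fails HBase checksum verification while
   * reading data from HFiles. This should trigger the HDFS-level checksum
   * validations instead.
   */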
  private static class FSReaderImplTest extends HFileBlock.FSReaderImpl {
    public FSReaderImplTest(FSDataInputStreamWrapper istream, long fileSize, FileSystem fs,
        Path path, HFileContext meta) throws IOException {
      super(istream, fileSize, (HFileSystem) fs, path, meta);
    }

    @Override
    protected boolean validateBlockChecksum(HFileBlock block, long offset,
        byte[] data, int hdrSize) throws IOException {
      // Always report a checksum mismatch so that callers fall back to
      // HDFS-level checksum verification.
      return false;
    }
  }
}