1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.regionserver.compactions;
19
20 import java.io.IOException;
21 import java.io.InterruptedIOException;
22 import java.security.PrivilegedExceptionAction;
23 import java.util.ArrayList;
24 import java.util.Collection;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.concurrent.atomic.AtomicInteger;
28
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.apache.hadoop.conf.Configuration;
32 import org.apache.hadoop.hbase.Cell;
33 import org.apache.hadoop.hbase.CellUtil;
34 import org.apache.hadoop.hbase.HConstants;
35 import org.apache.hadoop.hbase.KeyValueUtil;
36 import org.apache.hadoop.hbase.classification.InterfaceAudience;
37 import org.apache.hadoop.hbase.client.Scan;
38 import org.apache.hadoop.hbase.io.compress.Compression;
39 import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
40 import org.apache.hadoop.hbase.io.hfile.HFileWriterV2;
41 import org.apache.hadoop.hbase.regionserver.HStore;
42 import org.apache.hadoop.hbase.regionserver.InternalScanner;
43 import org.apache.hadoop.hbase.regionserver.ScanType;
44 import org.apache.hadoop.hbase.regionserver.ScannerContext;
45 import org.apache.hadoop.hbase.regionserver.Store;
46 import org.apache.hadoop.hbase.regionserver.StoreFile;
47 import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
48 import org.apache.hadoop.hbase.regionserver.StoreScanner;
49 import org.apache.hadoop.hbase.security.User;
50 import org.apache.hadoop.hbase.util.Bytes;
51 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
52 import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
53
54
55
56
57
58 @InterfaceAudience.Private
59 public abstract class Compactor {
60 private static final Log LOG = LogFactory.getLog(Compactor.class);
61 protected CompactionProgress progress;
62 protected Configuration conf;
63 protected Store store;
64
65 private int compactionKVMax;
66 protected Compression.Algorithm compactionCompression;
67
68
69 protected int keepSeqIdPeriod;
70
71
/**
 * Builds a compactor for the given store and reads its tunables from the configuration.
 *
 * @param conf configuration supplying {@code HConstants.COMPACTION_KV_MAX} and
 *          {@code HConstants.KEEP_SEQID_PERIOD}
 * @param store store to compact; its column family (when present) supplies the
 *          compaction compression algorithm
 */
Compactor(final Configuration conf, final Store store) {
  this.conf = conf;
  this.store = store;
  this.compactionKVMax =
      this.conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);

  // A store without a column family descriptor falls back to no compression.
  if (this.store.getFamily() == null) {
    this.compactionCompression = Compression.Algorithm.NONE;
  } else {
    this.compactionCompression = this.store.getFamily().getCompactionCompression();
  }

  // Clamp the configured period so it never drops below the allowed minimum.
  int configuredPeriod =
      this.conf.getInt(HConstants.KEEP_SEQID_PERIOD, HConstants.MIN_KEEP_SEQID_PERIOD);
  this.keepSeqIdPeriod = Math.max(configuredPeriod, HConstants.MIN_KEEP_SEQID_PERIOD);
}
82
83 public interface CellSink {
84 void append(Cell cell) throws IOException;
85 }
86
87 public CompactionProgress getProgress() {
88 return this.progress;
89 }
90
91
92 protected static class FileDetails {
93
94 public long maxKeyCount = 0;
95
96 public long earliestPutTs = HConstants.LATEST_TIMESTAMP;
97
98 public long maxSeqId = 0;
99
100 public long maxMVCCReadpoint = 0;
101
102 public int maxTagsLength = 0;
103
104 public long minSeqIdToKeep = 0;
105 }
106
107
108
109
110
111
112
113 protected FileDetails getFileDetails(
114 Collection<StoreFile> filesToCompact, boolean allFiles) throws IOException {
115 FileDetails fd = new FileDetails();
116 long oldestHFileTimeStampToKeepMVCC = System.currentTimeMillis() -
117 (1000L * 60 * 60 * 24 * this.keepSeqIdPeriod);
118
119 for (StoreFile file : filesToCompact) {
120 if(allFiles && (file.getModificationTimeStamp() < oldestHFileTimeStampToKeepMVCC)) {
121
122
123 if(fd.minSeqIdToKeep < file.getMaxMemstoreTS()) {
124 fd.minSeqIdToKeep = file.getMaxMemstoreTS();
125 }
126 }
127 long seqNum = file.getMaxSequenceId();
128 fd.maxSeqId = Math.max(fd.maxSeqId, seqNum);
129 StoreFile.Reader r = file.getReader();
130 if (r == null) {
131 LOG.warn("Null reader for " + file.getPath());
132 continue;
133 }
134
135
136
137 long keyCount = r.getEntries();
138 fd.maxKeyCount += keyCount;
139
140 Map<byte[], byte[]> fileInfo = r.loadFileInfo();
141 byte tmp[] = null;
142
143
144 if (r.isBulkLoaded()) {
145 fd.maxMVCCReadpoint = Math.max(fd.maxMVCCReadpoint, r.getSequenceID());
146 }
147 else {
148 tmp = fileInfo.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY);
149 if (tmp != null) {
150 fd.maxMVCCReadpoint = Math.max(fd.maxMVCCReadpoint, Bytes.toLong(tmp));
151 }
152 }
153 tmp = fileInfo.get(FileInfo.MAX_TAGS_LEN);
154 if (tmp != null) {
155 fd.maxTagsLength = Math.max(fd.maxTagsLength, Bytes.toInt(tmp));
156 }
157
158
159 long earliestPutTs = 0;
160 if (allFiles) {
161 tmp = fileInfo.get(StoreFile.EARLIEST_PUT_TS);
162 if (tmp == null) {
163
164
165 fd.earliestPutTs = earliestPutTs = HConstants.OLDEST_TIMESTAMP;
166 } else {
167 earliestPutTs = Bytes.toLong(tmp);
168 fd.earliestPutTs = Math.min(fd.earliestPutTs, earliestPutTs);
169 }
170 }
171 if (LOG.isDebugEnabled()) {
172 LOG.debug("Compacting " + file +
173 ", keycount=" + keyCount +
174 ", bloomtype=" + r.getBloomFilterType().toString() +
175 ", size=" + TraditionalBinaryPrefix.long2String(r.length(), "", 1) +
176 ", encoding=" + r.getHFileReader().getDataBlockEncoding() +
177 ", seqNum=" + seqNum +
178 (allFiles ? ", earliestPutTs=" + earliestPutTs: ""));
179 }
180 }
181 return fd;
182 }
183
184
185
186
187
188
189 protected List<StoreFileScanner> createFileScanners(
190 final Collection<StoreFile> filesToCompact,
191 long smallestReadPoint,
192 boolean useDropBehind) throws IOException {
193 return StoreFileScanner.getScannersForStoreFiles(filesToCompact,
194
195
196
197
198 smallestReadPoint);
199 }
200
201 protected long getSmallestReadPoint() {
202 return store.getSmallestReadPoint();
203 }
204
205
206
207
208
209
210
211
212
213 protected InternalScanner preCreateCoprocScanner(final CompactionRequest request,
214 ScanType scanType, long earliestPutTs, List<StoreFileScanner> scanners) throws IOException {
215 return preCreateCoprocScanner(request, scanType, earliestPutTs, scanners, null);
216 }
217
218 protected InternalScanner preCreateCoprocScanner(final CompactionRequest request,
219 final ScanType scanType, final long earliestPutTs, final List<StoreFileScanner> scanners,
220 User user) throws IOException {
221 if (store.getCoprocessorHost() == null) return null;
222 if (user == null) {
223 return store.getCoprocessorHost().preCompactScannerOpen(store, scanners, scanType,
224 earliestPutTs, request);
225 } else {
226 try {
227 return user.getUGI().doAs(new PrivilegedExceptionAction<InternalScanner>() {
228 @Override
229 public InternalScanner run() throws Exception {
230 return store.getCoprocessorHost().preCompactScannerOpen(store, scanners,
231 scanType, earliestPutTs, request);
232 }
233 });
234 } catch (InterruptedException ie) {
235 InterruptedIOException iioe = new InterruptedIOException();
236 iioe.initCause(ie);
237 throw iioe;
238 }
239 }
240 }
241
242
243
244
245
246
247
248
249 protected InternalScanner postCreateCoprocScanner(final CompactionRequest request,
250 final ScanType scanType, final InternalScanner scanner, User user) throws IOException {
251 if (store.getCoprocessorHost() == null) return scanner;
252 if (user == null) {
253 return store.getCoprocessorHost().preCompact(store, scanner, scanType, request);
254 } else {
255 try {
256 return user.getUGI().doAs(new PrivilegedExceptionAction<InternalScanner>() {
257 @Override
258 public InternalScanner run() throws Exception {
259 return store.getCoprocessorHost().preCompact(store, scanner, scanType, request);
260 }
261 });
262 } catch (InterruptedException ie) {
263 InterruptedIOException iioe = new InterruptedIOException();
264 iioe.initCause(ie);
265 throw iioe;
266 }
267 }
268 }
269
270
271
272
273
274 private static final AtomicInteger NAME_COUNTER = new AtomicInteger(0);
275
276 private String generateCompactionName() {
277 int counter;
278 for (;;) {
279 counter = NAME_COUNTER.get();
280 int next = counter == Integer.MAX_VALUE ? 0 : counter + 1;
281 if (NAME_COUNTER.compareAndSet(counter, next)) {
282 break;
283 }
284 }
285 return store.getRegionInfo().getRegionNameAsString() + "#"
286 + store.getFamily().getNameAsString() + "#" + counter;
287 }
288
289
290
291
292
293
294
295
296 protected boolean performCompaction(InternalScanner scanner, CellSink writer,
297 long smallestReadPoint, boolean cleanSeqId,
298 CompactionThroughputController throughputController) throws IOException {
299 long bytesWritten = 0;
300 long bytesWrittenProgress = 0;
301
302
303 List<Cell> cells = new ArrayList<Cell>();
304 long closeCheckInterval = HStore.getCloseCheckInterval();
305 long lastMillis = 0;
306 if (LOG.isDebugEnabled()) {
307 lastMillis = EnvironmentEdgeManager.currentTime();
308 }
309 String compactionName = generateCompactionName();
310 long now = 0;
311 boolean hasMore;
312 ScannerContext scannerContext =
313 ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
314
315 throughputController.start(compactionName);
316 try {
317 do {
318 hasMore = scanner.next(cells, scannerContext);
319 if (LOG.isDebugEnabled()) {
320 now = EnvironmentEdgeManager.currentTime();
321 }
322
323 for (Cell c : cells) {
324 if (cleanSeqId && c.getSequenceId() <= smallestReadPoint) {
325 CellUtil.setSequenceId(c, 0);
326 }
327 writer.append(c);
328 int len = KeyValueUtil.length(c);
329 ++progress.currentCompactedKVs;
330 progress.totalCompactedSize += len;
331 if (LOG.isDebugEnabled()) {
332 bytesWrittenProgress += len;
333 }
334 throughputController.control(compactionName, len);
335
336 if (closeCheckInterval > 0) {
337 bytesWritten += len;
338 if (bytesWritten > closeCheckInterval) {
339 bytesWritten = 0;
340 if (!store.areWritesEnabled()) {
341 progress.cancel();
342 return false;
343 }
344 }
345 }
346 }
347
348
349 if (LOG.isDebugEnabled()) {
350 if ((now - lastMillis) >= 60 * 1000) {
351 LOG.debug("Compaction progress: "
352 + compactionName
353 + " "
354 + progress
355 + String.format(", rate=%.2f kB/sec", (bytesWrittenProgress / 1024.0)
356 / ((now - lastMillis) / 1000.0)) + ", throughputController is "
357 + throughputController);
358 lastMillis = now;
359 bytesWrittenProgress = 0;
360 }
361 }
362 cells.clear();
363 } while (hasMore);
364 } catch (InterruptedException e) {
365 progress.cancel();
366 throw new InterruptedIOException("Interrupted while control throughput of compacting "
367 + compactionName);
368 } finally {
369 throughputController.finish(compactionName);
370 }
371 progress.complete();
372 return true;
373 }
374
375
376
377
378
379
380
381
382
383 protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
384 ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {
385 Scan scan = new Scan();
386 scan.setMaxVersions(store.getFamily().getMaxVersions());
387 return new StoreScanner(store, store.getScanInfo(), scan, scanners,
388 scanType, smallestReadPoint, earliestPutTs);
389 }
390
391
392
393
394
395
396
397
398
399
400 protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
401 long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow,
402 byte[] dropDeletesToRow) throws IOException {
403 Scan scan = new Scan();
404 scan.setMaxVersions(store.getFamily().getMaxVersions());
405 return new StoreScanner(store, store.getScanInfo(), scan, scanners, smallestReadPoint,
406 earliestPutTs, dropDeletesFromRow, dropDeletesToRow);
407 }
408 }