/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied.  See the License for the specific language governing
 * permissions and limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionAdminServiceCallable;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.RetryingCallable;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
import org.apache.hadoop.hbase.replication.BaseWALEntryFilter;
import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALSplitter.EntryBuffers;
import org.apache.hadoop.hbase.wal.WALSplitter.OutputSink;
import org.apache.hadoop.hbase.wal.WALSplitter.PipelineController;
import org.apache.hadoop.hbase.wal.WALSplitter.RegionEntryBuffer;
import org.apache.hadoop.hbase.wal.WALSplitter.SinkWriter;
import org.apache.hadoop.util.StringUtils;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Lists;
import com.google.protobuf.ServiceException;
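
/**
 * A {@link ReplicationEndpoint} endpoint which receives the WAL edits from the WAL, and sends
 * the edits to replicas of regions.
 */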
@InterfaceAudience.Private
public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {

  private static final Log LOG = LogFactory.getLog(RegionReplicaReplicationEndpoint.class);

  // Can be configured independently of the generic hbase.client.retries.number
  private static final String CLIENT_RETRIES_NUMBER
    = "hbase.region.replica.replication.client.retries.number";

  private Configuration conf;
  private ClusterConnection connection;
  private TableDescriptors tableDescriptors;

  // The WALSplitter pipeline (PipelineController / EntryBuffers / OutputSink) is reused here to
  // group the incoming edits per region and to write them out with a pool of writer threads.
  private PipelineController controller;
  private RegionReplicaOutputSink outputSink;
  private EntryBuffers entryBuffers;

  // number of writer threads draining the entry buffers
  private int numWriterThreads;

  // timeout for the replay RPCs
  private int operationTimeout;

  private ExecutorService pool;

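  /**
   * Skips the entries whose keys carry an original sequence id; only edits persisted through
   * distributed log replay have that field set, and those have already been applied once.
   */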
  private static class SkipReplayedEditsFilter extends BaseWALEntryFilter {
    @Override
    public Entry filter(Entry entry) {
      // if the original sequence id is set, this entry has been replayed already; skip it
      if (entry.getKey().getOrigLogSeqNum() > 0) {
        return null;
      }
      return entry;
    }
  }

  @Override
  public WALEntryFilter getWALEntryfilter() {
    WALEntryFilter superFilter = super.getWALEntryfilter();
    WALEntryFilter skipReplayedEditsFilter = getSkipReplayedEditsFilter();

    if (superFilter == null) {
      return skipReplayedEditsFilter;
    }

    if (skipReplayedEditsFilter == null) {
      return superFilter;
    }

    ArrayList<WALEntryFilter> filters = Lists.newArrayList();
    filters.add(superFilter);
    filters.add(skipReplayedEditsFilter);
    return new ChainWALEntryFilter(filters);
  }

  protected WALEntryFilter getSkipReplayedEditsFilter() {
    return new SkipReplayedEditsFilter();
  }

  @Override
  public void init(Context context) throws IOException {
    super.init(context);

    this.conf = HBaseConfiguration.create(context.getConfiguration());
    this.tableDescriptors = context.getTableDescriptors();

    // The client retry count gets multiplied by the server-side retries multiplier for
    // server-to-server RPCs. We do not want that inflated count for the replay RPCs, so divide
    // it back out if already applied, and pin the multiplier to 1 in our private conf.
    int defaultNumRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
      HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
    if (defaultNumRetries > 10) {
      int mult = conf.getInt("hbase.client.serverside.retries.multiplier", 10);
      defaultNumRetries = defaultNumRetries / mult; // undo the multiplication, if any
    }

    conf.setInt("hbase.client.serverside.retries.multiplier", 1);
    int numRetries = conf.getInt(CLIENT_RETRIES_NUMBER, defaultNumRetries);
    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, numRetries);

    this.numWriterThreads = this.conf.getInt(
      "hbase.region.replica.replication.writer.threads", 3);
    controller = new PipelineController();
    entryBuffers = new EntryBuffers(controller,
      this.conf.getInt("hbase.region.replica.replication.buffersize",
        128 * 1024 * 1024));

    // use the regular RPC timeout for the replay RPCs to the replicas
    this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
      HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
  }

  @Override
  protected void doStart() {
    try {
      connection = (ClusterConnection) ConnectionFactory.createConnection(this.conf);
      this.pool = getDefaultThreadPool(conf);
      outputSink = new RegionReplicaOutputSink(controller, tableDescriptors, entryBuffers,
        connection, pool, numWriterThreads, operationTimeout);
      outputSink.startWriterThreads();
      super.doStart();
    } catch (IOException ex) {
      LOG.warn("Received exception while creating connection", ex);
      notifyFailed(ex);
    }
  }

  @Override
  protected void doStop() {
    if (outputSink != null) {
      try {
        outputSink.finishWritingAndClose();
      } catch (IOException ex) {
        LOG.warn("Got exception while trying to close OutputSink", ex);
      }
    }
    if (this.pool != null) {
      this.pool.shutdownNow();
      try {
        // wait for 10 seconds for the in-flight replay tasks to finish
        boolean shutdown = this.pool.awaitTermination(10000, TimeUnit.MILLISECONDS);
        if (!shutdown) {
          LOG.warn("Failed to shutdown the thread pool after 10 seconds");
        }
      } catch (InterruptedException e) {
        LOG.warn("Got interrupted while waiting for the thread pool to shut down", e);
      }
    }
    if (connection != null) {
      try {
        connection.close();
      } catch (IOException ex) {
        LOG.warn("Got exception closing connection", ex);
      }
    }
    super.doStop();
  }
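
  /**
   * Returns a thread pool for the replay RPCs to the region replicas. Pool sizes come from
   * hbase.region.replica.replication.threads.max and .core (a value of 0 means 8 threads per
   * available processor), and core threads are allowed to time out.
   */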
  private ExecutorService getDefaultThreadPool(Configuration conf) {
    int maxThreads = conf.getInt("hbase.region.replica.replication.threads.max", 256);
    int coreThreads = conf.getInt("hbase.region.replica.replication.threads.core", 16);
    if (maxThreads == 0) {
      maxThreads = Runtime.getRuntime().availableProcessors() * 8;
    }
    if (coreThreads == 0) {
      coreThreads = Runtime.getRuntime().availableProcessors() * 8;
    }
    long keepAliveTime = conf.getLong("hbase.region.replica.replication.threads.keepalivetime", 60);
    LinkedBlockingQueue<Runnable> workQueue =
      new LinkedBlockingQueue<Runnable>(maxThreads *
        conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
          HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS));
    ThreadPoolExecutor tpe = new ThreadPoolExecutor(
      coreThreads,
      maxThreads,
      keepAliveTime,
      TimeUnit.SECONDS,
      workQueue,
      Threads.newDaemonThreadFactory(this.getClass().getSimpleName() + "-rpc-shared-"));
    tpe.allowCoreThreadTimeOut(true);
    return tpe;
  }

  @Override
  public boolean replicate(ReplicateContext replicateContext) {
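    // The entries are appended to the shared entryBuffers, which groups them per region; the
    // output sink's writer threads drain those buffers and replay the edits to the secondary
    // replicas. flush() blocks until everything appended so far has been handed off, so this
    // method does not return until the whole batch is replicated (or the endpoint is stopped).
    // Edits skipped along the way (dropped/disabled tables, stale locations, memstore
    // replication disabled) are reported to the replication metrics as filtered edits. On
    // IOException the batch is simply retried while the endpoint is still running.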
    while (this.isRunning()) {
      try {
        for (Entry entry : replicateContext.getEntries()) {
          entryBuffers.appendEntry(entry);
        }
        outputSink.flush(); // make sure everything appended so far is written out
        ctx.getMetrics().incrLogEditsFiltered(
          outputSink.getSkippedEditsCounter().getAndSet(0));
        return true;
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return false;
      } catch (IOException e) {
        LOG.warn("Received IOException while trying to replicate: "
          + StringUtils.stringifyException(e));
      }
    }

    return false;
  }

  @Override
  public boolean canReplicateToSameCluster() {
    return true;
  }

  @Override
  protected WALEntryFilter getScopeWALEntryFilter() {
    // we do not care about the scope; replicate everything
    return null;
  }

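  /**
   * An {@link OutputSink} implementation that replays the buffered edits to the secondary
   * region replicas through a {@link RegionReplicaSinkWriter}, skipping the data edits of
   * tables that have region memstore replication disabled.
   */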
  static class RegionReplicaOutputSink extends OutputSink {
    private final RegionReplicaSinkWriter sinkWriter;
    private final TableDescriptors tableDescriptors;
    private final Cache<TableName, Boolean> memstoreReplicationEnabled;

    public RegionReplicaOutputSink(PipelineController controller,
        TableDescriptors tableDescriptors, EntryBuffers entryBuffers,
        ClusterConnection connection, ExecutorService pool, int numWriters,
        int operationTimeout) {
      super(controller, entryBuffers, numWriters);
      this.sinkWriter = new RegionReplicaSinkWriter(this, connection, pool, operationTimeout);
      this.tableDescriptors = tableDescriptors;

      // A cache for the "memstore replication enabled" flag per table, so that we do not have
      // to read the table descriptor for every batch. The short expiry lets us pick up
      // descriptor changes without an explicit invalidation mechanism.
      int memstoreReplicationEnabledCacheExpiryMs = connection.getConfiguration()
        .getInt("hbase.region.replica.replication.cache.memstoreReplicationEnabled.expiryMs",
          5000);
      this.memstoreReplicationEnabled = CacheBuilder.newBuilder()
        .expireAfterWrite(memstoreReplicationEnabledCacheExpiryMs, TimeUnit.MILLISECONDS)
        .initialCapacity(10)
        .maximumSize(1000)
        .build();
    }

    @Override
    public void append(RegionEntryBuffer buffer) throws IOException {
      List<Entry> entries = buffer.getEntryBuffer();

      if (entries.isEmpty() || entries.get(0).getEdit().getCells().isEmpty()) {
        return;
      }

      // skip the whole buffer if the table does not require replication (non-meta edits of
      // tables with region memstore replication disabled)
      if (!requiresReplication(buffer.getTableName(), entries)) {
        return;
      }

      sinkWriter.append(buffer.getTableName(), buffer.getEncodedRegionName(),
        entries.get(0).getEdit().getCells().get(0).getRow(), entries);
    }

    @Override
    public boolean flush() throws IOException {
      // drain the entry buffers first, so that a successful flush really means that every
      // entry appended before the flush has been handed to a writer
      entryBuffers.waitUntilDrained();
      return super.flush();
    }

    @Override
    public boolean keepRegionEvents() {
      return true;
    }

    @Override
    public List<Path> finishWritingAndClose() throws IOException {
      finishWriting(true);
      return null;
    }

    @Override
    public Map<byte[], Long> getOutputCounts() {
      return null;
    }

    @Override
    public int getNumberOfRecoveredRegions() {
      return 0;
    }

    AtomicLong getSkippedEditsCounter() {
      return skippedEdits;
    }
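
    /**
     * Returns whether the edits for the given table should be replicated to the secondary
     * replicas, consulting the table descriptor's region memstore replication flag; the answer
     * is cached for a short period. If memstore replication is disabled, the non-meta edits
     * are removed from the passed list (and counted as skipped); meta edits such as flush and
     * region events are always replicated, so the method still returns true if any are present.
     */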
    private boolean requiresReplication(final TableName tableName, final List<Entry> entries)
        throws IOException {
      // if we do not have the table descriptors, err on the side of replicating
      if (tableDescriptors == null) return true;

      Boolean requiresReplication = memstoreReplicationEnabled.getIfPresent(tableName);
      if (requiresReplication == null) {
        // not cached yet; read the flag from the table descriptor. If the descriptor cannot be
        // found (the table might have just been dropped), replicate by default.
        HTableDescriptor htd = tableDescriptors.get(tableName);
        requiresReplication = htd == null || htd.hasRegionMemstoreReplication();
        memstoreReplicationEnabled.put(tableName, requiresReplication);
      }

      // if memstore replication is disabled for the table, drop the data edits but keep the
      // meta edits (flush, region events), which the replicas still need to observe
      if (!requiresReplication) {
        int skipEdits = 0;
        java.util.Iterator<Entry> it = entries.iterator();
        while (it.hasNext()) {
          Entry entry = it.next();
          if (entry.getEdit().isMetaEdit()) {
            requiresReplication = true;
          } else {
            it.remove();
            skipEdits++;
          }
        }
        skippedEdits.addAndGet(skipEdits);
      }
      return requiresReplication;
    }
  }

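  /**
   * A {@link SinkWriter} that replays a buffer of edits belonging to a single region to all
   * secondary replicas of that region, submitting one {@link RegionReplicaReplayCallable} per
   * replica to a shared thread pool and waiting for all of them to finish.
   */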
  static class RegionReplicaSinkWriter extends SinkWriter {
    RegionReplicaOutputSink sink;
    ClusterConnection connection;
    RpcControllerFactory rpcControllerFactory;
    RpcRetryingCallerFactory rpcRetryingCallerFactory;
    int operationTimeout;
    ExecutorService pool;
    Cache<TableName, Boolean> disabledAndDroppedTables;

    public RegionReplicaSinkWriter(RegionReplicaOutputSink sink, ClusterConnection connection,
        ExecutorService pool, int operationTimeout) {
      this.sink = sink;
      this.connection = connection;
      this.operationTimeout = operationTimeout;
      this.rpcRetryingCallerFactory
        = RpcRetryingCallerFactory.instantiate(connection.getConfiguration());
      this.rpcControllerFactory = RpcControllerFactory.instantiate(connection.getConfiguration());
      this.pool = pool;

      int nonExistentTableCacheExpiryMs = connection.getConfiguration()
        .getInt("hbase.region.replica.replication.cache.disabledAndDroppedTables.expiryMs", 5000);
      // A cache for the tables we know to be disabled or dropped, so that the writer threads
      // do not keep retrying against them. The short expiry means we re-check soon after the
      // table is re-enabled or re-created.
      disabledAndDroppedTables = CacheBuilder.newBuilder()
        .expireAfterWrite(nonExistentTableCacheExpiryMs, TimeUnit.MILLISECONDS)
        .initialCapacity(10)
        .maximumSize(1000)
        .build();
    }

    public void append(TableName tableName, byte[] encodedRegionName, byte[] row,
        List<Entry> entries) throws IOException {
      // skip (but count) the edits of tables known to be disabled or dropped
      if (disabledAndDroppedTables.getIfPresent(tableName) != null) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("Skipping " + entries.size() + " entries because table " + tableName
            + " is cached as a disabled or dropped table");
          for (Entry entry : entries) {
            LOG.trace("Skipping : " + entry);
          }
        }
        sink.getSkippedEditsCounter().addAndGet(entries.size());
        return;
      }

      // Lookup the locations of the primary region and its replicas for the row. Try the
      // location cache first; if the located primary does not match the region the edits came
      // from, retry once with a fresh (uncached) lookup before giving up on the entries.
      RegionLocations locations = null;
      boolean useCache = true;
      while (true) {
        try {
          locations = RegionReplicaReplayCallable
            .getRegionLocations(connection, tableName, row, useCache, 0);

          if (locations == null) {
            throw new HBaseIOException("Cannot locate locations for "
              + tableName + ", row:" + Bytes.toStringBinary(row));
          }
        } catch (TableNotFoundException e) {
          if (LOG.isTraceEnabled()) {
            LOG.trace("Skipping " + entries.size() + " entries because table " + tableName
              + " is dropped. Adding table to cache.");
            for (Entry entry : entries) {
              LOG.trace("Skipping : " + entry);
            }
          }
          disabledAndDroppedTables.put(tableName, Boolean.TRUE); // cache for later batches
          sink.getSkippedEditsCounter().addAndGet(entries.size());
          return;
        }

        // The region the edits belong to may have moved, split or merged since they were
        // written; replaying them against a different region would be wrong, so skip instead.
        HRegionLocation primaryLocation = locations.getDefaultRegionLocation();
        if (!Bytes.equals(primaryLocation.getRegionInfo().getEncodedNameAsBytes(),
            encodedRegionName)) {
          if (useCache) {
            useCache = false; // the cached location might be stale; retry uncached
            continue;
          }
          if (LOG.isTraceEnabled()) {
            LOG.trace("Skipping " + entries.size() + " entries in table " + tableName
              + " because located region " + primaryLocation.getRegionInfo().getEncodedName()
              + " is different than the original region "
              + Bytes.toStringBinary(encodedRegionName) + " from WALEdit");
            for (Entry entry : entries) {
              LOG.trace("Skipping : " + entry);
            }
          }
          sink.getSkippedEditsCounter().addAndGet(entries.size());
          return;
        }
        break;
      }

      if (locations.size() == 1) {
        return; // no secondary replicas for this region, nothing to replay
      }

      ArrayList<Future<ReplicateWALEntryResponse>> tasks
        = new ArrayList<Future<ReplicateWALEntryResponse>>(locations.size() - 1);

      // submit one replay RPC per secondary replica; the default replica (id 0) is the primary
      // region itself and already has these edits
      for (int replicaId = 0; replicaId < locations.size(); replicaId++) {
        HRegionLocation location = locations.getRegionLocation(replicaId);
        if (!RegionReplicaUtil.isDefaultReplica(replicaId)) {
          HRegionInfo regionInfo = location == null
            ? RegionReplicaUtil.getRegionInfoForReplica(
              locations.getDefaultRegionLocation().getRegionInfo(), replicaId)
            : location.getRegionInfo();
          RegionReplicaReplayCallable callable = new RegionReplicaReplayCallable(connection,
            rpcControllerFactory, tableName, location, regionInfo, row, entries,
            sink.getSkippedEditsCounter());
          Future<ReplicateWALEntryResponse> task = pool.submit(
            new RetryingRpcCallable<ReplicateWALEntryResponse>(rpcRetryingCallerFactory,
              callable, operationTimeout));
          tasks.add(task);
        }
      }

      boolean tasksCancelled = false;
      for (Future<ReplicateWALEntryResponse> task : tasks) {
        try {
          task.get();
        } catch (InterruptedException e) {
          throw new InterruptedIOException(e.getMessage());
        } catch (ExecutionException e) {
          Throwable cause = e.getCause();
          if (cause instanceof IOException) {
            // The table can still be disabled or dropped between the location lookup above and
            // the replay RPC. In that case, do not fail the whole batch: cache the table as
            // disabled/dropped, count the entries as skipped (only once, even if several
            // replica tasks fail the same way), and move on.
            if (cause instanceof TableNotFoundException
                || connection.isTableDisabled(tableName)) {
              if (LOG.isTraceEnabled()) {
                LOG.trace("Skipping " + entries.size() + " entries in table " + tableName
                  + " because received exception for dropped or disabled table", cause);
                for (Entry entry : entries) {
                  LOG.trace("Skipping : " + entry);
                }
              }
              disabledAndDroppedTables.put(tableName, Boolean.TRUE); // cache for later batches
              if (!tasksCancelled) {
                sink.getSkippedEditsCounter().addAndGet(entries.size());
                tasksCancelled = true; // count the skipped entries only once
              }
              continue;
            }
            // rethrow the original IOException, so that the caller can retry the batch
            throw (IOException) cause;
          }
          // unexpected non-IO failure; wrap and rethrow
          throw new IOException(cause);
        }
      }
    }
  }

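  /**
   * Adapts a {@link RetryingCallable} to the {@link Callable} interface so that it can be
   * submitted to an {@link ExecutorService}; the wrapped callable is invoked with retries
   * through a {@link RpcRetryingCallerFactory}-created caller.
   */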
  static class RetryingRpcCallable<V> implements Callable<V> {
    RpcRetryingCallerFactory factory;
    RetryingCallable<V> callable;
    int timeout;

    public RetryingRpcCallable(RpcRetryingCallerFactory factory, RetryingCallable<V> callable,
        int timeout) {
      this.factory = factory;
      this.callable = callable;
      this.timeout = timeout;
    }

    @Override
    public V call() throws Exception {
      return factory.<V>newCaller().callWithRetries(callable, timeout);
    }
  }
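
  /**
   * Calls replay() on the region server hosting a region replica, for the passed edits that
   * were generated against the primary region. If the located region no longer matches the
   * region the edits came from, the entries are skipped and counted instead of being replayed.
   */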
  static class RegionReplicaReplayCallable
      extends RegionAdminServiceCallable<ReplicateWALEntryResponse> {

    private final List<Entry> entries;
    private final byte[] initialEncodedRegionName;
    private final AtomicLong skippedEntries;

    public RegionReplicaReplayCallable(ClusterConnection connection,
        RpcControllerFactory rpcControllerFactory, TableName tableName,
        HRegionLocation location, HRegionInfo regionInfo, byte[] row, List<Entry> entries,
        AtomicLong skippedEntries) {
      super(connection, rpcControllerFactory, location, tableName, row, regionInfo.getReplicaId());
      this.entries = entries;
      this.skippedEntries = skippedEntries;
      this.initialEncodedRegionName = regionInfo.getEncodedNameAsBytes();
    }

    @Override
    public ReplicateWALEntryResponse call(int timeout) throws IOException {
      return replayToServer(this.entries, timeout);
    }

    private ReplicateWALEntryResponse replayToServer(List<Entry> entries, int timeout)
        throws IOException {
      // Check whether we should still replay these entries. If the located region is no longer
      // the region the edits came from (it may have moved, split or merged in the meantime),
      // skip the entries instead of replaying them to the wrong region.
      boolean skip = false;

      if (!Bytes.equals(location.getRegionInfo().getEncodedNameAsBytes(),
          initialEncodedRegionName)) {
        skip = true;
      }
      if (!entries.isEmpty() && !skip) {
        Entry[] entriesArray = new Entry[entries.size()];
        entriesArray = entries.toArray(entriesArray);

        // set the region name of the target region replica on the request
        Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
          ReplicationProtbufUtil.buildReplicateWALEntryRequest(
            entriesArray, location.getRegionInfo().getEncodedNameAsBytes());
        try {
          PayloadCarryingRpcController controller =
            rpcControllerFactory.newController(p.getSecond());
          controller.setCallTimeout(timeout);
          controller.setPriority(tableName);
          return stub.replay(controller, p.getFirst());
        } catch (ServiceException se) {
          throw ProtobufUtil.getRemoteException(se);
        }
      }

      if (skip) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("Skipping " + entries.size() + " entries in table " + tableName
            + " because located region " + location.getRegionInfo().getEncodedName()
            + " is different than the original region "
            + Bytes.toStringBinary(initialEncodedRegionName) + " from WALEdit");
          for (Entry entry : entries) {
            LOG.trace("Skipping : " + entry);
          }
        }
        skippedEntries.addAndGet(entries.size());
      }
      return ReplicateWALEntryResponse.newBuilder().build();
    }
  }
}