1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.util;
20
21 import java.io.IOException;
22 import java.util.PriorityQueue;
23 import java.util.Queue;
24 import java.util.Set;
25 import java.util.concurrent.ArrayBlockingQueue;
26 import java.util.concurrent.BlockingQueue;
27 import java.util.concurrent.ConcurrentSkipListSet;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.atomic.AtomicLong;
30
31 import org.apache.commons.logging.Log;
32 import org.apache.commons.logging.LogFactory;
33 import org.apache.hadoop.conf.Configuration;
34 import org.apache.hadoop.hbase.HRegionLocation;
35 import org.apache.hadoop.hbase.TableName;
36 import org.apache.hadoop.hbase.client.Table;
37 import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
38
39
40 public abstract class MultiThreadedWriterBase extends MultiThreadedAction {
41 private static final Log LOG = LogFactory.getLog(MultiThreadedWriterBase.class);
42
43
44
45
46
47
48
49 protected BlockingQueue<Long> wroteKeys;
50
51
52
53
54
55 protected AtomicLong nextKeyToWrite = new AtomicLong();
56
57
58
59
60 protected AtomicLong wroteUpToKey = new AtomicLong();
61
62
63 protected Set<Long> failedKeySet = new ConcurrentSkipListSet<Long>();
64
65
66
67
68
69
70 protected AtomicLong wroteKeyQueueSize = new AtomicLong();
71
72
73 protected boolean trackWroteKeys;
74
75 public MultiThreadedWriterBase(LoadTestDataGenerator dataGen, Configuration conf,
76 TableName tableName, String actionLetter) throws IOException {
77 super(dataGen, conf, tableName, actionLetter);
78 this.wroteKeys = createWriteKeysQueue(conf);
79 }
80
81 protected BlockingQueue<Long> createWriteKeysQueue(Configuration conf) {
82 return new ArrayBlockingQueue<Long>(10000);
83 }
84
85 @Override
86 public void start(long startKey, long endKey, int numThreads) throws IOException {
87 super.start(startKey, endKey, numThreads);
88 nextKeyToWrite.set(startKey);
89 wroteUpToKey.set(startKey - 1);
90
91 if (trackWroteKeys) {
92 new Thread(new WroteKeysTracker()).start();
93 numThreadsWorking.incrementAndGet();
94 }
95 }
96
97 protected String getRegionDebugInfoSafe(Table table, byte[] rowKey) {
98 HRegionLocation cached = null, real = null;
99 try {
100 cached = connection.getRegionLocation(tableName, rowKey, false);
101 real = connection.getRegionLocation(tableName, rowKey, true);
102 } catch (Throwable t) {
103
104 }
105 String result = "no information can be obtained";
106 if (cached != null) {
107 result = "cached: " + cached.toString();
108 }
109 if (real != null && real.getServerName() != null) {
110 if (cached != null && cached.getServerName() != null && real.equals(cached)) {
111 result += "; cache is up to date";
112 } else {
113 result = (cached != null) ? (result + "; ") : "";
114 result += "real: " + real.toString();
115 }
116 }
117 return result;
118 }
119
120
121
122
123
124 private class WroteKeysTracker implements Runnable {
125
126 @Override
127 public void run() {
128 Thread.currentThread().setName(getClass().getSimpleName());
129 try {
130 long expectedKey = startKey;
131 Queue<Long> sortedKeys = new PriorityQueue<Long>();
132 while (expectedKey < endKey) {
133
134 Long k;
135 try {
136 k = wroteKeys.poll(1, TimeUnit.SECONDS);
137 } catch (InterruptedException e) {
138 LOG.info("Inserted key tracker thread interrupted", e);
139 break;
140 }
141 if (k == null) {
142 continue;
143 }
144 if (k == expectedKey) {
145
146 wroteUpToKey.set(k);
147 ++expectedKey;
148 } else {
149 sortedKeys.add(k);
150 }
151
152
153 while (!sortedKeys.isEmpty()
154 && ((k = sortedKeys.peek()) == expectedKey)) {
155 sortedKeys.poll();
156 wroteUpToKey.set(k);
157 ++expectedKey;
158 }
159
160 wroteKeyQueueSize.set(wroteKeys.size() + sortedKeys.size());
161 }
162 } catch (Exception ex) {
163 LOG.error("Error in inserted/updaed key tracker", ex);
164 } finally {
165 numThreadsWorking.decrementAndGet();
166 }
167 }
168 }
169
170 public int getNumWriteFailures() {
171 return failedKeySet.size();
172 }
173
174
175
176
177
178 public long wroteUpToKey() {
179 return wroteUpToKey.get();
180 }
181
182 public boolean failedToWriteKey(long k) {
183 return failedKeySet.contains(k);
184 }
185
186 @Override
187 protected String progressInfo() {
188 StringBuilder sb = new StringBuilder();
189 appendToStatus(sb, "wroteUpTo", wroteUpToKey.get());
190 appendToStatus(sb, "wroteQSize", wroteKeyQueueSize.get());
191 return sb.toString();
192 }
193
194
195
196
197
198
199 public void setTrackWroteKeys(boolean enable) {
200 trackWroteKeys = enable;
201 }
202 }