/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.KeyValue;

import java.io.IOException;
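/**
 * Dummy mapper used by the ImportTsv tests to verify that a custom mapper
 * implementation can be substituted for the default {@link TsvImporterMapper}
 * (the mapper can be swapped in through ImportTsv's importtsv.mapper.class
 * setting). Each input line is split on the 0x1B separator, the numeric
 * suffix of every value token (e.g. {@code VALUE1}) is multiplied by 3, and
 * the result is written out as a single Put keyed by the first token.
 */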
public class TsvImporterCustomTestMapper extends TsvImporterMapper {

  @Override
  protected void setup(Context context) {
    doSetup(context);
  }
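  /**
   * Converts a line of separator-delimited text into a Put, multiplying the
   * integer portion of each value token by 3 before it is written out.
   */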
  @Override
  public void map(LongWritable offset, Text value, Context context)
      throws IOException {
    byte[] family = Bytes.toBytes("FAM");
    final byte[][] qualifiers = { Bytes.toBytes("A"), Bytes.toBytes("B") };
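    // Do some basic line parsing: decode only the valid bytes of the Text
    // backing array (it may be longer than the logical line) and split on
    // the 0x1B (ESC) separator.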
    byte[] lineBytes = value.getBytes();
    String[] valueTokens =
        new String(lineBytes, 0, value.getLength(), "UTF-8").split("\u001b");
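    // Use the first token as the row key; writes skip the WAL.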
    ImmutableBytesWritable rowKey =
        new ImmutableBytesWritable(Bytes.toBytes(valueTokens[0]));
    Put put = new Put(rowKey.copyBytes());
    put.setDurability(Durability.SKIP_WAL);
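    // Each remaining token looks like "VALUE1", "VALUE2", ...; multiply the
    // numeric suffix by 3 and store the result under the matching qualifier.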
    for (int i = 1; i < valueTokens.length; i++) {
      String prefix = valueTokens[i].substring(0, "VALUE".length());
      String suffix = valueTokens[i].substring("VALUE".length());
      String newValue = prefix + Integer.parseInt(suffix) * 3;

      KeyValue kv = new KeyValue(rowKey.copyBytes(), family,
          qualifiers[i - 1], Bytes.toBytes(newValue));
      put.add(kv);
    }

    try {
      context.write(rowKey, put);
    } catch (InterruptedException e) {
      // Restore the interrupt flag and fail the task rather than swallowing the exception.
      Thread.currentThread().interrupt();
      throw new IOException("Interrupted while writing Put for row " +
          Bytes.toString(rowKey.copyBytes()), e);
    }
  }
}