2020-08-21
< view all posts在需要对大量交易数据进行特征工程时,可以编写MapReduce应用实现。在Map端主要进行的是数据的读取和一些简单的转换,之后在Reduce端实现具体的特征生成逻辑。
在将Map的输出发送给Reduce时,MapReduce会对数据进行Shuffle,当中就会自动将记录按照Map输出的key值进行聚合以及排序,这是很方便的功能。但有时我们不仅希望记录能根据Key值进行排序,还希望能够按照其它的某些字段排序。例如,Map输出的Key值为卡号,每次执行Reduce任务都会聚合同一张卡号的所有交易,这是我们需要的。但同时在Value中还保存了交易时间,我们还希望在Reduce得到的数据是对时间进行排序的。
实现这个功能有两种方案,一种是把时间保存在Value中,使用分隔符标注出来,在Reduce里截取出这段数据,之后进行排序。但这个方法容易导致Reduce阶段出现内存不足的情况。另一种方法就是使用自定义的Key类型,实现SecondarySort。
自定义的Key类型需要实现WirtableComparable接口,之后有两种方式指定自定义Key的排序方式:
1、重写Key类型中的compareTo()方法。默认的Comparator会调用重写之后compareTo()方法。
public class CardTimeCustomKey implements WritableComparable<CardTimeCustomKey>, CustomKey { private String cardNo; private String yearDtTm; public CardTimeCustomKey() { } public CardTimeCustomKey(String cardNo, String yearDtTm) { this.cardNo = cardNo; this.yearDtTm = yearDtTm; } // ...... (getter and setter) @Override public void write(DataOutput out) throws IOException { WritableUtils.writeString(out, cardNo); WritableUtils.writeString(out, yearDtTm); } @Override public void readFields(DataInput in) throws IOException { cardNo = WritableUtils.readString(in); yearDtTm = WritableUtils.readString(in); } @Override public int compareTo(CardTimeCustomKey o) { int result = cardNo.compareTo(o.getCardNo()); if (result != 0) { return result; } else { return yearDtTm.compareTo(o.getYearDtTm()); } } }
2、使用自定义的Comparator:自定义一个类,继承WirtableComparator,之后在MapReduce的任务设置中用job.setSortComparatorClass()来指定自定义的Comparator。
这两种方式二选一即可。
之后还需要两个步骤。一是自定义一个Partitioner,继承Partitioner类并且重写getPartition()方法,这个方法指定Map的结果按照什么分组后发送到Reduce。以上面的例子来说,我们只希望Key中的时间用于排序,而只按照卡号分组,因此对卡号取哈希值作为返回值即可。需要注意的是哈希值可能为负数,需要转换为正数。
public class CardTimePartitioner extends Partitioner<CardTimeCustomKey, Text> { public CardTimePartitioner(){ } @Override public int getPartition(CardTimeCustomKey key, Text value, int numReduceTasks){ // 避免 hashCode 出现负值 return ( (key.getCardNo().hashCode()&Integer.MAX_VALUE)%numReduceTasks ); } }
下一个步骤是自定义一个Grouping Comparator,继承WirtableComparator类,并重写Compare()方法。这个类用来指定在Reduce中对于得到的输入按照什么进行聚合。在Compare()中只对卡号进行排序,即可实现只按照卡号聚合。
public class CardTimeGroupingComparator extends WritableComparator { public CardTimeGroupingComparator() { super(CardTimeCustomKey.class, true); } @Override public int compare(WritableComparable w1, WritableComparable w2){ CardTimeCustomKey key1 = (CardTimeCustomKey) w1; CardTimeCustomKey key2 = (CardTimeCustomKey) w2; return key1.getCardNo().compareTo(key2.getCardNo()); } }
最后说明一下Partitioner和Grouping Comparator的区别,它们都对数据进行了分组或者说聚合的操作。它们的区别,简单来说,Partitioner在Mapper发送给Reducer时调用,决定Mapper的输出如何分组。在有多个Reduceer时,不同分组会发送给不同的Reducer,Partitioner就决定了每个Reducer收到怎样的数据,因此它和性能的相关性比较大。进一步的,还可以自定义Combiner,将Partitioner得到的键值对当中同样的键值作合并,这样就不需要再传输同样的键值了,可以减少Mapper到Reducer的传输过程中对带宽的消耗。
而在每个分组到达Reducer之后,会再次按照Grouping Comparator作比较和聚合,这是单个Reducer内部对收到的数据在逻辑上的分组,不涉及到Mapper和Reducer之间的传输。
一个比较极端的情况,如果只有一个reducer,虽然Partitioner对Mapper的结果分组了,但是因为只有一个Reducer在接受数据,所以实际上数据都到达了一个Reducer。此时我们在Reducer当中对得到的数据进行遍历,每次遍历的依然会是同一张卡的数据,而不是所有的数据,这就是靠Grouping Comparator的分组来保证的。