Spark: a simple custom RDD partitioner


1. Defining the partitioner

package example

import org.apache.spark.Partitioner

// A custom partitioner: keys 1 through 4 go to partition 0, every other key goes to partition 1.
class MyPartitioner extends Partitioner {
  // Total number of partitions produced by this partitioner.
  override def numPartitions: Int = 2

  // Map each key to a partition index in [0, numPartitions).
  override def getPartition(key: Any): Int = key match {
    case 1 => 0
    case 2 => 0
    case 3 => 0
    case 4 => 0
    case _ => 1
  }
}
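
The contract has two parts: numPartitions fixes the partition count, and getPartition maps every key to an index in [0, numPartitions). One quick way to sanity-check the mapping is to call getPartition directly, without starting Spark at all (a minimal sketch, not part of the original post; the object name PartitionerCheck is made up here):

package example

// Minimal sketch: exercise MyPartitioner's key-to-partition mapping locally, with no SparkContext.
object PartitionerCheck {
  def main(args: Array[String]): Unit = {
    val p = new MyPartitioner()
    (1 to 6).foreach { k =>
      println(s"key $k -> partition ${p.getPartition(k)}")
    }
    // Expected: keys 1-4 map to partition 0, keys 5 and 6 map to partition 1.
  }
}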

2. Using the custom partitioner

package example

import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object HelloRdd22 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
    sparkConf.set("spark.default.parallelism", "4")
    val sc = new SparkContext(sparkConf)

    // partitionBy is only defined on key-value RDDs, so map each element to a (key, 1) pair first.
    val rdd1: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5), 2)
    val mapRdd: RDD[(Int, Int)] = rdd1.map((_, 1))

    // Repartition by key with the custom partitioner (the built-in HashPartitioner is kept for comparison).
    //val result: RDD[(Int, Int)] = mapRdd.partitionBy(new HashPartitioner(2))
    val result: RDD[(Int, Int)] = mapRdd.partitionBy(new MyPartitioner())

    // Each partition is written to its own part file under the "output" directory.
    result.saveAsTextFile("output")

    sc.stop()
  }
}
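
To confirm where each record actually landed, one option is to tag every pair with its partition index and print the result instead of writing to "output". This is a minimal sketch under the same setup; the object name InspectPartitions is made up here:

package example

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

// Minimal sketch (not in the original post): print which partition each pair lands in
// to verify MyPartitioner's routing.
object InspectPartitions {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("InspectPartitions"))

    val pairs: RDD[(Int, Int)] = sc.makeRDD(List(1, 2, 3, 4, 5), 2).map((_, 1))
    val partitioned = pairs.partitionBy(new MyPartitioner())

    partitioned
      .mapPartitionsWithIndex { (idx, iter) => iter.map(kv => (idx, kv)) }
      .collect()
      .foreach(println)
    // Expected: (0,(1,1)) ... (0,(4,1)) in partition 0 and (1,(5,1)) in partition 1.

    sc.stop()
  }
}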

