Spark Transformation Operators -> zip, zipWithIndex

1. zip

zip turns the elements of two RDDs (key-value or non-key-value format) into a single key-value RDD by pairing elements positionally. The two RDDs must have the same number of partitions, and each corresponding pair of partitions must contain the same number of elements (a sketch illustrating this constraint follows the examples below).

1. Java
package transformations;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.Arrays;

/**
 * @Author yqq
 * @Date 2021/12/10 00:04
 * @Version 1.0
 */
public class ZipAndZipWithIndexTest {
    public static void main(String[] args) {
        JavaSparkContext context = new JavaSparkContext(
                new SparkConf()
                        .setMaster("local")
                        .setAppName("zipandzipwithindex")
        );
        context.setLogLevel("Error");
        JavaRDD<String> rdd = context.parallelize(Arrays.asList("a", "b", "c", "d"));
        JavaRDD<Integer> rdd1 = context.parallelize(Arrays.asList(100, 200, 300, 400));
        // zip pairs elements positionally: (a,100), (b,200), (c,300), (d,400)
        JavaPairRDD<String, Integer> zip = rdd.zip(rdd1);
        zip.foreach(e -> System.out.println(e));
    }
}

2. Scala

package transformation

import org.apache.spark.{SparkConf, SparkContext}

/**
 * @Author yqq
 * @Date 2021/12/10 00:19
 * @Version 1.0
 */
object ZipAndZipWithIndexTest {
  def main(args: Array[String]): Unit = {
    val context = new SparkContext(
      new SparkConf()
        .setAppName("ZipAndZipWithIndex")
        .setMaster("local")
    )
    context.setLogLevel("Error")
    val rdd = context.parallelize(Array[String]("a", "b", "c", "c"))
    val rdd1 = context.parallelize(Array[Int](1, 2, 3, 4))
    // zip pairs elements positionally: (a,1), (b,2), (c,3), (c,4)
    rdd.zip(rdd1).foreach(println)
  }
}
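
To illustrate the partition constraint mentioned above, here is a minimal Scala sketch (assuming the same local SparkContext as in the examples); both RDDs are created with an explicit number of partitions so that the element counts line up partition by partition:

// Minimal sketch, assuming a SparkContext `context` configured as above.
val left  = context.parallelize(Seq("a", "b", "c", "d"), numSlices = 2)
val right = context.parallelize(Seq(100, 200, 300, 400), numSlices = 2)

// Same number of partitions and same element count per partition, so zip succeeds:
// (a,100) (b,200) (c,300) (d,400)
left.zip(right).collect().foreach(println)

// If the two RDDs had different lengths or different partitioning,
// Spark would fail the zip at runtime, because elements are paired
// positionally within each partition.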


2. zipWithIndex

zipWithIndex pairs each element of an RDD with that element's index in the RDD (starting from 0), producing (K, V) pairs in which the element is the key and its index is the value (see the note after the examples below).

1. Java
package transformations;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.Arrays;

/**
 * @Author yqq
 * @Date 2021/12/10 00:04
 * @Version 1.0
 */
public class ZipAndZipWithIndexTest {
    public static void main(String[] args) {
        JavaSparkContext context = new JavaSparkContext(
                new SparkConf()
                        .setMaster("local")
                        .setAppName("zipandzipwithindex")
        );
        context.setLogLevel("Error");
        JavaRDD<String> rdd = context.parallelize(Arrays.asList("a", "b", "c", "d"));
        // zipWithIndex pairs each element with its index: (a,0), (b,1), (c,2), (d,3)
        rdd.zipWithIndex().foreach(e -> System.out.println(e));
    }
}

2. Scala

package transformation

import org.apache.spark.{SparkConf, SparkContext}

/**
 * @Author yqq
 * @Date 2021/12/10 00:19
 * @Version 1.0
 */
object ZipAndZipWithIndexTest {
  def main(args: Array[String]): Unit = {
    val context = new SparkContext(
      new SparkConf()
        .setAppName("ZipAndZipWithIndex")
        .setMaster("local")
    )
    context.setLogLevel("Error")
    val rdd = context.parallelize(Array[String]("a", "b", "c", "c"))
    // zipWithIndex pairs each element with its index: (a,0), (b,1), (c,2), (c,3)
    rdd.zipWithIndex().foreach(println)
  }
}
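
One behavior worth noting (from the Spark RDD API documentation): when the RDD has more than one partition, zipWithIndex has to trigger a Spark job first, because the starting index of each partition depends on how many elements the earlier partitions contain. A minimal Scala sketch of the resulting pairs, assuming the same local SparkContext as above:

// Minimal sketch, assuming a SparkContext `context` configured as above.
val letters = context.parallelize(Seq("a", "b", "c", "c"), numSlices = 2)

// Each element is paired with its global position in the RDD (a Long starting at 0):
// (a,0) (b,1) (c,2) (c,3)
letters.zipWithIndex().foreach(println)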


