0
点赞
收藏
分享

微信扫一扫

3 Spark入门distinct、union、intersection,subtract,cartesian等数学运算


这一篇是一些简单的Spark操作,如去重、合并、取交集等,不管用不用的上,做个档案记录。

distinct去重

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

import java.util.Arrays;
import java.util.List;

/**
* 去除重复的元素,不过此方法涉及到混洗,操作开销很大
* @author wuweifeng wrote on 2018/4/16.
*/
public class TestDistinct {
public static void main(String[] args) {
SparkSession sparkSession = SparkSession.builder().appName("JavaWordCount").master("local").getOrCreate();
//spark对普通List的reduce操作
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkSession.sparkContext());
List<Integer> data = Arrays.asList(1, 1, 2, 3, 4, 5);
JavaRDD<Integer> originRDD = javaSparkContext.parallelize(data);
List<Integer> results = originRDD.distinct().collect();
System.out.println(results);
}
}

结果是[4, 1, 3, 5, 2]

union合并,不去重

这个就是简单的将两个RDD合并到一起

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

import java.util.Arrays;
import java.util.List;

/**
* 合并两个RDD
* @author wuweifeng wrote on 2018/4/16.
*/
public class TestUnion {
public static void main(String[] args) {
SparkSession sparkSession = SparkSession.builder().appName("JavaWordCount").master("local").getOrCreate();
//spark对普通List的reduce操作
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkSession.sparkContext());
List<Integer> one = Arrays.asList(1, 2, 3, 4, 5);
List<Integer> two = Arrays.asList(1, 6, 7, 8, 9);
JavaRDD<Integer> oneRDD = javaSparkContext.parallelize(one);
JavaRDD<Integer> twoRDD = javaSparkContext.parallelize(two);
List<Integer> results = oneRDD.union(twoRDD).collect();
System.out.println(results);
}
}

结果是[1, 2, 3, 4, 5, 1, 6, 7, 8, 9]

intersection取交集

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

import java.util.Arrays;
import java.util.List;

/**
* 返回两个RDD的交集
* @author wuweifeng wrote on 2018/4/16.
*/
public class TestIntersection {
public static void main(String[] args) {
SparkSession sparkSession = SparkSession.builder().appName("JavaWordCount").master("local").getOrCreate();
//spark对普通List的reduce操作
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkSession.sparkContext());
List<Integer> one = Arrays.asList(1, 2, 3, 4, 5);
List<Integer> two = Arrays.asList(1, 6, 7, 8, 9);
JavaRDD<Integer> oneRDD = javaSparkContext.parallelize(one);
JavaRDD<Integer> twoRDD = javaSparkContext.parallelize(two);
List<Integer> results = oneRDD.intersection(twoRDD).collect();
System.out.println(results);
}
}

结果[1]

subtract

RDD1.subtract(RDD2),返回在RDD1中出现,但是不在RDD2中出现的元素,不去重 


import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

import java.util.Arrays;
import java.util.List;

/**
* @author wuweifeng wrote on 2018/4/16.
*/
public class TestSubtract {
public static void main(String[] args) {
SparkSession sparkSession = SparkSession.builder().appName("JavaWordCount").master("local").getOrCreate();
//spark对普通List的reduce操作
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkSession.sparkContext());
List<Integer> one = Arrays.asList(1, 2, 3, 4, 5);
List<Integer> two = Arrays.asList(1, 6, 7, 8, 9);
JavaRDD<Integer> oneRDD = javaSparkContext.parallelize(one);
JavaRDD<Integer> twoRDD = javaSparkContext.parallelize(two);

List<Integer> results = oneRDD.subtract(twoRDD).collect();
System.out.println(results);
}
}

结果:[2, 3, 4, 5]

cartesian返回笛卡尔积

笛卡尔积就是两两组合的所有组合,这个的开销非常大,譬如A是["a","b","c"],B是["1","2","3"],那笛卡尔积就是

( 1 a)( 1 b)( 1 c)( 2 a)( 2 b)( 2 c)( 3 a)( 3 b)( 3 c)

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;

import java.util.Arrays;
import java.util.List;

/**
* 返回笛卡尔积,开销很大
* @author wuweifeng wrote on 2018/4/16.
*/
public class TestCartesian {
public static void main(String[] args) {
SparkSession sparkSession = SparkSession.builder().appName("JavaWordCount").master("local").getOrCreate();
//spark对普通List的reduce操作
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkSession.sparkContext());
List<Integer> one = Arrays.asList(1, 2, 3);
List<Integer> two = Arrays.asList(1, 4, 5);
JavaRDD<Integer> oneRDD = javaSparkContext.parallelize(one);
JavaRDD<Integer> twoRDD = javaSparkContext.parallelize(two);
List<Tuple2<Integer, Integer>> results = oneRDD.cartesian(twoRDD).collect();
System.out.println(results);
}
}

注意,返回的是键值对

[(1,1), (1,4), (1,5), (2,1), (2,4), (2,5), (3,1), (3,4), (3,5)]


举报

相关推荐

0 条评论