31. TV Data Collection Project Workflow, Spark Edition: Processing the Business Logic with Spark SQL

Create a new package

(screenshot)

package com.it19gong.clickproject;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Map-only pre-processing step: parses each nginx access-log line and
 * emits "ip,date,url,upFlow" as the key (the value is unused).
 */
public class AccessLogPreProcessMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    Text text = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] itr = value.toString().split(" ");
        // Skip malformed lines that do not have the expected number of fields.
        if (itr.length < 11) {
            return;
        }
        String ip = itr[0];
        String date = AnalysisNginxTool.nginxDateStmpToDate(itr[3]);
        String url = itr[6];
        String upFlow = itr[9];

        text.set(ip + "," + date + "," + url + "," + upFlow);
        context.write(text, NullWritable.get());
    }
}
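
The mapper only defines the per-record cleaning logic, so it still needs a driver to submit it as a map-only job. The article does not show that driver at this point; the sketch below is a minimal, assumed version (the class name AccessLogPreProcessDriver and the argument-based input/output paths are placeholders, not part of the original project).

package com.it19gong.clickproject;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Hypothetical driver: submits AccessLogPreProcessMapper as a map-only job.
public class AccessLogPreProcessDriver {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "AccessLogPreProcess");
        job.setJarByClass(AccessLogPreProcessDriver.class);

        job.setMapperClass(AccessLogPreProcessMapper.class);
        job.setNumReduceTasks(0);                    // map-only: the mapper output is the final output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));   // raw access.log directory
        FileOutputFormat.setOutputPath(job, new Path(args[1]));  // cleaned output directory

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}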

 

 

package com.it19gong.clickproject;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Helpers for converting nginx access-log timestamps such as
 * "[17/Jun/2019:03:24:53" into a "yyyy/MM/dd" string or a millisecond value.
 */
public class AnalysisNginxTool {

    private static Logger logger = LoggerFactory.getLogger(AnalysisNginxTool.class);

    public static String nginxDateStmpToDate(String date) {
        String res = "";
        try {
            // The English month abbreviation is replaced with its number first,
            // so the numeric pattern "[dd/MM/yyyy:HH:mm:ss" can parse it.
            SimpleDateFormat df = new SimpleDateFormat("[dd/MM/yyyy:HH:mm:ss");
            String datetmp = date.split(" ")[0].toUpperCase();
            String mtmp = datetmp.split("/")[1];
            DateToNUM.initMap();
            datetmp = datetmp.replaceAll(mtmp, DateToNUM.map.get(mtmp));
            System.out.println(datetmp);
            Date d = df.parse(datetmp);
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd");
            res = sdf.format(d);
        } catch (ParseException e) {
            logger.error("error:" + date, e);
        }
        return res;
    }

    public static long nginxDateStmpToDateTime(String date) {
        long l = 0;
        try {
            SimpleDateFormat df = new SimpleDateFormat("[dd/MM/yyyy:HH:mm:ss");
            String datetmp = date.split(" ")[0].toUpperCase();
            String mtmp = datetmp.split("/")[1];
            DateToNUM.initMap();
            datetmp = datetmp.replaceAll(mtmp, DateToNUM.map.get(mtmp));

            Date d = df.parse(datetmp);
            l = d.getTime();
        } catch (ParseException e) {
            logger.error("error:" + date, e);
        }
        return l;
    }
}

 

 

package com.it19gong.clickproject;

import junit.framework.Test;
import junit.framework.TestCase;
import junit.framework.TestSuite;

/**
 * Unit test for simple App.
 */
public class AppTest extends TestCase {

    /**
     * Create the test case
     *
     * @param testName name of the test case
     */
    public AppTest(String testName) {
        super(testName);
    }

    /**
     * @return the suite of tests being tested
     */
    public static Test suite() {
        return new TestSuite(AppTest.class);
    }

    /**
     * Rigorous Test :-)
     */
    public void testApp() {
        assertTrue(true);
    }
}

 

 

package com.it19gong.clickproject;

import java.util.HashMap;

/**
 * Maps English month abbreviations from the nginx log to their two-digit numbers.
 */
public class DateToNUM {

    public static HashMap<String, String> map = new HashMap<String, String>();

    public static void initMap() {
        map.put("JAN", "01");
        map.put("FEB", "02");
        map.put("MAR", "03");
        map.put("APR", "04");
        map.put("MAY", "05");
        map.put("JUN", "06");
        map.put("JUL", "07");
        map.put("AUG", "08");
        map.put("SEP", "09");   // nginx writes the three-letter form "Sep"
        map.put("SEPT", "09");
        map.put("OCT", "10");
        map.put("NOV", "11");
        map.put("DEC", "12");
    }
}
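
A quick way to sanity-check the two date helpers is to run them on a single nginx timestamp. The small class below is only an illustrative smoke test (the class name and the sample timestamp are made up, not part of the project):

package com.it19gong.clickproject;

// Hypothetical smoke test for the date helpers.
public class DateToolSmokeTest {

    public static void main(String[] args) {
        // A timestamp field as it appears in column 4 of an nginx access-log line.
        String raw = "[17/Jun/2019:03:24:53";

        // Expected to print 2019/06/17
        System.out.println(AnalysisNginxTool.nginxDateStmpToDate(raw));

        // The same timestamp as epoch milliseconds.
        System.out.println(AnalysisNginxTool.nginxDateStmpToDateTime(raw));
    }
}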

 

 

 

Create the AccessLogDriver class

package com.it19gong.clickproject;

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class AccessLogDriver {

    public static void main(String[] args) throws Exception {

        // Create the SparkConf, JavaSparkContext and SQLContext.
        SparkConf conf = new SparkConf()
                .setMaster("local")
                .setAppName("RDD2DataFrameProgrammatically");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);

        // Step 1: load the raw log file as an RDD of lines and convert it to an RDD<Row>.
        JavaRDD<String> lines = sc.textFile("E:\\Mycode\\dianshixiangmu\\sparkproject\\data\\access.log");

        // Each value placed into a Row must match the type declared in the schema below.
        // "upflow" is declared as IntegerType, so the String parsed from the log line has
        // to be converted with Integer.valueOf() before the Row is built; otherwise
        // Spark SQL throws a String-to-Integer ClassCastException when the column is used.
        JavaRDD<Row> clickRDD = lines.map(new Function<String, Row>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Row call(String line) throws Exception {
                String[] itr = line.split(" ");

                String ip = itr[0];
                String date = AnalysisNginxTool.nginxDateStmpToDate(itr[3]);
                String url = itr[6];
                String upFlow = itr[9];

                return RowFactory.create(
                        ip,
                        date,
                        url,
                        Integer.valueOf(upFlow)
                );
            }

        });

        // Step 2: build the schema programmatically.
        // Field names and types may come from a MySQL table or a configuration file at
        // runtime, so constructing the metadata in code keeps the schema flexible.
        List<StructField> structFields = new ArrayList<StructField>();
        structFields.add(DataTypes.createStructField("ip", DataTypes.StringType, true));
        structFields.add(DataTypes.createStructField("date", DataTypes.StringType, true));
        structFields.add(DataTypes.createStructField("url", DataTypes.StringType, true));
        structFields.add(DataTypes.createStructField("upflow", DataTypes.IntegerType, true));
        StructType structType = DataTypes.createStructType(structFields);

        // Step 3: apply the schema to the RDD<Row> to get a DataFrame.
        DataFrame logDF = sqlContext.createDataFrame(clickRDD, structType);

        // Register the DataFrame as a temporary table so it can be queried with SQL.
        logDF.registerTempTable("log");

        // Total upstream traffic per IP, sorted in descending order.
        DataFrame sumFlowDF = sqlContext.sql(
                "select ip, sum(upflow) as sum from log group by ip order by sum desc");

        List<Row> rows = sumFlowDF.javaRDD().collect();
        for (Row row : rows) {
            System.out.println(row);
        }
    }
}
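
For reference, the same per-IP aggregation can also be expressed with the DataFrame API instead of registering a temporary table and writing a SQL string. The helper below is only an equivalent sketch against the Spark 1.x Java API, not how the article proceeds; it assumes the DataFrame built in step 3 (logDF) is passed in.

// Equivalent of registerTempTable("log") + the SQL query, using the DataFrame API (Spark 1.x).
// Could be added as a private helper inside AccessLogDriver.
private static DataFrame sumUpflowByIp(DataFrame logDF) {
    return logDF
            .groupBy("ip")
            .agg(org.apache.spark.sql.functions.sum("upflow").alias("sum"))
            .orderBy(org.apache.spark.sql.functions.col("sum").desc());
}

Calling sumUpflowByIp(logDF).show() would print the same ip/sum pairs, highest traffic first.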

 

 

Run the program

(screenshot)

 

 

 

 

Create the DBHelper class

(screenshot)

 

 

package com.it19gong.clickproject;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

/**
 * Thin wrapper around a JDBC connection to the MySQL database that stores the results.
 */
public class DBHelper {

    public static final String url = "jdbc:mysql://192.168.86.131:3306/userdb";
    public static final String name = "com.mysql.jdbc.Driver";
    public static final String user = "sqoop";
    public static final String password = "sqoop";

    // The database connection held by this helper.
    public Connection conn = null;

    public DBHelper() {
        try {
            Class.forName(name);
            conn = DriverManager.getConnection(url, user, password);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void close() {
        try {
            this.conn.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}
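
The modified driver in the next section inserts into a MySQL table named upflow in the userdb database, so that table has to exist. If it was not created in an earlier part of the series, a one-off helper like the following would do; the class name and column types are assumptions (both columns are kept as varchar because the driver writes the sum as a string).

package com.it19gong.clickproject;

import java.sql.Statement;

// Hypothetical one-off setup: creates the target table used by the driver below.
public class CreateUpflowTable {

    public static void main(String[] args) throws Exception {
        DBHelper db = new DBHelper();
        Statement st = db.conn.createStatement();
        // Column types are an assumption; adjust them to match the rest of the project.
        st.executeUpdate("create table if not exists upflow (ip varchar(64), sum varchar(64))");
        st.close();
        db.close();
    }
}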

 

 

 

Modify the AccessLogDriver class so that the results are written to MySQL

package com.it19gong.clickproject;

import java.sql.PreparedStatement;
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class AccessLogDriver {

    static DBHelper db1 = null;

    public static void main(String[] args) throws Exception {

        // Create the SparkConf, JavaSparkContext and SQLContext.
        SparkConf conf = new SparkConf()
                .setMaster("local")
                .setAppName("RDD2DataFrameProgrammatically");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);

        // Step 1: load the raw log file as an RDD of lines and convert it to an RDD<Row>.
        JavaRDD<String> lines = sc.textFile("E:\\Mycode\\dianshixiangmu\\sparkproject\\data\\access.log");

        // Each value placed into a Row must match the type declared in the schema below:
        // "upflow" is declared as IntegerType, so convert the parsed String with Integer.valueOf().
        JavaRDD<Row> clickRDD = lines.map(new Function<String, Row>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Row call(String line) throws Exception {
                String[] itr = line.split(" ");

                String ip = itr[0];
                String date = AnalysisNginxTool.nginxDateStmpToDate(itr[3]);
                String url = itr[6];
                String upFlow = itr[9];

                return RowFactory.create(
                        ip,
                        date,
                        url,
                        Integer.valueOf(upFlow)
                );
            }

        });

        // Step 2: build the schema programmatically, as before.
        List<StructField> structFields = new ArrayList<StructField>();
        structFields.add(DataTypes.createStructField("ip", DataTypes.StringType, true));
        structFields.add(DataTypes.createStructField("date", DataTypes.StringType, true));
        structFields.add(DataTypes.createStructField("url", DataTypes.StringType, true));
        structFields.add(DataTypes.createStructField("upflow", DataTypes.IntegerType, true));
        StructType structType = DataTypes.createStructType(structFields);

        // Step 3: apply the schema to the RDD<Row> to get a DataFrame.
        DataFrame logDF = sqlContext.createDataFrame(clickRDD, structType);

        // Register the DataFrame as a temporary table and aggregate the traffic per IP.
        logDF.registerTempTable("log");

        DataFrame sumFlowDF = sqlContext.sql(
                "select ip, sum(upflow) as sum from log group by ip order by sum desc");

        // Write each result row into the MySQL table "upflow".
        // Note: the static DBHelper is only initialised in the driver JVM; this works
        // here because the master is "local", so tasks run in the same process.
        db1 = new DBHelper();
        final String sql = "insert into upflow(ip,sum) values(?,?)";
        sumFlowDF.javaRDD().foreach(new VoidFunction<Row>() {

            private static final long serialVersionUID = 1L;

            @Override
            public void call(Row t) throws Exception {
                PreparedStatement pt = db1.conn.prepareStatement(sql);
                pt.setString(1, t.getString(0));
                pt.setString(2, String.valueOf(t.getLong(1)));
                pt.executeUpdate();
                pt.close();
            }
        });

        db1.close();
    }
}
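
Opening a new PreparedStatement for every row is fine for a result set this small, but for larger outputs a common refinement is foreachPartition, which prepares the statement once per partition and closes it when the partition is done. The fragment below is only a sketch of that variant and would replace the foreach block above (db1 and sql are the same names used in main):

// Sketch: prepare the statement once per partition instead of once per row.
sumFlowDF.javaRDD().foreachPartition(new VoidFunction<java.util.Iterator<Row>>() {

    private static final long serialVersionUID = 1L;

    @Override
    public void call(java.util.Iterator<Row> rows) throws Exception {
        PreparedStatement pt = db1.conn.prepareStatement(sql);
        while (rows.hasNext()) {
            Row t = rows.next();
            pt.setString(1, t.getString(0));
            pt.setString(2, String.valueOf(t.getLong(1)));
            pt.executeUpdate();
        }
        pt.close();
    }
});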

 

 

Run the program

(screenshot)

 

 

 

You can see that two new rows have been written to the MySQL database.

(screenshot)

 
