【MapReduce】Writing Maoyan Movie Data to a Database



Table of Contents

  • Writing Maoyan Movie Data to a Database
  • 1. Dataset Format
  • Analysis
  • 2. Database Bean Stage
  • 3. Mapper Stage
  • 4. Reducer Stage
  • 5. Driver Stage
  • 6. Results

Writing Maoyan Movie Data to a Database

1. Dataset Format

(Screenshot of the raw dataset, omitted here.)
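Judging from how the Mapper below parses the file (comma-separated, each field wrapped in double quotes, six fields per record: rank, name, actors, grade, number, common), a record presumably looks like this hypothetical example:

"1","SomeFilm","Actor A","9.5","123456","a short comment"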

Analysis

The goal is to load the dataset into a database. Split each record on the "," delimiter; define a Bean object that encapsulates the field attributes and implements a custom comparison (so records go to the database ordered by rank); in the Map stage, read each line, clean every field (strip the surrounding double quotes), and package it into the Bean; the Reducer stage writes the records out.


2. Database Bean Stage

Create the database table that will hold the data:

create table film
(
ranks int(255),
name varchar(255),
actors varchar(255),
grade varchar(255),
number varchar(255),
common varchar(255)
);

package 猫眼电影数据处理;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

public class SQLBean implements WritableComparable<SQLBean>, DBWritable {

    // Fields mirroring the columns of the film table
    private int ranks;
    private String name;
    private String actors;
    private String grade;
    private String number;
    private String common;

    // Compare by rank so records sort in ascending rank order
    @Override
    public int compareTo(SQLBean o) {
        if (this.ranks > o.ranks) {
            return 1;
        } else if (this.ranks < o.ranks) {
            return -1;
        } else {
            return 0;
        }
    }

    public void set(int rank, String name, String actors, String grade, String number, String common) {
        this.ranks = rank;
        this.name = name;
        this.actors = actors;
        this.grade = grade;
        this.number = number;
        this.common = common;
    }

    // Hadoop serialization: write the fields to the shuffle stream
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(ranks);
        out.writeUTF(name);
        out.writeUTF(actors);
        out.writeUTF(grade);
        out.writeUTF(number);
        out.writeUTF(common);
    }

    // Hadoop deserialization: read the fields back in the same order
    @Override
    public void readFields(DataInput in) throws IOException {
        this.ranks = in.readInt();
        this.name = in.readUTF();
        this.actors = in.readUTF();
        this.grade = in.readUTF();
        this.number = in.readUTF();
        this.common = in.readUTF();
    }

    // DBWritable: bind the fields to the generated INSERT statement
    @Override
    public void write(PreparedStatement statement) throws SQLException {
        statement.setInt(1, this.ranks);
        statement.setString(2, this.name);
        statement.setString(3, this.actors);
        statement.setString(4, this.grade);
        statement.setString(5, this.number);
        statement.setString(6, this.common);
    }

    // DBWritable: populate the fields from a query result row
    @Override
    public void readFields(ResultSet resultSet) throws SQLException {
        this.ranks = resultSet.getInt(1);
        this.name = resultSet.getString(2);
        this.actors = resultSet.getString(3);
        this.grade = resultSet.getString(4);
        this.number = resultSet.getString(5);
        this.common = resultSet.getString(6);
    }

    @Override
    public String toString() {
        return "SQLBean{" +
                "ranks='" + ranks + '\'' +
                ", name='" + name + '\'' +
                ", actors='" + actors + '\'' +
                ", grade='" + grade + '\'' +
                ", number='" + number + '\'' +
                ", common='" + common + '\'' +
                '}';
    }

    public int getRanks() { return ranks; }

    public void setRanks(int ranks) { this.ranks = ranks; }

    public String getName() { return name; }

    public void setName(String name) { this.name = name; }

    public String getActors() { return actors; }

    public void setActors(String actors) { this.actors = actors; }

    public String getGrade() { return grade; }

    public void setGrade(String grade) { this.grade = grade; }

    public String getNumber() { return number; }

    public void setNumber(String number) { this.number = number; }

    public String getCommon() { return common; }

    public void setCommon(String common) { this.common = common; }
}
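As a quick sanity check that write(DataOutput) and readFields(DataInput) stay symmetric, here is a minimal round-trip sketch (the class name SQLBeanRoundTrip and the sample values are made up for illustration):

package 猫眼电影数据处理;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class SQLBeanRoundTrip {
    public static void main(String[] args) throws IOException {
        SQLBean before = new SQLBean();
        before.set(1, "SomeFilm", "Actor A", "9.5", "123456", "a short comment");

        // Serialize the bean the same way Hadoop does during the shuffle
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        before.write(new DataOutputStream(bytes));

        // Deserialize into a fresh bean; the printed fields should match
        SQLBean after = new SQLBean();
        after.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(after);
    }
}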


3. Mapper Stage

package 猫眼电影数据处理;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class SQLMap extends Mapper<LongWritable, Text, IntWritable, SQLBean> {

    SQLBean v = new SQLBean();
    IntWritable k = new IntWritable();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Split one input line on commas
        String[] fields = value.toString().split(",");
        // Clean the data: strip the surrounding double quotes from each field
        for (int i = 0; i < fields.length; i++) {
            fields[i] = fields[i].substring(1, fields[i].length() - 1);
        }
        // Package into key (rank) and value (the full record bean)
        k.set(Integer.parseInt(fields[0]));
        v.set(Integer.parseInt(fields[0]), fields[1], fields[2], fields[3], fields[4], fields[5]);
        // Emit
        context.write(k, v);
        // Debug output
        System.out.println(k);
        System.out.println(fields.length);
    }
}
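One caveat about the parsing above: split(",") assumes every line yields exactly six fields, but a value that itself contains a comma (an actors list, say) produces extra pieces and can cause the ArrayIndexOutOfBoundsException listed under the pitfalls at the end. A more defensive parse, sketched under the assumption that fields are quoted and separated by "," (the helper class MaoyanLineParser is hypothetical):

package 猫眼电影数据处理;

public class MaoyanLineParser {

    // Returns the six fields of one record, or null if the line is malformed.
    public static String[] parse(String line) {
        // Split only on the "," sequence between quoted fields, so a comma
        // inside a field does not break the record apart.
        String[] fields = line.split("\",\"");
        if (fields.length != 6) {
            return null; // let the caller skip malformed records
        }
        fields[0] = fields[0].replaceFirst("^\"", ""); // drop the leading quote
        fields[5] = fields[5].replaceFirst("\"$", ""); // drop the trailing quote
        return fields;
    }

    public static void main(String[] args) {
        // Hypothetical sample line; note the comma inside the actors field.
        String line = "\"1\",\"SomeFilm\",\"Actor A,Actor B\",\"9.5\",\"123456\",\"a short comment\"";
        String[] fields = parse(line);
        System.out.println(fields == null ? "malformed" : fields[2]); // Actor A,Actor B
    }
}

In the map method one would then call parse and simply return when it yields null, instead of letting the exception kill the task.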


4. Reducer Stage

package 猫眼电影数据处理;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class SQLReducer extends Reducer<IntWritable, SQLBean, SQLBean, NullWritable> {
    @Override
    protected void reduce(IntWritable key, Iterable<SQLBean> values, Context context) throws IOException, InterruptedException {
        // Write each bean out; DBOutputFormat turns it into an INSERT
        for (SQLBean bean : values) {
            context.write(bean, NullWritable.get());
        }
    }
}
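A small point on ordering: the map output key is the rank as an IntWritable, and MapReduce sorts keys before they reach the reducer, so records arrive at the reducer (and are inserted) in ascending rank order.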


5. Driver Stage

package 猫眼电影数据处理;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class SQLDriver {
    public static void main(String[] args) {
        try {
            // Get the job configuration
            Configuration conf = new Configuration();
            // Configure the database connection
            // (alternative: a remote MySQL instance)
            // DBConfiguration.configureDB(
            //         conf,
            //         "com.mysql.jdbc.Driver",
            //         "jdbc:mysql://192.168.64.178:3306/school",
            //         "root", "123456"
            // );
            DBConfiguration.configureDB(
                    conf,
                    "com.mysql.jdbc.Driver",
                    "jdbc:mysql://localhost:3306/mr",
                    "root", "123456"
            );
            Job job = Job.getInstance(conf);

            // Wire up the Mapper, Reducer, and jar
            job.setMapperClass(SQLMap.class);
            job.setReducerClass(SQLReducer.class);
            job.setJarByClass(SQLDriver.class);

            job.setMapOutputKeyClass(IntWritable.class);
            job.setMapOutputValueClass(SQLBean.class);
            job.setOutputKeyClass(SQLBean.class);
            job.setOutputValueClass(NullWritable.class);

            // Text file in, database out
            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(DBOutputFormat.class);

            Path in = new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\猫眼电影数据处理\\data\\douban.txt");
            FileInputFormat.setInputPaths(job, in);

            // Target table and column names for the generated INSERT
            String[] fields = {"ranks", "name", "actors", "grade", "number", "common"};
            DBOutputFormat.setOutput(job, "film", fields);

            // Submit and wait for completion
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
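Note that the MySQL Connector/J jar must be on the classpath when the job runs. com.mysql.jdbc.Driver is the driver class for Connector/J 5.x; Connector/J 8.x renamed it to com.mysql.cj.jdbc.Driver, so the class name above must match the jar version in use.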


6. Results

(Screenshot of the film table contents after the job completed, omitted here.)


Pitfalls encountered:

  • Error: java.lang.Exception: java.lang.ArrayIndexOutOfBoundsException: 5. Most likely a line that did not split into exactly six fields (for example, a field with an embedded comma); the defensive parsing sketch after the Mapper stage is one way to guard against this.
  • Error: java.lang.Exception: java.io.IOException: Initialization of all the collectors failed. This typically means the map output key/value classes are misconfigured or cannot be serialized by Hadoop (the bean must implement Writable/WritableComparable and keep a no-arg constructor).
  • Error: java.lang.Exception: java.io.IOException: Could not create connection to database server. Usually a wrong JDBC URL or credentials, or a missing MySQL connector jar on the classpath; see the smoke test below.
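For the last error, it can help to verify the URL and credentials with plain JDBC before involving MapReduce. A minimal smoke-test sketch, assuming the same connection settings as the Driver above (the class name JdbcSmokeTest is made up for illustration):

import java.sql.Connection;
import java.sql.DriverManager;

public class JdbcSmokeTest {
    public static void main(String[] args) throws Exception {
        // Same driver class, URL, and credentials as in SQLDriver.
        Class.forName("com.mysql.jdbc.Driver");
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/mr", "root", "123456")) {
            System.out.println("Connected: " + !conn.isClosed());
        }
    }
}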

