文章目录
- 猫眼电影数据库传输数据
- 1.数据集样式
- 分析
- 2.封装数据库Bean阶段
- 3.Mapper阶段
- 4.Reducer阶段
- 5.Driver阶段
- 6.结果展示
猫眼电影数据库传输数据
1.数据集样式
分析
目标是将数据存储到数据库中：按照“,”号拆分数据集；定义Bean对象，封装字段属性并自定义比较（按照排名输出到数据库）；Map阶段读取数据，对每个字段进行处理（去除双引号）后封装；Reducer阶段输出。
返回顶部
2.封装数据库Bean阶段
创建数据库表格存储数据
-- Target table for the MapReduce job; column names must match the field list
-- handed to DBOutputFormat.setOutput (ranks, name, actors, grade, number, common).
-- INT display widths such as int(255) have no effect on storage and are
-- deprecated since MySQL 8.0.17, so ranks is declared as plain int.
create table film
(
    ranks   int,
    name    varchar(255),
    actors  varchar(255),
    grade   varchar(255),
    number  varchar(255),
    common  varchar(255)
);
package 猫眼电影数据处理;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
/**
 * One record of the {@code film} table, usable both as a Hadoop {@code Writable} and as a
 * {@code DBWritable} for JDBC reads and writes.
 *
 * <p>Natural ordering is ascending by {@code ranks}. NOTE(review): {@code equals} and
 * {@code hashCode} are not overridden, so this ordering is not consistent with equals.
 */
public class SQLBean implements WritableComparable<SQLBean>, DBWritable {
    // Columns of one film record, in the same order as the JDBC parameter positions below.
    private int ranks;
    private String name;
    private String actors;
    private String grade;
    private String number;
    private String common;

    /**
     * Orders beans by rank, smallest first.
     *
     * @return -1, 0 or 1 as this rank is less than, equal to, or greater than {@code o}'s
     */
    @Override
    public int compareTo(SQLBean o) {
        return Integer.compare(this.ranks, o.ranks);
    }

    /** Overwrites all six columns in one call. */
    public void set(int rank, String name, String actors, String grade, String number, String common) {
        this.ranks = rank;
        this.name = name;
        this.actors = actors;
        this.grade = grade;
        this.number = number;
        this.common = common;
    }

    /**
     * Serializes the rank followed by the five string columns.
     * {@code writeUTF} throws {@link NullPointerException} if any string column is null.
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(ranks);
        for (String column : new String[] {name, actors, grade, number, common}) {
            out.writeUTF(column);
        }
    }

    /** Restores the fields in exactly the order {@link #write(DataOutput)} emitted them. */
    @Override
    public void readFields(DataInput in) throws IOException {
        ranks = in.readInt();
        name = in.readUTF();
        actors = in.readUTF();
        grade = in.readUTF();
        number = in.readUTF();
        common = in.readUTF();
    }

    /**
     * Binds the six columns to JDBC parameter positions 1..6. These positions must line up with
     * the column list used to build the INSERT statement.
     */
    @Override
    public void write(PreparedStatement statement) throws SQLException {
        int column = 1;
        statement.setInt(column++, ranks);
        statement.setString(column++, name);
        statement.setString(column++, actors);
        statement.setString(column++, grade);
        statement.setString(column++, number);
        statement.setString(column++, common);
    }

    /** Populates the bean from result-set columns 1..6, mirroring {@link #write(PreparedStatement)}. */
    @Override
    public void readFields(ResultSet resultSet) throws SQLException {
        int column = 1;
        ranks = resultSet.getInt(column++);
        name = resultSet.getString(column++);
        actors = resultSet.getString(column++);
        grade = resultSet.getString(column++);
        number = resultSet.getString(column++);
        common = resultSet.getString(column++);
    }

    /** Debug representation listing every column, each value wrapped in single quotes. */
    @Override
    public String toString() {
        return new StringBuilder("SQLBean{")
                .append("ranks='").append(ranks).append('\'')
                .append(", name='").append(name).append('\'')
                .append(", actors='").append(actors).append('\'')
                .append(", grade='").append(grade).append('\'')
                .append(", number='").append(number).append('\'')
                .append(", common='").append(common).append('\'')
                .append('}')
                .toString();
    }

    public int getRanks() {
        return ranks;
    }

    public void setRanks(int ranks) {
        this.ranks = ranks;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getActors() {
        return actors;
    }

    public void setActors(String actors) {
        this.actors = actors;
    }

    public String getGrade() {
        return grade;
    }

    public void setGrade(String grade) {
        this.grade = grade;
    }

    public String getNumber() {
        return number;
    }

    public void setNumber(String number) {
        this.number = number;
    }

    public String getCommon() {
        return common;
    }

    public void setCommon(String common) {
        this.common = common;
    }
}
返回顶部
3.Mapper阶段
package 猫眼电影数据处理;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
 * Parses one line of the Maoyan film data set into a {@link SQLBean} keyed by its rank.
 *
 * <p>Each line is expected to hold at least six comma-separated fields, normally wrapped in
 * double quotes: rank, name, actors, grade, number, common. Commas inside a quoted field are
 * kept as part of the field. Lines with too few fields or a non-numeric rank (for example a
 * header or a blank line) are skipped and counted in the "SQLMap" counter group instead of
 * failing the whole job.
 */
public class SQLMap extends Mapper<LongWritable, Text, IntWritable, SQLBean> {
    /** Number of columns a record must provide. */
    private static final int FIELD_COUNT = 6;
    /** Counter group under which skipped input lines are reported. */
    private static final String COUNTER_GROUP = "SQLMap";

    // Reused across map() calls; both are fully overwritten before every write.
    SQLBean v = new SQLBean();
    IntWritable k = new IntWritable();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        java.util.List<String> fields = splitQuoted(value.toString());
        if (fields.size() < FIELD_COUNT) {
            // Blank or truncated line: previously crashed with ArrayIndexOutOfBoundsException.
            context.getCounter(COUNTER_GROUP, "TOO_FEW_FIELDS").increment(1);
            return;
        }

        int rank;
        try {
            rank = Integer.parseInt(fields.get(0).trim());
        } catch (NumberFormatException e) {
            // e.g. a header row or a corrupt rank column
            context.getCounter(COUNTER_GROUP, "BAD_RANK").increment(1);
            return;
        }

        k.set(rank);
        v.set(rank, fields.get(1), fields.get(2), fields.get(3), fields.get(4), fields.get(5));
        context.write(k, v);
    }

    /**
     * Splits a CSV line on commas that are outside double quotes and drops the quote characters.
     * A doubled quote ({@code ""}) inside a quoted field yields one literal quote.
     *
     * @param line raw input line, never null
     * @return the unquoted fields in order; an empty line yields a single empty field
     */
    private static java.util.List<String> splitQuoted(String line) {
        java.util.List<String> fields = new java.util.ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inQuotes = false;
        for (int i = 0; i < line.length(); i++) {
            char c = line.charAt(i);
            if (inQuotes) {
                if (c == '"') {
                    if (i + 1 < line.length() && line.charAt(i + 1) == '"') {
                        current.append('"');
                        i++; // consume the escaped second quote
                    } else {
                        inQuotes = false;
                    }
                } else {
                    current.append(c);
                }
            } else if (c == '"') {
                inQuotes = true;
            } else if (c == ',') {
                fields.add(current.toString());
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        fields.add(current.toString());
        return fields;
    }
}
返回顶部
4.Reducer阶段
package 猫眼电影数据处理;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
 * Emits every {@link SQLBean} received for a rank as an output key, paired with an empty
 * {@link NullWritable} value. The incoming rank key itself is not used.
 */
public class SQLReducer extends Reducer<IntWritable, SQLBean, SQLBean, NullWritable> {
    @Override
    protected void reduce(IntWritable key, Iterable<SQLBean> values, Context context)
            throws IOException, InterruptedException {
        // NullWritable is a singleton, so look it up once rather than once per record.
        final NullWritable noValue = NullWritable.get();
        for (SQLBean row : values) {
            context.write(row, noValue);
        }
    }
}
返回顶部
5.Driver阶段
package 猫眼电影数据处理;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
/**
 * Configures and submits the job that loads the Maoyan film data set into the MySQL table
 * {@code film}.
 *
 * <p>Usage: {@code SQLDriver [inputPath [jdbcUrl [user [password]]]]}. Every omitted argument
 * falls back to the built-in default, so running with no arguments behaves like the original
 * hard-coded setup. The process exits with 0 on success and 1 if the job fails or cannot be
 * set up.
 *
 * <p>NOTE(review): a password passed as a program argument is visible in the process list;
 * prefer a protected configuration source outside of local experiments.
 */
public class SQLDriver {
    private static final String DEFAULT_INPUT =
            "G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\猫眼电影数据处理\\data\\douban.txt";
    // Connector/J 8.x renamed this class to com.mysql.cj.jdbc.Driver; keep it in sync with the
    // driver jar on the classpath.
    private static final String JDBC_DRIVER = "com.mysql.jdbc.Driver";
    private static final String DEFAULT_JDBC_URL = "jdbc:mysql://localhost:3306/mr";
    private static final String DEFAULT_USER = "root";
    private static final String DEFAULT_PASSWORD = "123456";
    private static final String TABLE = "film";
    // Column order must match the parameter positions bound in SQLBean.write(PreparedStatement).
    private static final String[] COLUMNS = {"ranks", "name", "actors", "grade", "number", "common"};

    public static void main(String[] args) {
        try {
            Configuration conf = new Configuration();
            // Database connection used by DBOutputFormat when the reducers write rows.
            DBConfiguration.configureDB(
                    conf,
                    JDBC_DRIVER,
                    argOrDefault(args, 1, DEFAULT_JDBC_URL),
                    argOrDefault(args, 2, DEFAULT_USER),
                    argOrDefault(args, 3, DEFAULT_PASSWORD));

            Job job = Job.getInstance(conf);
            job.setJarByClass(SQLDriver.class);
            job.setMapperClass(SQLMap.class);
            job.setReducerClass(SQLReducer.class);
            job.setMapOutputKeyClass(IntWritable.class);
            job.setMapOutputValueClass(SQLBean.class);
            job.setOutputKeyClass(SQLBean.class);
            job.setOutputValueClass(NullWritable.class);

            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(DBOutputFormat.class);
            FileInputFormat.setInputPaths(job, new Path(argOrDefault(args, 0, DEFAULT_INPUT)));
            DBOutputFormat.setOutput(job, TABLE, COLUMNS);

            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (Exception e) {
            // Previously the error was only printed and the JVM exited with status 0,
            // hiding the failure from scripts and schedulers.
            e.printStackTrace();
            System.exit(1);
        }
    }

    /** Returns {@code args[index]} when present, otherwise {@code fallback}. */
    private static String argOrDefault(String[] args, int index, String fallback) {
        return args != null && args.length > index ? args[index] : fallback;
    }
}
返回顶部
6.结果展示
返回顶部
踩的坑:
- 报错:java.lang.Exception: java.lang.ArrayIndexOutOfBoundsException: 5
- 报错:java.lang.Exception: java.io.IOException: Initialization of all the collectors failed.
- 报错:java.lang.Exception: java.io.IOException: Could not create connection to database server.