
A Summary of Problems Connecting to Hadoop from Local Eclipse to Work with a MySQL Database

In my previous post I described the problems I ran into while building a fully distributed Hadoop environment, along with the issues I hit afterwards when operating HDFS from Eclipse. Recently I wanted to go a step further and learn how to operate a MySQL database from Hadoop. Opinions on this differ widely online, and many people will laugh: using something as "heavyweight" as Hadoop to operate a database? Surely you're not serious. HDFS's strength is distributed file processing, and that criticism is perfectly fair; database work is supposed to be safe, lightweight, and fast, so doing it through Hadoop seems to defy common sense. Why learn it anyway?

Take a step back: in the days when Access databases still held a decent share, plenty of people used plain files as their data store. Every insert, delete, query, and update was a file operation, and a single file might serve as a whole database or a single table. For workloads with modest real-time requirements and small data volumes, operating a database through Hadoop is therefore not entirely out of the question. Everyone has their own view on this, and it also depends on what your company asks of you. With that said, let me get to the rather annoying problem I ran into, which is once again a ClassNotFound problem:

The first thing to be clear about is your runtime environment: figure out whether your code actually runs on the server side or locally, and only then consult sample code written for that mode.
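To make that distinction concrete, here is a minimal sketch of the two modes against the old Hadoop 1.x mapred API used throughout this post (the JobTracker address is the one from my cluster; the class name is just a placeholder):

import org.apache.hadoop.mapred.JobConf;

public class RunModeSketch {
    public static void main(String[] args) {
        // Local mode: mapred.job.tracker defaults to "local", so the job runs
        // inside this JVM via the LocalJobRunner, and user classes are simply
        // found on the local classpath; nothing needs to be uploaded.
        JobConf localConf = new JobConf();

        // Cluster mode: pointing mapred.job.tracker at a JobTracker submits
        // the job remotely. JobConf(Class) only tries to locate the jar that
        // contains the given class; when launched from Eclipse the classes
        // usually live in an unpacked bin/ directory rather than a jar, so
        // no job jar is found or shipped to the cluster.
        JobConf clusterConf = new JobConf(RunModeSketch.class);
        clusterConf.set("mapred.job.tracker", "192.168.56.10:9001");
    }
}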

Reference article: http://www.cnblogs.com/xia520pi/archive/2012/06/12/2546261.html

The project's basic structure:

[Screenshot: the Eclipse project structure, with the MySQL driver jar marked in a red box]

Below are the three kinds of ClassNotFound problems you can hit when running locally.

(1) The MySQL driver cannot be found. This one is easy: add the official MySQL driver jar to your project, as shown in the red box in the screenshot above.

(2) Handling the JDBC jar

Although the program is compiled and run from Eclipse, it is ultimately submitted to the Hadoop cluster, so the JDBC jar must also be available to the cluster. There are two ways to arrange this:

<1> Add the jar under ${HADOOP_HOME}/lib on every node, then restart the cluster. This is the more primitive approach.

Our Hadoop installation lives at "/usr/hadoop", so the jar goes under "/usr/hadoop/lib", followed by a restart. Remember that it has to be placed on every node in the Hadoop cluster, because a distributed job executes locally on each node's machine.

<2> Create a "/lib" folder on the Hadoop cluster's distributed file system, upload the JDBC jar to it, and then add the statement below to the main program; that guarantees every node in the cluster can use the jar. Be clear that the jar now lives on HDFS, not on the local file system; this point is important to understand.
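The statement in question is the DistributedCache call that both full listings below use (the paths and addresses are from my cluster):

// Pull the MySQL driver from HDFS onto each task's classpath at job-setup time.
FileSystem fileSystem = FileSystem.get(
        URI.create("hdfs://192.168.56.10:9000/"), conf);
DistributedCache.addFileToClassPath(
        new Path("hdfs://192.168.56.10:9000/lib/mysql-connector-java-5.1.18-bin.jar"),
        conf, fileSystem);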

(3) The entity class mapped to the database table cannot be found (the main focus of this post): StudentRecord.class not found...

The source code that triggers this problem is as follows:

package cn.hadoop.db;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.mapred.lib.db.DBConfiguration;
import org.apache.hadoop.mapred.lib.db.DBInputFormat;
import org.apache.hadoop.mapred.lib.db.DBWritable;

import cn.hadoop.db.DBAccessReader.Student.DBInputMapper;

public class DBAccessReader {

    // Entity class for the student table. It implements both Writable
    // (Hadoop's own serialization) and DBWritable (the JDBC row mapping
    // used by DBInputFormat).
    public static class Student implements Writable, DBWritable {
        public int id;
        public String name;
        public String sex;
        public int age;

        public Student() {
        }

        @Override
        public void write(PreparedStatement statement) throws SQLException {
            statement.setInt(1, this.id);
            statement.setString(2, this.name);
            statement.setString(3, this.sex);
            statement.setInt(4, this.age);
        }

        @Override
        public void readFields(ResultSet resultSet) throws SQLException {
            this.id = resultSet.getInt(1);
            this.name = resultSet.getString(2);
            this.sex = resultSet.getString(3);
            this.age = resultSet.getInt(4);
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeInt(this.id);
            Text.writeString(out, this.name);
            Text.writeString(out, this.sex);
            out.writeInt(this.age);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.id = in.readInt();
            this.name = Text.readString(in);
            this.sex = Text.readString(in);
            this.age = in.readInt();
        }

        @Override
        public String toString() {
            return "Student [id=" + id + ", name=" + name + ", sex=" + sex
                    + ", age=" + age + "]";
        }

        // Emits (id, record-as-text) for every Student row read from MySQL.
        public static class DBInputMapper extends MapReduceBase implements
                Mapper<LongWritable, cn.hadoop.db.DBAccessReader.Student, LongWritable, Text> {

            @Override
            public void map(LongWritable key, cn.hadoop.db.DBAccessReader.Student value,
                    OutputCollector<LongWritable, Text> collector,
                    Reporter reporter) throws IOException {
                collector.collect(new LongWritable(value.id), new Text(value.toString()));
            }
        }
    }

    public static void main(String[] args) throws IOException {

        // Problem: JobConf(Class) plus an explicit mapred.job.tracker submits
        // the job to the cluster, but no job jar is ever uploaded from
        // Eclipse, so the server side cannot find the Student class.
        JobConf conf = new JobConf(DBAccessReader.class);
        conf.set("mapred.job.tracker", "192.168.56.10:9001");

        FileSystem fileSystem = FileSystem.get(
                URI.create("hdfs://192.168.56.10:9000/"), conf);

        DistributedCache.addFileToClassPath(
                new Path("hdfs://192.168.56.10:9000/lib/mysql-connector-java-5.1.18-bin.jar"),
                conf, fileSystem);
        conf.setOutputKeyClass(LongWritable.class);
        conf.setOutputValueClass(Text.class);

        conf.setInputFormat(DBInputFormat.class);

        FileOutputFormat.setOutputPath(conf, new Path(
                "hdfs://192.168.56.10:9000/user/studentInfo"));

        DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
                "jdbc:mysql://192.168.56.109:3306/school", "root", "1qaz2wsx");

        String[] fields = { "id", "name", "sex", "age" };

        DBInputFormat.setInput(conf, cn.hadoop.db.DBAccessReader.Student.class, "student", null,
                "id", fields);

        conf.setMapperClass(DBInputMapper.class);
        conf.setReducerClass(IdentityReducer.class);

        JobClient.runJob(conf);
    }
}

When it runs, the error reported is as follows:

[Screenshot: the console stack trace, reporting that the Student entity class cannot be found]

The error is clear enough: the entity class Student cannot be found. But I read the code over and over, and the class is obviously right there, so why can't it be found? I was stumped for quite a while and tried all sorts of things, until I finally zeroed in on the log output. It is plain from the log that the server side goes looking for the job jar of DBAccessReader, which we obviously never uploaded, so of course it cannot be found. The culprit sits right at the top of the main method:

JobConf conf = new JobConf(DBAccessReader.class);
conf.set("mapred.job.tracker", "192.168.56.10:9001");
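Note that if the job really does have to run on the cluster, the log's own hint ("See JobConf(Class) or JobConf#setJar(String)") points at the other fix: package the project into a jar and tell the JobConf about it explicitly. A sketch of that variant, replacing the two lines above (the jar path is hypothetical; you would export the jar from Eclipse first):

JobConf conf = new JobConf(DBAccessReader.class);
// Hypothetical path to a jar exported from the Eclipse project. With a job
// jar set, user classes such as Student are shipped to the TaskTrackers.
conf.setJar("/home/hadoop/dbaccess.jar");
conf.set("mapred.job.tracker", "192.168.56.10:9001");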

In my case local execution was good enough, so the simpler route was to stop submitting to the cluster altogether. After modifying the code as follows (a plain new JobConf() and no mapred.job.tracker setting, so the job runs through the LocalJobRunner), the problem was solved:

package cn.hadoop.db;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.mapred.lib.db.DBConfiguration;
import org.apache.hadoop.mapred.lib.db.DBInputFormat;
import org.apache.hadoop.mapred.lib.db.DBWritable;

import cn.hadoop.db.DBAccessReader.Student.DBInputMapper;

public class DBAccessReader {

    // Entity class for the student table (unchanged from the listing above).
    public static class Student implements Writable, DBWritable {
        public int id;
        public String name;
        public String sex;
        public int age;

        public Student() {
        }

        @Override
        public void write(PreparedStatement statement) throws SQLException {
            statement.setInt(1, this.id);
            statement.setString(2, this.name);
            statement.setString(3, this.sex);
            statement.setInt(4, this.age);
        }

        @Override
        public void readFields(ResultSet resultSet) throws SQLException {
            this.id = resultSet.getInt(1);
            this.name = resultSet.getString(2);
            this.sex = resultSet.getString(3);
            this.age = resultSet.getInt(4);
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeInt(this.id);
            Text.writeString(out, this.name);
            Text.writeString(out, this.sex);
            out.writeInt(this.age);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.id = in.readInt();
            this.name = Text.readString(in);
            this.sex = Text.readString(in);
            this.age = in.readInt();
        }

        @Override
        public String toString() {
            return "Student [id=" + id + ", name=" + name + ", sex="
                    + sex + ", age=" + age + "]";
        }

        public static class DBInputMapper extends MapReduceBase
                implements
                Mapper<LongWritable, cn.hadoop.db.DBAccessReader.Student, LongWritable, Text> {

            @Override
            public void map(LongWritable key,
                    cn.hadoop.db.DBAccessReader.Student value,
                    OutputCollector<LongWritable, Text> collector,
                    Reporter reporter) throws IOException {
                collector.collect(new LongWritable(value.id),
                        new Text(value.toString()));
            }
        }
    }

    public static void main(String[] args) throws IOException {

        // Fix: a plain JobConf with no mapred.job.tracker setting keeps the
        // job in the LocalJobRunner, where user classes (Student included)
        // are found on the local classpath.
        JobConf conf = new JobConf();
        FileSystem fileSystem = FileSystem.get(
                URI.create("hdfs://192.168.56.10:9000/"), conf);

        DistributedCache.addFileToClassPath(
                new Path("hdfs://192.168.56.10:9000/lib/mysql-connector-java-5.1.18-bin.jar"),
                conf, fileSystem);
        conf.setOutputKeyClass(LongWritable.class);
        conf.setOutputValueClass(Text.class);

        conf.setInputFormat(DBInputFormat.class);

        FileOutputFormat.setOutputPath(conf, new Path(
                "hdfs://192.168.56.10:9000/user/studentInfo"));

        DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
                "jdbc:mysql://192.168.56.109:3306/school", "root", "1qaz2wsx");

        String[] fields = { "id", "name", "sex", "age" };

        DBInputFormat.setInput(conf, cn.hadoop.db.DBAccessReader.Student.class,
                "student", null, "id", fields);

        conf.setMapperClass(DBInputMapper.class);
        conf.setReducerClass(IdentityReducer.class);

        JobClient.runJob(conf);
    }
}

Here is the log output printed during the run. Note that the job now executes as job_local_0001 through the LocalJobRunner, and the "No job jar file set" warning is harmless in this mode, because user classes are loaded straight from the local classpath:

Mar 13, 2016 5:39:57 PM org.apache.hadoop.util.NativeCodeLoader <clinit> WARNING: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Mar 13, 2016 5:39:57 PM org.apache.hadoop.mapred.JobClient copyAndConfigureFiles WARNING: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
Mar 13, 2016 5:39:57 PM org.apache.hadoop.mapred.JobClient copyAndConfigureFiles WARNING: No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
Mar 13, 2016 5:39:57 PM org.apache.hadoop.filecache.TrackerDistributedCacheManager downloadCacheObject INFO: Creating mysql-connector-java-5.1.18-bin.jar in /tmp/hadoop-hadoop/mapred/local/archive/2605709384407216388_-2048973133_91096108/192.168.56.10/lib-work-2076365714246383853 with rwxr-xr-x
Mar 13, 2016 5:39:58 PM org.apache.hadoop.filecache.TrackerDistributedCacheManager downloadCacheObject INFO: Cached hdfs://192.168.56.10:9000/lib/mysql-connector-java-5.1.18-bin.jar as /tmp/hadoop-hadoop/mapred/local/archive/2605709384407216388_-2048973133_91096108/192.168.56.10/lib/mysql-connector-java-5.1.18-bin.jar
Mar 13, 2016 5:39:58 PM org.apache.hadoop.filecache.TrackerDistributedCacheManager localizePublicCacheObject INFO: Cached hdfs://192.168.56.10:9000/lib/mysql-connector-java-5.1.18-bin.jar as /tmp/hadoop-hadoop/mapred/local/archive/2605709384407216388_-2048973133_91096108/192.168.56.10/lib/mysql-connector-java-5.1.18-bin.jar
Mar 13, 2016 5:39:58 PM org.apache.hadoop.mapred.JobClient monitorAndPrintJob INFO: Running job: job_local_0001
Mar 13, 2016 5:39:59 PM org.apache.hadoop.mapred.Task initialize INFO: Using ResourceCalculatorPlugin : null
Mar 13, 2016 5:39:59 PM org.apache.hadoop.mapred.MapTask runOldMapper INFO: numReduceTasks: 1
Mar 13, 2016 5:39:59 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init> INFO: io.sort.mb = 100
Mar 13, 2016 5:39:59 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init> INFO: data buffer = 79691776/99614720
Mar 13, 2016 5:39:59 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init> INFO: record buffer = 262144/327680
Mar 13, 2016 5:39:59 PM org.apache.hadoop.mapred.JobClient monitorAndPrintJob INFO: map 0% reduce 0%
Mar 13, 2016 5:40:04 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush INFO: Starting flush of map output
Mar 13, 2016 5:40:04 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill INFO: Finished spill 0
Mar 13, 2016 5:40:04 PM org.apache.hadoop.mapred.Task done INFO: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
Mar 13, 2016 5:40:04 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate INFO:
Mar 13, 2016 5:40:04 PM org.apache.hadoop.mapred.Task sendDone INFO: Task 'attempt_local_0001_m_000000_0' done.
Mar 13, 2016 5:40:05 PM org.apache.hadoop.mapred.JobClient monitorAndPrintJob INFO: map 100% reduce 0%
Mar 13, 2016 5:40:05 PM org.apache.hadoop.mapred.Task initialize INFO: Using ResourceCalculatorPlugin : null
Mar 13, 2016 5:40:05 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate INFO:
Mar 13, 2016 5:40:05 PM org.apache.hadoop.mapred.Merger$MergeQueue merge INFO: Merging 1 sorted segments
Mar 13, 2016 5:40:05 PM org.apache.hadoop.mapred.Merger$MergeQueue merge INFO: Down to the last merge-pass, with 1 segments left of total size: 542 bytes
Mar 13, 2016 5:40:05 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate INFO:
Mar 13, 2016 5:40:06 PM org.apache.hadoop.mapred.Task done INFO: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
Mar 13, 2016 5:40:06 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate INFO:
Mar 13, 2016 5:40:06 PM org.apache.hadoop.mapred.Task commit INFO: Task attempt_local_0001_r_000000_0 is allowed to commit now
Mar 13, 2016 5:40:06 PM org.apache.hadoop.mapred.FileOutputCommitter commitTask INFO: Saved output of task 'attempt_local_0001_r_000000_0' to hdfs://192.168.56.10:9000/user/studentInfo
Mar 13, 2016 5:40:08 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate INFO: reduce > reduce
Mar 13, 2016 5:40:08 PM org.apache.hadoop.mapred.Task sendDone INFO: Task 'attempt_local_0001_r_000000_0' done.
Mar 13, 2016 5:40:09 PM org.apache.hadoop.mapred.JobClient monitorAndPrintJob INFO: map 100% reduce 100%
Mar 13, 2016 5:40:09 PM org.apache.hadoop.mapred.JobClient monitorAndPrintJob INFO: Job complete: job_local_0001
Mar 13, 2016 5:40:09 PM org.apache.hadoop.mapred.Counters log INFO: Counters: 20
Mar 13, 2016 5:40:09 PM org.apache.hadoop.mapred.Counters log INFO:   File Input Format Counters
Mar 13, 2016 5:40:09 PM org.apache.hadoop.mapred.Counters log INFO:     Bytes Read=0
Mar 13, 2016 5:40:09 PM org.apache.hadoop.mapred.Counters log INFO:   File Output Format Counters
Mar 13, 2016 5:40:09 PM org.apache.hadoop.mapred.Counters log INFO:     Bytes Written=513
Mar 13, 2016 5:40:09 PM org.apache.hadoop.mapred.Counters log INFO:   FileSystemCounters
Mar 13, 2016 5:40:09 PM org.apache.hadoop.mapred.Counters log INFO:     FILE_BYTES_READ=1592914
Mar 13, 2016 5:40:09 PM org.apache.hadoop.mapred.Counters log INFO:     HDFS_BYTES_READ=1579770
Mar 13, 2016 5:40:09 PM org.apache.hadoop.mapred.Counters log INFO:     FILE_BYTES_WRITTEN=3270914
Mar 13, 2016 5:40:09 PM org.apache.hadoop.mapred.Counters log INFO:     HDFS_BYTES_WRITTEN=513
Mar 13, 2016 5:40:09 PM org.apache.hadoop.mapred.Counters log INFO:   Map-Reduce Framework
Mar 13, 2016 5:40:09 PM org.apache.hadoop.mapred.Counters log INFO:     Reduce input groups=9
Mar 13, 2016 5:40:09 PM org.apache.hadoop.mapred.Counters log INFO:     Map output materialized bytes=546
Mar 13, 2016 5:40:09 PM org.apache.hadoop.mapred.Counters log INFO:     Combine output records=0
Mar 13, 2016 5:40:09 PM org.apache.hadoop.mapred.Counters log INFO:     Map input records=9
Mar 13, 2016 5:40:09 PM org.apache.hadoop.mapred.Counters log INFO:     Reduce shuffle bytes=0
Mar 13, 2016 5:40:09 PM org.apache.hadoop.mapred.Counters log INFO:     Reduce output records=9
Mar 13, 2016 5:40:09 PM org.apache.hadoop.mapred.Counters log INFO:     Spilled Records=18
Mar 13, 2016 5:40:09 PM org.apache.hadoop.mapred.Counters log INFO:     Map output bytes=522
Mar 13, 2016 5:40:09 PM org.apache.hadoop.mapred.Counters log INFO:     Total committed heap usage (bytes)=231874560
Mar 13, 2016 5:40:09 PM org.apache.hadoop.mapred.Counters log INFO:     Map input bytes=9
Mar 13, 2016 5:40:09 PM org.apache.hadoop.mapred.Counters log INFO:     Combine input records=0
Mar 13, 2016 5:40:09 PM org.apache.hadoop.mapred.Counters log INFO:     Map output records=9
Mar 13, 2016 5:40:09 PM org.apache.hadoop.mapred.Counters log INFO:     SPLIT_RAW_BYTES=75
Mar 13, 2016 5:40:09 PM org.apache.hadoop.mapred.Counters log INFO:     Reduce input records=9

And this is the result of the run:

[Screenshot: the job output in HDFS, one line per record in the form Student [id=..., name=..., sex=..., age=...]]

That completes reading a database table from Hadoop and writing it out. Of course this is just a simple demo that you would not use in a real project, but it helps build familiarity with the flow of operating a database from Hadoop. Below is example code for the other direction: Hadoop processes files and writes the results into the database. It is much like the code above:

package cn.hadoop.db;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.lib.db.DBConfiguration;
import org.apache.hadoop.mapred.lib.db.DBOutputFormat;
import org.apache.hadoop.mapred.lib.db.DBWritable;

public class WriteDB {

    public static void main(String[] args) throws IOException {
        JobConf conf = new JobConf();

        FileSystem fileSystem = FileSystem.get(
                URI.create("hdfs://192.168.56.10:9000/"), conf);
        DistributedCache.addFileToClassPath(
                new Path("hdfs://192.168.56.10:9000/lib/mysql-connector-java-5.1.18-bin.jar"),
                conf, fileSystem);
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(DBOutputFormat.class);

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        conf.setMapperClass(Map.class);
        conf.setCombinerClass(Combine.class);
        conf.setReducerClass(Reduce.class);

        FileInputFormat.setInputPaths(conf, new Path(
                "hdfs://192.168.56.10:9000/user/db_in"));

        DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
                "jdbc:mysql://192.168.56.109:3306/school?characterEncoding=UTF-8",
                "root", "1qaz2wsx");

        String[] fields = { "word", "number" };

        DBOutputFormat.setOutput(conf, "wordcount", fields);
        JobClient.runJob(conf);
    }
}

// Standard word-count mapper: emits (word, 1) for each token in the input.
class Map extends MapReduceBase implements
        Mapper<Object, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);

    private Text word = new Text();

    @Override
    public void map(Object key, Text value,
            OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());
            output.collect(word, one);
        }
    }
}

// Local pre-aggregation of the per-word counts before the shuffle.
class Combine extends MapReduceBase implements
        Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterator<IntWritable> values,
            OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }
        output.collect(key, new IntWritable(sum));
    }
}

// Final reducer: wraps each (word, total) pair in a WordRecord, which
// DBOutputFormat then writes into the wordcount table.
class Reduce extends MapReduceBase implements
        Reducer<Text, IntWritable, WordRecord, Text> {

    @Override
    public void reduce(Text key, Iterator<IntWritable> values,
            OutputCollector<WordRecord, Text> output, Reporter reporter)
            throws IOException {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }
        WordRecord wordcount = new WordRecord();
        wordcount.word = key.toString();
        wordcount.number = sum;
        output.collect(wordcount, new Text());
    }
}

// Entity class for the wordcount table, mirroring the Student class above.
class WordRecord implements Writable, DBWritable {

    public String word;
    public int number;

    @Override
    public void write(PreparedStatement statement) throws SQLException {
        statement.setString(1, this.word);
        statement.setInt(2, this.number);
    }

    @Override
    public void readFields(ResultSet resultSet) throws SQLException {
        this.word = resultSet.getString(1);
        this.number = resultSet.getInt(2);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        Text.writeString(out, this.word);
        out.writeInt(this.number);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.word = Text.readString(in);
        this.number = in.readInt();
    }
}
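One detail worth calling out: DBOutputFormat inserts into an existing table, it does not create "wordcount" for you. Here is a minimal one-off setup sketch; the table and field names come from the setOutput call above, while the column types and this helper class are my own assumptions:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class CreateWordcountTable {
    public static void main(String[] args) throws Exception {
        Class.forName("com.mysql.jdbc.Driver");
        // Same connection settings as in DBConfiguration.configureDB(...) above.
        Connection conn = DriverManager.getConnection(
                "jdbc:mysql://192.168.56.109:3306/school?characterEncoding=UTF-8",
                "root", "1qaz2wsx");
        Statement stmt = conn.createStatement();
        // Column names must match the fields array passed to setOutput;
        // the types here are assumed.
        stmt.executeUpdate("CREATE TABLE IF NOT EXISTS wordcount ("
                + "word VARCHAR(255) NOT NULL, "
                + "number INT NOT NULL)");
        stmt.close();
        conn.close();
    }
}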

The log printed during the run is as follows:

Mar 13, 2016 6:09:31 PM org.apache.hadoop.util.NativeCodeLoader <clinit> WARNING: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Mar 13, 2016 6:09:31 PM org.apache.hadoop.mapred.JobClient copyAndConfigureFiles WARNING: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
Mar 13, 2016 6:09:31 PM org.apache.hadoop.mapred.JobClient copyAndConfigureFiles WARNING: No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
Mar 13, 2016 6:09:31 PM org.apache.hadoop.mapred.FileInputFormat listStatus INFO: Total input paths to process : 2
Mar 13, 2016 6:09:32 PM org.apache.hadoop.filecache.TrackerDistributedCacheManager downloadCacheObject INFO: Creating mysql-connector-java-5.1.18-bin.jar in /tmp/hadoop-hadoop/mapred/local/archive/-8205516116475251282_-2048973133_91096108/192.168.56.10/lib-work-1371358416408211818 with rwxr-xr-x
Mar 13, 2016 6:09:33 PM org.apache.hadoop.filecache.TrackerDistributedCacheManager downloadCacheObject INFO: Cached hdfs://192.168.56.10:9000/lib/mysql-connector-java-5.1.18-bin.jar as /tmp/hadoop-hadoop/mapred/local/archive/-8205516116475251282_-2048973133_91096108/192.168.56.10/lib/mysql-connector-java-5.1.18-bin.jar
Mar 13, 2016 6:09:33 PM org.apache.hadoop.filecache.TrackerDistributedCacheManager localizePublicCacheObject INFO: Cached hdfs://192.168.56.10:9000/lib/mysql-connector-java-5.1.18-bin.jar as /tmp/hadoop-hadoop/mapred/local/archive/-8205516116475251282_-2048973133_91096108/192.168.56.10/lib/mysql-connector-java-5.1.18-bin.jar
Mar 13, 2016 6:09:33 PM org.apache.hadoop.mapred.JobClient monitorAndPrintJob INFO: Running job: job_local_0001
Mar 13, 2016 6:09:33 PM org.apache.hadoop.mapred.Task initialize INFO: Using ResourceCalculatorPlugin : null
Mar 13, 2016 6:09:33 PM org.apache.hadoop.mapred.MapTask runOldMapper INFO: numReduceTasks: 1
Mar 13, 2016 6:09:33 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init> INFO: io.sort.mb = 100
Mar 13, 2016 6:09:34 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init> INFO: data buffer = 79691776/99614720
Mar 13, 2016 6:09:34 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init> INFO: record buffer = 262144/327680
Mar 13, 2016 6:09:34 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush INFO: Starting flush of map output
Mar 13, 2016 6:09:34 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill INFO: Finished spill 0
Mar 13, 2016 6:09:34 PM org.apache.hadoop.mapred.Task done INFO: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
Mar 13, 2016 6:09:34 PM org.apache.hadoop.mapred.JobClient monitorAndPrintJob INFO: map 0% reduce 0%
Mar 13, 2016 6:09:36 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate INFO: hdfs://192.168.56.10:9000/user/db_in/file2.txt:0+41
Mar 13, 2016 6:09:36 PM org.apache.hadoop.mapred.Task sendDone INFO: Task 'attempt_local_0001_m_000000_0' done.
Mar 13, 2016 6:09:36 PM org.apache.hadoop.mapred.Task initialize INFO: Using ResourceCalculatorPlugin : null
Mar 13, 2016 6:09:36 PM org.apache.hadoop.mapred.MapTask runOldMapper INFO: numReduceTasks: 1
Mar 13, 2016 6:09:36 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init> INFO: io.sort.mb = 100
Mar 13, 2016 6:09:36 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init> INFO: data buffer = 79691776/99614720
Mar 13, 2016 6:09:36 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init> INFO: record buffer = 262144/327680
Mar 13, 2016 6:09:36 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush INFO: Starting flush of map output
Mar 13, 2016 6:09:36 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill INFO: Finished spill 0
Mar 13, 2016 6:09:36 PM org.apache.hadoop.mapred.Task done INFO: Task:attempt_local_0001_m_000001_0 is done. And is in the process of commiting
Mar 13, 2016 6:09:37 PM org.apache.hadoop.mapred.JobClient monitorAndPrintJob INFO: map 100% reduce 0%
Mar 13, 2016 6:09:39 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate INFO: hdfs://192.168.56.10:9000/user/db_in/file1.txt:0+24
Mar 13, 2016 6:09:39 PM org.apache.hadoop.mapred.Task sendDone INFO: Task 'attempt_local_0001_m_000001_0' done.
Mar 13, 2016 6:09:39 PM org.apache.hadoop.mapred.Task initialize INFO: Using ResourceCalculatorPlugin : null
Mar 13, 2016 6:09:39 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate INFO:
Mar 13, 2016 6:09:39 PM org.apache.hadoop.mapred.Merger$MergeQueue merge INFO: Merging 2 sorted segments
Mar 13, 2016 6:09:39 PM org.apache.hadoop.mapred.Merger$MergeQueue merge INFO: Down to the last merge-pass, with 2 segments left of total size: 116 bytes
Mar 13, 2016 6:09:39 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate INFO:
Mar 13, 2016 6:09:41 PM org.apache.hadoop.mapred.Task done INFO: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
Mar 13, 2016 6:09:42 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate INFO: reduce > reduce
Mar 13, 2016 6:09:42 PM org.apache.hadoop.mapred.Task sendDone INFO: Task 'attempt_local_0001_r_000000_0' done.
Mar 13, 2016 6:09:42 PM org.apache.hadoop.mapred.FileOutputCommitter cleanupJob WARNING: Output path is null in cleanup
Mar 13, 2016 6:09:43 PM org.apache.hadoop.mapred.JobClient monitorAndPrintJob INFO: map 100% reduce 100%
Mar 13, 2016 6:09:43 PM org.apache.hadoop.mapred.JobClient monitorAndPrintJob INFO: Job complete: job_local_0001
Mar 13, 2016 6:09:43 PM org.apache.hadoop.mapred.Counters log INFO: Counters: 19
Mar 13, 2016 6:09:43 PM org.apache.hadoop.mapred.Counters log INFO:   File Input Format Counters
Mar 13, 2016 6:09:43 PM org.apache.hadoop.mapred.Counters log INFO:     Bytes Read=65
Mar 13, 2016 6:09:43 PM org.apache.hadoop.mapred.Counters log INFO:   File Output Format Counters
Mar 13, 2016 6:09:43 PM org.apache.hadoop.mapred.Counters log INFO:     Bytes Written=0
Mar 13, 2016 6:09:43 PM org.apache.hadoop.mapred.Counters log INFO:   FileSystemCounters
Mar 13, 2016 6:09:43 PM org.apache.hadoop.mapred.Counters log INFO:     FILE_BYTES_READ=2389740
Mar 13, 2016 6:09:43 PM org.apache.hadoop.mapred.Counters log INFO:     HDFS_BYTES_READ=2369826
Mar 13, 2016 6:09:43 PM org.apache.hadoop.mapred.Counters log INFO:     FILE_BYTES_WRITTEN=4905883
Mar 13, 2016 6:09:43 PM org.apache.hadoop.mapred.Counters log INFO:   Map-Reduce Framework
Mar 13, 2016 6:09:43 PM org.apache.hadoop.mapred.Counters log INFO:     Reduce input groups=7
Mar 13, 2016 6:09:43 PM org.apache.hadoop.mapred.Counters log INFO:     Map output materialized bytes=124
Mar 13, 2016 6:09:43 PM org.apache.hadoop.mapred.Counters log INFO:     Combine output records=9
Mar 13, 2016 6:09:43 PM org.apache.hadoop.mapred.Counters log INFO:     Map input records=5
Mar 13, 2016 6:09:43 PM org.apache.hadoop.mapred.Counters log INFO:     Reduce shuffle bytes=0
Mar 13, 2016 6:09:43 PM org.apache.hadoop.mapred.Counters log INFO:     Reduce output records=7
Mar 13, 2016 6:09:43 PM org.apache.hadoop.mapred.Counters log INFO:     Spilled Records=18
Mar 13, 2016 6:09:43 PM org.apache.hadoop.mapred.Counters log INFO:     Map output bytes=104
Mar 13, 2016 6:09:43 PM org.apache.hadoop.mapred.Counters log INFO:     Total committed heap usage (bytes)=482291712
Mar 13, 2016 6:09:43 PM org.apache.hadoop.mapred.Counters log INFO:     Map input bytes=65
Mar 13, 2016 6:09:43 PM org.apache.hadoop.mapred.Counters log INFO:     Combine input records=10
Mar 13, 2016 6:09:43 PM org.apache.hadoop.mapred.Counters log INFO:     Map output records=10
Mar 13, 2016 6:09:43 PM org.apache.hadoop.mapred.Counters log INFO:     SPLIT_RAW_BYTES=198
Mar 13, 2016 6:09:43 PM org.apache.hadoop.mapred.Counters log INFO:     Reduce input records=9

The result in the database looks like this:

[Screenshot: the wordcount table in MySQL, one row per (word, number) pair]

All of the code above I tested and ran myself; for the Hadoop version and the server environment details, see my previous post.

