0
点赞
收藏
分享

微信扫一扫

92课程作业:Hive 自定义 IMFRecordReader,按 ^^(尖尖头)和 | 两种分隔符切分字段


92课程作业:Hive 自定义 IMFRecordReader,按 ^^(尖尖头)和 | 两种分隔符切分字段



代码写好了,还没有运行, 到时候在hive中测试一把


IMFInputFormat

package com.dt.spark.hive;
  


 import java.io.IOException;


 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobConfigurable;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.TextInputFormat;  
  
 public class IMFInputFormat extends  TextInputFormat implements    
 JobConfigurable 
       {
  public RecordReader<LongWritable, Text> getRecordReader(    
           InputSplit genericSplit, JobConf job, Reporter reporter)    
           throws IOException {    
   
       reporter.setStatus(genericSplit.toString());    
       return new IMFRecordReader((FileSplit) genericSplit,job);    
   }    
   
   
   
 }



IMFRecordReader

package com.dt.spark.hive;


import java.io.IOException;  
 import java.io.InputStream;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;


 import org.apache.hadoop.conf.Configuration;  
 import org.apache.hadoop.fs.FSDataInputStream;  
 import org.apache.hadoop.fs.FileSystem;  
 import org.apache.hadoop.fs.Path;  
 import org.apache.hadoop.io.LongWritable;  
 import org.apache.hadoop.io.Text;  
 import org.apache.hadoop.io.compress.CompressionCodec;  
 import org.apache.hadoop.io.compress.CompressionCodecFactory;  
 import org.apache.hadoop.mapred.FileSplit;  
 import org.apache.hadoop.util.LineReader;  
 import org.apache.hadoop.mapred.RecordReader;  
   
   
 public class IMFRecordReader implements  
         RecordReader<LongWritable, Text> {  
   
   
     private CompressionCodecFactory compressionCodecs = null;  
     private long start;  
     private long pos;  
     private long end;  
     private LineReader lineReader;  
     int maxLineLength;  
   
     public IMFRecordReader(FileSplit inputSplit, Configuration job)  
             throws IOException {  
         maxLineLength = job.getInt("mapred.IMFRecordReader.maxlength",  
                 Integer.MAX_VALUE);  
         start = inputSplit.getStart();  
         end = start + inputSplit.getLength();  
         final Path file = inputSplit.getPath();  
         compressionCodecs = new CompressionCodecFactory(job);  
         final CompressionCodec codec = compressionCodecs.getCodec(file);  
   
         // Open file and seek to the start of the split  
         FileSystem fs = file.getFileSystem(job);  
         FSDataInputStream fileIn = fs.open(file);  
         boolean skipFirstLine = false;  
         if (codec != null) {  
             lineReader = new LineReader(codec.createInputStream(fileIn), job);  
             end = Long.MAX_VALUE;  
         } else {  
             if (start != 0) {  
                 skipFirstLine = true;  
                 --start;  
                 fileIn.seek(start);  
             }  
             lineReader = new LineReader(fileIn, job);  
         }  
         if (skipFirstLine) {  
             start += lineReader.readLine(new Text(), 0,  
                     (int) Math.min((long) Integer.MAX_VALUE, end - start));  
         }  
         this.pos = start;  
     }  
   
     public IMFRecordReader(InputStream in, long offset, long endOffset,  
             int maxLineLength) {  
         this.maxLineLength = maxLineLength;  
         this.lineReader = new LineReader(in);  
         this.start = offset;  
         this.pos = offset;  
         this.end = endOffset;  
     }  
   
     public IMFRecordReader(InputStream in, long offset, long endOffset,  
             Configuration job) throws IOException {  
         this.maxLineLength = job.getInt(  
                 "mapred.IMFRecordReader.maxlength", Integer.MAX_VALUE);  
         this.lineReader = new LineReader(in, job);  
         this.start = offset;  
         this.pos = offset;  
         this.end = endOffset;  
     }  
   
     public LongWritable createKey() {  
         return new LongWritable();  
     }  
   
     public Text createValue() {  
         return new Text();  
     }  
   
     /** 
      * Reads the next record in the split. get usefull fields from the raw nginx 
      * log. 
      *  
      * @param key 
      *            key of the record which will map to the byte offset of the 
      *            record's line 
      * @param value 
      *            the record in text format 
      * @return true if a record existed, false otherwise 
      * @throws IOException 
      */  
     
     public synchronized boolean next(LongWritable key, Text value)  
             throws IOException {  
         // Stay within the split  
         while (pos < end) {  
             key.set(pos);  
             int newSize = lineReader.readLine(value, maxLineLength,  
                     Math.max((int) Math.min(Integer.MAX_VALUE, end - pos),  
                             maxLineLength));  
   
             if (newSize == 0)  
                 return false;  
   
            // String str = value.toString().toLowerCase()  
              //       .replaceAll("\\@\\_\\@", "\001");  
           //  String str = value.toString().toLowerCase()  
             //        .replaceAll("\\^\\^", "\001");  
             
             String patternhive ="^(.*)\\^\\^(.*)\\^\\^(.*)\\^\\^(.*)\\|(.*)\\|(.*)\\|(.*)\\^\\^(.*)";
       
       Pattern phive = Pattern.compile(patternhive);
       String strhive ="0^^Hadoop^^America^^5000|8000|12000|level8^^male";
       System.out.println(strhive);
       Matcher mhive = phive.matcher(strhive);
       String resultstr  ="defaultisblank";
       while(mhive.find()){
       resultstr  = mhive.group(1)+"\t"+mhive.group(2)+"\t"+mhive.group(3)+"\t"+
       mhive.group(4)+"\t"+mhive.group(5)+"\t"+mhive.group(6)+"\t"+
       mhive.group(7)+"\t"+mhive.group(8);
        
       System.out.println(resultstr);
       } ;     
             
             value.set(resultstr);  
             pos += newSize;  
   
             if (newSize < maxLineLength)  
                 return true;  
         }  
   
         return false;  
     }  
   
     public float getProgress() {  
         if (start == end) {  
             return 0.0f;  
         } else {  
             return Math.min(1.0f, (pos - start) / (float) (end - start));  
         }  
     }  
   
     public synchronized long getPos() throws IOException {  
         return pos;  
     }  
   
     public synchronized void close() throws IOException {  
         if (lineReader != null)  
             lineReader.close();  
     }  
       
     // 测试 输出  
     public static void main(String ags[]){   
          String str1 ="123@_@abcd@_@fk".replaceAll("\\@\\_\\@", "\001"); 
          
          System.out.println(str1);
          System.out.println("\001");
          
          String str2 ="0^^Hadoop^^America^^5000|8000|12000|level8^^male".replaceAll("\\^\\^", "\001"); 
          System.out.println(str2);
          
          String str3 ="0^^Hadoop^^America^^5000|8000|12000|level8^^male".replaceAll("\\^\\^", "\001"); 
          System.out.println(str3);
          
          String patternhive ="^(.*)\\^\\^(.*)\\^\\^(.*)\\^\\^(.*)\\|(.*)\\|(.*)\\|(.*)\\^\\^(.*)";
   
   Pattern phive = Pattern.compile(patternhive);
   String strhive ="0^^Hadoop^^America^^5000|8000|12000|level8^^male";
   System.out.println(strhive);
   Matcher mhive = phive.matcher(strhive);
   while(mhive.find()){
   String resultstr = mhive.group(1)+"\t"+mhive.group(2)+"\t"+mhive.group(3)+"\t"+
   mhive.group(4)+"\t"+mhive.group(5)+"\t"+mhive.group(6)+"\t"+
   mhive.group(7)+"\t"+mhive.group(8);
    
   System.out.println(resultstr);
   }
   
   
   
   System.out.println();
   System.out.println("=========================");
   System.out.println();
   while(mhive.find())


   {
   System.out.println("m.group():"+mhive.group()); //打印一个大组


   System.out.println("m.group(1):"+mhive.group(1)); //打印组1


   System.out.println("m.group(2):"+mhive.group(2)); //打印组2
   System.out.println("m.group(3):"+mhive.group(3)); 
   System.out.println("m.group(4):"+mhive.group(4)); 
   System.out.println("m.group(5):"+mhive.group(5)); 
   System.out.println("m.group(6):"+mhive.group(6)); 
   
   System.out.println("m.group(7):"+mhive.group(7)); 
   System.out.println("m.group(8):"+mhive.group(8)); 
   // System.out.println("m.group(9):"+mhive.group(9)); 
   // System.out.println("m.group(10):"+mhive.group(10)); 
   //System.out.println("m.group(11):"+mhive.group(11)); 
   System.out.println();


   }
          
   System.out.println();
   System.out.println("=========================");
   System.out.println();
/*String pattern ="^(\\S+) (\\S+) (\\S+) \\[([\\w/]+)([\\w:/]+)\\s([+\\-]\\d{4})\\] \"(\\S+) (\\S+) (\\S+)\" (\\d{3}) (\\d+)";

Pattern p = Pattern.compile(pattern);


String s = "1.1.11.1- - [1 /oct/2016:00:00:05 +0800] \"POST /Pay HTTP/1.1\" 200 1285";


Matcher m = p.matcher(s);
while(m.find())


 {


System.out.println("m.group():"+m.group()); //打印一个大组


 System.out.println("m.group(1):"+m.group(1)); //打印组1


System.out.println("m.group(2):"+m.group(2)); //打印组2
System.out.println("m.group(3):"+m.group(3)); 
System.out.println("m.group(4):"+m.group(4)); 
System.out.println("m.group(5):"+m.group(5)); 
System.out.println("m.group(6):"+m.group(6)); 

System.out.println("m.group(7):"+m.group(7)); 
System.out.println("m.group(8):"+m.group(8)); 
System.out.println("m.group(9):"+m.group(9)); 
System.out.println("m.group(10):"+m.group(10)); 
System.out.println("m.group(11):"+m.group(11)); 
 System.out.println();


 }
 System.out.println("捕获个数:groupCount()="+m.groupCount());
          
          */
          
          
          
          
      }  
 }

一,准备数据文件 /usr/local/IMF_testdata/hivestudy/employeesinputformat.txt,内容如下:
0^^Hadoop^^America^^5000|8000|12000|level8^^male
1^^Spark^^America^^8000|10000|15000|level9^^famale
2^^Flink^^America^^7000|8000|13000|level10^^male
3^^Hadoop^^America^^9000|11000|12000|level10^^famale
4^^Spark^^America^^10000|11000|12000|level12^^male
5^^Flink^^America^^11000|12000|18000|level18^^famale
6^^Hadoop^^America^^15000|16000|19000|level16^^male
7^^Spark^^America^^18000|19000|20000|level20^^male
8^^Flink^^America^^15000|16000|19000|level19^^male


二:导出jar包IMFInputFormat.jar


三,加载jar包
hive> add jar /usr/local/IMF_testdata/hivestudy/IMFInputFormat.jar;


四,建立表

hive> CREATE TABLE employee_InputFormat(userid  INT,name String,address String, salarys1 int ,salarys2 int ,salarys3 int ,salarys4 string , gendre string)  stored as INPUTFORMAT 'com.dt.spark.hive.IMFInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat';
 OK
 Time taken: 0.399 seconds
 hive>  
     >




五,导入数据


LOAD DATA LOCAL INPATH '/usr/local/IMF_testdata/hivestudy/employeesinputformat.txt' INTO TABLE employee_InputFormat;



六,查询 表 描述

hive> desc formatted  employee_inputformat;
 OK
 # col_name              data_type               comment             
                  
 userid                  int                                         
 name                    string                                      
 address                 string                                      
 salarys1                int                                         
 salarys2                int                                         
 salarys3                int                                         
 salarys4                string                                      
 gendre                  string                                      
                  
 # Detailed Table Information             
 Database:               default                  
 Owner:                  root                     
 CreateTime:             Sun Dec 11 18:14:08 CST 2016     
 LastAccessTime:         UNKNOWN                  
 Protect Mode:           None                     
 Retention:              0                        
 Location:               hdfs://master:9000/user/hive/warehouse/employee_inputformat      
 Table Type:             MANAGED_TABLE            
 Table Parameters:                
         COLUMN_STATS_ACCURATE   true                
         numFiles                1                   
         totalSize               467                 
         transient_lastDdlTime   1481451523          
                  
 # Storage Information            
 SerDe Library:          org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe       
 InputFormat:            com.dt.spark.hive.IMFInputFormat         
 OutputFormat:           org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat       
 Compressed:             No                       
 Num Buckets:            -1                       
 Bucket Columns:         []                       
 Sort Columns:           []                       
 Storage Desc Params:             
         serialization.format    1                   
 Time taken: 0.165 seconds, Fetched: 36 row(s)








七,查询表数据




> select * from employee_inputformat;
 OK
 0^^Hadoop^^America^^5000|8000|12000|level8^^male
 0       Hadoop  America 5000    8000    12000   level8  male
 0^^Hadoop^^America^^5000|8000|12000|level8^^male
 0       Hadoop  America 5000    8000    12000   level8  male
 0^^Hadoop^^America^^5000|8000|12000|level8^^male
 0       Hadoop  America 5000    8000    12000   level8  male
 0^^Hadoop^^America^^5000|8000|12000|level8^^male
 0       Hadoop  America 5000    8000    12000   level8  male
 0^^Hadoop^^America^^5000|8000|12000|level8^^male
 0       Hadoop  America 5000    8000    12000   level8  male
 0^^Hadoop^^America^^5000|8000|12000|level8^^male
 0       Hadoop  America 5000    8000    12000   level8  male
 0^^Hadoop^^America^^5000|8000|12000|level8^^male
 0       Hadoop  America 5000    8000    12000   level8  male
 0^^Hadoop^^America^^5000|8000|12000|level8^^male
 0       Hadoop  America 5000    8000    12000   level8  male
 0^^Hadoop^^America^^5000|8000|12000|level8^^male
 0       Hadoop  America 5000    8000    12000   level8  male
 NULL    NULL    NULL    NULL    NULL    NULL    NULL    NULL
 NULL    NULL    NULL    NULL    NULL    NULL    NULL    NULL
 NULL    NULL    NULL    NULL    NULL    NULL    NULL    NULL
 NULL    NULL    NULL    NULL    NULL    NULL    NULL    NULL
 NULL    NULL    NULL    NULL    NULL    NULL    NULL    NULL
 NULL    NULL    NULL    NULL    NULL    NULL    NULL    NULL
 NULL    NULL    NULL    NULL    NULL    NULL    NULL    NULL
 NULL    NULL    NULL    NULL    NULL    NULL    NULL    NULL
 NULL    NULL    NULL    NULL    NULL    NULL    NULL    NULL
 Time taken: 0.563 seconds, Fetched: 9 row(s)



八,查询结果里每一行都是同一条数据,需要重新来。原因是 next() 里正则匹配的不是读入的那一行,而是测试时写死的字符串:
 String strhive ="0^^Hadoop^^America^^5000|8000|12000|level8^^male";
应改为 String strhive = value.toString();。另外,源文件的数据有一部分被搞乱了,所以重新清洗了源数据。
     
删除表,重新来一次
drop table employee_inputformat;



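运行结果(说明:下面以 resultstr============ 开头的行是 IMFRecordReader 里 System.out.println 打印的调试输出,并不是查询结果;真正的查询结果是后面的 NULL 行。出现 NULL 是因为 RecordReader 输出的字段用 \t 分隔,而建表语句没有指定 ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t',Hive 默认按 \001 切分字段)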

rties
hive> 
    > 
    > 
    > 
    > 
    > 
    > drop table employee_inputformat;
OK
Time taken: 1.733 seconds
hive> add jar /usr/local/IMF_testdata/hivestudy/IMFInputFormat2.jar;
Added [/usr/local/IMF_testdata/hivestudy/IMFInputFormat2.jar] to class path
Added resources: [/usr/local/IMF_testdata/hivestudy/IMFInputFormat2.jar]
hive> CREATE TABLE employee_InputFormat(userid  INT,name String,address String, salarys1 int ,salarys2 int ,salarys3 int ,salarys4 string , gendre string)  stored as INPUTFORMAT 'com.dt.spark.hive.IMFInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat';
OK
Time taken: 0.477 seconds
hive> LOAD DATA LOCAL INPATH '/usr/local/IMF_testdata/hivestudy/employeesinputformat.txt' INTO TABLE employee_InputFormat;
Loading data to table default.employee_inputformat
Table default.employee_inputformat stats: [numFiles=1, totalSize=467]
OK
Time taken: 0.708 seconds
hive> select * from employee_InputFormat;
OK
resultstr============0  Hadoop  America 5000    8000    12000   IMFlevel8       male
resultstr============1  Spark   America 8000    10000   15000   IMFlevel9       famale
resultstr============2  Flink   America 7000    8000    13000   IMFlevel10      male
resultstr============3  Hadoop  America 9000    11000   12000   IMFlevel10      famale
resultstr============4  Spark   America 10000   11000   12000   IMFlevel12      male
resultstr============5  Flink   America 11000   12000   18000   IMFlevel18      famale
resultstr============6  Hadoop  America 15000   16000   19000   IMFlevel16      male
resultstr============7  Spark   America 18000   19000   20000   IMFlevel20      male
resultstr============8  Flink   America 15000   16000   19000   IMFlevel19      male
NULL    NULL    NULL    NULL    NULL    NULL    NULL    NULL
NULL    NULL    NULL    NULL    NULL    NULL    NULL    NULL
NULL    NULL    NULL    NULL    NULL    NULL    NULL    NULL
NULL    NULL    NULL    NULL    NULL    NULL    NULL    NULL
NULL    NULL    NULL    NULL    NULL    NULL    NULL    NULL
NULL    NULL    NULL    NULL    NULL    NULL    NULL    NULL
NULL    NULL    NULL    NULL    NULL    NULL    NULL    NULL
NULL    NULL    NULL    NULL    NULL    NULL    NULL    NULL
NULL    NULL    NULL    NULL    NULL    NULL    NULL    NULL
Time taken: 0.412 seconds, Fetched: 9 row(s)
hive> select count(*) from employee_InputFormat;
Query ID = root_20161211194323_fb9af8cb-dda4-45ac-98a3-cb4222e27652
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
Starting Job = job_1481447173435_0001, Tracking URL = http://master:8088/proxy/application_1481447173435_0001/
Kill Command = /usr/local/hadoop-2.6.0/bin/hadoop job  -kill job_1481447173435_0001
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
2016-12-11 19:43:42,363 Stage-1 map = 0%,  reduce = 0%
2016-12-11 19:43:58,094 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 2.67 sec
2016-12-11 19:44:16,900 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 4.67 sec
MapReduce Total cumulative CPU time: 4 seconds 670 msec
Ended Job = job_1481447173435_0001
MapReduce Jobs Launched: 
Stage-Stage-1: Map: 1  Reduce: 1   Cumulative CPU: 4.67 sec   HDFS Read: 7714 HDFS Write: 2 SUCCESS
Total MapReduce CPU Time Spent: 4 seconds 670 msec
OK
9
Time taken: 55.879 seconds, Fetched: 1 row(s)
hive> select address from employee_InputFormat;
OK
resultstr============0  Hadoop  America 5000    8000    12000   IMFlevel8       male
resultstr============1  Spark   America 8000    10000   15000   IMFlevel9       famale
resultstr============2  Flink   America 7000    8000    13000   IMFlevel10      male
resultstr============3  Hadoop  America 9000    11000   12000   IMFlevel10      famale
resultstr============4  Spark   America 10000   11000   12000   IMFlevel12      male
resultstr============5  Flink   America 11000   12000   18000   IMFlevel18      famale
resultstr============6  Hadoop  America 15000   16000   19000   IMFlevel16      male
resultstr============7  Spark   America 18000   19000   20000   IMFlevel20      male
resultstr============8  Flink   America 15000   16000   19000   IMFlevel19      male
NULL
NULL
NULL
NULL
NULL
NULL
NULL
NULL
NULL
Time taken: 0.135 seconds, Fetched: 9 row(s)
hive> 




-- Rebuild the table from scratch; IF EXISTS lets the script run on a clean metastore too.
DROP TABLE IF EXISTS employee_inputformat;
add jar /usr/local/IMF_testdata/hivestudy/IMFInputFormat2.jar;
-- IMFRecordReader hands Hive tab-separated fields, so the field delimiter must be
-- declared explicitly; without it the default \001 delimiter is used and every
-- column reads as NULL.
CREATE TABLE employee_InputFormat(userid  INT,name String,address String, salarys1 int ,salarys2 int ,salarys3 int ,salarys4 string , gendre string)  ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' stored as INPUTFORMAT 'com.dt.spark.hive.IMFInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat';
LOAD DATA LOCAL INPATH '/usr/local/IMF_testdata/hivestudy/employeesinputformat.txt' INTO TABLE employee_InputFormat;
select * from employee_InputFormat;





修改以后的代码

package com.dt.spark.hive;


import java.io.IOException;  
 import java.io.InputStream;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;


 import org.apache.hadoop.conf.Configuration;  
 import org.apache.hadoop.fs.FSDataInputStream;  
 import org.apache.hadoop.fs.FileSystem;  
 import org.apache.hadoop.fs.Path;  
 import org.apache.hadoop.io.LongWritable;  
 import org.apache.hadoop.io.Text;  
 import org.apache.hadoop.io.compress.CompressionCodec;  
 import org.apache.hadoop.io.compress.CompressionCodecFactory;  
 import org.apache.hadoop.mapred.FileSplit;  
 import org.apache.hadoop.util.LineReader;  
 import org.apache.hadoop.mapred.RecordReader;  
   
   
 public class IMFRecordReader implements  
         RecordReader<LongWritable, Text> {  
   
   
     private CompressionCodecFactory compressionCodecs = null;  
     private long start;  
     private long pos;  
     private long end;  
     private LineReader lineReader;  
     int maxLineLength;  
   
     public IMFRecordReader(FileSplit inputSplit, Configuration job)  
             throws IOException {  
         maxLineLength = job.getInt("mapred.IMFRecordReader.maxlength",  
                 Integer.MAX_VALUE);  
         start = inputSplit.getStart();  
         end = start + inputSplit.getLength();  
         final Path file = inputSplit.getPath();  
         compressionCodecs = new CompressionCodecFactory(job);  
         final CompressionCodec codec = compressionCodecs.getCodec(file);  
   
         // Open file and seek to the start of the split  
         FileSystem fs = file.getFileSystem(job);  
         FSDataInputStream fileIn = fs.open(file);  
         boolean skipFirstLine = false;  
         if (codec != null) {  
             lineReader = new LineReader(codec.createInputStream(fileIn), job);  
             end = Long.MAX_VALUE;  
         } else {  
             if (start != 0) {  
                 skipFirstLine = true;  
                 --start;  
                 fileIn.seek(start);  
             }  
             lineReader = new LineReader(fileIn, job);  
         }  
         if (skipFirstLine) {  
             start += lineReader.readLine(new Text(), 0,  
                     (int) Math.min((long) Integer.MAX_VALUE, end - start));  
         }  
         this.pos = start;  
     }  
   
     public IMFRecordReader(InputStream in, long offset, long endOffset,  
             int maxLineLength) {  
         this.maxLineLength = maxLineLength;  
         this.lineReader = new LineReader(in);  
         this.start = offset;  
         this.pos = offset;  
         this.end = endOffset;  
     }  
   
     public IMFRecordReader(InputStream in, long offset, long endOffset,  
             Configuration job) throws IOException {  
         this.maxLineLength = job.getInt(  
                 "mapred.IMFRecordReader.maxlength", Integer.MAX_VALUE);  
         this.lineReader = new LineReader(in, job);  
         this.start = offset;  
         this.pos = offset;  
         this.end = endOffset;  
     }  
   
     public LongWritable createKey() {  
         return new LongWritable();  
     }  
   
     public Text createValue() {  
         return new Text();  
     }  
   
     /** 
      * Reads the next record in the split. get usefull fields from the raw nginx 
      * log. 
      *  
      * @param key 
      *            key of the record which will map to the byte offset of the 
      *            record's line 
      * @param value 
      *            the record in text format 
      * @return true if a record existed, false otherwise 
      * @throws IOException 
      */  
     
     public synchronized boolean next(LongWritable key, Text value)  
             throws IOException {  
         // Stay within the split  
         while (pos < end) {  
             key.set(pos);  
             int newSize = lineReader.readLine(value, maxLineLength,  
                     Math.max((int) Math.min(Integer.MAX_VALUE, end - pos),  
                             maxLineLength));  
   
             if (newSize == 0)  
                 return false;  
   
            // String str = value.toString().toLowerCase()  
              //       .replaceAll("\\@\\_\\@", "\001");  
           //  String str = value.toString().toLowerCase()  
             //        .replaceAll("\\^\\^", "\001");  
             
             String patternhive ="^(.*)\\^\\^(.*)\\^\\^(.*)\\^\\^(.*)\\|(.*)\\|(.*)\\|(.*)\\^\\^(.*)";
       
       Pattern phive = Pattern.compile(patternhive);
       // String strhive ="0^^Hadoop^^America^^5000|8000|12000|level8^^male";
        
       String strhive =value.toString();
       //  System.out.println("value.toString()============"+strhive);
       //System.out.println("===imf=====");
       Matcher mhive = phive.matcher(strhive);
       String resultstr  ="defaultisblank";
       while(mhive.find()){
       resultstr  = mhive.group(1)+"\t"+mhive.group(2)+"\t"+mhive.group(3)+"\t"+
       mhive.group(4)+"\t"+mhive.group(5)+"\t"+mhive.group(6)+"\t"+
        "IMF"+mhive.group(7)+"\t"+mhive.group(8);
        
       //System.out.println(resultstr);
       } ;     
       if (resultstr == null || resultstr =="defaultisblank"  ) {    }
             else { 
             System.out.println("resultstr============"+resultstr);
           
             value.set(resultstr);  
             pos += newSize;  
   
             if (newSize < maxLineLength)  
                 return true;
             
           }
         }  
   
         return false;  
     }  
   
     public float getProgress() {  
         if (start == end) {  
             return 0.0f;  
         } else {  
             return Math.min(1.0f, (pos - start) / (float) (end - start));  
         }  
     }  
   
     public synchronized long getPos() throws IOException {  
         return pos;  
     }  
   
     public synchronized void close() throws IOException {  
         if (lineReader != null)  
             lineReader.close();  
     }  
       
     // 测试 输出  
     
    /* public static void main(String ags[]){   
          String str1 ="123@_@abcd@_@fk".replaceAll("\\@\\_\\@", "\001"); 
          
          System.out.println(str1);
          System.out.println("\001");
          
          String str2 ="0^^Hadoop^^America^^5000|8000|12000|level8^^male".replaceAll("\\^\\^", "\001"); 
          System.out.println(str2);
          
          String str3 ="0^^Hadoop^^America^^5000|8000|12000|level8^^male".replaceAll("\\^\\^", "\001"); 
          System.out.println(str3);
          
          String patternhive ="^(.*)\\^\\^(.*)\\^\\^(.*)\\^\\^(.*)\\|(.*)\\|(.*)\\|(.*)\\^\\^(.*)";
   
   Pattern phive = Pattern.compile(patternhive);
   String strhive ="0^^Hadoop^^America^^5000|8000|12000|level8^^male";
   System.out.println(strhive);
   Matcher mhive = phive.matcher(strhive);
   while(mhive.find()){
   String resultstr = mhive.group(1)+"\t"+mhive.group(2)+"\t"+mhive.group(3)+"\t"+
   mhive.group(4)+"\t"+mhive.group(5)+"\t"+mhive.group(6)+"\t"+
   mhive.group(7)+"\t"+mhive.group(8);
    
   System.out.println(resultstr);
   }
   
   
   
   System.out.println();
   System.out.println("=========================");
   System.out.println();
   while(mhive.find())


   {
   System.out.println("m.group():"+mhive.group()); //打印一个大组


   System.out.println("m.group(1):"+mhive.group(1)); //打印组1


   System.out.println("m.group(2):"+mhive.group(2)); //打印组2
   System.out.println("m.group(3):"+mhive.group(3)); 
   System.out.println("m.group(4):"+mhive.group(4)); 
   System.out.println("m.group(5):"+mhive.group(5)); 
   System.out.println("m.group(6):"+mhive.group(6)); 
   
   System.out.println("m.group(7):"+mhive.group(7)); 
   System.out.println("m.group(8):"+mhive.group(8)); 
   // System.out.println("m.group(9):"+mhive.group(9)); 
   // System.out.println("m.group(10):"+mhive.group(10)); 
   //System.out.println("m.group(11):"+mhive.group(11)); 
   System.out.println();


   }
          
   System.out.println();
   System.out.println("=========================");
   System.out.println();*/
/*String pattern ="^(\\S+) (\\S+) (\\S+) \\[([\\w/]+)([\\w:/]+)\\s([+\\-]\\d{4})\\] \"(\\S+) (\\S+) (\\S+)\" (\\d{3}) (\\d+)";

Pattern p = Pattern.compile(pattern);


String s = "110.75.141.3 - - [18/Feb/2016:00:00:55 +0800] \"POST /alipayBillPay/InvoicePay HTTP/1.1\" 200 1285";


Matcher m = p.matcher(s);
while(m.find())


 {


System.out.println("m.group():"+m.group()); //打印一个大组


 System.out.println("m.group(1):"+m.group(1)); //打印组1


System.out.println("m.group(2):"+m.group(2)); //打印组2
System.out.println("m.group(3):"+m.group(3)); 
System.out.println("m.group(4):"+m.group(4)); 
System.out.println("m.group(5):"+m.group(5)); 
System.out.println("m.group(6):"+m.group(6)); 

System.out.println("m.group(7):"+m.group(7)); 
System.out.println("m.group(8):"+m.group(8)); 
System.out.println("m.group(9):"+m.group(9)); 
System.out.println("m.group(10):"+m.group(10)); 
System.out.println("m.group(11):"+m.group(11)); 
 System.out.println();


 }
 System.out.println("捕获个数:groupCount()="+m.groupCount());
          
          */
          
          
          
          
    //  }  
 }


举报

相关推荐

0 条评论