hdfs 导入到hbase

qq_┿→王银龙 MapReduce on HB... 最后由 Wil1 于2016年03月05日回复

  • 2 回答
  • 1.1k 浏览

map input 2条数据   output0条     数据导入了 没有报错  但是hbase中用scan显示0条

  • 极客学院_吕布 2015年12月16日 回答 #1楼
  • 导入hbase一般有三种方式:使用HBase的API中的Put方法,使用HBase 的bulk load工具和使用定制的MapReduce Job方式。


    这个问题要看你的代码问题现在哪里


    你可以参考一下代码:


    package Hbase;


    import java.text.SimpleDateFormat;
    import java.util.Date;


    import org.apache.Hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
    import org.apache.hadoop.hbase.mapreduce.TableReducer;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Counter;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;


    public class BatchImport {


    public static void main(String[] args) throws Exception {
    final Configuration configuration = new Configuration();
    //设置zookeeper
    configuration.set("hbase.zookeeper.quorum", "hadoop1");
    //设置hbase表名称
    configuration.set(TableOutputFormat.OUTPUT_TABLE, "wlan_log");
    //将该值改大,防止hbase超时退出
    configuration.set("dfs.socket.timeout", "180000");

    final Job job = new Job(configuration, "HBaseBatchImport");

    job.setMapperClass(BatchImportMapper.class);
    job.setReducerClass(BatchImportReducer.class);
    //设置map的输出,不设置reduce的输出类型
    job.setMapOutputKeyClass(LongWritable.class);
    job.setMapOutputValueClass(Text.class);

    job.setInputFormatClass(TextInputFormat.class);
    //不再设置输出路径,而是设置输出格式类型
    job.setOutputFormatClass(TableOutputFormat.class);

    FileInputFormat.setInputPaths(job, "hdfs://hadoop1:9000/HTTP*");

    job.waitForCompletion(true);
    }

     

     

    static class BatchImportMapper extends Mapper<LongWritable, Text, LongWritable, Text>{
    SimpleDateFormat dateformat1=new SimpleDateFormat("yyyyMMddHHmmss");
    Text v2 = new Text();

    protected void map(LongWritable key, Text value, Context context) throws java.io.IOException ,InterruptedException {
    final String[] splited = value.toString().split("t");
    try {
    final Date date = new Date(Long.parseLong(splited[0].trim()));
    final String dateFormat = dateformat1.format(date);
    String rowKey = splited[1]+":"+dateFormat;
    v2.set(rowKey+"t"+value.toString());
    context.write(key, v2);
    } catch (NumberFormatException e) {
    final Counter counter = context.getCounter("BatchImport", "ErrorFormat");
    counter.increment(1L);
    System.out.println("出错了"+splited[0]+" "+e.getMessage());
    }
    };
    }

     

    static class BatchImportReducer extends TableReducer<LongWritable, Text, NullWritable>{
    protected void reduce(LongWritable key, java.lang.Iterable<Text> values, Context context) throws java.io.IOException ,InterruptedException {
    for (Text text : values) {
    final String[] splited = text.toString().split("t");

    final Put put = new Put(Bytes.toBytes(splited[0]));
    put.add(Bytes.toBytes("cf"), Bytes.toBytes("date"), Bytes.toBytes(splited[1]));
    //省略其他字段,调用put.add(....)即可
    context.write(NullWritable.get(), put);
    }
    };
    }


    }

  • 0 评论
  • Wil1 2016年03月05日 回答 #2楼
  • 输入两条数据,输出一条数据 ,代表中途数据处理的过程过滤掉了数据,检测程序中的判断语句,对比课下的源代码,从而找到错误并解决。

  • 0 评论