2017-06-15 76 views
2

我在一个读取JSON文件并从JSON输入中提取元素的MapReduce(Map only task)上工作。 输入数据:JSON MapReduce中的错误

{"type":"cloud_monitor","format":"default","version":"1.0","id":"71101cb85441995d11a43bb","start":"1413585245.921","cp":"254623","message":{"proto":"http","protoVer":"1.1","status":"403","cliIP":"23.79.231.14","reqPort":"80","reqHost":"ksd.metareactor.com","reqMethod":"GET","reqPath":"%2findex.php","reqQuery":"path%3d57%26product_id%3d49%26route%3d%255Cwinnt%255Cwin.ini%2500.","respCT":"text/html","respLen":"286","bytes":"286","UA":"mozilla-saturn","fwdHost":"origin-demo2-akamaized.scoe-sil.net"},"reqHdr":{"accEnc":"gzip,%20deflate","cookie":"PHPSESSID%3dkkqoodvfe0rt9l7lbvqghk6e15%3bcurrency%3dUSD%3blanguage%3den"}} 

我已经宣布了JSON字符串数组变量:消息& reqHdr,你可以看到他们在context.write()方法

地图类:

public class JsonMapper extends Mapper<LongWritable, Text, Text, Text> { 

public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { 
    String type; 
    String format; 
    String version; 
    String id; 
    String start; 
    String cp; 
    // variables for message and reqHdr 
    String[] line = value.toString().split("\\n"); 
    if (line.length > 0) { 
     for(int i=0; i<line.length; i++) { 
      try { 
       JSONObject jsonobj  = new JSONObject(line[i]); 
        type = (String) jsonobj.get("type"); 
        format = (String) jsonobj.get("format"); 
        version = (String) jsonobj.get("version"); 
        id  = (String) jsonobj.get("id"); 
        start = (String) jsonobj.get("start"); 
        cp  = (String) jsonobj.get("cp"); 

       // Message Variable array 
       JSONArray messageArray = (JSONArray) jsonobj.get("message"); 
       for(int j=0; j<messageArray.length(); j++) { 
        JSONObject jsonmessageobject = messageArray.getJSONObject(j); 
        proto  = jsonmessageobject.getString("proto"); 
        protoVer = jsonmessageobject.getString("protoVer"); 
        cliIP  = jsonmessageobject.getString("cliIP"); 
        reqPort  = jsonmessageobject.getString("reqPort"); 
        reqHost  = jsonmessageobject.getString("reqHost"); 
        reqMethod = jsonmessageobject.getString("reqMethod"); 
        reqPath  = jsonmessageobject.getString("reqPath"); 
        reqQuery = jsonmessageobject.getString("reqQuery"); 
        reqCT  = jsonmessageobject.getString("reqCT"); 
        reqLen  = jsonmessageobject.getString("reqLen"); 
        sslVer  = jsonmessageobject.getString("sslVer"); 
        status  = jsonmessageobject.getString("status"); 
        redirURL = jsonmessageobject.getString("redirURL"); 
        respCT  = jsonmessageobject.getString("respCT"); 
        respLen  = jsonmessageobject.getString("respLen"); 
        bytes  = jsonmessageobject.getString("bytes"); 
        UA   = jsonmessageobject.getString("UA"); 
        fwdHost  = jsonmessageobject.getString("fwdHost"); 
       } 

       // reqHdr variable array 
       JSONArray reqHdrArray = (JSONArray) jsonobj.get("reqHdr"); 
       for(int k=0; k<reqHdrArray.length(); k++) { 
        JSONObject jsonreqHdrobject = reqHdrArray.getJSONObject(i); 
        accEnc  = jsonreqHdrobject.getString("accEnc"); 
        accLang  = jsonreqHdrobject.getString("accLang"); 
        auth  = jsonreqHdrobject.getString("auth"); 
        reqHdr_cacheCtl = jsonreqHdrobject.getString("cacheCtl"); 
        reqHdr_conn = jsonreqHdrobject.getString("conn"); 
        reqHdr_contMD5 = jsonreqHdrobject.getString("contMD5"); 
        cookie  = jsonreqHdrobject.getString("cookie"); 
        DNT   = jsonreqHdrobject.getString("DNT"); 
        expect  = jsonreqHdrobject.getString("expect"); 
        ifMatch  = jsonreqHdrobject.getString("ifMatch"); 
        ifMod  = jsonreqHdrobject.getString("ifMod"); 
        ifNone  = jsonreqHdrobject.getString("ifNone"); 
        ifRange  = jsonreqHdrobject.getString("ifRange"); 
        ifUnmod  = jsonreqHdrobject.getString("ifUnmod"); 
        range  = jsonreqHdrobject.getString("range"); 
        referer  = jsonreqHdrobject.getString("referer"); 
        te   = jsonreqHdrobject.getString("te"); 
        upgrade  = jsonreqHdrobject.getString("upgrade"); 
        reqHdr_via = jsonreqHdrobject.getString("via"); 
        xFrwdFor = jsonreqHdrobject.getString("xFrwdFor"); 
        xReqWith = jsonreqHdrobject.getString("xReqWith"); 
       } 
      context.write(new Text("cloud_monitor"), new Text(type + format + version + id + start + cp + proto + protoVer + cliIP + reqPort + 
        reqHost + reqMethod + reqPath + reqQuery + reqCT + reqLen + sslVer + status + redirURL + respCT + respLen + bytes + UA + fwdHost + accEnc + accLang + auth + 
        reqHdr_cacheCtl + reqHdr_conn + reqHdr_contMD5 + cookie + DNT + expect + ifMatch + ifMod + ifNone + ifRange + ifUnmod + range + referer + te + 
        upgrade + reqHdr_via + xFrwdFor + xReqWith)); 
      } catch (JSONException e) { 
       e.printStackTrace(); 
      } catch (Exception e) { 
       e.printStackTrace(); 
      } 
     } 
    } 
} 
} 

驱动程序类别:

public class JsonDriver { 
public static void main(String[] args) throws IOException { 
    Configuration configuration = new Configuration(); 
    Job job = Job.getInstance(configuration); 
    job.setJobName("Json Parser"); 
    job.setJarByClass(com.json.driver.JsonDriver.class); 
    job.setMapperClass(JsonMapper.class); 
    job.setMapOutputKeyClass(Text.class); 
    job.setMapOutputValueClass(Text.class); 
    FileInputFormat.setInputPaths(job, new Path(args[0])); 
    FileOutputFormat.setOutputPath(job, new Path(args[1])); 
} 
} 

当我尝试使用命令

[[email protected] ~]$ hadoop jar JsonMapper.jar myjson jsonoutput 

我看不出有任何错误或任何类型的输出或任何信息的提交jar文件。 我看到的是下一个命令行

[[email protected] ~]$ 

我在HDFS输入。 如果我得到任何错误,我可以尝试修复它,但在提交jar文件后,我只看到下一个命令行。 任何人都可以告诉我我在这里做的错误是什么?

回答

1

你实际上不会在你的驱动程序中提交/开始工作。您需要在您的驱动器的末尾添加这样的事情:

job.waitForCompletion(true); 

目前,它只是运行主要在驱动程序,但不能提交作业。

+0

我改正了它。 – Sidhartha