
This is a PySpark program for a simple log analysis from the Databricks Spark tutorial (PySpark IPython notebook).

import sys 
import os 
from test_helper import Test 

baseDir = os.path.join('data') 
inputPath = os.path.join('cs100', 'lab2', 'apache.access.log.PROJECT') 
logFile = os.path.join(baseDir, inputPath) 

def parseLogs(): 
    """ Read and parse log file """ 
    parsed_logs = (sc.textFile(logFile).map(parseApacheLogLine)) 

    access_logs = (parsed_logs.filter(lambda s: s[1] == 1).map(lambda s: s[0])) 

    failed_logs = (parsed_logs.filter(lambda s: s[1] == 0).map(lambda s: s[0])) 
    failed_logs_count = failed_logs.count() 
    if failed_logs_count > 0: 
        print 'Number of invalid logline: %d' % failed_logs.count() 
        for line in failed_logs.take(20): 
            print 'Invalid logline: %s' % line 

    print 'Read %d lines, successfully parsed %d lines, failed to parse %d lines' % (parsed_logs.count(), access_logs.count(), failed_logs.count()) 
    return parsed_logs, access_logs, failed_logs 


parsed_logs, access_logs, failed_logs = parseLogs() 

I get the following error:


Py4JJavaError        Traceback (most recent call last) 
<ipython-input-4-d91e2c3d41f9> in <module>() 
    24 
    25 
---> 26 parsed_logs, access_logs, failed_logs = parseLogs() 

<ipython-input-4-d91e2c3d41f9> in parseLogs() 
    14 
    15  failed_logs = (parsed_logs.filter(lambda s: s[1] == 0).map(lambda s: s[0])) 
---> 16  failed_logs_count = failed_logs.count() 
    17  if failed_logs_count > 0: 
    18   print 'Number of invalid logline: %d' % failed_logs.count() 

/usr/local/bin/spark-1.3.1-bin-hadoop2.6/python/pyspark/rdd.py in count(self) 
    930   3 
    931   """ 
--> 932   return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum() 
    933 
    934  def stats(self): 

/usr/local/bin/spark-1.3.1-bin-hadoop2.6/python/pyspark/rdd.py in sum(self) 
    921   6.0 
    922   """ 
--> 923   return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add) 
    924 
    925  def count(self): 

/usr/local/bin/spark-1.3.1-bin-hadoop2.6/python/pyspark/rdd.py in reduce(self, f) 
    737    yield reduce(f, iterator, initial) 
    738 
--> 739   vals = self.mapPartitions(func).collect() 
    740   if vals: 
    741    return reduce(f, vals) 

/usr/local/bin/spark-1.3.1-bin-hadoop2.6/python/pyspark/rdd.py in collect(self) 
    711   """ 
    712   with SCCallSiteSync(self.context) as css: 
--> 713    port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd()) 
    714   return list(_load_from_socket(port, self._jrdd_deserializer)) 
    715 

/usr/local/bin/spark-1.3.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args) 
    536   answer = self.gateway_client.send_command(command) 
    537   return_value = get_return_value(answer, self.gateway_client, 
--> 538     self.target_id, self.name) 
    539 
    540   for temp_arg in temp_args: 

/usr/local/bin/spark-1.3.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name) 
    298     raise Py4JJavaError(
    299      'An error occurred while calling {0}{1}{2}.\n'. 
--> 300      format(target_id, '.', name), value) 
    301    else: 
    302     raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe. 
: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/home/vagrant/data/data/cs100/lab2/apache.access.log.PROJECT 
    at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:285) 
    at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228) 
    at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313) 
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:203) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217) 
    at scala.Option.getOrElse(Option.scala:120) 
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) 
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217) 
    at scala.Option.getOrElse(Option.scala:120) 
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) 
    at org.apache.spark.api.python.PythonRDD.getPartitions(PythonRDD.scala:57) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217) 
    at scala.Option.getOrElse(Option.scala:120) 
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1512) 
    at org.apache.spark.rdd.RDD.collect(RDD.scala:813) 
    at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:374) 
    at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:606) 
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) 
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) 
    at py4j.Gateway.invoke(Gateway.java:259) 
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) 
    at py4j.commands.CallCommand.execute(CallCommand.java:79) 
    at py4j.GatewayConnection.run(GatewayConnection.java:207) 
    at java.lang.Thread.run(Thread.java:745) 

Can you suggest what the problem might be here? I am quite new to this area, and it would be very helpful if you could include an explanation as well.

Answer


It looks to me like the problem is the path to your input file:

Input path does not exist: file:/home/vagrant/data/data/cs100/lab2/apache.access.log.PROJECT 

Make sure that path is correct; if it isn't, change it accordingly in your script. You can also specify the path like this:

# ... 
inputPath = '/path/to/your/data/directory/' 
logFile = inputPath + 'apache.access.log.PROJECT' 
# ... 
parsed_logs = (sc.textFile(logFile).map(parseApacheLogLine)) 
# ... 

You can always verify your path by printing it and then checking that location manually, either in a terminal/cmd or in a file explorer.
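For example, here is a minimal sketch of that check, reusing the baseDir/inputPath variables from the question:

import os 

baseDir = os.path.join('data') 
inputPath = os.path.join('cs100', 'lab2', 'apache.access.log.PROJECT') 
logFile = os.path.join(baseDir, inputPath) 

# Show both the relative path and the absolute location it resolves to 
# from the current working directory. 
print 'logFile  = %s' % logFile 
print 'resolves = %s' % os.path.abspath(logFile) 

# False here means the file is not where the script expects it to be. 
print 'exists   = %s' % os.path.exists(logFile) 

In your case the error shows file:/home/vagrant/data/data/cs100/lab2/apache.access.log.PROJECT, which suggests the relative path 'data/cs100/...' was resolved against /home/vagrant/data (the directory the notebook was started from), so the log file needs to sit under that directory, or logFile needs to be an absolute path.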


I suspected the same thing. But I don't have the file for this project; I just followed the steps as given.


It turns out I had put the file in the wrong directory, which made it look for the data under that same directory. After reading your comment I realized the problem was with the path and made the necessary change. It works now. Thanks a lot.