下面是 Databricks Spark 教程中的一个 PySpark 程序,用于进行简单的日志分析:
import sys
import os
from test_helper import Test
# Location of the Apache access log. All three names describe a RELATIVE
# path, so the file is looked up relative to the working directory of the
# process that reads it.
# NOTE(review): the reported error shows this resolving to
# file:/home/vagrant/data/data/cs100/lab2/apache.access.log.PROJECT --
# confirm the file exists there, or start from the directory containing 'data'.
baseDir = 'data'
inputPath = os.path.join(*('cs100', 'lab2', 'apache.access.log.PROJECT'))
logFile = os.path.join(baseDir, inputPath)
def parseLogs():
    """Read the log file and split its lines into parsed and failed records.

    Every line of ``logFile`` is mapped through ``parseApacheLogLine``. Each
    mapped element is treated as a pair: the first field is the payload and
    the second field is a flag, 1 for a successfully parsed line and 0 for a
    line that failed to parse.

    Prints the failure count and up to 20 failed lines when there are any,
    followed by a one-line summary of the total, parsed and failed counts.

    Returns:
        A tuple ``(parsed_logs, access_logs, failed_logs)`` of RDDs: all
        mapped pairs, the payloads of pairs flagged 1, and the payloads of
        pairs flagged 0.
    """
    parsed_logs = sc.textFile(logFile).map(parseApacheLogLine)
    access_logs = parsed_logs.filter(lambda s: s[1] == 1).map(lambda s: s[0])
    failed_logs = parsed_logs.filter(lambda s: s[1] == 0).map(lambda s: s[0])

    # count() is the first action executed here, so a missing or wrong input
    # path is reported from this line. Each count() runs a Spark job, so the
    # result is computed once and reused below.
    failed_logs_count = failed_logs.count()
    if failed_logs_count > 0:
        print('Number of invalid logline: %d' % failed_logs_count)
        for line in failed_logs.take(20):
            print('Invalid logline: %s' % line)

    print('Read %d lines, successfully parsed %d lines, failed to parse %d lines'
          % (parsed_logs.count(), access_logs.count(), failed_logs_count))
    return parsed_logs, access_logs, failed_logs
# Build the three RDDs at module level. parseLogs() runs Spark count() actions
# internally, so an unreadable input path is reported from this call.
parsed_logs, access_logs, failed_logs = parseLogs()
我收到以下错误:
Py4JJavaError Traceback (most recent call last)
<ipython-input-4-d91e2c3d41f9> in <module>()
24
25
---> 26 parsed_logs, access_logs, failed_logs = parseLogs()
<ipython-input-4-d91e2c3d41f9> in parseLogs()
14
15 failed_logs = (parsed_logs.filter(lambda s: s[1] == 0).map(lambda s: s[0]))
---> 16 failed_logs_count = failed_logs.count()
17 if failed_logs_count > 0:
18 print 'Number of invalid logline: %d' % failed_logs.count()
/usr/local/bin/spark-1.3.1-bin-hadoop2.6/python/pyspark/rdd.py in count(self)
930 3
931 """
--> 932 return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
933
934 def stats(self):
/usr/local/bin/spark-1.3.1-bin-hadoop2.6/python/pyspark/rdd.py in sum(self)
921 6.0
922 """
--> 923 return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
924
925 def count(self):
/usr/local/bin/spark-1.3.1-bin-hadoop2.6/python/pyspark/rdd.py in reduce(self, f)
737 yield reduce(f, iterator, initial)
738
--> 739 vals = self.mapPartitions(func).collect()
740 if vals:
741 return reduce(f, vals)
/usr/local/bin/spark-1.3.1-bin-hadoop2.6/python/pyspark/rdd.py in collect(self)
711 """
712 with SCCallSiteSync(self.context) as css:
--> 713 port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
714 return list(_load_from_socket(port, self._jrdd_deserializer))
715
/usr/local/bin/spark-1.3.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
536 answer = self.gateway_client.send_command(command)
537 return_value = get_return_value(answer, self.gateway_client,
--> 538 self.target_id, self.name)
539
540 for temp_arg in temp_args:
/usr/local/bin/spark-1.3.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
298 raise Py4JJavaError(
299 'An error occurred while calling {0}{1}{2}.\n'.
--> 300 format(target_id, '.', name), value)
301 else:
302 raise Py4JError(
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/home/vagrant/data/data/cs100/lab2/apache.access.log.PROJECT
at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:285)
at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:203)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at org.apache.spark.api.python.PythonRDD.getPartitions(PythonRDD.scala:57)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1512)
at org.apache.spark.rdd.RDD.collect(RDD.scala:813)
at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:374)
at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)
能否帮我看看这里出了什么问题?我是这个领域的新手,如果能附上解释会很有帮助。