我正在尝试使用 azureml-sdk 使用以下代码将 CSV 文件中的内容读入 Spark DataFrame,但抛出异常。
代码抛出异常
# Repro: load a headerless CSV from an AzureML datastore as a TabularDataset,
# then try to materialize it as a Spark DataFrame — this raises the
# Py4JJavaError / NoClassDefFoundError shown in the traceback below.
import pyspark.sql as spark
from azureml.core import Dataset
# NOTE(review): `datastore` and `file_path` are assumed to be defined earlier
# in the notebook — not shown here.
dataset = Dataset.Tabular.from_delimited_files(path = [(datastore, file_path)], header = False)
# Fails: the dprep Scala bridge class (AmlPySdkInvoker$) cannot be initialized.
sdf: spark.DataFrame = dataset.to_spark_dataframe()
sdf.show()
异常信息
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
/anaconda/envs/azureml_py36/lib/python3.6/site-packages/azureml/data/dataset_error_handling.py in _try_execute(action, operation, dataset_info, **kwargs)
100 else:
--> 101 return action()
102 except Exception as e:
/anaconda/envs/azureml_py36/lib/python3.6/site-packages/azureml/dataprep/api/_loggerfactory.py in wrapper(*args, **kwargs)
178 try:
--> 179 return func(*args, **kwargs)
180 except Exception as e:
/anaconda/envs/azureml_py36/lib/python3.6/site-packages/azureml/dataprep/api/dataflow.py in to_spark_dataframe(self)
763 self._raise_if_missing_secrets()
--> 764 return self._spark_executor.get_dataframe(steps_to_block_datas(self._steps), use_sampling=False)
765
/anaconda/envs/azureml_py36/lib/python3.6/site-packages/azureml/dataprep/api/sparkexecution.py in get_dataframe(self, steps, use_sampling, overrides, use_first_record_schema)
136 overrides,
--> 137 use_first_record_schema)
138
/anaconda/envs/azureml_py36/lib/python3.6/site-packages/azureml/dataprep/api/sparkexecution.py in _execute(self, blocks, export_format, use_sampling, overrides, use_first_record_schema)
169 + lariat_version + '.')
--> 170 raise e
171
/anaconda/envs/azureml_py36/lib/python3.6/site-packages/azureml/dataprep/api/sparkexecution.py in _execute(self, blocks, export_format, use_sampling, overrides, use_first_record_schema)
160 if export_format == ExportScriptFormat.PYSPARKDATAFRAMELOADER:
--> 161 return module.LoadData(secrets=secrets, schemaFromFirstRecord=use_first_record_schema)
162 else:
/tmp/spark-6ce53791-c8e4-4db0-bd37-bedb53a1ef1e/userFiles-dda6cd30-5d1e-48cf-af87-9c7c2a4b8038/loaderb9bc01c2b40c4b7aa86a95d343021e0c.py in LoadData(secrets, schemaFromFirstRecord)
8 def LoadData(secrets=dict(), schemaFromFirstRecord=False):
----> 9 pex = Executor("S4ddf53ee8d5f4173bd3dcf4b51d78247", "dprep_2.11", "0.116.0", "42315", "39a925e4-9ae9-4588-93c4-5433250b7f73")
10 jex = pex.jex
/tmp/spark-6ce53791-c8e4-4db0-bd37-bedb53a1ef1e/userFiles-dda6cd30-5d1e-48cf-af87-9c7c2a4b8038/Executor.py in __init__(self, scalaName, dprepMavenPackageName, dprepMavenPackageMatchingVersion, pythonHostChannelPort, pythonHostSecret)
54 pythonHostChannelPort,
---> 55 pythonHostSecret)
56
/anaconda/envs/azureml_py36/lib/python3.6/site-packages/py4j/java_gateway.py in __call__(self, *args)
1568 return_value = get_return_value(
-> 1569 answer, self._gateway_client, None, self._fqn)
1570
/anaconda/envs/azureml_py36/lib/python3.6/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
Py4JJavaError: An error occurred while calling None.com.microsoft.dprep.execution.PySparkExecutor.
: java.lang.NoClassDefFoundError: Could not initialize class com.microsoft.dprep.integration.azureml.AmlPySdkInvoker$
at com.microsoft.dprep.execution.PySparkExecutor.<init>(PySparkExecutor.scala:79)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:238)
at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
During handling of the above exception, another exception occurred:
AzureMLException Traceback (most recent call last)
<ipython-input-30-c546b1aded42> in <module>
2 from azureml.core import Dataset
3 dataset = Dataset.Tabular.from_delimited_files(path = [(datastore, file_path)], header = False)
----> 4 sdf: spark.DataFrame = dataset.to_spark_dataframe()
5 sdf.show()
6
/anaconda/envs/azureml_py36/lib/python3.6/site-packages/azureml/data/_loggerfactory.py in wrapper(*args, **kwargs)
124 with _LoggerFactory.track_activity(logger, func.__name__, activity_type, custom_dimensions) as al:
125 try:
--> 126 return func(*args, **kwargs)
127 except Exception as e:
128 if hasattr(al, 'activity_info') and hasattr(e, 'error_code'):
/anaconda/envs/azureml_py36/lib/python3.6/site-packages/azureml/data/tabular_dataset.py in to_spark_dataframe(self)
187 return _try_execute(dataflow.to_spark_dataframe,
188 'to_spark_dataframe',
--> 189 None if self.id is None else {'id': self.id, 'name': self.name, 'version': self.version})
190
191 @track(_get_logger, custom_dimensions={'app_name': 'TabularDataset'}, activity_type=_PUBLIC_API)
/anaconda/envs/azureml_py36/lib/python3.6/site-packages/azureml/data/dataset_error_handling.py in _try_execute(action, operation, dataset_info, **kwargs)
102 except Exception as e:
103 message, is_dprep_exception = _construct_message_and_check_exception_type(e, dataset_info, operation)
--> 104 _dataprep_error_handler(e, message, is_dprep_exception)
105
106
/anaconda/envs/azureml_py36/lib/python3.6/site-packages/azureml/data/dataset_error_handling.py in _dataprep_error_handler(e, message, is_dprep_exception)
143 raise AzureMLException(message, inner_exception=e)
144 else:
--> 145 raise AzureMLException(message, inner_exception=e)
146
147
AzureMLException: AzureMLException:
Message: Execution failed unexpectedly due to: Py4JJavaError
InnerException An error occurred while calling None.com.microsoft.dprep.execution.PySparkExecutor.
: java.lang.NoClassDefFoundError: Could not initialize class com.microsoft.dprep.integration.azureml.AmlPySdkInvoker$
at com.microsoft.dprep.execution.PySparkExecutor.<init>(PySparkExecutor.scala:79)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:238)
at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
ErrorResponse
{
"error": {
"message": "Execution failed unexpectedly due to: Py4JJavaError"
}
}
但是,我可以使用以下代码读取并打印数据,即将其创建为 Pandas DataFrame。
工作代码
# Workaround: the same TabularDataset converts successfully to a pandas
# DataFrame, showing the datastore path and parsing options are valid.
dataset = Dataset.Tabular.from_delimited_files(path = [(datastore, file_path)], header = False)
# The Spark conversion that fails is left commented out:
#sdf: spark.DataFrame = dataset.to_spark_dataframe()
# pandas conversion works; print the first three rows to verify the data.
sdf: pd.DataFrame = dataset.to_pandas_dataframe()
print(sdf.head(3))