我的问题是在使用 Scalaj-Http 时如何使用缓冲流。
我编写了以下代码,这是一个完整的工作示例,它将使用 HttpFS 从 Hadoop HDFS 下载文件。我的目标是处理非常大的文件,这将需要使用缓冲方法,将多个 I/O 写入本地文件。
我无法找到有关如何使用 ScalaJ-Http 接口的流的文档。我对可以处理大型多 GB 文件的下载和上传示例感兴趣。我下面的代码用于仅适用于原型设计的内存缓冲。
import scalaj.http._
import ujson.Js
import java.text.SimpleDateFormat
import java.net.SocketTimeoutException
import java.io.InputStream
import java.io.BufferedOutputStream
import java.io.FileOutputStream
import java.io.FileNotFoundException
object CopyFileFromHdfs {
def main(args: Array[String]) {
val host = "hadoop.example.com"
val user = "root"
var dstFile = ""
var srcFile = ""
val operation = "OPEN"
val port = 14000
System.setProperty("sun.net.http.allowRestrictedHeaders", "true")
if (args.length != 2)
println("Error: Missing or too many arguments")
println("Usage: CopyFileFromHdfs <srcfile> <dstfile>")
srcFile = args(0)
dstFile = args(1)
// ********************************************************************************
// Create the URL string that we will use to connect to Hadoop HttpFS
// The string will look like this:
// http://root@123.456.789.012:14000/webhdfs/v1/?user.name=root&op=OPEN
// ********************************************************************************
val url = makeHttpfsUrl(host, user, srcFile, operation, port)
// ********************************************************************************
// Using HTTP, call the HttpFS server
// Exceptions:
// java.net.SocketTimeoutException
// java.net.UnknownHostException
// java.lang.IllegalArgumentException
// Remote Exceptions:
// java.io.FileNotFoundException
// com.sun.jersey.api.NotFoundException
// ********************************************************************************
try {
var response = Http(url)
.timeout(connTimeoutMs = 1000, readTimeoutMs = 5000)
// ********************************************************************************
// Check for an error. We are expecting an HTTP 200 response
// ********************************************************************************
if (response.code < 200 || response.code > 299)
val data = ujson.read(response.body)
printf("Error: Cannot download file: %s\n", dstFile)
val is = new FileOutputStream(dstFile)
val bs = new BufferedOutputStream(is)
bs.write(response.body, 0, response.body.length)
} catch {
case e: SocketTimeoutException => {
printf("Error: Cannot connect to host %s on port %d\n", host, port)
case e: Exception => {
printf("Error (other): Cannot download file %s\n", srcFile)
printf("Success: File downloaded. %s -> %s\n", srcFile, dstFile)
// ********************************************************************************
// The Json strings are surrounded by quotes.
// This function will remove them (only at the start and the end).
// ********************************************************************************
def removeQuotes(str: String): String = {
// This expression will delete quotes at the beginning and end of a string
return str.replaceAll("^\"|\"$", "");
// ********************************************************************************
// Create the URL string that we will use to connect to Hadoop HttpFS
// The string will look like this:
// http://root@123.456.789.012:14000/webhdfs/v1/?user.name=root&op=LISTSTATUS
// ********************************************************************************
def makeHttpfsUrl(
host: String,
user: String,
hdfsPath: String,
operation: String,
port: Integer) : String = {
var url = "http://" + user + "@" + host + ":" + port.toString + "/webhdfs/v1"
if (hdfsPath(0) == '/')
url += hdfsPath
url += "/" + hdfsPath
url += "?user.name=" + user + "&op=" + operation
return url