0

我有一个系统,当找到某种类型的文件时,我会在单独的线程中下载、编码和上传它们。

while(true) {
    for(SftpClient c : clients) {
        try {
            filenames = c.list("*.wav", "_rdy_");
        } catch (SftpException e) {
            e.printStackTrace();
        }
        if(filenames.size() > 0) {
            //AudioThread run() method handles the download, encode, and upload
            AudioThread at = new AudioThread(filenames);
            at.setNode(c.getNode());
            Thread t = new Thread(at);
            t.start();
        }
    }
    try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

AudioThread 的 run 方法

public void run() {
    System.out.println("Running...");
    this.buildAsteriskMapping();
    this.connectToSFTP();
    ac = new AudioConvert();
    this.connectToS3();

    String downloadDir = "_rough/" + getNode() + "/" + Time.getYYYYMMDDDate() + "/";
    String encodeDir = "_completed" + getNode() + "/" + Time.getYYYYMMDDDate() + "/";
    String uploadDir = getNode() + "/" + Time.getYYYYMMDDDate() + "/";

    System.out.println("Downloading...");
    try {
        sftp.get(filenames, downloadDir);
    } catch (SftpException e) {
        //download failed
        System.out.println("DL Failed...");
        e.printStackTrace();
    }

    System.out.println("Encoding...");
    try {
        ac.encodeWavToMP3(filenames, downloadDir, encodeDir);
    } catch (IllegalArgumentException | EncoderException e) {
        System.out.println("En Failed...");
        e.printStackTrace();
    }

    System.out.println("Uploading...");
    try {
        s3.upload(filenames, encodeDir, uploadDir);
    } catch (AmazonClientException e) {
        System.out.println("Up Failed...");
        e.printStackTrace();
    }

}

下载方式:

public void get(ArrayList<String> src, String dest) throws SftpException {
    for(String file : src) {
        System.out.println(dest + file);
        channel.get(file, dest + file);
    }
}

编码方法:

public void encodeWavToMP3(ArrayList<String> filenames, String downloadDir, String encodeDir) throws IllegalArgumentException, EncoderException {
    for(String f : filenames) {
        File wav = new File(downloadDir + f);
        File mp3 = new File(encodeDir + wav.getName().replace(".wav", ".mp3"));
        encoder.encode(wav, mp3, attrs);
    }
}

上传方式:

public void upload(ArrayList<String> filenames, String encodeDir, String uploadDir)  throws AmazonClientException, AmazonServiceException {
    for(String f : filenames) {
        s3.putObject(new PutObjectRequest(bucketName, uploadDir, new File(encodeDir + f)));
    }
}

问题是我不断为每个线程下载相同的文件(或大约相同的文件)。我想为每个保存正在下载的文件的客户端添加一个变量,但我不知道如何从这个变量中删除列表/文件名。什么是解决方案?我的老板也希望只允许运行 x 个线程。

4

2 回答 2

4

很难看出问题所在,因为缺少实际下载的代码:P

但是,我会改用某种ExecutorService

基本上,我会将每个下载请求添加到服务中(包装在“DownloadTask”中,并引用要下载的文件以及获取文件可能需要的任何其他相关信息),然后让服务来处理其余的事情。

可以对下载任务进行编码,以根据您认为合适的方式考虑现有文件。

根据您的要求,这可能是单线程或多线程服务。它还可以让您在其中放置上传任务。

查看Executors trail 了解更多信息

总体思路是使用一种生产者/消费者模式。您将(至少)有一个线程来查找所有要下载的文件,并且对于每个文件,您会将其添加到执行程序服务中。下载文件后,我会将请求排队并将请求上传到同一服务中。

这样,您就避免了同步和线程管理的所有混乱:D

您可以对扫描任务使用相同的想法,对于每个客户端,您可以将任务分配给单独的服务

于 2012-10-12T21:09:29.033 回答
1

您的代码中存在问题,您在 while 循环中实例化 AudioThread。

请注意,在创建线程并执行 t.start() 后,所有下载、编码和上传都是异步进行的。因此,在您启动线程后,循环继续对 c.list(...) 进行另一次调用,而您创建的第一个线程仍在处理第一组文件。很可能在随后的 c.list() 调用中返回相同的文件集,因为您在调用中指定了文件模式,并且没有标记当前正在处理哪些文件的代码。

我的建议:

  • 使用前一篇文章中提到的 Executors.newFixedThreadPool(int nThreads)。并将线程数指定为机器中的处理器数。在您的 while 循环之前执行此操作。
  • 对于从 ftp s.list() 检索到的每个文件名,创建一个 Callable 类并调用 ExecutorService.invokeAll(Collection<Callable<T>> tasks)。您将创建的 Callable 中的代码是您的 AudioThread 代码。修改 AudioThread 代码以一次只处理一个文件(如果可能),这样您就可以为每个文件并行进行下载、上传和编码。
  • 添加标记哪些文件已被处理的代码。我建议添加一个代码,将您已处理的文件重命名为不同的名称,以避免在下一次 c.list() 调用中返回。
  • 在您的 while 循环块之后调用 ExecutorService.shutdown(...)
于 2012-10-13T04:55:09.357 回答