1

我有一个 1GB+ 大小的文件从 MQ 进入我的目录,这需要一些时间才能完全传输文件,但即使文件不完整,也会在该目录中生成一个文件。我担心我的 directoryScan 操作员会选择一个不完整的文件。此外,我无法添加初始延迟,因为我不确定传输文件需要多长时间。

PS:我在某处读到一些文件传输协议通过向文件添加不同的扩展名来解决这个问题,直到它完成。假设我的 directoryScan 操作员正在等待任何扩展名为 .txt 的文件,因此此文件传输协议将创建一个扩展名为 .abc 的文件,直到传输完成。

我该怎么做呢?

4

1 回答 1

1

如果您打算使用正则表达式路由,下面是一个调用运算符以仅读取特定扩展名的文件的示例:

   // DirectoryScan operator with an absolute file argument and a file name pattern         
stream<rstring name> Dir2 = DirectoryScan()                                        
{                                                                                        
  param                                                                                  
    directory : "/tmp/work";                                                             
   pattern : "\\.txt$";                                                                 
}   

如果这不起作用,是否可以设置 MQ 将文件写入不同的目录,然后在完成后将其移动到目标目录?

如果您知道文件的大小,您可以做的一件事是使用该Size()函数忽略文件,直到它达到正确的大小。此代码段使用Custom运算符等待文件大小至少为 2000 字节。

graph
        stream <rstring filename, uint64 size> DirScanOutput = DirectoryScan() {
            param
                directory: "test1";
                sleepTime: 10.0; //wait 10s between scans
                pattern: ".*\\.txt";

          output DirScanOutput : size= Size();
        } 


        stream<rstring file> FileNameStream= Custom(DirScanOutput as In){
            logic
                onTuple In:{
                    if (size < 2000ul){
                        printStringLn("Required size not met yet.");
                    } else {
                        printStringLn("Size of file reached.");
                        submit({file=filename}, FileNameStream);

                    }
                }
        }

        stream <cityData> CityDataRecord = FileSource(FileNameStream) {
            param
                format: csv;
        } 

我希望这些建议之一对您有用。

于 2017-06-15T14:43:48.087 回答