0

我有 4 个大文件(每个大约 1.5 gb),我想处理这些文件,读取文件的每一行并将其转换为客户对象。我有以下实现。

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.zip.GZIPInputStream;

import static java.nio.charset.StandardCharsets.UTF_8;

public class CustomerDataAccess {

    public static void main(String[] args) throws IOException {
        CustomerFileItem john = new CustomerFileItem("CustFile1", "http://w.customer1.com");
        CustomerFileItem sarah = new CustomerFileItem("CustFile2", "http://w.customer2.com");
        CustomerFileItem charles = new CustomerFileItem("CustFile3", "http://w.customer3.com");
        List<CustomerFileItem> customers = Arrays.asList(john, sarah, charles);

        Iterator<CustomerFileLineItem> custList = new CustIterator(customers);
    }

    public static class CustIterator implements Iterator<CustomerFileLineItem> {

        private static final int HEADER_LINES = 9; // 8 + 1 blank line
        BufferedReader bufferedReader;

        private int index = 0;
        private final List<CustomerFileItem> custFileItems = new ArrayList<>();


        public CustIterator(final List<CustomerFileItem> custFileItems) throws IOException {
            this.custFileItems.addAll(custFileItems);
            processNext();
        }

        private void processNext() throws IOException {
            if (bufferedReader != null) {
                bufferedReader.close();
            }
            if (index < custFileItems.size()) { // only update if there's another file
                CustomerFileItem custFileItem = custFileItems.get(index);
                GZIPInputStream gis = new GZIPInputStream(new URL(custFileItem.url).openStream());
                // default buffer size is 8 KB
                bufferedReader = new BufferedReader(new InputStreamReader(gis, UTF_8));
                // read the first few lines
                for (int i = 0; i < HEADER_LINES; i++) {
                    bufferedReader.readLine();
                }
            }
            index++;
        }

        @Override
        public boolean hasNext() {
            try {
                boolean currentReaderStatus = bufferedReader.ready();
                if (currentReaderStatus) {
                    return true;
                } else if (index < custFileItems.size()) {
                    // at end of current file, try to get the next one
                    processNext();
                    return hasNext();
                } else { // no more files left
                    return false;
                }
            } catch (IOException e) {
                try {
                    bufferedReader.close();
                } catch (IOException e1) {
                    throw new UncheckedIOException(e1);
                }
                throw new UncheckedIOException(e);
            }
        }

        @Override
        public CustomerFileLineItem next() {
            try {
                String line = bufferedReader.readLine();
                if (line != null) {
                    return new CustomerFileLineItem(line);
                } else {
                    return null;
                }
            } catch (IllegalArgumentException exception) {
                return null;
            } catch (IOException e) {
                try {
                    bufferedReader.close();
                } catch (IOException e1) {
                    throw new UncheckedIOException(e1);
                }
                throw new UncheckedIOException(e);
            }
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException();
        }

        @Override
        public void forEachRemaining(final Consumer<? super CustomerFileLineItem> action) {
            throw new UnsupportedOperationException();
        }
    }


    public static class CustomerFileLineItem {
        private static final int NUMBER_OF_FIELDS = 4;

        final String id;
        final String productNumber;
        final String usageType;
        final String operation;


        public CustomerFileLineItem(final String line) {
            String[] strings = line.split(",");
            if (strings.length != NUMBER_OF_FIELDS) {
                throw new IllegalArgumentException(String.format("Malformed customer file line: %s", line));
            }
            this.id = strings[0];
            this.productNumber = strings[1];
            this.usageType = strings[3];
            this.operation = strings[4];
        }
    }

    static class CustomerFileItem {
        private String fileName;
        private String url;

        public CustomerFileItem(String fileName, String url) {
            this.fileName = fileName;
            this.url = url;
        }
    }


}

在一个用例中,我想在输出列表(custList)中使用流。但我知道我不能将流与Iterator. 我怎样才能将其转换为Spliterator?或者我怎样才能实现与在 Spliterator 中使用 Iterator 实现相同的功能?

4

2 回答 2

4

TL;DR 您不需要实现Iteratoror Spliterator,您可以简单地Stream首先使用 a :

private static final int HEADER_LINES = 9; // 8 + 1 blank line
Stream<CustomerFileLineItem> stream = customers.stream()
    .flatMap(custFileItem -> {
        try {
            GZIPInputStream gis
                = new GZIPInputStream(new URL(custFileItem.url).openStream());
            BufferedReader br = new BufferedReader(new InputStreamReader(gis, UTF_8));
            // read the first few lines
            for (int i = 0; i < HEADER_LINES; i++) br.readLine();
            return br.lines().onClose(() -> {
              try { br.close(); }
              catch(IOException ex) { throw new UncheckedIOException(ex); }
            });
        } catch(IOException ex) {
            throw new UncheckedIOException(ex);
        }
    })
    .map(CustomerFileLineItem::new);

但为了完整起见,从字面上解决这个问题:

首先,您不应该添加类似的方法定义

@Override
public void forEachRemaining(final Consumer<? super CustomerFileLineItem> action) {
    throw new UnsupportedOperationException();
}

当您使用 Stream API 时,这种方法肯定会适得其反,因为这是大多数非短路操作的最终结果。

甚至没有理由添加它。当你不声明方法时,你会从Iterator接口中得到一个合理的默认方法。

解决此问题后,您可以轻松地将 转换IteratorSpliteratorusing Spliterators.pliteratorUnknownSize(Iterator, int)

但没有理由这样做。首先实现时,您的代码会变得更简单:Spliterator

public static class CustIterator
                    extends Spliterators.AbstractSpliterator<CustomerFileLineItem> {
    private static final int HEADER_LINES = 9; // 8 + 1 blank line
    BufferedReader bufferedReader;

    private final ArrayDeque<CustomerFileItem> custFileItems;

    public CustIterator(final List<CustomerFileItem> custFileItems) throws IOException {
        super(Long.MAX_VALUE, ORDERED|NONNULL);
        this.custFileItems = new ArrayDeque<>(custFileItems);
        processNext();
    }

    @Override
    public boolean tryAdvance(Consumer<? super CustomerFileLineItem> action) {
        if(bufferedReader == null) return false;
        try {
            String line = bufferedReader.readLine();
            while(line == null) {
                processNext();
                if(bufferedReader == null) return false;
                line = bufferedReader.readLine();
            }
            action.accept(new CustomerFileLineItem(line));
            return true;
        }
        catch(IOException ex) {
            if(bufferedReader != null) try {
                bufferedReader.close();
                bufferedReader = null;
            }
            catch(IOException ex2) {
                ex.addSuppressed(ex2);
            }
            throw new UncheckedIOException(ex);
        }
    }

    private void processNext() throws IOException {
        if (bufferedReader != null) {
            bufferedReader.close();
            bufferedReader = null;
        }
        if (!custFileItems.isEmpty()) { // only update if there's another file
            CustomerFileItem custFileItem = custFileItems.remove();
            GZIPInputStream gis
                = new GZIPInputStream(new URL(custFileItem.url).openStream());
            // default buffer size is 8 KB
            bufferedReader = new BufferedReader(new InputStreamReader(gis, UTF_8));
            // read the first few lines
            for (int i = 0; i < HEADER_LINES; i++) {
                bufferedReader.readLine();
            }
        }
    }
}

但是,正如开头所说,您甚至不需要在Spliterator这里实现 a 。

于 2020-05-26T15:38:58.663 回答
1

每个Iterable<T>对象都有以下方法:

因此,您想要创建一个Iterable<T>需要Iterator<T>重写唯一一个非默认和抽象方法的内容:

Iterable<CustomerFileLineItem> iterable = new Iterable<CustomerFileLineItem>() {
    @Override
    public Iterator<CustomerFileLineItem> iterator() {
        return custList;
    }
};

这可以缩短为 lambda 表达式,结果是:

Iterable<CustomerFileLineItem> iterable = () -> custList;
Spliterator<CustomerFileLineItem> spliterator = iterable.spliterator();

...所以很容易创建流:

Stream<CustomerFileLineItem> stream = StreamSupport.stream(spliterator, false);
于 2020-05-26T12:42:36.443 回答