24

Java 8 引入了一个类似于 Scala 的Stream的Stream类,这是一个强大的惰性结构,使用它可以非常简洁地执行以下操作:

def from(n: Int): Stream[Int] = n #:: from(n+1)

def sieve(s: Stream[Int]): Stream[Int] = {
  s.head #:: sieve(s.tail filter (_ % s.head != 0))
}

val primes = sieve(from(2))

primes takeWhile(_ < 1000) print  // prints all primes less than 1000

我想知道是否有可能在 Java 8 中做到这一点,所以我写了这样的东西:

IntStream from(int n) {
    return IntStream.iterate(n, m -> m + 1);
}

IntStream sieve(IntStream s) {
    int head = s.findFirst().getAsInt();
    return IntStream.concat(IntStream.of(head), sieve(s.skip(1).filter(n -> n % head != 0)));
}

IntStream primes = sieve(from(2));

相当简单,但它会产生,java.lang.IllegalStateException: stream has already been operated upon or closed因为两者findFirst()都是skip()终端操作,Stream只能执行一次。

我真的不需要用完流两次,因为我只需要流中的第一个数字,其余的作为另一个流,即相当于 Scala 的Stream.headStream.tail. Java 8Stream中是否有可以用来实现此目的的方法?

谢谢。

4

10 回答 10

11

即使您没有无法拆分的问题IntStream,您的代码也无法正常工作,因为您是sieve递归地而不是懒惰地调用您的方法。因此,在您可以查询结果流以获取第一个值之前,您有一个无限递归。

将 a拆分IntStream s为头部和尾部IntStream(尚未消耗)是可能的:

PrimitiveIterator.OfInt it = s.iterator();
int head = it.nextInt();
IntStream tail = IntStream.generate(it::next).filter(i -> i % head != 0);

在这个地方,你需要一个懒惰地调用sieve尾部的构造。Stream不提供;concat期望现有的流实例作为参数,并且您不能sieve使用 lambda 表达式构造延迟调用的流,因为延迟创建仅适用于 lambda 表达式不支持的可变状态。如果您没有隐藏可变状态的库实现,则必须使用可变对象。但是一旦你接受了可变状态的要求,解决方案可能比你的第一种方法更容易:

IntStream primes = from(2).filter(i -> p.test(i)).peek(i -> p = p.and(v -> v % i != 0));

IntPredicate p = x -> true;

IntStream from(int n)
{
  return IntStream.iterate(n, m -> m + 1);
}

这将递归地创建一个过滤器,但最终创建一个IntPredicates 的树还是一个 s 的树都没有关系IntStream(就像你的IntStream.concat方法一样,如果它确实有效)。如果您不喜欢过滤器的可变实例字段,您可以将其隐藏在内部类中(但不能隐藏在 lambda 表达式中……)。

于 2013-11-15T17:40:19.863 回答
4

我的StreamEx库现在headTail()可以解决问题了:

public static StreamEx<Integer> sieve(StreamEx<Integer> input) {
    return input.headTail((head, tail) -> 
        sieve(tail.filter(n -> n % head != 0)).prepend(head));
}

headTail方法采用BiFunction在流终端操作执行期间最多执行一次的方法。所以这个实现是惰性的:它在遍历开始之前不计算任何东西,并且只计算请求的素数。BiFunction接收第一个流元素和head其余元素的流,tail并且可以tail以任何它想要的方式修改。您可以将其与预定义的输入一起使用:

sieve(IntStreamEx.range(2, 1000).boxed()).forEach(System.out::println);

但是无限流也可以

sieve(StreamEx.iterate(2, x -> x+1)).takeWhile(x -> x < 1000)
     .forEach(System.out::println);
// Not the primes till 1000, but 1000 first primes
sieve(StreamEx.iterate(2, x -> x+1)).limit(1000).forEach(System.out::println);

还有使用headTail和谓词连接的替代解决方案:

public static StreamEx<Integer> sieve(StreamEx<Integer> input, IntPredicate isPrime) {
    return input.headTail((head, tail) -> isPrime.test(head) 
            ? sieve(tail, isPrime.and(n -> n % head != 0)).prepend(head)
            : sieve(tail, isPrime));
}

sieve(StreamEx.iterate(2, x -> x+1), i -> true).limit(1000).forEach(System.out::println);

比较递归解决方案很有趣:它们能够生成多少个素数。

@John McClean 解决方案(StreamUtils

John McClean 的解决方案并不懒惰:你不能用无限的流来喂它们。所以我刚刚通过反复试验找到了最大允许的上限 ( 17793) (发生 StackOverflowError 之后):

public void sieveTest(){
    sieve(IntStream.range(2, 17793).boxed()).forEach(System.out::println);
}

@John McClean 解决方案(Streamable

public void sieveTest2(){
    sieve(Streamable.range(2, 39990)).forEach(System.out::println);
}

增加上限39990会导致 StackOverflowError。

@frhack 解决方案(LazySeq

LazySeq<Integer> ints = integers(2);
LazySeq primes = sieve(ints); // sieve method from @frhack answer
primes.forEach(p -> System.out.println(p));

结果:卡在素数之后 =53327巨大的堆分配和垃圾收集占用超过 90%。从 53323 前进到 53327 需要几分钟,因此等待更多似乎不切实际。

@vidi 解决方案

Prime.stream().forEach(System.out::println);

结果:素数 = 之后的 StackOverflowError 134417

我的解决方案(StreamEx)

sieve(StreamEx.iterate(2, x -> x+1)).forEach(System.out::println);

结果:素数 = 之后的 StackOverflowError 236167

@frhack 解决方案(rxjava

Observable<Integer> primes = Observable.from(()->primesStream.iterator());
primes.forEach((x) -> System.out.println(x.toString()));            

结果:素数 = 之后的 StackOverflowError 367663

@Holger 解决方案

IntStream primes=from(2).filter(i->p.test(i)).peek(i->p=p.and(v->v%i!=0));
primes.forEach(System.out::println);

结果:素数 = 之后的 StackOverflowError 368089

我的解决方案(带有谓词连接的 StreamEx)

sieve(StreamEx.iterate(2, x -> x+1), i -> true).forEach(System.out::println);

结果:素数 = 之后的 StackOverflowError 368287


所以三个涉及谓词连接的解决方案获胜,因为每个新条件只增加了 2 个堆栈帧。我认为,它们之间的差异是微不足道的,不应被视为定义赢家。但是我更喜欢我的第一个 StreamEx 解决方案,因为它更类似于 Scala 代码。

于 2016-01-19T10:33:04.423 回答
3

下面的解决方案不做状态突变,除了流的头/尾解构。

使用 IntStream.iterate 获得惰性。Prime 类用于保持生成器状态

    import java.util.PrimitiveIterator;
    import java.util.stream.IntStream;
    import java.util.stream.Stream;

    public class Prime {
        private final IntStream candidates;
        private final int current;

        private Prime(int current, IntStream candidates)
        {
            this.current = current;
            this.candidates = candidates;
        }

        private Prime next()
        {
            PrimitiveIterator.OfInt it = candidates.filter(n -> n % current != 0).iterator();

            int head = it.next();
            IntStream tail = IntStream.generate(it::next);

            return new Prime(head, tail);
        }

        public static Stream<Integer> stream() {
            IntStream possiblePrimes = IntStream.iterate(3, i -> i + 1);

            return Stream.iterate(new Prime(2, possiblePrimes), Prime::next)
                         .map(p -> p.current);
        }
    }

用法是这样的:

Stream<Integer> first10Primes = Prime.stream().limit(10)
于 2015-09-17T15:20:19.840 回答
2

您基本上可以像这样实现它:

static <T> Tuple2<Optional<T>, Seq<T>> splitAtHead(Stream<T> stream) {
    Iterator<T> it = stream.iterator();
    return tuple(it.hasNext() ? Optional.of(it.next()) : Optional.empty(), seq(it));
}

在上面的例子中,Tuple2Seq是从jOOλ借来的类型,我们为jOOQ集成测试开发的一个库。如果您不想要任何额外的依赖项,您不妨自己实现它们:

class Tuple2<T1, T2> {
    final T1 v1;
    final T2 v2;

    Tuple2(T1 v1, T2 v2) {
        this.v1 = v1;
        this.v2 = v2;
    }

    static <T1, T2> Tuple2<T1, T2> tuple(T1 v1, T2 v2) {
        return new Tuple<>(v1, v2);
    }
}

static <T> Tuple2<Optional<T>, Stream<T>> splitAtHead(Stream<T> stream) {
    Iterator<T> it = stream.iterator();
    return tuple(
        it.hasNext() ? Optional.of(it.next()) : Optional.empty,
        StreamSupport.stream(Spliterators.spliteratorUnknownSize(
            it, Spliterator.ORDERED
        ), false)
    );
}
于 2014-09-22T14:28:46.940 回答
1

这里提供了许多有趣的建议,但如果有人需要一个不依赖第三方库的解决方案,我想出了这个:

    import java.util.AbstractMap;
    import java.util.Optional;
    import java.util.Spliterators;
    import java.util.stream.StreamSupport;

    /**
     * Splits a stream in the head element and a tail stream.
     * Parallel streams are not supported.
     * 
     * @param stream Stream to split.
     * @param <T> Type of the input stream.
     * @return A map entry where {@link Map.Entry#getKey()} contains an
     *    optional with the first element (head) of the original stream
     *    and {@link Map.Entry#getValue()} the tail of the original stream.
     * @throws IllegalArgumentException for parallel streams.
     */
    public static <T> Map.Entry<Optional<T>, Stream<T>> headAndTail(final Stream<T> stream) {
        if (stream.isParallel()) {
            throw new IllegalArgumentException("parallel streams are not supported");
        }
        final Iterator<T> iterator = stream.iterator();
        return new AbstractMap.SimpleImmutableEntry<>(
                iterator.hasNext() ? Optional.of(iterator.next()) : Optional.empty(),
                StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, 0), false)
        );
    }
于 2018-11-11T12:22:54.943 回答
1

如果您不介意使用 3rd 方库cyclops-streams,我编写的库有许多潜在的解决方案。

StreamUtilsjava.util.stream.Streams类有大量静态方法可以直接使用include headAndTail

HeadAndTail<Integer> headAndTail = StreamUtils.headAndTail(Stream.of(1,2,3,4));
int head = headAndTail.head(); //1
Stream<Integer> tail = headAndTail.tail(); //Stream[2,3,4]

Streamable类代表一个可重放的类Stream,它通过构建一个惰性的、缓存的中间数据结构来工作。因为它是缓存和可回收的——head 和 tail 可以直接分开实现。

Streamable<Integer> replayable=  Streamable.fromStream(Stream.of(1,2,3,4));
int head = repayable.head(); //1
Stream<Integer> tail = replayable.tail(); //Stream[2,3,4]

cyclops-streams还提供了一个顺序Stream扩展,进而扩展了 jOOλ,并且具有Tuple用于头尾提取的基于(来自 jOOλ)和域对象(HeadAndTail)的解决方案。

SequenceM.of(1,2,3,4)
         .splitAtHead(); //Tuple[1,SequenceM[2,3,4]

SequenceM.of(1,2,3,4)
         .headAndTail();

根据 Tagir 的请求更新 -> Scala 筛子的 Java 版本,使用SequenceM

public void sieveTest(){
    sieve(SequenceM.range(2, 1_000)).forEach(System.out::println);
}

SequenceM<Integer> sieve(SequenceM<Integer> s){

    return s.headAndTailOptional().map(ht ->SequenceM.of(ht.head())
                            .appendStream(sieve(ht.tail().filter(n -> n % ht.head() != 0))))
                    .orElse(SequenceM.of());
}

另一个版本通过Streamable

public void sieveTest2(){
    sieve(Streamable.range(2, 1_000)).forEach(System.out::println);
}

Streamable<Integer> sieve(Streamable<Integer> s){

    return s.size()==0? Streamable.of() : Streamable.of(s.head())
                                                    .appendStreamable(sieve(s.tail()
                                                                    .filter(n -> n % s.head() != 0)));
}

注意 - 两者StreamableSequenceM没有 Empty 实现 - 因此检查Streamable和使用headAndTailOptional.

最后是一个使用普通的版本java.util.stream.Stream

import static com.aol.cyclops.streams.StreamUtils.headAndTailOptional;

public void sieveTest(){
    sieve(IntStream.range(2, 1_000).boxed()).forEach(System.out::println);
}

Stream<Integer> sieve(Stream<Integer> s){

    return headAndTailOptional(s).map(ht ->Stream.concat(Stream.of(ht.head())
                            ,sieve(ht.tail().filter(n -> n % ht.head() != 0))))
                    .orElse(Stream.of());
}

另一个更新 - 基于 @Holger 版本的惰性迭代,使用对象而不是原始版本(注意原始版本也是可能的)

  final Mutable<Predicate<Integer>> predicate = Mutable.of(x->true);
  SequenceM.iterate(2, n->n+1)
           .filter(i->predicate.get().test(i))
           .peek(i->predicate.mutate(p-> p.and(v -> v%i!=0)))
           .limit(100000)
           .forEach(System.out::println);
于 2016-01-11T06:00:46.253 回答
0

要获得 head 和 tail,您需要一个 Lazy Stream 实现。Java 8 流或 RxJava 不适合。

例如,您可以使用LazySeq,如下所示。

惰性序列总是使用非常便宜的 first/rest 分解(head() 和 tail())从头开始遍历

LazySeq 实现了 java.util.List 接口,因此可以在各种地方使用。此外,它还实现了对集合的 Java 8 增强,即流和收集器


package com.company;

import com.nurkiewicz.lazyseq.LazySeq;

public class Main {

    public static void main(String[] args) {

        LazySeq<Integer> ints = integers(2);
        LazySeq primes = sieve(ints);
        primes.take(10).forEach(p -> System.out.println(p));

    }

    private static LazySeq<Integer> sieve(LazySeq<Integer> s) {
        return LazySeq.cons(s.head(), () -> sieve(s.filter(x -> x % s.head() != 0)));
    }

    private static LazySeq<Integer> integers(int from) {
        return LazySeq.cons(from, () -> integers(from + 1));
    }

}
于 2015-06-06T10:23:27.963 回答
0

这是另一个使用 Holger 建议的方法的食谱。它使用 RxJava 只是为了增加使用 take(int) 方法和许多其他方法的可能性。

package com.company;

import rx.Observable;

import java.util.function.IntPredicate;
import java.util.stream.IntStream;

public class Main {

    public static void main(String[] args) {

        final IntPredicate[] p={(x)->true};
        IntStream primesStream=IntStream.iterate(2,n->n+1).filter(i -> p[0].test(i)).peek(i->p[0]=p[0].and(v->v%i!=0)   );

        Observable primes = Observable.from(()->primesStream.iterator());

        primes.take(10).forEach((x) -> System.out.println(x.toString()));


    }

}
于 2015-06-06T11:11:55.553 回答
0

这也适用于并行流:

public static <T> Map.Entry<Optional<T>, Stream<T>> headAndTail(final Stream<T> stream) {
    final AtomicReference<Optional<T>> head = new AtomicReference<>(Optional.empty());
    final var spliterator = stream.spliterator();
    spliterator.tryAdvance(x -> head.set(Optional.of(x)));
    return Map.entry(head.get(), StreamSupport.stream(spliterator, stream.isParallel()));
}
于 2021-07-02T09:18:00.530 回答
-2

如果您想获得流的头部,只需:

IntStream.range(1, 5).first();

如果您想获取流的尾部,只需:

IntStream.range(1, 5).skip(1);

如果您想同时获取流的头部和尾部,只需:

IntStream s = IntStream.range(1, 5);
int head = s.head();
IntStream tail = s.tail();

如果你想找到素数,只需:

LongStream.range(2, n)
   .filter(i -> LongStream.range(2, (long) Math.sqrt(i) + 1).noneMatch(j -> i % j == 0))
   .forEach(N::println);

如果您想了解更多信息,请前往获取AbacusUtil

声明:我是AbacusUtil的开发者。

于 2016-12-01T19:43:07.817 回答