Ogranicz strumień o predykat


187

Czy istnieje operacja strumienia Java 8, która ogranicza (potencjalnie nieskończony), Streamdopóki pierwszy element nie dopasuje predykatu?

W Javie 9 możemy użyć takeWhilejak w poniższym przykładzie, aby wydrukować wszystkie liczby mniejsze niż 10.

IntStream
    .iterate(1, n -> n + 1)
    .takeWhile(n -> n < 10)
    .forEach(System.out::println);

Ponieważ w Javie 8 nie ma takiej operacji, jaki jest najlepszy sposób na jej ogólne wdrożenie?


1
Prawdopodobnie przydatne informacje na: stackoverflow.com/q/19803058/248082
nobeh


Zastanawiam się, w jaki sposób architekci mogliby kiedykolwiek przejść przez „do czego właściwie możemy to wykorzystać ” bez wpadania na tę skrzynkę użytkową. Począwszy od Java 8, strumienie są w rzeczywistości pomocne tylko dla istniejących struktur danych: - /
Thorbjørn Ravn Andersen


W Javie 9 łatwiej byłoby pisaćIntStream.iterate(1, n->n<10, n->n+1).forEach(System.out::print);
Marc Dzaebel

Odpowiedzi:


81

Taka operacja powinna być możliwa w Javie 8 Stream, ale niekoniecznie musi być wykonana wydajnie - na przykład niekoniecznie trzeba równolegle wykonywać taką operację, ponieważ trzeba patrzeć na elementy w kolejności.

Interfejs API nie zapewnia łatwego sposobu na zrobienie tego, ale prawdopodobnie najłatwiej jest to zrobić Stream.iterator(), owiń je, Iteratoraby mieć implementację „take-while”, a następnie wróć do a, Spliteratora następnie a Stream. Lub - może - owinąć Spliterator, chociaż tak naprawdę nie można go już podzielić w tej implementacji.

Oto niesprawdzone wdrożenie takeWhilena zasadzie Spliterator:

static <T> Spliterator<T> takeWhile(
    Spliterator<T> splitr, Predicate<? super T> predicate) {
  return new Spliterators.AbstractSpliterator<T>(splitr.estimateSize(), 0) {
    boolean stillGoing = true;
    @Override public boolean tryAdvance(Consumer<? super T> consumer) {
      if (stillGoing) {
        boolean hadNext = splitr.tryAdvance(elem -> {
          if (predicate.test(elem)) {
            consumer.accept(elem);
          } else {
            stillGoing = false;
          }
        });
        return hadNext && stillGoing;
      }
      return false;
    }
  };
}

static <T> Stream<T> takeWhile(Stream<T> stream, Predicate<? super T> predicate) {
   return StreamSupport.stream(takeWhile(stream.spliterator(), predicate), false);
}

8
Teoretycznie paralelizowanie takeWhile z predykatem bezstanowym jest łatwe. Oceń warunek w równoległych partiach (zakładając, że predykat nie rzuca się lub nie wywołuje efektu ubocznego, jeśli zostanie wykonany kilka razy więcej). Problem polega na robieniu tego w kontekście rekurencyjnej dekompozycji (struktura rozwidlenia / łączenia) używanej przez strumienie. Naprawdę, to strumienie są okropnie nieefektywne.
Aleksandr Dubinsky,

91
Strumienie byłyby o wiele lepsze, gdyby nie były tak zajęte automagiczną równoległością. Równoległość jest potrzebna tylko w niewielkiej części miejsc, w których można używać strumieni. Poza tym, gdyby Oracle tak bardzo troszczyło się o perfoance, mogliby dokonać autowektoryzacji JVM JIT i uzyskać znacznie większy wzrost wydajności, nie zawracając sobie głowy programistami. Teraz to właściwie zrobiona automatyzacja równoległości.
Aleksandr Dubinsky,

Powinieneś zaktualizować tę odpowiedź teraz po wydaniu Java 9.
Radiodef

4
Nie, @Radiodef. Pytanie dotyczy konkretnie rozwiązania Java 8.
Renato Wstecz

145

Operacje takeWhile i dropWhilezostały dodane do JDK 9. Twój przykładowy kod

IntStream
    .iterate(1, n -> n + 1)
    .takeWhile(n -> n < 10)
    .forEach(System.out::println);

zachowuje się dokładnie tak, jak się tego spodziewasz po skompilowaniu i uruchomieniu pod JDK 9.

JDK 9 został wydany. Można go pobrać tutaj: http://jdk.java.net/9/


3
Bezpośredni link do dokumentów podglądu dla JDK9 Stream, za pomocą takeWhile/ dropWhile: download.java.net/jdk9/docs/api/java/util/stream/Stream.html
Mil

1
Czy istnieje jakikolwiek powód, dlaczego nazywa się je takeWhilei dropWhilezamiast limitWhilei skipWhile, dla zachowania spójności z istniejących API?
Lukas Eder

10
@LukasEder takeWhilei dropWhilesą dość rozpowszechnione, występują w Scali, Python, Groovy, Ruby, Haskell i Clojure. Asymetria z skipi limitjest niefortunne. Może skipi limitpowinienem był zadzwonić dropi take, ale nie są one tak intuicyjne, chyba że znasz już Haskella.
Stuart Marks

3
@StuartMarks: Rozumiem to dropXXXi takeXXXsą bardziej popularnymi terminami, ale mogę osobiście żyć z bardziej SQL-esque limitXXXi skipXXX. Uważam, że ta nowa asymetria jest bardziej myląca niż indywidualny wybór terminów ... :) (przy okazji: Scala też ma drop(int)i take(int))
Lukas Eder

1
tak, pozwól mi tylko uaktualnić do Jdk 9 w produkcji. Wielu deweloperów wciąż korzysta z Jdk8, taka funkcja powinna była zostać dołączona do Streamingów od samego początku.
wilmol

50

allMatch()jest funkcją zwierającą, więc możesz jej użyć do zatrzymania przetwarzania. Główną wadą jest to, że musisz zrobić test dwa razy: raz, aby sprawdzić, czy powinieneś go przetworzyć, i jeszcze raz, aby sprawdzić, czy kontynuować.

IntStream
    .iterate(1, n -> n + 1)
    .peek(n->{if (n<10) System.out.println(n);})
    .allMatch(n->n < 10);

5
Na początku wydawało mi się to nieintuicyjne (biorąc pod uwagę nazwę metody), ale docs potwierdzają, że Stream.allMatch()jest to operacja zwarcia . To się zakończy nawet w nieskończonym strumieniu, takim jak IntStream.iterate(). Oczywiście z perspektywy czasu jest to rozsądna optymalizacja.
Bailey Parker,

3
To jest miłe, ale nie sądzę, aby dobrze komunikowało, że jego intencją jest ciało peek. Gdybym go spotkał w przyszłym miesiącu, zastanawiałbym się, dlaczego programista przede mną sprawdził, czy allMatchzignorował odpowiedź.
Joshua Goldberg

10
Wadą tego rozwiązania jest to, że zwraca wartość logiczną, więc nie można zbierać wyników strumienia w normalny sposób.
neXus

35

W odpowiedzi na odpowiedź @StuartMarks . Moja biblioteka StreamEx ma takeWhileoperację zgodną z bieżącą implementacją JDK-9. Podczas pracy pod JDK-9 po prostu deleguje się do implementacji JDK (przez MethodHandle.invokeExactco jest naprawdę szybki). Podczas pracy pod JDK-8 zostanie zastosowana implementacja „polyfill”. Korzystając z mojej biblioteki, problem można rozwiązać w następujący sposób:

IntStreamEx.iterate(1, n -> n + 1)
           .takeWhile(n -> n < 10)
           .forEach(System.out::println);

Dlaczego nie zaimplementowałeś go dla klasy StreamEx?
Someguy,

@Someguy Zrobiłem to.
Tagir Valeev

14

takeWhilejest jedną z funkcji udostępnianych przez bibliotekę protonpack .

Stream<Integer> infiniteInts = Stream.iterate(0, i -> i + 1);
Stream<Integer> finiteInts = StreamUtils.takeWhile(infiniteInts, i -> i < 10);

assertThat(finiteInts.collect(Collectors.toList()),
           hasSize(10));

11

Aktualizacja: Java 9 jest Streamteraz wyposażona w metodę takeWhile .

Nie potrzeba hacków ani innych rozwiązań. Po prostu użyj tego!


Jestem pewien, że można to znacznie poprawić: (ktoś może sprawić, że będzie wątkowo bezpieczny)

Stream<Integer> stream = Stream.iterate(0, n -> n + 1);

TakeWhile.stream(stream, n -> n < 10000)
         .forEach(n -> System.out.print((n == 0 ? "" + n : "," + n)));

Na pewno hack ... Nie elegancki - ale działa ~: D

class TakeWhile<T> implements Iterator<T> {

    private final Iterator<T> iterator;
    private final Predicate<T> predicate;
    private volatile T next;
    private volatile boolean keepGoing = true;

    public TakeWhile(Stream<T> s, Predicate<T> p) {
        this.iterator = s.iterator();
        this.predicate = p;
    }

    @Override
    public boolean hasNext() {
        if (!keepGoing) {
            return false;
        }
        if (next != null) {
            return true;
        }
        if (iterator.hasNext()) {
            next = iterator.next();
            keepGoing = predicate.test(next);
            if (!keepGoing) {
                next = null;
            }
        }
        return next != null;
    }

    @Override
    public T next() {
        if (next == null) {
            if (!hasNext()) {
                throw new NoSuchElementException("Sorry. Nothing for you.");
            }
        }
        T temp = next;
        next = null;
        return temp;
    }

    public static <T> Stream<T> stream(Stream<T> s, Predicate<T> p) {
        TakeWhile tw = new TakeWhile(s, p);
        Spliterator split = Spliterators.spliterator(tw, Integer.MAX_VALUE, Spliterator.ORDERED);
        return StreamSupport.stream(split, false);
    }

}

8

Możesz użyć java8 + rxjava .

import java.util.stream.IntStream;
import rx.Observable;


// Example 1)
IntStream intStream  = IntStream.iterate(1, n -> n + 1);
Observable.from(() -> intStream.iterator())
    .takeWhile(n ->
          {
                System.out.println(n);
                return n < 10;
          }
    ).subscribe() ;


// Example 2
IntStream intStream  = IntStream.iterate(1, n -> n + 1);
Observable.from(() -> intStream.iterator())
    .takeWhile(n -> n < 10)
    .forEach( n -> System.out.println(n));

6

W rzeczywistości są 2 sposoby na zrobienie tego w Javie 8 bez żadnych dodatkowych bibliotek lub przy użyciu Java 9.

Jeśli chcesz wydrukować numery od 2 do 20 na konsoli, możesz to zrobić:

IntStream.iterate(2, (i) -> i + 2).peek(System.out::println).allMatch(i -> i < 20);

lub

IntStream.iterate(2, (i) -> i + 2).peek(System.out::println).anyMatch(i -> i >= 20);

Dane wyjściowe są w obu przypadkach:

2
4
6
8
10
12
14
16
18
20

Nikt nie wspomniał anyMatch jeszcze. To jest powód tego postu.


5

Jest to źródło skopiowane z JDK 9 java.util.stream.Stream.takeWhile (Predicate). Mała różnica w pracy z JDK 8.

static <T> Stream<T> takeWhile(Stream<T> stream, Predicate<? super T> p) {
    class Taking extends Spliterators.AbstractSpliterator<T> implements Consumer<T> {
        private static final int CANCEL_CHECK_COUNT = 63;
        private final Spliterator<T> s;
        private int count;
        private T t;
        private final AtomicBoolean cancel = new AtomicBoolean();
        private boolean takeOrDrop = true;

        Taking(Spliterator<T> s) {
            super(s.estimateSize(), s.characteristics() & ~(Spliterator.SIZED | Spliterator.SUBSIZED));
            this.s = s;
        }

        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            boolean test = true;
            if (takeOrDrop &&               // If can take
                    (count != 0 || !cancel.get()) && // and if not cancelled
                    s.tryAdvance(this) &&   // and if advanced one element
                    (test = p.test(t))) {   // and test on element passes
                action.accept(t);           // then accept element
                return true;
            } else {
                // Taking is finished
                takeOrDrop = false;
                // Cancel all further traversal and splitting operations
                // only if test of element failed (short-circuited)
                if (!test)
                    cancel.set(true);
                return false;
            }
        }

        @Override
        public Comparator<? super T> getComparator() {
            return s.getComparator();
        }

        @Override
        public void accept(T t) {
            count = (count + 1) & CANCEL_CHECK_COUNT;
            this.t = t;
        }

        @Override
        public Spliterator<T> trySplit() {
            return null;
        }
    }
    return StreamSupport.stream(new Taking(stream.spliterator()), stream.isParallel()).onClose(stream::close);
}

4

Oto wersja wykonana na ints - jak zadano w pytaniu.

Stosowanie:

StreamUtil.takeWhile(IntStream.iterate(1, n -> n + 1), n -> n < 10);

Oto kod StreamUtil:

import java.util.PrimitiveIterator;
import java.util.Spliterators;
import java.util.function.IntConsumer;
import java.util.function.IntPredicate;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;

public class StreamUtil
{
    public static IntStream takeWhile(IntStream stream, IntPredicate predicate)
    {
        return StreamSupport.intStream(new PredicateIntSpliterator(stream, predicate), false);
    }

    private static class PredicateIntSpliterator extends Spliterators.AbstractIntSpliterator
    {
        private final PrimitiveIterator.OfInt iterator;
        private final IntPredicate predicate;

        public PredicateIntSpliterator(IntStream stream, IntPredicate predicate)
        {
            super(Long.MAX_VALUE, IMMUTABLE);
            this.iterator = stream.iterator();
            this.predicate = predicate;
        }

        @Override
        public boolean tryAdvance(IntConsumer action)
        {
            if (iterator.hasNext()) {
                int value = iterator.nextInt();
                if (predicate.test(value)) {
                    action.accept(value);
                    return true;
                }
            }

            return false;
        }
    }
}

2

Idź po bibliotekę AbacusUtil . Zapewnia dokładnie taki interfejs API, jaki chcesz i więcej:

IntStream.iterate(1, n -> n + 1).takeWhile(n -> n < 10).forEach(System.out::println);

Deklaracja : Jestem programistą AbacusUtil.


0

Nie można przerwać strumienia, z wyjątkiem operacji terminalu powodującego zwarcie, która pozostawiłaby niektóre wartości strumienia nieprzetworzone niezależnie od ich wartości. Ale jeśli chcesz uniknąć operacji na strumieniu, możesz dodać transformację i filtrować do strumienia:

import java.util.Objects;

class ThingProcessor
{
    static Thing returnNullOnCondition(Thing thing)
    {    return( (*** is condition met ***)? null : thing);    }

    void processThings(Collection<Thing> thingsCollection)
    {
        thingsCollection.stream()
        *** regular stream processing ***
        .map(ThingProcessor::returnNullOnCondition)
        .filter(Objects::nonNull)
        *** continue stream processing ***
    }
} // class ThingProcessor

To przekształca strumień rzeczy w wartości zerowe, gdy rzeczy spełniają pewien warunek, a następnie odfiltrowuje wartości zerowe. Jeśli chcesz pozwolić sobie na efekty uboczne, możesz ustawić wartość warunku na true po napotkaniu czegoś, aby wszystkie kolejne rzeczy były odfiltrowywane bez względu na ich wartość. Ale nawet jeśli nie, możesz zaoszczędzić dużo (jeśli nie całkiem) przetwarzania, odfiltrowując wartości ze strumienia, którego nie chcesz przetwarzać.


To kiepskie, że jakaś anonimowa ocena oceniła moją odpowiedź bez podania przyczyny. Więc ani ja, ani żaden inny czytelnik nie wie, co jest nie tak z moją odpowiedzią. Wobec braku uzasadnienia uznam ich krytykę za nieważną, a moja odpowiedź jako opublikowana jest poprawna.
Matthew

Twoja odpowiedź nie rozwiązuje problemu PO, który dotyczy nieskończonych strumieni. Wydaje się również, że niepotrzebnie komplikuje rzeczy, ponieważ możesz zapisać warunek w samym wywołaniu filter (), bez potrzeby map (). Pytanie ma już przykładowy kod, po prostu spróbuj zastosować odpowiedź na ten kod, a zobaczysz, że program zapętli się na zawsze.
SenoCtar,

0

Nawet ja miałem podobne wymaganie - wywołaj usługę internetową, jeśli się nie powiedzie, spróbuj ponownie 3 razy. Jeśli nie powiedzie się nawet po wielu próbach, wyślij powiadomienie e-mailem. Po google dużo, anyMatch()przyszedł jako wybawiciel. Mój przykładowy kod w następujący sposób. W poniższym przykładzie, jeśli metoda webServiceCall zwróci wartość true w samej pierwszej iteracji, strumień nie będzie iterował dalej, tak jak to wywołaliśmyanyMatch() . Wierzę, że tego właśnie szukasz.

import java.util.stream.IntStream;

import io.netty.util.internal.ThreadLocalRandom;

class TrialStreamMatch {

public static void main(String[] args) {        
    if(!IntStream.range(1,3).anyMatch(integ -> webServiceCall(integ))){
         //Code for sending email notifications
    }
}

public static boolean webServiceCall(int i){
    //For time being, I have written a code for generating boolean randomly
    //This whole piece needs to be replaced by actual web-service client code
    boolean bool = ThreadLocalRandom.current().nextBoolean();
    System.out.println("Iteration index :: "+i+" bool :: "+bool);

    //Return success status -- true or false
    return bool;
}

0

Jeśli znasz dokładną liczbę powtórzeń, które zostaną wykonane, możesz to zrobić

IntStream
          .iterate(1, n -> n + 1)
          .limit(10)
          .forEach(System.out::println);

1
Chociaż może to odpowiedzieć na pytanie autora, brakuje w nim niektórych wyjaśnień i linków do dokumentacji. Fragmenty surowego kodu nie są bardzo pomocne bez niektórych fraz wokół niego. Bardzo pomocne może się okazać, jak napisać dobrą odpowiedź . Edytuj swoją odpowiedź.
hellow

0
    IntStream.iterate(1, n -> n + 1)
    .peek(System.out::println) //it will be executed 9 times
    .filter(n->n>=9)
    .findAny();

zamiast piku możesz użyć mapToObj, aby zwrócić końcowy obiekt lub komunikat

    IntStream.iterate(1, n -> n + 1)
    .mapToObj(n->{   //it will be executed 9 times
            if(n<9)
                return "";
            return "Loop repeats " + n + " times";});
    .filter(message->!message.isEmpty())
    .findAny()
    .ifPresent(System.out::println);

-2

Jeśli masz inny problem, może być potrzebne inne rozwiązanie, ale w przypadku obecnego problemu po prostu wybrałbym:

IntStream
    .iterate(1, n -> n + 1)
    .limit(10)
    .forEach(System.out::println);

-2

Może być trochę off topic, ale to, co mamy na List<T>raczej niżStream<T> .

Najpierw musisz mieć takemetodę util. Ta metoda wymaga pierwszych nelementów:

static <T> List<T> take(List<T> l, int n) {
    if (n <= 0) {
        return newArrayList();
    } else {
        int takeTo = Math.min(Math.max(n, 0), l.size());
        return l.subList(0, takeTo);
    }
}

po prostu działa jak scala.List.take

    assertEquals(newArrayList(1, 2, 3), take(newArrayList(1, 2, 3, 4, 5), 3));
    assertEquals(newArrayList(1, 2, 3), take(newArrayList(1, 2, 3), 5));

    assertEquals(newArrayList(), take(newArrayList(1, 2, 3), -1));
    assertEquals(newArrayList(), take(newArrayList(1, 2, 3), 0));

teraz napisanie takeWhilemetody opartej na . będzie dość prostetake

static <T> List<T> takeWhile(List<T> l, Predicate<T> p) {
    return l.stream().
            filter(p.negate()).findFirst(). // find first element when p is false
            map(l::indexOf).        // find the index of that element
            map(i -> take(l, i)).   // take up to the index
            orElse(l);  // return full list if p is true for all elements
}

to działa tak:

    assertEquals(newArrayList(1, 2, 3), takeWhile(newArrayList(1, 2, 3, 4, 3, 2, 1), i -> i < 4));

ta implementacja kilkukrotnie iteruje listę częściowo, ale nie doda O(n^2)operacji dodawania . Mam nadzieję, że to możliwe.


-3

Mam inne szybkie rozwiązanie, wdrażając to (co jest raczej nieczyste, ale masz pomysł):

public static void main(String[] args) {
    System.out.println(StreamUtil.iterate(1, o -> o + 1).terminateOn(15)
            .map(o -> o.toString()).collect(Collectors.joining(", ")));
}

static interface TerminatedStream<T> {
    Stream<T> terminateOn(T e);
}

static class StreamUtil {
    static <T> TerminatedStream<T> iterate(T seed, UnaryOperator<T> op) {
        return new TerminatedStream<T>() {
            public Stream<T> terminateOn(T e) {
                Builder<T> builder = Stream.<T> builder().add(seed);
                T current = seed;
                while (!current.equals(e)) {
                    current = op.apply(current);
                    builder.add(current);
                }
                return builder.build();
            }
        };
    }
}

2
Oceniasz góry oceniasz cały strumień! A jeśli currentnigdy .equals(e)nie dostaniesz nieskończonej pętli. Oba, nawet jeśli później złożysz wniosek np .limit(1). To o wiele gorsze niż „nieczyste” .
charlie

Korzystając z naszej strony potwierdzasz, że przeczytałeś(-aś) i rozumiesz nasze zasady używania plików cookie i zasady ochrony prywatności.
Licensed under cc by-sa 3.0 with attribution required.