Kompresowanie strumieni za pomocą JDK8 z lambdą (java.util.stream.Streams.zip)


149

W JDK 8 z lambdą b93 istniała klasa java.util.stream.Streams.zip w b93, która mogłaby zostać użyta do zip strumieni (jest to zilustrowane w samouczku Exploring Java8 Lambdas. Part 1 autorstwa Dhananjay Nene ). Ta funkcja:

Tworzy leniwy i sekwencyjny połączony Stream, którego elementy są wynikiem połączenia elementów dwóch strumieni.

Jednak w b98 to zniknęło. W rzeczywistości Streamsklasa nie jest nawet dostępna w java.util.stream w wersji b98 .

Czy ta funkcja została przeniesiona, a jeśli tak, to jak w zwięzły sposób skompresować strumienie za pomocą b98?

Aplikacja, o której myślę, jest w tej implementacji Javy Shen , gdzie zastąpiłem funkcjonalność zip w

  • static <T> boolean every(Collection<T> c1, Collection<T> c2, BiPredicate<T, T> pred)
  • static <T> T find(Collection<T> c1, Collection<T> c2, BiPredicate<T, T> pred)

funkcje z raczej rozwlekłym kodem (który nie korzysta z funkcjonalności z b98).


3
Ach, właśnie się dowiedziałem, że wygląda na to, że został całkowicie usunięty: mail.openjdk.java.net/pipermail/lambda-libs-spec-observers/ ...
artella

„Exploring Java8 Lambdas. Part 1” - nowy link do tego artykułu to blog.dhananjaynene.com/2013/02/exploring-java8-lambdas-part-1
Aleksei Egorov

Odpowiedzi:


77

Też tego potrzebowałem, więc po prostu wziąłem kod źródłowy z b93 i umieściłem go w klasie „util”. Musiałem go nieco zmodyfikować, aby działał z obecnym API.

W celach informacyjnych oto działający kod (weź to na własne ryzyko ...):

public static<A, B, C> Stream<C> zip(Stream<? extends A> a,
                                     Stream<? extends B> b,
                                     BiFunction<? super A, ? super B, ? extends C> zipper) {
    Objects.requireNonNull(zipper);
    Spliterator<? extends A> aSpliterator = Objects.requireNonNull(a).spliterator();
    Spliterator<? extends B> bSpliterator = Objects.requireNonNull(b).spliterator();

    // Zipping looses DISTINCT and SORTED characteristics
    int characteristics = aSpliterator.characteristics() & bSpliterator.characteristics() &
            ~(Spliterator.DISTINCT | Spliterator.SORTED);

    long zipSize = ((characteristics & Spliterator.SIZED) != 0)
            ? Math.min(aSpliterator.getExactSizeIfKnown(), bSpliterator.getExactSizeIfKnown())
            : -1;

    Iterator<A> aIterator = Spliterators.iterator(aSpliterator);
    Iterator<B> bIterator = Spliterators.iterator(bSpliterator);
    Iterator<C> cIterator = new Iterator<C>() {
        @Override
        public boolean hasNext() {
            return aIterator.hasNext() && bIterator.hasNext();
        }

        @Override
        public C next() {
            return zipper.apply(aIterator.next(), bIterator.next());
        }
    };

    Spliterator<C> split = Spliterators.spliterator(cIterator, zipSize, characteristics);
    return (a.isParallel() || b.isParallel())
           ? StreamSupport.stream(split, true)
           : StreamSupport.stream(split, false);
}

1
Czy wynikowy strumień nie powinien wyglądać tak, SIZEDjeśli jeden ze strumieni to SIZEDnie oba?
Didier L

5
Nie sądzę. Aby SIZEDta implementacja działała, muszą być oba strumienie . W rzeczywistości zależy to od tego, jak zdefiniujesz zamykanie. Czy na przykład powinieneś być w stanie spakować dwa strumienie o różnych rozmiarach? Jak wtedy wyglądałby wynikowy strumień? Uważam, że właśnie dlatego ta funkcja została faktycznie pominięta w API. Można to zrobić na wiele sposobów i od użytkownika zależy, jakie zachowanie powinno być „właściwe”. Czy odrzucić elementy z dłuższego strumienia, czy uzupełnić krótszą listę? Jeśli tak, to jakie wartości?
siki

O ile czegoś nie brakuje, nie ma potrzeby rzucania (np. Do Spliterator<A>).
jub0bs

Czy istnieje witryna internetowa, na której jest hostowany kod źródłowy Java 8 b93? Mam problem ze znalezieniem tego.
Starwarswii

42

zip jest jedną z funkcji udostępnianych przez bibliotekę protonpack .

Stream<String> streamA = Stream.of("A", "B", "C");
Stream<String> streamB  = Stream.of("Apple", "Banana", "Carrot", "Doughnut");

List<String> zipped = StreamUtils.zip(streamA,
                                      streamB,
                                      (a, b) -> a + " is for " + b)
                                 .collect(Collectors.toList());

assertThat(zipped,
           contains("A is for Apple", "B is for Banana", "C is for Carrot"));


34

Jeśli masz guawę w swoim projekcie, możesz użyć metody Streams.zip (została dodana w Guava 21):

Zwraca strumień, w którym każdy element jest wynikiem przekazania odpowiedniego elementu każdego ze streamA i streamB do funkcji. Wynikowy strumień będzie tak długi, jak krótszy z dwóch strumieni wejściowych; jeśli jeden strumień jest dłuższy, jego dodatkowe elementy zostaną zignorowane. Powstały strumień nie daje się skutecznie podzielić. Może to zaszkodzić wydajności równoległej.

 public class Streams {
     ...

     public static <A, B, R> Stream<R> zip(Stream<A> streamA,
             Stream<B> streamB, BiFunction<? super A, ? super B, R> function) {
         ...
     }
 }

26

Kompresowanie dwóch strumieni za pomocą JDK8 z lambda ( gist ).

public static <A, B, C> Stream<C> zip(Stream<A> streamA, Stream<B> streamB, BiFunction<A, B, C> zipper) {
    final Iterator<A> iteratorA = streamA.iterator();
    final Iterator<B> iteratorB = streamB.iterator();
    final Iterator<C> iteratorC = new Iterator<C>() {
        @Override
        public boolean hasNext() {
            return iteratorA.hasNext() && iteratorB.hasNext();
        }

        @Override
        public C next() {
            return zipper.apply(iteratorA.next(), iteratorB.next());
        }
    };
    final boolean parallel = streamA.isParallel() || streamB.isParallel();
    return iteratorToFiniteStream(iteratorC, parallel);
}

public static <T> Stream<T> iteratorToFiniteStream(Iterator<T> iterator, boolean parallel) {
    final Iterable<T> iterable = () -> iterator;
    return StreamSupport.stream(iterable.spliterator(), parallel);
}

2
Ładne rozwiązanie i (stosunkowo) kompaktowe! Wymaga umieszczenia import java.util.function.*;i import java.util.stream.*;na górze pliku.
sffc

Zauważ, że jest to operacja terminalowa w strumieniu. Oznacza to, że w przypadku nieskończonych strumieni ta metoda nie
działa

2
Tyle bezużyteczne owijarki: tutaj () -> iteratori tutaj znowu: iterable.spliterator(). Dlaczego nie wdrożyć bezpośrednio a Spliteratorzamiast an Iterator? Sprawdź odpowiedź na @Doradus stackoverflow.com/a/46230233/1140754
Miguel Gamboa

20

Ponieważ nie mogę sobie wyobrazić zastosowania zapinania na kolekcje inne niż indeksowane (listy) i jestem wielkim fanem prostoty, to byłoby moje rozwiązanie:

<A,B,C>  Stream<C> zipped(List<A> lista, List<B> listb, BiFunction<A,B,C> zipper){
     int shortestLength = Math.min(lista.size(),listb.size());
     return IntStream.range(0,shortestLength).mapToObj( i -> {
          return zipper.apply(lista.get(i), listb.get(i));
     });        
}

1
Myślę, że mapToObjectpowinno mapToObj.
seanf

jeśli listy nie ma RandomAccess(np. na listach z
linkami

Zdecydowanie. Jednak większość programistów Java doskonale zdaje sobie sprawę, że LinkedList ma słabą wydajność w przypadku operacji dostępu do indeksów.
Rafael

11

Metody wspomnianej klasy zostały przeniesione do samego Streaminterfejsu na rzecz metod domyślnych. Ale wydaje się, że zipmetoda została usunięta. Może dlatego, że nie jest jasne, jakie powinno być domyślne zachowanie dla strumieni o różnych rozmiarach. Jednak wdrożenie pożądanego zachowania jest proste:

static <T> boolean every(
  Collection<T> c1, Collection<T> c2, BiPredicate<T, T> pred) {
    Iterator<T> it=c2.iterator();
    return c1.stream().allMatch(x->!it.hasNext()||pred.test(x, it.next()));
}
static <T> T find(Collection<T> c1, Collection<T> c2, BiPredicate<T, T> pred) {
    Iterator<T> it=c2.iterator();
    return c1.stream().filter(x->it.hasNext()&&pred.test(x, it.next()))
      .findFirst().orElse(null);
}

Czy predicateprzekazany do filtra nie jest stanem ? To narusza kontrakt metody, a zwłaszcza nie zadziała podczas równoległego przetwarzania strumienia.
Andreas

2
@ Andreas: żadne z rozwiązań tutaj nie obsługuje przetwarzania równoległego. Ponieważ moje metody nie zwracają strumienia, zapewniają, że strumienie nie działają równolegle. Podobnie kod zaakceptowanej odpowiedzi zwraca strumień, który można przekształcić w strumień równoległy, ale w rzeczywistości nic nie robi równolegle. To powiedziawszy, stanowe predykaty są odradzane, ale nie naruszają umowy. Mogą być nawet używane w kontekście równoległym, jeśli upewnisz się, że aktualizacja stanu jest bezpieczna dla wątków. W niektórych sytuacjach są one nie do uniknięcia, np. Przekształcenie strumienia w odrębny jest sam w sobie predykatem stanowym .
Holger,

2
@Andreas: możesz zgadnąć, dlaczego te operacje zostały usunięte z Java API…
Holger

8

Z pokorą proponuję taką realizację. Wynikowy strumień jest obcinany do krótszego z dwóch strumieni wejściowych.

public static <L, R, T> Stream<T> zip(Stream<L> leftStream, Stream<R> rightStream, BiFunction<L, R, T> combiner) {
    Spliterator<L> lefts = leftStream.spliterator();
    Spliterator<R> rights = rightStream.spliterator();
    return StreamSupport.stream(new AbstractSpliterator<T>(Long.min(lefts.estimateSize(), rights.estimateSize()), lefts.characteristics() & rights.characteristics()) {
        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            return lefts.tryAdvance(left->rights.tryAdvance(right->action.accept(combiner.apply(left, right))));
        }
    }, leftStream.isParallel() || rightStream.isParallel());
}

Podoba mi się twoja propozycja. Ale nie do końca zgadzam się z ostatnim .., leftStream.isParallel() || rightStream.isParallel(). Myślę, że nie ma to żadnego efektu, ponieważ AbstractSpliteratordomyślnie oferuje ograniczoną równoległość. Więc myślę, że ostateczny wynik będzie taki sam jak zaliczenie false.
Miguel Gamboa

@MiguelGamboa - dzięki za komentarz. Nie jestem pewien, co masz na myśli, mówiąc „domyślnie ograniczona równoległość” - czy masz link do niektórych dokumentów?
Doradus

6

Biblioteka Lazy-Seq zapewnia funkcjonalność zip.

https://github.com/nurkiewicz/LazySeq

Ta biblioteka jest mocno zainspirowana scala.collection.immutable.Streami ma na celu zapewnienie niezmiennej, bezpiecznej dla wątków i łatwej w użyciu implementacji leniwych sekwencji, prawdopodobnie nieskończonej.


5

Korzystając z najnowszej biblioteki Guava (dla Streamsklasy), powinieneś być w stanie to zrobić

final Map<String, String> result = 
    Streams.zip(
        collection1.stream(), 
        collection2.stream(), 
        AbstractMap.SimpleEntry::new)
    .collect(Collectors.toMap(e -> e.getKey(), e  -> e.getValue()));

2

Czy to zadziała dla Ciebie? Jest to krótka funkcja, która leniwie ocenia strumienie, które jest kompresowane, więc możesz dostarczyć jej nieskończoną liczbę strumieni (nie musi brać rozmiaru strumieni, które są skompresowane).

Jeśli strumienie są skończone, zatrzymuje się, gdy tylko w jednym ze strumieni skończą się elementy.

import java.util.Objects;
import java.util.function.BiFunction;
import java.util.stream.Stream;

class StreamUtils {
    static <ARG1, ARG2, RESULT> Stream<RESULT> zip(
            Stream<ARG1> s1,
            Stream<ARG2> s2,
            BiFunction<ARG1, ARG2, RESULT> combiner) {
        final var i2 = s2.iterator();
        return s1.map(x1 -> i2.hasNext() ? combiner.apply(x1, i2.next()) : null)
                .takeWhile(Objects::nonNull);
    }
}

Oto kod testu jednostkowego (znacznie dłuższy niż sam kod!)

import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.junit.jupiter.api.Assertions.assertEquals;

class StreamUtilsTest {
    @ParameterizedTest
    @MethodSource("shouldZipTestCases")
    <ARG1, ARG2, RESULT>
    void shouldZip(
            String testName,
            Stream<ARG1> s1,
            Stream<ARG2> s2,
            BiFunction<ARG1, ARG2, RESULT> combiner,
            Stream<RESULT> expected) {
        var actual = StreamUtils.zip(s1, s2, combiner);

        assertEquals(
                expected.collect(Collectors.toList()),
                actual.collect(Collectors.toList()),
                testName);
    }

    private static Stream<Arguments> shouldZipTestCases() {
        return Stream.of(
                Arguments.of(
                        "Two empty streams",
                        Stream.empty(),
                        Stream.empty(),
                        (BiFunction<Object, Object, Object>) StreamUtilsTest::combine,
                        Stream.empty()),
                Arguments.of(
                        "One singleton and one empty stream",
                        Stream.of(1),
                        Stream.empty(),
                        (BiFunction<Object, Object, Object>) StreamUtilsTest::combine,
                        Stream.empty()),
                Arguments.of(
                        "One empty and one singleton stream",
                        Stream.empty(),
                        Stream.of(1),
                        (BiFunction<Object, Object, Object>) StreamUtilsTest::combine,
                        Stream.empty()),
                Arguments.of(
                        "Two singleton streams",
                        Stream.of("blah"),
                        Stream.of(1),
                        (BiFunction<Object, Object, Object>) StreamUtilsTest::combine,
                        Stream.of(pair("blah", 1))),
                Arguments.of(
                        "One singleton, one multiple stream",
                        Stream.of("blob"),
                        Stream.of(2, 3),
                        (BiFunction<Object, Object, Object>) StreamUtilsTest::combine,
                        Stream.of(pair("blob", 2))),
                Arguments.of(
                        "One multiple, one singleton stream",
                        Stream.of("foo", "bar"),
                        Stream.of(4),
                        (BiFunction<Object, Object, Object>) StreamUtilsTest::combine,
                        Stream.of(pair("foo", 4))),
                Arguments.of(
                        "Two multiple streams",
                        Stream.of("nine", "eleven"),
                        Stream.of(10, 12),
                        (BiFunction<Object, Object, Object>) StreamUtilsTest::combine,
                        Stream.of(pair("nine", 10), pair("eleven", 12)))
        );
    }

    private static List<Object> pair(Object o1, Object o2) {
        return List.of(o1, o2);
    }

    static private <T1, T2> List<Object> combine(T1 o1, T2 o2) {
        return List.of(o1, o2);
    }

    @Test
    void shouldLazilyEvaluateInZip() {
        final var a = new AtomicInteger();
        final var b = new AtomicInteger();
        final var zipped = StreamUtils.zip(
                Stream.generate(a::incrementAndGet),
                Stream.generate(b::decrementAndGet),
                (xa, xb) -> xb + 3 * xa);

        assertEquals(0, a.get(), "Should not have evaluated a at start");
        assertEquals(0, b.get(), "Should not have evaluated b at start");

        final var takeTwo = zipped.limit(2);

        assertEquals(0, a.get(), "Should not have evaluated a at take");
        assertEquals(0, b.get(), "Should not have evaluated b at take");

        final var list = takeTwo.collect(Collectors.toList());

        assertEquals(2, a.get(), "Should have evaluated a after collect");
        assertEquals(-2, b.get(), "Should have evaluated b after collect");
        assertEquals(List.of(2, 4), list);
    }
}

Musiałem porzucić takeWhilena końcu to, że nie wydaje się być w java8, ale nie stanowi to problemu, ponieważ wywoływany może odfiltrować wszelkie wartości null, które występują, gdy spakowane strumienie nie są tego samego rozmiaru. Myślę, że ta odpowiedź powinna być odpowiedzią numer 1, ponieważ jest spójna i zrozumiała. świetna robota dzięki jeszcze raz.
simbo1905

1
public class Tuple<S,T> {
    private final S object1;
    private final T object2;

    public Tuple(S object1, T object2) {
        this.object1 = object1;
        this.object2 = object2;
    }

    public S getObject1() {
        return object1;
    }

    public T getObject2() {
        return object2;
    }
}


public class StreamUtils {

    private StreamUtils() {
    }

    public static <T> Stream<Tuple<Integer,T>> zipWithIndex(Stream<T> stream) {
        Stream<Integer> integerStream = IntStream.range(0, Integer.MAX_VALUE).boxed();
        Iterator<Integer> integerIterator = integerStream.iterator();
        return stream.map(x -> new Tuple<>(integerIterator.next(), x));
    }
}

1

Cyklop-reakcja AOL , do której się przyczyniam, zapewnia również funkcjonalność kompresowania, zarówno poprzez rozszerzoną implementację Stream , która również implementuje interfejs reaktywnych strumieni ReactiveSeq, jak i za pośrednictwem StreamUtils, który oferuje wiele tych samych funkcji za pośrednictwem metod statycznych do standardowych strumieni Java.

 List<Tuple2<Integer,Integer>> list =  ReactiveSeq.of(1,2,3,4,5,6)
                                                  .zip(Stream.of(100,200,300,400));


  List<Tuple2<Integer,Integer>> list = StreamUtils.zip(Stream.of(1,2,3,4,5,6),
                                                  Stream.of(100,200,300,400));

Oferuje również bardziej uogólnione zamykanie oparte na aplikacjach. Na przykład

   ReactiveSeq.of("a","b","c")
              .ap3(this::concat)
              .ap(of("1","2","3"))
              .ap(of(".","?","!"))
              .toList();

   //List("a1.","b2?","c3!");

   private String concat(String a, String b, String c){
    return a+b+c;
   }

A nawet możliwość sparowania każdego elementu w jednym strumieniu z każdym elementem w innym

   ReactiveSeq.of("a","b","c")
              .forEach2(str->Stream.of(str+"!","2"), a->b->a+"_"+b);

   //ReactiveSeq("a_a!","a_2","b_b!","b_2","c_c!","c2")

0

Jeśli ktoś jeszcze tego potrzebuje, StreamEx.zipWithw bibliotece streamex jest funkcja :

StreamEx<String> givenNames = StreamEx.of("Leo", "Fyodor")
StreamEx<String> familyNames = StreamEx.of("Tolstoy", "Dostoevsky")
StreamEx<String> fullNames = givenNames.zipWith(familyNames, (gn, fn) -> gn + " " + fn);

fullNames.forEach(System.out::println);  // prints: "Leo Tolstoy\nFyodor Dostoevsky\n"

-1

To jest świetne. Musiałem spakować dwa strumienie do mapy, przy czym jeden strumień był kluczem, a drugi wartością

Stream<String> streamA = Stream.of("A", "B", "C");
Stream<String> streamB  = Stream.of("Apple", "Banana", "Carrot", "Doughnut");    
final Stream<Map.Entry<String, String>> s = StreamUtils.zip(streamA,
                    streamB,
                    (a, b) -> {
                        final Map.Entry<String, String> entry = new AbstractMap.SimpleEntry<String, String>(a, b);
                        return entry;
                    });

System.out.println(s.collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue())));

Wynik: {A = jabłko, B = banan, C = marchewka}

Korzystając z naszej strony potwierdzasz, że przeczytałeś(-aś) i rozumiesz nasze zasady używania plików cookie i zasady ochrony prywatności.
Licensed under cc by-sa 3.0 with attribution required.