Java 8 Stream z przetwarzaniem wsadowym


100

Mam duży plik zawierający listę pozycji.

Chciałbym utworzyć partię pozycji, wykonać żądanie HTTP z tą partią (wszystkie pozycje są potrzebne jako parametry w żądaniu HTTP). Mogę to zrobić bardzo łatwo za pomocą forpętli, ale jako miłośnik Java 8 chcę spróbować napisać to za pomocą frameworka Stream Java 8 (i czerpać korzyści z leniwego przetwarzania).

Przykład:

List<String> batch = new ArrayList<>(BATCH_SIZE);
for (int i = 0; i < data.size(); i++) {
  batch.add(data.get(i));
  if (batch.size() == BATCH_SIZE) process(batch);
}

if (batch.size() > 0) process(batch);

Chcę zrobić coś długiego lazyFileStream.group(500).map(processBatch).collect(toList())

Jaki byłby najlepszy sposób na zrobienie tego?


Nie do końca rozumiem, jak przeprowadzić grupowanie, przepraszam, ale wiersze plików # będą leniwie odczytywać zawartość pliku.
Toby

1
więc w zasadzie potrzebujesz odwrotności flatMap(+ dodatkowej płaskiej mapy, aby ponownie zwinąć strumienie)? Nie sądzę, aby coś takiego istniało jako wygodna metoda w standardowej bibliotece. Albo będziesz musiał znaleźć bibliotekę innej firmy, albo napisać własną w oparciu o spliteratory i / lub kolekcjoner emitujący strumień strumieni
the8472

3
Może możesz połączyć Stream.generatez reader::readLinei limit, ale problem polega na tym, że strumienie nie działają dobrze z wyjątkami. Prawdopodobnie nie jest to również możliwe do zrównoleglenia. Myślę, że forpętla jest nadal najlepszą opcją.
tobias_k

Właśnie dodałem przykładowy kod. Myślę, że flatMap nie jest właściwą drogą. Podejrzewam, że będę musiał napisać niestandardowego Spliteratora
Andy Dang

1
Na takie pytania ukułem termin „nadużycie strumienia”.
kervin

Odpowiedzi:


13

Uwaga! To rozwiązanie odczytuje cały plik przed uruchomieniem forEach.

Możesz to zrobić za pomocą jOOλ , biblioteki, która rozszerza strumienie Java 8 dla przypadków użycia jednowątkowych, sekwencyjnych strumieni:

Seq.seq(lazyFileStream)              // Seq<String>
   .zipWithIndex()                   // Seq<Tuple2<String, Long>>
   .groupBy(tuple -> tuple.v2 / 500) // Map<Long, List<String>>
   .forEach((index, batch) -> {
       process(batch);
   });

Za kulisami zipWithIndex()jest po prostu:

static <T> Seq<Tuple2<T, Long>> zipWithIndex(Stream<T> stream) {
    final Iterator<T> it = stream.iterator();

    class ZipWithIndex implements Iterator<Tuple2<T, Long>> {
        long index;

        @Override
        public boolean hasNext() {
            return it.hasNext();
        }

        @Override
        public Tuple2<T, Long> next() {
            return tuple(it.next(), index++);
        }
    }

    return seq(new ZipWithIndex());
}

... mając na uwadze groupBy()wygodę API dla:

default <K> Map<K, List<T>> groupBy(Function<? super T, ? extends K> classifier) {
    return collect(Collectors.groupingBy(classifier));
}

(Zastrzeżenie: pracuję dla firmy stojącej za jOOλ)


Łał. To jest DOKŁADNIE to, czego szukam. Nasz system zwykle przetwarza strumienie danych po kolei, więc byłoby to dobre rozwiązanie, aby przejść na Javę 8.
Andy Dang,

16
Zauważ, że to rozwiązanie niepotrzebnie przechowuje cały strumień wejściowy do półproduktu Map(w przeciwieństwie na przykład do rozwiązania Ben Manesa)
Tagir Valeev

128

Aby uzyskać kompletność, oto rozwiązanie Guava .

Iterators.partition(stream.iterator(), batchSize).forEachRemaining(this::process);

W pytaniu kolekcja jest dostępna, więc strumień nie jest potrzebny i można go zapisać jako,

Iterables.partition(data, batchSize).forEach(this::process);

11
Lists.partitionto kolejna odmiana, o której powinienem był wspomnieć.
Ben Manes,

2
to jest leniwe, prawda? nie wywoła całości Streamdo pamięci przed przetworzeniem odpowiedniej partii
orirab

1
@orirab yes. Jest leniwy między partiami, ponieważ zużywa batchSizeelementy na iterację.
Ben Manes,

Czy mógłbyś
rzucić

62

Możliwa jest również implementacja czystej Java-8:

int BATCH = 500;
IntStream.range(0, (data.size()+BATCH-1)/BATCH)
         .mapToObj(i -> data.subList(i*BATCH, Math.min(data.size(), (i+1)*BATCH)))
         .forEach(batch -> process(batch));

Zauważ, że w przeciwieństwie do JOOl może dobrze działać równolegle (pod warunkiem, że datajest to lista o swobodnym dostępie).


1
co, jeśli twoje dane są w rzeczywistości strumieniem? (powiedzmy linie w pliku lub nawet z sieci).
Omry Yadan

7
@OmryYadan, pytanie dotyczyło wkładu z List(patrz data.size(), data.get()w pytaniu). Odpowiadam na zadane pytanie. Jeśli masz inne pytanie, zadaj je zamiast tego (chociaż myślę, że pytanie dotyczące strumienia również zostało już zadane).
Tagir Valeev

1
Jak równolegle przetwarzać partie?
soup_boy

38

Rozwiązanie Pure Java 8 :

Możemy stworzyć niestandardowy kolektor, aby zrobić to elegancko, który zajmuje a batch sizei a, Consumeraby przetworzyć każdą partię:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.function.*;
import java.util.stream.Collector;

import static java.util.Objects.requireNonNull;


/**
 * Collects elements in the stream and calls the supplied batch processor
 * after the configured batch size is reached.
 *
 * In case of a parallel stream, the batch processor may be called with
 * elements less than the batch size.
 *
 * The elements are not kept in memory, and the final result will be an
 * empty list.
 *
 * @param <T> Type of the elements being collected
 */
class BatchCollector<T> implements Collector<T, List<T>, List<T>> {

    private final int batchSize;
    private final Consumer<List<T>> batchProcessor;


    /**
     * Constructs the batch collector
     *
     * @param batchSize the batch size after which the batchProcessor should be called
     * @param batchProcessor the batch processor which accepts batches of records to process
     */
    BatchCollector(int batchSize, Consumer<List<T>> batchProcessor) {
        batchProcessor = requireNonNull(batchProcessor);

        this.batchSize = batchSize;
        this.batchProcessor = batchProcessor;
    }

    public Supplier<List<T>> supplier() {
        return ArrayList::new;
    }

    public BiConsumer<List<T>, T> accumulator() {
        return (ts, t) -> {
            ts.add(t);
            if (ts.size() >= batchSize) {
                batchProcessor.accept(ts);
                ts.clear();
            }
        };
    }

    public BinaryOperator<List<T>> combiner() {
        return (ts, ots) -> {
            // process each parallel list without checking for batch size
            // avoids adding all elements of one to another
            // can be modified if a strict batching mode is required
            batchProcessor.accept(ts);
            batchProcessor.accept(ots);
            return Collections.emptyList();
        };
    }

    public Function<List<T>, List<T>> finisher() {
        return ts -> {
            batchProcessor.accept(ts);
            return Collections.emptyList();
        };
    }

    public Set<Characteristics> characteristics() {
        return Collections.emptySet();
    }
}

Opcjonalnie utwórz pomocniczą klasę narzędziową:

import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collector;

public class StreamUtils {

    /**
     * Creates a new batch collector
     * @param batchSize the batch size after which the batchProcessor should be called
     * @param batchProcessor the batch processor which accepts batches of records to process
     * @param <T> the type of elements being processed
     * @return a batch collector instance
     */
    public static <T> Collector<T, List<T>, List<T>> batchCollector(int batchSize, Consumer<List<T>> batchProcessor) {
        return new BatchCollector<T>(batchSize, batchProcessor);
    }
}

Przykładowe użycie:

List<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
List<Integer> output = new ArrayList<>();

int batchSize = 3;
Consumer<List<Integer>> batchProcessor = xs -> output.addAll(xs);

input.stream()
     .collect(StreamUtils.batchCollector(batchSize, batchProcessor));

Opublikowałem również swój kod na GitHub, jeśli ktoś chce rzucić okiem:

Link do Github


1
To dobre rozwiązanie, chyba że nie możesz zmieścić w pamięci wszystkich elementów ze swojego strumienia. Nie będzie również działać na niekończących się strumieniach - metodą collect jest terminal, co oznacza, że ​​zamiast generować strumień partii, będzie czekał, aż strumień się zakończy, a następnie przetworzy wynik w partiach.
Alex Ackerman

2
@AlexAckerman nieskończony strumień oznacza, że ​​finiszer nigdy nie zostanie wywołany, ale akumulator nadal będzie nazywany, więc elementy nadal będą przetwarzane. Ponadto wymaga, aby w pamięci znajdowały się tylko elementy o rozmiarze partii.
Solubris

@Solubris, masz rację! Mój zły, dziękuję za wskazanie tego - nie będę usuwał komentarza do odniesienia, jeśli ktoś ma ten sam pomysł na to, jak działa metoda collect.
Alex Ackerman

Listę przesłaną do konsumenta należy skopiować, aby modyfikacja była bezpieczna, np .: batchProcessor.accept (copyOf (ts))
Solubris

19

Napisałem niestandardowy Spliterator dla takich scenariuszy. Wypełni listy o danym rozmiarze ze strumienia wejściowego. Zaletą tego podejścia jest to, że będzie wykonywać leniwe przetwarzanie i będzie działać z innymi funkcjami strumienia.

public static <T> Stream<List<T>> batches(Stream<T> stream, int batchSize) {
    return batchSize <= 0
        ? Stream.of(stream.collect(Collectors.toList()))
        : StreamSupport.stream(new BatchSpliterator<>(stream.spliterator(), batchSize), stream.isParallel());
}

private static class BatchSpliterator<E> implements Spliterator<List<E>> {

    private final Spliterator<E> base;
    private final int batchSize;

    public BatchSpliterator(Spliterator<E> base, int batchSize) {
        this.base = base;
        this.batchSize = batchSize;
    }

    @Override
    public boolean tryAdvance(Consumer<? super List<E>> action) {
        final List<E> batch = new ArrayList<>(batchSize);
        for (int i=0; i < batchSize && base.tryAdvance(batch::add); i++)
            ;
        if (batch.isEmpty())
            return false;
        action.accept(batch);
        return true;
    }

    @Override
    public Spliterator<List<E>> trySplit() {
        if (base.estimateSize() <= batchSize)
            return null;
        final Spliterator<E> splitBase = this.base.trySplit();
        return splitBase == null ? null
                : new BatchSpliterator<>(splitBase, batchSize);
    }

    @Override
    public long estimateSize() {
        final double baseSize = base.estimateSize();
        return baseSize == 0 ? 0
                : (long) Math.ceil(baseSize / (double) batchSize);
    }

    @Override
    public int characteristics() {
        return base.characteristics();
    }

}

naprawdę pomocny. Jeśli ktoś chce wsadować na jakieś niestandardowe kryteria (na przykład rozmiar kolekcji w bajtach), możesz delegować swój niestandardowy predykat i użyć go w pętli for jako warunku (pętla imho while będzie wtedy bardziej czytelna)
pls

Nie jestem pewien, czy implementacja jest poprawna. Na przykład, jeśli strumień bazowy jest SUBSIZEDpodziałem zwracanym z, trySplitmoże mieć więcej elementów niż przed podziałem (jeśli podział następuje w środku partii).
Słód

@Malt, jeśli moje rozumienie Spliteratorsjest poprawne, to czy trySplitnależy zawsze podzielić dane na dwie mniej więcej równe części, aby wynik nigdy nie był większy niż oryginał?
Bruce Hamilton

@BruceHamilton Niestety, zgodnie z dokumentacją, części nie mogą być z grubsza równe. Oni muszą być równe:if this Spliterator is SUBSIZED, then estimateSize() for this spliterator before splitting must be equal to the sum of estimateSize() for this and the returned Spliterator after splitting.
Malt

Tak, to jest zgodne z moim rozumieniem podziału Spliteratora. Trudno mi jednak zrozumieć, w jaki sposób „podziały zwrócone z trySplit mogą zawierać więcej elementów niż przed podziałem”. Czy mógłbyś wyjaśnić, co masz na myśli?
Bruce Hamilton

14

Mieliśmy podobny problem do rozwiązania. Chcieliśmy wziąć strumień, który był większy niż pamięć systemowa (iterując po wszystkich obiektach w bazie danych) i jak najlepiej losowo uporządkować kolejność - pomyśleliśmy, że byłoby dobrze zbuforować 10000 elementów i je losować.

Celem była funkcja, która przyjmowała strumień.

Spośród proponowanych tutaj rozwiązań wydaje się, że istnieje szereg opcji:

  • Użyj różnych dodatkowych bibliotek innych niż java 8
  • Zacznij od czegoś, co nie jest strumieniem - np. Listą dostępu swobodnego
  • Miej strumień, który można łatwo podzielić w rozdzielaczu

Początkowo naszym instynktem było użycie niestandardowego kolektora, ale oznaczało to rezygnację z przesyłania strumieniowego. Powyższe niestandardowe rozwiązanie kolektora jest bardzo dobre i prawie go użyliśmy.

Oto rozwiązanie, które oszukuje, wykorzystując fakt, że Streams może dać ci, Iteratorktórego możesz użyć jako włazu ewakuacyjnego, abyś mógł zrobić coś więcej, czego strumienie nie obsługują. IteratorJest przekształcany z powrotem do strumienia za pomocą innej, Java 8 StreamSupportczary.

/**
 * An iterator which returns batches of items taken from another iterator
 */
public class BatchingIterator<T> implements Iterator<List<T>> {
    /**
     * Given a stream, convert it to a stream of batches no greater than the
     * batchSize.
     * @param originalStream to convert
     * @param batchSize maximum size of a batch
     * @param <T> type of items in the stream
     * @return a stream of batches taken sequentially from the original stream
     */
    public static <T> Stream<List<T>> batchedStreamOf(Stream<T> originalStream, int batchSize) {
        return asStream(new BatchingIterator<>(originalStream.iterator(), batchSize));
    }

    private static <T> Stream<T> asStream(Iterator<T> iterator) {
        return StreamSupport.stream(
            Spliterators.spliteratorUnknownSize(iterator,ORDERED),
            false);
    }

    private int batchSize;
    private List<T> currentBatch;
    private Iterator<T> sourceIterator;

    public BatchingIterator(Iterator<T> sourceIterator, int batchSize) {
        this.batchSize = batchSize;
        this.sourceIterator = sourceIterator;
    }

    @Override
    public boolean hasNext() {
        prepareNextBatch();
        return currentBatch!=null && !currentBatch.isEmpty();
    }

    @Override
    public List<T> next() {
        return currentBatch;
    }

    private void prepareNextBatch() {
        currentBatch = new ArrayList<>(batchSize);
        while (sourceIterator.hasNext() && currentBatch.size() < batchSize) {
            currentBatch.add(sourceIterator.next());
        }
    }
}

Prosty przykład użycia tego wyglądałby następująco:

@Test
public void getsBatches() {
    BatchingIterator.batchedStreamOf(Stream.of("A","B","C","D","E","F"), 3)
        .forEach(System.out::println);
}

Powyższe wydruki

[A, B, C]
[D, E, F]

W naszym przypadku chcieliśmy przetasować partie, a następnie zachować je jako strumień - wyglądało to tak:

@Test
public void howScramblingCouldBeDone() {
    BatchingIterator.batchedStreamOf(Stream.of("A","B","C","D","E","F"), 3)
        // the lambda in the map expression sucks a bit because Collections.shuffle acts on the list, rather than returning a shuffled one
        .map(list -> {
            Collections.shuffle(list); return list; })
        .flatMap(List::stream)
        .forEach(System.out::println);
}

Wyprowadza coś takiego (jest losowy, więc za każdym razem inny)

A
C
B
E
D
F

Sekretem jest to, że zawsze istnieje strumień, więc możesz albo działać na strumieniu partii, albo zrobić coś z każdą partią, a następnie z flatMappowrotem do strumienia. Jeszcze lepiej, wszystkie powyższe tylko działa jako ostateczne forEachlub collectczy inne wyrazy kończące PULL dane przez strumień.

Okazuje się, że iteratorjest to szczególny rodzaj operacji kończącej na strumieniu i nie powoduje on uruchomienia całego strumienia i zapamiętania go! Podziękowania dla facetów z Java 8 za genialny projekt!


I to bardzo dobrze, że w pełni iterujesz każdą partię, gdy jest zbierana i utrzymujesz do - Listnie możesz odroczyć iteracji elementów wewnątrz partii, ponieważ konsument może chcieć pominąć całą partię, a jeśli nie zużyjesz elementy to nie przeskakują zbyt daleko. (Zaimplementowałem jeden z nich w C #, chociaż było to znacznie łatwiejsze.)
ErikE

9

Możesz także użyć RxJava :

Observable.from(data).buffer(BATCH_SIZE).forEach((batch) -> process(batch));

lub

Observable.from(lazyFileStream).buffer(500).map((batch) -> process(batch)).toList();

lub

Observable.from(lazyFileStream).buffer(500).map(MyClass::process).toList();

8

Możesz też rzucić okiem na cyklop-reaguj , jestem autorem tej biblioteki. Implementuje interfejs jOOλ (i przez rozszerzenie JDK 8 Streams), ale w przeciwieństwie do JDK 8 Parallel Streams skupia się na operacjach asynchronicznych (takich jak potencjalne blokowanie asynchronicznych wywołań we / wy). JDK Parallel Streams, z kolei skupia się na równoległości danych dla operacji związanych z procesorem. Działa poprzez zarządzanie agregatami zadań opartych na Future pod maską, ale przedstawia standardowe rozszerzone API dla użytkowników końcowych.

Ten przykładowy kod może pomóc w rozpoczęciu

LazyFutureStream.parallelCommonBuilder()
                .react(data)
                .grouped(BATCH_SIZE)                  
                .map(this::process)
                .run();

W tym miejscu znajduje się samouczek dotyczący grupowania

I bardziej ogólny samouczek tutaj

Aby użyć własnej puli wątków (która prawdopodobnie jest bardziej odpowiednia do blokowania we / wy), możesz rozpocząć przetwarzanie za pomocą

     LazyReact reactor = new LazyReact(40);

     reactor.react(data)
            .grouped(BATCH_SIZE)                  
            .map(this::process)
            .run();

3

Przykład w czystej Javie 8, który działa również z równoległymi strumieniami.

Jak używać:

Stream<Integer> integerStream = IntStream.range(0, 45).parallel().boxed();
CsStreamUtil.processInBatch(integerStream, 10, batch -> System.out.println("Batch: " + batch));

Deklaracja i implementacja metody:

public static <ElementType> void processInBatch(Stream<ElementType> stream, int batchSize, Consumer<Collection<ElementType>> batchProcessor)
{
    List<ElementType> newBatch = new ArrayList<>(batchSize);

    stream.forEach(element -> {
        List<ElementType> fullBatch;

        synchronized (newBatch)
        {
            if (newBatch.size() < batchSize)
            {
                newBatch.add(element);
                return;
            }
            else
            {
                fullBatch = new ArrayList<>(newBatch);
                newBatch.clear();
                newBatch.add(element);
            }
        }

        batchProcessor.accept(fullBatch);
    });

    if (newBatch.size() > 0)
        batchProcessor.accept(new ArrayList<>(newBatch));
}

2

Z całą uczciwością spójrz na eleganckie rozwiązanie Vavr :

Stream.ofAll(data).grouped(BATCH_SIZE).forEach(this::process);

1

Prosty przykład z użyciem Spliteratora

    // read file into stream, try-with-resources
    try (Stream<String> stream = Files.lines(Paths.get(fileName))) {
        //skip header
        Spliterator<String> split = stream.skip(1).spliterator();
        Chunker<String> chunker = new Chunker<String>();
        while(true) {              
            boolean more = split.tryAdvance(chunker::doSomething);
            if (!more) {
                break;
            }
        }           
    } catch (IOException e) {
        e.printStackTrace();
    }
}

static class Chunker<T> {
    int ct = 0;
    public void doSomething(T line) {
        System.out.println(ct++ + " " + line.toString());
        if (ct % 100 == 0) {
            System.out.println("====================chunk=====================");               
        }           
    }       
}

Odpowiedź Bruce'a jest bardziej wyczerpująca, ale szukałem czegoś szybkiego i brudnego do przetworzenia wielu plików.


1

jest to czysta java rozwiązanie, które jest oceniane leniwie.

public static <T> Stream<List<T>> partition(Stream<T> stream, int batchSize){
    List<List<T>> currentBatch = new ArrayList<List<T>>(); //just to make it mutable 
    currentBatch.add(new ArrayList<T>(batchSize));
    return Stream.concat(stream
      .sequential()                   
      .map(new Function<T, List<T>>(){
          public List<T> apply(T t){
              currentBatch.get(0).add(t);
              return currentBatch.get(0).size() == batchSize ? currentBatch.set(0,new ArrayList<>(batchSize)): null;
            }
      }), Stream.generate(()->currentBatch.get(0).isEmpty()?null:currentBatch.get(0))
                .limit(1)
    ).filter(Objects::nonNull);
}

1

Możesz użyć apache.commons:

ListUtils.partition(ListOfLines, 500).stream()
                .map(partition -> processBatch(partition)
                .collect(Collectors.toList());

Część partycjonowania jest wykonywana bez lenistwa, ale po podzieleniu listy na partycje uzyskujesz korzyści z pracy ze strumieniami (np. Użyj strumieni równoległych, dodaj filtry itp.). Inne odpowiedzi sugerowały bardziej rozbudowane rozwiązania, ale czasami czytelność i łatwość konserwacji są ważniejsze (a czasami nie są :-))


Nie jestem pewien, kto zagłosował, ale byłoby miło zrozumieć dlaczego. Podałem odpowiedź, która uzupełniła inne odpowiedzi dla osób, które nie mogą używać guawy
Tal Joffe

Przetwarzasz tutaj listę, a nie strumień.
Drakemor

@Drakemor Przetwarzam strumień list podrzędnych. zwróć uwagę na wywołanie funkcji stream ()
Tal Joffe

Ale najpierw zmieniasz go w listę pod-list, które nie będą działać poprawnie w przypadku prawdziwych danych przesyłanych strumieniowo. Oto odniesienie do partycji: commons.apache.org/proper/commons-collections/apidocs/org/…
Drakemor

1
TBH Nie rozumiem w pełni twojego argumentu, ale myślę, że możemy się zgodzić. Zredagowałem moją odpowiedź, aby odzwierciedlić naszą rozmowę tutaj. Dzięki za dyskusję
Tal Joffe

1

Można to łatwo zrobić za pomocą Reaktora :

Flux.fromStream(fileReader.lines().onClose(() -> safeClose(fileReader)))
            .map(line -> someProcessingOfSingleLine(line))
            .buffer(BUFFER_SIZE)
            .subscribe(apiService::makeHttpRequest);

0

Za pomocą Java 8i com.google.common.collect.Listsmożesz zrobić coś takiego:

public class BatchProcessingUtil {
    public static <T,U> List<U> process(List<T> data, int batchSize, Function<List<T>, List<U>> processFunction) {
        List<List<T>> batches = Lists.partition(data, batchSize);
        return batches.stream()
                .map(processFunction) // Send each batch to the process function
                .flatMap(Collection::stream) // flat results to gather them in 1 stream
                .collect(Collectors.toList());
    }
}

Tutaj Tjest typ pozycji na liście wejściowej i Utyp pozycji na liście wyjściowej

Możesz go używać w ten sposób:

List<String> userKeys = [... list of user keys]
List<Users> users = BatchProcessingUtil.process(
    userKeys,
    10, // Batch Size
    partialKeys -> service.getUsers(partialKeys)
);
Korzystając z naszej strony potwierdzasz, że przeczytałeś(-aś) i rozumiesz nasze zasady używania plików cookie i zasady ochrony prywatności.
Licensed under cc by-sa 3.0 with attribution required.