Chcę użyć Streamdo równoległego przetwarzania heterogenicznego zestawu zdalnie przechowywanych plików JSON o nieznanej liczbie (liczba plików nie jest znana z góry). Rozmiar plików może się znacznie różnić, od 1 rekordu JSON na plik do 100 000 rekordów w niektórych innych plikach. Rekord JSON w tym przypadku oznacza samodzielne JSON obiekt reprezentowany jako jeden wiersz w pliku.
Naprawdę chcę do tego używać strumieni, więc wdrożyłem to Spliterator:
public abstract class JsonStreamSpliterator<METADATA, RECORD> extends AbstractSpliterator<RECORD> {
abstract protected JsonStreamSupport<METADATA> openInputStream(String path);
abstract protected RECORD parse(METADATA metadata, Map<String, Object> json);
private static final int ADDITIONAL_CHARACTERISTICS = Spliterator.IMMUTABLE | Spliterator.DISTINCT | Spliterator.NONNULL;
private static final int MAX_BUFFER = 100;
private final Iterator<String> paths;
private JsonStreamSupport<METADATA> reader = null;
public JsonStreamSpliterator(Iterator<String> paths) {
this(Long.MAX_VALUE, ADDITIONAL_CHARACTERISTICS, paths);
}
private JsonStreamSpliterator(long est, int additionalCharacteristics, Iterator<String> paths) {
super(est, additionalCharacteristics);
this.paths = paths;
}
private JsonStreamSpliterator(long est, int additionalCharacteristics, Iterator<String> paths, String nextPath) {
this(est, additionalCharacteristics, paths);
open(nextPath);
}
@Override
public boolean tryAdvance(Consumer<? super RECORD> action) {
if(reader == null) {
String path = takeNextPath();
if(path != null) {
open(path);
}
else {
return false;
}
}
Map<String, Object> json = reader.readJsonLine();
if(json != null) {
RECORD item = parse(reader.getMetadata(), json);
action.accept(item);
return true;
}
else {
reader.close();
reader = null;
return tryAdvance(action);
}
}
private void open(String path) {
reader = openInputStream(path);
}
private String takeNextPath() {
synchronized(paths) {
if(paths.hasNext()) {
return paths.next();
}
}
return null;
}
@Override
public Spliterator<RECORD> trySplit() {
String nextPath = takeNextPath();
if(nextPath != null) {
return new JsonStreamSpliterator<METADATA,RECORD>(Long.MAX_VALUE, ADDITIONAL_CHARACTERISTICS, paths, nextPath) {
@Override
protected JsonStreamSupport<METADATA> openInputStream(String path) {
return JsonStreamSpliterator.this.openInputStream(path);
}
@Override
protected RECORD parse(METADATA metaData, Map<String,Object> json) {
return JsonStreamSpliterator.this.parse(metaData, json);
}
};
}
else {
List<RECORD> records = new ArrayList<RECORD>();
while(tryAdvance(records::add) && records.size() < MAX_BUFFER) {
// loop
}
if(records.size() != 0) {
return records.spliterator();
}
else {
return null;
}
}
}
}
Problem, który mam, polega na tym, że podczas gdy strumień pięknie zrównuje się na początku, w końcu największy plik jest przetwarzany w jednym wątku. Uważam, że bliższa przyczyna jest dobrze udokumentowana: spliterator jest „niezrównoważony”.
Mówiąc bardziej konkretnie, wydaje się, że trySplitmetoda nie jest wywoływana po pewnym momencie Stream.forEachcyklu życia, więc dodatkowa logika do dystrybucji małych partii na końcu trySplitjest rzadko wykonywana.
Zauważ, że wszystkie spliteratory zwrócone z trySplit współużytkują ten sam pathsiterator. Pomyślałem, że to naprawdę sprytny sposób na zrównoważenie pracy we wszystkich rozdzielaczach, ale to nie wystarczyło do osiągnięcia pełnej równoległości.
Chciałbym, aby równoległe przetwarzanie przebiegało najpierw między plikami, a następnie, gdy niewiele dużych plików pozostało rozdzielających się, chcę przeprowadzić równoległość między fragmentami pozostałych plików. Taki był cel elsebloku pod koniec trySplit.
Czy istnieje prosty / prosty / kanoniczny sposób rozwiązania tego problemu?
Long.MAX_VALUEże powrót powoduje nadmierne i niepotrzebne dzielenie, podczas gdy wszelkie szacunki inne niż Long.MAX_VALUEpowodują zatrzymanie dalszego podziału, zabijając równoległość. Zwrócenie zestawu dokładnych szacunków nie prowadzi do żadnych inteligentnych optymalizacji.
AbstractSpliteratorale zastępujesz, trySplit()co jest złym zestawem do niczego innego Long.MAX_VALUE, ponieważ nie dostosowujesz oszacowania rozmiaru trySplit(). Następnie trySplit()oszacowanie rozmiaru powinno zostać zmniejszone o liczbę elementów, które zostały rozdzielone.