Używanie Spark 2.4.4 działającej w trybie klastra YARN z iskrowym harmonogramem FIFO.
Przesyłam wiele operacji ramki danych Spark (tj. Zapisuję dane do S3) przy użyciu modułu wykonującego pulę wątków o zmiennej liczbie wątków. Działa to dobrze, jeśli mam ~ 10 wątków, ale jeśli użyję setek wątków, wydaje się, że jest impas, a żadne zadania nie są planowane zgodnie z interfejsem Spark.
Jakie czynniki kontrolują, ile zadań można zaplanować jednocześnie? Zasoby sterownika (np. Pamięć / rdzenie)? Jakieś inne ustawienia konfiguracji iskier?
EDYTOWAĆ:
Oto krótkie streszczenie mojego kodu
ExecutorService pool = Executors.newFixedThreadPool(nThreads);
ExecutorCompletionService<Void> ecs = new ExecutorCompletionService<>(pool);
Dataset<Row> aHugeDf = spark.read.json(hundredsOfPaths);
List<Future<Void>> futures = listOfSeveralHundredThings
.stream()
.map(aThing -> ecs.submit(() -> {
df
.filter(col("some_column").equalTo(aThing))
.write()
.format("org.apache.hudi")
.options(writeOptions)
.save(outputPathFor(aThing));
return null;
}))
.collect(Collectors.toList());
IntStream.range(0, futures.size()).forEach(i -> ecs.poll(30, TimeUnit.MINUTES));
exec.shutdownNow();
W pewnym momencie, wraz ze nThreads
wzrostem, iskra nie wydaje się już planować żadnych zadań, o czym świadczą:
ecs.poll(...)
upłynął limit czasu- Karta Zadania interfejsu użytkownika Spark pokazuje brak aktywnych zadań
- Karta executorów interfejsu użytkownika Spark pokazująca brak aktywnych zadań dla dowolnego modułu wykonującego
- Karta SQL interfejsu użytkownika Spark wyświetlająca
nThreads
uruchomione zapytania bez uruchomionych identyfikatorów zadań
Moje środowisko wykonawcze to
- AWS EMR 5.28.1
- Spark 2.4.4
- Węzeł główny =
m5.4xlarge
- Węzły rdzenia = 3x
rd5.24xlarge
spark.driver.cores=24
spark.driver.memory=32g
spark.executor.memory=21g
spark.scheduler.mode=FIFO
jstack -l
aby uzyskać zrzut wątku z informacjami o blokowaniu.