Jak utworzyć wiele wątków dla każdego elementu żądania


9

Próbuję przetworzyć poniższy kod przy użyciu wielowątkowości na poziomie zamówienia.

List<String> orders = Arrays.asList("order1", "order2", 
                   "order3", "order4", "order1");

Bieżące wykonywanie sekwencyjne:

orders.stream().forEach(order -> {
    rules.forEach(rule -> {
        finalList.add(beanMapper.getBean(rule)
                .applyRule(createTemplate.apply(getMetaData.apply(rule), command),
                           order));
    });
});

Próbowałem użyć:

orders.parallelStream().forEach(order -> {}} // code snippet.

Ale zmienia reguły. Dla każdego (zasada -> {}} kolejność.

Na przykład:
Input:

 List<String> orders = Arrays.asList("order1", "order2", 
                         "order3", "order4", "order1");
 List<String> rules = Arrays.asList("rule1", "rule2", "rule3");

Oczekiwany wynik:

order1 with rule1, rule2, rule3
order2 with rule1, rule2, rule3

Rzeczywista wydajność przy parallelStream():

order1 with rule3, rule1, rule2
order1 with rule2, rule1, rule3

Nie przejmuję się kolejnością zamówień , ale niepokoi mnie kolejność reguł . Zamówienia mogą być przetwarzane w dowolnej kolejności, ale reguły powinny być wykonywane w tej samej kolejności dla każdego zamówienia.

Proszę pomóż.

Odpowiedzi:


4

Możesz użyć :

orders.stream().parallel().forEachOrdered(// Your rules logic goes here. )

ForEachOrders gwarantuje utrzymanie porządku Strumienia.

W celach informacyjnych:

orders.stream().parallel().forEachOrdered( order -> {

            rules.stream().parallel().forEachOrdered ( rule -> {

                 System.out.println( " Order : " + order + " rule :" + rule);
            });

        });

Uwaga: Chociaż możemy to zrobić, wydajność należy uważnie obserwować, ponieważ parellelizm i porządek nie łączą się ze sobą bardzo dobrze!

Wynik

 Order : order1 rule :rule1
 Order : order1 rule :rule2
 Order : order1 rule :rule3
 Order : order2 rule :rule1
 Order : order2 rule :rule2
 Order : order2 rule :rule3
 Order : order3 rule :rule1
 Order : order3 rule :rule2
 Order : order3 rule :rule3
 Order : order4 rule :rule1
 Order : order4 rule :rule2
 Order : order4 rule :rule3
 Order : order1 rule :rule1
 Order : order1 rule :rule2
 Order : order1 rule :rule3

Dziękuję za odpowiedź. forEachOrders gwarantuje porządek strumienia, ale spowalnia również wydajność. Próbowałem i aplikacja zajmuje czas podobny do przetwarzania sekwencyjnego. stream () .allel & forEachOrders nie komplementują się nawzajem.
mayank bisht

Tak, zgadzam się, że musimy zrobić pełną analizę opóźnień, zanim to zrobisz.
Pramod S. Nikam,

Tak, dzięki temu uzyskuję taką samą wydajność, nie ma poprawy.
mayank bisht

1
Śledź ten wątek, aby uzyskać lepsze rozwiązanie.
Pramod S. Nikam,

Czy mogę osiągnąć równoległe przetwarzanie za pomocą ExecutorService?
mayank bisht

1

W finalListtym samym czasie dodajesz elementy z różnych wątków. Powoduje to mieszanie wyników stosowania reguł do różnych zamówień (reguły nie są grupowane według ich zamówień).

Możesz to naprawić, tworząc listę tymczasową dla każdej, ordera następnie synchronicznie łącząc wszystkie listy tymczasowe w pliku finalList.

Oto, jak możesz to zrobić za pomocą Stream-API (Java 9+):

List<AppliedRule> finalList = orders.parallelStream().map(order ->
        rules.stream().map(rule -> applyRule(order, rule)).collect(Collectors.toList())
).collect(Collectors.flatMapping(Collection::stream, Collectors.toList()));

Uwaga: Collectors.flatMapping()jest tu stosowane zamiast prostego flatMapsynchronizowania płaskiego mapowania podczas zbierania strumienia.


Java 8 analogowy:

List<AppliedRule> finalList = orders.parallelStream().map(order ->
        rules.stream().map(rule -> applyRule(order, rule)).collect(Collectors.toList())
).collect(Collectors.toList())
        .stream()
        .flatMap(Collection::stream)
        .collect(Collectors.toList());

Dziękuję za odpowiedź. Próbowałem twoje podejście i dostaję java.util.ConcurrentModificationException: null
mayank bisht

finalList = zamowienia.parallelStream () .map (order -> rules.stream () .map (rules -> beanMapper.getBean (rule) .applyRule (createTemplate.apply (getMetaData.apply (reguła), polecenie), kolejność)) .collect (Collectors.toList ())). collect (Collectors.toList ()). stream (). flatMap (Collection :: stream) .collect (Collectors.toList ());
mayank bisht

@ mayankbisht, oznacza to, że beanMapper.getBean(rule) .applyRule(createTemplate.apply(getMetaData.apply(rule), command), order)nie jest to czysta funkcja, więc nie można jej używać równolegle. Spróbuj usunąć z niego wszystkie skutki uboczne; ConcurrentModificationExceptionśledzenie stosu może pomóc w ich zlokalizowaniu.
Bananon,

0

Czy to zadziała?

final int rulesSize = rules.size();
AtomicInteger atomicInteger = new AtomicInteger(0);
orders.stream().parallel().forEach(order -> {
    IntStream.range(0, rulesSize).parallel().forEach( i -> {
        synchronized (atomicInteger) {
            System.out.println(" Order : " + order + " rule :" + rules.get(atomicInteger.getAndIncrement() % rulesSize));
        }
    });
});

Wynik

 Order : order1 rule :rule1
 Order : order4 rule :rule2
 Order : order1 rule :rule3
 Order : order3 rule :rule1
 Order : order3 rule :rule2
 Order : order3 rule :rule3
 Order : order2 rule :rule1
 Order : order2 rule :rule2
 Order : order2 rule :rule3
 Order : order1 rule :rule1
 Order : order1 rule :rule2
 Order : order4 rule :rule3
 Order : order1 rule :rule1
 Order : order4 rule :rule2
 Order : order1 rule :rule3

0

Kolejność zamówień może być dowolna, ale kolejność reguł nie powinna się zmieniać. Również dla konkretnego zamówienia reguła powinna przychodzić w grupie.

W takim przypadku nie ma miejsca na faktyczną równoległość.

Kiedy

order1-rule1
order1-rule2
order2-rule1
order2-rule2

i

order2-rule1
order2-rule2
order1-rule1
order1-rule2

są jedynymi prawidłowymi przebiegami dla 2 zamówień i 2 reguł,
oraz

order1-rule1
order2-rule1
order1-rule2
order2-rule2

jest uważany za nieważny, to nie jest paralelizm, tylko randomizacja orders, prawdopodobnie bez wzmocnienia. Jeśli „nudzi cię” ciągłe order1zajmowanie pierwszego miejsca, możesz przetasować listę, ale to wszystko:

public static void main (String[] args) throws java.lang.Exception
{
    List<String> orders = Arrays.asList("order1", "order2", "order3", "order4");
    List<String> rules = Arrays.asList("rule1", "rule2", "rule3");
    Collections.shuffle(orders);
    orders.forEach(order->{
        rules.forEach(rule->{
            System.out.println(order+"-"+rule);
        });
    });
}

Nawet transmisja strumieniowa nie jest konieczna, tylko dwie zagnieżdżone pętle. Test: https://ideone.com/qI3dqd

order2-rule1
order2-rule2
order2-rule3
order4-rule1
order4-rule2
order4-rule3
order1-rule1
order1-rule2
order1-rule3
order3-rule1
order3-rule2
order3-rule3


Oryginalna odpowiedź

Ale zmienia reguły. Dla każdego (zasada -> {}} kolejność.

Nie. W ordery mogą zachodzić na siebie, a kolejność ruleS dla każdego rzędu są przechowywane. Dlaczego nierównoległość miałaby forEachrobić cokolwiek innego?

Przykładowy kod:

public static void main (String[] args) throws java.lang.Exception
{
    List<String> orders = Arrays.asList("order1", "order2", "order3", "order4");
    List<String> rules = Arrays.asList("rule1", "rule2", "rule3");
    orders.stream().parallel().forEach(order->{
        rules.forEach(rule->{
            System.out.println(order+"-"+rule);
        });
    });
}

Test: https://ideone.com/95Cybg
Przykładowy wynik:

order2-rule1
order2-rule2
order2-rule3
order1-rule1
order1-rule2
order1-rule3
order4-rule1
order4-rule2
order4-rule3
order3-rule1
order3-rule2
order3-rule3

Kolejność orders jest mieszana, ale rules są zawsze 1-2-3. Myślę, że twój wynik po prostu ukrył pary (w rzeczywistości nie pokazałeś, jak został wygenerowany).

Oczywiście można go rozszerzyć z pewnymi opóźnieniami, więc przetwarzanie orders faktycznie się nakłada:

public static void delay(){
    try{
        Thread.sleep(ThreadLocalRandom.current().nextInt(100,300));
    }catch(Exception ex){}
}

public static void main (String[] args) throws java.lang.Exception
{
    List<String> orders = Arrays.asList("order1", "order2", "order3", "order4");
    List<String> rules = Arrays.asList("rule1", "rule2", "rule3");
    orders.stream().parallel().forEach(order->{
        rules.forEach(rule->{
            delay();
            System.out.println(order+"-"+rule);
        });
    });
}

Test: https://ideone.com/cSFaqS
Przykładowy wynik:

order3-rule1
order2-rule1
order2-rule2
order3-rule2
order3-rule3
order2-rule3
order1-rule1
order4-rule1
order1-rule2
order4-rule2
order4-rule3
order1-rule3

To może być coś, co widziałeś, tylko bez tej orderxczęści. Z orderwidocznymi s można śledzić, że ruleciągle pojawiają się jako 1-2-3, naorder . Ponadto twoja przykładowa lista zawierała order1dwa razy, co z pewnością nie pomogło zobaczyć, co się dzieje.


Dziękuję za odpowiedź. Powyższe dane wyjściowe mogą być poprawne dla mniejszej liczby zamówień. Ale jeśli zwiększysz zamówienia, otrzymasz inną wydajność. Na przykład (kolejność4-zasada1 kolejność4-zasada2 kolejność4-zasada1) (kolejność1-zasada1 kolejność1-zasada2) (kolejność3-zasada1 kolejność3-zasada2) (kolejność4-zasada1 kolejność4-zasada2 kolejność4-zasada1 kolejność4-zasada2).
mayank bisht

Kolejność zamówień może być dowolna, ale kolejność reguł nie powinna się zmieniać. Również dla konkretnego zamówienia reguła powinna przychodzić w grupie. Np. (zamówienie1-zasada 1 zamówienie1-zasada2 zamówienie1-zasada3) i nie (zamówienie1-zasada1 zamówienie2-zasada1 zamówienie1-zasada2 zamówienie1-zasada3).)
mayank bisht 24.12.19

@ mayankbisht Myślę, że te ograniczenia po prostu nie zezwalają na przetwarzanie równoległe. Zobacz zaktualizowaną odpowiedź (nową część napisałem na początku).
tevemadar

Tak, rozumiem to i dlatego opublikowałem tutaj to pytanie. Myślałem, że może będzie inny sposób na zrobienie tego, a może możemy zmienić algo
mayank bisht

@ mayankbisht możesz opisać, dlaczego orders nie może się nakładać ( rulebyć może są one stanowe i istnieją w ograniczonej liczbie kopii, być może tylko w jednym?). Ale generalnie nie ma paralelizmu bez rzeczy biegnących równolegle, to przecież cały punkt równoległości.
tevemadar

0

Jeśli nie masz nic przeciwko wypróbowaniu biblioteki innej firmy. Oto przykład z moją biblioteką: abacus-util

StreamEx.of(orders).parallelStream().forEach(order -> {}}

Możesz nawet podać numer wątku:

StreamEx.of(orders).parallelStream(maxThreadNum).forEach(order -> {}}

Kolejność rulezostanie zachowana.

Nawiasem mówiąc, ponieważ jest to strumień równoległy, fragment kodu ...finalList.add(...najprawdopodobniej nie zadziała. Myślę, że lepiej jest zebrać wynik na liście:

StreamEx.of(orders).parallelStream().map/flatMap(order -> {...}}.toList()

jest to również wykonalne, nawet jeśli z orderjakiegoś powodu chcesz zachować kolejność :

StreamEx.of(orders).indexed().parallelStream()
      .map/flatMap(order -> {...}}.sortedBy(...index).toList()
Korzystając z naszej strony potwierdzasz, że przeczytałeś(-aś) i rozumiesz nasze zasady używania plików cookie i zasady ochrony prywatności.
Licensed under cc by-sa 3.0 with attribution required.