Zagnieżdżanie czeka w Parallel.ForEach


183

W aplikacji metra muszę wykonać wiele połączeń WCF. Istnieje wiele połączeń, które należy wykonać, więc muszę je wykonywać w pętli równoległej. Problem polega na tym, że pętla równoległa kończy się przed zakończeniem wszystkich wywołań WCF.

Jak zmieniłbyś to tak, aby działało zgodnie z oczekiwaniami?

var ids = new List<string>() { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };
var customers = new  System.Collections.Concurrent.BlockingCollection<Customer>();

Parallel.ForEach(ids, async i =>
{
    ICustomerRepo repo = new CustomerRepo();
    var cust = await repo.GetCustomer(i);
    customers.Add(cust);
});

foreach ( var customer in customers )
{
    Console.WriteLine(customer.ID);
}

Console.ReadKey();

Odpowiedzi:


172

Cała idea Parallel.ForEach()polega na tym, że masz zestaw wątków, a każdy wątek przetwarza część kolekcji. Jak zauważyłeś, nie działa to z async- await, gdzie chcesz zwolnić wątek na czas trwania wywołania asynchronicznego.

Możesz to „naprawić”, blokując ForEach()wątki, ale to niweczy cały punkt async- await.

Zamiast tego możesz użyć przepływu danych TPLParallel.ForEach() , który Taskdobrze obsługuje asynchroniczne s.

Konkretnie, twój kod może być napisany przy użyciu, TransformBlockktóry przekształca każdy identyfikator w Customerprzy użyciu asynclambda. Blok ten można skonfigurować do wykonywania równoległego. Łączyłbyś ten blok z pismem, ActionBlockktóry zapisuje każdy Customerdo konsoli. Po skonfigurowaniu sieci bloków możesz Post()przypisać każdy identyfikator do TransformBlock.

W kodzie:

var ids = new List<string> { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };

var getCustomerBlock = new TransformBlock<string, Customer>(
    async i =>
    {
        ICustomerRepo repo = new CustomerRepo();
        return await repo.GetCustomer(i);
    }, new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
    });
var writeCustomerBlock = new ActionBlock<Customer>(c => Console.WriteLine(c.ID));
getCustomerBlock.LinkTo(
    writeCustomerBlock, new DataflowLinkOptions
    {
        PropagateCompletion = true
    });

foreach (var id in ids)
    getCustomerBlock.Post(id);

getCustomerBlock.Complete();
writeCustomerBlock.Completion.Wait();

Chociaż prawdopodobnie chcesz ograniczyć paralelizm tej TransformBlockmałej stałej. Ponadto możesz ograniczyć pojemność TransformBlocki dodać do niej elementy asynchronicznie SendAsync(), na przykład, jeśli kolekcja jest zbyt duża.

Dodatkową korzyścią w porównaniu z twoim kodem (jeśli zadziałał) jest to, że pisanie rozpocznie się, gdy tylko pojedynczy element zostanie zakończony, i nie czekaj, aż całe przetwarzanie zostanie zakończone.


2
Bardzo krótki przegląd asynchronicznych, reaktywnych rozszerzeń, TPL i TPL DataFlow - vantsuyoshi.wordpress.com/2012/01/05/... dla tych jak ja, którzy mogą potrzebować pewnej przejrzystości.
Norman H

1
Jestem prawie pewien, że ta odpowiedź NIE równolegle przetwarzania. Uważam, że musisz zrobić Parallel.ForEach nad identyfikatorami i opublikować je w getCustomerBlock. Przynajmniej to znalazłem, kiedy przetestowałem tę sugestię.
JasonLind,

4
@JasonLind To naprawdę działa. Korzystanie Parallel.ForEach()z Post()przedmiotów równolegle nie powinno mieć żadnego rzeczywistego efektu.
svick,

1
@svick Ok Znalazłem, ActionBlock musi być równoległy. Robiłem to nieco inaczej, nie potrzebowałem transformacji, więc po prostu użyłem bufora i wykonałem swoją pracę w ActionBlock. Zdezorientowały mnie inne odpowiedzi na interwebach.
JasonLind,

2
Rozumiem przez to określenie MaxDegreeOfParallelism na ActionBlock, tak jak ty na TransformBlock w twoim przykładzie
JasonLind

125

Odpowiedź Svicka jest (jak zwykle) doskonała.

Uważam jednak, że przepływ danych jest bardziej przydatny, gdy faktycznie masz duże ilości danych do przesłania. Lub gdy potrzebujesz asynckolejki kompatybilnej.

W twoim przypadku prostszym rozwiązaniem jest po prostu użycie asyncrównoległości stylu:

var ids = new List<string>() { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };

var customerTasks = ids.Select(i =>
  {
    ICustomerRepo repo = new CustomerRepo();
    return repo.GetCustomer(i);
  });
var customers = await Task.WhenAll(customerTasks);

foreach (var customer in customers)
{
  Console.WriteLine(customer.ID);
}

Console.ReadKey();

13
Jeśli chcesz ręcznie ograniczyć równoległość (co najprawdopodobniej zrobisz w tym przypadku), zrobienie tego w ten sposób byłoby bardziej skomplikowane.
sick

1
Ale masz rację, że przepływ danych może być dość skomplikowany (na przykład w porównaniu z Parallel.ForEach()). Ale myślę, że obecnie jest to najlepsza opcja do wykonywania prawie każdej asyncpracy z kolekcjami.
sick

1
@JamesManning, w jaki sposób ParallelOptionspomożesz? Ma zastosowanie tylko do tych Parallel.For/ForEach/Invoke, które zgodnie z ustalonym PO nie mają tu zastosowania.
Ohad Schneider

1
@StephenCleary Jeśli GetCustomermetoda zwraca a Task<T>, Czy należy jej używać Select(async i => { await repo.GetCustomer(i);});?
Shyju

5
@batmaci: Parallel.ForEachnie obsługuje async.
Stephen Cleary

81

Używanie DataFlow zgodnie z sugestią Svicka może być przesadą, a odpowiedź Stephena nie zapewnia środków do kontrolowania współbieżności operacji. Można to jednak osiągnąć po prostu:

public static async Task RunWithMaxDegreeOfConcurrency<T>(
     int maxDegreeOfConcurrency, IEnumerable<T> collection, Func<T, Task> taskFactory)
{
    var activeTasks = new List<Task>(maxDegreeOfConcurrency);
    foreach (var task in collection.Select(taskFactory))
    {
        activeTasks.Add(task);
        if (activeTasks.Count == maxDegreeOfConcurrency)
        {
            await Task.WhenAny(activeTasks.ToArray());
            //observe exceptions here
            activeTasks.RemoveAll(t => t.IsCompleted); 
        }
    }
    await Task.WhenAll(activeTasks.ToArray()).ContinueWith(t => 
    {
        //observe exceptions in a manner consistent with the above   
    });
}

Te ToArray()połączenia mogą być optymalizowane przy użyciu tablicę zamiast listy i zastąpienie wykonanych zadań, ale wątpię, by to zrobić wielkiej różnicy w większości scenariuszy. Przykładowe użycie według pytania PO:

RunWithMaxDegreeOfConcurrency(10, ids, async i =>
{
    ICustomerRepo repo = new CustomerRepo();
    var cust = await repo.GetCustomer(i);
    customers.Add(cust);
});

EDIT Inny użytkownik SO i kreator TPL, Eli Arbel, wskazał mi powiązany artykuł Stephena Touba . Jak zwykle jego implementacja jest zarówno elegancka, jak i wydajna:

public static Task ForEachAsync<T>(
      this IEnumerable<T> source, int dop, Func<T, Task> body) 
{ 
    return Task.WhenAll( 
        from partition in Partitioner.Create(source).GetPartitions(dop) 
        select Task.Run(async delegate { 
            using (partition) 
                while (partition.MoveNext()) 
                    await body(partition.Current).ContinueWith(t => 
                          {
                              //observe exceptions
                          });

        })); 
}

1
@RichardPierre faktycznie to przeciążenie Partitioner.Createwykorzystuje partycjonowanie porcji, które zapewnia elementy dynamicznie do różnych zadań, więc opisany scenariusz się nie wydarzy . Należy również pamiętać, że statyczne (wcześniej określone) partycjonowanie może być szybsze w niektórych przypadkach ze względu na mniejszy narzut (szczególnie synchronizację). Aby uzyskać więcej informacji, zobacz: msdn.microsoft.com/en-us/library/dd997411(v=vs.110).aspx .
Ohad Schneider,

1
@OhadSchneider W // obserwuj wyjątki, jeśli spowoduje to wyjątek, czy będzie to oznaczać bąbelek dzwoniącego? Na przykład, jeśli chciałbym, aby cały wylicznik przestał przetwarzać / zawodzić, jeśli jakaś jego część zawiodła?
Terry,

3
@Terry spowoduje to przeniesienie do dzwoniącego w tym sensie, że najwyższe zadanie (utworzone przez Task.WhenAll) będzie zawierać wyjątek (wewnątrz an AggregateException), a w konsekwencji, jeśli wspomniany dzwoniący użyje await, wyjątek zostanie zgłoszony w witrynie wywołującej. Jednak Task.WhenAllnadal będzie czekać na zakończenie wszystkich zadań i GetPartitionsdynamicznie przydziela elementy po partition.MoveNextwywołaniu, dopóki nie pozostanie więcej elementów do przetworzenia. Oznacza to, że dopóki nie dodasz własnego mechanizmu zatrzymującego przetwarzanie (np. CancellationToken), Nie stanie się to samo.
Ohad Schneider,

1
@ gibbocool Nadal nie jestem pewien, czy śledzę. Załóżmy, że masz w sumie 7 zadań z parametrami określonymi w komentarzu. Ponadto załóżmy, że pierwsza partia zajmuje okazjonalnie 5 sekundowe zadanie i trzy 1 sekundowe zadanie. Po około sekundzie 5-sekundowe zadanie będzie nadal wykonywane, a trzy 1-sekundowe zadania zostaną zakończone. W tym momencie rozpoczną się pozostałe trzy 1-sekundowe zadania (zostaną dostarczone przez partycjoner do trzech „wolnych” wątków).
Ohad Schneider,

2
@MichaelFreidgeim możesz zrobić coś takiego jak var current = partition.Currentwcześniej, await bodya następnie użyć currentw kontynuacji ( ContinueWith(t => { ... }).
Ohad Schneider

43

Możesz zaoszczędzić wysiłek dzięki nowemu pakietowi NuGet AsyncEnumerator , który nie istniał 4 lata temu, kiedy pytanie zostało pierwotnie opublikowane. Pozwala kontrolować stopień równoległości:

using System.Collections.Async;
...

await ids.ParallelForEachAsync(async i =>
{
    ICustomerRepo repo = new CustomerRepo();
    var cust = await repo.GetCustomer(i);
    customers.Add(cust);
},
maxDegreeOfParallelism: 10);

Oświadczenie: Jestem autorem biblioteki AsyncEnumerator, która jest oprogramowaniem typu open source i jest licencjonowana na MIT, i wysyłam tę wiadomość, aby pomóc społeczności.


11
Siergiej, powinieneś ujawnić, że jesteś autorem biblioteki
Michael Freidgeim

5
ok, dodano zastrzeżenie. Nie szukam żadnych korzyści z reklamowania go, chcę tylko pomóc ludziom;)
Serge Semenov

Twoja biblioteka nie jest kompatybilna z .NET Core.
Corniel Nobel

2
@CornielNobel, jest kompatybilny z .NET Core - kod źródłowy w GitHub ma zasięg testowy zarówno dla .NET Framework, jak i .NET Core.
Serge Semenov

1
@SergeSemenov Często korzystałem z twojej biblioteki AsyncStreamsi muszę powiedzieć, że jest doskonała. Nie mogę wystarczająco polecić tej biblioteki.
WBuck

16

Zawiń Parallel.Foreachw a Task.Run()i zamiast awaitsłowa kluczowego użyj[yourasyncmethod].Result

(musisz wykonać zadanie Task.Run, aby nie blokować wątku interfejsu użytkownika)

Coś takiego:

var yourForeachTask = Task.Run(() =>
        {
            Parallel.ForEach(ids, i =>
            {
                ICustomerRepo repo = new CustomerRepo();
                var cust = repo.GetCustomer(i).Result;
                customers.Add(cust);
            });
        });
await yourForeachTask;

3
W czym problem? Zrobiłbym to dokładnie tak. Pozwolić Parallel.ForEachdo pracy równoległej, która blokuje aż wszystkie są zrobione, a następnie wcisnąć całą rzecz do wątku tła mieć czułe UI. Masz z tym jakieś problemy? Może to za dużo jednego śpiącego wątku, ale jest to krótki, czytelny kod.
ygoe

@LonelyPixel Moim jedynym problemem jest to, że dzwoni, Task.Runkiedy TaskCompletionSourcejest to preferowane.
Gusdor

1
@Gusdor Curious - dlaczego jest TaskCompletionSourcelepszy?
Seafish

@Seafish Dobre pytanie, na które chciałbym odpowiedzieć. To musiał być ciężki dzień: D
Gusdor,

Tylko krótka aktualizacja. Właśnie tego szukałem, przewinąłem w dół, aby znaleźć najprostsze rozwiązanie i ponownie znalazłem własny komentarz. Użyłem dokładnie tego kodu i działa on zgodnie z oczekiwaniami. Zakłada tylko, że w pętli znajduje się wersja synchronizacji oryginalnych wywołań Async. awaitmożna przesunąć z przodu, aby zapisać nazwę dodatkowej zmiennej.
ygoe

7

Powinno to być dość wydajne i łatwiejsze niż uruchomienie całego przepływu danych TPL:

var customers = await ids.SelectAsync(async i =>
{
    ICustomerRepo repo = new CustomerRepo();
    return await repo.GetCustomer(i);
});

...

public static async Task<IList<TResult>> SelectAsync<TSource, TResult>(this IEnumerable<TSource> source, Func<TSource, Task<TResult>> selector, int maxDegreesOfParallelism = 4)
{
    var results = new List<TResult>();

    var activeTasks = new HashSet<Task<TResult>>();
    foreach (var item in source)
    {
        activeTasks.Add(selector(item));
        if (activeTasks.Count >= maxDegreesOfParallelism)
        {
            var completed = await Task.WhenAny(activeTasks);
            activeTasks.Remove(completed);
            results.Add(completed.Result);
        }
    }

    results.AddRange(await Task.WhenAll(activeTasks));
    return results;
}

Nie powinny przykład wykorzystanie wykorzystanie awaittakich jak: var customers = await ids.SelectAsync(async i => { ... });?
Paccc

5

Jestem trochę spóźniony na imprezę, ale możesz rozważyć użycie GetAwaiter.GetResult (), aby uruchomić kod asynchroniczny w kontekście synchronizacji, ale tak równolegle, jak poniżej;

 Parallel.ForEach(ids, i =>
{
    ICustomerRepo repo = new CustomerRepo();
    // Run this in thread which Parallel library occupied.
    var cust = repo.GetCustomer(i).GetAwaiter().GetResult();
    customers.Add(cust);
});

5

Metoda rozszerzenia dla tego, która wykorzystuje SemaphoreSlim, a także pozwala ustawić maksymalny stopień równoległości

    /// <summary>
    /// Concurrently Executes async actions for each item of <see cref="IEnumerable<typeparamref name="T"/>
    /// </summary>
    /// <typeparam name="T">Type of IEnumerable</typeparam>
    /// <param name="enumerable">instance of <see cref="IEnumerable<typeparamref name="T"/>"/></param>
    /// <param name="action">an async <see cref="Action" /> to execute</param>
    /// <param name="maxDegreeOfParallelism">Optional, An integer that represents the maximum degree of parallelism,
    /// Must be grater than 0</param>
    /// <returns>A Task representing an async operation</returns>
    /// <exception cref="ArgumentOutOfRangeException">If the maxActionsToRunInParallel is less than 1</exception>
    public static async Task ForEachAsyncConcurrent<T>(
        this IEnumerable<T> enumerable,
        Func<T, Task> action,
        int? maxDegreeOfParallelism = null)
    {
        if (maxDegreeOfParallelism.HasValue)
        {
            using (var semaphoreSlim = new SemaphoreSlim(
                maxDegreeOfParallelism.Value, maxDegreeOfParallelism.Value))
            {
                var tasksWithThrottler = new List<Task>();

                foreach (var item in enumerable)
                {
                    // Increment the number of currently running tasks and wait if they are more than limit.
                    await semaphoreSlim.WaitAsync();

                    tasksWithThrottler.Add(Task.Run(async () =>
                    {
                        await action(item).ContinueWith(res =>
                        {
                            // action is completed, so decrement the number of currently running tasks
                            semaphoreSlim.Release();
                        });
                    }));
                }

                // Wait for all tasks to complete.
                await Task.WhenAll(tasksWithThrottler.ToArray());
            }
        }
        else
        {
            await Task.WhenAll(enumerable.Select(item => action(item)));
        }
    }

Przykładowe użycie:

await enumerable.ForEachAsyncConcurrent(
    async item =>
    {
        await SomeAsyncMethod(item);
    },
    5);

5

Po wprowadzeniu szeregu metod pomocniczych będziesz mógł wykonywać równoległe zapytania za pomocą tej prostej składni:

const int DegreeOfParallelism = 10;
IEnumerable<double> result = await Enumerable.Range(0, 1000000)
    .Split(DegreeOfParallelism)
    .SelectManyAsync(async i => await CalculateAsync(i).ConfigureAwait(false))
    .ConfigureAwait(false);

Dzieje się tak: dzielimy kolekcję źródeł na 10 części ( .Split(DegreeOfParallelism)), a następnie uruchamiamy 10 zadań, przetwarzając poszczególne elementy jeden po drugim ( .SelectManyAsync(...)) i scalamy je z powrotem w jedną listę.

Warto wspomnieć, że istnieje prostsze podejście:

double[] result2 = await Enumerable.Range(0, 1000000)
    .Select(async i => await CalculateAsync(i).ConfigureAwait(false))
    .WhenAll()
    .ConfigureAwait(false);

Ale wymaga to ostrożności : jeśli masz zbyt dużą kolekcję źródeł, Taskod razu zaplanuje dla każdego elementu, co może spowodować znaczny spadek wydajności.

Metody rozszerzenia stosowane w powyższych przykładach wyglądają następująco:

public static class CollectionExtensions
{
    /// <summary>
    /// Splits collection into number of collections of nearly equal size.
    /// </summary>
    public static IEnumerable<List<T>> Split<T>(this IEnumerable<T> src, int slicesCount)
    {
        if (slicesCount <= 0) throw new ArgumentOutOfRangeException(nameof(slicesCount));

        List<T> source = src.ToList();
        var sourceIndex = 0;
        for (var targetIndex = 0; targetIndex < slicesCount; targetIndex++)
        {
            var list = new List<T>();
            int itemsLeft = source.Count - targetIndex;
            while (slicesCount * list.Count < itemsLeft)
            {
                list.Add(source[sourceIndex++]);
            }

            yield return list;
        }
    }

    /// <summary>
    /// Takes collection of collections, projects those in parallel and merges results.
    /// </summary>
    public static async Task<IEnumerable<TResult>> SelectManyAsync<T, TResult>(
        this IEnumerable<IEnumerable<T>> source,
        Func<T, Task<TResult>> func)
    {
        List<TResult>[] slices = await source
            .Select(async slice => await slice.SelectListAsync(func).ConfigureAwait(false))
            .WhenAll()
            .ConfigureAwait(false);
        return slices.SelectMany(s => s);
    }

    /// <summary>Runs selector and awaits results.</summary>
    public static async Task<List<TResult>> SelectListAsync<TSource, TResult>(this IEnumerable<TSource> source, Func<TSource, Task<TResult>> selector)
    {
        List<TResult> result = new List<TResult>();
        foreach (TSource source1 in source)
        {
            TResult result1 = await selector(source1).ConfigureAwait(false);
            result.Add(result1);
        }
        return result;
    }

    /// <summary>Wraps tasks with Task.WhenAll.</summary>
    public static Task<TResult[]> WhenAll<TResult>(this IEnumerable<Task<TResult>> source)
    {
        return Task.WhenAll<TResult>(source);
    }
}
Korzystając z naszej strony potwierdzasz, że przeczytałeś(-aś) i rozumiesz nasze zasady używania plików cookie i zasady ochrony prywatności.
Licensed under cc by-sa 3.0 with attribution required.