Czy istnieje coś takiego jak asynchroniczne BlockingCollection <T>?


86

Chciałbym awaitna wyniku BlockingCollection<T>.Take()asynchronicznie więc nie blokować wątku. Szukasz czegoś takiego:

var item = await blockingCollection.TakeAsync();

Wiem, że mógłbym to zrobić:

var item = await Task.Run(() => blockingCollection.Take());

ale to trochę zabija cały pomysł, ponieważ ThreadPoolzamiast tego blokowany jest inny wątek (of ).

Czy jest jakaś alternatywa?


2
Nie rozumiem, jeśli użyjesz await Task.Run(() => blockingCollection.Take())zadania, zostanie ono wykonane w innym wątku, a Twój wątek interfejsu użytkownika nie zostanie zablokowany, czy nie o to chodzi?
Selman Genç

8
@ Selman22, to nie jest aplikacja interfejsu użytkownika. Jest to biblioteka eksportująca TaskAPI. Może być używany na przykład z poziomu ASP.NET. Kod, o którym mowa, nie byłby tam dobrze skalowany.
avo

Czy nadal byłby problem, gdyby ConfigureAwaitzostał użyty po Run()? [red. nieważne, teraz rozumiem, o czym mówisz]
MojoFilter

Odpowiedzi:


96

Znam cztery alternatywy.

Pierwsza z nich to kanały , która zapewnia bezpieczną wątkowo kolejkę, która obsługuje operacje asynchroniczne Readi Writeoperacje. Kanały są wysoce zoptymalizowane i opcjonalnie obsługują upuszczanie niektórych elementów po osiągnięciu progu.

Następny pochodzi BufferBlock<T>z TPL Dataflow . Jeśli masz tylko jednego konsumenta, możesz użyć OutputAvailableAsynclub ReceiveAsynclub po prostu połączyć go z plikiem ActionBlock<T>. Więcej informacji znajdziesz na moim blogu .

Ostatnie dwa to typy, które utworzyłem, dostępne w mojej bibliotece AsyncEx .

AsyncCollection<T>jest asyncprawie odpowiednikiem BlockingCollection<T>, zdolnym do pakowania równoległej kolekcji producenta / konsumenta, takiej jak ConcurrentQueue<T>lub ConcurrentBag<T>. Możesz użyć TakeAsyncdo asynchronicznego korzystania z elementów z kolekcji. Więcej informacji znajdziesz na moim blogu .

AsyncProducerConsumerQueue<T>jest asynckolejką producent / konsument bardziej kompatybilną z przenośnymi . Możesz użyć DequeueAsyncdo asynchronicznego zużywania elementów z kolejki. Więcej informacji znajdziesz na moim blogu .

Ostatnie trzy z tych alternatyw umożliwiają synchroniczne i asynchroniczne operacje typu put and take.


12
Odnośnik do Git Hub informujący o
Paul

Dokumentacja API zawiera metodę AsyncCollection.TryTakeAsync, ale nie mogę jej znaleźć w pobranej Nito.AsyncEx.Coordination.dll 5.0.0.0(najnowszej wersji). Plik Nito.AsyncEx.Concurrent.dll, do którego się odwołuje , nie istnieje w pakiecie . czego mi brakuje?
Theodor Zoulias

@TheodorZoulias: Ta metoda została usunięta w wersji 5. Dokumentacja API v5 jest tutaj .
Stephen Cleary

Oh dziękuję. Wygląda na to, że był to najłatwiejszy i najbezpieczniejszy sposób wyliczenia kolekcji. while ((result = await collection.TryTakeAsync()).Success) { }. Dlaczego został usunięty?
Theodor Zoulias

1
@TheodorZoulias: Ponieważ „spróbuj” oznacza różne rzeczy dla różnych ludzi. Myślę o ponownym dodaniu metody „Try”, ale w rzeczywistości miałaby ona inną semantykę niż metoda oryginalna. Patrząc również na obsługę strumieni asynchronicznych w przyszłej wersji, która byłaby zdecydowanie najlepszą metodą zużycia, gdy jest obsługiwana.
Stephen Cleary

21

... lub możesz to zrobić:

using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public class AsyncQueue<T>
{
    private readonly SemaphoreSlim _sem;
    private readonly ConcurrentQueue<T> _que;

    public AsyncQueue()
    {
        _sem = new SemaphoreSlim(0);
        _que = new ConcurrentQueue<T>();
    }

    public void Enqueue(T item)
    {
        _que.Enqueue(item);
        _sem.Release();
    }

    public void EnqueueRange(IEnumerable<T> source)
    {
        var n = 0;
        foreach (var item in source)
        {
            _que.Enqueue(item);
            n++;
        }
        _sem.Release(n);
    }

    public async Task<T> DequeueAsync(CancellationToken cancellationToken = default(CancellationToken))
    {
        for (; ; )
        {
            await _sem.WaitAsync(cancellationToken);

            T item;
            if (_que.TryDequeue(out item))
            {
                return item;
            }
        }
    }
}

Prosta, w pełni funkcjonalna asynchroniczna kolejka FIFO.

Uwaga: SemaphoreSlim.WaitAsynczostał dodany wcześniej w .NET 4.5, nie było to wcale takie proste.


2
Jaki jest pożytek z nieskończoności for? jeśli semafor zostanie zwolniony, kolejka ma co najmniej jedną pozycję do usunięcia z kolejki, nie?
Blendester

2
@Blendester może wystąpić sytuacja wyścigu, jeśli zablokowanych jest wielu konsumentów. Nie możemy być pewni, że nie ma co najmniej dwóch konkurujących ze sobą konsumentów i nie wiemy, czy obu z nich uda się obudzić, zanim zdążą zdekonować przedmiot. W przypadku wyścigu, jeśli nie zdąży się zderzyć, wróci do snu i będzie czekał na kolejny sygnał.
John Leidegren

Jeśli dwóch lub więcej konsumentów przejdzie przez WaitAsync (), w kolejce znajduje się równoważna liczba elementów, a zatem zawsze będą one pomyślnie usuwane z kolejki. Czy coś mi brakuje?
mindcruzer

2
To jest kolekcja blokująca, której semantyka TryDequeuejest, zwraca wartość lub w ogóle nie zwraca. Technicznie rzecz biorąc, jeśli masz więcej niż 1 czytnik, ten sam czytelnik może skonsumować dwie (lub więcej) pozycje, zanim jakikolwiek inny czytelnik całkowicie się obudzi. Sukces WaitAsyncto tylko sygnał, że w kolejce mogą znajdować się przedmioty do spożycia, nie jest to gwarancja.
John Leidegren,

@JohnLeidegren If the value of the CurrentCount property is zero before this method is called, the method also allows releaseCount threads or tasks blocked by a call to the Wait or WaitAsync method to enter the semaphore.z docs.microsoft.com/en-us/dotnet/api/… W jaki sposób udane WaitAsyncnie ma elementów w kolejce? Jeśli uwolnienie N budzi więcej niż N konsumentów, semaphoreto jest zepsute. Prawda?
Ashish Negi

4

Oto bardzo podstawowa implementacja, BlockingCollectionktóra obsługuje oczekiwanie, z wieloma brakującymi funkcjami. Korzysta z AsyncEnumerablebiblioteki, która umożliwia asynchroniczne wyliczanie dla wersji C # starszych niż 8,0.

public class AsyncBlockingCollection<T>
{ // Missing features: cancellation, boundedCapacity, TakeAsync
    private Queue<T> _queue = new Queue<T>();
    private SemaphoreSlim _semaphore = new SemaphoreSlim(0);
    private int _consumersCount = 0;
    private bool _isAddingCompleted;

    public void Add(T item)
    {
        lock (_queue)
        {
            if (_isAddingCompleted) throw new InvalidOperationException();
            _queue.Enqueue(item);
        }
        _semaphore.Release();
    }

    public void CompleteAdding()
    {
        lock (_queue)
        {
            if (_isAddingCompleted) return;
            _isAddingCompleted = true;
            if (_consumersCount > 0) _semaphore.Release(_consumersCount);
        }
    }

    public IAsyncEnumerable<T> GetConsumingEnumerable()
    {
        lock (_queue) _consumersCount++;
        return new AsyncEnumerable<T>(async yield =>
        {
            while (true)
            {
                lock (_queue)
                {
                    if (_queue.Count == 0 && _isAddingCompleted) break;
                }
                await _semaphore.WaitAsync();
                bool hasItem;
                T item = default;
                lock (_queue)
                {
                    hasItem = _queue.Count > 0;
                    if (hasItem) item = _queue.Dequeue();
                }
                if (hasItem) await yield.ReturnAsync(item);
            }
        });
    }
}

Przykład użycia:

var abc = new AsyncBlockingCollection<int>();
var producer = Task.Run(async () =>
{
    for (int i = 1; i <= 10; i++)
    {
        await Task.Delay(100);
        abc.Add(i);
    }
    abc.CompleteAdding();
});
var consumer = Task.Run(async () =>
{
    await abc.GetConsumingEnumerable().ForEachAsync(async item =>
    {
        await Task.Delay(200);
        await Console.Out.WriteAsync(item + " ");
    });
});
await Task.WhenAll(producer, consumer);

Wynik:

1 2 3 4 5 6 7 8 9 10


Aktualizacja: wraz z wydaniem języka C # 8 asynchroniczne wyliczanie stało się funkcją języka wbudowanego. Wymagane klasy ( IAsyncEnumerable, IAsyncEnumerator) są osadzone w .NET Core 3.0 i są oferowane jako pakiet dla .NET Framework 4.6.1+ ( Microsoft.Bcl.AsyncInterfaces ).

Oto alternatywna GetConsumingEnumerableimplementacja z nową składnią C # 8:

public async IAsyncEnumerable<T> GetConsumingEnumerable()
{
    lock (_queue) _consumersCount++;
    while (true)
    {
        lock (_queue)
        {
            if (_queue.Count == 0 && _isAddingCompleted) break;
        }
        await _semaphore.WaitAsync();
        bool hasItem;
        T item = default;
        lock (_queue)
        {
            hasItem = _queue.Count > 0;
            if (hasItem) item = _queue.Dequeue();
        }
        if (hasItem) yield return item;
    }
}

Zwróć uwagę na współistnienie awaiti yieldw tej samej metodzie.

Przykład użycia (C # 8):

var consumer = Task.Run(async () =>
{
    await foreach (var item in abc.GetConsumingEnumerable())
    {
        await Task.Delay(200);
        await Console.Out.WriteAsync(item + " ");
    }
});

Zwróć uwagę na awaitprzed foreach.


1
Po namyśle myślę teraz, że nazwa klasy AsyncBlockingCollectionjest bezsensowna. Coś nie może być jednocześnie asynchroniczne i blokujące, ponieważ te dwie koncepcje są dokładnymi przeciwieństwami!
Theodor Zoulias

0

Jeśli nie masz nic przeciwko drobnemu włamaniu, możesz wypróbować te rozszerzenia.

public static async Task AddAsync<TEntity>(
    this BlockingCollection<TEntity> Bc, TEntity item, CancellationToken abortCt)
{
    while (true)
    {
        try
        {
            if (Bc.TryAdd(item, 0, abortCt))
                return;
            else
                await Task.Delay(100, abortCt);
        }
        catch (Exception)
        {
            throw;
        }
    }
}

public static async Task<TEntity> TakeAsync<TEntity>(
    this BlockingCollection<TEntity> Bc, CancellationToken abortCt)
{
    while (true)
    {
        try
        {
            TEntity item;

            if (Bc.TryTake(out item, 0, abortCt))
                return item;
            else
                await Task.Delay(100, abortCt);
        }
        catch (Exception)
        {
            throw;
        }
    }
}

Więc wprowadzasz sztuczne opóźnienie, aby było asynchroniczne? Nadal się blokuje, prawda?
nawfal
Korzystając z naszej strony potwierdzasz, że przeczytałeś(-aś) i rozumiesz nasze zasady używania plików cookie i zasady ochrony prywatności.
Licensed under cc by-sa 3.0 with attribution required.