Tworzenie blokującej kolejki Queue <T> w .NET?


163

Mam scenariusz, w którym mam wiele wątków dodających do kolejki i wiele wątków odczytujących z tej samej kolejki. Jeśli kolejka osiągnie określony rozmiar, wszystkie wątki , które wypełniają kolejkę, zostaną zablokowane przy dodawaniu, dopóki element nie zostanie usunięty z kolejki.

Poniższe rozwiązanie jest tym, czego teraz używam, a moje pytanie brzmi: Jak można to poprawić? Czy istnieje obiekt, który już włącza to zachowanie w BCL, którego powinienem używać?

internal class BlockingCollection<T> : CollectionBase, IEnumerable
{
    //todo: might be worth changing this into a proper QUEUE

    private AutoResetEvent _FullEvent = new AutoResetEvent(false);

    internal T this[int i]
    {
        get { return (T) List[i]; }
    }

    private int _MaxSize;
    internal int MaxSize
    {
        get { return _MaxSize; }
        set
        {
            _MaxSize = value;
            checkSize();
        }
    }

    internal BlockingCollection(int maxSize)
    {
        MaxSize = maxSize;
    }

    internal void Add(T item)
    {
        Trace.WriteLine(string.Format("BlockingCollection add waiting: {0}", Thread.CurrentThread.ManagedThreadId));

        _FullEvent.WaitOne();

        List.Add(item);

        Trace.WriteLine(string.Format("BlockingCollection item added: {0}", Thread.CurrentThread.ManagedThreadId));

        checkSize();
    }

    internal void Remove(T item)
    {
        lock (List)
        {
            List.Remove(item);
        }

        Trace.WriteLine(string.Format("BlockingCollection item removed: {0}", Thread.CurrentThread.ManagedThreadId));
    }

    protected override void OnRemoveComplete(int index, object value)
    {
        checkSize();
        base.OnRemoveComplete(index, value);
    }

    internal new IEnumerator GetEnumerator()
    {
        return List.GetEnumerator();
    }

    private void checkSize()
    {
        if (Count < MaxSize)
        {
            Trace.WriteLine(string.Format("BlockingCollection FullEvent set: {0}", Thread.CurrentThread.ManagedThreadId));
            _FullEvent.Set();
        }
        else
        {
            Trace.WriteLine(string.Format("BlockingCollection FullEvent reset: {0}", Thread.CurrentThread.ManagedThreadId));
            _FullEvent.Reset();
        }
    }
}

5
.Net jak ma wbudowane klasy, które pomogą w tym scenariuszu. Większość wymienionych tutaj odpowiedzi jest nieaktualna. Zobacz najnowsze odpowiedzi na dole. Przejrzyj kolekcje blokujące bezpieczne wątkowo. Odpowiedzi mogą być nieaktualne, ale to wciąż dobre pytanie!
Tom A,

Myślę, że nadal dobrym pomysłem jest poznanie Monitor.Wait / Pulse / PulseAll, nawet jeśli mamy nowe współbieżne klasy w .NET.
thewpfguy

1
Zgadzam się z @thewpfguy. Będziesz chciał zrozumieć podstawowe mechanizmy blokowania za kulisami. Warto również zauważyć, że Systems.Collections.Concurrent nie istniał do kwietnia 2010 roku, a potem tylko w Visual Studio 2010 i nowszych. Zdecydowanie nie ma opcji dla VS2008 wstrzymujących się ...
Vic

Jeśli czytasz to teraz, spójrz na System.Threading.Channels dla implementacji tego z wieloma zapisami / wieloma czytelnikami, ograniczonej, opcjonalnie blokującej dla .NET Core i .NET Standard.
Mark Rendle

Odpowiedzi:


200

Wygląda to bardzo niebezpiecznie (bardzo mała synchronizacja); co powiesz na coś takiego:

class SizeQueue<T>
{
    private readonly Queue<T> queue = new Queue<T>();
    private readonly int maxSize;
    public SizeQueue(int maxSize) { this.maxSize = maxSize; }

    public void Enqueue(T item)
    {
        lock (queue)
        {
            while (queue.Count >= maxSize)
            {
                Monitor.Wait(queue);
            }
            queue.Enqueue(item);
            if (queue.Count == 1)
            {
                // wake up any blocked dequeue
                Monitor.PulseAll(queue);
            }
        }
    }
    public T Dequeue()
    {
        lock (queue)
        {
            while (queue.Count == 0)
            {
                Monitor.Wait(queue);
            }
            T item = queue.Dequeue();
            if (queue.Count == maxSize - 1)
            {
                // wake up any blocked enqueue
                Monitor.PulseAll(queue);
            }
            return item;
        }
    }
}

(edytować)

W rzeczywistości chciałbyś mieć sposób na zamknięcie kolejki, aby czytelnicy zaczęli czysto wychodzić - być może coś w rodzaju flagi bool - jeśli jest ustawiona, pusta kolejka po prostu wraca (zamiast blokować):

bool closing;
public void Close()
{
    lock(queue)
    {
        closing = true;
        Monitor.PulseAll(queue);
    }
}
public bool TryDequeue(out T value)
{
    lock (queue)
    {
        while (queue.Count == 0)
        {
            if (closing)
            {
                value = default(T);
                return false;
            }
            Monitor.Wait(queue);
        }
        value = queue.Dequeue();
        if (queue.Count == maxSize - 1)
        {
            // wake up any blocked enqueue
            Monitor.PulseAll(queue);
        }
        return true;
    }
}

1
Co powiesz na zmianę czekania na WaitAny i przekazanie zakończenia czekania na budowę ...
Sam Saffron

1
@ Marc - optymalizacją, jeśli oczekujesz, że kolejka zawsze osiąga pojemność, byłoby przekazanie wartości maxSize do konstruktora Queue <T>. Możesz dodać innego konstruktora do swojej klasy, aby się do tego dostosować.
RichardOD

3
Dlaczego SizeQueue, dlaczego nie FixedSizeQueue?
mindless.panda

4
@Lasse - zwalnia blokadę (i) w trakcie Wait, aby inne wątki mogły ją zdobyć. Po przebudzeniu odzyskuje blokadę (i).
Marc Gravell

1
Fajnie, jak powiedziałem, było coś, czego nie rozumiałem :) To z pewnością sprawia, że ​​chcę ponownie odwiedzić część mojego kodu wątku ....
Lasse V. Karlsen


14

„Jak można to poprawić?”

Cóż, musisz przyjrzeć się każdej metodzie w swojej klasie i zastanowić się, co by się stało, gdyby inny wątek jednocześnie wywoływał tę metodę lub inną metodę. Na przykład blokada jest umieszczana w metodzie Remove, ale nie w metodzie Add. Co się stanie, jeśli jeden wątek zostanie dodany w tym samym czasie co inny wątek zostanie usunięty? Złe rzeczy.

Weź również pod uwagę, że metoda może zwrócić drugi obiekt, który zapewnia dostęp do wewnętrznych danych pierwszego obiektu - na przykład GetEnumerator. Wyobraź sobie, że jeden wątek przechodzi przez ten moduł wyliczający, a inny wątek modyfikuje listę w tym samym czasie. Niedobrze.

Dobrą praktyczną zasadą jest uproszczenie tego rozwiązania poprzez zmniejszenie liczby metod w klasie do absolutnego minimum.

W szczególności nie dziedzicz innej klasy kontenera, ponieważ ujawnisz wszystkie metody tej klasy, umożliwiając wywołującemu uszkodzenie wewnętrznych danych lub zobaczenie częściowo kompletnych zmian w danych (tak samo źle, ponieważ dane wydaje się w tym momencie uszkodzony). Ukryj wszystkie szczegóły i bądź całkowicie bezlitosny w kwestii tego, jak udzielasz do nich dostępu.

Zdecydowanie radzę korzystać z gotowych rozwiązań - zdobądź książkę o wątkach lub skorzystaj z biblioteki innej firmy. W przeciwnym razie, biorąc pod uwagę to, czego próbujesz, będziesz debugować swój kod przez długi czas.

Poza tym, czy nie miałoby większego sensu, gdyby funkcja Remove zwracała element (powiedzmy, ten, który został dodany jako pierwszy, ponieważ jest to kolejka), zamiast wybierania przez dzwoniącego określonego elementu? A kiedy kolejka jest pusta, być może Usuń również powinno się zablokować.

Aktualizacja: odpowiedź Marca faktycznie uwzględnia wszystkie te sugestie! :) Ale zostawię to tutaj, ponieważ może być pomocne zrozumienie, dlaczego jego wersja jest tak ulepszona.


12

Możesz użyć BlockingCollection i ConcurrentQueue w przestrzeni nazw System.Collections.Concurrent

 public class ProducerConsumerQueue<T> : BlockingCollection<T>
{
    /// <summary>
    /// Initializes a new instance of the ProducerConsumerQueue, Use Add and TryAdd for Enqueue and TryEnqueue and Take and TryTake for Dequeue and TryDequeue functionality
    /// </summary>
    public ProducerConsumerQueue()  
        : base(new ConcurrentQueue<T>())
    {
    }

  /// <summary>
  /// Initializes a new instance of the ProducerConsumerQueue, Use Add and TryAdd for Enqueue and TryEnqueue and Take and TryTake for Dequeue and TryDequeue functionality
  /// </summary>
  /// <param name="maxSize"></param>
    public ProducerConsumerQueue(int maxSize)
        : base(new ConcurrentQueue<T>(), maxSize)
    {
    }



}

3
BlockingCollection domyślnie Queue. Więc nie sądzę, żeby to było konieczne.
Curtis White

Czy BlockingCollection zachowuje porządkowanie jak kolejka?
joelc

Tak, kiedy jest zainicjowany przez ConcurrentQueue
Andreas

6

Właśnie podrzuciłem to za pomocą Reactive Extensions i przypomniałem sobie to pytanie:

public class BlockingQueue<T>
{
    private readonly Subject<T> _queue;
    private readonly IEnumerator<T> _enumerator;
    private readonly object _sync = new object();

    public BlockingQueue()
    {
        _queue = new Subject<T>();
        _enumerator = _queue.GetEnumerator();
    }

    public void Enqueue(T item)
    {
        lock (_sync)
        {
            _queue.OnNext(item);
        }
    }

    public T Dequeue()
    {
        _enumerator.MoveNext();
        return _enumerator.Current;
    }
}

Niekoniecznie całkowicie bezpieczne, ale bardzo proste.


Co to jest Subject <t>? Nie mam żadnego programu rozpoznawania nazw dla jego przestrzeni nazw.
Jerm

To część rozszerzeń reaktywnych.
Mark Rendle

Nie ma odpowiedzi. To wcale nie odpowiada na pytanie.
makhdumi

5

To jest to, co przyszedłem do bezpiecznej, ograniczonej kolejki blokowania wątków.

using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;

public class BlockingBuffer<T>
{
    private Object t_lock;
    private Semaphore sema_NotEmpty;
    private Semaphore sema_NotFull;
    private T[] buf;

    private int getFromIndex;
    private int putToIndex;
    private int size;
    private int numItems;

    public BlockingBuffer(int Capacity)
    {
        if (Capacity <= 0)
            throw new ArgumentOutOfRangeException("Capacity must be larger than 0");

        t_lock = new Object();
        buf = new T[Capacity];
        sema_NotEmpty = new Semaphore(0, Capacity);
        sema_NotFull = new Semaphore(Capacity, Capacity);
        getFromIndex = 0;
        putToIndex = 0;
        size = Capacity;
        numItems = 0;
    }

    public void put(T item)
    {
        sema_NotFull.WaitOne();
        lock (t_lock)
        {
            while (numItems == size)
            {
                Monitor.Pulse(t_lock);
                Monitor.Wait(t_lock);
            }

            buf[putToIndex++] = item;

            if (putToIndex == size)
                putToIndex = 0;

            numItems++;

            Monitor.Pulse(t_lock);

        }
        sema_NotEmpty.Release();


    }

    public T take()
    {
        T item;

        sema_NotEmpty.WaitOne();
        lock (t_lock)
        {

            while (numItems == 0)
            {
                Monitor.Pulse(t_lock);
                Monitor.Wait(t_lock);
            }

            item = buf[getFromIndex++];

            if (getFromIndex == size)
                getFromIndex = 0;

            numItems--;

            Monitor.Pulse(t_lock);

        }
        sema_NotFull.Release();

        return item;
    }
}

Czy możesz podać przykłady kodu, w jaki sposób ustawiłbym w kolejce niektóre funkcje wątku przy użyciu tej biblioteki, w tym sposób utworzenia wystąpienia tej klasy?
Jerm

To pytanie / odpowiedź jest nieco przestarzałe. Powinieneś przyjrzeć się przestrzeni nazw System.Collections.Concurrent, aby uzyskać blokowanie obsługi kolejek.
Kevin

2

Nie do końca poznałem TPL, ale mogą mieć coś, co pasuje do twoich potrzeb lub przynajmniej trochę pożywienia Reflector, z którego można czerpać inspirację.

Mam nadzieję, że to pomoże.


Zdaję sobie sprawę, że to jest stare, ale mój komentarz jest dla nowicjuszy w SO, ponieważ OP już o tym wie. To nie jest odpowiedź, to powinien być komentarz.
John Demetriou

0

Cóż, możesz spojrzeć na System.Threading.Semaphoreklasę. Poza tym - nie, musisz to zrobić sam. AFAIK nie ma takiej wbudowanej kolekcji.


Patrzyłem na to pod kątem ograniczania liczby wątków, które uzyskują dostęp do zasobu, ale nie pozwala to na zablokowanie całego dostępu do zasobu na podstawie jakiegoś warunku (np. Collection.Count). AFAIK w każdym razie
Eric Schoonover

Cóż, robisz tę część sam, tak jak teraz. Po prostu zamiast MaxSize i _FullEvent masz Semaphore, który inicjujesz z odpowiednią liczbą w konstruktorze. Następnie, po każdym dodaniu / usunięciu wywołania WaitForOne () lub Release ().
Vilx

Nie różni się zbytnio od tego, co masz teraz. Po prostu prostsze IMHO.
Vilx

Czy możesz podać przykład pokazujący, że to działa? Nie widziałem, jak dynamicznie dostosować rozmiar semafora, czego wymaga ten scenariusz. Ponieważ musisz mieć możliwość blokowania wszystkich zasobów tylko wtedy, gdy kolejka jest pełna.
Eric Schoonover

Ach, zmiana rozmiaru! Dlaczego nie powiedziałeś tego od razu? OK, więc semafor nie jest dla ciebie. Powodzenia w takim podejściu!
Vilx

-1

Jeśli chcesz mieć maksymalną przepustowość, pozwalającą wielu czytelnikom na czytanie i tylko jednemu pisarzowi do pisania, BCL ma coś, co nazywa się ReaderWriterLockSlim, co powinno pomóc odchudzić twój kod ...


Nie chcę jednak, aby ktokolwiek mógł pisać, jeśli kolejka jest pełna.
Eric Schoonover

Więc łączysz to z zamkiem. Oto kilka bardzo dobrych przykładów albahari.com/threading/part2.aspx#_ProducerConsumerQWaitHandle albahari.com/threading/part4.aspx
DavidN

3
Dzięki queue / dequeue każdy jest pisarzem ... ekskluzywna blokada może być bardziej pragmatyczna
Marc Gravell

Zdaję sobie sprawę, że to jest stare, ale mój komentarz jest dla nowicjuszy w SO, ponieważ OP już o tym wie. To nie jest odpowiedź, to powinien być komentarz.
John Demetriou
Korzystając z naszej strony potwierdzasz, że przeczytałeś(-aś) i rozumiesz nasze zasady używania plików cookie i zasady ochrony prywatności.
Licensed under cc by-sa 3.0 with attribution required.