Jak przerwać BlockingQueue, które blokuje się przy take ()?


82

Mam klasę, która pobiera obiekty z a BlockingQueuei przetwarza je, wywołując take()w ciągłej pętli. W pewnym momencie wiem, że do kolejki nie zostaną dodane żadne obiekty. Jak przerwać take()metodę, aby przestała blokować?

Oto klasa przetwarzająca obiekty:

public class MyObjHandler implements Runnable {

  private final BlockingQueue<MyObj> queue;

  public class MyObjHandler(BlockingQueue queue) {
    this.queue = queue;
  }

  public void run() {
    try {
      while (true) {
        MyObj obj = queue.take();
        // process obj here
        // ...
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}

A oto metoda, która używa tej klasy do przetwarzania obiektów:

public void testHandler() {

  BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100);  

  MyObjectHandler  handler = new MyObjectHandler(queue);
  new Thread(handler).start();

  // get objects for handler to process
  for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); ) {
    queue.put(i.next());
  }

  // what code should go here to tell the handler
  // to stop waiting for more objects?
}

Odpowiedzi:


72

Jeśli przerwanie wątku nie wchodzi w grę, innym rozwiązaniem jest umieszczenie w kolejce obiektu „znacznika” lub „polecenia”, który zostałby rozpoznany jako taki przez MyObjHandler i wyrwanie się z pętli.


40
Jest to również znane jako podejście „Poison Pill Shutdown” i jest szczegółowo omówione w „Java Concurrency in Practice”, a konkretnie na str. 155-156.
Brandon Yarbrough

14
BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100);
MyObjectHandler handler = new MyObjectHandler(queue);
Thread thread = new Thread(handler);
thread.start();
for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); ) {
  queue.put(i.next());
}
thread.interrupt();

Jeśli jednak to zrobisz, wątek może zostać przerwany, gdy w kolejce nadal znajdują się elementy, które czekają na przetworzenie. Możesz rozważyć użycie pollzamiast take, co pozwoli wątkowi przetwarzającemu na przekroczenie limitu czasu i zakończenie go, gdy będzie czekał przez chwilę bez nowych danych wejściowych.


Tak, jest to problem, jeśli wątek zostanie przerwany, gdy w kolejce wciąż znajdują się elementy. Aby obejść ten problem, dodałem kod, aby upewnić się, że kolejka jest pusta przed przerwaniem wątku: <code> while (queue.size ()> 0) Thread.currentThread (). Sleep (5000); </code>
MCS

3
@MCS - uwaga dla przyszłych odwiedzających, że twoje podejście to hack i nie powinno być powielane w kodzie produkcyjnym z trzech powodów. Zawsze lepiej jest znaleźć rzeczywisty sposób na podpięcie wyłączenia. Nigdy nie wolno używać go Thread.sleep()jako alternatywy dla właściwego haczyka. W innych implementacjach inne wątki mogą umieszczać rzeczy w kolejce, a pętla while może nigdy się nie kończyć.
Erick Robertson

Na szczęście nie ma potrzeby polegać na takich hackach, ponieważ równie łatwo można je przetworzyć po przerwaniu, jeśli to konieczne. Na przykład „wyczerpująca” take()implementacja może wyglądać następująco: try { return take(); } catch (InterruptedException e) { E o = poll(); if (o == null) throw e; Thread.currentThread().interrupt(); return o; } Jednak nie ma powodu, aby musiała być implementowana w tej warstwie, a implementacja nieco wyżej doprowadzi do bardziej wydajnego kodu (na przykład przez unikanie elementów InterruptedExceptioni / lub używając BlockingQueue.drainTo()).
antak

13

Bardzo późno, ale mam nadzieję, że to pomoże również innym, ponieważ stanąłem przed podobnym problemem i zastosowałem pollpodejście sugerowane przez Ericksona powyżej z kilkoma drobnymi zmianami,

class MyObjHandler implements Runnable 
{
    private final BlockingQueue<MyObj> queue;
    public volatile boolean Finished;  //VOLATILE GUARANTEES UPDATED VALUE VISIBLE TO ALL
    public MyObjHandler(BlockingQueue queue) 
    {
        this.queue = queue;
        Finished = false;
    }
    @Override
    public void run() 
    {        
        while (true) 
        {
            try 
            {
                MyObj obj = queue.poll(100, TimeUnit.MILLISECONDS);
                if(obj!= null)//Checking if job is to be processed then processing it first and then checking for return
                {
                    // process obj here
                    // ...
                }
                if(Finished && queue.isEmpty())
                    return;

            } 
            catch (InterruptedException e) 
            {                   
                return;
            }
        }
    }
}

public void testHandler() 
{
    BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100); 

    MyObjHandler  handler = new MyObjHandler(queue);
    new Thread(handler).start();

    // get objects for handler to process
    for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); )
    {
        queue.put(i.next());
    }

    // what code should go here to tell the handler to stop waiting for more objects?
    handler.Finished = true; //THIS TELLS HIM
    //If you need you can wait for the termination otherwise remove join
    myThread.join();
}

To rozwiązało oba problemy

  1. Oznaczono jako, BlockingQueueaby wiedział, że nie musi dłużej czekać na elementy
  2. Nie został przerwany pomiędzy, więc bloki przetwarzania kończą się tylko wtedy, gdy wszystkie pozycje w kolejce są przetworzone i nie ma już żadnych elementów do dodania

2
Uczynić Finishedzmiennej volatilewidoczności gwarancyjnej pomiędzy wątkami. Zobacz stackoverflow.com/a/106787
lukk

2
Jeśli się nie mylę, ostatni element w kolejce nie zostanie przetworzony. Kiedy weźmiesz ostatni element z kolejki, zakończone jest prawdziwe, a kolejka jest pusta, więc powróci przed obsługą tego ostatniego elementu. Dodaj trzeci warunek, jeśli (Finished && queue.isEmpty () && obj == null)
Matt R

@MattR dzięki, trafnie poinformowałem, że
poprawię


0

Albo nie przerywaj, to paskudne.

    public class MyQueue<T> extends ArrayBlockingQueue<T> {

        private static final long serialVersionUID = 1L;
        private boolean done = false;

        public ParserQueue(int capacity) {  super(capacity); }

        public void done() { done = true; }

        public boolean isDone() { return done; }

        /**
         * May return null if producer ends the production after consumer 
         * has entered the element-await state.
         */
        public T take() throws InterruptedException {
            T el;
            while ((el = super.poll()) == null && !done) {
                synchronized (this) {
                    wait();
                }
            }

            return el;
        }
    }
  1. kiedy producent umieszcza obiekt w kolejce, call queue.notify(), jeśli się kończy, callqueue.done()
  2. pętla while (! queue.isDone () ||! queue.isEmpty ())
  3. test take () zwraca wartość null

1
Powiedziałbym, że poprzednie rozwiązanie było czystsze i prostsze niż to
sakthisundar

1
Flaga gotowa jest taka sama jak pigułka z trucizną, po prostu jest podawana inaczej :)
David Mann

Odkurzacz? Wątpię. Nie wiesz, kiedy dokładnie wątek zostanie przerwany. A może zapytam, co w tym jest czystszego? Jest mniej kodu, to fakt.
tomasb

0

A co z

queue.add(new MyObj())

w jakimś wątku producenta, gdzie flaga stopu sygnalizuje wątkowi konsumenta zakończenie pętli while?


1
Przed udzieleniem odpowiedzi na stare pytanie z zaakceptowaną odpowiedzią (poszukaj zielonego ✓), a także innymi odpowiedziami, upewnij się, że Twoja odpowiedź dodaje coś nowego lub jest w związku z nimi pomocna. W związku z tym odpowiedzią na Twoje pytanie jest zaakceptowana odpowiedź na pytanie OP. - Format pytań i odpowiedzi Stack Overflow nie jest przeznaczony do dyskusji w stylu forum. Nie udzielaj odpowiedzi, pytając „co powiesz na X?” ponieważ nie odpowiada na pytanie OP, a raczej jest komentarzem. Proszę odnieść się do wytycznych specjalista .
Ivo Mori

... widziałem, moja wiadomość może zostać usunięta
Sam Ginrich
Korzystając z naszej strony potwierdzasz, że przeczytałeś(-aś) i rozumiesz nasze zasady używania plików cookie i zasady ochrony prywatności.
Licensed under cc by-sa 3.0 with attribution required.