Jak mogę się upewnić, że zadanie nie zostanie uruchomione dwukrotnie w Bull?


11

Mam dwie funkcje scheduleScan()i scan().

scan()dzwoni, scheduleScan() gdy nie pozostaje nic innego, jak zaplanować nowy skan , więc scheduleScan()można zaplanować scan(). Ale jest problem, niektóre zadania uruchamiane są dwukrotnie.

Chcę mieć pewność, że w danym momencie przetwarzane jest tylko jedno zadanie. Jak mogę to osiągnąć? Wydaje mi się, że ma to coś wspólnego done()(został usunięty ze skanu (), teraz usunięty), ale nie mogłem znaleźć rozwiązania.

Wersja byka: 3.12.1

Ważna późna edycja: scan() wywołuje inne funkcje i mogą, ale nie muszą, wywoływać inne funkcje, ale wszystkie one są funkcjami synchronizacji, więc wywołują funkcję tylko po zakończeniu ich własnych zadań, jest tylko jedna droga do przodu. Na końcu „drzewa” nazywam to, ostatnia funkcja wywołuje harmonogramScan (), ale nie mogą być uruchomione dwa jednoczesne zadania. Nawiasem scan()mówiąc, każda praca zaczyna się o, a kończy nascheduleScan(stock, period, milliseconds, 'called by file.js')

export function update(job) {
  // does some calculations, then it may call scheduleScan() or
  // it may call another function, and that could be the one calling
  // scheduleScan() function.
  // For instance, a function like finalize()
}

export function scan(job) {
  update(job)
}


import moment from 'moment'
import stringHash from 'string-hash'
const opts = { redis: { port: 6379, host: '127.0.0.1', password: mypassword' } }
let queue = new Queue('scan', opts)

queue.process(1, (job) => {
  job.progress(100).then(() => {
    scan(job)
  })
})

export function scheduleScan (stock, period, milliseconds, triggeredBy) {
  let uniqueId = stringHash(stock + ':' + period)

  queue.getJob(uniqueId).then(job => {
    if (!job) {
      if (milliseconds) {
        queue.add({ stock, period, triggeredBy }, { delay: milliseconds, jobId: uniqueId }).then(() => {
          // console.log('Added with ms: ' + stock + ' ' + period)
        }).catch(err => {
          if (err) {
            console.log('Can not add because it exists ' + new Date())
          }
        })
      } else {
        queue.add({ stock, period, triggeredBy }, { jobId: uniqueId }).then(() => {
          // console.log('Added without ms: ' + stock + ' ' + period)
        }).catch(err => {
          if (err) {
            console.log('Can not add because it exists ' + new Date())
          }
        })
      }
    } else {
      job.getState().then(state => {
        if (state === 'completed') {
          job.remove().then(() => {
            if (milliseconds) {
              queue.add({ stock, period, triggeredBy }, { delay: milliseconds, jobId: uniqueId }).then(() => {
                // console.log('Added with ms: ' + stock + ' ' + period)
              }).catch(err => {
                if (err) {
                  console.log('Can not add because it exists ' + new Date())
                }
              })
            } else {
              queue.add({ stock, period, triggeredBy }, { jobId: uniqueId }).then(() => {
                // console.log('Added without ms: ' + stock + ' ' + period)
              }).catch(err => {
                if (err) {
                  console.log('Can not add because it exists ' + new Date())
                }
              })
            }
          }).catch(err => {
            if (err) {
              // console.log(err)
            }
          })
        }
      }).catch(err => {
        // console.log(err)
      })
    }
  })
}

Nie mogę znaleźć scanfunkcji, czy możesz pomóc?
Muhammad Zeeshan

@MuhammadZeeshan Dodałem go, mój błąd.
salep

Odpowiedzi:


6

Problem, jak sądzę, polega na tym, że twoja scanfunkcja jest asynchroniczna. Twoja job.progressfunkcja wywołuje, scana następnie natychmiast wywołuje, doneumożliwiając kolejce przetworzenie innego zadania.

Rozwiązaniem może być zdać donezwrotnego jako parametr do swoich scanand scheduleScanfunkcji i wywołać ją, po zakończeniu swojej pracy (lub w przypadku błędu).

Innym (lepszym) rozwiązaniem może być zapewnienie, że zawsze zwrócisz Promiseod scani scheduleScan, a następnie zaczekasz na obietnicę do rozwiązania, a następnie zadzwonisz done. Jeśli to zrobisz, upewnij się, że w łańcuchu masz wszystkie zwroty obietnic scheduleScan.

queue.process(1, (job, done) => {
  job.progress(100).then(() => {
    scan(job)
        .then(done)
        .catch(done)
  })
})

export function scan() {
   // business logic
   return scheduleScan()
}

// Chain all of your promise returns. Otherwise
// the scan function will return sooner and allow done to be called
// prior to the scheduleScan function finishing it's execution
export function scheduleScan() {
    return queue.getJob(..).then(() => {
        ....
        return queue.add()...
        ....
        return queue.add(...)
            .catch(e => {
                 console.log(e);
                 // propogate errors!
                 throw e;
             })

}

Zredagowałem moje pytanie. Czy możesz to sprawdzić jeszcze raz, zwłaszcza część „Ważna późna edycja”? Czy Twoja odpowiedź nadal obowiązuje w tej sytuacji? Dzięki.
salep

1
Tak, nadal jest ważny. Z twojej edycji, myślę, że mówisz, że scheduledScanjest zawsze wywoływany po wszystkich innych funkcjach synchronizacji scan. Jeśli tak, to tak, moja odpowiedź jest nadal aktualna. Po prostu zawsze zwracaj obietnicę, która zostanie zwrócona scheduleScanw scanfunkcji
jeeves

Znowu mój błąd. Pierwsza funkcja, update (), jest w trakcie skanowania, ale update () może wywoływać inną funkcję, np. Finalize (), a finalize () może wywoływać scheduleScan (). Pamiętaj, że dzieje się to w kolejności, więc nie ma wielu połączeń, robię to, aby zachować modułowość mojej aplikacji. - Dzięki
wyprzedażp

1
Tak, ta sama odpowiedź. Jeśli updatepołączenia scheduledScanlub dowolna liczba funkcji między nimi. Kluczową kwestią jest to, że musisz przywrócić łańcuch obietnic z scheduleScanpowrotem do scanfunkcji. Więc jeśli scanwywołania, updatektóre wywołają finalise..... Które wywołania scheduleScanłańcuch obietnicy będzie musiał zostać zwrócony przez wszystkie wywołania funkcji, tj. Upewnij się, że zwróciłeś obietnicę z każdej z tych funkcji.
jeeves

Żeby wyjaśnić mój ostatni komentarz. Na przykład, jeśli w trakcie skanowania przeprowadzasz aktualizację. Musisz zwrócić wynik aktualizacji (obietnicę) z funkcji skanowania.
jeeves

4

Funkcja skanowania jest funkcją asynchroniczną. W swojej queue.process()funkcji musisz poczekać na funkcję skanowania, a następnie done()oddzwonić.

export async function scan(job) {
  // it does some calculations, then it creates a new schedule.
  return scheduleScan(stock, period, milliseconds, "scan.js");
}

queue.process(1, (job, done) => {
  job.progress(100).then(async() => {
    await scan(job);
    done();
  });
});

export async function scheduleScan(stock, period, milliseconds, triggeredBy) {
    let uniqueId = stringHash(stock + ":" + period);
    try {
      const existingJob = await queue.getJob(uniqueId);
      if (!existingJob) {
        const job = await addJob({
          queue,
          stock,
          period,
          uniqueId,
          milliseconds,
          triggeredBy
        });
        return job;
      } else {
        const jobState = await existingJob.getState();
        if (jobState === "completed") {
          await existingJob.remove();
          const newJob = await addJob({
            queue,
            stock,
            period,
            uniqueId,
            milliseconds,
            triggeredBy
          });
          return newJob;
        }
      }
    } catch (err) {
      throw new Error(err);
    }
}

export function addJob({ queue, stock, period, milliseconds, triggeredBy }) {
  if (milliseconds) {
    return queue.add(
      { stock, period, triggeredBy },
      { delay: milliseconds, jobId: uniqueId }
    );
  } else {
    return queue.add({ stock, period, triggeredBy }, { jobId: uniqueId });
  }
}

Spróbuj tego! Próbowałem trochę przebudować kod, używając asynchronicznego oczekiwania.


Zredagowałem moje pytanie. Czy możesz to sprawdzić jeszcze raz, zwłaszcza część „Ważna późna edycja”? Czy Twoja odpowiedź nadal obowiązuje w tej sytuacji? Dzięki.
salep
Korzystając z naszej strony potwierdzasz, że przeczytałeś(-aś) i rozumiesz nasze zasady używania plików cookie i zasady ochrony prywatności.
Licensed under cc by-sa 3.0 with attribution required.