Jak czekać na kilka kontraktów futures?


86

Załóżmy, że mam kilka przyszłości i muszę poczekać, aż którakolwiek z nich zawiedzie lub wszystkie odniosą sukces.

Na przykład niech istnieją 3 futures: f1, f2, f3.

  • Jeśli się f1powiedzie i f2zawiedzie, nie czekam na f3(i zwracam błąd klientowi).

  • Jeśli f2zawiedzie podczas f1i f3nadal działają, nie czekam na nie (i zwracam błąd )

  • Jeśli się f1powiedzie, a potem się f2powiedzie, nadal czekam f3.

Jak byś to zaimplementował?


kwestia Scala dotycząca tego pytania. Issues.scala-lang.org/browse/SI-8994 API powinno mieć opcję dla różnych zachowań
WeiChing 林 煒 清

Odpowiedzi:


83

Zamiast tego możesz użyć zrozumienia w następujący sposób:

val fut1 = Future{...}
val fut2 = Future{...}
val fut3 = Future{...}

val aggFut = for{
  f1Result <- fut1
  f2Result <- fut2
  f3Result <- fut3
} yield (f1Result, f2Result, f3Result)

W tym przykładzie futures 1, 2 i 3 są uruchamiane równolegle. Następnie, w celu zrozumienia, czekamy, aż dostępne będą wyniki 1, a następnie 2 i 3. Jeśli 1 lub 2 zawiedzie, nie będziemy już czekać na 3. Jeśli wszystkie 3 się powiedzie, aggFutval będzie trzymał krotkę z 3 miejscami, odpowiadającymi wynikom 3 futures.

Teraz, jeśli potrzebujesz zachowania, w którym chcesz przestać czekać, jeśli powiedz, że fut2 zawodzi jako pierwszy, sprawy stają się trochę trudniejsze. W powyższym przykładzie musisz poczekać, aż fut1 się zakończy, zanim zorientujesz się, że fut2 się nie powiodło. Aby rozwiązać ten problem, możesz spróbować czegoś takiego:

  val fut1 = Future{Thread.sleep(3000);1}
  val fut2 = Promise.failed(new RuntimeException("boo")).future
  val fut3 = Future{Thread.sleep(1000);3}

  def processFutures(futures:Map[Int,Future[Int]], values:List[Any], prom:Promise[List[Any]]):Future[List[Any]] = {
    val fut = if (futures.size == 1) futures.head._2
    else Future.firstCompletedOf(futures.values)

    fut onComplete{
      case Success(value) if (futures.size == 1)=> 
        prom.success(value :: values)

      case Success(value) =>
        processFutures(futures - value, value :: values, prom)

      case Failure(ex) => prom.failure(ex)
    }
    prom.future
  }

  val aggFut = processFutures(Map(1 -> fut1, 2 -> fut2, 3 -> fut3), List(), Promise[List[Any]]())
  aggFut onComplete{
    case value => println(value)
  }

Teraz działa to poprawnie, ale problem polega na tym, że wiemy, które Futurez nich usunąć z Mappomyślnego ukończenia. Tak długo, jak masz jakiś sposób na poprawne skorelowanie wyniku z Przyszłością, która zrodziła ten wynik, wtedy coś takiego działa. Po prostu rekurencyjnie usuwa ukończone kontrakty futures z mapy, a następnie wywołuje Future.firstCompletedOfpozostałe, Futuresdopóki ich nie ma, zbierając wyniki po drodze. To nie jest ładne, ale jeśli naprawdę potrzebujesz zachowania, o którym mówisz, to lub coś podobnego może zadziałać.


Dziękuję Ci. Co się stanie, jeśli fut2wcześniej się nie uda fut1? Czy fut1w takim razie nadal będziemy czekać ? Jeśli będziemy, to nie jest dokładnie to, czego chcę.
Michael

Ale jeśli 3 zawiedzie jako pierwsze, nadal czekamy na 1 i 2, kiedy możemy wrócić wcześniej. Jakikolwiek sposób na zrobienie tego bez konieczności sekwencjonowania przyszłości?
The Archetypal Paul

Można zainstalować onFailureobsługi za fut2szybko się nie uda, a onSuccessna aggFutsukces uchwytu. Sukces na aggFutimplies fut2zakończył się pomyślnie, więc masz tylko jednego z wywoływanych programów obsługi.
pagoda_5b

Dodałem trochę więcej do mojej odpowiedzi, aby pokazać możliwe rozwiązanie szybkiego niepowodzenia, jeśli któraś z przyszłości zawiedzie.
cmbaxter

1
W pierwszym przykładzie 1 2 i 3 nie działają równolegle, a następnie działają szeregowo. Wypróbuj z printlines i zobacz
bwawok

35

Możesz użyć obietnicy i wysłać do niej albo pierwszą porażkę, albo ostatni zakończony, zagregowany sukces:

def sequenceOrBailOut[A, M[_] <: TraversableOnce[_]](in: M[Future[A]] with TraversableOnce[Future[A]])(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], executor: ExecutionContext): Future[M[A]] = {
  val p = Promise[M[A]]()

  // the first Future to fail completes the promise
  in.foreach(_.onFailure{case i => p.tryFailure(i)})

  // if the whole sequence succeeds (i.e. no failures)
  // then the promise is completed with the aggregated success
  Future.sequence(in).foreach(p trySuccess _)

  p.future
}

Następnie możesz to zrobić Await, Futurejeśli chcesz zablokować, lub po prostu mapzrobić coś innego.

Różnica w przypadku dla zrozumienia polega na tym, że tutaj otrzymujesz błąd pierwszego niepowodzenia, podczas gdy w przypadku zrozumienia otrzymujesz pierwszy błąd w kolejności przechodzenia kolekcji wejściowej (nawet jeśli pierwszy zawiódł). Na przykład:

val f1 = Future { Thread.sleep(1000) ; 5 / 0 }
val f2 = Future { 5 }
val f3 = Future { None.get }

Future.sequence(List(f1,f2,f3)).onFailure{case i => println(i)}
// this waits one second, then prints "java.lang.ArithmeticException: / by zero"
// the first to fail in traversal order

I:

val f1 = Future { Thread.sleep(1000) ; 5 / 0 }
val f2 = Future { 5 }
val f3 = Future { None.get }

sequenceOrBailOut(List(f1,f2,f3)).onFailure{case i => println(i)}
// this immediately prints "java.util.NoSuchElementException: None.get"
// the 'actual' first to fail (usually...)
// and it returns early (it does not wait 1 sec)

7

Oto rozwiązanie bez użycia aktorów.

import scala.util._
import scala.concurrent._
import java.util.concurrent.atomic.AtomicInteger

// Nondeterministic.
// If any failure, return it immediately, else return the final success.
def allSucceed[T](fs: Future[T]*): Future[T] = {
  val remaining = new AtomicInteger(fs.length)

  val p = promise[T]

  fs foreach {
    _ onComplete {
      case s @ Success(_) => {
        if (remaining.decrementAndGet() == 0) {
          // Arbitrarily return the final success
          p tryComplete s
        }
      }
      case f @ Failure(_) => {
        p tryComplete f
      }
    }
  }

  p.future
}

5

Możesz to zrobić tylko z futures. Oto jedna implementacja. Zauważ, że nie spowoduje to wcześniejszego zakończenia wykonywania! W takim przypadku musisz zrobić coś bardziej wyrafinowanego (i prawdopodobnie samodzielnie zaimplementować przerwę). Ale jeśli po prostu nie chcesz czekać na coś, co nie zadziała, kluczem jest czekanie na zakończenie pierwszej rzeczy i zatrzymanie się, gdy nic nie zostanie lub trafisz w wyjątek:

import scala.annotation.tailrec
import scala.util.{Try, Success, Failure}
import scala.concurrent._
import scala.concurrent.duration.Duration
import ExecutionContext.Implicits.global

@tailrec def awaitSuccess[A](fs: Seq[Future[A]], done: Seq[A] = Seq()): 
Either[Throwable, Seq[A]] = {
  val first = Future.firstCompletedOf(fs)
  Await.ready(first, Duration.Inf).value match {
    case None => awaitSuccess(fs, done)  // Shouldn't happen!
    case Some(Failure(e)) => Left(e)
    case Some(Success(_)) =>
      val (complete, running) = fs.partition(_.isCompleted)
      val answers = complete.flatMap(_.value)
      answers.find(_.isFailure) match {
        case Some(Failure(e)) => Left(e)
        case _ =>
          if (running.length > 0) awaitSuccess(running, answers.map(_.get) ++: done)
          else Right( answers.map(_.get) ++: done )
      }
  }
}

Oto przykład tego w akcji, gdy wszystko działa dobrze:

scala> awaitSuccess(Seq(Future{ println("Hi!") }, 
  Future{ Thread.sleep(1000); println("Fancy meeting you here!") },
  Future{ Thread.sleep(2000); println("Bye!") }
))
Hi!
Fancy meeting you here!
Bye!
res1: Either[Throwable,Seq[Unit]] = Right(List((), (), ()))

Ale kiedy coś pójdzie nie tak:

scala> awaitSuccess(Seq(Future{ println("Hi!") }, 
  Future{ Thread.sleep(1000); throw new Exception("boo"); () }, 
  Future{ Thread.sleep(2000); println("Bye!") }
))
Hi!
res2: Either[Throwable,Seq[Unit]] = Left(java.lang.Exception: boo)

scala> Bye!

1
Niezła realizacja. Ale uwaga, że jeśli przejdzie pusty ciąg futures do awaitSuccess, to czeka na zawsze ...
Michael Rueegg

5

W tym celu użyłbym aktora Akka. W przeciwieństwie do zrozumienia, zawodzi, gdy tylko zawodzi któraś z przyszłości, więc jest nieco bardziej wydajna w tym sensie.

class ResultCombiner(futs: Future[_]*) extends Actor {

  var origSender: ActorRef = null
  var futsRemaining: Set[Future[_]] = futs.toSet

  override def receive = {
    case () =>
      origSender = sender
      for(f <- futs)
        f.onComplete(result => self ! if(result.isSuccess) f else false)
    case false =>
      origSender ! SomethingFailed
    case f: Future[_] =>
      futsRemaining -= f
      if(futsRemaining.isEmpty) origSender ! EverythingSucceeded
  }

}

sealed trait Result
case object SomethingFailed extends Result
case object EverythingSucceeded extends Result

Następnie stwórz aktora, wyślij do niego wiadomość (aby wiedział, do kogo wysłać odpowiedź) i poczekaj na odpowiedź.

val actor = actorSystem.actorOf(Props(new ResultCombiner(f1, f2, f3)))
try {
  val f4: Future[Result] = actor ? ()
  implicit val timeout = new Timeout(30 seconds) // or whatever
  Await.result(f4, timeout.duration).asInstanceOf[Result] match {
    case SomethingFailed => println("Oh noes!")
    case EverythingSucceeded => println("It all worked!")
  }
} finally {
  // Avoid memory leaks: destroy the actor
  actor ! PoisonPill
}

Wygląda na zbyt skomplikowane jak na tak proste zadanie. Czy naprawdę potrzebuję aktora, żeby tylko czekał na przyszłość? W każdym razie dzięki.
Michael

1
Nie mogłem znaleźć w API żadnej odpowiedniej metody, która może zrobić dokładnie to, co chcesz, ale może coś przeoczyłem.
Robin Green

5

Odpowiedź na to pytanie została udzielona, ​​ale publikuję moje rozwiązanie klasy wartości (klasy wartości zostały dodane w 2.10), ponieważ nie ma tutaj żadnego. Nie krępuj się krytykować.

  implicit class Sugar_PimpMyFuture[T](val self: Future[T]) extends AnyVal {
    def concurrently = ConcurrentFuture(self)
  }
  case class ConcurrentFuture[A](future: Future[A]) extends AnyVal {
    def map[B](f: Future[A] => Future[B]) : ConcurrentFuture[B] = ConcurrentFuture(f(future))
    def flatMap[B](f: Future[A] => ConcurrentFuture[B]) : ConcurrentFuture[B] = concurrentFutureFlatMap(this, f) // work around no nested class in value class
  }
  def concurrentFutureFlatMap[A,B](outer: ConcurrentFuture[A], f: Future[A] => ConcurrentFuture[B]) : ConcurrentFuture[B] = {
    val p = Promise[B]()
    val inner = f(outer.future)
    inner.future onFailure { case t => p.tryFailure(t) }
    outer.future onFailure { case t => p.tryFailure(t) }
    inner.future onSuccess { case b => p.trySuccess(b) }
    ConcurrentFuture(p.future)
  }

ConcurrentFuture to niepotrzebne opakowanie Future, które zmienia domyślną mapę przyszłości / flatMap z `` wykonaj to-to-tam '' na połączenie wszystkiego i niepowodzenia. Stosowanie:

def func1 : Future[Int] = Future { println("f1!");throw new RuntimeException; 1 }
def func2 : Future[String] = Future { Thread.sleep(2000);println("f2!");"f2" }
def func3 : Future[Double] = Future { Thread.sleep(2000);println("f3!");42.0 }

val f : Future[(Int,String,Double)] = {
  for {
    f1 <- func1.concurrently
    f2 <- func2.concurrently
    f3 <- func3.concurrently
  } yield for {
   v1 <- f1
   v2 <- f2
   v3 <- f3
  } yield (v1,v2,v3)
}.future
f.onFailure { case t => println("future failed $t") }

W powyższym przykładzie, f1, f2 i f3 będą działać jednocześnie, a jeśli jakikolwiek błąd w dowolnej kolejności, przyszłość krotki zakończy się natychmiastową porażką.


Niesamowite! Każda biblioteka, która zapewnia tego rodzaju funkcję użytkową?
srirachapills

1
Tak, od tamtej pory stworzyłem rozbudowane narzędzie Future lib: github.com/S-Mach/s_mach.concurrent Zobacz async.par w przykładowym kodzie.
lancegatlin


2

Możesz użyć tego:

val l = List(1, 6, 8)

val f = l.map{
  i => future {
    println("future " +i)
    Thread.sleep(i* 1000)
    if (i == 12)
      throw new Exception("6 is not legal.")
    i
  }
}

val f1 = Future.sequence(f)

f1 onSuccess{
  case l => {
    logInfo("onSuccess")
    l.foreach(i => {

      logInfo("h : " + i)

    })
  }
}

f1 onFailure{
  case l => {
    logInfo("onFailure")
  }
Korzystając z naszej strony potwierdzasz, że przeczytałeś(-aś) i rozumiesz nasze zasady używania plików cookie i zasady ochrony prywatności.
Licensed under cc by-sa 3.0 with attribution required.