Ta odpowiedź zależy od akka-stream
wersji 2.4.2
. Interfejs API może się nieco różnić w innych wersjach. Zależność może być wykorzystana przez sbt :
libraryDependencies += "com.typesafe.akka" %% "akka-stream" % "2.4.2"
Dobra, zacznijmy. Interfejs API Akka Streams składa się z trzech głównych typów. W przeciwieństwie do strumieni reaktywnych , typy te są znacznie potężniejsze, a zatem bardziej złożone. Zakłada się, że dla wszystkich przykładów kodu istnieją już następujące definicje:
import scala.concurrent._
import akka._
import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.util._
implicit val system = ActorSystem("TestSystem")
implicit val materializer = ActorMaterializer()
import system.dispatcher
Te import
stwierdzenia są potrzebne do deklaracji typu. system
reprezentuje system aktorski Akka i materializer
reprezentuje kontekst oceny strumienia. W naszym przypadku używamy a ActorMaterializer
, co oznacza, że strumienie są oceniane nad aktorami. Obie wartości są oznaczone jako implicit
, co daje kompilatorowi Scala możliwość automatycznego wstrzykiwania tych dwóch zależności, gdy tylko są one potrzebne. Importujemy również system.dispatcher
, który jest kontekstem wykonania dla Futures
.
Nowy interfejs API
Strumienie Akka mają następujące kluczowe właściwości:
- Wdrażają specyfikację strumieni reaktywnych , której trzy główne cele, przeciwciśnienie, asynchroniczne i nieblokujące granice oraz interoperacyjność między różnymi implementacjami, w pełni dotyczą również strumieni Akka.
- Zapewniają one abstrakcję dla silnika oceny dla strumieni, który jest nazywany
Materializer
.
- Programy są formułowane jako bloki konstrukcyjne wielokrotnego użytku, które są reprezentowane jako trzy główne typy
Source
, Sink
oraz Flow
. Bloki konstrukcyjne tworzą wykres, którego ocena opiera się na Materializer
i musi zostać wyraźnie uruchomiona.
Poniżej podano głębsze wprowadzenie do korzystania z trzech głównych typów.
Źródło
A Source
jest twórcą danych, służy jako źródło wejściowe do strumienia. Każdy Source
ma jeden kanał wyjściowy i brak kanału wejściowego. Wszystkie dane przepływają przez kanał wyjściowy do wszystkiego, co jest podłączone do Source
.
Zdjęcie pochodzi z boldradius.com .
A Source
można utworzyć na wiele sposobów:
scala> val s = Source.empty
s: akka.stream.scaladsl.Source[Nothing,akka.NotUsed] = ...
scala> val s = Source.single("single element")
s: akka.stream.scaladsl.Source[String,akka.NotUsed] = ...
scala> val s = Source(1 to 3)
s: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ...
scala> val s = Source(Future("single value from a Future"))
s: akka.stream.scaladsl.Source[String,akka.NotUsed] = ...
scala> s runForeach println
res0: scala.concurrent.Future[akka.Done] = ...
single value from a Future
W powyższych przypadkach wprowadziliśmy Source
skończone dane, co oznacza, że ostatecznie się zakończą. Nie należy zapominać, że strumienie reaktywne są domyślnie leniwe i asynchroniczne. Oznacza to, że należy wyraźnie poprosić o ocenę strumienia. W strumieniach Akka można to zrobić run*
metodami. runForeach
Byłoby nie różni się od znanej foreach
funkcji - dzięki run
dodaniu czyni wyraźne, że pytamy o ocenę strumienia. Ponieważ dane skończone są nudne, kontynuujemy z nieskończonymi:
scala> val s = Source.repeat(5)
s: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ...
scala> s take 3 runForeach println
res1: scala.concurrent.Future[akka.Done] = ...
5
5
5
Dzięki tej take
metodzie możemy stworzyć sztuczny punkt zatrzymania, który uniemożliwi nam ocenę w nieskończoność. Ponieważ obsługa aktorów jest wbudowana, możemy również z łatwością karmić strumień wiadomościami, które są wysyłane do aktora:
def run(actor: ActorRef) = {
Future { Thread.sleep(300); actor ! 1 }
Future { Thread.sleep(200); actor ! 2 }
Future { Thread.sleep(100); actor ! 3 }
}
val s = Source
.actorRef[Int](bufferSize = 0, OverflowStrategy.fail)
.mapMaterializedValue(run)
scala> s runForeach println
res1: scala.concurrent.Future[akka.Done] = ...
3
2
1
Widzimy, że Futures
są wykonywane asynchronicznie na różnych wątkach, co wyjaśnia wynik. W powyższym przykładzie bufor dla przychodzących elementów nie jest konieczny i dlatego OverflowStrategy.fail
możemy skonfigurować, że strumień powinien zawieść przy przepełnieniu bufora. Zwłaszcza za pośrednictwem tego interfejsu aktora możemy przesyłać strumień z dowolnego źródła danych. Nie ma znaczenia, czy dane są tworzone przez ten sam wątek, inny wątek, inny proces, czy pochodzą ze zdalnego systemu przez Internet.
Tonąć
A Sink
jest w zasadzie przeciwieństwem a Source
. Jest to punkt końcowy strumienia i dlatego zużywa dane. A Sink
ma jeden kanał wejściowy i brak kanału wyjściowego. Sinks
są szczególnie potrzebne, gdy chcemy określić zachowanie modułu gromadzącego dane w sposób wielokrotnego użytku i bez oceny strumienia. Znane już run*
metody nie pozwalają nam na te właściwości, dlatego zaleca się stosowanie Sink
zamiast nich.
Zdjęcie pochodzi z boldradius.com .
Krótki przykład Sink
działania:
scala> val source = Source(1 to 3)
source: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ...
scala> val sink = Sink.foreach[Int](elem => println(s"sink received: $elem"))
sink: akka.stream.scaladsl.Sink[Int,scala.concurrent.Future[akka.Done]] = ...
scala> val flow = source to sink
flow: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...
scala> flow.run()
res3: akka.NotUsed = NotUsed
sink received: 1
sink received: 2
sink received: 3
Połączenie Source
z a Sink
można wykonać za pomocą tej to
metody. Zwraca tak zwany RunnableFlow
, który jest, jak zobaczymy później, specjalną formą Flow
- strumienia, który można wykonać, po prostu wywołując jego run()
metodę.
Zdjęcie pochodzi z boldradius.com .
Oczywiście możliwe jest przekazanie aktorowi wszystkich wartości, które trafiają do zlewu:
val actor = system.actorOf(Props(new Actor {
override def receive = {
case msg => println(s"actor received: $msg")
}
}))
scala> val sink = Sink.actorRef[Int](actor, onCompleteMessage = "stream completed")
sink: akka.stream.scaladsl.Sink[Int,akka.NotUsed] = ...
scala> val runnable = Source(1 to 3) to sink
runnable: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...
scala> runnable.run()
res3: akka.NotUsed = NotUsed
actor received: 1
actor received: 2
actor received: 3
actor received: stream completed
Pływ
Źródła danych i ujścia są świetne, jeśli potrzebujesz połączenia między strumieniami Akka i istniejącym systemem, ale tak naprawdę nic z nimi nie możesz zrobić. Przepływy to ostatni brakujący element w abstrakcyjnej bazie strumieni Akka. Działają one jako łącznik między różnymi strumieniami i mogą służyć do przekształcania jego elementów.
Zdjęcie pochodzi z boldradius.com .
Jeśli a Flow
jest podłączone do Source
nowego, Source
powstaje wynik. Podobnie Flow
połączenie z a Sink
tworzy nowy Sink
. A w Flow
połączeniu z A Source
i Sink
wyniki w RunnableFlow
. Dlatego znajdują się między kanałem wejściowym a wyjściowym, ale same w sobie nie odpowiadają jednemu ze smaków, o ile nie są podłączone do ani a, Source
ani a Sink
.
Zdjęcie pochodzi z boldradius.com .
Aby lepiej zrozumieć Flows
, przyjrzymy się niektórym przykładom:
scala> val source = Source(1 to 3)
source: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ...
scala> val sink = Sink.foreach[Int](println)
sink: akka.stream.scaladsl.Sink[Int,scala.concurrent.Future[akka.Done]] = ...
scala> val invert = Flow[Int].map(elem => elem * -1)
invert: akka.stream.scaladsl.Flow[Int,Int,akka.NotUsed] = ...
scala> val doubler = Flow[Int].map(elem => elem * 2)
doubler: akka.stream.scaladsl.Flow[Int,Int,akka.NotUsed] = ...
scala> val runnable = source via invert via doubler to sink
runnable: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...
scala> runnable.run()
res10: akka.NotUsed = NotUsed
-2
-4
-6
Poprzez via
metodę możemy podłączyć Source
z Flow
. Musimy określić typ danych wejściowych, ponieważ kompilator nie może nas o tym wnioskować. Jak możemy zobaczyć już w tym prostym przykładzie, przepływy invert
i double
są całkowicie niezależne od jakichkolwiek danych producentów i konsumentów. Przekształcają tylko dane i przekazują je do kanału wyjściowego. Oznacza to, że możemy ponownie wykorzystać przepływ między wieloma strumieniami:
scala> val s1 = Source(1 to 3) via invert to sink
s1: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...
scala> val s2 = Source(-3 to -1) via invert to sink
s2: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...
scala> s1.run()
res10: akka.NotUsed = NotUsed
-1
-2
-3
scala> s2.run()
res11: akka.NotUsed = NotUsed
3
2
1
s1
i s2
reprezentują zupełnie nowe strumienie - nie współużytkują żadnych danych przez swoje bloki konstrukcyjne.
Nieograniczone strumienie danych
Zanim przejdziemy dalej, powinniśmy ponownie zapoznać się z niektórymi kluczowymi aspektami strumieni reaktywnych. Nieograniczona liczba elementów może dotrzeć w dowolnym punkcie i może umieścić strumień w różnych stanach. Oprócz strumienia uruchomialnego, który jest zwykłym stanem, strumień może zostać zatrzymany albo przez błąd, albo przez sygnał, który oznacza, że nie dotrą żadne dalsze dane. Strumień można modelować w sposób graficzny, zaznaczając zdarzenia na osi czasu, tak jak ma to miejsce:
Zdjęcie zaczerpnięte z wprowadzenia do programowania reaktywnego, którego brakowało .
Widzieliśmy już możliwe do uruchomienia przepływy w przykładach poprzedniej sekcji. Otrzymujemy RunnableGraph
ilekroć rzeczywiście można zmaterializować strumień, co oznacza, że a Sink
jest podłączony do Source
. Do tej pory zawsze materializowaliśmy się do wartości Unit
, którą można zobaczyć w typach:
val source: Source[Int, NotUsed] = Source(1 to 3)
val sink: Sink[Int, Future[Done]] = Sink.foreach[Int](println)
val flow: Flow[Int, Int, NotUsed] = Flow[Int].map(x => x)
For Source
i Sink
parametr drugiego typu oraz dla Flow
parametru trzeciego typu oznaczają zmaterializowaną wartość. W tej odpowiedzi pełne znaczenie materializacji nie zostanie wyjaśnione. Jednak dalsze szczegóły na temat materializacji można znaleźć w oficjalnej dokumentacji . Na razie jedyne, co musimy wiedzieć, to to, że zmaterializowana wartość jest tym, co otrzymujemy, gdy uruchamiamy strumień. Ponieważ do tej pory byliśmy zainteresowani tylko efektami ubocznymi, otrzymaliśmy Unit
wartość zmaterializowaną. Wyjątkiem było zmaterializowanie zlewu, co spowodowało:Future
. Dało nam toFuture
, ponieważ ta wartość może oznaczać zakończenie strumienia połączonego z ujściem. Dotychczasowe przykłady kodu były przyjemne do wyjaśnienia tej koncepcji, ale były również nudne, ponieważ zajmowaliśmy się tylko strumieniami skończonymi lub bardzo prostymi nieskończonymi. Aby było bardziej interesująco, poniżej zostanie wyjaśniony pełny strumień asynchroniczny i nieograniczony.
Przykład ClickStream
Na przykład chcemy mieć strumień, który przechwytuje zdarzenia kliknięcia. Aby było to trudniejsze, powiedzmy, że chcemy również grupować zdarzenia kliknięcia, które mają miejsce w krótkim czasie po sobie. W ten sposób możemy łatwo odkryć podwójne, potrójne lub dziesięciokrotne kliknięcia. Ponadto chcemy odfiltrować wszystkie pojedyncze kliknięcia. Weź głęboki oddech i wyobraź sobie, jak rozwiązałbyś ten problem w sposób nadrzędny. Założę się, że nikt nie byłby w stanie wdrożyć rozwiązania, które działa poprawnie przy pierwszej próbie. Reaktywnie ten problem jest prosty do rozwiązania. W rzeczywistości rozwiązanie jest tak proste i łatwe do wdrożenia, że możemy nawet wyrazić to na diagramie, który bezpośrednio opisuje zachowanie kodu:
Zdjęcie zaczerpnięte z wprowadzenia do programowania reaktywnego, którego brakowało .
Szare pola to funkcje opisujące, w jaki sposób jeden strumień jest przekształcany w inny. Dzięki throttle
funkcji gromadzimy kliknięcia w ciągu 250 milisekund, map
a filter
funkcje i powinny być zrozumiałe. Kolorowe kule reprezentują zdarzenie, a strzałki pokazują, jak przepływają one przez nasze funkcje. Później na etapach przetwarzania otrzymujemy coraz mniej elementów przepływających przez nasz strumień, ponieważ grupujemy je razem i odfiltrowujemy. Kod tego obrazu wyglądałby mniej więcej tak:
val multiClickStream = clickStream
.throttle(250.millis)
.map(clickEvents => clickEvents.length)
.filter(numberOfClicks => numberOfClicks >= 2)
Całą logikę można przedstawić tylko w czterech wierszach kodu! W Scali moglibyśmy napisać to jeszcze krócej:
val multiClickStream = clickStream.throttle(250.millis).map(_.length).filter(_ >= 2)
Definicja clickStream
jest nieco bardziej złożona, ale dzieje się tak tylko dlatego, że przykładowy program działa na JVM, gdzie przechwytywanie zdarzeń kliknięcia nie jest łatwe. Inną komplikacją jest to, że Akka domyślnie nie zapewnia tej throttle
funkcji. Zamiast tego musieliśmy to napisać sami. Ponieważ ta funkcja jest (jak ma to miejsce w przypadku funkcji map
lub filter
) wielokrotnego użytku w różnych przypadkach użycia, nie liczę tych linii do liczby linii potrzebnych do implementacji logiki. Jednak w imperatywnych językach jest rzeczą normalną, że logiki nie można użyć tak łatwo i że różne logiczne kroki dzieją się w jednym miejscu, zamiast być stosowane sekwencyjnie, co oznacza, że prawdopodobnie zmienilibyśmy nasz kod z logiką dławienia. Pełny przykładowy kod jest dostępny jakoIstota i nie będzie tu więcej omawiana.
Przykład SimpleWebServer
Zamiast tego należy omówić inny przykład. Chociaż strumień kliknięć jest dobrym przykładem pozwalającym strumieniom Akka obsługiwać przykład z prawdziwego świata, brakuje mu mocy do pokazania równoległego wykonywania w akcji. Następny przykład powinien reprezentować mały serwer WWW, który może obsługiwać wiele żądań równolegle. Serwer WWW powinien akceptować połączenia przychodzące i odbierać od nich sekwencje bajtów reprezentujące drukowalne znaki ASCII. Te sekwencje bajtów lub ciągi znaków powinny być podzielone przy wszystkich znakach nowej linii na mniejsze części. Następnie serwer odpowiada klientowi każdą z linii podziału. Alternatywnie może zrobić coś innego z liniami i dać specjalny token odpowiedzi, ale chcemy uprościć ten przykład i dlatego nie wprowadzamy żadnych wymyślnych funkcji. Zapamiętaj, serwer musi być w stanie obsłużyć wiele żądań jednocześnie, co w zasadzie oznacza, że żadne żądanie nie może blokować żadnego innego żądania z dalszego wykonania. Rozwiązanie wszystkich tych wymagań może być trudne w bezwzględny sposób - jednak w przypadku strumieni Akka nie powinniśmy potrzebować więcej niż kilku linii do rozwiązania któregokolwiek z nich. Najpierw omówmy sam serwer:
Zasadniczo istnieją tylko trzy główne elementy składowe. Pierwszy musi akceptować połączenia przychodzące. Drugi musi obsłużyć przychodzące żądania, a trzeci musi wysłać odpowiedź. Wdrożenie wszystkich tych trzech elementów jest tylko trochę bardziej skomplikowane niż wdrożenie strumienia kliknięć:
def mkServer(address: String, port: Int)(implicit system: ActorSystem, materializer: Materializer): Unit = {
import system.dispatcher
val connectionHandler: Sink[Tcp.IncomingConnection, Future[Unit]] =
Sink.foreach[Tcp.IncomingConnection] { conn =>
println(s"Incoming connection from: ${conn.remoteAddress}")
conn.handleWith(serverLogic)
}
val incomingCnnections: Source[Tcp.IncomingConnection, Future[Tcp.ServerBinding]] =
Tcp().bind(address, port)
val binding: Future[Tcp.ServerBinding] =
incomingCnnections.to(connectionHandler).run()
binding onComplete {
case Success(b) =>
println(s"Server started, listening on: ${b.localAddress}")
case Failure(e) =>
println(s"Server could not be bound to $address:$port: ${e.getMessage}")
}
}
Funkcja mkServer
przyjmuje (oprócz adresu i portu serwera) także system aktorów i materializator jako parametry niejawne. Przepływ sterowania serwera jest reprezentowany przez binding
, który pobiera źródło połączeń przychodzących i przekazuje je do ujścia połączeń przychodzących. Wewnątrz connectionHandler
, który jest naszym zlewem, obsługujemy każde połączenie przepływem serverLogic
, które zostanie opisane później. binding
zwraca aFuture
, która kończy się, gdy serwer został uruchomiony lub uruchomienie nie powiodło się, co może mieć miejsce, gdy port jest już zajęty przez inny proces. Kod nie odzwierciedla jednak całkowicie grafiki, ponieważ nie widzimy bloku konstrukcyjnego, który obsługuje odpowiedzi. Powodem tego jest to, że połączenie samo zapewnia tę logikę. Jest to przepływ dwukierunkowy, a nie tylko jednokierunkowy, jak widzieliśmy w poprzednich przykładach. Tak jak w przypadku materializacji, takich złożonych przepływów nie należy tutaj wyjaśniać. Oficjalna dokumentacja ma mnóstwo materiału do pokrycia bardziej złożonych wykresów przepływu. Na razie wystarczy wiedzieć, że Tcp.IncomingConnection
reprezentuje połączenie, które wie, jak odbierać żądania i jak wysyłać odpowiedzi. Część, której wciąż brakuje, toserverLogic
blok konstrukcyjny. Może to wyglądać tak:
Po raz kolejny jesteśmy w stanie podzielić logikę na kilka prostych elementów składowych, które razem tworzą przepływ naszego programu. Najpierw chcemy podzielić naszą sekwencję bajtów na linie, co musimy zrobić, gdy znajdziemy znak nowej linii. Następnie bajty każdej linii należy przekonwertować na ciąg, ponieważ praca z bajtami surowymi jest uciążliwa. Ogólnie rzecz biorąc, moglibyśmy otrzymać binarny strumień skomplikowanego protokołu, co sprawiłoby, że praca z przychodzącymi surowymi danymi była niezwykle trudna. Gdy mamy czytelny ciąg, możemy utworzyć odpowiedź. Dla uproszczenia odpowiedzią może być w naszym przypadku cokolwiek. Na koniec musimy przekonwertować naszą odpowiedź na sekwencję bajtów, które można wysłać przewodowo. Kod całej logiki może wyglądać następująco:
val serverLogic: Flow[ByteString, ByteString, Unit] = {
val delimiter = Framing.delimiter(
ByteString("\n"),
maximumFrameLength = 256,
allowTruncation = true)
val receiver = Flow[ByteString].map { bytes =>
val message = bytes.utf8String
println(s"Server received: $message")
message
}
val responder = Flow[String].map { message =>
val answer = s"Server hereby responds to message: $message\n"
ByteString(answer)
}
Flow[ByteString]
.via(delimiter)
.via(receiver)
.via(responder)
}
Wiemy już, że serverLogic
jest to przepływ, który wymaga ByteString
i musi wytworzyć ByteString
. Dzięki delimiter
możemy podzielić na ByteString
mniejsze części - w naszym przypadku musi to nastąpić za każdym razem, gdy pojawi się znak nowej linii. receiver
to przepływ, który bierze wszystkie sekwencje podzielonych bajtów i konwertuje je na ciąg. Jest to oczywiście niebezpieczna konwersja, ponieważ tylko drukowalne znaki ASCII powinny być konwertowane na ciąg znaków, ale na nasze potrzeby jest wystarczająca. responder
jest ostatnim komponentem i odpowiada za utworzenie odpowiedzi i konwersję odpowiedzi z powrotem do sekwencji bajtów. W przeciwieństwie do grafiki nie podzieliliśmy tego ostatniego komponentu na dwie części, ponieważ logika jest trywialna. Na koniec łączymy wszystkie przepływy przezvia
funkcjonować. W tym momencie można zapytać, czy zadbaliśmy o wspomnianą na początku własność dla wielu użytkowników. I rzeczywiście tak zrobiliśmy, chociaż może nie być to od razu oczywiste. Patrząc na tę grafikę, powinna stać się wyraźniejsza:
serverLogic
Składnikiem jest niczym strumień, który zawiera mniejsze przepływy. Ten komponent pobiera dane wejściowe, które są żądaniami, i generuje dane wyjściowe, które są odpowiedzią. Ponieważ przepływy można konstruować wiele razy i wszystkie działają niezależnie od siebie, osiągamy to poprzez zagnieżdżanie naszej właściwości dla wielu użytkowników. Każde żądanie jest obsługiwane w ramach własnego żądania, dlatego też żądanie krótko działające może zastąpić wcześniej uruchomione żądanie długo działające. Jeśli zastanawiasz się, definicję serverLogic
tego pokazaną wcześniej można oczywiście napisać znacznie krócej, wstawiając większość jej wewnętrznych definicji:
val serverLogic = Flow[ByteString]
.via(Framing.delimiter(
ByteString("\n"),
maximumFrameLength = 256,
allowTruncation = true))
.map(_.utf8String)
.map(msg => s"Server hereby responds to message: $msg\n")
.map(ByteString(_))
Test serwera WWW może wyglądać następująco:
$ # Client
$ echo "Hello World\nHow are you?" | netcat 127.0.0.1 6666
Server hereby responds to message: Hello World
Server hereby responds to message: How are you?
Aby powyższy przykład kodu działał poprawnie, najpierw musimy uruchomić serwer, który jest przedstawiony przez startServer
skrypt:
$ # Server
$ ./startServer 127.0.0.1 6666
[DEBUG] Server started, listening on: /127.0.0.1:6666
[DEBUG] Incoming connection from: /127.0.0.1:37972
[DEBUG] Server received: Hello World
[DEBUG] Server received: How are you?
Pełny przykład kodu tego prostego serwera TCP można znaleźć tutaj . Jesteśmy w stanie napisać serwer za pomocą Akka Streams, ale także klienta. Może to wyglądać tak:
val connection = Tcp().outgoingConnection(address, port)
val flow = Flow[ByteString]
.via(Framing.delimiter(
ByteString("\n"),
maximumFrameLength = 256,
allowTruncation = true))
.map(_.utf8String)
.map(println)
.map(_ ⇒ StdIn.readLine("> "))
.map(_+"\n")
.map(ByteString(_))
connection.join(flow).run()
Pełny kod klienta TCP można znaleźć tutaj . Kod wygląda dość podobnie, ale w przeciwieństwie do serwera nie musimy już zarządzać połączeniami przychodzącymi.
Złożone wykresy
W poprzednich sekcjach widzieliśmy, jak możemy konstruować proste programy z przepływów. Jednak w rzeczywistości często nie wystarczy polegać na już wbudowanych funkcjach, aby konstruować bardziej złożone strumienie. Jeśli chcemy mieć możliwość korzystania ze strumieni Akka do dowolnych programów, musimy wiedzieć, jak zbudować własne niestandardowe struktury sterowania i kombinowalne przepływy, które pozwolą nam uporać się ze złożonością naszych aplikacji. Dobrą wiadomością jest to, że strumienie Akka zostały zaprojektowane w taki sposób, aby skalować je w zależności od potrzeb użytkowników. Aby dać krótkie wprowadzenie do bardziej złożonych części strumieni Akka, dodajemy więcej funkcji do naszego przykładu klient / serwer.
Jedną rzeczą, której nie możemy jeszcze zrobić, jest zamknięcie połączenia. W tym momencie zaczyna się nieco komplikować, ponieważ interfejs API strumienia, który widzieliśmy do tej pory, nie pozwala nam zatrzymać strumienia w dowolnym miejscu. Istnieje jednak GraphStage
abstrakcja, za pomocą której można tworzyć dowolne etapy przetwarzania wykresów z dowolną liczbą portów wejściowych lub wyjściowych. Przyjrzyjmy się najpierw stronie serwera, gdzie wprowadzamy nowy komponent o nazwie closeConnection
:
val closeConnection = new GraphStage[FlowShape[String, String]] {
val in = Inlet[String]("closeConnection.in")
val out = Outlet[String]("closeConnection.out")
override val shape = FlowShape(in, out)
override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) {
setHandler(in, new InHandler {
override def onPush() = grab(in) match {
case "q" ⇒
push(out, "BYE")
completeStage()
case msg ⇒
push(out, s"Server hereby responds to message: $msg\n")
}
})
setHandler(out, new OutHandler {
override def onPull() = pull(in)
})
}
}
Ten interfejs API wygląda o wiele bardziej skomplikowany niż interfejs API flow. Nic dziwnego, że musimy tutaj zrobić wiele koniecznych kroków. W zamian mamy większą kontrolę nad zachowaniem naszych strumieni. W powyższym przykładzie podajemy tylko jeden port wejściowy i jeden port wyjściowy i udostępniamy je systemowi, zastępując shape
wartość. Ponadto zdefiniowaliśmy tak zwane InHandler
a OutHandler
, które w tej kolejności odpowiadają za odbieranie i wysyłanie elementów. Jeśli przyjrzałeś się przykładowi pełnego strumienia kliknięć, powinieneś już rozpoznać te komponenty. W InHandler
chwytamy element i jeśli jest to ciąg znaków z jednym znakiem 'q'
, chcemy zamknąć strumień. Aby dać klientowi szansę dowiedzieć się, że strumień zostanie wkrótce zamknięty, emitujemy ciąg"BYE"
a następnie natychmiast zamykamy scenę. closeConnection
Składnik może być łączony ze strumieniem za pomocą via
sposobu, który został wprowadzony w części o przepływach.
Oprócz możliwości zamykania połączeń, byłoby również miło, gdybyśmy mogli wyświetlić wiadomość powitalną dla nowo utworzonego połączenia. Aby to zrobić, musimy jeszcze raz pójść nieco dalej:
def serverLogic
(conn: Tcp.IncomingConnection)
(implicit system: ActorSystem)
: Flow[ByteString, ByteString, NotUsed]
= Flow.fromGraph(GraphDSL.create() { implicit b ⇒
import GraphDSL.Implicits._
val welcome = Source.single(ByteString(s"Welcome port ${conn.remoteAddress}!\n"))
val logic = b.add(internalLogic)
val concat = b.add(Concat[ByteString]())
welcome ~> concat.in(0)
logic.outlet ~> concat.in(1)
FlowShape(logic.in, concat.out)
})
Funkcja serverLogic
przyjmuje teraz połączenie przychodzące jako parametr. Wewnątrz jego ciała używamy DSL, który pozwala nam opisać złożone zachowanie strumienia. Dzięki temu welcome
tworzymy strumień, który może emitować tylko jeden element - wiadomość powitalną. logic
jest tak, jak opisano serverLogic
w poprzedniej sekcji. Jedyną zauważalną różnicą jest to, że closeConnection
do tego dodaliśmy . Teraz faktycznie nadchodzi interesująca część DSL. Ta GraphDSL.create
funkcja udostępnia konstruktora b
, który służy do wyrażania strumienia jako wykresu. Za pomocą tej ~>
funkcji można ze sobą łączyć porty wejściowe i wyjściowe. Concat
Składnik, który jest używany w tym przykładzie można łączyć elementy i jest tu stosowany do poprzedź wiadomość powitalną przed innymi elementami, które pochodzą zinternalLogic
. W ostatnim wierszu udostępniamy tylko port wejściowy logiki serwera i port wyjściowy konkatenowanego strumienia, ponieważ wszystkie pozostałe porty powinny pozostać szczegółami implementacji serverLogic
komponentu. Aby uzyskać szczegółowe wprowadzenie do wykresu DSL strumieni Akka, odwiedź odpowiednią sekcję w oficjalnej dokumentacji . Pełny przykład kodu złożonego serwera TCP i klienta, który może się z nim komunikować, można znaleźć tutaj . Za każdym razem, gdy otwierasz nowe połączenie od klienta, powinieneś zobaczyć wiadomość powitalną, a po wpisaniu "q"
na kliencie powinieneś zobaczyć komunikat informujący, że połączenie zostało anulowane.
Wciąż jest kilka tematów, które nie zostały ujęte w tej odpowiedzi. Zwłaszcza materializacja może przestraszyć jednego lub drugiego czytelnika, ale jestem pewien, że dzięki omawianemu tutaj materiałowi każdy powinien być w stanie samodzielnie przejść kolejne kroki. Jak już wspomniano, oficjalna dokumentacja jest dobrym miejscem do dalszej nauki o strumieniach Akka.