Po wykonaniu (mniej więcej) asynchronicznego socket
programowania „niskiego poziomu” wiele lat temu (w sposób oparty na zdarzeniu w oparciu o asynchroniczny wzorzec asynchroniczny (EAP) ) i ostatnio przeniesieniu „w górę” do TcpListener
(model programowania asynchronicznego (APM) ), a następnie próbując przejść do async/await
( TAP) opartego na zadaniach (ang. Task-Asynchronous Pattern, TAP) ) Prawie to robiłem, ponieważ musiałem przejmować się całym tym „niskim poziomem hydrauliki”. Więc pomyślałem; dlaczego nie spróbować RX
( Reactive Extensions ), ponieważ może lepiej pasować do mojej problematycznej domeny.
Dużo kodu, który piszę, musi być z wieloma klientami łączącymi się z Tcp z moją aplikacją, która następnie rozpoczyna dwukierunkową (asynchroniczną) komunikację. Klient lub serwer może w dowolnym momencie zdecydować, że wiadomość musi zostać wysłana i zrobić to, więc nie jest to twoja klasyczna request/response
konfiguracja, ale raczej dwustronna „linia” w czasie rzeczywistym, otwarta dla obu stron, aby wysłać co tylko zechce. , kiedy tylko chcą. (Jeśli ktoś ma dobre imię, by to opisać, chętnie to usłyszę!).
„Protokół” różni się w zależności od aplikacji (i tak naprawdę nie ma związku z moim pytaniem). Mam jednak wstępne pytanie:
- Biorąc pod uwagę, że działa tylko jeden „serwer”, ale musi on śledzić wiele (zwykle tysiące) połączeń (np. Klientów), z których każde ma (z braku lepszego opisu) własną „maszynę stanu”, aby śledzić swoje wewnętrzne stwierdza itp., jakie podejście wolisz? EAP / TAP / APM? Czy RX byłby nawet uważany za opcję? Jeśli nie to dlaczego?
Muszę więc pracować w trybie asynchronicznym, ponieważ a) nie jest to protokół żądania / odpowiedzi, więc nie mogę mieć wątku / klienta w „czekającym na wiadomość” - blokującym połączeniu lub „wysyłającym wiadomość” - blokującym połączeniu (jednak, jeśli wysyłanie jest blokowanie tylko dla tego klienta, z którym mógłbym z nim żyć) ib) muszę obsłużyć wiele jednoczesnych połączeń. Nie widzę możliwości (niezawodnego) wykonania tego przy użyciu blokowania połączeń.
Większość moich aplikacji jest związanych z VoiP; czy to wiadomości SIP od klientów SIP, czy wiadomości PBX (powiązane) z aplikacji takich jak FreeSwitch / OpenSIPS itp., ale możesz, w najprostszej formie, wyobrazić sobie serwer „czatu” próbujący obsłużyć wielu klientów „czatu”. Większość protokołów jest oparta na tekście (ASCII).
Po wdrożeniu wielu różnych permutacji wyżej wymienionych technik chciałbym uprościć moją pracę, tworząc obiekt, który mogę po prostu utworzyć, wskazać, na co IPEndpoint
ma słuchać, i powiedzieć mi, gdy dzieje się coś interesującego (co zwykle użyj zdarzeń dla, więc niektóre EAP są zwykle mieszane z pozostałymi dwiema technikami). Klasa nie powinna zawracać sobie głowy próbą „zrozumienia” protokołu; powinien po prostu obsługiwać łańcuchy przychodzące / wychodzące. I tak, mając oko na RX, mając nadzieję, że to (w końcu) uprości pracę, stworzyłem od nowa nowe „skrzypce”:
using System;
using System.Collections.Concurrent;
using System.Net;
using System.Net.Sockets;
using System.Reactive.Linq;
using System.Text;
class Program
{
static void Main(string[] args)
{
var f = new FiddleServer(new IPEndPoint(IPAddress.Any, 8084));
f.Start();
Console.ReadKey();
f.Stop();
Console.ReadKey();
}
}
public class FiddleServer
{
private TcpListener _listener;
private ConcurrentDictionary<ulong, FiddleClient> _clients;
private ulong _currentid = 0;
public IPEndPoint LocalEP { get; private set; }
public FiddleServer(IPEndPoint localEP)
{
this.LocalEP = localEP;
_clients = new ConcurrentDictionary<ulong, FiddleClient>();
}
public void Start()
{
_listener = new TcpListener(this.LocalEP);
_listener.Start();
Observable.While(() => true, Observable.FromAsync(_listener.AcceptTcpClientAsync)).Subscribe(
//OnNext
tcpclient =>
{
//Create new FSClient with unique ID
var fsclient = new FiddleClient(_currentid++, tcpclient);
//Keep track of clients
_clients.TryAdd(fsclient.ClientId, fsclient);
//Initialize connection
fsclient.Send("connect\n\n");
Console.WriteLine("Client {0} accepted", fsclient.ClientId);
},
//OnError
ex =>
{
},
//OnComplete
() =>
{
Console.WriteLine("Client connection initialized");
//Accept new connections
_listener.AcceptTcpClientAsync();
}
);
Console.WriteLine("Started");
}
public void Stop()
{
_listener.Stop();
Console.WriteLine("Stopped");
}
public void Send(ulong clientid, string rawmessage)
{
FiddleClient fsclient;
if (_clients.TryGetValue(clientid, out fsclient))
{
fsclient.Send(rawmessage);
}
}
}
public class FiddleClient
{
private TcpClient _tcpclient;
public ulong ClientId { get; private set; }
public FiddleClient(ulong id, TcpClient tcpclient)
{
this.ClientId = id;
_tcpclient = tcpclient;
}
public void Send(string rawmessage)
{
Console.WriteLine("Sending {0}", rawmessage);
var data = Encoding.ASCII.GetBytes(rawmessage);
_tcpclient.GetStream().WriteAsync(data, 0, data.Length); //Write vs WriteAsync?
}
}
Zdaję sobie sprawę, że w tym „skrzypcach” jest trochę szczegółów związanych z implementacją; w tym przypadku pracuję z FreeSwitch ESL, więc "connect\n\n"
skrzypce powinny zostać usunięte podczas refaktoryzacji do bardziej ogólnego podejścia.
Wiem również, że muszę refaktoryzować metody anonimowe do metod instancji prywatnej w klasie Server; Po prostu nie jestem pewien, jakiej konwencji (np. „ OnSomething
”) Użyć dla ich nazw metod?
To jest moja podstawa / punkt początkowy / fundament (który wymaga trochę „ulepszenia”). Mam kilka pytań na ten temat:
- Zobacz powyższe pytanie „1”
- Czy jestem na dobrej drodze? Czy moje decyzje „projektowe” są niesprawiedliwe?
- Pod względem współbieżności: czy poradziłoby sobie z tysiącami klientów (parsowanie / obsługa rzeczywistych wiadomości na bok)
- O wyjątkach: Nie jestem pewien, jak uzyskać wyjątki zgłaszane przez klientów „do” serwera („RX-mądry”); jaki byłby dobry sposób?
- Mogę teraz uzyskać dowolnego podłączonego klienta z mojej klasy serwera (używając go
ClientId
), zakładając, że narażam klientów w taki czy inny sposób i wywołuję metody bezpośrednio na nich. Mogę również wywoływać metody za pośrednictwem klasy Server (na przykładSend(clientId, rawmessage)
metoda (podczas gdy to drugie podejście byłoby metodą „wygodną” do szybkiego przesyłania wiadomości na drugą stronę). - Nie jestem do końca pewien, gdzie (i jak) iść stąd:
- a) Muszę obsługiwać wiadomości przychodzące; jak mam to skonfigurować? Mogę pobrać strumień kursu, ale gdzie miałbym poradzić sobie z odzyskiwaniem odebranych bajtów? Myślę, że potrzebuję czegoś w rodzaju „ObservableStream”, do którego mogę się zapisać? Czy umieściłbym to w
FiddleClient
lubFiddleServer
? - b) Zakładając, że chcę uniknąć używania zdarzenia, dopóki te
FiddleClient
/FiddleServer
klasy nie zostaną zaimplementowane bardziej konkretnie, aby dostosować ich protokół do konkretnej aplikacji itp. za pomocą bardziej szczegółowychFooClient
/FooServer
klas: w jaki sposób miałbym przejść od pobierania danych z podstawowych klas „Fiddle” do ich bardziej konkretne odpowiedniki?
- a) Muszę obsługiwać wiadomości przychodzące; jak mam to skonfigurować? Mogę pobrać strumień kursu, ale gdzie miałbym poradzić sobie z odzyskiwaniem odebranych bajtów? Myślę, że potrzebuję czegoś w rodzaju „ObservableStream”, do którego mogę się zapisać? Czy umieściłbym to w
Artykuły / linki, które już przeczytałem / przejrzałem / wykorzystałem w celach informacyjnych:
- Zużyj gniazdo używając Reactive Extension
- Tworzenie i subskrybowanie prostych obserwowalnych sekwencji
- Jak dodać lub powiązać kontekst z każdym połączeniem / sesją ObservableTcpListener
- Programowanie asynchroniczne za pomocą Reactive Framework i Task Parallel Library - Część 1 , 2 i 3 .
- Korzystanie z rozszerzeń reaktywnych (Rx) do programowania gniazd jest praktyczne?
- Serwer internetowy napędzany .NET Rx i serwer internetowy napędzany .NET Rx, Take 2
- Niektóre samouczki Rx