TL; DR To nie jest banalne
Wygląda jak ktoś już pisał pełny kod dla Utf8JsonStreamReader
struktury, które odczytuje bufory ze strumienia i przekazuje je do Utf8JsonRreader, umożliwiając łatwy deserializacji z JsonSerializer.Deserialize<T>(ref newJsonReader, options);
. Kod też nie jest trywialny. Powiązane pytanie jest tutaj, a odpowiedź jest tutaj .
To jednak nie wystarczy - HttpClient.GetAsync
powróci dopiero po otrzymaniu całej odpowiedzi, zasadniczo buforując wszystko w pamięci.
Aby tego uniknąć, należy używać HttpClient.GetAsync (ciąg, HttpCompletionOption) z HttpCompletionOption.ResponseHeadersRead
.
Pętla deserializacji powinna również sprawdzić token anulowania i wyjść lub rzucić, jeśli jest sygnalizowany. W przeciwnym razie pętla będzie działać, dopóki cały strumień nie zostanie odebrany i przetworzony.
Ten kod jest oparty na przykładzie pokrewnej odpowiedzi i wykorzystuje HttpCompletionOption.ResponseHeadersRead
i sprawdza token anulowania. Może analizować ciągi JSON, które zawierają odpowiednią tablicę elementów, np .:
[{"prop1":123},{"prop1":234}]
Pierwsze wywołanie jsonStreamReader.Read()
przesuwa się na początek tablicy, a drugie przesuwa się na początek pierwszego obiektu. Sama pętla kończy się po ]
wykryciu końca tablicy ( ).
private async IAsyncEnumerable<T> GetList<T>(Uri url, CancellationToken cancellationToken = default)
{
//Don't cache the entire response
using var httpResponse = await httpClient.GetAsync(url,
HttpCompletionOption.ResponseHeadersRead,
cancellationToken);
using var stream = await httpResponse.Content.ReadAsStreamAsync();
using var jsonStreamReader = new Utf8JsonStreamReader(stream, 32 * 1024);
jsonStreamReader.Read(); // move to array start
jsonStreamReader.Read(); // move to start of the object
while (jsonStreamReader.TokenType != JsonTokenType.EndArray)
{
//Gracefully return if cancellation is requested.
//Could be cancellationToken.ThrowIfCancellationRequested()
if(cancellationToken.IsCancellationRequested)
{
return;
}
// deserialize object
var obj = jsonStreamReader.Deserialize<T>();
yield return obj;
// JsonSerializer.Deserialize ends on last token of the object parsed,
// move to the first token of next object
jsonStreamReader.Read();
}
}
Fragmenty JSON, AKA streaming JSON aka ... *
Dość często zdarza się, że w scenariuszach przesyłania strumieniowego lub rejestrowania zdarzeń poszczególne obiekty JSON są dołączane do pliku, po jednym elemencie w wierszu, np .:
{"eventId":1}
{"eventId":2}
...
{"eventId":1234567}
To nie jest prawidłowy dokument JSON, ale poszczególne fragmenty są prawidłowe. Ma to kilka zalet w przypadku dużych zbiorów danych / wysoce współbieżnych scenariuszy. Dodanie nowego zdarzenia wymaga jedynie dodania nowego wiersza do pliku, a nie analizowania i przebudowywania całego pliku. Przetwarzanie , zwłaszcza przetwarzanie równoległe, jest łatwiejsze z dwóch powodów:
- Poszczególne elementy można pobierać pojedynczo, po prostu czytając jedną linię ze strumienia.
- Plik wejściowy można łatwo podzielić na partycje i podzielić na granice linii, podając każdą część osobnemu procesowi roboczemu, np. W klastrze Hadoop, lub po prostu innym wątkom w aplikacji: Oblicz punkty podziału, np. Dzieląc długość przez liczbę pracowników , a następnie poszukaj pierwszej nowej linii. Podaj wszystko do tego momentu osobnemu pracownikowi.
Korzystanie z StreamReadera
Aby to zrobić, należy użyć TextReadera, czytać jeden wiersz na raz i parsować go za pomocą JsonSerializer.Deserialize :
using var reader=new StreamReader(stream);
string line;
//ReadLineAsync() doesn't accept a CancellationToken
while((line=await reader.ReadLineAsync()) != null)
{
var item=JsonSerializer.Deserialize<T>(line);
yield return item;
if(cancellationToken.IsCancellationRequested)
{
return;
}
}
Jest to o wiele prostsze niż kod, który deserializuje odpowiednią tablicę. Istnieją dwa problemy:
ReadLineAsync
nie akceptuje tokenu anulowania
- Każda iteracja przydziela nowy ciąg, jedną z rzeczy, których chcieliśmy uniknąć , używając System.Text.Json
Może to wystarczyć, ponieważ próba wytworzenia ReadOnlySpan<Byte>
buforów wymaganych przez JsonSerializer.Deserialize nie jest trywialna.
Rurociągi i czytnik sekwencji
Aby uniknąć alokacji, musimy pobrać ReadOnlySpan<byte>
ze strumienia. Wykonanie tego wymaga użycia potoków System.IO.Pipeline i struktury SequenceReader . Steve Gordon's An Introduction to SequenceReader wyjaśnia, w jaki sposób można wykorzystać tę klasę do odczytu danych ze strumienia przy użyciu ograniczników.
Niestety SequenceReader
jest strukturą referencyjną, co oznacza, że nie można jej używać w metodach asynchronicznych ani lokalnych. Dlatego Steve Gordon w swoim artykule tworzy
private static SequencePosition ReadItems(in ReadOnlySequence<byte> sequence, bool isCompleted)
metoda odczytu elementów z ReadOnlySequence i zwrócenie pozycji końcowej, aby PipeReader mógł z niej wznowić. Niestety chcemy zwrócić IEnumerable lub IAsyncEnumerable, a metody iteratora nie lubią parametrów ani in
też out
parametrów.
Możemy zebrać deserializowane elementy z Listy lub Kolejki i zwrócić je jako pojedynczy wynik, ale to nadal przydzieli listy, bufory lub węzły i będziemy musieli poczekać, aż wszystkie elementy w buforze zostaną odserializowane przed zwróceniem:
private static (SequencePosition,List<T>) ReadItems(in ReadOnlySequence<byte> sequence, bool isCompleted)
Potrzebujemy czegoś , co zachowuje się jak wyliczenie, nie wymagając metody iteratora, działa z asynchronizacją i nie buforuje wszystkiego.
Dodawanie kanałów w celu utworzenia IAsyncEnumerable
ChannelReader.ReadAllAsync zwraca IAsyncEnumerable. Możemy zwrócić ChannelReader z metod, które nie mogłyby działać jako iteratory i nadal generować strumień elementów bez buforowania.
Dostosowując kod Steve'a Gordona do korzystania z kanałów, otrzymujemy ReadItems (ChannelWriter ...) i ReadLastItem
metody. Pierwszy z nich odczytuje jeden element na raz, aż do nowej linii za pomocą ReadOnlySpan<byte> itemBytes
. Może to być wykorzystane przez JsonSerializer.Deserialize
. Jeśli ReadItems
nie może znaleźć separatora, zwraca swoją pozycję, aby PipelineReader mógł pobrać następny fragment ze strumienia.
Kiedy dotrzemy do ostatniego fragmentu i nie będzie już innego ogranicznika, ReadLastItem` odczytuje pozostałe bajty i deserializuje je.
Kod jest prawie identyczny z kodem Steve'a Gordona. Zamiast pisać do konsoli piszemy do ChannelWriter.
private const byte NL=(byte)'\n';
private const int MaxStackLength = 128;
private static SequencePosition ReadItems<T>(ChannelWriter<T> writer, in ReadOnlySequence<byte> sequence,
bool isCompleted, CancellationToken token)
{
var reader = new SequenceReader<byte>(sequence);
while (!reader.End && !token.IsCancellationRequested) // loop until we've read the entire sequence
{
if (reader.TryReadTo(out ReadOnlySpan<byte> itemBytes, NL, advancePastDelimiter: true)) // we have an item to handle
{
var item=JsonSerializer.Deserialize<T>(itemBytes);
writer.TryWrite(item);
}
else if (isCompleted) // read last item which has no final delimiter
{
var item = ReadLastItem<T>(sequence.Slice(reader.Position));
writer.TryWrite(item);
reader.Advance(sequence.Length); // advance reader to the end
}
else // no more items in this sequence
{
break;
}
}
return reader.Position;
}
private static T ReadLastItem<T>(in ReadOnlySequence<byte> sequence)
{
var length = (int)sequence.Length;
if (length < MaxStackLength) // if the item is small enough we'll stack allocate the buffer
{
Span<byte> byteBuffer = stackalloc byte[length];
sequence.CopyTo(byteBuffer);
var item=JsonSerializer.Deserialize<T>(byteBuffer);
return item;
}
else // otherwise we'll rent an array to use as the buffer
{
var byteBuffer = ArrayPool<byte>.Shared.Rent(length);
try
{
sequence.CopyTo(byteBuffer);
var item=JsonSerializer.Deserialize<T>(byteBuffer);
return item;
}
finally
{
ArrayPool<byte>.Shared.Return(byteBuffer);
}
}
}
DeserializeToChannel<T>
Metoda stwarza czytelnikowi rurociągu na górze strumienia, tworzy kanał i rozpoczyna zadanie pracownika, który analizuje kawałki i popycha je do kanału:
ChannelReader<T> DeserializeToChannel<T>(Stream stream, CancellationToken token)
{
var pipeReader = PipeReader.Create(stream);
var channel=Channel.CreateUnbounded<T>();
var writer=channel.Writer;
_ = Task.Run(async ()=>{
while (!token.IsCancellationRequested)
{
var result = await pipeReader.ReadAsync(token); // read from the pipe
var buffer = result.Buffer;
var position = ReadItems(writer,buffer, result.IsCompleted,token); // read complete items from the current buffer
if (result.IsCompleted)
break; // exit if we've read everything from the pipe
pipeReader.AdvanceTo(position, buffer.End); //advance our position in the pipe
}
pipeReader.Complete();
},token)
.ContinueWith(t=>{
pipeReader.Complete();
writer.TryComplete(t.Exception);
});
return channel.Reader;
}
ChannelReader.ReceiveAllAsync()
może być wykorzystany do zużycia wszystkich przedmiotów poprzez IAsyncEnumerable<T>
:
var reader=DeserializeToChannel<MyEvent>(stream,cts.Token);
await foreach(var item in reader.ReadAllAsync(cts.Token))
{
//Do something with it
}