Jak odczytać zawartość strumienia Node.js w zmiennej ciągu?


113

Włamuję się do programu Node, który używa smtp-protocoldo przechwytywania e-maili SMTP i działania na danych poczty. Biblioteka dostarcza dane poczty w postaci strumienia i nie wiem, jak zamienić je w łańcuch.

Obecnie piszę go na stdout stream.pipe(process.stdout, { end: false }), ale jak powiedziałem, potrzebuję zamiast tego danych strumienia w ciągu, którego mogę użyć po zakończeniu strumienia.

Jak zebrać wszystkie dane ze strumienia Node.js w ciągu?


Powinieneś skopiować strumień lub oznaczyć go (autoClose: false). Zanieczyszczenie pamięci jest złą praktyką.
19h

Odpowiedzi:


41

(Ta odpowiedź pochodzi sprzed lat, kiedy była to najlepsza odpowiedź. Jest teraz lepsza odpowiedź poniżej. Nie śledziłem node.js i nie mogę usunąć tej odpowiedzi, ponieważ jest oznaczona jako „poprawna w tym pytaniu ". Jeśli myślisz o kliknięciu w dół, co mam zrobić?)

Kluczem jest wykorzystanie datai endzdarzeń o czytelnej Stream . Posłuchaj tych wydarzeń:

stream.on('data', (chunk) => { ... });
stream.on('end', () => { ... });

Po otrzymaniu datazdarzenia dodaj nową porcję danych do buforu utworzonego w celu gromadzenia danych.

Po odebraniu endzdarzenia przekonwertuj wypełniony bufor na ciąg, jeśli to konieczne. Następnie zrób z tym, co musisz.


149
Kilka wierszy kodu ilustrujących odpowiedź jest lepsze niż po prostu wskazanie łącza na API. Nie zgadzaj się z odpowiedzią, po prostu nie wierz, że jest wystarczająco kompletna.
arcseldon

3
W nowszych wersjach node.js jest to bardziej przejrzyste: stackoverflow.com/a/35530615/271961
Simon A. Eugster

Odpowiedź powinna zostać zaktualizowana, aby nie zalecać korzystania z biblioteki Promises, ale używać natywnych obietnic.
Dan Dascalescu

@DanDascalescu Zgadzam się z tobą. Problem w tym, że napisałem tę odpowiedź 7 lat temu i nie nadążałem za node.js. Byłoby wspaniale, gdyby ktoś inny chciał go zaktualizować. Albo mógłbym go po prostu usunąć, ponieważ wydaje się, że jest już lepsza odpowiedź. Co byś polecił?
ControlAltDel

@ControlAltDel: Doceniam twoją inicjatywę usunięcia odpowiedzi, która nie jest już najlepsza. Żałuję, że inni nie mieli podobnej dyscypliny .
Dan Dascalescu

129

Innym sposobem byłoby przekonwertowanie strumienia na obietnicę (patrz przykład poniżej) i użycie then(lub await) w celu przypisania rozstrzygniętej wartości do zmiennej.

function streamToString (stream) {
  const chunks = []
  return new Promise((resolve, reject) => {
    stream.on('data', chunk => chunks.push(chunk))
    stream.on('error', reject)
    stream.on('end', () => resolve(Buffer.concat(chunks).toString('utf8')))
  })
}

const result = await streamToString(stream)

Jestem naprawdę nowego do strumieni i obietnic, a ja dostaję ten błąd: SyntaxError: await is only valid in async function. Co ja robię źle?
JohnK

Musisz wywołać funkcję streamtostring w ramach funkcji asynchronicznej. Aby tego uniknąć, możesz również zrobićstreamToString(stream).then(function(response){//Do whatever you want with response});
Enclo Creations

23
To powinna być najlepsza odpowiedź. Gratulujemy stworzenia jedynego rozwiązania, które robi wszystko dobrze, z (1) przechowywaniem fragmentów jako buforów i wywoływaniem tylko .toString("utf8")na końcu, aby uniknąć problemu błędu dekodowania, jeśli fragment zostanie podzielony w środku znaku wielobajtowego; (2) faktyczna obsługa błędów; (3) umieszczenie kodu w funkcji, aby można go było ponownie wykorzystać, a nie skopiować i wkleić; (4) użycie Promises, aby można było włączyć funkcję await; (5) mały kod, który nie przeciąga miliona zależności, w przeciwieństwie do niektórych bibliotek npm; (6) Składnia ES6 i nowoczesne najlepsze praktyki.
MultiplyByZer0

Dlaczego nie przenieść tablicy porcji do obietnicy?
Jenny O'Reilly

1
Po tym, jak wymyśliłem zasadniczo ten sam kod, używając bieżącej najlepszej odpowiedzi jako wskazówki, zauważyłem, że powyższy kod może się nie powieść, Uncaught TypeError [ERR_INVALID_ARG_TYPE]: The "list[0]" argument must be an instance of Buffer or Uint8Array. Received type stringjeśli strumień stringzamiast tego tworzy fragmenty Buffer. Używanie chunks.push(Buffer.from(chunk))powinno działać zarówno z fragmentami, jak stringi Buffer.
Andrei LED

67

Żadne z powyższych nie działało dla mnie. Musiałem użyć obiektu Buffer:

  const chunks = [];

  readStream.on("data", function (chunk) {
    chunks.push(chunk);
  });

  // Send the buffer or you can put it into a var
  readStream.on("end", function () {
    res.send(Buffer.concat(chunks));
  });

7
to właściwie najczystszy sposób na zrobienie tego;)
Ivo

7
Działa świetnie. Uwaga: jeśli chcesz mieć odpowiedni typ ciągu, musisz wywołać .toString () na wynikowym obiekcie Buffer z wywołania concat ()
Bryan Johnson

64

Mam nadzieję, że jest to bardziej przydatne niż powyższa odpowiedź:

var string = '';
stream.on('data',function(data){
  string += data.toString();
  console.log('stream data ' + part);
});

stream.on('end',function(){
  console.log('final output ' + string);
});

Zwróć uwagę, że konkatenacja ciągów nie jest najbardziej wydajnym sposobem zbierania części ciągów, ale jest używana dla uproszczenia (i być może twój kod nie dba o wydajność).

Ponadto kod ten może powodować nieprzewidywalne błędy w przypadku tekstu innego niż ASCII (zakłada, że ​​każdy znak mieści się w bajcie), ale być może też Cię to nie obchodzi.


4
Jaki byłby bardziej efektywny sposób zbierania części strun? TY
sean2078

2
możesz użyć bufora docs.nodejitsu.com/articles/advanced/buffers/how-to-use-buffers, ale to naprawdę zależy od twojego zastosowania.
Tom Carchrae

2
Użyj tablicy ciągów, w której dołączasz każdy nowy fragment do tablicy i wywołaj join("")tablicę na końcu.
Valeriu Paloş

14
To nie jest w porządku. Jeśli bufor znajduje się w połowie wielobajtowego punktu kodowego, to toString () otrzyma zniekształcony utf-8, a otrzymasz kilka w ciągu.
alextgordon

2
@alextgordon ma rację. W bardzo rzadkich przypadkach, gdy miałem dużo porcji, otrzymywałem te - na początku i na końcu porcji. Zwłaszcza tam, gdzie na krawędziach znajdują się rosyjskie symbole. Dlatego poprawne jest łączenie fragmentów i konwertowanie ich na końcu zamiast konwertowania fragmentów i łączenia ich. W moim przypadku żądanie zostało wysłane z jednej usługi do drugiej z request.js z domyślnym kodowaniem
Mike Yermolayev

21

Zwykle używam tej prostej funkcji, aby przekształcić strumień w ciąg:

function streamToString(stream, cb) {
  const chunks = [];
  stream.on('data', (chunk) => {
    chunks.push(chunk.toString());
  });
  stream.on('end', () => {
    cb(chunks.join(''));
  });
}

Przykład użycia:

let stream = fs.createReadStream('./myFile.foo');
streamToString(stream, (data) => {
  console.log(data);  // data is now my string variable
});

1
Przydatna odpowiedź, ale wygląda na to, że każdy fragment musi zostać przekonwertowany na ciąg, zanim zostanie umieszczony w tablicy:chunks.push(chunk.toString());
Nicolas Le Thierry d'Ennequin

1
To jedyny, który działał dla mnie! Wielkie dzięki
538ROMEO

1
To była świetna odpowiedź!
Aft3rL1f3

12

I jeszcze jeden dla stringów używających obietnic:

function getStream(stream) {
  return new Promise(resolve => {
    const chunks = [];

    # Buffer.from is required if chunk is a String, see comments
    stream.on("data", chunk => chunks.push(Buffer.from(chunk)));
    stream.on("end", () => resolve(Buffer.concat(chunks).toString()));
  });
}

Stosowanie:

const stream = fs.createReadStream(__filename);
getStream(stream).then(r=>console.log(r));

usuń, .toString()aby używać z danymi binarnymi, jeśli to konieczne.

aktualizacja : @AndreiLED poprawnie wskazał, że ma to problemy ze stringami. Nie mogłem uzyskać strumienia zwracającego ciągi znaków z wersją węzła, którą mam, ale interfejs API zauważa, że ​​jest to możliwe.


Zauważyłem, że powyższy kod może się nie powieść, Uncaught TypeError [ERR_INVALID_ARG_TYPE]: The "list[0]" argument must be an instance of Buffer or Uint8Array. Received type stringjeśli strumień stringzamiast Buffer. Używanie chunks.push(Buffer.from(chunk))powinno działać zarówno z fragmentami, jak stringi Buffer.
Andrei LED

dobra uwaga, zaktualizowałem odpowiedź. Dzięki.
estani

8

Z dokumentacji nodejs powinieneś to zrobić - zawsze pamiętaj o łańcuchu, nie wiedząc, że kodowanie to tylko kilka bajtów:

var readable = getReadableStreamSomehow();
readable.setEncoding('utf8');
readable.on('data', function(chunk) {
  assert.equal(typeof chunk, 'string');
  console.log('got %d characters of string data', chunk.length);
})

6

Strumienie nie mają prostej .toString()funkcji (którą rozumiem) ani czegoś w rodzaju .toStringAsync(cb)funkcji (której nie rozumiem).

Stworzyłem więc własną funkcję pomocniczą:

var streamToString = function(stream, callback) {
  var str = '';
  stream.on('data', function(chunk) {
    str += chunk;
  });
  stream.on('end', function() {
    callback(str);
  });
}

// how to use:
streamToString(myStream, function(myStr) {
  console.log(myStr);
});

4

Miałem więcej szczęścia używając w ten sposób:

let string = '';
readstream
    .on('data', (buf) => string += buf.toString())
    .on('end', () => console.log(string));

Używam węzła v9.11.1i readstreamjest odpowiedzią z http.getwywołania zwrotnego.


3

Najczystszym rozwiązaniem może być użycie pakietu „string-stream”, który konwertuje strumień na łańcuch z obietnicą.

const streamString = require('stream-string')

streamString(myStream).then(string_variable => {
    // myStream was converted to a string, and that string is stored in string_variable
    console.log(string_variable)

}).catch(err => {
     // myStream emitted an error event (err), so the promise from stream-string was rejected
    throw err
})

3

Łatwy sposób dzięki popularnej (ponad 5 mln pobrań tygodniowo) i lekkiej bibliotece Get Stream :

https://www.npmjs.com/package/get-stream

const fs = require('fs');
const getStream = require('get-stream');

(async () => {
    const stream = fs.createReadStream('unicorn.txt');
    console.log(await getStream(stream)); //output is string
})();

2

A co z czymś takim jak reduktor strumienia?

Oto przykład użycia klas ES6, jak z nich korzystać.

var stream = require('stream')

class StreamReducer extends stream.Writable {
  constructor(chunkReducer, initialvalue, cb) {
    super();
    this.reducer = chunkReducer;
    this.accumulator = initialvalue;
    this.cb = cb;
  }
  _write(chunk, enc, next) {
    this.accumulator = this.reducer(this.accumulator, chunk);
    next();
  }
  end() {
    this.cb(null, this.accumulator)
  }
}

// just a test stream
class EmitterStream extends stream.Readable {
  constructor(chunks) {
    super();
    this.chunks = chunks;
  }
  _read() {
    this.chunks.forEach(function (chunk) { 
        this.push(chunk);
    }.bind(this));
    this.push(null);
  }
}

// just transform the strings into buffer as we would get from fs stream or http request stream
(new EmitterStream(
  ["hello ", "world !"]
  .map(function(str) {
     return Buffer.from(str, 'utf8');
  })
)).pipe(new StreamReducer(
  function (acc, v) {
    acc.push(v);
    return acc;
  },
  [],
  function(err, chunks) {
    console.log(Buffer.concat(chunks).toString('utf8'));
  })
);

1

To zadziałało dla mnie i jest oparte na dokumentacji Node v6.7.0 :

let output = '';
stream.on('readable', function() {
    let read = stream.read();
    if (read !== null) {
        // New stream data is available
        output += read.toString();
    } else {
        // Stream is now finished when read is null.
        // You can callback here e.g.:
        callback(null, output);
    }
});

stream.on('error', function(err) {
  callback(err, null);
})

1

setEncoding ('utf8');

Dobra robota, Sebastian J. powyżej.

Miałem „problem z buforem” z kilkoma wierszami kodu testowego, które miałem, dodałem informacje o kodowaniu i rozwiązałem go, patrz poniżej.

Zademonstruj problem

oprogramowanie

// process.stdin.setEncoding('utf8');
process.stdin.on('data', (data) => {
    console.log(typeof(data), data);
});

Wejście

hello world

wynik

object <Buffer 68 65 6c 6c 6f 20 77 6f 72 6c 64 0d 0a>

Zademonstruj rozwiązanie

oprogramowanie

process.stdin.setEncoding('utf8'); // <- Activate!
process.stdin.on('data', (data) => {
    console.log(typeof(data), data);
});

Wejście

hello world

wynik

string hello world

1

Wszystkie wymienione odpowiedzi wydają się otwierać strumień do odczytu w trybie płynnym, który nie jest domyślny w NodeJS i może mieć ograniczenia, ponieważ nie ma wsparcia dla ciśnienia wstecznego, które NodeJS zapewnia w trybie wstrzymanego odczytu strumienia. Oto implementacja wykorzystująca Just Buffers, Native Stream i Native Stream Transforms oraz wsparcie dla Object Mode

import {Transform} from 'stream';

let buffer =null;    

function objectifyStream() {
    return new Transform({
        objectMode: true,
        transform: function(chunk, encoding, next) {

            if (!buffer) {
                buffer = Buffer.from([...chunk]);
            } else {
                buffer = Buffer.from([...buffer, ...chunk]);
            }
            next(null, buffer);
        }
    });
}

process.stdin.pipe(objectifyStream()).process.stdout

1

Co o tym myślisz ?

// lets a ReadableStream under stream variable 
const chunks = [];

for await (let chunk of stream) {
    chunks.push(chunk)
}

const buffer  = Buffer.concat(chunks);
const str = buffer.toString("utf-8")

Działa, bardzo czysto, bez zależności, fajnie!
ViRuSTriNiTy

0

Korzystając z dość popularnego stream-bufferspakietu, który prawdopodobnie masz już w zależnościach projektu, jest to dość proste:

// imports
const { WritableStreamBuffer } = require('stream-buffers');
const { promisify } = require('util');
const { createReadStream } = require('fs');
const pipeline = promisify(require('stream').pipeline);

// sample stream
let stream = createReadStream('/etc/hosts');

// pipeline the stream into a buffer, and print the contents when done
let buf = new WritableStreamBuffer();
pipeline(stream, buf).then(() => console.log(buf.getContents().toString()));

0

W moim przypadku nagłówki odpowiedzi typu zawartości to Content-Type: text / plain . Czytałem więc dane z bufora, takie jak:

let data = [];
stream.on('data', (chunk) => {
 console.log(Buffer.from(chunk).toString())
 data.push(Buffer.from(chunk).toString())
});
Korzystając z naszej strony potwierdzasz, że przeczytałeś(-aś) i rozumiesz nasze zasady używania plików cookie i zasady ochrony prywatności.
Licensed under cc by-sa 3.0 with attribution required.