Przeczytaj strumień dwukrotnie


127

Jak odczytujesz dwa razy ten sam strumień wejściowy? Czy da się to jakoś skopiować?

Muszę pobrać obraz z Internetu, zapisać go lokalnie, a następnie zwrócić zapisany obraz. Po prostu pomyślałem, że szybsze będzie użycie tego samego strumienia zamiast rozpoczynania nowego strumienia do pobranej zawartości, a następnie przeczytania go ponownie.


1
Może użyj znaku i zresetuj
Wiaczesław Szylkin

Odpowiedzi:


114

Możesz użyć, org.apache.commons.io.IOUtils.copyaby skopiować zawartość InputStream do tablicy bajtów, a następnie wielokrotnie czytać z tablicy bajtów przy użyciu ByteArrayInputStream. Na przykład:

ByteArrayOutputStream baos = new ByteArrayOutputStream();
org.apache.commons.io.IOUtils.copy(in, baos);
byte[] bytes = baos.toByteArray();

// either
while (needToReadAgain) {
    ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
    yourReadMethodHere(bais);
}

// or
ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
while (needToReadAgain) {
    bais.reset();
    yourReadMethodHere(bais);
}

1
Myślę, że jest to jedyne prawidłowe rozwiązanie, ponieważ znak nie jest obsługiwany dla wszystkich typów.
Warpzit

3
@Paul Grime: IOUtils.toByeArray wewnętrznie wywołuje metodę copy również od wewnątrz.
Ankit

4
Jak mówi @Ankit, to rozwiązanie nie jest dla mnie ważne, ponieważ dane wejściowe są odczytywane wewnętrznie i nie można ich ponownie użyć.
Xtreme Biker

30
Wiem, że ten komentarz jest spóźniony, ale tutaj w pierwszej opcji, jeśli odczytujesz strumień wejściowy jako tablicę bajtów, czy nie oznacza to, że ładujesz wszystkie dane do pamięci? co może być dużym problemem, jeśli ładujesz coś takiego jak duże pliki?
jaxkodex,

2
Można by użyć IOUtils.toByteArray (InputStream), aby uzyskać tablicę bajtów w jednym wywołaniu.
przydatne

30

W zależności od tego, skąd pochodzi InputStream, możesz nie być w stanie go zresetować. Możesz sprawdzić, czy mark()i czy reset()są obsługiwane przy użyciu markSupported().

Jeśli tak jest, możesz wywołać reset()InputStream, aby powrócić do początku. Jeśli nie, musisz ponownie odczytać InputStream ze źródła.


1
InputStream nie obsługuje „mark” - możesz wywołać znak w IS, ale nic nie robi. Podobnie wywołanie resetowania w IS spowoduje zgłoszenie wyjątku.
ayahuasca

4
@ayahuasca InputStreampodklasy, takie jak BufferedInputStreamobsługuje ` ` znak ''
Dmitry Bogdanovich

10

jeśli InputStreamwspierasz używanie znaku, możesz mark()swój inputStream, a następnie reset(). jeśli twój InputStremnie obsługuje znaku, możesz użyć klasy java.io.BufferedInputStream, więc możesz osadzić swój strumień w BufferedInputStreampodobnym

    InputStream bufferdInputStream = new BufferedInputStream(yourInputStream);
    bufferdInputStream.mark(some_value);
    //read your bufferdInputStream 
    bufferdInputStream.reset();
    //read it again

1
Buforowany strumień wejściowy może oznaczać tylko z powrotem do rozmiaru bufora, więc jeśli źródło nie pasuje, nie możesz wrócić do początku.
L. Blanc,

@ L.Blanc przepraszam, ale to nie wydaje się poprawne. Spójrz BufferedInputStream.fill(), jest sekcja „powiększ bufor”, w której nowy rozmiar bufora jest porównywany tylko z marklimiti MAX_BUFFER_SIZE.
eugene82

8

Możesz zawijać strumień wejściowy za pomocą PushbackInputStream. PushbackInputStream umożliwia nieprzeczytanie („ zapis z powrotem ”) bajtów, które zostały już odczytane, więc możesz to zrobić:

public class StreamTest {
  public static void main(String[] args) throws IOException {
    byte[] bytes = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9 };

    InputStream originalStream = new ByteArrayInputStream(bytes);

    byte[] readBytes = getBytes(originalStream, 3);
    printBytes(readBytes); // prints: 1 2 3

    readBytes = getBytes(originalStream, 3);
    printBytes(readBytes); // prints: 4 5 6

    // now let's wrap it with PushBackInputStream

    originalStream = new ByteArrayInputStream(bytes);

    InputStream wrappedStream = new PushbackInputStream(originalStream, 10); // 10 means that maximnum 10 characters can be "written back" to the stream

    readBytes = getBytes(wrappedStream, 3);
    printBytes(readBytes); // prints 1 2 3

    ((PushbackInputStream) wrappedStream).unread(readBytes, 0, readBytes.length);

    readBytes = getBytes(wrappedStream, 3);
    printBytes(readBytes); // prints 1 2 3


  }

  private static byte[] getBytes(InputStream is, int howManyBytes) throws IOException {
    System.out.print("Reading stream: ");

    byte[] buf = new byte[howManyBytes];

    int next = 0;
    for (int i = 0; i < howManyBytes; i++) {
      next = is.read();
      if (next > 0) {
        buf[i] = (byte) next;
      }
    }
    return buf;
  }

  private static void printBytes(byte[] buffer) throws IOException {
    System.out.print("Reading stream: ");

    for (int i = 0; i < buffer.length; i++) {
      System.out.print(buffer[i] + " ");
    }
    System.out.println();
  }


}

Zwróć uwagę, że PushbackInputStream przechowuje wewnętrzny bufor bajtów, więc naprawdę tworzy bufor w pamięci, który przechowuje bajty „zapisane z powrotem”.

Znając to podejście, możemy pójść dalej i połączyć je z FilterInputStream. FilterInputStream przechowuje oryginalny strumień wejściowy jako delegata. Pozwala to na utworzenie nowej definicji klasy, która pozwala na automatyczne „ nieprzeczytanie ” oryginalnych danych. Definicja tej klasy jest następująca:

public class TryReadInputStream extends FilterInputStream {
  private final int maxPushbackBufferSize;

  /**
  * Creates a <code>FilterInputStream</code>
  * by assigning the  argument <code>in</code>
  * to the field <code>this.in</code> so as
  * to remember it for later use.
  *
  * @param in the underlying input stream, or <code>null</code> if
  *           this instance is to be created without an underlying stream.
  */
  public TryReadInputStream(InputStream in, int maxPushbackBufferSize) {
    super(new PushbackInputStream(in, maxPushbackBufferSize));
    this.maxPushbackBufferSize = maxPushbackBufferSize;
  }

  /**
   * Reads from input stream the <code>length</code> of bytes to given buffer. The read bytes are still avilable
   * in the stream
   *
   * @param buffer the destination buffer to which read the data
   * @param offset  the start offset in the destination <code>buffer</code>
   * @aram length how many bytes to read from the stream to buff. Length needs to be less than
   *        <code>maxPushbackBufferSize</code> or IOException will be thrown
   *
   * @return number of bytes read
   * @throws java.io.IOException in case length is
   */
  public int tryRead(byte[] buffer, int offset, int length) throws IOException {
    validateMaxLength(length);

    // NOTE: below reading byte by byte instead of "int bytesRead = is.read(firstBytes, 0, maxBytesOfResponseToLog);"
    // because read() guarantees to read a byte

    int bytesRead = 0;

    int nextByte = 0;

    for (int i = 0; (i < length) && (nextByte >= 0); i++) {
      nextByte = read();
      if (nextByte >= 0) {
        buffer[offset + bytesRead++] = (byte) nextByte;
      }
    }

    if (bytesRead > 0) {
      ((PushbackInputStream) in).unread(buffer, offset, bytesRead);
    }

    return bytesRead;

  }

  public byte[] tryRead(int maxBytesToRead) throws IOException {
    validateMaxLength(maxBytesToRead);

    ByteArrayOutputStream baos = new ByteArrayOutputStream(); // as ByteArrayOutputStream to dynamically allocate internal bytes array instead of allocating possibly large buffer (if maxBytesToRead is large)

    // NOTE: below reading byte by byte instead of "int bytesRead = is.read(firstBytes, 0, maxBytesOfResponseToLog);"
    // because read() guarantees to read a byte

    int nextByte = 0;

    for (int i = 0; (i < maxBytesToRead) && (nextByte >= 0); i++) {
      nextByte = read();
      if (nextByte >= 0) {
        baos.write((byte) nextByte);
      }
    }

    byte[] buffer = baos.toByteArray();

    if (buffer.length > 0) {
      ((PushbackInputStream) in).unread(buffer, 0, buffer.length);
    }

    return buffer;

  }

  private void validateMaxLength(int length) throws IOException {
    if (length > maxPushbackBufferSize) {
      throw new IOException(
        "Trying to read more bytes than maxBytesToRead. Max bytes: " + maxPushbackBufferSize + ". Trying to read: " +
        length);
    }
  }

}

Ta klasa ma dwie metody. Jeden do odczytu do istniejącego bufora (definicja jest analogiczna do wywołania public int read(byte b[], int off, int len)klasy InputStream). Drugi, który zwraca nowy bufor (może to być bardziej efektywne, jeśli rozmiar bufora do odczytu jest nieznany).

Zobaczmy teraz naszą klasę w akcji:

public class StreamTest2 {
  public static void main(String[] args) throws IOException {
    byte[] bytes = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9 };

    InputStream originalStream = new ByteArrayInputStream(bytes);

    byte[] readBytes = getBytes(originalStream, 3);
    printBytes(readBytes); // prints: 1 2 3

    readBytes = getBytes(originalStream, 3);
    printBytes(readBytes); // prints: 4 5 6

    // now let's use our TryReadInputStream

    originalStream = new ByteArrayInputStream(bytes);

    InputStream wrappedStream = new TryReadInputStream(originalStream, 10);

    readBytes = ((TryReadInputStream) wrappedStream).tryRead(3); // NOTE: no manual call to "unread"(!) because TryReadInputStream handles this internally
    printBytes(readBytes); // prints 1 2 3

    readBytes = ((TryReadInputStream) wrappedStream).tryRead(3); 
    printBytes(readBytes); // prints 1 2 3

    readBytes = ((TryReadInputStream) wrappedStream).tryRead(3);
    printBytes(readBytes); // prints 1 2 3

    // we can also call normal read which will actually read the bytes without "writing them back"
    readBytes = getBytes(wrappedStream, 3);
    printBytes(readBytes); // prints 1 2 3

    readBytes = getBytes(wrappedStream, 3);
    printBytes(readBytes); // prints 4 5 6

    readBytes = ((TryReadInputStream) wrappedStream).tryRead(3); // now we can try read next bytes
    printBytes(readBytes); // prints 7 8 9

    readBytes = ((TryReadInputStream) wrappedStream).tryRead(3); 
    printBytes(readBytes); // prints 7 8 9


  }



}

5

Jeśli korzystasz z implementacji InputStream, możesz sprawdzić wynik InputStream#markSupported()i powiedzieć, czy możesz użyć metody mark()/ reset().

Jeśli możesz oznaczyć strumień podczas czytania, zadzwoń, reset()aby wrócić i rozpocząć.

Jeśli nie możesz, będziesz musiał ponownie otworzyć strumień.

Innym rozwiązaniem byłoby przekonwertowanie InputStream na tablicę bajtów, a następnie iterowanie po tablicy tyle razy, ile potrzebujesz. Możesz znaleźć kilka rozwiązań w tym poście Konwertuj InputStream na tablicę bajtów w Javie, używając bibliotek innych firm lub nie. Uwaga, jeśli odczytana zawartość jest zbyt duża, mogą wystąpić problemy z pamięcią.

Na koniec, jeśli potrzebujesz przeczytać obraz, użyj:

BufferedImage image = ImageIO.read(new URL("http://www.example.com/images/toto.jpg"));

Używanie ImageIO#read(java.net.URL)pozwala również na użycie pamięci podręcznej.


1
słowo ostrzeżenia podczas używania ImageIO#read(java.net.URL): niektóre serwery WWW i CDN mogą odrzucać gołe wywołania (tj. bez agenta użytkownika, który sprawia, że ​​serwer uważa, że ​​wywołanie pochodzi z przeglądarki internetowej) wykonane przez ImageIO#read. W takim przypadku URLConnection.openConnection()ustawienie agenta użytkownika na to połączenie + za pomocą `ImageIO.read (InputStream) w większości przypadków załatwi sprawę.
Clint Eastwood

InputStreamnie jest interfejsem
Brice

3

Co powiesz na:

if (stream.markSupported() == false) {

        // lets replace the stream object
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        IOUtils.copy(stream, baos);
        stream.close();
        stream = new ByteArrayInputStream(baos.toByteArray());
        // now the stream should support 'mark' and 'reset'

    }

5
To okropny pomysł. W ten sposób umieszczasz całą zawartość strumienia w pamięci.
Niels Doucet

3

Aby podzielić na InputStreamdwie części, unikając ładowania wszystkich danych do pamięci , a następnie przetwarzania ich niezależnie:

  1. Utwórz OutputStreamdokładnie kilka:PipedOutputStream
  2. Połącz każdy PipedOutputStream z PipedInputStream, to PipedInputStreamsą zwracane InputStream.
  3. Połącz źródło wejściowe InputStream z właśnie utworzonym plikiem OutputStream. Więc wszystko, co przeczytasz ze źródła InputStream, będzie napisane w obu OutputStream. Nie ma takiej potrzeby, ponieważ jest to już zrobione w TeeInputStream(commons.io).
  4. W oddzielnym wątku odczytaj cały źródłowy strumień inputStream, a dane wejściowe są niejawnie przesyłane do docelowych strumieni inputStreams.

    public static final List<InputStream> splitInputStream(InputStream input) 
        throws IOException 
    { 
        Objects.requireNonNull(input);      
    
        PipedOutputStream pipedOut01 = new PipedOutputStream();
        PipedOutputStream pipedOut02 = new PipedOutputStream();
    
        List<InputStream> inputStreamList = new ArrayList<>();
        inputStreamList.add(new PipedInputStream(pipedOut01));
        inputStreamList.add(new PipedInputStream(pipedOut02));
    
        TeeOutputStream tout = new TeeOutputStream(pipedOut01, pipedOut02);
    
        TeeInputStream tin = new TeeInputStream(input, tout, true);
    
        Executors.newSingleThreadExecutor().submit(tin::readAllBytes);  
    
        return Collections.unmodifiableList(inputStreamList);
    }

Pamiętaj, aby zamknąć inputStreams po wykorzystaniu i zamknąć działający wątek: TeeInputStream.readAllBytes()

W przypadku, musisz podzielić go na wieleInputStream , a nie tylko na dwie. Zastąp w poprzednim fragmencie kodu klasę TeeOutputStreamdla własnej implementacji, która hermetyzuje a List<OutputStream>i przesłoni OutputStreaminterfejs:

public final class TeeListOutputStream extends OutputStream {
    private final List<? extends OutputStream> branchList;

    public TeeListOutputStream(final List<? extends OutputStream> branchList) {
        Objects.requireNonNull(branchList);
        this.branchList = branchList;
    }

    @Override
    public synchronized void write(final int b) throws IOException {
        for (OutputStream branch : branchList) {
            branch.write(b);
        }
    }

    @Override
    public void flush() throws IOException {
        for (OutputStream branch : branchList) {
            branch.flush();
        }
    }

    @Override
    public void close() throws IOException {
        for (OutputStream branch : branchList) {
            branch.close();
        }
    }
}

Czy mógłbyś trochę dokładniej wyjaśnić krok 4? Dlaczego musimy uruchamiać czytanie ręcznie? Dlaczego odczyt któregokolwiek z pipedInputStream NIE wyzwala odczytu źródła inputStream? I dlaczego robimy to połączenie asynchronicznie?
Дмитрий Кулешов

2

Konwertuj strumień wejściowy na bajty, a następnie przekaż go do funkcji savefile, w której składasz to samo do strumienia wejściowego. Również w oryginalnej funkcji używaj bajtów do innych zadań


5
Mówię, że to zły pomysł, wynikowa tablica może być ogromna i okradnie urządzenie z pamięci.
Kevin Parker

0

W przypadku, gdy ktoś korzysta z aplikacji Spring Boot i chcesz przeczytać treść odpowiedzi RestTemplate(dlatego chcę dwukrotnie przeczytać strumień), istnieje czysty sposób na zrobienie tego.

Przede wszystkim musisz użyć Springa, StreamUtilsaby skopiować strumień do String:

String text = StreamUtils.copyToString(response.getBody(), Charset.defaultCharset()))

Ale to nie wszystko. Musisz także użyć fabryki żądań, która może buforować strumień, na przykład:

ClientHttpRequestFactory factory = new BufferingClientHttpRequestFactory(new SimpleClientHttpRequestFactory());
RestTemplate restTemplate = new RestTemplate(factory);

Lub, jeśli używasz fasoli fabrycznej, to (to jest Kotlin, ale mimo wszystko):

@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
fun createRestTemplate(): RestTemplate = RestTemplateBuilder()
  .requestFactory { BufferingClientHttpRequestFactory(SimpleClientHttpRequestFactory()) }
  .additionalInterceptors(loggingInterceptor)
  .build()

Źródło: https://objectpartners.com/2018/03/01/log-your-resttemplate-request-and-response-without-destroying-the-body/


0

Jeśli używasz RestTemplate do wykonywania połączeń http, po prostu dodaj przechwytywacz. Treść odpowiedzi jest buforowana przez implementację ClientHttpResponse. Teraz strumień wejściowy może być pobierany z odpowiedzi tyle razy, ile potrzebujemy

ClientHttpRequestInterceptor interceptor =  new ClientHttpRequestInterceptor() {

            @Override
            public ClientHttpResponse intercept(HttpRequest request, byte[] body,
                    ClientHttpRequestExecution execution) throws IOException {
                ClientHttpResponse  response = execution.execute(request, body);

                  // additional work before returning response
                  return response 
            }
        };

    // Add the interceptor to RestTemplate Instance 

         restTemplate.getInterceptors().add(interceptor); 
Korzystając z naszej strony potwierdzasz, że przeczytałeś(-aś) i rozumiesz nasze zasady używania plików cookie i zasady ochrony prywatności.
Licensed under cc by-sa 3.0 with attribution required.