Using a PipedInputStream and PipedOutputStream
Every once in a while, there comes a need to convert an OutputStream into an InputStream. The PipedInputStream and PipedOutputStream classes allow one to achieve this. The idea behind the piped stream is that at one end of the pipe, a writer thread writes to a PipedOutputStream. At the other end, a reader thread concurrently reads whatever was written from the connected PipedInputStream. Here is an example use case.
Assume that a thread streams random words and another thread needs to read these words and write them to System.out. Let's call the producer of the words the DataSource and its client the DataConsumer. The DataConsumer has a problem. Some of the words from the DataSource are deemed inappropriate by the DataConsumer. The consumer would rather receive ‘***’ than the inappropriate word.
It would be great if we could pass the stream generated by the source to a middleman that masks these words and passes the result on to the consumer. This is what the DataSource and DataConsumer look like. The DataSource structures the data using XML, not that it is necessary.
DataSource and DataConsumer:
package com.pipe;

import java.io.OutputStream;
import javax.xml.stream.XMLOutputFactory;
import javax.xml.stream.XMLStreamWriter;

public class DataSource implements Runnable {
    private OutputStream outputStream = null;
    private String[] words = {"crack", "hi", "you", "what", "snort", "me"};
    private XMLStreamWriter writer = null;

    public DataSource(OutputStream stream) {
        outputStream = stream;
    }

    @Override
    public void run() {
        try {
            XMLOutputFactory output = XMLOutputFactory.newInstance();
            writer = output.createXMLStreamWriter(outputStream);
            writer.writeStartElement("root");
            for (int counter = 0; counter < 100000; counter++) {
                // Pick one of the words uniformly at random.
                int result = (int) (Math.random() * words.length);
                String word = words[result];
                writer.writeStartElement("word");
                writer.writeCharacters(word);
                writer.writeEndElement();
            }
            writer.writeEndElement();
            // Push anything the XML writer still holds into the pipe before it is closed.
            writer.flush();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                // Always flush and close the writing end of the pipe.
                outputStream.flush();
                outputStream.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
package com.pipe;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;

public class DataConsumer implements Runnable {
    private InputStream inputStream = null;

    public DataConsumer(InputStream inputStream) {
        this.inputStream = inputStream;
    }

    @Override
    public void run() {
        try {
            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
            String temp = null;
            while ((temp = bufferedReader.readLine()) != null) {
                System.out.println(temp);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
The producer and consumer cannot be connected directly: the producer writes its data to an OutputStream, while the consumer reads from an InputStream. A pipe bridges the two, and they are connected by passing one end of the pipe to each class.
int BUFFER = 2048;
PipedInputStream convertPipe = new PipedInputStream(BUFFER);
PipedOutputStream dataPipe = new PipedOutputStream(convertPipe);
DataSource dataSource = new DataSource(dataPipe);
DataConsumer dataConsumer = new DataConsumer(convertPipe);
When the PipedOutputStream is instantiated, it ‘connects’ with the PipedInputStream, which is a fancy way of saying that data streamed to the PipedOutputStream will be passed on to the PipedInputStream. This is achieved by storing the data in an internal byte[] buffer, whose size can be chosen when instantiating the PipedInputStream (the default is 1024 bytes).
It is also possible to insert another thread in the middle by attaching one more pipe. The WordWatcher thread reads from one end of the first pipe (a PipedInputStream), masks objectionable words, and writes the result to one end of the second pipe (a PipedOutputStream). It uses StAX to parse whatever was written (again, not that it matters).
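To see the connection on its own, without the XML and the word filtering, here is a minimal sketch of a single pipe. The class name PipeRoundTrip and the method roundTrip are made up for this illustration and are not part of the example program. A writer thread pushes a message into the PipedOutputStream while the calling thread reads it back from the PipedInputStream.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical helper class, for illustration only.
class PipeRoundTrip {

    // Sends message through a pipe whose internal buffer holds bufferSize bytes
    // and returns whatever arrives at the reading end.
    static String roundTrip(String message, int bufferSize) {
        try {
            PipedInputStream in = new PipedInputStream(bufferSize);
            PipedOutputStream out = new PipedOutputStream(in); // connects the two ends

            // Writer thread: try-with-resources closes the pipe when it is done,
            // which is what lets the reader see end-of-stream.
            Thread writer = new Thread(() -> {
                try (PipedOutputStream o = out) {
                    o.write(message.getBytes(StandardCharsets.UTF_8));
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            writer.start();

            // Reader (the calling thread): drain the pipe until read() reports -1.
            ByteArrayOutputStream received = new ByteArrayOutputStream();
            byte[] chunk = new byte[64];
            int n;
            while ((n = in.read(chunk)) != -1) {
                received.write(chunk, 0, n);
            }
            writer.join();
            return new String(received.toByteArray(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // The buffer (8 bytes) is deliberately smaller than the message.
        System.out.println(roundTrip("hello from the other end of the pipe", 8));
    }
}
```

The message is longer than the buffer here, so the writer has to pause until the reader has made room. The data still arrives complete and in order.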
WordWatcher:
package com.pipe;

import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamReader;

public class WordWatcher implements Runnable {
    private InputStream inputStream;
    private OutputStream outputStream;

    public WordWatcher(InputStream inputStream, OutputStream outputStream) {
        this.inputStream = inputStream;
        this.outputStream = outputStream;
    }

    @Override
    public void run() {
        try {
            XMLInputFactory factory = XMLInputFactory.newInstance();
            XMLStreamReader reader = factory.createXMLStreamReader(inputStream);
            while (reader.hasNext()) {
                int next = reader.next();
                switch (next) {
                    case XMLStreamConstants.CHARACTERS: {
                        String text = reader.getText();
                        // Mask the objectionable word.
                        if (text.equals("crack")) {
                            text = "*****";
                        }
                        // One word per line, because the DataConsumer reads with readLine().
                        outputStream.write((text + "\n").getBytes(StandardCharsets.UTF_8));
                        break;
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                // Always flush and close the writing end of the pipe.
                outputStream.flush();
                outputStream.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
The following code connects all the pieces. It creates two pipes and passes their ends to three threads. The threads read from and write to these pipes transparently, without knowing what implementation class the InputStream and OutputStream belong to.
Connecting the dots:
package com.pipe;

import java.io.PipedInputStream;
import java.io.PipedOutputStream;

public class Start {
    public static void main(String... args) {
        new Start().go();
    }

    public void go() {
        try {
            long past = System.currentTimeMillis();
            int BUFFER = 2048;

            // First pipe: DataSource -> WordWatcher
            PipedInputStream convertPipe = new PipedInputStream(BUFFER);
            PipedOutputStream dataPipe = new PipedOutputStream(convertPipe);

            // Second pipe: WordWatcher -> DataConsumer
            PipedInputStream convertedWordPipe = new PipedInputStream(BUFFER);
            PipedOutputStream outputPipe = new PipedOutputStream(convertedWordPipe);

            DataSource dataSource = new DataSource(dataPipe);
            WordWatcher pipe = new WordWatcher(convertPipe, outputPipe);
            DataConsumer dataConsumer = new DataConsumer(convertedWordPipe);

            Thread producerThread = new Thread(dataSource);
            Thread pipeThread = new Thread(pipe);
            Thread consumerThread = new Thread(dataConsumer);

            producerThread.start();
            pipeThread.start();
            consumerThread.start();

            // The consumer finishes last, so waiting for it is enough.
            consumerThread.join();
            long now = System.currentTimeMillis();
            System.out.println("Time taken: " + (now - past) + " ms");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Gotchas:
Flush and close, unless you want a ‘Write end dead’:
When the writing end of a pipe is done writing data, always remember to flush and close its output stream. Failure to do this will result in an IOException on the reading side with the very helpful message “Write end dead”. The exception is supposed to imply that the thread that was writing data at the other end of the pipe is now dead, and that it died without closing the pipe. Talk about cryptic messages!
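The difference is easy to reproduce in isolation. The sketch below uses the made-up names WriteEndDeadDemo and readerOutcome, which are not part of the example program. It runs the same tiny writer twice, once closing the pipe and once simply letting the thread end. The calling thread waits for the writer to finish before reading, so the outcome does not depend on timing.

```java
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.UncheckedIOException;

// Hypothetical helper class, for illustration only.
class WriteEndDeadDemo {

    // Returns null if the reader reached a clean end-of-stream,
    // otherwise the message of the IOException it ran into.
    static String readerOutcome(boolean closeWhenDone) {
        try {
            PipedInputStream in = new PipedInputStream(64);
            PipedOutputStream out = new PipedOutputStream(in);

            Thread writer = new Thread(() -> {
                try {
                    out.write(new byte[] {1, 2, 3}); // fits in the buffer, so no blocking
                    if (closeWhenDone) {
                        out.flush();
                        out.close();
                    }
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            writer.start();
            writer.join(); // the writer thread is dead from here on

            try {
                while (in.read() != -1) {
                    // Discard the bytes; only the way the stream ends matters here.
                }
                return null; // clean end-of-stream
            } catch (IOException e) {
                return e.getMessage();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("closed:   " + readerOutcome(true));
        System.out.println("unclosed: " + readerOutcome(false));
    }
}
```

In the closed run the reader gets its three bytes and then a normal end-of-stream. In the unclosed run the three bytes still arrive, but the next read fails. In this arrangement the message should be the “Write end dead” described above, although the exact wording is up to the JDK.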
Taking care of the pipeSize buffer:
In the programs above, try reducing the buffer size, say to 16 instead of 2048. Doing this will make the program slam on the brakes. In the real world, there can be latency between a write to one end of the pipe and a read from the other end. If the buffer is not large enough, the writer has to wait for a read to free up space before the pipe can take in the next chunk of data written to the OutputStream. That will inevitably slow down the program.
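The blocking itself can be observed directly. The following sketch uses the made-up names BufferLimitDemo and bytesAcceptedBeforeBlocking. It starts a writer with nobody reading and counts how many bytes the pipe accepts before the writer stalls. It assumes that a stalled writer shows up in a waiting thread state, which is how the JDK's piped streams behave as far as I know.

```java
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.UncheckedIOException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper class, for illustration only.
class BufferLimitDemo {

    // Returns how many bytes the writer managed to push into the pipe
    // before it had to wait for a reader.
    static int bytesAcceptedBeforeBlocking(int bufferSize) {
        final int total = bufferSize * 4; // more than the pipe can hold at once
        try {
            PipedInputStream in = new PipedInputStream(bufferSize);
            PipedOutputStream out = new PipedOutputStream(in);
            AtomicInteger written = new AtomicInteger();

            Thread writer = new Thread(() -> {
                try (PipedOutputStream o = out) {
                    for (int i = 0; i < total; i++) {
                        o.write(i); // blocks while the buffer is full
                        written.incrementAndGet();
                    }
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            writer.start();

            // Nobody is reading yet, so the writer stalls once the buffer is full.
            // Assumption: the stalled writer is reported as (TIMED_)WAITING.
            Thread.State state = writer.getState();
            while (state != Thread.State.TIMED_WAITING && state != Thread.State.WAITING) {
                Thread.sleep(1);
                state = writer.getState();
            }
            int accepted = written.get();

            // Drain the pipe so the writer can finish and close its end.
            byte[] chunk = new byte[bufferSize];
            while (in.read(chunk) != -1) {
                // Discard the data.
            }
            writer.join();
            return accepted;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("accepted with a 16-byte buffer:   " + bytesAcceptedBeforeBlocking(16));
        System.out.println("accepted with a 2048-byte buffer: " + bytesAcceptedBeforeBlocking(2048));
    }
}
```

The writer gets exactly as far as the buffer allows and then sits idle until a read frees up space. With a 16-byte buffer that happens after every 16 bytes, so the writer spends most of its time waiting.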

