While I was working in the refactoring of the PGP module of Mule, I faced the problem that input streams weren’t lazy enough for me. Let me introduce the scenario; for those of you who are not familiar with Mule, Mule is a lightweight Java-based enterprise service bus (ESB) and integration platform that allows developers to connect applications together quickly and easily, enabling them to exchange data. Data is carried in messages that contain a payload (data, e.g. the contents of a file) and can be transformed and written to an endpoint (a different directory, post the information to a URL, etc).
In many situations the payload of the messages could be huge enough and may not fit into memory. To avoid this problem, Mule allows the developer to configure the messages in streaming mode. This works fine for small streams and for big streams that don’t need to be transformed. However when we need to transform those streams without having to write a temporal copy to disk, we get an OutOfMemoryException exception as Mule tries to load the contents into memory to perform the transformation. That was my scenario, I need to encrypt big files but without having to write temporary files to disk. A big picture of the scenario is shown in Fig.1 where you can see that files in the form of messages are polled from a directory, move through a flow and occasionally encrypted and written to disk.
The main idea of the solution is to have an intermediate object that wraps the original input stream and will start the encryption process only when other object needs to read something from the stream. I called this object LazyTransformedInputStream which represents an input stream that is going to be lazy transformed. The first proposal was configuring this object with a transformation that will perform the encryption. The idea of having a transformation is because we would like to have lazy decryptions or any other lazy transformation (e.g. compression). Fig. 2 shows the first sketch of the classes:
Everything was perfect in theory but in practice I had some problems; when I tried the solution using the Bouncy castle PGP implementation I realized that on the decryption case it was not going to work. 100 encrypted bytes may represent a fewer number of decrypted bytes (e.g. 30 bytes) and features like encryption + compression may required more bytes to be read. On the other hand, a simple transformation like increasing by 1 the byte read will produce 1 byte and therefore will be easier to implement. In general, this scenario may happened on more cases like compression; therefore I modeled both scenarios as different TransformPolicy subclasses. Each policy works different regarding how many bytes are transformed:
- TransformPerRequestPolicy: will transform only the bytes requested.
- TransformPerRequestInChunksPolicy: will transform only the bytes requested but in chunks. For example if we configure this policy with a chunk size of 10 bytes and some object requests 2 bytes, the policy will force to transform 8 bytes more (or until the end of the stream).
- TransformContinuouslyPolicy: will transform all the bytes without waiting them to be requested (decryption case).
You can see a summary of the classes in Fig. 3 and you can check the behavior of the classes in LazyTransformedInputStreamTestCase test case.
Last but not least, we have to mention a small but important thing about how LazyTransformedInputStream works. On the one hand, there is an object the reads from the transformed stream and needs to be delayed when the information has not been transformed. On the other hand we have the transformation process that should run on a different thread. Trying to execute both processes in the same thread will probably cause deadlock.
Internally, each TransformPolicy has a thread that is the one who executes the transformation code. We reuse the existing PipedInputStream and PipedOutputStream of java.io for this task. The object who wants to read the transformed stream will read from the PipedInputStream and the transformation will output the transformed bytes into a PipedOutputStream instance. In addition, the transforming thread needs to be kept alive till the object that consumes the PipedInputStream does not want more bytes so that the pipe doesn’t break. In our implementation, we terminate the thread either when the LazyTransformedInputStream is closed or when the LazyTransformedInputStream is finalized.
We did a memory profiling of the implementation with great results in the PGP module and can be found here.
Hope you enjoy the post and looking forward to hearing your comments!!!