Why ESB + BPM?

Why should we use a BPM like Activiti with an ESB like Mule ESB?

Well, BPM is a management approach focused on aligning all aspects of an organization with the wants and needs of clients. A BPM product helps business people define their organization’s processes, and users can participate in those processes by providing input and performing actions. In several cases, however, a process may need to interact with different systems that already exist in the organization; that’s where ESBs are a strong player. Integrating both products therefore seems reasonable.

A typical scenario would be starting a BPM process from the messages stored in a JMS queue. Obviously, we don’t want to duplicate functionality already present in an ESB inside a BPM just to read JMS messages. A picture of this scenario is shown in Fig. 1.

Fig. 1. Using an ESB as a source for starting/manipulating processes

A different but equally typical scenario in enterprise applications is dealing with existing components of the organization. For example, while running a process we may need to call an existing EJB to execute some business logic (e.g. to charge a credit card), and that EJB could exist in the ESB space. We expect to expose the EJB easily, using the infrastructure already available in the ESB, so that the BPM can use it. A picture of this scenario is shown in Fig. 2.

Fig. 2. Using an ESB to connect existing components

Note that there is a difference between the two scenarios. In the first, the ESB is used as a source for starting the process and doesn’t play an important role in the process flow (the ESB is more passive). In the second, the call to the EJB is key to the process flow and therefore a bit more critical (the ESB is more active).

In summary, the two scenarios described above use the ESB as:

  • A source for starting/manipulating processes.
  • A way to connect existing components, such as EJBs and Web services, with the BPM.

Am I missing any scenarios? Feel free to comment. I hope to post how to cover both scenarios using Activiti and Mule soon.

Input streams aren’t lazy enough?

Motivation

While I was working on refactoring the PGP module of Mule, I faced the problem that input streams weren’t lazy enough for me. Let me introduce the scenario. For those of you who are not familiar with Mule, it is a lightweight Java-based enterprise service bus (ESB) and integration platform that allows developers to connect applications together quickly and easily, enabling them to exchange data. Data is carried in messages that contain a payload (data, e.g. the contents of a file) and can be transformed and written to an endpoint (a different directory, a URL to post the information to, etc.).

In many situations the payload of a message can be so large that it does not fit into memory. To avoid this problem, Mule allows the developer to configure messages in streaming mode. This works fine for small streams and for big streams that don’t need to be transformed. However, when we need to transform those streams without writing a temporary copy to disk, we get an OutOfMemoryError, as Mule tries to load the contents into memory to perform the transformation. That was my scenario: I needed to encrypt big files without writing temporary files to disk. A big picture of the scenario is shown in Fig. 1, where you can see that files, in the form of messages, are polled from a directory, move through a flow, are encrypted in some cases, and are written to disk.

Fig. 1. Scenario

Proposal

The main idea of the solution is to have an intermediate object that wraps the original input stream and starts the encryption process only when another object needs to read something from the stream. I called this object LazyTransformedInputStream; it represents an input stream that is going to be lazily transformed. The first proposal was to configure this object with a transformation that performs the encryption. The reason for having a transformation is that we would also like to support lazy decryption or any other lazy transformation (e.g. compression). Fig. 2 shows the first sketch of the classes:

Fig. 2. First sketch of classes
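
To make the idea concrete, here is a minimal sketch in plain Java of a wrapper that applies a transformation only when someone first reads from it. This is not the code from the PGP module: the class names LazySketchInputStream and PlusOneInputStream are made up for illustration, and the "transformation" is simply a function that decorates one stream with another.

```java
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.function.UnaryOperator;

// Hypothetical toy transformation: adds 1 to every byte read.
class PlusOneInputStream extends FilterInputStream {
    PlusOneInputStream(InputStream in) {
        super(in);
    }

    @Override
    public int read() throws IOException {
        int b = super.read();
        return b < 0 ? b : (b + 1) & 0xFF; // keep -1 (end of stream) untouched
    }

    @Override
    public int read(byte[] buf, int off, int len) throws IOException {
        int n = super.read(buf, off, len);
        for (int i = 0; i < n; i++) {
            buf[off + i]++; // byte arithmetic wraps around at 0xFF
        }
        return n;
    }
}

// Hypothetical sketch of the lazy wrapper: the transformation is not
// applied until the first read request arrives.
class LazySketchInputStream extends InputStream {
    private final InputStream source;
    private final UnaryOperator<InputStream> transformation;
    private InputStream transformed; // stays null until the first read

    LazySketchInputStream(InputStream source, UnaryOperator<InputStream> transformation) {
        this.source = source;
        this.transformation = transformation;
    }

    boolean isStarted() {
        return transformed != null;
    }

    private InputStream delegate() {
        if (transformed == null) {
            transformed = transformation.apply(source);
        }
        return transformed;
    }

    @Override
    public int read() throws IOException {
        return delegate().read();
    }

    @Override
    public int read(byte[] buf, int off, int len) throws IOException {
        return delegate().read(buf, off, len);
    }

    @Override
    public void close() throws IOException {
        // Close whichever stream is currently the outermost one.
        if (transformed != null) {
            transformed.close();
        } else {
            source.close();
        }
    }
}
```

Creating `new LazySketchInputStream(source, PlusOneInputStream::new)` does no work at all; the decorated stream is only built when `read` is first called.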

Everything was perfect in theory, but in practice I had some problems. When I tried the solution with the Bouncy Castle PGP implementation, I realized that it was not going to work for decryption: 100 encrypted bytes may represent fewer decrypted bytes (e.g. 30 bytes), and features like encryption + compression may require more bytes to be read. On the other hand, a simple transformation such as adding 1 to each byte read produces exactly 1 byte per byte consumed and is therefore easier to implement. In general, this situation can arise in other cases too, such as compression, so I modeled these scenarios as different TransformPolicy subclasses. The policies differ in how many bytes they transform:

  • TransformPerRequestPolicy: will transform only the bytes requested.
  • TransformPerRequestInChunksPolicy: will transform only the bytes requested, but in chunks. For example, if we configure this policy with a chunk size of 10 bytes and some object requests 2 bytes, the policy forces 8 more bytes to be transformed (or fewer if the end of the stream is reached first).
  • TransformContinuouslyPolicy: will transform all the bytes without waiting for them to be requested (decryption case).
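
The arithmetic behind the chunked policy can be sketched as a small helper. This is only an illustration under stated assumptions: the class and method names are hypothetical, nothing is left over from a previous request, and a request larger than one chunk is assumed to round up to the next multiple of the chunk size (the original example only covers a request smaller than one chunk).

```java
// Hypothetical helper, not part of the PGP module.
class ChunkMathSketch {
    /**
     * Returns how many bytes a chunked policy would transform for a request.
     *
     * @param requested bytes asked for by the reader
     * @param chunkSize configured chunk size (must be positive)
     * @param remaining bytes left in the source stream
     */
    static long bytesToTransform(long requested, long chunkSize, long remaining) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        if (requested <= 0 || remaining <= 0) {
            return 0;
        }
        long chunks = (requested + chunkSize - 1) / chunkSize; // ceiling division
        return Math.min(chunks * chunkSize, remaining);        // stop at end of stream
    }
}
```

With a chunk size of 10 and a request for 2 bytes, `bytesToTransform(2, 10, 100)` gives 10 (the 2 requested plus 8 more), while `bytesToTransform(2, 10, 5)` gives 5 because the stream ends first.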

You can see a summary of the classes in Fig. 3, and you can check their behavior in the LazyTransformedInputStreamTestCase test case.

Fig. 3. Final version of the classes

Technical aspects

Last but not least, we have to mention a small but important detail about how LazyTransformedInputStream works. On the one hand, there is an object that reads from the transformed stream and needs to wait while the information has not yet been transformed. On the other hand, there is the transformation process, which should run on a different thread. Trying to execute both in the same thread will probably cause a deadlock.

Internally, each TransformPolicy has a thread that executes the transformation code. We reuse the existing PipedInputStream and PipedOutputStream classes of java.io for this task. The object that wants to read the transformed stream reads from the PipedInputStream, and the transformation writes the transformed bytes into a PipedOutputStream instance. In addition, the transforming thread needs to be kept alive until the object consuming the PipedInputStream no longer wants bytes, so that the pipe doesn’t break. In our implementation, we terminate the thread either when the LazyTransformedInputStream is closed or when it is finalized.
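
The two-thread arrangement can be sketched with nothing but java.io. The code below is a simplified illustration, not the TransformPolicy implementation: the class name PipedTransformSketch and the method transformInBackground are hypothetical, the transformation is the toy "add 1 to each byte", and the worker is started eagerly rather than on the first read.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

// Hypothetical sketch of a transformation running on its own thread.
class PipedTransformSketch {
    static InputStream transformInBackground(InputStream source) throws IOException {
        PipedInputStream readerSide = new PipedInputStream();
        PipedOutputStream writerSide = new PipedOutputStream(readerSide);

        Thread worker = new Thread(() -> {
            // try-with-resources closes the pipe's write end when the source
            // is exhausted, which the reader then sees as end of stream.
            try (InputStream in = source; OutputStream out = writerSide) {
                int b;
                while ((b = in.read()) != -1) {
                    out.write((b + 1) & 0xFF); // toy transformation
                }
            } catch (IOException e) {
                // Typically the reader closed its side of the pipe;
                // nothing more to transform, so let the thread end.
            }
        }, "transform-worker");
        worker.setDaemon(true); // do not keep the JVM alive for this thread
        worker.start();

        return readerSide;
    }
}
```

The consumer simply reads from the returned stream. Because the pipe has a bounded buffer (1024 bytes by default in PipedInputStream), the worker blocks once it gets that far ahead of the reader, so a large source is never held in memory in full.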

We profiled the memory usage of the implementation in the PGP module with great results, which can be found here.

I hope you enjoyed the post, and I look forward to hearing your comments!