Want to tweet from your Activiti process?

Any BPM (specially Activiti) has integration requirements. For instance, in a customer order we may have multiple products and the customer may want to make public its purchased in his Twitter wall. So how can we publish this information in Twitter? Using the Mule’s Activiti module and the upcoming version of Activiti 5.6.

In our case, the process order will have a send task that invokes Mule to publish the information in Twitter. For the sake of conciseness we will keep the process definition simply:

The sendTask invokes the mule endpoint vm://tweetFromActiviti which will receive the message “I have just purchased a #{productName} from Mulesoft.com” (where #{productName} refers to the value productName in the context of the process).

Then we need to define the endpoint in the Mule’s side; the flow is simple and is shown as follows:

The twitter:update-status tag is the one that executes the update status to the user’s twitter wall. For simplicity we use the same user on each status update; all this information is configured in the twitter connector in Mule:

That’s it; so simple! Check our user’s twitter wall.
Want to check a full running example? Check the full project with tests included at our GitHub repository.
Also check the documentation of the Twitter cloud connector for further configuration details.

Testing multiple versions of your code with Ant and Maven

Did you face the problem that your code must work with 2 or more different versions of a different component? I did.

While I was developing version 3.2 of Mule’s Activiti module I need to check that it works with version Activiti 5.3 and 5.4 (and all the upcoming versions as these guys release every month!).

The steps I need to follow for each version are:

  1. Stop, clean and start Activiti server.
  2. Deploy some examples to the Activiti server.
  3. Run all the tests of my Maven project.

So I create a build file for these steps (you can see the full file here). Below I show the call to the maven script from ant:

Notice that the ant script receives a parameter called activiti.version that is passed to maven to select the version of their dependencies in the pom.xml file:

Hope you start to automate your testing across multiple versions soon!

Activiti + Mule ESB – Part 2

In the first part of this post we show how to start an Activiti process from Mule ESB using the new Mule’s activiti module. In this part we are going to show how to call a Web Service from Activiti that is going to be contained in Mule. Notice that you can expose anything as a Web service in Mule such as: beans, queues, etc.

Activiti configuration

First of all we need to create our proces. It will receive a request containing the username that has requested the image and the image name. The process will work as follows: check if the image needs copyright; if it needs copyright then allow the owner to approve or disapprove the request. If the request is approved, then charge the requester otherwise cancel it. In case that the image doesn’t need approval, just charge the requester with the cost.

Continue reading

Activiti + Mule ESB – Part 1

We are pleased to announce Mule 3 Activiti support. We’re very happy to be working with the Activiti team and see a lot of value for our community using Mule and Activiti together. I already discussed why BPM and ESB need to work together and two different uses of ESBs with BPM. Today I am going to present a real world example.

For those that may not be familiar, Actiiviti is an open source BPMN 2.0 modeling tool and BPM runtime, founded by Alfresco and developed with other open source companies.

Getting Started

My example application uses Mule ESB and Activiti allowing users to view low-resolution images and request a high-resolution version that needs to be approved by a human participant (similar to Stock.XCHNG). Mule will handle the creation of the Activiti process and will take care of exposing it as a service. Activiti will take care of orchestrating the process that requires a user task (approval).

Continue reading

Why ESB + BPM?

While should we use an BPM like Activiti with an ESB like Mule ESB?

Well, a BPM is a management approach focused on aligning all aspects of an organization with the wants and needs of clients. A BPM helps business people to define their organizational’s process and users are able to participate in those process by providing input and actions. However on several cases a process may need to interact with different systems that may already exist in the organization; that’s where ESBs are a strong player. Therefore the integration between both products seems reasonable in this case.

A typical scenario would be starting BPM process using the messages stored in a JMS queue. Obviously, we don’t want to duplicate the functionality already present in a ESB into a BPM just to read JMS messages. A picture of this scenario is shown in Fig. 1.

Using a ESB as a source for starting/manipulating processes
Fig. 1. Using a ESB as a source for starting/manipulating processes

A different but typical scenario in enterprise applications is dealing with existing components of the organization. For example, while we run a process we need to call an existing EJB to execute some business logic (e.g. to charge a credit card) and the EJB could exists in the ESB space. We expect to easily expose the EJB using the infrastructure existing on the ESB to be used by the BPM. A picture of this scenario is shown in Fig. 2.

Using a ESB to connect existing components
Fig. 2. Using a ESB to connect existing components

We must notice that there is a difference between both scenarios; in the first case, the ESB is used as a source for starting the process and it doesn’t play an important role in the process flow (the ESB is more passive). In the second scenario the call to the EJB is key in the process flow and therefore a bit more critical (the ESB is more active).

As a summary the two scenarios described above use the ESB as:

  • A source for starting/manipulating processes.
  • To connect existing components like EJBs, Web services with the BPM.

Am I missing any scenarios? Feel free to comment. I hope to post how to cover both scenarios using Activiti and Mule soon.

Input streams aren’t lazy enough?

Motivation

While I was working in the refactoring of the PGP module of Mule, I faced the problem that input streams weren’t lazy enough for me. Let me introduce the scenario; for those of you who are not familiar with Mule, Mule is a lightweight Java-based enterprise service bus (ESB) and integration platform that allows developers to connect applications together quickly and easily, enabling them to exchange data. Data is carried in messages that contain a payload (data, e.g. the contents of a file) and can be transformed and written to an endpoint (a different directory, post the information to a URL, etc).

In many situations the payload of the messages could be huge enough and may not fit into memory. To avoid this problem, Mule allows the developer to configure the messages in streaming mode. This works fine for small streams and for big streams that don’t need to be transformed. However when we need to transform those streams without having to write a temporal copy to disk, we get an OutOfMemoryException exception as Mule tries to load the contents into memory to perform the transformation. That was my scenario, I need to encrypt big files but without having to write temporary files to disk. A big picture of the scenario is shown in Fig.1 where you can see that files in the form of messages are polled from a directory, move through a flow and occasionally encrypted and written to disk.

Scenario
Fig. 1. Scenario

Proposal

The main idea of the solution is to have an intermediate object that wraps the original input stream and will start the encryption process only when other object needs to read something from the stream. I called this object LazyTransformedInputStream which represents an input stream that is going to be lazy transformed. The first proposal was configuring this object with a transformation that will perform the encryption. The idea of having a transformation is because we would like to have lazy decryptions or any other lazy transformation (e.g. compression). Fig. 2 shows the first sketch of the classes:

First sketch of classes
Fig. 2. First sketch of classes

Everything was perfect in theory but in practice I had some problems; when I tried the solution using the Bouncy castle PGP implementation I realized that on the decryption case it was not going to work. 100 encrypted bytes may represent a fewer number of decrypted bytes (e.g. 30 bytes) and features like encryption + compression may required more bytes to be read. On the other hand, a simple transformation like increasing by 1 the byte read will produce 1 byte and therefore will be easier to implement. In general, this scenario may happened on more cases like compression; therefore I modeled both scenarios as different TransformPolicy subclasses. Each policy works different regarding how many bytes are transformed:

  • TransformPerRequestPolicy: will transform only the bytes requested.
  • TransformPerRequestInChunksPolicy: will transform only the bytes requested but in chunks. For example if we configure this policy with a chunk size of 10 bytes and some object requests 2 bytes, the policy will force to transform 8 bytes more (or until the end of the stream).
  • TransformContinuouslyPolicy: will transform all the bytes without waiting them to be requested (decryption case).

You can see a summary of the classes in Fig. 3 and you can check the behavior of the classes in LazyTransformedInputStreamTestCase test case.

Final version of the classes
Fig. 3. Final version of the classes

Technical aspects

Last but not least, we have to mention a small but important thing about how LazyTransformedInputStream works. On the one hand, there is an object the reads from the transformed stream and needs to be delayed when the information has not been transformed. On the other hand we have the transformation process that should run on a different thread. Trying to execute both processes in the same thread will probably cause deadlock.

Internally, each TransformPolicy has a thread that is the one who executes the transformation code. We reuse the existing PipedInputStream and PipedOutputStream of java.io for this task. The object who wants to read the transformed stream will read from the PipedInputStream and the transformation will output the transformed bytes into a PipedOutputStream instance. In addition, the transforming thread needs to be kept alive till the object that consumes the PipedInputStream does not want more bytes so that the pipe doesn’t break. In our implementation, we terminate the thread either when the LazyTransformedInputStream is closed or when the LazyTransformedInputStream is finalized.

We did a memory profiling of the implementation with great results in the PGP module and can be found here.

Hope you enjoy the post and looking forward to hearing your comments!!!

Announcing Mule’s Activiti transport

Hi all!
A couple of weeks ago I started the integration between Mule ESB and Activiti BPMN. Activiti is a Business Process Management (BPM) and workflow system targeted at business people, developers and system admins. Its core is a super-fast and rock-solid BPMN 2 process engine for Java. It’s open-source and distributed under the Apache license. Activiti is being written from the ground up with the requirement that the engine should be easily embedded and that the project would be aimed at developers not the business. This is a very natural fit for MuleSoft since our approach to middleware starts with the developer. We have been working with the Activiti guys since July focusing in the integration aspects of the project.

The idea behind Mule’s Activiti transport is that you are able to perform several administrative actions in an Activiti server from Mule. For example, let’s suppose that you would like to retrieve the candidate tasks of the user kermit, do some processing and CLAIM a subset of those tasks. The following configuration will work for you:

[sourcecode language=”xml”]
<activiti:connector name="actServer"
activitiServerURL="http://localhost:8080/activiti-rest/service/"
username="kermit" password="kermit" />

<model>
<service name="Activiti service">
<inbound>
<activiti:inbound-endpoint activiti-server="actServer">
<activiti:list-candidate-tasks user="kermit" />
</activiti:inbound-endpoint>
</inbound>

<component class="org.mule.transport.activiti.SelectTaskComponent" />

<outbound>
<multicasting-router>
<vm:outbound-endpoint path="out" connector-ref="asynchVm" />
<activiti:outbound-endpoint activiti-server="actServer" >
<activiti:perform-task-operation operation="CLAIM" />
</activiti:outbound-endpoint>
</multicasting-router>
</outbound>
</service>
</model>
[/sourcecode]

We next describe each part of the configuration, but for those of you who are familiar with Mule, the configuration is pretty straightforward.

Let’s start with the basics; for configuring the transport you need a connector. The connector will hold all the information related with the Activiti server such as server URL, administrative user and password, e.g:

[sourcecode language=”xml”]
<activiti:connector
name="actServer"
activitiServerURL="http://localhost:8080/activiti-rest/service/"
username="kermit"
password="kermit" />
[/sourcecode]

Then you can configure an inbound endpoint for polling information from the Activiti server. For example the following endpoint polls the list of candidate tasks of the user pablo.

[sourcecode language=”xml”]
<activiti:inbound-endpoint activiti-server="actServer">
<activiti:list-candidate-tasks user="pablo" />
</activiti:inbound-endpoint>
[/sourcecode]

Finally, you can execute operations in the outbound endpoints. For example, the following endpoint executes the operation claim over the task that comes inside the message. The payload of the message must be a map which must contain a task id and a operation. If the operation inside the map is null then the default operation configured in the endpoint is used.

[sourcecode language=”xml”]
<activiti:outbound-endpoint activiti-server="actServer" >
<activiti:perform-task-operation operation="CLAIM" />
</activiti:outbound-endpoint>
[/sourcecode]

For further examples please check the following configuration files in the SVN:

You can obtain a snapshot version of the transport from here. Currently the transport has been tested with Mule 2.2.6 and shortly with Mule 3.0.

In future releases we plan to cover more operations that could be perform on the server and let the user provide custom spring beans for operations that are not covered by the implementation.

Looking forward to hearing your comments!