Consume a remote Java method with AMQP using Spring Integration

After the unexpected interest in exposing a method (thank you, Josh, for mentioning it in the weekly news), I will carry on. Let’s consume the service we exposed.

Remember: we had a service method that is integrated via an AMQP queue (RabbitMQ in my case, but you can change this to whatever you want). That means the consumer only has to connect to this queue and send and receive suitable objects as JSON.

Consuming a method with AMQP using Spring Integration

We use the same example as in “Expose a Java method with AMQP using Spring Integration”, but from the consumer’s perspective.

We will put the cart before the horse and start with the AMQP setup. Instead of the inbound-gateway, we have to use the outbound-gateway. The name is self-explanatory: an outbound gateway integrates a channel (requestChannel) with an external service (here AMQP). And because it is a gateway, it handles the reply message as well (put into resultChannel).

Could we now bind the channels with some Spring Integration? Well, we must not forget that the messages are transported as JSON. That said, we have to ensure the messages are transformed to and from JSON correctly before connecting them with the rest.

Both transformers intercept the messages. The result is two channels (requestChannel and resultChannel) for the service; the other two channels are only for serializing and deserializing. Okay, let’s go.


That’s it.

Wait, what?! Yep.

We already have both channels (one for outgoing invocations, one for the return values). The rest is more or less boilerplate code, so you can use one of the built-in gateway proxies provided by Spring Integration. The Spring Integration <int:gateway> builds a proxy object for the given interface and does the “magic” integration with the Spring Integration message flow, including waiting for the reply. You may remember the @Payload annotation, which is the same as on the receiver side.

For each “call” to MyGateway.handle(Request):

  1. the proxy creates a new Spring Integration message containing the object “request” as payload and puts it into the channel “requestChannel” defined in the configuration;
  2. the message is transformed into JSON and put into requestChannelJson;
  3. the message is transmitted via AMQP (just another gateway, actually), which then waits for a reply;
  4. the reply message is received and put into resultChannelJson;
  5. the message is transformed from JSON into a POJO and put into resultChannel;
  6. the message finally reaches the waiting gateway, which returns the reply like a regular method call.
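
To make the six steps more tangible, here is a minimal sketch in plain Java, without any Spring dependency. It only imitates the flow with a JDK dynamic proxy; it is not how Spring Integration is implemented. The fields of Request and Response, the class GatewayFlowSketch, the TRACE list, and the naive JSON helpers are all assumptions made for illustration, and the AMQP round trip is replaced by a plain function.

```java
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical payload types; the real Request/Response of the article are not shown.
final class Request {
    final String text;
    Request(String text) { this.text = text; }
}

final class Response {
    final String text;
    Response(String text) { this.text = text; }
}

// Same shape as the gateway interface described above.
interface MyGateway {
    Response handle(Request request);
}

final class GatewayFlowSketch {
    // Records the order in which the simulated channels are passed.
    static final List<String> TRACE = new ArrayList<>();

    // Naive JSON handling for one string field without special characters (assumption).
    static String toJson(String text) {
        return "{\"text\":\"" + text + "\"}";
    }

    static String extract(String json) {
        int start = json.indexOf(":\"") + 2;
        return json.substring(start, json.lastIndexOf('"'));
    }

    // Stand-in for the remote service behind the queue: JSON in, JSON out.
    static String remoteService(String requestJson) {
        return toJson(extract(requestJson).toUpperCase());
    }

    // Builds a proxy for MyGateway; amqpRoundTrip replaces the outbound gateway.
    static MyGateway createProxy(Function<String, String> amqpRoundTrip) {
        return (MyGateway) Proxy.newProxyInstance(
            MyGateway.class.getClassLoader(),
            new Class<?>[] { MyGateway.class },
            (proxy, method, args) -> {
                if (!method.getName().equals("handle")) {
                    throw new UnsupportedOperationException(method.getName());
                }
                Request request = (Request) args[0];
                TRACE.add("requestChannel");                        // step 1
                String requestJson = toJson(request.text);
                TRACE.add("requestChannelJson");                    // step 2
                String replyJson = amqpRoundTrip.apply(requestJson); // step 3, blocks
                TRACE.add("resultChannelJson");                     // step 4
                Response response = new Response(extract(replyJson));
                TRACE.add("resultChannel");                         // step 5
                return response;                                    // step 6
            });
    }
}
```

A caller would obtain the proxy with GatewayFlowSketch.createProxy(GatewayFlowSketch::remoteService) and then call handle(...) like any other method; the blocking happens inside the proxy.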

Because this can take some time and would block the current thread, you can (better: should) use futures. The gateway proxy will take care of this!
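
As a rough illustration of the idea, the following plain Java sketch shows a gateway-style interface whose method returns a Future, so the caller decides when to wait. The names MyAsyncGateway, AsyncGatewaySketch and slowRoundTrip are hypothetical, String payloads are used for brevity, and the executor merely stands in for whatever the gateway proxy does internally.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

// Hypothetical asynchronous variant of the gateway interface.
interface MyAsyncGateway {
    Future<String> handle(String request);
}

final class AsyncGatewaySketch {
    // Returns a gateway whose calls run on the given executor instead of the caller's thread.
    static MyAsyncGateway create(ExecutorService executor) {
        return request -> CompletableFuture.supplyAsync(() -> slowRoundTrip(request), executor);
    }

    // Stand-in for the AMQP request/reply round trip, which may take a while.
    static String slowRoundTrip(String request) {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "reply to " + request;
    }
}
```

The calling thread gets the Future back immediately and can do other work before asking for the result with get(), ideally with a timeout.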




Expose a Java method with AMQP using Spring Integration

The goal of this article: expose a method of a bean via AMQP and use JSON as a unified transport that can be serialized and deserialized in every language (and forget the XML overhead, of course).

Disclaimer: All examples have been cleaned up and stripped of irrelevant boilerplate, such as XML headers that are unnecessary for demo purposes.

Let’s start with an implementation:

Both Request and Response are good old plain Java objects which can easily be serialized and deserialized. Excited about Joda? I’ve written something about it!

If we want to expose it, we first have to connect it with Spring Integration. In Spring Integration terms, that basically means connecting something (e.g. a method) with a channel, which is the instrument of message flows in Integration. Well, that actually means integrating stuff. Not a randomly chosen name, is it? 🙂

For configuration, you can use both JavaConfig and XML. However, while JavaConfig is more comfortable to use with parameters, the Integration configuration in particular is sometimes more readable in XML. Choose your own way, but be consistent. Mixing up configurations is a no-go.

First of all, we have to define two channels. One for incoming messages (the actual argument of the method invocation), another one for outgoing messages (the return value).

Because we want to invoke a service on a message, we simply use a ServiceActivator. For each message in the requestChannel, it will be invoked.


So far, each message in requestChannel will trigger an invocation of ApiImpl. The annotations in the class define which method should be invoked and which argument receives the message payload (you could use the actual Integration Message object, but we stick with the payload). The return value will be wrapped into a message again and pushed into the defined output channel resultChannel.

The method is now well “integrated”.

That said, we want to connect the channels with an AMQP server in order to handle incoming requests and process them. The plan is simple: for each message on a specific queue, we create a message, put it into a channel, and wait for a corresponding reply message on another channel to get real “reply behavior”. All of this is already implemented and available out of the box in Spring Integration AMQP.
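
The mechanics can be sketched in plain Java, without Spring. This is only a model of the idea, not Spring Integration's ServiceActivator: the channels are simple queues, and the Message wrapper, the fields of Request and Response, the body of ApiImpl.handle and the class ServiceActivatorSketch are assumptions made for illustration.

```java
import java.util.Queue;
import java.util.function.Function;

// Hypothetical payload types with one field each.
final class Request {
    final String name;
    Request(String name) { this.name = name; }
}

final class Response {
    final String greeting;
    Response(String greeting) { this.greeting = greeting; }
}

// Hypothetical service; in the article this class carries the annotations.
final class ApiImpl {
    Response handle(Request request) {
        return new Response("Hello, " + request.name);
    }
}

// Minimal message wrapper: a payload without headers.
final class Message<T> {
    final T payload;
    Message(T payload) { this.payload = payload; }
}

final class ServiceActivatorSketch {
    // For each message in requestChannel: unwrap the payload, invoke the handler,
    // wrap the return value in a new message and push it to resultChannel.
    static <I, O> void activate(Queue<Message<I>> requestChannel,
                                Function<I, O> handler,
                                Queue<Message<O>> resultChannel) {
        Message<I> message;
        while ((message = requestChannel.poll()) != null) {
            resultChannel.add(new Message<>(handler.apply(message.payload)));
        }
    }
}
```

Passing new ApiImpl()::handle as the handler mirrors what the configuration expresses: the service itself knows nothing about channels or messages.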

For example, you have a queue named queue (hell, yes).

And that... that was it? Well, actually we have already reached the goal of connecting everything. We have defined an inbound gateway (read: take all messages and put them into a channel, and do the same in reverse for the returned result) and connected all channels. Basically, this works. However, it means that the message’s payload will be transmitted with the standard serializer and deserializer, which means Java object serialization. That is not good between different components, let alone for the non-Java world.

Solving this issue means introducing some neat transformers. A transformer is nothing more than a mapper converting objects from one type to another. In our case, we want to transport the objects as JSON. So: read JSON from AMQP, deserialize it back to a POJO, process the message, return a value, serialize it to JSON, and finally send it back via AMQP.

We add two more channels, which become the new connection points for the AMQP endpoint: both requestChannelJson and resultChannelJson will be fed with JSON messages.

As you can see, the built-in directives define that all messages of requestChannelJson will be transformed to a POJO and put into requestChannel. The same happens in reverse for the result channels. Finally, we have to adjust the AMQP configuration to use the new JSON channels. That’s it.
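
Conceptually, the four channels form a simple chain, which the following plain Java sketch models as function composition. It does not use Spring Integration or a real JSON library: the hand-written fromJson/toJson helpers only cope with a single string field without special characters, and the names JsonPipelineSketch, PIPELINE and the payload fields are assumptions for illustration.

```java
import java.util.function.Function;

final class JsonPipelineSketch {
    // Hypothetical payload types with one string field each.
    static final class Request {
        final String name;
        Request(String name) { this.name = name; }
    }

    static final class Response {
        final String greeting;
        Response(String greeting) { this.greeting = greeting; }
    }

    // Stand-in for the JSON-to-object transformer (requestChannelJson -> requestChannel).
    static Request fromJson(String json) {
        return new Request(field(json, "name"));
    }

    // Stand-in for the object-to-JSON transformer (resultChannel -> resultChannelJson).
    static String toJson(Response response) {
        return "{\"greeting\":\"" + response.greeting + "\"}";
    }

    // Naive lookup of a string field; assumes the key exists and the value has no quotes.
    static String field(String json, String key) {
        String marker = "\"" + key + "\":\"";
        int start = json.indexOf(marker) + marker.length();
        return json.substring(start, json.indexOf('"', start));
    }

    // Stand-in for the service method invoked by the service activator.
    static Response handle(Request request) {
        return new Response("Hello, " + request.name);
    }

    // requestChannelJson -> requestChannel -> service -> resultChannel -> resultChannelJson
    static final Function<String, String> PIPELINE =
        ((Function<String, Request>) JsonPipelineSketch::fromJson)
            .andThen(JsonPipelineSketch::handle)
            .andThen(JsonPipelineSketch::toJson);
}
```

The service in the middle still only sees POJOs; JSON exists solely at both ends of the chain, which is exactly where the AMQP endpoint is attached.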

Migration Spring Integration 2.x to 3.x with Jackson2 and Joda DateTime

Until Spring Integration 3 finally introduced built-in support for Jackson2, you were probably stuck with Jackson 1 (1.6 or so) for message serialization, e.g. for AMQP endpoints like RabbitMQ. That was even uglier in web projects where the complete MVC stack was Jackson2-ready, but not the integration part. Even though that situation was unattractive, you could always deal with it without hacking converters, because Jackson 1 and 2 have completely different namespaces (because of the move from the codehaus to the fasterxml domain).

That changes now with Spring Integration 3.

My personal check list when migrating:

  1. Ensure no Jackson 1.x is on the classpath anymore. Yes, check it again. There are sometimes these annoying little transitive dependencies. If one is present, the automatic/magic resolver (aka legacy compatibility code) of Spring Integration will prefer it. You can handle this with custom transformers, but not with the built-in directives.
  2. Forget the newly introduced MessageConverter in the case of (AMQP) gateways, because it will not be used for message replies (INT-3285, which was actually a regression bug and will be fixed in 3.0.2).
  3. Be aware that there is a wrapper for Jackson’s ObjectMapper. Yes, really. The JsonObjectMapper can wrap both Jackson1 and Jackson2 ObjectMappers.
  4. Following from point 3: if you want to customize the ObjectMapper (e.g. for a simple mapper.registerModule(new JodaModule());), you have to provide such a JsonObjectMapper. Easy with a small config.
  5. Following from point 4: do not forget to set this custom ObjectMapper on each JSON transformer in use.

An example: [gist id=8825643 file=config.xml]

Updated 15.02.2014 adding reference to INT-3285

Updated 03.05.2014 INT-3285 resolved with release of Spring Integration 3.0.2