Consume a remote Java method with AMQP using Spring Integration

After the unexpected interest about exposing a method (thank you Josh mentioning it in the weekly news) , I will go ahead. Let’s consume such a service we had exposed.

Remember: We had a service method which is integrated via an AMQP queue (in my case RabbitMQ, but you can change this into whatever you want). That means the consumer only have to connect itself against this queue and have to send and receive suitable objects in JSON.

Consuming a method with AMQP using Spring Integration

We use the same example like in „Expose a Java method with AMQP using Spring Integration“, but from the consumer perspective.

We will put the cart before the horse and start with the AMQP setup. Instead of the inbound-gateway we have to use the outbound-gateway. Well, the name  is self-explanatory. An outbound gateway integrates a channel (requestChannel) with an external service (here AMQP). And because it is a gateway, it will handle the reply message as well (put into resultChannel).

Now we could bind the channels with some Spring Integration? Well, we must not forget the messages are transported in JSON. Said this, we have to ensure the message are transformed from and to JSON correctly before connecting them with the rest.

Both transformers intercept the messages. The result are two channels (requestChannel and resultChannel) which are for the service; the other two channels are only for serializing and deserializing. Okay, let’s go.


That’s it.

Wait, what?! Yep.

We have already both channels (one for outgoing invocations, one for the return values). The rest is more or less boiler code, so you can use one of the built-in gateway proxies provided by Spring Integration. The Spring Integration <int:gateway> builds a proxy object for the given interface and do the „magic“ integration with the Spring Integration message flow. including waiting for the reply. You should have remember the @Payload annotation which is the same as on receiver side.

For each „call“ onto MyGateway.handle(Request):

  1. the proxy creates a new Spring Integration message containing the object „request“ as payload and put it into the channel „requestChannel“ defined in the configuration;
  2. the message will be transformed into JSON, and will be put into requestChannelJson;
  3. the message will be transmitted via AMQP (just another gateway actually) and waiting for a reply;
  4. the reply message will be received and put into resultChannelJson;
  5. the message will be transformed from JSON into a POJO and put into resultChannel;
  6. the message finally receives the waiting gateway and returns the reply like a regular method

Because this can take some time and would block the current thread, you can (better: should) use futures. The gateway proxy will take care of this!




Kommentar verfassen