
Asynchronous Web Services in Mule

By aabed
Jul. 8, 2016

When you invoke a web service synchronously, the client application waits for the response before it can continue with its work. When the response returns immediately, this is adequate. However, because request processing can be delayed, it is often useful for the client to continue its work and handle the response later. By calling a web service asynchronously, the client can continue processing without interruption and is notified when the asynchronous response arrives.

To achieve the asynchronous behaviour, we will use the WS-Addressing standard to communicate the endpoint reference where the response should be sent, along with the information needed to correlate the web service request with its response.

WS-Addressing is a specification of a transport-neutral mechanism that allows web services to communicate addressing information. It supports asynchronous interactions by specifying a common SOAP header (wsa:ReplyTo) that contains the endpoint reference (EPR) to which the response is to be sent. The service provider transmits the response message over a separate connection to the wsa:ReplyTo endpoint. This decouples the lifetime of the SOAP request/response interaction from the lifetime of the HTTP request/response exchange, enabling long-running interactions that can span arbitrary periods of time.
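To make the correlation part concrete: in WS-Addressing, a reply normally carries a wsa:RelatesTo header that echoes the wsa:MessageID of the request. A client that has several requests in flight could use that value to match each reply to its request. The class below is only a minimal sketch of that idea in plain Java; `ReplyCorrelator`, `register` and `onReply` are hypothetical names and are not part of Mule or CXF.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical helper: matches asynchronous replies to pending requests by message ID. */
public class ReplyCorrelator {

    // Pending requests, keyed by the wsa:MessageID that was sent with the request.
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Registers a request and returns a future that completes when its reply arrives. */
    public CompletableFuture<String> register(String messageId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(messageId, future);
        return future;
    }

    /**
     * Called by the callback endpoint. The relatesTo argument is the wsa:RelatesTo
     * value taken from the reply. Returns false if no request is waiting for it.
     */
    public boolean onReply(String relatesTo, String body) {
        CompletableFuture<String> future = pending.remove(relatesTo);
        if (future == null) {
            return false; // unknown message ID, or the reply was already delivered
        }
        future.complete(body);
        return true;
    }
}
```

The caller registers the message ID before sending the request, carries on with other work, and reads the future once the callback endpoint has handed the reply to `onReply`.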

MuleSoft ESB supports WS-Addressing by configuring the CXF component to use the WS-Addressing feature, as shown below:

<cxf:jaxws-service ... >
<cxf:features>
<wsa:addressing />
</cxf:features>
</cxf:jaxws-service>

More information is available in the MuleSoft documentation. Unfortunately, that documentation does not make clear how to implement the asynchronous behaviour.

After research, I found the expected behaviour is as follows:

  1. Client sends a SOAP request to the server HTTP endpoint. The SOAP request contains the address where the response should be sent.
  2. Server responds immediately with an HTTP status 202 (The request has been accepted, but the processing has not been completed).
  3. Server sends a SOAP response to the client HTTP endpoint that was defined in the request.
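The three steps above can be simulated outside Mule with the JDK's built-in HTTP server and client (Java 11 or later). This is only a sketch of the message flow under simplifying assumptions: the reply address travels in a made-up `X-Reply-To` HTTP header rather than in the SOAP wsa:ReplyTo header, the bodies are plain text rather than SOAP, and the class name `AsyncReplyDemo` is hypothetical.

```java
import com.sun.net.httpserver.HttpServer;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class AsyncReplyDemo {

    /** Runs one exchange and returns {initial HTTP status, body delivered to the callback}. */
    public static String[] run(String name) {
        HttpServer server = null;
        try {
            HttpClient client = HttpClient.newBuilder().version(HttpClient.Version.HTTP_1_1).build();
            CompletableFuture<String> callbackBody = new CompletableFuture<>();
            server = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0);

            // Step 3 target: the callback endpoint that receives the late response.
            server.createContext("/callback", exchange -> {
                String body = new String(exchange.getRequestBody().readAllBytes(), StandardCharsets.UTF_8);
                exchange.sendResponseHeaders(200, -1);
                exchange.close();
                callbackBody.complete(body);
            });

            // Steps 1 and 2: accept the request, answer 202 at once, then work in the background.
            server.createContext("/hello", exchange -> {
                // "X-Reply-To" is an assumption standing in for the wsa:ReplyTo SOAP header.
                String replyTo = exchange.getRequestHeaders().getFirst("X-Reply-To");
                String text = new String(exchange.getRequestBody().readAllBytes(), StandardCharsets.UTF_8);
                exchange.sendResponseHeaders(202, -1);
                exchange.close();
                CompletableFuture.runAsync(() -> {
                    try {
                        Thread.sleep(200); // simulated slow processing
                        HttpRequest reply = HttpRequest.newBuilder(URI.create(replyTo))
                                .POST(HttpRequest.BodyPublishers.ofString("Hello " + text + " !"))
                                .build();
                        client.send(reply, HttpResponse.BodyHandlers.discarding());
                    } catch (Exception e) {
                        callbackBody.completeExceptionally(e);
                    }
                });
            });
            server.start();

            String base = "http://127.0.0.1:" + server.getAddress().getPort();
            HttpRequest request = HttpRequest.newBuilder(URI.create(base + "/hello"))
                    .header("X-Reply-To", base + "/callback")
                    .POST(HttpRequest.BodyPublishers.ofString(name))
                    .build();
            HttpResponse<Void> initial = client.send(request, HttpResponse.BodyHandlers.discarding());
            String late = callbackBody.get(5, TimeUnit.SECONDS);
            return new String[] { String.valueOf(initial.statusCode()), late };
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            if (server != null) {
                server.stop(0);
            }
        }
    }

    public static void main(String[] args) {
        String[] result = run("Test");
        System.out.println(result[0] + " / " + result[1]);
    }
}
```

The point to notice is that the `/hello` handler closes the exchange before the work starts, so the caller sees the 202 without waiting for the simulated processing delay.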

To implement this behaviour in Mule, we need the following:

  1. Server Endpoint : Mule flow with HTTP endpoint and CXF service
  2. Client of service : SoapUI
  3. Callback Endpoint : Mule flow with another HTTP endpoint and CXF service

In our example below, we will implement both the server and the callback endpoints. We will use the Java-first approach to implement the CXF web services.

The web service classes:

The web service interface is shown below. The @Addressing annotation is the key here in generating all the required WSDL tags for WS-Addressing.

Notice that the interface defines both the web service request (the sayHello method) and the web service response (the sayHelloResponse method). I am just being lazy here; you may want to separate them into two interface files.

package com.appnovation.service;

import javax.jws.Oneway;
import javax.jws.WebMethod;
import javax.jws.WebParam;
import javax.jws.WebService;
import javax.xml.ws.soap.Addressing;

@WebService
@Addressing
public interface Hello {
	@WebMethod(action = "http://service.appnovation.com/Hello/sayHello", operationName = "sayHello")
	String sayHello(String text);

	@Oneway
	@WebMethod(action = "http://service.appnovation.com/Hello/sayHelloResponse", operationName = "sayHelloResponse")
	void sayHelloResponse(@WebParam(name = "return") String ret);
}

The web service implementation class is shown below. Again, if you separate the request and response interfaces, you will need two Java implementation files.

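package com.appnovation.service.impl;

import javax.jws.WebService;

import com.appnovation.service.Hello;

@WebService(endpointInterface = "com.appnovation.service.Hello", serviceName = "Hello")
public class HelloImpl implements Hello {

	@Override
	public String sayHello(String text) {
		try {
			// Simulate a long-running operation.
			Thread.sleep(5000);
		} catch (InterruptedException ie) {
			// Restore the interrupt flag so the caller can see the interruption.
			Thread.currentThread().interrupt();
		}
		return "Hello " + text + " !";
	}

	@Override
	public void sayHelloResponse(String ret) {
		// Invoked on the callback endpoint when the asynchronous response arrives.
		System.out.println("Web service response: " + ret);
	}
}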

Notice that I added a delay in the request method to simulate a long-running process and make the asynchronous behaviour easy to observe.

The web service flows:

Following the Mulesoft documentation mentioned earlier, the server endpoint implementation should be as simple as the following:

<http:listener-config name="HTTPListenerConfig" host="0.0.0.0" port="8081"/>
<flow name="serviceFlow">
  <http:listener config-ref="HTTPListenerConfig" path="/hello"/>
  <cxf:jaxws-service serviceClass="com.appnovation.service.Hello">
    <cxf:features>
      <wsa:addressing/>
    </cxf:features>
  </cxf:jaxws-service>
  <component class="com.appnovation.service.impl.HelloImpl"/>
</flow>

The callback endpoint implementation is almost identical to the server (the class names will differ if you separate the request and response into different interfaces and implementations).

<flow name="callbackFlow">
  <http:listener config-ref="HTTPListenerConfig" path="/callback"/>
  <cxf:jaxws-service serviceClass="com.appnovation.service.Hello"/>
  <component class="com.appnovation.service.impl.HelloImpl"/>
</flow>

Unfortunately, the above server flow implementation does not produce the desired asynchronous behaviour. It seems that the MuleSoft implementation does send the response to the callback HTTP endpoint, but the original HTTP request is blocked until that response has been sent. Also, the client's initial request receives an HTTP 200 (OK) response instead of an HTTP 202, most probably because the web service has actually finished processing by that stage, which makes HTTP 200 the correct status.

My initial attempt to add the asynchronous behaviour was to wrap the Java component in an "async" scope (shown below), but this actually made things worse: the callback endpoint started to receive an echo of the SOAP request immediately, without waiting for the Thread.sleep call to finish. So if I pass the argument "100" to the sayHello request, the callback endpoint immediately receives the same value 100, while the correct behaviour would be to receive "Hello 100 !" after 5 seconds.

<flow name="serviceFlow">
  <http:listener config-ref="HTTPListenerConfig" path="/hello"/>
  <cxf:jaxws-service serviceClass="com.appnovation.service.Hello">
    <cxf:features>
      <wsa:addressing/>
    </cxf:features>
  </cxf:jaxws-service>
  <async>
    <component class="com.appnovation.service.impl.HelloImpl"/>
  </async>
</flow>

Moving the "cxf:jaxws-service" component inside the async scope didn't help either. I started to get NullPointerExceptions.

My final attempt was to proxy the initial request and return the HTTP 202 status immediately while the HTTP web service flow continues processing. To get the asynchronous behaviour, the client calls the /proxy endpoint instead of the original /hello. The proxy flow should be similar to the following:

<http:request-config name="HTTPRequestConfig" host="localhost" port="8081"/>
<flow name="proxyFlow" processingStrategy="non-blocking">
    <http:listener config-ref="HTTPListenerConfig" path="/proxy">
        <http:response-builder statusCode="202"/>
    </http:listener>
    <byte-array-to-object-transformer/>
    <async>
        <copy-properties propertyName="*"/>
        <http:request config-ref="HTTPRequestConfig" path="/hello" method="#[message.inboundProperties.'http.method']" parseResponse="false">
            <http:request-builder>
                <http:query-params expression="#[message.inboundProperties.'http.query.params']"/>
            </http:request-builder>
        </http:request>
    </async>
    <set-payload value="#['']"/>
</flow>

Notice the following in the above proxy flow:

  1. The HTTP request input stream needs to be deserialized into an object before it is passed to the "async" scope. The "byte-array-to-object-transformer" transformer is used for this.
  2. The HTTP request inside the "async" scope will time out after the default timeout. Until then, its thread is blocked waiting for an HTTP response that is not needed. To shorten the wait, we may need to set the responseTimeout attribute to a very small value. In earlier Mule versions we could have used the "http:outbound-endpoint" component, which has an exchange-pattern attribute that can be set to "one-way" to ignore the response.

Another approach is to use JMS as shown below. No need for the "async" scope here since a "one-way" jms outbound endpoint can send a request to a "request-response" one.

<flow name="mainFlow">
    <http:listener config-ref="HTTPListenerConfig" path="/hello">
        <http:response-builder statusCode="202"/>
    </http:listener>
    <byte-array-to-object-transformer/>
    <jms:outbound-endpoint queue="hello" connector-ref="JMSConnector"/>
    <set-payload value="#['']"/>
</flow>
<flow name="serviceFlow">
    <jms:inbound-endpoint exchange-pattern="request-response" queue="hello" connector-ref="JMSConnector">
        <response>
          <message-properties-transformer>
              <delete-message-property key="http.method" />
              <delete-message-property key="Content-Type" />
          </message-properties-transformer>
        </response>
    </jms:inbound-endpoint>
    <cxf:jaxws-service serviceClass="com.appnovation.service.Hello">
        <cxf:features>
            <wsa:addressing/>
        </cxf:features>
    </cxf:jaxws-service>
    <component class="com.appnovation.service.impl.HelloImpl"/>
</flow>

Notice the following in the above JMS flows:

  1. The message exchange pattern of the JMS outbound endpoint is "one-way" to achieve the asynchronous decoupling.
  2. The message exchange pattern of the JMS inbound endpoint needs to be "request-response" so that the CXF service returns a response after it finishes executing the web service implementation.
  3. I removed some properties from the response message. You will get a warning message if you keep them.

If you don't want to use an external message broker, you may use ActiveMQ in embedded mode as explained here.

The SOAP request:

Using a SOAP client such as SoapUI, we can send the SOAP request to the server endpoint http://localhost:8081/hello and provide the callback address http://localhost:8081/callback in the WS-Addressing headers. The SOAP request will look like the following:

<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:ser="http://service.appnovation.com/">
    <soapenv:Header xmlns:wsa="http://www.w3.org/2005/08/addressing">
        <wsa:Action>http://service.appnovation.com/Hello/sayHelloRequest</wsa:Action>
        <wsa:ReplyTo>
            <wsa:Address>http://localhost:8081/callback</wsa:Address>
        </wsa:ReplyTo>
        <wsa:MessageID>uuid:12747591-e933-486d-ad4e-faf5d182c722</wsa:MessageID>
    </soapenv:Header>
    <soapenv:Body>
        <ser:sayHello>
            <arg0>Test</arg0>
        </ser:sayHello>
    </soapenv:Body>
</soapenv:Envelope>
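If you prefer to generate the request from code rather than from SoapUI, the envelope above can be assembled with plain string formatting. The following is a minimal sketch, not a replacement for a real SOAP client: `HelloRequestBuilder`, `build` and `escape` are hypothetical names, and the action, namespaces and element names are simply copied from the sample request above.

```java
import java.util.UUID;

/** Hypothetical helper that fills in the sample sayHello envelope. */
public class HelloRequestBuilder {

    // Same structure as the sample request; %s placeholders are replyTo, messageId, argument.
    private static final String TEMPLATE =
            "<soapenv:Envelope xmlns:soapenv=\"http://schemas.xmlsoap.org/soap/envelope/\""
            + " xmlns:ser=\"http://service.appnovation.com/\">\n"
            + "    <soapenv:Header xmlns:wsa=\"http://www.w3.org/2005/08/addressing\">\n"
            + "        <wsa:Action>http://service.appnovation.com/Hello/sayHelloRequest</wsa:Action>\n"
            + "        <wsa:ReplyTo>\n"
            + "            <wsa:Address>%s</wsa:Address>\n"
            + "        </wsa:ReplyTo>\n"
            + "        <wsa:MessageID>%s</wsa:MessageID>\n"
            + "    </soapenv:Header>\n"
            + "    <soapenv:Body>\n"
            + "        <ser:sayHello>\n"
            + "            <arg0>%s</arg0>\n"
            + "        </ser:sayHello>\n"
            + "    </soapenv:Body>\n"
            + "</soapenv:Envelope>\n";

    /** Builds the request envelope; all three values are XML-escaped before insertion. */
    public static String build(String replyTo, String messageId, String argument) {
        return String.format(TEMPLATE, escape(replyTo), escape(messageId), escape(argument));
    }

    /** Escapes the characters that are not allowed in XML element content. */
    static String escape(String value) {
        return value.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;");
    }

    public static void main(String[] args) {
        // A fresh message ID per request lets the reply be correlated later.
        String messageId = "uuid:" + UUID.randomUUID();
        System.out.println(build("http://localhost:8081/callback", messageId, "Test"));
    }
}
```

The resulting string can be posted to http://localhost:8081/hello with any HTTP client, using a Content-Type of text/xml as SOAP 1.1 expects.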

You can use the WS-Addressing tab in the SoapUI request pane instead of adding the SOAP envelope headers manually.

Final Thoughts:

After all the workarounds needed to implement asynchronous web services using WS-Addressing, I would seriously consider ignoring MuleSoft's broken implementation of the standard. One alternative is to pass the response endpoint as part of the payload and write a MuleSoft HTTP request endpoint to send the response (instead of relying on CXF to do that internally). Another is to expect the caller to expose a "/callback" or "/<service_path>/callback" endpoint and to call it explicitly, using an HTTP request or outbound endpoint, once the web service execution is done.