
MQ BC Implementation: Redelivery Systemic Quality

The MQ Series Binding Component (MQ BC) implements the Redelivery Quality of Service, which is a part of the Systemic Qualities Specification.

Functionality

The Redelivery QoS concerns itself with message exchange delivery within the JBI environment. It manages delivery retries and delivery failure handling.

Redelivery parameters are configured through the Composite Application Service Assembly (CASA) editor. A set of redelivery parameters is specified between one consuming endpoint and one provisioning endpoint.


The MQ BC supports all four of the On Failure options defined by the Redelivery QoS: Error, Delete, Suspend, and Redirect. When a message exchange cannot be delivered to its destination service endpoint even after its configured delivery retries, it is disposed of according to the chosen On Failure option:

  • Delete - The message exchange is set to a DONE state. The MQ BC handles this transition normally, making permanent the removal of the message it retrieved from the source WebSphere MQ queue.
  • Error - The message exchange is set to an ERROR state. The MQ BC backs out the retrieval of the message from the WebSphere MQ queue, making it available again for a subsequent retrieval.
  • Redirect - The Redelivery QoS directs the message exchange to the specified alternate service endpoint. Because the Redelivery QoS carries out the redirection as if the Error On Failure option were selected, the MQ BC supports this mode as well.
  • Suspend - The message exchange is set to an ERROR state. As with the Error On Failure option, the MQ BC backs out the retrieval of the message from the WebSphere MQ queue; in addition, it suspends subsequent message exchange traffic for the failed (consuming) endpoint until the user resumes the endpoint.

Implementation

  1. Use MessagingChannel instead of DeliveryChannel.
  2. During Service Unit initialization, configure the MessagingChannel with service qualities.
  3. Create ExchangeTemplates instead of Message Exchanges.
  4. Remove message exchange send-retry logic from message exchange receiver implementations (e.g., OutboundMessageProcessor).
  5. Register a SendFailureListener.
  6. Implement component-specific Suspend On Failure handling.

Use MessagingChannel instead of DeliveryChannel

MessagingChannel is a DeliveryChannel decorator class implemented by the QoS common library to carry service qualities and handle redelivery scenarios. The MQ BC obtains a MessagingChannel during Binding Component initialization:

import com.sun.jbi.common.qos.messaging.BaseMessagingChannel;
import com.sun.jbi.common.qos.messaging.MessagingChannel;

public class MQBindingComponent implements ComponentLifeCycle, Component {
    private MessagingChannel mChannel;
    public void init(ComponentContext context) {
        mChannel = new BaseMessagingChannel(context);
        ...
    }
    ...
}

Configure the MessagingChannel with service qualities

final class ServiceUnitImpl implements ServiceUnit {
    ServiceUnitImpl(String id,
                    String suPath,
                    MQComponentContext context,
                    ...) {
        mId = id;
        mSuPath = suPath;
        mContext = context;
        ...
    }

    public void init() throws JBIException {
        mContext.getMessagingChannel().installServiceQualities(mId, mSuPath);
        ...
    }
    ...
}

Create ExchangeTemplates instead of Message Exchanges

The MQ BC no longer creates message exchange objects directly and dispatches them to the delivery channel. Instead, it creates and sends BaseExchangeTemplates objects, which know how to create the initial message exchange and, if necessary, subsequent ones for delivery retries.

import com.sun.jbi.common.qos.messaging.BaseExchangeTemplates;
import com.sun.jbi.common.qos.messaging.MessagingChannel;

import javax.jbi.messaging.MessageExchangeFactory;
import javax.jbi.servicedesc.ServiceEndpoint;
import javax.xml.namespace.QName;

    ...

    private final MQNormalizer mNormalizer = new MQNormalizer();
    private final MessageExchangeFactory mMsgExchangeFactory;
    private final QName mOperationFullQName;
    private ServiceEndpoint mServiceEndpoint;
    private MessagingChannel mChannel;

    public InboundMessageProcessor(MQComponentContext context,
                                   Endpoint endpoint,
                                   QName operationFullQName)
            throws MessagingException {

        mContext = context;
        mChannel = context.getMessagingChannel();
        mOperationFullQName = operationFullQName;
        mMsgExchangeFactory = context.getMessagingChannel().createExchangeFactory();
        ...
    }

    private void sendMessageExchange(MQMessage msg)
            throws MQMessagingException, Exception {

        Source input = mNormalizer.normalize(msg, ...);
        
        // A unique message ID for each message (not message exchange)
        // is required for Redelivery.  Retries will not work without it.
        // Group ID is optional, however.
        String msgId = generateGuid(msg);
        
        BaseExchangeTemplates template = new BaseExchangeTemplates(mServiceEndpoint,
                mOperationFullQName.toString(),
                true,
                null, // Group ID
                msgId,
                input,
                mMsgExchangeFactory);

        Properties properties = new Properties();
        ...
        template.setPropExchange(properties);
                
        try {
            mChannel.send(template);
            mEndpoint.getEndpointStatus().incrementSentRequests();
            mLogger.log(Level.FINE, "InboundMessageProcessor_SEND_TO_NMR", msgId);
        } catch (MessagingException e) {
            throw new MQMessagingException(msgId, e);
        }
    }

    ...
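
The difference between a message ID and an exchange ID, noted in the comment above, can be illustrated with a small self-contained sketch. It does not use the QoS library: ExchangeTemplateSketch, Exchange, and createExchange are hypothetical names chosen for illustration, and the real BaseExchangeTemplates class may work differently. The sketch assumes only that each delivery attempt gets a fresh exchange while the message ID stays fixed, so that retries can be correlated to the same message:

```java
import java.util.UUID;

// Hypothetical illustration; not the QoS library's BaseExchangeTemplates.
final class ExchangeTemplateSketch {

    /** One delivery attempt. */
    static final class Exchange {
        final String exchangeId; // new for every attempt
        final String messageId;  // identical across attempts for one message
        final String payload;

        Exchange(String exchangeId, String messageId, String payload) {
            this.exchangeId = exchangeId;
            this.messageId = messageId;
            this.payload = payload;
        }
    }

    private final String messageId;
    private final String payload;

    ExchangeTemplateSketch(String messageId, String payload) {
        this.messageId = messageId;
        this.payload = payload;
    }

    /** Builds a fresh exchange for the initial send or for a retry. */
    Exchange createExchange() {
        return new Exchange(UUID.randomUUID().toString(), messageId, payload);
    }
}
```

Calling createExchange() twice on the same template yields two exchanges that differ in exchangeId but share messageId, which is the property a retry counter keyed on the message would rely on.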

Remove message exchange send-retry logic from MEx receiver implementations

Earlier iterations of the Redelivery support required a component to perform the following processing for each message exchange received with an ERROR status:

  1. Look up the message exchange's Redelivery configuration.
  2. Determine from the configuration whether the exchange is due a delivery retry.
  3. If redelivery is needed, increment try counter(s), comply with wait-time delays, and then re-dispatch the message exchange.
  4. If redelivery is exhausted, look up the specified recourse action from its Redelivery configuration, and effect it in a component-specific way.
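
The steps above amount to a retry loop of roughly the following shape. This is a generic sketch, not the MQ BC's former code: LegacyRedeliverySketch, RetryConfig, Outcome, and sendWithRetries are hypothetical names, and the send attempt is modeled as a BooleanSupplier that returns true on success.

```java
import java.util.function.BooleanSupplier;

// Illustrative only: a generic retry loop with hypothetical names.
final class LegacyRedeliverySketch {

    enum Outcome { DELIVERED, RETRIES_EXHAUSTED }

    /** Hypothetical stand-in for a Redelivery configuration (step 1). */
    static final class RetryConfig {
        final int maxRetries;
        final long waitMillis;

        RetryConfig(int maxRetries, long waitMillis) {
            this.maxRetries = maxRetries;
            this.waitMillis = waitMillis;
        }
    }

    static Outcome sendWithRetries(BooleanSupplier send, RetryConfig config) {
        int retries = 0;
        while (!send.getAsBoolean()) {
            // Step 2: is the exchange still due a retry?
            if (retries >= config.maxRetries) {
                // Step 4: the caller applies the configured recourse action.
                return Outcome.RETRIES_EXHAUSTED;
            }
            // Step 3: count the retry and honor the wait time.
            retries++;
            try {
                Thread.sleep(config.waitMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return Outcome.RETRIES_EXHAUSTED;
            }
        }
        return Outcome.DELIVERED;
    }
}
```

With maxRetries set to 2, a send that always fails is attempted three times in total (the initial attempt plus two retries) before RETRIES_EXHAUSTED is returned.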

Because the MQ BC now uses BaseExchangeTemplates objects and MessagingChannel.send(ExchangeTemplates), the work of evaluating the redelivery configuration and performing delivery retries rests with the MessagingChannel implementation. The MQ BC MEx receiver implementation now processes message exchanges with DONE and ERROR status without concerning itself with message exchange redelivery.

The only exception to this is support for the Suspend On Failure action, which is described later.

Register a SendFailureListener

When MessagingChannel.send(ExchangeTemplates) is invoked with a Redelivery configuration that specifies a non-zero wait time, the QoS library creates a scheduled timer task for the exchange dispatch, so the send executes on a separate thread. If the send attempt fails (e.g., when the service endpoint is no longer available), the exception is raised on that scheduled thread, not on the original thread from which MessagingChannel.send was invoked.

For the MQ BC implementation, the original thread requires knowledge about the exception, in order to execute recourse actions such as transactional rollbacks. The MQ BC handles this scenario by registering SendFailureListeners with the MessagingChannel:

import com.sun.jbi.common.qos.messaging.SendFailureListener;

final class InboundMessageProcessor implements SendFailureListener, Runnable {
    public InboundMessageProcessor(MQComponentContext context,
                                   Endpoint endpoint,
                                   QName operationFullQName)
            throws MessagingException {
        context.getMessagingChannel().addSendFailureListener(this);
        ...
    }

    /**
     * Handles {@link javax.jbi.messaging.MessagingException}s that occur when a
     * {@link javax.jbi.messaging.MessageExchange} cannot be sent on the {@link
     * javax.jbi.messaging.DeliveryChannel}.
     *
     * @param error The exception that is caught or prepared by {@link
     * com.sun.jbi.common.qos.messaging.MessagingChannel}.
     * @param mex The message exchange related to the specified error.
     */
    public void handleSendFailure(MessagingException error, MessageExchange mex) {
        // Note that this listener is only notified if an async send is
        // performed (e.g., Redelivery QoS waitTime > 0).  If a synchronous
        // send fails, this listener is not involved, the send will raise
        // an exception immediately.
        //
        // This means any exception handling addition/removal/modification done
        // in this method, probably should also occur for the exception handling
        // code in the catch block(s) of wherever MessagingChannel.send is called
        // (and vice versa).
        mLogger.log(Level.SEVERE, mMessages.getString(
                "InboundMessageProcessor_Failed_send_mex",
                new Object[]{mex.getExchangeId(), error.getMessage()}), error);
        endTransactionWithRollback();
    }

    ...
}
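
The threading behavior that makes the listener necessary can be reproduced with plain JDK classes. The sketch below is not the QoS library's implementation; DelayedSendSketch, FailureListener, sendLater, and shutdownAndWait are hypothetical names. It assumes only what the text states: a delayed send runs on a scheduler thread, so a failure there never reaches the caller as an exception and has to be handed to registered listeners instead.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration; not the QoS library's MessagingChannel.
final class DelayedSendSketch {

    interface FailureListener {
        void onSendFailure(Exception error, String exchangeId);
    }

    private final ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor();
    private final List<FailureListener> listeners = new CopyOnWriteArrayList<>();

    void addFailureListener(FailureListener listener) {
        listeners.add(listener);
    }

    /** Schedules the send and returns immediately; the caller sees no exception. */
    void sendLater(String exchangeId, Runnable send, long waitMillis) {
        timer.schedule(() -> {
            try {
                send.run();
            } catch (RuntimeException e) {
                // The failure surfaces on the scheduler thread, so notify listeners.
                for (FailureListener listener : listeners) {
                    listener.onSendFailure(e, exchangeId);
                }
            }
        }, waitMillis, TimeUnit.MILLISECONDS);
    }

    /** Lets already scheduled sends finish, then reports whether the timer terminated in time. */
    boolean shutdownAndWait(long timeoutMillis) {
        timer.shutdown();
        try {
            return timer.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

A listener registered on this sketch is invoked on the scheduler thread rather than on the thread that called sendLater. In the MQ BC, handleSendFailure plays the listener's role and performs the rollback.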

Implement component-specific Suspend On Failure handling

The MQ BC support for the Suspend option consists of stopping all message exchange traffic for the consuming endpoint on which a delivery fails.

With the exception of the Delete On Failure option, redelivery failures cause message exchanges to transition to an ERROR status. This is insufficient information for the MQ BC to determine if an endpoint suspension is necessary, so the component's MEx receiver implementation looks up the Redelivery configuration associated with a given message exchange:

    private void processInbound(Endpoint destination, InOnly exchange)
            throws MessagingException {

        String exchangeId = exchange.getExchangeId();
        ExchangeStatus status = exchange.getStatus();
        
        if (status == ExchangeStatus.DONE) {
            mLog.log(Level.INFO,
                    "OutboundMessageProcessor_IN_ONLY_ACTIVE_READY_COMMIT",
                    exchangeId);

        } else if (status == ExchangeStatus.ERROR) {
            mLog.log(Level.INFO,
                    "OutboundMessageProcessor_IN_ONLY_ERROR_READY_ROLLBACK",
                    exchangeId);
                
            // An exchange may have failed due to a delivery failure.
            // The Redelivery QoS library would have taken care of retries.
            // If Redelivery QoS is in effect, arriving here means the
            // On Failure action is set to Error or Suspend.
            //
            // Error On Failure: Roll back the message.
            // Suspend On Failure: Suspend the endpoint and roll back the message.
            //
            final InboundMessageProcessor processor =
                    (InboundMessageProcessor) exchange.getProperty(
                            InboundMessageProcessor.SYNCPOINT_DISPOSITION_CONTROL_PROPERTY_NAME);

            EndpointInfo endpointInfo = EndpointInfo.valueOf(
                    destination.getServiceEndpoint(),
                    false); // false = consumer endpoint

            RedeliveryConfig config =
                    mComponentContext.getMessagingChannel().getServiceQuality(
                            endpointInfo, RedeliveryConfig.class);
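            if (processor != null
                    && config != null
                    && config.getFailure() == RedeliveryConfig.Failure.suspend) {
                // Suspend endpoint. The processor null check mirrors the
                // one guarding setSyncpointRollback() below.
                processor.suspend(destination);
                // Fall-thru to do the rollback.
            }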

            Transaction tx = (Transaction) exchange.getProperty(
                    MessageExchange.JTA_TRANSACTION_PROPERTY_NAME);
            if (tx != null) {
                try {
                    tx.setRollbackOnly();
                } catch (Exception e) {
                    throw new MessagingException(e);
                }
            } else if (processor != null) {
                processor.setSyncpointRollback();
            }
            ...
        }
        ...
    }
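
The listing above does not show what processor.suspend(destination) does. One possible shape for the suspension bookkeeping is sketched below, under the assumption that suspended endpoints are tracked by name and checked before every dispatch. EndpointSuspensionSketch and its methods are hypothetical and are not taken from the MQ BC source.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical illustration of a suspend gate; not the MQ BC's code.
final class EndpointSuspensionSketch {

    // Thread-safe set, since pollers and receivers run on different threads.
    private final Set<String> suspended = ConcurrentHashMap.newKeySet();

    void suspend(String endpointName) {
        suspended.add(endpointName);
    }

    /** Called when the user resumes the endpoint. */
    void resume(String endpointName) {
        suspended.remove(endpointName);
    }

    boolean isSuspended(String endpointName) {
        return suspended.contains(endpointName);
    }

    /**
     * Runs the send unless the endpoint is suspended.
     *
     * @return true if the send was dispatched, false if it was skipped
     */
    boolean dispatch(String endpointName, Runnable send) {
        if (isSuspended(endpointName)) {
            return false;
        }
        send.run();
        return true;
    }
}
```

With a gate like this, polling can continue while dispatch is skipped for the suspended endpoint only; other endpoints are unaffected.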

Samples

MQRedeliveryTest demonstrates the Redelivery QoS support implementation.

Requirements

  • OpenESB 2.0 with Open JBI Components: HTTP BC, File BC, MQ BC, and BPEL SE.
  • WebSphere MQ 6.0 server installation.

Preparation

  • Ensure the WebSphere MQ service is running with a reachable, writable and readable queue.
  • Before deploying the sample project, review the WSDL files, particularly QueuePoller.wsdl, and ensure that MQ connectivity parameters are changed to match the WebSphere MQ environment in use.

Overview

  • MQ BC consumes a QueuePoller service using messages retrieved from a WebSphere MQ queue as requests.
  • Two BPEL processes are used to provision the QueuePoller service.
    • The first BPEL process (InvokeAndFail) is orchestrated to consume a second service provisioned through the HTTP BC.
    • The second BPEL process (WriteToFile) consumes another service, provisioned through the File BC.
  • The HTTP BC is configured with an invalid address, guaranteeing that the invocation of the HTTP BC-provisioned service will fail.
  • The File BC is configured to function correctly.
  • The MQ BC consumption endpoint is connected to the InvokeAndFail provisioning endpoint.
  • The sample is preconfigured to use the Redirect On Failure action.
    • The WriteToFile provisioning endpoint is the Redirection target.

Execution

  1. Use the WebSphere MQ Explorer to create a message in the MQ BC's target queue.
  2. MQ BC polls the queue, finds the message, and consumes the QueuePoller service.
  3. InvokeAndFail BPEL process executes, and fails due to misconfigured HTTP BC.
  4. After redelivery retries, the Redelivery QoS causes the message exchange to be diverted to the WriteToFile BPEL process.
  5. The WriteToFile BPEL process consumes the File BC-provisioned service, causing the message to be written to the file system.
  • Try changing the Redelivery configuration:
    • An On Failure action of Delete will remove the message permanently from the MQ queue without redirection.
    • An On Failure action of Error will cause the MQ BC to repeatedly send the message and then roll back.
    • An On Failure action of Suspend will cause the MQ BC to try to consume the service only once. After the delivery fails, the endpoint is suspended; the MQ BC will repeatedly poll the MQ queue for messages, but abort its dispatch to the channel for that particular endpoint.

This page (revision-13) was last changed on 18-Aug-08 18:16 PM, -0700 by NoelDAng