Enterprise Integration Patterns - Aggregate
Overview
The purpose of the aggregate pattern is to take a set of one or more input messages and create a single output message.
Use Cases
Aggregate can be used to satisfy a wide range of use cases depending on the message type and message flow requirements. The following use cases represent the most common categories for using aggregate.
Reassembly
Take a set of related messages and combine them into a single message. Most commonly used in conjunction with split and broadcast, where the number of messages in the input set is known.
Batching
Hold a number of messages and release them as a single batch message. The release can be triggered by a time window, the number of messages, the size of the messages, etc.
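As an illustration of count-based release, here is a minimal Python sketch. The `CountBatcher` class and its `add` method are hypothetical names chosen for this example; they are not part of IFL or of any aggregate implementation, and the sketch ignores time windows and message size.

```python
from typing import Any, List, Optional


class CountBatcher:
    """Hypothetical count-based batcher (illustration only, not the runtime)."""

    def __init__(self, count: int) -> None:
        if count < 1:
            raise ValueError("count must be at least 1")
        self.count = count
        self._held: List[Any] = []

    def add(self, message: Any) -> Optional[List[Any]]:
        """Hold the message; return the whole batch once `count` is reached."""
        self._held.append(message)
        if len(self._held) < self.count:
            return None  # keep holding
        # Release the batch and start collecting a fresh one.
        batch, self._held = self._held, []
        return batch
```

With `CountBatcher(5)`, the first four calls to `add` return `None` and the fifth returns all five held messages as one list, which roughly corresponds to what `count=5` expresses in the configuration examples.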
Enveloping
A refinement of batching, an enveloping aggregate can be used to create a group of messages with common characteristics (same type, same destination, same source, etc.). These messages are wrapped in one or more envelopes before they are forwarded as a single message.
Gating
Hold messages from different flows until all of the expected messages have arrived at the aggregate.
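The gating idea can be sketched in a few lines of Python. This is only an illustration under stated assumptions: the `Gate` class, the `arrive` method, and the notion of identifying each flow by a string name are hypothetical and do not reflect how any aggregate implementation identifies flows.

```python
from typing import Any, Dict, Iterable, Optional


class Gate:
    """Hypothetical gate that holds one message per expected flow."""

    def __init__(self, flows: Iterable[str]) -> None:
        self.expected = frozenset(flows)
        self._arrived: Dict[str, Any] = {}

    def arrive(self, flow: str, message: Any) -> Optional[Dict[str, Any]]:
        """Record a message; release everything once every flow has reported."""
        if flow not in self.expected:
            raise KeyError(f"unexpected flow: {flow}")
        # Assumption: a later message from the same flow replaces the earlier one.
        self._arrived[flow] = message
        if set(self._arrived) != self.expected:
            return None  # still waiting on at least one flow
        released, self._arrived = self._arrived, {}
        return released
```

A gate built with `Gate(["orders", "payments"])` returns `None` when the first flow reports and releases both messages together when the second one arrives.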
Selection
The aggregate can choose to forward only a subset of the aggregated messages.
IFL
Syntax
aggregate [type] [name | expression]
type := aggregate implementation (e.g. set, java)
name := named configuration which contains the aggregate details
expression := inline aggregate configuration
Examples
Aggregate with inline configuration
route do
  from "multiple-in"
  aggregate set ("count=5")
  to "single-out"
end
Aggregate with external configuration
route do
  from "multiple-in"
  aggregate set "message-batch"
  to "single-out"
end
> cat message-batch/aggregate.properties
count=5
Configuration
Each aggregate implementation has its own specific configuration. The following attributes, however, can be defined for all aggregate types:
| Attribute | Description | Default |
| timeout | Amount of time to wait before the set is aggregated. Valid units are ms, s, m, h (e.g. 1h = 1 hour). | |
| on-timeout | Action to perform if the timeout interval expires before the aggregate conditions are met. Valid values are error and send (e.g. on-timeout=error). | send |
| correlation-id | This is not yet a configurable parameter, but will be made configurable in the future. Currently the set and java aggregate types infer the correlation id to be the value of the "org.glassfish.openesb.messaging.groupid" Normalized Message property | Value of the "org.glassfish.openesb.messaging.groupid" property |
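To make the timeout units and the on-timeout default concrete, the following Python sketch converts a timeout value to milliseconds and resolves the on-timeout action. It is an assumption-laden illustration: `parse_timeout` and `on_timeout_action` are hypothetical helpers, and the exact grammar accepted by the runtime (for example whitespace or fractional values) is not specified here, so the sketch accepts only a whole number followed directly by a unit.

```python
import re
from typing import Mapping

# Milliseconds per unit, for the units listed in the table (ms, s, m, h).
_UNIT_MS = {"ms": 1, "s": 1_000, "m": 60_000, "h": 3_600_000}
_TIMEOUT_RE = re.compile(r"(\d+)(ms|s|m|h)")
_ACTIONS = ("error", "send")


def parse_timeout(value: str) -> int:
    """Return the timeout in milliseconds, e.g. "1h" -> 3600000."""
    match = _TIMEOUT_RE.fullmatch(value.strip())
    if match is None:
        raise ValueError(f"invalid timeout: {value!r}")
    amount, unit = match.groups()
    return int(amount) * _UNIT_MS[unit]


def on_timeout_action(config: Mapping[str, str]) -> str:
    """Return the configured on-timeout action, defaulting to "send"."""
    action = config.get("on-timeout", "send")
    if action not in _ACTIONS:
        raise ValueError(f"invalid on-timeout action: {action!r}")
    return action
```

For example, `parse_timeout("1h")` yields 3600000 and `on_timeout_action({})` falls back to `"send"`, matching the default given in the table.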
Aggregate Implementations
There are two aggregate types supported in the current release:
- set - out-of-the-box implementation of a Set aggregator.
- java - provides the extensibility to plug in a user-defined POJO for aggregation.
Packaging
At application build time, all aggregate configurations are packaged as part of the application. Since the ultimate consumer of this configuration information is the runtime aggregate implementation, and not a component, it is not appropriate to package the individual configuration instances into a service unit. Instead, the following convention is used:
app-root/
  META-INF/
    flow/
      aggregate/
        [configuration]
Where '[configuration]' is the named configuration provided in IFL. This layout mimics the configuration layout, but includes META-INF/ and flow/ as parent directories.
All aggregate configuration is included in this directory, including inline configuration. Since inline configuration is not named, a name must be generated. A pattern such as 'aggregate$n' can be used, where n > 0 and n increments for each inline aggregate definition in the IFL.
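The naming and packaging convention can be sketched as follows. This is a minimal Python illustration, not the build tooling itself: `inline_names` and `config_dir` are hypothetical helper names, and the sketch assumes that numbering starts at 1 and that paths are relative to the application root.

```python
from itertools import count
from typing import Iterator


def inline_names() -> Iterator[str]:
    """Yield generated names for inline aggregates: aggregate$1, aggregate$2, ..."""
    for n in count(1):  # n > 0, incremented once per inline definition
        yield f"aggregate${n}"


def config_dir(name: str) -> str:
    """Return the packaged directory for a named or generated configuration."""
    return f"META-INF/flow/aggregate/{name}"
```

Under these assumptions, the named configuration from the earlier example would be packaged under `config_dir("message-batch")`, and the first inline aggregate in the IFL would be packaged under `config_dir(next(inline_names()))`.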