Zookeeper Support
Version 4.2 added Zookeeper support to the framework, which consists of a metadata store, a lock registry, and leadership event handling.
You need to include this dependency in your project:
Maven
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-zookeeper</artifactId>
    <version>6.2.11</version>
</dependency>
Gradle
implementation "org.springframework.integration:spring-integration-zookeeper:6.2.11"
Zookeeper Metadata Store
You can use the ZookeeperMetadataStore where any MetadataStore is needed, such as for persistent file list filters.
See Metadata Store for more information.
The following example configures a Zookeeper metadata store with XML:
<bean id="client" class="org.springframework.integration.zookeeper.config.CuratorFrameworkFactoryBean">
    <constructor-arg value="${connect.string}" />
</bean>
<bean id="meta" class="org.springframework.integration.zookeeper.metadata.ZookeeperMetadataStore">
    <constructor-arg ref="client" />
</bean>
The following example shows how to configure a Zookeeper metadata store with Java:
@Bean
public MetadataStore zkStore(CuratorFramework client) {
    return new ZookeeperMetadataStore(client);
}
Zookeeper Lock Registry
The ZookeeperLockRegistry can be used where any LockRegistry is needed, such as when using an aggregator in a clustered environment with a shared MessageStore.
A LockRegistry is used to “look up” a lock based on a key (the aggregator uses correlationId).
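Because LockRegistry.obtain(Object) returns a java.util.concurrent.locks.Lock, the usual lock/try/finally idiom applies to any registry. The following is a minimal sketch of that idiom, not the framework's own code. To stay runnable without a Zookeeper server, it uses a hypothetical in-memory stand-in (InMemoryLockLookup) in place of a real registry. The names InMemoryLockLookup, LockedSection, and runLocked are assumptions made for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

// Hypothetical in-memory stand-in for a LockRegistry, for illustration only:
// the same key always yields the same Lock instance.
final class InMemoryLockLookup {

    private final Map<Object, Lock> locks = new ConcurrentHashMap<>();

    Lock obtain(Object lockKey) {
        return this.locks.computeIfAbsent(lockKey, key -> new ReentrantLock());
    }
}

final class LockedSection {

    private LockedSection() {
    }

    // Runs the action while holding the lock looked up under the given key.
    static <T> T runLocked(InMemoryLockLookup registry, Object key, Supplier<T> action) {
        Lock lock = registry.obtain(key);
        lock.lock();
        try {
            return action.get();
        }
        finally {
            lock.unlock(); // always release, even if the action throws
        }
    }
}
```

With a ZookeeperLockRegistry, the same pattern would presumably apply unchanged, with the lock held across application instances rather than within a single JVM.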
By default, locks in the ZookeeperLockRegistry are maintained in Zookeeper under the following path: /SpringIntegration-LockRegistry/.
You can customize the path by providing an implementation of ZookeeperLockRegistry.KeyToPathStrategy, which is defined as follows:
public interface KeyToPathStrategy {
    String pathFor(String key);
    boolean bounded();
}
If the strategy returns true from bounded(), unused locks do not need to be harvested.
For unbounded strategies (such as the default), you need to periodically invoke expireUnusedOlderThan(long age) to remove old unused locks from memory.
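As an illustration of a bounded strategy, the following sketch maps every key onto one of a fixed number of paths, so the set of locks cannot grow without limit. It is only one possible approach under stated assumptions. The interface is redeclared locally as a stand-in mirroring the definition shown above, so the snippet compiles without the Spring Integration jar. The class name BucketedKeyToPathStrategy and the root path used in the test are hypothetical.

```java
import java.util.Objects;

// Local stand-in mirroring ZookeeperLockRegistry.KeyToPathStrategy as shown above;
// declared here only so the sketch compiles on its own.
interface KeyToPathStrategy {
    String pathFor(String key);
    boolean bounded();
}

// Hypothetical strategy: hashes each key into one of a fixed number of buckets.
final class BucketedKeyToPathStrategy implements KeyToPathStrategy {

    private final String root;  // assumed root path, chosen by the application
    private final int buckets;

    BucketedKeyToPathStrategy(String root, int buckets) {
        if (buckets <= 0) {
            throw new IllegalArgumentException("buckets must be positive");
        }
        this.root = Objects.requireNonNull(root, "root");
        this.buckets = buckets;
    }

    @Override
    public String pathFor(String key) {
        // floorMod keeps the index non-negative even for negative hash codes
        int bucket = Math.floorMod(key.hashCode(), this.buckets);
        return this.root + "/" + bucket;
    }

    @Override
    public boolean bounded() {
        // At most `buckets` distinct paths exist, so no harvesting is needed
        return true;
    }
}
```

The trade-off in this design is that distinct keys falling into the same bucket share one lock, which reduces concurrency in exchange for a fixed number of paths. With the real interface, such a strategy would be supplied to the ZookeeperLockRegistry together with the CuratorFramework client.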
Starting with version 5.5.6, the ZookeeperLockRegistry supports automatic cleanup of the ZkLock cache held in ZookeeperLockRegistry.locks, configured via ZookeeperLockRegistry.setCacheCapacity().
See its JavaDocs for more information.
Zookeeper Leadership Event Handling
The following example uses XML to configure an application for leader election in Zookeeper:
<int-zk:leader-listener client="client" path="/siNamespace" role="cluster" />
client is a reference to a CuratorFramework bean.
A CuratorFrameworkFactoryBean is available.
When a leader is elected, an OnGrantedEvent is published for the role cluster.
Any endpoints in that role are started.
When leadership is revoked, an OnRevokedEvent is published for the role cluster.
Any endpoints in that role are stopped.
See Endpoint Roles for more information.
You can use Java configuration to create an instance of the leader initiator, as the following example shows:
@Bean
public LeaderInitiatorFactoryBean leaderInitiator(CuratorFramework client) {
    return new LeaderInitiatorFactoryBean()
            .setClient(client)
            .setPath("/siTest/")
            .setRole("cluster");
}
Starting with version 5.3, a candidate option is exposed on the LeaderInitiatorFactoryBean for more configuration control of the externally provided Candidate instance.
Exactly one of the candidate or role options must be provided, not both; the role option internally creates a DefaultCandidate instance with a UUID for its id option.