Hazelcast 支持

Spring 集成提供了通道适配器和其他 Util 组件,以便与内存中的数据网格 Hazelcast 进行交互。spring-doc.cn

您需要将此依赖项包含在您的项目中:spring-doc.cn

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-hazelcast</artifactId>
    <version>6.4.0</version>
</dependency>
compile "org.springframework.integration:spring-integration-hazelcast:6.4.0"

Hazelcast 组件的 XML 命名空间和 schemaLocation 定义是:spring-doc.cn

xmlns:int-hazelcast="http://www.springframework.org/schema/integration/hazelcast"
xsi:schemaLocation="http://www.springframework.org/schema/integration/hazelcast
          https://www.springframework.org/schema/integration/hazelcast/spring-integration-hazelcast.xsd"

Hazelcast 事件驱动的入站通道适配器

Hazelcast 提供分布式数据结构,例如:spring-doc.cn

它还提供事件侦听器,以便侦听对这些数据结构所做的修改。spring-doc.cn

Hazelcast 事件驱动的入站通道适配器侦听相关的缓存事件,并将事件消息发送到定义的通道。 它支持 XML 和 JavaConfig 驱动的配置。spring-doc.cn

XML 配置 :

<int-hazelcast:inbound-channel-adapter channel="mapChannel"
                      cache="map"
                      cache-events="UPDATED, REMOVED"
                      cache-listening-policy="SINGLE" />

Hazelcast 事件驱动的入站通道适配器需要以下属性:spring-doc.cn

  • channel:指定将消息发送到的通道;spring-doc.cn

  • cache:指定要侦听的分布式 Object 引用。 这是一个强制性属性;spring-doc.cn

  • cache-events:指定要侦听的缓存事件。 它是一个可选属性,其默认值为 。 其支持的值如下:ADDEDspring-doc.cn

  • 和 支持的缓存事件类型 : 、 、 、 和IMapMultiMapADDEDREMOVEDUPDATEDEVICTEDEVICT_ALLCLEAR_ALL;spring-doc.cn

  • 支持的缓存事件类型:、、、ReplicatedMapADDEDREMOVEDUPDATEDEVICTED;spring-doc.cn

  • 支持的缓存事件类型 , 和 : , . 没有 的缓存事件类型。IListISetIQueueADDEDREMOVEDITopicspring-doc.cn

  • cache-listening-policy:将缓存侦听策略指定为 或 。 它是一个可选属性,其默认值为 。 每个侦听具有相同 cache-events 属性的相同缓存对象的 Hazelcast 入站通道适配器可以接收单个事件消息或所有事件消息。 如果是,则所有侦听具有相同 cache-events 属性的同一缓存对象的 Hazelcast 入站通道适配器将收到所有事件消息。 如果是,他们将收到唯一的事件消息。SINGLEALLSINGLEALLSINGLEspring-doc.cn

一些配置示例:spring-doc.cn

分布式地图
<int:channel id="mapChannel"/>

<int-hazelcast:inbound-channel-adapter channel="mapChannel"
                              cache="map"
                              cache-events="UPDATED, REMOVED" />

<bean id="map" factory-bean="instance" factory-method="getMap">
    <constructor-arg value="map"/>
</bean>

<bean id="instance" class="com.hazelcast.core.Hazelcast"
            factory-method="newHazelcastInstance">
    <constructor-arg>
        <bean class="com.hazelcast.config.Config" />
    </constructor-arg>
</bean>
分布式 MultiMap
<int-hazelcast:inbound-channel-adapter channel="multiMapChannel"
                              cache="multiMap"
                              cache-events="ADDED, REMOVED, CLEAR_ALL" />

<bean id="multiMap" factory-bean="instance" factory-method="getMultiMap">
    <constructor-arg value="multiMap"/>
</bean>
分布式列表
<int-hazelcast:inbound-channel-adapter  channel="listChannel"
                               cache="list"
                               cache-events="ADDED, REMOVED"
                               cache-listening-policy="ALL" />

<bean id="list" factory-bean="instance" factory-method="getList">
    <constructor-arg value="list"/>
</bean>
分布式集
<int-hazelcast:inbound-channel-adapter channel="setChannel" cache="set" />

<bean id="set" factory-bean="instance" factory-method="getSet">
    <constructor-arg value="set"/>
</bean>
分布式队列
<int-hazelcast:inbound-channel-adapter  channel="queueChannel"
                               cache="queue"
                               cache-events="REMOVED"
                               cache-listening-policy="ALL" />

<bean id="queue" factory-bean="instance" factory-method="getQueue">
    <constructor-arg value="queue"/>
</bean>
分布式主题
<int-hazelcast:inbound-channel-adapter channel="topicChannel" cache="topic" />

<bean id="topic" factory-bean="instance" factory-method="getTopic">
    <constructor-arg value="topic"/>
</bean>
复制的地图
<int-hazelcast:inbound-channel-adapter channel="replicatedMapChannel"
                              cache="replicatedMap"
                              cache-events="ADDED, UPDATED, REMOVED"
                              cache-listening-policy="SINGLE"  />

<bean id="replicatedMap" factory-bean="instance" factory-method="getReplicatedMap">
    <constructor-arg value="replicatedMap"/>
</bean>

Java 配置示例:

以下示例显示了一个配置。 相同的配置可用于其他分布式数据结构(、 和 ):DistributedMapIMapMultiMapReplicatedMapIListISetIQueueITopicspring-doc.cn

@Bean
public PollableChannel distributedMapChannel() {
    return new QueueChannel();
}

@Bean
public IMap<Integer, String> distributedMap() {
    return hazelcastInstance().getMap("Distributed_Map");
}

@Bean
public HazelcastInstance hazelcastInstance() {
    return Hazelcast.newHazelcastInstance();
}

@Bean
public HazelcastEventDrivenMessageProducer hazelcastEventDrivenMessageProducer() {
    final HazelcastEventDrivenMessageProducer producer = new HazelcastEventDrivenMessageProducer(distributedMap());
    producer.setOutputChannel(distributedMapChannel());
    producer.setCacheEventTypes("ADDED,REMOVED,UPDATED,CLEAR_ALL");
    producer.setCacheListeningPolicy(CacheListeningPolicyType.SINGLE);

    return producer;
}

Hazelcast Continuous Query 入站通道适配器

Hazelcast Continuous Query 允许侦听对特定 map 条目执行的修改。 Hazelcast Continuous Query Inbound Channel Adapter 是一个事件驱动的通道适配器,它根据定义的谓词侦听相关的分布式 map 事件。spring-doc.cn

@Bean
public PollableChannel cqDistributedMapChannel() {
    return new QueueChannel();
}

@Bean
public IMap<Integer, String> cqDistributedMap() {
    return hazelcastInstance().getMap("CQ_Distributed_Map");
}

@Bean
public HazelcastInstance hazelcastInstance() {
    return Hazelcast.newHazelcastInstance();
}

@Bean
public HazelcastContinuousQueryMessageProducer hazelcastContinuousQueryMessageProducer() {
    final HazelcastContinuousQueryMessageProducer producer =
        new HazelcastContinuousQueryMessageProducer(cqDistributedMap(), "surname=TestSurname");
    producer.setOutputChannel(cqDistributedMapChannel());
    producer.setCacheEventTypes("UPDATED");
    producer.setIncludeValue(false);

    return producer;
}
<int:channel id="cqMapChannel"/>

<int-hazelcast:cq-inbound-channel-adapter
                channel="cqMapChannel"
                cache="cqMap"
                cache-events="UPDATED, REMOVED"
                predicate="name=TestName AND surname=TestSurname"
                include-value="true"
                cache-listening-policy="SINGLE"/>

<bean id="cqMap" factory-bean="instance" factory-method="getMap">
    <constructor-arg value="cqMap"/>
</bean>

<bean id="instance" class="com.hazelcast.core.Hazelcast"
            factory-method="newHazelcastInstance">
    <constructor-arg>
        <bean class="com.hazelcast.config.Config" />
    </constructor-arg>
</bean>

它支持以下 6 个属性:spring-doc.cn

  • channel:指定将消息发送到的通道;spring-doc.cn

  • cache:指定要侦听的分布式 Map 引用。 命令的;spring-doc.cn

  • cache-events:指定要侦听的缓存事件。 Optional 属性的默认值。 支持的值为 、 和ADDEDADDEDREMOVEDUPDATEDEVICTEDEVICT_ALLCLEAR_ALL;spring-doc.cn

  • predicate:指定一个谓词,用于侦听对特定映射条目执行的修改。 命令的;spring-doc.cn

  • include-value:指定在连续查询结果中包含 value 和 oldValue。 可选,默认为;truespring-doc.cn

  • cache-listening-policy:将缓存侦听策略指定为 或 。 可选,默认值为 。 每个侦听具有相同 cache-events 属性的相同缓存对象的 Hazelcast CQ 入站通道适配器都可以接收单个事件消息或所有事件消息。 如果是,则侦听具有相同 cache-events 属性的同一缓存对象的所有 Hazelcast CQ 入站通道适配器将收到所有事件消息。 如果是,他们将收到唯一的事件消息。SINGLEALLSINGLEALLSINGLEspring-doc.cn

Hazelcast 群集监视器入站通道适配器

Hazelcast Cluster Monitor 支持侦听在集群上执行的修改。 Hazelcast Cluster Monitor Inbound Channel Adapter 是一个事件驱动的通道适配器,它监听相关的 Membership、Distributed Object、Migration、Lifecycle 和 Client 事件:spring-doc.cn

@Bean
public PollableChannel eventChannel() {
    return new QueueChannel();
}

@Bean
public HazelcastInstance hazelcastInstance() {
    return Hazelcast.newHazelcastInstance();
}

@Bean
public HazelcastClusterMonitorMessageProducer hazelcastClusterMonitorMessageProducer() {
    HazelcastClusterMonitorMessageProducer producer = new HazelcastClusterMonitorMessageProducer(hazelcastInstance());
    producer.setOutputChannel(eventChannel());
    producer.setMonitorEventTypes("DISTRIBUTED_OBJECT");

    return producer;
}
<int:channel id="monitorChannel"/>

<int-hazelcast:cm-inbound-channel-adapter
                 channel="monitorChannel"
                 hazelcast-instance="instance"
                 monitor-types="MEMBERSHIP, DISTRIBUTED_OBJECT" />

<bean id="instance" class="com.hazelcast.core.Hazelcast"
            factory-method="newHazelcastInstance">
    <constructor-arg>
        <bean class="com.hazelcast.config.Config" />
    </constructor-arg>
</bean>

它支持以下三个属性:spring-doc.cn

  • channel:指定将消息发送到的通道;spring-doc.cn

  • hazelcast-instance:指定用于侦听集群事件的 Hazelcast 实例引用。 这是一个强制性属性;spring-doc.cn

  • monitor-types:指定要侦听的监视器类型。 它是一个可选属性,是默认值。 支持的值为 、 、 、 、 。MEMBERSHIPMEMBERSHIPDISTRIBUTED_OBJECTMIGRATIONLIFECYCLECLIENTspring-doc.cn

Hazelcast 分布式 SQL 入站通道适配器

Hazelcast 允许在分布式 map 上运行分布式查询。 Hazelcast 分布式 SQL 入站通道适配器是一个轮询入站通道适配器。 它运行定义的 distributed-sql 命令,并根据迭代类型返回结果。spring-doc.cn

@Bean
public PollableChannel dsDistributedMapChannel() {
    return new QueueChannel();
}

@Bean
public IMap<Integer, String> dsDistributedMap() {
    return hazelcastInstance().getMap("DS_Distributed_Map");
}

@Bean
public HazelcastInstance hazelcastInstance() {
    return Hazelcast.newHazelcastInstance();
}

@Bean
@InboundChannelAdapter(value = "dsDistributedMapChannel", poller = @Poller(maxMessagesPerPoll = "1"))
public HazelcastDistributedSQLMessageSource hazelcastDistributedSQLMessageSource() {
    final HazelcastDistributedSQLMessageSource messageSource =
        new HazelcastDistributedSQLMessageSource(dsDistributedMap(),
            "name='TestName' AND surname='TestSurname'");
    messageSource.setIterationType(DistributedSQLIterationType.ENTRY);

    return messageSource;
}
<int:channel id="dsMapChannel"/>

<int-hazelcast:ds-inbound-channel-adapter
            channel="dsMapChannel"
            cache="dsMap"
            iteration-type="ENTRY"
            distributed-sql="active=false OR age >= 25 OR name = 'TestName'">
    <int:poller fixed-delay="100"/>
</int-hazelcast:ds-inbound-channel-adapter>

<bean id="dsMap" factory-bean="instance" factory-method="getMap">
    <constructor-arg value="dsMap"/>
</bean>

<bean id="instance" class="com.hazelcast.core.Hazelcast"
            factory-method="newHazelcastInstance">
    <constructor-arg>
        <bean class="com.hazelcast.config.Config" />
    </constructor-arg>
</bean>

它需要一个 Poller 并支持四个属性:spring-doc.cn

  • channel:指定消息发送到的通道。 这是一个强制性属性;spring-doc.cn

  • cache:指定要查询的分布式引用。 它是 mandatory 属性;IMapspring-doc.cn

  • iteration-type:指定结果类型。 分布式 SQL 可以在 、 或 上运行。 它是一个可选属性,是默认属性。 支持的值为 , 和EntrySetKeySetLocalKeySetValuesVALUEENTRY, `KEYLOCAL_KEYVALUE;spring-doc.cn

  • distributed-sql:指定 sql 语句的 where 子句。 这是一个强制性属性。spring-doc.cn

Hazelcast 出站通道适配器

Hazelcast 出站通道适配器侦听其定义的通道并将传入消息写入相关的分布式缓存。 对于分布式对象定义,它需要 或 之一。 支持的分布式对象包括:、、、、 和 。cachecache-expressionHazelcastHeaders.CACHE_NAMEIMapMultiMapReplicatedMapIListISetIQueueITopicspring-doc.cn

@Bean
public MessageChannel distributedMapChannel() {
    return new DirectChannel();
}

@Bean
public IMap<Integer, String> distributedMap() {
    return hzInstance().getMap("Distributed_Map");
}

@Bean
public HazelcastInstance hzInstance() {
    return Hazelcast.newHazelcastInstance();
}

@Bean
@ServiceActivator(inputChannel = "distributedMapChannel")
public HazelcastCacheWritingMessageHandler hazelcastCacheWritingMessageHandler() {
    HazelcastCacheWritingMessageHandler handler = new HazelcastCacheWritingMessageHandler();
    handler.setDistributedObject(distributedMap());
    handler.setKeyExpression(new SpelExpressionParser().parseExpression("payload.id"));
    handler.setExtractPayload(true);
    return handler;
}
<int-hazelcast:outbound-channel-adapter channel="mapChannel"
                    cache-expression="headers['CACHE_HEADER']"
                    key-expression="payload.key"
                    extract-payload="true"/>

它需要以下属性:spring-doc.cn

  • channel:指定将消息发送到的通道;spring-doc.cn

  • cache:指定分布式对象引用。 自选;spring-doc.cn

  • cache-expression:通过 Spring 表达式语言 (SpEL) 指定分布式对象。 自选;spring-doc.cn

  • key-expression:通过 Spring 表达式语言 (SpEL) 指定键值对的键。 可选,仅对于 和 分布式数据结构是必需的。IMapMultiMapReplicatedMapspring-doc.cn

  • extract-payload:指定是发送整个消息还是仅发送有效负载。 Optional 属性。 如果为 true,则只会将有效负载写入分布式对象。 否则,将通过转换 message header 和 payload 来写入整个消息。truespring-doc.cn

通过在 header 中设置分布式对象名称,可以通过同一通道将消息写入不同的分布式对象。 如果未定义 or 属性,则必须在请求中设置标头 。cachecache-expressionHazelcastHeaders.CACHE_NAMEMessagespring-doc.cn

Hazelcast 领导人选举

如果需要领导者选举(例如,对于只有一个节点应接收消息的高可用性消息消费者),可以使用基于 Hazelcast 的:LeaderInitiatorspring-doc.cn

@Bean
public HazelcastInstance hazelcastInstance() {
    return Hazelcast.newHazelcastInstance();
}

@Bean
public LeaderInitiator initiator() {
    return new LeaderInitiator(hazelcastInstance());
}

当一个节点被选为 leader 时,它将向所有应用程序侦听器发送 an。OnGrantedEventspring-doc.cn

Hazelcast 消息存储

对于分布式消息传递状态管理,例如对于持久性或跟踪消息组,提供了以下实现:QueueChannelAggregatorHazelcastMessageStorespring-doc.cn

@Bean
public HazelcastInstance hazelcastInstance() {
    return Hazelcast.newHazelcastInstance();
}

@Bean
public MessageGroupStore messageStore() {
    return new HazelcastMessageStore(hazelcastInstance());
}

默认情况下,用于将消息和组存储为键/值。 任何自定义都可以提供给 .SPRING_INTEGRATION_MESSAGE_STOREIMapIMapHazelcastMessageStorespring-doc.cn

Hazelcast 元数据存储

可以使用后备 Hazelcast 实现 a 。 默认映射是使用可自定义的名称创建的。ListenableMetadataStoreIMapSPRING_INTEGRATION_METADATA_STOREspring-doc.cn

@Bean
public HazelcastInstance hazelcastInstance() {
    return Hazelcast.newHazelcastInstance();
}

@Bean
public MetadataStore metadataStore() {
    return new HazelcastMetadataStore(hazelcastInstance());
}

允许您注册自己的侦听器类型的 implements ,以便通过 .HazelcastMetadataStoreListenableMetadataStoreMetadataStoreListeneraddListener(MetadataStoreListener callback)spring-doc.cn

Hazelcast Lock 注册表

可以使用支持的 Hazelcast 分布式支持来实现 a:LockRegistryILockspring-doc.cn

@Bean
public HazelcastInstance hazelcastInstance() {
    return Hazelcast.newHazelcastInstance();
}

@Bean
public LockRegistry lockRegistry() {
    return new HazelcastLockRegistry(hazelcastInstance());
}

当与共享的 (例如 store management),可用于跨多个应用程序实例提供此功能,以便一次只有一个实例可以操作组。MessageGroupStoreAggregatorHazelcastLockRegistryspring-doc.cn

对于所有分布式操作,必须在 上启用 CP 子系统。HazelcastInstance

使用 Hazelcast 的消息通道

Hazelcast 和分布式对象本质上是消息传递原语,可以与 Spring 集成核心组件一起使用,而无需在此 Hazelcast 模块中进行额外的实现。IQueueITopicspring-doc.cn

QueueChannel 可以由任何 ,包括提到的 Hazelcast distributed 提供:java.util.QueueIQueuespring-doc.cn

@Bean
PollableChannel hazelcastQueueChannel(HazelcastInstance hazelcastInstance) {
    return new QueueChannel(hazelcastInstance.getQueue("springIntegrationQueue"));
}

将此配置放在应用程序的 Hazelcast 集群中的多个节点上,将使 be distributed 的,并且只有一个节点能够从该 . 这类似于PollableJmsChannelPollableKafkaChannelPollableAmqpChannelQueueChannelMessageIQueuespring-doc.cn

如果生产者端不是 Spring 集成应用程序,则无法配置,因此使用普通的 Hazelcast API 来生成数据。 在这种情况下,使用者端的方法是错误的:必须使用 Inbound Channel Adapter 解决方案:QueueChannelIQueueQueueChannelspring-doc.cn

@Bean
public IQueue<String> myStringHzQueue(HazelcastInstance hazelcastInstance) {
    return hazelcastInstance.getQueue("springIntegrationQueue");
}

@Bean
@InboundChannelAdapter(channel = "stringValuesFromHzQueueChannel")
Supplier<String> fromHzIQueueSource(IQueue<String> myStringHzQueue) {
    return myStringHzQueue::poll;
}

Hazelcast 中的抽象与 JMS 中的 a 具有相似的语义:所有订阅者都会收到已发布的消息。 对于一对简单的 bean,此机制作为开箱即用的功能受到支持:ITopicTopicMessageChannelspring-doc.cn

@Bean
public ITopic<Message<?>> springIntegrationTopic(HazelcastInstance hazelcastInstance,
        MessageChannel fromHazelcastTopicChannel) {

    ITopic<Message<?>> topic = hazelcastInstance.getTopic("springIntegrationTopic");
	topic.addMessageListener(m -> fromHazelcastTopicChannel.send(m.getMessageObject()));
	return topic;
}

@Bean
public MessageChannel publishToHazelcastTopicChannel(ITopic<Message<?>> springIntegrationTopic) {
    return new FixedSubscriberChannel(springIntegrationTopic::publish);
}

@Bean
public MessageChannel fromHazelcastTopicChannel() {
    return new DirectChannel();
}

是 的优化变体,它需要在初始化时进行。 由于 this 是一个函数式接口,因此可以为该方法提供简单的 lambda。 当消息发送到 时,它只会发布到 Hazelcast 上。 它也是一个功能接口,因此可以提供 的 lambda。 因此,订阅者将消耗发送到上述 的所有消息。FixedSubscriberChannelDirectChannelMessageHandlerMessageHandlerhandleMessagepublishToHazelcastTopicChannelITopiccom.hazelcast.topic.MessageListenerITopic#addMessageListenerfromHazelcastTopicChannelITopicspring-doc.cn

可以随 . 例如,使用相应的配置,可以实现集群范围的单例:ExecutorChannelIExecutorServicespring-doc.cn

@Bean
public HazelcastInstance hazelcastInstance() {
    return Hazelcast.newHazelcastInstance(
                new Config()
                    .addExecutorConfig(new ExecutorConfig()
                         .setName("singletonExecutor")
                         .setPoolSize(1)));
}

@Bean
public MessageChannel hazelcastSingletonExecutorChannel(HazelcastInstance hazelcastInstance) {
    return new ExecutorChannel(hazelcastInstance.getExecutorService("singletonExecutor"));
}