概述

您需要将此依赖项包含在项目中:Spring中文文档

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-kafka</artifactId>
    <version>6.3.1</version>
</dependency>
compile "org.springframework.integration:spring-integration-kafka:6.3.1"

它提供以下组件:Spring中文文档

出站通道适配器

出站通道适配器用于将消息从 Spring Integration 通道发布到 Apache Kafka 主题。 通道在应用程序上下文中定义,然后连接到向 Apache Kafka 发送消息的应用程序。 发送方应用程序可以使用 Spring Integration 消息发布到 Apache Kafka,这些消息由出站通道适配器在内部转换为 Kafka 记录,如下所示:Spring中文文档

  • Spring Integration 消息的有效负载用于填充 Kafka 记录的有效负载。Spring中文文档

  • 默认情况下,Spring Integration 消息的标头用于填充 Kafka 记录的键。kafka_messageKeySpring中文文档

您可以自定义目标主题和分区,以便分别通过 和 标头发布消息。kafka_topickafka_partitionIdSpring中文文档

此外,还提供了通过在出站消息上应用 SpEL 表达式来提取键、目标主题和目标分区的功能。 为此,它支持三对互斥的属性:<int-kafka:outbound-channel-adapter>Spring中文文档

通过这些方法,您可以分别将 、 和 指定为适配器上的静态值,或者在运行时根据请求消息动态评估它们的值。topicmessage-keypartition-idSpring中文文档

接口(由 提供)包含用于交互的常量 头。 和 默认标头现在需要前缀。 从使用旧标头的早期版本迁移时,需要在 . 或者,您可以使用 a 或 a 将上游标头更改为新标头。 如果使用常量值,还可以使用 和 在适配器上配置这些常量值。KafkaHeadersspring-kafkamessageKeytopickafka_message-key-expression="headers['messageKey']"topic-expression="headers['topic']"<int-kafka:outbound-channel-adapter>KafkaHeaders<header-enricher>MessageBuildertopicmessage-key

注意:如果适配器配置了主题或消息键(使用常量或表达式),则使用这些键并忽略相应的标头。 如果希望标头覆盖配置,则需要在表达式中对其进行配置,如下所示:Spring中文文档

topic-expression="headers['topic'] != null ? headers['topic'] : 'myTopic'"

适配器需要一个 ,而 又需要一个适当配置的 。KafkaTemplateKafkaProducerFactorySpring中文文档

如果提供了 () 并收到故障(同步或异步),则会向通道发送 an。 有效负载是具有 、 和 属性的 。 您可以通过设置属性来覆盖 。send-failure-channelsendFailureChannelsend()ErrorMessageKafkaSendFailureExceptionfailedMessagerecordProducerRecordcauseDefaultErrorMessageStrategyerror-message-strategySpring中文文档

如果提供了 (),则在成功发送后将发送具有有效负载类型的消息。send-success-channelsendSuccessChannelorg.apache.kafka.clients.producer.RecordMetadataSpring中文文档

如果应用程序使用事务,并且使用相同的通道适配器来发布由侦听器容器启动事务的消息,以及在没有现有事务的情况下发布消息,则必须配置 on 以覆盖容器或事务管理器使用的前缀。 容器启动的事务(生产者工厂或事务管理器属性)使用的前缀在所有应用程序实例上必须相同。 用于仅限生产者事务的前缀在所有应用程序实例上必须是唯一的。transactionIdPrefixKafkaTemplate

您可以配置必须解析为布尔值的 which。 如果使用 和 Kafka producer 属性,则在发送多条消息后刷新可能很有用;表达式应在最后一条消息上计算结果为 ,并且将立即发送不完整的批处理。 默认情况下,表达式在标头 () 中查找值。 如果值为 ,则刷新将发生,如果值为 或标头不存在,则不会。flushExpressionlinger.msbatch.sizeBoolean.TRUEBooleanKafkaIntegrationHeaders.FLUSHkafka_flushtruefalseSpring中文文档

默认值已从 10 秒更改为 Kafka 生产者属性,以便超时后的实际 Kafka 错误传播到应用程序,而不是此框架生成的超时。 为了保持一致性,这已被更改,因为您可能会遇到意外行为(Spring 可能会超时发送,而实际上最终会成功)。 重要说明:默认情况下,该超时为 120 秒,因此您可能希望缩短该超时以获得更及时的故障。KafkaProducerMessageHandler.sendTimeoutExpressiondelivery.timeout.ms+ 5000Spring中文文档

配置

以下示例演示如何为 Apache Kafka 配置出站通道适配器:Spring中文文档

@Bean
public ProducerFactory<Integer, String> producerFactory() {
    return new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(embeddedKafka));
}

@Bean
public IntegrationFlow sendToKafkaFlow() {
    return f -> f
            .splitWith(s -> s.<String>function(p -> Stream.generate(() -> p).limit(101).iterator()))
            .publishSubscribeChannel(c -> c
                    .subscribe(sf -> sf.handle(
                            kafkaMessageHandler(producerFactory(), TEST_TOPIC1)
                                    .timestampExpression("T(Long).valueOf('1487694048633')"),
                            e -> e.id("kafkaProducer1")))
                    .subscribe(sf -> sf.handle(
                            kafkaMessageHandler(producerFactory(), TEST_TOPIC2)
                                   .timestamp(m -> 1487694048644L),
                            e -> e.id("kafkaProducer2")))
            );
}

@Bean
public DefaultKafkaHeaderMapper mapper() {
    return new DefaultKafkaHeaderMapper();
}

private KafkaProducerMessageHandlerSpec<Integer, String, ?> kafkaMessageHandler(
        ProducerFactory<Integer, String> producerFactory, String topic) {
    return Kafka
            .outboundChannelAdapter(producerFactory)
            .messageKey(m -> m
                    .getHeaders()
                    .get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER))
            .headerMapper(mapper())
            .partitionId(m -> 10)
            .topicExpression("headers[kafka_topic] ?: '" + topic + "'")
            .configureKafkaTemplate(t -> t.id("kafkaTemplate:" + topic));
}
@Bean
@ServiceActivator(inputChannel = "toKafka")
public MessageHandler handler() throws Exception {
    KafkaProducerMessageHandler<String, String> handler =
            new KafkaProducerMessageHandler<>(kafkaTemplate());
    handler.setTopicExpression(new LiteralExpression("someTopic"));
    handler.setMessageKeyExpression(new LiteralExpression("someKey"));
    handler.setSuccessChannel(successes());
    handler.setFailureChannel(failures());
    return handler;
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}

@Bean
public ProducerFactory<String, String> producerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
    // set more properties
    return new DefaultKafkaProducerFactory<>(props);
}
<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
                                    kafka-template="template"
                                    auto-startup="false"
                                    channel="inputToKafka"
                                    topic="foo"
                                    sync="false"
                                    message-key-expression="'bar'"
                                    send-failure-channel="failures"
                                    send-success-channel="successes"
                                    error-message-strategy="ems"
                                    partition-id-expression="2">
</int-kafka:outbound-channel-adapter>

<bean id="template" class="org.springframework.kafka.core.KafkaTemplate">
    <constructor-arg>
        <bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
            <constructor-arg>
                <map>
                    <entry key="bootstrap.servers" value="localhost:9092" />
                    ... <!-- more producer properties -->
                </map>
            </constructor-arg>
        </bean>
    </constructor-arg>
</bean>
接口(由 提供)包含用于交互的常量 头。 和 默认标头现在需要前缀。 从使用旧标头的早期版本迁移时,需要在 . 或者,您可以使用 a 或 a 将上游标头更改为新标头。 如果使用常量值,还可以使用 和 在适配器上配置这些常量值。KafkaHeadersspring-kafkamessageKeytopickafka_message-key-expression="headers['messageKey']"topic-expression="headers['topic']"<int-kafka:outbound-channel-adapter>KafkaHeaders<header-enricher>MessageBuildertopicmessage-key
如果应用程序使用事务,并且使用相同的通道适配器来发布由侦听器容器启动事务的消息,以及在没有现有事务的情况下发布消息,则必须配置 on 以覆盖容器或事务管理器使用的前缀。 容器启动的事务(生产者工厂或事务管理器属性)使用的前缀在所有应用程序实例上必须相同。 用于仅限生产者事务的前缀在所有应用程序实例上必须是唯一的。transactionIdPrefixKafkaTemplate

消息驱动的通道适配器

() 使用 或 。KafkaMessageDrivenChannelAdapter<int-kafka:message-driven-channel-adapter>spring-kafkaKafkaMessageListenerContainerConcurrentListenerContainerSpring中文文档

此外,该属性可用。 它可以接受 or 的值(默认值:)。 对于模式,每个消息有效负载都是从单个 . 对于模式,有效负载是从使用者轮询返回的所有实例转换而来的对象列表。 与批处理一样,、、 、 和 标头也是列表,其位置对应于有效负载中的位置。moderecordbatchrecordrecordConsumerRecordbatchConsumerRecord@KafkaListenerKafkaHeaders.RECEIVED_KEYKafkaHeaders.RECEIVED_PARTITIONKafkaHeaders.RECEIVED_TOPICKafkaHeaders.OFFSETSpring中文文档

收到的邮件填充了某些标头。 有关更多信息,请参见 KafkaHeadersSpring中文文档

对象(在标头中)不是线程安全的。 只能在适配器中调用侦听器的线程上调用其方法。 如果将消息传递给另一个线程,则不得调用其方法。Consumerkafka_consumer

提供 a 时,将根据其重试策略重试传递失败。 如果还提供了 an,则在重试用尽后,将使用默认值作为恢复回调。 您还可以使用 to 指定在这种情况下要执行的其他一些操作,或将其设置为将最终异常抛向侦听器容器,以便在其中进行处理。retry-templateerror-channelErrorMessageSendingRecovererrecovery-callbacknullSpring中文文档

生成 (用于 或 ) 时,可以通过设置属性来自定义错误消息。 默认情况下,使用 a 来提供对转换后的消息以及原始消息的访问。ErrorMessageerror-channelrecovery-callbackerror-message-strategyRawRecordHeaderErrorMessageStrategyConsumerRecordSpring中文文档

这种形式的重试会阻塞,如果所有轮询记录的聚合重试延迟可能超过使用者属性,则可能会导致重新平衡。 相反,请考虑将 a 添加到使用 .max.poll.interval.msDefaultErrorHandlerKafkaErrorSendingMessageRecoverer

配置

以下示例演示如何配置消息驱动的通道适配器:Spring中文文档

@Bean
public IntegrationFlow topic1ListenerFromKafkaFlow() {
    return IntegrationFlow
            .from(Kafka.messageDrivenChannelAdapter(consumerFactory(),
                    KafkaMessageDrivenChannelAdapter.ListenerMode.record, TEST_TOPIC1)
                    .configureListenerContainer(c ->
                            c.ackMode(AbstractMessageListenerContainer.AckMode.MANUAL)
                                    .id("topic1ListenerContainer"))
                    .recoveryCallback(new ErrorMessageSendingRecoverer(errorChannel(),
                            new RawRecordHeaderErrorMessageStrategy()))
                    .retryTemplate(new RetryTemplate())
                    .filterInRetry(true))
            .filter(Message.class, m ->
                            m.getHeaders().get(KafkaHeaders.RECEIVED_MESSAGE_KEY, Integer.class) < 101,
                    f -> f.throwExceptionOnRejection(true))
            .<String, String>transform(String::toUpperCase)
            .channel(c -> c.queue("listeningFromKafkaResults1"))
            .get();
}
@Bean
public KafkaMessageDrivenChannelAdapter<String, String>
            adapter(KafkaMessageListenerContainer<String, String> container) {
    KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter =
            new KafkaMessageDrivenChannelAdapter<>(container, ListenerMode.record);
    kafkaMessageDrivenChannelAdapter.setOutputChannel(received());
    return kafkaMessageDrivenChannelAdapter;
}

@Bean
public KafkaMessageListenerContainer<String, String> container() throws Exception {
    ContainerProperties properties = new ContainerProperties(this.topic);
    // set more properties
    return new KafkaMessageListenerContainer<>(consumerFactory(), properties);
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
    // set more properties
    return new DefaultKafkaConsumerFactory<>(props);
}
<int-kafka:message-driven-channel-adapter
        id="kafkaListener"
        listener-container="container1"
        auto-startup="false"
        phase="100"
        send-timeout="5000"
        mode="record"
        retry-template="template"
        recovery-callback="callback"
        error-message-strategy="ems"
        channel="someChannel"
        error-channel="errorChannel" />

<bean id="container1" class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
    <constructor-arg>
        <bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
            <constructor-arg>
                <map>
                <entry key="bootstrap.servers" value="localhost:9092" />
                ...
                </map>
            </constructor-arg>
        </bean>
    </constructor-arg>
    <constructor-arg>
        <bean class="org.springframework.kafka.listener.config.ContainerProperties">
            <constructor-arg name="topics" value="foo" />
        </bean>
    </constructor-arg>

</bean>

您还可以使用用于注释的容器工厂创建用于其他目的的实例。 有关示例,请参阅 Spring for Apache Kafka 文档@KafkaListenerConcurrentMessageListenerContainerSpring中文文档

使用 Java DSL 时,不必将容器配置为 ,因为 DSL 将容器注册为 Bean。 以下示例演示如何执行此操作:@BeanSpring中文文档

@Bean
public IntegrationFlow topic2ListenerFromKafkaFlow() {
    return IntegrationFlow
            .from(Kafka.messageDrivenChannelAdapter(kafkaListenerContainerFactory().createContainer(TEST_TOPIC2),
            KafkaMessageDrivenChannelAdapter.ListenerMode.record)
                .id("topic2Adapter"))
            ...
            get();
}

请注意,在本例中,适配器被赋予 ()。 容器在应用程序上下文中注册,名称为 。 如果适配器没有属性,则容器的 Bean 名称是容器的完全限定类名加上 ,其中每个容器的递增值。idtopic2Adaptertopic2Adapter.containerid#nnSpring中文文档

对象(在标头中)不是线程安全的。 只能在适配器中调用侦听器的线程上调用其方法。 如果将消息传递给另一个线程,则不得调用其方法。Consumerkafka_consumer
这种形式的重试会阻塞,如果所有轮询记录的聚合重试延迟可能超过使用者属性,则可能会导致重新平衡。 相反,请考虑将 a 添加到使用 .max.poll.interval.msDefaultErrorHandlerKafkaErrorSendingMessageRecoverer

入站通道适配器

提供可轮询通道适配器实现。KafkaMessageSourceSpring中文文档

配置

@Bean
public IntegrationFlow flow(ConsumerFactory<String, String> cf)  {
    return IntegrationFlow.from(Kafka.inboundChannelAdapter(cf, new ConsumerProperties("myTopic")),
                          e -> e.poller(Pollers.fixedDelay(5000)))
            .handle(System.out::println)
            .get();
}
@Bean
fun sourceFlow(cf: ConsumerFactory<String, String>) =
    integrationFlow(Kafka.inboundChannelAdapter(cf,
        ConsumerProperties(TEST_TOPIC3).also {
            it.groupId = "kotlinMessageSourceGroup"
        }),
        { poller(Pollers.fixedDelay(100)) }) {
        handle { m ->

        }
    }
@InboundChannelAdapter(channel = "fromKafka", poller = @Poller(fixedDelay = "5000"))
@Bean
public KafkaMessageSource<String, String> source(ConsumerFactory<String, String> cf)  {
    ConsumerProperties consumerProperties = new ConsumerProperties("myTopic");
	consumerProperties.setGroupId("myGroupId");
	consumerProperties.setClientId("myClientId");
    retunr new KafkaMessageSource<>(cf, consumerProperties);
}
<int-kafka:inbound-channel-adapter
        id="adapter1"
        consumer-factory="consumerFactory"
        consumer-properties="consumerProperties1"
        ack-factory="ackFactory"
        channel="inbound"
        message-converter="converter"
        payload-type="java.lang.String"
        raw-header="true"
        auto-startup="false">
    <int:poller fixed-delay="5000"/>
</int-kafka:inbound-channel-adapter>

<bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
    <constructor-arg>
        <map>
            <entry key="max.poll.records" value="1"/>
        </map>
    </constructor-arg>
</bean>

<bean id="consumerProperties1" class="org.springframework.kafka.listener.ConsumerProperties">
    <constructor-arg name="topics" value="topic1"/>
    <property name="groupId" value="group"/>
    <property name="clientId" value="client"/>
</bean>

有关可用属性,请参阅 javadocs。Spring中文文档

默认情况下,必须在使用者工厂中显式设置,或者如果使用者工厂是 . 可以将属性设置为重写此行为。max.poll.recordsDefaultKafkaConsumerFactoryallowMultiFetchtrueSpring中文文档

您必须轮询内部的使用者以避免重新平衡。 如果设置为“必须处理所有检索到的记录”,则在 中再次轮询。max.poll.interval.msallowMultiFetchtruemax.poll.interval.ms

此适配器发出的消息包含一个标头,其中包含上次轮询中剩余的记录计数。kafka_remainingRecordsSpring中文文档

从 version 开始,支持在使用者属性中提供。 从记录标头中提取 A 并抛给被调用者。 对于 ,此例外被包装成 an 并发布到其 . 有关更多信息,请参见 ErrorHandlingDeserializer 文档。6.2KafkaMessageSourceErrorHandlingDeserializerDeserializationExceptionSourcePollingChannelAdapterErrorMessageerrorChannelSpring中文文档

您必须轮询内部的使用者以避免重新平衡。 如果设置为“必须处理所有检索到的记录”,则在 中再次轮询。max.poll.interval.msallowMultiFetchtruemax.poll.interval.ms

出站网关

出站网关用于请求/应答操作。 它与大多数 Spring Integration 网关的不同之处在于,发送线程不会在网关中阻塞,并且在回复侦听器容器线程上处理回复。 如果代码调用同步消息传递网关后面的网关,则用户线程将阻塞该网关,直到收到回复(或发生超时)。Spring中文文档

在为应答容器分配其主题和分区之前,网关不接受请求。 建议在模板的应答容器属性中添加 a 并等待调用,然后再向网关发送消息。ConsumerRebalanceListeneronPartitionsAssigned

默认值为 Kafka 生产者属性,以便将超时后的实际 Kafka 错误传播到应用程序,而不是此框架生成的超时。 为了保持一致性,这已更改,因为您可能会遇到意外行为(Spring 可能会超时 ,而实际上最终会成功)。 重要说明:默认情况下,该超时为 120 秒,因此您可能希望缩短超时以获得更及时的故障。KafkaProducerMessageHandlersendTimeoutExpressiondelivery.timeout.ms+ 5000send()Spring中文文档

配置

以下示例演示如何配置网关:Spring中文文档

@Bean
public IntegrationFlow outboundGateFlow(
        ReplyingKafkaTemplate<String, String, String> kafkaTemplate) {

    return IntegrationFlow.from("kafkaRequests")
            .handle(Kafka.outboundGateway(kafkaTemplate))
            .channel("kafkaReplies")
            .get();
}
@Bean
@ServiceActivator(inputChannel = "kafkaRequests", outputChannel = "kafkaReplies")
public KafkaProducerMessageHandler<String, String> outGateway(
        ReplyingKafkaTemplate<String, String, String> kafkaTemplate) {
    return new KafkaProducerMessageHandler<>(kafkaTemplate);
}
<int-kafka:outbound-gateway
    id="allProps"
    error-message-strategy="ems"
    kafka-template="template"
    message-key-expression="'key'"
    order="23"
    partition-id-expression="2"
    reply-channel="replies"
    reply-timeout="43"
    request-channel="requests"
    requires-reply="false"
    send-success-channel="successes"
    send-failure-channel="failures"
    send-timeout-expression="44"
    sync="true"
    timestamp-expression="T(System).currentTimeMillis()"
    topic-expression="'topic'"/>

有关可用属性,请参阅 javadocs。Spring中文文档

请注意,使用与出站通道适配器相同的类,唯一的区别是传递到构造函数的是 . 有关更多信息,请参阅 Spring for Apache Kafka 文档KafkaTemplateReplyingKafkaTemplateSpring中文文档

出站主题、分区、键等的确定方式与出站适配器相同。 回复主题确定如下:Spring中文文档

  1. 根据模板的回复容器的订阅主题验证名为(如果存在,则它必须具有 or 值)的消息标头。KafkaHeaders.REPLY_TOPICStringbyte[]Spring中文文档

  2. 如果模板仅订阅一个主题,则使用该主题。replyContainerSpring中文文档

还可以指定标头来确定要用于回复的特定分区。 同样,这是针对模板的回复容器的订阅进行验证的。KafkaHeaders.REPLY_PARTITIONSpring中文文档

或者,您也可以使用类似于以下 Bean 的配置:Spring中文文档

@Bean
public IntegrationFlow outboundGateFlow() {
    return IntegrationFlow.from("kafkaRequests")
            .handle(Kafka.outboundGateway(producerFactory(), replyContainer())
                .configureKafkaTemplate(t -> t.replyTimeout(30_000)))
            .channel("kafkaReplies")
            .get();
}
在为应答容器分配其主题和分区之前,网关不接受请求。 建议在模板的应答容器属性中添加 a 并等待调用,然后再向网关发送消息。ConsumerRebalanceListeneronPartitionsAssigned

入站网关

入站网关用于请求/应答操作。Spring中文文档

配置

以下示例演示如何配置入站网关:Spring中文文档

@Bean
public IntegrationFlow serverGateway(
        ConcurrentMessageListenerContainer<Integer, String> container,
        KafkaTemplate<Integer, String> replyTemplate) {
    return IntegrationFlow
            .from(Kafka.inboundGateway(container, replyTemplate)
                .replyTimeout(30_000))
            .<String, String>transform(String::toUpperCase)
            .get();
}
@Bean
public KafkaInboundGateway<Integer, String, String> inboundGateway(
        AbstractMessageListenerContainer<Integer, String>container,
        KafkaTemplate<Integer, String> replyTemplate) {

    KafkaInboundGateway<Integer, String, String> gateway =
        new KafkaInboundGateway<>(container, replyTemplate);
    gateway.setRequestChannel(requests);
    gateway.setReplyChannel(replies);
    gateway.setReplyTimeout(30_000);
    return gateway;
}
<int-kafka:inbound-gateway
        id="gateway1"
        listener-container="container1"
        kafka-template="template"
        auto-startup="false"
        phase="100"
        request-timeout="5000"
        request-channel="nullChannel"
        reply-channel="errorChannel"
        reply-timeout="43"
        message-converter="messageConverter"
        payload-type="java.lang.String"
        error-message-strategy="ems"
        retry-template="retryTemplate"
        recovery-callback="recoveryCallback"/>

有关可用属性,请参阅 javadocs。Spring中文文档

提供 a 时,将根据其重试策略重试传递失败。 如果还提供了 an,则在重试用尽后,将使用默认值作为恢复回调。 您还可以使用 to 指定在这种情况下要执行的其他一些操作,或将其设置为将最终异常抛向侦听器容器,以便在其中进行处理。RetryTemplateerror-channelErrorMessageSendingRecovererrecovery-callbacknullSpring中文文档

生成 (用于 或 ) 时,可以通过设置属性来自定义错误消息。 默认情况下,使用 a 来提供对转换后的消息以及原始消息的访问。ErrorMessageerror-channelrecovery-callbackerror-message-strategyRawRecordHeaderErrorMessageStrategyConsumerRecordSpring中文文档

这种形式的重试会阻塞,如果所有轮询记录的聚合重试延迟可能超过使用者属性,则可能会导致重新平衡。 相反,请考虑将 a 添加到使用 .max.poll.interval.msDefaultErrorHandlerKafkaErrorSendingMessageRecoverer

以下示例演示如何使用 Java DSL 配置简单的大写转换器:Spring中文文档

或者,可以使用类似于以下内容的代码配置大写转换器:Spring中文文档

@Bean
public IntegrationFlow serverGateway() {
    return IntegrationFlow
            .from(Kafka.inboundGateway(consumerFactory(), containerProperties(),
                    producerFactory())
                .replyTimeout(30_000))
            .<String, String>transform(String::toUpperCase)
            .get();
}

您还可以使用用于注释的容器工厂创建用于其他目的的实例。 有关示例,请参阅 Spring for Apache Kafka 文档消息驱动的通道适配器@KafkaListenerConcurrentMessageListenerContainerSpring中文文档

这种形式的重试会阻塞,如果所有轮询记录的聚合重试延迟可能超过使用者属性,则可能会导致重新平衡。 相反,请考虑将 a 添加到使用 .max.poll.interval.msDefaultErrorHandlerKafkaErrorSendingMessageRecoverer

Apache Kafka 主题支持的频道

Spring Integration 具有由 Apache Kafka 主题支持的实现,以实现持久性。MessageChannelSpring中文文档

每个通道都需要一个用于发送端和一个侦听器容器工厂(用于可订阅通道)或一个用于可轮询通道。KafkaTemplateKafkaMessageSourceSpring中文文档

Java DSL 配置

@Bean
public IntegrationFlow flowWithSubscribable(KafkaTemplate<Integer, String> template,
        ConcurrentKafkaListenerContainerFactory<Integer, String> containerFactory) {

    return IntegrationFlow.from(...)
            ...
            .channel(Kafka.channel(template, containerFactory, "someTopic1").groupId("group1"))
            ...
            .get();
}

@Bean
public IntegrationFlow flowWithPubSub(KafkaTemplate<Integer, String> template,
        ConcurrentKafkaListenerContainerFactory<Integer, String> containerFactory) {

    return IntegrationFlow.from(...)
            ...
            .publishSubscribeChannel(pubSub(template, containerFactory),
                pubsub -> pubsub
                            .subscribe(subflow -> ...)
                            .subscribe(subflow -> ...))
            .get();
}

@Bean
public BroadcastCapableChannel pubSub(KafkaTemplate<Integer, String> template,
        ConcurrentKafkaListenerContainerFactory<Integer, String> containerFactory) {

    return Kafka.publishSubscribeChannel(template, containerFactory, "someTopic2")
            .groupId("group2")
            .get();
}

@Bean
public IntegrationFlow flowWithPollable(KafkaTemplate<Integer, String> template,
        KafkaMessageSource<Integer, String> source) {

    return IntegrationFlow.from(...)
            ...
            .channel(Kafka.pollableChannel(template, source, "someTopic3").groupId("group3"))
            .handle(...,  e -> e.poller(...))
            ...
            .get();
}
/**
 * Channel for a single subscriber.
 **/
@Bean
SubscribableKafkaChannel pointToPoint(KafkaTemplate<String, String> template,
    KafkaListenerContainerFactory<String, String> factory)

    SubscribableKafkaChannel channel =
        new SubscribableKafkaChannel(template, factory, "topicA");
    channel.setGroupId("group1");
    return channel;
}

/**
 * Channel for multiple subscribers.
 **/
@Bean
SubscribableKafkaChannel pubsub(KafkaTemplate<String, String> template,
    KafkaListenerContainerFactory<String, String> factory)

    SubscribableKafkaChannel channel =
        new SubscribableKafkaChannel(template, factory, "topicB", true);
    channel.setGroupId("group2");
    return channel;
}

/**
 * Pollable channel (topic is configured on the source)
 **/
@Bean
PollableKafkaChannel pollable(KafkaTemplate<String, String> template,
    KafkaMessageSource<String, String> source)

    PollableKafkaChannel channel =
        new PollableKafkaChannel(template, source);
    channel.setGroupId("group3");
    return channel;
}
<int-kafka:channel kafka-template="template" id="ptp" topic="ptpTopic" group-id="ptpGroup"
    container-factory="containerFactory" />

<int-kafka:pollable-channel kafka-template="template" id="pollable" message-source="source"
    group-id = "pollableGroup"/>

<int-kafka:publish-subscribe-channel kafka-template="template" id="pubSub" topic="pubSubTopic"
    group-id="pubSubGroup" container-factory="containerFactory" />

消息转换

提供了 A。 有关更多信息,请参阅 Spring for Apache Kafka 文档StringJsonMessageConverterSpring中文文档

将此转换器与消息驱动的通道适配器一起使用时,可以指定要将传入有效负载转换为的类型。 这是通过在适配器上设置属性(属性)来实现的。 下面的示例演示如何在 XML 配置中执行此操作:payload-typepayloadTypeSpring中文文档

<int-kafka:message-driven-channel-adapter
        id="kafkaListener"
        listener-container="container1"
        auto-startup="false"
        phase="100"
        send-timeout="5000"
        channel="nullChannel"
        message-converter="messageConverter"
        payload-type="com.example.Thing"
        error-channel="errorChannel" />

<bean id="messageConverter"
    class="org.springframework.kafka.support.converter.MessagingMessageConverter"/>

以下示例显示了如何在 Java 配置中设置适配器上的属性(属性):payload-typepayloadTypeSpring中文文档

@Bean
public KafkaMessageDrivenChannelAdapter<String, String>
            adapter(KafkaMessageListenerContainer<String, String> container) {
    KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter =
            new KafkaMessageDrivenChannelAdapter<>(container, ListenerMode.record);
    kafkaMessageDrivenChannelAdapter.setOutputChannel(received());
    kafkaMessageDrivenChannelAdapter.setMessageConverter(converter());
    kafkaMessageDrivenChannelAdapter.setPayloadType(Thing.class);
    return kafkaMessageDrivenChannelAdapter;
}

空有效负载和日志压缩“逻辑删除”记录

Spring Messaging 对象不能具有有效负载。 使用 Apache Kafka 的终结点时,有效负载(也称为逻辑删除记录)由类型为 的有效负载表示。 有关更多信息,请参阅 Spring for Apache Kafka 文档Message<?>nullnullKafkaNullSpring中文文档

Spring Integration 端点的 POJO 方法可以使用 true 值而不是 。 为此,请用 标记参数。 以下示例演示如何执行此操作:nullKafkaNull@Payload(required = false)Spring中文文档

@ServiceActivator(inputChannel = "fromSomeKafkaInboundEndpoint")
public void in(@Header(KafkaHeaders.RECEIVED_KEY) String key,
               @Payload(required = false) Customer customer) {
    // customer is null if a tombstone record
    ...
}

KStream

您可以使用 a 从以下位置调用集成流:MessagingTransformerKStreamSpring中文文档

@Bean
public KStream<byte[], byte[]> kStream(StreamsBuilder kStreamBuilder,
        MessagingTransformer<byte[], byte[], byte[]> transformer)  transformer) {
    KStream<byte[], byte[]> stream = kStreamBuilder.stream(STREAMING_TOPIC1);
    stream.mapValues((ValueMapper<byte[], byte[]>) String::toUpperCase)
            ...
            .transform(() -> transformer)
            .to(streamingTopic2);

    stream.print(Printed.toSysOut());

    return stream;
}

@Bean
@DependsOn("flow")
public MessagingTransformer<byte[], byte[], String> transformer(
        MessagingFunction function) {

    MessagingMessageConverter converter = new MessagingMessageConverter();
    converter.setHeaderMapper(new SimpleKafkaHeaderMapper("*"));
    return new MessagingTransformer<>(function, converter);
}

@Bean
public IntegrationFlow flow() {
    return IntegrationFlow.from(MessagingFunction.class)
        ...
        .get();
}

当集成流从接口启动时,创建的代理具有流 Bean 的名称,并附加“.gateway”,以便根据需要可以使用此 Bean 名称。@QualifierSpring中文文档

读/处理/写入方案的性能注意事项

许多应用程序从一个主题使用,执行一些处理并写入另一个主题。 在大多数情况下,如果失败,应用程序会希望引发异常,以便可以重试传入请求和/或发送到死信主题。 基础消息侦听器容器以及适当配置的错误处理程序支持此功能。 但是,为了支持这一点,我们需要阻止侦听器线程,直到写入操作成功(或失败),以便可以将任何异常抛向容器。 使用单个记录时,这是通过在出站适配器上设置属性来实现的。 但是,在使用批处理时,使用会导致性能显著下降,因为应用程序将在发送下一条消息之前等待每次发送的结果。 您还可以执行多次发送,然后等待这些发送的结果。 这是通过向消息处理程序添加 a 来实现的。 要启用该功能,请添加到出站消息;然后,这可用于将 A 与特定发送的消息相关联。 下面是如何使用此功能的示例:writesyncsyncfuturesChannelKafkaIntegrationHeaders.FUTURE_TOKENFutureSpring中文文档

@SpringBootApplication
public class FuturesChannelApplication {

    public static void main(String[] args) {
        SpringApplication.run(FuturesChannelApplication.class, args);
    }

    @Bean
    IntegrationFlow inbound(ConsumerFactory<String, String> consumerFactory, Handler handler) {
        return IntegrationFlow.from(Kafka.messageDrivenChannelAdapter(consumerFactory,
                    ListenerMode.batch, "inTopic"))
                .handle(handler)
                .get();
    }

    @Bean
    IntegrationFlow outbound(KafkaTemplate<String, String> kafkaTemplate) {
        return IntegrationFlow.from(Gate.class)
                .enrichHeaders(h -> h
                        .header(KafkaHeaders.TOPIC, "outTopic")
                        .headerExpression(KafkaIntegrationHeaders.FUTURE_TOKEN, "headers[id]"))
                .handle(Kafka.outboundChannelAdapter(kafkaTemplate)
                        .futuresChannel("futures"))
                .get();
    }

    @Bean
    PollableChannel futures() {
        return new QueueChannel();
    }

}

@Component
@DependsOn("outbound")
class Handler {

    @Autowired
    Gate gate;

    @Autowired
    PollableChannel futures;

    public void handle(List<String> input) throws Exception {
        System.out.println(input);
        input.forEach(str -> this.gate.send(str.toUpperCase()));
        for (int i = 0; i < input.size(); i++) {
            Message<?> future = this.futures.receive(10000);
            ((Future<?>) future.getPayload()).get(10, TimeUnit.SECONDS);
        }
    }

}

interface Gate {

    void send(String out);

}