此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring for Apache Kafka 3.2.1Spring中文文档

此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring for Apache Kafka 3.2.1Spring中文文档

本节介绍如何发送消息。Spring中文文档

KafkaTemplate

本节介绍如何使用发送消息。KafkaTemplateSpring中文文档

概述

包装一个生产者,并提供将数据发送到 Kafka 主题的便捷方法。 以下列表显示了以下方法:KafkaTemplateKafkaTemplateSpring中文文档

CompletableFuture<SendResult<K, V>> sendDefault(V data);

CompletableFuture<SendResult<K, V>> sendDefault(K key, V data);

CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);

CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);

CompletableFuture<SendResult<K, V>> send(String topic, V data);

CompletableFuture<SendResult<K, V>> send(String topic, K key, V data);

CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);

CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);

CompletableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);

CompletableFuture<SendResult<K, V>> send(Message<?> message);

Map<MetricName, ? extends Metric> metrics();

List<PartitionInfo> partitionsFor(String topic);

<T> T execute(ProducerCallback<K, V, T> callback);

<T> T executeInTransaction(OperationsCallback<K, V, T> callback);

// Flush the producer.
void flush();

interface ProducerCallback<K, V, T> {

    T doInKafka(Producer<K, V> producer);

}

interface OperationsCallback<K, V, T> {

    T doInOperations(KafkaOperations<K, V> operations);

}

有关更多详细信息,请参阅 JavadocSpring中文文档

在版本 3.0 中,以前返回的方法已更改为 return . 为了便于迁移,2.9 版本添加了一个方法,该方法提供具有返回类型的相同方法;此方法不再可用。ListenableFutureCompletableFutureusingCompletableFuture()CompletableFuture

API 要求已向模板提供默认主题。sendDefaultSpring中文文档

API 将 a 作为参数,并将此时间戳存储在记录中。 用户提供的时间戳的存储方式取决于 Kafka 主题上配置的时间戳类型。 如果主题配置为 使用 ,则会记录用户指定的时间戳(如果未指定,则生成时间戳)。 如果主题配置为 使用 ,则忽略用户指定的时间戳,并将代理添加本地代理时间。timestampCREATE_TIMELOG_APPEND_TIMESpring中文文档

和 方法委托给基础 Producer 上的相同方法。 该方法提供对基础生产者的直接访问。metricspartitionsForexecuteSpring中文文档

若要使用模板,可以配置生产者工厂,并在模板的构造函数中提供该工厂。 以下示例演示如何执行此操作:Spring中文文档

@Bean
public ProducerFactory<Integer, String> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
}

@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // See https://kafka.apache.org/documentation/#producerconfigs for more properties
    return props;
}

@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
    return new KafkaTemplate<Integer, String>(producerFactory());
}

从版本 2.5 开始,您现在可以覆盖工厂的属性,以创建具有同一工厂不同生产者配置的模板。ProducerConfigSpring中文文档

@Bean
public KafkaTemplate<String, String> stringTemplate(ProducerFactory<String, String> pf) {
    return new KafkaTemplate<>(pf);
}

@Bean
public KafkaTemplate<String, byte[]> bytesTemplate(ProducerFactory<String, byte[]> pf) {
    return new KafkaTemplate<>(pf,
            Collections.singletonMap(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class));
}

请注意,类型(例如Spring Boot自动配置的bean)可以使用不同的缩小泛型类型进行引用。ProducerFactory<?, ?>Spring中文文档

您还可以使用标准定义配置模板。<bean/>Spring中文文档

然后,若要使用该模板,可以调用其方法之一。Spring中文文档

将方法与参数一起使用时,主题、分区、键和时间戳信息将在包含以下项的消息头中提供:Message<?>Spring中文文档

消息有效负载是数据。Spring中文文档

或者,您可以配置 with a 以获取包含发送结果(成功或失败)的异步回调,而不是等待 完成。 以下列表显示了接口的定义:KafkaTemplateProducerListenerFutureProducerListenerSpring中文文档

public interface ProducerListener<K, V> {

    void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata);

    void onError(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata,
            Exception exception);

}

默认情况下,模板配置了 ,该模板会记录错误,并且在发送成功时不执行任何操作。LoggingProducerListenerSpring中文文档

为方便起见,如果您只想实现其中一种方法,则提供了默认方法实现。Spring中文文档

请注意,send 方法返回 . 您可以向监听器注册回调,以异步接收发送结果。 以下示例演示如何执行此操作:CompletableFuture<SendResult>Spring中文文档

CompletableFuture<SendResult<Integer, String>> future = template.send("myTopic", "something");
future.whenComplete((result, ex) -> {
    ...
});

SendResult具有两个属性,a 和 。 有关这些对象的信息,请参阅 Kafka API 文档。ProducerRecordRecordMetadataSpring中文文档

可以强制转换为 ;其属性包含失败的记录。ThrowableKafkaProducerExceptionproducerRecordSpring中文文档

如果您希望阻止发送线程以等待结果,则可以调用 future 的方法;建议使用带有超时的方法。 如果设置了 ,则可能希望在等待之前调用,或者为方便起见,模板具有一个构造函数,该构造函数具有一个参数,该参数会导致模板在每次发送时调用。 仅当设置了 producer 属性并希望立即发送部分批处理时,才需要刷新。get()linger.msflush()autoFlushflush()linger.msSpring中文文档

例子

本节显示了向 Kafka 发送消息的示例:Spring中文文档

例 1.非阻塞(异步)
public void sendToKafka(final MyOutputData data) {
    final ProducerRecord<String, String> record = createRecord(data);

    CompletableFuture<SendResult<Integer, String>> future = template.send(record);
    future.whenComplete((result, ex) -> {
        if (ex == null) {
            handleSuccess(data);
        }
        else {
            handleFailure(data, record, ex);
        }
    });
}
阻塞(同步)
public void sendToKafka(final MyOutputData data) {
    final ProducerRecord<String, String> record = createRecord(data);

    try {
        template.send(record).get(10, TimeUnit.SECONDS);
        handleSuccess(data);
    }
    catch (ExecutionException e) {
        handleFailure(data, record, e.getCause());
    }
    catch (TimeoutException | InterruptedException e) {
        handleFailure(data, record, e);
    }
}

请注意,原因与属性有关。ExecutionExceptionKafkaProducerExceptionproducerRecordSpring中文文档

在版本 3.0 中,以前返回的方法已更改为 return . 为了便于迁移,2.9 版本添加了一个方法,该方法提供具有返回类型的相同方法;此方法不再可用。ListenableFutureCompletableFutureusingCompletableFuture()CompletableFuture

RoutingKafkaTemplate

从版本 2.5 开始,可以使用 在运行时根据目标名称选择生产者。RoutingKafkaTemplatetopicSpring中文文档

路由模板不支持事务、、 或操作,因为这些操作的主题未知。executeflushmetrics

该模板需要实例的映射。 这张地图应该按顺序排列(例如 a),因为它是按顺序遍历的;您应该在开始时添加更具体的模式。java.util.regex.PatternProducerFactory<Object, Object>LinkedHashMapSpring中文文档

以下简单的 Spring Boot 应用程序提供了一个示例,说明如何使用相同的模板发送到不同的主题,每个主题使用不同的值序列化程序。Spring中文文档

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Bean
    public RoutingKafkaTemplate routingTemplate(GenericApplicationContext context,
            ProducerFactory<Object, Object> pf) {

        // Clone the PF with a different Serializer, register with Spring for shutdown
        Map<String, Object> configs = new HashMap<>(pf.getConfigurationProperties());
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        DefaultKafkaProducerFactory<Object, Object> bytesPF = new DefaultKafkaProducerFactory<>(configs);
        context.registerBean("bytesPF", DefaultKafkaProducerFactory.class, () -> bytesPF);

        Map<Pattern, ProducerFactory<Object, Object>> map = new LinkedHashMap<>();
        map.put(Pattern.compile("two"), bytesPF);
        map.put(Pattern.compile(".+"), pf); // Default PF with StringSerializer
        return new RoutingKafkaTemplate(map);
    }

    @Bean
    public ApplicationRunner runner(RoutingKafkaTemplate routingTemplate) {
        return args -> {
            routingTemplate.send("one", "thing1");
            routingTemplate.send("two", "thing2".getBytes());
        };
    }

}

此示例的对应 s 显示在“注释属性”中。@KafkaListenerSpring中文文档

有关实现类似结果的另一种技术,但具有将不同类型发送到同一主题的附加功能,请参阅委派序列化程序和反序列化程序Spring中文文档

路由模板不支持事务、、 或操作,因为这些操作的主题未知。executeflushmetrics

DefaultKafkaProducerFactory

使用 KafkaTemplate 中所示,a 用于创建创建者。ProducerFactorySpring中文文档

不使用 Transactions 时,缺省情况下,会按照 JavaDocs 中的建议创建所有客户机使用的单例生产者。 但是,如果调用模板,则可能会导致使用同一生产者的其他线程出现延迟。 从版本 2.3 开始,具有新属性 。 设置为 时,工厂将为每个线程创建(并缓存)一个单独的生产者,以避免此问题。DefaultKafkaProducerFactoryKafkaProducerflush()DefaultKafkaProducerFactoryproducerPerThreadtrueSpring中文文档

当 时,当不再需要生产者时,用户代码必须调用工厂。 这将在物理上关闭生产者并将其从 . 打电话或不会清理这些生产者。producerPerThreadtruecloseThreadBoundProducer()ThreadLocalreset()destroy()

创建 时,可以通过调用仅接受属性映射的构造函数从配置中获取键和/或值类(请参阅使用 KafkaTemplate 中的示例),或者可以将实例传递给构造函数(在这种情况下,所有 s 共享相同的实例)。 或者,您可以提供 s(从 2.3 版开始),用于获取每个实例的单独实例:DefaultKafkaProducerFactorySerializerSerializerDefaultKafkaProducerFactoryProducerSupplier<Serializer>SerializerProducerSpring中文文档

@Bean
public ProducerFactory<Integer, CustomValue> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs(), null, () -> new CustomValueSerializer());
}

@Bean
public KafkaTemplate<Integer, CustomValue> kafkaTemplate() {
    return new KafkaTemplate<Integer, CustomValue>(producerFactory());
}

从版本 2.5.10 开始,您现在可以在创建工厂后更新生产者属性。 例如,如果您必须在凭据更改后更新 SSL 密钥/信任存储位置,这可能很有用。 这些更改不会影响现有的生产者实例;调用以关闭任何现有生产者,以便使用新属性创建新的生产者。 注意:不能将事务性生产者工厂更改为非事务性生产者工厂,反之亦然。reset()Spring中文文档

现在提供了两种新方法:Spring中文文档

void updateConfigs(Map<String, Object> updates);

void removeConfig(String configKey);

从版本 2.8 开始,如果将序列化程序作为对象提供(在构造函数中或通过 setter),工厂将调用该方法以使用配置属性配置它们。configure()Spring中文文档

当 时,当不再需要生产者时,用户代码必须调用工厂。 这将在物理上关闭生产者并将其从 . 打电话或不会清理这些生产者。producerPerThreadtruecloseThreadBoundProducer()ThreadLocalreset()destroy()

ReplyingKafkaTemplate

版本 2.1.3 引入了一个子类来提供请求/回复语义。 该类已命名,并具有两个附加方法;下面显示了方法签名:KafkaTemplateReplyingKafkaTemplateSpring中文文档

RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record);

RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record,
    Duration replyTimeout);

结果是异步填充的结果(或超时的异常)。 结果还具有一个属性,该属性是调用 的结果。 您可以使用此将来确定发送操作的结果。CompletableFuturesendFutureKafkaTemplate.send()Spring中文文档

在版本 3.0 中,这些方法返回的期货(及其属性)已更改为 s 而不是 s。sendFutureCompletableFutureListenableFuture

如果使用第一种方法,或者参数为 ,则使用模板的属性(默认为 5 秒)。replyTimeoutnulldefaultReplyTimeoutSpring中文文档

从版本 2.8.8 开始,模板具有新方法。 如果应答容器配置为避免在容器初始化之前发送请求和应答,则此功能非常有用。waitForAssignmentauto.offset.reset=latestSpring中文文档

使用手动分区分配(无组管理)时,等待的持续时间必须大于容器的属性,因为在第一次轮询完成之前不会发送通知。pollTimeout

以下 Spring Boot 应用程序显示了如何使用该功能的示例:Spring中文文档

@SpringBootApplication
public class KRequestingApplication {

    public static void main(String[] args) {
        SpringApplication.run(KRequestingApplication.class, args).close();
    }

    @Bean
    public ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> template) {
        return args -> {
            if (!template.waitForAssignment(Duration.ofSeconds(10))) {
                throw new IllegalStateException("Reply container did not initialize");
            }
            ProducerRecord<String, String> record = new ProducerRecord<>("kRequests", "foo");
            RequestReplyFuture<String, String, String> replyFuture = template.sendAndReceive(record);
            SendResult<String, String> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
            System.out.println("Sent ok: " + sendResult.getRecordMetadata());
            ConsumerRecord<String, String> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);
            System.out.println("Return value: " + consumerRecord.value());
        };
    }

    @Bean
    public ReplyingKafkaTemplate<String, String, String> replyingTemplate(
            ProducerFactory<String, String> pf,
            ConcurrentMessageListenerContainer<String, String> repliesContainer) {

        return new ReplyingKafkaTemplate<>(pf, repliesContainer);
    }

    @Bean
    public ConcurrentMessageListenerContainer<String, String> repliesContainer(
            ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {

        ConcurrentMessageListenerContainer<String, String> repliesContainer =
                containerFactory.createContainer("kReplies");
        repliesContainer.getContainerProperties().setGroupId("repliesGroup");
        repliesContainer.setAutoStartup(false);
        return repliesContainer;
    }

    @Bean
    public NewTopic kRequests() {
        return TopicBuilder.name("kRequests")
            .partitions(10)
            .replicas(2)
            .build();
    }

    @Bean
    public NewTopic kReplies() {
        return TopicBuilder.name("kReplies")
            .partitions(10)
            .replicas(2)
            .build();
    }

}

请注意,我们可以使用 Boot 自动配置的容器工厂来创建回复容器。Spring中文文档

如果将非平凡的反序列化程序用于回复,请考虑使用委托给配置的反序列化程序的 ErrorHandlingDeserializer。 这样配置后,将异常完成,您可以在其属性中捕获 。RequestReplyFutureExecutionExceptionDeserializationExceptioncauseSpring中文文档

从版本 2.6.7 开始,除了检测 s 之外,模板还将调用该函数(如果提供)。 如果它返回异常,则将异常完成未来。DeserializationExceptionreplyErrorCheckerSpring中文文档

下面是一个示例:Spring中文文档

template.setReplyErrorChecker(record -> {
    Header error = record.headers().lastHeader("serverSentAnError");
    if (error != null) {
        return new MyException(new String(error.value()));
    }
    else {
        return null;
    }
});

...

RequestReplyFuture<Integer, String, String> future = template.sendAndReceive(record);
try {
    future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
    ConsumerRecord<Integer, String> consumerRecord = future.get(10, TimeUnit.SECONDS);
    ...
}
catch (InterruptedException e) {
    ...
}
catch (ExecutionException e) {
    if (e.getCause instanceof MyException) {
        ...
    }
}
catch (TimeoutException e) {
    ...
}

模板设置一个标头(默认命名),服务器端必须回显该标头。KafkaHeaders.CORRELATION_IDSpring中文文档

在这种情况下,以下应用程序响应:@KafkaListenerSpring中文文档

@SpringBootApplication
public class KReplyingApplication {

    public static void main(String[] args) {
        SpringApplication.run(KReplyingApplication.class, args);
    }

    @KafkaListener(id="server", topics = "kRequests")
    @SendTo // use default replyTo expression
    public String listen(String in) {
        System.out.println("Server received: " + in);
        return in.toUpperCase();
    }

    @Bean
    public NewTopic kRequests() {
        return TopicBuilder.name("kRequests")
            .partitions(10)
            .replicas(2)
            .build();
    }

    @Bean // not required if Jackson is on the classpath
    public MessagingMessageConverter simpleMapperConverter() {
        MessagingMessageConverter messagingMessageConverter = new MessagingMessageConverter();
        messagingMessageConverter.setHeaderMapper(new SimpleKafkaHeaderMapper());
        return messagingMessageConverter;
    }

}

基础结构回显相关 ID 并确定回复主题。@KafkaListenerSpring中文文档

有关发送回复的更多信息,请参阅使用 @SendTo 转发侦听器结果。 该模板使用默认标题来指示回复所指向的主题。KafKaHeaders.REPLY_TOPICSpring中文文档

从版本 2.2 开始,模板会尝试从配置的回复容器中检测回复主题或分区。 如果容器配置为侦听单个主题或单个主题,则用于设置回复标头。 如果以其他方式配置容器,则用户必须设置回复标头。 在这种情况下,将在初始化期间写入日志消息。 以下示例使用:TopicPartitionOffsetINFOKafkaHeaders.REPLY_TOPICSpring中文文档

record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, "kReplies".getBytes()));

使用单个应答进行配置时,可以对多个模板使用相同的应答主题,只要每个实例侦听不同的分区即可。 使用单个回复主题进行配置时,每个实例必须使用不同的 . 在这种情况下,所有实例都会收到每个回复,但只有发送请求的实例才能找到相关 ID。 这对于自动缩放可能很有用,但会带来额外网络流量的开销,并且丢弃每个不需要的回复的成本很小。 使用此设置时,建议将模板设置为 ,这样可以减少对 DEBUG 的意外回复的日志记录级别,而不是默认的 ERROR。TopicPartitionOffsetgroup.idsharedReplyTopictrueSpring中文文档

以下是将回复容器配置为使用同一共享回复主题的示例:Spring中文文档

@Bean
public ConcurrentMessageListenerContainer<String, String> replyContainer(
        ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {

    ConcurrentMessageListenerContainer<String, String> container = containerFactory.createContainer("topic2");
    container.getContainerProperties().setGroupId(UUID.randomUUID().toString()); // unique
    Properties props = new Properties();
    props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // so the new group doesn't get old replies
    container.getContainerProperties().setKafkaConsumerProperties(props);
    return container;
}
如果您有多个客户端实例,并且未按照上一段所述进行配置,则每个实例都需要一个专用的回复主题。 另一种方法是为每个实例设置并使用专用分区。 包含一个四字节的 int (big-endian)。 服务器必须使用此标头将回复路由到正确的分区(执行此操作)。 但是,在这种情况下,应答容器不得使用 Kafka 的组管理功能,并且必须配置为侦听固定分区(通过在其构造函数中使用 a)。KafkaHeaders.REPLY_PARTITIONHeader@KafkaListenerTopicPartitionOffsetContainerProperties
要求 Jackson 位于类路径上(对于 )。 如果它不可用,则消息转换器没有标头映射器,因此必须使用 配置 ,如前所述。DefaultKafkaHeaderMapper@KafkaListenerMessagingMessageConverterSimpleKafkaHeaderMapper

默认情况下,使用 3 个标头:Spring中文文档

基础结构使用这些标头名称来路由回复。@KafkaListenerSpring中文文档

从版本 2.3 开始,您可以自定义标头名称 - 模板有 3 个属性 、 和 。 如果您的服务器不是 Spring 应用程序(或不使用 )。correlationHeaderNamereplyTopicHeaderNamereplyPartitionHeaderName@KafkaListenerSpring中文文档

相反,如果请求应用程序不是 Spring 应用程序,并且将关联信息放在不同的标头中,从 3.0 版开始,您可以在侦听器容器工厂上配置自定义,并且该标头将被回显。 以前,侦听器必须回显自定义关联标头。correlationHeaderName

请求/回复 sMessage<?>

版本 2.7 在发送和接收 的抽象中添加了方法:ReplyingKafkaTemplatespring-messagingMessage<?>Spring中文文档

RequestReplyMessageFuture<K, V> sendAndReceive(Message<?> message);

<P> RequestReplyTypedMessageFuture<K, V, P> sendAndReceive(Message<?> message,
        ParameterizedTypeReference<P> returnType);

这些将使用模板的默认值,还有一些重载版本可以在方法调用中超时。replyTimeoutSpring中文文档

在版本 3.0 中,这些方法返回的期货(及其属性)已更改为 s 而不是 s。sendFutureCompletableFutureListenableFuture

如果使用者或模板可以通过回复消息中的配置或类型元数据转换有效负载,而无需任何其他信息,请使用第一种方法。DeserializerMessageConverterSpring中文文档

如果需要提供返回类型的类型信息,请使用第二种方法,以帮助消息转换器。 这也允许同一模板接收不同的类型,即使回复中没有类型元数据,例如当服务器端不是 Spring 应用程序时。 以下是后者的示例:Spring中文文档

模板 Bean
@Bean
ReplyingKafkaTemplate<String, String, String> template(
        ProducerFactory<String, String> pf,
        ConcurrentKafkaListenerContainerFactory<String, String> factory) {

    ConcurrentMessageListenerContainer<String, String> replyContainer =
            factory.createContainer("replies");
    replyContainer.getContainerProperties().setGroupId("request.replies");
    ReplyingKafkaTemplate<String, String, String> template =
            new ReplyingKafkaTemplate<>(pf, replyContainer);
    template.setMessageConverter(new ByteArrayJsonMessageConverter());
    template.setDefaultTopic("requests");
    return template;
}
@Bean
fun template(
    pf: ProducerFactory<String?, String>?,
    factory: ConcurrentKafkaListenerContainerFactory<String?, String?>
): ReplyingKafkaTemplate<String?, String, String?> {
    val replyContainer = factory.createContainer("replies")
    replyContainer.containerProperties.groupId = "request.replies"
    val template = ReplyingKafkaTemplate(pf, replyContainer)
    template.messageConverter = ByteArrayJsonMessageConverter()
    template.defaultTopic = "requests"
    return template
}
使用模板
RequestReplyTypedMessageFuture<String, String, Thing> future1 =
        template.sendAndReceive(MessageBuilder.withPayload("getAThing").build(),
                new ParameterizedTypeReference<Thing>() { });
log.info(future1.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
Thing thing = future1.get(10, TimeUnit.SECONDS).getPayload();
log.info(thing.toString());

RequestReplyTypedMessageFuture<String, String, List<Thing>> future2 =
        template.sendAndReceive(MessageBuilder.withPayload("getThings").build(),
                new ParameterizedTypeReference<List<Thing>>() { });
log.info(future2.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
List<Thing> things = future2.get(10, TimeUnit.SECONDS).getPayload();
things.forEach(thing1 -> log.info(thing1.toString()));
val future1: RequestReplyTypedMessageFuture<String?, String?, Thing?>? =
    template.sendAndReceive(MessageBuilder.withPayload("getAThing").build(),
        object : ParameterizedTypeReference<Thing?>() {})
log.info(future1?.sendFuture?.get(10, TimeUnit.SECONDS)?.recordMetadata?.toString())
val thing = future1?.get(10, TimeUnit.SECONDS)?.payload
log.info(thing.toString())

val future2: RequestReplyTypedMessageFuture<String?, String?, List<Thing?>?>? =
    template.sendAndReceive(MessageBuilder.withPayload("getThings").build(),
        object : ParameterizedTypeReference<List<Thing?>?>() {})
log.info(future2?.sendFuture?.get(10, TimeUnit.SECONDS)?.recordMetadata.toString())
val things = future2?.get(10, TimeUnit.SECONDS)?.payload
things?.forEach(Consumer { thing1: Thing? -> log.info(thing1.toString()) })
在版本 3.0 中,这些方法返回的期货(及其属性)已更改为 s 而不是 s。sendFutureCompletableFutureListenableFuture
使用手动分区分配(无组管理)时,等待的持续时间必须大于容器的属性,因为在第一次轮询完成之前不会发送通知。pollTimeout
如果您有多个客户端实例,并且未按照上一段所述进行配置,则每个实例都需要一个专用的回复主题。 另一种方法是为每个实例设置并使用专用分区。 包含一个四字节的 int (big-endian)。 服务器必须使用此标头将回复路由到正确的分区(执行此操作)。 但是,在这种情况下,应答容器不得使用 Kafka 的组管理功能,并且必须配置为侦听固定分区(通过在其构造函数中使用 a)。KafkaHeaders.REPLY_PARTITIONHeader@KafkaListenerTopicPartitionOffsetContainerProperties
要求 Jackson 位于类路径上(对于 )。 如果它不可用,则消息转换器没有标头映射器,因此必须使用 配置 ,如前所述。DefaultKafkaHeaderMapper@KafkaListenerMessagingMessageConverterSimpleKafkaHeaderMapper
相反,如果请求应用程序不是 Spring 应用程序,并且将关联信息放在不同的标头中,从 3.0 版开始,您可以在侦听器容器工厂上配置自定义,并且该标头将被回显。 以前,侦听器必须回显自定义关联标头。correlationHeaderName
在版本 3.0 中,这些方法返回的期货(及其属性)已更改为 s 而不是 s。sendFutureCompletableFutureListenableFuture

回复类型 message<?>

当返回 2.5 之前版本的 时,需要填充回复主题和相关 ID 标头。 在此示例中,我们使用请求中的回复主题标头:@KafkaListenerMessage<?>Spring中文文档

@KafkaListener(id = "requestor", topics = "request")
@SendTo
public Message<?> messageReturn(String in) {
    return MessageBuilder.withPayload(in.toUpperCase())
            .setHeader(KafkaHeaders.TOPIC, replyTo)
            .setHeader(KafkaHeaders.KEY, 42)
            .setHeader(KafkaHeaders.CORRELATION_ID, correlation)
            .build();
}

这还显示了如何在回复记录上设置密钥。Spring中文文档

从版本 2.5 开始,框架将检测这些标头是否缺失,并使用主题填充它们 - 根据值确定的主题或传入标头(如果存在)。 它还将回显传入的 和 (如果存在)。@SendToKafkaHeaders.REPLY_TOPICKafkaHeaders.CORRELATION_IDKafkaHeaders.REPLY_PARTITIONSpring中文文档

@KafkaListener(id = "requestor", topics = "request")
@SendTo  // default REPLY_TOPIC header
public Message<?> messageReturn(String in) {
    return MessageBuilder.withPayload(in.toUpperCase())
            .setHeader(KafkaHeaders.KEY, 42)
            .build();
}

聚合多个回复

Using ReplyingKafkaTemplate 中的模板严格适用于单个请求/应答方案。 对于单个消息的多个接收方返回回复的情况,可以使用 . 这是 Scatter-Gather Enterprise Integration Pattern 的客户端实现。AggregatingReplyingKafkaTemplateSpring中文文档

与 一样,构造函数需要生产者工厂和侦听器容器来接收回复;它有第三个参数,每次收到回复时都会参考该参数;当谓词返回时,S 的集合用于完成方法返回的 。ReplyingKafkaTemplateAggregatingReplyingKafkaTemplateBiPredicate<List<ConsumerRecord<K, R>>, Boolean> releaseStrategytrueConsumerRecordFuturesendAndReceiveSpring中文文档

还有一个附加属性(默认值为 false)。 当 设置为 时,部分结果通常不会用 完成未来,而是正常完成未来(只要至少收到一条回复记录)。returnPartialOnTimeouttrueKafkaReplyTimeoutExceptionSpring中文文档

从版本 2.3.5 开始,谓词也会在超时后调用(如果为 )。 第一个参数是当前记录列表;第二种情况是此调用是否由于超时而导致。 谓词可以修改记录列表。returnPartialOnTimeouttruetrueSpring中文文档

AggregatingReplyingKafkaTemplate<Integer, String, String> template =
        new AggregatingReplyingKafkaTemplate<>(producerFactory, container,
                        coll -> coll.size() == releaseSize);
...
RequestReplyFuture<Integer, String, Collection<ConsumerRecord<Integer, String>>> future =
        template.sendAndReceive(record);
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
ConsumerRecord<Integer, Collection<ConsumerRecord<Integer, String>>> consumerRecord =
        future.get(30, TimeUnit.SECONDS);

请注意,返回类型是 a,其值是 s 的集合。 “外部”不是“真实”记录,它是由模板合成的,作为请求收到的实际回复记录的持有者。 当发生正常发布(发布策略返回 true)时,主题设置为 ;如果为 true,并且发生超时(并且至少已收到一条回复记录),则主题设置为 。 该模板为这些“主题”名称提供了常量静态变量:ConsumerRecordConsumerRecordConsumerRecordaggregatedResultsreturnPartialOnTimeoutpartialResultsAfterTimeoutSpring中文文档

/**
 * Pseudo topic name for the "outer" {@link ConsumerRecords} that has the aggregated
 * results in its value after a normal release by the release strategy.
 */
public static final String AGGREGATED_RESULTS_TOPIC = "aggregatedResults";

/**
 * Pseudo topic name for the "outer" {@link ConsumerRecords} that has the aggregated
 * results in its value after a timeout.
 */
public static final String PARTIAL_RESULTS_AFTER_TIMEOUT_TOPIC = "partialResultsAfterTimeout";

中的实数 s 包含从中接收回复的实际主题。ConsumerRecordCollectionSpring中文文档

应答的侦听器容器必须配置为 或 ;consumer 属性必须是(自 2.3 版起的默认值)。 为了避免丢失消息的可能性,模板仅在未完成的请求为零时(即发布策略释放最后一个未完成的请求)时提交偏移量。 重新平衡后,可能会重复发送回复;对于任何飞行中的请求,这些都将被忽略;当收到已发布的回复的重复回复时,您可能会看到错误日志消息。AckMode.MANUALAckMode.MANUAL_IMMEDIATEenable.auto.commitfalse
如果将 ErrorHandlingDeserializer 与此聚合模板一起使用,则框架不会自动检测 s。 相反,记录(带值)将原封不动地返回,标头中的反序列化异常。 建议应用程序调用实用工具方法方法来确定是否发生反序列化异常。 有关更多信息,请参阅其 JavaDocs。 此聚合模板也不需要 ;您应该对回复的每个元素执行检查。DeserializationExceptionnullReplyingKafkaTemplate.checkDeserialization()replyErrorChecker
应答的侦听器容器必须配置为 或 ;consumer 属性必须是(自 2.3 版起的默认值)。 为了避免丢失消息的可能性,模板仅在未完成的请求为零时(即发布策略释放最后一个未完成的请求)时提交偏移量。 重新平衡后,可能会重复发送回复;对于任何飞行中的请求,这些都将被忽略;当收到已发布的回复的重复回复时,您可能会看到错误日志消息。AckMode.MANUALAckMode.MANUAL_IMMEDIATEenable.auto.commitfalse
如果将 ErrorHandlingDeserializer 与此聚合模板一起使用,则框架不会自动检测 s。 相反,记录(带值)将原封不动地返回,标头中的反序列化异常。 建议应用程序调用实用工具方法方法来确定是否发生反序列化异常。 有关更多信息,请参阅其 JavaDocs。 此聚合模板也不需要 ;您应该对回复的每个元素执行检查。DeserializationExceptionnullReplyingKafkaTemplate.checkDeserialization()replyErrorChecker