此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring for Apache Kafka 3.2.1Spring中文文档

此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring for Apache Kafka 3.2.1Spring中文文档

该 jar 包含一些有用的实用程序,可帮助您测试应用程序。spring-kafka-testSpring中文文档

嵌入式 Kafka 代理

提供了两种实现:Spring中文文档

  • EmbeddedKafkaZKBroker- 启动嵌入式实例的旧实现(使用 时仍是默认设置)。ZookeeperEmbeddedKafkaSpring中文文档

  • EmbeddedKafkaKraftBroker- 使用而不是在组合控制器和代理模式下使用(从 3.1 开始)。KraftZookeeperSpring中文文档

有几种方法可以配置代理,如以下各节所述。Spring中文文档

KafkaTestUtils

org.springframework.kafka.test.utils.KafkaTestUtils提供了许多静态帮助程序方法来使用记录、检索各种记录偏移量等。 有关完整的详细信息,请参阅其 JavadocsSpring中文文档

JUnit

org.springframework.kafka.test.utils.KafkaTestUtils还提供了一些静态方法来设置生产者和使用者属性。 以下列表显示了这些方法签名:Spring中文文档

/**
 * Set up test properties for an {@code <Integer, String>} consumer.
 * @param group the group id.
 * @param autoCommit the auto commit.
 * @param embeddedKafka a {@link EmbeddedKafkaBroker} instance.
 * @return the properties.
 */
public static Map<String, Object> consumerProps(String group, String autoCommit,
                                       EmbeddedKafkaBroker embeddedKafka) { ... }

/**
 * Set up test properties for an {@code <Integer, String>} producer.
 * @param embeddedKafka a {@link EmbeddedKafkaBroker} instance.
 * @return the properties.
 */
public static Map<String, Object> producerProps(EmbeddedKafkaBroker embeddedKafka) { ... }

从版本 2.5 开始,该方法将 设置为 。 这是因为,在大多数情况下,您希望使用者使用在测试用例中发送的任何消息。 默认值是,这意味着在使用者启动之前,测试已发送的消息将不会收到这些记录。 若要还原到以前的行为,请将属性设置为“调用方法后”。consumerPropsConsumerConfig.AUTO_OFFSET_RESET_CONFIGearliestConsumerConfiglatestlatestSpring中文文档

使用嵌入式代理时,通常对每个测试使用不同的主题,以防止串扰。 如果由于某种原因无法做到这一点,请注意,该方法的默认行为是在分配后将分配的分区查找到开头。 由于它无权访问使用者属性,因此必须使用重载方法,该方法采用布尔参数来查找到末尾而不是开头。consumeFromEmbeddedTopicsseekToEndSpring中文文档

提供了 JUnit 4 包装器,用于创建嵌入式 Kafka 和嵌入式 Zookeeper 服务器。 (有关与 JUnit 5 一起使用的信息,请参见 @EmbeddedKafka 注解)。 以下列表显示了这些方法的签名:@RuleEmbeddedKafkaZKBroker@EmbeddedKafkaSpring中文文档

/**
 * Create embedded Kafka brokers.
 * @param count the number of brokers.
 * @param controlledShutdown passed into TestUtils.createBrokerConfig.
 * @param topics the topics to create (2 partitions per).
 */
public EmbeddedKafkaRule(int count, boolean controlledShutdown, String... topics) { ... }

/**
 *
 * Create embedded Kafka brokers.
 * @param count the number of brokers.
 * @param controlledShutdown passed into TestUtils.createBrokerConfig.
 * @param partitions partitions per topic.
 * @param topics the topics to create.
 */
public EmbeddedKafkaRule(int count, boolean controlledShutdown, int partitions, String... topics) { ... }
JUnit4 不支持 JUnit4。EmbeddedKafkaKraftBroker

该类具有一个实用工具方法,可让您使用它创建的所有主题。 以下示例演示如何使用它:EmbeddedKafkaBrokerSpring中文文档

Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testT", "false", embeddedKafka);
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
Consumer<Integer, String> consumer = cf.createConsumer();
embeddedKafka.consumeFromAllEmbeddedTopics(consumer);

有一些实用方法可以从使用者那里获取结果。 以下列表显示了这些方法签名:KafkaTestUtilsSpring中文文档

/**
 * Poll the consumer, expecting a single record for the specified topic.
 * @param consumer the consumer.
 * @param topic the topic.
 * @return the record.
 * @throws org.junit.ComparisonFailure if exactly one record is not received.
 */
public static <K, V> ConsumerRecord<K, V> getSingleRecord(Consumer<K, V> consumer, String topic) { ... }

/**
 * Poll the consumer for records.
 * @param consumer the consumer.
 * @return the records.
 */
public static <K, V> ConsumerRecords<K, V> getRecords(Consumer<K, V> consumer) { ... }

以下示例演示如何使用:KafkaTestUtilsSpring中文文档

...
template.sendDefault(0, 2, "bar");
ConsumerRecord<Integer, String> received = KafkaTestUtils.getSingleRecord(consumer, "topic");
...

当嵌入式 Kafka 和嵌入式 Zookeeper 服务器由 启动时,将 将 命名为 的系统属性设置为 Kafka 代理的地址,并将 命名的系统属性设置为 Zookeeper 的地址。 为此属性提供了方便的常量 ( 和 )。EmbeddedKafkaBrokerspring.embedded.kafka.brokersspring.embedded.zookeeper.connectEmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERSEmbeddedKafkaBroker.SPRING_EMBEDDED_ZOOKEEPER_CONNECTSpring中文文档

Kafka 代理的地址可以暴露给任何任意且方便的属性,而不是默认的系统属性。 为此,可以在启动嵌入式 Kafka 之前设置 () 系统属性。 例如,使用 Spring Boot 时,应分别为自动配置 Kafka 客户端设置配置属性。 因此,在随机端口上使用嵌入式 Kafka 运行测试之前,我们可以将其设置为系统属性 - 并将使用它来公开其代理地址。 这现在是此属性的默认值(从版本 3.0.10 开始)。spring.embedded.kafka.brokersspring.embedded.kafka.brokers.propertyEmbeddedKafkaBroker.BROKER_LIST_PROPERTYspring.kafka.bootstrap-serversspring.embedded.kafka.brokers.property=spring.kafka.bootstrap-serversEmbeddedKafkaBrokerSpring中文文档

使用 ,可以为 Kafka 服务器提供其他属性。 请参阅 Kafka 配置,以获取有关可能的代理属性的更多信息。EmbeddedKafkaBroker.brokerProperties(Map<String, String>)Spring中文文档

从版本 2.5 开始,该方法将 设置为 。 这是因为,在大多数情况下,您希望使用者使用在测试用例中发送的任何消息。 默认值是,这意味着在使用者启动之前,测试已发送的消息将不会收到这些记录。 若要还原到以前的行为,请将属性设置为“调用方法后”。consumerPropsConsumerConfig.AUTO_OFFSET_RESET_CONFIGearliestConsumerConfiglatestlatestSpring中文文档

使用嵌入式代理时,通常对每个测试使用不同的主题,以防止串扰。 如果由于某种原因无法做到这一点,请注意,该方法的默认行为是在分配后将分配的分区查找到开头。 由于它无权访问使用者属性,因此必须使用重载方法,该方法采用布尔参数来查找到末尾而不是开头。consumeFromEmbeddedTopicsseekToEndSpring中文文档

JUnit4 不支持 JUnit4。EmbeddedKafkaKraftBroker

配置主题

以下示例配置创建名为 and 的 5 个分区的主题、一个具有 10 个分区的 Topic 以及一个具有 15 个分区的 Topic:cathatthing1thing2Spring中文文档

public class MyTests {

    @ClassRule
    private static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, false, 5, "cat", "hat");

    @Test
    public void test() {
        embeddedKafkaRule.getEmbeddedKafka()
              .addTopics(new NewTopic("thing1", 10, (short) 1), new NewTopic("thing2", 15, (short) 1));
        ...
    }

}

默认情况下,当出现问题(例如添加已存在的主题)时,将引发异常。 版本 2.6 添加了该方法的新版本,该版本返回 ;键是主题名称,值表示成功,或表示失败。addTopicsMap<String, Exception>nullExceptionSpring中文文档

将同一代理用于多个测试类

您可以将同一代理用于多个测试类,其内容类似于以下内容:Spring中文文档

public final class EmbeddedKafkaHolder {

    private static EmbeddedKafkaBroker embeddedKafka = new EmbeddedKafkaZKBroker(1, false)
            .brokerListProperty("spring.kafka.bootstrap-servers");

    private static boolean started;

    public static EmbeddedKafkaBroker getEmbeddedKafka() {
        if (!started) {
            try {
                embeddedKafka.afterPropertiesSet();
            }
            catch (Exception e) {
                throw new KafkaException("Embedded broker failed to start", e);
            }
            started = true;
        }
        return embeddedKafka;
    }

    private EmbeddedKafkaHolder() {
        super();
    }

}

这假定了一个 Spring Boot 环境,并且嵌入式代理替换了 bootstrap servers 属性。Spring中文文档

然后,在每个测试类中,可以使用类似于以下内容的内容:Spring中文文档

static {
    EmbeddedKafkaHolder.getEmbeddedKafka().addTopics("topic1", "topic2");
}

private static final EmbeddedKafkaBroker broker = EmbeddedKafkaHolder.getEmbeddedKafka();

如果不使用 Spring Boot,则可以使用 获取引导服务器。broker.getBrokersAsString()Spring中文文档

前面的示例没有提供在所有测试完成后关闭代理的机制。 例如,如果您在 Gradle 守护程序中运行测试,这可能是一个问题。 在这种情况下,您不应该使用此技术,或者您应该在测试完成时使用某些东西来调用。destroy()EmbeddedKafkaBroker

从 3.0 版开始,该框架为 JUnit 平台公开了 a;默认情况下,它处于禁用状态。 这需要 JUnit Platform 1.8 或更高版本。 此侦听器的目的是为整个测试计划启动一个全局,并在计划结束时停止它。 若要启用此侦听器,从而为项目中的所有测试提供单个全局嵌入式 Kafka 群集,必须通过系统属性或 JUnit Platform 配置将该属性设置为。 此外,还可以提供以下属性:GlobalEmbeddedKafkaTestExecutionListenerEmbeddedKafkaBrokerspring.kafka.global.embedded.enabledtrueSpring中文文档

  • spring.kafka.embedded.count- 要管理的 Kafka 代理数量;Spring中文文档

  • spring.kafka.embedded.ports- 每个 Kafka 代理要启动的端口(逗号分隔值),如果首选随机端口;值的数量必须等于上述值;0countSpring中文文档

  • spring.kafka.embedded.topics- 要在启动的 Kafka 集群中创建的主题(逗号分隔值);Spring中文文档

  • spring.kafka.embedded.partitions- 要为创建的主题配置的分区数;Spring中文文档

  • spring.kafka.embedded.broker.properties.location- 其他 Kafka 代理配置属性的文件位置;此属性的值必须遵循 Spring 资源抽象模式;Spring中文文档

  • spring.kafka.embedded.kraft- 默认为 false,如果为 true,则使用 an 而不是 .EmbeddedKafkaKraftBrokerEmbeddedKafkaZKBrokerSpring中文文档

从本质上讲,这些属性模仿了某些属性。@EmbeddedKafkaSpring中文文档

请参阅《JUnit 5 用户指南》中有关配置属性以及如何提供这些属性的更多信息。 例如,可以将条目添加到测试类路径中的文件中。 从版本 3.0.10 开始,代理会自动将其设置为 ,默认情况下,用于使用 Spring Boot 应用程序进行测试。spring.embedded.kafka.brokers.property=my.bootstrap-serversjunit-platform.propertiesspring.kafka.bootstrap-serversSpring中文文档

建议不要将全局嵌入式 Kafka 和每个测试类组合在单个测试套件中。 它们共享相同的系统属性,因此很可能会导致意外行为。
spring-kafka-test具有对 and 的传递依赖关系(后者支持全局嵌入式代理)。 如果您希望使用嵌入式代理并且不使用 JUnit,您可能希望排除这些依赖项。junit-jupiter-apijunit-platform-launcher
前面的示例没有提供在所有测试完成后关闭代理的机制。 例如,如果您在 Gradle 守护程序中运行测试,这可能是一个问题。 在这种情况下,您不应该使用此技术,或者您应该在测试完成时使用某些东西来调用。destroy()EmbeddedKafkaBroker
建议不要将全局嵌入式 Kafka 和每个测试类组合在单个测试套件中。 它们共享相同的系统属性,因此很可能会导致意外行为。
spring-kafka-test具有对 and 的传递依赖关系(后者支持全局嵌入式代理)。 如果您希望使用嵌入式代理并且不使用 JUnit,您可能希望排除这些依赖项。junit-jupiter-apijunit-platform-launcher

@EmbeddedKafka注解

我们通常建议您将规则用作避免在测试之间启动和停止代理(并为每个测试使用不同的主题)。 从 2.0 版开始,如果您使用 Spring 的测试应用程序上下文缓存,您还可以声明一个 bean,因此单个代理可以跨多个测试类使用。 为方便起见,我们提供了一个测试类级注释来注册 bean。 以下示例演示如何使用它:@ClassRuleEmbeddedKafkaBroker@EmbeddedKafkaEmbeddedKafkaBrokerSpring中文文档

@RunWith(SpringRunner.class)
@DirtiesContext
@EmbeddedKafka(partitions = 1,
         topics = {
                 KafkaStreamsTests.STREAMING_TOPIC1,
                 KafkaStreamsTests.STREAMING_TOPIC2 })
public class KafkaStreamsTests {

    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    @Test
    public void someTest() {
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testGroup", "true", this.embeddedKafka);
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        ConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
        Consumer<Integer, String> consumer = cf.createConsumer();
        this.embeddedKafka.consumeFromAnEmbeddedTopic(consumer, KafkaStreamsTests.STREAMING_TOPIC2);
        ConsumerRecords<Integer, String> replies = KafkaTestUtils.getRecords(consumer);
        assertThat(replies.count()).isGreaterThanOrEqualTo(1);
    }

    @Configuration
    @EnableKafkaStreams
    public static class KafkaStreamsConfiguration {

        @Value("${" + EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS + "}")
        private String brokerAddresses;

        @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
        public KafkaStreamsConfiguration kStreamsConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddresses);
            return new KafkaStreamsConfiguration(props);
        }

    }

}

从版本 2.2.4 开始,还可以使用批注来指定 Kafka ports 属性。@EmbeddedKafkaSpring中文文档

从版本 3.2 开始,将属性设置为使用 an 而不是 .krafttrueEmbeddedKafkaKraftBrokerEmbeddedKafkaZKBrokerSpring中文文档

下面的示例设置支持属性占位符解析的 、 和属性:topicsbrokerPropertiesbrokerPropertiesLocation@EmbeddedKafkaSpring中文文档

@TestPropertySource(locations = "classpath:/test.properties")
@EmbeddedKafka(topics = { "any-topic", "${kafka.topics.another-topic}" },
        brokerProperties = { "log.dir=${kafka.broker.logs-dir}",
                            "listeners=PLAINTEXT://localhost:${kafka.broker.port}",
                            "auto.create.topics.enable=${kafka.broker.topics-enable:true}" },
        brokerPropertiesLocation = "classpath:/broker.properties")

在前面的示例中,属性占位符 、 和 是从 Spring 解析的。 此外,代理属性是从 指定的类路径资源加载的。 针对 URL 和资源中找到的任何属性占位符解析属性占位符。 由在 中找到的替代属性定义的属性。${kafka.topics.another-topic}${kafka.broker.logs-dir}${kafka.broker.port}Environmentbroker.propertiesbrokerPropertiesLocationbrokerPropertiesLocationbrokerPropertiesbrokerPropertiesLocationSpring中文文档

您可以将注释用于 JUnit 4 或 JUnit 5。@EmbeddedKafkaSpring中文文档

@EmbeddedKafka使用 JUnit5 进行注释

从 2.3 版开始,有两种方法可以将注解与 JUnit5 一起使用。 当与注释一起使用时,嵌入式代理将添加到测试应用程序上下文中。 您可以在类或方法级别自动将代理连接到测试中,以获取代理地址列表。@EmbeddedKafka@SpringJunitConfigSpring中文文档

当不使用 spring 测试上下文时,创建一个代理;该条件包括一个参数解析器,因此您可以在测试方法中访问代理。EmbdeddedKafkaConditionSpring中文文档

@EmbeddedKafka
public class EmbeddedKafkaConditionTests {

    @Test
    public void test(EmbeddedKafkaBroker broker) {
        String brokerList = broker.getBrokersAsString();
        ...
    }

}

将创建一个独立的代理(在 Spring 的 TestContext 之外),除非注释的类也用 . 并且如此元注释,并且当这些注释中的任何一个也存在时,将使用基于上下文的代理。@EmbeddedKafkaExtendWith(SpringExtension.class)@SpringJunitConfig@SpringBootTestSpring中文文档

当有可用的 Spring 测试应用程序上下文时,主题和代理属性可以包含属性占位符,只要在某处定义了属性,这些占位符就会被解析。 如果没有可用的 Spring 上下文,则不会解析这些占位符。
当有可用的 Spring 测试应用程序上下文时,主题和代理属性可以包含属性占位符,只要在某处定义了属性,这些占位符就会被解析。 如果没有可用的 Spring 上下文,则不会解析这些占位符。

注解中的嵌入式代理@SpringBootTest

Spring Initializr 现在会自动将 test 范围内的依赖项添加到项目配置中。spring-kafka-testSpring中文文档

如果应用程序使用 Kafka 绑定器,并且要使用嵌入式代理进行测试,则必须删除依赖项,因为它会将实际绑定器替换为测试用例的测试绑定器。 如果希望某些测试使用测试绑定器,而某些测试使用嵌入式代理,则使用实际绑定器的测试需要通过排除测试类中的绑定器自动配置来禁用测试绑定器。 以下示例演示如何执行此操作:spring-cloud-streamspring-cloud-stream-test-supportSpring中文文档

@RunWith(SpringRunner.class)
@SpringBootTest(properties = "spring.autoconfigure.exclude="
    + "org.springframework.cloud.stream.test.binder.TestSupportBinderAutoConfiguration")
public class MyApplicationTests {
    ...
}

有几种方法可以在 Spring Boot 应用程序测试中使用嵌入式代理。Spring中文文档

它们包括:Spring中文文档

JUnit4 类规则

以下示例演示如何使用 JUnit4 类规则创建嵌入式代理:Spring中文文档

@RunWith(SpringRunner.class)
@SpringBootTest
public class MyApplicationTests {

    @ClassRule
    public static EmbeddedKafkaRule broker = new EmbeddedKafkaRule(1, false, "someTopic")
            .brokerListProperty("spring.kafka.bootstrap-servers");

    @Autowired
    private KafkaTemplate<String, String> template;

    @Test
    public void test() {
        ...
    }

}

请注意,由于这是一个 Spring Boot 应用程序,因此我们覆盖了 broker list 属性来设置 Spring Boot 的属性。Spring中文文档

如果应用程序使用 Kafka 绑定器,并且要使用嵌入式代理进行测试,则必须删除依赖项,因为它会将实际绑定器替换为测试用例的测试绑定器。 如果希望某些测试使用测试绑定器,而某些测试使用嵌入式代理,则使用实际绑定器的测试需要通过排除测试类中的绑定器自动配置来禁用测试绑定器。 以下示例演示如何执行此操作:spring-cloud-streamspring-cloud-stream-test-supportSpring中文文档

@RunWith(SpringRunner.class)
@SpringBootTest(properties = "spring.autoconfigure.exclude="
    + "org.springframework.cloud.stream.test.binder.TestSupportBinderAutoConfiguration")
public class MyApplicationTests {
    ...
}

@EmbeddedKafka@SpringJunitConfig

与 一起使用时,建议在测试类上使用。 这是为了防止在测试套件中运行多个测试后,在 JVM 关闭期间发生潜在的争用情况。 例如,如果不使用 ,则可能会提前关闭,而应用程序上下文仍需要其中的资源。 由于每次测试运行都会创建自己的临时目录,因此当发生此争用情况时,它将生成错误日志消息,指示它尝试删除或清理的文件不再可用。 添加将确保在每次测试后清理应用程序上下文,而不是缓存应用程序上下文,从而使其不易受到此类潜在资源争用条件的影响。@EmbeddedKafka@SpringJUnitConfig@DirtiesContext@DirtiesContextEmbeddedKafkaBrokerEmbeddedKafka@DirtiesContextSpring中文文档

@EmbeddedKafka注释或 BeanEmbeddedKafkaBroker

以下示例显示如何使用 Annotation 创建嵌入式代理:@EmbeddedKafkaSpring中文文档

@RunWith(SpringRunner.class)
@EmbeddedKafka(topics = "someTopic",
        bootstrapServersProperty = "spring.kafka.bootstrap-servers") // this is now the default
public class MyApplicationTests {

    @Autowired
    private KafkaTemplate<String, String> template;

    @Test
    public void test() {
        ...
    }

}
默认情况下,从版本 3.0.10 开始自动设置为。bootstrapServersPropertyspring.kafka.bootstrap-servers
默认情况下,从版本 3.0.10 开始自动设置为。bootstrapServersPropertyspring.kafka.bootstrap-servers

哈姆克雷斯特匹配器

提供以下匹配器:org.springframework.kafka.test.hamcrest.KafkaMatchersSpring中文文档

/**
 * @param key the key
 * @param <K> the type.
 * @return a Matcher that matches the key in a consumer record.
 */
public static <K> Matcher<ConsumerRecord<K, ?>> hasKey(K key) { ... }

/**
 * @param value the value.
 * @param <V> the type.
 * @return a Matcher that matches the value in a consumer record.
 */
public static <V> Matcher<ConsumerRecord<?, V>> hasValue(V value) { ... }

/**
 * @param partition the partition.
 * @return a Matcher that matches the partition in a consumer record.
 */
public static Matcher<ConsumerRecord<?, ?>> hasPartition(int partition) { ... }

/**
 * Matcher testing the timestamp of a {@link ConsumerRecord} assuming the topic has been set with
 * {@link org.apache.kafka.common.record.TimestampType#CREATE_TIME CreateTime}.
 *
 * @param ts timestamp of the consumer record.
 * @return a Matcher that matches the timestamp in a consumer record.
 */
public static Matcher<ConsumerRecord<?, ?>> hasTimestamp(long ts) {
  return hasTimestamp(TimestampType.CREATE_TIME, ts);
}

/**
 * Matcher testing the timestamp of a {@link ConsumerRecord}
 * @param type timestamp type of the record
 * @param ts timestamp of the consumer record.
 * @return a Matcher that matches the timestamp in a consumer record.
 */
public static Matcher<ConsumerRecord<?, ?>> hasTimestamp(TimestampType type, long ts) {
  return new ConsumerRecordTimestampMatcher(type, ts);
}

AssertJ 条件

可以使用以下 AssertJ 条件:Spring中文文档

/**
 * @param key the key
 * @param <K> the type.
 * @return a Condition that matches the key in a consumer record.
 */
public static <K> Condition<ConsumerRecord<K, ?>> key(K key) { ... }

/**
 * @param value the value.
 * @param <V> the type.
 * @return a Condition that matches the value in a consumer record.
 */
public static <V> Condition<ConsumerRecord<?, V>> value(V value) { ... }

/**
 * @param key the key.
 * @param value the value.
 * @param <K> the key type.
 * @param <V> the value type.
 * @return a Condition that matches the key in a consumer record.
 * @since 2.2.12
 */
public static <K, V> Condition<ConsumerRecord<K, V>> keyValue(K key, V value) { ... }

/**
 * @param partition the partition.
 * @return a Condition that matches the partition in a consumer record.
 */
public static Condition<ConsumerRecord<?, ?>> partition(int partition) { ... }

/**
 * @param value the timestamp.
 * @return a Condition that matches the timestamp value in a consumer record.
 */
public static Condition<ConsumerRecord<?, ?>> timestamp(long value) {
  return new ConsumerRecordTimestampCondition(TimestampType.CREATE_TIME, value);
}

/**
 * @param type the type of timestamp
 * @param value the timestamp.
 * @return a Condition that matches the timestamp value in a consumer record.
 */
public static Condition<ConsumerRecord<?, ?>> timestamp(TimestampType type, long value) {
  return new ConsumerRecordTimestampCondition(type, value);
}

以下示例汇集了本章中涵盖的大多数主题:Spring中文文档

public class KafkaTemplateTests {

    private static final String TEMPLATE_TOPIC = "templateTopic";

    @ClassRule
    public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, TEMPLATE_TOPIC);

    @Test
    public void testTemplate() throws Exception {
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testT", "false",
            embeddedKafka.getEmbeddedKafka());
        DefaultKafkaConsumerFactory<Integer, String> cf =
                            new DefaultKafkaConsumerFactory<>(consumerProps);
        ContainerProperties containerProperties = new ContainerProperties(TEMPLATE_TOPIC);
        KafkaMessageListenerContainer<Integer, String> container =
                            new KafkaMessageListenerContainer<>(cf, containerProperties);
        final BlockingQueue<ConsumerRecord<Integer, String>> records = new LinkedBlockingQueue<>();
        container.setupMessageListener(new MessageListener<Integer, String>() {

            @Override
            public void onMessage(ConsumerRecord<Integer, String> record) {
                System.out.println(record);
                records.add(record);
            }

        });
        container.setBeanName("templateTests");
        container.start();
        ContainerTestUtils.waitForAssignment(container,
                            embeddedKafka.getEmbeddedKafka().getPartitionsPerTopic());
        Map<String, Object> producerProps =
                            KafkaTestUtils.producerProps(embeddedKafka.getEmbeddedKafka());
        ProducerFactory<Integer, String> pf =
                            new DefaultKafkaProducerFactory<>(producerProps);
        KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
        template.setDefaultTopic(TEMPLATE_TOPIC);
        template.sendDefault("foo");
        assertThat(records.poll(10, TimeUnit.SECONDS), hasValue("foo"));
        template.sendDefault(0, 2, "bar");
        ConsumerRecord<Integer, String> received = records.poll(10, TimeUnit.SECONDS);
        assertThat(received, hasKey(2));
        assertThat(received, hasPartition(0));
        assertThat(received, hasValue("bar"));
        template.send(TEMPLATE_TOPIC, 0, 2, "baz");
        received = records.poll(10, TimeUnit.SECONDS);
        assertThat(received, hasKey(2));
        assertThat(received, hasPartition(0));
        assertThat(received, hasValue("baz"));
    }

}

前面的示例使用 Hamcrest 匹配器。 使用 ,最后一部分如以下代码所示:AssertJSpring中文文档

assertThat(records.poll(10, TimeUnit.SECONDS)).has(value("foo"));
template.sendDefault(0, 2, "bar");
ConsumerRecord<Integer, String> received = records.poll(10, TimeUnit.SECONDS);
// using individual assertions
assertThat(received).has(key(2));
assertThat(received).has(value("bar"));
assertThat(received).has(partition(0));
template.send(TEMPLATE_TOPIC, 0, 2, "baz");
received = records.poll(10, TimeUnit.SECONDS);
// using allOf()
assertThat(received).has(allOf(keyValue(2, "baz"), partition(0)));

模拟消费者和生产者

该库提供用于测试目的的类。kafka-clientsMockConsumerMockProducerSpring中文文档

如果您希望在一些带有侦听器容器的测试中使用这些类,或者从版本 3.0.7 开始分别使用这些类,该框架现在提供和实现。KafkaTemplateMockConsumerFactoryMockProducerFactorySpring中文文档

这些工厂可以在侦听器容器和模板中使用,而不是默认工厂,后者需要正在运行(或嵌入的)代理。Spring中文文档

下面是一个返回单个使用者的简单实现示例:Spring中文文档

@Bean
ConsumerFactory<String, String> consumerFactory() {
    MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
    TopicPartition topicPartition0 = new TopicPartition("topic", 0);
    List<TopicPartition> topicPartitions = Collections.singletonList(topicPartition0);
    Map<TopicPartition, Long> beginningOffsets = topicPartitions.stream().collect(Collectors
            .toMap(Function.identity(), tp -> 0L));
    consumer.updateBeginningOffsets(beginningOffsets);
    consumer.schedulePollTask(() -> {
        consumer.addRecord(
                new ConsumerRecord<>("topic", 0, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "test1",
                        new RecordHeaders(), Optional.empty()));
        consumer.addRecord(
                new ConsumerRecord<>("topic", 0, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "test2",
                        new RecordHeaders(), Optional.empty()));
    });
    return new MockConsumerFactory(() -> consumer);
}

如果您希望使用并发性进行测试,工厂构造函数中的 lambda 每次都需要创建一个新实例。SupplierSpring中文文档

对于 ,有两个构造函数;一个用于创建一个简单的工厂,另一个用于创建支持事务的工厂。MockProducerFactorySpring中文文档

以下是示例:Spring中文文档

@Bean
ProducerFactory<String, String> nonTransFactory() {
    return new MockProducerFactory<>(() ->
            new MockProducer<>(true, new StringSerializer(), new StringSerializer()));
}

@Bean
ProducerFactory<String, String> transFactory() {
    MockProducer<String, String> mockProducer =
            new MockProducer<>(true, new StringSerializer(), new StringSerializer());
    mockProducer.initTransactions();
    return new MockProducerFactory<String, String>((tx, id) -> mockProducer, "defaultTxId");
}

请注意,在第二种情况下,lambda 是一个,如果调用方想要一个事务生产者,则第一个参数为 true;可选的第二个参数包含事务 ID。 这可以是默认值(如构造函数中提供的那样),也可以被 (或对于本地事务) 覆盖(如果这样配置)。 如果您希望使用基于此值的不同值,则会提供事务 ID。BiFunction<Boolean, String>KafkaTransactionManagerKafkaTemplateMockProducerSpring中文文档

如果在多线程环境中使用 producer,则 应返回多个 producer(可能使用 线程绑定)。BiFunctionThreadLocalSpring中文文档

事务 s 必须通过调用 来初始化事务。MockProducerinitTransaction()

使用 时,如果不想在每次发送后关闭生产者,则可以提供一个自定义实现,该实现覆盖不从 super 类调用该方法的方法。 这对于在不关闭同一生产者的情况下验证多个发布时进行测试非常方便。MockProducerMockProducerclosecloseSpring中文文档

下面是一个示例:Spring中文文档

@Bean
MockProducer<String, String> mockProducer() {
    return new MockProducer<>(false, new StringSerializer(), new StringSerializer()) {
        @Override
        public void close() {

        }
    };
}

@Bean
ProducerFactory<String, String> mockProducerFactory(MockProducer<String, String> mockProducer) {
    return new MockProducerFactory<>(() -> mockProducer);
}
事务 s 必须通过调用 来初始化事务。MockProducerinitTransaction()