此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring for Apache Kafka 3.2.4! |
此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring for Apache Kafka 3.2.4! |
该 jar 包含一些有用的实用程序,可帮助测试您的应用程序。spring-kafka-test
嵌入式 Kafka 代理
提供了两种实现:
-
EmbeddedKafkaZKBroker
- 启动嵌入式实例的旧版实现(使用 时仍是默认值)。Zookeeper
EmbeddedKafka
-
EmbeddedKafkaKraftBroker
- 在 controller 和 broker 组合模式中使用 而不是 (从 3.1 开始)。Kraft
Zookeeper
有几种技术可用于配置代理,如以下各节所述。
KafkaTestUtils
org.springframework.kafka.test.utils.KafkaTestUtils
提供了许多 static 帮助程序方法来使用记录、检索各种记录偏移量等。
有关完整详细信息,请参阅其 Javadocs。
JUnit
org.springframework.kafka.test.utils.KafkaTestUtils
还提供了一些静态方法来设置 producer 和 consumer 属性。
下面的清单显示了这些方法签名:
/**
* Set up test properties for an {@code <Integer, String>} consumer.
* @param group the group id.
* @param autoCommit the auto commit.
* @param embeddedKafka a {@link EmbeddedKafkaBroker} instance.
* @return the properties.
*/
public static Map<String, Object> consumerProps(String group, String autoCommit,
EmbeddedKafkaBroker embeddedKafka) { ... }
/**
* Set up test properties for an {@code <Integer, String>} producer.
* @param embeddedKafka a {@link EmbeddedKafkaBroker} instance.
* @return the properties.
*/
public static Map<String, Object> producerProps(EmbeddedKafkaBroker embeddedKafka) { ... }
从版本 2.5 开始,该方法将 设置为 。
这是因为,在大多数情况下,您希望使用者使用测试用例中发送的任何消息。
默认值是 which 意味着测试在使用者启动之前已经发送的消息将不会收到这些记录。
若要还原到以前的行为,请将属性设置为 after call the method。 使用嵌入式代理时,通常最佳做法是为每个测试使用不同的主题,以防止串扰。
如果由于某种原因无法做到这一点,请注意该方法的默认行为是在分配后将分配的分区查找到开头。
由于它无权访问使用者属性,因此您必须使用采用 boolean 参数的重载方法来查找末尾而不是开头。 |
提供了 的 JUnit 4 包装器,用于创建嵌入式 Kafka 和嵌入式 Zookeeper 服务器。
(有关与 JUnit 5 一起使用的信息,请参见@EmbeddedKafka Annotation)。
下面的清单显示了这些方法的签名:@Rule
EmbeddedKafkaZKBroker
@EmbeddedKafka
/**
* Create embedded Kafka brokers.
* @param count the number of brokers.
* @param controlledShutdown passed into TestUtils.createBrokerConfig.
* @param topics the topics to create (2 partitions per).
*/
public EmbeddedKafkaRule(int count, boolean controlledShutdown, String... topics) { ... }
/**
*
* Create embedded Kafka brokers.
* @param count the number of brokers.
* @param controlledShutdown passed into TestUtils.createBrokerConfig.
* @param partitions partitions per topic.
* @param topics the topics to create.
*/
public EmbeddedKafkaRule(int count, boolean controlledShutdown, int partitions, String... topics) { ... }
JUnit4 不支持 。EmbeddedKafkaKraftBroker |
该类有一个 utility 方法,允许您使用它创建的所有主题。
以下示例演示如何使用它:EmbeddedKafkaBroker
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testT", "false", embeddedKafka);
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
Consumer<Integer, String> consumer = cf.createConsumer();
embeddedKafka.consumeFromAllEmbeddedTopics(consumer);
它有一些实用程序方法可以从消费者那里获取结果。
下面的清单显示了这些方法签名:KafkaTestUtils
/**
* Poll the consumer, expecting a single record for the specified topic.
* @param consumer the consumer.
* @param topic the topic.
* @return the record.
* @throws org.junit.ComparisonFailure if exactly one record is not received.
*/
public static <K, V> ConsumerRecord<K, V> getSingleRecord(Consumer<K, V> consumer, String topic) { ... }
/**
* Poll the consumer for records.
* @param consumer the consumer.
* @return the records.
*/
public static <K, V> ConsumerRecords<K, V> getRecords(Consumer<K, V> consumer) { ... }
以下示例演示如何使用:KafkaTestUtils
...
template.sendDefault(0, 2, "bar");
ConsumerRecord<Integer, String> received = KafkaTestUtils.getSingleRecord(consumer, "topic");
...
当嵌入式 Kafka 和嵌入式 Zookeeper 服务器由 启动时,系统属性 named 设置为 Kafka 代理的地址,系统属性 named 设置为 Zookeeper 的地址。
为此属性提供了方便的常量 ( 和 )。EmbeddedKafkaBroker
spring.embedded.kafka.brokers
spring.embedded.zookeeper.connect
EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS
EmbeddedKafkaBroker.SPRING_EMBEDDED_ZOOKEEPER_CONNECT
Kafka 代理的地址可以公开给任何任意且方便的属性,而不是默认的系统属性。
为此,可以在启动嵌入式 Kafka 之前设置 () 系统属性。
例如,使用 Spring Boot 时,应分别为自动配置 Kafka 客户端设置配置属性。
因此,在随机端口上使用嵌入式 Kafka 运行测试之前,我们可以将其设置为系统属性 - 它将使用它来公开其代理地址。
现在,这是此属性的默认值(从版本 3.0.10 开始)。spring.embedded.kafka.brokers
spring.embedded.kafka.brokers.property
EmbeddedKafkaBroker.BROKER_LIST_PROPERTY
spring.kafka.bootstrap-servers
spring.embedded.kafka.brokers.property=spring.kafka.bootstrap-servers
EmbeddedKafkaBroker
使用 ,您可以为 Kafka 服务器提供其他属性。
有关可能的代理属性的更多信息,请参阅 Kafka Config。EmbeddedKafkaBroker.brokerProperties(Map<String, String>)
从版本 2.5 开始,该方法将 设置为 。
这是因为,在大多数情况下,您希望使用者使用测试用例中发送的任何消息。
默认值是 which 意味着测试在使用者启动之前已经发送的消息将不会收到这些记录。
若要还原到以前的行为,请将属性设置为 after call the method。 使用嵌入式代理时,通常最佳做法是为每个测试使用不同的主题,以防止串扰。
如果由于某种原因无法做到这一点,请注意该方法的默认行为是在分配后将分配的分区查找到开头。
由于它无权访问使用者属性,因此您必须使用采用 boolean 参数的重载方法来查找末尾而不是开头。 |
JUnit4 不支持 。EmbeddedKafkaKraftBroker |
配置主题
以下示例配置创建名为 AND 的主题(具有 5 个分区)、一个名为 (具有 10 个分区的主题)和一个名为 (具有 15 个分区) 的主题:cat
hat
thing1
thing2
public class MyTests {
@ClassRule
private static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, false, 5, "cat", "hat");
@Test
public void test() {
embeddedKafkaRule.getEmbeddedKafka()
.addTopics(new NewTopic("thing1", 10, (short) 1), new NewTopic("thing2", 15, (short) 1));
...
}
}
默认情况下,当出现问题时(例如添加已存在的主题)将引发异常。
版本 2.6 添加了该方法的新版本,该版本返回 ;key 是 topic name,值是 success 或 an for failure。addTopics
Map<String, Exception>
null
Exception
对多个测试类使用相同的代理
您可以将同一个 broker 用于多个测试类,类似于以下内容:
public final class EmbeddedKafkaHolder {
private static EmbeddedKafkaBroker embeddedKafka = new EmbeddedKafkaZKBroker(1, false)
.brokerListProperty("spring.kafka.bootstrap-servers");
private static boolean started;
public static EmbeddedKafkaBroker getEmbeddedKafka() {
if (!started) {
try {
embeddedKafka.afterPropertiesSet();
}
catch (Exception e) {
throw new KafkaException("Embedded broker failed to start", e);
}
started = true;
}
return embeddedKafka;
}
private EmbeddedKafkaHolder() {
super();
}
}
这假定 Spring Boot 环境,并且嵌入式代理替换了 bootstrap servers 属性。
然后,在每个测试类中,您可以使用类似于以下内容的内容:
static {
EmbeddedKafkaHolder.getEmbeddedKafka().addTopics("topic1", "topic2");
}
private static final EmbeddedKafkaBroker broker = EmbeddedKafkaHolder.getEmbeddedKafka();
如果不使用 Spring Boot,则可以使用 获取引导服务器。broker.getBrokersAsString()
前面的示例没有提供在所有测试完成后关闭代理的机制。
例如,如果您在 Gradle 守护进程中运行测试,这可能是一个问题。
在这种情况下,您不应该使用此技术,或者您应该在测试完成时使用一些东西来调用 。destroy() EmbeddedKafkaBroker |
从版本 3.0 开始,框架为 JUnit 平台公开了一个;默认情况下,它是禁用的。
这需要 JUnit Platform 1.8 或更高版本。
此侦听器的目的是为整个测试计划启动一个全局变量,并在计划结束时停止它。
要启用此侦听器,从而为项目中的所有测试使用单个全局嵌入式 Kafka 集群,必须通过系统属性或 JUnit Platform 配置将该属性设置为该属性。
此外,还可以提供以下属性:GlobalEmbeddedKafkaTestExecutionListener
EmbeddedKafkaBroker
spring.kafka.global.embedded.enabled
true
-
spring.kafka.embedded.count
- 要管理的 Kafka 代理数量; -
spring.kafka.embedded.ports
- 如果首选随机端口,则每个 Kafka broker 启动的端口(逗号分隔值);值的数量必须等于上述数量;0
count
-
spring.kafka.embedded.topics
- 要在启动的 Kafka 集群中创建的主题(逗号分隔值); -
spring.kafka.embedded.partitions
- 要为创建的主题预置的分区数; -
spring.kafka.embedded.broker.properties.location
- 其他 Kafka 代理配置属性的文件位置;此属性的值必须遵循 Spring 资源抽象模式; -
spring.kafka.embedded.kraft
- default false,当 true 时,使用 an 而不是 。EmbeddedKafkaKraftBroker
EmbeddedKafkaZKBroker
实质上,这些属性模拟了一些属性。@EmbeddedKafka
有关配置属性以及如何提供这些属性的更多信息,请参阅 JUnit 5 用户指南。
例如,可以将条目添加到 testing Classpath 中的文件中。
从版本 3.0.10 开始,默认情况下,代理会自动将其设置为 ,以便使用 Spring Boot 应用程序进行测试。spring.embedded.kafka.brokers.property=my.bootstrap-servers
junit-platform.properties
spring.kafka.bootstrap-servers
建议不要将全局嵌入式 Kafka 和 per-test 类合并到单个测试套件中。 它们共享相同的系统属性,因此很可能会导致意外行为。 |
spring-kafka-test 具有对 and 的传递依赖项(后者支持全局嵌入式代理)。
如果您希望使用嵌入式代理但不使用 JUnit,则可能希望排除这些依赖项。junit-jupiter-api junit-platform-launcher |
前面的示例没有提供在所有测试完成后关闭代理的机制。
例如,如果您在 Gradle 守护进程中运行测试,这可能是一个问题。
在这种情况下,您不应该使用此技术,或者您应该在测试完成时使用一些东西来调用 。destroy() EmbeddedKafkaBroker |
建议不要将全局嵌入式 Kafka 和 per-test 类合并到单个测试套件中。 它们共享相同的系统属性,因此很可能会导致意外行为。 |
spring-kafka-test 具有对 and 的传递依赖项(后者支持全局嵌入式代理)。
如果您希望使用嵌入式代理但不使用 JUnit,则可能希望排除这些依赖项。junit-jupiter-api junit-platform-launcher |
@EmbeddedKafka
注解
我们通常建议您使用该规则,以避免在测试之间启动和停止代理(并为每个测试使用不同的主题)。
从版本 2.0 开始,如果使用 Spring 的测试应用程序上下文缓存,则还可以声明一个 bean,因此可以在多个测试类中使用单个代理。
为方便起见,我们提供了一个测试类级 Comments,用于注册 Bean。
以下示例演示如何使用它:@ClassRule
EmbeddedKafkaBroker
@EmbeddedKafka
EmbeddedKafkaBroker
@RunWith(SpringRunner.class)
@DirtiesContext
@EmbeddedKafka(partitions = 1,
topics = {
KafkaStreamsTests.STREAMING_TOPIC1,
KafkaStreamsTests.STREAMING_TOPIC2 })
public class KafkaStreamsTests {
@Autowired
private EmbeddedKafkaBroker embeddedKafka;
@Test
public void someTest() {
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testGroup", "true", this.embeddedKafka);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
ConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
Consumer<Integer, String> consumer = cf.createConsumer();
this.embeddedKafka.consumeFromAnEmbeddedTopic(consumer, KafkaStreamsTests.STREAMING_TOPIC2);
ConsumerRecords<Integer, String> replies = KafkaTestUtils.getRecords(consumer);
assertThat(replies.count()).isGreaterThanOrEqualTo(1);
}
@Configuration
@EnableKafkaStreams
public static class KafkaStreamsConfiguration {
@Value("${" + EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS + "}")
private String brokerAddresses;
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddresses);
return new KafkaStreamsConfiguration(props);
}
}
}
从版本 2.2.4 开始,您还可以使用注释来指定 Kafka ports 属性。@EmbeddedKafka
从版本 3.2 开始,将属性设置为使用 an 而不是 .kraft
true
EmbeddedKafkaKraftBroker
EmbeddedKafkaZKBroker
以下示例设置 support 属性占位符分辨率的 、 和 属性:topics
brokerProperties
brokerPropertiesLocation
@EmbeddedKafka
@TestPropertySource(locations = "classpath:/test.properties")
@EmbeddedKafka(topics = { "any-topic", "${kafka.topics.another-topic}" },
brokerProperties = { "log.dir=${kafka.broker.logs-dir}",
"listeners=PLAINTEXT://localhost:${kafka.broker.port}",
"auto.create.topics.enable=${kafka.broker.topics-enable:true}" },
brokerPropertiesLocation = "classpath:/broker.properties")
在前面的示例中,属性占位符 、 和 是从 Spring 解析的。
此外,代理属性是从 指定的类路径资源加载的。
为 URL 和资源中找到的任何属性占位符解析属性占位符。
由 中的 override 属性定义的属性。${kafka.topics.another-topic}
${kafka.broker.logs-dir}
${kafka.broker.port}
Environment
broker.properties
brokerPropertiesLocation
brokerPropertiesLocation
brokerProperties
brokerPropertiesLocation
您可以将注释与 JUnit 4 或 JUnit 5 一起使用。@EmbeddedKafka
@EmbeddedKafka
使用 JUnit5 进行注释
从版本 2.3 开始,有两种方法可以在 JUnit5 中使用 Comments。
当与 annotation 一起使用时,嵌入式代理将添加到 test 应用程序上下文中。
您可以在类或方法级别将代理自动连接到测试中,以获取代理地址列表。@EmbeddedKafka
@SpringJunitConfig
当不使用 spring test 上下文时,会创建一个 broker;该条件包括一个参数解析器,因此您可以在测试方法中访问代理。EmbdeddedKafkaCondition
@EmbeddedKafka
public class EmbeddedKafkaConditionTests {
@Test
public void test(EmbeddedKafkaBroker broker) {
String brokerList = broker.getBrokersAsString();
...
}
}
将创建一个独立的代理(在 Spring 的 TestContext 之外),除非一个带 Comments 的类也用 . 并且是元注释的,并且当这些注释中的任何一个也存在时,将使用基于上下文的代理。@EmbeddedKafka
ExtendWith(SpringExtension.class)
@SpringJunitConfig
@SpringBootTest
当有可用的 Spring 测试应用程序上下文时,主题和代理属性可以包含属性占位符,只要在某处定义了该属性,这些占位符就会被解析。 如果没有可用的 Spring 上下文,则不会解析这些占位符。 |
当有可用的 Spring 测试应用程序上下文时,主题和代理属性可以包含属性占位符,只要在某处定义了该属性,这些占位符就会被解析。 如果没有可用的 Spring 上下文,则不会解析这些占位符。 |
Annotations 中的 Embedded Broker@SpringBootTest
Spring Initializr 现在会自动将 test 作用域中的依赖项添加到项目配置中。spring-kafka-test
如果您的应用程序在 Kafka Binder 中使用,并且您希望使用嵌入式代理进行测试,则必须删除依赖项,因为它会将实际 Binder 替换为测试用例的测试 Binder。
如果您希望某些测试使用测试 Binder,而一些测试使用嵌入式代理,则使用真实 Binder 的测试需要通过在测试类中排除 Binder auto 配置来禁用测试 Binder。
以下示例显示了如何执行此操作:
|
在 Spring Boot 应用程序测试中,有几种方法可以使用嵌入式代理。
他们包括:
JUnit4 类规则
以下示例说明如何使用 JUnit4 类规则创建嵌入式代理:
@RunWith(SpringRunner.class)
@SpringBootTest
public class MyApplicationTests {
@ClassRule
public static EmbeddedKafkaRule broker = new EmbeddedKafkaRule(1, false, "someTopic")
.brokerListProperty("spring.kafka.bootstrap-servers");
@Autowired
private KafkaTemplate<String, String> template;
@Test
public void test() {
...
}
}
请注意,由于这是一个 Spring Boot 应用程序,因此我们覆盖 broker list 属性以设置 Spring Boot 的属性。
如果您的应用程序在 Kafka Binder 中使用,并且您希望使用嵌入式代理进行测试,则必须删除依赖项,因为它会将实际 Binder 替换为测试用例的测试 Binder。
如果您希望某些测试使用测试 Binder,而一些测试使用嵌入式代理,则使用真实 Binder 的测试需要通过在测试类中排除 Binder auto 配置来禁用测试 Binder。
以下示例显示了如何执行此操作:
|
@EmbeddedKafka
跟@SpringJunitConfig
当与 一起使用时,建议在测试类上使用。
这是为了防止在测试套件中运行多个测试后,在 JVM 关闭期间发生潜在的争用条件。
例如,如果不使用 ,则 可能会提前关闭,而应用程序上下文仍然需要来自它的资源。
由于每个测试运行都会创建自己的临时目录,因此当出现此争用条件时,它将生成错误日志消息,指示它尝试删除或清理的文件不再可用。
添加将确保应用程序上下文在每次测试后被清理而不是缓存,使其不易受到此类潜在资源争用情况的影响。@EmbeddedKafka
@SpringJUnitConfig
@DirtiesContext
@DirtiesContext
EmbeddedKafkaBroker
EmbeddedKafka
@DirtiesContext
@EmbeddedKafka
注解或 BeanEmbeddedKafkaBroker
以下示例显示了如何使用 Annotation 创建嵌入式代理:@EmbeddedKafka
@RunWith(SpringRunner.class)
@EmbeddedKafka(topics = "someTopic",
bootstrapServersProperty = "spring.kafka.bootstrap-servers") // this is now the default
public class MyApplicationTests {
@Autowired
private KafkaTemplate<String, String> template;
@Test
public void test() {
...
}
}
从版本 3.0.10 开始,它会自动设置为默认。bootstrapServersProperty spring.kafka.bootstrap-servers |
从版本 3.0.10 开始,它会自动设置为默认。bootstrapServersProperty spring.kafka.bootstrap-servers |
Hamcrest 匹配器
提供以下匹配器:org.springframework.kafka.test.hamcrest.KafkaMatchers
/**
* @param key the key
* @param <K> the type.
* @return a Matcher that matches the key in a consumer record.
*/
public static <K> Matcher<ConsumerRecord<K, ?>> hasKey(K key) { ... }
/**
* @param value the value.
* @param <V> the type.
* @return a Matcher that matches the value in a consumer record.
*/
public static <V> Matcher<ConsumerRecord<?, V>> hasValue(V value) { ... }
/**
* @param partition the partition.
* @return a Matcher that matches the partition in a consumer record.
*/
public static Matcher<ConsumerRecord<?, ?>> hasPartition(int partition) { ... }
/**
* Matcher testing the timestamp of a {@link ConsumerRecord} assuming the topic has been set with
* {@link org.apache.kafka.common.record.TimestampType#CREATE_TIME CreateTime}.
*
* @param ts timestamp of the consumer record.
* @return a Matcher that matches the timestamp in a consumer record.
*/
public static Matcher<ConsumerRecord<?, ?>> hasTimestamp(long ts) {
return hasTimestamp(TimestampType.CREATE_TIME, ts);
}
/**
* Matcher testing the timestamp of a {@link ConsumerRecord}
* @param type timestamp type of the record
* @param ts timestamp of the consumer record.
* @return a Matcher that matches the timestamp in a consumer record.
*/
public static Matcher<ConsumerRecord<?, ?>> hasTimestamp(TimestampType type, long ts) {
return new ConsumerRecordTimestampMatcher(type, ts);
}
AssertJ 条件
您可以使用以下 AssertJ 条件:
/**
* @param key the key
* @param <K> the type.
* @return a Condition that matches the key in a consumer record.
*/
public static <K> Condition<ConsumerRecord<K, ?>> key(K key) { ... }
/**
* @param value the value.
* @param <V> the type.
* @return a Condition that matches the value in a consumer record.
*/
public static <V> Condition<ConsumerRecord<?, V>> value(V value) { ... }
/**
* @param key the key.
* @param value the value.
* @param <K> the key type.
* @param <V> the value type.
* @return a Condition that matches the key in a consumer record.
* @since 2.2.12
*/
public static <K, V> Condition<ConsumerRecord<K, V>> keyValue(K key, V value) { ... }
/**
* @param partition the partition.
* @return a Condition that matches the partition in a consumer record.
*/
public static Condition<ConsumerRecord<?, ?>> partition(int partition) { ... }
/**
* @param value the timestamp.
* @return a Condition that matches the timestamp value in a consumer record.
*/
public static Condition<ConsumerRecord<?, ?>> timestamp(long value) {
return new ConsumerRecordTimestampCondition(TimestampType.CREATE_TIME, value);
}
/**
* @param type the type of timestamp
* @param value the timestamp.
* @return a Condition that matches the timestamp value in a consumer record.
*/
public static Condition<ConsumerRecord<?, ?>> timestamp(TimestampType type, long value) {
return new ConsumerRecordTimestampCondition(type, value);
}
例
以下示例汇集了本章中介绍的大部分主题:
public class KafkaTemplateTests {
private static final String TEMPLATE_TOPIC = "templateTopic";
@ClassRule
public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, TEMPLATE_TOPIC);
@Test
public void testTemplate() throws Exception {
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testT", "false",
embeddedKafka.getEmbeddedKafka());
DefaultKafkaConsumerFactory<Integer, String> cf =
new DefaultKafkaConsumerFactory<>(consumerProps);
ContainerProperties containerProperties = new ContainerProperties(TEMPLATE_TOPIC);
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProperties);
final BlockingQueue<ConsumerRecord<Integer, String>> records = new LinkedBlockingQueue<>();
container.setupMessageListener(new MessageListener<Integer, String>() {
@Override
public void onMessage(ConsumerRecord<Integer, String> record) {
System.out.println(record);
records.add(record);
}
});
container.setBeanName("templateTests");
container.start();
ContainerTestUtils.waitForAssignment(container,
embeddedKafka.getEmbeddedKafka().getPartitionsPerTopic());
Map<String, Object> producerProps =
KafkaTestUtils.producerProps(embeddedKafka.getEmbeddedKafka());
ProducerFactory<Integer, String> pf =
new DefaultKafkaProducerFactory<>(producerProps);
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
template.setDefaultTopic(TEMPLATE_TOPIC);
template.sendDefault("foo");
assertThat(records.poll(10, TimeUnit.SECONDS), hasValue("foo"));
template.sendDefault(0, 2, "bar");
ConsumerRecord<Integer, String> received = records.poll(10, TimeUnit.SECONDS);
assertThat(received, hasKey(2));
assertThat(received, hasPartition(0));
assertThat(received, hasValue("bar"));
template.send(TEMPLATE_TOPIC, 0, 2, "baz");
received = records.poll(10, TimeUnit.SECONDS);
assertThat(received, hasKey(2));
assertThat(received, hasPartition(0));
assertThat(received, hasValue("baz"));
}
}
前面的示例使用 Hamcrest 匹配程序。
使用 ,最后一部分类似于以下代码:AssertJ
assertThat(records.poll(10, TimeUnit.SECONDS)).has(value("foo"));
template.sendDefault(0, 2, "bar");
ConsumerRecord<Integer, String> received = records.poll(10, TimeUnit.SECONDS);
// using individual assertions
assertThat(received).has(key(2));
assertThat(received).has(value("bar"));
assertThat(received).has(partition(0));
template.send(TEMPLATE_TOPIC, 0, 2, "baz");
received = records.poll(10, TimeUnit.SECONDS);
// using allOf()
assertThat(received).has(allOf(keyValue(2, "baz"), partition(0)));
模拟 Consumer 和 Producer
该库提供 和 类用于测试目的。kafka-clients
MockConsumer
MockProducer
如果您希望在某些带有侦听器容器的测试中使用这些类,或者分别使用这些类,从版本 3.0.7 开始,框架现在提供 和 implementations。KafkaTemplate
MockConsumerFactory
MockProducerFactory
这些工厂可以在侦听器容器和模板中使用,而不是默认工厂,后者需要运行(或嵌入)代理。
下面是一个返回单个使用者的简单实现示例:
@Bean
ConsumerFactory<String, String> consumerFactory() {
MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
TopicPartition topicPartition0 = new TopicPartition("topic", 0);
List<TopicPartition> topicPartitions = Collections.singletonList(topicPartition0);
Map<TopicPartition, Long> beginningOffsets = topicPartitions.stream().collect(Collectors
.toMap(Function.identity(), tp -> 0L));
consumer.updateBeginningOffsets(beginningOffsets);
consumer.schedulePollTask(() -> {
consumer.addRecord(
new ConsumerRecord<>("topic", 0, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "test1",
new RecordHeaders(), Optional.empty()));
consumer.addRecord(
new ConsumerRecord<>("topic", 0, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "test2",
new RecordHeaders(), Optional.empty()));
});
return new MockConsumerFactory(() -> consumer);
}
如果您希望使用并发进行测试,则工厂构造函数中的 lambda 每次都需要创建一个新实例。Supplier
使用 ,有两个构造函数;一个用于创建简单工厂,另一个用于创建支持事务的工厂。MockProducerFactory
以下是示例:
@Bean
ProducerFactory<String, String> nonTransFactory() {
return new MockProducerFactory<>(() ->
new MockProducer<>(true, new StringSerializer(), new StringSerializer()));
}
@Bean
ProducerFactory<String, String> transFactory() {
MockProducer<String, String> mockProducer =
new MockProducer<>(true, new StringSerializer(), new StringSerializer());
mockProducer.initTransactions();
return new MockProducerFactory<String, String>((tx, id) -> mockProducer, "defaultTxId");
}
请注意,在第二种情况下,lambda 是 a,如果调用方需要事务性生产者,则第一个参数为 true;可选的第二个参数包含事务 ID。
这可以是默认值(如构造函数中提供),也可以被(或对于本地事务)覆盖(如果已配置)。
如果您希望根据此值使用不同的 ID,则会提供事务 ID。BiFunction<Boolean, String>
KafkaTransactionManager
KafkaTemplate
MockProducer
如果您在多线程环境中使用 producer,则应返回多个 producer (可能使用 a 进行线程绑定)。BiFunction
ThreadLocal
必须通过调用 来初始化事务的事务。MockProducer initTransaction() |
使用 时,如果您不想在每次发送后关闭生产者,则可以提供一个自定义实现,该实现将覆盖不从超类调用该方法的方法。
这在验证同一生产者上的多个发布而不关闭它时,便于进行测试。MockProducer
MockProducer
close
close
下面是一个示例:
@Bean
MockProducer<String, String> mockProducer() {
return new MockProducer<>(false, new StringSerializer(), new StringSerializer()) {
@Override
public void close() {
}
};
}
@Bean
ProducerFactory<String, String> mockProducerFactory(MockProducer<String, String> mockProducer) {
return new MockProducerFactory<>(() -> mockProducer);
}
必须通过调用 来初始化事务的事务。MockProducer initTransaction() |