对于最新的稳定版本,请使用 Spring for Apache Kafka 3.2.1Spring中文文档

对于最新的稳定版本,请使用 Spring for Apache Kafka 3.2.1Spring中文文档

为了查找,您的侦听器必须实现 ,它具有以下方法:ConsumerSeekAwareSpring中文文档

void registerSeekCallback(ConsumerSeekCallback callback);

void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);

void onPartitionsRevoked(Collection<TopicPartition> partitions)

void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);

在启动容器和分配分区时调用。 在初始化后的任意时间进行查找时,应使用此回调。 您应该保存对回调的引用。 如果在多个容器(或 )中使用相同的侦听器,则应将回调存储在侦听器键控的某个结构中。registerSeekCallbackConcurrentMessageListenerContainerThreadLocalThreadSpring中文文档

使用组管理时,在分配分区时调用。 例如,可以使用此方法通过调用回调来设置分区的初始偏移量。 还可以使用此方法将此线程的回调与分配的分区相关联(请参阅下面的示例)。 您必须使用 callback 参数,而不是传入 的参数。 从版本 2.5.5 开始,即使使用手动分区分配,也会调用此方法。onPartitionsAssignedregisterSeekCallbackSpring中文文档

onPartitionsRevoked在容器停止或 Kafka 撤消分配时调用。 您应该放弃此线程的回调,并删除与已吊销分区的任何关联。Spring中文文档

回调有以下几种方法:Spring中文文档

void seek(String topic, int partition, long offset);

void seekToBeginning(String topic, int partition);

void seekToBeginning(Collection<TopicPartitions> partitions);

void seekToEnd(String topic, int partition);

void seekToEnd(Collection<TopicPartitions> partitions);

void seekRelative(String topic, int partition, long offset, boolean toCurrent);

void seekToTimestamp(String topic, int partition, long timestamp);

void seekToTimestamp(Collection<TopicPartition> topicPartitions, long timestamp);

seekRelative已添加到 2.3 版中,用于执行相对查找。Spring中文文档

这些方法也已添加到版本 2.3 中。seekToTimestampSpring中文文档

当为 or 方法中的多个分区查找相同的时间戳时,首选第二种方法,因为在对使用者方法的单个调用中查找时间戳的偏移量更有效。 从其他位置调用时,容器将收集所有时间戳查找请求,并调用 .onIdleContaineronPartitionsAssignedoffsetsForTimesoffsetsForTimes
当为 or 方法中的多个分区查找相同的时间戳时,首选第二种方法,因为在对使用者方法的单个调用中查找时间戳的偏移量更有效。 从其他位置调用时,容器将收集所有时间戳查找请求,并调用 .onIdleContaineronPartitionsAssignedoffsetsForTimesoffsetsForTimes

您还可以在检测到空闲容器时执行查找操作。 有关如何启用空闲容器检测的信息,请参阅检测空闲和无响应使用者onIdleContainer()Spring中文文档

接受集合的方法很有用,例如,在处理压缩主题时,并且您希望在每次启动应用程序时都查找到开头:seekToBeginning
接受集合的方法很有用,例如,在处理压缩主题时,并且您希望在每次启动应用程序时都查找到开头:seekToBeginning
public class MyListener implements ConsumerSeekAware {

    ...

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        callback.seekToBeginning(assignments.keySet());
    }

}

若要在运行时任意查找,请使用相应线程的回调引用。registerSeekCallbackSpring中文文档

这是一个简单的 Spring Boot 应用程序,演示了如何使用回调;它向主题发送 10 条记录;在控制台中点击会导致所有分区查找到开头。<Enter>Spring中文文档

@SpringBootApplication
public class SeekExampleApplication {

    public static void main(String[] args) {
        SpringApplication.run(SeekExampleApplication.class, args);
    }

    @Bean
    public ApplicationRunner runner(Listener listener, KafkaTemplate<String, String> template) {
        return args -> {
            IntStream.range(0, 10).forEach(i -> template.send(
                new ProducerRecord<>("seekExample", i % 3, "foo", "bar")));
            while (true) {
                System.in.read();
                listener.seekToStart();
            }
        };
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("seekExample", 3, (short) 1);
    }

}

@Component
class Listener implements ConsumerSeekAware {

    private static final Logger logger = LoggerFactory.getLogger(Listener.class);

    private final ThreadLocal<ConsumerSeekCallback> callbackForThread = new ThreadLocal<>();

    private final Map<TopicPartition, ConsumerSeekCallback> callbacks = new ConcurrentHashMap<>();

    @Override
    public void registerSeekCallback(ConsumerSeekCallback callback) {
        this.callbackForThread.set(callback);
    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        assignments.keySet().forEach(tp -> this.callbacks.put(tp, this.callbackForThread.get()));
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        partitions.forEach(tp -> this.callbacks.remove(tp));
        this.callbackForThread.remove();
    }

    @Override
    public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
    }

    @KafkaListener(id = "seekExample", topics = "seekExample", concurrency = "3")
    public void listen(ConsumerRecord<String, String> in) {
        logger.info(in.toString());
    }

    public void seekToStart() {
        this.callbacks.forEach((tp, callback) -> callback.seekToBeginning(tp.topic(), tp.partition()));
    }

}

为了更简单,版本 2.3 添加了类,该类跟踪要用于主题/分区的回调。 下面的示例演示如何在容器每次空闲时查找每个分区中处理的最后一条记录。 它还具有允许任意外部调用将分区倒带一条记录的方法。AbstractConsumerSeekAwareSpring中文文档

public class SeekToLastOnIdleListener extends AbstractConsumerSeekAware {

    @KafkaListener(id = "seekOnIdle", topics = "seekOnIdle")
    public void listen(String in) {
        ...
    }

    @Override
    public void onIdleContainer(Map<TopicPartition, Long> assignments,
            ConsumerSeekCallback callback) {

            assignments.keySet().forEach(tp -> callback.seekRelative(tp.topic(), tp.partition(), -1, true));
    }

    /**
    * Rewind all partitions one record.
    */
    public void rewindAllOneRecord() {
        getSeekCallbacks()
            .forEach((tp, callback) ->
                callback.seekRelative(tp.topic(), tp.partition(), -1, true));
    }

    /**
    * Rewind one partition one record.
    */
    public void rewindOnePartitionOneRecord(String topic, int partition) {
        getSeekCallbackFor(new TopicPartition(topic, partition))
            .seekRelative(topic, partition, -1, true);
    }

}

版本 2.6 为抽象类添加了方便的方法:Spring中文文档

  • seekToBeginning()- 查找所有分配的分区到开头。Spring中文文档

  • seekToEnd()- 查找所有分配的分区到最后。Spring中文文档

  • seekToTimestamp(long timestamp)- 将所有分配的分区查找到该时间戳所表示的偏移量。Spring中文文档

public class MyListener extends AbstractConsumerSeekAware {

    @KafkaListener(...)
    void listn(...) {
        ...
    }
}

public class SomeOtherBean {

    MyListener listener;

    ...

    void someMethod() {
        this.listener.seekToTimestamp(System.currentTimeMillis() - 60_000);
    }

}