在以下示例中,我们将发布到名为 .
它是一个已分区的主题,对于此示例,我们假设该主题已创建具有三个分区。hello-pulsar-partitioned
@SpringBootApplication
public class PulsarBootPartitioned {
public static void main(String[] args) {
SpringApplication.run(PulsarBootPartitioned.class, "--spring.pulsar.producer.message-routing-mode=CustomPartition");
}
@Bean
public ApplicationRunner runner(PulsarTemplate<String> pulsarTemplate) {
pulsarTemplate.setDefaultTopicName("hello-pulsar-partitioned");
return args -> {
for (int i = 0; i < 10; i++) {
pulsarTemplate.sendAsync("hello john doe 0 ", new FooRouter());
pulsarTemplate.sendAsync("hello alice doe 1", new BarRouter());
pulsarTemplate.sendAsync("hello buzz doe 2", new BuzzRouter());
}
};
}
@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription", topics = "hello-pulsar-partitioned")
public void listen(String message) {
System.out.println("Message Received: " + message);
}
static class FooRouter implements MessageRouter {
@Override
public int choosePartition(Message<?> msg, TopicMetadata metadata) {
return 0;
}
}
static class BarRouter implements MessageRouter {
@Override
public int choosePartition(Message<?> msg, TopicMetadata metadata) {
return 1;
}
}
static class BuzzRouter implements MessageRouter {
@Override
public int choosePartition(Message<?> msg, TopicMetadata metadata) {
return 2;
}
}
}
在前面的示例中,我们发布到分区主题,并且我们希望将一些数据段发布到特定分区。
如果将其保留为 Pulsar 的默认值,则它遵循分区分配的循环模式,我们想覆盖它。
为此,我们提供了一个带有 method 的 message router 对象。
考虑实现的 3 个消息路由器。 始终将数据发送到 分区 、 发送到分区 和 发送到分区 。
另请注意,我们现在使用 that 的方法返回一个 .
在运行应用程序时,我们还需要将 on the producer 设置为 ()。send
FooRouter
0
BarRouter
1
BuzzRouter
2
sendAsync
PulsarTemplate
CompletableFuture
messageRoutingMode
CustomPartition
spring.pulsar.producer.message-routing-mode
在消费者端,我们使用 a 和 exclusive 订阅类型。
这意味着来自所有分区的数据最终都位于同一个使用者中,并且没有排序保证。PulsarListener
如果我们希望每个分区都由单个不同的使用者使用,我们该怎么办?
我们可以切换到订阅模式并添加三个单独的使用者:failover
@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription", topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Failover)
public void listen1(String foo) {
System.out.println("Message Received 1: " + foo);
}
@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription", topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Failover)
public void listen2(String foo) {
System.out.println("Message Received 2: " + foo);
}
@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription", topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Failover)
public void listen3(String foo) {
System.out.println("Message Received 3: " + foo);
}
当您遵循此方法时,单个分区始终由专用使用者使用。
同样,如果你想使用 Pulsar 的共享 consumer 类型,你可以使用 subscription 类型。
但是,当您使用该模式时,您将失去任何排序保证,因为单个使用者可能会在另一个使用者有机会之前从所有分区接收消息。shared
shared
请考虑以下示例:
@PulsarListener(subscriptionName = "hello-pulsar-shared-subscription", topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Shared)
public void listen1(String foo) {
System.out.println("Message Received 1: " + foo);
}
@PulsarListener(subscriptionName = "hello-pulsar-shared-subscription", topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Shared)
public void listen2(String foo) {
System.out.println("Message Received 2: " + foo);
}