(1). 概述

在这里对Key Sahred模式进行测试,以对Key Shared有更进一步的了解.

(2). 案例代码

package help.lixin.pulsar.example;

import org.apache.pulsar.client.api.*;

import java.util.concurrent.TimeUnit;

public class KeySharedTest {
    private static final String SERVER_URL = "pulsar://localhost:6650";
    private static final String TOPIC = "persistent://public/default/test_2";

    public static void main(String[] args) throws Exception {
        Thread product = new ProductThread();
        product.setName("product");
        product.start();

        Thread consumer1 = new ConsumerThread();
        consumer1.setName("consumer-1");
        consumer1.start();

        Thread consumer2 = new ConsumerThread();
        consumer2.setName("consumer-2");
        consumer2.start();

        Thread consumer3 = new ConsumerThread();
        consumer3.setName("consumer-3");
        consumer3.start();

        TimeUnit.HOURS.sleep(1);
    }

    static class ProductThread extends Thread {
        @Override
        public void run() {
            try {
                PulsarClient client = PulsarClient.builder()
                        //
                        .serviceUrl(SERVER_URL)
                        //
                        .enableTcpNoDelay(true)
                        //
                        .build();
                Producer<String> producer = client.newProducer(Schema.STRING)
                        //
                        .producerName("my-producer")
                        // 指定生产者模式做测试
                        // .accessMode(ProducerAccessMode.Shared)
                        .topic(TOPIC)
                        //
                        .sendTimeout(10, TimeUnit.SECONDS)
                        //
                        .create();
                for (int i = 0; i < 500; i++) {
                    try {
                        String key = "00001";
                        if (i % 2 == 0) {
                            key = "00007";
                        }
                        // 同步发送消息,并指定:key
                        MessageId messageId = producer.newMessage().key(key).value("Hello World! " + key + " -- " + i).send();
                        // System.out.println(Thread.currentThread().getName() + " Send Message: " + messageId);
                        TimeUnit.SECONDS.sleep(3);
                    } catch (Exception ignore) {
                        System.out.println(Thread.currentThread().getName() + " Send Fail " + ignore.getMessage());
                    }
                }// end for
            } catch (Exception ignore) {
                System.out.println(Thread.currentThread().getName() + " Error " + ignore.getMessage());
            }// end catch
        }
    } // end


    static class ConsumerThread extends Thread {
        @Override
        public void run() {
            try {

                // 构造Pulsar Client
                PulsarClient client = PulsarClient.builder()
                        //
                        .serviceUrl(SERVER_URL)
                        //
                        .enableTcpNoDelay(true).build();

                Consumer consumer = client.newConsumer()
                        // 消息费者名称
                        .consumerName("my-consumer")
                        // 主题
                        .topic(TOPIC)
                        // 订阅名称
                        .subscriptionName("my-subscription")
                        //
                        .ackTimeout(10, TimeUnit.SECONDS)
                        //
                        .maxTotalReceiverQueueSizeAcrossPartitions(10)
                        //
                        .subscriptionType(SubscriptionType.Key_Shared)
                        //
                        .subscribe();
                do {
                    // 接收消息有两种方式:异步和同步
                    Message message = consumer.receive();
                    String key = message.getKey();
                    String format = String.format("%s", new String(message.getData()));
                    System.err.println(Thread.currentThread().getName() + " Revice Msg: " + format + " key: " + key);
                    // 消息确认机制
                    consumer.acknowledge(message);
                    // 否定这条消息.
                    // consumer.redeliverUnacknowledgedMessages();
                } while (true);
            } catch (Exception ignore) {
                System.out.println(Thread.currentThread().getName() + " Revice Msg ERROR: " + ignore.getMessage());
            }
        }
    } // end
}

(3). 测试结果

consumer-2 Revice Msg: Hello World! 00007 -- 0 key: 00007
consumer-3 Revice Msg: Hello World! 00001 -- 1 key: 00001
consumer-2 Revice Msg: Hello World! 00007 -- 2 key: 00007
consumer-3 Revice Msg: Hello World! 00001 -- 3 key: 00001
consumer-2 Revice Msg: Hello World! 00007 -- 4 key: 00007
consumer-3 Revice Msg: Hello World! 00001 -- 5 key: 00001
consumer-2 Revice Msg: Hello World! 00007 -- 6 key: 00007
consumer-3 Revice Msg: Hello World! 00001 -- 7 key: 00001
consumer-2 Revice Msg: Hello World! 00007 -- 8 key: 00007
consumer-3 Revice Msg: Hello World! 00001 -- 9 key: 00001
consumer-2 Revice Msg: Hello World! 00007 -- 10 key: 00007

(4). 总结

我创建三个线程订阅消息,从结果能看出来:仅有两个线程在消费,得到的结论就是:当线程数大于key的数量时,线程数是闲出来的.

(5). Pulsar与RocketMQ比较后的缺点

在RocketMQ里,不仅可以指定Topic,还可以指定Tag(Tag相当于Pulsar里的Key),而,Pulsar在Key Shared模式下,维度只能到Topic级别.
假如,有这样一个场景:应用程序部署是区分物理分区的,比如:华东区和华南区,华东区和华南区的”数据源”是隔离的,而Pulsar是所有物理分区共享的,那么问题来了,这种情况下,如果华南区消费了华东区的MQ数据,在进行”数据源”操作时,是连接不到华东区的”数据源”的(毕竟,MQ的消费不是那么纯粹,会伴随业务操作的数据落地),解决方案还是有的:

  1. 不同的物理分区(华南和华东)用不同的namespace(建议做法).
  2. 在不区分namespace的情况下,大家在一起消费消息,把不属于自己分区的消息给拒绝掉,只是这样做的情况下,这条消息会被拒N-1次.