(1). 前言

有这样一个需求:在IM服务器启动时,会把自己的ip的port向ZK进行注册,这个port主要进行消息路由功能(不对外服务).
所有的IM会获取这个IM服务列表,与这些IM服务列表保持着长连接.

(2). 实体定义

package help.lixin.zk.entity;

import java.util.Objects;

public class NodeInfo {
    private String ip;
    private String port;

    public String getIp() {
        return ip;
    }

    public void setIp(String ip) {
        this.ip = ip;
    }

    public String getPort() {
        return port;
    }

    public void setPort(String port) {
        this.port = port;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        NodeInfo nodeInfo = (NodeInfo) o;
        return Objects.equals(ip, nodeInfo.ip) && Objects.equals(port, nodeInfo.port);
    }

    @Override
    public int hashCode() {
        return Objects.hash(ip, port);
    }

    @Override
    public String toString() {
        return "NodeInfo{" +
                "ip='" + ip + '\'' +
                ", port=" + port +
                '}';
    }
}

(3). 接口定义

定义接口的目的是为了方便随意切换实现(Redis/MySQL/MongoDB).

package help.lixin.zk;

import help.lixin.zk.entity.NodeInfo;

import java.util.Collection;

public interface Regsiter {

    void register() throws Exception;

    void unRegister() throws Exception;

    Collection<NodeInfo> getNodes();
}

(4). 接口实现

package help.lixin.zk;

import help.lixin.zk.entity.NodeInfo;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;

import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;

public class ZKRegsiter implements Regsiter {
    private static final String PATH = "/im/nodes";
    private static final String FORMAT = "%s" + "/" + "%s";
    private CuratorFramework client = null;
    private NodeInfo currentNodeInfo;
    private String zkHosts;

    public ZKRegsiter(CuratorFramework client, NodeInfo currentNodeInfo) {
        this.currentNodeInfo = currentNodeInfo;
        this.client = client;
    }

    @Override
    public void register() throws Exception {
        String nodePath = String.format(FORMAT, PATH, currentNodeInfo.getIp());
        // 1. 创建目录
        Stat stat = client.checkExists().forPath(PATH);
        if (null == stat) {
            client.create().creatingParentsIfNeeded() // 递归
                    .withMode(CreateMode.PERSISTENT) // 持久
                    .forPath(PATH);
        }

        // 2.在目录下,创建数据
        client.create().withMode(CreateMode.EPHEMERAL) // 瞬时
                .forPath(nodePath, currentNodeInfo.getPort().getBytes(StandardCharsets.UTF_8));

        // 3. 对目录进行监听,只要目录有变化,就触发:更新
        PathChildrenCache childrenCache = new PathChildrenCache(client, PATH, true);
        childrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
        childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event) throws Exception {
                // TODO 设计监听器
            }
        });
    }

    @Override
    public void unRegister() throws Exception {
        String path = String.format(FORMAT, PATH, currentNodeInfo.getIp());
        Stat stat = client.checkExists().forPath(path);
        if (null != stat) {
            client.delete()
                    .guaranteed()  // 保证强制删除
                    .forPath(path);
        }
    }

    @Override
    public Collection<NodeInfo> getNodes() {
        // 重新获取ZK里的数据,并置入到缓存里
        Collection<NodeInfo> nodeInfos = new HashSet<>();
        try {
            Stat stat = client.checkExists().forPath(PATH);
            if (null != stat) {
                List<String> nodes = client.getChildren().forPath(PATH);
                for (String node : nodes) {
                    String path = String.format(FORMAT, PATH, node);
                    byte[] bytes = client.getData().forPath(path);
                    String port = new String(bytes);
                    NodeInfo nodeInfo = new NodeInfo();
                    nodeInfo.setIp(node);
                    nodeInfo.setPort(port);

                    nodeInfos.add(nodeInfo);
                }
            }
        } catch (Exception ignore) {
            // TODO WARRING
        }
        return nodeInfos;
    }
}

(5). pom.xml定义

<!-- 对zookeeper的底层api的一些封装 -->
<dependency>
	<groupId>org.apache.curator</groupId>
	<artifactId>curator-framework</artifactId>
	<version>2.12.0</version>
</dependency>
<!-- 封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式Barrier -->
<dependency>
	<groupId>org.apache.curator</groupId>
	<artifactId>curator-recipes</artifactId>
	<version>2.12.0</version>
</dependency>

(6). 测试

package help.lixin;

import help.lixin.zk.Regsiter;
import help.lixin.zk.ZKRegsiter;
import help.lixin.zk.entity.NodeInfo;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

import java.util.Collection;
import java.util.concurrent.TimeUnit;

public class RegsiterTest {
    public static void main(String[] args) throws  Exception {
        String zkHosts = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";

        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient(zkHosts,
                5000, 5000, retryPolicy);
        client.start();

        NodeInfo nodeInfo1 = new NodeInfo();
        nodeInfo1.setIp("192.168.1.80");
        nodeInfo1.setPort("9000");


        NodeInfo nodeInfo2 = new NodeInfo();
        nodeInfo2.setIp("192.168.1.90");
        nodeInfo2.setPort("9000");

		// 模拟第一个实例(JVM进程)
        Regsiter regsiter = new ZKRegsiter(client,nodeInfo1);
        regsiter.register();
        Collection<NodeInfo> nodes = regsiter.getNodes();

		// 模拟第二个实例(JVM进程)
        Regsiter regsiter2 = new ZKRegsiter(client,nodeInfo2);
        regsiter2.register();
        Collection<NodeInfo> nodes2 = regsiter2.getNodes();

        System.out.println(nodes);
        System.out.println(nodes2);
    }
}

(7). 总结