Dubbo and Zookeeper

 

Dubbo的Provider,Consumer在启动时都会创建一个注册中心,注册中心可以选择Zookeeper,Redis。常用的是Zookeeper,我们这篇博客主要讲的就是Dubbo与Zookeeper的注册交互过程。

Dubbo里默认使用zkclient来操作zookeeper服务器,其对zookeeper原始客户单做了一定的封装,操作zookeeper时能便捷一些,比如不需要手动处理session超时,不需要重复注册watcher等等。

Dubbo在Zookeeper上注册的节点目录:假设接口名称是:com.bob.dubbo.service.CityDubboService

Dubbo启动时,Consumer和Provider都会把自身的URL格式化为字符串,然后注册到zookeeper相应节点下,作为一个临时节点,当连断开时,节点被删除。

Consumer在启动时,不仅仅会注册自身到 …/consumers/目录下,同时还会订阅…/providers目录,实时获取其上Provider的URL字符串信息。

下面我们就看相关的代码实现:

public class ZookeeperRegistry extends FailbackRegistry {

    ......

    /**
     * 默认端口
     */
    private final static int DEFAULT_ZOOKEEPER_PORT = 2181;
    /**
     * 默认 Zookeeper 根节点
     */
    private final static String DEFAULT_ROOT = "dubbo";

    /**
     * Zookeeper 根节点
     */
    private final String root;
    /**
     * Service 接口全名集合
     */
    private final Set<String> anyServices = new ConcurrentHashSet<String>();
    /**
     * 监听器集合
     */
    private final ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> zkListeners
        = new ConcurrentHashMap<URL, ConcurrentMap<NotifyListener, ChildListener>>();
    /**
     * Zookeeper 客户端
     */
    private final ZookeeperClient zkClient;

    public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
        super(url);   // 调用父类FailbackRegistry的构造函数
        if (url.isAnyHost()) {
            throw new IllegalStateException("registry address == null");
        }
        // 获得 Zookeeper 根节点, 未指定 "group" 参数时为 dubbo
        String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT); // `url.parameters.group` 参数值
        if (!group.startsWith(Constants.PATH_SEPARATOR)) {
            group = Constants.PATH_SEPARATOR + group;
        }
        this.root = group;   // root = "/dubbo"
        // 创建 Zookeeper Client
        zkClient = zookeeperTransporter.connect(url);
        // 添加 StateListener 对象。该监听器,在重连时,调用恢复方法。
        zkClient.addStateListener(new StateListener() {
            @Override
            public void stateChanged(int state) {
                if (state == RECONNECTED) {
                    try {
                        recover();
                    } catch (Exception e) {
                        logger.error(e.getMessage(), e);
                    }
                }
            }
        });
    }
}

public abstract class FailbackRegistry extends AbstractRegistry {

    ......

    /**
     * 失败发起注册失败的 URL 集合
     */
    private final Set<URL> failedRegistered = new ConcurrentHashSet<URL>();
    /**
     * 失败取消注册失败的 URL 集合
     */
    private final Set<URL> failedUnregistered = new ConcurrentHashSet<URL>();
    /**
     * 失败发起订阅失败的监听器集合
     */
    private final ConcurrentMap<URL, Set<NotifyListener>> failedSubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();
    /**
     * 失败取消订阅失败的监听器集合
     */
    private final ConcurrentMap<URL, Set<NotifyListener>> failedUnsubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();
    /**
     * 失败通知通知的 URL 集合
     */
    private final ConcurrentMap<URL, Map<NotifyListener, List<URL>>> failedNotified = new ConcurrentHashMap<URL, Map<NotifyListener, List<URL>>>();

    public FailbackRegistry(URL url) {
        super(url);
        // 重试频率,单位:毫秒 ,默认 5*1000
        int retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
        // 创建失败重试定时器
        this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() {
            public void run() {
                // Check and connect to the registry
                try {
                    retry();
                } catch (Throwable t) { // Defensive fault tolerance
                    logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t);
                }
            }
        }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);
    }

    /**
     * 重试
     */
    // Retry the failed actions
    protected void retry() {
        // 重试执行注册
        if (!failedRegistered.isEmpty()) {
            ......
            for (URL url : failed) {
                try {
                    // 执行注册
                    doRegister(url);
                    // 移除出 `failedRegistered`
                    failedRegistered.remove(url);
                } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
                    logger.warn("Failed to retry register " + failed + ", waiting for again, cause: " + t.getMessage(), t);
                }
            }
        }
        // 重试执行取消注册
        if (!failedUnregistered.isEmpty()) {
            ......
            for (URL url : failed) {
                try {
                    // 执行取消注册
                    doUnregister(url);
                    // 移除出 `failedUnregistered`
                    failedUnregistered.remove(url);
                } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
                    logger.warn("Failed to retry unregister  " + failed + ", waiting for again, cause: " + t.getMessage(), t);
                }
            }
        }
        // 重试执行订阅
        if (!failedSubscribed.isEmpty()) {
            ......
            for (Map.Entry<URL, Set<NotifyListener>> entry : failed.entrySet()) {
                URL url = entry.getKey();
                Set<NotifyListener> listeners = entry.getValue();
                for (NotifyListener listener : listeners) {
                    try {
                        // 执行订阅
                        doSubscribe(url, listener);
                        // 移除监听器
                        listeners.remove(listener);
                    } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
                        logger.warn("Failed to retry subscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t);
                    }
                }
            }
        }
        // 重试执行取消订阅
        if (!failedUnsubscribed.isEmpty()) {
            ......
            for (Map.Entry<URL, Set<NotifyListener>> entry : failed.entrySet()) {
                URL url = entry.getKey();
                Set<NotifyListener> listeners = entry.getValue();
                for (NotifyListener listener : listeners) {
                    try {
                        // 执行取消订阅
                        doUnsubscribe(url, listener);
                        // 移除监听器
                        listeners.remove(listener);
                    } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
                        logger.warn("Failed to retry unsubscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t);
                    }
                }
            }
        }
    }

}

ZookeeperRegistry 在实例化时,调用父类构造函数。在父类构造函数中,会创建一个定时任务,每隔5S执行retry( ) 方法。

在retry( ) 方法中,重试那些失败的动作。重试的动作包括:

1. Provider向zookeeper注册自身的url,生成一个临时的znode
2. Provider从Dubbo容器中退出,停止提供RPC调用。也就是移除zookeeper内自身url对应的znode
3. Consumer订阅 ” /dubbo/..Service/providers” 目录的子节点,生成ChildListener
4. Consumer从Dubbo容器中退出,移除之前创建的ChildListener

为什么如此设置? 主要是和zookeeper的通信机制有关的。当zookeeper的Client和Server连接断开,或者心跳超时,那么Server会将相应Client注册的临时节点删除,当然注册的Listener也相应删除。

而Provider和Consumer注册的URL就属于临时节点,当连接断开时,Dubbo注册了zookeeper的StateListener,也就是状态监听器,当Dubbo里的zookeeper Client和Server重新连接上时,将之前注册的的URL添加入这几个失败集合中,然后重新注册和订阅。

看ZookeeperRegistry 的构造函数,其添加了一个StateListener:

public class ZookeeperRegistry extends FailbackRegistry {

    public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
        ......
        // 添加 StateListener 对象。该监听器,在重连时,调用恢复方法。
        zkClient.addStateListener(new StateListener() {
            @Override
            public void stateChanged(int state) {
                if (state == RECONNECTED) {
                    try {
                        recover();
                    } catch (Exception e) {
                        logger.error(e.getMessage(), e);
                    }
                }
            }
        });
    }

}

public abstract class FailbackRegistry extends AbstractRegistry {

    protected void recover() throws Exception {
        // register 恢复注册,添加到 `failedRegistered` ,定时重试
        Set<URL> recoverRegistered = new HashSet<URL>(getRegistered());
        if (!recoverRegistered.isEmpty()) {
            if (logger.isInfoEnabled()) {
                logger.info("Recover register url " + recoverRegistered);
            }
            for (URL url : recoverRegistered) {
                failedRegistered.add(url);
            }
        }
        // subscribe 恢复订阅,添加到 `failedSubscribed` ,定时重试
        Map<URL, Set<NotifyListener>> recoverSubscribed = new HashMap<URL, Set<NotifyListener>>(getSubscribed());
        if (!recoverSubscribed.isEmpty()) {
            if (logger.isInfoEnabled()) {
                logger.info("Recover subscribe url " + recoverSubscribed.keySet());
            }
            for (Map.Entry<URL, Set<NotifyListener>> entry : recoverSubscribed.entrySet()) {
                URL url = entry.getKey();
                for (NotifyListener listener : entry.getValue()) {
                    addFailedSubscribed(url, listener);
                }
            }
        }
    }

}

ZookeeperRegistry 构造函数中为zookeeper的操作客户端添加了一个状态监听器 StateListener,当重新连接时( 重新连接意味着之前连接断开了 ),将已经注册和订阅的URL添加到失败集合中,定时重试,也就是重新注册和订阅。

zookeeper Client与Server断开连接后,会定时的不断尝试重新连接,当连接成功后就会触发一个Event,Dubbo注册了CONNECTED状态的监听器,当连接成功后重新注册和订阅。

zookeeper Server宕机了,Dubbo里的Client并没有对此事件做什么响应,当然其内部的zkClient会不停地尝试连接Server。当Zookeeper Server宕机了不影响Dubbo里已注册的组件的RPC调用,因为已经通过URL生成了Invoker对象,这些对象还在Dubbo容器内。当然因为注册中心宕机了,肯定不能感知到新的Provider。同时因为在之前订阅获得的Provider信息已经持久化到本地文件,当Dubbo应用重启时,如果zookeeper注册中心不可用,会加载缓存在文件内的Provider信息,还是能保证服务的高可用。

Consumer会一直维持着对Provider的ChildListener,监听Provider的实时数据信息。当Providers节点的子节点发生变化时,实时通知Dubbo,更新URL,同时更新Dubbo容器内的Consumer Invoker对象,只要是订阅成功均会实时同步Provider,更新Invoker对象,无论是第一次订阅还是断线重连后的订阅:

public class ZookeeperRegistry extends FailbackRegistry {

    protected void doSubscribe(final URL url, final NotifyListener listener) {
        try {
            // 处理所有 Service 层的发起订阅,例如监控中心的订阅
            if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
                ......
                // 处理指定 Service 层的发起订阅,例如服务消费者的订阅
            } else {
                // 子节点数据数组
                List<URL> urls = new ArrayList<URL>();
                // 循环分类数组 , router, configurator, provider
                for (String path : toCategoriesPath(url)) {
                    // 获得 url 对应的监听器集合
                    ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                    if (listeners == null) { // 不存在,进行创建
                        zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                        listeners = zkListeners.get(url);
                    }
                    // 获得 ChildListener 对象
                    ChildListener zkListener = listeners.get(listener);
                    if (zkListener == null) { //  不存在子目录的监听器,进行创建 ChildListener 对象
                        // 订阅父级目录, 当有子节点发生变化时,触发此回调函数
                        listeners.putIfAbsent(listener, new ChildListener() {
                            @Override
                            public void childChanged(String parentPath, List<String> currentChilds) {
                                // 变更时,调用 `#notify(...)` 方法,回调 NotifyListener
                                ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
                            }
                        });
                        zkListener = listeners.get(listener);
                    }
                    // 创建 Type 节点。该节点为持久节点。
                    zkClient.create(path, false);
                    // 向 Zookeeper ,PATH 节点,发起订阅,返回此节点下的所有子元素 path : /根节点/接口全名/providers, 比如 : /dubbo/com.bob.service.CityService/providers
                    List<String> children = zkClient.addChildListener(path, zkListener);
                    // 添加到 `urls` 中
                    if (children != null) {
                        urls.addAll(toUrlsWithEmpty(url, path, children));
                    }
                }
                // 首次全量数据获取完成时,调用 `#notify(...)` 方法,回调 NotifyListener, 在这一步从连接Provider,实例化Invoker
                notify(url, listener, urls);
            }
        } catch (Throwable e) {
            throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

}

订阅获取Providers的最新URL字符串,调用notify(…)方法,通知监听器,最终会执行如下代码:

public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {

    private volatile List<Configurator> configurators;

    private volatile Map<String, Invoker<T>> urlInvokerMap;

    private volatile Map<String, List<Invoker<T>>> methodInvokerMap;  

    private volatile Set<URL> cachedInvokerUrls; 


    private void refreshInvoker(List<URL> invokerUrls) {
        // 从zookeeper获取到的url已经没有合适的了,在订阅返回为空时,会手动生成一个 EMPTY_PROTOCOL 的 url
        if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
            && Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
            this.forbidden = true; // Forbid to access
            this.methodInvokerMap = null; // Set the method invoker map to null
            destroyAllInvokers(); // Close all invokers
        } else {
            this.forbidden = false; // Allow to access
            Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
            if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
                invokerUrls.addAll(this.cachedInvokerUrls);
            } else {
                this.cachedInvokerUrls = new HashSet<URL>();
                this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison
            }
            if (invokerUrls.isEmpty()) {
                return;
            }
            Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
            Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // Change method name to map Invoker Map
            // state change
            // If the calculation is wrong, it is not processed.
            if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
                logger.error(new IllegalStateException(
                    "urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString()));
                return;
            }
            this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
            this.urlInvokerMap = newUrlInvokerMap;
            try {
                destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
            } catch (Exception e) {
                logger.warn("destroyUnusedInvokers error. ", e);
            }
        }
    }

}

更新Dubbo内的Invoker相关数据,保证Consumer能实时感知到Provider的信息,保证PRC调用不会出错。

以上就是Dubbo内Zookeeper注册中心的实现过程。

总结:

1.Provider和Consumer向Zookeeper注册临时节点,当连接断开时删除相应的注册节点。

2.Consumer订阅Providers节点的子节点,实时感知Provider的变化情况,实时同步自身的Invoker对象,保证RPC的可用性。