dubbo订阅后dubbo 没有提供者服务提供者,后期更自动更新吗

spring+dubbo(4)
为方便开发测试,经常会在线下共用一个所有服务可用的注册中心,这时,如果一个正在开发中的服务提供者注册,可能会影响消费者不能正常运行。
可以让服务提供者开发方,只订阅服务(开发的服务可能依赖其它服务),而不注册正在开发的服务,通过直连测试正在开发的服务。
禁用注册配置:
&dubbo:registryaddress="10.20.153.10:9090"register="false"/&
&dubbo:registryaddress="10.20.153.10:9090?register=false"/&
只订阅配置
在注册中心查看只订阅的服务是否注册上?
1、“只订阅”指的是需要做开发调试的服务提供者,只向注册中心订阅其所依赖的服务,但
不向注册中心注册其本身可以提供的服务。
2、“只订阅”需要结合“直连提供者”配置来进行调用测试。(正在开发的本地服务只订阅,本地消费端
直连正在开发的本地服务进行调试)
&&相关文章推荐
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:7990次
排名:千里之外
原创:15篇
(2)(3)(4)(3)(7)(1)(2)(1)(1)(1)
(window.slotbydup = window.slotbydup || []).push({
id: '4740887',
container: s,
size: '250,250',
display: 'inlay-fix'dubbo直连、只订阅、只注册 - ice-wee的专栏 - CSDN博客
dubbo直连、只订阅、只注册
dubbo/springcloud
& & & &&在开发及测试环境下,经常需要绕过注册中心,只测试指定服务提供者,这时候可能需要点对点直连,点对点直联方式,将以服务接口为单位,忽略注册中心的提供者列表,
A接口配置点对点,不影响B接口从注册中心获取列表。
& & &&为方便开发测试,经常会在线下共用一个所有服务可用的注册中心,这时,如果一个正在开发中的服务提供者注册,可能会影响消费者不能正常运行。
解决方案:
& & &&可以让服务提供者开发方,只订阅服务(开发的服务可能依赖其它服务),而不注册正在开发的服务,通过直连测试正在开发的服务。
& & & &如果有两个镜像环境(例如环境A、B),两个注册中心,有一个服务(例如D)只在其中一个注册中心有部署,另一个注册中心还没来得及部署,而两个注册中心的其它应用都需要依赖此服务,所以需要将服务同时注册到两个注册中心,但却不能让此服务同时依赖两个注册中心的其它服务(其它服务:例如服务A)。
解决方案:
& & &&可以让服务提供者方,只注册服务到另一注册中心,而不从另一注册中心订阅服务。
**我博客所有文章目录:
我的热门文章Dubbo中订阅和通知解析 | 大程熙
Dubbo中关于服务的订阅和通知主要发生在服务提供方暴露服务的过程和服务消费方初始化时候引用服务的过程中。
服务引用过程中的订阅和通知在服务消费者初始化的过程中,会有一步是进行服务的引用,具体的代码是在RegistryProtocol的refer方法:
12345678910111213141516171819public &T& Invoker&T& refer(Class&T& type, URL url) throws RpcException {
url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
//在这一步获取注册中心实例的过程中,也会有notify的操作。(这里省略)
Registry registry = registryFactory.getRegistry(url);
if (RegistryService.class.equals(type)) {
return proxyFactory.getInvoker((T) registry, type, url);
// group=&a,b& or group=&*&
Map&String, String& qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
String group = qs.get(Constants.GROUP_KEY);
if (group != null && group.length() & 0 ) {
if ( ( MA_SPLIT_PATTERN.split( group ) ).length & 1
|| &*&.equals( group ) ) {
return doRefer( getMergeableCluster(), registry, type, url );
return doRefer(cluster, registry, type, url);}
在refer方法中有一步是获取注册中心实例,这一步中也会有一个notify操作,先暂时不解释。接着就是doRefer方法:
1234567891011121314151617181920private &T& Invoker&T& doRefer(Cluster cluster, Registry registry, Class&T& type, URL url) {
RegistryDirectory&T& directory = new RegistryDirectory&T&(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
//订阅的url
URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, NetUtils.getLocalHost(), 0, type.getName(), directory.getUrl().getParameters());
if (! Constants.ANY_VALUE.equals(url.getServiceInterface())
&& url.getParameter(Constants.REGISTER_KEY, true)) {
//服务消费方向注册中心注册自己,供其他层使用,比如服务治理
registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
Constants.CHECK_KEY, String.valueOf(false)));
//订阅服务提供方
//同时订阅了三种类型providers,routers,configurators。
directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
Constants.PROVIDERS_CATEGORY
+ &,& + Constants.CONFIGURATORS_CATEGORY
+ &,& + Constants.ROUTERS_CATEGORY));
return cluster.join(directory);}
在doRefer方法中服务消费者会订阅服务,同时订阅了三种类型:providers,routers,configurators。
接续看directory.subscribe订阅方法,这里directory是RegistryDirectory:
12345678public void subscribe(URL url) { //设置消费者url
setConsumerUrl(url);
//url为订阅条件,不能为空
//第二个参数this,是变更事件监听器,不允许为空,RegistryDirectory实现了NotifyListener接口,因此是一个事件监听器
registry.subscribe(url, this);}
这里registry是ZookeeperRegistry,在ZookeeperRegistry调用subscribe处理之前会先经过AbstractRegistry的处理,然后经过FailbackRegistry处理,在FailbackRegistry中会调用ZookeeperRegistry的doSubscribe方法。
首先看下AbstractRegistry中subscribe方法:
12345678910111213141516public void subscribe(URL url, NotifyListener listener) {
if (url == null) {
throw new IllegalArgumentException(&subscribe url == null&);
if (listener == null) {
throw new IllegalArgumentException(&subscribe listener == null&);
//从缓存中获取已经订阅的url的监听器
Set&NotifyListener& listeners = subscribed.get(url);
if (listeners == null) {
subscribed.putIfAbsent(url, new ConcurrentHashSet&NotifyListener&());
listeners = subscribed.get(url);
//将当前监听器添加到监听器的set中
listeners.add(listener);}
然后是FailbackRegistry的subscribe方法:
123456789101112131415161718192021222324252627282930313233public void subscribe(URL url, NotifyListener listener) { //上面AbstractRegistry的处理
super.subscribe(url, listener);
//移除订阅失败的
removeFailedSubscribed(url, listener);
try {
// 向服务器端发送订阅请求
//子类实现,我们这里使用的是ZookeeperRegistry
doSubscribe(url, listener);
} catch (Exception e) {
Throwable t =
List&URL& urls = getCacheUrls(url);
if (urls != null && urls.size() & 0) {
//订阅失败,进行通知,重试
notify(url, listener, urls);
} else {
// 如果开启了启动时检测,则直接抛出异常
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true);
boolean skipFailback = t instanceof SkipFailbackWrapperE
if (check || skipFailback) {
if(skipFailback) {
t = t.getCause();
throw new IllegalStateException(&Failed to subscribe & + url + &, cause: & + t.getMessage(), t);
// 将失败的订阅请求记录到失败列表,定时重试
addFailedSubscribed(url, listener);
}}
这里总共进行了一下几件事情:
AbstractRegistry的处理
移除订阅失败的
由具体的子类向服务器端发送订阅请求
如果订阅发生失败了,尝试获取缓存url,然后进行失败通知或者如果开启了启动时检测,则直接抛出异常
将失败的订阅请求记录到失败列表,定时重试
主要看下子类向服务器段发送订阅请求的步骤,在ZookeeperRegistry的doSubscribe方法中:
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071protected void doSubscribe(final URL url, final NotifyListener listener) {
try {
if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {//这里暂时没用到先不解释
String root = toRootPath();
ConcurrentMap&NotifyListener, ChildListener& listeners = zkListeners.get(url);
if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap&NotifyListener, ChildListener&());
listeners = zkListeners.get(url);
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) {
listeners.putIfAbsent(listener, new ChildListener() {
public void childChanged(String parentPath, List&String& currentChilds) {
for (String child : currentChilds) {
child = URL.decode(child);
if (! anyServices.contains(child)) {
anyServices.add(child);
subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child,
Constants.CHECK_KEY, String.valueOf(false)), listener);
zkListener = listeners.get(listener);
zkClient.create(root, false);
List&String& services = zkClient.addChildListener(root, zkListener);
if (services != null && services.size() & 0) {
for (String service : services) {
service = URL.decode(service);
anyServices.add(service);
subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service,
Constants.CHECK_KEY, String.valueOf(false)), listener);
} else {
List&URL& urls = new ArrayList&URL&();
//这里的path分别为providers,routers,configurators三种
for (String path : toCategoriesPath(url)) {
//根据url获取对应的监听器map
ConcurrentMap&NotifyListener, ChildListener& listeners = zkListeners.get(url);
if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap&NotifyListener, ChildListener&());
listeners = zkListeners.get(url);
//根据我们的listener获取一个ChildListener实例
ChildListener zkListener = listeners.get(listener);
//没有的话就创建一个ChildListener实例。
if (zkListener == null) {
listeners.putIfAbsent(listener, new ChildListener() {
public void childChanged(String parentPath, List&String& currentChilds) {
ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
zkListener = listeners.get(listener);
//根据path在Zookeeper中创建节点,这里就是订阅服务
zkClient.create(path, false);
//这里zkClient是dubbo的ZookeeperClient,在addChildListener中会转化为ZkClient的Listener
List&String& children = zkClient.addChildListener(path, zkListener);
if (children != null) {
urls.addAll(toUrlsWithEmpty(url, path, children));
//订阅完成之后,进行通知
notify(url, listener, urls);
} catch (Throwable e) {
throw new RpcException(&Failed to subscribe & + url + & to zookeeper & + getUrl() + &, cause: & + e.getMessage(), e);
}}
上面主要是分别对providers,routers,configurators三种不同类型的进行订阅,也就是往zookeeper中注册节点,注册之前先给url添加监听器。最后是订阅完之后进行通知。
notify方法,这里notify方法实现是在ZookeeperRegistry的父类FailbackRegistry中:
1234567891011121314151617181920protected void notify(URL url, NotifyListener listener, List&URL& urls) {
if (url == null) {
throw new IllegalArgumentException(&notify url == null&);
if (listener == null) {
throw new IllegalArgumentException(&notify listener == null&);
try {
//doNotify方法中没做处理,直接调用父类的notify方法
doNotify(url, listener, urls);
} catch (Exception t) {
// 将失败的通知请求记录到失败列表,定时重试
Map&NotifyListener, List&URL&& listeners = failedNotified.get(url);
if (listeners == null) {
failedNotified.putIfAbsent(url, new ConcurrentHashMap&NotifyListener, List&URL&&());
listeners = failedNotified.get(url);
listeners.put(listener, urls);
}}
看下AbstractRegistry的notify方法:
123456789101112131415161718192021222324252627282930313233protected void notify(URL url, NotifyListener listener, List&URL& urls) {
Map&String, List&URL&& result = new HashMap&String, List&URL&&();
//获取catagory列表,providers,routers,configurators
for (URL u : urls) {
if (UrlUtils.isMatch(url, u)) {
String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
List&URL& categoryList = result.get(category);
if (categoryList == null) {
categoryList = new ArrayList&URL&();
result.put(category, categoryList);
categoryList.add(u);
if (result.size() == 0) {
//已经通知过
Map&String, List&URL&& categoryNotified = notified.get(url);
if (categoryNotified == null) {
notified.putIfAbsent(url, new ConcurrentHashMap&String, List&URL&&());
categoryNotified = notified.get(url);
for (Map.Entry&String, List&URL&& entry : result.entrySet()) {
//providers,routers,configurators中的一个
String category = entry.getKey();
List&URL& categoryList = entry.getValue();
categoryNotified.put(category, categoryList);
saveProperties(url);
//还记得刚开始的时候,listener参数么,这里listener是RegistryDirectory
listener.notify(categoryList);
}}
继续看RegistryDirectory的notify方法:
1234567891011121314151617181920212223242526272829303132333435363738394041424344public synchronized void notify(List&URL& urls) { //三种类型分开
List&URL& invokerUrls = new ArrayList&URL&();
List&URL& routerUrls = new ArrayList&URL&();
List&URL& configuratorUrls = new ArrayList&URL&();
for (URL url : urls) {
String protocol = url.getProtocol();
String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
if (Constants.ROUTERS_CATEGORY.equals(category)
|| Constants.ROUTE_PROTOCOL.equals(protocol)) {
routerUrls.add(url);
} else if (Constants.CONFIGURATORS_CATEGORY.equals(category)
|| Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
configuratorUrls.add(url);
} else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
invokerUrls.add(url);
} else {
// configurators
//更新缓存的服务提供方配置规则
if (configuratorUrls != null && configuratorUrls.size() &0 ){
this.configurators = toConfigurators(configuratorUrls);
// routers
//更新缓存的路由配置规则
if (routerUrls != null && routerUrls.size() &0 ){
List&Router& routers = toRouters(routerUrls);
if(routers != null){ // null - do nothing
setRouters(routers);
List&Configurator& localConfigurators = this. // local reference
// 合并override参数
this.overrideDirectoryUrl = directoryU
if (localConfigurators != null && localConfigurators.size() & 0) {
for (Configurator configurator : localConfigurators) {
this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
// providers
//重建invoker实例
refreshInvoker(invokerUrls);}
最重要的重建invoker实例,在服务引用的文章中已经介绍过,不再重复,还有上面说省略的获取注册中心实例的过程中,也会有notify的操作。(这里省略)这里也是进行了invoker实例的重建。
暴露服务过程中的订阅和通知服务暴露过程中的订阅在RegistryProtocol的export方法中:
12345678910111213141516171819202122232425262728293031323334353637383940public &T& Exporter&T& export(final Invoker&T& originInvoker) throws RpcException {
//export invoker
final ExporterChangeableWrapper&T& exporter = doLocalExport(originInvoker);
//registry provider
final Registry registry = getRegistry(originInvoker);
final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
registry.register(registedProviderUrl);
// 订阅override数据
// FIXME 提供者订阅时,会影响同一JVM即暴露服务,又引用同一服务的的场景,因为subscribed以服务名为缓存的key,导致订阅信息覆盖。
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
//OverrideListener是RegistryProtocol的内部类
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
//订阅override数据
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
//保证每次export都返回一个新的exporter实例
return new Exporter&T&() {
public Invoker&T& getInvoker() {
return exporter.getInvoker();
public void unexport() {
try {
exporter.unexport();
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
try {
registry.unregister(registedProviderUrl);
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
try {
overrideListeners.remove(overrideSubscribeUrl);
registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener);
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
};}
registry.subscribe订阅override数据,会首先经过AbstractRegistry处理,然后经过FailbackRegistry处理。处理方法在上面消费者发布订阅的讲解中都已经介绍。往下的步骤基本相同,不同之处在于AbstractRegistry的notify方法:
12345678910111213141516171819202122232425262728293031323334protected void notify(URL url, NotifyListener listener, List&URL& urls) {
Map&String, List&URL&& result = new HashMap&String, List&URL&&();
//获取catagory列表,providers,routers,configurators
for (URL u : urls) {
if (UrlUtils.isMatch(url, u)) {
String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
List&URL& categoryList = result.get(category);
if (categoryList == null) {
categoryList = new ArrayList&URL&();
result.put(category, categoryList);
categoryList.add(u);
if (result.size() == 0) {
//已经通知过
Map&String, List&URL&& categoryNotified = notified.get(url);
if (categoryNotified == null) {
notified.putIfAbsent(url, new ConcurrentHashMap&String, List&URL&&());
categoryNotified = notified.get(url);
for (Map.Entry&String, List&URL&& entry : result.entrySet()) {
//providers,routers,configurators中的一个
String category = entry.getKey();
List&URL& categoryList = entry.getValue();
categoryNotified.put(category, categoryList);
saveProperties(url);
//对于消费者来说这里listener是RegistryDirectory
//而对于服务提供者来说这里是OverrideListener,是RegistryProtocol的内部类
listener.notify(categoryList);
}}
接下来看OverrideListener的notify方法:
123456789101112131415161718192021222324252627282930313233343536373839404142434445/* *
provider 端可识别的override url只有这两种. *
override://0.0.0.0/serviceName?timeout=10 *
override://0.0.0.0/?timeout=10 */public void notify(List&URL& urls) {
List&URL& result =
for (URL url : urls) {
URL overrideUrl =
if (url.getParameter(Constants.CATEGORY_KEY) == null
&& Constants.OVERRIDE_PROTOCOL.equals(url.getProtocol())) {
// 兼容旧版本
overrideUrl = url.addParameter(Constants.CATEGORY_KEY, Constants.CONFIGURATORS_CATEGORY);
if (! UrlUtils.isMatch(subscribeUrl, overrideUrl)) {
if (result == null) {
result = new ArrayList&URL&(urls);
result.remove(url);
logger.warn(&Subsribe category=configurator, but notifed non-configurator urls. may be registry bug. unexcepted url: & + url);
if (result != null) {
this.configurators = RegistryDirectory.toConfigurators(urls);
List&ExporterChangeableWrapper&?&& exporters = new ArrayList&ExporterChangeableWrapper&?&&(bounds.values());
for (ExporterChangeableWrapper&?& exporter : exporters){
Invoker&?& invoker = exporter.getOriginInvoker();
final Invoker&?& originI
if (invoker instanceof InvokerDelegete){
originInvoker = ((InvokerDelegete&?&)invoker).getInvoker();
}else {
originInvoker =
URL originUrl = RegistryProtocol.this.getProviderUrl(originInvoker);
URL newUrl = getNewInvokerUrl(originUrl, urls);
if (! originUrl.equals(newUrl)){
//对修改了url的invoker重新export
RegistryProtocol.this.doChangeLocalExport(originInvoker, newUrl);
}}
这里也是对Invoker重新进行了引用。
坚持原创技术分享,您的支持将鼓励我继续创作!
支付宝打赏今天看啥 热点:
dubbo学习过程、使用经验分享及实现原理简单介绍,dubbo经验分享
部门去年年中开始各种改造,第一步是模块服务化,这边初选dubbo试用在一些非重要模块上,慢慢引入到一些稍微重要的功能上,半年时间,学习过程及线上使用遇到的些问题在此总结下。
整理这篇文章差不多花了两天半时间,请尊重劳动成果,如转载请注明出处http://blog.csdn.net/hzzhoushaoyu/article/details/
二、什么是dubbo
Dubbo是阿里巴巴提供的开源的SOA服务化治理的技术框架,据说只是剖出来的一部分开源的,但一些基本的需求已经可以满足的,而且扩展性也非常好(至今没领悟到扩展性怎么做到的),通过spring bean的方式管理配置及实例,较容易上手且对应用无侵入。更多介绍可戳http://alibaba.github.io/dubbo-doc-static/Home-zh.htm。
三、如何使用dubbo
1.服务化应用基本框架
如上图所示,一个抽象出来的基本框架,consumer和provider是框架中必然存在的,Registry做为全局配置信息管理模块,推荐生产环境使用Registry,可实时推送现存活的服务提供者,Monitor一般用于监控和统计RPC调用情况、成功率、失败率等情况,让开发及运维了解线上运行情况。
应用执行过程大致如下:
服务提供者启动,根据协议信息绑定到配置的IP和端口上,如果已有服务绑定过相同IP和端口的则跳过注册服务信息至注册中心客户端启动,根据接口和协议信息订阅注册中心中注册的服务,注册中心将存活的服务地址通知到客户端,当有服务信息变更时客户端可以通过定时通知得到变更信息在客户端需要调用服务时,从内存中拿到上次通知的所有存活服务地址,根据路由信息和负载均衡机制选择最终调用的服务地址,发起调用通过filter分别在客户端发送请求前和服务端接收请求后,通过异步记录一些需要的信息传递到monitor做监控或者统计
2.服务接口定义
一般单独有一个jar包,维护服务接口定义、RPC参数类型、RPC返回类型、接口异常、接口用到的常量,该jar包中不处理任何业务逻辑。
比如命名api-0.1.jar,在api-0.1.jar中定义接口
public interface UserService
public RpcResponseDto isValidUser(RpcAccountRequestDto requestDto) throws new RpcBusinessException, RpcSystemE
并在api-0.1.jar中定义RpcResponseDto,RpcAccountRequestDto,RpcBusinessException,RpcSystemException。
服务端通过引用该jar包实现接口并暴露服务,客户端引用该jar包引用接口的代理实例。
3.注册中心
开源的dubbo已支持4种组件作为注册中心,我们部门使用推荐的zookeeper做为注册中心,由于就瓶颈来说不会出现在注册中心,风险较低,未做特别的研究或比较。
zookeeper,推荐集群中部署奇数个节点,由于zookeeper挂掉一半的机器集群就不可用,所以部署4台和3台的集群都是在挂掉2台后集群不可用redismulticast,广播受到网络结构的影响,一般本地不想搭注册中心的话使用这种调用dubbo简易注册中心
对于zookeeper客户端,dubbo在2.2.0之后默认使用zkclient,2.3.0之后提供可选配置Curator,提到这个点的原因主要是因为zkclient发现一些问题:①服务器在修改服务器时间后zkClient会抛出日志错误之类的异常然后容器(我们使用resin)挂掉了,也不能确定就是zkClient的问题,接入dubbo之前无该问题②dubbo使用zkclient不传入连接zookeeper等待超时时间,使用默认的Integer.MAX_VALUE,这样在zookeeper连不上的情况下不报错也无法启动;目前我们准备寻找其他解决方案,比如使用curator试下,还没正式投入。
配置应用名
&dubbo:application name=&test&/&
配置dubbo注解识别处理器,不指定包名的话会在spring bean中查找对应实例的类配置了dubbo注解的
&dubbo:annotation/&
配置注册中心,通过group指定注册中心分组,可通过register配置是否注册到该注册中心以及subscribe配置是否从该注册中心订阅
&dubbo:registry address=&zookeeper://127.0.0.1:2181/& group=&test&/&配置服务协议,多网卡可通过IP指定绑定的IP地址,不指定或者指定非法IP的情况下会绑定在0.0.0.0,使用Dubbo协议的服务会在初始化时建立长连接
&dubbo:protocol name=&dubbo& port=&20880& accesslog=&d:/access.log&&&/dubbo:protocol&通过xml配置文件配置服务暴露,首先要有个spring bean实例(无论是注解配置的还是配置文件配置的),在下面ref中指定bean实例ID,作为服务实现类
&dubbo:service interface=&com.web.foo.service.FirstDubboService& ref=&firstDubboServiceImpl& version=&1.0&&&/dubbo:service&通过注解方式配置服务暴露,Component是Spring bean注解,Service是dubbo的注解(不要和spring bean的service注解弄混),如前文所述,dubbo注解只会在spring bean中被识别
@Component
@Service(version=&1.0&)
public class FirstDubboServiceImpl implements FirstDubboService
public void sayHello(TestDto test)
System.out.println(&Hello World!&);
同服务端配置应用名、注解识别处理器和注册中心。
配置客户端reference bean。客户端跟服务端不同的是客户端这边没有实际的实现类的,所以配置的dubbo:reference实际会生成一个spring bean实例,作为代理处理Dubbo请求,然后其他要调用处直接使用spring bean的方式使用这个实例即可。
xml配置文件配置方式,id即为spring bean的id,之后无论是在spring配置中使用ref=&firstDubboService&还是通过@Autowired注解都OK
&dubbo:reference interface=&com.web.foo.service.FirstDubboService&
version=&1.0& id=&firstDubboService& &&/dubbo:reference&
另外开发、测试环境可通过指定Url方式绕过注册中心直连指定的服务地址,避免注册中心中服务过多,启动建立连接时间过长,如
&dubbo:reference interface=&com.web.foo.service.FirstDubboService&
version=&1.0& id=&firstDubboService& url=&dubbo://127.0.0.1:20880/&&&/dubbo:reference&
注解配置方式引用,
@Component
public class Consumer
@Reference(version=&1.0&)
private FirstDubboS
public void test()
TestDto test = new TestDto();
test.setList(Arrays.asList(new String[]{&a&, &b&}));
test.setTest(&t&);
service.sayHello(test);
}Reference被识别的条件是spring bean实例对应的当前类中的field,如上是直接修饰spring bean当前类中的属性
这个地方看了下源码,本应该支持当前类和父类中的public set方法,但是看起来是个BUG,Dubbo处理reference处部分源码如下
Method[] methods = bean.getClass().getMethods();
for (Method method : methods) {
String name = method.getName();
if (name.length() & 3 && name.startsWith(&set&)
&& method.getParameterTypes().length == 1
&& Modifier.isPublic(method.getModifiers())
&& ! Modifier.isStatic(method.getModifiers())) {
Reference reference = method.getAnnotation(Reference.class);
if (reference != null) {
Object value = refer(reference, method.getParameterTypes()[0]);
if (value != null) {
method.invoke(bean, new Object[] {
});//??这里不是应该把value作为参数调用么,而且为什么上面if条件判断参数为1这里不传参数
} catch (Throwable e) {
logger.error(&Failed to init remote service reference at method & + name + & in class & + bean.getClass().getName() + &, cause: & + e.getMessage(), e);
6.监控中心
如果使用Dubbo自带的监控中心,可通过简单配置即可,先通过github获得dubbo-monitor的源码,部署启动后在应用配置如下
&dubbo:monitor protocol=&registry& /& &!--通过注册中心获取monitor地址后建立连接--&
&dubbo:monitor address=&dubbo://127.0.0.1:7070/com.alibaba.dubbo.monitor.MonitorService& /& &!--绕过注册中心直连monitor,同consumer直连--&
7.服务路由
最重要辅助功能之一,可随时配置路由规则调整客户端调用策略,目前dubbo-admin中已提供基本路由规则的配置UI,到github下载源码部署后很容易找到地方,这里简单介绍下怎么用路由。
下面是dubbo-admin的新建路由界面,可配置信息都在图片中有,
比如现在我们有10.0.0.1~3三台消费者和10.0.0.4~6三台服务提供者,想让1和2调用4,3调用5和6的话,则可以配置两个规则,
1.消费者IP:10.0.0.1,10.0.0.2 ;提供者IP:10.0.0.4
2.消费者IP:10.0.0.3;提供者IP:10.0.0.5,10.0.0.6
另外,IP地址支持结尾为*匹配所有,如10.0.0.*或者10.0.*等。
不匹配的配置规则和匹配的配置规则是一致的。
配置完成后可在消费者标签页查看路由结果
8.负载均衡
dubbo提供4种负载均衡方式:
Random,随机,按权重配置随机概率,调用量越大分布越均匀,默认是这种方式RoundRobin,轮询,按权重设置轮询比例,如果存在比较慢的机器容易在这台机器的请求阻塞较多LeastActive,最少活跃调用数,不支持权重,只能根据自动识别的活跃数分配,不能灵活调配ConsistentHash,一致性hash,对相同参数的请求路由到一个服务提供者上,如果有类似灰度发布需求可采用
dubbo的负载均衡机制是在客户端调用时通过内存中的服务方信息及配置的负责均衡策略选择,如果对自己系统没有一个全面认知,建议先采用random方式。
9.dubbo过滤器
有需要自己实现dubbo过滤器的,可关注如下步骤:
如下是dubbo rpc access log的过滤器,仅对服务提供方有效,且参数中需要带accesslog,也就是配置protocol或者serivce时配置的accesslog=&d:/rpc_access.log&
@Activate(group = Constants.PROVIDER, value = Constants.ACCESS_LOG_KEY)
public class AccessLogFilter implements Filter {
10.其他特性
http://alibaba.github.io/dubbo-doc-static/User+Guide-zh.htm#UserGuide-zh-%3Cdubbo%3Amonitor%2F%3E
可关注以上链接内容,dubbo提供较多的辅助功能特性,大多目前我们暂时未使用到,后续我们这边关注到的两个特性可能会再引进来使用:
结果缓存,省得自己再去写一个缓存,对缓存没有特殊要求的话直接使用dubbo的好了分组合并,对RPC接口不同的实现方式分别调用然后合并结果的一种调用模式,比如我们要查用户是否合法,一种我们要查是否在黑名单,同时我们还要关注登录信息是否异常,然后合并结果
四、前车之鉴
这个主要是在整个学习及使用过程中记录的,以及一些同事在初识过程问过我的,这边做了整理然后直接列举在下面:
1.服务版本号
引用只会找相应版本的服务
&dubbo:serviceinterface=“com.xxx.XxxService” ref=“xxxService” version=“1.0” /&
&dubbo:referenceid=“xxxService” interface=“com.xxx.XxxService” version=“1.0”/&
为了今后更换接口定义发布在线时,可不停机发布,使用版本号
2.暴露一个内网一个外网IP问题
为了在测试环境提供一个内网访问的地址和一个办公区访问的地址。
o增加一个指定IP为内网地址的服务协议
o增加一个不指定IP的服务协议,但是在/etc/hosts中hostname对应的IP要为外网IP
上面这种方案是一开始使用的方案,后面发现dubbo在启动过程无论是否配路由还是会一个个去连接,虽然不影响启动,但是由于存在超时所以会影响启动时间,而且每台机器还得特别配置指定IP,后面使用另外一套方案:
使用这种方式需要注意做好防火墙控制等,比如在线默认也是不指定IP,会绑定在0.0.0.0,如果非法人员知道调用的外网IP和端口,而且可以直接访问就麻烦了(如果在应用中做IP拦截也成,需要注意有防范措施)。
3.dubbo reference注解问题
前文介绍使用时已经提到过,@Reference只能在spring bean实例对应的当前类中使用,暂时无法在父类使用;如果确实要在父类声明一个引用,可通过配置文件配置dubbo:reference,然后在需要引用的地方跟引用spring bean一样就行
4.服务超时问题
目前如果存在超时,情况基本都在如下几点:
客户端耗时大,也就是超时异常时的client elapsed xxx,这个是从创建Future对象开始到使用channel发出请求的这段时间,中间没有复杂操作,只要CPU没问题基本不会出现大耗时,顶多1ms属于正常IOThread繁忙,默认情况下,dubbo协议一个客户端与一个服务提供者会建立一个共享长连接,如果某个客户端处于特别繁忙而且一直往一个服务提供者塞请求,可能造成IOThread阻塞,一般非常特殊的情况才会出现服务端工作线程池中线程全部繁忙,接收消息后塞入队列等待,如果等待时间比预想长会引起超时网络抖动,如果上述情况都排除了,还出现在请求发出后,服务接收请求前超过预想时间,只能归类到网络抖动了,需要SA一起查看问题服务自身耗时大,这个需要应用自身做好耗时统计,当出现这种情况的时候需要用数据来说明问题及规划优化方案,建议采用缓存埋点的方式统计服务中各个执行阶段的耗时情况,最终如果超过预想时间则把缓存统计的耗时情况打日志,减少日志量,且能够得到更明确的信息
现在我们应用使用过程中发现两种类型的耗时,一种我们目前只能归类到网络抖动,后续需要找运维一起关注这个问题,另外一种是由于一些历史原因,数据库查询容易发生抖动,总有一个时间点会突然多出很多超时。
5.服务保护
服务保护的原则上是避免发生类似雪崩效应,尽量将异常控制在服务周围,不要扩散开。
说到雪崩效应,还得提下dubbo自身的重试机制,默认3次,当失败时会进行重试,这样在某个时间点出现性能问题,然后调用方再连续重复调用,很容易引起雪崩,建议的话还是很据业务情况规划好如何进行异常处理,何时进行重试。
服务保护的话,目前我们主要从以下几个方面来实施,也不成熟,还在摸索:
考虑服务的dubbo线程池类型(fix线程池的话考虑线程池大小)、数据库连接池、dubbo连接数限制是否都合适
考虑服务超时时间和重试的关系,设置合适的值
一定时间内服务异常数较大,则可考虑使用failfast让客户端请求直接返回或者让客户端不再请求
经领导推荐,还在学习Release it,后续有其他想法,再回头来编辑。
6.zkclient的问题
前文已经提到过zkclient有两个问题,修改服务器时间会导致容器挂掉;dubbo使用zkclient没有传超时时间导致zookeeper无法连接的时候,直接阻塞Integer.MAX_VALUE。
正在调研curator,目前只能说curator不会在无法连接的时候直接阻塞。
另外zkclient和curator的jar包应该都是jdk1.6编译的,所以系统还在jdk1.5以下的话无法使用。
7.注册中心的分组group和服务的不同实现group
这两个东西完全不同的概念,使用的时候不要弄混了。
registry上可以配置group,用于区分不同分组的注册中心,比如在同一个注册中心下,有一部分注册信息是要给开发环境用的,有一部分注册信息时要给测试环境用的,可以分别用不同的group区分开,目前对这个理解还不透彻,大致就是用于区分不同环境。
service和reference上也可以配置group,这个用于区分同一个接口的不同实现,只有在reference上指定与service相同的group才会被发现,还有前文提到的分组合并结果也是用的这个。
五、dubbo如何工作的
其实dubbo整个框架内容并不算大,仔细看的话可能最多两天看完一遍,但是目前还是没领悟到怎么做到的扩展性,学习深度还不够~
要学习dubbo源码的话,必须要拿出官方高清大图才行。
这张图看起来挺复杂的样子,真正拆分之后对照源码来看会发现非常清晰、简单直观。
1.如何跟进源码
入口就是各种dubbo配置项的解析,&dubbo:xxx /&都是spring namespace,可以看到dubbo jar包下META-INF里面的spring.handlers,自定义的spring namespace处理器。
对于spring不太熟的同学可以先了解下这个功能,入口都在这里,解析成功后每个&dubbo:xxx /&配置项都对应一个spring实例。
2.服务提供者
首先把这张图拆分成三块,首先是服务端剖去网络传输模块,也就是大图中的右上角。
这里主要抽几个主要的类,从服务初始化到接收消息的流程简单说明下,有兴趣的再对照源码看下会比较清晰。
ServiceBean
继承ServiceConfig,做为服务配置管理和配置信息校验,每一个dubbo:service配置或者注解都会对应生成一个ServiceBean的实例,维护当前服务的配置信息,并把一些全局配置塞入到该服务配置中。
另外ServiceBean本身是一个InitializingBean,在afterPropertiesSet时通过配置信息引导服务绑定和注册。
可以留意到ServiceBean还实现了ApplicationListener,在全部spring bean加载完成后判断是否延迟加载的逻辑。
ProtocolFilterWrapper
经过serviceBean引导后进入该类,这个地方注意下,Protocol使用的装饰模式,叶子只有DubboProtocol和RegistryProtocol,在中间调用中会绕来绕去,而且registry会走一遍这个流程,然后在RegistryProtocol中暴露服务再走一遍,注意每个类的作用,不要被绕昏了就行,第一次跟进代码的时候没留意就晕头转向的。
在这之前其实还有个ProtocolListenerWrapper,封装监听器,在服务暴露后通知到监听器,没有复杂逻辑,如果没特殊需求可以先绕过。
再来说ProtocolFIlterWrapper,这个类的作用就是串联filter调用链,如果有看过struts或者spring mvc拦截器源码的应该不会陌生。
RegistryProtocol
注册中心协议,如果配置了注册中心地址,每次服务暴露肯定首先引导进入这个类中,如果没有注册中心连接则会先创建连接,然后再引导真正的服务协议暴露流程,会再走一次ProtocolFilterWrapper的流程(这次引导到的叶子是DubboProtocol)。
在服务暴露返回后,会再执行服务信息的注册和订阅操作。
DubboProtocol
这个类的export相对较简单,就是引导服务bind server socket。
另外该类还提供了一个内部类,用于处理接收请求,就是下面要提到的ExchangeHandler。
DubboProtocol$ExchangeHandler
接收反序列化好的请求消息,然后根据请求信息找到执行链,将请求再丢入执行链,让其最终执行到实现类再将执行结果返回即整个过程完成。
客户端模块与服务端模块比较类似,只是刚好反过来,一个是暴露服务,一个是引用服务,然后客户端多出路由和负载均衡。
ReferenceBean
继承ReferenceConfig,维护配置信息和配置信息的校验,该功能与ServiceBean类似
其本身还实现了FactoryBean,作为实例工厂,创建远程调用代理类;而且如果不指定为init的reference都是在首次getBean的时候调用到该factoryBean的getObject才进行初始化
另外实现了InitializingBean,在初始化过程中引导配置信息初始化和构建init的代理实例
InvokerInvocationHandler
看到这个类名应该就知道是动态代理的handler,这里作为远程调用代理类的处理器在客户端调用接口时引导进入invoker调用链
ProtocolFIlterWrapper
与Service那边的功能类似,构建调用链
RegistryProtocol
与service那边类似,如果与注册中心还没有连接则建立连接,之后注册和订阅,再根据配置的策略返回相应的clusterInvoker
比service那边有个隐藏较深的逻辑需要留意的,就是订阅过程,RegistryDirectory作为订阅监听器,在订阅完成后会通知到RegistryDirectory,然后会刷新invoker,进入引导至DubboProtocol的流程,与变更的service建立长连接,第一次发生订阅时就会同步接收到通知并将已存在的service存到字典
DubboProtocol
在订阅过程中发现有service变更则会引导至这里,与服务建立长连接,整个过程为了得到串联执行链Invoker
ClusterInvoker
ClusterInvoker由RegistryProtocol构建完成后,内部封装了Directory,在调用时会从Directory列举存活的service对应的Invoker,Directory作为被通知对象,在service有变更时也会及时得到通知
调用时在集群中发现存在多节点的话都会通过clusterInvoker来根据配置抉择最终调用的节点,包括路由方式、负载均衡等
dubbo本身支持的节点调用策略包括比如failoverClusterInvoker在失败时进行重试其他节点,failfastClusterInvoker在失败时返回异常,mergeableClusterInvoker则是对多个实现结果进行合并的等等很多
DubboInvoker
承接上层的调用信息,作为调用结构的叶子,将信息传递到exchange层,主要用来和echange交互的功能模块
4.网络传输层
从exchange往下都是算网络传输,包括做序列化、反序列化,使用Netty等IO框架发送接收消息等逻辑,先前看的时候没有做统一梳理,后续有机会再来编辑吧。
相关搜索:
相关阅读:
相关频道:
&&&&&&&&&&&&&&&&
WEB编程教程最近更新}

我要回帖

更多关于 dubbo指定服务提供者 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信