Dubbo

Dubbo 服务暴露

原创: 怀风  光明和黑暗互相吸引  1月25日
Dubbo 服务暴露

服务暴露


大家都知道Dubbo是由consumer,provider,registry这三大部分组



那么provider的如果将服务提供给consumer调用呢,就是通过服务暴露来实现的,也就是把我们原来单机架构中的接口,对外部暴露


服务暴露的流程



dubbo服务暴露的流程大概如上图,在dubbo中,所有的服务都会被包装成一个invoker,这一点也将贯穿今后整个学习
dubbo服务暴露可以理解为两部分:本地暴露,远程暴露
本地暴露的接口通常用于我们直接invoke dubbo的接口,以及有些时候我们的服务既是provider又是consumer,避免远程调用造成的资源浪费
远程暴露则是将服务信息注册到registry,并且将服务通过网络提供给其他应用调用

源码解析

初始化

public
 
class
 
ServiceBean
<
T
>
 
extends
 
ServiceConfig
<
T
>
 
implements
 
InitializingBean
,
 
DisposableBean
,
 
ApplicationContextAware
,
 
ApplicationListener
<
ContextRefreshedEvent
>,
 
BeanNameAware
 
{
 
private
 
static
 
final
 
long
 serialVersionUID 
=
 
213195494150089726L
;
    
/**
     *此处省略其他代码
     **/

    
@Override
    
public
 
void
 onApplicationEvent
(
ContextRefreshedEvent
 event
)
 
{
        
if
 
(!
isExported
()
 
&&
 
!
isUnexported
())
 
{
            
if
 
(
logger
.
isInfoEnabled
())
 
{
                logger
.
info
(
"The service ready on spring started. service: "
 
+
 getInterface
());
            
}
            
//服务暴露
            export
();
        
}
    
}

    
@Override
    
@SuppressWarnings
({
"unchecked"
,
 
"deprecation"
})
    
public
 
void
 afterPropertiesSet
()
 
throws
 
Exception
 
{
         
//此处省略....
        
if
 
(!
supportedApplicationListener
)
 
{
            
//服务暴露
            export
();
        
}
    
}
 
}
首先我们来看一下ServiceBean,ServiceBean实现了InitializingBean, ApplicationContextAware, ApplicationListener有没有觉得很熟悉,实现这几个类就能在spring初始化的时候do something
暴露服务

我们看到这里都调用了export()方法
    
public
 
synchronized
 
void
 export
()
 
{
        
if
 
(
provider 
!=
 
null
)
 
{
            
if
 
(
export 
==
 
null
)
 
{
                export 
=
 provider
.
getExport
();
            
}
            
if
 
(
delay 
==
 
null
)
 
{
                delay 
=
 provider
.
getDelay
();
            
}
        
}
        
if
 
(
export 
!=
 
null
 
&&
 
!
export
)
 
{
            
return
;
        
}

        
if
 
(
delay 
!=
 
null
 
&&
 delay 
>
 
0
)
 
{
            delayExportExecutor
.
schedule
(
new
 
Runnable
()
 
{
                
@Override
                
public
 
void
 run
()
 
{
                    doExport
();
                
}
            
},
 delay
,
 
TimeUnit
.
MILLISECONDS
);
        
}
 
else
 
{
            doExport
();
        
}
    
}
这里看到ServiceConfig.export方法上加了一个锁,用来保证不会重复暴露服务,抛开上面的逻辑判断,在第一次初始化的时候,是直接走到了doExport()方法
    
protected
 
synchronized
 
void
 doExport
()
 
{
        
//省略判断代码
        doExportUrls
();
    
}

       
private
 
void
 doExportUrls
()
 
{
        
//加载注册中心的配置
        
List
<
URL
>
 registryURLs 
=
 loadRegistries
(
true
);
        
//把使用的协议注册到注册中心 
        
for
 
(
ProtocolConfig
 protocolConfig 
:
 protocols
)
 
{
            doExportUrlsFor1Protocol
(
protocolConfig
,
 registryURLs
);
        
}
    
}
这里dubbo支持多协议,可以看到通过for循环可以把配置的多种协议都导出,进行暴露
    
private
 
void
 doExportUrlsFor1Protocol
(
ProtocolConfig
 protocolConfig
,
 
List
<
URL
>
 registryURLs
)
 
{
         
//省略判断代码
        
// 导出服务
        
String
 contextPath 
=
 protocolConfig
.
getContextpath
();
        
if
 
((
contextPath 
==
 
null
 
||
 contextPath
.
length
()
 
==
 
0
)
 
&&
 provider 
!=
 
null
)
 
{
            contextPath 
=
 provider
.
getContextpath
();
        
}

        
String
 host 
=
 
this
.
findConfigedHosts
(
protocolConfig
,
 registryURLs
,
 map
);
        
Integer
 port 
=
 
this
.
findConfigedPorts
(
protocolConfig
,
 name
,
 map
);
        URL url 
=
 
new
 URL
(
name
,
 host
,
 port
,
 
(
contextPath 
==
 
null
 
||
 contextPath
.
length
()
 
==
 
0
 
?
 
""
 
:
 contextPath 
+
 
"/"
)
 
+
 path
,
 map
);

        
if
 
(
ExtensionLoader
.
getExtensionLoader
(
ConfiguratorFactory
.
class
)
                
.
hasExtension
(
url
.
getProtocol
()))
 
{
            url 
=
 
ExtensionLoader
.
getExtensionLoader
(
ConfiguratorFactory
.
class
)
                    
.
getExtension
(
url
.
getProtocol
()).
getConfigurator
(
url
).
configure
(
url
);
        
}

        
String
 scope 
=
 url
.
getParameter
(
Constants
.
SCOPE_KEY
);
        
// 没有配置时不导出
        
if
 
(!
Constants
.
SCOPE_NONE
.
equalsIgnoreCase
(
scope
))
 
{
            
// 导出本地服务
            
if
 
(!
Constants
.
SCOPE_REMOTE
.
equalsIgnoreCase
(
scope
))
 
{
                exportLocal
(
url
);
            
}
            
// 导出远程服务
            
if
 
(!
Constants
.
SCOPE_LOCAL
.
equalsIgnoreCase
(
scope
))
 
{
                
if
 
(
logger
.
isInfoEnabled
())
 
{
                    logger
.
info
(
"Export dubbo service "
 
+
 interfaceClass
.
getName
()
 
+
 
" to url "
 
+
 url
);
                
}
                
if
 
(
registryURLs 
!=
 
null
 
&&
 
!
registryURLs
.
isEmpty
())
 
{
                    
//将服务都注册到当前已有的注册中心上去
                    
for
 
(
URL registryURL 
:
 registryURLs
)
 
{
                        url 
=
 url
.
addParameterIfAbsent
(
Constants
.
DYNAMIC_KEY
,
 registryURL
.
getParameter
(
Constants
.
DYNAMIC_KEY
));
                        
//判断是否有监控中心
                        URL monitorUrl 
=
 loadMonitor
(
registryURL
);
                        
if
 
(
monitorUrl 
!=
 
null
)
 
{
                            url 
=
 url
.
addParameterAndEncoded
(
Constants
.
MONITOR_KEY
,
 monitorUrl
.
toFullString
());
                        
}
                        
if
 
(
logger
.
isInfoEnabled
())
 
{
                            logger
.
info
(
"Register dubbo service "
 
+
 interfaceClass
.
getName
()
 
+
 
" url "
 
+
 url 
+
 
" to registry "
 
+
 registryURL
);
                        
}
                            
//对于providers,这用于启用自定义代理以生成invoker
                        
String
 proxy 
=
 url
.
getParameter
(
Constants
.
PROXY_KEY
);
                        
if
 
(
StringUtils
.
isNotEmpty
(
proxy
))
 
{
                            registryURL 
=
 registryURL
.
addParameter
(
Constants
.
PROXY_KEY
,
 proxy
);
                        
}

                        
Invoker
<?>
 invoker 
=
 proxyFactory
.
getInvoker
(
ref
,
 
(
Class
)
 interfaceClass
,
 registryURL
.
addParameterAndEncoded
(
Constants
.
EXPORT_KEY
,
 url
.
toFullString
()));
                        
//包装调用者和所有元数据的Invoker包装器
                        
DelegateProviderMetaDataInvoker
 wrapperInvoker 
=
 
new
 
DelegateProviderMetaDataInvoker
(
invoker
,
 
this
);

                        
Exporter
<?>
 exporter 
=
 protocol
.
export
(
wrapperInvoker
);
                        exporters
.
add
(
exporter
);
                    
}
                
}
 
else
 
{
                    
//没有注册中心直接暴露
                    
Invoker
<?>
 invoker 
=
 proxyFactory
.
getInvoker
(
ref
,
 
(
Class
)
 interfaceClass
,
 url
);
                    
//包装调用者和所有元数据的Invoker包装器
                    
DelegateProviderMetaDataInvoker
 wrapperInvoker 
=
 
new
 
DelegateProviderMetaDataInvoker
(
invoker
,
 
this
);
                    
Exporter
<?>
 exporter 
=
 protocol
.
export
(
wrapperInvoker
);
                    exporters
.
add
(
exporter
);
                
}
            
}
        
}
        
this
.
urls
.
add
(
url
);
    
}

    
//暴露本地服务
    
private
 
void
 exportLocal
(
URL url
)
 
{
        
if
 
(!
Constants
.
LOCAL_PROTOCOL
.
equalsIgnoreCase
(
url
.
getProtocol
()))
 
{
            
//手动暴露一个本地服务
            URL local 
=
 URL
.
valueOf
(
url
.
toFullString
())
                    
.
setProtocol
(
Constants
.
LOCAL_PROTOCOL
)
                    
.
setHost
(
LOCALHOST
)
                    
.
setPort
(
0
);
            
Exporter
<?>
 exporter 
=
 protocol
.
export
(
                    proxyFactory
.
getInvoker
(
ref
,
 
(
Class
)
 interfaceClass
,
 local
));
            exporters
.
add
(
exporter
);
            logger
.
info
(
"Export dubbo service "
 
+
 interfaceClass
.
getName
()
 
+
 
" to local registry"
);
        
}
    
}
这你scope配置默认值是null,则本地服务和远程服务都导出,另外如果没有配置注册中心,将直接将接口暴露出去,我们可以根据自己所在的场景,选择都暴露还是指定暴露
由于dubbo也是支持多注册中心的,所以可以通过for循环,将多个服务都注册到当前已有的注册中心上去
在exportLocal方法这是将配置中解析好的url参数手动修改成本地协议进行服务暴露
ProxyFactory是通过SPI获取JavassistProxyFactory靠Javassist字节码技术动态的生成Invoker类,大家有兴趣的可以下去了解一下
暴露的细节

public
 
class
 
ProtocolFilterWrapper
 
implements
 
Protocol
 
{

    
private
 
final
 
Protocol
 protocol
;
    
//装饰者模式
    
public
 
ProtocolFilterWrapper
(
Protocol
 protocol
)
 
{
        
if
 
(
protocol 
==
 
null
)
 
{
            
throw
 
new
 
IllegalArgumentException
(
"protocol == null"
);
        
}
        
this
.
protocol 
=
 protocol
;
    
}
}
在ServiceConfig中,通过SPI获取相应的Protocol,SPI中会对实现类进行装饰,每次执行protocol.exprot()方法的时候,其实都是执行的ProtocolFilterWrapper的protocol.exprot方法
    
@Override
    
public
 
<
T
>
 
Exporter
<
T
>
 export
(
Invoker
<
T
>
 invoker
)
 
throws
 
RpcException
 
{
        
//如果是注册中心协议直接导出
        
if
 
(
Constants
.
REGISTRY_PROTOCOL
.
equals
(
invoker
.
getUrl
().
getProtocol
()))
 
{
            
return
 protocol
.
export
(
invoker
);
        
}
        
//如果不是则执行整个filter的责任链
        
return
 protocol
.
export
(
buildInvokerChain
(
invoker
,
 
Constants
.
SERVICE_FILTER_KEY
,
 
Constants
.
PROVIDER
));
    
}

    
//责任链模式,对filter进行逐个执行
    
private
 
static
 
<
T
>
 
Invoker
<
T
>
 buildInvokerChain
(
final
 
Invoker
<
T
>
 invoker
,
 
String
 key
,
 
String
 group
)
 
{
        
Invoker
<
T
>
 last 
=
 invoker
;
        
List
<
Filter
>
 filters 
=
 
ExtensionLoader
.
getExtensionLoader
(
Filter
.
class
).
getActivateExtension
(
invoker
.
getUrl
(),
 key
,
 group
);
        
if
 
(!
filters
.
isEmpty
())
 
{
            
for
 
(
int
 i 
=
 filters
.
size
()
 
-
 
1
;
 i 
>=
 
0
;
 i
--)
 
{
                
final
 
Filter
 filter 
=
 filters
.
get
(
i
);
                
final
 
Invoker
<
T
>
 next 
=
 last
;
                last 
=
 
new
 
Invoker
<
T
>()
 
{

                    
@Override
                    
public
 
Class
<
T
>
 getInterface
()
 
{
                        
return
 invoker
.
getInterface
();
                    
}

                    
@Override
                    
public
 URL getUrl
()
 
{
                        
return
 invoker
.
getUrl
();
                    
}

                    
@Override
                    
public
 
boolean
 isAvailable
()
 
{
                        
return
 invoker
.
isAvailable
();
                    
}

                    
@Override
                    
public
 
Result
 invoke
(
Invocation
 invocation
)
 
throws
 
RpcException
 
{
                        
Result
 result 
=
 filter
.
invoke
(
next
,
 invocation
);
                        
if
 
(
result 
instanceof
 
AsyncRpcResult
)
 
{
                            
AsyncRpcResult
 asyncResult 
=
 
(
AsyncRpcResult
)
 result
;
                            asyncResult
.
thenApplyWithContext
(

->
 filter
.
onResponse
(
r
,
 invoker
,
 invocation
));
                            
return
 asyncResult
;
                        
}
 
else
 
{
                            
return
 filter
.
onResponse
(
result
,
 invoker
,
 invocation
);
                        
}
                    
}

                    
@Override
                    
public
 
void
 destroy
()
 
{
                        invoker
.
destroy
();
                    
}

                    
@Override
                    
public
 
String
 toString
()
 
{
                        
return
 invoker
.
toString
();
                    
}
                
};
            
}
        
}
        
return
 last
;
    
}
由上述代码我们可以看到,dubbo中运用装饰者模式和责任链模式,对我们提供的服务做了一次封装,最终转换成我们需要的invoker对外暴露
要注意到的是,当我们的协议是registry也就是注册协议的时候,是不需要进行构建责任链的
本地暴露

    
@Override
    
public
 
<
T
>
 
Exporter
<
T
>
 export
(
Invoker
<
T
>
 invoker
)
 
throws
 
RpcException
 
{
        
return
 
new
 
InjvmExporter
<
T
>(
invoker
,
 invoker
.
getUrl
().
getServiceKey
(),
 exporterMap
);
    
}
    
如果是本地暴露,则通过SPI拿到InjvmProtocol,最终通过injvm协议导出InjvmExporter
远程暴露

    
@Override
    
public
 
<
T
>
 
Exporter
<
T
>
 export
(
Invoker
<
T
>
 invoker
)
 
throws
 
RpcException
 
{
        URL url 
=
 invoker
.
getUrl
();
        
// 导出服务.
        
String
 key 
=
 serviceKey
(
url
);
        
DubboExporter
<
T
>
 exporter 
=
 
new
 
DubboExporter
<
T
>(
invoker
,
 key
,
 exporterMap
);
        exporterMap
.
put
(
key
,
 exporter
);
        
//导出根服务以进行调度事件
        
Boolean
 isStubSupportEvent 
=
 url
.
getParameter
(
Constants
.
STUB_EVENT_KEY
,
 
Constants
.
DEFAULT_STUB_EVENT
);
        
Boolean
 isCallbackservice 
=
 url
.
getParameter
(
Constants
.
IS_CALLBACK_SERVICE
,
 
false
);
        
if
 
(
isStubSupportEvent 
&&
 
!
isCallbackservice
)
 
{
            
String
 stubServiceMethods 
=
 url
.
getParameter
(
Constants
.
STUB_EVENT_METHODS_KEY
);
            
if
 
(
stubServiceMethods 
==
 
null
 
||
 stubServiceMethods
.
length
()
 
==
 
0
)
 
{
                
if
 
(
logger
.
isWarnEnabled
())
 
{
                    logger
.
warn
(
new
 
IllegalStateException
(
"consumer ["
 
+
 url
.
getParameter
(
Constants
.
INTERFACE_KEY
)
 
+
                            
"], has set stubproxy support event ,but no stub methods founded."
));
                
}
            
}
 
else
 
{
                stubServiceMethodsMap
.
put
(
url
.
getServiceKey
(),
 stubServiceMethods
);
            
}
        
}
        
//打开服务
        openServer
(
url
);
        optimizeSerialization
(
url
);
        
return
 exporter
;
    
}
    

    
//打开服务
    
private
 
void
 openServer
(
URL url
)
 
{
        
// find server.
        
String
 key 
=
 url
.
getAddress
();
        
//客户端可以导出仅供服务器调用的服务
        
boolean
 isServer 
=
 url
.
getParameter
(
Constants
.
IS_SERVER_KEY
,
 
true
);
        
if
 
(
isServer
)
 
{
            
ExchangeServer
 server 
=
 serverMap
.
get
(
key
);
            
if
 
(
server 
==
 
null
)
 
{
                
synchronized
 
(
this
)
 
{
                    server 
=
 serverMap
.
get
(
key
);
                    
if
 
(
server 
==
 
null
)
 
{
                        
//如果服务不存在,创建服务
                        serverMap
.
put
(
key
,
 createServer
(
url
));
                    
}
                
}
            
}
 
else
 
{
                
// 服务器支持重置,与override一起使用
                server
.
reset
(
url
);
            
}
        
}
    
}
在dubbo协议中,我们看到在服务导出的时候会根据配置地址,打开netty服务,也就是通过这一步,开启了RPC端口,使consumer通过TCP协议进行服务调用
服务注册

    
@Override
    
public
 
<
T
>
 
Exporter
<
T
>
 export
(
final
 
Invoker
<
T
>
 originInvoker
)
 
throws
 
RpcException
 
{
        URL registryUrl 
=
 getRegistryUrl
(
originInvoker
);
        
// url在本地导出
        URL providerUrl 
=
 getProviderUrl
(
originInvoker
);
        
// 订阅覆盖数据
        
// 同样的服务由于订阅是带有服务名称的缓存密钥,因此会导致订阅信息覆盖。
        
final
 URL overrideSubscribeUrl 
=
 getSubscribedOverrideUrl
(
providerUrl
);
        
final
 
OverrideListener
 overrideSubscribeListener 
=
 
new
 
OverrideListener
(
overrideSubscribeUrl
,
 originInvoker
);
        overrideListeners
.
put
(
overrideSubscribeUrl
,
 overrideSubscribeListener
);

        providerUrl 
=
 overrideUrlWithConfig
(
providerUrl
,
 overrideSubscribeListener
);
        
//导出invoker
        
final
 
ExporterChangeableWrapper
<
T
>
 exporter 
=
 doLocalExport
(
originInvoker
,
 providerUrl
);
        
//将url注册到注册中心
        
final
 
Registry
 registry 
=
 getRegistry
(
originInvoker
);
        
final
 URL registeredProviderUrl 
=
 getRegisteredProviderUrl
(
providerUrl
,
 registryUrl
);
        
ProviderInvokerWrapper
<
T
>
 providerInvokerWrapper 
=
 
ProviderConsumerRegTable
.
registerProvider
(
originInvoker
,
                registryUrl
,
 registeredProviderUrl
);
        
//判断我们是否需要推迟发布
        
boolean
 
register
 
=
 registeredProviderUrl
.
getParameter
(
"register"
,
 
true
);
        
if
 
(
register
)
 
{
            
register
(
registryUrl
,
 registeredProviderUrl
);
            providerInvokerWrapper
.
setReg
(
true
);
        
}
        
// Deprecated! Subscribe to override rules in 2.6.x or before.
        registry
.
subscribe
(
overrideSubscribeUrl
,
 overrideSubscribeListener
);
        exporter
.
setRegisterUrl
(
registeredProviderUrl
);
        exporter
.
setSubscribeUrl
(
overrideSubscribeUrl
);
        
//确保每次导出时都返回一个新的导出器实例
        
return
 
new
 
DestroyableExporter
<>(
exporter
);
    
}

    
//真正导出服务的地方
     
private
 
<
T
>
 
ExporterChangeableWrapper
<
T
>
 doLocalExport
(
final
 
Invoker
<
T
>
 originInvoker
,
 URL providerUrl
)
 
{
        
String
 key 
=
 getCacheKey
(
originInvoker
);
        
ExporterChangeableWrapper
<
T
>
 exporter 
=
 
(
ExporterChangeableWrapper
<
T
>)
 bounds
.
get
(
key
);
        
if
 
(
exporter 
==
 
null
)
 
{
            
synchronized
 
(
bounds
)
 
{
                exporter 
=
 
(
ExporterChangeableWrapper
<
T
>)
 bounds
.
get
(
key
);
                
if
 
(
exporter 
==
 
null
)
 
{

                    
final
 
Invoker
<?>
 invokerDelegete 
=
 
new
 
InvokerDelegate
<
T
>(
originInvoker
,
 providerUrl
);
                    
//以dubbo协议为例,这里才是真正调用DubboProtocol.exprot的地方
                    exporter 
=
 
new
 
ExporterChangeableWrapper
<
T
>((
Exporter
<
T
>)
 protocol
.
export
(
invokerDelegete
),
 originInvoker
);
                    bounds
.
put
(
key
,
 exporter
);
                
}
            
}
        
}
        
return
 exporter
;
    
}
大家这里可以跟进源码,会发现,在ServiceConfig中通过proxyFactory生成的Invoker的url指向的协议其实是registry,所以在ServiceConfig中protocol.exprot调用的是RegistryProtocol的exprot方法
在RegistryProtocol中调用了真正的远程服务暴露的方法,即DubboProtocol(以dubbo协议为例),在远程服务暴露成功后,将服务信息注册到registry上去,由此完成了一个服务的导出
至此Dubbo服务暴露中的大致流程已经完成了,后面将会对Dubbo如何通过ProxyFactory生成Invoker,以及Registry是如何进行注册的进行更加深入的学习

阅读原文

微信扫一扫
关注该公众号

更多内容vip可查看