Motan In Action - Provider

2016/10/19

Transactional

配置不全

配置加载顺序

MySql 存储引擎不支持事务

  • MyISAM 适用于查询,不支持事务处理
  • InnoDB 支持事务处理

配置概述

Motan框架中将功能模块抽象为四个可配置的元素,分别为:

  • protocol
  • registry
  • service
  • referer

Motan推荐使用Spring配置RPC服务,Motan扩展了一些自定义的Spring xml标签。项目运行时通过加载配置文件启动Motan。

  • motan:protocol 配置Motna服务的协议,不同的服务适用不同的协议进行传输,可以自行扩展协议。Motan默认的RPC协议为Motan协议,使用TCP长连接模式,基于Netty通信。协议还包括负责均衡、容错策略、连接控制、本地调用等属性。
  • motan:registry 注册中心配置。用于配置注册中心的注册协议、地址端口、超时时间等。Motan支持使用多种Registry模块,如Consul、Zookeeper,使用不同注册中心需要依赖对应的jar包。在开发及测试环境下,也可以不使用注册中心,只测试指定服务提供者。
  • motan:service 定义给外部调用的接口。
  • motan:basicService RPC服务的通用配置,用于配置所有服务接口的公共配置,减少配置冗余。
  • motan:referer 服务调用方。Client端订阅Service后,会从Registry中得到能够提供对应Service的一组Server,Client把这一组Server看作一个提供服务的cluster。当cluster中的Server发送变更时,Client端的register模块会通知Client进行更新。
  • motan:basicReferer 调用方基础配置。用于配置所有服务代理的公共属性。

Service

motan:service的export属性,暴露服务并将服务注册至注册中心,从而使调用方调用。多个服务如果端口相同会共用一个服务。

ServiceConfig的export方法,为同步方法防止服务被多次暴露。方法首先判断服务是否已发布,如果已发布则直接结束。接着检查所要发布服务的方法和接口是否存在和正确。然后加载注册中心配置和服务协议配置,将服务发布到注册中心所指定的地址和端口。一个服务可以发布到注册中心的多地址。服务发布成功后,将服务添加到已注册服务列表中,确保服务唯一。

public synchronized void export() {
	// 检查服务是否已发布
	if(export.get()) {
		LoggerUtil.warn(String.format("%s has already been expoted, so ignore the export request!", interfaceClass.getName()));
		return;
	}

	// 检查发布服务的接口和方法是否存在和正确
	checkInterfaceAndMethods(interfaceClass, methods);

	// 加载注册中心配置地址 Zookeeper或者Consul服务地址
	List<URL> registryUrls = loadRegistryUrls();
	if(registryUrls == null || registryUrls.size() == 0) {
		throw new IllegalStateException("Should set registry config for service:" + interfaceClass.getName());
	}

	// 加载服务协议配置的服务发布地址和端口
	Map<String, Integer> protocolPorts = getProtocolAndPort();
	// protocols 哪里来的???ServiceConfigBean里创建,实现了InitializingBean接口的afterPropertiesSet方法
	for(ProtocolConfig protocolConfig : protocols) {
		Integer port = protocolPorts.get(protocolConfig.getId());
		if(port == null) {
			throw new MotanServiceException(String.format("Unknow port in service:%s, protocol:%s", interfaceClass.getName(),
                        protocolConfig.getId()));
		}
		// 发布服务
		doExport(protocolConfig, port, registryUrls);
	}

	// 将服务添加到已注册服务列表
	afterExport();
}

1.这里还有很多问题要思考:点对点配置,直接export,配置文件如何加载?采用公共配置,配置文件如何加载?这里还需要进一步分析。目前分析的这个流程是采用哪种配置方式? 2.已经找到源头,在ServiceConfigBean文件里查找。

ServiceConfig的内部方法doExport,首先对参数进行一些默认设置,然后构建服务的URL地址,最后通过ConfigHandler发布服务。

private void doExport(ProtocolConfig protocolConfig, int port, List<URL> registryUrls) {
	String protocolName = protocolConfig.getName();
	if(protocolName == null || protocolName.length() == 0) {
		protocolName = URLParamType.protocol.getValue();// 默认协议“motan”
	}

	// 设置本地IP
	String hostAddress = host;
	if(StringUtils.isBlank(hostAddress) && basicServiceConfig != null) {
		hostAddress = basicServiceConfig.getHost();
	}
	if(NetUtils.isInvalidLocalHost(hostAddress)) {
		hostAddress = getLocalHostAddress(registryUrls);
	}

	Map<String, String> map = new HashMap<String, String>();

	map.put(URLParamType.nodeType.getName(), MotanConstants.NODE_TYPE_SERVICE);
	map.put(URLParamType.refreshTimestamp.getName(), String.valueOf(System.currentTimeMillis()));

	// 加载配置文件参数
	collectConfigParams(map, protocolConfig, basicServiceConfig, extConfig, this);
	collectMethodConfigParams(map, this.getMethods());

	// 构建服务URL
	URL serviceUrl = new URL(protocolName, hostAddress, port, interfaceClass.getName(), map);

	if(serviceExists(serviceUrl)) {
		LoggerUtil.warn(String.format("%s configService is malformed, for same service (%s) already exists ", interfaceClass.getName(),
                    serviceUrl.getIdentity()));
            throw new MotanFrameworkException(String.format("%s configService is malformed, for same service (%s) already exists ",
                    interfaceClass.getName(), serviceUrl.getIdentity()), MotanErrorMsgConstant.FRAMEWORK_INIT_ERROR);
	}

	List<URL> urls = new ArrayList<URL>();

	// injvm 协议只支持注册到本地,其他协议可以注册到local、remote
	if(MotanConstants.PROTOCOL_INJVM.equals(protocolConfig.getId())) {
		...
	} else {
		for (URL ru : registryUrls) {
			urls.add(ru.createCopy());// 为什么要copy???
		}
	}

	for(URL u : urls) {
		u.addParameter(URLParamType.embed.getName(), StringTools.urlEncode(serviceUrl.toFullStr()));
		registereUrls.add(u.createCopy());// ??? registereUrls
	}

	// 获取默认配置处理程序,发布服务 ???SimpleConfigHandler
	ConfigHandler configHandler = ExtensionLoader.getExtensionLoader(ConfigHelper.class).getExtension(MotanConstants.DEFAULT_VALUE);

	exporters.add(configHandler.export(interfaceClass, ref, urls));

	// 记录服务,用于日志统计作用
	initLocalAppInfo(serviceUrl);

}

ConfigHandler接口定义了处理配置文件中URL的基本方法,包括export、unexport、refer等泛型方法。SimpleConfigHandlerConfigHandler的简单实现类。ServiceConfigdoExport方法,最终调用ConfigHandler.export方法发布服务,SimpleConfigHandler实现了ConfigHandlerexport方法,方法的实现非常简单,第一启动服务,第二将服务发布到注册中心。

@Override
public <T> Exporter<T> export(Class<T> interfaceClass, T ref, List<URL> registryUrls) {
	
	String serviceStr = StringTools.urlDecode(registryUrls.get(0).getParameter(URLParamType.embed.getName()));
	URL serviceUrl = URL.valueOf(serviceStr);

	// export Service 启动服务
	// 利用protocol decorator来增加filter特性
	String protocolName = serviceUrl.getParameter(URLParamType.protocol.getName(), URLParamType.protocol.getValue());
	Protocol protocol = new ProtocolFilterDecorator(ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(protocolName));
	// Provider ???
	Provider<T> provider = new DefaultProvider<T>(ref, serviceUrl, interfaceClass);
	Exporter<T> exporter = protocol.export(provider, serviceUrl);// ???

	// register service 注册服务
	register(registryUrls, serviceUrl);

	return exporter;
}

什么时候export呢

Spring启动的时候,AnnotationBean.postProcessAfterInitialization 初始化服务和发布注册,在bean初始化之后开始初始化服务和注册。调用ServiceConfig的export方法。

registryUrls注册地址:zookeeper://120.76.157.7:2181/com.weibo.api.motan.registry.RegistryService?group=default_rpc

URL serviceUrl:motan,172.19.10.102,10073,com.catslave.cloud.order.service.PurchaseOrderService,参数。

ConfigHandler configHandler:SimpleConfigHandler

configHandler.export

Protocol protocol: DefaultRpcProtocol

Provider provider: DefaultProvider

protocol.export(provider, serviceUrl)

protocolKey: motan://172.19.10.102:10073/motan-cat-rpc/com.catslave.cloud.order.service.PurchaseOrderService/1.0

new DefaultRpcExporter(provider, url)

ProviderMessageRouter requestRouter : ProviderProtectedMessageRouter
EndpointFactory endpointFactory: NettyEndpointFactory

endpointFactory.createServer(url, requestRouter)

register(registryUrls, serviceUrl)

RegistryFactory registryFactory: ZookeeperRegistryFactory
Registry registry: ZookeeperRegistry
registry.register(serviceUrl)

createNode(URL url, ZkNodeType nodeType) url: motan://172.19.10.102:10073/com.catslave.cloud.order.service.PurchaseOrderService?group=motan-cat-rpc nodeType: UNAVAILABLE_SERVER

String nodeTypePath: /motan/motan-cat-rpc/com.catslave.cloud.order.service.PurchaseOrderService/unavailableServer

MotanSwitcherUtil.setSwitcherValue(MotanConstants.REGISTRY_HEARTBEAT_SWITCHER, true);

createNode(URL url, ZkNodeType nodeType)

url: motan://172.19.10.102:10073/com.catslave.cloud.order.service.PurchaseOrderService?group=motan-cat-rpc
nodeType: AVAILABLE_SERVER

层次分析

motan-core

Motan推荐使用Spring配置RPC服务,Motan自定义了Spring xml标签。在模块motan-springsupport里,MotanBeanDefinitionParserMotanNamespaceHandler,对标签进行加载和解析。ServiceConfigBean对应motan:service标签。motan-springsupport包的标签,在工程初始化时自动加载解析。

ServiceConfigBean实现了ApplicationListener接口,当bean解析完被装载进Context上下文时,会触发该接口事件。Motan在motan:service配置加载完成时,通过实现ApplicationListener接口的onApplicationEvent方法,调用ServiceConfigexport方法发布服务。

模块motan-coreconfig包,ServiceConfigexport方法被调用。ServiceConfig通过handler包来处理服务发布。 SimpleConfigHandler发布注册服务。

Motan的服务都是通过RPC进行调用,服务的发布通过rpc包进行发布,通过registry包进行注册。

rpc包定义了Protocol接口来发布和引用服务。AbstractProtocol抽象类实现了接口Protocol的发布和引用服务方法,并新定义了创建服务和创建引用抽象方法。DefaultRpcProtocl实现了抽象类AbstractProtocol定义的创建服务和引用方法。通过定义两个内部类DefaultRpcExporterDefaultRpcRefererDefaultRpcExporter服务的提供者,DefaultRpcReferer服务的引用者。

transport包定义了EndpointFactory节点创建工厂,用于创建远程服务和客户端。AbstractEndpointFactory为其抽象工厂实现类,对创建服务和客户端方法进行了校验,并新定义了两个抽象方法创建服务和客户端由子类具体实现。

motan-springsupport

Spring标签解析相关功能,自定义了几个Spring xml标签,以及支持注解方式annotation使用Motan。

motan-transport-netty

基于Netty协议的长连接传输协议,NettyEndpointFactory实现了transport包抽象工厂类AbstractEndpointFactory定义的创建服务和客户端抽象方法。motan-transport-netty包分别通过NettyServerNettyClient实现类服务端和客户端。

原理实现

Motan.service的实现原理主要分为三点,Spring自定义标签(使用配置方式)、Netty服务端、Zookeeper注册中心(以Zookeeper为例)。

Spring自定义标签

motan-coreresources/META-INF包内定义了Motan自定义的标签motan.xsdmotna.xsd中包含了Motan扩展的所有标签。

motan-springsupport包,对自定义标签进行解析,以及支持注解方式的配置Motan。

MotanNamespaceHandler继承NamespaceHandlerSupport,对所有标签注册自定义的解析类。 MotanBeanDefinitionParser自定义的标签解析类,创建标签实例,从标签中读取出对应的属性值并赋值到实例中。

public class MotanNamespaceHandler extends NamespaceHandlerSupport {
	...

	@Override
	public void init() {
		// 解析标签 referer
		registerBeanDefinitionParser("referer", new MotanBeanDefinitionParser(RefererConfigBean.class, false));
		// 解析标签 service
		registerBeanDefinitionParser("service", new MotanBeanDefinitionParser(ServiceConfigBean.class, false));
		...
		Initializable initialization = InitializationFactory.getInitialization();
		initialization.init();
	}
}

Service端的配置文件,service的配置会覆盖basicConfig的配置,basicConfig的配置会覆盖protocol的配置。

问题:motan:service 可以配置多个,那ServiceConfig是单例还是多例?

ServiceConfig会有多个。ServcieConfig.existingServices就是这个用处,维护当前正在运行的服务。 ConcurrentHashSet不允许添加重复值。

ServiceConfig.export方法加synchronized关键字修饰,保证一次只能有一个服务在发布。

AbstractProtocol.exporterMap ConcurrentHashMap

Netty服务端

Zookeeper注册中心

======= 抽象类AbstractProtocol实现了Protocol接口定义的export方法。AbstractProtocol内部维护着一张exporterMapprotocolKeyexporter的对应关系。在export方法的实现中,使用synchronized关键字对exporterMap加锁,确保服务唯一。

@Override
public <T> Exporter<T> export(Provider<T> provider, URL url) {

	// 检查参数
	if(url == null)	{
		throw new MotanFrameworkException(this.getClass().getSimpleName() + " export Error: url is null", MotanErrorMsgConstant.FRAMEWORK_INIT_ERROR);
	}

	if(provider == null) {
		throw new MotanFrameworkException(this.getClass().getSimpleName() + " export Error: provider is null, url=" + url, MotanErrorMsgConstant.FRAMEWORK_INIT_ERROR);
	}

	// ???
	String protocolKey = MotanFrameworkUtil.getProtocolKey(url);

	synchronized(exporterMap) {
		Exporter<T> exporter = (Exporter<T>)exporterMap.get(protocolKey);

		if(exporter != null) {
			throw new MotanFrameworkException(this.getClass().getSimpleName() + " export Error: service already exist, url=" + url, MotanErrorMsgConstant.FRAMEWORK_INIT_ERROR);
		}

		// 创建服务
		exporter = createExporter(provider, url);
		exporter.init();

		exporterMap.put(protocolKey, exporter);

		LoggerUtil.info(this.getClass().getSimpleName() + " export Success: url=" + url);

		return exporter;
	}
}

DefaultRpcProtocol继承AbstractProtocl,实现createExportercreateReferer方法,并定义了两个内部类DefaultRpcExporterDefaultRpcReferer

@Override
protected <T> Exporter<T> createExporter(Provider<T> provider, URL url) {
	return new DefaultRpcExporter<T>(provider, url);
}
	class DefaultRpcExporter<T> extends AbstractExporter<T> {
		private Server server;
		private EndpointFactory endpointFactory;

		public DefaultRpcExporter(Provider<T> provider, URL url) {
			super(provider, url);

			// 构建消息处理程序,用于方法的回调
			ProviderMessageRouter requestRouter = initRequestRouter(url);
			
			endpointFactory = ExtensionLoader.getExtensionLoader(EndpointFactory.class).getExtension(url.getParameter(URLParamType.endpointFactory.getName(), URLParamType.endpointFactory.getValue()));
			// 启动Netty Server服务
			server = endpointFactory.createServer(url, requestRouter);
		}
	}

NettyEndpointFactory创建了NettyServer实例,启动了一个Netty服务端。

Post Directory