初始化过程
SimpleConfigHandler.refer ProxyFactory proxyFactory: JdkProxyFactory proxyFactory.getProxy(interfaceClass, RefererInvocationHandler); interfaceClass: interface com.catslave.cloud.order.service.PurchaseOrderService
$Proxy ref: {protocol:motan[motan://172.19.10.102:10073/com.catslave.cloud.order.service.PurchaseOrderService?group=motan-cat-rpc, available:true]}
调用过程
AnnotationBean.postProcessBeforeInitialization
RefererInvocationHandler.invoke
NettyChannel.request
this.nettyClient.registerCallback
ChannelFuture writeFuture = this.channel.write(request);
proxy
动态代理实现
动态代理原理分析:Java动态代理最终也是动态生成class字节码文件,然后加载运行。 因为JVM不允许在运行时修改原有类,所以所有的动态性都是通过新建类来实现的。 引出一个问题:JVM 动态更新
Java动态代理的两种实现方法JDK和CGLib https://blog.csdn.net/m0_38039437/article/details/77970633 JDK动态代理实现原理 http://rejoy.iteye.com/blog/1627405
- 静态代理是通过在代码中显式定义一个业务实现类一个代理,在代理类中对同名的业务方法进行包装,用户通过代理类调用被包装过的业务方法;
- JDK动态代理是通过接口中的方法名,在动态生成的代理类中调用业务实现类的同名方法;
- CGlib动态代理是通过继承业务类,生成的动态代理类是业务类的子类,通过重写业务方法进行代理;
Serialize
- 单例反射问题 readResolve
- 有些序列化方式支持字段的增删
rpc
GenericObjectPool
JDK Execute
- Executor
- ExecutorService
- AbstractExecutorService
- ThreadPoolExecutor
ThreadPoolExecutor
- corePoolSize,maximumPoolSize 线程池会根据corePoolSize和maximumPoolSize两个参数自动调整线程池的大小。通过submit或execute方法提交任务的时候,如果当前线程池的线程数小于corePoolSize,线程池就会创建一个新的线程处理任务,即使其他的core线程是空闲的。如果当前线程池的线程数大于corePoolSize并且小于maximumPoolSize,那么只有在队列“满”的情况下才会创建新的线程。因此这里有一个注意事项,如果希望请求积压在队列的时候能够自动扩容,但指定的队列又是一个无界队列,那么就不会扩容,因为队列不存在“满”的情况。
- keepAliveTime 如果当前线程池数超过corePoolSize,那么如果在keepAliveTime事件内都没有新的任务需要处理,超过corePoolSize的那部分线程就会被销毁。默认情况下是不会回收corePoolSize线程,可以通过设置allowCoreThreadTimeOut改变这一行为。
- workQueue 实际用于存储任务的队列。如果传入的是一个有界队列,那么队列满的时候,线程池就会根据core和max参数决定是否需要扩容。
- threadFactory 创建线程是通过ThreadFactory来实现的,如果没有指定,默认是使用Executors.defaultThreadFactory()。一般来说,我们会在这里对线程设置名称、异常处理器等等。
- handler 任务提交失败的时候会调用这个处理器。ThreadPoolExecutor内置了多个实现,比如抛异常、直接抛弃等。这里可以根据业务场景需要进行配置,比如当队列积压的时候,针对性的对线程池进行扩容或者发送警告等策略。
commons-pool
- ObjectPool 对象池,定义基础方法,获取对象、归还对象、添加对象。
- PooledObjectFactory 对ObjectPool方法的补充,创建池中的对象和销毁对象。
- ObjectPoolFactory 对象池的工厂类,用于创建对象池
commons-pool2提供的是面向接口的编程,为我们提供了一个抽象的对象池管理方式。我们只需要按实际业务去重写或实现一些方法和接口即可。
GenericObjectPool
对象池的核心类,它实现了对对象池的管理,是一个基本的对象池实现(ObjectPool接口)。一般情况下,我们可以直接使用它。它的构造函数提供了两个重要的参数:GenericObjectPoolConfig<T>
类和PooledObjectFactory<T>
接口的实现。PooledObjectFactory<T>
接口的实现由我们继承并实现。GenericObjectPool
内部持有两个数据结构:ConcurrentHashMap
和LinkedBlockingDeque
(为什么使用该数据结构?)。前者用于存储所有的对象(不含销毁的对象),后者用于存储空闲的对象。
borrowObject
T borrowObject(final long borrowMaxWaitMillis)
- 从空闲队列
idleObjects
获取一个资源,如果资源为null,则新建一个资源 - 如果最大资源数已达上限,则创建失败,返回null;否则直接创建新资源,并将资源添加到池中
allObjects
,然后返回新建的资源 - 如果资源创建失败返回null,判断是否已达最大等待时间,如果没有则继续获取创建资源直达超时位置;如果资源创建成功,则初始化资源状态
- 获取testOnBorrow参数是否在借用资源时校验资源有效性。如果是,则对资源进行校验,校验失败则销毁资源;
- 返回资源
returnObject
void returnObject(final T obj)
- 首先判断归还的对象是否为池中创建的对象,如果不是则抛异常提示错误;
- 获取testOnReturn参数是否在对象归还时校验对象的有效性。如果是,则对对象进行校验,校验失败则销毁对象;
- 还原对象到初始化状态;
- 判断池中的空闲对象是否已到上线,如果已达上线则销毁对象;
- 将对象重新加入空闲队列的列头或者列尾。
PooledObjectFactory
这个接口是我们要实现的,定义了创建、销毁等对池中对象的操作。在池中一个对象在使用前或使用后可能会有不同的状态,这个接口提供了标准的接口来使对象在不同的状态间切换:activeObject()
和passivateObject()
这两个方法对对象的状态进行切换。activeObject()
是使某个对象处于初始化后的状态,是在borrwoObject
从池中获取资源后调用的,是资源处于初始化后的状态。passivateObject()
是使某个对象钝化,即在归还到池中前使对象还原成初始化前的状态。
GenericObjectPoolConfig 参数配置类,允许使用者对对象池的一些参数做调整。
主要参数解释:
- testOnBorrow 获取对象的时候要校验对象的有效性
- testOnReturn 归还对象的时候要校验对象的有效性
- testWhileIdle 定时对空闲对象池进行有效性校验
- minIdle 池初始化时默认生成多少个空闲资源
- maxIdle 池中最大能储存多少个空闲资源。主要用来限制returnObject方法,return的资源是否返回到池中。如果池中空闲资源已达到上限,则归还的资源会被destroy掉。
- maxActive 激活的最大资源数。是指同时有多少个资源可以使用在borrowObject()方法起作用。如果maxActive已达上限,borrowObject方法就会阻塞等待一定时间,不能立马获取到资源。
- maxWait 在获取不到资源的时候,阻塞等待最大时间。
commons-pool2的应用 redis的Java客户端JedisPool(jedis[https://github.com/xetorthio/jedis]);dbcp数据库连接池[https://github.com/apache/commons-dbcp]。
Question
- Motan如何实现高并发、高可用?
NettyClient和NettyChannel
ChannelState 实例状态:UNINIT未初始化、INIT初始化完成、ALIVE存活、UNALIVE不存活、CLOSE关闭。
NettyClient也有客户端状态,NettyChannel也有各自的连接状态。
NettyClient持有一个Factory,Factory内部创建多个NettyChannel实例。
- 各实例状态转换节点
NettyChannel.open -> ALIVE; reconnect -> INIT; close -> CLOSE.
NettyClient.open -> ALIVE; close -> CLOSE; incrErrorCount -> UNALIVE; resetErrorCount -> ALIVE.
HeadtbeatClientEndpointManager在节点ALIVE状态下不会发送心跳,其他状态会调用heartbeat发送心跳包。但是NettyClient会检查,如果当前Client状态为UNINIT或CLOSE状态则不发送直接返回。
心跳的回调处理函数,也是客户端实例的回调处理逻辑,那客户端的回调处理逻辑在哪里呢?
- 客户端调用阻塞一定时间 ChannelFuture.awaitUninterruptibly(timeout, TimeUnit.MILLISECONDS); 同步阻塞一段时间,如果调用成功,返回response;超时,则取消调用,并记录该Client调用失败次数,然后抛出调用超时异常。
每次调用结束后都会对调用次数做更新,记录成功或者失败的次数,成功reset errorCount,失败errorCount.incrementAndGet。如果失败的次数超过的线程池连接量,则把当前Client标记为UNALIVE。
- NettyClient是如何从线程池选择线程的?
轮询取出一个Channel,Channel可用直接返回;如果不可用则重建该Channel,然后继续取下一个Channel。
重建Channel,调用Channel的reconnect方法,将Channel状态置为INIT初始化状态,然后将重建任务RebuildTask提交到线程池rebuildExecutorService执行。
RebuildTask处理逻辑:Channel加锁,然后close Channel,再open Channel(connect新的连接),释放锁。
- 创建NettyClient的时候总共启动过了哪几类线程池?
NettyClient拥有的线程池
// 1.回收过期任务 定期执行的1个线程池 private static ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(1);
过期任务TimeoutMonitor,如果是异步的请求,会将请求添加到callbackMap集合,过期任务线程池会轮询该集合,判断请求是否超时,超时移除请求。如果请求成功,会将请求从callbackMap集合中删除。
// 2.父类AbstractSharedPoolClient 创建了一个线程池,用来异步执行Channel处理化的(channel.open,默认是同步)。StandardThreadExecutor是Tomcat实现的线程池。该线程池也是所有Client共享的。 private static final ThreadPoolExecutor executor = new StandardThreadExecutor(1, 300, 20000, new DefaultThreadFactory(“AbstractPoolClient-initPool-“, true));
NettyChannelFactory拥有的线程池
// 3.创建的NettyChannelFactory对象也创建了一个线程池,该线程池专门用来重构Channel连接的,也是使用Tomcat的。该线程池是static类型的,所以对象实例共享,所有NettyClient共用一个rebuildExecutorService线程池处理Channel的重连。 private static final ExecutorService rebuildExecutorService = new StandardThreadExecutor(5, 30, 10L, TimeUnit.SECONDS, 100, new DefaultThreadFactory(“RebuildExecutorService”, true), new ThreadPoolExecutor.CallerRunsPolicy());
- NettyClient请求流程?
NettyClient.request -> getChannel从线程池中选择一个Channel,调用该Channel.request方法 -> 构建异步的ResponseFuture,将request注册到Client的callbackMap集合,防止请求超时;然后调用writeAndFlush将消息发送出去,awaitUnterruptibly阻塞等待一段时间,等待发送成功。发送成功后,给ResponseFuture添加监听器,用于追踪Channel是否可用。 -> Channel.request返回异步的ResponseFufure。 -> NettyClient根据同步/异步请求,如果是异步就直接返回,如果是同步,构建DefaultResponse返回。
接收消息时,NettyChannelHandler.channelRead事件触发,接收消息, processResponse调用MessageHandler.handler处理消息,handler将该消息对应的请求从callbackMap集合中删除,然后回调responseFuture的onSuccess方法,表示响应成功。
“请求-响应”通信模式,客户端唯一的RequestId来匹配请求,通过Future来进行同步等待。
- NettyClient请求失败了?
FailoverHaStrategy call方法。tryCount 失败重试次数,重试的时候会从referers里面轮询一条出来请求
NettyServer
- NettyServer 启动创建了几个线程?
standardThreadExecutor 用于处理客户请求的线程池,接收到请求后将请求提交到线程池中执行。
消息处理回调MessageHandler,ProviderMessageRouter维护了方法名到Provider的映射。通过请求的方法名对应找到Provider实例,然后通过反射调用实例方法。