微服务组件--限流框架Spring Cloud Hystrix分析( 四 )


//分析queue()方法怎么使用Future模式的//熟悉线程池的,应该知道线程池有个FutureTask的任务//通过持有FutureTask句柄可以异步获取返回结果//本质上就是FutureTask持有//一个结果存放地址//线程执行的run方法(执行完后将结果放入固定的存放地址)//那么现在看下面的逻辑就会十分清晰public Future<R> queue() {final Future<R> delegate = toObservable().toBlocking().toFuture();final Future<R> f = new Future<R>() {@Overridepublic boolean cancel(boolean mayInterruptIfRunning) {if (delegate.isCancelled()) {return false;}if (HystrixCommand.this.getProperties().executionIsolationThreadInterruptOnFutureCancel().get()) {interruptOnFutureCancel.compareAndSet(false, mayInterruptIfRunning);}final boolean res = delegate.cancel(interruptOnFutureCancel.get());if (!isExecutionComplete() && interruptOnFutureCancel.get()) {final Thread t = executionThread.get();if (t != null && !t.equals(Thread.currentThread())) {t.interrupt();}}return res;}@Overridepublic boolean isCancelled() {return delegate.isCancelled();}@Overridepublic boolean isDone() {return delegate.isDone();}@Overridepublic R get() throws InterruptedException, ExecutionException {return delegate.get();}@Overridepublic R get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {return delegate.get(timeout, unit);}};/* special handling of error states that throw immediately */if (f.isDone()) {try {f.get();return f;} catch (Exception e) {Throwable t = decomposeException(e);if (t instanceof HystrixBadRequestException) {return f;} else if (t instanceof HystrixRuntimeException) {HystrixRuntimeException hre = (HystrixRuntimeException) t;switch (hre.getFailureType()) {case COMMAND_EXCEPTION:case TIMEOUT:// we don't throw these types from queue() only from queue().get() as they are execution errorsreturn f;default:// these are errors we throw from queue() as they as rejection type errorsthrow hre;}} else {throw Exceptions.sneakyThrow(t);}}}//也就是将产生的Future对象返回return f;}【9】分析结果的获取是从delegate属性中获取 , 它被定义为一个观察者
//定义观察者public Observable<R> toObservable() {final AbstractCommand<R> _cmd = this;//doOnCompleted handler already did all of the SUCCESS work//doOnError handler already did all of the FAILURE/TIMEOUT/REJECTION/BAD_REQUEST work//第一个观察者,命令执行结束后的清理者final Action0 terminateCommandCleanup = new Action0() {@Overridepublic void call() {if (_cmd.commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.TERMINAL)) {handleCommandEnd(false); //user code never ran} else if (_cmd.commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.TERMINAL)) {handleCommandEnd(true); //user code did run}}};//mark the command as CANCELLED and store the latency (in addition to standard cleanup)//第二个观察者,取消订阅时处理者final Action0 unsubscribeCommandCleanup = new Action0() {@Overridepublic void call() {if (_cmd.commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.UNSUBSCRIBED)) {if (!_cmd.executionResult.containsTerminalEvent()) {_cmd.eventNotifier.markEvent(HystrixEventType.CANCELLED, _cmd.commandKey);try {executionHook.onUnsubscribe(_cmd);} catch (Throwable hookEx) {...}_cmd.executionResultAtTimeOfCancellation = _cmd.executionResult.addEvent((int) (System.currentTimeMillis() - _cmd.commandStartTimestamp), HystrixEventType.CANCELLED);}handleCommandEnd(false); //user code never ran} else if (_cmd.commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.UNSUBSCRIBED)) {if (!_cmd.executionResult.containsTerminalEvent()) {_cmd.eventNotifier.markEvent(HystrixEventType.CANCELLED, _cmd.commandKey);try {executionHook.onUnsubscribe(_cmd);} catch (Throwable hookEx) {...}_cmd.executionResultAtTimeOfCancellation = _cmd.executionResult.addEvent((int) (System.currentTimeMillis() - _cmd.commandStartTimestamp), HystrixEventType.CANCELLED);}handleCommandEnd(true); //user code did run}}};//第三个观察者,重点:Hystrix 核心逻辑: 断路器、隔离final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {@Overridepublic Observable<R> call() {if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {return Observable.never();}return applyHystrixSemantics(_cmd);}};//第四个观察者,发射数据(OnNext表示发射数据)时的Hookfinal Func1<R, R> wrapWithAllOnNextHooks = new Func1<R, R>() {@Overridepublic R call(R r) {R afterFirstApplication = r;try {afterFirstApplication = executionHook.onComplete(_cmd, r);} catch (Throwable hookEx) {logger.warn("Error calling HystrixCommandExecutionHook.onComplete", hookEx);}try {return executionHook.onEmit(_cmd, afterFirstApplication);} catch (Throwable hookEx) {logger.warn("Error calling HystrixCommandExecutionHook.onEmit", hookEx);return afterFirstApplication;}}};//第五个观察者,命令执行完成的Hookfinal Action0 fireOnCompletedHook = new Action0() {@Overridepublic void call() {try {executionHook.onSuccess(_cmd);} catch (Throwable hookEx) {logger.warn("Error calling HystrixCommandExecutionHook.onSuccess", hookEx);}}};//进行包装return Observable.defer(new Func0<Observable<R>>() {@Overridepublic Observable<R> call() {/* this is a stateful object so can only be used once */if (!commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) {..省略抛出异常..}commandStartTimestamp = System.currentTimeMillis();if (properties.requestLogEnabled().get()) {// log this command execution regardless of what happenedif (currentRequestLog != null) {currentRequestLog.addExecutedCommand(_cmd);}}final boolean requestCacheEnabled = isRequestCachingEnabled();final String cacheKey = getCacheKey();/* try from cache first */if (requestCacheEnabled) {HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);if (fromCache != null) {isResponseFromCache = true;return handleRequestCacheHitAndEmitValues(fromCache, _cmd);}}// 使用上面的Func0:applyHystrixSemantics 来创建ObservableObservable<R> hystrixObservable =Observable.defer(applyHystrixSemantics).map(wrapWithAllOnNextHooks);Observable<R> afterCache;// 设置缓存逻辑,不太重要if (requestCacheEnabled && cacheKey != null) {// wrap it for cachingHystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);if (fromCache != null) {// another thread beat us so we'll use the cached value insteadtoCache.unsubscribe();isResponseFromCache = true;return handleRequestCacheHitAndEmitValues(fromCache, _cmd);} else {// we just created an ObservableCommand so we cast and return itafterCache = toCache.toObservable();}} else {afterCache = hystrixObservable;}//链式return afterCache.doOnTerminate(terminateCommandCleanup)// perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line)).doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once.doOnCompleted(fireOnCompletedHook);}});}

推荐阅读