微服务组件--限流框架Spring Cloud Hystrix分析( 六 )

【13】分析核心的executeCommandAndObserve执行逻辑
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();//主要是来对HystrixCommand和HystrixObservableCommand记录的事件是不同的final Action1<R> markEmits = new Action1<R>() {@Overridepublic void call(R r) {if (shouldOutputOnNextEvents()) {executionResult = executionResult.addEvent(HystrixEventType.EMIT);eventNotifier.markEvent(HystrixEventType.EMIT, commandKey);}if (commandIsScalar()) {long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);circuitBreaker.markSuccess();}}};final Action0 markOnCompleted = new Action0() {@Overridepublic void call() {if (!commandIsScalar()) {long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);circuitBreaker.markSuccess();}}};//执行失败的逻辑定义final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {@Overridepublic Observable<R> call(Throwable t) {Exception e = getExceptionFromThrowable(t);executionResult = executionResult.setExecutionException(e);if (e instanceof RejectedExecutionException) {return handleThreadPoolRejectionViaFallback(e);} else if (t instanceof HystrixTimeoutException) {return handleTimeoutViaFallback();} else if (t instanceof HystrixBadRequestException) {return handleBadRequestByEmittingError(e);} else {/** Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.*/if (e instanceof HystrixBadRequestException) {eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);return Observable.error(e);}return handleFailureViaFallback(e);}}};final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() {@Overridepublic void call(Notification<? super R> rNotification) {setRequestContextIfNeeded(currentRequestContext);}};//上面定义的都是一些异步调用事件,主体在这里Observable<R> execution;//如果超时开启使用HystrixObservableTimeoutOperator来对Observable做超时处理 。//所以不管是信号量隔离还是线程池隔离都会走该逻辑进行超时控制 。if (properties.executionTimeoutEnabled().get()) {//看名字就知道是特殊的隔离,也就是隔离逻辑所在execution = executeCommandWithSpecifiedIsolation(_cmd).lift(new HystrixObservableTimeoutOperator<R>(_cmd));} else {execution = executeCommandWithSpecifiedIsolation(_cmd);}return execution.doOnNext(markEmits).doOnCompleted(markOnCompleted).onErrorResumeNext(handleFallback).doOnEach(setRequestContext);}private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {// mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE)return Observable.defer(new Func0<Observable<R>>() {@Overridepublic Observable<R> call() {executionResult = executionResult.setExecutionOccurred();if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));}metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD);if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {// the command timed out in the wrapping thread so we will return immediately// and not increment any of the counters below or other such logicreturn Observable.error(new RuntimeException("timed out before executing run()"));}if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {//we have not been unsubscribed, so should proceedHystrixCounters.incrementGlobalConcurrentThreads();threadPool.markThreadExecution();// store the command that is being runendCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());executionResult = executionResult.setExecutedInThread();/*** If any of these hooks throw an exception, then it appears as if the actual execution threw an error*/try {executionHook.onThreadStart(_cmd);executionHook.onRunStart(_cmd);executionHook.onExecutionStart(_cmd);return getUserExecutionObservable(_cmd);} catch (Throwable ex) {return Observable.error(ex);}} else {//command has already been unsubscribed, so return immediatelyreturn Observable.error(new RuntimeException("unsubscribed before executing run()"));}}}).doOnTerminate(new Action0() {@Overridepublic void call() {if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.TERMINAL)) {handleThreadEnd(_cmd);}if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.TERMINAL)) {//if it was never started and received terminal, then no need to clean up (I don't think this is possible)}//if it was unsubscribed, then other cleanup handled it}}).doOnUnsubscribe(new Action0() {@Overridepublic void call() {if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.UNSUBSCRIBED)) {handleThreadEnd(_cmd);}if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.UNSUBSCRIBED)) {//if it was never started and was cancelled, then no need to clean up}//if it was terminal, then other cleanup handled it}}).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {//线程池隔离调用@Overridepublic Boolean call() {return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;}}));} else {return Observable.defer(new Func0<Observable<R>>() {@Overridepublic Observable<R> call() {executionResult = executionResult.setExecutionOccurred();if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));}metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.SEMAPHORE);// semaphore isolated// store the command that is being runendCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());try {executionHook.onRunStart(_cmd);executionHook.onExecutionStart(_cmd);return getUserExecutionObservable(_cmd);//the getUserExecutionObservable method already wraps sync exceptions, so this shouldn't throw} catch (Throwable ex) {//If the above hooks throw, then use that as the result of the run methodreturn Observable.error(ex);}}});}}private Observable<R> getUserExecutionObservable(final AbstractCommand<R> _cmd) {Observable<R> userObservable;try {userObservable = getExecutionObservable();} catch (Throwable ex) {// the run() method is a user provided implementation so can throw instead of using Observable.onError// so we catch it here and turn it into Observable.erroruserObservable = Observable.error(ex);}return userObservable.lift(new ExecutionHookApplication(_cmd)).lift(new DeprecatedOnRunHookApplication(_cmd));}@Overridefinal protected Observable<R> getExecutionObservable() {return Observable.defer(new Func0<Observable<R>>() {@Overridepublic Observable<R> call() {try {//调用GenericCommand类的run方法【子类重新写父类】return Observable.just(run());} catch (Throwable ex) {return Observable.error(ex);}}}).doOnSubscribe(new Action0() {@Overridepublic void call() {// Save thread on which we get subscribed so that we can interrupt it later if neededexecutionThread.set(Thread.currentThread());}});}//GenericCommand类#run方法@Overrideprotected Object run() throws Exception {LOGGER.debug("execute command: {}", getCommandKey().name());return process(new Action() {@OverrideObject execute() {return getCommandAction().execute(getExecutionType());}});}

推荐阅读