微服务组件——限流框架 Spring Cloud Hystrix 分析(五)

【10】分析核心applyHystrixSemantics方法
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {//执行命令开始执行的钩子方法 可能有人会问 前面绑定了那么多的钩子方法 这里怎么才开始//start 因为前面绑定但是并没有执行 。当有订阅者订阅 这里才是开始执行的代码逻辑executionHook.onStart(_cmd);//判断断路器是否开启if (circuitBreaker.allowRequest()) {//如果是信号量隔离返回TryableSemaphoreActual 根据设置的并发量来判断是否能执行,如果不能执行,进入fallback 。//如果是线程池隔离 返回TryableSemaphoreNoOp直接返回true没有任何操作final TryableSemaphore executionSemaphore = getExecutionSemaphore();final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);final Action0 singleSemaphoreRelease = new Action0() {@Overridepublic void call() {if (semaphoreHasBeenReleased.compareAndSet(false, true)) {executionSemaphore.release();}}};final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {@Overridepublic void call(Throwable t) {eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);}};//判断能否正常执行if (executionSemaphore.tryAcquire()) {try {/* used to track userThreadExecutionTime */executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());//核心方法return executeCommandAndObserve(_cmd).doOnError(markExceptionThrown).doOnTerminate(singleSemaphoreRelease).doOnUnsubscribe(singleSemaphoreRelease);} catch (RuntimeException e) {return Observable.error(e);}} else {//信号量执行的时候并发太大直接回退return handleSemaphoreRejectionViaFallback();}} else {//执行降级return handleShortCircuitViaFallback();}}//TryableSemaphoreActual类#tryAcquire方法@Overridepublic boolean tryAcquire() {int currentCount = count.incrementAndGet();if (currentCount > numberOfPermits.get()) {count.decrementAndGet();return false;} else {return true;}}//TryableSemaphoreNoOp类#tryAcquire方法@Overridepublic boolean tryAcquire() {return true;}【11】分析allowRequest方法是怎么判断是否允许通过的
@Overridepublic boolean allowRequest() {if (properties.circuitBreakerForceOpen().get()) {// 属性要求我们强制打开电路,这样我们将允许NO请求return false;}if (properties.circuitBreakerForceClosed().get()) {// 我们仍然希望允许isOpen()执行它的计算,因此我们模拟正常的行为isOpen();// 属性要求我们忽略错误 , 所以我们将忽略isOpen的结果,只允许所有的流量通过return true;}return !isOpen() || allowSingleTest();}@Overridepublic boolean isOpen() {//如果断路器打开立即返回trueif (circuitOpen.get()) {return true;}// we're closed, so let's see if errors have made us so we should trip the circuit openHealthCounts health = metrics.getHealthCounts();// check if we are past the statisticalWindowVolumeThresholdif (health.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {// we are not past the minimum volume threshold for the statisticalWindow so we'll return false immediately and not calculate anythingreturn false;}if (health.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {return false;} else {// our failure rate is too high, trip the circuitif (circuitOpen.compareAndSet(false, true)) {// if the previousValue was false then we want to set the currentTimecircuitOpenedOrLastTestedTime.set(System.currentTimeMillis());return true;} else {// How could previousValue be true? 
If another thread was going through this code at the same time a race-condition could have// caused another thread to set it to true already even though we were in the process of doing the same// In this case, we know the circuit is open, so let the other thread set the currentTime and report back that the circuit is openreturn true;}}}public boolean allowSingleTest() {long timeCircuitOpenedOrWasLastTested = circuitOpenedOrLastTestedTime.get();// 1) 如果断路器是打开的// 2) 且已经过了休眠时间,尝试打开if (circuitOpen.get() && System.currentTimeMillis() > timeCircuitOpenedOrWasLastTested + properties.circuitBreakerSleepWindowInMilliseconds().get()) {//已经过了休眠时间,允许一个请求尝试 。//如果成功,断路器被关闭 。if (circuitOpenedOrLastTestedTime.compareAndSet(timeCircuitOpenedOrWasLastTested, System.currentTimeMillis())) {//如果这个返回true,意味着我们设置了时间 , 因此我们将返回true以允许单次尝试//如果它返回false,这意味着另一个线程在我们之前运行并允许单次尝试return true;}}return false;}【12】分析降级的逻辑
/**
 * Builds the observable returned when command execution did not succeed: either an error
 * observable or the fallback execution chain.
 *
 * <p>The event and latency are recorded on {@code executionResult} first. Then:
 * <ul>
 *   <li>if {@code shouldNotBeWrapped(originalException)}: return an error carrying the
 *       hook-wrapped exception;</li>
 *   <li>else if {@code isUnrecoverable(originalException)}: log it and return a
 *       {@code HystrixRuntimeException} without applying the fallback;</li>
 *   <li>else if fallback is enabled and a fallback-semaphore permit is acquired: return the
 *       fallback observable decorated with hooks, event marking, fallback-error handling and
 *       a one-shot semaphore release;</li>
 *   <li>else return the result of the fallback-rejected or fallback-disabled handler.</li>
 * </ul>
 *
 * @param _cmd              the command being executed
 * @param eventType         the event to record on the execution result
 * @param failureType       the failure type passed to the error hook and to any
 *                          {@code HystrixRuntimeException} created here
 * @param message           text appended to the log-message prefix in exception messages
 * @param originalException the exception that caused execution to fail
 * @return an observable emitting the fallback values or an error
 */
private Observable<R> getFallbackOrThrowException(final AbstractCommand<R> _cmd, final HystrixEventType eventType, final FailureType failureType, final String message, final Exception originalException) {
    // Captured now so it can be handed to setRequestContextIfNeeded inside the chain callback.
    final HystrixRequestContext requestContext = HystrixRequestContext.getContextForCurrentThread();
    long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
    // record the executionResult
    // do this before executing fallback so it can be queried from within getFallback (see https://github.com/Netflix/Hystrix/pull/144)
    executionResult = executionResult.addEvent((int) latency, eventType);

    if (shouldNotBeWrapped(originalException)) {
        /* executionHook for all errors */
        Exception e = wrapWithOnErrorHook(failureType, originalException);
        return Observable.error(e);
    } else if (isUnrecoverable(originalException)) {
        logger.error("Unrecoverable Error for HystrixCommand so will throw HystrixRuntimeException and not apply fallback. ", originalException);

        /* executionHook for all errors */
        Exception e = wrapWithOnErrorHook(failureType, originalException);
        return Observable.error(new HystrixRuntimeException(failureType, this.getClass(), getLogMessagePrefix() + " " + message + " and encountered unrecoverable error.", e, null));
    } else {
        if (isRecoverableError(originalException)) {
            logger.warn("Recovered from java.lang.Error by serving Hystrix fallback", originalException);
        }

        if (properties.fallbackEnabled().get()) {
            /* fallback behavior is permitted so attempt */

            // Runs on every notification of the fallback chain (attached via doOnEach below).
            final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() {
                @Override
                public void call(Notification<? super R> rNotification) {
                    setRequestContextIfNeeded(requestContext);
                }
            };

            // Records FALLBACK_EMIT for each emitted value, only when shouldOutputOnNextEvents() is true.
            final Action1<R> markFallbackEmit = new Action1<R>() {
                @Override
                public void call(R r) {
                    if (shouldOutputOnNextEvents()) {
                        executionResult = executionResult.addEvent(HystrixEventType.FALLBACK_EMIT);
                        eventNotifier.markEvent(HystrixEventType.FALLBACK_EMIT, commandKey);
                    }
                }
            };

            // Records FALLBACK_SUCCESS together with the latency when the fallback completes.
            final Action0 markFallbackCompleted = new Action0() {
                @Override
                public void call() {
                    long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
                    eventNotifier.markEvent(HystrixEventType.FALLBACK_SUCCESS, commandKey);
                    executionResult = executionResult.addEvent((int) latency, HystrixEventType.FALLBACK_SUCCESS);
                }
            };

            // Converts an error from the fallback into a HystrixRuntimeException carrying both
            // the original exception (after the error hook) and the fallback exception.
            final Func1<Throwable, Observable<R>> handleFallbackError = new Func1<Throwable, Observable<R>>() {
                @Override
                public Observable<R> call(Throwable t) {
                    Exception e = originalException;
                    Exception fe = getExceptionFromThrowable(t);

                    if (fe instanceof UnsupportedOperationException) {
                        // UnsupportedOperationException is reported as "no fallback available" (FALLBACK_MISSING).
                        long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
                        logger.debug("No fallback for HystrixCommand. ", fe); // debug only since we're throwing the exception and someone higher will do something with it
                        eventNotifier.markEvent(HystrixEventType.FALLBACK_MISSING, commandKey);
                        executionResult = executionResult.addEvent((int) latency, HystrixEventType.FALLBACK_MISSING);

                        /* executionHook for all errors */
                        e = wrapWithOnErrorHook(failureType, e);

                        return Observable.error(new HystrixRuntimeException(failureType, _cmd.getClass(), getLogMessagePrefix() + " " + message + " and no fallback available.", e, fe));
                    } else {
                        // Any other exception is reported as a failed fallback (FALLBACK_FAILURE).
                        long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
                        logger.debug("HystrixCommand execution " + failureType.name() + " and fallback failed.", fe);
                        eventNotifier.markEvent(HystrixEventType.FALLBACK_FAILURE, commandKey);
                        executionResult = executionResult.addEvent((int) latency, HystrixEventType.FALLBACK_FAILURE);

                        /* executionHook for all errors */
                        e = wrapWithOnErrorHook(failureType, e);

                        return Observable.error(new HystrixRuntimeException(failureType, _cmd.getClass(), getLogMessagePrefix() + " " + message + " and fallback failed.", e, fe));
                    }
                }
            };

            final TryableSemaphore fallbackSemaphore = getFallbackSemaphore();
            // The release action is attached to both terminate and unsubscribe below; this flag
            // makes sure the permit is released only once.
            final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
            final Action0 singleSemaphoreRelease = new Action0() {
                @Override
                public void call() {
                    if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
                        fallbackSemaphore.release();
                    }
                }
            };

            Observable<R> fallbackExecutionChain;

            // Article note: none of the callbacks defined above are invoked at this point (they
            // are only wired into the chain below); the part to focus on is what follows.
            // acquire a permit
            if (fallbackSemaphore.tryAcquire()) {
                try {
                    if (isFallbackUserDefined()) {
                        executionHook.onFallbackStart(this);
                        // HystrixCommand#getFallbackObservable
                        fallbackExecutionChain = getFallbackObservable();
                    } else {
                        // same logic as above without the hook invocation
                        fallbackExecutionChain = getFallbackObservable();
                    }
                } catch (Throwable ex) {
                    // If hook or user-fallback throws, then use that as the result of the fallback lookup
                    fallbackExecutionChain = Observable.error(ex);
                }

                return fallbackExecutionChain
                        .doOnEach(setRequestContext)
                        .lift(new FallbackHookApplication(_cmd))
                        .lift(new DeprecatedOnFallbackHookApplication(_cmd))
                        .doOnNext(markFallbackEmit)
                        .doOnCompleted(markFallbackCompleted)
                        .onErrorResumeNext(handleFallbackError)
                        .doOnTerminate(singleSemaphoreRelease)
                        .doOnUnsubscribe(singleSemaphoreRelease);
            } else {
                // No fallback permit available.
                return handleFallbackRejectionByEmittingError();
            }
        } else {
            // Fallback is disabled by configuration.
            return handleFallbackDisabledByEmittingError(originalException, failureType, message);
        }
    }
}

// HystrixCommand#getFallbackObservable
/**
 * Wraps {@code getFallback()} in a deferred observable, so the fallback is only invoked when
 * the returned observable is subscribed to.
 *
 * @return an observable emitting the fallback value, or an error observable if
 *         {@code getFallback()} throws
 */
@Override
final protected Observable<R> getFallbackObservable() {
    return Observable.defer(new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            try {
                // Article note: this ends up calling GenericCommand#getFallback (the subclass
                // overrides the parent's method).
                return Observable.just(getFallback());
            } catch (Throwable ex) {
                return Observable.error(ex);
            }
        }
    });
}

// GenericCommand#getFallback
/**
 * Runs the configured fallback action, or defers to the superclass when there is none.
 *
 * @return the value returned by the fallback action, or by {@code super.getFallback()} when
 *         {@code getFallbackAction()} returns {@code null}
 * @throws FallbackInvocationException if running the fallback action throws; the logged
 *         error is built from the action and the throwable, and the exception wraps
 *         {@code unwrapCause(e)}
 */
@Override
protected Object getFallback() {
    final CommandAction commandAction = getFallbackAction();
    if (commandAction != null) {
        try {
            return process(new Action() {
                @Override
                Object execute() {
                    MetaHolder metaHolder = commandAction.getMetaHolder();
                    // The fallback arguments are derived from the meta holder and the execution exception.
                    Object[] args = createArgsForFallback(metaHolder, getExecutionException());
                    return commandAction.executeWithArgs(metaHolder.getFallbackExecutionType(), args);
                }
            });
        } catch (Throwable e) {
            LOGGER.error(FallbackErrorMessageBuilder.create().append(commandAction, e).build());
            throw new FallbackInvocationException(unwrapCause(e));
        }
    } else {
        return super.getFallback();
    }
}

推荐阅读