【深入浅出 Yarn 架构与实现】2-4 Yarn 基础库 - 状态机库

当一个服务拥有太多处理逻辑时,会导致代码结构异常的混乱,很难分辨一段逻辑是在哪个阶段发挥作用的 。这时就可以引入状态机模型,帮助代码结构变得清晰 。
一、状态机库概述一)简介状态机由一组状态组成:【初始状态 -> 中间状态 -> 最终状态】 。在一个状态机中,每个状态会接收一组特定的事件,根据事件类型进行处理,并转换到下一个状态 。当转换到最终状态时则退出 。
二)状态转换方式状态间转换会有下面这三种类型:

【深入浅出 Yarn 架构与实现】2-4 Yarn 基础库 - 状态机库

文章插图
三)Yarn 状态机类在 Yarn 中提供了一个工厂类 StateMachineFactory 来帮助定义状态机 。如何使用,我们直接写个 demo 。
【深入浅出 Yarn 架构与实现】2-4 Yarn 基础库 - 状态机库

文章插图
二、案例 demo在上一篇文章《Yarn 服务库和事件库》案例基础上进行扩展 , 增加状态机库的内容 。如果还不了解服务库和事件库的同学,建议先学习下上一篇文章 。案例已上传至 github , 有帮助可以点个 ?https://github.com/Simon-Ace/hadoop-yarn-study-demo/tree/master/state-demo
一)状态机实现状态机实现,可以直接嵌入到上篇文章中的 AsyncDispatcher使用 。这里仅给出状态机JobStateMachine以及各种事件处理的代码 。完整的代码项目执行,请到 github demo 中查看 。
import com.shuofxz.event.JobEvent;import com.shuofxz.event.JobEventType;import org.apache.hadoop.yarn.event.EventHandler;import org.apache.hadoop.yarn.state.*;import java.util.EnumSet;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReadWriteLock;import java.util.concurrent.locks.ReentrantReadWriteLock;/** 可参考 Yarn 中实现的状态机对象:* ResourceManager 中的 RMAppImpl、RMApp- AttemptImpl、RMContainerImpl 和 RMNodeImpl,* NodeManager 中 的 ApplicationImpl、 ContainerImpl 和 LocalizedResource,* MRAppMaster 中的 JobImpl、TaskImpl 和 TaskAttemptImpl 等* */@SuppressWarnings({"rawtypes", "unchecked"})public class JobStateMachine implements EventHandler<JobEvent> {private final String jobID;private EventHandler eventHandler;private final Lock writeLock;private final Lock readLock;// 定义状态机protected static final StateMachineFactory<JobStateMachine, JobStateInternal,JobEventType, JobEvent>stateMachineFactory = new StateMachineFactory<JobStateMachine, JobStateInternal, JobEventType, JobEvent>(JobStateInternal.NEW).addTransition(JobStateInternal.NEW, JobStateInternal.INITED, JobEventType.JOB_INIT, new InitTransition()).addTransition(JobStateInternal.INITED, JobStateInternal.SETUP, JobEventType.JOB_START, new StartTransition()).addTransition(JobStateInternal.SETUP, JobStateInternal.RUNNING, JobEventType.JOB_SETUP_COMPLETED, new SetupCompletedTransition()).addTransition(JobStateInternal.RUNNING, EnumSet.of(JobStateInternal.KILLED, JobStateInternal.SUCCEEDED), JobEventType.JOB_COMPLETED, new JobTasksCompletedTransition()).installTopology();private final StateMachine<JobStateInternal, JobEventType, JobEvent> stateMachine;public JobStateMachine(String jobID, EventHandler eventHandler) {this.jobID = jobID;// 多线程异步处理,state 有可能被同时读写,使用读写锁来避免竞争ReadWriteLock readWriteLock = new ReentrantReadWriteLock();this.readLock = readWriteLock.readLock();this.writeLock = readWriteLock.writeLock();this.eventHandler = eventHandler;stateMachine = stateMachineFactory.make(this);}protected StateMachine<JobStateInternal, JobEventType, JobEvent> getStateMachine() {return stateMachine;}public static class InitTransition implements SingleArcTransition<JobStateMachine, JobEvent> {@Overridepublic void transition(JobStateMachine jobStateMachine, JobEvent jobEvent) {System.out.println("Receiving event " + jobEvent);// do something...// 完成后发送新的 Event —— JOB_STARTjobStateMachine.eventHandler.handle(new JobEvent(jobStateMachine.jobID, JobEventType.JOB_START));}}public static class StartTransition implements SingleArcTransition<JobStateMachine, JobEvent> {@Overridepublic void transition(JobStateMachine jobStateMachine, JobEvent jobEvent) {System.out.println("Receiving event " + jobEvent);jobStateMachine.eventHandler.handle(new JobEvent(jobStateMachine.jobID, JobEventType.JOB_SETUP_COMPLETED));}}public static class SetupCompletedTransition implements SingleArcTransition<JobStateMachine, JobEvent> {@Overridepublic void transition(JobStateMachine jobStateMachine, JobEvent jobEvent) {System.out.println("Receiving event " + jobEvent);jobStateMachine.eventHandler.handle(new JobEvent(jobStateMachine.jobID, JobEventType.JOB_COMPLETED));}}public static class JobTasksCompletedTransition implements MultipleArcTransition<JobStateMachine, JobEvent, JobStateInternal> {@Overridepublic JobStateInternal transition(JobStateMachine jobStateMachine, JobEvent jobEvent) {System.out.println("Receiving event " + jobEvent);// 这是多结果状态部分,因此需要人为制定后续状态// 这里整个流程结束,设置一下对应的状态boolean flag = true;if (flag) {return JobStateInternal.SUCCEEDED;} else {return JobStateInternal.KILLED;}}}@Overridepublic void handle(JobEvent jobEvent) {try {// 注意这里为了避免静态条件,使用了读写锁writeLock.lock();JobStateInternal oldState = getInternalState();try {getStateMachine().doTransition(jobEvent.getType(), jobEvent);} catch (InvalidStateTransitionException e) {System.out.println("Can't handle this event at current state!");}if (oldState != getInternalState()) {System.out.println("Job Transitioned from " + oldState + " to " + getInternalState());}} finally {writeLock.unlock();}}public JobStateInternal getInternalState() {readLock.lock();try {return getStateMachine().getCurrentState();} finally {readLock.unlock();}}public enum JobStateInternal {NEW,SETUP,INITED,RUNNING,SUCCEEDED,KILLED}}

推荐阅读