【深入浅出 Yarn 架构与实现】2-2 Yarn 基础库 - 底层通信库 RPC( 二 )


MyResourceTrackerMessage.proto 定义数据格式
syntax = "proto3";

// Java code-generation options for the message classes defined in this file.
option java_package = "com.shuofxz.protobuf_rpc.proto";
option java_outer_classname = "MyResourceTrackerMessageProto";
option java_generic_services = true;
option java_generate_equals_and_hash = true;

// Registration request: carries the hostname, cpu and memory values of a node.
message MyRegisterNodeManagerRequestProto {
  string hostname = 1;
  int32 cpu = 2;
  int32 memory = 3;
}

// Registration response: carries a single string flag.
message MyRegisterNodeManagerResponseProto {
  string flag = 1;
}
MyResourceTracker.proto 定义 rpc 接口
syntax = "proto3";

// Pulls in the request/response message definitions used by the service below.
import "com/shuofxz/protobuf_rpc/proto/MyResourceTrackerMessage.proto";

// Java code-generation options for the service class defined in this file.
option java_package = "com.shuofxz.protobuf_rpc.proto";
option java_outer_classname = "MyResourceTrackerProto";
option java_generic_services = true;
option java_generate_equals_and_hash = true;

// RPC service exposing a single registerNodeManager call.
service MyResourceTrackerService {
  rpc registerNodeManager(MyRegisterNodeManagerRequestProto)
      returns (MyRegisterNodeManagerResponseProto);
}
2、对 proto 文件编译,生成 java 类
# Run from the project root; adjust the paths to match your own layout.
# Compile the message definitions first, then the service definition that imports them.
protoc -I=src/main/java --java_out=src/main/java src/main/java/com/shuofxz/protobuf_rpc/proto/MyResourceTrackerMessage.proto
protoc -I=src/main/java --java_out=src/main/java src/main/java/com/shuofxz/protobuf_rpc/proto/MyResourceTracker.proto
3、定义调用方法接口 MyResourceTracker
import com.shuofxz.protobuf_rpc.proto.MyResourceTrackerMessageProto.MyRegisterNodeManagerResponseProto;
import com.shuofxz.protobuf_rpc.proto.MyResourceTrackerMessageProto.MyRegisterNodeManagerRequestProto;

/**
 * Business-level interface for node registration, expressed in terms of the
 * generated protobuf request and response messages.
 */
public interface MyResourceTracker {

    /**
     * Handles a node registration request.
     *
     * @param request the registration request message
     * @return the registration response message
     * @throws Exception if the implementation fails to process the request
     */
    MyRegisterNodeManagerResponseProto registerNodeManager(
            MyRegisterNodeManagerRequestProto request) throws Exception;
}
4、对调用方法接口的实现(服务端)
import com.shuofxz.protobuf_rpc.interf.MyResourceTracker;import com.shuofxz.protobuf_rpc.proto.MyResourceTrackerMessageProto;public class MyResourceTrackerImpl implements MyResourceTracker {@Overridepublic MyResourceTrackerMessageProto.MyRegisterNodeManagerResponseProto registerNodeManager(MyResourceTrackerMessageProto.MyRegisterNodeManagerRequestProto request) {// 输出注册的消息String hostname = request.getHostname();int cpu = request.getCpu();int memory = request.getMemory();System.out.println("NodeManager 的注册消息: hostname = " + hostname + ", cpu = " + cpu + ", memory = " + memory);// 省略处理逻辑// 构建一个响应对象,用于返回MyResourceTrackerMessageProto.MyRegisterNodeManagerResponseProto.Builder builder =MyResourceTrackerMessageProto.MyRegisterNodeManagerResponseProto.newBuilder();// 直接返回 Truebuilder.setFlag("true");MyResourceTrackerMessageProto.MyRegisterNodeManagerResponseProto response = builder.build();return response;}}5、编写 proto 的协议接口
import com.shuofxz.protobuf_rpc.proto.MyResourceTrackerProto;
import org.apache.hadoop.ipc.ProtocolInfo;

/**
 * Protocol interface for the resource-tracker RPC. It adds no methods of its own;
 * it extends the blocking interface generated for {@code MyResourceTrackerService}
 * and attaches the {@link ProtocolInfo} annotation (name
 * {@code "com.shuofxz.blablabla"}, version 1).
 */
@ProtocolInfo(
        protocolName = "com.shuofxz.blablabla",
        protocolVersion = 1)
public interface MyResourceTrackerPB
        extends MyResourceTrackerProto.MyResourceTrackerService.BlockingInterface {
}
6、编写 proto 的协议接口实现(服务端)
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import com.shuofxz.protobuf_rpc.interf.MyResourceTracker;
import com.shuofxz.protobuf_rpc.proto.MyResourceTrackerMessageProto;
import com.shuofxz.protobuf_rpc.interf.MyResourceTrackerPB;

/**
 * Server-side translator: implements the protobuf protocol interface
 * {@link MyResourceTrackerPB} by delegating each call to a
 * {@link MyResourceTracker} implementation.
 */
public class MyResourceTrackerServerSidePB implements MyResourceTrackerPB {

    /** The business implementation every RPC call is forwarded to. */
    private final MyResourceTracker server;

    /**
     * @param server the implementation that actually handles registration requests
     */
    public MyResourceTrackerServerSidePB(MyResourceTracker server) {
        this.server = server;
    }

    /**
     * Forwards the registration request to the wrapped implementation.
     *
     * @param controller the RPC controller supplied by the caller; not used here
     * @param request the registration request message
     * @return the response produced by the wrapped implementation
     * @throws ServiceException wrapping any exception thrown by the wrapped implementation
     */
    @Override
    public MyResourceTrackerMessageProto.MyRegisterNodeManagerResponseProto registerNodeManager(
            RpcController controller,
            MyResourceTrackerMessageProto.MyRegisterNodeManagerRequestProto request)
            throws ServiceException {
        try {
            return server.registerNodeManager(request);
        } catch (Exception e) {
            // Propagate the failure with its cause instead of printing it and
            // handing a null response back to the caller.
            throw new ServiceException(e);
        }
    }
}
7、RPC Server 的实现
import com.shuofxz.protobuf_rpc.interf.MyResourceTrackerPB;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.ipc.ProtobufRpcEngine;import org.apache.hadoop.ipc.RPC;import com.shuofxz.protobuf_rpc.proto.MyResourceTrackerProto;import java.io.IOException;public class ProtobufRpcServer {public static void main(String[] args) throws IOException {Configuration conf = new Configuration();RPC.setProtocolEngine(conf, MyResourceTrackerPB.class, ProtobufRpcEngine.class);// 构建 Rpc ServerRPC.Server server = new RPC.Builder(conf).setProtocol(MyResourceTrackerPB.class).setInstance(MyResourceTrackerProto.MyResourceTrackerService.newReflectiveBlockingService(new MyResourceTrackerServerSidePB(new MyResourceTrackerImpl()))).setBindAddress("localhost").setPort(9998).setNumHandlers(1).setVerbose(true).build();// Rpc Server 启动server.start();}}8、RPC Client 的实现
import com.google.protobuf.ServiceException;import com.shuofxz.protobuf_rpc.proto.MyResourceTrackerMessageProto;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.ipc.ProtobufRpcEngine;import org.apache.hadoop.ipc.RPC;import com.shuofxz.protobuf_rpc.interf.MyResourceTrackerPB;import java.io.IOException;import java.net.InetSocketAddress;public class ProtobufRpcClient {public static void main(String[] args) throws IOException {// 设置 RPC 引擎为 ProtobufRpcEngineConfiguration conf = new Configuration();String hostname = "localhost";int port = 9998;RPC.setProtocolEngine(conf, MyResourceTrackerPB.class, ProtobufRpcEngine.class);// 获取代理MyResourceTrackerPB protocolProxy = RPC.getProxy(MyResourceTrackerPB.class, 1, new InetSocketAddress(hostname, port), conf);// 构建请求对象MyResourceTrackerMessageProto.MyRegisterNodeManagerRequestProto.Builder builder =MyResourceTrackerMessageProto.MyRegisterNodeManagerRequestProto.newBuilder();MyResourceTrackerMessageProto.MyRegisterNodeManagerRequestProto bigdata02 =builder.setHostname("bigdata02").setCpu(64).setMemory(128).build();// 发送 RPC 请求,获取响应MyResourceTrackerMessageProto.MyRegisterNodeManagerResponseProto response = null;try {response = protocolProxy.registerNodeManager(null, bigdata02);} catch (ServiceException e) {e.printStackTrace();}// 处理响应String flag = response.getFlag();System.out.println("最终注册结果: flag = " + flag);}}

推荐阅读