博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Hadoop中客户端和服务器端的方法调用过程
阅读量:6580 次
发布时间:2019-06-24

本文共 11693 字,大约阅读时间需要 38 分钟。

1、Java动态代理实例

Java 动态代理一个简单的demo:(用以对比Hadoop中的动态代理)

Hello接口:

public interface Hello {        void sayHello(String to);        void print(String p);   }

Hello接口的实现类:

public
class HelloImpl
implements
Hello { 
     
  
public
void
sayHello(String to) { 
        System.out.println(
"Say hello to " +
to); 
    } 
     
  
public
void
print(String s) { 
        System.out.println(
"print : " +
s); 
    } 
     
}

与代理类(HelloImpl类)相关联的InvocationHandler对象

public
class LogHandler
implements
InvocationHandler { 
     
  
private
Object dele; 
     
  
public
LogHandler(Object obj) { 
      
this.dele =
obj; 
    } 
     
  
public Object invoke(Object proxy, Method method, Object[] args)
throws
Throwable { 
        doBefore(); 
      
//
在这里完全可以把下面这句注释掉,而做一些其它的事情 
        Object result =
method.invoke(dele, args); 
        after(); 
      
return
result; 
    } 
     
  
private
void
doBefore() { 
        System.out.println(
"before...."
); 
    } 
     
  
private
void
after() { 
        System.out.println(
"after...."
); 
    } 
}

最后测试代码如下:

public
class
ProxyTest { 
 
  
public
static
void
main(String[] args) { 
        HelloImpl impl
=
new
HelloImpl(); 
        LogHandler handler
=
new
LogHandler(impl); 
      
//
这里把handler与impl新生成的代理类相关联 
        Hello hello =
(Hello) Proxy.newProxyInstance(impl.getClass().getClassLoader(), impl.getClass().getInterfaces(), handler); 
         
      
//
这里无论访问哪个方法,都是会把请求转发到handler.invoke
        hello.print("All the test"
); 
        hello.sayHello(
"Denny"
); 
    } 
 
}
 

2、Hadoop中的动态代理

2.1、客户端方法调用过程

IPC客户端的处理比动态代理实例稍微复杂:代理对象上的调用被InvocationHandler捕获后,请求被打包并通过IPC连接发送到服务器上,客户端等待并在服务器的处理应答到达后,生成并返回调用结果。IPC上的调用是个同步操作,即,线程会一直等待调用结束,才会开始后续处理;而网络的处理时异步的,请求发送后,不需要等待应答。客户端通过java的wait()/notify()机制简单地解决了异步网络处理和同步IPC调用的差异。

 

Hadoop对外提供查询文件状态的接口,如下:

public interface IPCQueryStatus extends VersionedProtocol {    IPCFileStatus getFileStatus(String filename);}

客户端通过如下代码调用:

IPCQueryStatus query = (IPCQueryStatus) RPC.getProxy(IPCQueryStatus.class, IPCQueryServer.IPC_VER, addr, new Configuration());IPCFileStatus status = query.getFileStatus("\tmp\testIPC");

2.1.1、Client端动态代理实现

在RPC的getProxy代码如下:

public static VersionedProtocol getProxy(      Class
protocol, long clientVersion, InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, SocketFactory factory, int rpcTimeout) throws IOException { ...... VersionedProtocol proxy = (VersionedProtocol) Proxy.newProxyInstance( protocol.getClassLoader(), new Class[] { protocol }, new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout)); ...... return proxy; ...... }

需要制定一个InvocationHandler,对于所有的调用请求,这个InvocationHandler都是Invoke,如下:

private static class Invoker implements InvocationHandler {    private Client.ConnectionId remoteId;// 用来标示一个connection,用以复用    private Client client;//最重要的成员变量,RPC客户端    private boolean isClosed = false;    public Invoker(Class
protocol, InetSocketAddress address, UserGroupInformation ticket, Configuration conf, SocketFactory factory, int rpcTimeout) throws IOException { this.remoteId = Client.ConnectionId.getConnectionId(address, protocol, ticket, rpcTimeout, conf); this.client = CLIENTS.getClient(conf, factory);//★ } ...... public Object invoke(Object proxy, Method method, Object[] args) ...... ObjectWritable value = (ObjectWritable) client.call(new Invocation(method, args), remoteId); ...... return value.get(); }}

在上面的代码中,client负责发送IPC请求,并获取结果,类似最上面demo中LogHandler中的dele。

2.1.2、Client通过Connection发送IPC请求并获取结果

如下为client.call方法调用Connection.sendParam发送IPC请求:

public Writable call(Writable param, ConnectionId remoteId)                         throws InterruptedException, IOException {    Call call = new Call(param);    Connection connection = getConnection(remoteId, call);    connection.sendParam(call);                 // send the parameter    ...    synchronized (call) {      while (!call.done) {        try {          call.wait();                           // wait for the result        } catch (InterruptedException ie) {          ...        }      }      ...      if (call.error != null) {        ...        throw call.error;        ...      } else {        return call.value;      }    }}

connection.sendParam后,会再调用receiveMessage来获取返回结果。如下:

private class Connection extends Thread {    ......        public void run() {        ......        while (waitForWork()) {
//wait here for work - read or close connection receiveResponse(); } ...... } ...... private void receiveResponse() { ...... touch(); try { int id = in.readInt(); // try to read an id ...... Call call = calls.get(id); int state = in.readInt(); // read call status if (state == Status.SUCCESS.state) { Writable value = ReflectionUtils.newInstance(valueClass, conf); value.readFields(in); // read value call.setValue(value); calls.remove(id); } else if (state == Status.ERROR.state) { call.setException(new RemoteException(WritableUtils.readString(in), WritableUtils.readString(in))); calls.remove(id); } else if (state == Status.FATAL.state) { // Close the connection markClosed(new RemoteException(WritableUtils.readString(in), WritableUtils.readString(in))); } } catch (IOException e) { markClosed(e); } }}

connection会调用call的setValue或者setException,两个方法都会调用callComplete方法,来调用notify通知进程IPC调用已结束

protected synchronized void callComplete() {      this.done = true;      notify();                                 // notify caller    }    public synchronized void setException(IOException error) {      this.error = error;      callComplete();    }            public synchronized void setValue(Writable value) {      this.value = value;      callComplete();    }

 

2.2、服务器端方法调用过程

服务端由Listener接收。

2.2.1、Listener接收IPC请求的工作过程

Listener主要运行NIO选择器循环,并在Listener.doRead()方法中读取数据,Connection.readAndProcess()中恢复数据帧,然后调用processData().

void Listener.doRead(SelectionKey key) throws InterruptedException {    int count = 0;    Connection c = (Connection)key.attachment();    ...    count = c.readAndProcess();    ...      }public int Connection.readAndProcess() throws IOException, InterruptedException {    ......    processOneRpc(data.array());    ......}private void Connection.processOneRpc(byte[] buf) throws IOException,        InterruptedException {    if (headerRead) {        processData(buf);    } else {        processHeader(buf);        ......    }}private void Connection.processData(byte[] buf) throws  IOException, InterruptedException {    DataInputStream dis =        new DataInputStream(new ByteArrayInputStream(buf));    int id = dis.readInt();                    // try to read an id    ......    Writable param = ReflectionUtils.newInstance(paramClass, conf);//★??paramClass在哪儿设置的★在RPC.Server中,paramClass是Invocation,IPC调用传递的都是Invocation    param.readFields(dis);            Call call = new Call(id, param, this);    callQueue.put(call);              // queue the call; maybe blocked here}

ProcessData反序列化调用参数,构造服务器端的Call对象。然后放入callQueue队列中。callQueue阻塞队列定义于Server类中,是Listener和Handler的边界。(生产者Listener消费者Handler)。

2.2.2、Handler处理IPC请求的工作过程

Handler主要工作都在run方法中完成。主循环中,每循环一次处理一个请求(通过调用Server的抽象方法call来完成)。

public void run() {    ......    SERVER.set(Server.this);    ByteArrayOutputStream buf =     new ByteArrayOutputStream(INITIAL_RESP_BUF_SIZE);    while (running) {        final Call call = callQueue.take(); // 获取一个IPC调用        ......        String errorClass = null;        String error = null;        Writable value = null;        CurCall.set(call);        ......        value = call(call.connection.protocol, call.param,                    call.timestamp);//实际代码用到jaas,这里简化        ......        CurCall.set(null);        synchronized (call.connection.responseQueue) {        ......        setupResponse(buf, call,                     (error == null) ? Status.SUCCESS : Status.ERROR,                     value, errorClass, error);        ...        responder.doRespond(call);//★?        }    }}

Server.call调用后返回一个writable对象--value,然后通过调用setupResponse将结果序列化到call的Response成员变量中。

private void setupResponse(ByteArrayOutputStream response,                              Call call, Status status,                              Writable rv, String errorClass, String error)   throws IOException {    response.reset();    DataOutputStream out = new DataOutputStream(response);    out.writeInt(call.id);                // write call id    out.writeInt(status.state);           // write status    if (status == Status.SUCCESS) {      rv.write(out);    } else {      WritableUtils.writeString(out, errorClass);      WritableUtils.writeString(out, error);    }   ......    call.setResponse(ByteBuffer.wrap(response.toByteArray()));  }

Server.call抽象方法的具体实现在RPC.Server中。代码如下:

private Object instance;......public Writable call(Class
protocol, Writable param, long receivedTime) throws IOException { Invocation call = (Invocation)param; Method method = protocol.getMethod(call.getMethodName(), call.getParameterClasses()); method.setAccessible(true); Object value = method.invoke(instance, call.getParameters()); return new ObjectWritable(method.getReturnType(), value); }

Handler所在线程是共享资源,当有一个IPC请求处理完后,即调用Response的doResponse返回结果,而不亲自返回,原因有二:

1. 对共享资源的占用时间越短越好;

2. IPC返回受网络通信时间影响,可能会占用很长时间。

2.2.3、Response的工作过程

doResponse的代码很简单,将Call放入IPC连接的应答队列中,如果应答队列为1,立即调用processResponse发放向客户端发送结果,(队列为1,表明此IPC连接比较空闲,直接发送,避免从Handler线程到Response线程的切换开销)

void doRespond(Call call) throws IOException {      synchronized (call.connection.responseQueue) {        call.connection.responseQueue.addLast(call);        if (call.connection.responseQueue.size() == 1) {          processResponse(call.connection.responseQueue, true);        }      }    }

Response有一个类似于Listener的NIO选择器,用来处理当队列不为1时的发送。只是Listener关注OP_READ和OP_ACCEPT事件,而Response关注OP_WRITE事件。代码如下:

public void run() {      while (running) {                  waitPending();     // 等待通道登记          writeSelector.select(PURGE_INTERVAL); // 等待通道可写          Iterator
iter = writeSelector.selectedKeys().iterator(); while (iter.hasNext()) { SelectionKey key = iter.next(); iter.remove(); try { if (key.isValid() && key.isWritable()) { doAsyncWrite(key);//输出远程IPC调用结果 } } catch (IOException e) { } } ...... }}private void doAsyncWrite(SelectionKey key) throws IOException { Call call = (Call)key.attachment(); ...... synchronized(call.connection.responseQueue) { if (processResponse(call.connection.responseQueue, false)) {
//调用输出 try { key.interestOps(0);//processResponse返回true,表示无等待数据,清楚兴趣操作集 } catch (CancelledKeyException e) { ...... } } }}private boolean processResponse(LinkedList
responseQueue, boolean inHandler) throws IOException { ...... synchronized (responseQueue) { ...... int numBytes = channelWrite(channel, call.response); done = true; // error. no more data for this channel. closeConnection(call.connection); } return done;}

processResponse关键点:

1. 可被Handler调用(当应答队列为1),参数inHandler为true,也可被Response调用,参数inHandler为false,表示队列为1或更多。

2. 返回true,表示通道上无需要发送的数据。

2.3总结

IPC Client端,发送Client.Call(new Invocation(method,args), remoteId)

--封装过程:Call.Id ,  Invocation---(查看Client.Connection.sendParam)

IPC Server端,接收Server.Call(Id, Invocation, Connction)---封装过程:Call.Id,Invocation--(查看Server.Connction.processData)

转载地址:http://qinno.baihongyu.com/

你可能感兴趣的文章
Laravel 学习笔记5.3之 Query Builder 源码解析(下)
查看>>
Struts2简单入门实例
查看>>
2012CSDN年度博客之星评选http://vote.blog.csdn.net/item/blogstar/xyz_lmn
查看>>
BZOJ 4037 [HAOI2015]数字串拆分 ——动态规划
查看>>
SpringBoot实战总汇--详解
查看>>
2018年7月1日笔记
查看>>
尝试使用iReport4.7(基于Ubuntu Desktop 12.04 LTS)
查看>>
动态规划:金矿模型
查看>>
子元素应该margin-top为何会影响父元素【转】
查看>>
AJAX 状态值(readyState)与状态码(status)详解
查看>>
BZOJ3668:[NOI2014]起床困难综合症(贪心)
查看>>
LightOJ 1245(Harmonic Number (II))
查看>>
小知识记录
查看>>
css3 animate 和关键帧 @-webkit-keyframes
查看>>
文字链接颜色设置
查看>>
图片转流
查看>>
ubunto应用软件
查看>>
HTML 标签说明
查看>>
锋利的jQuery-2--判断jQuery获取到的对象是否存在$().length
查看>>
linux 查询系统版本命令、查询端口号是否被占用命令
查看>>