`
zhanghaj00
  • 浏览: 63162 次
社区版块
存档分类
最新评论

Hadoop 2.5.0 之 RPC 初体验 一 IPC server 类自己看

 
阅读更多

最近领导发话,搞了三周的web项目,整个人都不好了,加班加点,终于还是做到了自己喜欢的样子,从各种热启动,各种配置灵活,可惜没有用到JMX来管理,不过好的一点是能做到各种切换,各种配置,还是给自己打个50分。  总结项目中用到的技术,其实想想也就那么点  SpringMVC的 Json和XML的viewResover,Quartz2.x和Spring的整合来控制定时任务的调度重启什么的,还有正式环境测试环境的检测和自动切换,好吧 ,这么点东西搞了那么久 给自己40分。。

 

当然 期间也不能间断学习。

接下来就开始今天的主题吧。

 

从Reactor说起。Reactor是现在高并发的通用理论了,也是异步调用的很好的应用,很多著名的服务器都是用Reactor模式来做的,好的 简单说下这个模式的论文。。。。(好高深)

Reactor由以下几部分组成:(转xxx)

 

Handles:

就是网络连接(connection),每个网络连接都由一个handle表示。

Event(事件)
ACCEPT_EVENT:表示收到连接请求
READ_EVENT:表示收到数据
WRITE_EVENT:表示socket可以足够的缓冲区,可以向其写入数据。
TIMEOUT_EVENT:超时。Java NIO不支持这个事件。
SIGNAL_EVENT:信号。Java NIO不支持这个事件。
CLOSE_EVENT:该socket被关闭。Java NIO不支持这个事件。
 
Event Handler
事件处理代码,该对象实例(instance)是和handle关联的。每个Event Handler处理一个handle上的事件。一般的服务器中至少有两种事件处理代码:一个是用来接收连接请求(accept event handler),响应ACCEPT_EVENT。,一个用来处理接受的请求(connection event handler),响应READ_EVENT和WRITE_EVENT。
 
Synchronous Event Demultiplexer(事件多路分离系统)
该模式的核心,等待handles上的事件。没有事件出现时,一直阻塞(blocking);当某个(某些)handles上有时间产生时,返回。这个是实现一般由操作系统提供,如linux的select、poll及epoll等。
 
Initiation Dispatcher
该对象负责管理Event Handlers(增加,删除等),当某个handle上有事件出现时,调用相应的
内部使用Synchronous Event Demultiplexer
 
reactor的论文中给出了实例代码
Logging_Acceptor是处理接收请求的event handler。
Logging_Handler是处理实际请求的event handler
Initiation_Dispatcher::instance是单利模式的Initiation Dispatcher

-----------------------------------我是分割线------------------------------------------------

但是想学习这个模式 不能看论文吧,听说Hadoop的RPC 中,Server端就是利用了这个模型,我们拨开来看看

 

1.RPC  远程 producer call 我也不知道怎么翻译 ,

首先说说Server端负责什么,当然是处理client端的请求,其中还有为了实现R 远程这个而有的 反射和代理这里就不罗嗦了。。只说我看到的关键吧

 

进入ipc.Server类中

 

Server端做的就是 1.接收请求,2.处理请求,3,返回结果,(大象放入冰箱的三个步骤)

 

这其中怎么体现Reactor的设计模型呢?

 

(1) 接收请求阶段,

       首先 所有的请求都会放入一个内部类Call中然后放入一个队列里面,东西有了。然后怎么处理呢?Hadoop这里用的是一个内部类Listerner来建立连接,一个Reader来读取请求,

 

     先说Listerner 吧 ,其实就是轮询监听有没有新的请求

 

public Listener() throws IOException {
      //这里监听Server的请求
      address = new InetSocketAddress(bindAddress, port);
      // Create a new server socket and set to non blocking mode
      acceptChannel = ServerSocketChannel.open();
      acceptChannel.configureBlocking(false);
	//设置成异步执行
      // Bind the server socket to the local host and port
      bind(acceptChannel.socket(), address, backlogLength, conf, portRangeConfig);
      port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
      // create a selector;
      selector= Selector.open();
	//这里的readThreads是设置中的默认为1,修改这里的配置可以提高性能哦
      readers = new Reader[readThreads];
      for (int i = 0; i < readThreads; i++) {
        Reader reader = new Reader(
            "Socket Reader #" + (i + 1) + " for port " + port);
        readers[i] = reader;
	//启动reader线程
        reader.start();
      }

      // Register accepts on the server socket with the selector.
      acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
      this.setName("IPC Server listener on " + port);
      this.setDaemon(true);
    }

 

说了Listerner是一个线程,初始化完了要启动,代码中只有一个doAccept(key);方法。可以理解 就是吧连接放到一个queue里面 然后reader已经启动啦 就可以开始干活了。

其中这段代码:是doaccept中

 

Reader reader = getReader();
Connection c = connectionManager.register(channel);
key.attach(c);  // so closeCurrentConnection can get the object
reader.addConnection(c);

就是将一个事件绑定到这个Key上 然后  设置一下最后联系的时间  呃呃呃。。我理解的这个attach在这里的用途就是这个。。额 可能我看不懂吧。。

 Connection c = (Connection)key.attachment();
 c.setLastContact(Time.now());

然后 不是Reader线程已经再跑了么。那run方法中:

先是给Connection注册 read事件conn.channel.register(readSelector, SelectionKey.OP_READ, conn);

然后再doread中 调用 count = c.readAndProcess(); 关键方法 开始读了。。这其中用了两个Seletor  分别是listener和reader 这个selector设置了异步就是通过socketchannel向selector注册实现的,额 这就是NIO部分的内容了,然后 就进入那个方法 就到了connection内部类调用handler内部类的时候了。。下次再看吧
        

 

 

 

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics