webrick源码分析──主要流程

webrick作为ruby自带的一个http server,很适合拿来作为学习之用。首先来看看最简单的使用webrick的示例吧

require 'webrick'

server = WEBrick::HTTPServer.new({:Port => 3000, :DocumentRoot => '/home/flyerhzm/public_html'})

['INT', 'TERM'].each { |signal|
   trap(signal) { server.shutdown }
}

server.start

这段代码主要是定义了http服务器监听3000端口,根目录在/home/flyerhzm/public_html下,在接收INT或TERM信号时,关闭服务器,然后启动服务器。

我们分两部分来看,首先看看服务器初始化时做了些什么

class GenericServer
  attr_reader :status, :config, :logger, :tokens, :listeners

  def initialize(config={}, default=Config::General)
    @config = default.dup.update(config)
    @status = :Stop
    @config[:Logger] ||= Log::new
    @logger = @config[:Logger]

    @tokens = SizedQueue.new(@config[:MaxClients])
    @config[:MaxClients].times{ @tokens.push(nil) }

    webrickv = WEBrick::VERSION
    rubyv = #{RUBY_VERSION} (#{RUBY_RELEASE_DATE}) [#{RUBY_PLATFORM}]
    @logger.info(WEBrick #{webrickv})
    @logger.info(ruby #{rubyv})

    @listeners = []
    unless @config[:DoNotListen]
      if @config[:Listen]
        warn(:Listen option is deprecated; use GenericServer#listen)
      end
      listen(@config[:BindAddress], @config[:Port])
      if @config[:Port] == 0
        @config[:Port] = @listeners[0].addr[1]
      end
    end
  end
end

class HTTPServer  ::WEBrick::GenericServer
  def initialize(config={}, default=Config::HTTP)
    super
    @http_version = HTTPVersion::convert(@config[:HTTPVersion])

    @mount_tab = MountTable.new
    if @config[:DocumentRoot]
      mount(/, HTTPServlet::FileHandler, @config[:DocumentRoot],
            @config[:DocumentRootOptions])
    end

    unless @config[:AccessLog]
      @config[:AccessLog] = [
        [ $stderr, AccessLog::COMMON_LOG_FORMAT ],
        [ $stderr, AccessLog::REFERER_LOG_FORMAT ]
      ]
    end

    @virtual_hosts = Array.new
  end
end

WEBrick::HTTPServer继承自WEBrick::GenericServer

WEBrick::GenericServer初始化时

首先记录所有的配置信息,预定义的WEBrick::Config::HTTP和用户定义的配置信息,包括监听端口,请求超时时间,文档根目录等等。

接着生成一个定长的队列SizedQueue,用来控制最大的客户端连接数。注意这里的SizedQueue放入的并不是一个线程,而是nil。

然后打印当前的WEBrick版本号和Ruby版本号。

最后调用listen方法,生成TCPServer,监听端口。这里可能生成两个TCPServer,一个是IPv4的,一个是IPv6的。

WEBrick::HTTPServer初始化时主要是定义了http版本号,根据配置信息mount根目录,这里将http://localhost/映射到/home/flyerhzm/public_html/目录,默认DirectoryIndex为[index.html,index.htm,index.cgi,index.rhtml],即请求为目录时,显示目录下的index.html, index.htm, index.cgi或者index.rhtml,DocumentRootOptions为{ :FancyIndexing = true },即请求为目录且目录下没有DirectoryIndex定义的文件时,显示目录下的所有文件。这些都是在WEBrick::HTTPServlet::FileHandler中定义的,我会在之后的文章介绍。

介绍完初始化,下面来看看start方法是如何实现的

def start()
  raise ServerError, already started. if @status != :Stop
  server_type = @config[:ServerType] || SimpleServer

  server_type.start{
    @logger.info \
      #{self.class}#start: pid=#{$} port=#{@config[:Port]}
    call_callback(:StartCallback)

    thgroup = ThreadGroup.new
    @status = :Running
    while @status == :Running
      begin
        if svrs = IO.select(@listeners, nil, nil, 2.0)
          svrs[0].each{|svr|
            @tokens.pop          # blocks while no token is there.
            if sock = accept_client(svr)
              th = start_thread(sock, block)
              th[:WEBrickThread] = true
              thgroup.add(th)
            else
              @tokens.push(nil)
            end
          }
        end
      rescue Errno::EBADF, IOError = ex
        # if the listening socket was closed in GenericServer#shutdown,
        # IO::select raise it.
      rescue Exception = ex
        msg = #{ex.class}: #{ex.message}\n\t#{ex.backtrace[0]}
        @logger.error msg
      end
    end

    @logger.info going to shutdown ...
    thgroup.list.each{|th| th.join if th[:WEBrickThread] }
    call_callback(:StopCallback)
    @logger.info #{self.class}#start done.
    @status = :Stop
  }
end

首先,根据不同的ServerType执行不同的start方法,定义有SimpleServer和Daemon两种,Daemon方式会在以后的文章中介绍,默认为SimpleServer

class SimpleServer
  def SimpleServer.start
    yield
  end
end

非常简单,就是直接执行传过来的block

在这个block中生成一个线程组,用来存放处理http请求的线程。

IO.select(@listeners, nil, nil, 2.0)方法监听@listeners(就是tcp server),一旦有数据进入就返回,并设置2秒超时,防止进程被挂死。

对于客户端连接的socket请求,创建一个新的线程来处理,并把这个线程放入线程组中。这里用了一个小技巧来控制线程组中线程的数量。一般我们是将线程插入到SizedQueue来控制线程的数量,而这里SizedQueue插入满nil,每次创建一个线程之前,先从SizedQueue pop一个nil,每次线程处理完在push一个nil,这样,当创建了一定数量的线程时,SizedQueue就为空,无法再pop数据,只有等待一个线程处理完后才能继续。

接着先来看看如何关闭服务器。webrick提供了两种方法:

def stop
  if @status == :Running
    @status = :Shutdown
  end
end

一是stop,它只是简单地将服务器的状态由Running改为Shutdown,这样就可以从start方法中的循环跳出来,不过由于start方法最后有这么一句话:thgroup.list.each{|th| th.join if th[:WEBrickThread] },这表示服务器并不会马上关闭,它会等到线程组中所有的线程都执行完毕之后再关闭。

def shutdown
  stop
  @listeners.each{|s|
    if @logger.debug?
      addr = s.addr
      @logger.debug(close TCPSocket(#{addr[2]}, #{addr[1]}))
    end
    s.close
  }
  @listeners.clear
end

二是shutdown,它首先执行stop方法,然后遍历所有的sockets并关闭,这样所有的线程都会买上结束,服务器也会马上停止。

再来看看每个线程都做了些什么

def start_thread(sock, )
  Thread.start{
    begin
      Thread.current[:WEBrickSocket] = sock
      begin
        addr = sock.peeraddr
        @logger.debug "accept: #{addr[3]}:#{addr[1]}"
      rescue SocketError
        @logger.debug "accept: address unknown>"
        raise
      end
      call_callback(:AcceptCallback, sock)
      block ? block.call(sock) : run(sock)
    rescue Errno::ENOTCONN
      @logger.debug "Errno::ENOTCONN raised"
    rescue ServerError => ex
      msg = "#{ex.class}: #{ex.message}\n\t#{ex.backtrace[0]}"
      @logger.error msg
    rescue Exception => ex
      @logger.error ex
    ensure
      @tokens.push(nil)
      Thread.current[:WEBrickSocket] = nil
      if addr
        @logger.debug "close: #{addr[3]}:#{addr[1]}"
      else
        @logger.debug "close: address unknown>"
      end
      sock.close
    end
  }
end

如果传入一个block,就执行这个block,不然就执行run方法,run方法的定义在WEBrick::HTTPServer下

def run(sock)
  while true 
    res = HTTPResponse.new(@config)
    req = HTTPRequest.new(@config)
    server = self
    begin
      timeout = @config[:RequestTimeout]
      while timeout  0
        break if IO.select([sock], nil, nil, 0.5)
        timeout = 0 if @status != :Running
        timeout -= 0.5
      end
      raise HTTPStatus::EOFError if timeout = 0 || sock.eof?
      req.parse(sock)
      res.request_method = req.request_method
      res.request_uri = req.request_uri
      res.request_http_version = req.http_version
      res.keep_alive = req.keep_alive?
      server = lookup_server(req) || self
      if callback = server[:RequestCallback] || server[:RequestHandler]
        callback.call(req, res)
      end
      server.service(req, res)
    rescue HTTPStatus::EOFError, HTTPStatus::RequestTimeout = ex
      res.set_error(ex)
    rescue HTTPStatus::Error = ex
      @logger.error(ex.message)
      res.set_error(ex)
    rescue HTTPStatus::Status = ex
      res.status = ex.code
    rescue StandardError = ex
      @logger.error(ex)
      res.set_error(ex, true)
    ensure
      if req.request_line
        req.fixup()
        res.send_response(sock)
        server.access_log(@config, req, res)
      end
    end
    break if @http_version  1.1
    break unless req.keep_alive?
    break unless res.keep_alive?
  end
end

run方法中,首先,根据配置信息实例化HTTPResponse和HTTPRequest,在设置的timeout之内读取socket数据,不然停止执行。request对象读取socket数据并根据HTTP协议进行解析(关于http请求和应答的解析将在后文进行介绍),将部分内容(request_method, request_uri等等)赋值给response对象。调用service方法,根据request进行操作,并返回相应的response。最后,通过socket将response发送给客户端。需要注意的是,如果http版本是1.1而且keep_alive为true的话,run方法的循环将一直执行,来保持与客户端的长连接。

最后看看service方法的代码

def service(req, res)
  if req.unparsed_uri == *
    if req.request_method == OPTIONS
      do_OPTIONS(req, res)
      raise HTTPStatus::OK
    end
    raise HTTPStatus::NotFound, `#{req.unparsed_uri}' not found.
  end

  servlet, options, script_name, path_info = search_servlet(req.path)
  raise HTTPStatus::NotFound, `#{req.path}' not found. unless servlet
  req.script_name = script_name
  req.path_info = path_info
  si = servlet.get_instance(self, *options)
  @logger.debug(format(%s is invoked., si.class.name))
  si.service(req, res)
end

对于OPTIONS请求是需要特殊处理,返回可以处理的请求(GET, HEAD, POST, OPTIONS),根据请求的path返回相应的servlet,options, script_name和path_info,并获取到servlet实例(一般是用户定义的Servlet类,WEBrick默认有FileHandler, CGIHandler和ProcHandler),然后由具体的servlet实例来处理http请求。

这就是WEBrick的主要流程,写得比较乱,之后的文章根据WEBrick的功能一部分一部分详细介绍。

Posted in  webrick ruby


capistrano读取releases目录的错误

新年刚开始工作就遇到capistrano读取releases目录的错误,deploy之后总是把最新的release目录删除,看来是判断哪个release目录是最新的时候出错了。

看了下2.5.11源代码,capistrano是这样定义releases目录的

_cset(:releases)          { capture("ls -x #{releases_path}").split.reverse }

其中ls -x的结果是

20091224074632  20091228080936  20091228082551  20100104023017  20100104025008

也就是说releases的结果就是

['20100104025008', '20100104023017', '20091228082551', '20091228080936', '20091224074632']

再看看删除release部分的代码

directories = (releases - releases.last(count)).map { |release|
  File.join(releases_path, release) }.join(" ")

try_sudo "rm -rf #{directories}"

从这段代码的逻辑可以判断,capistrano会把最新的20100104025008目录删除,显然这不是我们希望看到的结果。

看了2.5.9的源代码和github上最新的代码,releases却是这样定义的

_cset(:releases)          { capture("ls -x #{releases_path}").split.reverse }

而ls -xt的执行结果是

20100104025008  20100104023017  20091228082551  20091228080936  20091224074632

和2.0.11正好相反,看来这就是问题的症结。查了一下lighthouse,https://capistrano.lighthouseapp.com/projects/8716/tickets/88-getting-the-newest-directory#ticket-88-19,原来是因为file cache store才把-t参数去掉了,但是却导致了新的问题,根据上面的解决方案,在config/deploy.rb文件中增加

set(:releases) { capture("ls -x #{releases_path}").split }

即可

Posted in  ruby capistrano


Fiber in Ruby 1.9

Ruby 1.9新推出了Fiber这个新的概念,有人说它是轻量级的Thread,其实不然。它是一段代码块,可以停止、继续,可以有返回值、写入值,有多个Fiber时,它们的执行顺序是固定。它和Thread相似的是,它的执行不是线性的,它可以在中途停止,将控制权交给主程序或者是其它的Fiber,但是中控制权交接的过程是由你来控制的,而不是线程调度程序。所以有时候Fiber可以完成之前只能用Thread才能完成的任务(比如:Producer-Consumer)。

先来看看一个例子吧:

require 'fiber'

fiber = Fiber.new do
  (1..3).each do |i|
    Fiber.yield(i)
  end
end

while fiber.alive?
  puts fiber.resume
end

运行结果是

1
2
3
1..3

首先,Fiber.new定义了一个Fiber,但是并不会执行,直到调用这个Fiber的resume方法,这个Fiber才会执行,并且当执行到Fiber.yield时停止,并且把yield的参数返回给主程序,同时将控制权将给主程序。然后主程序继续执行,调用这个Fiber的resume方法,这个Fiber从刚才停止的地方继续执行,直到Fiber.yield,以此类推。当这个Fiber执行完毕时,Fiber#alive?返回false,如果这个时候继续调用Fiber#resume,系统将抛出异常。

运行结果稍微与我们的预想有点偏差,我们并不需要最后一行1..3,原因是在调用三次Fiber#resume分别返回1、2、3,这个时候Fiber并没有执行完毕,所以Fiber#alive?仍然返回true,第四次调用Fiber#resume返回的这个Fiber block内的返回值,这里就是(1..3),所以我们需要对程序做点小小的修改

require 'fiber'

fiber = Fiber.new do
  (1..3).each do |i|
    Fiber.yield(i)
  end
end

loop do
  output = fiber.resume
  break unless fiber.alive?
  puts output
end

这下运行的结果就和预期一致了。

再来看看如何向Fiber写入数据

require 'fiber'

fiber = Fiber.new do
  loop do
    input = Fiber.yield
    break if input.to_s.empty?
    puts input
  end
end

(1..3).each do |i|
  fiber.resume i
end

运行结果

2
3

这和从Fiber中读取数据是个相反的操作,通过给Fiber#resume传递参数将数据作为Fiber.yield的返回值传入Fiber。

你可能会很奇怪,为什么只打印了2和3,没有1呢?因为在第一次调用Fiber#resume的时候,Fiber还没有开始,resume的参数是传递给Fiber block的,当这个Fiber运行到Fiber.yield时,这个Fiber停止,然后第二次调用Fiber#resume的时候,将2传递了Fiber,并作为yield的返回值,所以打印了2,我们需要修改一下代码来打印1、2、3

require 'fiber'

fiber = Fiber.new do |title|
  puts title
  loop do
    input = Fiber.yield
    break if input.to_s.empty?
    puts input
  end
end

fiber.resume 'title'
(1..3).each do |i|
  fiber.resume i
end

运行结果为

title
1
2
3

如果需要在两个Fiber之间切换的话,可以使用Fiber#transfer,用来实现Producer-Consumer。

Posted in  ruby


rails的异常处理

当在本地开发模式下发生异常的时候,rails会将错误发生的点、错误桟以及请求和应答的内容显示在浏览器上,并且在console下面打印出错误桟,这些使得调试web应用变得更容易。那rails内部是如何处理异常的呢?

rails把与ActionController相关的异常处理都定义在了ActionController::Rescue里面。其中最关键的是

alias_method_chain :perform_action, :rescue

perform_action是ActionController处理http请求的方法,它负责根据不同的请求调用相应的action。而上面这句话则为perform_action添加了处理异常的功能,看看具体的实现

def perform_action_with_rescue #:nodoc:
  perform_action_without_rescue
rescue Exception => exception
  rescue_action(exception)
end

可以看到,当perform_action执行发生异常时,通过rescue_action方法来处理异常。

整个过程的实现非常优雅,通过alias_method_chain为原来的perform_action增加了异常处理功能,却完全不用修改也不用关心perform_action原来的实现。rails的实现中大量使用了alias_method_chain,将功能点从方法的实现中剥离出来,有点AOP的思想。

最后看看本地和远程处理的不同

if consider_all_requests_local || local_request?
  rescue_action_locally(exception)
else
  rescue_action_in_public(exception)
end

还记得development.rb中有一句话是config.action_controller.consider_all_requests_local = true,就是在这里起作用的。如果是local_request,会显示所有的错误桟和请求应答消息,不然就只是显示404或500的静态页面。除非你重写rescue_action_in_public方法(比如exception_notification插件)

Posted in  rails


生病小记

上周五突然感到不舒服,就向maxime请了假,早早回家,大概5点到地段医院,被告知医院关门了,而且没有急诊,只能回家睡觉了。结果这一睡就是12个小时,中间自然是醒醒睡睡,量量体温有38.6度。夏天发烧的话裹紧被子睡一觉出出汗就好了,现在这冬天都出不了汗呢,只好喝些感冒退烧冲剂,继续蒙头睡觉。

一个周末就完全窝在床上了,看看电视睡睡觉,也不错,就是一直腰酸背痛的。

最近跑步有点减少,看来还是得坚持。

Posted in  life


Fork me on GitHub