#
# $Id: task.rb,v 1.5.1.2 2003/03/13 19:34:33 fukumoto Exp $
#

require 'monitor'

# A Task owns a private thread that runs the subclass-defined #body.
# Other threads talk to it through "entries" (declared with Task.entry):
# calling an entry queues a Request and blocks the caller until the task
# thread has accepted the request (#accept) and serviced it (#process).
class Task
  # One pending entry call.
  #
  # method   - the entry name that was passed to Task.entry
  # args     - Array of arguments given by the caller
  # block    - block given by the caller (nil if none); stored only, the
  #            Task class itself never calls it
  # thread   - the calling thread; errors are raised into it
  # callback - Proc called with the result value to release the caller
  #
  # NOTE: the :method reader intentionally shadows Object#method on
  # Request instances; callers use it to read the entry name.
  class Request
    attr_reader :method, :args, :block, :thread, :callback

    def initialize(method, args, block, thread, callback)
      @method = method
      @args = args
      @block = block
      @thread = thread
      @callback = callback
    end
  end

  include MonitorMixin

  # Declares one public entry method per name. Each generated method
  # forwards its arguments and block to #invoke and returns the value
  # produced by the task thread.
  def self.entry(*names)
    names.each do |name|
      define_method(name) { |*args, &block| invoke(name, args, block) }
      public name
    end
  end

  # Starts the task thread, which runs body(*args, &block).
  #
  # If body raises a StandardError it is printed to STDERR and re-raised
  # inside the task thread. However body ends, the task is marked
  # finished and every still-queued caller receives a ThreadError.
  def initialize(*args, &block)
    super()
    @request_queue = []
    @condvar = new_cond
    @finished = false
    @thread = Thread.new do
      begin
        body(*args, &block)
      rescue
        STDERR.puts($!, $@) # if $VERBOSE ???
        raise
      ensure
        synchronize do
          @finished = true
          purge_queue
        end
      end
    end
  end

  protected

  # Queues a request for the task thread and blocks the calling thread
  # until the request has been serviced.
  #
  # Returns the value handed to the request's callback. Raises
  # ThreadError if the task has already finished; an exception raised
  # while servicing the request is raised in the calling thread by
  # #process.
  def invoke(method, args, block)
    # Thread.critical no longer exists (removed in Ruby 1.9), so the
    # result is handed over through a per-call queue instead: the
    # callback pushes the value and the caller blocks in #pop.
    reply = Queue.new
    request = Request.new(method, args, block, Thread.current,
                          proc { |value| reply.push(value) })
    synchronize do
      raise ThreadError, 'task has finished' if @finished
      @request_queue.push(request)
      @condvar.signal
    end
    reply.pop
  end

  # Removes and returns the oldest queued Request that is acceptable.
  #
  # methods - either a list of entry names, or a single Hash mapping
  #           entry names to guard values (an entry is acceptable only
  #           while its guard value is truthy).
  #
  # With a block: if no acceptable request is queued right now, the block
  # is called (while the monitor is held) and its value is returned
  # instead of waiting. Without a block the call waits until an
  # acceptable request arrives.
  def accept(*methods)
    acceptable =
      if methods.length == 1 && methods.first.is_a?(Hash)
        guards = methods.first
        proc { |req| guards[req.method] }
      else
        proc { |req| methods.include?(req.method) }
      end

    synchronize do
      if block_given? && @request_queue.none?(&acceptable)
        yield
      else
        @condvar.wait_until { @request_queue.any?(&acceptable) }
        @request_queue.delete_at(@request_queue.index(&acceptable))
      end
    end
  end

  # Services an accepted request: yields the caller's arguments to the
  # block and passes the block's value to the request's callback, which
  # releases the caller. A StandardError raised along the way is raised
  # in the calling thread instead of propagating in the task thread.
  def process(req)
    value = yield(*req.args)
    req.callback.call(value)
  rescue => e
    req.thread.raise e # ???
    # raise
  end

  # Raises ThreadError in the thread of every request still queued.
  # Called from #initialize's ensure clause once the task is finished.
  def purge_queue
    @request_queue.each do |req|
      req.thread.raise ThreadError, 'task terminated before servicing request'
    end
  end

  # subclass must define body
  # def body(args...) ... end
end

if $0 == __FILE__
  # Demo: a bounded buffer of n slots. Reads are accepted only while the
  # buffer is non-empty, writes only while it is not full.
  class QueueTask < Task
    entry :read
    entry :write

    def body(n = 100)
      head = tail = 0
      count = 0
      buffer = Array.new(n)
      loop do
        request = accept(:read => count > 0, :write => count < n)
        case request.method
        when :read
          process(request) { buffer[tail] }
          tail = (tail + 1) % n
          count -= 1
        when :write
          process(request) { |c| buffer[head] = c }
          head = (head + 1) % n
          count += 1
        else
          raise # something went wrong
        end
      end
    end
  end

  q = QueueTask.new(10)
  STDOUT.sync = true
  sender = Thread.new do
    ('a'..'z').each { |i| q.write i }
    q.write "END"
  end
  receiver = Thread.new { until (c = q.read) == "END"; print c end }
  sender.join
  receiver.join
end