#
# $Id: threadutil.rb,v 1.9 2003/03/15 14:35:59 fukumoto Exp fukumoto $
#
# thread utilities
#
#   Thread
#     Thread.critical_section { ... }
#     Thread.spin_wait_until { ... }
#     Thread.spin_wait_while { ... }
#
#   ThreadList (Enumerable)
#     add
#     join
#     kill
#     raise
#     each
#
#=begin
#   ThreadGroup
#     join
#     kill
#=end
#
#   Enumerable
#     parallel
#     each_in_parallel
#     each_in_parallel_with_index
#     find_in_parallel
#
#   Integer
#     threads
#     to_infty
#

require 'thread'
# require 'monitor'

# When true, worker threads started by Enumerable#parallel print any
# exception (message and backtrace) to STDERR before dying with it.
$print_thread_error = true unless defined?($print_thread_error)

class Thread
  # Lock used to emulate Thread.critical on interpreters that no longer
  # provide it (Thread.critical= was removed in Ruby 1.9).
  CRITICAL_SECTION_LOCK = Mutex.new unless const_defined?(:CRITICAL_SECTION_LOCK)

  # Enters the critical state.  Uses Thread.critical= when the interpreter
  # has it; otherwise takes CRITICAL_SECTION_LOCK unless the current thread
  # already holds it.  Like the original boolean flag, this is not counted.
  def Thread.enter_critical
    if Thread.respond_to?(:critical=)
      Thread.critical = true
    elsif !CRITICAL_SECTION_LOCK.owned?
      CRITICAL_SECTION_LOCK.lock
    end
    nil
  end

  # Leaves the critical state.  Safe to call when not in the critical state.
  def Thread.leave_critical
    if Thread.respond_to?(:critical=)
      Thread.critical = false
    elsif CRITICAL_SECTION_LOCK.owned?
      CRITICAL_SECTION_LOCK.unlock
    end
    nil
  end

  private_class_method :enter_critical, :leave_critical

  # Runs the block in the critical state and returns the block's value.
  # The critical state is always left afterwards, even if the block raises.
  #
  # NOTE: with the Mutex fallback only code that goes through these helpers
  # is serialised; other threads keep being scheduled.
  def Thread.critical_section
    enter_critical
    yield
  ensure
    leave_critical
  end

  # Busy-waits until the block returns a true value.  The block is evaluated
  # in the critical state; between evaluations the critical state is left
  # and Thread.pass gives other threads a chance to run.  Returns nil.
  def Thread.spin_wait_until
    enter_critical
    until yield
      leave_critical
      Thread.pass
      enter_critical
    end
    nil
  ensure
    leave_critical
  end

  # Busy-waits while the block returns a true value; the mirror image of
  # Thread.spin_wait_until.  Returns nil.
  def Thread.spin_wait_while
    enter_critical
    while yield
      leave_critical
      Thread.pass
      enter_critical
    end
    nil
  ensure
    leave_critical
  end
end

# An ordered collection of threads that can be joined, killed or signalled
# as a group.
#
# NOTE: this class defines an instance method named +raise+, which shadows
# Kernel#raise inside instances; use Kernel.raise to raise from within.
class ThreadList
  include Enumerable

  # The underlying Array of threads.
  attr_reader :list

  def initialize
    @list = []
  end

  # Short description showing the object id and the number of threads held.
  def inspect
    sprintf("#<ThreadList:0x%x threads=%d>", __id__, @list.length)
  end

  # Appends +x+ to the list.  Returns self so calls can be chained.
  def add(x)
    @list.push(x)
    self
  end

  # Joins every thread, removing each one from the list as it goes, so the
  # list is empty afterwards.  Returns self.
  #
  # A ThreadError from Thread#join skips that thread.  Any other exception
  # raised by Thread#join propagates to the caller, leaving the threads not
  # yet joined in the list.
  def join
    begin
      @list.pop.join until @list.empty?
    rescue ThreadError
      # The offending thread has already been popped, so every retry works
      # on a strictly shorter list and the loop terminates.
      retry
    end
    self
  end

  # Kills every thread in the list except the calling thread.
  # Returns the underlying Array.
  def kill
    @list.each do |t|
      begin
        t.kill if t != Thread.current
      rescue ThreadError
        # Best effort: skip a thread that cannot be killed rather than
        # retrying without bound.
      end
    end
  end

  # Raises an exception in every thread in the list except the calling
  # thread; +params+ are passed on to Thread#raise unchanged.
  # Returns the underlying Array.
  def raise(*params)
    @list.each do |t|
      begin
        t.raise(*params) if t != Thread.current
      rescue ThreadError
        # Best effort: skip a thread that rejects the request.
      end
    end
  end

  # Yields each thread in insertion order.
  def each
    @list.each do |i|
      yield i
    end
  end
end

=begin
class ThreadGroup
  def join
    self.list.each do |th|
      th.join if th != Thread.current
    end
    self
  end

  def kill
    self.list.each do |th|
      th.kill if th != Thread.current
    end
  end
end
=end

module Enumerable
  # Raised inside worker threads by find_in_parallel to cancel them.
  class Terminate < Exception; end

  # Starts one thread per element and yields the element inside its thread.
  # Returns a ThreadList holding the threads; it does not wait for them.
  #
  # A worker that receives Terminate ends quietly.  Any other exception is
  # printed to STDERR when $print_thread_error is set and then re-raised,
  # so the thread dies with it.
  def parallel
    # threads = ThreadGroup.new
    threads = ThreadList.new
    each do |i|
      threads.add(Thread.new(i) { |j|
        begin
          yield j
        rescue Terminate
          nil
        rescue Exception => error
          STDERR.puts(error, error.backtrace) if $print_thread_error
          # Kernel.raise, because the receiver may define its own #raise
          # (ThreadList does).
          Kernel.raise(error)
        end
      })
    end
    threads
  end

  alias each_in_parallel parallel

  # Like parallel, but yields each element together with its index.
  def each_in_parallel_with_index
    threads = ThreadList.new
    each_with_index do |item, index|
      threads.add(Thread.new(item, index) { |_item, _index|
        begin
          yield _item, _index
        rescue Terminate
          nil
        rescue Exception => error
          STDERR.puts(error, error.backtrace) if $print_thread_error
          Kernel.raise(error)
        end
      })
    end
    threads
  end

  # Evaluates the block for every element concurrently and returns the first
  # element reported as matching (the first to finish, which need not be the
  # first in iteration order).  Remaining workers are sent Terminate.
  #
  # When nothing matches, returns the result of ifnone.call if +ifnone+ is
  # given, otherwise nil.  If nothing matched and a worker died with an
  # exception, the first such exception seen is re-raised in the caller.
  def find_in_parallel(ifnone = nil)
    mutex = Mutex.new
    cv = ConditionVariable.new
    found = false
    foundobj = nil
    done = false
    failure = nil

    mutex.synchronize do
      # Workers start while the mutex is held, so none of them can report a
      # match before the caller is waiting on the condition variable.
      workers = each_in_parallel do |item|
        begin
          if yield(item)
            mutex.synchronize do
              # Keep the first match only; a flag is used because the
              # matching element itself may be nil or false.
              unless found
                foundobj = item
                found = true
              end
              cv.signal
            end
          end
        rescue Terminate
          nil
        end
      end

      joiner = Thread.new {
        first_error = nil
        # Join a snapshot: the list must stay intact so that every worker
        # can still be sent Terminate below.
        workers.list.dup.each do |worker|
          begin
            worker.join
          rescue Exception => error
            first_error ||= error
          end
        end
        mutex.synchronize do
          failure = first_error
          done = true
          cv.signal
        end
      }

      cv.wait(mutex) until found || done
      joiner.kill if joiner.alive?
      workers.raise(Terminate)
    end

    return foundobj if found
    Kernel.raise(failure) if failure
    ifnone ? ifnone.call : nil
  end
end

class Integer
  # Starts +self+ threads, yielding 0...self, one value per thread.
  # Returns the ThreadList of started threads.
  def threads
    (0...self).parallel { |i| yield i }
  end

  # Builds a Pipe fed with self, self.succ, ... without end.
  # NOTE(review): Pipe is not defined in this file; it must be provided
  # elsewhere before this method is called.
  def to_infty
    # Generator.new { |g| i = self; loop { g.yield i; i = i.succ } }
    Pipe.new { |p| i = self; loop { p.push(i); i = i.succ } }
  end
end