#
# $Id: pipe.rb,v 1.14 2003/03/17 16:26:50 fukumoto Exp $
#
# Pipe (<= Enumerable)
#   Pipe.new(maxqlen = nil)
#   spawn_thread
#   rewind
#   join
#   kill
#   inspect
#   close
#   closed?
#   eof?
#   push(obj)
#   pop
#   pop_eof?
#   dequeue
#   each
#   generator(procobj, args...)
#
# Enumerable
#   pipe
#   collect_in_parallel
#   find_all_in_parallel
#   find_all_in_parallel_unordered
#   each_product_in_parallel
#   step(n)
#

require 'semaphore'

# When true (the default), an exception escaping a pipe's producer thread is
# printed to $stderr before being re-raised inside that thread.
$print_thread_error = true unless defined?($print_thread_error)

# A queue that one side pushes objects into and another side pops them from,
# coordinated by counting semaphores. When constructed with a block (or armed
# through #generator) the pipe runs the producer in its own thread and closes
# itself when the producer finishes.
#
# NOTE(review): CountingSemaphore comes from the external 'semaphore' library;
# this code assumes #wait blocks until a unit is available and #signal
# releases one unit -- confirm against that library.
class Pipe
  include Enumerable

  # maxqlen:: nil for an unbounded queue, or an Integer handed to the
  #           semaphore that throttles #push.
  #
  # If a block is given it is run in a new thread with the pipe as its
  # argument, and the pipe is closed when the block exits.
  #
  # Raises ArgumentError when maxqlen is neither nil/false nor an Integer.
  def initialize(maxqlen = nil)
    if maxqlen && !maxqlen.kind_of?(Integer)
      raise ArgumentError, "maxqlen must be an Integer or nil"
    end
    super()
    @maxqlen = maxqlen
    reset_state
    if block_given?
      @block = Proc.new { yield self }
      spawn_thread
    end
  end

  # Starts the producer thread running @block. Any exception is optionally
  # reported on $stderr (see $print_thread_error) and then re-raised inside
  # the thread; the pipe is closed in every case so readers see EOF.
  def spawn_thread
    @thread = Thread.new do
      begin
        @block.call
      rescue Exception
        # Report-and-re-raise only: the exception is not swallowed.
        $stderr.puts($!, $@) if $print_thread_error
        raise
      ensure
        close
      end
    end
  end

  # Kills the current producer thread, discards queued data and restarts the
  # producer from scratch.
  #
  # Raises RuntimeError when the pipe has no producer thread/block to restart.
  def rewind
    unless @thread && @block
      raise "cannot rewind a Pipe that has no producer thread"
    end
    kill
    # strange thing may occur if there were threads waiting on semaphore
    reset_state
    spawn_thread
  end

  # Waits for the producer thread to finish.
  def join
    @thread.join
  end

  # Kills the producer thread, if there is one, and forgets it.
  def kill
    @thread.kill if @thread
    @thread = nil
  end

  # Short description: object id, open/closed state and queue length.
  def inspect
    sprintf("#<Pipe:0x%x %s, %d queued>",
            self.__id__, (@done ? "closed" : "open"), @queue.length)
  end

  # Marks the pipe as finished and releases one unit on the pop semaphore so
  # that a reader can get through and observe end-of-stream. Returns self.
  def close
    @done = true
    @pop_sem.signal
    self
  end

  # True once #close has been called (queued items may still be pending).
  def closed?
    @done
  end

  # True when the pipe is closed and every queued item has been consumed.
  def eof?
    @done && @queue.empty?
  end

  # Appends obj to the queue; on a bounded pipe it first waits on the push
  # semaphore. Returns self.
  #
  # Raises EOFError if the pipe has been closed.
  def push(obj)
    @push_sem.wait if @maxqlen
    raise EOFError, "pipe closed" if @done
    @queue.push(obj)
    @pop_sem.signal
    self
  end

  # Removes and returns the next object, or nil at end-of-stream. A nil that
  # was pushed is indistinguishable from EOF here; use #pop_eof? or #dequeue
  # when that matters.
  def pop
    obj, _eof = pop_eof?
    obj
  end

  # Removes the next object and returns [object, false], or [nil, true] at
  # end-of-stream. This is the primitive that #pop, #dequeue and #each are
  # built on.
  def pop_eof?
    @pop_sem.wait
    if !@queue.empty?
      [@queue.shift, false]
    elsif @done
      # Put the unit back so that later reads also get past the wait and
      # report EOF.
      @pop_sem.signal
      @had_done = true
      [nil, true]
    else
      raise "Unexpected"
    end
  ensure
    # Runs on every exit path, including EOF and errors, exactly as before.
    @push_sem.signal if @maxqlen
  end

  # Removes and returns the next object.
  #
  # Raises EOFError at end-of-stream.
  def dequeue
    obj, eof = pop_eof?
    raise EOFError, "out of data stream" if eof
    obj
  end

  # Yields every object until end-of-stream.
  #
  # Raises EOFError if end-of-stream had already been reached by an earlier
  # read.
  def each
    raise EOFError, "out of data stream" if @had_done
    obj, eof = pop_eof?
    until eof
      yield obj
      obj, eof = pop_eof?
    end
  end

  # Arms the pipe with a generator-style callable: gen is called with args and
  # a block, and everything it yields is pushed into the pipe from a new
  # thread. Warns on $stderr under $VERBOSE if a thread already existed.
  # Returns self.
  def generator(gen, *args)
    $stderr.puts "Pipe had a thread already" if $VERBOSE && @thread
    @block = Proc.new { gen.call(*args) { |obj| push(obj) } }
    spawn_thread
    self
  end

  private

  # (Re)initialises the queue, the end-of-stream flags and both semaphores.
  def reset_state
    @queue = []
    @done = false
    @had_done = false
    @push_sem = CountingSemaphore.new(@maxqlen) if @maxqlen
    @pop_sem = CountingSemaphore.new(0)
  end
end

module Enumerable
  # Returns a Pipe fed from a new thread with every element of self, mapped
  # through the block when one is given.
  def pipe(maxqlen = nil)
    if block_given?
      Pipe.new(maxqlen) { |out| each { |item| out.push(yield(item)) } }
    else
      Pipe.new(maxqlen) { |out| each { |item| out.push(item) } }
    end
  end

  alias collect_in_parallel pipe

  # Returns a Pipe carrying the elements for which the block is true.
  def find_all_in_parallel
    Pipe.new { |out| each { |item| out.push(item) if yield(item) } }
  end

  # Like #find_all_in_parallel but iterates with each_in_parallel.
  # NOTE(review): each_in_parallel is not defined in this file -- presumably
  # provided elsewhere; verify before relying on this method.
  def find_all_in_parallel_unordered
    Pipe.new do |out|
      each_in_parallel { |item| out.push(item) if yield(item) }
    end
  end

  # Treats every element of self as an Enumerable, converts each to a Pipe and
  # yields arrays holding one value from each, in lock step. Exhausted
  # members contribute nil; iteration stops once all members are exhausted.
  #
  # opt[:raise_unequal_length]:: when true, raise IndexError as soon as some
  #                              member runs out before the others.
  #
  # Returns self.
  def each_product_in_parallel(opt = {})
    pipes = map { |item| item.pipe }
    loop do
      had_elem = false
      had_short_item = false
      valarray = []
      pipes.each do |source|
        value, eof = source.pop_eof?
        if eof
          had_short_item = true
        else
          had_elem = true
        end
        valarray.push(value)
      end
      break unless had_elem
      raise IndexError if had_short_item && opt[:raise_unequal_length]
      yield(valarray)
    end
    self
  end

  # Returns a Pipe carrying every n-th element of self (indices 0, n, 2n, ...).
  def step(n)
    Pipe.new do |out|
      each_with_index { |item, index| out.push(item) if index % n == 0 }
    end
  end
end # end of module Enumerable

if $0 == __FILE__
  # Pipe can be used as generic inter-thread communication channel
  pipe_1 = Pipe.new
  sender = Thread.new do
    1000.times { |i| pipe_1.push(i) }
    pipe_1.close
  end
  receiver = Thread.new do
    eof = false
    _item, eof = pipe_1.pop_eof? until eof
  end
  sender.join
  receiver.join

  # Pipe spawns a thread if block is given, closes automatically when exiting the block
  pipe_2 = Pipe.new { |out| 1000.times { |i| out.push(i) } }
  eof = false
  _item, eof = pipe_2.pop_eof? until eof

  # Pipe is Enumerable, thus any methods of Enumerable can be used
  pipe_3 = Pipe.new { |out| 1000.times { |i| out.push(i) } }
  pipe_3.find_all { |x| x % 31 == 0 }.each do |x|
    print x, " "
  end

  # Also, any Enumerable can be converted to Pipe
  # Pipe itself is an Enumerable, so that it is possible to cascade pipes
  (1..100).pipe { |x| 2**x }.pipe { |x| x % 31 }.each do |x|
    puts x
  end

  [1..9, 'a'..'i'].each_product_in_parallel do |array|
    p array
  end

  # Generator-like functions can be converted to Enumerable using Pipe#generator
  def permute(a)
    if a.empty?
      yield []
    else
      a.each_with_index do |a_i, i|
        permute(a[0...i] + a[i + 1..-1]) { |rest| yield [a_i, *rest] }
      end
    end
  end

  pipe_4 = Pipe.new(1).generator(method(:permute), [1, 2, 3])
  pipe_4.each do |x|
    p x
  end
end