Class: ZTK::Parallel

Inherits:
Base
  • Object
Defined in:
lib/ztk/parallel.rb

Overview

Parallel Processing Class

This class runs iterative or linear tasks in parallel by forking a child process for each task.

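The before fork callback is called once per fork, in the parent process, immediately before the fork.

The after fork callback is called twice per fork: once in the parent process and once in the child process. Each call receives the PID of the process it runs in.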
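The callback ordering can be reproduced with the standard library alone. The following is a minimal sketch, not ZTK's implementation: `fork_with_callbacks` and `trace_callbacks` are hypothetical helpers, and a pipe is used only so the child's callback output is visible to the parent.

```ruby
# Hypothetical helper (not part of ZTK): forks once and fires the callbacks
# in the order described above.
def fork_with_callbacks(before_fork, after_fork)
  before_fork.call(Process.pid)      # parent only, before forking
  pid = Process.fork do
    after_fork.call(Process.pid)     # child: receives the child's PID
    Process.exit!(0)
  end
  after_fork.call(Process.pid)       # parent: receives the parent's PID
  Process.wait(pid)
  pid
end

# Hypothetical helper: records every callback invocation through a pipe so
# that calls made inside the child are captured as well.
def trace_callbacks
  reader, writer = IO.pipe
  writer.sync = true
  before = Proc.new { |pid| writer.write("before #{pid}\n") }
  after  = Proc.new { |pid| writer.write("after #{pid}\n") }
  child_pid = fork_with_callbacks(before, after)
  writer.close
  lines = reader.read.split("\n")
  reader.close
  [child_pid, lines]
end

child_pid, lines = trace_callbacks
puts "child was #{child_pid}"
puts lines
```

The "before" line always comes first. The two "after" lines may appear in either order, because the parent and the child run concurrently once the fork has happened.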

Examples:

Parallel processing with callbacks


a_callback = Proc.new do |pid|
  puts "Hello from After Callback - PID #{pid}"
end

b_callback = Proc.new do |pid|
  puts "Hello from Before Callback - PID #{pid}"
end

parallel = ZTK::Parallel.new
parallel.config do |config|
  config.before_fork = b_callback
  config.after_fork = a_callback
end

puts Process.pid.inspect

3.times do |x|
  parallel.process do
    x
  end
end

parallel.waitall
puts parallel.results.inspect

Author:

Defined Under Namespace

Classes: Break, ExceptionWrapper

Constant Summary

MAX_FORKS =

Default Maximum Number of Forks

case RUBY_PLATFORM
when /darwin/ then
  %x( sysctl hw.ncpu ).strip.split(':').last.strip.to_i
when /linux/ then
  %x( grep -c processor /proc/cpuinfo ).strip.strip.to_i
end

MAX_MEMORY =

Platform's memory capacity in bytes

case RUBY_PLATFORM
when /darwin/ then
  %x( sysctl hw.memsize ).strip.split[1].to_i
when /linux/ then
  (%x( grep MemTotal /proc/meminfo ).strip.split[1].to_i * 1024)
end
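Both `case` expressions above match only darwin and linux, so on any other platform they evaluate to nil. As a hedged alternative for the fork limit, the standard library's `Etc.nprocessors` (Ruby 2.2 and later) reports the processor count without shelling out. `default_max_forks` below is a hypothetical helper, not something ZTK defines.

```ruby
require "etc"

# Hypothetical helper (not part of ZTK): a portable default for the maximum
# number of forks, never lower than one.
def default_max_forks
  count = Etc.nprocessors   # processors available to this process
  count < 1 ? 1 : count
end

puts default_max_forks
```

A value obtained this way could be passed as `:max_forks` in the configuration hash, which the constructor requires to be at least one.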

Instance Attribute Summary

Instance Method Summary

Methods inherited from Base

build_config, #config, #direct_log, hash_config, log_and_raise, #log_and_raise

Constructor Details

- (Parallel) initialize(configuration = {})

Returns a new instance of Parallel

Parameters:

  • configuration (Hash) (defaults to: {})

    Configuration options hash.

  • config (Hash)

    a customizable set of options



# File 'lib/ztk/parallel.rb', line 86

def initialize(configuration={})
  super({
    :max_forks        => MAX_FORKS,
    :raise_exceptions => true
  }, configuration)

  (config.max_forks < 1) and log_and_raise(ParallelError, "max_forks must be equal to or greater than one!")

  @forks = Array.new
  @results = Array.new
  GC.respond_to?(:copy_on_write_friendly=) and GC.copy_on_write_friendly = true

  trapped_signals = %w( term int hup )
  trapped_signals << "kill" if RUBY_VERSION < "2.2.0"

  trapped_signals.map(&:upcase).each do |signal|
    Signal.trap(signal) do
      $stderr.puts("SIG#{signal} received by PID##{Process.pid}; signaling child processes...")

      signal_all(signal)
      exit!(1)
    end
  end
end

Instance Attribute Details

- (Object) results

Result Set



# File 'lib/ztk/parallel.rb', line 80

def results
  @results
end

Instance Method Details

- (Integer) count

Count of active forks.

Returns:

  • (Integer)

    Current number of active forks.



# File 'lib/ztk/parallel.rb', line 231

def count
  config.ui.logger.debug { "count(#{@forks.count})" }
  @forks.count
end

- (Array<Integer>) pids

Child PIDs

Returns:

  • (Array<Integer>)

    An array of child PIDs, if any.



# File 'lib/ztk/parallel.rb', line 239

def pids
  @forks.collect{ |fork| fork[:pid] }
end

- (Integer) process { ... }

Process in parallel.

Yields:

  • Block should execute tasks to be performed in parallel.

Yield Returns:

  • (Object)

    The block can return any object; a non-nil return value is marshalled back into the parent process's result set.

Returns:

  • (Integer)

    Returns the pid of the child process forked.



# File 'lib/ztk/parallel.rb', line 117

def process(&block)
  !block_given? and log_and_raise(ParallelError, "You must supply a block to the process method!")

  config.ui.logger.debug { "forks(#{@forks.inspect})" }

  while (@forks.count >= config.max_forks) do
    wait
  end

  child_reader, parent_writer = IO.pipe
  parent_reader, child_writer = IO.pipe

  config.before_fork and config.before_fork.call(Process.pid)
  pid = Process.fork do

    config.after_fork and config.after_fork.call(Process.pid)

    parent_writer.close
    parent_reader.close

    data = nil
    begin
      data = block.call
    rescue Exception => e
      config.ui.logger.fatal { e.message }
      e.backtrace.each do |line|
        config.ui.logger << line
        config.ui.logger << "\n"
      end
      data = ExceptionWrapper.new(e)
    end

    if !data.nil?
      config.ui.logger.debug { "write(#{data.inspect})" }
      child_writer.write(Base64.encode64(Marshal.dump(data)))
    end

    child_reader.close
    child_writer.close

    Process.exit!(0)
  end
  config.after_fork and config.after_fork.call(Process.pid)

  child_reader.close
  child_writer.close

  fork = {:reader => parent_reader, :writer => parent_writer, :pid => pid}
  @forks << fork

  pid
end
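The listing above hands the block's return value to the parent by writing `Base64.encode64(Marshal.dump(data))` into a pipe. A minimal standalone sketch of that round trip follows, using only the standard library. `run_in_child` is a hypothetical helper, not ZTK's API, and it handles one child at a time. Unlike the listing, it reads the pipe before waiting on the child, an assumption made here so that a payload larger than the pipe buffer cannot leave the child blocked on its write.

```ruby
require "base64"

# Hypothetical helper (not part of ZTK): runs the block in a forked child and
# returns the block's value to the caller through a pipe.
def run_in_child
  reader, writer = IO.pipe
  pid = Process.fork do
    reader.close                      # the child only writes
    data = yield
    # Mirror the listing: nil results are not written at all.
    writer.write(Base64.encode64(Marshal.dump(data))) unless data.nil?
    writer.close
    Process.exit!(0)
  end
  writer.close                        # the parent only reads
  encoded = reader.read               # returns once the child closes its end
  reader.close
  Process.wait(pid)
  encoded.empty? ? nil : Marshal.load(Base64.decode64(encoded))
end

p run_in_child { { :answer => 6 * 7 } }
```

Only objects that `Marshal.dump` can serialize survive the trip; procs and IO objects, for example, cannot be returned this way.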

- (Integer) signal_all(signal = "KILL")

Signals all forks.

Returns:

  • (Integer)

    The number of processes signaled.



# File 'lib/ztk/parallel.rb', line 213

def signal_all(signal="KILL")
  signaled = 0
  if (!@forks.nil? && (@forks.count > 0))
    @forks.each do |fork|
      begin
        Process.kill(signal, fork[:pid])
        signaled += 1
      rescue
        nil
      end
    end
  end
  signaled
end
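The counting behaviour of `signal_all` can be tried out without ZTK. The sketch below is illustrative only: `signal_pids` and `spawn_sleepers` are hypothetical helpers, and the rescue is narrowed to `Errno::ESRCH` and `Errno::EPERM`, whereas the listing above rescues any StandardError.

```ruby
# Hypothetical helper (not part of ZTK): signals each PID and returns how
# many signals were delivered.
def signal_pids(pids, signal = "KILL")
  signaled = 0
  pids.each do |pid|
    begin
      Process.kill(signal, pid)
      signaled += 1
    rescue Errno::ESRCH, Errno::EPERM
      # The process no longer exists or is not ours to signal; skip it.
    end
  end
  signaled
end

# Hypothetical helper: forks children that sleep until they are signaled.
def spawn_sleepers(count)
  Array.new(count) { Process.fork { sleep } }
end

pids = spawn_sleepers(2)
puts signal_pids(pids, "KILL")          # both children are still alive
pids.each { |pid| Process.wait(pid) }   # reap them
puts signal_pids(pids, "KILL")          # nothing left to signal
```

A child that has exited but has not yet been reaped can still be signaled, so the count only drops once the parent has waited on it.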

- (Array<pid, status, data>) wait(flags = 0)

Wait for a single fork to finish.

If a fork successfully finishes, its return value from the process block is stored in the main result set.

Returns:

  • (Array<pid, status, data>)

    An array containing the pid, status and data returned from the process block. If Process.wait2 fails, nil is returned.



# File 'lib/ztk/parallel.rb', line 178

def wait(flags=0)
  config.ui.logger.debug { "wait" }
  config.ui.logger.debug { "forks(#{@forks.inspect})" }
  pid, status = (Process.wait2(-1, flags) rescue nil)

  if !pid.nil? && !status.nil? && !(fork = @forks.select{ |f| f[:pid] == pid }.first).nil?
    data = (Marshal.load(Base64.decode64(fork[:reader].read.to_s)) rescue nil)
    config.ui.logger.debug { "read(#{data.inspect})" }

    data = process_data(data)
    !data.nil? and @results.push(data)

    fork[:reader].close
    fork[:writer].close

    @forks -= [fork]
    return [pid, status, data]
  end
  nil
end
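The `flags` argument is passed straight to `Process.wait2`, so `Process::WNOHANG` turns the wait into a non-blocking poll. The sketch below shows both modes with the standard library only. `reap` and `reap_demo` are hypothetical helpers, and `reap` rescues only `Errno::ECHILD`, whereas the listing above uses a blanket `rescue nil`.

```ruby
# Hypothetical helper (not part of ZTK): returns [pid, Process::Status], or
# nil when nothing could be reaped.
def reap(pid, flags = 0)
  Process.wait2(pid, flags)
rescue Errno::ECHILD
  nil                                      # no such child to wait for
end

# Hypothetical demo: the child blocks on a pipe until the parent releases it.
def reap_demo
  reader, writer = IO.pipe
  pid = Process.fork do
    writer.close
    reader.read                            # blocks until the parent closes its end
    Process.exit!(3)
  end
  reader.close
  pending = reap(pid, Process::WNOHANG)    # child still running, so nil
  writer.close                             # let the child finish
  finished = reap(pid)                     # blocks until the child has exited
  [pid, pending, finished]
end

pid, pending, finished = reap_demo
puts pending.inspect
puts finished[1].exitstatus
```

With `Process::WNOHANG`, a nil result means "no child has finished yet", not failure, so a caller polling this way would need to retry later.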

- (Array<Object>) waitall

Waits for all forks to finish.

Returns:

  • (Array<Object>)

    The results from all of the process blocks.



# File 'lib/ztk/parallel.rb', line 202

def waitall
  config.ui.logger.debug { "waitall" }
  while @forks.count > 0
    self.wait
  end
  @results
end
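Taken together, the throttle loop in `process` and the drain loop in `waitall` form a small bounded worker pool. The following sketch reimplements that idea with the standard library only. `map_in_forks` is a hypothetical helper, not ZTK's API, and it assumes each result is small enough to fit in the pipe buffer, because (like the listing above) it waits for a child before reading that child's pipe.

```ruby
# Hypothetical helper (not part of ZTK): runs the block once per item in
# forked children, never keeping more than max_forks children alive.
# Returns the collected results and the peak number of concurrent children.
def map_in_forks(items, max_forks)
  active  = {}   # pid => read end of that child's pipe
  results = []
  peak    = 0

  # Wait for any one child, then collect its result.
  reap_one = lambda do
    pid, _status = Process.wait2(-1)
    reader = active.delete(pid)
    if reader
      results << Marshal.load(reader.read)
      reader.close
    end
  end

  items.each do |item|
    # Throttle: block until a slot is free.
    reap_one.call while active.size >= max_forks

    reader, writer = IO.pipe
    reader.binmode
    writer.binmode
    pid = Process.fork do
      reader.close
      writer.write(Marshal.dump(yield(item)))
      writer.close
      Process.exit!(0)
    end
    writer.close
    active[pid] = reader
    peak = [peak, active.size].max
  end

  # Drain: the equivalent of waitall.
  reap_one.call until active.empty?
  [results, peak]
end

results, peak = map_in_forks([1, 2, 3, 4, 5], 2) { |n| n * n }
puts results.sort.inspect
puts peak
```

Results arrive in completion order rather than submission order, which is why the usage example sorts them before printing.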