Class: ZTK::Parallel
Overview
Parallel Processing Class
This class can be used to easily run iterative and linear processes in a parallel manner.
The before fork callback is called once in the parent process.
The after fork callback is called twice, once in the parent process and once in the child process.
Defined Under Namespace
Classes: Break, ExceptionWrapper
Constant Summary
- MAX_FORKS =
Default Maximum Number of Forks
case RUBY_PLATFORM when /darwin/ then %x( sysctl hw.ncpu ).strip.split(':').last.strip.to_i when /linux/ then %x( grep -c processor /proc/cpuinfo ).strip.strip.to_i end
- MAX_MEMORY =
Platforms memory capacity in bytes
case RUBY_PLATFORM when /darwin/ then %x( sysctl hw.memsize ).strip.split[1].to_i when /linux/ then (%x( grep MemTotal /proc/meminfo ).strip.split[1].to_i * 1024) end
Instance Attribute Summary (collapse)
-
- (Object) results
Result Set.
Instance Method Summary (collapse)
-
- (Integer) count
Count of active forks.
-
- (Parallel) initialize(configuration = {})
constructor
A new instance of Parallel.
-
- (Array<Integer>) pids
Child PIDs.
-
- (Integer) process { ... }
Process in parallel.
-
- (Integer) signal_all(signal = "KILL")
Signals all forks.
-
- (Array<pid, status, data>) wait(flags = 0)
Wait for a single fork to finish.
-
- (Array<Object>) waitall
Waits for all forks to finish.
Methods inherited from Base
build_config, #config, #direct_log, hash_config, log_and_raise, #log_and_raise
Constructor Details
- (Parallel) initialize(configuration = {})
Returns a new instance of Parallel
86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 |
# File 'lib/ztk/parallel.rb', line 86 def initialize(configuration={}) super({ :max_forks => MAX_FORKS, :raise_exceptions => true }, configuration) (config.max_forks < 1) and log_and_raise(ParallelError, "max_forks must be equal to or greater than one!") @forks = Array.new @results = Array.new GC.respond_to?(:copy_on_write_friendly=) and GC.copy_on_write_friendly = true trapped_signals = %w( term int hup ) trapped_signals << "kill" if RUBY_VERSION < "2.2.0" trapped_signals.map(&:upcase).each do |signal| Signal.trap(signal) do $stderr.puts("SIG#{signal} received by PID##{Process.pid}; signaling child processes...") signal_all(signal) exit!(1) end end end |
Instance Attribute Details
- (Object) results
Result Set
80 81 82 |
# File 'lib/ztk/parallel.rb', line 80 def results @results end |
Instance Method Details
- (Integer) count
Count of active forks.
231 232 233 234 |
# File 'lib/ztk/parallel.rb', line 231 def count config.ui.logger.debug { "count(#{@forks.count})" } @forks.count end |
- (Array<Integer>) pids
Child PIDs
239 240 241 |
# File 'lib/ztk/parallel.rb', line 239 def pids @forks.collect{ |fork| fork[:pid] } end |
- (Integer) process { ... }
Process in parallel.
117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 |
# File 'lib/ztk/parallel.rb', line 117 def process(&block) !block_given? and log_and_raise(ParallelError, "You must supply a block to the process method!") config.ui.logger.debug { "forks(#{@forks.inspect})" } while (@forks.count >= config.max_forks) do wait end child_reader, parent_writer = IO.pipe parent_reader, child_writer = IO.pipe config.before_fork and config.before_fork.call(Process.pid) pid = Process.fork do config.after_fork and config.after_fork.call(Process.pid) parent_writer.close parent_reader.close data = nil begin data = block.call rescue Exception => e config.ui.logger.fatal { e. } e.backtrace.each do |line| config.ui.logger << line config.ui.logger << "\n" end data = ExceptionWrapper.new(e) end if !data.nil? config.ui.logger.debug { "write(#{data.inspect})" } child_writer.write(Base64.encode64(Marshal.dump(data))) end child_reader.close child_writer.close Process.exit!(0) end config.after_fork and config.after_fork.call(Process.pid) child_reader.close child_writer.close fork = {:reader => parent_reader, :writer => parent_writer, :pid => pid} @forks << fork pid end |
- (Integer) signal_all(signal = "KILL")
Signals all forks.
213 214 215 216 217 218 219 220 221 222 223 224 225 226 |
# File 'lib/ztk/parallel.rb', line 213 def signal_all(signal="KILL") signaled = 0 if (!@forks.nil? && (@forks.count > 0)) @forks.each do |fork| begin Process.kill(signal, fork[:pid]) signaled += 1 rescue nil end end end signaled end |
- (Array<pid, status, data>) wait(flags = 0)
Wait for a single fork to finish.
If a fork successfully finishes, it's return value from the process block is stored into the main result set.
178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 |
# File 'lib/ztk/parallel.rb', line 178 def wait(flags=0) config.ui.logger.debug { "wait" } config.ui.logger.debug { "forks(#{@forks.inspect})" } pid, status = (Process.wait2(-1, flags) rescue nil) if !pid.nil? && !status.nil? && !(fork = @forks.select{ |f| f[:pid] == pid }.first).nil? data = (Marshal.load(Base64.decode64(fork[:reader].read.to_s)) rescue nil) config.ui.logger.debug { "read(#{data.inspect})" } data = process_data(data) !data.nil? and @results.push(data) fork[:reader].close fork[:writer].close @forks -= [fork] return [pid, status, data] end nil end |
- (Array<Object>) waitall
Waits for all forks to finish.
202 203 204 205 206 207 208 |
# File 'lib/ztk/parallel.rb', line 202 def waitall config.ui.logger.debug { "waitall" } while @forks.count > 0 self.wait end @results end |