Changeset 209
- Timestamp:
- 11/21/07 11:53:11 (1 year ago)
- Files:
-
- branches/version10/framework/core.rb (modified) (1 diff)
- branches/version10/framework/packet.rb (modified) (3 diffs)
- branches/version10/framework/packet_master.rb (modified) (7 diffs)
- branches/version10/framework/worker.rb (modified) (3 diffs)
- branches/version10/server/log_worker.rb (added)
- branches/version10/server/master_worker.rb (modified) (3 diffs)
- branches/version10/server/meta_worker.rb (modified) (2 diffs)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
branches/version10/framework/core.rb
r207 r209 204 204 @timer_hash.each do |key,timer| 205 205 if timer.run_now? 206 #begin207 206 timer.run 208 # rescue209 # @timer_hash.delete(key)210 # end211 207 @timer_hash.delete(key) if !timer.respond_to?(:interval) 212 208 end branches/version10/framework/packet.rb
r207 r209 1 require "rubygems"2 1 require "socket" 3 require "base64"4 2 require "yaml" 5 3 require "forwardable" … … 28 26 require "connection" 29 27 require "worker" 30 #require "cpu_worker"31 28 32 29 # This file is just a runner of things and hence does basic initialization of thingies required for running … … 34 31 35 32 36 PACKET_APP =File.expand_path'../' unless defined?(PACKET_APP)33 PACKET_APP = File.expand_path'../' unless defined?(PACKET_APP) 37 34 38 35 module Packet branches/version10/framework/packet_master.rb
r207 r209 11 11 def self.run 12 12 master_reactor_instance = new 13 master_reactor_instance.live_workers = DoubleKeyedHash.new 13 14 yield(master_reactor_instance) 14 15 master_reactor_instance.load_workers … … 54 55 dump_object(p_object,connection) 55 56 end 56 def_delegators :@reactor, :start_server, :connect, :add_periodic_timer, :add_timer, :cancel_timer,:reconnect 57 def_delegators :@reactor, :start_server, :connect, :add_periodic_timer, :add_timer, :cancel_timer,:reconnect, :start_worker 57 58 end 58 59 handler_instance.workers = @live_workers … … 80 81 # where worker is actually required in master as well as in worker. 81 82 def load_workers 82 @live_workers = DoubleKeyedHash.new 83 83 84 if defined?(WORKER_ROOT) 84 85 worker_root = WORKER_ROOT … … 92 93 require worker_name 93 94 worker_klass = Object.const_get(worker_name.classify) 95 next if worker_klass.no_auto_load 94 96 fork_and_load(worker_klass) 95 97 end 96 98 97 # FIXME: easiest and yet perhaps a bit ugly 99 # FIXME: easiest and yet perhaps a bit ugly, its just to make sure that from each 100 # worker proxy one can access other workers 98 101 @live_workers.each do |key,worker_instance| 99 102 worker_instance.workers = @live_workers … … 101 104 end 102 105 106 def start_worker(worker_name,options = {}) 107 require worker_name.to_s 108 worker_klass = Object.const_get(worker_name.classify) 109 fork_and_load(worker_klass,options) 110 end 111 103 112 # method forks given worker file in a new process 104 def fork_and_load(worker_klass )113 def fork_and_load(worker_klass,worker_options = { }) 105 114 t_worker_name = worker_klass.worker_name 106 115 worker_pimp = worker_klass.worker_proxy.to_s … … 110 119 # socket to which master process is going to write 111 120 worker_read_end,master_write_end = UNIXSocket.pair(Socket::SOCK_STREAM) 121 worker_read_fd,master_write_fd = UNIXSocket.pair 112 122 113 worker_read_fd,master_write_fd = UNIXSocket.pair114 123 if((pid = fork()).nil?) 115 # close file handles which are not required in child116 124 $0 = "ruby #{worker_klass.worker_name}" 117 125 master_write_end.close … … 119 127 master_write_fd.close 120 128 # master_write_end.close if master_write_end 121 worker_klass.start_worker(:write_end => worker_write_end,:read_end => worker_read_end,:read_fd => worker_read_fd )129 worker_klass.start_worker(:write_end => worker_write_end,:read_end => worker_read_end,:read_fd => worker_read_fd,:options => worker_options) 122 130 end 123 131 Process.detach(pid) 124 # if no pimp exists for the given class then we should create a pimp class for the worker 125 # meta programmatically. 132 126 133 unless worker_pimp.blank? 127 134 require worker_pimp branches/version10/framework/worker.rb
r207 r209 5 5 iattr_accessor :fd_reader,:msg_writer,:msg_reader,:worker_name 6 6 iattr_accessor :worker_proxy 7 attr_accessor :worker_started 7 iattr_accessor :no_auto_load 8 attr_accessor :worker_started, :worker_options 8 9 after_connection :provide_workers 9 10 … … 15 16 @fd_reader = messengers[:read_fd] 16 17 t_instance = new 18 t_instance.worker_options = messengers[:options] 17 19 t_instance.worker_init if t_instance.respond_to?(:worker_init) 18 20 t_instance.start_reactor … … 36 38 send_data(:data => t_data,:function => options[:function],:callback_signature => t_callback.signature) 37 39 else 38 send_data(:data => t_data,:function => options[:function] )40 send_data(:data => t_data,:function => options[:function],:requested_worker => options[:worker],:requesting_worker => worker_name,:type => :request) 39 41 end 40 42 end branches/version10/server/master_worker.rb
r206 r209 10 10 require "cron_trigger" 11 11 require "trigger" 12 require "log_worker" 12 13 13 14 require File.expand_path("#{RAILS_HOME}/config/environment.rb") … … 57 58 end 58 59 def connection_completed; end 59 60 def log log_data61 workers[:log_worker].send_request(:type => :request, :data => log_data)62 end63 60 end 64 61 … … 68 65 config_file = YAML.load(ERB.new(IO.read("#{RAILS_HOME}/config/backgroundrb.yml")).result) 69 66 Packet::Reactor.run do |t_reactor| 67 t_reactor.start_worker("log_worker") 70 68 t_reactor.start_server(config_file[:backgroundrb][:ip],config_file[:backgroundrb][:port],MasterWorker) 71 69 end branches/version10/server/meta_worker.rb
r196 r209 1 1 # FIXME: need to wrap workers within a namespace 2 2 module BackgrounDRb 3 # this class is a dummy class that implements things required for passing data to 4 # actual logger worker 5 class PacketLogger 6 def initialize(worker) 7 @worker = worker 8 end 9 def info(p_data) 10 @worker.send_request(:worker => :log_worker, :data => p_data) 11 end 12 13 def debug 14 @worker.send_request(:worker => :log_worker, :data => p_data) 15 end 16 end 17 3 18 class MetaWorker < Packet::Worker 4 19 attr_accessor :config_file, :my_schedule, :run_time, :trigger_type, :trigger 20 attr_accessor :logger 5 21 6 22 def worker_init 7 23 @config_file = YAML.load(ERB.new(IO.read("#{RAILS_HOME}/config/backgroundrb.yml")).result) 24 @logger = PacketLogger.new(self) 8 25 if @config_file[:schedules] 9 26 @my_schedule = @config_file[:schedules][worker_name.to_sym] … … 11 28 end 12 29 create if respond_to?(:create) 30 end 31 32 # a user may pass trigger arguments to dynamically define the schedule 33 def load_schedule_from_args 34 13 35 end 14 36
