Changeset 212
- Timestamp:
- 12/03/07 06:36:05 (1 year ago)
- Files:
-
- branches/version10/framework/packet_master.rb (modified) (4 diffs)
- branches/version10/framework/worker.rb (modified) (1 diff)
- branches/version10/lib/backgroundrb.rb (modified) (3 diffs)
- branches/version10/server/master_worker.rb (modified) (2 diffs)
- branches/version10/server/meta_worker.rb (modified) (1 diff)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
branches/version10/framework/packet_master.rb
r210 r212 63 63 end 64 64 def_delegators(:@reactor, :start_server, :connect, :add_periodic_timer, \ 65 :add_timer, :cancel_timer,:reconnect, :start_worker )65 :add_timer, :cancel_timer,:reconnect, :start_worker,:delete_worker) 66 66 67 67 end … … 85 85 end 86 86 87 def delete_worker(worker_options = {}) 88 p "Calling delete worker #{worker_options}" 89 worker_name = worker_options[:worker] 90 worker_name_key = gen_worker_key(worker_name,worker_options[:job_key]) 91 worker_options[:method] = :exit 92 @live_workers[worker_name_key].send_request(worker_options) 93 end 94 87 95 # method loads workers in new processes 88 96 # FIXME: this method can be fixed, so as worker code can be actually, required … … 90 98 # where worker is actually required in master as well as in worker. 91 99 def load_workers 92 93 100 if defined?(WORKER_ROOT) 94 101 worker_root = WORKER_ROOT … … 115 122 def start_worker(worker_options = { }) 116 123 worker_name = worker_options[:worker].to_s 124 worker_name_key = gen_worker_key(worker_name,worker_options[:job_key]) 125 return if @live_workers[worker_name_key] 117 126 worker_options.delete(:worker) 118 127 require worker_name branches/version10/framework/worker.rb
r211 r212 29 29 end 30 30 31 # when you directly call send_data from a worker, it always, sends data32 # as an object33 31 def send_data p_data 34 32 dump_object(p_data,msg_writer) branches/version10/lib/backgroundrb.rb
r211 r212 44 44 end 45 45 46 def do_work p_data 47 p_data[:type] = :do_work 48 establish_connection() 49 raise "Error Connecting to Master Process" unless @connection_status 46 def new_worker p_data 47 p_data[:type] = :start_worker 48 establish_connection 49 raise "Error connecting to master process" unless @connection_status 50 dump_object(p_data,@connection) 51 end 52 53 def delete_worker p_data 54 p_data[:type] = :delete_worker 55 establish_connection 56 raise "Error connecting to master process" unless @connection_status 50 57 dump_object(p_data,@connection) 51 58 end … … 67 74 rescue 68 75 raise "Error reading from master" 76 end 77 end 78 79 def query_all_workers 80 p_data = { } 81 p_data[:type] = :all_worker_status 82 establish_connection 83 raise "Err connecting to master process" unless @connection_status 84 dump_object(p_data,@connection) 85 begin 86 ret_val = select([@connection],nil,nil,3) 87 return nil unless ret_val 88 raw_response = read_object() 89 master_response = Marshal.load(raw_response) 90 return master_response 91 rescue 92 puts $! 93 puts $!.backtrace 94 return nil 69 95 end 70 96 end … … 96 122 dump_object(p_data,@connection) 97 123 begin 98 ret_val = select([@connection],nil,nil, 3)124 ret_val = select([@connection],nil,nil,nil) 99 125 return nil unless ret_val 100 126 raw_response = read_object() branches/version10/server/master_worker.rb
r211 r212 25 25 when :get_result: process_request(t_data) 26 26 when :start_worker: start_worker_request(t_data) 27 when :delete_worker: delete_drb_worker(t_data) 28 when :all_worker_status: query_all_worker_status(t_data) 27 29 end 28 30 end 29 31 end 30 32 33 def query_all_worker_status(p_data) 34 dumpable_status = { } 35 workers.each { |key,value| dumpable_status[key] = value.worker_status } 36 send_object(dumpable_status) 37 end 38 39 def delete_drb_worker(t_data) 40 worker_name = t_data[:worker] 41 job_key = t_data[:job_key] 42 ask_worker(worker_name,:job_key => t_data[:job_key],:type => :request, :data => { :method => :exit}) 43 end 44 31 45 def start_worker_request(p_data) 32 worker_name = p_data[:worker] 33 p_data.delete(:worker) 34 p_data.delete(:type) 35 start_worker(worker_name,p_data) 46 start_worker(p_data) 36 47 end 37 48 … … 72 83 def initialize 73 84 config_file = YAML.load(ERB.new(IO.read("#{RAILS_HOME}/config/backgroundrb.yml")).result) 74 Packet::Reactor.server_logger = "#{RAILS_HOME}/log/backgroundrb_server.log"85 # Packet::Reactor.server_logger = "#{RAILS_HOME}/log/backgroundrb_server.log" 75 86 Packet::Reactor.run do |t_reactor| 76 87 t_reactor.start_worker(:worker => "log_worker") branches/version10/server/meta_worker.rb
r210 r212 23 23 @config_file = YAML.load(ERB.new(IO.read("#{RAILS_HOME}/config/backgroundrb.yml")).result) 24 24 @logger = PacketLogger.new(self) 25 if @config_file[:schedules] 25 if(@worker_options[:schedule] && no_auto_load) 26 load_schedule_from_args 27 elsif @config_file[:schedules] 26 28 @my_schedule = @config_file[:schedules][worker_name.to_sym] 27 29 load_schedule if @my_schedule 28 30 end 29 create if respond_to?(:create) 31 if respond_to?(:create) 32 create(@worker_options[:data]) 33 end 30 34 end 31 35 32 36 # a user may pass trigger arguments to dynamically define the schedule 33 37 def load_schedule_from_args 34 38 @my_schedule = @worker_options[:schedule] 39 load_schedule if @my_schedule 35 40 end 36 41
