root/trunk/server/lib/master_proxy.rb

Revision 328, 4.8 kB (checked in by gethema..@gmail.com, 3 months ago)

update trunk for 1.1 release

Line 
module BackgrounDRb
  # Master process of the BackgrounDRb server.
  #
  # On construction it verifies the Ruby version, builds the debug logger,
  # connects ActiveRecord, discovers the reloadable workers together with
  # their schedules, and then runs a Packet reactor that serves MasterWorker
  # connections and starts scheduled workers.
  class MasterProxy
    attr_accessor :reloadable_workers,:worker_triggers,:reactor

    # Boots the server. Reads its settings from BDRB_CONFIG[:backgroundrb]
    # (:debug_log, :log, :ip, :port).
    #
    # Raises RuntimeError when the running Ruby is older than 1.8.5.
    def initialize
      raise "Running old Ruby version, upgrade to Ruby >= 1.8.5" unless check_for_ruby_version

      # debug logging defaults to on; only an explicit value overrides it
      # (`||` cannot be used here because an explicit false must be kept)
      log_flag = BDRB_CONFIG[:backgroundrb][:debug_log].nil? ? true : BDRB_CONFIG[:backgroundrb][:debug_log]
      debug_logger = DebugMaster.new(BDRB_CONFIG[:backgroundrb][:log],log_flag)

      load_rails_env

      find_reloadable_worker

      Packet::Reactor.run do |t_reactor|
        @reactor = t_reactor
        t_reactor.start_worker(:worker => :log_worker,:worker_env => false) if log_flag
        t_reactor.start_server(BDRB_CONFIG[:backgroundrb][:ip],
                               BDRB_CONFIG[:backgroundrb][:port],MasterWorker) do |conn|
          conn.debug_logger = debug_logger
        end
        # NOTE(review): how often the reactor calls a next_turn block is
        # defined by Packet, not by this file - confirm against Packet.
        t_reactor.next_turn { reload_workers }
      end
    end

    # Builds the key under which a worker is looked up in the reactor.
    # Returns worker_name untouched when no worker_key is given, otherwise
    # the symbol :"<worker_name>_<worker_key>".
    #
    # FIXME: Method by same name exists in Packet::NbioHelper module
    def gen_worker_key(worker_name,worker_key = nil)
      return worker_name if worker_key.nil?
      "#{worker_name}_#{worker_key}".to_sym
    end

    # Requires every *.rb file found under WORKER_ROOT, keeps the worker
    # classes whose reload_flag is set in @reloadable_workers, and stores
    # their non-empty schedules in @worker_triggers, keyed by worker name
    # (as a symbol).
    def find_reloadable_worker
      worker_files = Dir["#{WORKER_ROOT}/**/*.rb"]
      @reloadable_workers = worker_files.map do |path|
        worker_name = File.basename(path,".rb")
        # required by bare name, so the worker directories must be on $LOAD_PATH
        require worker_name
        worker_klass = Object.const_get(worker_name.classify)
        worker_klass.reload_flag ? worker_klass : nil
      end.compact

      @worker_triggers = { }
      @reloadable_workers.each do |t_worker|
        schedule = load_reloadable_schedule(t_worker)
        next if schedule.nil? || schedule.empty?
        @worker_triggers[t_worker.worker_name.to_sym] = schedule
      end
    end

    # Builds the trigger table for one worker from BDRB_CONFIG[:schedules].
    #
    # t_worker:: object responding to worker_name
    #
    # Returns nil when no schedules are configured at all, otherwise a hash
    #   { method_name => { :trigger => trigger, :data => data, :runtime => epoch_seconds } }
    # which is empty when the worker has no schedule. A String :trigger_args
    # creates a CronTrigger with :data read from the schedule entry; a Hash
    # :trigger_args creates a Trigger with :data read from inside
    # :trigger_args. Any other :trigger_args type is skipped.
    def load_reloadable_schedule(t_worker)
      all_schedules = BDRB_CONFIG[:schedules]
      return if all_schedules.nil? || all_schedules.empty?

      worker_method_triggers = { }
      worker_schedule = all_schedules[t_worker.worker_name.to_sym]
      return worker_method_triggers unless worker_schedule

      worker_schedule.each do |method_name,options|
        trigger_args = options[:trigger_args]
        case trigger_args
        when String
          trigger = BackgrounDRb::CronTrigger.new(trigger_args)
          data = options[:data]
        when Hash
          trigger = BackgrounDRb::Trigger.new(trigger_args)
          data = trigger_args[:data]
        else
          next
        end
        worker_method_triggers[method_name] = {
          :trigger => trigger,:data => data,
          :runtime => trigger.fire_after_time(Time.now).to_i
        }
      end
      worker_method_triggers
    end

    # Drops triggers whose end time has passed, then starts every worker
    # method whose :runtime lies in the past and stores its next run time.
    def reload_workers
      return if worker_triggers.nil? || worker_triggers.empty?

      worker_triggers.each do |_worker_name,method_triggers|
        method_triggers.delete_if { |_method_name,entry| trigger_expired?(entry[:trigger]) }
      end

      worker_triggers.each do |worker_name,method_triggers|
        method_triggers.each do |method_name,entry|
          next unless entry[:runtime] < Time.now.to_i
          load_and_invoke(worker_name,method_name,entry)
          entry[:runtime] = entry[:trigger].fire_after_time(Time.now).to_i
        end
      end
    end

    # Starts a fresh instance of worker_name under a generated key, asks it
    # to run p_method with data[:data] and then to exit.
    #
    # When the worker file cannot be required, "no such worker <name>" is
    # printed and nil is returned. ActiveSupport's MissingSourceFile is a
    # LoadError subclass, so the single rescue below covers it as well.
    def load_and_invoke(worker_name,p_method,data)
      require worker_name.to_s
      worker_key = Packet::Guid.hexdigest
      @reactor.start_worker(:worker => worker_name,:worker_key => worker_key,:disable_log => true)
      worker_name_key = gen_worker_key(worker_name,worker_key)

      data_request = {
        :data => { :worker_method => p_method,:arg => data[:data]},
        :type => :request, :result => false
      }
      exit_request = {
        :data => { :worker_method => :exit},
        :type => :request, :result => false
      }

      t_worker = @reactor.live_workers[worker_name_key]
      return unless t_worker
      t_worker.send_request(data_request)
      t_worker.send_request(exit_request)
    rescue LoadError
      puts "no such worker #{worker_name}"
    end

    # Connects ActiveRecord using the ENV["RAILS_ENV"] section of
    # RAILS_HOME/config/database.yml (the file is run through ERB first)
    # and sets allow_concurrency to true.
    def load_rails_env
      db_config = YAML.load(ERB.new(IO.read("#{RAILS_HOME}/config/database.yml")).result)
      run_env = ENV["RAILS_ENV"]
      ActiveRecord::Base.establish_connection(db_config[run_env])
      ActiveRecord::Base.allow_concurrency = true
    end

    # Returns true when +current+ is at least +minimum+. Versions are
    # compared segment by segment as integers, because a plain string
    # comparison would rank "1.10.0" below "1.8.5".
    def check_for_ruby_version(current = RUBY_VERSION,minimum = "1.8.5")
      (version_segments(current) <=> version_segments(minimum)) >= 0
    end

    private

    # Splits a dotted version string into integers: "1.8.5" => [1, 8, 5].
    def version_segments(version)
      version.to_s.split(".").map { |segment| segment.to_i }
    end

    # True when the trigger exposes an end_time that is set and not in the
    # future. A trigger without an end_time (or with a nil one) never expires.
    def trigger_expired?(trigger)
      return false unless trigger.respond_to?(:end_time)
      end_time = trigger.end_time
      !end_time.nil? && end_time <= Time.now
    end
  end # end of class MasterProxy
end # end of module BackgrounDRb
133
Note: See TracBrowser for help on using the browser.