root/trunk/server/lib/master_worker.rb

Revision 330, 5.8 kB (checked in by gethema..@gmail.com, 3 months ago)

migrate to 1.1 version

Line 
1 #!/usr/bin/env ruby
2 module BackgrounDRb
3   # Class wraps a logger object for debugging internal errors within server
4   class DebugMaster
5     attr_accessor :log_mode,:logger,:log_flag
6     def initialize(log_mode,log_flag = true)
7       @log_mode = log_mode
8       @log_flag = log_flag
9       if @log_mode == :foreground
10         @logger = ::Logger.new(STDOUT)
11       else
12         @logger = ::Logger.new("#{RAILS_HOME}/log/backgroundrb_debug_#{BDRB_CONFIG[:backgroundrb][:port]}.log")
13       end
14     end
15
16     def info(data)
17       return unless @log_flag
18       @logger.info(data)
19     end
20
21     def debug(data)
22       return unless @log_flag
23       @logger.debug(data)
24     end
25   end
26
27   class MasterWorker
28     attr_accessor :debug_logger
29     include BackgrounDRb::BdrbServerHelper
30     # receives requests from rails and based on request type invoke appropriate method
31     def receive_data p_data
32       @tokenizer.extract(p_data) do |b_data|
33         begin
34           t_data = load_data b_data
35           if t_data
36             case t_data[:type]
37               # async method invocation
38             when :async_invoke: async_method_invoke(t_data)
39               # get status/result
40             when :get_result: get_result_object(t_data)
41               # sync method invocation
42             when :sync_invoke: method_invoke(t_data)
43             when :start_worker: start_worker_request(t_data)
44             when :delete_worker: delete_drb_worker(t_data)
45             when :worker_info: pass_worker_info(t_data)
46             when :all_worker_info: all_worker_info(t_data)
47             else; debug_logger.info("Invalid request")
48             end
49           end
50         rescue Exception => e
51           debug_logger.info(e)
52           debug_logger.info(e.backtrace.join("\n"))
53           send_object(nil)
54         end
55       end
56     end
57
58     # Send worker info to the user
59     def pass_worker_info(t_data)
60       worker_name_key = gen_worker_key(t_data[:worker],t_data[:worker_key])
61       worker_instance = reactor.live_workers[worker_name_key]
62       info_response = { :worker => t_data[:worker],:worker_key => t_data[:worker_key]}
63       worker_instance ? (info_response[:status] = :running) : (info_response[:status] = :stopped)
64       send_object(info_response)
65     end
66
67     # collect all worker info in an array and send to the user
68     def all_worker_info(t_data)
69       info_response = []
70       reactor.live_workers.each do |key,value|
71         worker_key = (value.worker_key.to_s).gsub(/#{value.worker_name}_?/,"")
72         info_response << { :worker => value.worker_name,:worker_key => worker_key,:status => :running }
73       end
74       send_object(info_response)
75     end
76
77     # Delete the worker. Sends TERM signal to the worker process and removes
78     # worker key from list of available workers
79     def delete_drb_worker(t_data)
80       worker_name = t_data[:worker]
81       worker_key = t_data[:worker_key]
82       worker_name_key = gen_worker_key(worker_name,worker_key)
83       begin
84         worker_instance = reactor.live_workers[worker_name_key]
85         raise Packet::InvalidWorker.new("Invalid worker with name #{worker_name} key #{worker_key}") unless worker_instance
86         Process.kill('TERM',worker_instance.pid)
87         # Warning: Change is temporary, may break things
88         reactor.live_workers.delete(worker_name_key)
89       rescue Packet::DisconnectError => sock_error
90         reactor.remove_worker(sock_error)
91       rescue
92         debug_logger.info($!.to_s)
93         debug_logger.info($!.backtrace.join("\n"))
94       end
95     end
96
97     # start a new worker
98     def start_worker_request(p_data)
99       start_worker(p_data)
100     end
101
102     # Invoke an asynchronous method on a worker
103     def async_method_invoke(t_data)
104       worker_name = t_data[:worker]
105       worker_name_key = gen_worker_key(worker_name,t_data[:worker_key])
106       t_data.delete(:worker)
107       t_data.delete(:type)
108       begin
109         ask_worker(worker_name_key,:data => t_data, :type => :request, :result => false)
110       rescue Packet::DisconnectError => sock_error
111         reactor.live_workers.delete(worker_name_key)
112       rescue
113         debug_logger.info($!.message)
114         debug_logger.info($!.backtrace.join("\n"))
115         return
116       end
117     end
118
119     # Given a cache key, ask the worker for result stored in it.
120     # If you are using Memcache for result storage, this method won't be
121     # called at all and bdrb client library will directly fetch
122     # the results from memcache and return
123     def get_result_object(t_data)
124       worker_name = t_data[:worker]
125       worker_name_key = gen_worker_key(worker_name,t_data[:worker_key])
126       t_data.delete(:worker)
127       t_data.delete(:type)
128       begin
129         ask_worker(worker_name_key,:data => t_data, :type => :get_result,:result => true)
130       rescue Packet::DisconnectError => sock_error
131         reactor.live_workers.delete(worker_name_key)
132       rescue
133         debug_logger.info($!.to_s)
134         debug_logger.info($!.backtrace.join("\n"))
135         return
136       end
137     end
138
139     # Invoke a synchronous/blocking method on a worker.
140     def method_invoke(t_data)
141       worker_name = t_data[:worker]
142       worker_name_key = gen_worker_key(worker_name,t_data[:worker_key])
143       t_data.delete(:worker)
144       t_data.delete(:type)
145       begin
146         ask_worker(worker_name_key,:data => t_data, :type => :request,:result => true)
147       rescue Packet::DisconnectError => sock_error
148         reactor.live_workers.delete(worker_name_key)
149       rescue
150         debug_logger.info($!.message)
151         debug_logger.info($!.backtrace.join("\n"))
152         return
153       end
154     end
155
156     # Receieve responses from workers and dispatch them back to the client
157     def worker_receive p_data
158       send_object(p_data)
159     end
160
161     def unbind; end
162
163     # called whenever a new connection is made.Initializes binary data parser
164     def post_init
165       @tokenizer = Packet::BinParser.new
166     end
167     def connection_completed; end
168   end
169 end
170
171
172
173
Note: See TracBrowser for help on using the browser.