root/trunk/lib/backgroundrb/rails_worker_proxy.rb

Revision 330, 6.0 kB (checked in by gethema..@gmail.com, 1 month ago)

migrate to 1.1 version

Line 
1 module BackgrounDRb
2   # A Worker proxy, which uses +method_missing+ for delegating method calls to the workers
3   class RailsWorkerProxy
4     attr_accessor :worker_name, :worker_method, :data, :worker_key,:middle_man
5
6     # create new worker proxy
7     def initialize(p_worker_name,p_worker_key = nil,p_middle_man = nil)
8       @worker_name = p_worker_name
9       @middle_man = p_middle_man
10       @worker_key = p_worker_key
11       @tried_connections = []
12     end
13
14     def method_missing(method_id,*args)
15       worker_method = method_id.to_s
16       arguments = args.first
17
18       arg,job_key,host_info,scheduled_at = arguments && arguments.values_at(:arg,:job_key,:host,:scheduled_at)
19       new_schedule = (scheduled_at && scheduled_at.respond_to?(:utc)) ? scheduled_at.utc : Time.now.utc
20
21       if worker_method =~ /^async_(\w+)/
22         method_name = $1
23         worker_options = compact(:worker => worker_name,:worker_key => worker_key,
24                                  :worker_method => method_name,:job_key => job_key, :arg => arg)
25         run_method(host_info,:ask_work,worker_options)
26       elsif worker_method =~ /^enq_(\w+)/i
27         raise NoJobKey.new("Must specify a job key with enqueued tasks") if job_key.blank?
28         method_name = $1
29         marshalled_args = Marshal.dump(arg)
30         enqueue_task(compact(:worker_name => worker_name.to_s,:worker_key => worker_key.to_s,
31                              :worker_method => method_name.to_s,:job_key => job_key.to_s,
32                              :args => marshalled_args,:timeout => arguments ? arguments[:timeout] : nil,:scheduled_at => new_schedule))
33        elsif worker_method =~ /^deq_(\w+)/i
34         raise NoJobKey.new("Must specify a job key to dequeue tasks") if job_key.blank?
35         method_name = $1
36         dequeue_task(compact(:worker_name => worker_name.to_s,:worker_key => worker_key.to_s,
37                              :worker_method => method_name.to_s,:job_key => job_key.to_s))
38       else
39         worker_options = compact(:worker => worker_name,:worker_key => worker_key,
40                                  :worker_method => worker_method,:job_key => job_key,:arg => arg)
41         run_method(host_info,:send_request,worker_options)
42       end
43     end
44
45     # enqueue tasks to the worker pool
46     def enqueue_task options = {}
47       BdrbJobQueue.insert_job(options)
48     end
49
50     # remove tasks from the worker pool
51     def dequeue_task options = {}
52       BdrbJobQueue.remove_job(options)
53     end
54
55     # invoke method on worker
56     def run_method host_info,method_name,worker_options = {}
57       result = []
58       connection = choose_connection(host_info)
59       raise NoServerAvailable.new("No BackgrounDRb server is found running") if connection.blank?
60       if host_info == :local or host_info.is_a?(String)
61         result << invoke_on_connection(connection,method_name,worker_options)
62       elsif host_info == :all
63         succeeded = false
64         begin
65           connection.each { |conn| result << invoke_on_connection(conn,method_name,worker_options) }
66           succeeded = true
67         rescue BdrbConnError; end
68         raise NoServerAvailable.new("No BackgrounDRb server is found running") unless succeeded
69       else
70         @tried_connections = [connection.server_info]
71         begin
72           result << invoke_on_connection(connection,method_name,worker_options)
73         rescue BdrbConnError => e
74           connection = middle_man.find_next_except_these(@tried_connections)
75           @tried_connections << connection.server_info
76           retry
77         end
78       end
79       return nil if method_name == :ask_work
80       return_result(result)
81     end
82
83     # choose a backgroundrb server connection and invoke worker method on it.
84     def invoke_on_connection connection,method_name,options = {}
85       raise NoServerAvailable.new("No BackgrounDRb is found running") unless connection
86       connection.send(method_name,options)
87     end
88
89     # get results back from the cache. Cache can be in-memory worker cache or memcache
90     # based cache
91     def ask_result job_key
92       options = compact(:worker => worker_name,:worker_key => worker_key,:job_key => job_key)
93       if BDRB_CONFIG[:backgroundrb][:result_storage] == 'memcache'
94         return_result_from_memcache(options)
95       else
96         result = middle_man.backend_connections.map { |conn| conn.ask_result(options) }
97         return_result(result)
98       end
99     end
100
101     # return runtime information about worker
102     def worker_info
103       t_connections = middle_man.backend_connections
104       result = t_connections.map { |conn| conn.worker_info(compact(:worker => worker_name,:worker_key => worker_key)) }
105       return_result(result)
106     end
107
108     # generate worker key
109     def gen_key options
110       key = [options[:worker],options[:worker_key],options[:job_key]].compact.join('_')
111       key
112     end
113
114     # return result from memcache
115     def return_result_from_memcache options = {}
116       middle_man.cache[gen_key(options)]
117     end
118
119     # reset result within memcache for given key
120     def reset_memcache_result(job_key,value)
121       options = compact(:worker => worker_name,:worker_key => worker_key,\
122                           :job_key => job_key)
123       key = gen_key(options)
124       middle_man.cache[key] = value
125       value
126     end
127
128     def return_result result
129       result = Array(result)
130       result.size <= 1 ? result[0] : result
131     end
132
133     # delete a worker
134     def delete
135       middle_man.backend_connections.each do |connection|
136         connection.delete_worker(compact(:worker => worker_name, :worker_key => worker_key))
137       end
138       return worker_key
139     end
140
141     # choose a worker
142     def choose_connection host_info
143       case host_info
144       when :all; middle_man.backend_connections
145       when :local; middle_man.find_local
146       when String; middle_man.find_connection(host_info)
147       else; middle_man.choose_server
148       end
149     end
150
151     # helper method to compact a hash and for getting rid of nil parameters
152     def compact(options = { })
153       options.delete_if { |key,value| value.nil? }
154       options
155     end
156   end # end of RailsWorkerProxy class
157
158 end # end of BackgrounDRb module
Note: See TracBrowser for help on using the browser.