root/trunk/server/lib/bdrb_thread_pool.rb

Revision 326, 2.5 kB (checked in by gethema..@gmail.com, 6 months ago)

check in new backgroundrb code

Line 
1 module BackgrounDRb
2   class WorkData
3     attr_accessor :args,:block,:job_method,:persistent_job_id,:job_key
4     def initialize(args,job_key,job_method,persistent_job_id)
5       @args = args
6       @job_key = job_key
7       @job_method = job_method
8       @persistent_job_id = persistent_job_id
9     end
10   end
11
12   class ThreadPool
13     attr_accessor :size,:threads,:work_queue,:logger
14     attr_accessor :result_queue,:master
15
16     def initialize(master,size,logger)
17       @master = master
18       @logger = logger
19       @size = size
20       @threads = []
21       @work_queue = Queue.new
22       @size.times { add_thread }
23     end
24
25     # can be used to make a call in threaded manner
26     # passed block runs in a thread from thread pool
27     # for example in a worker method you can do:
28     #   def user_tags url
29     #     thread_pool.defer(:fetch_url,url)
30     #   end
31     #   def fetch_url(url)
32     #     begin
33     #       data = Net::HTTP.get(url,'/')
34     #       File.open("#{RAILS_ROOT}/log/pages.txt","w") do |fl|
35     #         fl.puts(data)
36     #       end
37     #     rescue
38     #       logger.info "Error downloading page"
39     #     end
40     #   end
41     # you can invoke above method from rails as:
42     #   MiddleMan.worker(:rss_worker).async_user_tags(:arg => "en.wikipedia.org")
43     # assuming method is defined in rss_worker
44
45     def defer(method_name,args = nil)
46       job_key = Thread.current[:job_key]
47       persistent_job_id = Thread.current[:persistent_job_id]
48       @work_queue << WorkData.new(args,job_key,method_name,persistent_job_id)
49     end
50
51     # Start worker threads
52     def add_thread
53       @threads << Thread.new do
54         Thread.current[:job_key] = nil
55         Thread.current[:persistent_job_id] = nil
56         while true
57           task = @work_queue.pop
58           Thread.current[:job_key] = task.job_key
59           Thread.current[:persistent_job_id] = task.persistent_job_id
60           block_result = run_task(task)
61         end
62       end
63     end
64
65     # run tasks popped out of queue
66     def run_task task
67       block_arity = master.method(task.job_method).arity
68       begin
69         ActiveRecord::Base.verify_active_connections!
70         t_data = task.args
71         result = nil
72         if block_arity != 0
73           result = master.send(task.job_method,task.args)
74         else
75           result = master.send(task.job_method)
76         end
77         return result
78       rescue
79         logger.info($!.to_s)
80         logger.info($!.backtrace.join("\n"))
81         return nil
82       end
83     end
84   end #end of class ThreadPool
85 end # end of module BackgrounDRb
86
Note: See TracBrowser for help on using the browser.