root/trunk/lib/backgroundrb/bdrb_job_queue.rb

Revision 328, 2.0 kB (checked in by gethema..@gmail.com, 3 months ago)

update trunk for 1.1 release

Line 
# Model for storing jobs/tasks persisted to the database.
#
# Attributes used by this class: worker_name, worker_key, job_key, taken,
# finished, scheduled_at, submitted_at, started_at and finished_at.
# The taken and finished flags are written as the integers 0 and 1.
class BdrbJobQueue < ActiveRecord::Base
  validates_uniqueness_of :job_key, :scope => [:worker_name, :worker_key]

  # Find the next pending task for a worker and claim it.
  #
  # worker_name - name of the worker whose queue is searched.
  # worker_key  - optional key; when given, only rows with this worker_key
  #               are considered.
  #
  # A row qualifies when taken is 0 and scheduled_at is not in the future.
  # The row is fetched with :lock => true inside a transaction, then marked
  # as taken and stamped with started_at.
  #
  # Returns the claimed record, or nil when no row qualifies.
  def self.find_next(worker_name, worker_key = nil)
    returned_job = nil
    transaction do
      # Use ruby time stamps for time calculations, as the db might have
      # different times than what is calculated by ruby/rails.
      now = Time.now.utc
      if worker_key
        conditions = [" worker_name = ? AND taken = ? AND worker_key = ? AND scheduled_at <= ? ", worker_name, 0, worker_key, now]
      else
        conditions = [" worker_name = ? AND taken = ? AND scheduled_at <= ? ", worker_name, 0, now]
      end

      t_job = find(:first, :conditions => conditions, :lock => true)
      if t_job
        t_job.taken = 1
        t_job.started_at = Time.now.utc
        # NOTE: the result of save is not checked; the record is returned
        # either way.
        t_job.save
        returned_job = t_job
      end
    end
    returned_job
  end

  # Release a job and mark it as unfinished and free (taken = 0,
  # started_at = nil). Useful if, inside a worker, processing of this job
  # failed and you want it to be processed later.
  def release_job
    self.class.transaction do
      self.taken = 0
      self.started_at = nil
      self.save
    end
  end

  # Insert a new job for processing. Jobs added will be automatically picked
  # up by the appropriate worker.
  #
  # options - attribute hash for the new record. submitted_at, finished and
  #           taken are always set here and override any values supplied.
  #           The hash passed in is left unmodified.
  def self.insert_job(options = { })
    transaction do
      # merge (not merge!) so the caller's hash is not mutated
      attributes = options.merge(:submitted_at => Time.now.utc, :finished => 0, :taken => 0)
      t_job = new(attributes)
      t_job.save
    end
  end

  # Remove a job from the table.
  #
  # options - hash conditions identifying the job. Only a row that is neither
  #           finished nor taken is looked up (both flags are forced to 0).
  #
  # The first matching row is fetched with :lock => true and handed to
  # delete. When nothing matches, nil is handed to delete unchanged.
  def self.remove_job(options = { })
    transaction do
      t_job = find(:first, :conditions => options.merge(:finished => 0, :taken => 0), :lock => true)
      delete(t_job)
    end
  end

  # Mark a job as finished.
  #
  # Sets finished to 1, stamps finished_at and rewrites job_key to
  # "finished_<epoch seconds>_<old key>" (presumably so the original key can
  # be reused under the uniqueness validation -- confirm). Afterwards the
  # :persistent_job_id and :job_key entries of the current thread are cleared.
  #
  # Always returns nil.
  def finish!
    self.class.transaction do
      # take one timestamp so finished_at and the renamed key agree
      now = Time.now.utc
      self.finished = 1
      self.finished_at = now
      self.job_key = "finished_#{now.to_i}_#{job_key}"
      self.save
    end
    Thread.current[:persistent_job_id] = nil
    Thread.current[:job_key] = nil
    nil
  end
end
65
Note: See TracBrowser for help on using the browser.