root/trunk/lib/backgroundrb/bdrb_cluster_connection.rb

Revision 327, 5.5 kB (checked in by gethema..@gmail.com, 5 months ago)

sync code with git

Line 
# Stores connections to BackgrounDRb servers in a cluster manner.
module BackgrounDRb
  # Client-side view of a set of BackgrounDRb servers.
  #
  # Keeps the list of live backend connections, remembers the ones that were
  # reported dead, periodically retries the dead ones, and hands out live
  # connections in round-robin order.
  class ClusterConnection
    include ClientHelper
    attr_accessor :backend_connections, :config, :cache, :bdrb_servers
    attr_accessor :disconnected_connections

    # Retry dead servers on every Nth request ...
    DISCOVERY_REQUEST_INTERVAL = 10
    # ... or once this many seconds have passed since the last retry.
    DISCOVERY_INTERVAL_SECONDS = 10

    # Build the cluster connection: optionally set up memcache result storage,
    # create one Connection per configured server and prime the round-robin
    # index queue.
    def initialize
      @bdrb_servers = []
      @backend_connections = []
      @disconnected_connections = {}

      @last_polled_time = Time.now
      @request_count = 0

      initialize_memcache if BDRB_CONFIG[:backgroundrb][:result_storage] == 'memcache'
      establish_connections
      rebuild_round_robin
    end

    # Initialize memcache if the client is storing results in memcache.
    # The server list is read from BDRB_CONFIG[:memcache] as a comma separated
    # string; surrounding whitespace and empty entries are ignored.
    def initialize_memcache
      # Loaded lazily so memcache is only required when it is configured.
      require 'memcache'
      memcache_options = {
        :c_threshold => 10_000,
        :compression => true,
        :debug => false,
        :namespace => 'backgroundrb_result_hash',
        :readonly => false,
        :urlencode => false
      }
      @cache = MemCache.new(memcache_options)
      @cache.servers = split_list(BDRB_CONFIG[:memcache])
    end

    # Initialize all backend server connections.
    #
    # Servers come from the optional comma separated BDRB_CONFIG[:client]
    # string ("ip:port,ip:port") plus the local server described by
    # BDRB_CONFIG[:backgroundrb]. A Connection is only created for addresses
    # that do not already have one in @backend_connections.
    def establish_connections
      server_struct = Struct.new(:ip, :port)

      if (client_servers = BDRB_CONFIG[:client])
        split_list(client_servers).each do |conn_string|
          ip, port = conn_string.split(':')
          # A missing port becomes 0 (nil.to_i), as before.
          @bdrb_servers << server_struct.new(ip, port.to_i)
        end
      end

      local_config = BDRB_CONFIG[:backgroundrb]
      @bdrb_servers << server_struct.new(local_config[:ip], local_config[:port].to_i)

      @bdrb_servers.each do |server|
        address = "#{server.ip}:#{server.port}"
        next if @backend_connections.detect { |x| x.server_info == address }
        @backend_connections << Connection.new(server.ip, server.port, self)
      end
    end # end of method establish_connections

    # Try to reconnect to every bdrb server that was previously marked as
    # down. Called from update_stats (every 10 requests or 10 seconds) and
    # from choose_server when no backend is left.
    #
    # Servers that answer are moved back into @backend_connections and the
    # round-robin queue is rebuilt.
    def discover_server_periodically
      # Iterate over a snapshot so entries can be removed from the hash
      # directly; no nil placeholders are left behind if a later reconnect
      # attempt raises.
      @disconnected_connections.to_a.each do |key, connection|
        connection.establish_connection
        next unless connection.connection_status

        @backend_connections << connection
        # NOTE(review): the probe connection is closed right away; presumably
        # Connection re-opens it on demand - confirm against Connection.
        connection.close_connection
        @disconnected_connections.delete(key)
      end
      rebuild_round_robin
    end

    # Find a live connection other than those listed in +connections+
    # (an array of "ip:port" strings), because they are already dead.
    #
    # The listed connections are removed from the live set and remembered in
    # @disconnected_connections so they can be retried later.
    # Raises NoServerAvailable when no live connection remains.
    def find_next_except_these(connections)
      invalid_connections = @backend_connections.select { |x| connections.include?(x.server_info) }
      @backend_connections.delete_if { |x| connections.include?(x.server_info) }
      rebuild_round_robin

      invalid_connections.each do |dead|
        @disconnected_connections[dead.server_info] = dead
      end

      # Everything still in @backend_connections is outside +connections+,
      # so the first remaining entry is the one to use.
      chosen = @backend_connections.first
      raise NoServerAvailable, "No BackgrounDRb server is found running" unless chosen
      chosen
    end

    # Find a connection by host name and port ("ip:port").
    # Raises NoServerAvailable when no live connection matches.
    def find_connection(host_info)
      conn = @backend_connections.detect { |x| x.server_info == host_info }
      raise NoServerAvailable, "BackgrounDRb server is not found running on #{host_info}" unless conn
      conn
    end

    # Find the connection to the locally configured server.
    def find_local
      local_config = BDRB_CONFIG[:backgroundrb]
      find_connection("#{local_config[:ip]}:#{local_config[:port]}")
    end

    # Return the worker proxy for +worker_name+ / +worker_key+.
    def worker(worker_name, worker_key = nil)
      update_stats
      RailsWorkerProxy.new(worker_name, worker_key, self)
    end

    # Update the request counter and retry dead nodes if it is time to do so
    # and there is at least one dead node.
    def update_stats
      @request_count += 1
      # time_to_discover? is evaluated first on purpose: it also refreshes
      # @last_polled_time when it returns true.
      discover_server_periodically if time_to_discover? && !@disconnected_connections.empty?
    end

    # Check whether we should try to discover bdrb servers again: true on
    # every 10th request or when more than 10 seconds have passed since the
    # last poll. Records the poll time whenever it answers true.
    def time_to_discover?
      due = (@request_count % DISCOVERY_REQUEST_INTERVAL).zero? ||
            Time.now > (@last_polled_time + DISCOVERY_INTERVAL_SECONDS)
      @last_polled_time = Time.now if due
      due
    end

    # Collect worker information of all currently running workers from all
    # live bdrb servers. Returns a hash keyed by "ip:port"; a server whose
    # query fails is mapped to nil.
    def all_worker_info
      update_stats
      info_data = {}
      @backend_connections.each do |t_connection|
        info_data[t_connection.server_info] =
          begin
            t_connection.all_worker_info
          rescue StandardError
            nil # best effort: one failing server must not hide the others
          end
      end
      info_data
    end

    # Start a new worker on every live backend connection. Backends that fail
    # with BdrbConnError are skipped; NoServerAvailable is raised when none of
    # them accepted the request. Returns options[:worker_key].
    def new_worker(options = {})
      update_stats
      succeeded = false
      @backend_connections.each do |connection|
        begin
          connection.new_worker(options)
          succeeded = true
        rescue BdrbConnError
          # this backend is unreachable; keep trying the remaining ones
        end
      end
      raise NoServerAvailable, "No BackgrounDRb server is found running" unless succeeded
      options[:worker_key]
    end

    # Choose a server in round robin manner.
    # When no live backend is left, one reconnect pass over the dead servers
    # is attempted before NoServerAvailable is raised.
    def choose_server
      rebuild_round_robin if @round_robin.empty?
      if @round_robin.empty? && @backend_connections.empty?
        discover_server_periodically
        if @round_robin.empty? && @backend_connections.empty?
          raise NoServerAvailable, "No BackgrounDRb server is found running"
        end
      end
      @backend_connections[@round_robin.shift]
    end

    private

    # Refill the round-robin queue with one index per live backend.
    def rebuild_round_robin
      @round_robin = (0...@backend_connections.length).to_a
    end

    # Split a comma separated config string into trimmed, non-empty entries.
    def split_list(value)
      value.split(',').map { |entry| entry.strip }.reject { |entry| entry.empty? }
    end
  end # end of ClusterConnection
end # end of Module BackgrounDRb
Note: See TracBrowser for help on using the browser.