root/trunk/lib/backgroundrb/bdrb_connection.rb

Revision 327, 4.6 kB (checked in by gethema..@gmail.com, 5 months ago)

sync code with git

Line 
module BackgrounDRb
  # Connection to a single BackgrounDRb server, identified by +server_ip+ and
  # +server_port+.
  #
  # Every request opens a fresh TCP socket, writes one framed message and
  # closes the socket again. A frame is a 9-digit, zero-padded decimal byte
  # count followed by the Marshal dump of the request hash; responses are read
  # back in the same framing.
  class Connection
    # Seconds allowed for opening the TCP socket.
    CONNECT_TIMEOUT = 3

    # Width in bytes of the zero-padded length header that precedes a message.
    LENGTH_HEADER_SIZE = 9

    # Default number of seconds to wait for a response to become readable.
    DEFAULT_READ_TIMEOUT = 3

    attr_accessor :server_ip, :server_port, :cluster_conn, :connection_status

    # @param ip [String] address of the server
    # @param port [Integer, String] port of the server
    # @param cluster_conn [Object] stored as-is and exposed through the accessor
    def initialize(ip, port, cluster_conn)
      @mutex = Mutex.new
      @server_ip = ip
      @server_port = port
      @cluster_conn = cluster_conn
      @connection_status = true
    end

    # Opens a new socket to the server with TCP_NODELAY set and records the
    # outcome in +connection_status+. Never raises for connection failures.
    #
    # @return [Boolean] the new value of +connection_status+
    def establish_connection
      # Release any socket left over from an earlier attempt so a reconnect
      # never leaks a file descriptor.
      close_connection
      Timeout.timeout(CONNECT_TIMEOUT) do
        @connection = TCPSocket.open(server_ip, server_port)
        @connection.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
      end
      @connection_status = true
    rescue Timeout::Error, StandardError
      # Timeout::Error is listed explicitly because it has not always been a
      # StandardError. SystemExit, Interrupt and friends are deliberately not
      # rescued here.
      close_connection
      @connection_status = false
    end

    # Writes +data+ to the socket, reconnecting once and resending the whole
    # payload if the first attempt fails.
    #
    # @param data [String] fully framed message
    # @return [nil]
    # @raise [BackgrounDRb::BdrbConnError] when the reconnect fails
    def write_data(data)
      flush_in_loop(data)
    rescue Errno::EAGAIN
      # NOTE(review): the payload is silently dropped here; kept as-is because
      # callers may rely on this best-effort behaviour.
      nil
    rescue StandardError
      # Covers Errno::EPIPE as well as any other write failure.
      establish_connection
      unless @connection_status
        raise BackgrounDRb::BdrbConnError.new("Error while writing #{server_info}")
      end
      flush_in_loop(data)
    end

    # @return [String] "ip:port" of the server, used in error messages
    def server_info
      "#{server_ip}:#{server_port}"
    end

    # Writes +data+ to the socket until every byte has been handed over,
    # flushing after each write.
    #
    # @param data [String] bytes to send
    # @return [nil]
    # @raise [RuntimeError] when the socket reports a non-positive write count
    def flush_in_loop(data)
      remaining = data
      until remaining.empty?
        written = @connection.write(remaining)
        raise "Error writing to socket" if written <= 0
        @connection.flush
        # The write count is in bytes, so the remainder must be taken by byte
        # offset; character indexing breaks on multi-byte strings.
        remaining = remaining.byteslice(written..-1).to_s
      end
    end

    # Connects to the server and sends +data+ as one framed message. The
    # socket is left open so the caller can read a response.
    #
    # @param data [Object] any object Marshal can dump
    # @raise [BackgrounDRb::BdrbConnError] when the server cannot be reached
    def dump_object(data)
      establish_connection
      unless @connection_status
        raise BackgrounDRb::BdrbConnError.new("Error while connecting to the backgroundrb server #{server_info}")
      end

      payload = Marshal.dump(data)
      header = payload.bytesize.to_s.rjust(LENGTH_HEADER_SIZE, '0')
      @mutex.synchronize { write_data(header + payload) }
    end

    # Closes the current socket, if any. Safe to call repeatedly or when no
    # socket was ever opened.
    #
    # @return [nil]
    def close_connection
      @connection.close if @connection && !@connection.closed?
      nil
    rescue IOError, SystemCallError
      # The socket is already unusable; there is nothing left to release.
      nil
    ensure
      @connection = nil
    end

    # Sends +p_data+ as an :async_invoke request without waiting for a reply.
    # Sets +p_data[:type]+ on the hash that was passed in.
    #
    # @param p_data [Hash] request options
    # @return [nil]
    def ask_work(p_data)
      send_without_reply(p_data, :async_invoke)
    end

    # Sends +p_data+ as a :start_worker request without waiting for a reply.
    # Sets +p_data[:type]+ on the hash that was passed in.
    #
    # @param p_data [Hash] request options
    # @return [nil]
    def new_worker(p_data)
      send_without_reply(p_data, :start_worker)
    end

    # Sends +p_data+ as a :worker_info request and waits for the reply.
    #
    # @param p_data [Hash] request options
    # @return [Object, nil] the unmarshalled response, or nil if none arrived
    def worker_info(p_data)
      request_reply(p_data, :worker_info)
    end

    # Sends an :all_worker_info request and waits for the reply.
    #
    # @return [Object, nil] the unmarshalled response, or nil if none arrived
    def all_worker_info
      request_reply({}, :all_worker_info)
    end

    # Sends +p_data+ as a :delete_worker request without waiting for a reply.
    # Sets +p_data[:type]+ on the hash that was passed in.
    #
    # @param p_data [Hash] request options
    # @return [nil]
    def delete_worker(p_data)
      send_without_reply(p_data, :delete_worker)
    end

    # Reads one framed message from the socket and returns its raw body.
    # A missing header converts to a length of 0, which yields an empty body.
    #
    # @return [String, nil] the still-marshalled message body
    # @raise [BackgrounDRb::BdrbConnError] when reading fails
    def read_object
      message_length = @connection.read(LENGTH_HEADER_SIZE).to_i
      @connection.read(message_length)
    rescue StandardError
      raise BackgrounDRb::BdrbConnError.new("Not able to connect #{server_info}")
    end

    # Builds the key under which a job result is stored.
    #
    # @param options [Hash] reads :worker, :worker_key and :job_key
    # @return [Object] the non-nil parts joined with "_" when results are
    #   stored in memcache, otherwise +options[:job_key]+ unchanged
    def gen_key(options)
      return options[:job_key] unless memcache_storage?

      [options[:worker], options[:worker_key], options[:job_key]].compact.join('_')
    end

    # Fetches the result of a job.
    #
    # @param p_data [Hash] request options
    # @return [Object, nil] the :data entry of the server response, or whatever
    #   the memcache lookup returns when results are stored in memcache
    def ask_result(p_data)
      if memcache_storage?
        # NOTE(review): return_result_from_memcache is not defined in this
        # part of the source; presumably it is provided elsewhere - confirm.
        return_result_from_memcache(p_data)
      else
        bdrb_response = request_reply(p_data, :get_result)
        bdrb_response ? bdrb_response[:data] : nil
      end
    end

    # Waits up to +timeout+ seconds for the socket to become readable, then
    # reads and unmarshals one message. Every failure is reported as nil.
    #
    # @param timeout [Numeric, nil] seconds to wait; nil waits indefinitely
    # @return [Object, nil] the unmarshalled response, or nil on timeout/error
    def read_from_bdrb(timeout = DEFAULT_READ_TIMEOUT)
      return nil unless IO.select([@connection], nil, nil, timeout)

      # NOTE(review): Marshal.load on bytes received over the network is only
      # safe when the peer is trusted.
      Marshal.load(read_object)
    rescue StandardError
      nil
    end

    # Sends +p_data+ as a :sync_invoke request and waits, without a time
    # limit, for the reply.
    #
    # @param p_data [Hash] request options
    # @return [Object, nil] the :data entry of the response, or nil
    def send_request(p_data)
      bdrb_response = request_reply(p_data, :sync_invoke, nil)
      bdrb_response ? bdrb_response[:data] : nil
    end

    private

    # True when BDRB_CONFIG selects memcache as the result storage.
    def memcache_storage?
      BDRB_CONFIG[:backgroundrb][:result_storage] == 'memcache'
    end

    # Tags +p_data+ with +type+, sends it, and always closes the socket.
    def send_without_reply(p_data, type)
      p_data[:type] = type
      dump_object(p_data)
      nil
    ensure
      close_connection
    end

    # Tags +p_data+ with +type+, sends it, reads one response, and always
    # closes the socket, even when sending or reading raises.
    def request_reply(p_data, type, timeout = DEFAULT_READ_TIMEOUT)
      p_data[:type] = type
      dump_object(p_data)
      @mutex.synchronize { read_from_bdrb(timeout) }
    ensure
      close_connection
    end
  end
end
Note: See TracBrowser for help on using the browser.