| 1 |
module BackgrounDRb |
|---|
| 2 |
class Connection |
|---|
| 3 |
attr_accessor :server_ip,:server_port,:cluster_conn,:connection_status |
|---|
| 4 |
|
|---|
| 5 |
def initialize ip,port,cluster_conn |
|---|
| 6 |
@mutex = Mutex.new |
|---|
| 7 |
@server_ip = ip |
|---|
| 8 |
@server_port = port |
|---|
| 9 |
@cluster_conn = cluster_conn |
|---|
| 10 |
@connection_status = true |
|---|
| 11 |
end |
|---|
| 12 |
|
|---|
| 13 |
|
|---|
| 14 |
def establish_connection |
|---|
| 15 |
begin |
|---|
| 16 |
timeout(3) do |
|---|
| 17 |
@connection = TCPSocket.open(server_ip, server_port) |
|---|
| 18 |
@connection.setsockopt(Socket::IPPROTO_TCP,Socket::TCP_NODELAY,1) |
|---|
| 19 |
end |
|---|
| 20 |
@connection_status = true |
|---|
| 21 |
rescue Timeout::Error |
|---|
| 22 |
@connection_status = false |
|---|
| 23 |
rescue Exception => e |
|---|
| 24 |
@connection_status = false |
|---|
| 25 |
end |
|---|
| 26 |
end |
|---|
| 27 |
|
|---|
| 28 |
def write_data data |
|---|
| 29 |
begin |
|---|
| 30 |
flush_in_loop(data) |
|---|
| 31 |
rescue Errno::EAGAIN |
|---|
| 32 |
return |
|---|
| 33 |
rescue Errno::EPIPE |
|---|
| 34 |
establish_connection |
|---|
| 35 |
if @connection_status |
|---|
| 36 |
flush_in_loop(data) |
|---|
| 37 |
else |
|---|
| 38 |
@connection_status = false |
|---|
| 39 |
raise BackgrounDRb::BdrbConnError.new("Error while writing #{server_info}") |
|---|
| 40 |
end |
|---|
| 41 |
rescue |
|---|
| 42 |
establish_connection |
|---|
| 43 |
if @connection_status |
|---|
| 44 |
flush_in_loop(data) |
|---|
| 45 |
else |
|---|
| 46 |
@connection_status = false |
|---|
| 47 |
raise BackgrounDRb::BdrbConnError.new("Error while writing #{server_info}") |
|---|
| 48 |
end |
|---|
| 49 |
end |
|---|
| 50 |
end |
|---|
| 51 |
|
|---|
| 52 |
def server_info |
|---|
| 53 |
"#{server_ip}:#{server_port}" |
|---|
| 54 |
end |
|---|
| 55 |
|
|---|
| 56 |
def flush_in_loop(data) |
|---|
| 57 |
t_length = data.length |
|---|
| 58 |
loop do |
|---|
| 59 |
break if t_length <= 0 |
|---|
| 60 |
written_length = @connection.write(data) |
|---|
| 61 |
raise "Error writing to socket" if written_length <= 0 |
|---|
| 62 |
result = @connection.flush |
|---|
| 63 |
data = data[written_length..-1] |
|---|
| 64 |
t_length = data.length |
|---|
| 65 |
end |
|---|
| 66 |
end |
|---|
| 67 |
|
|---|
| 68 |
def dump_object data |
|---|
| 69 |
establish_connection |
|---|
| 70 |
raise BackgrounDRb::BdrbConnError.new("Error while connecting to the backgroundrb server #{server_info}") unless @connection_status |
|---|
| 71 |
|
|---|
| 72 |
object_dump = Marshal.dump(data) |
|---|
| 73 |
dump_length = object_dump.length.to_s |
|---|
| 74 |
length_str = dump_length.rjust(9,'0') |
|---|
| 75 |
final_data = length_str + object_dump |
|---|
| 76 |
@mutex.synchronize { write_data(final_data) } |
|---|
| 77 |
end |
|---|
| 78 |
|
|---|
| 79 |
def close_connection |
|---|
| 80 |
@connection.close |
|---|
| 81 |
@connection = nil |
|---|
| 82 |
end |
|---|
| 83 |
|
|---|
| 84 |
def ask_work p_data |
|---|
| 85 |
p_data[:type] = :async_invoke |
|---|
| 86 |
dump_object(p_data) |
|---|
| 87 |
close_connection |
|---|
| 88 |
end |
|---|
| 89 |
|
|---|
| 90 |
def new_worker p_data |
|---|
| 91 |
p_data[:type] = :start_worker |
|---|
| 92 |
dump_object(p_data) |
|---|
| 93 |
close_connection |
|---|
| 94 |
# RailsWorkerProxy.worker(p_data[:worker],p_data[:worker_key],self) |
|---|
| 95 |
end |
|---|
| 96 |
|
|---|
| 97 |
def worker_info(p_data) |
|---|
| 98 |
p_data[:type] = :worker_info |
|---|
| 99 |
dump_object(p_data) |
|---|
| 100 |
bdrb_response = nil |
|---|
| 101 |
@mutex.synchronize { bdrb_response = read_from_bdrb() } |
|---|
| 102 |
close_connection |
|---|
| 103 |
bdrb_response |
|---|
| 104 |
end |
|---|
| 105 |
|
|---|
| 106 |
def all_worker_info |
|---|
| 107 |
p_data = { } |
|---|
| 108 |
p_data[:type] = :all_worker_info |
|---|
| 109 |
dump_object(p_data) |
|---|
| 110 |
bdrb_response = nil |
|---|
| 111 |
@mutex.synchronize { bdrb_response = read_from_bdrb() } |
|---|
| 112 |
close_connection |
|---|
| 113 |
bdrb_response |
|---|
| 114 |
end |
|---|
| 115 |
|
|---|
| 116 |
def delete_worker p_data |
|---|
| 117 |
p_data[:type] = :delete_worker |
|---|
| 118 |
dump_object(p_data) |
|---|
| 119 |
close_connection |
|---|
| 120 |
end |
|---|
| 121 |
|
|---|
| 122 |
def read_object |
|---|
| 123 |
begin |
|---|
| 124 |
message_length_str = @connection.read(9) |
|---|
| 125 |
message_length = message_length_str.to_i |
|---|
| 126 |
message_data = @connection.read(message_length) |
|---|
| 127 |
return message_data |
|---|
| 128 |
rescue |
|---|
| 129 |
raise BackgrounDRb::BdrbConnError.new("Not able to connect #{server_info}") |
|---|
| 130 |
end |
|---|
| 131 |
end |
|---|
| 132 |
|
|---|
| 133 |
def gen_key options |
|---|
| 134 |
if BDRB_CONFIG[:backgroundrb][:result_storage] == 'memcache' |
|---|
| 135 |
key = [options[:worker],options[:worker_key],options[:job_key]].compact.join('_') |
|---|
| 136 |
key |
|---|
| 137 |
else |
|---|
| 138 |
options[:job_key] |
|---|
| 139 |
end |
|---|
| 140 |
end |
|---|
| 141 |
|
|---|
| 142 |
def ask_result(p_data) |
|---|
| 143 |
if BDRB_CONFIG[:backgroundrb][:result_storage] == 'memcache' |
|---|
| 144 |
return_result_from_memcache(p_data) |
|---|
| 145 |
else |
|---|
| 146 |
p_data[:type] = :get_result |
|---|
| 147 |
dump_object(p_data) |
|---|
| 148 |
bdrb_response = nil |
|---|
| 149 |
@mutex.synchronize { bdrb_response = read_from_bdrb() } |
|---|
| 150 |
close_connection |
|---|
| 151 |
bdrb_response ? bdrb_response[:data] : nil |
|---|
| 152 |
end |
|---|
| 153 |
end |
|---|
| 154 |
|
|---|
| 155 |
def read_from_bdrb(timeout = 3) |
|---|
| 156 |
begin |
|---|
| 157 |
ret_val = select([@connection],nil,nil,timeout) |
|---|
| 158 |
return nil unless ret_val |
|---|
| 159 |
raw_response = read_object() |
|---|
| 160 |
master_response = Marshal.load(raw_response) |
|---|
| 161 |
return master_response |
|---|
| 162 |
rescue |
|---|
| 163 |
return nil |
|---|
| 164 |
end |
|---|
| 165 |
end |
|---|
| 166 |
|
|---|
| 167 |
def send_request(p_data) |
|---|
| 168 |
p_data[:type] = :sync_invoke |
|---|
| 169 |
dump_object(p_data) |
|---|
| 170 |
bdrb_response = nil |
|---|
| 171 |
@mutex.synchronize { bdrb_response = read_from_bdrb(nil) } |
|---|
| 172 |
close_connection |
|---|
| 173 |
bdrb_response ? bdrb_response[:data] : nil |
|---|
| 174 |
end |
|---|
| 175 |
end |
|---|
| 176 |
end |
|---|