Changeset 275
- Timestamp:
- 12/15/07 03:28:40 (8 months ago)
- Files:
-
- trunk/framework/connection.rb (modified) (1 diff)
- trunk/framework/core.rb (modified) (2 diffs)
- trunk/framework/meta_pimp.rb (modified) (2 diffs)
- trunk/framework/nbio.rb (modified) (1 diff)
- trunk/framework/packet_master.rb (modified) (6 diffs)
- trunk/framework/worker.rb (modified) (1 diff)
- trunk/script/backgroundrb (modified) (1 diff)
- trunk/server/master_worker.rb (modified) (4 diffs)
- trunk/server/meta_worker.rb (modified) (7 diffs)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
trunk/framework/connection.rb
r217 r275 3 3 4 4 module Packet 5 class Connection 6 # method gets called when connection to external server is completed 7 def connection_completed 8 5 module Connection 6 def send_data p_data 7 begin 8 write_data(p_data,connection) 9 rescue DisconnectError => sock_error 10 close_connection 11 end 9 12 end 10 13 11 # method gets called when external client is disconnected12 def unbind13 14 def invoke_init 15 @initialized = true 16 post_init if respond_to?(:post_init) 14 17 end 15 18 16 # method gets called just at the beginning of initializing things.17 def post_init18 19 def close_connection 20 unbind if respond_to?(:unbind) 21 reactor.remove_connection(connection) 19 22 end 20 23 21 def send_data 22 24 def close_connection_after_writing 25 connection.flush 26 close_connection 23 27 end 24 28 25 def ask_worker 26 27 end 28 29 def receive_data 30 29 def send_object p_object 30 dump_object(p_object,connection) 31 31 end 32 32 end # end of class Connection trunk/framework/core.rb
r269 r275 65 65 def accept_connection(sock_opts) 66 66 sock_io = sock_opts[:socket] 67 67 puts "Someone is attempting a connection" 68 68 begin 69 69 client_socket,client_sockaddr = sock_io.accept_nonblock … … 131 131 def start_server(ip,port,t_module,&block) 132 132 BasicSocket.do_not_reverse_lookup = true 133 configure_socket_options133 # configure_socket_options 134 134 t_socket = TCPServer.new(ip,port.to_i) 135 t_socket.setsockopt(*@tcp_defer_accept_opts) rescue nil135 # t_socket.setsockopt(*@tcp_defer_accept_opts) rescue nil 136 136 listen_sockets[t_socket.fileno] = { :socket => t_socket,:block => block,:module => t_module } 137 137 @read_ios << t_socket trunk/framework/meta_pimp.rb
r229 r275 5 5 # initializer of pimp 6 6 attr_accessor :callback_hash 7 attr_accessor :worker_status 7 attr_accessor :worker_status, :worker_key 8 8 def pimp_init 9 9 @callback_hash ||= {} 10 10 @worker_status = nil 11 11 @worker_result = nil 12 @worker_key = nil 12 13 @tokenizer = BinParser.new 13 14 end … … 39 40 40 41 def save_worker_status(data_options = { }) 41 @worker_status = data_options[:data] 42 # @worker_status = data_options[:data] 43 reactor.update_result(worker_key,data_options[:data]) 42 44 end 43 45 trunk/framework/nbio.rb
r258 r275 64 64 p_sock.write_nonblock(final_data) 65 65 rescue Errno::EAGAIN 66 puts " Lol"66 puts "EAGAIN Error while writing socket" 67 67 return 68 68 rescue Errno::EINTR trunk/framework/packet_master.rb
r259 r275 1 # FIXME: Some code is duplicated between worker class and this Reactor class, that can be fixed2 # with help of creation of Connection class and enabling automatic inheritance of that class and3 # mixing in of methods from that class.4 1 module Packet 5 2 class Reactor 6 3 include Core 7 4 attr_accessor :fd_writers, :msg_writers,:msg_reader 5 attr_accessor :result_hash 6 8 7 attr_accessor :live_workers 9 8 after_connection :provide_workers … … 15 14 def self.run 16 15 master_reactor_instance = new 16 master_reactor_instance.result_hash = {} 17 17 master_reactor_instance.live_workers = DoubleKeyedHash.new 18 18 yield(master_reactor_instance) … … 21 21 end # end of run method 22 22 23 def update_result(worker_key,result) 24 @result_hash ||= {} 25 @result_hash[worker_key.to_sym] = result 26 end 27 23 28 def provide_workers(handler_instance,t_sock) 24 29 class << handler_instance … … 26 31 attr_accessor :workers,:connection,:reactor, :initialized,:signature 27 32 include NbioHelper 28 29 def send_data p_data 30 begin 31 write_data(p_data,connection) 32 rescue Errno::EPIPE 33 # probably a callback, when there is a error in writing to the socket 34 end 35 end 36 37 def invoke_init 38 @initialized = true 39 post_init 40 end 41 42 def close_connection 43 unbind 44 reactor.remove_connection(connection) 45 end 46 47 def close_connection_after_writing 48 connection.flush 49 unbind 50 reactor.remove_connection(connection) 51 end 52 33 include Connection 53 34 def ask_worker(*args) 54 35 worker_name = args.shift … … 59 40 end 60 41 61 def send_object p_object62 dump_object(p_object,connection)63 end64 42 def_delegators(:@reactor, :start_server, :connect, :add_periodic_timer, \ 65 43 :add_timer, :cancel_timer,:reconnect, :start_worker,:delete_worker) … … 157 135 @live_workers[worker_name_key,master_read_end.fileno] = pimp_klass.new(master_write_end,pid,self) 158 136 else 159 @live_workers[worker_name_key,master_read_end.fileno] = Packet::MetaPimp.new(master_write_end,pid,self) 137 t_pimp = Packet::MetaPimp.new(master_write_end,pid,self) 138 t_pimp.worker_key = worker_name_key 139 @live_workers[worker_name_key,master_read_end.fileno] = t_pimp 160 140 end 161 141 trunk/framework/worker.rb
r217 r275 63 63 attr_accessor :worker, :connection, :reactor, :initialized, :signature 64 64 include NbioHelper 65 def send_data p_data 66 begin 67 write_data(p_data,connection) 68 rescue Errno::EPIPE 69 # probably a callback 70 end 71 end 72 73 def invoke_init 74 @initialized = true 75 post_init 76 end 77 78 def close_connection 79 unbind 80 reactor.remove_connection(connection) 81 end 82 83 def close_connection_after_writing 84 connection.flush 85 unbind 86 reactor.remove_connection(connection) 87 end 88 89 def send_object p_object 90 dump_object(p_object,connection) 91 end 92 65 include Connection 93 66 def_delegators :@reactor, :start_server, :connect, :add_periodic_timer, :add_timer, :cancel_timer,:reconnect 94 67 end trunk/script/backgroundrb
r269 r275 32 32 if config_file[:backgroundrb][:log].nil? or config_file[:backgroundrb][:log] != 'foreground' 33 33 log_file = File.open(SERVER_LOGGER,"w+") 34 p "***************** : changing file descriptors"35 34 [STDIN, STDOUT, STDERR].each {|desc| desc.reopen(log_file)} 36 else37 p "***************** : everything on stdout"38 35 end 39 36 MasterProxy.new() trunk/server/master_worker.rb
r269 r275 1 1 #!/usr/bin/env ruby 2 # PACKET_APP = File.expand_path(File.join(File.dirname(__FILE__) + "/.."))3 # RAILS_HOME = File.expand_path(File.join(File.dirname(__FILE__) + "/../.."))4 5 2 6 3 module BackgrounDRb … … 72 69 def process_status(t_data) 73 70 worker_name = t_data[:worker] 74 status_data = workers[worker_name].worker_status 71 job_key = t_data[:job_key] 72 worker_name_key = gen_worker_key(worker_name,job_key) 73 status_data = reactor.result_hash[worker_name_key.to_sym] 75 74 send_object(status_data) 76 75 end … … 113 112 def initialize 114 113 config_file = YAML.load(ERB.new(IO.read("#{RAILS_HOME}/config/backgroundrb.yml")).result) 114 load_rails_env(config_file) 115 115 Packet::Reactor.run do |t_reactor| 116 116 t_reactor.start_worker(:worker => :log_worker) … … 118 118 end 119 119 end 120 121 def load_rails_env(config_file) 122 ActiveRecord::Base.allow_concurrency = true 123 db_config_file = YAML.load(ERB.new(IO.read("#{RAILS_HOME}/config/database.yml")).result) 124 run_env = config_file[:backgroundrb][:environment] || 'development' 125 ENV["RAILS_ENV"] = run_env 126 RAILS_ENV.replace(run_env) if defined?(RAILS_ENV) 127 require RAILS_HOME + '/config/environment.rb' 128 ActiveRecord::Base.establish_connection(db_config_file[run_env]) 129 end 120 130 end 121 131 trunk/server/meta_worker.rb
r271 r275 1 # FIXME: need to wrap workers within a namespace2 1 module BackgrounDRb 3 2 # this class is a dummy class that implements things required for passing data to … … 15 14 end 16 15 end 16 # == MetaWorker class 17 # BackgrounDRb workers are asynchrounous reactors which work using events 18 # You are free to use threads in your workers, but be reasonable with them. 19 # Following methods are available to all workers from parent classes. 20 # * BackgrounDRb::MetaWorker#connect 21 # 22 # Above method connects to an external tcp server and integrates the connection 23 # within reactor loop of worker. For example: 24 # 25 # class TimeClient 26 # def receive_data(p_data) 27 # worker.get_external_data(p_data) 28 # end 29 # 30 # def post_init 31 # p "***************** : connection completed" 32 # end 33 # end 34 # 35 # class FooWorker < BackgrounDRb::MetaWorker 36 # set_worker_name :foo_worker 37 # def create(args = nil) 38 # external_connection = nil 39 # connect("localhost",11009,TimeClient) { |conn| external_connection = conn } 40 # end 41 # 42 # def get_external_data(p_data) 43 # puts "And external data is : #{p_data}" 44 # end 45 # end 46 # * BackgrounDRb::MetaWorker#start_server 47 # 48 # Above method allows you to start a tcp server from your worker, all the 49 # accepted connections are integrated with event loop of worker 50 # class TimeServer 51 # 52 # def receive_data(p_data) 53 # end 54 # 55 # def post_init 56 # add_periodic_timer(2) { say_hello_world } 57 # end 58 # 59 # def connection_completed 60 # end 61 # 62 # def say_hello_world 63 # p "***************** : invoking hello world #{Time.now}" 64 # send_data("Hello World\n") 65 # end 66 # end 67 # 68 # class ServerWorker < BackgrounDRb::MetaWorker 69 # set_worker_name :server_worker 70 # def create(args = nil) 71 # # start the server when worker starts 72 # start_server("0.0.0.0",11009,TimeServer) do |client_connection| 73 # client_connection.say_hello_world 74 # end 75 # end 76 # end 17 77 18 78 class MetaWorker < Packet::Worker … … 24 84 attr_accessor :logger 25 85 86 # does initialization of worker stuff and invokes create method in 87 # user defined worker class 26 88 def worker_init 27 89 @config_file = YAML.load(ERB.new(IO.read("#{RAILS_HOME}/config/backgroundrb.yml")).result) 28 load_rails_env90 # load_rails_env 29 91 @logger = PacketLogger.new(self) 30 92 if(@worker_options && @worker_options[:schedule] && no_auto_load) … … 41 103 end 42 104 105 # loads workers schedule from options supplied from rails 43 106 # a user may pass trigger arguments to dynamically define the schedule 44 107 def load_schedule_from_args … … 47 110 end 48 111 112 # receives requests/responses from master process or other workers 49 113 def receive_data p_data 50 114 if p_data[:data][:worker_method] == :exit … … 58 122 end 59 123 124 # method is responsible for invoking appropriate method in user 60 125 def process_request(p_data) 61 126 user_input = p_data[:data] … … 187 252 end 188 253 189 private 190 def load_rails_env 191 ActiveRecord::Base.allow_concurrency = true 192 db_config_file = YAML.load(ERB.new(IO.read("#{RAILS_HOME}/config/database.yml")).result) 193 run_env = @config_file[:backgroundrb][:environment] || 'development' 194 require RAILS_HOME + '/config/environment.rb' 195 ENV['RAILS_ENV'] = run_env 196 ActiveRecord::Base.establish_connection(db_config_file[run_env]) 197 end 254 # private 255 # def load_rails_env 256 # ActiveRecord::Base.allow_concurrency = true 257 # db_config_file = YAML.load(ERB.new(IO.read("#{RAILS_HOME}/config/database.yml")).result) 258 # run_env = @config_file[:backgroundrb][:environment] || 'development' 259 # ENV["RAILS_ENV"] = run_env 260 # RAILS_ENV.replace(run_env) if defined?(RAILS_ENV) 261 # require RAILS_HOME + '/config/environment.rb' 262 # ActiveRecord::Base.establish_connection(db_config_file[run_env]) 263 # end 198 264 end # end of class MetaWorker 199 265 end # end of module BackgrounDRb
