Changeset 276

Show
Ignore:
Timestamp:
12/15/07 03:57:44 (9 months ago)
Author:
gethema..@gmail.com
Message:

clean up logging things

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • trunk/script/backgroundrb

    r275 r276  
    1010$LOAD_PATH.unshift(WORKER_ROOT) 
    1111 
    12 # require "#{PACKET_APP}/server/master_worker" 
    1312require "packet" 
    1413require "meta_worker" 
     
    3433      [STDIN, STDOUT, STDERR].each {|desc| desc.reopen(log_file)} 
    3534    end 
    36     MasterProxy.new() 
     35 
     36    BackgrounDRb::MasterProxy.new() 
    3737  end 
    3838when 'stop' 
     
    5050  File.delete(path) 
    5151else 
    52   MasterProxy.new() 
     52  BackgrounDRb::MasterProxy.new() 
    5353end 
  • trunk/server/master_worker.rb

    r275 r276  
    22 
    33module BackgrounDRb 
    4 end 
     4  # Class wraps a logger object for debugging internal errors within server 
     5  class DebugMaster 
     6    attr_accessor :log_mode 
     7    attr_accessor :logger 
     8    def initialize(log_mode) 
     9      @log_mode = log_mode 
     10      if @log_mode == :foreground 
     11        @logger = Logger.new(STDOUT) 
     12      else 
     13        @logger = Logger.new("#{RAILS_HOME}/log/backgroundrb_debug.log") 
     14      end 
     15    end 
    516 
    6 class MasterWorker 
    7   def receive_data p_data 
    8     @server_log.info(p_data) 
    9     @tokenizer.extract(p_data) do |b_data| 
    10       t_data = Marshal.load(b_data) 
    11       @server_log.info(t_data) 
    12       case t_data[:type] 
    13       when :do_work: process_work(t_data) 
    14       when :get_status: process_status(t_data) 
    15       when :get_result: process_request(t_data) 
    16       when :start_worker: start_worker_request(t_data) 
    17       when :delete_worker: delete_drb_worker(t_data) 
    18       when :all_worker_status: query_all_worker_status(t_data) 
    19       end 
     17    def info(data) 
     18      @logger.info(data) 
     19    end 
     20 
     21    def debug(data) 
     22      @logger.debug(data) 
    2023    end 
    2124  end 
    2225 
    23   def query_all_worker_status(p_data) 
    24     dumpable_status = { } 
    25     workers.each { |key,value| dumpable_status[key] = value.worker_status } 
    26     send_object(dumpable_status) 
     26  class MasterWorker 
     27    attr_accessor :debug_logger 
     28    def receive_data p_data 
     29      debug_logger.info(p_data) 
     30      @tokenizer.extract(p_data) do |b_data| 
     31        t_data = Marshal.load(b_data) 
     32        debug_logger.info(t_data) 
     33        case t_data[:type] 
     34        when :do_work: process_work(t_data) 
     35        when :get_status: process_status(t_data) 
     36        when :get_result: process_request(t_data) 
     37        when :start_worker: start_worker_request(t_data) 
     38        when :delete_worker: delete_drb_worker(t_data) 
     39        when :all_worker_status: query_all_worker_status(t_data) 
     40        end 
     41      end 
     42    end 
     43 
     44    def query_all_worker_status(p_data) 
     45      dumpable_status = { } 
     46      workers.each { |key,value| dumpable_status[key] = value.worker_status } 
     47      send_object(dumpable_status) 
     48    end 
     49 
     50    def delete_drb_worker(t_data) 
     51      worker_name = t_data[:worker] 
     52      job_key = t_data[:job_key] 
     53      worker_name_key = gen_worker_key(worker_name,job_key) 
     54      begin 
     55        ask_worker(worker_name,:job_key => t_data[:job_key],:type => :request, :data => { :worker_method => :exit}) 
     56      rescue Packet::DisconnectError => sock_error 
     57        workers.delete(worker_name_key) 
     58      rescue 
     59        debug_logger.info($!.to_s) 
     60        debug_logger.info($!.backtrace.join("\n")) 
     61        return 
     62      end 
     63    end 
     64 
     65    def start_worker_request(p_data) 
     66      start_worker(p_data) 
     67    end 
     68 
     69    def process_work(t_data) 
     70      worker_name = t_data[:worker] 
     71      worker_name_key = gen_worker_key(worker_name,t_data[:job_key]) 
     72      t_data.delete(:worker) 
     73      t_data.delete(:type) 
     74      begin 
     75        ask_worker(worker_name,:data => t_data, :type => :request) 
     76      rescue Packet::DisconnectError => sock_error 
     77        workers.delete(worker_name_key) 
     78      rescue 
     79        debug_logger.info($!.to_s) 
     80        debug_logger.info($!.backtrace.join("\n")) 
     81        return 
     82      end 
     83 
     84    end 
     85 
     86    def process_status(t_data) 
     87      worker_name = t_data[:worker] 
     88      job_key = t_data[:job_key] 
     89      worker_name_key = gen_worker_key(worker_name,job_key) 
     90      status_data = reactor.result_hash[worker_name_key.to_sym] 
     91      send_object(status_data) 
     92    end 
     93 
     94    def process_request(t_data) 
     95      worker_name = t_data[:worker] 
     96      worker_name_key = gen_worker_key(worker_name,t_data[:job_key]) 
     97      t_data.delete(:worker) 
     98      t_data.delete(:type) 
     99      begin 
     100        ask_worker(worker_name,:data => t_data, :type => :request) 
     101      rescue Packet::DisconnectError => sock_error 
     102        workers.delete(worker_name_key) 
     103      rescue 
     104        debug_logger.info($!.to_s) 
     105        debug_logger.info($!.backtrace.join("\n")) 
     106        return 
     107      end 
     108    end 
     109 
     110    # this method can receive one shot status reports or proper results 
     111    def worker_receive p_data 
     112      send_object(p_data) 
     113    end 
     114 
     115    def unbind 
     116      debug_logger.info("Client disconected") 
     117    end 
     118    def post_init 
     119      @tokenizer = BinParser.new 
     120    end 
     121    def connection_completed; end 
    27122  end 
    28123 
    29   def delete_drb_worker(t_data) 
    30     worker_name = t_data[:worker] 
    31     job_key = t_data[:job_key] 
    32     worker_name_key = gen_worker_key(worker_name,job_key) 
    33     begin 
    34       ask_worker(worker_name,:job_key => t_data[:job_key],:type => :request, :data => { :worker_method => :exit}) 
    35     rescue Packet::DisconnectError => sock_error 
    36       workers.delete(worker_name_key) 
    37     rescue 
    38       puts $! 
    39       puts $!.backtrace 
    40       @server_log.info($!.to_s) 
    41       @server_log.info($!.backtrace.join("\n")) 
    42       return 
    43     end 
    44   end 
     124  class MasterProxy 
     125    def initialize 
     126      config_file = YAML.load(ERB.new(IO.read("#{RAILS_HOME}/config/backgroundrb.yml")).result) 
     127      debug_logger = DebugMaster.new(config_file[:backgroundrb][:log]) 
    45128 
    46   def start_worker_request(p_data) 
    47     start_worker(p_data) 
    48   end 
    49  
    50   def process_work(t_data) 
    51     worker_name = t_data[:worker] 
    52     worker_name_key = gen_worker_key(worker_name,t_data[:job_key]) 
    53     t_data.delete(:worker) 
    54     t_data.delete(:type) 
    55     begin 
    56       ask_worker(worker_name,:data => t_data, :type => :request) 
    57     rescue Packet::DisconnectError => sock_error 
    58       workers.delete(worker_name_key) 
    59     rescue 
    60       puts $! 
    61       puts $!.backtrace 
    62       @server_log.info($!.to_s) 
    63       @server_log.info($!.backtrace.join("\n")) 
    64       return 
     129      load_rails_env(config_file) 
     130      Packet::Reactor.run do |t_reactor| 
     131        t_reactor.start_worker(:worker => :log_worker) 
     132        t_reactor.start_server(config_file[:backgroundrb][:ip],config_file[:backgroundrb][:port],MasterWorker) { |conn|  conn.debug_logger = debug_logger } 
     133      end 
    65134    end 
    66135 
    67   end 
    68  
    69   def process_status(t_data) 
    70     worker_name = t_data[:worker] 
    71     job_key = t_data[:job_key] 
    72     worker_name_key = gen_worker_key(worker_name,job_key) 
    73     status_data = reactor.result_hash[worker_name_key.to_sym] 
    74     send_object(status_data) 
    75   end 
    76  
    77   def process_request(t_data) 
    78     worker_name = t_data[:worker] 
    79     worker_name_key = gen_worker_key(worker_name,t_data[:job_key]) 
    80     t_data.delete(:worker) 
    81     t_data.delete(:type) 
    82     begin 
    83       ask_worker(worker_name,:data => t_data, :type => :request) 
    84     rescue Packet::DisconnectError => sock_error 
    85       workers.delete(worker_name_key) 
    86     rescue 
    87       puts $! 
    88       puts $!.backtrace 
    89       @server_log.info($!.to_s) 
    90       @server_log.info($!.backtrace.join("\n")) 
    91       return 
     136    def load_rails_env(config_file) 
     137      ActiveRecord::Base.allow_concurrency = true 
     138      db_config_file = YAML.load(ERB.new(IO.read("#{RAILS_HOME}/config/database.yml")).result) 
     139      run_env = config_file[:backgroundrb][:environment] || 'development' 
     140      ENV["RAILS_ENV"] = run_env 
     141      RAILS_ENV.replace(run_env) if defined?(RAILS_ENV) 
     142      require RAILS_HOME + '/config/environment.rb' 
     143      ActiveRecord::Base.establish_connection(db_config_file[run_env]) 
    92144    end 
    93   end 
    94  
    95   # this method can receive one shot status reports or proper results 
    96   def worker_receive p_data 
    97     send_object(p_data) 
    98   end 
    99  
    100   def unbind 
    101     puts "Client disconnected" 
    102   end 
    103   def post_init 
    104     @tokenizer = BinParser.new 
    105     @server_log = Logger.new("#{RAILS_HOME}/log/backgroundrb_debug.log") 
    106   end 
    107   def connection_completed; end 
    108 end 
    109  
    110  
    111 class MasterProxy 
    112   def initialize 
    113     config_file = YAML.load(ERB.new(IO.read("#{RAILS_HOME}/config/backgroundrb.yml")).result) 
    114     load_rails_env(config_file) 
    115     Packet::Reactor.run do |t_reactor| 
    116       t_reactor.start_worker(:worker => :log_worker) 
    117       t_reactor.start_server(config_file[:backgroundrb][:ip],config_file[:backgroundrb][:port],MasterWorker) 
    118     end 
    119   end 
    120  
    121   def load_rails_env(config_file) 
    122     ActiveRecord::Base.allow_concurrency = true 
    123     db_config_file = YAML.load(ERB.new(IO.read("#{RAILS_HOME}/config/database.yml")).result) 
    124     run_env = config_file[:backgroundrb][:environment] || 'development' 
    125     ENV["RAILS_ENV"] = run_env 
    126     RAILS_ENV.replace(run_env) if defined?(RAILS_ENV) 
    127     require RAILS_HOME + '/config/environment.rb' 
    128     ActiveRecord::Base.establish_connection(db_config_file[run_env]) 
    129145  end 
    130146end 
    131147 
    132148 
     149