Class | MCollective::Client |
In: |
lib/mcollective/client.rb
|
Parent: | Object |
Helpers for writing clients that can talk to agents, do discovery and so forth
options | [RW] | |
stats | [RW] |
# File lib/mcollective/client.rb, line 8 8: def initialize(configfile) 9: @config = Config.instance 10: @config.loadconfig(configfile) unless @config.configured 11: 12: @connection = PluginManager["connector_plugin"] 13: @security = PluginManager["security_plugin"] 14: 15: @security.initiated_by = :client 16: @options = nil 17: @subscriptions = {} 18: 19: @connection.connect 20: end
Returns the configured main collective if no specific collective is specified as options
# File lib/mcollective/client.rb, line 24 24: def collective 25: if @options[:collective].nil? 26: @config.main_collective 27: else 28: @options[:collective] 29: end 30: end
Disconnects cleanly from the middleware
# File lib/mcollective/client.rb, line 33 33: def disconnect 34: Log.debug("Disconnecting from the middleware") 35: @connection.disconnect 36: end
Performs a discovery of nodes matching the filter passed returns an array of nodes
# File lib/mcollective/client.rb, line 101 101: def discover(filter, timeout) 102: begin 103: hosts = [] 104: Timeout.timeout(timeout) do 105: reqid = sendreq("ping", "discovery", filter) 106: Log.debug("Waiting #{timeout} seconds for discovery replies to request #{reqid}") 107: 108: loop do 109: msg = receive(reqid) 110: Log.debug("Got discovery reply from #{msg[:senderid]}") 111: hosts << msg[:senderid] 112: end 113: end 114: rescue Timeout::Error => e 115: hosts.sort 116: rescue Exception => e 117: raise 118: end 119: end
Performs a discovery and then send a request, performs the passed block for each response
times = discovered_req("status", "mcollectived", options, client) {|resp| pp resp }
It returns a hash of times and timeouts for discovery and total run is taken from the options hash which in turn is generally built using MCollective::Optionparser
# File lib/mcollective/client.rb, line 173 173: def discovered_req(body, agent, options=false) 174: stat = {:starttime => Time.now.to_f, :discoverytime => 0, :blocktime => 0, :totaltime => 0} 175: 176: options = @options unless options 177: 178: STDOUT.sync = true 179: 180: print("Determining the amount of hosts matching filter for #{options[:disctimeout]} seconds .... ") 181: 182: begin 183: discovered_hosts = discover(options[:filter], options[:disctimeout]) 184: discovered = discovered_hosts.size 185: hosts_responded = [] 186: hosts_not_responded = discovered_hosts 187: 188: stat[:discoverytime] = Time.now.to_f - stat[:starttime] 189: 190: puts("#{discovered}\n\n") 191: rescue Interrupt 192: puts("Discovery interrupted.") 193: exit! 194: end 195: 196: raise("No matching clients found") if discovered == 0 197: 198: begin 199: Timeout.timeout(options[:timeout]) do 200: reqid = sendreq(body, agent, options[:filter]) 201: 202: (1..discovered).each do |c| 203: resp = receive(reqid) 204: 205: hosts_responded << resp[:senderid] 206: hosts_not_responded.delete(resp[:senderid]) if hosts_not_responded.include?(resp[:senderid]) 207: 208: yield(resp) 209: end 210: end 211: rescue Interrupt => e 212: rescue Timeout::Error => e 213: end 214: 215: stat[:totaltime] = Time.now.to_f - stat[:starttime] 216: stat[:blocktime] = stat[:totaltime] - stat[:discoverytime] 217: stat[:responses] = hosts_responded.size 218: stat[:responsesfrom] = hosts_responded 219: stat[:noresponsefrom] = hosts_not_responded 220: stat[:discovered] = discovered 221: 222: @stats = stat 223: return stat 224: end
Prints out the stats returns from req and discovered_req in a nice way
# File lib/mcollective/client.rb, line 227 227: def display_stats(stats, options=false, caption="stomp call summary") 228: options = @options unless options 229: 230: if options[:verbose] 231: puts("\n---- #{caption} ----") 232: 233: if stats[:discovered] 234: puts(" Nodes: #{stats[:discovered]} / #{stats[:responses]}") 235: else 236: puts(" Nodes: #{stats[:responses]}") 237: end 238: 239: printf(" Start Time: %s\n", Time.at(stats[:starttime])) 240: printf(" Discovery Time: %.2fms\n", stats[:discoverytime] * 1000) 241: printf(" Agent Time: %.2fms\n", stats[:blocktime] * 1000) 242: printf(" Total Time: %.2fms\n", stats[:totaltime] * 1000) 243: 244: else 245: if stats[:discovered] 246: printf("\nFinished processing %d / %d hosts in %.2f ms\n\n", stats[:responses], stats[:discovered], stats[:blocktime] * 1000) 247: else 248: printf("\nFinished processing %d hosts in %.2f ms\n\n", stats[:responses], stats[:blocktime] * 1000) 249: end 250: end 251: 252: if stats[:noresponsefrom].size > 0 253: puts("\nNo response from:\n") 254: 255: stats[:noresponsefrom].each do |c| 256: puts if c % 4 == 1 257: printf("%30s", c) 258: end 259: 260: puts 261: end 262: end
Blocking call that waits for ever for a message to arrive.
If you give it a requestid this means you‘ve previously send a request with that ID and now you just want replies that matches that id, in that case the current connection will just ignore all messages not directed at it and keep waiting for more till it finds a matching message.
# File lib/mcollective/client.rb, line 77 77: def receive(requestid = nil) 78: msg = nil 79: 80: begin 81: msg = @connection.receive 82: 83: msg = @security.decodemsg(msg) 84: 85: msg[:senderid] = Digest::MD5.hexdigest(msg[:senderid]) if ENV.include?("MCOLLECTIVE_ANON") 86: 87: raise(MsgDoesNotMatchRequestID, "Message reqid #{requestid} does not match our reqid #{msg[:requestid]}") if msg[:requestid] != requestid 88: rescue SecurityValidationFailed => e 89: Log.warn("Ignoring a message that did not pass security validations") 90: retry 91: rescue MsgDoesNotMatchRequestID => e 92: Log.debug("Ignoring a message for some other client") 93: retry 94: end 95: 96: msg 97: end
Send a request, performs the passed block for each response
times = req("status", "mcollectived", options, client) {|resp|
pp resp
}
It returns a hash of times and timeouts for discovery and total run is taken from the options hash which in turn is generally built using MCollective::Optionparser
# File lib/mcollective/client.rb, line 129 129: def req(body, agent, options=false, waitfor=0) 130: stat = {:starttime => Time.now.to_f, :discoverytime => 0, :blocktime => 0, :totaltime => 0} 131: 132: options = @options unless options 133: 134: STDOUT.sync = true 135: 136: hosts_responded = 0 137: 138: begin 139: Timeout.timeout(options[:timeout]) do 140: reqid = sendreq(body, agent, options[:filter]) 141: 142: loop do 143: resp = receive(reqid) 144: 145: hosts_responded += 1 146: 147: yield(resp) 148: 149: break if (waitfor != 0 && hosts_responded >= waitfor) 150: end 151: end 152: rescue Interrupt => e 153: rescue Timeout::Error => e 154: end 155: 156: stat[:totaltime] = Time.now.to_f - stat[:starttime] 157: stat[:blocktime] = stat[:totaltime] - stat[:discoverytime] 158: stat[:responses] = hosts_responded 159: stat[:noresponsefrom] = [] 160: 161: @stats = stat 162: return stat 163: end
Sends a request and returns the generated request id, doesn‘t wait for responses and doesn‘t execute any passed in code blocks for responses
# File lib/mcollective/client.rb, line 40 40: def sendreq(msg, agent, filter = {}) 41: target = Util.make_target(agent, :command, collective) 42: 43: reqid = Digest::MD5.hexdigest("#{@config.identity}-#{Time.now.to_f.to_s}-#{target}") 44: 45: # Security plugins now accept an agent and collective, ones written for <= 1.1.4 dont 46: # but we still want to support them, try to call them in a compatible way if they 47: # dont support the new arguments 48: begin 49: req = @security.encoderequest(@config.identity, target, msg, reqid, filter, agent, collective) 50: rescue ArgumentError 51: req = @security.encoderequest(@config.identity, target, msg, reqid, filter) 52: end 53: 54: Log.debug("Sending request #{reqid} to #{target}") 55: 56: unless @subscriptions.include?(agent) 57: topic = Util.make_target(agent, :reply, collective) 58: Log.debug("Subscribing to #{topic}") 59: 60: Util.subscribe(topic) 61: @subscriptions[agent] = 1 62: end 63: 64: Timeout.timeout(2) do 65: @connection.send(target, req) 66: end 67: 68: reqid 69: end