Class MCollective::Client
In: lib/mcollective/client.rb
Parent: Object

Helpers for writing clients that can talk to agents, do discovery and so forth

Methods

Attributes

options  [RW] 
stats  [RW] 

Public Class methods

[Source]

    # File lib/mcollective/client.rb, line 8
 8:         def initialize(configfile)
 9:             @config = Config.instance
10:             @config.loadconfig(configfile) unless @config.configured
11: 
12:             @connection = PluginManager["connector_plugin"]
13:             @security = PluginManager["security_plugin"]
14: 
15:             @security.initiated_by = :client
16:             @options = nil
17:             @subscriptions = {}
18: 
19:             @connection.connect
20:         end

Public Instance methods

Returns the configured main collective if no specific collective is specified in the options; otherwise returns the collective given in the options.

[Source]

    # File lib/mcollective/client.rb, line 24
24:         def collective
25:             if @options[:collective].nil?
26:                 @config.main_collective
27:             else
28:                 @options[:collective]
29:             end
30:         end

Disconnects cleanly from the middleware

[Source]

    # File lib/mcollective/client.rb, line 33
33:         def disconnect
34:             Log.debug("Disconnecting from the middleware")
35:             @connection.disconnect
36:         end

Performs a discovery of nodes matching the passed filter and returns a sorted array of the nodes that replied before the timeout expired.

[Source]

     # File lib/mcollective/client.rb, line 101
101:         def discover(filter, timeout)
102:             begin
103:                 hosts = []
104:                 Timeout.timeout(timeout) do
105:                     reqid = sendreq("ping", "discovery", filter)
106:                     Log.debug("Waiting #{timeout} seconds for discovery replies to request #{reqid}")
107: 
108:                     loop do
109:                         msg = receive(reqid)
110:                         Log.debug("Got discovery reply from #{msg[:senderid]}")
111:                         hosts << msg[:senderid]
112:                     end
113:                 end
114:             rescue Timeout::Error => e
115:                 hosts.sort
116:             rescue Exception => e
117:                 raise
118:             end
119:         end

Performs a discovery and then sends a request, calling the passed block for each response.

   times = discovered_req("status", "mcollectived", options) {|resp|
      pp resp
   }

It returns a hash of times. The timeouts for discovery and for the total run are taken from the options hash, which in turn is generally built using MCollective::Optionparser.

[Source]

     # File lib/mcollective/client.rb, line 173
173:         def discovered_req(body, agent, options=false)
174:             stat = {:starttime => Time.now.to_f, :discoverytime => 0, :blocktime => 0, :totaltime => 0}
175: 
176:             options = @options unless options
177: 
178:             STDOUT.sync = true
179: 
180:             print("Determining the amount of hosts matching filter for #{options[:disctimeout]} seconds .... ")
181: 
182:             begin
183:                 discovered_hosts = discover(options[:filter], options[:disctimeout])
184:                 discovered = discovered_hosts.size
185:                 hosts_responded = []
186:                 hosts_not_responded = discovered_hosts
187: 
188:                 stat[:discoverytime] = Time.now.to_f - stat[:starttime]
189: 
190:                 puts("#{discovered}\n\n")
191:             rescue Interrupt
192:                 puts("Discovery interrupted.")
193:                 exit!
194:             end
195: 
196:             raise("No matching clients found") if discovered == 0
197: 
198:             begin
199:                 Timeout.timeout(options[:timeout]) do
200:                     reqid = sendreq(body, agent, options[:filter])
201: 
202:                     (1..discovered).each do |c|
203:                         resp = receive(reqid)
204: 
205:                         hosts_responded << resp[:senderid]
206:                         hosts_not_responded.delete(resp[:senderid]) if hosts_not_responded.include?(resp[:senderid])
207: 
208:                         yield(resp)
209:                     end
210:                 end
211:             rescue Interrupt => e
212:             rescue Timeout::Error => e
213:             end
214: 
215:             stat[:totaltime] = Time.now.to_f - stat[:starttime]
216:             stat[:blocktime] = stat[:totaltime] - stat[:discoverytime]
217:             stat[:responses] = hosts_responded.size
218:             stat[:responsesfrom] = hosts_responded
219:             stat[:noresponsefrom] = hosts_not_responded
220:             stat[:discovered] = discovered
221: 
222:             @stats = stat
223:             return stat
224:         end

Prints out the stats returned from req and discovered_req in a nice way.

[Source]

     # File lib/mcollective/client.rb, line 227
227:         def display_stats(stats, options=false, caption="stomp call summary")
228:             options = @options unless options
229: 
230:             if options[:verbose]
231:                 puts("\n---- #{caption} ----")
232: 
233:                 if stats[:discovered]
234:                     puts("           Nodes: #{stats[:discovered]} / #{stats[:responses]}")
235:                 else
236:                     puts("           Nodes: #{stats[:responses]}")
237:                 end
238: 
239:                 printf("      Start Time: %s\n", Time.at(stats[:starttime]))
240:                 printf("  Discovery Time: %.2fms\n", stats[:discoverytime] * 1000)
241:                 printf("      Agent Time: %.2fms\n", stats[:blocktime] * 1000)
242:                 printf("      Total Time: %.2fms\n", stats[:totaltime] * 1000)
243: 
244:             else
245:                 if stats[:discovered]
246:                     printf("\nFinished processing %d / %d hosts in %.2f ms\n\n", stats[:responses], stats[:discovered], stats[:blocktime] * 1000)
247:                 else
248:                     printf("\nFinished processing %d hosts in %.2f ms\n\n", stats[:responses], stats[:blocktime] * 1000)
249:                 end
250:             end
251: 
252:             if stats[:noresponsefrom].size > 0
253:                 puts("\nNo response from:\n")
254: 
255:                 stats[:noresponsefrom].each do |c|
256:                     puts if c % 4 == 1
257:                     printf("%30s", c)
258:                 end
259: 
260:                 puts
261:             end
262:         end

Blocking call that waits forever for a message to arrive.

If you give it a requestid, this means you've previously sent a request with that ID and now you just want replies that match that ID. In that case the current connection will simply ignore all messages not directed at it and keep waiting until it finds a matching message.

[Source]

    # File lib/mcollective/client.rb, line 77
77:         def receive(requestid = nil)
78:             msg = nil
79: 
80:             begin
81:                 msg = @connection.receive
82: 
83:                 msg = @security.decodemsg(msg)
84: 
85:                 msg[:senderid] = Digest::MD5.hexdigest(msg[:senderid]) if ENV.include?("MCOLLECTIVE_ANON")
86: 
87:                 raise(MsgDoesNotMatchRequestID, "Message reqid #{requestid} does not match our reqid #{msg[:requestid]}") if msg[:requestid] != requestid
88:             rescue SecurityValidationFailed => e
89:                 Log.warn("Ignoring a message that did not pass security validations")
90:                 retry
91:             rescue MsgDoesNotMatchRequestID => e
92:                 Log.debug("Ignoring a message for some other client")
93:                 retry
94:             end
95: 
96:             msg
97:         end

Sends a request and calls the passed block for each response.

times = req("status", "mcollectived", options) {|resp|

  pp resp

}

It returns a hash of times. The timeout for the total run is taken from the options hash, which in turn is generally built using MCollective::Optionparser.

[Source]

     # File lib/mcollective/client.rb, line 129
129:         def req(body, agent, options=false, waitfor=0)
130:             stat = {:starttime => Time.now.to_f, :discoverytime => 0, :blocktime => 0, :totaltime => 0}
131: 
132:             options = @options unless options
133: 
134:             STDOUT.sync = true
135: 
136:             hosts_responded = 0
137: 
138:             begin
139:                 Timeout.timeout(options[:timeout]) do
140:                     reqid = sendreq(body, agent, options[:filter])
141: 
142:                     loop do
143:                         resp = receive(reqid)
144: 
145:                         hosts_responded += 1
146: 
147:                         yield(resp)
148: 
149:                         break if (waitfor != 0 && hosts_responded >= waitfor)
150:                     end
151:                 end
152:             rescue Interrupt => e
153:             rescue Timeout::Error => e
154:             end
155: 
156:             stat[:totaltime] = Time.now.to_f - stat[:starttime]
157:             stat[:blocktime] = stat[:totaltime] - stat[:discoverytime]
158:             stat[:responses] = hosts_responded
159:             stat[:noresponsefrom] = []
160: 
161:             @stats = stat
162:             return stat
163:         end

Sends a request and returns the generated request ID. It doesn't wait for responses and doesn't execute any passed-in code blocks for responses.

[Source]

    # File lib/mcollective/client.rb, line 40
40:         def sendreq(msg, agent, filter = {})
41:             target = Util.make_target(agent, :command, collective)
42: 
43:             reqid = Digest::MD5.hexdigest("#{@config.identity}-#{Time.now.to_f.to_s}-#{target}")
44: 
45:             # Security plugins now accept an agent and collective, ones written for <= 1.1.4 dont
46:             # but we still want to support them, try to call them in a compatible way if they
47:             # dont support the new arguments
48:             begin
49:                 req = @security.encoderequest(@config.identity, target, msg, reqid, filter, agent, collective)
50:             rescue ArgumentError
51:                 req = @security.encoderequest(@config.identity, target, msg, reqid, filter)
52:             end
53: 
54:             Log.debug("Sending request #{reqid} to #{target}")
55: 
56:             unless @subscriptions.include?(agent)
57:                 topic = Util.make_target(agent, :reply, collective)
58:                 Log.debug("Subscribing to #{topic}")
59: 
60:                 Util.subscribe(topic)
61:                 @subscriptions[agent] = 1
62:             end
63: 
64:             Timeout.timeout(2) do
65:                 @connection.send(target, req)
66:             end
67: 
68:             reqid
69:         end

[Validate]