Coverage for hooks/nrpe_helpers.py : 18%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""Nrpe helpers module."""
2import glob
3import ipaddress
4import os
5import socket
6import subprocess
8from charmhelpers.core import hookenv
9from charmhelpers.core.host import is_container
10from charmhelpers.core.services import helpers
12import yaml
15NETLINKS_ERROR = False
18class InvalidCustomCheckException(Exception):
19 """Custom exception for Invalid nrpe check."""
21 pass
24class Monitors(dict):
25 """List of checks that a remote Nagios can query."""
27 def __init__(self, version="0.3"):
28 """Build monitors structure."""
29 self["monitors"] = {"remote": {"nrpe": {}}}
30 self["version"] = version
32 def add_monitors(self, mdict, monitor_label="default"):
33 """Add monitors passed in mdict."""
34 if not mdict or not mdict.get("monitors"):
35 return
37 for checktype in mdict["monitors"].get("remote", []):
38 check_details = mdict["monitors"]["remote"][checktype]
39 if self["monitors"]["remote"].get(checktype):
40 self["monitors"]["remote"][checktype].update(check_details)
41 else:
42 self["monitors"]["remote"][checktype] = check_details
44 for checktype in mdict["monitors"].get("local", []):
45 check_details = self.convert_local_checks(
46 mdict["monitors"]["local"],
47 monitor_label,
48 )
49 self["monitors"]["remote"]["nrpe"].update(check_details)
51 def add_nrpe_check(self, check_name, command):
52 """Add nrpe check to remote monitors."""
53 self["monitors"]["remote"]["nrpe"][check_name] = command
55 def convert_local_checks(self, monitors, monitor_src):
56 """Convert check from local checks to remote nrpe checks.
58 monitors -- monitor dict
59 monitor_src -- Monitor source principal, subordinate or user
60 """
61 mons = {}
62 for checktype in monitors.keys():
63 for checkname in monitors[checktype]:
64 try:
65 check_def = NRPECheckCtxt(
66 checktype,
67 monitors[checktype][checkname],
68 monitor_src,
69 )
70 mons[check_def["cmd_name"]] = {"command": check_def["cmd_name"]}
71 except InvalidCustomCheckException as e:
72 hookenv.log(
73 "Error encountered configuring local check "
74 '"{check}": {err}'.format(check=checkname, err=str(e)),
75 hookenv.ERROR,
76 )
77 return mons
80def get_local_ingress_address(binding):
81 """Get ingress IP address for a binding.
83 binding - e.g. 'monitors'
84 """
85 # using network-get to retrieve the address details if available.
86 hookenv.log("Getting ingress IP address for binding %s" % binding)
87 try:
88 network_info = hookenv.network_get(binding)
89 if network_info is not None and "ingress-addresses" in network_info:
90 hookenv.log("Using ingress-addresses")
91 # workaround lp#1897261
92 try:
93 ip_address = network_info["bind-addresses"][0]["addresses"][0][
94 "address"
95 ]
96 except KeyError:
97 # ignore KeyError and populate ip_address per old method
98 ip_address = None
99 if ip_address not in network_info["ingress-addresses"]:
100 ip_address = network_info["ingress-addresses"][0]
101 hookenv.log(ip_address)
102 return ip_address
103 except (NotImplementedError, FileNotFoundError):
104 # We'll fallthrough to the Pre 2.3 code below.
105 pass
107 # Pre 2.3 output
108 try:
109 ip_address = hookenv.network_get_primary_address(binding)
110 hookenv.log("Using primary-addresses")
111 except NotImplementedError:
112 # pre Juju 2.0
113 ip_address = hookenv.unit_private_ip()
114 hookenv.log("Using unit_private_ip")
115 hookenv.log(ip_address)
116 return ip_address
119class MonitorsRelation(helpers.RelationContext):
120 """Define a monitors relation."""
122 name = "monitors"
123 interface = "monitors"
125 def __init__(self, *args, **kwargs):
126 """Build superclass and principal relation."""
127 self.principal_relation = PrincipalRelation()
128 super(MonitorsRelation, self).__init__(*args, **kwargs)
130 def is_ready(self):
131 """Return true if the principal relation is ready."""
132 return self.principal_relation.is_ready()
134 def get_subordinate_monitors(self):
135 """Return default monitors defined by this charm."""
136 monitors = Monitors()
137 for check in SubordinateCheckDefinitions()["checks"]:
138 if check["cmd_params"]:
139 monitors.add_nrpe_check(check["cmd_name"], check["cmd_name"])
140 return monitors
142 def get_user_defined_monitors(self):
143 """Return monitors defined by monitors config option."""
144 monitors = Monitors()
145 monitors.add_monitors(yaml.safe_load(hookenv.config("monitors")), "user")
146 return monitors
148 def get_principal_monitors(self):
149 """Return monitors passed by relation with principal."""
150 return self.principal_relation.get_monitors()
152 def get_monitor_dicts(self):
153 """Return all monitor dicts."""
154 monitor_dicts = {
155 "principal": self.get_principal_monitors(),
156 "subordinate": self.get_subordinate_monitors(),
157 "user": self.get_user_defined_monitors(),
158 }
159 return monitor_dicts
161 def get_monitors(self):
162 """Return monitor dict.
164 All monitors merged together and local
165 monitors converted to remote nrpe checks.
166 """
167 all_monitors = Monitors()
168 monitors = [
169 self.get_principal_monitors(),
170 self.get_subordinate_monitors(),
171 self.get_user_defined_monitors(),
172 ]
173 for mon in monitors:
174 all_monitors.add_monitors(mon)
175 return all_monitors
177 def egress_subnets(self, relation_data):
178 """Return egress subnets.
180 This behaves the same as charmhelpers.core.hookenv.egress_subnets().
181 If it can't determine the egress subnets it will fall back to
182 ingress-address or finally private-address.
183 """
184 if "egress-subnets" in relation_data:
185 return relation_data["egress-subnets"]
186 if "ingress-address" in relation_data:
187 return relation_data["ingress-address"]
188 return relation_data["private-address"]
190 def get_data(self):
191 """Get relation data."""
192 super(MonitorsRelation, self).get_data()
193 if not hookenv.relation_ids(self.name):
194 return
195 # self['monitors'] comes from the superclass helpers.RelationContext
196 # and contains relation data for each 'monitors' relation (to/from
197 # Nagios).
198 subnets = [self.egress_subnets(info) for info in self["monitors"]]
199 self["monitor_allowed_hosts"] = ",".join(subnets)
201 def provide_data(self):
202 """Return relation info."""
203 address = get_local_ingress_address("monitors")
205 relation_info = {
206 "target-id": self.principal_relation.nagios_hostname(),
207 "monitors": self.get_monitors(),
208 "private-address": address,
209 "ingress-address": address,
210 "machine_id": os.environ["JUJU_MACHINE_ID"],
211 "model_id": hookenv.model_uuid(),
212 }
213 return relation_info
216class PrincipalRelation(helpers.RelationContext):
217 """Define a principal relation."""
219 def __init__(self, *args, **kwargs):
220 """Set name and interface."""
221 if hookenv.relations_of_type("nrpe-external-master"):
222 self.name = "nrpe-external-master"
223 self.interface = "nrpe-external-master"
224 elif hookenv.relations_of_type("general-info"):
225 self.name = "general-info"
226 self.interface = "juju-info"
227 elif hookenv.relations_of_type("local-monitors"):
228 self.name = "local-monitors"
229 self.interface = "local-monitors"
230 super(PrincipalRelation, self).__init__(*args, **kwargs)
232 def is_ready(self):
233 """Return true if the relation is connected."""
234 if self.name not in self:
235 return False
236 return "__unit__" in self[self.name][0]
238 def nagios_hostname(self):
239 """Return the string that nagios will use to identify this host."""
240 host_context = hookenv.config("nagios_host_context")
241 if host_context:
242 host_context += "-"
243 hostname_type = hookenv.config("nagios_hostname_type")
245 # Detect bare metal hosts
246 if hostname_type == "auto":
247 is_metal = "none" in subprocess.getoutput("/usr/bin/systemd-detect-virt")
248 if is_metal:
249 hostname_type = "host"
250 else:
251 hostname_type = "unit"
253 if hostname_type == "host" or not self.is_ready():
254 nagios_hostname = "{}{}".format(host_context, socket.gethostname())
255 return nagios_hostname
256 else:
257 principal_unitname = hookenv.principal_unit()
258 # Fallback to using "primary" if it exists.
259 if not principal_unitname:
260 for relunit in self[self.name]:
261 if relunit.get("primary", "False").lower() == "true":
262 principal_unitname = relunit["__unit__"]
263 break
264 nagios_hostname = "{}{}".format(host_context, principal_unitname)
265 nagios_hostname = nagios_hostname.replace("/", "-")
266 return nagios_hostname
268 def get_monitors(self):
269 """Return monitors passed by services on the self.interface relation."""
270 if not self.is_ready():
271 return
272 monitors = Monitors()
273 for rel in self[self.name]:
274 if rel.get("monitors"):
275 monitors.add_monitors(yaml.load(rel["monitors"]), "principal")
276 return monitors
278 def provide_data(self):
279 """Return nagios hostname and nagios host context."""
280 # Provide this data to principals because get_nagios_hostname expects
281 # them in charmhelpers/contrib/charmsupport/nrpe when writing principal
282 # service__* files
283 return {
284 "nagios_hostname": self.nagios_hostname(),
285 "nagios_host_context": hookenv.config("nagios_host_context"),
286 }
289class NagiosInfo(dict):
290 """Define a NagiosInfo dict."""
292 def __init__(self):
293 """Set principal relation and dict values."""
294 self.principal_relation = PrincipalRelation()
295 self["external_nagios_master"] = "127.0.0.1"
296 if hookenv.config("nagios_master") != "None":
297 self["external_nagios_master"] = "{},{}".format(
298 self["external_nagios_master"], hookenv.config("nagios_master")
299 )
300 self["nagios_hostname"] = self.principal_relation.nagios_hostname()
302 address = None
303 if hookenv.config("nagios_address_type").lower() == "public":
304 address = hookenv.unit_get("public-address")
305 elif hookenv.config("nagios_master") != "None":
306 # Try to work out the correct interface/IP. We can't use both
307 # network-get nor 'unit-get private-address' because both can
308 # return the wrong IP on systems with more than one interface
309 # (LP: #1736050).
310 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
311 s.connect((hookenv.config("nagios_master").split(",")[0], 80))
312 address = s.getsockname()[0]
313 s.close()
314 # Fallback to unit-get private-address
315 if not address:
316 address = hookenv.unit_get("private-address")
318 self["nagios_ipaddress"] = address
319 self["nrpe_ipaddress"] = get_local_ingress_address("monitors")
321 self["dont_blame_nrpe"] = "1" if hookenv.config("dont_blame_nrpe") else "0"
322 self["debug"] = "1" if hookenv.config("debug") else "0"
325class RsyncEnabled(helpers.RelationContext):
326 """Define a relation context for rsync enabled relation."""
328 def __init__(self):
329 """Set export_nagios_definitions."""
330 self["export_nagios_definitions"] = hookenv.config("export_nagios_definitions")
331 if (
332 hookenv.config("nagios_master")
333 and hookenv.config("nagios_master") != "None"
334 ):
335 self["export_nagios_definitions"] = True
337 def is_ready(self):
338 """Return true if relation is ready."""
339 return self["export_nagios_definitions"]
342class NRPECheckCtxt(dict):
343 """Convert a local monitor definition.
345 Create a dict needed for writing the nrpe check definition.
346 """
348 def __init__(self, checktype, check_opts, monitor_src):
349 """Set dict values."""
350 plugin_path = "/usr/lib/nagios/plugins"
351 if checktype == "procrunning":
352 self["cmd_exec"] = plugin_path + "/check_procs"
353 self["description"] = "Check process {executable} is running".format(
354 **check_opts
355 )
356 self["cmd_name"] = "check_proc_" + check_opts["executable"]
357 self["cmd_params"] = "-w {min} -c {max} -C {executable}".format(
358 **check_opts
359 )
360 elif checktype == "processcount":
361 self["cmd_exec"] = plugin_path + "/check_procs"
362 self["description"] = "Check process count"
363 self["cmd_name"] = "check_proc_principal"
364 if "min" in check_opts:
365 self["cmd_params"] = "-w {min} -c {max}".format(**check_opts)
366 else:
367 self["cmd_params"] = "-c {max}".format(**check_opts)
368 elif checktype == "disk":
369 self["cmd_exec"] = plugin_path + "/check_disk"
370 self["description"] = "Check disk usage " + check_opts["path"].replace(
371 "/", "_"
372 )
373 self["cmd_name"] = "check_disk_principal"
374 self["cmd_params"] = "-w 20 -c 10 -p " + check_opts["path"]
375 elif checktype == "custom":
376 custom_path = check_opts.get("plugin_path", plugin_path)
377 if not custom_path.startswith(os.path.sep):
378 custom_path = os.path.join(os.path.sep, custom_path)
379 if not os.path.isdir(custom_path):
380 raise InvalidCustomCheckException(
381 'Specified plugin_path "{}" does not exist or is not a '
382 "directory.".format(custom_path)
383 )
384 check = check_opts["check"]
385 self["cmd_exec"] = os.path.join(custom_path, check)
386 self["description"] = check_opts.get("desc", "Check %s" % check)
387 self["cmd_name"] = check
388 self["cmd_params"] = check_opts.get("params", "") or ""
389 self["description"] += " ({})".format(monitor_src)
390 self["cmd_name"] += "_" + monitor_src
393class SubordinateCheckDefinitions(dict):
394 """Return dict of checks the charm configures."""
396 def __init__(self):
397 """Set dict values."""
398 self.procs = self.proc_count()
399 load_thresholds = self._get_load_thresholds()
400 proc_thresholds = self._get_proc_thresholds()
401 disk_root_thresholds = self._get_disk_root_thresholds()
403 pkg_plugin_dir = "/usr/lib/nagios/plugins/"
404 local_plugin_dir = "/usr/local/lib/nagios/plugins/"
405 checks = [
406 {
407 "description": "Number of Zombie processes",
408 "cmd_name": "check_zombie_procs",
409 "cmd_exec": pkg_plugin_dir + "check_procs",
410 "cmd_params": hookenv.config("zombies"),
411 },
412 {
413 "description": "Number of processes",
414 "cmd_name": "check_total_procs",
415 "cmd_exec": pkg_plugin_dir + "check_procs",
416 "cmd_params": proc_thresholds,
417 },
418 {
419 "description": "Number of Users",
420 "cmd_name": "check_users",
421 "cmd_exec": pkg_plugin_dir + "check_users",
422 "cmd_params": hookenv.config("users"),
423 },
424 {
425 "description": "Connnection tracking table",
426 "cmd_name": "check_conntrack",
427 "cmd_exec": local_plugin_dir + "check_conntrack.sh",
428 "cmd_params": hookenv.config("conntrack"),
429 },
430 ]
432 if not is_container():
433 checks.extend(
434 [
435 {
436 "description": "Root disk",
437 "cmd_name": "check_disk_root",
438 "cmd_exec": pkg_plugin_dir + "check_disk",
439 "cmd_params": disk_root_thresholds,
440 },
441 {
442 "description": "System Load",
443 "cmd_name": "check_load",
444 "cmd_exec": pkg_plugin_dir + "check_load",
445 "cmd_params": load_thresholds,
446 },
447 {
448 "description": "Swap",
449 "cmd_name": "check_swap",
450 "cmd_exec": pkg_plugin_dir + "check_swap",
451 "cmd_params": hookenv.config("swap").strip(),
452 },
453 # Note: check_swap_activity *must* be listed after check_swap, else
454 # check_swap_activity will be removed during installation of
455 # check_swap.
456 {
457 "description": "Swap Activity",
458 "cmd_name": "check_swap_activity",
459 "cmd_exec": local_plugin_dir + "check_swap_activity",
460 "cmd_params": hookenv.config("swap_activity"),
461 },
462 {
463 "description": "Memory",
464 "cmd_name": "check_mem",
465 "cmd_exec": local_plugin_dir + "check_mem.pl",
466 "cmd_params": hookenv.config("mem"),
467 },
468 {
469 "description": "XFS Errors",
470 "cmd_name": "check_xfs_errors",
471 "cmd_exec": local_plugin_dir + "check_xfs_errors.py",
472 "cmd_params": hookenv.config("xfs_errors"),
473 },
474 {
475 "description": "ARP cache entries",
476 "cmd_name": "check_arp_cache",
477 "cmd_exec": os.path.join(
478 local_plugin_dir, "check_arp_cache.py"
479 ),
480 "cmd_params": "-w 60 -c 80",
481 },
482 ]
483 )
485 ro_filesystem_excludes = hookenv.config("ro_filesystem_excludes")
486 if ro_filesystem_excludes == "":
487 # specify cmd_params = '' to disable/remove the check from nrpe
488 check_ro_filesystem = {
489 "description": "Readonly filesystems",
490 "cmd_name": "check_ro_filesystem",
491 "cmd_exec": os.path.join(
492 local_plugin_dir, "check_ro_filesystem.py"
493 ),
494 "cmd_params": "",
495 }
496 else:
497 check_ro_filesystem = {
498 "description": "Readonly filesystems",
499 "cmd_name": "check_ro_filesystem",
500 "cmd_exec": os.path.join(
501 local_plugin_dir, "check_ro_filesystem.py"
502 ),
503 "cmd_params": "-e {}".format(
504 hookenv.config("ro_filesystem_excludes")
505 ),
506 }
507 checks.append(check_ro_filesystem)
509 if hookenv.config("lacp_bonds").strip():
510 for bond_iface in hookenv.config("lacp_bonds").strip().split():
511 if os.path.exists("/sys/class/net/{}".format(bond_iface)):
512 description = "LACP Check {}".format(bond_iface)
513 cmd_name = "check_lacp_{}".format(bond_iface)
514 cmd_exec = local_plugin_dir + "check_lacp_bond.py"
515 cmd_params = "-i {}".format(bond_iface)
516 lacp_check = {
517 "description": description,
518 "cmd_name": cmd_name,
519 "cmd_exec": cmd_exec,
520 "cmd_params": cmd_params,
521 }
522 checks.append(lacp_check)
524 if hookenv.config("netlinks"):
525 ifaces = yaml.safe_load(hookenv.config("netlinks"))
526 cmd_exec = local_plugin_dir + "check_netlinks.py"
527 if hookenv.config("netlinks_skip_unfound_ifaces"):
528 cmd_exec += " --skip-unfound-ifaces"
529 d_ifaces = self.parse_netlinks(ifaces)
530 for iface in d_ifaces:
531 description = "Netlinks status ({})".format(iface)
532 cmd_name = "check_netlinks_{}".format(iface)
533 cmd_params = d_ifaces[iface]
534 netlink_check = {
535 "description": description,
536 "cmd_name": cmd_name,
537 "cmd_exec": cmd_exec,
538 "cmd_params": cmd_params,
539 }
540 checks.append(netlink_check)
542 # Checking if CPU governor is supported by the system and add nrpe check
543 cpu_governor_paths = "/sys/devices/system/cpu/cpu*/cpufreq/scaling_governor"
544 cpu_governor_supported = glob.glob(cpu_governor_paths)
545 requested_cpu_governor = hookenv.relation_get("requested_cpu_governor")
546 cpu_governor_config = hookenv.config("cpu_governor")
547 wanted_cpu_governor = cpu_governor_config or requested_cpu_governor
548 if wanted_cpu_governor and cpu_governor_supported:
549 description = "Check CPU governor scaler"
550 cmd_name = "check_cpu_governor"
551 cmd_exec = local_plugin_dir + "check_cpu_governor.py"
552 cmd_params = "--governor {}".format(wanted_cpu_governor)
553 cpu_governor_check = {
554 "description": description,
555 "cmd_name": cmd_name,
556 "cmd_exec": cmd_exec,
557 "cmd_params": cmd_params,
558 }
559 checks.append(cpu_governor_check)
561 self["checks"] = []
562 sub_postfix = str(hookenv.config("sub_postfix"))
563 # Automatically use _sub for checks shipped on a unit with the nagios
564 # charm. Mostly for backwards compatibility.
565 principal_unit = hookenv.principal_unit()
566 if sub_postfix == "" and principal_unit:
567 md = hookenv._metadata_unit(principal_unit)
568 if md and md.pop("name", None) == "nagios":
569 sub_postfix = "_sub"
570 nrpe_config_sub_tmpl = "/etc/nagios/nrpe.d/{}_*.cfg"
571 nrpe_config_tmpl = "/etc/nagios/nrpe.d/{}.cfg"
572 for check in checks:
573 # This can be used to clean up old files before rendering the new
574 # ones
575 nrpe_configfiles_sub = nrpe_config_sub_tmpl.format(check["cmd_name"])
576 nrpe_configfiles = nrpe_config_tmpl.format(check["cmd_name"])
577 check["matching_files"] = glob.glob(nrpe_configfiles_sub)
578 check["matching_files"].extend(glob.glob(nrpe_configfiles))
579 check["description"] += " (sub)"
580 check["cmd_name"] += sub_postfix
581 self["checks"].append(check)
583 def _get_proc_thresholds(self):
584 """Return suitable processor thresholds."""
585 if hookenv.config("procs") == "auto":
586 proc_thresholds = "-k -w {} -c {}".format(
587 25 * self.procs + 100, 50 * self.procs + 100
588 )
589 else:
590 proc_thresholds = hookenv.config("procs")
591 return proc_thresholds
593 def _get_load_thresholds(self):
594 """Return suitable load thresholds."""
595 if hookenv.config("load") == "auto":
596 # Give 1min load alerts higher thresholds than 15 min load alerts
597 warn_multipliers = (4, 2, 1)
598 crit_multipliers = (8, 4, 2)
599 load_thresholds = ("-w %s -c %s") % (
600 ",".join([str(m * self.procs) for m in warn_multipliers]),
601 ",".join([str(m * self.procs) for m in crit_multipliers]),
602 )
603 else:
604 load_thresholds = hookenv.config("load")
605 return load_thresholds
607 def _get_disk_root_thresholds(self):
608 """Return suitable disk thresholds."""
609 if hookenv.config("disk_root"):
610 disk_root_thresholds = hookenv.config("disk_root") + " -p / "
611 else:
612 disk_root_thresholds = ""
613 return disk_root_thresholds
615 def proc_count(self):
616 """Return number number of processing units."""
617 return int(subprocess.check_output(["nproc", "--all"]))
619 def parse_netlinks(self, ifaces):
620 """Parse a list of strings, or a single string.
622 Looks if the interfaces exist and configures extra parameters (or
623 properties) -> ie. ['mtu:9000', 'speed:1000', 'op:up']
624 """
625 iface_path = "/sys/class/net/{}"
626 props_dict = {"mtu": "-m {}", "speed": "-s {}", "op": "-o {}"}
627 if type(ifaces) == str:
628 ifaces = [ifaces]
630 d_ifaces = {}
631 for iface in ifaces:
632 iface_props = iface.strip().split()
633 # no ifaces defined; SKIP
634 if len(iface_props) == 0:
635 continue
637 target = iface_props[0]
638 try:
639 matches = match_cidr_to_ifaces(target)
640 except Exception as e:
641 # Log likely unintentional errors and set flag for blocked status,
642 # if appropriate.
643 if isinstance(e, ValueError) and "has host bits set" in e.args[0]:
644 hookenv.log(
645 "Error parsing netlinks: {}".format(e.args[0]),
646 level=hookenv.ERROR,
647 )
648 set_netlinks_error()
649 # Treat target as explicit interface name
650 matches = [target]
652 iface_devs = [
653 target
654 for target in matches
655 if os.path.exists(iface_path.format(target))
656 ]
657 # no ifaces found; SKIP
658 if not iface_devs:
659 continue
661 # parse extra parameters (properties)
662 del iface_props[0]
663 extra_params = ""
664 for prop in iface_props:
665 # wrong format (key:value); SKIP
666 if prop.find(":") < 0:
667 continue
669 # only one ':' expected
670 kv = prop.split(":")
671 if len(kv) == 2 and kv[0].lower() in props_dict:
672 extra_params += " "
673 extra_params += props_dict[kv[0].lower()].format(kv[1])
675 for iface_dev in iface_devs:
676 d_ifaces[iface_dev] = "-i {}{}".format(iface_dev, extra_params)
677 return d_ifaces
680def match_cidr_to_ifaces(cidr):
681 """Use CIDR expression to search for matching network adapters.
683 Returns a list of adapter names.
684 """
685 import netifaces # Avoid import error before this dependency gets installed
687 network = ipaddress.IPv4Network(cidr)
688 matches = []
689 for adapter in netifaces.interfaces():
690 ipv4_addr_structs = netifaces.ifaddresses(adapter).get(netifaces.AF_INET, [])
691 addrs = [
692 ipaddress.IPv4Address(addr_struct["addr"])
693 for addr_struct in ipv4_addr_structs
694 ]
695 if any(addr in network for addr in addrs):
696 matches.append(adapter)
697 return matches
700def has_netlinks_error():
701 """Return True in case of netlinks related errors."""
702 return NETLINKS_ERROR
705def set_netlinks_error():
706 """Set the flag indicating a netlinks related error."""
707 global NETLINKS_ERROR
708 NETLINKS_ERROR = True