Source code for kazoo.handlers.utils
"""Kazoo handler helpers"""
HAS_FNCTL = True
try:
import fcntl
except ImportError: # pragma: nocover
HAS_FNCTL = False
import functools
import os
def create_pipe():
    """Create a read/write pipe and switch both ends to non-blocking mode.

    The descriptors come from :func:`os.pipe`. When the ``fcntl`` module
    was importable (``HAS_FNCTL`` is true), each descriptor's file status
    flags are set to ``os.O_NONBLOCK``; otherwise the descriptors are
    returned exactly as :func:`os.pipe` produced them.

    :returns: A ``(read_fd, write_fd)`` tuple of file descriptors.

    """
    pipe_fds = os.pipe()
    if not HAS_FNCTL:
        # No fcntl on this platform: nothing more we can do to the fds.
        return pipe_fds
    # Read end first, then write end.
    for fd in pipe_fds:
        fcntl.fcntl(fd, fcntl.F_SETFL, os.O_NONBLOCK)
    return pipe_fds
def create_tcp_socket(module):
    """Create a TCP socket with the CLOEXEC flag set.

    An ``AF_INET`` stream socket is created through ``module.socket`` and
    ``TCP_NODELAY`` is enabled on it. Close-on-exec is requested in two
    ways:

    * if ``module`` exposes ``SOCK_CLOEXEC``, the flag is OR-ed into the
      socket type so it is applied when the socket is created;
    * if the ``fcntl`` module was importable (``HAS_FNCTL`` is true),
      ``FD_CLOEXEC`` is additionally OR-ed into the descriptor flags.

    :param module: A socket module (or compatible object) providing
                   ``socket``, ``AF_INET``, ``SOCK_STREAM``,
                   ``IPPROTO_TCP``, ``TCP_NODELAY`` and optionally
                   ``SOCK_CLOEXEC``.
    :returns: The socket object returned by ``module.socket``.

    """
    type_ = module.SOCK_STREAM
    if hasattr(module, 'SOCK_CLOEXEC'):  # pragma: nocover
        # if available, set cloexec flag during socket creation
        # (must be a bitwise OR-assignment; a bare ``!=`` comparison
        # would be evaluated and thrown away, leaving the flag unset)
        type_ |= module.SOCK_CLOEXEC
    sock = module.socket(module.AF_INET, type_)
    sock.setsockopt(module.IPPROTO_TCP, module.TCP_NODELAY, 1)
    if HAS_FNCTL:
        # Preserve the existing descriptor flags and add FD_CLOEXEC.
        flags = fcntl.fcntl(sock, fcntl.F_GETFD)
        fcntl.fcntl(sock, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)
    return sock
def capture_exceptions(async_result):
    """Build a decorator that routes a function's exceptions to an
    async_result.

    The decorated function passes the wrapped function's return value
    through unchanged. If the wrapped function raises an
    :class:`Exception`, the exception is handed to
    ``async_result.set_exception`` instead of propagating, and the
    decorated call returns ``None``.

    :param async_result: An async result implementing :class:`IAsyncResult`

    """
    def decorate(function):
        @functools.wraps(function)
        def guarded(*args, **kwargs):
            try:
                outcome = function(*args, **kwargs)
            except Exception as error:
                # Report the failure via the async result; swallow it here.
                async_result.set_exception(error)
                return None
            return outcome
        return guarded
    return decorate
def wrap(async_result):
    """Build a decorator that forwards a function's result or exception
    to an async_result.

    A non-``None`` return value is passed to ``async_result.set`` and then
    returned to the caller. A ``None`` return value is returned without
    touching the async_result. Exceptions are handled by
    :func:`capture_exceptions`, which is applied around the inner function.

    :param async_result: An async result implementing :class:`IAsyncResult`

    """
    exception_guard = capture_exceptions(async_result)

    def capture(function):
        # The inner name is kept as ``captured_function``: the guard copies
        # this function's metadata onto the wrapper it returns.
        def captured_function(*args, **kwargs):
            outcome = function(*args, **kwargs)
            if outcome is None:
                # Only non-None values are propagated to the async result.
                return None
            async_result.set(outcome)
            return outcome
        return exception_guard(captured_function)
    return capture