Source code for Pyro5.core

"""
Multi purpose stuff used by both clients and servers (URI etc)

Pyro - Python Remote Objects.  Copyright by Irmen de Jong (irmen@razorvine.net).
"""

import re
import logging
import contextlib
import ipaddress
import socket
import random
import serpent
from typing import Union, Optional
from . import config, errors, socketutil, serializers


__all__ = ["URI", "DAEMON_NAME", "NAMESERVER_NAME", "resolve", "locate_ns", "type_meta"]

log = logging.getLogger("Pyro5.core")

# standard object name for the Daemon object
DAEMON_NAME = "Pyro.Daemon"

# standard name for the Name server itself
NAMESERVER_NAME = "Pyro.NameServer"


class URI(object):
    """
    Pyro object URI (universal resource identifier).
    The uri format is like this: ``PYRO:objectid@location`` where location is one of:

    - ``hostname:port`` (tcp/ip socket on given port)
    - ``./u:sockname`` (Unix domain socket on localhost)

    There is also a 'Magic format' for simple name resolution using Name server:
      ``PYRONAME:objectname[@location]``  (optional name server location, can also omit location port)
    And one that looks up things in the name server by metadata:
      ``PYROMETA:meta1,meta2,...[@location]``  (optional name server location, can also omit location port)

    You can write the protocol in lowercase if you like (``pyro:...``) but it will
    automatically be converted to uppercase internally.
    """
    uriRegEx = re.compile(r"(?P<protocol>[Pp][Yy][Rr][Oo][a-zA-Z]*):(?P<object>\S+?)(@(?P<location>.+))?$")

    def __init__(self, uri):
        if isinstance(uri, URI):
            state = uri.__getstate__()
            self.__setstate__(state)
            return
        if not isinstance(uri, str):
            raise TypeError("uri parameter object is of wrong type")
        self.sockname = self.host = self.port = None
        match = self.uriRegEx.match(uri)
        if not match:
            raise errors.PyroError("invalid uri")
        self.protocol = match.group("protocol").upper()
        self.object = match.group("object")
        location = match.group("location")
        if self.protocol == "PYRONAME":
            self._parseLocation(location, config.NS_PORT)
        elif self.protocol == "PYRO":
            if not location:
                raise errors.PyroError("invalid uri")
            self._parseLocation(location, None)
        elif self.protocol == "PYROMETA":
            self.object = set(m.strip() for m in self.object.split(","))
            self._parseLocation(location, config.NS_PORT)
        else:
            raise errors.PyroError("invalid uri (protocol)")

    def _parseLocation(self, location, defaultPort):
        if not location:
            return
        if location.startswith("./u:"):
            self.sockname = location[4:]
            if (not self.sockname) or ':' in self.sockname:
                raise errors.PyroError("invalid uri (location)")
        else:
            if location.startswith("["):  # ipv6
                if location.startswith("[["):  # possible mistake: double-bracketing
                    raise errors.PyroError("invalid ipv6 address: enclosed in too many brackets")
                ipv6locationmatch = re.match(r"\[([0-9a-fA-F:%]+)](:(\d+))?", location)
                if not ipv6locationmatch:
                    raise errors.PyroError("invalid ipv6 address: the part between brackets must be a numeric ipv6 address")
                self.host, _, self.port = ipv6locationmatch.groups()
            else:
                self.host, _, self.port = location.partition(":")
            if not self.port:
                self.port = defaultPort
            try:
                self.port = int(self.port)
            except (ValueError, TypeError):
                raise errors.PyroError("invalid port in uri, port=" + str(self.port))

    @staticmethod
    def isUnixsockLocation(location):
        """determine if a location string is for a Unix domain socket"""
        return location.startswith("./u:")

    @property
    def location(self):
        """property containing the location string, for instance ``"servername.you.com:5555"``"""
        if self.host:
            if ":" in self.host:  # ipv6
                return "[%s]:%d" % (self.host, self.port)
            else:
                return "%s:%d" % (self.host, self.port)
        elif self.sockname:
            return "./u:" + self.sockname
        else:
            return None

    def __str__(self):
        if self.protocol == "PYROMETA":
            result = "PYROMETA:" + ",".join(self.object)
        else:
            result = self.protocol + ":" + self.object
        if self.location:
            return result + "@" + self.location
        return result

    def __repr__(self):
        return "<%s.%s at 0x%x; %s>" % (self.__class__.__module__, self.__class__.__name__, id(self), str(self))

    def __eq__(self, other):
        if not isinstance(other, URI):
            return False
        return self.__getstate__() == other.__getstate__()

    def __ne__(self, other):
        return not self.__eq__(other)

    def __hash__(self):
        return hash(self.__getstate__())

    def __getstate__(self):
        return self.protocol, self.object, self.sockname, self.host, self.port

    def __setstate__(self, state):
        self.protocol, self.object, self.sockname, self.host, self.port = state


class _ExceptionWrapper(object):
    """Class that wraps a remote exception. If this is returned, Pyro will
    re-throw the exception on the receiving side. Usually this is taken care of
    by a special response message flag, but in the case of batched calls this
    flag is useless and another mechanism was needed."""

    def __init__(self, exception):
        self.exception = exception

    def raiseIt(self):
        raise self.exception

    def __serialized_dict__(self):
        """serialized form as a dictionary"""
        return {
            "__class__": "Pyro5.core._ExceptionWrapper",
            "exception": serializers.SerializerBase.class_to_dict(self.exception)
        }


# register the special serializers for the pyro objects with Serpent
serpent.register_class(URI, serializers.pyro_class_serpent_serializer)
serpent.register_class(_ExceptionWrapper, serializers.pyro_class_serpent_serializer)
serializers.SerializerBase.register_class_to_dict(URI, serializers.serialize_pyro_object_to_dict, serpent_too=False)
serializers.SerializerBase.register_class_to_dict(_ExceptionWrapper, _ExceptionWrapper.__serialized_dict__, serpent_too=False)


def resolve(uri: Union[str, URI], delay_time: float = 0.0) -> URI:
    """
    Resolve a 'magic' uri (PYRONAME, PYROMETA) into the direct PYRO uri.
    It finds a name server, and use that to resolve a PYRONAME uri into the direct PYRO uri pointing to the named object.
    If uri is already a PYRO uri, it is returned unmodified.
    You can consider this a shortcut function so that you don't have to locate and use a name server proxy yourself.
    Note: if you need to resolve more than a few names, consider using the name server directly instead of repeatedly
    calling this function, to avoid the name server lookup overhead from each call.
    You can set delay_time to the maximum number of seconds you are prepared to wait until a name registration
    becomes available in the nameserver.
    """
    if isinstance(uri, str):
        uri = URI(uri)
    elif not isinstance(uri, URI):
        raise TypeError("can only resolve Pyro URIs")
    if uri.protocol == "PYRO":
        return uri
    log.debug("resolving %s", uri)
    from . import nameserver   # doing it here to avoid circular import issues
    if uri.protocol == "PYRONAME":
        with locate_ns(uri.host, uri.port) as ns:
            return nameserver.lookup(ns, uri.object, delay_time)
    elif uri.protocol == "PYROMETA":
        with locate_ns(uri.host, uri.port) as ns:
            candidates = nameserver.yplookup(ns, uri.object, None, False, delay_time)
            if candidates:
                candidate = random.choice(list(candidates.values()))
                log.debug("resolved to candidate %s", candidate)
                return URI(candidate)
            raise errors.NamingError("no registrations available with desired metadata properties %s" % uri.object)
    else:
        raise errors.PyroError("invalid uri protocol")


[docs] def locate_ns(host: Union[str, ipaddress.IPv4Address, ipaddress.IPv6Address] = "", port: Optional[int] = None, broadcast: bool = True) -> "client.Proxy": """Get a proxy for a name server somewhere in the network.""" from . import client if not host: # first try localhost if we have a good chance of finding it there if config.NS_HOST in ("localhost", "::1") or config.NS_HOST.startswith("127."): if ":" in config.NS_HOST: # ipv6 hosts = ["[%s]" % config.NS_HOST] else: # Some systems have 127.0.1.1 in the hosts file assigned to the hostname, # so try this too (only if it's actually used as a valid ip address) try: socket.gethostbyaddr("127.0.1.1") hosts = [config.NS_HOST] if config.NS_HOST == "127.0.1.1" else [config.NS_HOST, "127.0.1.1"] except socket.error: hosts = [config.NS_HOST] for host in hosts: uristring = "PYRO:%s@%s:%d" % (NAMESERVER_NAME, host, port or config.NS_PORT) log.debug("locating the NS: %s", uristring) proxy = client.Proxy(uristring) with contextlib.suppress(errors.PyroError): proxy._pyroBind() log.debug("located NS") return proxy if config.PREFER_IP_VERSION == 6: broadcast = False # ipv6 doesn't have broadcast. We should probably use multicast.... if broadcast: # broadcast lookup if not port: port = config.NS_BCPORT log.debug("broadcast locate") sock = socketutil.create_bc_socket(reuseaddr=config.SOCK_REUSE, timeout=0.7) for _ in range(3): try: for bcaddr in config.BROADCAST_ADDRS: try: sock.sendto(b"GET_NSURI", 0, (bcaddr, port)) except socket.error as x: err = getattr(x, "errno", x.args[0]) # handle some errno's that some platforms like to throw: if err not in socketutil.ERRNO_EADDRNOTAVAIL and err not in socketutil.ERRNO_EADDRINUSE: raise data, _ = sock.recvfrom(100) sock.close() text = data.decode("iso-8859-1") log.debug("located NS: %s", text) proxy = client.Proxy(text) return proxy except socket.timeout: continue with contextlib.suppress(OSError, socket.error): sock.shutdown(socket.SHUT_RDWR) sock.close() log.debug("broadcast locate failed, try direct connection on NS_HOST") else: log.debug("skipping broadcast lookup") # broadcast failed or skipped, try PYRO directly on specific host host = config.NS_HOST port = config.NS_PORT elif not isinstance(host, str): host = str(host) # take care of the occasion where host is an ipaddress.IpAddress # pyro direct lookup port = config.NS_PORT if not port else port if URI.isUnixsockLocation(host): uristring = "PYRO:%s@%s" % (NAMESERVER_NAME, host) else: # if not a unix socket, check for ipv6 if host and ":" in str(host): host = "[%s]" % host uristring = "PYRO:%s@%s:%d" % (NAMESERVER_NAME, host, port) uri = URI(uristring) log.debug("locating the NS: %s", uri) proxy = client.Proxy(uri) try: proxy._pyroBind() log.debug("located NS") return proxy except errors.PyroError as x: raise errors.NamingError("Failed to locate the nameserver") from x
def type_meta(class_or_object, prefix="class:"): """extracts type metadata from the given class or object, can be used as Name server metadata.""" if hasattr(class_or_object, "__mro__"): return {prefix + c.__module__ + "." + c.__name__ for c in class_or_object.__mro__ if c.__module__ not in ("builtins", "__builtin__")} if hasattr(class_or_object, "__class__"): return type_meta(class_or_object.__class__) return frozenset()