# Copyright © PyroLab Project Contributors
# Licensed under the terms of the GNU GPLv3+ License
# (see pyrolab/__init__.py for details)
"""
Server Resources
----------------
The scripts used for putting up and taking down Daemons and Workers from the
multiprocessing module. Since child processes need to be able to import the
script containing the target function, the target functions are contained
within this module where no new processes are ever spawned. The prevents
the recursive creation of new processes as the module is imported.
Server Resource Manager
-----------------------
The server resource manager handles the putting up and taking down of Daemons
for various instruments. To increase computer speed and processing
capabilities in cases where multiple instruments are hosted from the same
computer, each instrument is created in its own Daemon using the
Python ``multiprocessing`` module.
"""
from __future__ import annotations
import logging
import multiprocessing
import threading
import time
from datetime import datetime
from multiprocessing import current_process
from multiprocessing.queues import Queue
from typing import TYPE_CHECKING, Dict, Tuple
from Pyro5.core import locate_ns
from pyrolab import RUNTIME_CONFIG
from pyrolab.configure import GlobalConfiguration, PyroLabConfiguration
from pyrolab.nameserver import start_ns_loop
if TYPE_CHECKING:
from Pyro5.core import URI
from pyrolab.configure import (
DaemonConfiguration,
NameServerConfiguration,
ServiceConfiguration,
)
from pyrolab.server import Daemon
log = logging.getLogger(__name__)
[docs]
class NameServerRunner(multiprocessing.Process):
"""
A process for running nameservers using Python's ``multiprocessing``.
Advantages of using a child Process include the fact that if a server
dies or hangs up, the entire program doesn't stall or need to be restarted,
just the process that contained the server. Thus errors can be handled
and servers autonomously restarted and managed.
Parameters
----------
name : str
The name of the nameserver being run.
nsconfig : NameServerConfiguration
The configuration for the nameserver.
msg_queue : multiprocessing.Queue
A message queue. ResourceRunner listens for when ``None`` is placed
in the queue, which is a sentinel value to shutdown the process.
msg_polling : float, optional
The time in seconds between polling the message queue.
"""
def __init__(
self,
*args,
name: str = "",
nsconfig: NameServerConfiguration = None,
msg_queue: Queue = None,
msg_polling: float = 1.0,
**kwargs,
) -> None:
super().__init__(*args, **kwargs)
if not name:
raise ValueError("NameServerRunner requires a name")
if not nsconfig:
raise ValueError("NameServerRunner requires a NameServerConfiguration")
if not msg_queue:
raise ValueError("NameServerRunner requires a message queue")
self.name = name
self.msg_queue: Queue = msg_queue
self.msg_polling = msg_polling
self.nsconfig = nsconfig
self.KILL_SIGNAL = False
[docs]
def process_message_queue(self) -> None:
"""
A message handler.
if the sentinel value ``None`` is placed in the message queue, sets
a flag signifying a shutdown signal has been received.
"""
self._timer = threading.Timer(self.msg_polling, self.process_message_queue)
self._timer.daemon = True
self._timer.start()
if not self.msg_queue.empty():
msg = self.msg_queue.get()
log.debug(f"Message received: '{msg}'")
if msg is None:
log.info("KILL message received for nameserver '%s'", self.name)
self.KILL_SIGNAL = True
[docs]
def stay_alive(self) -> bool:
"""
A callback listener; if the sentinel value ``None`` is placed in the
message queue, returns True signifying a shutdown signal has been
received.
This function is called by the Daemon's ``requestLoop()``.
Returns
-------
bool
True if shutdown signal received, False otherwise.
"""
log.debug(f"Stay alive='{not self.KILL_SIGNAL}'")
return not self.KILL_SIGNAL
[docs]
def run(self) -> None:
"""
Creates and runs the child process.
When the kill signal is received, gracefully shuts down and removes
its registration from the nameserver.
"""
log.info("Starting nameserver '%s'", self.name)
# Configure the Pyro5 environment
self.nsconfig.update_pyro_config()
# Start the thread that checks for messages
self.process_message_queue()
# Begin looping
start_ns_loop(self.nsconfig, loop_condition=self.stay_alive)
[docs]
class DaemonRunner(multiprocessing.Process):
"""
A process for running server daemons using Python's ``multiprocessing``.
Advantages of using a ResourceRunner include the fact that if a server
dies or hangs up, the entire program doesn't stall or need to be restarted,
just the process that contained the server. Thus errors can be handled
and servers autonomously restarted and managed.
Other advantages should include speed; splitting servers across processors
means that resource-heavy instruments won't bog down adjacent instruments.
Parameters
----------
name : str
The name of the daemon being run.
daemonconfig : DaemonConfiguration
The configuration for the daemon.
serviceconfigs : Dict[str, ServiceConfiguration]
The configuration for the services belonging to the given daemon.
msg_queue : multiprocessing.Queue
A message queue. ResourceRunner listens for when "None" is placed
in the queue, which is a sentinel value to shutdown the process.
msg_polling : float, optional
The time in seconds between polling the message queue.
"""
def __init__(
self,
*args,
name: str,
daemonconfig: DaemonConfiguration,
serviceconfigs: Dict[str, ServiceConfiguration],
msg_queue: Queue,
shared_uris: dict[str, URI],
msg_polling: float = 1.0,
**kwargs,
) -> None:
log.debug("Building DaemonRunner")
super().__init__(*args, **kwargs)
self.name = name
self.msg_queue: Queue = msg_queue
self.uris = shared_uris
self.msg_polling = msg_polling
self.daemonconfig = daemonconfig
self.serviceconfigs = serviceconfigs
self.KILL_SIGNAL = False
[docs]
def setup_daemon(self) -> Tuple[Daemon, Dict[str, URI]]:
"""
Locates and loads the Daemon class, adds Pyro's ``behavior``, and
registers the hosted object with the Daemon.
Returns
-------
daemon, uri
The instantiated Daemon object and the URI for the hosted object,
to be registered with the nameserver.
"""
daemon = self.daemonconfig._get_daemon()
daemon = daemon()
uris = {}
for sname, sconfig in self.serviceconfigs.items():
log.info(f"Registering service '{sname}'")
service = sconfig._get_service()
log.debug("Getting service uri")
uri = daemon.register(service)
uris[sname] = uri
log.info("URI for '%s' = %s", sname, uri)
if self.daemonconfig.nameservers:
log.debug("Self-registering daemon")
uri = daemon.register(daemon)
uris[self.name] = uri
self.uris.clear()
self.uris.update(uris)
return daemon, uris
[docs]
def process_message_queue(self) -> None:
"""
A message handler.
if the sentinel value ``None`` is placed in the message queue, sets
a flag signifying a shutdown signal has been received.
"""
self._timer = threading.Timer(self.msg_polling, self.process_message_queue)
self._timer.daemon = True
self._timer.start()
if not self.msg_queue.empty():
msg = self.msg_queue.get()
if msg is None:
log.info("KILL message received for daemon '%s'", self.name)
self.KILL_SIGNAL = True
[docs]
def stay_alive(self) -> bool:
"""
A callback listener; if the sentinel value ``None`` is placed in the
message queue, returns True signifying a shutdown signal has been
received.
This function is called by the Daemon's ``requestLoop()``.
Returns
-------
bool
True if shutdown signal received, False otherwise.
"""
return not self.KILL_SIGNAL
[docs]
def run(self) -> None:
"""
Creates and runs the child process.
When the kill signal is received, gracefully shuts down and removes
its registration from the nameserver.
"""
log.info("Starting daemon '%s'", self.name)
# Set Pyro5 settings for daemon
self.daemonconfig.update_pyro_config()
daemon, uris = self.setup_daemon()
GLOBAL_CONFIG = PyroLabConfiguration.from_file(RUNTIME_CONFIG)
# Register all services with the nameserver
log.debug("Registering services with nameserver")
for sname, sinfo in self.serviceconfigs.items():
for ns in sinfo.nameservers:
nscfg = GLOBAL_CONFIG.nameservers[ns]
try:
log.debug(
f"Attempting to register '{sname}' with nameserver '{ns}' at {nscfg.host}:{nscfg.ns_port}"
)
ns = locate_ns(nscfg.host, nscfg.ns_port)
ns.register(sname, uris[sname], metadata={sinfo.description})
except Exception as e:
log.exception(e)
raise e
log.debug("All registrations completed")
for ns in self.daemonconfig.nameservers:
nscfg = GLOBAL_CONFIG.nameservers[ns]
ns = locate_ns(nscfg.host, nscfg.ns_port)
description = (
f"Daemon for {', '.join([str(sname) for sname in self.serviceconfigs])}"
)
ns.register(self.name, uris[self.name], metadata={description})
# Start the request loop
self.process_message_queue()
log.debug("entering request loop for '%s'", self.name)
daemon.requestLoop(loopCondition=self.stay_alive)
log.debug("requestloop exited for '%s'", self.name)
# Cleanup
log.info("Daemon '%s' is shutting down.", self.name)
self._timer.cancel()
for sname, sinfo in self.serviceconfigs.items():
for ns in sinfo.nameservers:
nscfg = GLOBAL_CONFIG.nameservers[ns]
try:
ns = locate_ns(nscfg.host, nscfg.ns_port)
ns.remove(sname)
except Exception as e:
log.exception(e)
for ns in self.daemonconfig.nameservers:
nscfg = GLOBAL_CONFIG.nameservers[ns]
try:
ns = locate_ns(nscfg.host, nscfg.ns_port)
ns.remove(self.name)
except Exception as e:
log.exception(e)
[docs]
class NameServerProcessGroup:
def __init__(
self,
process: NameServerRunner,
msg_queue: Queue,
created: datetime,
) -> None:
self.process = process
self.msg_queue = msg_queue
self.created = created
[docs]
class DaemonProcessGroup:
def __init__(
self,
process: DaemonRunner,
msg_queue: Queue,
created: datetime,
shared_uris: dict[str, URI],
) -> None:
self.process = process
self.msg_queue = msg_queue
self.created = created
self.shared_uris = shared_uris
[docs]
class ProcessManager:
"""
A manager class for running a set of PyroLab processes.
ProcessManager is a singleton. Access the global object by calling
:py:func:`instance`. Only the main process can access the ProcessManager.
"""
_instance = None
nameservers: Dict[str, NameServerProcessGroup] = {}
daemons: Dict[str, DaemonProcessGroup] = {}
GLOBAL_CONFIG: GlobalConfiguration
manager: multiprocessing.Manager
_timer: threading.Timer
def __init__(self) -> None:
raise RuntimeError(
"Cannot directly instantiate singleton, call ``instance()`` instead."
)
[docs]
@classmethod
def instance(cls) -> "ProcessManager":
"""
Get the singleton instance of the ProcessManager.
Only the main process can access the ProcessManager.
Returns
-------
ProcessManager
The singleton instance of the ProcessManager.
Raises
------
RuntimeError
If the requesting process is not the main process.
"""
# TODO: Do we want to change it so the GlobalConfiguration object is
# injected, instead of "gotten" by the ProcessManager itself?
log.debug("ProcessManager instance requested")
if current_process().name != "MainProcess":
log.critical("ProcessManager instance requested from non-main process")
raise Exception(
"ProcessManager should only be accessed by the main process."
)
if cls._instance is None:
log.debug("ProcessManager instance did not exist, created")
inst = cls.__new__(cls)
inst.nameservers = {}
inst.daemons = {}
inst.GLOBAL_CONFIG = GlobalConfiguration.instance()
inst.start_checkup_timer()
inst.manager = multiprocessing.Manager()
cls._instance = inst
return cls._instance
[docs]
def start_checkup_timer(self, duration: float = 30.0) -> None:
"""
Starts a timer to check up on the processes every n seconds (default 30).
"""
self._timer = threading.Timer(duration, self.checkup)
self._timer.daemon = True
log.debug("Starting checkup timer")
self._timer.start()
[docs]
def stop_checkup_timer(self) -> None:
"""
Stops the checkup timer.
"""
log.debug("Stopping checkup timer")
if hasattr(self, "_timer"):
self._timer.cancel()
[docs]
def launch_nameserver(self, nameserver: str) -> bool:
"""
Launch a nameserver.
"""
log.info("Manager attempting to launch server '%s'", nameserver)
nscfg = self.GLOBAL_CONFIG.get_nameserver_config(nameserver)
messenger = multiprocessing.Queue()
runner = NameServerRunner(
name=nameserver, nsconfig=nscfg, msg_queue=messenger, daemon=True
)
self.nameservers[nameserver] = NameServerProcessGroup(
runner, messenger, datetime.now()
)
runner.start()
return True
[docs]
def get_nameserver_process_info(self, nameserver: str) -> Dict[str, str]:
"""
Gets info on a nameserver process.
Parameters
----------
nameserver : str
The name of the nameserver.
Returns
-------
Dict[str, str]
A dictionary of information about the nameserver. Keys are:
``created``, ``status``, ``uri``.
"""
log.debug("Entering get_nameserver_process_info()")
if nameserver in self.nameservers:
pgroup = self.nameservers[nameserver]
if pgroup.process.is_alive():
status = running_time_human_readable(pgroup.created)
else:
status = "Died"
uri = f"{pgroup.process.nsconfig.host}:{pgroup.process.nsconfig.ns_port}"
return {
"created": pgroup.created.strftime("%Y-%m-%d %H:%M:%S"),
"status": status,
"uri": uri,
}
else:
return {
"created": "",
"status": "Stopped",
"uri": "",
}
[docs]
def launch_daemon(self, daemon: str) -> bool:
"""
Launch a daemon and all its associated services.
"""
log.info("Manager attempting to launch daemon '%s'", daemon)
daemonconfig = self.GLOBAL_CONFIG.get_daemon_config(daemon)
serviceconfigs = self.GLOBAL_CONFIG.get_service_configs_for_daemon(daemon)
messenger = multiprocessing.Queue()
shared_uris = self.manager.dict()
runner = DaemonRunner(
name=daemon,
daemonconfig=daemonconfig,
serviceconfigs=serviceconfigs,
msg_queue=messenger,
shared_uris=shared_uris,
daemon=True,
)
self.daemons[daemon] = DaemonProcessGroup(
runner, messenger, datetime.now(), shared_uris
)
runner.start()
return True
[docs]
def get_daemon_process_info(self, daemon: str) -> Dict[str, str]:
"""
Return the process group for a daemon.
"""
log.debug("Entering get_daemon_process_info()")
if daemon in self.daemons:
pgroup = self.daemons[daemon]
if pgroup.process.is_alive():
status = running_time_human_readable(pgroup.created)
else:
status = "Died"
if daemon in pgroup.shared_uris:
uri = str(pgroup.shared_uris[daemon])
else:
uri = ""
return {
"created": pgroup.created.strftime("%Y-%m-%d %H:%M:%S"),
"status": status,
"uri": uri,
}
else:
return {
"created": "",
"status": "Stopped",
"uri": "",
}
[docs]
def get_service_process_info(self, service: str) -> Dict[str, str]:
"""
Return the process info for a service.
"""
log.debug("Entering get_service_process_info()")
for daemon_name, daemon in self.daemons.items():
for srvc_name, srvc in daemon.process.serviceconfigs.items():
if srvc_name == service:
return {
"daemon": daemon_name,
"uri": str(daemon.shared_uris[service]),
}
return {
"daemon": "",
"uri": "",
}
[docs]
def checkup(self, continuous: bool = True) -> None:
"""
Checkup the processes.
"""
log.debug("Entering checkup()")
for ns in list(self.nameservers.keys()):
if not self.nameservers[ns].process.is_alive():
log.warning(f"Nameserver '{ns}' died, attempting to restart process.")
try:
self.nameservers[ns].process.join()
log.debug(f"Nameserver '{ns}' joined processes.")
del self.nameservers[ns]
self.launch_nameserver(ns)
except Exception as e:
log.exception(e)
for daemon in list(self.daemons.keys()):
if not self.daemons[daemon].process.is_alive():
log.warning(f"Daemon '{daemon}' died, attempting to restart process.")
try:
self.daemons[daemon].process.join()
log.debug(f"Daemon '{daemon}' joined processes.")
del self.daemons[daemon]
self.launch_daemon(daemon)
except Exception as e:
log.exception(e)
if continuous:
self.start_checkup_timer()
[docs]
def shutdown_nameserver(self, nameserver: str) -> bool:
log.info(f"Sending KILL message to nameserver '{nameserver}'")
group = self.nameservers.pop(nameserver)
polling = group.process.msg_polling
group.msg_queue.put(None)
time.sleep(2 * polling)
return True
[docs]
def shutdown_daemon(self, daemon: str) -> bool:
log.info(f"Sending KILL message to daemon '{daemon}'")
group = self.daemons.pop(daemon)
polling = group.process.msg_polling
group.msg_queue.put(None)
time.sleep(2 * polling)
return True
[docs]
def reload(self) -> bool:
"""
Reload all entities.
"""
log.info("Reloading all running entities.")
running_nameservers = list(self.nameservers.keys())
running_daemons = list(self.daemons.keys())
self.stop_checkup_timer()
for name in running_daemons:
self.shutdown_daemon(name)
for name in running_nameservers:
self.shutdown_nameserver(name)
for name in running_nameservers:
if name in self.GLOBAL_CONFIG.config.nameservers:
self.launch_nameserver(name)
for name in running_daemons:
if name in self.GLOBAL_CONFIG.config.daemons:
self.launch_daemon(name)
self.start_checkup_timer()
return True
[docs]
def shutdown_all(self) -> None:
"""
Shutdown all entities.
"""
log.info("Shutting down all running entities.")
self.stop_checkup_timer()
for daemon in list(self.daemons.keys()):
self.shutdown_daemon(daemon)
for nameserver in list(self.nameservers.keys()):
self.shutdown_nameserver(nameserver)
log.info("All running entities successfully shut down.")
[docs]
def running_time_human_readable(start: datetime, end: datetime = None) -> str:
"""
Return the time delta of two times (or one and now) in plain English.
Parameters
----------
start : datetime
The start time.
end : datetime, optional
The end time. Defaults to now.
Returns
-------
str
The time delta in plain English.
"""
if end:
delta = end - start
else:
delta = datetime.now() - start
if delta.days > 0:
return f"Up {delta.days} days"
elif delta.seconds > 3600:
return f"Up {delta.seconds // 3600} hours"
elif delta.seconds > 120:
return f"Up {delta.seconds // 60} minutes"
elif delta.seconds > 60:
return f"Up 1 minute"
else:
return f"Up {delta.seconds} seconds"