# Copyright © PyroLab Project Contributors
# Licensed under the terms of the GNU GPLv3+ License
# (see pyrolab/__init__.py for details)
"""
ThorCam
=======
Module providing basic functionality for Thorlabs cameras.
Provides a base class with common functionality for Thorlabs cameras. Not
intended to be instantiated directly, even if it works.
Driver for ThorLabs cameras interfacing with the ThorCam software DLLs.
.. attention::
Presently Windows only.
Requires ThorCam software. Download it at `thorlabs.com`_.
.. _thorlabs.com: https://www.thorlabs.com/software_pages/ViewSoftwarePage.cfm?Code=ThorCam
Potential future Linux support, since ThorLabs does provide a Windows and
Linux SDK.
.. admonition:: Dependencies
:class: note
thorlabs_kinesis (:ref:`installation instructions <Thorlabs Kinesis Package>`)
"""
import logging
import socket
import threading
import time
from ctypes import *
from typing import Tuple, Optional
import numpy as np
import cv2 as cv
from pyrolab.api import expose
from pyrolab.drivers.cameras import Camera
from pyrolab.api import locate_ns, Proxy
log = logging.getLogger(__name__)
[docs]
class ThorCamBase(Camera):
"""
The Thorlabs camera base driver.
Attributes
----------
HEADERSIZE : int
brightness : int
color : bool
roi_shape : (int, int)
roi_pos : (int, int)
"""
def __init__(self):
self._HEADERSTRUCT = np.zeros(4, dtype=np.uintc)
self.stop_video = threading.Event()
self.brightness = 5
self.color = True
@property
@expose
def HEADERSIZE(self) -> int:
"""The size in bytes of the header in each serialized message (read only)."""
return self._HEADERSTRUCT.itemsize * self._HEADERSTRUCT.size
@property
@expose
def brightness(self) -> int:
"""Integer (range 1-10) defining the brightness, where 5 leaves the
brightness unchanged."""
return self._brightness
@brightness.setter
@expose
def brightness(self, brightness: int) -> None:
self._brightness = brightness
@property
@expose
def color(self) -> bool:
"""Sets whether to transmit color (``True``) or grayscale (``False``)
images."""
return self._color
@color.setter
@expose
def color(self, color: bool) -> None:
self._color = color
@property
@expose
def roi_shape(self) -> Tuple[int, int]:
"""The region of interest shape."""
return self._software_roi_shape
@roi_shape.setter
@expose
def roi_shape(self, shape: Tuple[int, int]) -> None:
self._software_roi_shape = shape
@property
@expose
def roi_pos(self) -> Tuple[int, int]:
"""Sets the upper left corner of the region of interest in pixels."""
return self._software_roi_pos
@roi_pos.setter
@expose
def roi_pos(self, pos: Tuple[int, int]) -> None:
self._software_roi_pos = pos
[docs]
@expose
def connect(self, *args, **kwargs):
"""
Opens the serial communication with the Thorlabs camera and sets defaults.
Not implemented in the base class. Should be overwritten by inheriting
classes.
"""
raise NotImplementedError
def _obtain_roi(self, image: np.array) -> np.array:
log.debug(f"shape of image: {image.shape}")
log.debug(f"shape: {self.roi_shape}")
log.debug(f"positions: {self.roi_pos}")
if self.color:
image = image[
self.roi_pos[1] : self.roi_pos[1] + self.roi_shape[1],
self.roi_pos[0] : self.roi_pos[0] + self.roi_shape[0],
:,
]
else:
image = image[
self.roi_pos[1] : self.roi_pos[1] + self.roi_shape[1],
self.roi_pos[0] : self.roi_pos[0] + self.roi_shape[0],
]
log.debug(f"new shape of image: {image.shape}")
return image
def _bayer_convert(self, raw: np.array) -> np.array:
"""
Coverts the raw data to something that can be displayed on-screen.
Image data is retrieved as a single-dimensional array. This function
converts it into either multidimensional BGR or grayscale image.
Parameters
----------
raw : np.array
The raw data that is received from the camera.
Returns
-------
np.array
The converted data.
"""
ow = (raw.shape[0] // 4) * 4
oh = (raw.shape[1] // 4) * 4
R = raw[0::2, 0::2]
B = raw[1::2, 1::2]
G0 = raw[0::2, 1::2]
G1 = raw[1::2, 0::2]
if self.color:
log.debug("Bayer convert (color)")
frame_height = raw.shape[0] // 2
frame_width = raw.shape[1] // 2
G = G0[:oh, :ow] // 2 + G1[:oh, :ow] // 2
bayer_R = np.array(R, dtype=np.uint8).reshape(frame_height, frame_width)
bayer_G = np.array(G, dtype=np.uint8).reshape(frame_height, frame_width)
bayer_B = np.array(B, dtype=np.uint8).reshape(frame_height, frame_width)
log.debug("Stacking color data")
dStack = np.clip(
np.dstack(
(
bayer_B * (self.brightness / 5),
bayer_G * (self.brightness / 5),
bayer_R * (self.brightness / 5),
)
),
0,
np.power(2, self.bit_depth) - 1,
).astype("uint8")
else:
log.debug("Bayer convert (grayscale)")
bayer = (
R[:oh, :ow] // 3
+ B[:oh, :ow] // 3
+ (G0[:oh, :ow] // 2 + G1[:oh, :ow] // 2) // 3
)
frame_height = bayer.shape[0]
frame_width = bayer.shape[1]
bayer_T = np.array(bayer, dtype=np.uint8).reshape(frame_height, frame_width)
log.debug("Stacking grayscale data")
dStack = np.clip(
bayer_T * (self.brightness / 5),
0,
np.power(2, self.bit_depth) - 1,
).astype("uint8")
log.debug("Bayer data stacked")
return dStack
[docs]
def get_frame(self) -> np.array:
"""
Retrieves the last frame from the camera's memory buffer.
.. warning::
Not a Pyro exposed function, cannot be called from a Proxy. We
recommend using the :py:class:`ThorCamClient` for streaming
video/getting remote images.
Retrieves the last frame from the camera memory buffer and processes it
into a computer-readable image format.
Can only be called after :py:func:`start_capture`.
Returns
-------
img : np.array
The last frame from the camera's memory buffer.
"""
raise NotImplementedError
def _write_header(
self, size: int, d1: int = 1, d2: int = 1, d3: int = 1
) -> np.array:
"""
Creates the message header for the image being transferred over socket.
Format is an array of 4 ``np.uintc`` values, ordered as
(size, d1, d2, d3) where d1, d2, d3 are the dimensions of the image
(typically ``img.shape``, if ``img`` is a numpy array).
Parameters
----------
size : int
The total length of the message (excluding the header).
d1 : int, optional
The shape of the first dimension of the image (default 1).
d2 : int, optional
The shape of the second dimension of the image (default 1).
d3 : int, optional
The shape of the third dimension of the image (default 1).
"""
log.debug(f"Received: {size} {d1} {d2} {d3}")
return np.array((size, d1, d2, d3), dtype=np.uintc)
def _remote_streaming_loop(self):
"""
Starts a separate thread to stream frames.
This function is called as a separate thread when streaming is initiated.
It will loop, sending frame by frame across the socket connection,
until the ``stop_video`` is set (by :py:func:`stop_capture`).
Sends a single image and waits for ACK before sending the next image.
"""
log.debug("Waiting for client to connect...")
self.serversocket.listen(5)
self.clientsocket, address = self.serversocket.accept()
self.clientsocket.settimeout(5.0)
log.debug("Accepted client socket")
while not self.stop_video.is_set():
log.debug("Getting frame")
encode_param = [int(cv.IMWRITE_JPEG_QUALITY), 90]
success, msg = cv.imencode(".jpg", self.get_frame(), encode_param)
if not success:
log.debug("Compression failed")
log.debug("Serializing")
ser_msg = msg.tobytes()
header = self._write_header(len(ser_msg), *msg.shape)
ser_msg = header.tobytes() + ser_msg
try:
log.debug(f"Sending message ({len(ser_msg)} bytes)")
self.clientsocket.send(ser_msg)
log.debug("Message sent")
check_msg = self.clientsocket.recv(4096)
log.debug(f"ACK: {check_msg}")
except TimeoutError:
print("Connection timed out!")
self.end_stream()
def _get_socket(self) -> Tuple[str, int]:
"""
Opens an socket on the local machine using an available port and binds to it.
Returns
-------
address, port : Tuple[str, int]
The address and port of the new socket.
"""
self.serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.serversocket.settimeout(5.0)
self.serversocket.bind((socket.gethostname(), 0))
return self.serversocket.getsockname()
[docs]
def start_streaming_thread(self) -> Tuple[str, int]:
"""
Starts the streaming thread for nonlocal connections.
Returns
-------
address, port : str, int
The address and port of the socket providing the stream.
"""
log.debug("Setting up socket for streaming")
self.stop_video.clear()
address, port = self._get_socket()
self.video_thread = threading.Thread(
target=self._remote_streaming_loop, args=()
)
self.video_thread.start()
return [address, port]
[docs]
@expose
def start_capture(self) -> Optional[Tuple[str, int]]:
"""
Signals the hardware to start capturing and sets up the streaming thread.
Not implemented in the base class. Should be overwritten by inheriting
classes.
Returns
-------
address, port : str, int
Address and port of the opened socket, if used remotely.
"""
raise NotImplementedError
[docs]
def stop_streaming_thread(self):
"""
Closes the socket connection and signals the streaming thread to shutdown.
"""
self.clientsocket.close()
self.stop_video.set()
[docs]
@expose
def stop_capture(self) -> None:
"""
Stops the capture from the camera.
This frees the memory used for storing the frames then triggers
the stop_video event which will end the parallel socket thread.
"""
raise NotImplementedError
[docs]
@expose
def close(self):
"""
Closes communication with the camera and frees memory.
This is not implemented in the base class, inheriting classes should
implement this function. They should close the socket server
and then closes serial communication with the camera.
"""
raise NotImplementedError
[docs]
@expose
def start_stream(self) -> None:
"""
Starts a camera stream.
"""
raise NotImplementedError
[docs]
@expose
def end_stream(self) -> None:
"""
Ends a camera stream.
"""
raise NotImplementedError
[docs]
@expose
def await_stream(self, timeout: float = 3.0) -> bool:
raise NotImplementedError
[docs]
class ThorCamClient:
"""
The Thorlabs camera client. Not a PyroLab :py:class:`Service` object.
Used for receiving video streamed over a socket connection from a
:py:class:`ThorCamBase`-derived service.
Any :py:class:`ThorCamBase` attribute is a valid ThorCamClient attribute.
Attributes
----------
SUB_MESSAGE_LENGTH : int
The size of the sub-message chunks used.
"""
def __init__(self):
self.remote_attributes = []
self.SUB_MESSAGE_LENGTH = 4096
self.stop_video = threading.Event()
self.video_stopped = threading.Event()
self.last_image = None
def __getattr__(self, attr):
"""
Accesses remote camera attributes as if they were local.
Examples
--------
>>> print(ThorCamClient.color)
False
>>> print(ThorCamClient.brightness)
5
"""
if attr in self.remote_attributes:
return getattr(self.cam, attr)
else:
return super().__getattr__(attr)
def __setattr__(self, attr, value):
"""
Sets remote camera attributes as if they were local.
Examples
--------
>>> ThorCamClient.color = True
>>> ThorCamClient.brightness = 8
>>> ThorCamClient.exposure = 100
"""
if attr == "remote_attributes":
return super().__setattr__(attr, value)
elif attr in self.remote_attributes:
return setattr(self.cam, attr, value)
else:
return super().__setattr__(attr, value)
[docs]
def connect(self, name: str, ns_host: str = None, ns_port: float = None) -> None:
"""
Connect to a remote PyroLab-hosted UC480 camera.
Assumes the nameserver where the camera is registered is already
configured in the environment.
Parameters
----------
name : str
The name used to register the camera on the nameserver.
Examples
--------
>>> from pyrolab.api import NameServerConfiguration
>>> from pyrolab.drivers.cameras.thorcam import ThorCamClient
>>> nscfg = NameServerConfiguration(host="my.nameserver.com")
>>> nscfg.update_pyro_config()
>>> cam = ThorCamClient()
>>> cam.connect("camera_name")
"""
if ns_host or ns_port:
args = {"host": ns_host, "port": ns_port}
else:
args = {}
with locate_ns(**args) as ns:
self.cam = Proxy(ns.lookup(name))
self.cam.autoconnect()
self.remote_attributes = self.cam._pyroAttrs
self._LOCAL_HEADERSIZE = self.HEADERSIZE
[docs]
def start_stream(self) -> None:
"""
Starts the video stream.
Sets up the remote camera to start streaming and opens a socket
connection to receive the stream. Starts a new daemon thread to
constantly receive images.
"""
address, port = self.cam.start_capture()
self.clientsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.clientsocket.settimeout(15.0)
self.clientsocket.connect((address, port))
self.stop_video.clear()
self.video_stopped.clear()
self.video_thread = threading.Thread(target=self._receive_video_loop, args=())
self.video_thread.daemon = True
self.video_thread.start()
def _decode_header(self, header):
"""
Decodes the header of the image.
Image header consists of four np.uintc values, ordered as [length of
message (in bytes), width, height, depth (usually 1, or 3 if color)].
Parameters
----------
header : bytes
The header of the image.
Returns
-------
length, shape : int, tuple(int, int, int)
The length in bytes of the image, and its shape (for np.reshape).
"""
length, *shape = np.frombuffer(header, dtype=np.uintc)
return length, shape
def _receive_video_loop(self) -> None:
while not self.stop_video.is_set():
message = b""
# Read size of the incoming message
try:
header = self.clientsocket.recv(self._LOCAL_HEADERSIZE)
length, shape = self._decode_header(header)
while len(message) < length:
submessage = self.clientsocket.recv(self.SUB_MESSAGE_LENGTH)
message += submessage
except TimeoutError:
print("Connection timed out!")
self.end_stream()
# Deserialize the message and break
self.last_image = cv.imdecode(
np.frombuffer(message, dtype=np.uint8).reshape(shape), 1
)
self.clientsocket.send(b"ACK")
self.clientsocket.close()
self.video_stopped.set()
[docs]
def end_stream(self) -> None:
"""
Ends the video stream.
Ends the video stream by setting the stop_video flag and closing the
socket connection. Because communication is via a flag, shutdown
may not be instantaneous.
"""
self.stop_video.set()
while not self.video_stopped.is_set():
time.sleep(0.001)
self.cam.stop_capture()
[docs]
def await_stream(self, timeout: float = 3.0) -> bool:
"""
Blocks until the first image is available from the stream.
Parameters
----------
timeout : float
The number of seconds to wait for the first image (default 3).
Returns
-------
bool
``True`` if an image is available, ``False`` otherwise.
"""
start = time.time()
while self.last_image is None:
if time.time() - start > timeout:
return False
time.sleep(0.001)
return True
[docs]
def get_frame(self) -> np.ndarray:
"""
Returns the last image received from the stream.
You should make sure to call :py:meth:`await_stream` before calling
this method.
Returns
-------
np.ndarray
The last image received from the stream.
Examples
--------
>>> cam = ThorCamClient()
>>> cam.connect("camera_name")
>>> cam.start_stream()
>>> cam.await_stream()
>>> frame = cam.get_frame()
"""
return self.last_image
[docs]
def close(self) -> None:
"""
Closes the Proxy connection to the remote camera.
"""
self.cam.close()
self.remote_attributes = []