# Copyright © PyroLab Project Contributors
# Licensed under the terms of the GNU GPLv3+ License
# (see pyrolab/__init__.py for details)
"""
Thorlabs UC480 Scientific Camera
================================
Driver for a Thorlabs Microscope.
.. attention::
Presently Windows only.
Requires ThorCam software. Download it at `thorlabs.com`_.
.. _thorlabs.com: https://www.thorlabs.com/software_pages/ViewSoftwarePage.cfm?Code=ThorCam
Potential future Linux support, since ThorLabs does provide a Windows and
Linux SDK.
.. admonition:: Dependencies
:class: note
thorlabs_kinesis (:ref:`installation instructions <Thorlabs Kinesis Package>`)
"""
# TODO: Investigate Linux support
# (https://www.thorlabs.com/software_pages/ViewSoftwarePage.cfm?Code=ThorCam)
import logging
from ctypes import *
from typing import Tuple
import numpy as np
try:
from thorlabs_kinesis import thor_camera as tc
except:
pass
from pyrolab.api import expose
from pyrolab.drivers.cameras.thorcam import ThorCamBase, ThorCamClient
log = logging.getLogger(__name__)
[docs]
@expose
class UC480(ThorCamBase):
"""
The Thorlabs UC480 camera driver.
Attributes
----------
HEADERSIZE : int
brightness : int
color : bool
roi_shape : (int, int)
roi_pos : (int, int)
pixelclock : int
exposure : int
framerate : int
"""
@property
@expose
def pixelclock(self) -> int:
"""Sets the clockspeed of the camera, usually in the range of 24."""
return self._pixelclock
@pixelclock.setter
@expose
def pixelclock(self, clockspeed: int) -> None:
self._pixelclock = clockspeed
pixelclock = c_uint(clockspeed)
if hasattr(self, "handle"):
tc.PixelClock(self.handle, 6, byref(pixelclock), sizeof(pixelclock))
else:
raise ConnectionError("Cannot set pixelclock before connecting to device.")
@property
@expose
def exposure(self) -> int:
"""Sets the exposure of the camera, the time the shutter is open in
milliseconds (90 ms is a good default)."""
return self._exposure
@exposure.setter
@expose
def exposure(self, exposure: int) -> None:
self._exposure = exposure
exposure_c = c_double(exposure)
if hasattr(self, "handle"):
tc.SetExposure(self.handle, 12, exposure_c, sizeof(exposure_c))
else:
raise ConnectionError("Cannot set exposure before connecting to device.")
@property
@expose
def framerate(self) -> int:
"""The framerate of the camera (fps). You must reset the exposure
after setting the framerate."""
return self._framerate
@framerate.setter
@expose
def framerate(self, framerate: int) -> None:
self._framerate = framerate
s_framerate = c_double(0)
if hasattr(self, "handle"):
tc.SetFrameRate(self.handle, c_double(framerate), byref(s_framerate))
else:
raise ConnectionError("Cannot set framerate before connecting to device.")
[docs]
def connect(
self,
serialno: str,
local: bool = False,
bit_depth: int = 8,
pixelclock: int = 24,
color: bool = True,
colormode: int = 11,
roi_shape: Tuple[int, int] = (1024, 1280),
roi_pos: Tuple[int, int] = (0, 0),
framerate: int = 15,
exposure: int = 90,
pixelbytes: int = 8,
brightness: int = 5,
):
"""
Opens the serial communication with the Thorlabs camera and sets defaults.
Default low-level values that are set include the bit depth and camera
name.
Parameters
----------
serialno : int
The serial number of the camera that should be connected.
local : bool, optional
Whether the camera is being used as a local device; if True, will
not configure sockets for streaming when starting capature (default
False).
bit_depth : int, optional
The number of bits used for each pixel (default 8).
pixelclock: int, optional
Clock speed of the camera (default 24).
color : bool, optional
Whether the camera is in color mode or not (default True).
colormode: int, optional
Mode of color that the camera returns data in. ``11`` (default)
returns raw format, see :py:func:`set_color_mode` for more
information.
roi_shape : tuple(int, int), optional
Dimensions of the image that is taken by the camera (default
``(1024, 1280)``).
roi_pos : tuple(int, int), optional
Position of the top left corner of the roi (region of interest) in
relation to the sensor array (default ``(0,0)``).
framerate : int, optional
The framerate of the camera in frames per second (default 15).
exposure: int, optional
In milliseconds, the time the shutter is open on the camera
(default 90).
pixelbytes: int, optional
The amount of memory space allocated per pixel in bytes (default 8).
brightness : int
Integer (range 1-10) defining the brightness, where 5 leaves the
brightness unchanged.
"""
self.local = local
self.color = color
log.debug(f"Attempting to connect to camera with serialno '{serialno}'")
num = c_int(0)
tc.GetNumberOfCameras(byref(num))
log.debug(f"Found {num.value} cameras")
uci_format = tc.UC480_CAMERA_INFO * 2
uci = uci_format(tc.UC480_CAMERA_INFO(), tc.UC480_CAMERA_INFO())
dwCount = c_int(num.value)
cam_list = tc.UC480_CAMERA_LIST(dwCount=dwCount, uci=uci)
tc.GetCameraList(byref(cam_list))
# Find the camera with the given serial number. Each camera has to be
# connected to and asked its serial number. We then check that they
# match.
for i in range(num.value):
handle = c_int(cam_list.uci[i].dwCameraID)
error = tc.InitCamera(byref(handle))
# 0 means no error
if error != 0:
continue
info = tc.CAMINFO()
error = tc.GetCameraInfo(handle, byref(info))
if int(info.SerNo) == serialno:
self.handle = handle
break
elif i == num.value - 1:
raise ConnectionError("Camera not found")
else:
error = tc.ExitCamera(handle)
if error != 0:
log.error(f"Closing ThorCam failed with error code {error}")
self.bit_depth = bit_depth
tc.SetDisplayMode(self.handle, c_int(32768))
self.meminfo = None
self.pixelclock = pixelclock
self.framerate = framerate
self.exposure = exposure
self.brightness = brightness
self.set_color_mode(colormode)
self._set_hardware_roi_shape(roi_shape)
self.roi_shape = [int(roi_shape[1] / 2), int(roi_shape[0] / 2)]
self._set_hardware_roi_pos(roi_pos)
self.roi_pos = [int(roi_pos[1] / 2), int(roi_pos[0] / 2)]
self._initialize_memory(pixelbytes)
[docs]
@expose
def set_color_mode(self, mode: int = 11) -> None:
"""
Sets the color mode of the image.
This sets the mode of image that is taken. Almost always
use ``11`` which will give you the raw photosensor data in the format:
.. table::
+----+----+----+
| R | G0 |... |
+----+----+----+
| G1 | B |... |
+----+----+----+
|... |... | |
+----+----+----+
This data is interpreted in the _get_image() function.
Parameters
----------
mode : int, optional
The color mode of the pixel data. ``11``, the default, means raw
8-bit. ``6`` means gray 8-bit.
"""
tc.SetColorMode(self.handle, mode)
[docs]
def get_frame(self) -> np.array:
"""
Retrieves the last frame from the camera's memory buffer.
Retrieves the last frame from the camera memory buffer and processes it
into a computer-readable image format.
.. warning::
Not a Pyro exposed function, cannot be called from a Proxy. We
recommend using the :py:class:`ThorCamClient` for streaming
video/getting remote images.
For remote connections, the image is serialized using the pickle module
for remote connections. The header is then added to inform the client
how long the message is. This should not be called from the client. It
will be called from the function _video_loop() which is on a parallel
thread with Pyro5.
Can only be called after :py:func:`start_capture`.
Returns
-------
img : np.array
The last frame from the camera's memory buffer.
"""
log.debug("Retreiving frame from memory")
raw = np.frombuffer(self.meminfo[0], c_ubyte).reshape(
self.hardware_roi_shape[1], self.hardware_roi_shape[0]
)
log.debug(f"Retreived (size {raw.shape})")
bayer = self._bayer_convert(raw)
return self._obtain_roi(bayer)
[docs]
@expose
def start_capture(self) -> Tuple[str, str]:
"""
Starts capture from the camera.
This starts the capture from the camera to the allocated memory
location as well as starts a new thread for the socket server to stream
video from camera memory to the client.
Returns
-------
address, port : tuple(str, str)
The IP address and port of the socket serving the video stream.
"""
log.debug("Sending signal to camera to start capture")
tc.StartCapture(self.handle, tc.IS_DONT_WAIT)
log.debug("Signal sent")
if not self.local:
return self.start_streaming_thread()
[docs]
@expose
def stop_capture(self) -> None:
"""
Stops the capture from the camera.
This frees the memory used for storing the frames, then (for remote
connections) sets the stop_video flag which will close the daemon
socket thread (not necessarily immediately).
"""
tc.FreeMemory(self.handle, self.meminfo[0], self.meminfo[1])
tc.StopCapture(self.handle, 1)
if not self.local:
self.stop_streaming_thread()
def _initialize_memory(self, pixelbytes: int = 8) -> None:
"""
Initializes the memory for holding the most recent frame from the camera.
Parameters
----------
pixelbytes: int, optional
The amount of memory space allocated per pixel in bytes (default 8).
"""
if self.meminfo is not None:
tc.FreeMemory(self.handle, self.meminfo[0], self.meminfo[1])
xdim, ydim = self.hardware_roi_shape
log.debug(f"got dimenstions: {self.hardware_roi_shape}")
# ydim = self.roi_shape[1]
imgsize = xdim * ydim
log.debug(f"image size is {imgsize}")
memid = c_int(0)
c_buf = (c_ubyte * imgsize)(0)
log.debug("allocating memory...")
tc.AllocateMemory(
self.handle, xdim, ydim, c_int(pixelbytes), c_buf, byref(memid)
)
log.debug("setting image memory...")
tc.SetImageMemory(self.handle, c_buf, memid)
log.debug("setting infor...")
self.meminfo = [c_buf, memid]
log.debug("meminfo set")
def _set_hardware_roi_shape(self, roi_shape: Tuple[int, int]) -> None:
"""
Sets the dimensions of the region of interest (roi).
Parameters
----------
roi_shape : tuple(int, int)
Dimensions of the image that is taken by the camera
(usually 1024 x 1280).
"""
# Width and height
AOI_size = tc.IS_2D(roi_shape[0], roi_shape[1])
# 5 for setting size, 3 for setting position
tc.AOI(self.handle, 5, byref(AOI_size), 8)
# 6 for getting sizse, 4 for getting position
tc.AOI(self.handle, 6, byref(AOI_size), 8)
self.hardware_roi_shape = [AOI_size.s32X, AOI_size.s32Y]
def _set_hardware_roi_pos(self, roi_pos: Tuple[int, int]) -> None:
"""
Sets the origin position of the region of interest.
Parameters
----------
roi_pos : tuple(int, int)
Position of the top left corner of the roi (region of interest) in
relation to the sensor array (usually ``(0,0)``).
"""
# Width and height
AOI_pos = tc.IS_2D(roi_pos[0], roi_pos[1])
# 5 for setting size, 3 for setting position
tc.AOI(self.handle, 3, byref(AOI_pos), 8)
# 6 for getting size, 4 for getting position
tc.AOI(self.handle, 4, byref(AOI_pos), 8)
self.hardware_roi_pos = [AOI_pos.s32X, AOI_pos.s32Y]
[docs]
@expose
def close(self):
"""
Closes communication with the camera and frees memory.
Calls :py:func:`stop_capture` to free memory and end the socket server
and then closes serial communication with the camera.
"""
try:
self.handle
except AttributeError:
return
self.stop_capture()
error = tc.ExitCamera(self.handle)
if error != 0:
log.error(f"Closing ThorCam failed (code {error})")
del self.handle
self.meminfo = None
[docs]
class UC480Client(ThorCamClient):
pass