Source code for cg635_clock_generator.cg635_clock_generator

"""
Module for the control of the SRS CG635 clock generator.
"""

import logging
from functools import wraps
from time import sleep, time
from typing import Optional

import pyvisa
import pyvisa.constants

from cg635_clock_generator.constants import (
    CG635CMOSStandard,
    CG635Communication,
    CG635QStandard,
    CG635Timebase,
)
from cg635_clock_generator.exceptions import (
    CG635CommandError,
    CG635CommunicationError,
    CG635OperationTimeoutError,
)

__author__ = "Leandro Lanzieri"
__copyright__ = "Deutsches Elektronen-Synchrotron, DESY"
__license__ = "LGPL-3.0"

_LOGGER = logging.getLogger(__name__)


def _verify_operation(method):
    """
    A decorator to verify that an operation has completed successfully.
    It sets the event status register to generate a service request when the operation
    is completed, and waits for the request to be fired.
    """

    @wraps(method)
    def _verify_operation_complete(instance: "CG635ClockGenerator", *args, **kwargs):
        # clear the ESR, CESR, LCKR and INSR registers and LERR error buffer
        instance._resource.write("*CLS")

        # set a bit on the event register when any of the following occurs:
        # - the completion of an operation
        # - query error
        # - device error
        # - execution error
        # - command error
        instance._resource.write("*ESE 61")

        # generate a service request when any unmasked bit on the event register is set
        instance._resource.write("*SRE 32")

        # now execute the operation
        method(instance, *args, **kwargs)

        # wait for the operation to complete,.
        # this sets the OPC bit on the event register when the operation is completed.
        instance._resource.write("*OPC")

        start = time()
        while time() - start < instance._operation_complete_timeout:
            sleep(instance._operation_complete_polling_interval)
            serial_poll_status = int(instance._resource.query("*STB?"))
            if serial_poll_status & 1 << 5:
                # instrument requests service
                break
        else:
            raise CG635OperationTimeoutError("Operation did not complete in time")

        event_status = int(instance._resource.query("*ESR?"))
        if event_status > 1:
            raise CG635CommunicationError(
                f"Operation failed with status: {event_status}"
            )

        # clear the event registers
        instance._resource.write("*ESE 0")
        instance._resource.write("*SRE 0")

    return _verify_operation_complete


[docs] class CG635ClockGenerator: """ Implements the control of the SRS CG635 clock generator. Args: communication_type (CG635Communication): The communication method to use. serial_device (Optional[str]): The serial device to use for communication, in case RS-232 or USB (via an adapter) is used. Cannot be used together with gpib_address nor resource_path. Default is None. gpib_address (Optional[int]): The GPIB address to use for communication, in case GPIB is used. Cannot be used together with serial_device nor resource_path. Default is None. gpib_card (Optional[int]): The number of the GPIB card to use for communication. Default is None. resource_path (Optional[str]): The resource path to use for communication. Cannot be used together with serial_device nor gpib_device. Default is None. resource_manager (pyvisa.Optional[ResourceManager]): The resource manager to use for communication. If None, a new resource manager will be created. Default is None. communication_timeout (int): The communication timeout in milliseconds. Default is 1000. operation_complete_timeout (int): Maximum time to wait for an operation to complete (in seconds). Default is 5. operation_complete_polling_interval (int): Interval between checks for operation completion (in seconds). Default is 0.1. """ RS232_BAUDRATE = 9600 """Baud rate for the RS232 communication.""" RS323_BITS = 8 """Number of bits for the RS232 communication.""" RS232_STOP_BITS = pyvisa.constants.StopBits.one """Number of stop bits for the RS232 communication.""" RS232_PARITY = pyvisa.constants.Parity.none """Parity for the RS232 communication.""" RS232_FLOW_CONTROL = pyvisa.constants.VI_ASRL_FLOW_RTS_CTS """Flow control for the RS232 communication.""" LINE_TERMINATION = "\n" """Line termination.""" GPIB_DEFAULT_ADDRESS = 23 """GPIB default address.""" GPIP_DEFAULT_CARD = 0 """GPIB default card.""" def __init__( self, communication_type: CG635Communication, serial_device: Optional[str] = None, gpib_address: Optional[int] = None, gpib_card: Optional[int] = None, resource_path: Optional[str] = None, resource_manager: Optional[pyvisa.ResourceManager] = None, communication_timeout: int = 1000, operation_complete_timeout: int = 5, operation_complete_polling_interval: float = 0.1, ): self._communication_timeout = communication_timeout self._communication_type = communication_type self._operation_complete_timeout = operation_complete_timeout self._operation_complete_polling_interval = operation_complete_polling_interval self._resource_manager = resource_manager or pyvisa.ResourceManager("@py") if self._communication_type == CG635Communication.RS232: assert serial_device is not None self._serial_device = serial_device self._gpib_address = None self._resource_path = f"ASRL{serial_device}::INSTR" if self._communication_type == CG635Communication.GPIB: self._gpib_address = gpib_address or self.GPIB_DEFAULT_ADDRESS self._gpib_card = gpib_card or self.GPIP_DEFAULT_CARD self._serial_device = None self._resource_path = f"GPIB{self._gpib_card}::{self._gpib_address}::INSTR" if resource_path is not None: self._resource_path = resource_path self._resource: pyvisa.resources.MessageBasedResource = ( self._resource_manager.open_resource(self._resource_path) ) self._resource.timeout = self._communication_timeout self._resource.read_termination = self.LINE_TERMINATION self._resource.write_termination = self.LINE_TERMINATION if self._communication_type == CG635Communication.RS232: self._resource.baud_rate = self.RS232_BAUDRATE self._resource.data_bits = self.RS323_BITS self._resource.stop_bits = self.RS232_STOP_BITS self._resource.parity = self.RS232_PARITY self._resource.flow_control = self.RS232_FLOW_CONTROL
[docs] def get_identification(self) -> str: """ Gets the identification of the device. """ return self._resource.query("*IDN?")
[docs] def reset(self): """ Resets the device to factory settings. This is equivalent to pressing the keys 'SHIFT', 'INIT', 'Hz' on the front panel. The remote interface, GPIB address and the power-on status clear are not affected by this command. """ self._resource.write("*RST")
[docs] @_verify_operation def set_cmos_standard(self, standard: CG635CMOSStandard): """ Sets the CMOS standard. Args: standard: The CMOS standard to set. """ if not isinstance(standard, CG635CMOSStandard): _LOGGER.debug(f"The type of the parameter is {type(standard)}") raise CG635CommandError(f"Invalid CMOS standard: {standard}") self._resource.write(f"STDC {standard.value}")
[docs] def get_cmos_standard(self) -> Optional[CG635CMOSStandard]: """ Gets the CMOS standard. If the current CMOS levels are not standard, returns None. Raises: CG635CommunicationError: If the CMOS standard is invalid. """ response: str = self._resource.query("STDC?").strip() if response == "-1": return None try: level = CG635CMOSStandard(int(response)) except ValueError: raise CG635CommunicationError(f"Invalid CMOS standard: {response}") return level
[docs] @_verify_operation def set_cmos_low_level(self, level: float): """ Sets the low level of the CMOS output. To set to standard levels, use the set_cmos_standard method. Args: level: The low level of the CMOS output in volts. The range is -1.0 to 1.0 V. Raises: CG635CommandError: If the level is invalid. """ if not -1.0 <= level <= 1.0: raise CG635CommandError(f"Invalid CMOS low level: {level}") self._resource.write(f"CMOS 0,{level:.2f}")
[docs] def get_cmos_low_level(self) -> float: """ Gets the low level of the CMOS output in volts. """ return float(self._resource.query("CMOS?0"))
[docs] @_verify_operation def set_cmos_high_level(self, level: float): """ Sets the high level of the CMOS output. To set to standard levels, use the set_cmos_standard method. Args: level: The high level of the CMOS output in volts. The range is -0.5 to 6.0 V. Raises: CG635CommandError: If the level is invalid. """ if not -0.5 <= level <= 6.0: raise CG635CommandError(f"Invalid CMOS high level: {level}") self._resource.write(f"CMOS 1,{level:.2f}")
[docs] def get_cmos_high_level(self) -> float: """ Gets the high level of the CMOS output in volts. """ return float(self._resource.query("CMOS?1"))
[docs] @_verify_operation def set_frequency(self, frequency: float): """ Sets the frequency of the output signal. Args: frequency: The frequency of the output signal in Hz. """ self._resource.write(f"FREQ {frequency}")
[docs] def get_frequency(self) -> float: """ Gets the frequency of the output signal in Hz. """ return float(self._resource.query("FREQ?"))
[docs] @_verify_operation def set_output(self, enabled: bool): """ Enables or disables the output signal. Args: enabled: Whether to enable the output signals. """ self._resource.write(f"RUNS {1 if enabled else 0}")
[docs] def get_output(self) -> bool: """ Gets whether the output signal is enabled. """ return bool(int(self._resource.query("RUNS?")))
[docs] @_verify_operation def set_phase(self, phase: float): """ Sets the phase of the output signal. Args: phase: The phase of the output signal in degrees. Raises: CG635TimeoutError: If the operation does not complete in time. """ self._resource.write(f"PHAS {phase}")
[docs] def get_phase(self) -> float: """ Gets the phase of the output signal in degrees. """ return float(self._resource.query("PHAS?"))
[docs] @_verify_operation def set_relative_phase(self): """ Sets the current phase to be zero degrees. This doesn't change the phase of the output signal. """ self._resource.write("RPHS")
[docs] @_verify_operation def set_q_low_voltage(self, voltage: float): """ Sets the low voltage of the Q output. Args: voltage: The low voltage of the Q output in volts. The range is -3.00 to 4.80 V. """ if not -3.00 <= voltage <= 4.80: raise CG635CommandError(f"Invalid Q low voltage: {voltage}") self._resource.write(f"QOUT 0,{voltage:.2f}")
[docs] def get_q_low_voltage(self) -> float: """ Gets the low voltage of the Q output in volts. """ return float(self._resource.query("QOUT?0"))
[docs] @_verify_operation def set_q_high_voltage(self, voltage: float): """ Sets the high voltage of the Q output. Args: voltage: The high voltage of the Q output in volts. The range is -2.00 to 5.00 V. """ if not -2.00 <= voltage <= 5.00: raise CG635CommandError(f"Invalid Q high voltage: {voltage}") self._resource.write(f"QOUT 1,{voltage:.2f}")
[docs] def get_q_high_voltage(self) -> float: """ Gets the high voltage of the Q output in volts. """ return float(self._resource.query("QOUT?1"))
[docs] @_verify_operation def set_q_standard(self, standard: CG635QStandard): """ Sets the Q standard. Args: standard: The Q standard to set. """ if not isinstance(standard, CG635QStandard): raise CG635CommandError(f"Invalid Q standard: {standard}") self._resource.write(f"STDQ {standard.value}")
[docs] def get_q_standard(self) -> Optional[CG635QStandard]: """ Gets the Q standard. If the current Q levels are not standard, returns None. Raises: CG635CommunicationError: If the Q standard is invalid. """ response = int(self._resource.query("STDQ?").strip()) if response == -1: return None try: level = CG635QStandard(response) except ValueError: raise CG635CommunicationError(f"Invalid Q standard: {response}") return level
[docs] def get_timebase(self) -> CG635Timebase: """ Gets the timebase of the device. Raises: CG635CommunicationError: If the timebase is invalid. """ response = int(self._resource.query("TIMB?").strip()) try: timebase = CG635Timebase(response) except ValueError: raise CG635CommunicationError(f"Invalid timebase: {response}") return timebase
[docs] def close(self): """ Closes the connection to the device. """ self._resource.close()
def __del__(self): self.close()