cellects.core.cellects_threads

Cellects GUI module implementing threaded image/video analysis workflows.

This module provides a Qt-based interface for analyzing biological motion and growth through color space combinations, segmentation strategies, arena delineation, and video processing. It uses QThread workers to keep the UI responsive during computationally intensive tasks such as segmentation, motion tracking, network detection, and oscillation and fractal analysis.

Main Components

LoadDataToRunCellectsQuicklyThread : Loads necessary data asynchronously for quick Cellects execution.
FirstImageAnalysisThread : Analyzes the first image with automatic color space selection and segmentation.
LastImageAnalysisThread : Processes last frame analysis for optimized color space combinations.
CropScaleSubtractDelineateThread : Handles cropping, scaling, and arena boundary detection.
VideoTrackingThread : Performs complete motion analysis on a single arena or full batch analysis across multiple arenas/experiments.

Notes

Uses QThread for background operations to maintain UI responsiveness. Key workflows include automated color space optimization, adaptive segmentation algorithms, multithreaded video processing, and arena delineation via geometric analysis or manual drawing. Implements special post-processing for Physarum polycephalum network detection and oscillatory activity tracking.
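The worker pattern shared by the classes below — heavy work in run(), a signal emitted on completion — can be mimicked outside Qt. The following minimal sketch uses plain threading, with a callback standing in for message_when_thread_finished; the AnalysisWorker class is purely illustrative and not part of the Cellects API:

```python
import threading

class AnalysisWorker(threading.Thread):
    """Illustrative analogue of this module's QThread workers: run() does
    the heavy computation, then notifies listeners on completion, much as
    message_when_thread_finished.emit(...) would in Qt."""

    def __init__(self, on_finished):
        super().__init__()
        self.on_finished = on_finished  # stands in for a Qt Signal

    def run(self):
        # Placeholder for an expensive step such as image segmentation.
        result = sum(i * i for i in range(100))
        self.on_finished(result)

results = []
worker = AnalysisWorker(results.append)
worker.start()
worker.join()  # a Qt event loop would deliver the signal instead of join()
```

In the real classes, the Qt event loop delivers the emitted signal to connected slots on the GUI thread, which is what keeps the interface responsive.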

CompleteImageAnalysisThread

Bases: QThread

Thread for completing the last image analysis.

Signals

message_when_thread_finished : Signal(bool)
    Signal emitted upon completion of the thread's task.

Notes

This class uses QThread to manage the process asynchronously.

Source code in src/cellects/core/cellects_threads.py
class CompleteImageAnalysisThread(QtCore.QThread):
    """
    Thread for completing the last image analysis.

    Signals
    -------
    message_when_thread_finished : Signal(bool)
        Signal emitted upon completion of the thread's task.

    Notes
    -----
    This class uses `QThread` to manage the process asynchronously.
    """
    message_when_thread_finished = QtCore.Signal(bool)

    def __init__(self, parent=None):
        """
        Initialize the worker thread for completing the last image analysis

        Parameters
        ----------
        parent : QObject, optional
            The parent object of this thread instance. In use, an instance of CellectsMainWidget class. Default is None.
        """
        super(CompleteImageAnalysisThread, self).__init__(parent)
        self.setParent(parent)

    def run(self):
        self.parent().po.get_background_to_subtract()
        self.parent().po.save_origins_and_backgrounds_lists()
        self.parent().po.save_data_to_run_cellects_quickly()
        self.parent().po.bio_mask = None
        self.parent().po.back_mask = None
        if self.parent().imageanalysiswindow.bio_masks_number != 0:
            self.parent().po.bio_mask = np.array(np.nonzero(self.parent().imageanalysiswindow.bio_mask))
        if self.parent().imageanalysiswindow.back_masks_number != 0:
            self.parent().po.back_mask = np.array(np.nonzero(self.parent().imageanalysiswindow.back_mask))
        if not self.isInterruptionRequested():
            self.parent().po.complete_image_analysis()
            self.message_when_thread_finished.emit(True)

__init__(parent=None)

Initialize the worker thread for completing the last image analysis

Parameters:

parent : QObject, optional
    The parent object of this thread instance. In use, an instance of CellectsMainWidget class. Default is None.
Source code in src/cellects/core/cellects_threads.py
def __init__(self, parent=None):
    """
    Initialize the worker thread for completing the last image analysis

    Parameters
    ----------
    parent : QObject, optional
        The parent object of this thread instance. In use, an instance of CellectsMainWidget class. Default is None.
    """
    super(CompleteImageAnalysisThread, self).__init__(parent)
    self.setParent(parent)
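The isInterruptionRequested() guard in run() above means a late requestInterruption() suppresses the completion signal. A minimal plain-threading analogue (illustrative only, with threading.Event standing in for Qt's interruption flag):

```python
import threading

class InterruptibleWorker(threading.Thread):
    """Sketch of the interruption guard in CompleteImageAnalysisThread.run():
    the completion callback fires only if no interruption was requested."""

    def __init__(self, on_finished):
        super().__init__()
        self._interrupted = threading.Event()  # stands in for requestInterruption()
        self.on_finished = on_finished         # stands in for the Qt Signal

    def request_interruption(self):
        self._interrupted.set()

    def run(self):
        _ = [i * i for i in range(1000)]    # placeholder for complete_image_analysis()
        if not self._interrupted.is_set():  # mirrors isInterruptionRequested()
            self.on_finished(True)

finished = []
w1 = InterruptibleWorker(finished.append)
w1.start()
w1.join()                    # completes normally, callback fires

w2 = InterruptibleWorker(finished.append)
w2.request_interruption()    # interrupt before the result is committed
w2.start()
w2.join()                    # callback suppressed
```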

CropScaleSubtractDelineateThread

Bases: QThread

Thread for detecting crop and arena coordinates.

Signals

message_from_thread : Signal(str)
    Signal emitted when progress messages are available.
message_when_thread_finished : Signal(dict)
    Signal emitted upon completion of the thread's task.

Notes

This class uses QThread to manage the process asynchronously.

Source code in src/cellects/core/cellects_threads.py
class CropScaleSubtractDelineateThread(QtCore.QThread):
    """
    Thread for detecting crop and arena coordinates.

    Signals
    -------
    message_from_thread : Signal(str)
        Signal emitted when progress messages are available.
    message_when_thread_finished : Signal(dict)
        Signal emitted upon completion of the thread's task.

    Notes
    -----
    This class uses `QThread` to manage the process asynchronously.
    """
    message_from_thread = QtCore.Signal(str)
    message_when_thread_finished = QtCore.Signal(dict)

    def __init__(self, parent=None):
        """
        Initialize the worker thread for detecting crop and arena coordinates in the first image

        Parameters
        ----------
        parent : QObject, optional
            The parent object of this thread instance. In use, an instance of CellectsMainWidget class. Default is None.
        """

        super(CropScaleSubtractDelineateThread, self).__init__(parent)
        self.setParent(parent)

    def run(self):
        """
        Start cropping if required, perform initial processing,
        and handle subsequent operations based on configuration.

        Extended Description
        --------------------
        This method initiates the cropping process if necessary,
        performs initial processing steps, and manages subsequent operations
        depending on whether multiple blobs are detected per arena.

        Notes
        -----
        This method uses several logging operations to track its progress.
        It interacts with various components of the parent object
        to perform necessary image processing tasks.
        """
        logging.info("Start cropping if required")
        self.status = {"continue": True, "message": ""}
        self.parent().po.first_image.get_setup_boundaries()
        self.parent().po.cropping(is_first_image=True)
        self.parent().po.get_average_pixel_size()
        if os.path.isfile('cellects_settings.json'):
            os.remove('cellects_settings.json')
        logging.info("Save data to run Cellects quickly")
        self.parent().po.save_data_to_run_cellects_quickly()
        if not self.parent().po.vars['several_blob_per_arena']:
            logging.info("Check whether the detected shape number is ok")
            nb, shapes, stats, centroids = cv2.connectedComponentsWithStats(self.parent().po.first_image.validated_shapes)
            y_lim = self.parent().po.first_image.y_boundaries
            if ((nb - 1) != self.parent().po.sample_number or np.any(stats[:, 4] == 1)):
                self.status['message'] = "Image analysis failed to detect the right cell(s) number: restart the analysis."
                self.status['continue'] = False
            elif y_lim is None:
                self.status['message'] = "The shapes detected in the image did not allow automatic arena delineation."
                self.status['continue'] = False
            elif (y_lim == - 1).sum() != (y_lim == 1).sum():
                self.status['message'] = "Automatic arena delineation cannot work if one cell touches the image border."
                self.parent().po.first_image.y_boundaries = None
                self.status['continue'] = False
        if self.status['continue']:
            self.parent().po.save_first_image()
            self.parent().po.save_masks()
            logging.info("Start automatic video delineation")
            self.status = self.parent().po.delineate_each_arena()
        else:
            self.parent().po.first_image.validated_shapes = np.zeros(self.parent().po.first_image.image.shape[:2], dtype=np.uint8)
            logging.info(self.status['message'])
        self.message_when_thread_finished.emit(self.status)

__init__(parent=None)

Initialize the worker thread for detecting crop and arena coordinates in the first image

Parameters:

parent : QObject, optional
    The parent object of this thread instance. In use, an instance of CellectsMainWidget class. Default is None.
Source code in src/cellects/core/cellects_threads.py
def __init__(self, parent=None):
    """
    Initialize the worker thread for detecting crop and arena coordinates in the first image

    Parameters
    ----------
    parent : QObject, optional
        The parent object of this thread instance. In use, an instance of CellectsMainWidget class. Default is None.
    """

    super(CropScaleSubtractDelineateThread, self).__init__(parent)
    self.setParent(parent)

run()

Start cropping if required, perform initial processing, and handle subsequent operations based on configuration.

Extended Description

This method initiates the cropping process if necessary, performs initial processing steps, and manages subsequent operations depending on whether multiple blobs are detected per arena.

Notes

This method uses several logging operations to track its progress. It interacts with various components of the parent object to perform necessary image processing tasks.

Source code in src/cellects/core/cellects_threads.py
def run(self):
    """
    Start cropping if required, perform initial processing,
    and handle subsequent operations based on configuration.

    Extended Description
    --------------------
    This method initiates the cropping process if necessary,
    performs initial processing steps, and manages subsequent operations
    depending on whether multiple blobs are detected per arena.

    Notes
    -----
    This method uses several logging operations to track its progress.
    It interacts with various components of the parent object
    to perform necessary image processing tasks.
    """
    logging.info("Start cropping if required")
    self.status = {"continue": True, "message": ""}
    self.parent().po.first_image.get_setup_boundaries()
    self.parent().po.cropping(is_first_image=True)
    self.parent().po.get_average_pixel_size()
    if os.path.isfile('cellects_settings.json'):
        os.remove('cellects_settings.json')
    logging.info("Save data to run Cellects quickly")
    self.parent().po.save_data_to_run_cellects_quickly()
    if not self.parent().po.vars['several_blob_per_arena']:
        logging.info("Check whether the detected shape number is ok")
        nb, shapes, stats, centroids = cv2.connectedComponentsWithStats(self.parent().po.first_image.validated_shapes)
        y_lim = self.parent().po.first_image.y_boundaries
        if ((nb - 1) != self.parent().po.sample_number or np.any(stats[:, 4] == 1)):
            self.status['message'] = "Image analysis failed to detect the right cell(s) number: restart the analysis."
            self.status['continue'] = False
        elif y_lim is None:
            self.status['message'] = "The shapes detected in the image did not allow automatic arena delineation."
            self.status['continue'] = False
        elif (y_lim == - 1).sum() != (y_lim == 1).sum():
            self.status['message'] = "Automatic arena delineation cannot work if one cell touches the image border."
            self.parent().po.first_image.y_boundaries = None
            self.status['continue'] = False
    if self.status['continue']:
        self.parent().po.save_first_image()
        self.parent().po.save_masks()
        logging.info("Start automatic video delineation")
        self.status = self.parent().po.delineate_each_arena()
    else:
        self.parent().po.first_image.validated_shapes = np.zeros(self.parent().po.first_image.image.shape[:2], dtype=np.uint8)
        logging.info(self.status['message'])
    self.message_when_thread_finished.emit(self.status)
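The sanity checks in run() above reduce to three conditions on the connected-component statistics and the y-boundaries array (cv2 labels the background as component 0, and column 4 of stats holds component areas). They can be sketched as a standalone function; validate_detection is a hypothetical helper for illustration, not Cellects code:

```python
import numpy as np

def validate_detection(nb, stats, y_boundaries, sample_number):
    """Mirror of the checks in CropScaleSubtractDelineateThread.run().
    Returns an error message, or None if delineation can proceed."""
    # Component 0 is the background, so nb - 1 shapes were detected;
    # a 1-pixel area in stats[:, 4] signals a spurious detection.
    if (nb - 1) != sample_number or np.any(stats[:, 4] == 1):
        return "Image analysis failed to detect the right cell(s) number: restart the analysis."
    if y_boundaries is None:
        return "The shapes detected in the image did not allow automatic arena delineation."
    # Every upper boundary (-1) must be paired with a lower boundary (+1);
    # a cell touching the image border breaks that pairing.
    if (y_boundaries == -1).sum() != (y_boundaries == 1).sum():
        return "Automatic arena delineation cannot work if one cell touches the image border."
    return None
```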

FirstImageAnalysisThread

Bases: QThread

Thread for analyzing the first image of a given folder.

Signals

message_from_thread : Signal(str)
    Signal emitted when progress messages are available.
message_when_thread_finished : Signal(bool)
    Signal emitted upon completion of the thread's task.

Notes

This class uses QThread to manage the process asynchronously.

Source code in src/cellects/core/cellects_threads.py
class FirstImageAnalysisThread(QtCore.QThread):
    """
    Thread for analyzing the first image of a given folder.

    Signals
    -------
    message_from_thread : Signal(str)
        Signal emitted when progress messages are available.
    message_when_thread_finished : Signal(bool)
        Signal emitted upon completion of the thread's task.

    Notes
    -----
    This class uses `QThread` to manage the process asynchronously.
    """
    message_from_thread = QtCore.Signal(str)
    message_when_thread_finished = QtCore.Signal(bool)

    def __init__(self, parent=None):
        """
        Initialize the worker thread for analyzing the first image of a given folder

        Parameters
        ----------
        parent : QObject, optional
            The parent object of this thread instance. In use, an instance of CellectsMainWidget class. Default is None.
        """
        super(FirstImageAnalysisThread, self).__init__(parent)
        self.setParent(parent)

    def run(self):
        """
        Perform image analysis and segmentation based on the current state of the application.

        This function handles both bio-mask and background mask processing, emits status messages,
        computes average pixel size if necessary, and performs image segmentation or generates
        analysis options.

        Parameters
        ----------
        self : object
            The instance of the class containing this method. Should have attributes:
            - parent: Reference to the parent object
            - message_from_thread.emit: Method to emit messages from the thread
            - message_when_thread_finished.emit: Method to signal thread completion

        Returns
        -------
        None
            This method does not return a value but emits messages and modifies the state of
            self.parent objects.
        Notes
        -----
        This method performs several complex operations involving image segmentation and
        analysis generation. It handles both bio-masks and background masks, computes average
        pixel sizes, and updates various state attributes on the parent object.
        """
        tic = default_timer()
        if self.parent().po.visualize or len(self.parent().po.first_im.shape) == 2:
            self.message_from_thread.emit("Image segmentation... Do not close until it is finished.")
        else:
            self.message_from_thread.emit("Generating segmentation options... Do not close until it is finished.")
        self.parent().po.full_first_image_segmentation(not self.parent().imageanalysiswindow.asking_first_im_parameters_flag,
                                                       self.parent().imageanalysiswindow.bio_mask, self.parent().imageanalysiswindow.back_mask)

        logging.info(f" image analysis lasted {np.floor((default_timer() - tic) / 60).astype(int)} minutes {np.round((default_timer() - tic) % 60).astype(int)} seconds")
        self.message_when_thread_finished.emit(True)

__init__(parent=None)

Initialize the worker thread for analyzing the first image of a given folder

Parameters:

parent : QObject, optional
    The parent object of this thread instance. In use, an instance of CellectsMainWidget class. Default is None.
Source code in src/cellects/core/cellects_threads.py
def __init__(self, parent=None):
    """
    Initialize the worker thread for analyzing the first image of a given folder

    Parameters
    ----------
    parent : QObject, optional
        The parent object of this thread instance. In use, an instance of CellectsMainWidget class. Default is None.
    """
    super(FirstImageAnalysisThread, self).__init__(parent)
    self.setParent(parent)

run()

Perform image analysis and segmentation based on the current state of the application.

This function handles both bio-mask and background mask processing, emits status messages, computes average pixel size if necessary, and performs image segmentation or generates analysis options.

Parameters:

self : object
    The instance of the class containing this method. Should have attributes:
    - parent: Reference to the parent object
    - message_from_thread.emit: Method to emit messages from the thread
    - message_when_thread_finished.emit: Method to signal thread completion

Returns:

None
    This method does not return a value but emits messages and modifies the state of self.parent objects.
Notes

This method performs several complex operations involving image segmentation and analysis generation. It handles both bio-masks and background masks, computes average pixel sizes, and updates various state attributes on the parent object.

Source code in src/cellects/core/cellects_threads.py
def run(self):
    """
    Perform image analysis and segmentation based on the current state of the application.

    This function handles both bio-mask and background mask processing, emits status messages,
    computes average pixel size if necessary, and performs image segmentation or generates
    analysis options.

    Parameters
    ----------
    self : object
        The instance of the class containing this method. Should have attributes:
        - parent: Reference to the parent object
        - message_from_thread.emit: Method to emit messages from the thread
        - message_when_thread_finished.emit: Method to signal thread completion

    Returns
    -------
    None
        This method does not return a value but emits messages and modifies the state of
        self.parent objects.
    Notes
    -----
    This method performs several complex operations involving image segmentation and
    analysis generation. It handles both bio-masks and background masks, computes average
    pixel sizes, and updates various state attributes on the parent object.
    """
    tic = default_timer()
    if self.parent().po.visualize or len(self.parent().po.first_im.shape) == 2:
        self.message_from_thread.emit("Image segmentation... Do not close until it is finished.")
    else:
        self.message_from_thread.emit("Generating segmentation options... Do not close until it is finished.")
    self.parent().po.full_first_image_segmentation(not self.parent().imageanalysiswindow.asking_first_im_parameters_flag,
                                                   self.parent().imageanalysiswindow.bio_mask, self.parent().imageanalysiswindow.back_mask)

    logging.info(f" image analysis lasted {np.floor((default_timer() - tic) / 60).astype(int)} minutes {np.round((default_timer() - tic) % 60).astype(int)} seconds")
    self.message_when_thread_finished.emit(True)
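The timing log above floors the elapsed minutes and rounds the remaining seconds. Extracted as a small helper for clarity (illustrative only, not a Cellects function):

```python
import numpy as np

def elapsed_message(elapsed_seconds):
    """Format a duration the way the run() log line above does."""
    minutes = np.floor(elapsed_seconds / 60).astype(int)
    seconds = np.round(elapsed_seconds % 60).astype(int)
    return f"image analysis lasted {minutes} minutes {seconds} seconds"
```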

GetExifDataThread

Bases: QThread

Thread for loading exif data from images.

Notes

This class uses QThread to manage the process asynchronously.

Source code in src/cellects/core/cellects_threads.py
class GetExifDataThread(QtCore.QThread):
    """
    Thread for loading exif data from images.

    Notes
    -----
    This class uses `QThread` to manage the process asynchronously.
    """

    def __init__(self, parent=None):
        """
        Initialize the worker thread for looking for the exif data.

        Parameters
        ----------
        parent : QObject, optional
            The parent object of this thread instance. In use, an instance of CellectsMainWidget class. Default is None.
        """
        super(GetExifDataThread, self).__init__(parent)
        self.setParent(parent)

    def run(self):
        """
        Extract EXIF data.
        """
        self.parent().po.save_exif()

__init__(parent=None)

Initialize the worker thread for looking for the exif data.

Parameters:

parent : QObject, optional
    The parent object of this thread instance. In use, an instance of CellectsMainWidget class. Default is None.
Source code in src/cellects/core/cellects_threads.py
def __init__(self, parent=None):
    """
    Initialize the worker thread for looking for the exif data.

    Parameters
    ----------
    parent : QObject, optional
        The parent object of this thread instance. In use, an instance of CellectsMainWidget class. Default is None.
    """
    super(GetExifDataThread, self).__init__(parent)
    self.setParent(parent)

run()

Extract EXIF data.

Source code in src/cellects/core/cellects_threads.py
def run(self):
    """
    Extract EXIF data.
    """
    self.parent().po.save_exif()

GetFirstImThread

Bases: QThread

Thread for getting the first image.

Signals

message_when_thread_finished : Signal(ndarray)
    Emitted when the thread finishes execution, carrying the loaded first image.

Notes

This class uses QThread to manage the process asynchronously.

Source code in src/cellects/core/cellects_threads.py
class GetFirstImThread(QtCore.QThread):
    """
    Thread for getting the first image.

    Signals
    -------
    message_when_thread_finished : Signal(ndarray)
        Emitted when the thread finishes execution, carrying the loaded first image.

    Notes
    -----
    This class uses `QThread` to manage the process asynchronously.
    """
    message_when_thread_finished = QtCore.Signal(np.ndarray)
    def __init__(self, parent=None):
        """
        Initialize the worker thread for loading the first image of one folder.

        Parameters
        ----------
        parent : QObject, optional
            The parent object of this thread instance. In use, an instance of CellectsMainWidget class. Default is None.
        """
        super(GetFirstImThread, self).__init__(parent)
        self.setParent(parent)

    def run(self):
        """
        Run the first image reading task in the parent process and emit a signal when it finishes.
        """
        self.parent().po.get_first_image()
        self.message_when_thread_finished.emit(self.parent().po.first_im)

__init__(parent=None)

Initialize the worker thread for loading the first image of one folder.

Parameters:

parent : QObject, optional
    The parent object of this thread instance. In use, an instance of CellectsMainWidget class. Default is None.
Source code in src/cellects/core/cellects_threads.py
def __init__(self, parent=None):
    """
    Initialize the worker thread for loading the first image of one folder.

    Parameters
    ----------
    parent : QObject, optional
        The parent object of this thread instance. In use, an instance of CellectsMainWidget class. Default is None.
    """
    super(GetFirstImThread, self).__init__(parent)
    self.setParent(parent)

run()

Run the first image reading task in the parent process and emit a signal when it finishes.

Source code in src/cellects/core/cellects_threads.py
def run(self):
    """
    Run the first image reading task in the parent process and emit a signal when it finishes.
    """
    self.parent().po.get_first_image()
    self.message_when_thread_finished.emit(self.parent().po.first_im)
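Unlike most workers in this module, message_when_thread_finished here is declared as Signal(np.ndarray) and carries the image itself rather than a bool. That payload-carrying pattern, mimicked with a plain callback (FirstImageLoader is illustrative only, not Cellects code):

```python
import threading
import numpy as np

class FirstImageLoader(threading.Thread):
    """Sketch of GetFirstImThread: the completion notification carries
    the loaded image as its payload instead of a success flag."""

    def __init__(self, on_finished):
        super().__init__()
        self.on_finished = on_finished  # stands in for Signal(np.ndarray)

    def run(self):
        # Placeholder for reading the first image of the folder from disk.
        first_im = np.zeros((4, 4, 3), dtype=np.uint8)
        self.on_finished(first_im)

received = []
loader = FirstImageLoader(received.append)
loader.start()
loader.join()
```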

GetLastImThread

Bases: QThread

Thread for getting the last image.

Notes

This class uses QThread to manage the process asynchronously.

Source code in src/cellects/core/cellects_threads.py
class GetLastImThread(QtCore.QThread):
    """
    Thread for getting the last image.

    Notes
    -----
    This class uses `QThread` to manage the process asynchronously.
    """
    def __init__(self, parent=None):
        """
        Initialize the worker thread for loading the last image of one folder.

        Parameters
        ----------
        parent : QObject, optional
            The parent object of this thread instance. In use, an instance of CellectsMainWidget class. Default is None.
        """
        super(GetLastImThread, self).__init__(parent)
        self.setParent(parent)

    def run(self):
        """
        Run the last image reading task in the parent process.
        """
        self.parent().po.get_last_image()

__init__(parent=None)

Initialize the worker thread for loading the last image of one folder.

Parameters:

parent : QObject, optional
    The parent object of this thread instance. In use, an instance of CellectsMainWidget class. Default is None.
Source code in src/cellects/core/cellects_threads.py
def __init__(self, parent=None):
    """
    Initialize the worker thread for loading the last image of one folder.

    Parameters
    ----------
    parent : QObject, optional
        The parent object of this thread instance. In use, an instance of CellectsMainWidget class. Default is None.
    """
    super(GetLastImThread, self).__init__(parent)
    self.setParent(parent)

run()

Run the last image reading task in the parent process.

Source code in src/cellects/core/cellects_threads.py
def run(self):
    """
    Run the last image reading task in the parent process.
    """
    self.parent().po.get_last_image()

LastImageAnalysisThread

Bases: QThread

Thread for analyzing the last image of a given folder.

Signals

message_from_thread : Signal(str)
    Signal emitted when progress messages are available.
message_when_thread_finished : Signal(bool)
    Signal emitted upon completion of the thread's task.

Notes

This class uses QThread to manage the process asynchronously.

Source code in src/cellects/core/cellects_threads.py
class LastImageAnalysisThread(QtCore.QThread):
    """
    Thread for analyzing the last image of a given folder.

    Signals
    -------
    message_from_thread : Signal(str)
        Signal emitted when progress messages are available.
    message_when_thread_finished : Signal(bool)
        Signal emitted upon completion of the thread's task.

    Notes
    -----
    This class uses `QThread` to manage the process asynchronously.
    """
    message_from_thread = QtCore.Signal(str)
    message_when_thread_finished = QtCore.Signal(bool)

    def __init__(self, parent=None):
        """
        Initialize the worker thread for analyzing the last image of a given folder

        Parameters
        ----------
        parent : QObject, optional
            The parent object of this thread instance. In use, an instance of CellectsMainWidget class. Default is None.
        """
        super(LastImageAnalysisThread, self).__init__(parent)
        self.setParent(parent)

    def run(self):
        """
        Summary:
        Run the image processing and analysis pipeline based on current settings.

        Extended Description:
        This function initiates the workflow for image processing and analysis,
        including segmenting images, generating analysis options, and handling
        various masks and settings based on the current state of the parent object.

        Returns:
        --------
        None
            This method does not return a value. It emits signals to indicate the
            progress and completion of the processing tasks.

        Notes:
        ------
        This function uses various attributes from the parent class to determine
        how to process and analyze images. The specific behavior is heavily
        dependent on the state of these attributes.

        Attributes:
        -----------
        parent() : object
            The owner of this instance, containing necessary settings and methods.
        message_from_thread.emit(s : str) : signal
            Signal to indicate progress messages from the thread.
        message_when_thread_finished.emit(success : bool) : signal
            Signal to indicate the completion of the thread.
        """
        if self.parent().po.visualize or (len(self.parent().po.first_im.shape) == 2 and not self.parent().po.network_shaped):
            self.message_from_thread.emit("Image segmentation... Do not close until it is finished.")
        else:
            self.message_from_thread.emit("Generating analysis options... Do not close until it is finished.")
        self.parent().po.full_last_image_segmentation(self.parent().imageanalysiswindow.bio_mask, self.parent().imageanalysiswindow.back_mask)
        self.message_when_thread_finished.emit(True)

__init__(parent=None)

Initialize the worker thread for analyzing the last image of a given folder

Parameters:

parent : QObject, optional
    The parent object of this thread instance. In use, an instance of CellectsMainWidget class. Default is None.
Source code in src/cellects/core/cellects_threads.py
def __init__(self, parent=None):
    """
    Initialize the worker thread for analyzing the last image of a given folder

    Parameters
    ----------
    parent : QObject, optional
        The parent object of this thread instance. In use, an instance of CellectsMainWidget class. Default is None.
    """
    super(LastImageAnalysisThread, self).__init__(parent)
    self.setParent(parent)

run()

Run the image processing and analysis pipeline based on current settings.

Extended Description

This function initiates the workflow for image processing and analysis, including segmenting images, generating analysis options, and handling various masks and settings based on the current state of the parent object.

Returns:

None
    This method does not return a value. It emits signals to indicate the progress and completion of the processing tasks.

Notes

This function uses various attributes from the parent class to determine how to process and analyze images. The specific behavior is heavily dependent on the state of these attributes.

Attributes:

parent() : object
    The owner of this instance, containing necessary settings and methods.
message_from_thread.emit(s : str) : signal
    Signal to indicate progress messages from the thread.
message_when_thread_finished.emit(success : bool) : signal
    Signal to indicate the completion of the thread.

Source code in src/cellects/core/cellects_threads.py
def run(self):
    """
    Summary:
    Run the image processing and analysis pipeline based on current settings.

    Extended Description:
    This function initiates the workflow for image processing and analysis,
    including segmenting images, generating analysis options, and handling
    various masks and settings based on the current state of the parent object.

    Returns:
    --------
    None
        This method does not return a value. It emits signals to indicate the
        progress and completion of the processing tasks.

    Notes:
    ------
    This function uses various attributes from the parent class to determine
    how to process and analyze images. The specific behavior is heavily
    dependent on the state of these attributes.

    Attributes:
    -----------
    parent() : object
        The owner of this instance, containing necessary settings and methods.
    message_from_thread.emit(s : str) : signal
        Signal to indicate progress messages from the thread.
    message_when_thread_finished.emit(success : bool) : signal
        Signal to indicate the completion of the thread.
    """
    if self.parent().po.visualize or (len(self.parent().po.first_im.shape) == 2 and not self.parent().po.network_shaped):
        self.message_from_thread.emit("Image segmentation... Do not close until it is finished.")
    else:
        self.message_from_thread.emit("Generating analysis options... Do not close until it is finished.")
    self.parent().po.full_last_image_segmentation(self.parent().imageanalysiswindow.bio_mask, self.parent().imageanalysiswindow.back_mask)
    self.message_when_thread_finished.emit(True)

LoadDataToRunCellectsQuicklyThread

Bases: QThread

Load data to run Cellects quickly in a separate thread.

This class is responsible for loading necessary data asynchronously in order to speed up the process of running Cellects.

Signals

message_when_thread_finished : Signal(str) Emitted when the thread finishes execution, indicating whether data loading was successful.

Notes

This class uses QThread to manage the process asynchronously.

Source code in src/cellects/core/cellects_threads.py
class LoadDataToRunCellectsQuicklyThread(QtCore.QThread):
    """
    Load data to run Cellects quickly in a separate thread.

    This class is responsible for loading necessary data asynchronously
    in order to speed up the process of running Cellects.

    Signals
    -------
    message_when_thread_finished : Signal(str)
        Emitted when the thread finishes execution, indicating whether data loading was successful.

    Notes
    -----
    This class uses `QThread` to manage the process asynchronously.
    """
    message_from_thread = QtCore.Signal(str)

    def __init__(self, parent=None):
        """
        Initialize the worker thread for quickly loading data to run Cellects.

        Parameters
        ----------
        parent : QObject, optional
            The parent object of this thread instance. In use, an instance of CellectsMainWidget class. Default is None.
        """
        super(LoadDataToRunCellectsQuicklyThread, self).__init__(parent)
        self.setParent(parent)

    def run(self):
        """
        Execute the data loading and preparation process for running Cellects without setting all parameters in the GUI.

        This method triggers the parent object's methods to look for data and load it,
        then checks if the first experiment is ready. If so, it emits a message.
        """
        self.parent().po.look_for_data()
        self.parent().po.load_data_to_run_cellects_quickly()
        if self.parent().po.first_exp_ready_to_run:
            self.message_from_thread.emit("Data found, Video tracking window and Run all directly are available")
        else:
            self.message_from_thread.emit("")

__init__(parent=None)

Initialize the worker thread for quickly loading data to run Cellects.

Parameters:

parent : QObject, optional
    The parent object of this thread instance. In use, an instance of CellectsMainWidget class. Default is None.
Source code in src/cellects/core/cellects_threads.py
def __init__(self, parent=None):
    """
    Initialize the worker thread for quickly loading data to run Cellects.

    Parameters
    ----------
    parent : QObject, optional
        The parent object of this thread instance. In use, an instance of CellectsMainWidget class. Default is None.
    """
    super(LoadDataToRunCellectsQuicklyThread, self).__init__(parent)
    self.setParent(parent)

run()

Execute the data loading and preparation process for running Cellects without setting all parameters in the GUI.

This method triggers the parent object's methods to look for data and load it, then checks if the first experiment is ready. If so, it emits a message.

Source code in src/cellects/core/cellects_threads.py
def run(self):
    """
    Execute the data loading and preparation process for running Cellects without setting all parameters in the GUI.

    This method triggers the parent object's methods to look for data and load it,
    then checks if the first experiment is ready. If so, it emits a message.
    """
    self.parent().po.look_for_data()
    self.parent().po.load_data_to_run_cellects_quickly()
    if self.parent().po.first_exp_ready_to_run:
        self.message_from_thread.emit("Data found, Video tracking window and Run all directly are available")
    else:
        self.message_from_thread.emit("")

LoadFirstFolderIfSeveralThread

Bases: QThread

Thread for loading data from the first folder if there are several folders.

Signals

message_when_thread_finished : Signal(bool) Emitted when the thread finishes execution, indicating whether data loading was successful.

Notes

This class uses QThread to manage the process asynchronously.

Source code in src/cellects/core/cellects_threads.py
class LoadFirstFolderIfSeveralThread(QtCore.QThread):
    """
    Thread for loading data from the first folder if there are several folders.

    Signals
    -------
    message_when_thread_finished : Signal(bool)
        Emitted when the thread finishes execution, indicating whether data loading was successful.

    Notes
    -----
    This class uses `QThread` to manage the process asynchronously.
    """
    message_when_thread_finished = QtCore.Signal(bool)
    def __init__(self, parent=None):
        """
        Initialize the worker thread for loading data and parameters to run Cellects when analyzing several folders.

        Parameters
        ----------
        parent : QObject, optional
            The parent object of this thread instance. In use, an instance of CellectsMainWidget class. Default is None.
        """
        super(LoadFirstFolderIfSeveralThread, self).__init__(parent)
        self.setParent(parent)

    def run(self):
        """
        Run the data lookup process.
        """
        self.parent().po.load_data_to_run_cellects_quickly()
        if not self.parent().po.first_exp_ready_to_run:
            self.parent().po.get_first_image()
        self.message_when_thread_finished.emit(self.parent().po.first_exp_ready_to_run)

__init__(parent=None)

Initialize the worker thread for loading data and parameters to run Cellects when analyzing several folders.

Parameters:

parent : QObject, optional
    The parent object of this thread instance. In use, an instance of CellectsMainWidget class. Default is None.
Source code in src/cellects/core/cellects_threads.py
def __init__(self, parent=None):
    """
    Initialize the worker thread for loading data and parameters to run Cellects when analyzing several folders.

    Parameters
    ----------
    parent : QObject, optional
        The parent object of this thread instance. In use, an instance of CellectsMainWidget class. Default is None.
    """
    super(LoadFirstFolderIfSeveralThread, self).__init__(parent)
    self.setParent(parent)

run()

Run the data lookup process.

Source code in src/cellects/core/cellects_threads.py
def run(self):
    """
    Run the data lookup process.
    """
    self.parent().po.load_data_to_run_cellects_quickly()
    if not self.parent().po.first_exp_ready_to_run:
        self.parent().po.get_first_image()
    self.message_when_thread_finished.emit(self.parent().po.first_exp_ready_to_run)

LookForDataThreadInFirstW

Bases: QThread

Find and process data in a separate thread.

Notes

This class uses QThread to manage the process asynchronously.

Source code in src/cellects/core/cellects_threads.py
class LookForDataThreadInFirstW(QtCore.QThread):
    """
    Find and process data in a separate thread.

    Notes
    -----
    This class uses `QThread` to manage the process asynchronously.
    """
    def __init__(self, parent=None):
        """
        Initialize the worker thread for finding data to run Cellects.

        Parameters
        ----------
        parent : QObject, optional
            The parent object of this thread instance. In use, an instance of CellectsMainWidget class. Default is None.
        """
        super(LookForDataThreadInFirstW, self).__init__(parent)
        self.setParent(parent)

    def run(self):
        """
        Run the data lookup process.
        """
        self.parent().po.look_for_data()

__init__(parent=None)

Initialize the worker thread for finding data to run Cellects.

Parameters:

parent : QObject, optional
    The parent object of this thread instance. In use, an instance of CellectsMainWidget class. Default is None.
Source code in src/cellects/core/cellects_threads.py
def __init__(self, parent=None):
    """
    Initialize the worker thread for finding data to run Cellects.

    Parameters
    ----------
    parent : QObject, optional
        The parent object of this thread instance. In use, an instance of CellectsMainWidget class. Default is None.
    """
    super(LookForDataThreadInFirstW, self).__init__(parent)
    self.setParent(parent)

run()

Run the data lookup process.

Source code in src/cellects/core/cellects_threads.py
def run(self):
    """
    Run the data lookup process.
    """
    self.parent().po.look_for_data()

PrecompileNJITThread

Bases: QThread

Precompile njit functions for speed optimization.

Notes

This class uses QThread to manage the process asynchronously.

Source code in src/cellects/core/cellects_threads.py
class PrecompileNJITThread(QtCore.QThread):
    """
    Precompile njit functions for speed optimization.

    Notes
    -----
    This class uses `QThread` to manage the process asynchronously.
    """

    def __init__(self, parent=None):
        """
        Initialize the worker thread for precompiling njit functions.

        Parameters
        ----------
        parent : QObject, optional
            The parent object of this thread instance. In use, an instance of CellectsMainWidget class. Default is None.
        """
        super(PrecompileNJITThread, self).__init__(parent)
        self.setParent(parent)

    def run(self):
        """
        Run the main image segmentation functions once on a dummy image so they execute faster on real images.
        """
        po = ProgramOrganizer()
        im = np.zeros((3, 3, 3), dtype=np.uint8)
        im[1, 1, :] = 1
        po.get_first_image(im, sample_number=1)
        if not self.isInterruptionRequested():
            po.fast_first_image_segmentation()

__init__(parent=None)

Initialize the worker thread for precompiling njit functions.

Parameters:

parent : QObject, optional
    The parent object of this thread instance. In use, an instance of CellectsMainWidget class. Default is None.
Source code in src/cellects/core/cellects_threads.py
def __init__(self, parent=None):
    """
    Initialize the worker thread for precompiling njit functions.

    Parameters
    ----------
    parent : QObject, optional
        The parent object of this thread instance. In use, an instance of CellectsMainWidget class. Default is None.
    """
    super(PrecompileNJITThread, self).__init__(parent)
    self.setParent(parent)

run()

Run the main image segmentation functions once on a dummy image so they execute faster on real images.

Source code in src/cellects/core/cellects_threads.py
def run(self):
    """
    Run the main image segmentation functions once on a dummy image so they execute faster on real images.
    """
    po = ProgramOrganizer()
    im = np.zeros((3, 3, 3), dtype=np.uint8)
    im[1, 1, :] = 1
    po.get_first_image(im, sample_number=1)
    if not self.isInterruptionRequested():
        po.fast_first_image_segmentation()
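The warm-up trick this thread relies on can be sketched generically: numba compiles an `@njit` function the first time it is called, so calling it once on a tiny dummy input (like the 3×3 image above) pays the compilation cost at startup instead of during the first real analysis. `count_foreground` below is an illustrative function, not a Cellects one, and the snippet falls back to a no-op decorator when numba is absent.

```python
import numpy as np

try:
    from numba import njit
except ImportError:
    # Fallback so the sketch still runs without numba installed
    def njit(func):
        return func

@njit
def count_foreground(binary):
    """Count nonzero pixels with an explicit loop (the kind of code njit speeds up)."""
    total = 0
    for value in binary.ravel():
        total += value
    return total

# Warm-up call on a minimal 3x3 dummy image, mirroring the thread above:
# the first call triggers compilation, later calls on real images reuse it.
dummy = np.zeros((3, 3), dtype=np.uint8)
dummy[1, 1] = 1
print(count_foreground(dummy))  # → 1
```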

PrepareVideoAnalysisThread

Bases: QThread

Thread for preparing video analysis.

Notes

This class uses QThread to manage the process asynchronously.

Source code in src/cellects/core/cellects_threads.py
class PrepareVideoAnalysisThread(QtCore.QThread):
    """
    Thread for preparing video analysis.

    Notes
    -----
    This class uses `QThread` to manage the process asynchronously.
    """

    def __init__(self, parent=None):
        """
        Initialize the worker thread for finishing the last image analysis and preparing the video analysis.

        Parameters
        ----------
        parent : QObject, optional
            The parent object of this thread instance. In use, an instance of CellectsMainWidget class. Default is None.
        """
        super(PrepareVideoAnalysisThread, self).__init__(parent)
        self.setParent(parent)

    def run(self):
        """
        Run the image processing pipeline for the last image of the current folder.

        This method handles background subtraction,
        image segmentation, and data saving.
        """
        self.parent().po.get_background_to_subtract()
        self.parent().po.save_origins_and_backgrounds_lists()
        if self.parent().po.last_image is None:
            self.parent().po.get_last_image()
            self.parent().po.fast_last_image_segmentation()
        self.parent().po.find_if_lighter_background()
        logging.info("The current (or the first) folder is ready to run")
        self.parent().po.first_exp_ready_to_run = True
        self.parent().po.save_data_to_run_cellects_quickly()

__init__(parent=None)

Initialize the worker thread for finishing the last image analysis and preparing the video analysis.

Parameters:

parent : QObject, optional
    The parent object of this thread instance. In use, an instance of CellectsMainWidget class. Default is None.
Source code in src/cellects/core/cellects_threads.py
def __init__(self, parent=None):
    """
    Initialize the worker thread for finishing the last image analysis and preparing the video analysis.

    Parameters
    ----------
    parent : QObject, optional
        The parent object of this thread instance. In use, an instance of CellectsMainWidget class. Default is None.
    """
    super(PrepareVideoAnalysisThread, self).__init__(parent)
    self.setParent(parent)

run()

Run the image processing pipeline for the last image of the current folder.

This method handles background subtraction, image segmentation, and data saving.

Source code in src/cellects/core/cellects_threads.py
def run(self):
    """
    Run the image processing pipeline for the last image of the current folder.

    This method handles background subtraction,
    image segmentation, and data saving.
    """
    self.parent().po.get_background_to_subtract()
    self.parent().po.save_origins_and_backgrounds_lists()
    if self.parent().po.last_image is None:
        self.parent().po.get_last_image()
        self.parent().po.fast_last_image_segmentation()
    self.parent().po.find_if_lighter_background()
    logging.info("The current (or the first) folder is ready to run")
    self.parent().po.first_exp_ready_to_run = True
    self.parent().po.save_data_to_run_cellects_quickly()
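The exact method behind `get_background_to_subtract` is not shown here, but a common background-subtraction approach is a per-pixel median over sample frames. The sketch below, with illustrative helper names (`estimate_background`, `subtract_background`), shows the idea under that assumption.

```python
import numpy as np

def estimate_background(frames):
    """Per-pixel median across frames: a standard static-background estimate."""
    return np.median(np.stack(frames), axis=0)

def subtract_background(frame, background):
    """Absolute difference, computed in a signed type then clipped back to uint8."""
    diff = frame.astype(np.int16) - background.astype(np.int16)
    return np.clip(np.abs(diff), 0, 255).astype(np.uint8)

# Three nearly identical frames: the median background is 11 everywhere
frames = [np.full((2, 2), v, dtype=np.uint8) for v in (10, 12, 11)]
background = estimate_background(frames)

# A brighter frame stands out after subtraction
moving = np.full((2, 2), 50, dtype=np.uint8)
print(subtract_background(moving, background))  # → 39 everywhere
```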

SaveAllVarsThread

Bases: QThread

Thread for saving the GUI parameters and updating the current folder.

Notes

This class uses QThread to manage the process asynchronously.

Source code in src/cellects/core/cellects_threads.py
class SaveAllVarsThread(QtCore.QThread):
    """
    Thread for saving the GUI parameters and updating the current folder.

    Notes
    -----
    This class uses `QThread` to manage the process asynchronously.
    """

    def __init__(self, parent=None):
        """
        Initialize the worker thread for saving the GUI parameters and updating the current folder.

        Parameters
        ----------
        parent : QObject, optional
            The parent object of this thread instance. In use, an instance of CellectsMainWidget class. Default is None.
        """
        super(SaveAllVarsThread, self).__init__(parent)
        self.setParent(parent)

    def run(self):
        """
        Execute a sequence of operations to save data and update the current folder.

        This method performs several steps:
            1. Save variable dictionary.
            2. Set the current folder.
            3. Save data to run Cellects quickly without creating a new one if it doesn't exist.
        """
        self.parent().po.save_masks(remove_unused_masks=False)
        self.parent().po.save_variable_dict()
        self._set_current_folder()
        self.parent().po.save_data_to_run_cellects_quickly(new_one_if_does_not_exist=False)

    def _set_current_folder(self):
        """
        Set the current folder based on conditions.

        Sets the current folder to the first one in the list if there are multiple
        folders, otherwise sets it to a reduced global pathway.
        """
        if self.parent().po.all['folder_number'] > 1:
            logging.info(f"Use {self.parent().po.all['folder_list'][0]} folder")
            self.parent().po.update_folder_id(self.parent().po.all['sample_number_per_folder'][0],
                                              self.parent().po.all['folder_list'][0])
        else:
            curr_path = reduce_path_len(self.parent().po.all['global_pathway'], 6, 10)
            logging.info(f"Use {curr_path} folder")
            self.parent().po.update_folder_id(self.parent().po.all['first_folder_sample_number'])

__init__(parent=None)

Initialize the worker thread for saving the GUI parameters and updating the current folder.

Parameters:

parent : QObject, optional
    The parent object of this thread instance. In use, an instance of CellectsMainWidget class. Default is None.
Source code in src/cellects/core/cellects_threads.py
def __init__(self, parent=None):
    """
    Initialize the worker thread for saving the GUI parameters and updating the current folder.

    Parameters
    ----------
    parent : QObject, optional
        The parent object of this thread instance. In use, an instance of CellectsMainWidget class. Default is None.
    """
    super(SaveAllVarsThread, self).__init__(parent)
    self.setParent(parent)

run()

Execute a sequence of operations to save data and update the current folder.

This method performs several steps: 1. Save variable dictionary. 2. Set the current folder. 3. Save data to run Cellects quickly without creating a new one if it doesn't exist.

Source code in src/cellects/core/cellects_threads.py
def run(self):
    """
    Execute a sequence of operations to save data and update the current folder.

    This method performs several steps:
        1. Save variable dictionary.
        2. Set the current folder.
        3. Save data to run Cellects quickly without creating a new one if it doesn't exist.
    """
    self.parent().po.save_masks(remove_unused_masks=False)
    self.parent().po.save_variable_dict()
    self._set_current_folder()
    self.parent().po.save_data_to_run_cellects_quickly(new_one_if_does_not_exist=False)

SaveManualDelineationThread

Bases: QThread

Thread for saving user's defined arena delineation through the GUI.

Notes

This class uses QThread to manage the process asynchronously.

Source code in src/cellects/core/cellects_threads.py
class SaveManualDelineationThread(QtCore.QThread):
    """
    Thread for saving user's defined arena delineation through the GUI.

    Notes
    -----
    This class uses `QThread` to manage the process asynchronously.
    """
    def __init__(self, parent=None):
        """
        Initialize the worker thread for saving the arena coordinates when the user draws them manually.

        Parameters
        ----------
        parent : QObject, optional
            The parent object of this thread instance. In use, an instance of CellectsMainWidget class. Default is None.
        """
        super(SaveManualDelineationThread, self).__init__(parent)
        self.setParent(parent)

    def run(self):
        """
        Do save the coordinates.
        """
        self.parent().po.left = []
        self.parent().po.right = []
        self.parent().po.top = []
        self.parent().po.bot = []
        for arena_i in np.arange(self.parent().po.sample_number):
            y, x = np.nonzero(self.parent().imageanalysiswindow.arena_mask == arena_i + 1)
            self.parent().po.left.append(int(np.min(x)))
            self.parent().po.right.append(int(np.max(x)))
            self.parent().po.top.append(int(np.min(y)))
            self.parent().po.bot.append(int(np.max(y)))
        self.parent().po.save_coordinates()
        self.parent().po.save_data_to_run_cellects_quickly()

        logging.info("Save manual video delineation")
        self.parent().po.vars['analyzed_individuals'] = list(range(1, self.parent().po.sample_number + 1))

__init__(parent=None)

Initialize the worker thread for saving the arena coordinates when the user draws them manually.

Parameters:

parent : QObject, optional
    The parent object of this thread instance. In use, an instance of CellectsMainWidget class. Default is None.
Source code in src/cellects/core/cellects_threads.py
def __init__(self, parent=None):
    """
    Initialize the worker thread for saving the arena coordinates when the user draws them manually.

    Parameters
    ----------
    parent : QObject, optional
        The parent object of this thread instance. In use, an instance of CellectsMainWidget class. Default is None.
    """
    super(SaveManualDelineationThread, self).__init__(parent)
    self.setParent(parent)

run()

Do save the coordinates.

Source code in src/cellects/core/cellects_threads.py
def run(self):
    """
    Do save the coordinates.
    """
    self.parent().po.left = []
    self.parent().po.right = []
    self.parent().po.top = []
    self.parent().po.bot = []
    for arena_i in np.arange(self.parent().po.sample_number):
        y, x = np.nonzero(self.parent().imageanalysiswindow.arena_mask == arena_i + 1)
        self.parent().po.left.append(int(np.min(x)))
        self.parent().po.right.append(int(np.max(x)))
        self.parent().po.top.append(int(np.min(y)))
        self.parent().po.bot.append(int(np.max(y)))
    self.parent().po.save_coordinates()
    self.parent().po.save_data_to_run_cellects_quickly()

    logging.info("Save manual video delineation")
    self.parent().po.vars['analyzed_individuals'] = list(range(1, self.parent().po.sample_number + 1))
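The coordinate extraction performed by `run()` reduces to one bounding box per labeled arena: labels run from 1 to `sample_number`, and for each label the min/max of the nonzero rows and columns give `top`/`bot` and `left`/`right`. A self-contained numpy sketch (`arena_bounding_boxes` is an illustrative name, not a Cellects function):

```python
import numpy as np

def arena_bounding_boxes(arena_mask, sample_number):
    """Return (left, right, top, bot) lists, one entry per labeled arena.

    Labels in arena_mask run from 1 to sample_number, as in the loop above.
    """
    left, right, top, bot = [], [], [], []
    for arena_i in range(sample_number):
        y, x = np.nonzero(arena_mask == arena_i + 1)
        left.append(int(x.min()))
        right.append(int(x.max()))
        top.append(int(y.min()))
        bot.append(int(y.max()))
    return left, right, top, bot

mask = np.zeros((6, 6), dtype=np.uint8)
mask[1:3, 1:4] = 1   # arena 1: rows 1-2, cols 1-3
mask[4:6, 2:5] = 2   # arena 2: rows 4-5, cols 2-4
print(arena_bounding_boxes(mask, 2))  # → ([1, 2], [3, 4], [1, 4], [2, 5])
```

Note that `np.nonzero` on an empty label would make `min()`/`max()` raise, so every label from 1 to `sample_number` must actually appear in the mask.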

UpdateImageThread

Bases: QThread

Thread for updating GUI image.

Signals

message_when_thread_finished : Signal(bool) Emitted when the thread finishes execution, indicating whether image displaying was successful.

Notes

This class uses QThread to manage the process asynchronously.

Source code in src/cellects/core/cellects_threads.py
class UpdateImageThread(QtCore.QThread):
    """
    Thread for updating GUI image.

    Signals
    -------
    message_when_thread_finished : Signal(bool)
        Emitted when the thread finishes execution, indicating whether image displaying was successful.

    Notes
    -----
    This class uses `QThread` to manage the process asynchronously.
    """
    message_when_thread_finished = QtCore.Signal(bool)

    def __init__(self, parent=None):
        """
        Initialize the worker thread for updating the image displayed in the GUI.

        Parameters
        ----------
        parent : QObject, optional
            The parent object of this thread instance. In use, an instance of CellectsMainWidget class. Default is None.
        """
        super(UpdateImageThread, self).__init__(parent)
        self.setParent(parent)

    def run(self):
        """
        Execute the image display process, including user input handling and mask application.

        This method performs several steps to analyze an image based on user input
        and saved mask coordinates. It updates the drawn image with segmentation masks,
        back masks, bio masks, and video contours.

        Other Parameters
        ----------------
        user_input : bool, optional
            Flag indicating whether user input is available.
        idx : list or numpy.ndarray, optional
            Coordinates of the user-defined region of interest.
        temp_mask_coord : list, optional
            Temporary mask coordinates.
        saved_coord : list, optional
            Saved mask coordinates.

        Notes
        -----
        - This function updates several attributes of `self.parent().imageanalysiswindow`.
        - Performance considerations include handling large images efficiently.
        - Important behavioral caveats: Ensure coordinates are within image bounds.
        """
        # I/ If this thread runs from user input, get the right coordinates
        # and convert them to fit the displayed image size
        user_input = len(self.parent().imageanalysiswindow.saved_coord) > 0 or len(self.parent().imageanalysiswindow.temporary_mask_coord) > 0
        dims = self.parent().imageanalysiswindow.drawn_image.shape
        if user_input:
            if len(self.parent().imageanalysiswindow.temporary_mask_coord) > 0:
                idx = self.parent().imageanalysiswindow.temporary_mask_coord
            else:
                idx = self.parent().imageanalysiswindow.saved_coord
            if len(idx) < 2:
                user_input = False
            else:
                # Convert coordinates:
                self.parent().imageanalysiswindow.display_image.update_image_scaling_factors()
                sf = self.parent().imageanalysiswindow.display_image.scaling_factors
                idx, min_y, max_y, min_x, max_x = scale_coordinates(coord=idx, scale=sf, dims=dims)
                minmax = min_y, max_y, min_x, max_x

        if len(self.parent().imageanalysiswindow.temporary_mask_coord) == 0:
            # not_load
            # II/ If this thread aims at saving the last user input and displaying all user inputs:
            # Update the drawn_image according to every saved masks
            # 1) The segmentation mask
            # 2) The back_mask and bio_mask
            # 3) The automatically detected video contours
            # (re-)Initialize drawn image
            self.parent().imageanalysiswindow.drawn_image = self.parent().po.current_image.copy()
            contour_width = get_contour_width_from_im_shape(dims)
            # 1) Add the segmentation mask to the image
            if self.parent().imageanalysiswindow.is_first_image_flag:
                im_combinations = self.parent().po.first_image.im_combinations
                im_mean = self.parent().po.first_image.image.mean()
            else:
                im_combinations = self.parent().po.last_image.im_combinations
                im_mean = self.parent().po.last_image.image.mean()
            # If there are image combinations, get the current corresponding binary image
            if im_combinations is not None and len(im_combinations) != 0:
                binary_idx = im_combinations[self.parent().po.current_combination_id]["binary_image"]
                # If it concerns the last image, only keep the contour coordinates
                binary_idx = cv2.dilate(get_contours(binary_idx), kernel=cross_33, iterations=contour_width)
                binary_idx = np.nonzero(binary_idx)
                # Color these coordinates in magenta on bright images, and in pink on dark images
                if im_mean > 126:
                    # Color the segmentation mask in magenta
                    self.parent().imageanalysiswindow.drawn_image[binary_idx[0], binary_idx[1], :] = np.array((20, 0, 150), dtype=np.uint8)
                else:
                    # Color the segmentation mask in pink
                    self.parent().imageanalysiswindow.drawn_image[binary_idx[0], binary_idx[1], :] = np.array((94, 0, 213), dtype=np.uint8)
            if user_input:# save
                if self.parent().imageanalysiswindow.back1_bio2 == 0:
                    mask_shape = self.parent().po.vars['arena_shape']
                elif self.parent().imageanalysiswindow.back1_bio2 == 1:
                    mask_shape = "rectangle"
                elif self.parent().imageanalysiswindow.back1_bio2 == 2:
                    mask_shape = self.parent().po.all['starting_blob_shape']
                    if mask_shape is None:
                        mask_shape = 'circle'
                # Save the user drawn mask
                mask = create_mask(dims, minmax, mask_shape)
                mask = np.nonzero(mask)

                if self.parent().imageanalysiswindow.back1_bio2 == 1:
                    self.parent().imageanalysiswindow.back_masks_number += 1
                    self.parent().imageanalysiswindow.back_mask[mask[0], mask[1]] = self.parent().imageanalysiswindow.available_back_names[0]
                elif self.parent().imageanalysiswindow.back1_bio2 == 2:
                    self.parent().imageanalysiswindow.bio_masks_number += 1
                    self.parent().imageanalysiswindow.bio_mask[mask[0], mask[1]] = self.parent().imageanalysiswindow.available_bio_names[0]
                elif self.parent().imageanalysiswindow.manual_delineation_flag:
                    self.parent().imageanalysiswindow.arena_masks_number += 1
                    self.parent().imageanalysiswindow.arena_mask[mask[0], mask[1]] = self.parent().imageanalysiswindow.available_arena_names[0]
                # 2)a) Apply all these masks to the drawn image:

            back_coord = np.nonzero(self.parent().imageanalysiswindow.back_mask)

            bio_coord = np.nonzero(self.parent().imageanalysiswindow.bio_mask)

            if self.parent().imageanalysiswindow.arena_mask is not None:
                arena_coord = np.nonzero(self.parent().imageanalysiswindow.arena_mask)
                self.parent().imageanalysiswindow.drawn_image[arena_coord[0], arena_coord[1], :] = np.repeat(self.parent().po.vars['contour_color'], 3).astype(np.uint8)

            self.parent().imageanalysiswindow.drawn_image[back_coord[0], back_coord[1], :] = np.array((224, 160, 81), dtype=np.uint8)

            self.parent().imageanalysiswindow.drawn_image[bio_coord[0], bio_coord[1], :] = np.array((17, 160, 212), dtype=np.uint8)

            image = self.parent().imageanalysiswindow.drawn_image.copy()
            # 3) The automatically detected video contours
            if self.parent().imageanalysiswindow.delineation_done:  # add a mask of the video contour
                if self.parent().po.vars['contour_color'] == 255:
                    arena_contour_col = (240, 232, 202)
                else:
                    arena_contour_col = (138, 95, 18)
                # Draw the delineation mask of each arena
                for _i, (min_cy, max_cy, min_cx, max_cx) in enumerate(zip(self.parent().po.top, self.parent().po.bot, self.parent().po.left, self.parent().po.right)):
                    position = (min_cx + 25, min_cy + (max_cy - min_cy) // 2)
                    image = cv2.putText(image, f"{_i + 1}", position, cv2.FONT_HERSHEY_SIMPLEX, 1,  arena_contour_col + (255,),2)
                    if (max_cy - min_cy) < 0 or (max_cx - min_cx) < 0:
                        self.parent().imageanalysiswindow.message.setText("Error: the shape number or the detection is wrong")
                    image = draw_img_with_mask(image, dims, (min_cy, max_cy - 1, min_cx, max_cx - 1),
                                               self.parent().po.vars['arena_shape'], arena_contour_col, True, contour_width)
        else: #load
            if user_input:
                # III/ If this thread runs from user input: update the drawn_image according to the current user input
                # Just add the mask to drawn_image as quick as possible
                # Add user defined masks
                # Take the drawn image and add the temporary mask to it
                image = self.parent().imageanalysiswindow.drawn_image.copy()
                if self.parent().imageanalysiswindow.back1_bio2 == 2:
                    color = (17, 160, 212)
                    mask_shape = self.parent().po.all['starting_blob_shape']
                    if mask_shape is None:
                        mask_shape = 'circle'
                elif self.parent().imageanalysiswindow.back1_bio2 == 1:
                    color = (224, 160, 81)
                    mask_shape = "rectangle"
                else:
                    color = (0, 0, 0)
                    mask_shape = self.parent().po.vars['arena_shape']
                image = draw_img_with_mask(image, dims, minmax, mask_shape, color)
        self.parent().imageanalysiswindow.display_image.update_image(image)
        self.message_when_thread_finished.emit(True)
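The back-mask and bio-mask overlay steps above share one recipe: take the nonzero coordinates of a mask and write a fixed BGR color at those pixels of the drawn image. A minimal sketch (`overlay_mask` is an illustrative name; the BGR triplet is the back-mask color used above):

```python
import numpy as np

def overlay_mask(image, mask, color):
    """Color every nonzero mask pixel; mirrors the drawn_image updates above."""
    out = image.copy()
    ys, xs = np.nonzero(mask)                    # coordinates to recolor
    out[ys, xs, :] = np.array(color, dtype=np.uint8)
    return out

image = np.zeros((4, 4, 3), dtype=np.uint8)      # stand-in for drawn_image
mask = np.zeros((4, 4), dtype=np.uint8)
mask[1, 1] = 1
colored = overlay_mask(image, mask, (224, 160, 81))
print(colored[1, 1])  # → [224 160  81]
```

Indexing with the two arrays from `np.nonzero` touches only the masked pixels, which is why the real code can repaint large overlays without looping in Python.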

__init__(parent=None)

Initialize the worker thread for updating the image displayed in the GUI.

Parameters:

    parent : QObject, optional
        The parent object of this thread instance. In use, an instance of the CellectsMainWidget class. Default is None.
Source code in src/cellects/core/cellects_threads.py
def __init__(self, parent=None):
    """
    Initialize the worker thread for updating the image displayed in the GUI

    Parameters
    ----------
    parent : QObject, optional
        The parent object of this thread instance. In use, an instance of CellectsMainWidget class. Default is None.
    """
    super(UpdateImageThread, self).__init__(parent)
    self.setParent(parent)

run()

Execute the image display process, including user input handling and mask application.

This method performs several steps to analyze an image based on user input and saved mask coordinates. It updates the drawn image with segmentation masks, back masks, bio masks, and video contours.

Other Parameters:

    user_input : bool
        Flag indicating whether user input is available.
    idx : list or ndarray
        Coordinates of the user-defined region of interest.
    temp_mask_coord : list
        Temporary mask coordinates.
    saved_coord : list
        Saved mask coordinates.

Notes
  • This function updates several attributes of self.parent().imageanalysiswindow.
  • Performance considerations include handling large images efficiently.
  • Important behavioral caveats: Ensure coordinates are within image bounds.
Source code in src/cellects/core/cellects_threads.py
def run(self):
    """
    Execute the image display process, including user input handling and mask application.

    This method performs several steps to analyze an image based on user input
    and saved mask coordinates. It updates the drawn image with segmentation masks,
    back masks, bio masks, and video contours.

    Other Parameters
    ----------------
    user_input : bool, optional
        Flag indicating whether user input is available.
    idx : list or numpy.ndarray, optional
        Coordinates of the user-defined region of interest.
    temp_mask_coord : list, optional
        Temporary mask coordinates.
    saved_coord : list, optional
        Saved mask coordinates.

    Notes
    -----
    - This function updates several attributes of `self.parent().imageanalysiswindow`.
    - Performance considerations include handling large images efficiently.
    - Important behavioral caveats: Ensure coordinates are within image bounds.
    """
    # I/ If this thread runs from user input, get the right coordinates
    # and convert them to fit the displayed image size
    user_input = len(self.parent().imageanalysiswindow.saved_coord) > 0 or len(self.parent().imageanalysiswindow.temporary_mask_coord) > 0
    dims = self.parent().imageanalysiswindow.drawn_image.shape
    if user_input:
        if len(self.parent().imageanalysiswindow.temporary_mask_coord) > 0:
            idx = self.parent().imageanalysiswindow.temporary_mask_coord
        else:
            idx = self.parent().imageanalysiswindow.saved_coord
        if len(idx) < 2:
            user_input = False
        else:
            # Convert coordinates:
            self.parent().imageanalysiswindow.display_image.update_image_scaling_factors()
            sf = self.parent().imageanalysiswindow.display_image.scaling_factors
            idx, min_y, max_y, min_x, max_x = scale_coordinates(coord=idx, scale=sf, dims=dims)
            minmax = min_y, max_y, min_x, max_x

    if len(self.parent().imageanalysiswindow.temporary_mask_coord) == 0:
        # not_load
        # II/ If this thread aims at saving the last user input and displaying all user inputs:
        # Update the drawn_image according to every saved mask
        # 1) The segmentation mask
        # 2) The back_mask and bio_mask
        # 3) The automatically detected video contours
        # (re-)Initialize drawn image
        self.parent().imageanalysiswindow.drawn_image = self.parent().po.current_image.copy()
        contour_width = get_contour_width_from_im_shape(dims)
        # 1) Add the segmentation mask to the image
        if self.parent().imageanalysiswindow.is_first_image_flag:
            im_combinations = self.parent().po.first_image.im_combinations
            im_mean = self.parent().po.first_image.image.mean()
        else:
            im_combinations = self.parent().po.last_image.im_combinations
            im_mean = self.parent().po.last_image.image.mean()
        # If there are image combinations, get the current corresponding binary image
        if im_combinations is not None and len(im_combinations) != 0:
            binary_idx = im_combinations[self.parent().po.current_combination_id]["binary_image"]
            # If it concerns the last image, only keep the contour coordinates
            binary_idx = cv2.dilate(get_contours(binary_idx), kernel=cross_33, iterations=contour_width)
            binary_idx = np.nonzero(binary_idx)
            # Color these coordinates in magenta on bright images, and in pink on dark images
            if im_mean > 126:
                # Color the segmentation mask in magenta
                self.parent().imageanalysiswindow.drawn_image[binary_idx[0], binary_idx[1], :] = np.array((20, 0, 150), dtype=np.uint8)
            else:
                # Color the segmentation mask in pink
                self.parent().imageanalysiswindow.drawn_image[binary_idx[0], binary_idx[1], :] = np.array((94, 0, 213), dtype=np.uint8)
        if user_input:# save
            if self.parent().imageanalysiswindow.back1_bio2 == 0:
                mask_shape = self.parent().po.vars['arena_shape']
            elif self.parent().imageanalysiswindow.back1_bio2 == 1:
                mask_shape = "rectangle"
            elif self.parent().imageanalysiswindow.back1_bio2 == 2:
                mask_shape = self.parent().po.all['starting_blob_shape']
                if mask_shape is None:
                    mask_shape = 'circle'
            # Save the user drawn mask
            mask = create_mask(dims, minmax, mask_shape)
            mask = np.nonzero(mask)

            if self.parent().imageanalysiswindow.back1_bio2 == 1:
                self.parent().imageanalysiswindow.back_masks_number += 1
                self.parent().imageanalysiswindow.back_mask[mask[0], mask[1]] = self.parent().imageanalysiswindow.available_back_names[0]
            elif self.parent().imageanalysiswindow.back1_bio2 == 2:
                self.parent().imageanalysiswindow.bio_masks_number += 1
                self.parent().imageanalysiswindow.bio_mask[mask[0], mask[1]] = self.parent().imageanalysiswindow.available_bio_names[0]
            elif self.parent().imageanalysiswindow.manual_delineation_flag:
                self.parent().imageanalysiswindow.arena_masks_number += 1
                self.parent().imageanalysiswindow.arena_mask[mask[0], mask[1]] = self.parent().imageanalysiswindow.available_arena_names[0]
            # 2)a) Apply all these masks to the drawn image:

        back_coord = np.nonzero(self.parent().imageanalysiswindow.back_mask)

        bio_coord = np.nonzero(self.parent().imageanalysiswindow.bio_mask)

        if self.parent().imageanalysiswindow.arena_mask is not None:
            arena_coord = np.nonzero(self.parent().imageanalysiswindow.arena_mask)
            self.parent().imageanalysiswindow.drawn_image[arena_coord[0], arena_coord[1], :] = np.repeat(self.parent().po.vars['contour_color'], 3).astype(np.uint8)

        self.parent().imageanalysiswindow.drawn_image[back_coord[0], back_coord[1], :] = np.array((224, 160, 81), dtype=np.uint8)

        self.parent().imageanalysiswindow.drawn_image[bio_coord[0], bio_coord[1], :] = np.array((17, 160, 212), dtype=np.uint8)

        image = self.parent().imageanalysiswindow.drawn_image.copy()
        # 3) The automatically detected video contours
        if self.parent().imageanalysiswindow.delineation_done:  # add a mask of the video contour
            if self.parent().po.vars['contour_color'] == 255:
                arena_contour_col = (240, 232, 202)
            else:
                arena_contour_col = (138, 95, 18)
            # Draw the delineation mask of each arena
            for _i, (min_cy, max_cy, min_cx, max_cx) in enumerate(zip(self.parent().po.top, self.parent().po.bot, self.parent().po.left, self.parent().po.right)):
                position = (min_cx + 25, min_cy + (max_cy - min_cy) // 2)
                image = cv2.putText(image, f"{_i + 1}", position, cv2.FONT_HERSHEY_SIMPLEX, 1,  arena_contour_col + (255,),2)
                if (max_cy - min_cy) < 0 or (max_cx - min_cx) < 0:
                    self.parent().imageanalysiswindow.message.setText("Error: the shape number or the detection is wrong")
                image = draw_img_with_mask(image, dims, (min_cy, max_cy - 1, min_cx, max_cx - 1),
                                           self.parent().po.vars['arena_shape'], arena_contour_col, True, contour_width)
    else: #load
        if user_input:
            # III/ If this thread runs from user input: update the drawn_image according to the current user input
            # Just add the mask to drawn_image as quickly as possible
            # Add user defined masks
            # Take the drawn image and add the temporary mask to it
            image = self.parent().imageanalysiswindow.drawn_image.copy()
            if self.parent().imageanalysiswindow.back1_bio2 == 2:
                color = (17, 160, 212)
                mask_shape = self.parent().po.all['starting_blob_shape']
                if mask_shape is None:
                    mask_shape = 'circle'
            elif self.parent().imageanalysiswindow.back1_bio2 == 1:
                color = (224, 160, 81)
                mask_shape = "rectangle"
            else:
                color = (0, 0, 0)
                mask_shape = self.parent().po.vars['arena_shape']
            image = draw_img_with_mask(image, dims, minmax, mask_shape, color)
    self.parent().imageanalysiswindow.display_image.update_image(image)
    self.message_when_thread_finished.emit(True)
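The mask-painting steps in run() all follow the same NumPy idiom: a mask is kept as a 2D label array, converted to coordinates with np.nonzero, and the matching pixels of the RGB drawn image are recolored in a single fancy-indexed assignment. A minimal, self-contained sketch of that idiom (the 4x4 image and mask are hypothetical; the BGR triplet (224, 160, 81) is the back-mask color used in the source above):

```python
import numpy as np

# Hypothetical tiny image and mask, mimicking drawn_image and back_mask
drawn_image = np.zeros((4, 4, 3), dtype=np.uint8)
back_mask = np.zeros((4, 4), dtype=np.uint8)
back_mask[1, 1] = 1
back_mask[2, 3] = 1

# np.nonzero yields (row indices, column indices) of the masked pixels
back_coord = np.nonzero(back_mask)
# One assignment recolors every masked pixel across all three channels
drawn_image[back_coord[0], back_coord[1], :] = np.array((224, 160, 81), dtype=np.uint8)

print(drawn_image[1, 1])   # recolored pixel
print(drawn_image[0, 0])   # untouched pixel stays black
```

This is why run() can apply arbitrarily shaped masks without any per-pixel loop: the cost is one vectorized write per mask.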

VideoReaderThread

Bases: QThread

Thread for reading a video in the GUI.

Signals

message_from_thread : Signal(dict)
    Signal emitted during the video reading to display images to the GUI.

Notes

This class uses QThread to manage the process asynchronously.

Source code in src/cellects/core/cellects_threads.py
class VideoReaderThread(QtCore.QThread):
    """
    Thread for reading a video in the GUI.

    Signals
    -------
    message_from_thread : Signal(dict)
        Signal emitted during the video reading to display images to the GUI.

    Notes
    -----
    This class uses `QThread` to manage the process asynchronously.
    """
    message_from_thread = QtCore.Signal(dict)

    def __init__(self, parent=None):
        """
        Initialize the worker thread for reading a video in the GUI

        Parameters
        ----------
        parent : QObject, optional
            The parent object of this thread instance. In use, an instance of CellectsMainWidget class. Default is None.
        """
        super(VideoReaderThread, self).__init__(parent)
        self.setParent(parent)

    def run(self):
        """
        Summary
        -------
        Run the video analysis process, applying segmentation and contouring to each frame.

        Extended Description
        --------------------
        This method performs video analysis by segmenting frames based on selected options and overlaying contours.
        It also updates the UI with progress messages.

        Notes
        -----
        This method emits signals to update the UI with progress messages and current images.
        It uses OpenCV for morphological operations on video frames.
        """
        video_analysis = self.parent().po.motion.visu.copy()
        self.message_from_thread.emit(
            {"current_image": video_analysis[0, ...], "message": f"Video preparation... Do not close until it is finished."})
        video_mask = np.zeros(self.parent().po.motion.dims[:3], dtype=np.uint8)
        if self.parent().po.load_quick_full > 0:
            if self.parent().po.all['compute_all_options']:
                if self.parent().po.all['video_option'] == 0:
                    video_mask = self.parent().po.motion.segmented
                else:
                    if self.parent().po.all['video_option'] == 1:
                        mask = self.parent().po.motion.luminosity_segmentation
                    elif self.parent().po.all['video_option'] == 2:
                        mask = self.parent().po.motion.gradient_segmentation
                    elif self.parent().po.all['video_option'] == 3:
                        mask = self.parent().po.motion.logical_and
                    elif self.parent().po.all['video_option'] == 4:
                        mask = self.parent().po.motion.logical_or
                    video_mask[mask[0], mask[1], mask[2]] = 1
            else:
                if self.parent().po.computed_video_options[self.parent().po.all['video_option']]:
                    video_mask = self.parent().po.motion.segmented

        frame_delay = (8 + np.log10(self.parent().po.motion.dims[0])) / self.parent().po.motion.dims[0]
        for t in np.arange(self.parent().po.motion.dims[0]):
            mask = cv2.morphologyEx(video_mask[t, ...], cv2.MORPH_GRADIENT, cross_33)
            mask = np.stack((mask, mask, mask), axis=2)
            current_image = video_analysis[t, ...].copy()
            current_image[mask > 0] = self.parent().po.vars['contour_color']
            self.message_from_thread.emit(
                {"current_image": current_image, "message": f"Reading in progress... Image number: {t}"}) #, "time": timings[t]
            time.sleep(frame_delay)
            if self.isInterruptionRequested():
                break
        self.message_from_thread.emit({"current_image": current_image, "message": ""})#, "time": timings[t]
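The if/elif ladder in run() maps the GUI's video_option index onto one of five mask attributes of the motion object. The mapping can be summarized as a lookup table; the attribute names below are taken from the source, but the function itself is an illustrative refactoring, not part of Cellects' API:

```python
def select_mask_attr(video_option: int) -> str:
    """Return the motion-object attribute picked for a given video option."""
    options = {
        0: "segmented",                  # final segmentation result
        1: "luminosity_segmentation",    # intensity-threshold strategy
        2: "gradient_segmentation",      # gradient-based strategy
        3: "logical_and",                # intersection of both strategies
        4: "logical_or",                 # union of both strategies
    }
    return options[video_option]

print(select_mask_attr(2))
```

Note the asymmetry in run(): option 0 assigns the full binary volume directly, while options 1-4 store sparse coordinate tuples that are scattered into an empty video_mask volume.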

__init__(parent=None)

Initialize the worker thread for reading a video in the GUI

Parameters:

    parent : QObject, optional
        The parent object of this thread instance. In use, an instance of the CellectsMainWidget class. Default is None.
Source code in src/cellects/core/cellects_threads.py
def __init__(self, parent=None):
    """
    Initialize the worker thread for reading a video in the GUI

    Parameters
    ----------
    parent : QObject, optional
        The parent object of this thread instance. In use, an instance of CellectsMainWidget class. Default is None.
    """
    super(VideoReaderThread, self).__init__(parent)
    self.setParent(parent)

run()

Summary

Run the video analysis process, applying segmentation and contouring to each frame.

Extended Description

This method performs video analysis by segmenting frames based on selected options and overlaying contours. It also updates the UI with progress messages.

Notes

This method emits signals to update the UI with progress messages and current images. It uses OpenCV for morphological operations on video frames.

Source code in src/cellects/core/cellects_threads.py
def run(self):
    """
    Summary
    -------
    Run the video analysis process, applying segmentation and contouring to each frame.

    Extended Description
    --------------------
    This method performs video analysis by segmenting frames based on selected options and overlaying contours.
    It also updates the UI with progress messages.

    Notes
    -----
    This method emits signals to update the UI with progress messages and current images.
    It uses OpenCV for morphological operations on video frames.
    """
    video_analysis = self.parent().po.motion.visu.copy()
    self.message_from_thread.emit(
        {"current_image": video_analysis[0, ...], "message": f"Video preparation... Do not close until it is finished."})
    video_mask = np.zeros(self.parent().po.motion.dims[:3], dtype=np.uint8)
    if self.parent().po.load_quick_full > 0:
        if self.parent().po.all['compute_all_options']:
            if self.parent().po.all['video_option'] == 0:
                video_mask = self.parent().po.motion.segmented
            else:
                if self.parent().po.all['video_option'] == 1:
                    mask = self.parent().po.motion.luminosity_segmentation
                elif self.parent().po.all['video_option'] == 2:
                    mask = self.parent().po.motion.gradient_segmentation
                elif self.parent().po.all['video_option'] == 3:
                    mask = self.parent().po.motion.logical_and
                elif self.parent().po.all['video_option'] == 4:
                    mask = self.parent().po.motion.logical_or
                video_mask[mask[0], mask[1], mask[2]] = 1
        else:
            if self.parent().po.computed_video_options[self.parent().po.all['video_option']]:
                video_mask = self.parent().po.motion.segmented

    frame_delay = (8 + np.log10(self.parent().po.motion.dims[0])) / self.parent().po.motion.dims[0]
    for t in np.arange(self.parent().po.motion.dims[0]):
        mask = cv2.morphologyEx(video_mask[t, ...], cv2.MORPH_GRADIENT, cross_33)
        mask = np.stack((mask, mask, mask), axis=2)
        current_image = video_analysis[t, ...].copy()
        current_image[mask > 0] = self.parent().po.vars['contour_color']
        self.message_from_thread.emit(
            {"current_image": current_image, "message": f"Reading in progress... Image number: {t}"}) #, "time": timings[t]
        time.sleep(frame_delay)
        if self.isInterruptionRequested():
            break
    self.message_from_thread.emit({"current_image": current_image, "message": ""})#, "time": timings[t]
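The frame_delay formula in run() is worth unpacking: since each of the T frames sleeps (8 + log10(T)) / T seconds, the total playback time is T * frame_delay = 8 + log10(T) seconds, so a video plays back in roughly 8-12 seconds whatever its length. A small sketch of that arithmetic:

```python
import math

def frame_delay(n_frames: int) -> float:
    """Per-frame sleep used in run(); total playback is 8 + log10(n) seconds."""
    return (8 + math.log10(n_frames)) / n_frames

# Playback duration grows only logarithmically with the frame count
for n in (10, 100, 1000, 10000):
    total = n * frame_delay(n)
    print(f"{n:>5} frames -> {frame_delay(n):.4f} s/frame, {total:.0f} s total")
```

So a 10-frame preview takes about 9 seconds and a 10,000-frame video about 12: the delay adapts so the GUI preview never becomes unreasonably long.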

VideoTrackingThread

Bases: QThread

Thread for processing the complete motion analysis.

Signals

message_from_thread : Signal(str)
    Signal emitted when the thread successfully starts.
image_from_thread : Signal(dict)
    Signal emitted during the video reading or analysis to display images of the current status to the GUI.
when_loading_finished : Signal(bool)
    Signal emitted when the video is completely loaded.
when_detection_finished : Signal(str)
    Signal emitted when the video analysis is finished.

Notes

This class uses QThread to manage the process asynchronously.

Source code in src/cellects/core/cellects_threads.py
class VideoTrackingThread(QtCore.QThread):
    """
    Thread for processing the complete motion analysis.

    Signals
    -------
    message_from_thread : Signal(str)
        Signal emitted when the thread successfully starts.
    image_from_thread : Signal(dict)
        Signal emitted during the video reading or analysis to display images of the current status to the GUI.
    when_loading_finished : Signal(bool)
        Signal emitted when the video is completely loaded.
    when_detection_finished : Signal(str)
        Signal emitted when the video analysis is finished.

    Notes
    -----
    This class uses `QThread` to manage the process asynchronously.
    """
    message_from_thread = QtCore.Signal(str)
    image_from_thread = QtCore.Signal(dict)
    when_loading_finished = QtCore.Signal(bool)
    when_detection_finished = QtCore.Signal(str)

    def __init__(self, parent=None):
        """
        Initialize the worker thread

        Parameters
        ----------
        parent : QObject, optional
            The parent object of this thread instance. In use, an instance of CellectsMainWidget class. Default is None.
        """
        super(VideoTrackingThread, self).__init__(parent)
        self.setParent(parent)
        self.status = {"continue": True, "folder": "", "message": ""}

    def run(self):
        self.status = {"continue": True, "folder": reduce_path_len(self.parent().po.all['global_pathway'], 6, 10), "message": ""}
        if self.parent().videoanalysiswindow.video_task == 'all':
            self.run_all()
        elif self.parent().videoanalysiswindow.video_task == 'one_arena':
            self.run_one_arena()
        elif self.parent().videoanalysiswindow.video_task == 'change_one_arena_result':
            self.change_one_arena_result()

    def run_all(self):
        """
        Run the analysis process for video writing and motion analysis.

        This method manages the overall flow of the analysis including setting up
        folders, loading data, writing videos from images, and performing motion
        analysis. It handles various conditions like checking if the specimen number
        matches expectations or if multiple experiments are ready to run.

        Returns
        -------
        dict
            A dictionary containing:
            - 'continue': bool indicating if the analysis should continue.
            - 'message': str with a relevant message about the current status.
        Notes
        -----
        This method uses several internal methods like `set_current_folder`,
        `run_video_writing`, and `run_motion_analysis` to perform the analysis steps.
        It also checks various conditions based on parent object attributes.
        """
        self.parent().po.all['compute_all_options'] = False
        self.parent().po.load_quick_full = 2
        self.set_current_folder(0)
        if self.parent().po.first_exp_ready_to_run:
            self.message_from_thread.emit(f"{self.status['folder']}, Writing videos")
            if not self.parent().po.vars['several_blob_per_arena'] and self.parent().po.sample_number != len(
                    self.parent().po.bot):
                self.status["continue"] = False
                self.status["message"] = f"Wrong specimen number: restart the image analysis."
            else:
                self.run_video_writing()
                if self.status['continue']:
                    self.message_from_thread.emit(f"{self.status['folder']}, Analysing all videos")
                    self.run_motion_analysis()
                    if self.isInterruptionRequested():
                        self.status['message'] = f"Was waiting for thread interruption"
                        self.status['continue'] = False
                if self.status['continue']:
                    if self.parent().po.all['folder_number'] > 1:
                        self.parent().po.all['folder_list'] = self.parent().po.all['folder_list'][1:]
                        self.parent().po.all['sample_number_per_folder'] = self.parent().po.all[
                            'sample_number_per_folder'][1:]
        else:
            self.parent().po.look_for_data()

        if self.status['continue'] and (
                not self.parent().po.first_exp_ready_to_run or self.parent().po.all['folder_number'] > 1):
            folder_number = np.max((len(self.parent().po.all['folder_list']), 1))

            for exp_i in np.arange(folder_number):
                if len(self.parent().po.all['folder_list']) > 0:
                    logging.info(self.parent().po.all['folder_list'][exp_i])
                self.parent().po.first_im = None
                self.parent().po.first_image = None
                self.parent().po.last_im = None
                self.parent().po.last_image = None
                self.parent().po.top = None

                self.message_from_thread.emit(f"{self.status['folder']}, Pre-processing")
                self.pre_processing()
                if self.status['continue']:
                    self.message_from_thread.emit(f"{self.status['folder']}, Writing videos")
                    if not self.parent().po.vars[
                        'several_blob_per_arena'] and self.parent().po.sample_number != len(self.parent().po.bot):
                        self.status['continue'] = False
                        self.status['message'] = f"Wrong specimen number: first image analysis is mandatory."
                    else:
                        self.run_video_writing()
                        if self.status['continue']:
                            self.message_from_thread.emit(f"{self.status['folder']}, Analysing all videos")
                            self.run_motion_analysis()
                            if self.isInterruptionRequested():
                                self.status['message'] = f"Was waiting for thread interruption"
                                self.status['continue'] = False
                if not self.status['continue']:
                    break
        if self.status['continue']:
            if self.parent().po.all['folder_number'] > 1:
                self.message_from_thread.emit(
                    f"Exp {self.parent().po.all['folder_list'][0]} to {self.parent().po.all['folder_list'][-1]} analyzed")
            else:
                curr_path = reduce_path_len(self.parent().po.all['global_pathway'], 6, 10)
                self.message_from_thread.emit(f'Exp {curr_path} analyzed')
        else:
            logging.error(f"{self.status['folder']}, {self.status['message']}")
            self.message_from_thread.emit(f"{self.status['folder']}, {self.status['message']}")

    def run_one_arena(self):
        """

        Run analysis on one arena.

        This method prepares and initiates the analysis process for a video by setting up required folders,
        loading necessary data, and performing pre-processing steps. It manages the state of running analysis and
        handles memory allocation for efficient processing.

        Notes
        -----
        - This method uses threading to handle long-running operations without blocking the main UI.
        - The memory allocation is dynamically adjusted based on available system resources.

        Attributes
        ----------
        self.parent().po.vars['convert_for_motion'] : dict
            Dictionary containing variables related to motion conversion.
        self.parent().po.first_exp_ready_to_run : bool
            Boolean indicating if the first experiment is ready to run.
        self.parent().po.cores : int
            Number of cores available for processing.
        self.parent().po.motion : object
            Object containing motion-related data and methods.
        self.parent().po.load_quick_full : int
            Number of arenas to load quickly for full detection.
        """

        self.message_from_thread.emit(f"{self.status['folder']}, Video loading, wait...")
        # Need a look_for_data when cellects_settings.json exists and 1 folder is selected among several
        self.pre_processing()
        if self.isInterruptionRequested():
            self.status['message'] = f"Was waiting for thread interruption"
            self.status['continue'] = False
        if self.status['continue']:
            memory_diff = self.parent().po.update_available_core_nb()
            if self.parent().po.cores == 0:
                self.status['message'] = f"Analyzing one arena requires {memory_diff}GB of additional RAM to run"
                self.status['continue'] = False
            else:
                if self.parent().po.motion is None or self.parent().po.load_quick_full == 0:
                    self.load_one_arena()
                if self.status['continue'] and self.parent().po.load_quick_full > 0:
                    if self.parent().po.motion.start is not None:
                        logging.info("One arena detection has started")
                        self.one_detection()
                        if self.status['continue']:
                            if self.parent().po.load_quick_full > 1:
                                logging.info("One arena post-processing has started")
                                self.post_processing()
                            else:
                                self.when_detection_finished.emit("Detection done, ready to see the result")
                    else:
                        self.status['message'] = f"The current parameters failed to detect the cell(s) motion"
                        self.status['continue'] = False

        if not self.status['continue']:
            self.message_from_thread.emit(f"{self.status['folder']}, {self.status['message']}")
            logging.error(f"{self.status['folder']}, {self.status['message']}")

    def set_current_folder(self, exp_i: int=1):
        """

        Sets the current folder based on conditions.

        This method determines which folder to use and updates the current
        folder ID accordingly. If there are multiple folders, it uses the first folder
        from the list; otherwise, it uses a reduced global pathway as the current.
        """
        if self.parent().po.all['folder_number'] > 1:
            logging.info(f"Use {self.parent().po.all['folder_list'][exp_i]} folder")
            self.status["folder"] = f"{str(self.parent().po.all['global_pathway'])[:6]} ... {self.parent().po.all['folder_list'][exp_i]}"
            self.parent().po.update_folder_id(self.parent().po.all['sample_number_per_folder'][exp_i],
                                              self.parent().po.all['folder_list'][exp_i])
        else:
            self.status["folder"] = reduce_path_len(self.parent().po.all['global_pathway'], 6, 10)
            logging.info(f"Use {self.status["folder"]} folder")
            self.parent().po.update_folder_id(self.parent().po.all['first_folder_sample_number'])
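The shortened-path display used above can be sketched in isolation. `shorten_path` below is a hypothetical stand-in mirroring the `path[:6] ... name` pattern, not the actual `reduce_path_len` helper:

```python
def shorten_path(path, head: int = 6, tail: int = 10) -> str:
    """Abbreviate a long pathway for status messages (illustrative only)."""
    path = str(path)
    if len(path) <= head + tail + 5:
        # Short enough to show in full
        return path
    # Keep the first `head` and last `tail` characters around an ellipsis
    return f"{path[:head]} ... {path[-tail:]}"

print(shorten_path("/home/user/experiments/2024/physarum_run_01"))
# -> /home/ ... rum_run_01
```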

    def pre_processing(self):
        """
        Pre-processes the video data for further analysis.

        This method performs several preprocessing steps for the video tracking, including
        image segmentation, validating specimen numbers, cropping, background subtraction, and origin detection.
        It also handles errors related to image analysis and manual delineation.

        Notes
        -----
        On failure, this method updates `self.status['message']` and sets
        `self.status['continue']` to False instead of returning a value.
        """
        self.parent().po.load_data_to_run_cellects_quickly()
        if not self.parent().po.first_exp_ready_to_run:
            logging.info("Pre-processing has started")
            if len(self.parent().po.data_list) > 0:
                self.parent().po.get_first_image()
                self.parent().po.load_masks()
                self.parent().po.fast_first_image_segmentation()
                if len(self.parent().po.vars['analyzed_individuals']) != self.parent().po.first_image.shape_number:
                    self.status['message'] = f"Wrong specimen number: (re)do the complete analysis."
                    self.status['continue'] = False
                else:
                    self.parent().po.cropping(is_first_image=True)
                    self.parent().po.get_average_pixel_size()
                    status = self.parent().po.delineate_each_arena()
                    self.status['message'] = status['message']
                    self.status['continue'] = status['continue']

                    if self.status['continue']:
                        self.parent().po.save_exif()
                        self.parent().po.save_data_to_run_cellects_quickly()
                        self.parent().po.get_background_to_subtract()
                        # Check top first: calling len() on a None top would raise a TypeError
                        if self.parent().po.top is None:
                            if self.parent().videoanalysiswindow.video_task == 'one_arena' and self.parent().imageanalysiswindow.manual_delineation_flag:
                                self.status['message'] = f"Auto video delineation failed, use manual delineation tool"
                            else:
                                self.status['message'] = f"Auto video delineation failed"
                            self.status['continue'] = False
                        elif len(self.parent().po.vars['analyzed_individuals']) != len(self.parent().po.top):
                            self.status['message'] = f"Wrong specimen number: (re)do the complete analysis."
                            self.status['continue'] = False
                        else:
                            self.parent().po.save_origins_and_backgrounds_lists()
                            self.parent().po.get_last_image()
                            self.parent().po.fast_last_image_segmentation()
                            self.parent().po.find_if_lighter_background()
                            logging.info("The current (or the first) folder is ready to run")
                            self.parent().po.first_exp_ready_to_run = True
            else:
                self.status['message'] = f"Wrong folder or parameters"
                self.status['continue'] = False

    def run_video_writing(self):
        """
        Initiate the process of writing videos from image data.

        Raises
        ------
        FileNotFoundError
            If an image file specified in `data_list` does not exist.
        OSError
            If there is an issue writing to disk, such as when the disk is full.

        Notes
        -----
        This function manages video writing in batches, checking available memory
        and handling errors related to file sizes or missing images.
        """
        do_write_videos = video_writing_decision(len(self.parent().po.vars['analyzed_individuals']),
                                                 self.parent().po.all['im_or_vid'],
                                                 self.parent().po.all['overwrite_unaltered_videos'])
        if not do_write_videos:
            logging.info(f"{self.status['folder']}, Writing videos is not necessary")
        else:
            logging.info(f"Starting video writing")
            in_colors = not self.parent().po.vars['already_greyscale']
            self.parent().po.first_image.shape_number = self.parent().po.sample_number
            bunch_nb, video_nb_per_bunch, sizes, video_bunch, vid_names, rom_memory_required, analysis_status, remaining, use_list_of_vid, is_landscape = self.parent().po.prepare_video_writing(
                self.parent().po.data_list, self.parent().po.vars['min_ram_free'], in_colors)
            if self.status['continue']:
                # Check that there is enough available RAM for one video per bunch and ROM for all videos
                if video_nb_per_bunch > 0 and rom_memory_required is None:
                    pat_tracker1 = PercentAndTimeTracker(bunch_nb * self.parent().po.vars['img_number'])
                    image_percentage = 0
                    im_percent = 0
                    for bunch in np.arange(bunch_nb):
                        # Update the labels of arenas and the video_bunch to write
                        if bunch == (bunch_nb - 1) and remaining > 0:
                            arena = np.arange(bunch * video_nb_per_bunch, bunch * video_nb_per_bunch + remaining)
                        else:
                            arena = np.arange(bunch * video_nb_per_bunch, (bunch + 1) * video_nb_per_bunch)
                        if use_list_of_vid:
                            video_bunch = [np.zeros(sizes[i, :], dtype=np.uint8) for i in arena]
                        else:
                            video_bunch = np.zeros(np.append(sizes[0, :], len(arena)), dtype=np.uint8)
                        prev_img = None
                        images_done = bunch * self.parent().po.vars['img_number']
                        for image_i, image_name in enumerate(self.parent().po.data_list):
                            image_percentage, remaining_time = pat_tracker1.get_progress(image_i + images_done)
                            im_percent = np.round(image_percentage, 2)
                            self.message_from_thread.emit(f"{self.status['folder']}, Writing videos ({im_percent}%), bunch n°{bunch + 1}/{bunch_nb}")
                            if not os.path.exists(image_name):
                                raise FileNotFoundError(image_name)
                            img = read_and_rotate(image_name, prev_img, self.parent().po.all['raw_images'],
                                                  is_landscape, self.parent().po.first_image.crop_coord)
                            prev_img = img.copy()
                            if self.parent().po.vars['already_greyscale'] and self.parent().po.reduce_image_dim:
                                img = img[:, :, 0]

                            for arena_i, arena_name in enumerate(arena):
                                try:
                                    sub_img = img[self.parent().po.top[arena_name]: self.parent().po.bot[arena_name],
                                    self.parent().po.left[arena_name]: self.parent().po.right[arena_name], ...]
                                    if use_list_of_vid:
                                        video_bunch[arena_i][image_i, ...] = sub_img
                                    else:
                                        if len(video_bunch.shape) == 5:
                                            video_bunch[image_i, :, :, :, arena_i] = sub_img
                                        else:
                                            video_bunch[image_i, :, :, arena_i] = sub_img
                                except ValueError:
                                    # Write into self.status so the error reaches the UI message handler
                                    self.status['message'] = f"Some images have incorrect size, reset all settings in advanced parameters"
                                    self.status['continue'] = False
                                    logging.info(f"Reset all settings in advanced parameters")
                                if self.isInterruptionRequested():
                                    self.status['message'] = f"Was waiting for thread interruption"
                                    self.status['continue'] = False
                                if not self.status['continue']:
                                    return
                        if self.status['continue']:
                            pat_tracker2 = PercentAndTimeTracker(len(arena))
                            for arena_i, arena_name in enumerate(arena):
                                try:
                                    arena_percentage, eta = pat_tracker2.get_progress()
                                    self.message_from_thread.emit(f"{self.status['folder']}, Writing videos ({im_percent}%), Saving bunch n°{bunch + 1}/{bunch_nb} ({np.round(arena_percentage, 2)}%)")
                                    if use_list_of_vid:
                                        write_h5(vid_names[arena_name], video_bunch[arena_i], 'video')
                                    else:
                                        if len(video_bunch.shape) == 5:
                                            write_h5(vid_names[arena_name], video_bunch[:, :, :, :, arena_i], 'video')
                                        else:
                                            write_h5(vid_names[arena_name], video_bunch[:, :, :, arena_i], 'video')
                                except OSError:
                                    self.status['message'] = f"Full disk memory: clear space and retry"
                                    self.status['continue'] = False
                                if self.isInterruptionRequested():
                                    self.status['message'] = f"Was waiting for thread interruption"
                                    self.status['continue'] = False
                                if not self.status['continue']:
                                    return
                        del video_bunch
                        self.message_from_thread.emit(f"{self.status['folder']}, Writing videos: Bunch {bunch + 1} of {bunch_nb} saved")
                        logging.info(f"{self.status['folder']}, Writing videos: Bunch {bunch + 1} of {bunch_nb} saved.")
                    logging.info("When they exist, do not overwrite unaltered video")
                    self.parent().po.all['overwrite_unaltered_videos'] = False
                    self.parent().po.save_variable_dict()
                    self.parent().po.save_data_to_run_cellects_quickly()
                else:
                    self.status['continue'] = False
                    if video_nb_per_bunch == 0:
                        memory_diff = self.parent().po.update_available_core_nb()
                        ram_message = f"{memory_diff}GB of additional RAM"
                    if rom_memory_required is not None:
                        rom_message = f"at least {rom_memory_required}GB of free ROM"

                    if video_nb_per_bunch == 0 and rom_memory_required is not None:
                        self.status['message'] = f"Requires {ram_message} and {rom_message} to run"
                        # self.message_from_thread.emit(f"Analyzing {message} requires {ram_message} and {rom_message} to run")
                    elif video_nb_per_bunch == 0:
                        self.status['message'] = f"Requires {ram_message} to run"
                        # self.message_from_thread.emit(f"Analyzing {message} requires {ram_message} to run")
                    elif rom_memory_required is not None:
                        self.status['message'] = f"Requires {rom_message} to run"
                        # self.message_from_thread.emit(f"Analyzing {message} requires {rom_message} to run")
                    logging.info(f"Cellects is not writing videos: insufficient memory")

    def run_motion_analysis(self):
        """
        Run motion analysis on analyzed individuals with optional multiprocessing.

        This method processes video frames to analyze motion attributes of individuals.
        It can operate in either sequential or parallel mode based on available system
        resources and configuration settings. Analysis results are saved in multiple
        output formats.

        Raises
        ------
        MemoryError
            If there is insufficient memory to perform the analysis in parallel.

        Notes
        -----
        Sequential mode is used when multiprocessing is disabled or only one core
        is available. Parallel mode utilizes multiple CPU cores for faster processing.
        """
        logging.info(f"Starting motion analysis with the detection method n°{self.parent().po.all['video_option']}")
        self.parent().po.instantiate_tables()
        try:
            memory_diff = self.parent().po.update_available_core_nb()
            if self.parent().po.cores > 0:  # i.e. enough memory
                if not self.parent().po.all['do_multiprocessing'] or self.parent().po.cores == 1:
                    arena_nb = len(self.parent().po.vars['analyzed_individuals'])
                    self.status['message'] = f"Starting sequential analysis of {arena_nb} arena(s)"
                    self.message_from_thread.emit(f"{self.status['folder']}, {self.status['message']}")
                    logging.info(f"{self.status['folder']}, {self.status['message'] }")
                    tiii = default_timer()
                    pat_tracker = PercentAndTimeTracker(arena_nb)
                    for i, arena in enumerate(self.parent().po.vars['analyzed_individuals']):
                        l = [i, arena, self.parent().po.vars, False, False, False, None]
                        self.parent().po.motion = MotionAnalysis(l)
                        if self.isInterruptionRequested():
                            return
                        self.parent().po.motion.assess_motion_detection()
                        if self.isInterruptionRequested():
                            return
                        if self.parent().po.motion.start is not None:
                            self.parent().po.motion.detection()
                            if self.isInterruptionRequested():
                                return
                            self.parent().po.motion.initialize_post_processing()
                            if self.isInterruptionRequested():
                                return
                            self.parent().po.motion.t = self.parent().po.motion.start
                            current_percentage, eta = pat_tracker.get_progress()
                            while not self.isInterruptionRequested() and self.parent().po.motion.t < self.parent().po.motion.dims[0]:  #200:
                                self.parent().po.motion.update_shape(False)
                                t = self.parent().po.motion.t - 1
                                contours = np.nonzero(get_contours(self.parent().po.motion.binary[t, ...]))
                                if self.parent().po.motion.visu is not None:
                                    im_to_display = self.parent().po.motion.visu[t, ...].copy()
                                    im_to_display[contours[0], contours[1], ...] = self.parent().po.vars['contour_color']
                                else:
                                    im_to_display = self.parent().po.motion.binary[t, :, :] * 255
                                self.image_from_thread.emit({"current_image": im_to_display,
                                                             "message": f"{self.status['folder']}, Analyzing arena n°{arena}/{arena_nb} ({current_percentage}%, {eta}), frame: {self.parent().po.motion.t}/{self.parent().po.motion.dims[0]}"})
                            if self.isInterruptionRequested():
                                return
                            do_continue = self.analyze_post_processing_results()
                            if do_continue:
                                self.message_from_thread.emit(self.status['folder'] + f", Analyzing arena n°{arena}/{arena_nb} ({current_percentage}%, {eta}), Saving results")
                                self.parent().po.motion.save_results()
                                if not self.parent().po.vars['several_blob_per_arena']:
                                    # Save basic statistics
                                    self.parent().po.update_one_row_per_arena(i, self.parent().po.motion.one_descriptor_per_arena)

                                    # Save descriptors in long_format
                                    self.parent().po.update_one_row_per_frame(i * self.parent().po.vars['img_number'],
                                                                              arena * self.parent().po.vars['img_number'],
                                                                              self.parent().po.motion.one_row_per_frame)

                                # Save efficiency visualization
                                self.parent().po.add_analysis_visualization_to_first_and_last_images(i,
                                                                                                     self.parent().po.motion.efficiency_test_1,
                                                                                                     self.parent().po.motion.efficiency_test_2)
                                # Emit message to the interface
                                if self.isInterruptionRequested():
                                    return
                                self.image_from_thread.emit({"current_image": self.parent().po.last_image.bgr,
                                                             "message": f"{self.status['folder']}, Analyzed {arena}/{len(self.parent().po.vars['analyzed_individuals'])} arenas ({current_percentage}%){eta}"})
                        self.parent().po.motion = None
                    duration = np.round((default_timer() - tiii) / 60, 2)
                    self.status['message'] = f"Sequential analysis lasted {duration} minutes"
                    logging.info(f"{self.status['folder']}, {self.status['message'] }")
                else:
                    self.status['message'] = f"Analyzing all videos using {self.parent().po.cores} cores..."
                    self.message_from_thread.emit(f"{self.status['folder']}, {self.status['message'] }")
                    logging.info(f"{self.status['folder']}, {self.status['message'] }")

                    tiii = default_timer()
                    arena_number = len(self.parent().po.vars['analyzed_individuals'])
                    self.advance = 0
                    self.pat_tracker = PercentAndTimeTracker(len(self.parent().po.vars['analyzed_individuals']),
                                                             core_number=self.parent().po.cores)

                    fair_core_workload = arena_number // self.parent().po.cores
                    cores_with_1_more = arena_number % self.parent().po.cores
                    EXTENTS_OF_SUBRANGES = []
                    bound: int = 0
                    parallel_organization = [fair_core_workload + 1 for _ in range(int(cores_with_1_more))] + [
                        fair_core_workload for _ in range(int(self.parent().po.cores - cores_with_1_more))]
                    # Emit message to the interface
                    self.image_from_thread.emit({"current_image": self.parent().po.last_image.bgr,
                                                 "message": f"{self.status['folder']}, Analysis running on {self.parent().po.cores} CPU cores"})
                    for i, extent_size in enumerate(parallel_organization):
                        EXTENTS_OF_SUBRANGES.append((bound, bound := bound + extent_size))

                    try:
                        PROCESSES = []
                        subtotals = Manager().Queue()
                        started_processes: int = 0
                        for extent in EXTENTS_OF_SUBRANGES:
                            if self.isInterruptionRequested():
                                break
                            p = Process(target=motion_analysis_process,
                                        args=(int(extent[0]), int(extent[1]), self.parent().po.vars, subtotals))
                            p.start()
                            PROCESSES.append(p)
                            started_processes += 1

                        finished_processes: int = 0
                        while finished_processes < started_processes:
                            if self.isInterruptionRequested():
                                for p in PROCESSES:
                                    if p.is_alive():
                                        p.terminate()
                                for p in PROCESSES:
                                    p.join(timeout=1)
                                self.status['message'] = f"Was waiting for thread interruption"
                                self.status['continue'] = False
                                return

                            for p in PROCESSES:
                                if hasattr(p, "_counted"):
                                    continue
                                if not p.is_alive():
                                    p.join(timeout=0)
                                    p._counted = True
                                    finished_processes += 1
                            self.msleep(50)
                        self.message_from_thread.emit(f"{self.status['folder']}, Saving all results")
                        for _ in range(finished_processes):
                            grouped_results = subtotals.get()
                            for j, results_i in enumerate(grouped_results):
                                if not self.parent().po.vars['several_blob_per_arena']:
                                    # Save basic statistics
                                    self.parent().po.update_one_row_per_arena(results_i['i'],
                                                                              results_i['one_row_per_arena'])
                                    # Save descriptors in long_format
                                    self.parent().po.update_one_row_per_frame(
                                        results_i['i'] * self.parent().po.vars['img_number'],
                                        (results_i['i'] + 1) * self.parent().po.vars['img_number'],
                                        results_i['one_row_per_frame'])

                                self.parent().po.add_analysis_visualization_to_first_and_last_images(results_i['i'],
                                                                                                     results_i[
                                                                                                         'efficiency_test_1'],
                                                                                                     results_i[
                                                                                                         'efficiency_test_2'])
                            del grouped_results
                        del subtotals
                        duration = np.round((default_timer() - tiii) / 60, 2)
                        self.image_from_thread.emit(
                            {"current_image": self.parent().po.last_image.bgr,
                             "message": f"{self.status['folder']}, Analyzed {len(self.parent().po.vars['analyzed_individuals'])}/{len(self.parent().po.vars['analyzed_individuals'])} arenas (100%), Parallel analysis took {duration} minutes"})
                        logging.info(f"Parallel analysis lasted {duration} minutes")
                    except MemoryError:
                        self.status['continue'] = False
                        self.status['message'] = f"Not enough memory, reduce the core number for parallel analysis"
                        return
                self.parent().po.save_tables()
            else:
                self.status['continue'] = False
                self.status['message'] = f"Requires an additional {memory_diff}GB of RAM to run"
        except MemoryError:
            self.status['continue'] = False
            self.status['message'] = f"Requires additional memory to run"
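The fair-workload split used for parallel analysis (each of `cores` processes handles `arena_number // cores` arenas, and the first `arena_number % cores` processes take one extra) can be reduced to this sketch, including the walrus-operator trick for building contiguous `(start, stop)` extents:

```python
def split_workload(n_items: int, n_workers: int) -> list[tuple[int, int]]:
    """Return (start, stop) index ranges, handing the remainder out first."""
    base, extra = divmod(n_items, n_workers)
    sizes = [base + 1] * extra + [base] * (n_workers - extra)
    extents, bound = [], 0
    for size in sizes:
        # The tuple reads the old bound, then the walrus advances it
        extents.append((bound, bound := bound + size))
    return extents

# 10 arenas over 4 cores: the first two cores take 3, the last two take 2
print(split_workload(10, 4))
# -> [(0, 3), (3, 6), (6, 8), (8, 10)]
```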

    def load_one_arena(self):
        """
        Load a single arena from images or video to perform motion analysis.
        """
        if self.parent().po.first_im is None:
            self.pre_processing()
        if self.status['continue']:
            arena = self.parent().po.all['arena']
            i = np.nonzero(np.array(self.parent().po.vars['analyzed_individuals']) == arena)[0][0]
            true_frame_width = self.parent().po.right[i] - self.parent().po.left[i]
            if self.parent().po.all['overwrite_unaltered_videos'] and os.path.isfile(f'ind_{arena}.h5'):
                remove_h5_key(f'ind_{arena}.h5', 'video')
            background = None
            background2 = None
            if self.parent().po.vars['subtract_background']:
                background = read_h5(f'ind_{arena}.h5', 'background')
                if self.parent().po.vars['convert_for_motion']['logical'] != 'None':
                    background2 = read_h5(f'ind_{arena}.h5', 'background2')
            vid_name = None
            if self.parent().po.vars['video_list'] is not None:
                vid_name = self.parent().po.vars['video_list'][i]
            visu, converted_video, converted_video2 = read_one_arena(self.parent().po.all['arena'],
                self.parent().po.vars['already_greyscale'], self.parent().po.vars['convert_for_motion'],
                None, true_frame_width, vid_name, background, background2)

            save_loaded_video: bool = False
            if visu is None or (self.parent().po.vars['already_greyscale'] and converted_video is None):
                logging.info(f"{self.status['folder']}, Starting to load arena n°{arena} from images")
                cr = [self.parent().po.top[i], self.parent().po.bot[i],
                      self.parent().po.left[i], self.parent().po.right[i]]
                vids = create_empty_videos(self.parent().po.data_list, cr,
                    self.parent().po.vars['lose_accuracy_to_save_memory'], self.parent().po.vars['already_greyscale'],
                    self.parent().po.vars['convert_for_motion'])
                self.parent().po.visu, self.parent().po.converted_video, self.parent().po.converted_video2 = vids

                prev_img = None
                pat_tracker = PercentAndTimeTracker(self.parent().po.vars['img_number'])
                is_landscape = self.parent().po.first_image.image.shape[0] < self.parent().po.first_image.image.shape[1]
                for image_i, image_name in enumerate(self.parent().po.data_list):
                    current_percentage, eta = pat_tracker.get_progress()
                    reduce_image_dim = self.parent().po.vars['already_greyscale'] and self.parent().po.reduce_image_dim
                    img, prev_img = read_rotate_crop_and_reduce_image(image_name, prev_img,
                        self.parent().po.first_image.crop_coord, cr, self.parent().po.all['raw_images'], is_landscape,
                        reduce_image_dim)
                    self.image_from_thread.emit(
                        {"message": f"{self.status['folder']}, Loading arena n°{arena} ({current_percentage}%{eta})", "current_image": img})
                    if self.parent().po.vars['already_greyscale']:
                        self.parent().po.converted_video[image_i, ...] = img
                    else:
                        self.parent().po.visu[image_i, ...] = img
                    if self.isInterruptionRequested():
                        self.status['message'] = f"Was waiting for thread interruption"
                        self.status['continue'] = False
                        return
                if not self.parent().po.vars['already_greyscale']:
                    msg = "Starting: video conversion"
                    if background is not None :
                        msg += ", background subtraction"
                    if self.parent().po.vars['filter_spec'] is not None:
                        msg += ", filtering"
                    msg += "..."
                    self.image_from_thread.emit({"message": f"{self.status['folder']}, {msg}", "current_image": img})
                    converted_videos = convert_subtract_and_filter_video(self.parent().po.visu,
                                                                            self.parent().po.vars['convert_for_motion'],
                                                                            background, background2,
                                                                            self.parent().po.vars['lose_accuracy_to_save_memory'],
                                                                            self.parent().po.vars['filter_spec'])
                    self.parent().po.converted_video, self.parent().po.converted_video2 = converted_videos

                save_loaded_video = True
                if self.parent().po.vars['already_greyscale']:
                    self.videos_in_ram = self.parent().po.converted_video
                else:
                    if self.parent().po.vars['convert_for_motion']['logical'] == 'None':
                        self.videos_in_ram = [self.parent().po.visu, self.parent().po.converted_video.copy()]
                    else:
                        self.videos_in_ram = [self.parent().po.visu, self.parent().po.converted_video.copy(),
                                              self.parent().po.converted_video2.copy()]
            else:
                logging.info(f"{self.status['folder']}, Starting to load arena n°{arena} from .h5 saved file")
                self.videos_in_ram = None
            l = [i, arena, self.parent().po.vars, False, False, False, self.videos_in_ram]
            self.parent().po.motion = MotionAnalysis(l)

            if self.videos_in_ram is None:
                self.parent().po.converted_video = self.parent().po.motion.converted_video.copy()
                if self.parent().po.vars['convert_for_motion']['logical'] != 'None':
                    self.parent().po.converted_video2 = self.parent().po.motion.converted_video2.copy()
            if self.isInterruptionRequested():
                self.status['message'] = f"Was waiting for thread interruption"
                self.status['continue'] = False
                return
            self.parent().po.motion.assess_motion_detection()
            self.when_loading_finished.emit(save_loaded_video)

            if self.parent().po.motion.visu is None:
                visu = bracket_to_uint8_image_contrast(self.parent().po.motion.converted_video)
                if len(visu.shape) == 3:
                    visu = np.stack((visu, visu, visu), axis=3)
                self.parent().po.motion.visu = visu

    def one_detection(self):
        """
        Perform quick video segmentation and update motion detection parameters.

        This method initiates a quick video segmentation and updates the motion detection parameters accordingly.
        It re-copies the converted video (and, when two color-space conversions are logically combined, the second
        converted video), then records which of the five video options have been computed.
        """
        if self.parent().po.motion is None:
            self.load_one_arena()
        if self.status['continue']:
            self.message_from_thread.emit(f"{self.status['folder']}, Arena n°{self.parent().po.all['arena']}: Video segmentation")
            self.parent().po.motion.converted_video = self.parent().po.converted_video.copy()
            if self.parent().po.vars['convert_for_motion']['logical'] != 'None':
                self.parent().po.motion.converted_video2 = self.parent().po.converted_video2.copy()
            self.parent().po.motion.detection(compute_all_possibilities=self.parent().po.all['compute_all_options'])
            if self.parent().po.all['compute_all_options']:
                self.parent().po.computed_video_options = np.ones(5, bool)
            else:
                self.parent().po.computed_video_options = np.zeros(5, bool)
                self.parent().po.computed_video_options[self.parent().po.all['video_option']] = True

    def post_processing(self):
        """
        Handle post-processing operations for motion analysis and video processing.

        Extended Description
        --------------------
        This method is responsible for managing various post-processing steps,
        including video segmentation, contour detection, and updating motion analysis
        parameters. It processes different video options based on the configuration
        settings and handles motion detection failures by emitting appropriate signals.

        Notes
        -----
        This method performs a series of operations that are computationally intensive.
        It leverages NumPy and OpenCV for image processing tasks. The method assumes
        that the parent object has been properly initialized with all required attributes
        and configurations.

        Attributes
        ----------
        self.parent().po.vars['already_greyscale'] : bool
            Indicates if the video is already in greyscale format.
        self.parent().po.vars['convert_for_motion']['logical'] : str
            Indicates the logical conversion method for motion analysis.
        self.parent().po.converted_video : ndarray
            The converted video data for motion analysis.
        self.parent().po.converted_video2 : ndarray
            Another converted video data for motion analysis.
        self.parent().po.visu : ndarray
            The visual representation of the video data.
        self.videos_in_ram : list or tuple
            The videos currently in RAM, either a single video or multiple.
        self.parent().po.vars['color_number'] : int
            The number of colors in the video.
        self.parent().po.all['compute_all_options'] : bool
            Indicates if all options should be computed.
        self.parent().po.all['video_option'] : int
            The current video option to be processed.
        self.parent().po.motion.start : int
            The start frame for motion analysis.
        self.parent().po.motion.step : int
            The step interval in frames for motion analysis.
        self.parent().po.motion.lost_frames : int
            The number of lost frames during motion analysis.
        self.parent().po.motion.substantial_growth : int
            The substantial growth threshold for motion detection.
        self.parent().po.all['arena'] : int
            The arena identifier used in motion analysis.
        self.parent().po.vars['specimen_activity'] : str
            Indicates if fading effects should be applied.
        self.parent().po.motion.dims : tuple
            The dimensions of the motion data.
        analyses_to_compute : list or ndarray
            List of analysis options to compute based on configuration settings.
        args : list
            Arguments used for initializing the MotionAnalysis object.
        analysis_i : MotionAnalysis
            An instance of MotionAnalysis for each segment to be processed.
        mask : tuple or NoneType
            The mask used for different segmentation options.

        """
        if self.parent().po.vars['color_number'] > 2:
            analyses_to_compute = [0]
        else:
            if self.parent().po.all['compute_all_options']:
                logging.info(f"{self.status['folder']}, Arena n°{self.parent().po.all['arena']}: Computing all options")
                analyses_to_compute = np.arange(5)
            else:
                logging.info(f"{self.status['folder']}, Arena n°{self.parent().po.all['arena']}: Computing option n°{self.parent().po.all['video_option']}")
                analyses_to_compute = [self.parent().po.all['video_option']]
        time_parameters = [self.parent().po.motion.start, self.parent().po.motion.step,
                           self.parent().po.motion.lost_frames, self.parent().po.motion.substantial_growth]

        args = [self.parent().po.all['arena'] - 1, self.parent().po.all['arena'], self.parent().po.vars,
                False, False, False, self.videos_in_ram]
        for seg_i in analyses_to_compute:
            analysis_i = MotionAnalysis(args)
            analysis_i.segmented = np.zeros(analysis_i.converted_video.shape[:3], dtype=np.uint8)
            if self.parent().po.all['compute_all_options']:
                if seg_i == 0:
                    analysis_i.segmented = self.parent().po.motion.segmented
                else:
                    if seg_i == 1:
                        mask = self.parent().po.motion.luminosity_segmentation
                    elif seg_i == 2:
                        mask = self.parent().po.motion.gradient_segmentation
                    elif seg_i == 3:
                        mask = self.parent().po.motion.logical_and
                    elif seg_i == 4:
                        mask = self.parent().po.motion.logical_or
                    analysis_i.segmented[mask[0], mask[1], mask[2]] = 1
            else:
                if self.parent().po.computed_video_options[self.parent().po.all['video_option']]:
                    if self.parent().po.motion is None:
                        self.load_one_arena()
                    if self.status['continue']:
                        if self.parent().po.motion.segmented is None:
                            self.one_detection()
                        if self.status['continue']:
                            analysis_i.segmented = self.parent().po.motion.segmented
            if self.status['continue']:
                analysis_i.start = time_parameters[0]
                analysis_i.step = time_parameters[1]
                analysis_i.lost_frames = time_parameters[2]
                analysis_i.substantial_growth = time_parameters[3]
                analysis_i.origin_idx = self.parent().po.motion.origin_idx
                analysis_i.initialize_post_processing()
                analysis_i.t = analysis_i.start

                while not self.isInterruptionRequested() and analysis_i.t < analysis_i.binary.shape[0]:
                    analysis_i.update_shape(False)
                    contours = np.nonzero(get_contours(analysis_i.binary[analysis_i.t - 1, :, :]))
                    current_image = self.parent().po.motion.visu[analysis_i.t - 1, :, :, :].copy()
                    current_image[contours[0], contours[1], :] = self.parent().po.vars['contour_color']
                    self.image_from_thread.emit(
                        {"message": f"{self.status['folder']}, Arena n°{self.parent().po.all['arena']}, Option n°{seg_i + 1}, Frame n°{analysis_i.t}/{analysis_i.binary.shape[0]}",
                         "current_image": current_image})
                if self.isInterruptionRequested():
                    self.status['message'] = f"Was waiting for thread interruption"
                    self.status['continue'] = False
                    break
                if analysis_i.start is None:
                    analysis_i.binary = np.repeat(np.expand_dims(analysis_i.origin, 0),
                                               analysis_i.converted_video.shape[0], axis=0)
                    if self.parent().po.vars['color_number'] > 2:
                        self.message_from_thread.emit(
                            f"{self.status['folder']}, Arena n°{self.parent().po.all['arena']}, Failed to detect motion. Redo image analysis (with only 2 colors?)")
                    else:
                        self.message_from_thread.emit(f"{self.status['folder']}, Arena n°{self.parent().po.all['arena']}, Option n°{seg_i + 1} failed to detect motion")

                if self.parent().po.all['compute_all_options']:
                    if seg_i == 0:
                        self.parent().po.motion.segmented = analysis_i.binary
                    elif seg_i == 1:
                        self.parent().po.motion.luminosity_segmentation = np.nonzero(analysis_i.binary)
                    elif seg_i == 2:
                        self.parent().po.motion.gradient_segmentation = np.nonzero(analysis_i.binary)
                    elif seg_i == 3:
                        self.parent().po.motion.logical_and = np.nonzero(analysis_i.binary)
                    elif seg_i == 4:
                        self.parent().po.motion.logical_or = np.nonzero(analysis_i.binary)
                else:
                    self.parent().po.motion.segmented = analysis_i.binary
        self.when_detection_finished.emit(f"{self.status['folder']}, Arena n°{self.parent().po.all['arena']}, Post processing done: read, save or run all arenas")

    def change_one_arena_result(self):
        """
        Modify the motion and results of an arena.

        Extended Description
        --------------------
        This method performs various operations on the motion data of an arena,
        including binary mask creation, descriptor computation, and transition
        detection. It also handles optional computations like fading effects and
        segmentation based on different video options.
        """
        self.message_from_thread.emit(f"{self.status['folder']}, Starting to modify Arena n°{self.parent().po.all['arena']} results")
        if self.parent().po.motion.start is None:
            self.parent().po.motion.binary = np.repeat(np.expand_dims(self.parent().po.motion.origin, 0),
                                                     self.parent().po.motion.converted_video.shape[0], axis=0).astype(np.uint8)
        else:
            if self.parent().po.all['compute_all_options']:
                if self.parent().po.all['video_option'] == 0:
                    self.parent().po.motion.binary = self.parent().po.motion.segmented
                else:
                    if self.parent().po.all['video_option'] == 1:
                        mask = self.parent().po.motion.luminosity_segmentation
                    elif self.parent().po.all['video_option'] == 2:
                        mask = self.parent().po.motion.gradient_segmentation
                    elif self.parent().po.all['video_option'] == 3:
                        mask = self.parent().po.motion.logical_and
                    elif self.parent().po.all['video_option'] == 4:
                        mask = self.parent().po.motion.logical_or
                    self.parent().po.motion.binary = np.zeros(self.parent().po.motion.dims, dtype=np.uint8)
                    self.parent().po.motion.binary[mask[0], mask[1], mask[2]] = 1
            else:
                self.parent().po.motion.binary = np.zeros(self.parent().po.motion.dims[:3], dtype=np.uint8)
                if self.parent().po.computed_video_options[self.parent().po.all['video_option']]:
                    self.parent().po.motion.binary = self.parent().po.motion.segmented
        do_continue = self.analyze_post_processing_results()
        if not do_continue:
            self.status['message'] = "Was waiting for thread interruption"
            return
        self.message_from_thread.emit(f"{self.status['folder']}, Arena n°{self.parent().po.all['arena']}: Saving results")
        self.parent().po.motion.change_results_of_one_arena()
        self.parent().po.motion = None
        self.status['message'] = f"Arena n°{self.parent().po.all['arena']}: analysis finished"
        self.message_from_thread.emit(f"{self.status['folder']}, {self.status['message']}")

    def analyze_post_processing_results(self) -> bool:
        """
        Analyze post-processing results for motion data by performing a sequence of operations if no interruption is requested.

        Extended Description
        --------------------
        This method performs several post-processing steps on motion data, including descriptor extraction,
        growth transition detection, network analysis, oscillation study, fractal descriptions, and updating results.
        Each step is conditional on the absence of an interruption request.
        """
        if self.isInterruptionRequested():
            return False
        self.message_from_thread.emit(f"{self.status['folder']}, Arena n°{self.parent().po.all['arena']}: Computing descriptors")
        self.parent().po.motion.get_descriptors_from_binary(release_memory=False)
        if self.isInterruptionRequested():
            return False
        self.message_from_thread.emit(f"{self.status['folder']}, Arena n°{self.parent().po.all['arena']}: Detecting growth transitions")
        self.parent().po.motion.detect_growth_transitions()
        if self.isInterruptionRequested():
            return False
        self.message_from_thread.emit(f"{self.status['folder']}, Arena n°{self.parent().po.all['arena']}: Detecting network and graph")
        self.parent().po.motion.networks_analysis(False)
        if self.isInterruptionRequested():
            return False
        self.message_from_thread.emit(f"{self.status['folder']}, Arena n°{self.parent().po.all['arena']}: Detecting oscillatory patterns")
        self.parent().po.motion.study_cytoscillations(False)
        if self.isInterruptionRequested():
            return False
        self.message_from_thread.emit(f"{self.status['folder']}, Arena n°{self.parent().po.all['arena']}: Computing fractal dimension")
        self.parent().po.motion.fractal_descriptions()
        if self.isInterruptionRequested():
            return False
        else:
            return True

__init__(parent=None)

Initialize the worker thread.

Parameters:

parent : QObject, optional
    The parent object of this thread instance. In practice, an instance of the CellectsMainWidget class. Default is None.
Source code in src/cellects/core/cellects_threads.py
def __init__(self, parent=None):
    """
    Initialize the worker thread

    Parameters
    ----------
    parent : QObject, optional
        The parent object of this thread instance. In use, an instance of CellectsMainWidget class. Default is None.
    """
    super(VideoTrackingThread, self).__init__(parent)
    self.setParent(parent)
    self.status = {"continue": True, "folder": "", "message": ""}

analyze_post_processing_results()

Analyze post-processing results for motion data by performing a sequence of operations if no interruption is requested.

Extended Description

This method performs several post-processing steps on motion data, including descriptor extraction, growth transition detection, network analysis, oscillation study, fractal descriptions, and updating results. Each step is conditional on the absence of an interruption request.

Source code in src/cellects/core/cellects_threads.py
def analyze_post_processing_results(self) -> bool:
    """
    Analyze post-processing results for motion data by performing a sequence of operations if no interruption is requested.

    Extended Description
    --------------------
    This method performs several post-processing steps on motion data, including descriptor extraction,
    growth transition detection, network analysis, oscillation study, fractal descriptions, and updating results.
    Each step is conditional on the absence of an interruption request.
    """
    if self.isInterruptionRequested():
        return False
    self.message_from_thread.emit(f"{self.status['folder']}, Arena n°{self.parent().po.all['arena']}: Computing descriptors")
    self.parent().po.motion.get_descriptors_from_binary(release_memory=False)
    if self.isInterruptionRequested():
        return False
    self.message_from_thread.emit(f"{self.status['folder']}, Arena n°{self.parent().po.all['arena']}: Detecting growth transitions")
    self.parent().po.motion.detect_growth_transitions()
    if self.isInterruptionRequested():
        return False
    self.message_from_thread.emit(f"{self.status['folder']}, Arena n°{self.parent().po.all['arena']}: Detecting network and graph")
    self.parent().po.motion.networks_analysis(False)
    if self.isInterruptionRequested():
        return False
    self.message_from_thread.emit(f"{self.status['folder']}, Arena n°{self.parent().po.all['arena']}: Detecting oscillatory patterns")
    self.parent().po.motion.study_cytoscillations(False)
    if self.isInterruptionRequested():
        return False
    self.message_from_thread.emit(f"{self.status['folder']}, Arena n°{self.parent().po.all['arena']}: Computing fractal dimension")
    self.parent().po.motion.fractal_descriptions()
    if self.isInterruptionRequested():
        return False
    else:
        return True
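
The method above illustrates a cooperative-cancellation pattern: every expensive step is preceded by an `isInterruptionRequested()` check, so a `QThread.requestInterruption()` call takes effect at the next checkpoint rather than mid-computation. A minimal sketch of that pattern, with illustrative names standing in for the Qt machinery:

```python
class Worker:
    """Stand-in for a QThread worker (illustrative, not the Cellects class)."""

    def __init__(self):
        self._interrupted = False   # plays the role of QThread's interruption flag

    def isInterruptionRequested(self) -> bool:
        return self._interrupted

    def run_steps(self, steps) -> bool:
        """Run each step in order; stop and return False if interrupted."""
        for step in steps:
            if self.isInterruptionRequested():
                return False        # pending interruption honoured at the checkpoint
            step()
        return True
```

Each post-processing stage (descriptors, transitions, network, oscillations, fractals) plays the role of one `step`, which is why a long analysis can be cancelled promptly without killing the thread.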

change_one_arena_result()

Modify the motion and results of an arena.

Extended Description

This method performs various operations on the motion data of an arena, including binary mask creation, descriptor computation, and transition detection. It also handles optional computations like fading effects and segmentation based on different video options.

Source code in src/cellects/core/cellects_threads.py
def change_one_arena_result(self):
    """
    Modify the motion and results of an arena.

    Extended Description
    --------------------
    This method performs various operations on the motion data of an arena,
    including binary mask creation, descriptor computation, and transition
    detection. It also handles optional computations like fading effects and
    segmentation based on different video options.
    """
    self.message_from_thread.emit(f"{self.status['folder']}, Starting to modify Arena n°{self.parent().po.all['arena']} results")
    if self.parent().po.motion.start is None:
        self.parent().po.motion.binary = np.repeat(np.expand_dims(self.parent().po.motion.origin, 0),
                                                 self.parent().po.motion.converted_video.shape[0], axis=0).astype(np.uint8)
    else:
        if self.parent().po.all['compute_all_options']:
            if self.parent().po.all['video_option'] == 0:
                self.parent().po.motion.binary = self.parent().po.motion.segmented
            else:
                if self.parent().po.all['video_option'] == 1:
                    mask = self.parent().po.motion.luminosity_segmentation
                elif self.parent().po.all['video_option'] == 2:
                    mask = self.parent().po.motion.gradient_segmentation
                elif self.parent().po.all['video_option'] == 3:
                    mask = self.parent().po.motion.logical_and
                elif self.parent().po.all['video_option'] == 4:
                    mask = self.parent().po.motion.logical_or
                self.parent().po.motion.binary = np.zeros(self.parent().po.motion.dims, dtype=np.uint8)
                self.parent().po.motion.binary[mask[0], mask[1], mask[2]] = 1
        else:
            self.parent().po.motion.binary = np.zeros(self.parent().po.motion.dims[:3], dtype=np.uint8)
            if self.parent().po.computed_video_options[self.parent().po.all['video_option']]:
                self.parent().po.motion.binary = self.parent().po.motion.segmented
    do_continue = self.analyze_post_processing_results()
    if not do_continue:
        self.status['message'] = "Was waiting for thread interruption"
        return
    self.message_from_thread.emit(f"{self.status['folder']}, Arena n°{self.parent().po.all['arena']}: Saving results")
    self.parent().po.motion.change_results_of_one_arena()
    self.parent().po.motion = None
    self.status['message'] = f"Arena n°{self.parent().po.all['arena']}: analysis finished"
    self.message_from_thread.emit(f"{self.status['folder']}, {self.status['message']}")
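
In the branches above, the alternative segmentations (`luminosity_segmentation`, `gradient_segmentation`, `logical_and`, `logical_or`) are stored as `np.nonzero` index tuples rather than dense arrays, and a dense uint8 volume is rebuilt on demand with the same fancy-indexing assignment. A small sketch of that round-trip, with illustrative shapes:

```python
import numpy as np

# A tiny (frames, height, width) binary volume standing in for a segmentation.
dense = np.zeros((4, 3, 3), dtype=np.uint8)
dense[1, 0, 2] = 1
dense[3, 2, 1] = 1

# Compact storage: a tuple of (t, y, x) index arrays of the nonzero voxels.
mask = np.nonzero(dense)

# Rebuild the dense volume on demand, as done in the method above.
rebuilt = np.zeros(dense.shape, dtype=np.uint8)
rebuilt[mask[0], mask[1], mask[2]] = 1
```

Keeping only the nonzero coordinates is a memory win when the segmented region is sparse relative to the full video volume.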

load_one_arena()

Load a single arena from images or video to perform motion analysis.

Source code in src/cellects/core/cellects_threads.py
def load_one_arena(self):
    """
    Load a single arena from images or video to perform motion analysis.
    """
    if self.parent().po.first_im is None:
        self.pre_processing()
    if self.status['continue']:
        arena = self.parent().po.all['arena']
        i = np.nonzero(np.array(self.parent().po.vars['analyzed_individuals']) == arena)[0][0]
        true_frame_width = self.parent().po.right[i] - self.parent().po.left[i]
        if self.parent().po.all['overwrite_unaltered_videos'] and os.path.isfile(f'ind_{arena}.h5'):
            remove_h5_key(f'ind_{arena}.h5', 'video')
        background = None
        background2 = None
        if self.parent().po.vars['subtract_background']:
            background = read_h5(f'ind_{arena}.h5', 'background')
            if self.parent().po.vars['convert_for_motion']['logical'] != 'None':
                background2 = read_h5(f'ind_{arena}.h5', 'background2')
        vid_name = None
        if self.parent().po.vars['video_list'] is not None:
            vid_name = self.parent().po.vars['video_list'][i]
        visu, converted_video, converted_video2 = read_one_arena(self.parent().po.all['arena'],
            self.parent().po.vars['already_greyscale'], self.parent().po.vars['convert_for_motion'],
            None, true_frame_width, vid_name, background, background2)

        save_loaded_video: bool = False
        if visu is None or (self.parent().po.vars['already_greyscale'] and converted_video is None):
            logging.info(f"{self.status['folder']}, Starting to load arena n°{arena} from images")
            cr = [self.parent().po.top[i], self.parent().po.bot[i],
                  self.parent().po.left[i], self.parent().po.right[i]]
            vids = create_empty_videos(self.parent().po.data_list, cr,
                self.parent().po.vars['lose_accuracy_to_save_memory'], self.parent().po.vars['already_greyscale'],
                self.parent().po.vars['convert_for_motion'])
            self.parent().po.visu, self.parent().po.converted_video, self.parent().po.converted_video2 = vids

            prev_img = None
            pat_tracker = PercentAndTimeTracker(self.parent().po.vars['img_number'])
            is_landscape = self.parent().po.first_image.image.shape[0] < self.parent().po.first_image.image.shape[1]
            for image_i, image_name in enumerate(self.parent().po.data_list):
                current_percentage, eta = pat_tracker.get_progress()
                reduce_image_dim = self.parent().po.vars['already_greyscale'] and self.parent().po.reduce_image_dim
                img, prev_img = read_rotate_crop_and_reduce_image(image_name, prev_img,
                    self.parent().po.first_image.crop_coord, cr, self.parent().po.all['raw_images'], is_landscape,
                    reduce_image_dim)
                self.image_from_thread.emit(
                    {"message": f"{self.status['folder']}, Loading arena n°{arena} ({current_percentage}%{eta})", "current_image": img})
                if self.parent().po.vars['already_greyscale']:
                    self.parent().po.converted_video[image_i, ...] = img
                else:
                    self.parent().po.visu[image_i, ...] = img
                if self.isInterruptionRequested():
                    self.status['message'] = f"Was waiting for thread interruption"
                    self.status['continue'] = False
                    return
            if not self.parent().po.vars['already_greyscale']:
                msg = "Starting: video conversion"
                if background is not None:
                    msg += ", background subtraction"
                if self.parent().po.vars['filter_spec'] is not None:
                    msg += ", filtering"
                msg += "..."
                self.image_from_thread.emit({"message": f"{self.status['folder']}, {msg}", "current_image": img})
                converted_videos = convert_subtract_and_filter_video(self.parent().po.visu,
                                                                        self.parent().po.vars['convert_for_motion'],
                                                                        background, background2,
                                                                        self.parent().po.vars['lose_accuracy_to_save_memory'],
                                                                        self.parent().po.vars['filter_spec'])
                self.parent().po.converted_video, self.parent().po.converted_video2 = converted_videos

            save_loaded_video = True
            if self.parent().po.vars['already_greyscale']:
                self.videos_in_ram = self.parent().po.converted_video
            else:
                if self.parent().po.vars['convert_for_motion']['logical'] == 'None':
                    self.videos_in_ram = [self.parent().po.visu, self.parent().po.converted_video.copy()]
                else:
                    self.videos_in_ram = [self.parent().po.visu, self.parent().po.converted_video.copy(),
                                          self.parent().po.converted_video2.copy()]
        else:
            logging.info(f"{self.status['folder']}, Starting to load arena n°{arena} from .h5 saved file")
            self.videos_in_ram = None
        l = [i, arena, self.parent().po.vars, False, False, False, self.videos_in_ram]
        self.parent().po.motion = MotionAnalysis(l)

        if self.videos_in_ram is None:
            self.parent().po.converted_video = self.parent().po.motion.converted_video.copy()
            if self.parent().po.vars['convert_for_motion']['logical'] != 'None':
                self.parent().po.converted_video2 = self.parent().po.motion.converted_video2.copy()
        if self.isInterruptionRequested():
            self.status['message'] = f"Was waiting for thread interruption"
            self.status['continue'] = False
            return
        self.parent().po.motion.assess_motion_detection()
        self.when_loading_finished.emit(save_loaded_video)

        if self.parent().po.motion.visu is None:
            visu = bracket_to_uint8_image_contrast(self.parent().po.motion.converted_video)
            if len(visu.shape) == 3:
                visu = np.stack((visu, visu, visu), axis=3)
            self.parent().po.motion.visu = visu
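
When no color visualisation is available, the final lines above synthesise one: the greyscale video is contrast-stretched to uint8 (`bracket_to_uint8_image_contrast`) and replicated across three channels with `np.stack(..., axis=3)`. A minimal sketch, assuming the contrast stretch simply rescales to the full 0–255 range (the stand-in function below is illustrative, not the library implementation):

```python
import numpy as np

def to_uint8_contrast(video: np.ndarray) -> np.ndarray:
    """Illustrative stand-in: rescale values to span the full 0-255 range."""
    v = video.astype(np.float64)
    v -= v.min()
    if v.max() > 0:
        v *= 255.0 / v.max()
    return v.astype(np.uint8)

grey = np.random.default_rng(0).random((5, 4, 4))  # (frames, height, width)
visu = to_uint8_contrast(grey)
visu = np.stack((visu, visu, visu), axis=3)        # -> (frames, height, width, 3)
```

The three-channel copy lets downstream code draw colored contours on what is otherwise a single-channel video.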

one_detection()

Perform quick video segmentation and update motion detection parameters.

This method initiates a quick video segmentation and updates the motion detection parameters accordingly. It re-copies the converted video (and, when two color-space conversions are logically combined, the second converted video), then records which of the five video options have been computed.

Source code in src/cellects/core/cellects_threads.py
def one_detection(self):
    """
    Perform quick video segmentation and update motion detection parameters.

    This method initiates a quick video segmentation and updates the motion detection parameters accordingly.
    It re-copies the converted video (and, when two color-space conversions are logically combined, the second
    converted video), then records which of the five video options have been computed.
    """
    if self.parent().po.motion is None:
        self.load_one_arena()
    if self.status['continue']:
        self.message_from_thread.emit(f"{self.status['folder']}, Arena n°{self.parent().po.all['arena']}: Video segmentation")
        self.parent().po.motion.converted_video = self.parent().po.converted_video.copy()
        if self.parent().po.vars['convert_for_motion']['logical'] != 'None':
            self.parent().po.motion.converted_video2 = self.parent().po.converted_video2.copy()
        self.parent().po.motion.detection(compute_all_possibilities=self.parent().po.all['compute_all_options'])
        if self.parent().po.all['compute_all_options']:
            self.parent().po.computed_video_options = np.ones(5, bool)
        else:
            self.parent().po.computed_video_options = np.zeros(5, bool)
            self.parent().po.computed_video_options[self.parent().po.all['video_option']] = True
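
The closing lines maintain a five-element boolean array recording which segmentation options have been computed: all five when `compute_all_options` is set, otherwise only the selected one. That bookkeeping can be sketched as a standalone helper (the function name is illustrative):

```python
import numpy as np

def mark_computed(compute_all: bool, video_option: int) -> np.ndarray:
    """Return the flag array for the five segmentation options."""
    if compute_all:
        return np.ones(5, bool)        # every option has been computed
    flags = np.zeros(5, bool)
    flags[video_option] = True         # only the currently selected option
    return flags
```

Later steps consult these flags to decide whether a requested option can be displayed directly or must first be (re)computed.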

post_processing()

Handle post-processing operations for motion analysis and video processing.

Extended Description

This method is responsible for managing various post-processing steps, including video segmentation, contour detection, and updating motion analysis parameters. It processes different video options based on the configuration settings and handles motion detection failures by emitting appropriate signals.

Notes

This method performs a series of operations that are computationally intensive. It leverages NumPy and OpenCV for image processing tasks. The method assumes that the parent object has been properly initialized with all required attributes and configurations.

Attributes:

self.parent().po.vars['already_greyscale'] : bool
    Indicates if the video is already in greyscale format.
self.parent().po.vars['convert_for_motion']['logical'] : str
    Indicates the logical conversion method for motion analysis.
self.parent().po.converted_video : ndarray
    The converted video data for motion analysis.
self.parent().po.converted_video2 : ndarray
    A second converted video for motion analysis.
self.parent().po.visu : ndarray
    The visual representation of the video data.
self.videos_in_ram : list or tuple
    The videos currently in RAM, either a single video or several.
self.parent().po.vars['color_number'] : int
    The number of colors in the video.
self.parent().po.all['compute_all_options'] : bool
    Indicates if all options should be computed.
self.parent().po.all['video_option'] : int
    The current video option to be processed.
self.parent().po.motion.start : int
    The start frame for motion analysis.
self.parent().po.motion.step : int
    The step interval in frames for motion analysis.
self.parent().po.motion.lost_frames : int
    The number of lost frames during motion analysis.
self.parent().po.motion.substantial_growth : int
    The substantial growth threshold for motion detection.
self.parent().po.all['arena'] : int
    The arena identifier used in motion analysis.
self.parent().po.vars['specimen_activity'] : str
    Indicates if fading effects should be applied.
self.parent().po.motion.dims : tuple
    The dimensions of the motion data.
analyses_to_compute : list or ndarray
    List of analysis options to compute based on configuration settings.
args : list
    Arguments used for initializing the MotionAnalysis object.
analysis_i : MotionAnalysis
    An instance of MotionAnalysis for each segmentation option to be processed.
mask : tuple or NoneType
    The mask used for different segmentation options.

Source code in src/cellects/core/cellects_threads.py
def post_processing(self):
    """
    Handle post-processing operations for motion analysis and video processing.

    Extended Description
    --------------------
    This method is responsible for managing various post-processing steps,
    including video segmentation, contour detection, and updating motion analysis
    parameters. It processes different video options based on the configuration
    settings and handles motion detection failures by emitting appropriate signals.

    Notes
    -----
    This method performs a series of operations that are computationally intensive.
    It leverages NumPy and OpenCV for image processing tasks. The method assumes
    that the parent object has been properly initialized with all required attributes
    and configurations.

    Attributes
    ----------
    self.parent().po.vars['already_greyscale'] : bool
        Indicates if the video is already in greyscale format.
    self.parent().po.vars['convert_for_motion']['logical'] : str
        Indicates the logical conversion method for motion analysis.
    self.parent().po.converted_video : ndarray
        The converted video data for motion analysis.
    self.parent().po.converted_video2 : ndarray
        Another converted video data for motion analysis.
    self.parent().po.visu : ndarray
        The visual representation of the video data.
    self.videos_in_ram : list or tuple
        The videos currently in RAM, either a single video or multiple.
    self.parent().po.vars['color_number'] : int
        The number of colors in the video.
    self.parent().po.all['compute_all_options'] : bool
        Indicates if all options should be computed.
    self.parent().po.all['video_option'] : int
        The current video option to be processed.
    self.parent().po.motion.start : int
        The start frame for motion analysis.
    self.parent().po.motion.step : int
        The step interval in frames for motion analysis.
    self.parent().po.motion.lost_frames : int
        The number of lost frames during motion analysis.
    self.parent().po.motion.substantial_growth : int
        The substantial growth threshold for motion detection.
    self.parent().po.all['arena'] : int
        The arena identifier used in motion analysis.
    self.parent().po.vars['specimen_activity'] : str
        Indicates if fading effects should be applied.
    self.parent().po.motion.dims : tuple
        The dimensions of the motion data.
    analyses_to_compute : list or ndarray
        List of analysis options to compute based on configuration settings.
    args : list
        Arguments used for initializing the MotionAnalysis object.
    analysis_i : MotionAnalysis
        An instance of MotionAnalysis for each segment to be processed.
    mask : tuple or NoneType
        The mask used for different segmentation options.

    """
    if self.parent().po.vars['color_number'] > 2:
        analyses_to_compute = [0]
    else:
        if self.parent().po.all['compute_all_options']:
            logging.info(f"{self.status['folder']}, Arena n°{self.parent().po.all['arena']}: Computing all options")
            analyses_to_compute = np.arange(5)
        else:
            logging.info(f"{self.status['folder']}, Arena n°{self.parent().po.all['arena']}: Computing option n°{self.parent().po.all['video_option']}")
            analyses_to_compute = [self.parent().po.all['video_option']]
    time_parameters = [self.parent().po.motion.start, self.parent().po.motion.step,
                       self.parent().po.motion.lost_frames, self.parent().po.motion.substantial_growth]

    args = [self.parent().po.all['arena'] - 1, self.parent().po.all['arena'], self.parent().po.vars,
            False, False, False, self.videos_in_ram]
    for seg_i in analyses_to_compute:
        analysis_i = MotionAnalysis(args)
        analysis_i.segmented = np.zeros(analysis_i.converted_video.shape[:3], dtype=np.uint8)
        if self.parent().po.all['compute_all_options']:
            if seg_i == 0:
                analysis_i.segmented = self.parent().po.motion.segmented
            else:
                if seg_i == 1:
                    mask = self.parent().po.motion.luminosity_segmentation
                elif seg_i == 2:
                    mask = self.parent().po.motion.gradient_segmentation
                elif seg_i == 3:
                    mask = self.parent().po.motion.logical_and
                elif seg_i == 4:
                    mask = self.parent().po.motion.logical_or
                analysis_i.segmented[mask[0], mask[1], mask[2]] = 1
        else:
            if self.parent().po.computed_video_options[self.parent().po.all['video_option']]:
                if self.parent().po.motion is None:
                    self.load_one_arena()
                if self.status['continue']:
                    if self.parent().po.motion.segmented is None:
                        self.one_detection()
                    if self.status['continue']:
                        analysis_i.segmented = self.parent().po.motion.segmented
        if self.status['continue']:
            analysis_i.start = time_parameters[0]
            analysis_i.step = time_parameters[1]
            analysis_i.lost_frames = time_parameters[2]
            analysis_i.substantial_growth = time_parameters[3]
            analysis_i.origin_idx = self.parent().po.motion.origin_idx
            analysis_i.initialize_post_processing()
            analysis_i.t = analysis_i.start

            while not self.isInterruptionRequested() and analysis_i.t < analysis_i.binary.shape[0]:
                analysis_i.update_shape(False)
                contours = np.nonzero(get_contours(analysis_i.binary[analysis_i.t - 1, :, :]))
                current_image = self.parent().po.motion.visu[analysis_i.t - 1, :, :, :].copy()
                current_image[contours[0], contours[1], :] = self.parent().po.vars['contour_color']
                self.image_from_thread.emit(
                    {"message": f"{self.status['folder']}, Arena n°{self.parent().po.all['arena']}, Option n°{seg_i + 1}, Frame n°{analysis_i.t}/{analysis_i.binary.shape[0]}",
                     "current_image": current_image})
            if self.isInterruptionRequested():
                self.status['message'] = f"Was waiting for thread interruption"
                self.status['continue'] = False
                break
            if analysis_i.start is None:
                analysis_i.binary = np.repeat(np.expand_dims(analysis_i.origin, 0),
                                           analysis_i.converted_video.shape[0], axis=0)
                if self.parent().po.vars['color_number'] > 2:
                    self.message_from_thread.emit(
                        f"{self.status['folder']}, Arena n°{self.parent().po.all['arena']}, Failed to detect motion. Redo image analysis (with only 2 colors?)")
                else:
                    self.message_from_thread.emit(f"{self.status['folder']}, Arena n°{self.parent().po.all['arena']}, Option n°{seg_i + 1} failed to detect motion")

            if self.parent().po.all['compute_all_options']:
                if seg_i == 0:
                    self.parent().po.motion.segmented = analysis_i.binary
                elif seg_i == 1:
                    self.parent().po.motion.luminosity_segmentation = np.nonzero(analysis_i.binary)
                elif seg_i == 2:
                    self.parent().po.motion.gradient_segmentation = np.nonzero(analysis_i.binary)
                elif seg_i == 3:
                    self.parent().po.motion.logical_and = np.nonzero(analysis_i.binary)
                elif seg_i == 4:
                    self.parent().po.motion.logical_or = np.nonzero(analysis_i.binary)
            else:
                self.parent().po.motion.segmented = analysis_i.binary
    self.when_detection_finished.emit(f"{self.status['folder']}, Arena n°{self.parent().po.all['arena']}, Post processing done: read, save or run all arenas")
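The mask assignments above (`analysis_i.segmented[mask[0], mask[1], mask[2]] = 1` and the `np.nonzero(analysis_i.binary)` stores) rely on NumPy's coordinate-tuple representation: a boolean video volume is kept as three index arrays and written back with fancy indexing. A minimal, self-contained sketch of that round trip (toy shapes, not Cellects data):

```python
import numpy as np

# Toy (frames, height, width) volume standing in for a segmented video.
video_shape = (4, 6, 6)
binary = np.zeros(video_shape, dtype=np.uint8)
binary[1, 2:4, 2:4] = 1
binary[2, 3, 3] = 1

# np.nonzero returns one index array per axis: (frames, rows, cols).
mask = np.nonzero(binary)

# Restoring into a fresh volume uses the same fancy indexing as post_processing.
restored = np.zeros(video_shape, dtype=np.uint8)
restored[mask[0], mask[1], mask[2]] = 1

assert np.array_equal(binary, restored)
```

Storing the coordinate tuple instead of the full volume is compact when the segmented pixels are sparse, which is why the per-option results (`luminosity_segmentation`, `gradient_segmentation`, …) are kept in this form.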

pre_processing()

Pre-processes the video data for further analysis.

This method performs several preprocessing steps for the video tracking, including image segmentation, validating specimen numbers, cropping, background subtraction, and origin detection. It also handles errors related to image analysis and manual delineation.

Returns:

bool: True if pre-processing completed successfully; False otherwise.

Source code in src/cellects/core/cellects_threads.py
def pre_processing(self):
    """
    Pre-processes the video data for further analysis.

    This method performs several preprocessing steps for the video tracking, including
    image segmentation, validating specimen numbers, cropping, background subtraction, and origin detection.
    It also handles errors related to image analysis and manual delineation.

    Returns
    -------
    bool
        Returns True if pre-processing completed successfully; False otherwise.
    """
    self.parent().po.load_data_to_run_cellects_quickly()
    if not self.parent().po.first_exp_ready_to_run:
        logging.info("Pre-processing has started")
        if len(self.parent().po.data_list) > 0:
            self.parent().po.get_first_image()
            self.parent().po.load_masks()
            self.parent().po.fast_first_image_segmentation()
            if len(self.parent().po.vars['analyzed_individuals']) != self.parent().po.first_image.shape_number:
                self.status['message'] = f"Wrong specimen number: (re)do the complete analysis."
                self.status['continue'] = False
            else:
                self.parent().po.cropping(is_first_image=True)
                self.parent().po.get_average_pixel_size()
                status = self.parent().po.delineate_each_arena()
                self.status['message'] = status['message']
                self.status['continue'] = status['continue']

                if self.status['continue']:
                    self.parent().po.save_exif()
                    self.parent().po.save_data_to_run_cellects_quickly()
                    self.parent().po.get_background_to_subtract()
                    if len(self.parent().po.vars['analyzed_individuals']) != len(self.parent().po.top):
                        self.status['message'] = f"Wrong specimen number: (re)do the complete analysis."
                        self.status['continue'] = False
                    elif self.parent().po.top is None and self.parent().videoanalysiswindow.video_task == 'one_arena' and self.parent().imageanalysiswindow.manual_delineation_flag:
                        self.status['message'] = f"Auto video delineation failed, use manual delineation tool"
                        self.status['continue'] = False
                    else:
                        self.parent().po.save_origins_and_backgrounds_lists()
                        self.parent().po.get_last_image()
                        self.parent().po.fast_last_image_segmentation()
                        self.parent().po.find_if_lighter_background()
                        logging.info("The current (or the first) folder is ready to run")
                        self.parent().po.first_exp_ready_to_run = True
        else:
            self.status['message'] = f"Wrong folder or parameters"
            self.status['continue'] = False
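Throughout these threads, each step reports back through the same status-dict convention: a `{'continue': bool, 'message': str}` pair that the caller copies into `self.status` and uses to bail out early. A hypothetical stripped-down sketch of that pattern (the stub names are illustrative, not Cellects APIs):

```python
# Stand-in for a step such as delineate_each_arena(), which returns a
# status dict instead of raising on recoverable failures.
def delineate_stub(detected: int, expected: int) -> dict:
    ok = detected == expected
    return {'continue': ok,
            'message': '' if ok else "Wrong specimen number: (re)do the complete analysis."}

def run_pre_processing(expected: int, detected: int) -> dict:
    # The caller starts optimistic and merges each step's verdict,
    # mirroring how pre_processing copies status['message'] / status['continue'].
    status = {'continue': True, 'message': ''}
    step = delineate_stub(detected, expected)
    status['message'] = step['message']
    status['continue'] = step['continue']
    return status

print(run_pre_processing(6, 5))
```

Keeping failures as data rather than exceptions lets the GUI thread surface the message via a Qt signal without unwinding the worker.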

run_all()

Run the analysis process for video writing and motion analysis.

This method manages the overall flow of the analysis including setting up folders, loading data, writing videos from images, and performing motion analysis. It handles various conditions like checking if the specimen number matches expectations or if multiple experiments are ready to run.

Returns:

dict: A dictionary containing:
- 'continue': bool indicating if the analysis should continue.
- 'message': str with a relevant message about the current status.

Notes

This method uses several internal methods like set_current_folder, run_video_writing, and run_motion_analysis to perform the analysis steps. It also checks various conditions based on parent object attributes.

Source code in src/cellects/core/cellects_threads.py
def run_all(self):
    """
    Run the analysis process for video writing and motion analysis.

    This method manages the overall flow of the analysis including setting up
    folders, loading data, writing videos from images, and performing motion
    analysis. It handles various conditions like checking if the specimen number
    matches expectations or if multiple experiments are ready to run.

    Returns
    -------
    dict
        A dictionary containing:
        - 'continue': bool indicating if the analysis should continue.
        - 'message': str with a relevant message about the current status.

    Notes
    -----
    This method uses several internal methods like `set_current_folder`,
    `run_video_writing`, and `run_motion_analysis` to perform the analysis steps.
    It also checks various conditions based on parent object attributes.
    """
    self.parent().po.all['compute_all_options'] = False
    self.parent().po.load_quick_full = 2
    self.set_current_folder(0)
    if self.parent().po.first_exp_ready_to_run:
        self.message_from_thread.emit(f"{self.status['folder']}, Writing videos")
        if not self.parent().po.vars['several_blob_per_arena'] and self.parent().po.sample_number != len(
                self.parent().po.bot):
            self.status["continue"] = False
            self.status["message"] = f"Wrong specimen number: restart the image analysis."
        else:
            self.run_video_writing()
            if self.status['continue']:
                self.message_from_thread.emit(f"{self.status['folder']}, Analysing all videos")
                self.run_motion_analysis()
                if self.isInterruptionRequested():
                    self.status['message'] = f"Was waiting for thread interruption"
                    self.status['continue'] = False
            if self.status['continue']:
                if self.parent().po.all['folder_number'] > 1:
                    self.parent().po.all['folder_list'] = self.parent().po.all['folder_list'][1:]
                    self.parent().po.all['sample_number_per_folder'] = self.parent().po.all[
                        'sample_number_per_folder'][1:]
    else:
        self.parent().po.look_for_data()

    if self.status['continue'] and (
            not self.parent().po.first_exp_ready_to_run or self.parent().po.all['folder_number'] > 1):
        folder_number = np.max((len(self.parent().po.all['folder_list']), 1))

        for exp_i in np.arange(folder_number):
            if len(self.parent().po.all['folder_list']) > 0:
                logging.info(self.parent().po.all['folder_list'][exp_i])
            self.parent().po.first_im = None
            self.parent().po.first_image = None
            self.parent().po.last_im = None
            self.parent().po.last_image = None
            self.parent().po.top = None

            self.message_from_thread.emit(f"{self.status['folder']}, Pre-processing")
            self.pre_processing()
            if self.status['continue']:
                self.message_from_thread.emit(f"{self.status['folder']}, Writing videos")
                if not self.parent().po.vars[
                    'several_blob_per_arena'] and self.parent().po.sample_number != len(self.parent().po.bot):
                    self.status['continue'] = False
                    self.status['message'] = f"Wrong specimen number: first image analysis is mandatory."
                else:
                    self.run_video_writing()
                    if self.status['continue']:
                        self.message_from_thread.emit(f"{self.status['folder']}, Analysing all videos")
                        self.run_motion_analysis()
                        if self.isInterruptionRequested():
                            self.status['message'] = f"Was waiting for thread interruption"
                            self.status['continue'] = False
            if not self.status['continue']:
                break
    if self.status['continue']:
        if self.parent().po.all['folder_number'] > 1:
            self.message_from_thread.emit(
                f"Exp {self.parent().po.all['folder_list'][0]} to {self.parent().po.all['folder_list'][-1]} analyzed")
        else:
            curr_path = reduce_path_len(self.parent().po.all['global_pathway'], 6, 10)
            self.message_from_thread.emit(f'Exp {curr_path} analyzed')
    else:
        logging.error(f"{self.status['folder']}, {self.status['message']}")
        self.message_from_thread.emit(f"{self.status['folder']}, {self.status['message']}")

run_motion_analysis()

Run motion analysis on analyzed individuals with optional multiprocessing.

This method processes video frames to analyze motion attributes of individuals. It can operate in either sequential or parallel mode based on available system resources and configuration settings. Analysis results are saved in multiple output formats.

Raises:

MemoryError: If there is insufficient memory to perform the analysis in parallel.

Notes

Sequential mode is used when multiprocessing is disabled or only one core is available. Parallel mode utilizes multiple CPU cores for faster processing.

Source code in src/cellects/core/cellects_threads.py
def run_motion_analysis(self):
    """
    Run motion analysis on analyzed individuals with optional multiprocessing.

    This method processes video frames to analyze motion attributes of individuals.
    It can operate in either sequential or parallel mode based on available system
    resources and configuration settings. Analysis results are saved in multiple
    output formats.

    Raises
    ------
    MemoryError
        If there is insufficient memory to perform the analysis in parallel.

    Notes
    -----
    Sequential mode is used when multiprocessing is disabled or only one core
    is available. Parallel mode utilizes multiple CPU cores for faster processing.
    """
    logging.info(f"Starting motion analysis with the detection method n°{self.parent().po.all['video_option']}")
    self.parent().po.instantiate_tables()
    try:
        memory_diff = self.parent().po.update_available_core_nb()
        if self.parent().po.cores > 0:  # i.e. enough memory
            if not self.parent().po.all['do_multiprocessing'] or self.parent().po.cores == 1:
                arena_nb = len(self.parent().po.vars['analyzed_individuals'])
                self.status['message'] = f"Starting sequential analysis of {arena_nb} arena(s)"
                self.message_from_thread.emit(f"{self.status['folder']}, {self.status['message']}")
                logging.info(f"{self.status['folder']}, {self.status['message'] }")
                tiii = default_timer()
                pat_tracker = PercentAndTimeTracker(arena_nb)
                for i, arena in enumerate(self.parent().po.vars['analyzed_individuals']):
                    l = [i, arena, self.parent().po.vars, False, False, False, None]
                    self.parent().po.motion = MotionAnalysis(l)
                    if self.isInterruptionRequested():
                        return
                    self.parent().po.motion.assess_motion_detection()
                    if self.isInterruptionRequested():
                        return
                    if self.parent().po.motion.start is not None:
                        self.parent().po.motion.detection()
                        if self.isInterruptionRequested():
                            return
                        self.parent().po.motion.initialize_post_processing()
                        if self.isInterruptionRequested():
                            return
                        self.parent().po.motion.t = self.parent().po.motion.start
                        current_percentage, eta = pat_tracker.get_progress()
                        while not self.isInterruptionRequested() and self.parent().po.motion.t < self.parent().po.motion.dims[0]:  #200:
                            self.parent().po.motion.update_shape(False)
                            t = self.parent().po.motion.t - 1
                            contours = np.nonzero(get_contours(self.parent().po.motion.binary[t, ...]))
                            if self.parent().po.motion.visu is not None:
                                im_to_display = self.parent().po.motion.visu[t, ...].copy()
                                im_to_display[contours[0], contours[1], ...] = self.parent().po.vars['contour_color']
                            else:
                                im_to_display = self.parent().po.motion.binary[t, :, :] * 255
                            self.image_from_thread.emit({"current_image": im_to_display,
                                                         "message": f"{self.status['folder']}, Analyzing arena n°{arena}/{arena_nb} ({current_percentage}%, {eta}), frame: {self.parent().po.motion.t}/{self.parent().po.motion.dims[0]}"})
                        if self.isInterruptionRequested():
                            return
                        do_continue = self.analyze_post_processing_results()
                        if do_continue:
                            self.message_from_thread.emit(self.status['folder'] + f", Analyzing arena n°{arena}/{arena_nb} ({current_percentage}%, {eta}), Saving results")
                            self.parent().po.motion.save_results()
                            if not self.parent().po.vars['several_blob_per_arena']:
                                # Save basic statistics
                                self.parent().po.update_one_row_per_arena(i, self.parent().po.motion.one_descriptor_per_arena)

                                # Save descriptors in long_format
                                self.parent().po.update_one_row_per_frame(i * self.parent().po.vars['img_number'],
                                                                          arena * self.parent().po.vars['img_number'],
                                                                          self.parent().po.motion.one_row_per_frame)

                            # Save efficiency visualization
                            self.parent().po.add_analysis_visualization_to_first_and_last_images(i,
                                                                                                 self.parent().po.motion.efficiency_test_1,
                                                                                                 self.parent().po.motion.efficiency_test_2)
                            # Emit message to the interface
                            if self.isInterruptionRequested():
                                return
                            self.image_from_thread.emit({"current_image": self.parent().po.last_image.bgr,
                                                         "message": f"{self.status['folder']}, Analyzed {arena}/{len(self.parent().po.vars['analyzed_individuals'])} arenas ({current_percentage}%){eta}"})
                    self.parent().po.motion = None
                duration = np.round((default_timer() - tiii) / 60, 2)
                self.status['message'] = f"Sequential analysis lasted {duration} minutes"
                logging.info(f"{self.status['folder']}, {self.status['message'] }")
            else:
                self.status['message'] = f"Analyse all videos using {self.parent().po.cores} cores..."
                self.message_from_thread.emit(f"{self.status['folder']}, {self.status['message'] }")
                logging.info(f"{self.status['folder']}, {self.status['message'] }")

                tiii = default_timer()
                arena_number = len(self.parent().po.vars['analyzed_individuals'])
                self.advance = 0
                self.pat_tracker = PercentAndTimeTracker(len(self.parent().po.vars['analyzed_individuals']),
                                                         core_number=self.parent().po.cores)

                fair_core_workload = arena_number // self.parent().po.cores
                cores_with_1_more = arena_number % self.parent().po.cores
                EXTENTS_OF_SUBRANGES = []
                bound: int = 0
                parallel_organization = [fair_core_workload + 1 for _ in range(int(cores_with_1_more))] + [
                    fair_core_workload for _ in range(int(self.parent().po.cores - cores_with_1_more))]
                # Emit message to the interface
                self.image_from_thread.emit({"current_image": self.parent().po.last_image.bgr,
                                             "message": f"{self.status['folder']}, Analysis running on {self.parent().po.cores} CPU cores"})
                for i, extent_size in enumerate(parallel_organization):
                    EXTENTS_OF_SUBRANGES.append((bound, bound := bound + extent_size))

                try:
                    PROCESSES = []
                    subtotals = Manager().Queue()  # Queue()
                    started_processes: int = 0
                    for extent in EXTENTS_OF_SUBRANGES:
                        if self.isInterruptionRequested():
                            break
                        p = Process(target=motion_analysis_process,
                                    args=(int(extent[0]), int(extent[1]), self.parent().po.vars, subtotals))
                        p.start()
                        PROCESSES.append(p)
                        started_processes += 1

                    finished_processes: int = 0
                    while finished_processes < started_processes:
                        if self.isInterruptionRequested():
                            for p in PROCESSES:
                                if p.is_alive():
                                    p.terminate()
                            for p in PROCESSES:
                                p.join(timeout=1)
                            self.status['message'] = f"Was waiting for thread interruption"
                            self.status['continue'] = False
                            return

                        for p in PROCESSES:
                            if hasattr(p, "_counted"):
                                continue
                            if not p.is_alive():
                                p.join(timeout=0)
                                p._counted = True
                                finished_processes += 1
                        self.msleep(50)
                    self.message_from_thread.emit(f"{self.status['folder']},  Saving all results")
                    for _ in range(finished_processes):
                        grouped_results = subtotals.get()
                        for j, results_i in enumerate(grouped_results):
                            if not self.parent().po.vars['several_blob_per_arena']:
                                # Save basic statistics
                                self.parent().po.update_one_row_per_arena(results_i['i'],
                                                                          results_i['one_row_per_arena'])
                                # Save descriptors in long_format
                                self.parent().po.update_one_row_per_frame(
                                    results_i['i'] * self.parent().po.vars['img_number'],
                                    (results_i['i'] + 1) * self.parent().po.vars['img_number'],
                                    results_i['one_row_per_frame'])

                            self.parent().po.add_analysis_visualization_to_first_and_last_images(results_i['i'],
                                                                                                 results_i[
                                                                                                     'efficiency_test_1'],
                                                                                                 results_i[
                                                                                                     'efficiency_test_2'])
                        del grouped_results
                    del subtotals
                    duration = np.round((default_timer() - tiii) / 60, 2)
                    self.image_from_thread.emit(
                        {"current_image": self.parent().po.last_image.bgr,
                         "message": f"{self.status['folder']}, Analyzed {len(self.parent().po.vars['analyzed_individuals'])}/{len(self.parent().po.vars['analyzed_individuals'])} arenas (100%), Parallel analysis took {duration} minutes"})
                    logging.info(f"Parallel analysis lasted {duration} minutes")
                except MemoryError:
                    self.status['continue'] = False
                    self.status['message'] = f"Not enough memory, reduce the core number for parallel analysis"
                    return
            self.parent().po.save_tables()
        else:
            self.status['continue'] = False
            self.status['message'] = f"Requires an additional {memory_diff}GB of RAM to run"
    except MemoryError:
        self.status['continue'] = False
        self.status['message'] = f"Requires additional memory to run"
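The parallel branch above splits the arenas into contiguous per-core extents: `arena_number % cores` cores get one extra arena, and the `(bound, bound := bound + extent_size)` walrus expression turns the size list into `(start, stop)` ranges. The same partition logic, extracted into a standalone helper for clarity (a sketch, not the Cellects function itself):

```python
def split_arenas(arena_number: int, cores: int) -> list[tuple[int, int]]:
    """Split arena_number arenas into cores contiguous (start, stop) extents,
    mirroring the fair_core_workload / cores_with_1_more computation."""
    fair = arena_number // cores          # fair_core_workload
    extra = arena_number % cores          # cores_with_1_more
    sizes = [fair + 1] * extra + [fair] * (cores - extra)
    extents, bound = [], 0
    for size in sizes:
        # The walrus both advances the bound and records the stop index.
        extents.append((bound, bound := bound + size))
    return extents

# 10 arenas over 4 cores: two cores get 3 arenas, two get 2.
print(split_arenas(10, 4))  # [(0, 3), (3, 6), (6, 8), (8, 10)]
```

Each `(start, stop)` pair is then handed to one `Process` running `motion_analysis_process`, so every arena is analyzed exactly once and the load differs by at most one arena per core.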

run_one_arena()

Run analysis on one arena.

This method prepares and initiates the analysis process for a video by setting up required folders, loading necessary data, and performing pre-processing steps. It manages the state of running analysis and handles memory allocation for efficient processing.

Notes
  • This method uses threading to handle long-running operations without blocking the main UI.
  • The memory allocation is dynamically adjusted based on available system resources.

Attributes:

self.parent().po.vars['convert_for_motion'] (dict): Dictionary containing variables related to motion conversion.
self.parent().po.first_exp_ready_to_run (bool): Boolean indicating if the first experiment is ready to run.
self.parent().po.cores (int): Number of cores available for processing.
self.parent().po.motion (object): Object containing motion-related data and methods.
self.parent().po.load_quick_full (int): Number of arenas to load quickly for full detection.

Source code in src/cellects/core/cellects_threads.py
def run_one_arena(self):
    """

    Run analysis on one arena.

    This method prepares and initiates the analysis process for a video by setting up required folders,
    loading necessary data, and performing pre-processing steps. It manages the state of running analysis and
    handles memory allocation for efficient processing.

    Notes
    -----
    - This method uses threading to handle long-running operations without blocking the main UI.
    - The memory allocation is dynamically adjusted based on available system resources.

    Attributes
    ----------
    self.parent().po.vars['convert_for_motion'] : dict
        Dictionary containing variables related to motion conversion.
    self.parent().po.first_exp_ready_to_run : bool
        Boolean indicating if the first experiment is ready to run.
    self.parent().po.cores : int
        Number of cores available for processing.
    self.parent().po.motion : object
        Object containing motion-related data and methods.
    self.parent().po.load_quick_full : int
        Number of arenas to load quickly for full detection.
    """

    self.message_from_thread.emit(f"{self.status['folder']}, Video loading, wait...")
    # Needs a look_for_data when cellects_settings.json exists and one folder is selected among several
    self.pre_processing()
    if self.isInterruptionRequested():
        self.status['message'] = f"Was waiting for thread interruption"
        self.status['continue'] = False
    if self.status['continue']:
        memory_diff = self.parent().po.update_available_core_nb()
        if self.parent().po.cores == 0:
            self.status['message'] = f"Analyzing one arena requires {memory_diff}GB of additional RAM to run"
            self.status['continue'] = False
        else:
            if self.parent().po.motion is None or self.parent().po.load_quick_full == 0:
                self.load_one_arena()
            if self.status['continue'] and self.parent().po.load_quick_full > 0:
                if self.parent().po.motion.start is not None:
                    logging.info("One arena detection has started")
                    self.one_detection()
                    if self.status['continue']:
                        if self.parent().po.load_quick_full > 1:
                            logging.info("One arena post-processing has started")
                            self.post_processing()
                        else:
                            self.when_detection_finished.emit("Detection done, ready to see the result")
                else:
                    self.status['message'] = f"The current parameters failed to detect the cell(s) motion"
                    self.status['continue'] = False

    if not self.status['continue']:
        self.message_from_thread.emit(f"{self.status['folder']}, {self.status['message']}")
        logging.error(f"{self.status['folder']}, {self.status['message']}")
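The RAM gate above relies on `update_available_core_nb` returning a memory deficit once the usable core count drops to zero. A minimal standalone sketch of that pattern follows; `cores_for_analysis`, `per_core_gb`, and `max_cores` are hypothetical names for illustration, not part of the Cellects API:

```python
# Minimal sketch (hypothetical helper, not the Cellects API) of the RAM-gating
# pattern run_one_arena relies on: when not even one core fits in the available
# memory, report how much additional RAM is missing instead of starting.

def cores_for_analysis(available_gb: float, per_core_gb: float, max_cores: int):
    """Return (usable_cores, missing_gb); missing_gb > 0 means do not start."""
    cores = min(max_cores, int(available_gb // per_core_gb))
    missing_gb = 0.0 if cores > 0 else round(per_core_gb - available_gb, 2)
    return cores, missing_gb
```

With 8 GB free and 2 GB needed per core, four cores may run; with only 1 GB free, the caller would emit the "requires ...GB of additional RAM to run" message instead of loading the arena.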

run_video_writing()

Initiate the process of writing videos from image data.

Raises:

FileNotFoundError
    If an image file specified in data_list does not exist.
OSError
    If there is an issue writing to disk, such as when the disk is full.

Notes

This function manages video writing in batches, checking available memory and handling errors related to file sizes or missing images.

Source code in src/cellects/core/cellects_threads.py
def run_video_writing(self):
    """
    Initiate the process of writing videos from image data.

    Raises
    ------
    FileNotFoundError
        If an image file specified in `data_list` does not exist.
    OSError
        If there is an issue writing to disk, such as when the disk is full.

    Notes
    -----
    This function manages video writing in batches, checking available memory
    and handling errors related to file sizes or missing images.
    """
    do_write_videos = video_writing_decision(len(self.parent().po.vars['analyzed_individuals']),
                                             self.parent().po.all['im_or_vid'],
                                             self.parent().po.all['overwrite_unaltered_videos'])
    if not do_write_videos:
        logging.info(f"{self.status['folder']}, Writing videos is not necessary")
    else:
        logging.info(f"Starting video writing")
        in_colors = not self.parent().po.vars['already_greyscale']
        self.parent().po.first_image.shape_number = self.parent().po.sample_number
        bunch_nb, video_nb_per_bunch, sizes, video_bunch, vid_names, rom_memory_required, analysis_status, remaining, use_list_of_vid, is_landscape = self.parent().po.prepare_video_writing(
            self.parent().po.data_list, self.parent().po.vars['min_ram_free'], in_colors)
        if self.status['continue']:
            # Check that there is enough available RAM for one video per bunch and enough ROM for all videos
            if video_nb_per_bunch > 0 and rom_memory_required is None:
                pat_tracker1 = PercentAndTimeTracker(bunch_nb * self.parent().po.vars['img_number'])
                image_percentage = 0
                im_percent = 0
                for bunch in np.arange(bunch_nb):
                    # Update the labels of arenas and the video_bunch to write
                    if bunch == (bunch_nb - 1) and remaining > 0:
                        arena = np.arange(bunch * video_nb_per_bunch, bunch * video_nb_per_bunch + remaining)
                    else:
                        arena = np.arange(bunch * video_nb_per_bunch, (bunch + 1) * video_nb_per_bunch)
                    if use_list_of_vid:
                        video_bunch = [np.zeros(sizes[i, :], dtype=np.uint8) for i in arena]
                    else:
                        video_bunch = np.zeros(np.append(sizes[0, :], len(arena)), dtype=np.uint8)
                    prev_img = None
                    images_done = bunch * self.parent().po.vars['img_number']
                    for image_i, image_name in enumerate(self.parent().po.data_list):
                        image_percentage, remaining_time = pat_tracker1.get_progress(image_i + images_done)
                        im_percent = np.round(image_percentage, 2)
                        self.message_from_thread.emit(f"{self.status['folder']}, Writing videos ({im_percent}%), bunch n°{bunch + 1}/{bunch_nb}")
                        if not os.path.exists(image_name):
                            raise FileNotFoundError(image_name)
                        img = read_and_rotate(image_name, prev_img, self.parent().po.all['raw_images'],
                                              is_landscape, self.parent().po.first_image.crop_coord)
                        prev_img = img.copy()
                        if self.parent().po.vars['already_greyscale'] and self.parent().po.reduce_image_dim:
                            img = img[:, :, 0]

                        for arena_i, arena_name in enumerate(arena):
                            try:
                                sub_img = img[self.parent().po.top[arena_name]: self.parent().po.bot[arena_name],
                                self.parent().po.left[arena_name]: self.parent().po.right[arena_name], ...]
                                if use_list_of_vid:
                                    video_bunch[arena_i][image_i, ...] = sub_img
                                else:
                                    if len(video_bunch.shape) == 5:
                                        video_bunch[image_i, :, :, :, arena_i] = sub_img
                                    else:
                                        video_bunch[image_i, :, :, arena_i] = sub_img
                            except ValueError:
                                analysis_status[
                                    "message"] = f"Some images have incorrect size, reset all settings in advanced parameters"
                                self.status['continue'] = False
                                logging.info(f"Reset all settings in advanced parameters")
                            if self.isInterruptionRequested():
                                self.status['message'] = f"Was waiting for thread interruption"
                                self.status['continue'] = False
                            if not self.status['continue']:
                                return
                    if self.status['continue']:
                        pat_tracker2 = PercentAndTimeTracker(len(arena))
                        for arena_i, arena_name in enumerate(arena):
                            try:
                                arena_percentage, eta = pat_tracker2.get_progress()
                                self.message_from_thread.emit(f"{self.status['folder']}, Writing videos ({im_percent}%), Saving bunch n°{bunch + 1}/{bunch_nb} ({np.round(arena_percentage, 2)}%)")
                                if use_list_of_vid:
                                    write_h5(vid_names[arena_name], video_bunch[arena_i], 'video')
                                else:
                                    if len(video_bunch.shape) == 5:
                                        write_h5(vid_names[arena_name], video_bunch[:, :, :, :, arena_i], 'video')
                                    else:
                                        write_h5(vid_names[arena_name], video_bunch[:, :, :, arena_i], 'video')
                            except OSError:
                                self.status['message'] = f"Full disk memory: clear space and retry"
                                self.status['continue'] = False
                            if self.isInterruptionRequested():
                                self.status['message'] = f"Was waiting for thread interruption"
                                self.status['continue'] = False
                            if not self.status['continue']:
                                return
                    del video_bunch
                    self.message_from_thread.emit(f"{self.status['folder']}, Writing videos: Bunch {bunch + 1} over {bunch_nb} saved")
                    logging.info(f"{self.status['folder']}, Writing videos: Bunch {bunch + 1} over {bunch_nb} saved.")
                logging.info("When they exist, do not overwrite unaltered video")
                self.parent().po.all['overwrite_unaltered_videos'] = False
                self.parent().po.save_variable_dict()
                self.parent().po.save_data_to_run_cellects_quickly()
            else:
                self.status['continue'] = False
                if video_nb_per_bunch == 0:
                    memory_diff = self.parent().po.update_available_core_nb()
                    ram_message = f"{memory_diff}GB of additional RAM"
                if rom_memory_required is not None:
                    rom_message = f"at least {rom_memory_required}GB of free ROM"

                if video_nb_per_bunch == 0 and rom_memory_required is not None:
                    self.status['message'] = f"Requires {ram_message} and {rom_message} to run"
                    # self.message_from_thread.emit(f"Analyzing {message} requires {ram_message} and {rom_message} to run")
                elif video_nb_per_bunch == 0:
                    self.status['message'] = f"Requires {ram_message} to run"
                    # self.message_from_thread.emit(f"Analyzing {message} requires {ram_message} to run")
                elif rom_memory_required is not None:
                    self.status['message'] = f"Requires {rom_message} to run"
                    # self.message_from_thread.emit(f"Analyzing {message} requires {rom_message} to run")
                logging.info(f"Cellects is not writing videos: insufficient memory")
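The bunch bookkeeping above (full bunches of `video_nb_per_bunch` arenas, with the remainder folded into a smaller last bunch) can be isolated into a short planner; `plan_bunches` is a hypothetical name used here for illustration, not a Cellects function:

```python
from math import ceil

def plan_bunches(video_count: int, videos_per_bunch: int) -> list:
    """Split arena indices into contiguous bunches, mirroring how
    run_video_writing sizes its last, possibly smaller, bunch."""
    bunch_nb = ceil(video_count / videos_per_bunch)
    remaining = video_count % videos_per_bunch
    bunches = []
    for bunch in range(bunch_nb):
        if bunch == bunch_nb - 1 and remaining > 0:
            # Last bunch only holds the leftover arenas.
            bunches.append(range(bunch * videos_per_bunch,
                                 bunch * videos_per_bunch + remaining))
        else:
            bunches.append(range(bunch * videos_per_bunch,
                                 (bunch + 1) * videos_per_bunch))
    return bunches
```

Seven videos in bunches of three yield ranges of sizes 3, 3, and 1, so every arena is written exactly once while only one bunch of videos is held in RAM at a time.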

set_current_folder(exp_i=1)

Sets the current folder based on conditions.

This method determines which folder to use and updates the current folder ID accordingly. If there are multiple folders, it uses the folder at index exp_i in the folder list; otherwise, it uses a shortened version of the global pathway as the current folder name.

Source code in src/cellects/core/cellects_threads.py
def set_current_folder(self, exp_i: int=1):
    """

    Sets the current folder based on conditions.

    This method determines which folder to use and updates the current
    folder ID accordingly. If there are multiple folders, it uses the folder at
    index `exp_i` in the folder list; otherwise, it uses a shortened version of
    the global pathway as the current folder name.
    """
    if self.parent().po.all['folder_number'] > 1:
        logging.info(f"Use {self.parent().po.all['folder_list'][exp_i]} folder")
        self.status["folder"] = f"{str(self.parent().po.all['global_pathway'])[:6]} ... {self.parent().po.all['folder_list'][exp_i]}"
        self.parent().po.update_folder_id(self.parent().po.all['sample_number_per_folder'][exp_i],
                                          self.parent().po.all['folder_list'][exp_i])
    else:
        self.status["folder"] = reduce_path_len(self.parent().po.all['global_pathway'], 6, 10)
        logging.info(f"Use {self.status['folder']} folder")
        self.parent().po.update_folder_id(self.parent().po.all['first_folder_sample_number'])
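`reduce_path_len(path, 6, 10)` is used above to keep status messages short. Assuming it keeps a head and a tail of the pathway (as the `{str(...)[:6]} ... {folder}` branch suggests), its behavior can be sketched as follows; `shorten_path` is a hypothetical stand-in, not the actual Cellects implementation:

```python
def shorten_path(path: str, head: int, tail: int) -> str:
    # Keep the first `head` and last `tail` characters, eliding the middle,
    # so long experiment pathways fit in the status bar.
    if len(path) <= head + tail:
        return path
    return f"{path[:head]} ... {path[-tail:]}"
```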

WriteVideoThread

Bases: QThread

Thread for writing one video per arena in the current folder.

Notes

This class uses QThread to manage the process asynchronously.

Source code in src/cellects/core/cellects_threads.py
class WriteVideoThread(QtCore.QThread):
    """
    Thread for writing one video per arena in the current folder.

    Notes
    -----
    This class uses `QThread` to manage the process asynchronously.
    """
    def __init__(self, parent=None):
        """
        Initialize the worker thread for writing the video corresponding to the current arena

        Parameters
        ----------
        parent : QObject, optional
            The parent object of this thread instance. In use, an instance of CellectsMainWidget class. Default is None.
        """
        super(WriteVideoThread, self).__init__(parent)
        self.setParent(parent)

    def run(self):
        """
        Write the visualization video (or the already-greyscale converted video) for a specific arena to an .h5 file.

        Parameters
        ----------
        self : object
            The instance of the class containing this method.

        Other Parameters
        ----------------
        arena : str
            Name of the arena.

        already_greyscale : bool
            Flag indicating if the video is already in greyscale format.

        Raises
        ------
        FileNotFoundError
            When the path to write the video is not specified.
        """
        arena = self.parent().po.all['arena']
        if not self.parent().po.vars['already_greyscale']:
            write_video(self.parent().po.visu, f'ind_{arena}.h5')
        else:
            write_video(self.parent().po.converted_video, f'ind_{arena}.h5', is_color=False)

__init__(parent=None)

Initialize the worker thread for writing the video corresponding to the current arena

Parameters:

parent (QObject, optional)
    The parent object of this thread instance. In use, an instance of the CellectsMainWidget class. Default is None.
Source code in src/cellects/core/cellects_threads.py
def __init__(self, parent=None):
    """
    Initialize the worker thread for writing the video corresponding to the current arena

    Parameters
    ----------
    parent : QObject, optional
        The parent object of this thread instance. In use, an instance of CellectsMainWidget class. Default is None.
    """
    super(WriteVideoThread, self).__init__(parent)
    self.setParent(parent)

run()

Write the visualization video (or the already-greyscale converted video) for a specific arena to an .h5 file.

Parameters:

self (object)
    The instance of the class containing this method.

Other Parameters:

arena (str)
    Name of the arena.
already_greyscale (bool)
    Flag indicating whether the video is already in greyscale format.

Raises:

FileNotFoundError
    When the path to write the video is not specified.

Source code in src/cellects/core/cellects_threads.py
def run(self):
    """
    Write the visualization video (or the already-greyscale converted video) for a specific arena to an .h5 file.

    Parameters
    ----------
    self : object
        The instance of the class containing this method.

    Other Parameters
    ----------------
    arena : str
        Name of the arena.

    already_greyscale : bool
        Flag indicating if the video is already in greyscale format.

    Raises
    ------
    FileNotFoundError
        When the path to write the video is not specified.
    """
    arena = self.parent().po.all['arena']
    if not self.parent().po.vars['already_greyscale']:
        write_video(self.parent().po.visu, f'ind_{arena}.h5')
    else:
        write_video(self.parent().po.converted_video, f'ind_{arena}.h5', is_color=False)

motion_analysis_process(lower_bound, upper_bound, vars, subtotals)

Motion Analysis Process for parallel computing

Process a group of motion analysis results and store them in a queue.

Parameters:

lower_bound (int)
    The lower bound index for the range of analysis.
upper_bound (int)
    The upper bound index (exclusive) for the range of analysis.
vars (dict)
    Dictionary containing variables and configurations for the motion analysis process.
subtotals (Queue)
    A queue to store intermediate results.
Notes

This function processes a range of motion analysis results based on the provided configuration variables and stores the intermediate results in a queue.

Source code in src/cellects/core/cellects_threads.py
def motion_analysis_process(lower_bound: int, upper_bound: int, vars: dict, subtotals: Queue) -> None:
    """
    Motion Analysis Process for parallel computing

    Process a group of motion analysis results and store them in a queue.

    Parameters
    ----------
    lower_bound : int
        The lower bound index for the range of analysis.
    upper_bound : int
        The upper bound index (exclusive) for the range of analysis.
    vars : dict
        Dictionary containing variables and configurations for the motion analysis process.
    subtotals : Queue
        A queue to store intermediate results.
    Notes
    -----
    This function processes a range of motion analysis results based on the provided configuration variables and
    stores the intermediate results in a queue.
    """
    grouped_results = []
    for i in range(lower_bound, upper_bound):
        analysis_i = MotionAnalysis([i, i + 1, vars, True, True, False, None])
        results_i = dict()
        results_i['arena'] = analysis_i.one_descriptor_per_arena['arena']
        results_i['i'] = analysis_i.one_descriptor_per_arena['arena'] - 1
        arena = results_i['arena']
        i = arena - 1
        if not vars['several_blob_per_arena']:
            # Save basic statistics
            results_i['one_row_per_arena'] = analysis_i.one_descriptor_per_arena
            # Save descriptors in long_format
            results_i['one_row_per_frame'] = analysis_i.one_row_per_frame

        results_i['first_move'] = analysis_i.one_descriptor_per_arena["first_move"]
        # Save efficiency visualization
        results_i['efficiency_test_1'] = analysis_i.efficiency_test_1
        results_i['efficiency_test_2'] = analysis_i.efficiency_test_2
        grouped_results.append(results_i)
        del analysis_i

    subtotals.put(grouped_results)
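The fan-out pattern around motion_analysis_process (one contiguous [lower_bound, upper_bound) slice per worker, with results collected through a shared queue) can be sketched with threads instead of processes to keep the example self-contained; `split_bounds`, `worker`, and `run_parallel` are hypothetical names for illustration:

```python
from queue import Queue
from threading import Thread

def split_bounds(n_items: int, n_workers: int):
    # Contiguous (lower, upper) slices, one per worker, covering all items.
    base, extra = divmod(n_items, n_workers)
    bounds, start = [], 0
    for w in range(n_workers):
        stop = start + base + (1 if w < extra else 0)
        bounds.append((start, stop))
        start = stop
    return bounds

def worker(lower_bound, upper_bound, subtotals):
    # Stand-in for the per-arena MotionAnalysis loop: each worker pushes
    # one grouped result list, exactly like motion_analysis_process.
    subtotals.put([{'arena': i + 1} for i in range(lower_bound, upper_bound)])

def run_parallel(n_arenas: int, n_workers: int):
    subtotals = Queue()
    threads = [Thread(target=worker, args=(lo, hi, subtotals))
               for lo, hi in split_bounds(n_arenas, n_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    results = []
    while not subtotals.empty():
        results.extend(subtotals.get())
    return sorted(r['arena'] for r in results)
```

The real code uses multiprocessing.Process with a multiprocessing.Queue rather than threads, so the per-arena analyses run on separate cores instead of sharing the GIL.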