diff --git a/cookbook/connectors/connector_manager.py b/cookbook/connectors/connector_manager.py index 8a35ba74..bc3293ff 100644 --- a/cookbook/connectors/connector_manager.py +++ b/cookbook/connectors/connector_manager.py @@ -9,8 +9,8 @@ from multiprocessing import JoinableQueue from types import UnionType from typing import List, Any, Dict, Optional, Type -from django_scopes import scope from django.conf import settings +from django_scopes import scope from cookbook.connectors.connector import Connector from cookbook.connectors.homeassistant import HomeAssistant @@ -47,7 +47,7 @@ class ConnectorManager: def __init__(self): self._queue = multiprocessing.JoinableQueue(maxsize=settings.EXTERNAL_CONNECTORS_QUEUE_SIZE) - self._worker = multiprocessing.Process(target=self.worker, args=(self._queue,), daemon=True) + self._worker = multiprocessing.Process(target=self.worker, args=(0, self._queue,), daemon=True) self._worker.start() # Called by post save & post delete signals @@ -77,7 +77,7 @@ class ConnectorManager: self._worker.join() @staticmethod - def worker(worker_queue: JoinableQueue): + def worker(worker_id: int, worker_queue: JoinableQueue): # https://stackoverflow.com/a/10684672 Close open connections after starting a new process to prevent re-use of same connections from django.db import connections connections.close_all() @@ -85,7 +85,9 @@ class ConnectorManager: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) - # + logging.info(f"started ConnectionManager worker {worker_id}") + + # When multiple workers are used, please make sure the cache is shared across all threads, otherwise it might lead to un-expected behavior. _connectors_cache: Dict[int, List[Connector]] = dict() while True: @@ -132,6 +134,7 @@ class ConnectorManager: loop.run_until_complete(run_connectors(connectors, space, item.instance, item.actionType)) worker_queue.task_done() + logging.info(f"terminating ConnectionManager worker {worker_id}") @staticmethod def get_connected_for_config(config: ConnectorConfig) -> Optional[Connector]: