add startup & termination log to worker

This commit is contained in:
Mikhail Epifanov 2024-02-05 23:40:50 +01:00
parent 0279013f72
commit 0e945f4bd7
No known key found for this signature in database

View File

@ -9,8 +9,8 @@ from multiprocessing import JoinableQueue
from types import UnionType from types import UnionType
from typing import List, Any, Dict, Optional, Type from typing import List, Any, Dict, Optional, Type
from django_scopes import scope
from django.conf import settings from django.conf import settings
from django_scopes import scope
from cookbook.connectors.connector import Connector from cookbook.connectors.connector import Connector
from cookbook.connectors.homeassistant import HomeAssistant from cookbook.connectors.homeassistant import HomeAssistant
@ -47,7 +47,7 @@ class ConnectorManager:
def __init__(self): def __init__(self):
self._queue = multiprocessing.JoinableQueue(maxsize=settings.EXTERNAL_CONNECTORS_QUEUE_SIZE) self._queue = multiprocessing.JoinableQueue(maxsize=settings.EXTERNAL_CONNECTORS_QUEUE_SIZE)
self._worker = multiprocessing.Process(target=self.worker, args=(self._queue,), daemon=True) self._worker = multiprocessing.Process(target=self.worker, args=(0, self._queue,), daemon=True)
self._worker.start() self._worker.start()
# Called by post save & post delete signals # Called by post save & post delete signals
@ -77,7 +77,7 @@ class ConnectorManager:
self._worker.join() self._worker.join()
@staticmethod @staticmethod
def worker(worker_queue: JoinableQueue): def worker(worker_id: int, worker_queue: JoinableQueue):
# https://stackoverflow.com/a/10684672 Close open connections after starting a new process to prevent re-use of same connections # https://stackoverflow.com/a/10684672 Close open connections after starting a new process to prevent re-use of same connections
from django.db import connections from django.db import connections
connections.close_all() connections.close_all()
@ -85,7 +85,9 @@ class ConnectorManager:
loop = asyncio.new_event_loop() loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop) asyncio.set_event_loop(loop)
# logging.info(f"started ConnectionManager worker {worker_id}")
# When multiple workers are used, please make sure the cache is shared across all threads, otherwise it might lead to un-expected behavior.
_connectors_cache: Dict[int, List[Connector]] = dict() _connectors_cache: Dict[int, List[Connector]] = dict()
while True: while True:
@ -132,6 +134,7 @@ class ConnectorManager:
loop.run_until_complete(run_connectors(connectors, space, item.instance, item.actionType)) loop.run_until_complete(run_connectors(connectors, space, item.instance, item.actionType))
worker_queue.task_done() worker_queue.task_done()
logging.info(f"terminating ConnectionManager worker {worker_id}")
@staticmethod @staticmethod
def get_connected_for_config(config: ConnectorConfig) -> Optional[Connector]: def get_connected_for_config(config: ConnectorConfig) -> Optional[Connector]: