From d576394c9957d48dbe08e4ac6f48d2275bf2ddd9 Mon Sep 17 00:00:00 2001 From: Mikhail Epifanov Date: Fri, 12 Jan 2024 20:50:23 +0100 Subject: [PATCH] run everything in a seperate process --- cookbook/connectors/connector_manager.py | 124 +++++++++++++++-------- cookbook/connectors/homeassistant.py | 42 +++++--- 2 files changed, 110 insertions(+), 56 deletions(-) diff --git a/cookbook/connectors/connector_manager.py b/cookbook/connectors/connector_manager.py index 8a50f90b..2be64732 100644 --- a/cookbook/connectors/connector_manager.py +++ b/cookbook/connectors/connector_manager.py @@ -1,7 +1,12 @@ import asyncio +import logging +import multiprocessing +from asyncio import Task +from dataclasses import dataclass from enum import Enum +from multiprocessing import Queue from types import UnionType -from typing import List, Any, Dict +from typing import List, Any, Dict, Optional from django_scopes import scope @@ -9,6 +14,11 @@ from cookbook.connectors.connector import Connector from cookbook.connectors.homeassistant import HomeAssistant from cookbook.models import ShoppingListEntry, Recipe, MealPlan, Space +multiprocessing.set_start_method('fork') # https://code.djangoproject.com/ticket/31169 + +QUEUE_MAX_SIZE = 10 +REGISTERED_CLASSES: UnionType = ShoppingListEntry | Recipe | MealPlan | Connector + class ActionType(Enum): CREATED = 1 @@ -16,38 +26,25 @@ class ActionType(Enum): DELETED = 3 +@dataclass +class Payload: + instance: REGISTERED_CLASSES + actionType: ActionType + + class ConnectorManager: - _connectors: Dict[str, List[Connector]] - _listening_to_classes: UnionType = ShoppingListEntry | Recipe | MealPlan | Connector + _queue: Queue + _listening_to_classes = REGISTERED_CLASSES def __init__(self): - self._connectors = dict() + self._queue = multiprocessing.Queue(maxsize=QUEUE_MAX_SIZE) + self._worker = multiprocessing.Process(target=self.worker, args=(self._queue,), daemon=True) + self._worker.start() def __call__(self, instance: Any, **kwargs) -> None: - if not isinstance(instance, self._listening_to_classes): + if not isinstance(instance, self._listening_to_classes) or not hasattr(instance, "space"): return - # If a Connector was changed/updated, refresh connector from the database for said space - purge_connector_cache = isinstance(instance, Connector) - - space: Space = instance.space - if space.name in self._connectors and not purge_connector_cache: - connectors: List[Connector] = self._connectors[space.name] - else: - with scope(space=space): - connectors: List[Connector] = [HomeAssistant(config) for config in space.homeassistantconfig_set.all() if config.enabled] - self._connectors[space.name] = connectors - - if len(connectors) == 0 or purge_connector_cache: - return - - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - loop.run_until_complete(self.run_connectors(connectors, space, instance, **kwargs)) - loop.close() - - @staticmethod - async def run_connectors(connectors: List[Connector], space: Space, instance: Any, **kwargs): action_type: ActionType if "created" in kwargs and kwargs["created"]: action_type = ActionType.CREATED @@ -58,23 +55,64 @@ class ConnectorManager: else: return - tasks: List[asyncio.Task] = list() + self._queue.put_nowait(Payload(instance, action_type)) - if isinstance(instance, ShoppingListEntry): - shopping_list_entry: ShoppingListEntry = instance + def stop(self): + self._queue.close() + self._worker.join() - match action_type: - case ActionType.CREATED: - for connector in connectors: - tasks.append(asyncio.create_task(connector.on_shopping_list_entry_created(space, shopping_list_entry))) - case ActionType.UPDATED: - for connector in connectors: - tasks.append(asyncio.create_task(connector.on_shopping_list_entry_updated(space, shopping_list_entry))) - case ActionType.DELETED: - for connector in connectors: - tasks.append(asyncio.create_task(connector.on_shopping_list_entry_deleted(space, shopping_list_entry))) + @staticmethod + def worker(queue: Queue): + from django.db import connections + connections.close_all() - try: - await asyncio.gather(*tasks, return_exceptions=False) - except BaseException as e: - print("received an exception from one of the tasks: ", e) + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + _connectors: Dict[str, List[Connector]] = dict() + + while True: + item: Optional[Payload] = queue.get() + if item is None: + break + + # If a Connector was changed/updated, refresh connector from the database for said space + refresh_connector_cache = isinstance(item.instance, Connector) + + space: Space = item.instance.space + connectors: Optional[List[Connector]] = _connectors.get(space.name, None) + + if connectors is None or refresh_connector_cache: + with scope(space=space): + connectors: List[Connector] = [HomeAssistant(config) for config in space.homeassistantconfig_set.all() if config.enabled] + _connectors[space.name] = connectors + + if len(connectors) == 0 or refresh_connector_cache: + return + + loop.run_until_complete(run_connectors(connectors, space, item.instance, item.actionType)) + + loop.close() + + +async def run_connectors(connectors: List[Connector], space: Space, instance: REGISTERED_CLASSES, action_type: ActionType): + tasks: List[Task] = list() + + if isinstance(instance, ShoppingListEntry): + shopping_list_entry: ShoppingListEntry = instance + + match action_type: + case ActionType.CREATED: + for connector in connectors: + tasks.append(asyncio.create_task(connector.on_shopping_list_entry_created(space, shopping_list_entry))) + case ActionType.UPDATED: + for connector in connectors: + tasks.append(asyncio.create_task(connector.on_shopping_list_entry_updated(space, shopping_list_entry))) + case ActionType.DELETED: + for connector in connectors: + tasks.append(asyncio.create_task(connector.on_shopping_list_entry_deleted(space, shopping_list_entry))) + + try: + await asyncio.gather(*tasks, return_exceptions=False) + except BaseException: + logging.exception("received an exception from one of the tasks") diff --git a/cookbook/connectors/homeassistant.py b/cookbook/connectors/homeassistant.py index 6d504d6c..f5ae454f 100644 --- a/cookbook/connectors/homeassistant.py +++ b/cookbook/connectors/homeassistant.py @@ -1,29 +1,41 @@ import logging +from collections import defaultdict +from logging import Logger +from typing import Dict, Any, Optional -from homeassistant_api import Client, HomeassistantAPIError +from homeassistant_api import Client, HomeassistantAPIError, Domain from cookbook.connectors.connector import Connector from cookbook.models import ShoppingListEntry, HomeAssistantConfig, Space class HomeAssistant(Connector): + _domains_cache: dict[str, Domain] _config: HomeAssistantConfig + _logger: Logger + _client: Client def __init__(self, config: HomeAssistantConfig): + self._domains_cache = dict() self._config = config self._logger = logging.getLogger("connector.HomeAssistant") + self._client = Client(self._config.url, self._config.token, async_cache_session=False, use_async=True) async def on_shopping_list_entry_created(self, space: Space, shopping_list_entry: ShoppingListEntry) -> None: if not self._config.on_shopping_list_entry_created_enabled: return item, description = _format_shopping_list_entry(shopping_list_entry) - async with Client(self._config.url, self._config.token, use_async=True) as client: - try: - todo_domain = await client.async_get_domain('todo') - await todo_domain.add_item(entity_id=self._config.todo_entity, item=item) - except HomeassistantAPIError as err: - self._logger.warning(f"[HomeAssistant {self._config.name}] Received an exception from the api: {err=}, {type(err)=}") + + todo_domain = self._domains_cache.get('todo') + try: + if todo_domain is None: + todo_domain = await self._client.async_get_domain('todo') + self._domains_cache['todo'] = todo_domain + + await todo_domain.add_item(entity_id=self._config.todo_entity, item=item) + except HomeassistantAPIError as err: + self._logger.warning(f"[HomeAssistant {self._config.name}] Received an exception from the api: {err=}, {type(err)=}") async def on_shopping_list_entry_updated(self, space: Space, shopping_list_entry: ShoppingListEntry) -> None: if not self._config.on_shopping_list_entry_updated_enabled: @@ -35,12 +47,16 @@ class HomeAssistant(Connector): return item, description = _format_shopping_list_entry(shopping_list_entry) - async with Client(self._config.url, self._config.token, use_async=True) as client: - try: - todo_domain = await client.async_get_domain('todo') - await todo_domain.remove_item(entity_id=self._config.todo_entity, item=item) - except HomeassistantAPIError as err: - self._logger.warning(f"[HomeAssistant {self._config.name}] Received an exception from the api: {err=}, {type(err)=}") + + todo_domain = self._domains_cache.get('todo') + try: + if todo_domain is None: + todo_domain = await self._client.async_get_domain('todo') + self._domains_cache['todo'] = todo_domain + + await todo_domain.remove_item(entity_id=self._config.todo_entity, item=item) + except HomeassistantAPIError as err: + self._logger.warning(f"[HomeAssistant {self._config.name}] Received an exception from the api: {err=}, {type(err)=}") def _format_shopping_list_entry(shopping_list_entry: ShoppingListEntry):