run everything in a seperate process
This commit is contained in:
parent
a61f79507b
commit
d576394c99
@ -1,7 +1,12 @@
|
||||
import asyncio
|
||||
import logging
|
||||
import multiprocessing
|
||||
from asyncio import Task
|
||||
from dataclasses import dataclass
|
||||
from enum import Enum
|
||||
from multiprocessing import Queue
|
||||
from types import UnionType
|
||||
from typing import List, Any, Dict
|
||||
from typing import List, Any, Dict, Optional
|
||||
|
||||
from django_scopes import scope
|
||||
|
||||
@ -9,6 +14,11 @@ from cookbook.connectors.connector import Connector
|
||||
from cookbook.connectors.homeassistant import HomeAssistant
|
||||
from cookbook.models import ShoppingListEntry, Recipe, MealPlan, Space
|
||||
|
||||
multiprocessing.set_start_method('fork') # https://code.djangoproject.com/ticket/31169
|
||||
|
||||
QUEUE_MAX_SIZE = 10
|
||||
REGISTERED_CLASSES: UnionType = ShoppingListEntry | Recipe | MealPlan | Connector
|
||||
|
||||
|
||||
class ActionType(Enum):
|
||||
CREATED = 1
|
||||
@ -16,38 +26,25 @@ class ActionType(Enum):
|
||||
DELETED = 3
|
||||
|
||||
|
||||
@dataclass
|
||||
class Payload:
|
||||
instance: REGISTERED_CLASSES
|
||||
actionType: ActionType
|
||||
|
||||
|
||||
class ConnectorManager:
|
||||
_connectors: Dict[str, List[Connector]]
|
||||
_listening_to_classes: UnionType = ShoppingListEntry | Recipe | MealPlan | Connector
|
||||
_queue: Queue
|
||||
_listening_to_classes = REGISTERED_CLASSES
|
||||
|
||||
def __init__(self):
|
||||
self._connectors = dict()
|
||||
self._queue = multiprocessing.Queue(maxsize=QUEUE_MAX_SIZE)
|
||||
self._worker = multiprocessing.Process(target=self.worker, args=(self._queue,), daemon=True)
|
||||
self._worker.start()
|
||||
|
||||
def __call__(self, instance: Any, **kwargs) -> None:
|
||||
if not isinstance(instance, self._listening_to_classes):
|
||||
if not isinstance(instance, self._listening_to_classes) or not hasattr(instance, "space"):
|
||||
return
|
||||
|
||||
# If a Connector was changed/updated, refresh connector from the database for said space
|
||||
purge_connector_cache = isinstance(instance, Connector)
|
||||
|
||||
space: Space = instance.space
|
||||
if space.name in self._connectors and not purge_connector_cache:
|
||||
connectors: List[Connector] = self._connectors[space.name]
|
||||
else:
|
||||
with scope(space=space):
|
||||
connectors: List[Connector] = [HomeAssistant(config) for config in space.homeassistantconfig_set.all() if config.enabled]
|
||||
self._connectors[space.name] = connectors
|
||||
|
||||
if len(connectors) == 0 or purge_connector_cache:
|
||||
return
|
||||
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
loop.run_until_complete(self.run_connectors(connectors, space, instance, **kwargs))
|
||||
loop.close()
|
||||
|
||||
@staticmethod
|
||||
async def run_connectors(connectors: List[Connector], space: Space, instance: Any, **kwargs):
|
||||
action_type: ActionType
|
||||
if "created" in kwargs and kwargs["created"]:
|
||||
action_type = ActionType.CREATED
|
||||
@ -58,23 +55,64 @@ class ConnectorManager:
|
||||
else:
|
||||
return
|
||||
|
||||
tasks: List[asyncio.Task] = list()
|
||||
self._queue.put_nowait(Payload(instance, action_type))
|
||||
|
||||
if isinstance(instance, ShoppingListEntry):
|
||||
shopping_list_entry: ShoppingListEntry = instance
|
||||
def stop(self):
|
||||
self._queue.close()
|
||||
self._worker.join()
|
||||
|
||||
match action_type:
|
||||
case ActionType.CREATED:
|
||||
for connector in connectors:
|
||||
tasks.append(asyncio.create_task(connector.on_shopping_list_entry_created(space, shopping_list_entry)))
|
||||
case ActionType.UPDATED:
|
||||
for connector in connectors:
|
||||
tasks.append(asyncio.create_task(connector.on_shopping_list_entry_updated(space, shopping_list_entry)))
|
||||
case ActionType.DELETED:
|
||||
for connector in connectors:
|
||||
tasks.append(asyncio.create_task(connector.on_shopping_list_entry_deleted(space, shopping_list_entry)))
|
||||
@staticmethod
|
||||
def worker(queue: Queue):
|
||||
from django.db import connections
|
||||
connections.close_all()
|
||||
|
||||
try:
|
||||
await asyncio.gather(*tasks, return_exceptions=False)
|
||||
except BaseException as e:
|
||||
print("received an exception from one of the tasks: ", e)
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
|
||||
_connectors: Dict[str, List[Connector]] = dict()
|
||||
|
||||
while True:
|
||||
item: Optional[Payload] = queue.get()
|
||||
if item is None:
|
||||
break
|
||||
|
||||
# If a Connector was changed/updated, refresh connector from the database for said space
|
||||
refresh_connector_cache = isinstance(item.instance, Connector)
|
||||
|
||||
space: Space = item.instance.space
|
||||
connectors: Optional[List[Connector]] = _connectors.get(space.name, None)
|
||||
|
||||
if connectors is None or refresh_connector_cache:
|
||||
with scope(space=space):
|
||||
connectors: List[Connector] = [HomeAssistant(config) for config in space.homeassistantconfig_set.all() if config.enabled]
|
||||
_connectors[space.name] = connectors
|
||||
|
||||
if len(connectors) == 0 or refresh_connector_cache:
|
||||
return
|
||||
|
||||
loop.run_until_complete(run_connectors(connectors, space, item.instance, item.actionType))
|
||||
|
||||
loop.close()
|
||||
|
||||
|
||||
async def run_connectors(connectors: List[Connector], space: Space, instance: REGISTERED_CLASSES, action_type: ActionType):
|
||||
tasks: List[Task] = list()
|
||||
|
||||
if isinstance(instance, ShoppingListEntry):
|
||||
shopping_list_entry: ShoppingListEntry = instance
|
||||
|
||||
match action_type:
|
||||
case ActionType.CREATED:
|
||||
for connector in connectors:
|
||||
tasks.append(asyncio.create_task(connector.on_shopping_list_entry_created(space, shopping_list_entry)))
|
||||
case ActionType.UPDATED:
|
||||
for connector in connectors:
|
||||
tasks.append(asyncio.create_task(connector.on_shopping_list_entry_updated(space, shopping_list_entry)))
|
||||
case ActionType.DELETED:
|
||||
for connector in connectors:
|
||||
tasks.append(asyncio.create_task(connector.on_shopping_list_entry_deleted(space, shopping_list_entry)))
|
||||
|
||||
try:
|
||||
await asyncio.gather(*tasks, return_exceptions=False)
|
||||
except BaseException:
|
||||
logging.exception("received an exception from one of the tasks")
|
||||
|
@ -1,29 +1,41 @@
|
||||
import logging
|
||||
from collections import defaultdict
|
||||
from logging import Logger
|
||||
from typing import Dict, Any, Optional
|
||||
|
||||
from homeassistant_api import Client, HomeassistantAPIError
|
||||
from homeassistant_api import Client, HomeassistantAPIError, Domain
|
||||
|
||||
from cookbook.connectors.connector import Connector
|
||||
from cookbook.models import ShoppingListEntry, HomeAssistantConfig, Space
|
||||
|
||||
|
||||
class HomeAssistant(Connector):
|
||||
_domains_cache: dict[str, Domain]
|
||||
_config: HomeAssistantConfig
|
||||
_logger: Logger
|
||||
_client: Client
|
||||
|
||||
def __init__(self, config: HomeAssistantConfig):
|
||||
self._domains_cache = dict()
|
||||
self._config = config
|
||||
self._logger = logging.getLogger("connector.HomeAssistant")
|
||||
self._client = Client(self._config.url, self._config.token, async_cache_session=False, use_async=True)
|
||||
|
||||
async def on_shopping_list_entry_created(self, space: Space, shopping_list_entry: ShoppingListEntry) -> None:
|
||||
if not self._config.on_shopping_list_entry_created_enabled:
|
||||
return
|
||||
|
||||
item, description = _format_shopping_list_entry(shopping_list_entry)
|
||||
async with Client(self._config.url, self._config.token, use_async=True) as client:
|
||||
try:
|
||||
todo_domain = await client.async_get_domain('todo')
|
||||
await todo_domain.add_item(entity_id=self._config.todo_entity, item=item)
|
||||
except HomeassistantAPIError as err:
|
||||
self._logger.warning(f"[HomeAssistant {self._config.name}] Received an exception from the api: {err=}, {type(err)=}")
|
||||
|
||||
todo_domain = self._domains_cache.get('todo')
|
||||
try:
|
||||
if todo_domain is None:
|
||||
todo_domain = await self._client.async_get_domain('todo')
|
||||
self._domains_cache['todo'] = todo_domain
|
||||
|
||||
await todo_domain.add_item(entity_id=self._config.todo_entity, item=item)
|
||||
except HomeassistantAPIError as err:
|
||||
self._logger.warning(f"[HomeAssistant {self._config.name}] Received an exception from the api: {err=}, {type(err)=}")
|
||||
|
||||
async def on_shopping_list_entry_updated(self, space: Space, shopping_list_entry: ShoppingListEntry) -> None:
|
||||
if not self._config.on_shopping_list_entry_updated_enabled:
|
||||
@ -35,12 +47,16 @@ class HomeAssistant(Connector):
|
||||
return
|
||||
|
||||
item, description = _format_shopping_list_entry(shopping_list_entry)
|
||||
async with Client(self._config.url, self._config.token, use_async=True) as client:
|
||||
try:
|
||||
todo_domain = await client.async_get_domain('todo')
|
||||
await todo_domain.remove_item(entity_id=self._config.todo_entity, item=item)
|
||||
except HomeassistantAPIError as err:
|
||||
self._logger.warning(f"[HomeAssistant {self._config.name}] Received an exception from the api: {err=}, {type(err)=}")
|
||||
|
||||
todo_domain = self._domains_cache.get('todo')
|
||||
try:
|
||||
if todo_domain is None:
|
||||
todo_domain = await self._client.async_get_domain('todo')
|
||||
self._domains_cache['todo'] = todo_domain
|
||||
|
||||
await todo_domain.remove_item(entity_id=self._config.todo_entity, item=item)
|
||||
except HomeassistantAPIError as err:
|
||||
self._logger.warning(f"[HomeAssistant {self._config.name}] Received an exception from the api: {err=}, {type(err)=}")
|
||||
|
||||
|
||||
def _format_shopping_list_entry(shopping_list_entry: ShoppingListEntry):
|
||||
|
Loading…
Reference in New Issue
Block a user