Update the code based on feedback. set Default to enabled, add to documentation how to disable it. Add extra documentation

This commit is contained in:
Mikhail Epifanov 2024-01-28 22:59:51 +01:00
parent ba169ba38d
commit 502a606534
No known key found for this signature in database
8 changed files with 46 additions and 20 deletions

1
.gitignore vendored
View File

@ -54,7 +54,6 @@ docs/_build/
target/
\.idea/dataSources/
.idea
\.idea/dataSources\.xml
\.idea/dataSources\.local\.xml

View File

@ -3,6 +3,7 @@ from abc import ABC, abstractmethod
from cookbook.models import ShoppingListEntry, Space, ConnectorConfig
# A Connector is 'destroyed' & recreated each time 'any' ConnectorConfig in a space changes.
class Connector(ABC):
@abstractmethod
def __init__(self, config: ConnectorConfig):
@ -12,6 +13,7 @@ class Connector(ABC):
async def on_shopping_list_entry_created(self, space: Space, instance: ShoppingListEntry) -> None:
pass
# This method might not trigger on 'direct' entry updates: https://stackoverflow.com/a/35238823
@abstractmethod
async def on_shopping_list_entry_updated(self, space: Space, instance: ShoppingListEntry) -> None:
pass

View File

@ -7,18 +7,18 @@ from dataclasses import dataclass
from enum import Enum
from multiprocessing import JoinableQueue
from types import UnionType
from typing import List, Any, Dict, Optional
from typing import List, Any, Dict, Optional, Type
from django_scopes import scope
from django.conf import settings
from cookbook.connectors.connector import Connector
from cookbook.connectors.homeassistant import HomeAssistant
from cookbook.models import ShoppingListEntry, Recipe, MealPlan, Space, ConnectorConfig
from cookbook.models import ShoppingListEntry, Space, ConnectorConfig
multiprocessing.set_start_method('fork') # https://code.djangoproject.com/ticket/31169
QUEUE_MAX_SIZE = 25
REGISTERED_CLASSES: UnionType = ShoppingListEntry | Recipe | MealPlan
REGISTERED_CLASSES: UnionType | Type = ShoppingListEntry
class ActionType(Enum):
@ -33,12 +33,20 @@ class Work:
actionType: ActionType
# The way ConnectionManager works is as follows:
# 1. On init, it starts a worker & creates a queue for 'Work'
# 2. Then any time its called, it verifies the type of action (create/update/delete) and if the item is of interest, pushes the Work (non blocking) to the queue.
# 3. The worker consumes said work from the queue.
# 3.1 If the work is of type ConnectorConfig, it flushes its cache of known connectors (per space.id)
# 3.2 If work is of type REGISTERED_CLASSES, it asynchronously fires of all connectors and wait for them to finish (runtime should depend on the 'slowest' connector)
# 4. Work is marked as consumed, and next entry of the queue is consumed.
# Each 'Work' is processed in sequential by the worker, so the throughput is about [workers * the slowest connector]
class ConnectorManager:
_queue: JoinableQueue
_listening_to_classes = REGISTERED_CLASSES | ConnectorConfig
def __init__(self):
self._queue = multiprocessing.JoinableQueue(maxsize=QUEUE_MAX_SIZE)
self._queue = multiprocessing.JoinableQueue(maxsize=settings.EXTERNAL_CONNECTORS_QUEUE_SIZE)
self._worker = multiprocessing.Process(target=self.worker, args=(self._queue,), daemon=True)
self._worker.start()
@ -75,7 +83,7 @@ class ConnectorManager:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
_connectors: Dict[str, List[Connector]] = dict()
_connectors: Dict[int, List[Connector]] = dict()
while True:
try:
@ -90,7 +98,7 @@ class ConnectorManager:
refresh_connector_cache = isinstance(item.instance, ConnectorConfig)
space: Space = item.instance.space
connectors: Optional[List[Connector]] = _connectors.get(space.name)
connectors: Optional[List[Connector]] = _connectors.get(space.id)
if connectors is None or refresh_connector_cache:
if connectors is not None:
@ -111,7 +119,7 @@ class ConnectorManager:
connectors.append(connector)
_connectors[space.name] = connectors
_connectors[space.id] = connectors
if len(connectors) == 0 or refresh_connector_cache:
worker_queue.task_done()
@ -134,6 +142,9 @@ class ConnectorManager:
async def close_connectors(connectors: List[Connector]):
tasks: List[Task] = [asyncio.create_task(connector.close()) for connector in connectors]
if len(tasks) == 0:
return
try:
await asyncio.gather(*tasks, return_exceptions=False)
except BaseException:
@ -161,6 +172,7 @@ async def run_connectors(connectors: List[Connector], space: Space, instance: RE
return
try:
# Wait for all async tasks to finish, if one fails, the others still continue.
await asyncio.gather(*tasks, return_exceptions=False)
except BaseException:
logging.exception("received an exception from one of the connectors")

View File

@ -15,7 +15,6 @@ from cookbook.helper.shopping_helper import RecipeShoppingEditor
from cookbook.managers import DICTIONARY
from cookbook.models import (Food, MealPlan, PropertyType, Recipe, SearchFields, SearchPreference,
Step, Unit, UserPreference)
from recipes.settings import ENABLE_EXTERNAL_CONNECTORS
SQLITE = True
if settings.DATABASES['default']['ENGINE'] == 'django.db.backends.postgresql':
@ -165,7 +164,7 @@ def clear_property_type_cache(sender, instance=None, created=False, **kwargs):
caches['default'].delete(CacheHelper(instance.space).PROPERTY_TYPE_CACHE_KEY)
if ENABLE_EXTERNAL_CONNECTORS:
if not settings.DISABLE_EXTERNAL_CONNECTORS:
handler = ConnectorManager()
post_save.connect(handler, dispatch_uid="connector_manager")
post_delete.connect(handler, dispatch_uid="connector_manager")

View File

@ -643,13 +643,12 @@ class FoodViewSet(viewsets.ModelViewSet, TreeMixin):
if pt.fdc_id:
for fn in data['foodNutrients']:
if fn['nutrient']['id'] == pt.fdc_id:
food_property_list.append(
Property(
property_type_id=pt.id,
property_amount=round(fn['amount'], 2),
import_food_id=food.id,
space=self.request.space,
))
food_property_list.append(Property(
property_type_id=pt.id,
property_amount=round(fn['amount'], 2),
import_food_id=food.id,
space=self.request.space,
))
Property.objects.bulk_create(food_property_list, ignore_conflicts=True, unique_fields=('space', 'import_food_id', 'property_type',))

View File

@ -78,10 +78,14 @@ class ConnectorConfigCreate(GroupRequiredMixin, CreateView):
success_url = reverse_lazy('list_connector_config')
def form_valid(self, form):
if self.request.space.demo or settings.HOSTED:
if self.request.space.demo:
messages.add_message(self.request, messages.ERROR, _('This feature is not yet available in the hosted version of tandoor!'))
return redirect('index')
if settings.DISABLE_EXTERNAL_CONNECTORS:
messages.add_message(self.request, messages.ERROR, _('This feature is not enabled by the server admin!'))
return redirect('index')
obj = form.save(commit=False)
obj.token = form.cleaned_data['update_token']
obj.created_by = self.request.user

View File

@ -437,6 +437,16 @@ key [here](https://fdc.nal.usda.gov/api-key-signup.html).
FDC_API_KEY=DEMO_KEY
```
#### External Connectors
`DISABLE_EXTERNAL_CONNECTORS` is a global switch to disable External Connectors entirely (e.g. HomeAssistant).
`EXTERNAL_CONNECTORS_QUEUE_SIZE` is the amount of changes that are kept in memory if the worker cannot keep up.
```env
DISABLE_EXTERNAL_CONNECTORS=0 // 0 = connectors enabled, 1 = connectors enabled
EXTERNAL_CONNECTORS_QUEUE_SIZE=25
```
### Debugging/Development settings
!!! warning

View File

@ -555,6 +555,7 @@ DEFAULT_FROM_EMAIL = os.getenv('DEFAULT_FROM_EMAIL', 'webmaster@localhost')
ACCOUNT_EMAIL_SUBJECT_PREFIX = os.getenv(
'ACCOUNT_EMAIL_SUBJECT_PREFIX', '[Tandoor Recipes] ') # allauth sender prefix
ENABLE_EXTERNAL_CONNECTORS = bool(int(os.getenv('ENABLE_EXTERNAL_CONNECTORS', False)))
DISABLE_EXTERNAL_CONNECTORS = bool(int(os.getenv('DISABLE_EXTERNAL_CONNECTORS', False)))
EXTERNAL_CONNECTORS_QUEUE_SIZE = int(os.getenv('EXTERNAL_CONNECTORS_QUEUE_SIZE', 25))
mimetypes.add_type("text/javascript", ".js", True)