switch to threading, f multiprocessing in python

This commit is contained in:
Mikhail Epifanov 2024-02-06 00:37:37 +01:00
parent 65a7c82af9
commit 962d617839
No known key found for this signature in database
3 changed files with 5 additions and 18 deletions

View File

@ -1,12 +1,10 @@
import asyncio import asyncio
import logging import logging
import multiprocessing
import queue import queue
import weakref import threading
from asyncio import Task from asyncio import Task
from dataclasses import dataclass from dataclasses import dataclass
from enum import Enum from enum import Enum
from multiprocessing import JoinableQueue
from types import UnionType from types import UnionType
from typing import List, Any, Dict, Optional, Type from typing import List, Any, Dict, Optional, Type
@ -17,8 +15,6 @@ from cookbook.connectors.connector import Connector
from cookbook.connectors.homeassistant import HomeAssistant from cookbook.connectors.homeassistant import HomeAssistant
from cookbook.models import ShoppingListEntry, Space, ConnectorConfig from cookbook.models import ShoppingListEntry, Space, ConnectorConfig
multiprocessing.set_start_method('fork') # https://code.djangoproject.com/ticket/31169
REGISTERED_CLASSES: UnionType | Type = ShoppingListEntry REGISTERED_CLASSES: UnionType | Type = ShoppingListEntry
@ -43,14 +39,13 @@ class Work:
# 4. Work is marked as consumed, and next entry of the queue is consumed. # 4. Work is marked as consumed, and next entry of the queue is consumed.
# Each 'Work' is processed in sequential by the worker, so the throughput is about [workers * the slowest connector] # Each 'Work' is processed in sequential by the worker, so the throughput is about [workers * the slowest connector]
class ConnectorManager: class ConnectorManager:
_queue: JoinableQueue _queue: queue.Queue
_listening_to_classes = REGISTERED_CLASSES | ConnectorConfig _listening_to_classes = REGISTERED_CLASSES | ConnectorConfig
def __init__(self): def __init__(self):
self._queue = multiprocessing.JoinableQueue(maxsize=settings.EXTERNAL_CONNECTORS_QUEUE_SIZE) self._queue = queue.Queue(maxsize=settings.EXTERNAL_CONNECTORS_QUEUE_SIZE)
self._worker = multiprocessing.Process(target=self.worker, args=(0, self._queue,), daemon=True) self._worker = threading.Thread(target=self.worker, args=(0, self._queue,), daemon=True)
self._worker.start() self._worker.start()
weakref.finalize(self, self._worker.terminate)
# Called by post save & post delete signals # Called by post save & post delete signals
def __call__(self, instance: Any, **kwargs) -> None: def __call__(self, instance: Any, **kwargs) -> None:
@ -75,15 +70,10 @@ class ConnectorManager:
def stop(self): def stop(self):
self._queue.join() self._queue.join()
self._queue.close()
self._worker.join() self._worker.join()
@staticmethod @staticmethod
def worker(worker_id: int, worker_queue: JoinableQueue): def worker(worker_id: int, worker_queue: queue.Queue):
# https://stackoverflow.com/a/10684672 Close open connections after starting a new process to prevent re-use of same connections
from django.db import connections
connections.close_all()
loop = asyncio.new_event_loop() loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop) asyncio.set_event_loop(loop)

View File

@ -1,5 +1,3 @@
[pytest] [pytest]
DJANGO_SETTINGS_MODULE = recipes.settings DJANGO_SETTINGS_MODULE = recipes.settings
python_files = tests.py test_*.py *_tests.py python_files = tests.py test_*.py *_tests.py
env =
DISABLE_EXTERNAL_CONNECTORS=1

View File

@ -36,7 +36,6 @@ django-scopes==2.0.0
pytest==7.4.3 pytest==7.4.3
pytest-asyncio==0.23.3 pytest-asyncio==0.23.3
pytest-django==4.6.0 pytest-django==4.6.0
pytest-env==1.1.3
django-treebeard==4.7 django-treebeard==4.7
django-cors-headers==4.2.0 django-cors-headers==4.2.0
django-storages==1.14.2 django-storages==1.14.2