From 962d61783991cea382855b794c8fe1779e966da7 Mon Sep 17 00:00:00 2001 From: Mikhail Epifanov Date: Tue, 6 Feb 2024 00:37:37 +0100 Subject: [PATCH] switch to threading, f multiprocessing in python --- cookbook/connectors/connector_manager.py | 20 +++++--------------- pytest.ini | 2 -- requirements.txt | 1 - 3 files changed, 5 insertions(+), 18 deletions(-) diff --git a/cookbook/connectors/connector_manager.py b/cookbook/connectors/connector_manager.py index f32e49c8..e247db30 100644 --- a/cookbook/connectors/connector_manager.py +++ b/cookbook/connectors/connector_manager.py @@ -1,12 +1,10 @@ import asyncio import logging -import multiprocessing import queue -import weakref +import threading from asyncio import Task from dataclasses import dataclass from enum import Enum -from multiprocessing import JoinableQueue from types import UnionType from typing import List, Any, Dict, Optional, Type @@ -17,8 +15,6 @@ from cookbook.connectors.connector import Connector from cookbook.connectors.homeassistant import HomeAssistant from cookbook.models import ShoppingListEntry, Space, ConnectorConfig -multiprocessing.set_start_method('fork') # https://code.djangoproject.com/ticket/31169 - REGISTERED_CLASSES: UnionType | Type = ShoppingListEntry @@ -43,14 +39,13 @@ class Work: # 4. Work is marked as consumed, and next entry of the queue is consumed. # Each 'Work' is processed in sequential by the worker, so the throughput is about [workers * the slowest connector] class ConnectorManager: - _queue: JoinableQueue + _queue: queue.Queue _listening_to_classes = REGISTERED_CLASSES | ConnectorConfig def __init__(self): - self._queue = multiprocessing.JoinableQueue(maxsize=settings.EXTERNAL_CONNECTORS_QUEUE_SIZE) - self._worker = multiprocessing.Process(target=self.worker, args=(0, self._queue,), daemon=True) + self._queue = queue.Queue(maxsize=settings.EXTERNAL_CONNECTORS_QUEUE_SIZE) + self._worker = threading.Thread(target=self.worker, args=(0, self._queue,), daemon=True) self._worker.start() - weakref.finalize(self, self._worker.terminate) # Called by post save & post delete signals def __call__(self, instance: Any, **kwargs) -> None: @@ -75,15 +70,10 @@ class ConnectorManager: def stop(self): self._queue.join() - self._queue.close() self._worker.join() @staticmethod - def worker(worker_id: int, worker_queue: JoinableQueue): - # https://stackoverflow.com/a/10684672 Close open connections after starting a new process to prevent re-use of same connections - from django.db import connections - connections.close_all() - + def worker(worker_id: int, worker_queue: queue.Queue): loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) diff --git a/pytest.ini b/pytest.ini index bdfb66bc..79e7a4e2 100644 --- a/pytest.ini +++ b/pytest.ini @@ -1,5 +1,3 @@ [pytest] DJANGO_SETTINGS_MODULE = recipes.settings python_files = tests.py test_*.py *_tests.py -env = - DISABLE_EXTERNAL_CONNECTORS=1 \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index a9d1029b..7f08008a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -36,7 +36,6 @@ django-scopes==2.0.0 pytest==7.4.3 pytest-asyncio==0.23.3 pytest-django==4.6.0 -pytest-env==1.1.3 django-treebeard==4.7 django-cors-headers==4.2.0 django-storages==1.14.2