diff --git a/cookbook/helper/automation_helper.py b/cookbook/helper/automation_helper.py
new file mode 100644
index 00000000..a86d405b
--- /dev/null
+++ b/cookbook/helper/automation_helper.py
@@ -0,0 +1,227 @@
+import re
+
+from django.core.cache import caches
+from django.db.models.functions import Lower
+
+from cookbook.models import Automation
+
+
+class AutomationEngine:
+    request = None
+    source = None
+    use_cache = None
+    food_aliases = None
+    keyword_aliases = None
+    unit_aliases = None
+    never_unit = None
+    transpose_words = None
+    regex_replace = {
+        Automation.DESCRIPTION_REPLACE: None,
+        Automation.INSTRUCTION_REPLACE: None,
+        Automation.FOOD_REPLACE: None,
+        Automation.UNIT_REPLACE: None,
+        Automation.NAME_REPLACE: None,
+    }
+
+    def __init__(self, request, use_cache=True, source=None):
+        self.request = request
+        self.use_cache = use_cache
+        if not source:
+            self.source = "default_string_to_avoid_false_regex_match"
+        else:
+            self.source = source
+
+    def apply_keyword_automation(self, keyword):
+        keyword = keyword.strip()
+        if self.use_cache and self.keyword_aliases is None:
+            self.keyword_aliases = {}
+            KEYWORD_CACHE_KEY = f'automation_keyword_alias_{self.request.space.pk}'
+            if c := caches['default'].get(KEYWORD_CACHE_KEY, None):
+                self.keyword_aliases = c
+                caches['default'].touch(KEYWORD_CACHE_KEY, 30)
+            else:
+                for a in Automation.objects.filter(space=self.request.space, disabled=False, type=Automation.KEYWORD_ALIAS).only('param_1', 'param_2').order_by('order').all():
+                    self.keyword_aliases[a.param_1.lower()] = a.param_2
+                caches['default'].set(KEYWORD_CACHE_KEY, self.keyword_aliases, 30)
+        else:
+            self.keyword_aliases = {}
+        if self.keyword_aliases:
+            try:
+                keyword = self.keyword_aliases[keyword.lower()]
+            except KeyError:
+                pass
+        else:
+            if automation := Automation.objects.filter(space=self.request.space, type=Automation.KEYWORD_ALIAS, param_1__iexact=keyword, disabled=False).order_by('order').first():
+                return automation.param_2
+        return keyword
+
+    def apply_unit_automation(self, unit):
+        unit = unit.strip()
+        if self.use_cache and self.unit_aliases is None:
+            self.unit_aliases = {}
+            UNIT_CACHE_KEY = f'automation_unit_alias_{self.request.space.pk}'
+            if c := caches['default'].get(UNIT_CACHE_KEY, None):
+                self.unit_aliases = c
+                caches['default'].touch(UNIT_CACHE_KEY, 30)
+            else:
+                for a in Automation.objects.filter(space=self.request.space, disabled=False, type=Automation.UNIT_ALIAS).only('param_1', 'param_2').order_by('order').all():
+                    self.unit_aliases[a.param_1.lower()] = a.param_2
+                caches['default'].set(UNIT_CACHE_KEY, self.unit_aliases, 30)
+        else:
+            self.unit_aliases = {}
+        if self.unit_aliases:
+            try:
+                unit = self.unit_aliases[unit.lower()]
+            except KeyError:
+                pass
+        else:
+            if automation := Automation.objects.filter(space=self.request.space, type=Automation.UNIT_ALIAS, param_1__iexact=unit, disabled=False).order_by('order').first():
+                return automation.param_2
+        return self.apply_regex_replace_automation(unit, Automation.UNIT_REPLACE)
+
+    def apply_food_automation(self, food):
+        food = food.strip()
+        if self.use_cache and self.food_aliases is None:
+            self.food_aliases = {}
+            FOOD_CACHE_KEY = f'automation_food_alias_{self.request.space.pk}'
+            if c := caches['default'].get(FOOD_CACHE_KEY, None):
+                self.food_aliases = c
+                caches['default'].touch(FOOD_CACHE_KEY, 30)
+            else:
+                for a in Automation.objects.filter(space=self.request.space, disabled=False, type=Automation.FOOD_ALIAS).only('param_1', 'param_2').order_by('order').all():
+                    self.food_aliases[a.param_1.lower()] = a.param_2
+                caches['default'].set(FOOD_CACHE_KEY, self.food_aliases, 30)
+        else:
+            self.food_aliases = {}
+
+        if self.food_aliases:
+            try:
+                return self.food_aliases[food.lower()]
+            except KeyError:
+                return food
+        else:
+            if automation := Automation.objects.filter(space=self.request.space, type=Automation.FOOD_ALIAS, param_1__iexact=food, disabled=False).order_by('order').first():
+                return automation.param_2
+            return self.apply_regex_replace_automation(food, Automation.FOOD_REPLACE)
+
+    def apply_never_unit_automation(self, tokens):
+        """
+        Moves a string that should never be treated as a unit to the next token and optionally replaces it with a default unit
+        e.g. NEVER_UNIT: param1: egg, param2: None would modify ['1', 'egg', 'white'] to ['1', '', 'egg', 'white']
+        or NEVER_UNIT: param1: egg, param2: pcs would modify ['1', 'egg', 'yolk'] to ['1', 'pcs', 'egg', 'yolk']
+        :param1 string: string that should never be considered a unit, will be moved to token[2]
+        :param2 (optional) unit as string: will insert unit string into token[1]
+        :return: token list (possibly changed by the automation)
+        """
+
+        if self.use_cache and self.never_unit is None:
+            self.never_unit = {}
+            NEVER_UNIT_CACHE_KEY = f'automation_never_unit_{self.request.space.pk}'
+            if c := caches['default'].get(NEVER_UNIT_CACHE_KEY, None):
+                self.never_unit = c
+                caches['default'].touch(NEVER_UNIT_CACHE_KEY, 30)
+            else:
+                for a in Automation.objects.filter(space=self.request.space, disabled=False, type=Automation.NEVER_UNIT).only('param_1', 'param_2').order_by('order').all():
+                    self.never_unit[a.param_1.lower()] = a.param_2
+                caches['default'].set(NEVER_UNIT_CACHE_KEY, self.never_unit, 30)
+        else:
+            self.never_unit = {}
+
+        new_unit = None
+        alt_unit = self.apply_unit_automation(tokens[1])
+        never_unit = False
+        if self.never_unit:
+            try:
+                new_unit = self.never_unit[tokens[1].lower()]
+                never_unit = True
+            except KeyError:
+                return tokens
+        else:
+            if a := Automation.objects.annotate(param_1_lower=Lower('param_1')).filter(space=self.request.space, type=Automation.NEVER_UNIT, param_1_lower__in=[
+                    tokens[1].lower(), alt_unit.lower()], disabled=False).order_by('order').first():
+                new_unit = a.param_2
+                never_unit = True
+
+        if never_unit:
+            tokens.insert(1, new_unit)
+        return tokens
+
+    def apply_transpose_automation(self, string):
+        """
+        If two words (param_1 & param_2) are detected in sequence, swap their position in the ingredient string
+        :param 1: first word to detect
+        :param 2: second word to detect
+        :return: new ingredient string
+        """
+        if self.use_cache and self.transpose_words is None:
+            self.transpose_words = {}
+            TRANSPOSE_WORDS_CACHE_KEY = f'automation_transpose_words_{self.request.space.pk}'
+            if c := caches['default'].get(TRANSPOSE_WORDS_CACHE_KEY, None):
+                self.transpose_words = c
+                caches['default'].touch(TRANSPOSE_WORDS_CACHE_KEY, 30)
+            else:
+                i = 0
+                for a in Automation.objects.filter(space=self.request.space, disabled=False, type=Automation.TRANSPOSE_WORDS).only(
+                        'param_1', 'param_2').order_by('order').all()[:512]:
+                    self.transpose_words[i] = [a.param_1.lower(), a.param_2.lower()]
+                    i += 1
+                caches['default'].set(TRANSPOSE_WORDS_CACHE_KEY, self.transpose_words, 30)
+        else:
+            self.transpose_words = {}
+
+        tokens = [x.lower() for x in string.replace(',', ' ').split()]
+        if self.transpose_words:
+            for key, value in self.transpose_words.items():
+                if value[0] in tokens and value[1] in tokens:
+                    string = re.sub(rf"\b({value[0]})\W*({value[1]})\b", r"\2 \1", string, flags=re.IGNORECASE)
+        else:
+            for rule in Automation.objects.filter(space=self.request.space, type=Automation.TRANSPOSE_WORDS, disabled=False) \
+                    .annotate(param_1_lower=Lower('param_1'), param_2_lower=Lower('param_2')) \
+                    .filter(param_1_lower__in=tokens, param_2_lower__in=tokens).order_by('order')[:512]:
+                if rule.param_1 in tokens and rule.param_2 in tokens:
+                    string = re.sub(rf"\b({rule.param_1})\W*({rule.param_2})\b", r"\2 \1", string, flags=re.IGNORECASE)
+        return string
+
+    def apply_regex_replace_automation(self, string, automation_type):
+        # TODO add warning - maybe on SPACE page? when a max of 512 automations of a specific type is exceeded (ALIAS types excluded?)
+        """
+        Replaces strings in a recipe field when the recipe source matches the automation's source pattern
+        applies to the Automation types that perform regex replacements:
+        Automation.DESCRIPTION_REPLACE
+        Automation.INSTRUCTION_REPLACE
+        Automation.FOOD_REPLACE
+        Automation.UNIT_REPLACE
+        Automation.NAME_REPLACE
+
+        regex replacement utilizes the following fields from the Automation model
+        :param 1: source that should apply the automation in regex format ('.*' for all)
+        :param 2: regex pattern to match
+        :param 3: replacement string (leave blank to delete)
+        :return: new string
+        """
+        if self.use_cache and self.regex_replace[automation_type] is None:
+            self.regex_replace[automation_type] = {}
+            REGEX_REPLACE_CACHE_KEY = f'automation_regex_replace_{self.request.space.pk}'
+            if c := caches['default'].get(REGEX_REPLACE_CACHE_KEY, None):
+                self.regex_replace[automation_type] = c[automation_type]
+                caches['default'].touch(REGEX_REPLACE_CACHE_KEY, 30)
+            else:
+                i = 0
+                for a in Automation.objects.filter(space=self.request.space, disabled=False, type=automation_type).only(
+                        'param_1', 'param_2', 'param_3').order_by('order').all()[:512]:
+                    self.regex_replace[automation_type][i] = [a.param_1, a.param_2, a.param_3]
+                    i += 1
+                caches['default'].set(REGEX_REPLACE_CACHE_KEY, self.regex_replace, 30)
+        else:
+            self.regex_replace[automation_type] = {}
+
+        if self.regex_replace[automation_type]:
+            for rule in self.regex_replace[automation_type].values():
+                if re.match(rule[0], (self.source)[:512]):
+                    string = re.sub(rule[1], rule[2], string, flags=re.IGNORECASE)
+        else:
+            for rule in Automation.objects.filter(space=self.request.space, disabled=False, type=automation_type).only(
+                    'param_1', 'param_2', 'param_3').order_by('order').all()[:512]:
+                if re.match(rule.param_1, (self.source)[:512]):
+                    string = re.sub(rule.param_2, rule.param_3, string, flags=re.IGNORECASE)
+        return string
diff --git a/cookbook/helper/ingredient_parser.py b/cookbook/helper/ingredient_parser.py
index 57b70f44..f944e416 100644
--- a/cookbook/helper/ingredient_parser.py
+++ b/cookbook/helper/ingredient_parser.py
@@ -2,22 +2,16 @@ import re
 import string
 import unicodedata
 
-from django.core.cache import caches
-from django.db.models import Q
-from django.db.models.functions import Lower
-
-from cookbook.models import Automation, Food, Ingredient, Unit
+from cookbook.helper.automation_helper import AutomationEngine
+from cookbook.models import Food, Ingredient, Unit
 
 
 class IngredientParser:
     request = None
     ignore_rules = False
-    food_aliases = {}
-    unit_aliases = {}
-    never_unit = {}
-    transpose_words = {}
+    automation = None
 
-    def __init__(self, request, cache_mode, ignore_automations=False):
+    def __init__(self, request, cache_mode=True, ignore_automations=False):
         """
         Initialize ingredient parser
         :param request: request context (to control caching, rule ownership, etc.)
@@ -26,87 +20,8 @@ class IngredientParser: """ self.request = request self.ignore_rules = ignore_automations - if cache_mode: - FOOD_CACHE_KEY = f'automation_food_alias_{self.request.space.pk}' - if c := caches['default'].get(FOOD_CACHE_KEY, None): - self.food_aliases = c - caches['default'].touch(FOOD_CACHE_KEY, 30) - else: - for a in Automation.objects.filter(space=self.request.space, disabled=False, type=Automation.FOOD_ALIAS).only('param_1', 'param_2').order_by('order').all(): - self.food_aliases[a.param_1.lower()] = a.param_2 - caches['default'].set(FOOD_CACHE_KEY, self.food_aliases, 30) - - UNIT_CACHE_KEY = f'automation_unit_alias_{self.request.space.pk}' - if c := caches['default'].get(UNIT_CACHE_KEY, None): - self.unit_aliases = c - caches['default'].touch(UNIT_CACHE_KEY, 30) - else: - for a in Automation.objects.filter(space=self.request.space, disabled=False, type=Automation.UNIT_ALIAS).only('param_1', 'param_2').order_by('order').all(): - self.unit_aliases[a.param_1.lower()] = a.param_2 - caches['default'].set(UNIT_CACHE_KEY, self.unit_aliases, 30) - - NEVER_UNIT_CACHE_KEY = f'automation_never_unit_{self.request.space.pk}' - if c := caches['default'].get(NEVER_UNIT_CACHE_KEY, None): - self.never_unit = c - caches['default'].touch(NEVER_UNIT_CACHE_KEY, 30) - else: - for a in Automation.objects.filter(space=self.request.space, disabled=False, type=Automation.NEVER_UNIT).only('param_1', 'param_2').order_by('order').all(): - self.never_unit[a.param_1.lower()] = a.param_2 - caches['default'].set(NEVER_UNIT_CACHE_KEY, self.never_unit, 30) - - TRANSPOSE_WORDS_CACHE_KEY = f'automation_transpose_words_{self.request.space.pk}' - if c := caches['default'].get(TRANSPOSE_WORDS_CACHE_KEY, None): - self.transpose_words = c - caches['default'].touch(TRANSPOSE_WORDS_CACHE_KEY, 30) - else: - i = 0 - for a in Automation.objects.filter(space=self.request.space, disabled=False, type=Automation.TRANSPOSE_WORDS).only('param_1', 'param_2').order_by('order').all(): - self.transpose_words[i] = [a.param_1.lower(), a.param_2.lower()] - i += 1 - caches['default'].set(TRANSPOSE_WORDS_CACHE_KEY, self.transpose_words, 30) - else: - self.food_aliases = {} - self.unit_aliases = {} - self.never_unit = {} - self.transpose_words = {} - - def apply_food_automation(self, food): - """ - Apply food alias automations to passed food - :param food: unit as string - :return: food as string (possibly changed by automation) - """ - if self.ignore_rules: - return food - else: - if self.food_aliases: - try: - return self.food_aliases[food.lower()] - except KeyError: - return food - else: - if automation := Automation.objects.filter(space=self.request.space, type=Automation.FOOD_ALIAS, param_1__iexact=food, disabled=False).order_by('order').first(): - return automation.param_2 - return food - - def apply_unit_automation(self, unit): - """ - Apply unit alias automations to passed unit - :param unit: unit as string - :return: unit as string (possibly changed by automation) - """ - if self.ignore_rules: - return unit - else: - if self.transpose_words: - try: - return self.unit_aliases[unit.lower()] - except KeyError: - return unit - else: - if automation := Automation.objects.filter(space=self.request.space, type=Automation.UNIT_ALIAS, param_1__iexact=unit, disabled=False).order_by('order').first(): - return automation.param_2 - return unit + if not self.ignore_rules: + self.automation = AutomationEngine(self.request, use_cache=cache_mode) def get_unit(self, unit): """ @@ -117,7 +32,10 @@ class IngredientParser: if not unit: 
return None if len(unit) > 0: - u, created = Unit.objects.get_or_create(name=self.apply_unit_automation(unit), space=self.request.space) + if self.ignore_rules: + u, created = Unit.objects.get_or_create(name=unit.strip(), space=self.request.space) + else: + u, created = Unit.objects.get_or_create(name=self.automation.apply_unit_automation(unit), space=self.request.space) return u return None @@ -130,7 +48,10 @@ class IngredientParser: if not food: return None if len(food) > 0: - f, created = Food.objects.get_or_create(name=self.apply_food_automation(food), space=self.request.space) + if self.ignore_rules: + f, created = Food.objects.get_or_create(name=food.strip(), space=self.request.space) + else: + f, created = Food.objects.get_or_create(name=self.automation.apply_food_automation(food), space=self.request.space) return f return None @@ -232,67 +153,6 @@ class IngredientParser: food, note = self.parse_food_with_comma(tokens) return food, note - def apply_never_unit_automations(self, tokens): - """ - Moves a string that should never be treated as a unit to next token and optionally replaced with default unit - e.g. NEVER_UNIT: param1: egg, param2: None would modify ['1', 'egg', 'white'] to ['1', '', 'egg', 'white'] - or NEVER_UNIT: param1: egg, param2: pcs would modify ['1', 'egg', 'yolk'] to ['1', 'pcs', 'egg', 'yolk'] - :param1 string: string that should never be considered a unit, will be moved to token[2] - :param2 (optional) unit as string: will insert unit string into token[1] - :return: unit as string (possibly changed by automation) - """ - - if self.ignore_rules: - return tokens - - new_unit = None - alt_unit = self.apply_unit_automation(tokens[1]) - never_unit = False - if self.never_unit: - try: - new_unit = self.never_unit[tokens[1].lower()] - never_unit = True - except KeyError: - return tokens - - else: - if automation := Automation.objects.annotate(param_1_lower=Lower('param_1')).filter(space=self.request.space, type=Automation.NEVER_UNIT, param_1_lower__in=[ - tokens[1].lower(), alt_unit.lower()], disabled=False).order_by('order').first(): - new_unit = automation.param_2 - never_unit = True - - if never_unit: - tokens.insert(1, new_unit) - - return tokens - - def apply_transpose_words_automations(self, ingredient): - """ - If two words (param_1 & param_2) are detected in sequence, swap their position in the ingredient string - :param 1: first word to detect - :param 2: second word to detect - return: new ingredient string - """ - - if self.ignore_rules: - return ingredient - - else: - tokens = [x.lower() for x in ingredient.replace(',', ' ').split()] - if self.transpose_words: - filtered_rules = {} - for key, value in self.transpose_words.items(): - if value[0] in tokens and value[1] in tokens: - filtered_rules[key] = value - for k, v in filtered_rules.items(): - ingredient = re.sub(rf"\b({v[0]})\W*({v[1]})\b", r"\2 \1", ingredient, flags=re.IGNORECASE) - else: - for rule in Automation.objects.filter(space=self.request.space, type=Automation.TRANSPOSE_WORDS, disabled=False) \ - .annotate(param_1_lower=Lower('param_1'), param_2_lower=Lower('param_2')) \ - .filter(Q(Q(param_1_lower__in=tokens) | Q(param_2_lower__in=tokens))).order_by('order'): - ingredient = re.sub(rf"\b({rule.param_1})\W*({rule.param_1})\b", r"\2 \1", ingredient, flags=re.IGNORECASE) - return ingredient - def parse(self, ingredient): """ Main parsing function, takes an ingredient string (e.g. '1 l Water') and extracts amount, unit, food, ... 
@@ -333,7 +193,8 @@ class IngredientParser: if re.match('([0-9])+([A-z])+\\s', ingredient): ingredient = re.sub(r'(?<=([a-z])|\d)(?=(?(1)\d|[a-z]))', ' ', ingredient) - ingredient = self.apply_transpose_words_automations(ingredient) + if not self.ignore_rules: + ingredient = self.automation.apply_transpose_automation(ingredient) tokens = ingredient.split() # split at each space into tokens if len(tokens) == 1: @@ -347,7 +208,8 @@ class IngredientParser: # three arguments if it already has a unit there can't be # a fraction for the amount if len(tokens) > 2: - tokens = self.apply_never_unit_automations(tokens) + if not self.ignore_rules: + tokens = self.automation.apply_never_unit_automation(tokens) try: if unit is not None: # a unit is already found, no need to try the second argument for a fraction @@ -394,10 +256,11 @@ class IngredientParser: if unit_note not in note: note += ' ' + unit_note - if unit: - unit = self.apply_unit_automation(unit.strip()) + if unit and not self.ignore_rules: + unit = self.automation.apply_unit_automation(unit) - food = self.apply_food_automation(food.strip()) + if food and not self.ignore_rules: + food = self.automation.apply_food_automation(food) if len(food) > Food._meta.get_field('name').max_length: # test if food name is to long # try splitting it at a space and taking only the first arg if len(food.split()) > 1 and len(food.split()[0]) < Food._meta.get_field('name').max_length: diff --git a/cookbook/helper/recipe_html_import.py b/cookbook/helper/recipe_html_import.py deleted file mode 100644 index 95f115b7..00000000 --- a/cookbook/helper/recipe_html_import.py +++ /dev/null @@ -1,191 +0,0 @@ -# import json -# import re -# from json import JSONDecodeError -# from urllib.parse import unquote - -# from bs4 import BeautifulSoup -# from bs4.element import Tag -# from recipe_scrapers import scrape_html, scrape_me -# from recipe_scrapers._exceptions import NoSchemaFoundInWildMode -# from recipe_scrapers._utils import get_host_name, normalize_string - -# from cookbook.helper import recipe_url_import as helper -# from cookbook.helper.scrapers.scrapers import text_scraper - - -# def get_recipe_from_source(text, url, request): -# def build_node(k, v): -# if isinstance(v, dict): -# node = { -# 'name': k, -# 'value': k, -# 'children': get_children_dict(v) -# } -# elif isinstance(v, list): -# node = { -# 'name': k, -# 'value': k, -# 'children': get_children_list(v) -# } -# else: -# node = { -# 'name': k + ": " + normalize_string(str(v)), -# 'value': normalize_string(str(v)) -# } -# return node - -# def get_children_dict(children): -# kid_list = [] -# for k, v in children.items(): -# kid_list.append(build_node(k, v)) -# return kid_list - -# def get_children_list(children): -# kid_list = [] -# for kid in children: -# if type(kid) == list: -# node = { -# 'name': "unknown list", -# 'value': "unknown list", -# 'children': get_children_list(kid) -# } -# kid_list.append(node) -# elif type(kid) == dict: -# for k, v in kid.items(): -# kid_list.append(build_node(k, v)) -# else: -# kid_list.append({ -# 'name': normalize_string(str(kid)), -# 'value': normalize_string(str(kid)) -# }) -# return kid_list - -# recipe_tree = [] -# parse_list = [] -# soup = BeautifulSoup(text, "html.parser") -# html_data = get_from_html(soup) -# images = get_images_from_source(soup, url) -# text = unquote(text) -# scrape = None - -# if url and not text: -# try: -# scrape = scrape_me(url_path=url, wild_mode=True) -# except(NoSchemaFoundInWildMode): -# pass - -# if not scrape: -# try: -# 
parse_list.append(remove_graph(json.loads(text))) -# if not url and 'url' in parse_list[0]: -# url = parse_list[0]['url'] -# scrape = text_scraper("", url=url) - -# except JSONDecodeError: -# for el in soup.find_all('script', type='application/ld+json'): -# el = remove_graph(el) -# if not url and 'url' in el: -# url = el['url'] -# if type(el) == list: -# for le in el: -# parse_list.append(le) -# elif type(el) == dict: -# parse_list.append(el) -# for el in soup.find_all(type='application/json'): -# el = remove_graph(el) -# if type(el) == list: -# for le in el: -# parse_list.append(le) -# elif type(el) == dict: -# parse_list.append(el) -# scrape = text_scraper(text, url=url) - -# recipe_json = helper.get_from_scraper(scrape, request) - -# # TODO: DEPRECATE recipe_tree & html_data. first validate it isn't used anywhere -# for el in parse_list: -# temp_tree = [] -# if isinstance(el, Tag): -# try: -# el = json.loads(el.string) -# except TypeError: -# continue - -# for k, v in el.items(): -# if isinstance(v, dict): -# node = { -# 'name': k, -# 'value': k, -# 'children': get_children_dict(v) -# } -# elif isinstance(v, list): -# node = { -# 'name': k, -# 'value': k, -# 'children': get_children_list(v) -# } -# else: -# node = { -# 'name': k + ": " + normalize_string(str(v)), -# 'value': normalize_string(str(v)) -# } -# temp_tree.append(node) - -# if '@type' in el and el['@type'] == 'Recipe': -# recipe_tree += [{'name': 'ld+json', 'children': temp_tree}] -# else: -# recipe_tree += [{'name': 'json', 'children': temp_tree}] - -# return recipe_json, recipe_tree, html_data, images - - -# def get_from_html(soup): -# INVISIBLE_ELEMS = ('style', 'script', 'head', 'title') -# html = [] -# for s in soup.strings: -# if ((s.parent.name not in INVISIBLE_ELEMS) and (len(s.strip()) > 0)): -# html.append(s) -# return html - - -# def get_images_from_source(soup, url): -# sources = ['src', 'srcset', 'data-src'] -# images = [] -# img_tags = soup.find_all('img') -# if url: -# site = get_host_name(url) -# prot = url.split(':')[0] - -# urls = [] -# for img in img_tags: -# for src in sources: -# try: -# urls.append(img[src]) -# except KeyError: -# pass - -# for u in urls: -# u = u.split('?')[0] -# filename = re.search(r'/([\w_-]+[.](jpg|jpeg|gif|png))$', u) -# if filename: -# if (('http' not in u) and (url)): -# # sometimes an image source can be relative -# # if it is provide the base url -# u = '{}://{}{}'.format(prot, site, u) -# if 'http' in u: -# images.append(u) -# return images - - -# def remove_graph(el): -# # recipes type might be wrapped in @graph type -# if isinstance(el, Tag): -# try: -# el = json.loads(el.string) -# if '@graph' in el: -# for x in el['@graph']: -# if '@type' in x and x['@type'] == 'Recipe': -# el = x -# except (TypeError, JSONDecodeError): -# pass -# return el diff --git a/cookbook/helper/recipe_url_import.py b/cookbook/helper/recipe_url_import.py index b84c9f65..8794c01a 100644 --- a/cookbook/helper/recipe_url_import.py +++ b/cookbook/helper/recipe_url_import.py @@ -2,7 +2,6 @@ import re import traceback from html import unescape -from django.core.cache import caches from django.utils.dateparse import parse_duration from django.utils.translation import gettext as _ from isodate import parse_duration as iso_parse_duration @@ -10,13 +9,37 @@ from isodate.isoerror import ISO8601Error from pytube import YouTube from recipe_scrapers._utils import get_host_name, get_minutes +from cookbook.helper.automation_helper import AutomationEngine from cookbook.helper.ingredient_parser import 
IngredientParser from cookbook.models import Automation, Keyword, PropertyType def get_from_scraper(scrape, request): # converting the scrape_me object to the existing json format based on ld+json - recipe_json = {} + + recipe_json = { + 'steps': [], + 'internal': True + } + keywords = [] + + # assign source URL + try: + source_url = scrape.canonical_url() + except Exception: + try: + source_url = scrape.url + except Exception: + pass + if source_url: + recipe_json['source_url'] = source_url + try: + keywords.append(source_url.replace('http://', '').replace('https://', '').split('/')[0]) + except Exception: + recipe_json['source_url'] = '' + + automation_engine = AutomationEngine(request, source=recipe_json.get('source_url')) + # assign recipe name try: recipe_json['name'] = parse_name(scrape.title()[:128] or None) except Exception: @@ -30,6 +53,10 @@ def get_from_scraper(scrape, request): if isinstance(recipe_json['name'], list) and len(recipe_json['name']) > 0: recipe_json['name'] = recipe_json['name'][0] + recipe_json['name'] = automation_engine.apply_regex_replace_automation(recipe_json['name'], Automation.NAME_REPLACE) + + # assign recipe description + # TODO notify user about limit if reached - >256 description will be truncated try: description = scrape.description() or None except Exception: @@ -40,8 +67,10 @@ def get_from_scraper(scrape, request): except Exception: description = '' - recipe_json['internal'] = True + recipe_json['description'] = parse_description(description) + recipe_json['description'] = automation_engine.apply_regex_replace_automation(recipe_json['description'], Automation.DESCRIPTION_REPLACE) + # assign servings attributes try: # dont use scrape.yields() as this will always return "x servings" or "x items", should be improved in scrapers directly servings = scrape.schema.data.get('recipeYield') or 1 @@ -51,6 +80,7 @@ def get_from_scraper(scrape, request): recipe_json['servings'] = parse_servings(servings) recipe_json['servings_text'] = parse_servings_text(servings) + # assign time attributes try: recipe_json['working_time'] = get_minutes(scrape.prep_time()) or 0 except Exception: @@ -75,6 +105,7 @@ def get_from_scraper(scrape, request): except Exception: pass + # assign image try: recipe_json['image'] = parse_image(scrape.image()) or None except Exception: @@ -85,7 +116,7 @@ def get_from_scraper(scrape, request): except Exception: recipe_json['image'] = '' - keywords = [] + # assign keywords try: if scrape.schema.data.get("keywords"): keywords += listify_keywords(scrape.schema.data.get("keywords")) @@ -110,20 +141,6 @@ def get_from_scraper(scrape, request): except Exception: pass - try: - source_url = scrape.canonical_url() - except Exception: - try: - source_url = scrape.url - except Exception: - pass - if source_url: - recipe_json['source_url'] = source_url - try: - keywords.append(source_url.replace('http://', '').replace('https://', '').split('/')[0]) - except Exception: - recipe_json['source_url'] = '' - try: if scrape.author(): keywords.append(scrape.author()) @@ -131,13 +148,13 @@ def get_from_scraper(scrape, request): pass try: - recipe_json['keywords'] = parse_keywords(list(set(map(str.casefold, keywords))), request.space) + recipe_json['keywords'] = parse_keywords(list(set(map(str.casefold, keywords))), request) except AttributeError: recipe_json['keywords'] = keywords ingredient_parser = IngredientParser(request, True) - recipe_json['steps'] = [] + # assign steps try: for i in parse_instructions(scrape.instructions()): 
recipe_json['steps'].append({'instruction': i, 'ingredients': [], 'show_ingredients_table': request.user.userpreference.show_step_ingredients, }) @@ -146,25 +163,10 @@ def get_from_scraper(scrape, request): if len(recipe_json['steps']) == 0: recipe_json['steps'].append({'instruction': '', 'ingredients': [], }) - parsed_description = parse_description(description) - # TODO notify user about limit if reached - # limits exist to limit the attack surface for dos style attacks - automations = Automation.objects.filter( - type=Automation.DESCRIPTION_REPLACE, - space=request.space, - disabled=False).only( - 'param_1', - 'param_2', - 'param_3').all().order_by('order')[ - :512] - for a in automations: - if re.match(a.param_1, (recipe_json['source_url'])[:512]): - parsed_description = re.sub(a.param_2, a.param_3, parsed_description, count=1) - - if len(parsed_description) > 256: # split at 256 as long descriptions don't look good on recipe cards - recipe_json['steps'][0]['instruction'] = f'*{parsed_description}* \n\n' + recipe_json['steps'][0]['instruction'] + if len(recipe_json['description']) > 256: # split at 256 as long descriptions don't look good on recipe cards + recipe_json['steps'][0]['instruction'] = f"*{recipe_json['description']}* \n\n" + recipe_json['steps'][0]['instruction'] else: - recipe_json['description'] = parsed_description[:512] + recipe_json['description'] = recipe_json['description'][:512] try: for x in scrape.ingredients(): @@ -205,19 +207,9 @@ def get_from_scraper(scrape, request): traceback.print_exc() pass - if 'source_url' in recipe_json and recipe_json['source_url']: - automations = Automation.objects.filter( - type=Automation.INSTRUCTION_REPLACE, - space=request.space, - disabled=False).only( - 'param_1', - 'param_2', - 'param_3').order_by('order').all()[ - :512] - for a in automations: - if re.match(a.param_1, (recipe_json['source_url'])[:512]): - for s in recipe_json['steps']: - s['instruction'] = re.sub(a.param_2, a.param_3, s['instruction']) + for s in recipe_json['steps']: + s['instruction'] = automation_engine.apply_regex_replace_automation(s['instruction'], Automation.INSTRUCTION_REPLACE) + # re.sub(a.param_2, a.param_3, s['instruction']) return recipe_json @@ -267,11 +259,14 @@ def get_from_youtube_scraper(url, request): ] } + # TODO add automation here try: + automation_engine = AutomationEngine(request, source=url) video = YouTube(url=url) - default_recipe_json['name'] = video.title + default_recipe_json['name'] = automation_engine.apply_regex_replace_automation(video.title, Automation.NAME_REPLACE) default_recipe_json['image'] = video.thumbnail_url - default_recipe_json['steps'][0]['instruction'] = video.description + default_recipe_json['steps'][0]['instruction'] = automation_engine.apply_regex_replace_automation(video.description, Automation.INSTRUCTION_REPLACE) + except Exception: pass @@ -410,18 +405,19 @@ def parse_time(recipe_time): return recipe_time -def parse_keywords(keyword_json, space): +def parse_keywords(keyword_json, request): keywords = [] - keyword_aliases = {} + automation_engine = AutomationEngine(request) + # keyword_aliases = {} # retrieve keyword automation cache if it exists, otherwise build from database - KEYWORD_CACHE_KEY = f'automation_keyword_alias_{space.pk}' - if c := caches['default'].get(KEYWORD_CACHE_KEY, None): - keyword_aliases = c - caches['default'].touch(KEYWORD_CACHE_KEY, 30) - else: - for a in Automation.objects.filter(space=space, disabled=False, type=Automation.KEYWORD_ALIAS).only('param_1', 
'param_2').order_by('order').all(): - keyword_aliases[a.param_1.lower()] = a.param_2 - caches['default'].set(KEYWORD_CACHE_KEY, keyword_aliases, 30) + # KEYWORD_CACHE_KEY = f'automation_keyword_alias_{space.pk}' + # if c := caches['default'].get(KEYWORD_CACHE_KEY, None): + # keyword_aliases = c + # caches['default'].touch(KEYWORD_CACHE_KEY, 30) + # else: + # for a in Automation.objects.filter(space=space, disabled=False, type=Automation.KEYWORD_ALIAS).only('param_1', 'param_2').order_by('order').all(): + # keyword_aliases[a.param_1.lower()] = a.param_2 + # caches['default'].set(KEYWORD_CACHE_KEY, keyword_aliases, 30) # keywords as list for kw in keyword_json: @@ -429,12 +425,13 @@ # if alias exists use that instead if len(kw) != 0: - if keyword_aliases: - try: - kw = keyword_aliases[kw.lower()] - except KeyError: - pass - if k := Keyword.objects.filter(name=kw, space=space).first(): + # if keyword_aliases: + # try: + # kw = keyword_aliases[kw.lower()] + # except KeyError: + # pass + kw = automation_engine.apply_keyword_automation(kw) + if k := Keyword.objects.filter(name=kw, space=request.space).first(): keywords.append({'label': str(k), 'name': k.name, 'id': k.id}) else: keywords.append({'label': kw, 'name': kw}) diff --git a/cookbook/migrations/0199_alter_propertytype_options_alter_automation_type_and_more.py b/cookbook/migrations/0199_alter_propertytype_options_alter_automation_type_and_more.py index 56da9d2a..39734349 100644 --- a/cookbook/migrations/0199_alter_propertytype_options_alter_automation_type_and_more.py +++ b/cookbook/migrations/0199_alter_propertytype_options_alter_automation_type_and_more.py @@ -1,4 +1,4 @@ -# Generated by Django 4.1.10 on 2023-08-25 13:05 +# Generated by Django 4.1.10 on 2023-09-01 17:03 from django.db import migrations, models @@ -15,20 +15,16 @@ class Migration(migrations.Migration): name='type', field=models.CharField( choices=[ - ('FOOD_ALIAS', - 'Food Alias'), - ('UNIT_ALIAS', - 'Unit Alias'), - ('KEYWORD_ALIAS', - 'Keyword Alias'), - ('DESCRIPTION_REPLACE', - 'Description Replace'), - ('INSTRUCTION_REPLACE', - 'Instruction Replace'), - ('NEVER_UNIT', - 'Never Unit'), - ('TRANSPOSE_WORDS', - 'Transpose Words')], + ('FOOD_ALIAS', 'Food Alias'), + ('UNIT_ALIAS', 'Unit Alias'), + ('KEYWORD_ALIAS', 'Keyword Alias'), + ('DESCRIPTION_REPLACE', 'Description Replace'), + ('INSTRUCTION_REPLACE', 'Instruction Replace'), + ('NEVER_UNIT', 'Never Unit'), + ('TRANSPOSE_WORDS', 'Transpose Words'), + ('FOOD_REPLACE', 'Food Replace'), + ('UNIT_REPLACE', 'Unit Replace'), + ('NAME_REPLACE', 'Name Replace')], max_length=128), ), ] diff --git a/cookbook/models.py b/cookbook/models.py index 205845b1..f2869d5f 100644 --- a/cookbook/models.py +++ b/cookbook/models.py @@ -1314,11 +1314,23 @@ class Automation(ExportModelOperationsMixin('automations'), models.Model, Permis INSTRUCTION_REPLACE = 'INSTRUCTION_REPLACE' NEVER_UNIT = 'NEVER_UNIT' TRANSPOSE_WORDS = 'TRANSPOSE_WORDS' + FOOD_REPLACE = 'FOOD_REPLACE' + UNIT_REPLACE = 'UNIT_REPLACE' + NAME_REPLACE = 'NAME_REPLACE' type = models.CharField(max_length=128, - choices=((FOOD_ALIAS, _('Food Alias')), (UNIT_ALIAS, _('Unit Alias')), (KEYWORD_ALIAS, _('Keyword Alias')), - (DESCRIPTION_REPLACE, _('Description Replace')), (INSTRUCTION_REPLACE, _('Instruction Replace')), - (NEVER_UNIT, _('Never Unit')), (TRANSPOSE_WORDS, _('Transpose Words')),)) + choices=( + (FOOD_ALIAS, _('Food Alias')), + (UNIT_ALIAS, _('Unit Alias')), + (KEYWORD_ALIAS, _('Keyword Alias')), + (DESCRIPTION_REPLACE, 
_('Description Replace')), + (INSTRUCTION_REPLACE, _('Instruction Replace')), + (NEVER_UNIT, _('Never Unit')), + (TRANSPOSE_WORDS, _('Transpose Words')), + (FOOD_REPLACE, _('Food Replace')), + (UNIT_REPLACE, _('Unit Replace')), + (NAME_REPLACE, _('Name Replace')), + )) name = models.CharField(max_length=128, default='') description = models.TextField(blank=True, null=True) diff --git a/cookbook/tests/other/test_automations.py b/cookbook/tests/other/test_automations.py index df908fdb..48416caa 100644 --- a/cookbook/tests/other/test_automations.py +++ b/cookbook/tests/other/test_automations.py @@ -1,50 +1,173 @@ -import pytest -from django.contrib import auth -from django.urls import reverse -from django_scopes import scopes_disabled - -from cookbook.forms import ImportExportBase -from cookbook.helper.ingredient_parser import IngredientParser -from cookbook.models import ExportLog, Automation -import json import os import pytest -from django.urls import reverse +from django.contrib import auth +from django.test import RequestFactory +from django_scopes import scope -from cookbook.tests.conftest import validate_recipe +from cookbook.helper.automation_helper import AutomationEngine +from cookbook.helper.recipe_url_import import get_from_scraper +from cookbook.helper.scrapers.scrapers import text_scraper +from cookbook.models import Automation -IMPORT_SOURCE_URL = 'api_recipe_from_source' +DATA_DIR = "cookbook/tests/other/test_data/" -# for some reason this tests cant run due to some kind of encoding issue, needs to be fixed -# def test_description_replace_automation(u1_s1, space_1): -# if 'cookbook' in os.getcwd(): -# test_file = os.path.join(os.getcwd(), 'other', 'test_data', 'chefkoch2.html') -# else: -# test_file = os.path.join(os.getcwd(), 'cookbook', 'tests', 'other', 'test_data', 'chefkoch2.html') -# -# # original description -# # Brokkoli - Bratlinge. Über 91 Bewertungen und für vorzüglich befunden. Mit ► Portionsrechner ► Kochbuch ► Video-Tipps! Jetzt entdecken und ausprobieren! 
-# -# with scopes_disabled(): -# Automation.objects.create( -# name='test1', -# created_by=auth.get_user(u1_s1), -# space=space_1, -# param_1='.*', -# param_2='.*', -# param_3='', -# order=1000, -# ) -# -# with open(test_file, 'r', encoding='UTF-8') as d: -# response = u1_s1.post( -# reverse(IMPORT_SOURCE_URL), -# { -# 'data': d.read(), -# 'url': 'https://www.chefkoch.de/rezepte/804871184310070/Brokkoli-Bratlinge.html', -# }, -# content_type='application/json') -# recipe = json.loads(response.content)['recipe_json'] -# assert recipe['description'] == '' +@pytest.mark.parametrize("arg", [ + ['Match', True], + ['mAtCh', True], + ['No Match', False], + ['Màtch', False], +]) +def test_food_automation(u1_s1, arg): + target_name = "Matched Automation" + user = auth.get_user(u1_s1) + space = user.userspace_set.first().space + request = RequestFactory() + request.user = user + request.space = space + automation = AutomationEngine(request, False) + + with scope(space=space): + Automation.objects.get_or_create(name='food test', type=Automation.FOOD_ALIAS, param_1=arg[0], param_2=target_name, created_by=user, space=space) + assert (automation.apply_food_automation(arg[0]) == target_name) is True + + +@pytest.mark.parametrize("arg", [ + ['Match', True], + ['mAtCh', True], + ['No Match', False], + ['Màtch', False], +]) +def test_keyword_automation(u1_s1, arg): + target_name = "Matched Automation" + user = auth.get_user(u1_s1) + space = user.userspace_set.first().space + request = RequestFactory() + request.user = user + request.space = space + automation = AutomationEngine(request, False) + + with scope(space=space): + Automation.objects.get_or_create(name='keyword test', type=Automation.KEYWORD_ALIAS, param_1=arg[0], param_2=target_name, created_by=user, space=space) + assert (automation.apply_keyword_automation(arg[0]) == target_name) is True + + +@pytest.mark.parametrize("arg", [ + ['Match', True], + ['mAtCh', True], + ['No Match', False], + ['Màtch', False], +]) +def test_unit_automation(u1_s1, arg): + target_name = "Matched Automation" + user = auth.get_user(u1_s1) + space = user.userspace_set.first().space + request = RequestFactory() + request.user = user + request.space = space + automation = AutomationEngine(request, False) + + with scope(space=space): + Automation.objects.get_or_create(name='unit test', type=Automation.UNIT_ALIAS, param_1=arg[0], param_2=target_name, created_by=user, space=space) + assert (automation.apply_unit_automation(arg[0]) == target_name) is True + + +@pytest.mark.parametrize("arg", [ + [[1, 'egg', 'white'], '', [1, '', 'egg', 'white']], + [[1, 'Egg', 'white'], '', [1, '', 'Egg', 'white']], + [[1, 'êgg', 'white'], '', [1, 'êgg', 'white']], + [[1, 'egg', 'white'], 'whole', [1, 'whole', 'egg', 'white']], +]) +def test_never_unit_automation(u1_s1, arg): + user = auth.get_user(u1_s1) + space = user.userspace_set.first().space + request = RequestFactory() + request.user = user + request.space = space + automation = AutomationEngine(request, False) + + with scope(space=space): + Automation.objects.get_or_create(name='never unit test', type=Automation.NEVER_UNIT, param_1='egg', param_2=arg[1], created_by=user, space=space) + assert automation.apply_never_unit_automation(arg[0]) == arg[2] + + +@pytest.mark.parametrize("source", [ + ['.*', True], + ['.*allrecipes.*', True], + ['.*google.*', False], +]) +@pytest.mark.parametrize("arg", [ + [Automation.DESCRIPTION_REPLACE], + [Automation.INSTRUCTION_REPLACE], + [Automation.NAME_REPLACE], + [Automation.FOOD_REPLACE], + 
[Automation.UNIT_REPLACE], +]) +def test_regex_automation(u1_s1, arg, source): + user = auth.get_user(u1_s1) + space = user.userspace_set.first().space + request = RequestFactory() + request.user = user + request.space = space + automation = AutomationEngine(request, use_cache=False, source='https://www.allrecipes.com/recipe/24010/easy-chicken-marsala/') + middle = 'test_remove_phrase' + beginning = 'remove_test phrase' + fail = 'test remove_phrase' + target = 'test phrase' + + with scope(space=space): + Automation.objects.get_or_create(name='regex middle test', type=arg[0], param_1=source[0], param_2='_remove_', param_3=' ', created_by=user, space=space) + Automation.objects.get_or_create(name='regex beginning test', type=arg[0], param_1=source[0], param_2='^remove_', param_3='', created_by=user, space=space) + assert (automation.apply_regex_replace_automation(middle, arg[0]) == target) == source[1] + assert (automation.apply_regex_replace_automation(beginning, arg[0]) == target) == source[1] + assert (automation.apply_regex_replace_automation(fail, arg[0]) == target) == False + + +@pytest.mark.parametrize("arg", [ + ['second first', 'first second'], + ['longer string second first longer string', 'longer string first second longer string'], + ['second fails first', 'second fails first'], +]) +def test_transpose_automation(u1_s1, arg): + user = auth.get_user(u1_s1) + space = user.userspace_set.first().space + request = RequestFactory() + request.user = user + request.space = space + automation = AutomationEngine(request, False) + + with scope(space=space): + Automation.objects.get_or_create(name='transpose words test', type=Automation.TRANSPOSE_WORDS, param_1='second', param_2='first', created_by=user, space=space) + assert automation.apply_transpose_automation(arg[0]) == arg[1] + + +def test_url_import_regex_replace(u1_s1): + # TODO this does not test import with multiple steps - do any sites import with this pattern? 
It doesn't look like the url_importer supports it
+    user = auth.get_user(u1_s1)
+    space = user.userspace_set.first().space
+    request = RequestFactory()
+    request.user = user
+    request.space = space
+    recipe = 'regex_recipe.html'
+    types = [Automation.DESCRIPTION_REPLACE, Automation.INSTRUCTION_REPLACE, Automation.NAME_REPLACE, Automation.FOOD_REPLACE, Automation.UNIT_REPLACE]
+    find_text = "_remove"
+    target_text = "Test"
+
+    if 'cookbook' in os.getcwd():
+        test_file = os.path.join(os.getcwd(), 'other', 'test_data', recipe)
+    else:
+        test_file = os.path.join(os.getcwd(), 'cookbook', 'tests', 'other', 'test_data', recipe)
+    with open(test_file, 'r', encoding='UTF-8') as d:
+        scrape = text_scraper(text=d.read(), url="https://www.allrecipes.com")
+    with scope(space=space):
+        for t in types:
+            Automation.objects.get_or_create(name=t, type=t, param_1='.*', param_2=find_text, param_3='', created_by=user, space=space)
+        recipe_json = get_from_scraper(scrape, request)
+        assert recipe_json['name'] == target_text
+        assert recipe_json['description'] == target_text
+        assert recipe_json['steps'][0]['instruction'] == target_text
+        assert recipe_json['steps'][0]['ingredients'][0]['food']['name'] == target_text
+        assert recipe_json['steps'][0]['ingredients'][1]['unit']['name'] == target_text
diff --git a/cookbook/tests/other/test_data/regex_recipe.html b/cookbook/tests/other/test_data/regex_recipe.html
new file mode 100644
index 00000000..b19fcb7f
--- /dev/null
+++ b/cookbook/tests/other/test_data/regex_recipe.html
@@ -0,0 +1,38 @@
+
+
+
+
+
+
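
A minimal usage sketch of the new AutomationEngine (not part of the diff above), modelled on the setup used in cookbook/tests/other/test_automations.py. The demo rule, source URL, and replacement values are illustrative assumptions, not code from this PR.

from django.contrib import auth
from django.test import RequestFactory
from django_scopes import scope

from cookbook.helper.automation_helper import AutomationEngine
from cookbook.models import Automation


def demo_unit_replace(u1_s1):
    user = auth.get_user(u1_s1)
    space = user.userspace_set.first().space
    # The engine only reads request.space, so the tests (and this sketch)
    # use a bare RequestFactory instance as a stand-in request object.
    request = RequestFactory()
    request.user = user
    request.space = space

    with scope(space=space):
        # Hypothetical UNIT_REPLACE rule: strip a "." from units, but only for
        # recipes whose source URL matches param_1.
        Automation.objects.get_or_create(
            name='demo unit replace', type=Automation.UNIT_REPLACE,
            param_1='.*allrecipes.*', param_2=r'\.', param_3='',
            created_by=user, space=space)

        engine = AutomationEngine(
            request, use_cache=False,
            source='https://www.allrecipes.com/recipe/24010/easy-chicken-marsala/')

        # Alias automations return param_2 on a case-insensitive exact match,
        # otherwise the input unchanged; no KEYWORD_ALIAS rule exists here.
        assert engine.apply_keyword_automation('Dinner') == 'Dinner'
        # apply_unit_automation() runs UNIT_ALIAS rules first, then the
        # UNIT_REPLACE regex rules; the regex fires because `source` matches
        # the rule's param_1.
        assert engine.apply_unit_automation('tbsp.') == 'tbsp'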