TandoorRecipes/cookbook/helper/open_data_importer.py
2024-02-19 19:57:56 +01:00

495 lines
22 KiB
Python

import traceback
from collections import defaultdict
from decimal import Decimal
from cookbook.models import (Food, FoodProperty, Property, PropertyType, Supermarket,
SupermarketCategory, SupermarketCategoryRelation, Unit, UnitConversion)
import re
class OpenDataImportResponse:
total_created = 0
total_updated = 0
total_untouched = 0
total_errored = 0
def to_dict(self):
return {'total_created': self.total_created, 'total_updated': self.total_updated, 'total_untouched': self.total_untouched, 'total_errored': self.total_errored}
class OpenDataImporter:
request = None
data = {}
slug_id_cache = {}
update_existing = False
use_metric = True
def __init__(self, request, data, update_existing=False, use_metric=True):
self.request = request
self.data = data
self.update_existing = update_existing
self.use_metric = use_metric
def _update_slug_cache(self, object_class, datatype):
self.slug_id_cache[datatype] = dict(object_class.objects.filter(space=self.request.space, open_data_slug__isnull=False).values_list('open_data_slug', 'id', ))
@staticmethod
def _is_obj_identical(field_list, obj, existing_obj):
"""
checks if the obj meant for import is identical to an already existing one
:param field_list: list of field names to check
:type field_list: list[str]
:param obj: object meant for import
:type obj: Object
:param existing_obj: object already in DB
:type existing_obj: Object
:return: if objects are identical
:rtype: bool
"""
for field in field_list:
if isinstance(getattr(obj, field), float) or isinstance(getattr(obj, field), Decimal):
if abs(float(getattr(obj, field)) - float(existing_obj[field])) > 0.001: # convert both to float and check if basically equal
print(f'comparing FLOAT {obj} failed because field {field} is not equal ({getattr(obj, field)} != {existing_obj[field]})')
return False
elif getattr(obj, field) != existing_obj[field]:
print(f'comparing {obj} failed because field {field} is not equal ({getattr(obj, field)} != {existing_obj[field]})')
return False
return True
@staticmethod
def _merge_if_conflicting(model_type, obj, existing_data_slugs, existing_data_names):
"""
sometimes there might be two objects conflicting for open data import (one has the slug, the other the name)
this function checks if that is the case and merges the two objects if possible
:param model_type: type of model to check/merge
:type model_type: Model
:param obj: object that should be created/updated
:type obj: Model
:param existing_data_slugs: dict of open data slugs mapped to objects
:type existing_data_slugs: dict
:param existing_data_names: dict of names mapped to objects
:type existing_data_names: dict
:return: true if merge was successful or not necessary else false
:rtype: bool
"""
if obj.open_data_slug in existing_data_slugs and obj.name in existing_data_names and existing_data_slugs[obj.open_data_slug]['pk'] != existing_data_names[obj.name]['pk']:
try:
source_obj = model_type.objects.get(pk=existing_data_slugs[obj.open_data_slug]['pk'])
del existing_data_slugs[obj.open_data_slug]
source_obj.merge_into(model_type.objects.get(pk=existing_data_names[obj.name]['pk']))
return True
except RuntimeError:
return False # in the edge case (e.g. parent/child) that an object cannot be merged don't update it for now
else:
return True
@staticmethod
def _get_existing_obj(obj, existing_data_slugs, existing_data_names):
"""
gets the existing object from slug or name cache
:param obj: object that should be found
:type obj: Model
:param existing_data_slugs: dict of open data slugs mapped to objects
:type existing_data_slugs: dict
:param existing_data_names: dict of names mapped to objects
:type existing_data_names: dict
:return: existing object
:rtype: dict
"""
existing_obj = None
if obj.open_data_slug in existing_data_slugs:
existing_obj = existing_data_slugs[obj.open_data_slug]
elif obj.name in existing_data_names:
existing_obj = existing_data_names[obj.name]
return existing_obj
def import_units(self):
od_response = OpenDataImportResponse()
datatype = 'unit'
model_type = Unit
field_list = ['name', 'plural_name', 'base_unit', 'open_data_slug']
existing_data_slugs = {}
existing_data_names = {}
for obj in model_type.objects.filter(space=self.request.space).values('pk', *field_list):
existing_data_slugs[obj['open_data_slug']] = obj
existing_data_names[obj['name']] = obj
update_list = []
create_list = []
for u in list(self.data[datatype].keys()):
obj = model_type(
name=self.data[datatype][u]['name'],
plural_name=self.data[datatype][u]['plural_name'],
base_unit=self.data[datatype][u]['base_unit'].lower() if self.data[datatype][u]['base_unit'] != '' else None,
open_data_slug=u,
space=self.request.space
)
if obj.open_data_slug in existing_data_slugs or obj.name in existing_data_names:
if not self._merge_if_conflicting(model_type, obj, existing_data_slugs, existing_data_names):
od_response.total_errored += 1
continue # if conflicting objects exist and cannot be merged skip object
existing_obj = self._get_existing_obj(obj, existing_data_slugs, existing_data_names)
if not self._is_obj_identical(field_list, obj, existing_obj):
obj.pk = existing_obj['pk']
update_list.append(obj)
else:
od_response.total_untouched += 1
else:
create_list.append(obj)
if self.update_existing and len(update_list) > 0:
model_type.objects.bulk_update(update_list, field_list)
od_response.total_updated += len(update_list)
if len(create_list) > 0:
model_type.objects.bulk_create(create_list, update_conflicts=True, update_fields=field_list, unique_fields=('space', 'name',))
od_response.total_created += len(create_list)
return od_response
def import_category(self):
od_response = OpenDataImportResponse()
datatype = 'category'
model_type = SupermarketCategory
field_list = ['name', 'open_data_slug']
existing_data_slugs = {}
existing_data_names = {}
for obj in model_type.objects.filter(space=self.request.space).values('pk', *field_list):
existing_data_slugs[obj['open_data_slug']] = obj
existing_data_names[obj['name']] = obj
update_list = []
create_list = []
for k in list(self.data[datatype].keys()):
obj = model_type(
name=self.data[datatype][k]['name'],
open_data_slug=k,
space=self.request.space
)
if obj.open_data_slug in existing_data_slugs or obj.name in existing_data_names:
if not self._merge_if_conflicting(model_type, obj, existing_data_slugs, existing_data_names):
od_response.total_errored += 1
continue # if conflicting objects exist and cannot be merged skip object
existing_obj = self._get_existing_obj(obj, existing_data_slugs, existing_data_names)
if not self._is_obj_identical(field_list, obj, existing_obj):
obj.pk = existing_obj['pk']
update_list.append(obj)
else:
od_response.total_untouched += 1
else:
create_list.append(obj)
if self.update_existing and len(update_list) > 0:
model_type.objects.bulk_update(update_list, field_list)
od_response.total_updated += len(update_list)
if len(create_list) > 0:
model_type.objects.bulk_create(create_list, update_conflicts=True, update_fields=field_list, unique_fields=('space', 'name',))
od_response.total_created += len(create_list)
return od_response
def import_property(self):
od_response = OpenDataImportResponse()
datatype = 'property'
model_type = PropertyType
field_list = ['name', 'unit', 'fdc_id', 'open_data_slug']
existing_data_slugs = {}
existing_data_names = {}
for obj in model_type.objects.filter(space=self.request.space).values('pk', *field_list):
existing_data_slugs[obj['open_data_slug']] = obj
existing_data_names[obj['name']] = obj
update_list = []
create_list = []
for k in list(self.data[datatype].keys()):
obj = model_type(
name=self.data[datatype][k]['name'],
unit=self.data[datatype][k]['unit'],
fdc_id=self.data[datatype][k]['fdc_id'],
open_data_slug=k,
space=self.request.space
)
if obj.open_data_slug in existing_data_slugs or obj.name in existing_data_names:
if not self._merge_if_conflicting(model_type, obj, existing_data_slugs, existing_data_names):
od_response.total_errored += 1
continue # if conflicting objects exist and cannot be merged skip object
existing_obj = self._get_existing_obj(obj, existing_data_slugs, existing_data_names)
if not self._is_obj_identical(field_list, obj, existing_obj):
obj.pk = existing_obj['pk']
update_list.append(obj)
else:
od_response.total_untouched += 1
else:
create_list.append(obj)
if self.update_existing and len(update_list) > 0:
model_type.objects.bulk_update(update_list, field_list)
od_response.total_updated += len(update_list)
if len(create_list) > 0:
model_type.objects.bulk_create(create_list, update_conflicts=True, update_fields=field_list, unique_fields=('space', 'name',))
od_response.total_created += len(create_list)
return od_response
def import_supermarket(self):
od_response = OpenDataImportResponse()
datatype = 'store'
model_type = Supermarket
field_list = ['name', 'open_data_slug']
existing_data_slugs = {}
existing_data_names = {}
for obj in model_type.objects.filter(space=self.request.space).values('pk', *field_list):
existing_data_slugs[obj['open_data_slug']] = obj
existing_data_names[obj['name']] = obj
update_list = []
create_list = []
self._update_slug_cache(SupermarketCategory, 'category')
for k in list(self.data[datatype].keys()):
obj = model_type(
name=self.data[datatype][k]['name'],
open_data_slug=k,
space=self.request.space
)
if obj.open_data_slug in existing_data_slugs or obj.name in existing_data_names:
if not self._merge_if_conflicting(model_type, obj, existing_data_slugs, existing_data_names):
od_response.total_errored += 1
continue # if conflicting objects exist and cannot be merged skip object
existing_obj = self._get_existing_obj(obj, existing_data_slugs, existing_data_names)
if not self._is_obj_identical(field_list, obj, existing_obj):
obj.pk = existing_obj['pk']
update_list.append(obj)
else:
od_response.total_untouched += 1
else:
create_list.append(obj)
if self.update_existing and len(update_list) > 0:
model_type.objects.bulk_update(update_list, field_list)
od_response.total_updated += len(update_list)
if len(create_list) > 0:
model_type.objects.bulk_create(create_list, update_conflicts=True, update_fields=field_list, unique_fields=('space', 'name',))
od_response.total_created += len(create_list)
# always add open data slug if matching supermarket is found, otherwise relation might fail
self._update_slug_cache(Supermarket, 'store')
for k in list(self.data[datatype].keys()):
relations = []
order = 0
for c in self.data[datatype][k]['categories']:
relations.append(
SupermarketCategoryRelation(
supermarket_id=self.slug_id_cache[datatype][k],
category_id=self.slug_id_cache['category'][c],
order=order,
)
)
order += 1
SupermarketCategoryRelation.objects.bulk_create(relations, ignore_conflicts=True, unique_fields=('supermarket', 'category',))
return od_response
def import_food(self):
od_response = OpenDataImportResponse()
datatype = 'food'
model_type = Food
field_list = ['name', 'open_data_slug']
existing_data_slugs = {}
existing_data_names = {}
for obj in model_type.objects.filter(space=self.request.space).values('pk', *field_list):
existing_data_slugs[obj['open_data_slug']] = obj
existing_data_names[obj['name']] = obj
update_list = []
create_list = []
self._update_slug_cache(Unit, 'unit')
self._update_slug_cache(PropertyType, 'property')
self._update_slug_cache(SupermarketCategory, 'category')
unit_g = Unit.objects.filter(space=self.request.space, base_unit__iexact='g').first()
for k in list(self.data[datatype].keys()):
obj_dict = {
'name': self.data[datatype][k]['name'],
'plural_name': self.data[datatype][k]['plural_name'] if self.data[datatype][k]['plural_name'] != '' else None,
'supermarket_category_id': self.slug_id_cache['category'][self.data[datatype][k]['store_category']],
'fdc_id': re.sub(r'\D', '', self.data[datatype][k]['fdc_id']) if self.data[datatype][k]['fdc_id'] != '' else None,
'open_data_slug': k,
'properties_food_unit_id': None,
'space_id': self.request.space.id,
}
if unit_g:
obj_dict['properties_food_unit_id'] = unit_g.id
obj = model_type(**obj_dict)
if obj.open_data_slug in existing_data_slugs or obj.name in existing_data_names:
if not self._merge_if_conflicting(model_type, obj, existing_data_slugs, existing_data_names):
od_response.total_errored += 1
continue # if conflicting objects exist and cannot be merged skip object
existing_obj = self._get_existing_obj(obj, existing_data_slugs, existing_data_names)
if not self._is_obj_identical(field_list, obj, existing_obj):
obj.pk = existing_obj['pk']
update_list.append(obj)
else:
od_response.total_untouched += 1
else:
create_list.append({'data': obj_dict})
if self.update_existing and len(update_list) > 0:
model_type.objects.bulk_update(update_list, field_list)
od_response.total_updated += len(update_list)
if len(create_list) > 0:
Food.load_bulk(create_list, None)
od_response.total_created += len(create_list)
# --------------- PROPERTY STUFF -----------------------
model_type = Property
field_list = ['property_type_id', 'property_amount', 'open_data_food_slug']
existing_data_slugs = {}
existing_data_property_types = {}
for obj in model_type.objects.filter(space=self.request.space).values('pk', *field_list):
existing_data_slugs[obj['open_data_food_slug']] = obj
existing_data_property_types[obj['property_type_id']] = obj
update_list = []
create_list = []
self._update_slug_cache(Food, 'food')
for k in list(self.data['food'].keys()):
for fp in self.data['food'][k]['properties']['type_values']:
obj = model_type(
property_type_id=self.slug_id_cache['property'][fp['property_type']],
property_amount=fp['property_value'],
open_data_food_slug=k,
space=self.request.space,
)
if obj.open_data_food_slug in existing_data_slugs and obj.property_type_id in existing_data_property_types and existing_data_slugs[obj.open_data_food_slug] == existing_data_property_types[obj.property_type_id]:
existing_obj = existing_data_slugs[obj.open_data_food_slug]
if not self._is_obj_identical(field_list, obj, existing_obj):
obj.pk = existing_obj['pk']
update_list.append(obj)
else:
create_list.append(obj)
if self.update_existing and len(update_list) > 0:
model_type.objects.bulk_update(update_list, field_list)
if len(create_list) > 0:
model_type.objects.bulk_create(create_list, ignore_conflicts=True, unique_fields=('space', 'open_data_food_slug', 'property_type',))
linked_properties = list(model_type.objects.filter(space=self.request.space).values_list('id', flat=True).all())
property_food_relation_list = []
for p in model_type.objects.filter(space=self.request.space, open_data_food_slug__isnull=False).values_list('open_data_food_slug', 'id', ):
# slug_id_cache should always exist, don't create relations for already linked properties (ignore_conflicts would do that as well but this is more performant)
if p[0] in self.slug_id_cache['food'] and p[1] not in linked_properties:
property_food_relation_list.append(Food.properties.through(food_id=self.slug_id_cache['food'][p[0]], property_id=p[1]))
FoodProperty.objects.bulk_create(property_food_relation_list, ignore_conflicts=True, unique_fields=('food_id', 'property_id',))
return od_response
def import_conversion(self):
od_response = OpenDataImportResponse()
datatype = 'conversion'
model_type = UnitConversion
field_list = ['base_amount', 'base_unit_id', 'converted_amount', 'converted_unit_id', 'food_id', 'open_data_slug']
self._update_slug_cache(Food, 'food')
self._update_slug_cache(Unit, 'unit')
existing_data_slugs = {}
existing_data_foods = defaultdict(list)
for obj in model_type.objects.filter(space=self.request.space).values('pk', *field_list):
existing_data_slugs[obj['open_data_slug']] = obj
existing_data_foods[obj['food_id']].append(obj)
update_list = []
create_list = []
for k in list(self.data[datatype].keys()):
# try catch here because sometimes key "k" is not set for the food cache
try:
obj = model_type(
base_amount=Decimal(self.data[datatype][k]['base_amount']),
base_unit_id=self.slug_id_cache['unit'][self.data[datatype][k]['base_unit']],
converted_amount=Decimal(self.data[datatype][k]['converted_amount']),
converted_unit_id=self.slug_id_cache['unit'][self.data[datatype][k]['converted_unit']],
food_id=self.slug_id_cache['food'][self.data[datatype][k]['food']],
open_data_slug=k,
space=self.request.space,
created_by_id=self.request.user.id,
)
if obj.open_data_slug in existing_data_slugs:
existing_obj = existing_data_slugs[obj.open_data_slug]
if not self._is_obj_identical(field_list, obj, existing_obj):
obj.pk = existing_obj['pk']
update_list.append(obj)
else:
od_response.total_untouched += 1
else:
matching_existing_found = False
if obj.food_id in existing_data_foods:
for edf in existing_data_foods[obj.food_id]:
if obj.base_unit_id == edf['base_unit_id'] and obj.converted_unit_id == edf['converted_unit_id']:
matching_existing_found = True
if not self._is_obj_identical(field_list, obj, edf):
obj.pk = edf['pk']
update_list.append(obj)
else:
od_response.total_untouched += 1
if not matching_existing_found:
create_list.append(obj)
except KeyError as e:
traceback.print_exc()
od_response.total_errored += 1
print(self.data[datatype][k]['food'] + ' is not in self.slug_id_cache["food"]')
if self.update_existing and len(update_list) > 0:
od_response.total_updated = model_type.objects.bulk_update(update_list, field_list)
od_response.total_errored += len(update_list) - od_response.total_updated
if len(create_list) > 0:
objs_created = model_type.objects.bulk_create(create_list, ignore_conflicts=True, unique_fields=('space', 'base_unit', 'converted_unit', 'food', 'open_data_slug'))
od_response.total_created = len(objs_created)
od_response.total_errored += len(create_list) - od_response.total_created
return od_response