from bson import ObjectId from flask import request from mongoengine import Document, ObjectIdField, ListField, \ EmbeddedDocumentField, ReferenceField, ImageField, FileField, DecimalField, QuerySet from base64 import b64encode from mongoengine.base import BaseField class Resource: model: Document = None qs: dict = {} eqs: dict = {} build_reference: bool = False base64_image: bool = True base64_image_as_full: bool = False limit: int = 10 offset: int = 0 fields: list = [] ignore_fields: list = [] mask_fields: dict = {} meta: dict = {} as_pk: str = 'id' query_document: Document = None mongo_qs = None def __init__(self, model: Document): self.model = model self.limit = 10 if 'no_image' in request.args: self.base64_image = False if 'with_sub_docs' in request.args: self.build_reference = request.args.get('with_sub_docs') if 'thumb' in request.args: self.base64_image = True self.base64_image_as_full = False if 'full_image' in request.args: self.base64_image = True self.base64_image_as_full = True if 'limit' in request.args: self.limit = int(request.args.get('limit')) if 'fields' in request.args: self.fields = request.args.get('fields') if 'offset' in request.args: self.offset = int(request.args.get('offset')) self.meta = getattr(self.model, '_meta') if 'ignore_fields' in self.meta: self.ignore_fields = self.meta.get('ignore_fields') if 'with_sub_docs' in self.meta: self.build_reference = self.meta.get('with_sub_docs') if 'mask_fields' in self.meta: self.mask_fields = self.meta.get('mask_fields') if 'document' in self.meta: self.model = self.meta.get('document') if 'query' in self.meta: self.qs = self.meta.get('query') if 'as_pk' in self.meta: self.as_pk = self.meta.get('as_pk') if 'query_document' in self.meta: self.query_document = self.meta.get('query_document') def external_query(self, qs: dict): self.eqs = qs def query(self, qs: dict) -> None: self.qs = qs def to_qs(self, pk: str = None) -> tuple: query = {} if self.qs: for key, val in self.qs.items(): if callable(val): query.update({key: val()}) else: query.update({key: val}) if self.query_document: query = {} if self.eqs: for key, val in self.eqs.items(): if callable(val): query.update({key: val()}) else: query.update({key: val}) data = self.query_document.objects.filter(**query) field_name = "" for i in self.model._fields_ordered: try: if isinstance(getattr(self.model, i).document_type, self.query_document.__class__): field_name = i + "__in" except: pass query.clear() query = {field_name: [i.id for i in data]} for key, val in self.qs.items(): if callable(val): query.update({key: val()}) else: query.update({key: val}) print(query) data = self.model.objects.filter(**query) if pk: data.get(**{self.as_pk: pk}) return data def to_json(self, pk: str = None, qs=None) -> tuple: if qs is None: if self.mongo_qs is None: self.mongo_qs = self.to_qs(pk) else: self.mongo_qs = qs data = self.mongo_qs count = data.count() if pk: json_data = self.parse_fields(self.mongo_qs.get(**{self.as_pk:pk}),self.model) else: data = data[self.offset:self.limit + self.offset] json_data = [] for item in data: item_data = self.parse_fields(item, self.model) json_data.append(item_data) return count, json_data def parse_field(self, field: BaseField, value: any) -> any: if isinstance(field, ObjectIdField): return str(value) elif isinstance(field, ReferenceField): if self.build_reference: return self.parse_fields(value, field.document_type) if value: return str(value.id) else: return value elif isinstance(field, ImageField): if self.base64_image and value: if self.base64_image_as_full: file = b64encode(value.read()).decode("utf-8") return { "size": value.size, "upload_date": value.gridout.upload_date, "format": value.format, "base64": file } else: file = b64encode(value.thumbnail.read()).decode("utf-8") return { "upload_date": value.gridout.upload_date, "format": value.format, "base64": file } else: if value: return { "size": value.size } else: return None elif isinstance(field, DecimalField): return float(value) elif isinstance(field, FileField): return "file" elif isinstance(field, ListField): if isinstance(field.field, EmbeddedDocumentField): return [self.parse_fields(item, field.field.document_type) for item in value] elif isinstance(field.field, ReferenceField) and self.build_reference: return [self.parse_fields(item, field.field.document_type) for item in value] else: return [self.parse_field(field.field, item) for item in value] elif isinstance(value, QuerySet): if value.count() > 0: return [self.parse_fields(i, value._document) for i in value] else: return value def parse_fields(self, item: Document, model: Document) -> dict: parsed_item = {} fields = list(getattr(model, "_fields_ordered")) if self.model != model: in_meta = getattr(model,'_meta') if in_meta.get('ignore_fields'): self.ignore_fields += in_meta.get('ignore_fields') for key, val in model.__dict__.items(): if isinstance(getattr(model, key), property): fields.append(key) for i in fields: if i not in self.ignore_fields: field = getattr(model, i) if i not in self.mask_fields: if not item is None: value = getattr(item, i) else: value = None else: value = getattr(item, i) if self.mask_fields.get(i) == 'all': value = '*************' elif self.mask_fields.get(i) == 'email': value = '{}****@{}***.{}'.format(value[0], value.split("@")[1][0], '.'.join(value.split("@")[1].split(".")[1:])) else: value = '*************' if not value is None: parsed_item.update({i: self.parse_field(field, value)}) else: parsed_item.update({i: None}) return parsed_item