You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
flask-mongorester/resource.py

216 lines
7.8 KiB

from bson import ObjectId
from flask import request
from mongoengine import Document, ObjectIdField, ListField, \
EmbeddedDocumentField, ReferenceField, ImageField, FileField, DecimalField, QuerySet
from base64 import b64encode
from mongoengine.base import BaseField
class Resource:
model: Document = None
qs: dict = {}
eqs: dict = {}
build_reference: bool = False
base64_image: bool = True
base64_image_as_full: bool = False
limit: int = 10
offset: int = 0
fields: list = []
ignore_fields: list = []
mask_fields: dict = {}
meta: dict = {}
as_pk: str = 'id'
query_document: Document = None
mongo_qs = None
def __init__(self, model: Document):
self.model = model
self.limit = 10
if 'no_image' in request.args:
self.base64_image = False
if 'with_sub_docs' in request.args:
self.build_reference = request.args.get('with_sub_docs')
if 'thumb' in request.args:
self.base64_image = True
self.base64_image_as_full = False
if 'full_image' in request.args:
self.base64_image = True
self.base64_image_as_full = True
if 'limit' in request.args:
self.limit = int(request.args.get('limit'))
if 'fields' in request.args:
self.fields = request.args.get('fields')
if 'offset' in request.args:
self.offset = int(request.args.get('offset'))
self.meta = getattr(self.model, '_meta')
if 'ignore_fields' in self.meta:
self.ignore_fields = self.meta.get('ignore_fields')
if 'with_sub_docs' in self.meta:
self.build_reference = self.meta.get('with_sub_docs')
if 'mask_fields' in self.meta:
self.mask_fields = self.meta.get('mask_fields')
if 'document' in self.meta:
self.model = self.meta.get('document')
if 'query' in self.meta:
self.qs = self.meta.get('query')
if 'as_pk' in self.meta:
self.as_pk = self.meta.get('as_pk')
if 'query_document' in self.meta:
self.query_document = self.meta.get('query_document')
def external_query(self, qs: dict):
self.eqs = qs
def query(self, qs: dict) -> None:
self.qs = qs
def to_qs(self, pk: str = None) -> tuple:
query = {}
if self.qs:
for key, val in self.qs.items():
if callable(val):
query.update({key: val()})
else:
query.update({key: val})
if self.query_document:
query = {}
if self.eqs:
for key, val in self.eqs.items():
if callable(val):
query.update({key: val()})
else:
query.update({key: val})
data = self.query_document.objects.filter(**query)
field_name = ""
for i in self.model._fields_ordered:
try:
if isinstance(getattr(self.model, i).document_type, self.query_document.__class__):
field_name = i + "__in"
except:
pass
query.clear()
query = {field_name: [i.id for i in data]}
for key, val in self.qs.items():
if callable(val):
query.update({key: val()})
else:
query.update({key: val})
print(query)
data = self.model.objects.filter(**query)
if pk:
data.get(**{self.as_pk: pk})
return data
def to_json(self, pk: str = None, qs=None) -> tuple:
if qs is None:
if self.mongo_qs is None:
self.mongo_qs = self.to_qs(pk)
else:
self.mongo_qs = qs
data = self.mongo_qs
count = data.count()
if pk:
json_data = self.parse_fields(self.mongo_qs.get(**{self.as_pk:pk}),self.model)
else:
data = data[self.offset:self.limit + self.offset]
json_data = []
for item in data:
item_data = self.parse_fields(item, self.model)
json_data.append(item_data)
return count, json_data
def parse_field(self, field: BaseField, value: any) -> any:
if isinstance(field, ObjectIdField):
return str(value)
elif isinstance(field, ReferenceField):
if self.build_reference:
return self.parse_fields(value, field.document_type)
if value:
return str(value.id)
else:
return value
elif isinstance(field, ImageField):
if self.base64_image and value:
if self.base64_image_as_full:
file = b64encode(value.read()).decode("utf-8")
return {
"size": value.size,
"upload_date": value.gridout.upload_date,
"format": value.format,
"base64": file
}
else:
file = b64encode(value.thumbnail.read()).decode("utf-8")
return {
"upload_date": value.gridout.upload_date,
"format": value.format,
"base64": file
}
else:
if value:
return {
"size": value.size
}
else:
return None
elif isinstance(field, DecimalField):
return float(value)
elif isinstance(field, FileField):
return "file"
elif isinstance(field, ListField):
if isinstance(field.field, EmbeddedDocumentField):
return [self.parse_fields(item, field.field.document_type) for item in value]
elif isinstance(field.field, ReferenceField) and self.build_reference:
return [self.parse_fields(item, field.field.document_type) for item in value]
else:
return [self.parse_field(field.field, item) for item in value]
elif isinstance(value, QuerySet):
if value.count() > 0:
return [self.parse_fields(i, value._document) for i in value]
else:
return value
def parse_fields(self, item: Document, model: Document) -> dict:
parsed_item = {}
fields = list(getattr(model, "_fields_ordered"))
if self.model != model:
in_meta = getattr(model,'_meta')
if in_meta.get('ignore_fields'):
self.ignore_fields += in_meta.get('ignore_fields')
for key, val in model.__dict__.items():
if isinstance(getattr(model, key), property):
fields.append(key)
for i in fields:
if i not in self.ignore_fields:
field = getattr(model, i)
if i not in self.mask_fields:
if not item is None:
value = getattr(item, i)
else:
value = None
else:
value = getattr(item, i)
if self.mask_fields.get(i) == 'all':
value = '*************'
elif self.mask_fields.get(i) == 'email':
value = '{}****@{}***.{}'.format(value[0], value.split("@")[1][0],
'.'.join(value.split("@")[1].split(".")[1:]))
else:
value = '*************'
if not value is None:
parsed_item.update({i: self.parse_field(field, value)})
else:
parsed_item.update({i: None})
return parsed_item