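from flask import request
from flask_jwt_extended.exceptions import NoAuthorizationError
from flask_jwt_extended.utils import verify_token_claims
from flask_jwt_extended.view_decorators import _decode_jwt_from_request
from werkzeug.exceptions import Unauthorized

from internal_lib.permission_parser import parse_permission, control_permission, is_admin
from models.User import User
from restapi.BaseAuthModel import BaseAuth


class AuthApi(BaseAuth):
    """Placeholder auth backend: ``authorized`` never grants access."""

    def authorized(self):
        """Return ``None`` (falsy); this backend authorizes nobody."""


class AuthJWT(BaseAuth):
    """JWT auth backend.

    Resolves the calling ``User`` from the request's access token and answers
    per-model permission questions for that user.
    """

    # The resolved User document once authorized() succeeds; None before that.
    user = None

    def authorized(self):
        """Decode and validate the access token, then load ``self.user``.

        Returns:
            ``True`` when the token is valid and its ``identity`` claim maps to
            an existing ``User``. ``False`` on any failure; the exception is
            recorded via ``self.set_error`` and ``self.user`` is left unset.
        """
        try:
            jwt_data, jwt_header = _decode_jwt_from_request(request_type='access')
            verify_token_claims(jwt_data)
            self.user = User.objects.get(id=jwt_data['identity'])
        except Exception as e:
            # Missing/expired/invalid token or an unknown identity are all a
            # denial here; the concrete cause is only surfaced via set_error.
            self.set_error(e)
            return False
        return True

    def _require_user(self):
        """Make sure ``self.user`` is loaded, authorizing lazily on first use.

        Raises:
            Unauthorized: when no valid token/user can be resolved. The
            permission methods previously ignored a failed ``authorized()`` and
            then dereferenced ``self.user`` (``None``), crashing with
            ``AttributeError``; failing closed with a 401 is the intended result.
        """
        if self.user is None and not self.authorized():
            raise Unauthorized()

    def has_model_delete_permission(self, obj, model):
        """Decide whether the current user may delete ``obj`` of class ``model``.

        Args:
            obj: the document to delete; only ``obj.id`` is consulted.
            model: the document class; its lowercased name is the permission key.

        Returns:
            ``(allowed, obj)``. Deleting a ``union`` is always refused.

        Raises:
            Unauthorized: when the caller cannot be authenticated.
        """
        self._require_user()
        model_name = model.__name__.lower()
        if model_name == "union":
            return False, obj
        # The update/write checks pass the union *id*; passing the union
        # document itself would stringify differently, so use the id here too.
        allowed = control_permission(
            self.user.user_group,
            model_name,
            "delete",
            str(obj.id),
            str(self.user.union.id),
        )
        return bool(allowed), obj

    def has_model_update_permission(self, obj, update: dict):
        """Decide whether the current user may apply ``update`` to ``obj``.

        Args:
            obj: the document being updated; only its class name and ``id`` are used.
            update: the requested field changes.

        Returns:
            ``(allowed, update)``. Refused outright when the payload tries to
            change the primary key (``id``/``pk``) or, for non-admins, ``union``.

        Raises:
            Unauthorized: when the caller cannot be authenticated.
        """
        model_name = obj.__class__.__name__.lower()
        self._require_user()
        if update.get('id') or update.get('pk'):
            return False, update
        # Only admins may move an object to another union.
        if not is_admin(self.user.user_group) and update.get('union'):
            return False, update
        allowed = control_permission(
            self.user.user_group,
            model_name,
            "update",
            str(obj.id),
            str(self.user.union.id),
        )
        return bool(allowed), update

    def has_model_read_permission(self, qs):
        """Restrict ``qs`` to the unions the current user's rights allow reading.

        Args:
            qs: a queryset. It is filtered on ``union`` (or on ``id`` when the
                queryset targets the ``union`` collection) and ``deleted=False``.

        Returns:
            The filtered queryset.

        Raises:
            Unauthorized: when the caller cannot be authenticated or holds no
            right granting ``read``.
        """
        from flask import current_app

        self._require_user()
        # One entry per right that grants read; the entry is that right's
        # union (may be None if the parsed right has no union key).
        readable_unions = []
        for right in self.user.user_group.rights:
            permission = parse_permission(right)
            current_app.logger.info(permission)
            if permission.get('read'):
                readable_unions.append(permission.get('union'))
        if not readable_unions:
            raise Unauthorized()
        if qs._collection.name == "union":
            return qs.filter(id__in=readable_unions, deleted=False)
        return qs.filter(union__in=readable_unions, deleted=False)

    def has_model_write_permission(self, obj):
        """Decide whether the current user may write ``obj``.

        ``obj.union`` is overwritten with the caller's own union id before the
        check, so a write can never be attributed to another union. Note that
        this assignment happens even when the result is ``False``.

        Returns:
            ``(allowed, obj)``.

        Raises:
            Unauthorized: when the caller cannot be authenticated.
        """
        model_name = obj.__class__.__name__.lower()
        self._require_user()
        obj.union = self.user.union.id
        allowed = control_permission(
            self.user.user_group,
            model_name,
            "write",
            str(obj.id),
            str(self.user.union.id),
        )
        return bool(allowed), obj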