You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
Adunatio/internal_lib/AuthMethods.py

92 lines
2.9 KiB

from flask import request
from flask_jwt_extended.exceptions import NoAuthorizationError
from flask_jwt_extended.utils import verify_token_claims
from flask_jwt_extended.view_decorators import _decode_jwt_from_request
from werkzeug.exceptions import Unauthorized
from internal_lib.permission_parser import parse_permission, control_permission, is_admin
from models.User import User
from restapi.BaseAuthModel import BaseAuth
class AuthApi(BaseAuth):
    """Authentication backend stub derived from ``BaseAuth``."""

    def authorized(self):
        """Placeholder check: performs no work and returns ``None``."""
        return None
class AuthJWT(BaseAuth):
    """JWT-based authentication with per-model permission checks.

    The user is resolved from the access token of the current Flask request
    and cached on the instance. Each ``has_model_*_permission`` method
    authenticates lazily when no user has been loaded yet and then delegates
    the decision to the helpers in ``internal_lib.permission_parser``.
    """

    # User loaded from the JWT identity; stays None until authorized() succeeds.
    user = None

    def authorized(self):
        """Decode the request's access token and load the matching user.

        Returns:
            bool: ``True`` when the token was decoded, its claims verified and
            the user fetched; ``False`` otherwise, after handing the exception
            to ``self.set_error`` (presumably provided by ``BaseAuth``).
        """
        try:
            jwt_data, _jwt_header = _decode_jwt_from_request(request_type='access')
            verify_token_claims(jwt_data)
            self.user = User.objects.get(id=jwt_data['identity'])
        except Exception as e:
            # Any failure (missing/invalid token, unknown user, ...) is
            # reported through set_error and treated as "not authorized".
            self.set_error(e)
            return False
        return True

    def _ensure_user(self):
        """Return True once ``self.user`` is loaded, authenticating if needed."""
        if self.user is not None:
            return True
        # authorized() records the error itself; a failed attempt leaves
        # self.user as None, so callers must not dereference it.
        return bool(self.authorized()) and self.user is not None

    def has_model_delete_permission(self, obj, model):
        """Decide whether the current user may delete ``obj``.

        Args:
            obj: Document instance about to be deleted.
            model: Model class of ``obj``; its lowercased ``__name__`` is the
                permission resource name.

        Returns:
            tuple: ``(allowed, obj)``. Deleting the "union" model is always
            refused, as is any request without an authenticated user.
        """
        if not self._ensure_user():
            return False, obj
        model_name = model.__name__.lower()
        if model_name == "union":
            return False, obj
        # Pass the union *id*, matching the update and write checks.
        # NOTE(review): the original passed str(self.user.union) here; confirm
        # control_permission expects the id string.
        allowed = control_permission(
            self.user.user_group, model_name, "delete", str(obj.id),
            str(self.user.union.id))
        return bool(allowed), obj

    def has_model_update_permission(self, obj, update: dict):
        """Decide whether the current user may apply ``update`` to ``obj``.

        Args:
            obj: Document instance being updated.
            update: Mapping of field names to new values.

        Returns:
            tuple: ``(allowed, update)``. The update is refused when there is
            no authenticated user, when it sets a truthy ``id`` or ``pk``, or
            when a non-admin sets a truthy ``union``.
        """
        model = obj.__class__.__name__.lower()
        if not self._ensure_user():
            return False, update
        # Primary keys must never be rewritten through an update.
        if update.get('id') or update.get('pk'):
            return False, update
        # Only admins may change the union of an object.
        if not is_admin(self.user.user_group) and update.get('union'):
            return False, update
        allowed = control_permission(
            self.user.user_group, model, "update", str(obj.id),
            str(self.user.union.id))
        return bool(allowed), update

    def has_model_read_permission(self, qs):
        """Restrict ``qs`` to the non-deleted documents the user may read.

        Args:
            qs: Queryset to restrict.

        Returns:
            The queryset filtered to ``deleted=False`` and to the unions named
            by the user's rights that grant read access. For the "union"
            collection the filter is on ``id``, otherwise on ``union``.

        Raises:
            Unauthorized: when no user can be authenticated or none of the
                user's rights grants read access.
        """
        from flask import current_app

        if not self._ensure_user():
            raise Unauthorized()

        unions = []
        for right in self.user.user_group.rights:
            permission = parse_permission(right)
            current_app.logger.info(permission)
            if permission.get('read'):
                unions.append(permission.get('union'))

        # No right with read access means nothing may be listed.
        if not unions:
            raise Unauthorized()

        if qs._collection.name == "union":
            return qs.filter(id__in=unions, deleted=False)
        return qs.filter(union__in=unions, deleted=False)

    def has_model_write_permission(self, obj):
        """Decide whether the current user may create ``obj``.

        The object's ``union`` is set to the user's union id before the
        permission check, so it is assigned even when the check fails.

        Args:
            obj: Document instance about to be written.

        Returns:
            tuple: ``(allowed, obj)``; ``(False, obj)`` with ``obj`` untouched
            when no user can be authenticated.
        """
        model = obj.__class__.__name__.lower()
        if not self._ensure_user():
            return False, obj
        obj.union = self.user.union.id
        allowed = control_permission(
            self.user.user_group, model, "write", str(obj.id),
            str(self.user.union.id))
        return bool(allowed), obj