You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
92 lines
2.9 KiB
92 lines
2.9 KiB
from flask import request
|
|
from flask_jwt_extended.exceptions import NoAuthorizationError
|
|
from flask_jwt_extended.utils import verify_token_claims
|
|
from flask_jwt_extended.view_decorators import _decode_jwt_from_request
|
|
from werkzeug.exceptions import Unauthorized
|
|
|
|
from internal_lib.permission_parser import parse_permission, control_permission, is_admin
|
|
from models.User import User
|
|
from restapi.BaseAuthModel import BaseAuth
|
|
|
|
|
|
class AuthApi(BaseAuth):
|
|
def authorized(self):
|
|
""
|
|
|
|
|
|
class AuthJWT(BaseAuth):
|
|
user = None
|
|
|
|
def authorized(self):
|
|
try:
|
|
jwt_data, jwt_header = _decode_jwt_from_request(request_type='access')
|
|
verify_token_claims(jwt_data)
|
|
self.user = User.objects.get(id=jwt_data['identity'])
|
|
except Exception as e:
|
|
self.set_error(e)
|
|
return False
|
|
|
|
return True
|
|
|
|
def has_model_delete_permission(self, obj, model):
|
|
if self.user is None:
|
|
self.authorized()
|
|
|
|
if model.__name__.lower() == "union":
|
|
return False, obj
|
|
if control_permission(self.user.user_group, model.__name__.lower(), "delete", str(obj.id),
|
|
str(self.user.union)):
|
|
return True, obj
|
|
else:
|
|
return False, obj
|
|
|
|
def has_model_update_permission(self, obj, update: dict):
|
|
model = obj.__class__.__name__.lower()
|
|
|
|
if self.user is None:
|
|
self.authorized()
|
|
if update.get('id') or update.get('pk'):
|
|
return False, update
|
|
if not is_admin(self.user.user_group) and update.get('union'):
|
|
return False, update
|
|
if control_permission(self.user.user_group, model, "update", str(obj.id),
|
|
str(self.user.union.id)):
|
|
return True, update
|
|
|
|
return False, update
|
|
|
|
def has_model_read_permission(self, qs):
|
|
from flask import current_app
|
|
if self.user is None:
|
|
self.authorized()
|
|
unions = []
|
|
has_read = False
|
|
for right in self.user.user_group.rights:
|
|
permission = parse_permission(right)
|
|
current_app.logger.info(permission)
|
|
if permission.get('read'):
|
|
has_read = True
|
|
unions.append(permission.get('union'))
|
|
|
|
if has_read:
|
|
if qs._collection.name == "union":
|
|
qs = qs.filter(id__in=unions, deleted=False)
|
|
else:
|
|
qs = qs.filter(union__in=unions, deleted=False)
|
|
else:
|
|
raise Unauthorized()
|
|
|
|
return qs
|
|
|
|
def has_model_write_permission(self, obj):
|
|
model = obj.__class__.__name__.lower()
|
|
|
|
if self.user is None:
|
|
self.authorized()
|
|
|
|
obj.union = self.user.union.id
|
|
if control_permission(self.user.user_group, model, "write", str(obj.id),
|
|
str(self.user.union.id)):
|
|
return True, obj
|
|
|
|
return False, obj
|
|
|