From 5d0709b4f391cda102e824dce30b85cca9372158 Mon Sep 17 00:00:00 2001 From: Mustafa Yontar Date: Wed, 15 Apr 2020 15:10:00 +0300 Subject: [PATCH] inital --- app.py | 227 ++++++++++++++++++++++++++++++++++++++++++++ forward_stream.py | 206 ++++++++++++++++++++++++++++++++++++++++ mainapp/__init__.py | 0 mainapp/janus.py | 82 ++++++++++++++++ mainapp/models.py | 18 ++++ requirements.txt | 10 ++ 6 files changed, 543 insertions(+) create mode 100644 app.py create mode 100644 forward_stream.py create mode 100644 mainapp/__init__.py create mode 100644 mainapp/janus.py create mode 100644 mainapp/models.py create mode 100644 requirements.txt diff --git a/app.py b/app.py new file mode 100644 index 0000000..62c7d60 --- /dev/null +++ b/app.py @@ -0,0 +1,227 @@ +import argparse +import json +import os +from getpass import getpass +from random import randint +import sh +from flask import Flask, request, jsonify +from flask_cors import CORS +from flask_jwt_extended import JWTManager, create_access_token, jwt_required, jwt_optional, get_jwt_identity +from flask_mongoengine import MongoEngine +from werkzeug.security import generate_password_hash, check_password_hash +from string import ascii_lowercase, ascii_uppercase, digits + +from mainapp.janus import Janus +from mainapp.models import User, Room + +app = Flask(__name__) +app.config['MONGODB_SETTINGS'] = { + 'host': os.environ.get('MONGODB_URL') +} +app.config['JWT_SECRET_KEY'] = os.environ.get('JWT_SECRET_KEY', 'changeme') +app.config['JANUS_URL'] = os.environ.get('JANUS_URL') +app.config['JANUS_ADMIN_KEY'] = os.environ.get('JANUS_ADMIN_KEY', '') +db = MongoEngine() +db.init_app(app) + +jwt = JWTManager(app) +cors = CORS(app, resources={r"/api/*": {"origins": "*"}}) + + +def get_current_user(): + try: + current_user = get_jwt_identity() + return User.objects.get(id=current_user) + except: + return False + + +@app.route('/api/kick//', methods=['GET']) +@jwt_optional +def kick(rid, publisher): + r = Room.objects.get(ridn=rid) + user = get_current_user() + if user: + if user.root_access or r.creator == user: + jan = Janus(os.environ.get('JANUS_URL'), app.config['JANUS_ADMIN_KEY']) + jan.connect() + jan.kick(r.room_id, int(publisher)) + return "ok" +sh_coms = {} + +@app.route("/api/sh") +def sh_coms_list(): + room_rs = [] + for room_id , sh_item in sh_coms.items(): + room_rs.append({"room":room_id, "status":sh_item.exit_code}) + return jsonify(room_rs) + + +@app.route("/api/sh/kill/") +def sh_kill(rid): + if sh_coms.get(int(rid)): + print(sh_coms.get(int(rid)).kill()) + return "ok" + +def done(cmd, success, exit_code): + print('done') + +@app.route('/api/rtp//', methods=['GET']) +@jwt_optional +def rtp_forward(rid, publisher): + r = Room.objects.get(ridn=rid) + user = get_current_user() + if user: + if user.root_access or r.creator == user: + if not sh_coms.get(r.room_id,None) or sh_coms.get(r.room_id).exit_code: + room_sh = sh.python(['forward_stream.py', '--room',r.room_id, "https://vid.w3ic.org/janus"],_bg=True, _done=done) + + sh_coms.update({r.room_id:room_sh}) + return "ok" + + +@app.route('/api/room/', methods=['GET']) +@jwt_optional +def room(rid): + r = Room.objects.get(ridn=rid) + user = get_current_user() + if user: + return jsonify({'rid': r.room_id, "can_modify": user.root_access or r.creator == user}) + else: + return jsonify({'rid': r.room_id, "can_modify": False}) + + +@app.route('/api/create/room', methods=['POST']) +@jwt_required +def create_room(): + if not request.is_json: + return jsonify({"msg": "Missing JSON in request"}), 400 + + room_name = request.json.get('room_name', 'No name room') + video_codec = request.json.get('video_codec', 'vp9') + audio_codec = request.json.get('audio_codec', 'opus') + publisher_count = request.json.get('publisher_count', 16) + bitrate = request.json.get('bitrate', 128000) + data = request.json.get('data', False) + user = get_current_user() + if user.create_room_access or user.root_access: + jan = Janus(os.environ.get('JANUS_URL'), app.config['JANUS_ADMIN_KEY']) + jan.connect() + room_id = jan.create_room(name=room_name, video_codec=video_codec, audio_codec=audio_codec, + publisher_count=int(publisher_count), bitrate=bitrate, data=data) + room = Room() + room.room_name = room_name + room.creator = user + room.room_id = room_id + have_room = True + ridn = None + letters = ascii_lowercase + ascii_uppercase + digits + while have_room: + ridn = ''.join([letters[randint(0, len(letters) - 1)] for rid in range(6)]) + if Room.objects.filter(ridn=ridn).count() == 0: + have_room = False + room.ridn = ridn + room.save() + return jsonify({"msg": "seccess", "ridn": ridn}), 00 + else: + return jsonify({"msg": "You don't have a this access"}), 403 + + +@app.route('/api/rooms', methods=['GET']) +@jwt_required +def rooms(): + user = get_current_user() + if user.root_access: + json_data = json.dumps({ + "rooms": json.loads(Room.objects.all().to_json()), + "can_create": True + }) + else: + json_data = json.dumps({ + "rooms": user.rooms.to_json(), + "can_create": False + }) + return json_data + + +@app.route('/api/create/user', methods=['POST']) +@jwt_required +def create_user(): + if not request.is_json: + return jsonify({"msg": "Missing JSON in request"}), 400 + + room_id = request.json.get('room', None) + create_room = request.json.get('create_room', False) + root_access = request.json.get('root_access', False) + password = request.json.get('password', False) + user = get_current_user() + if not user.create_room_access and not user.root_access: + return jsonify({"msg": "You don't have a this access"}), 403 + + room = Room.objects.get(ridn=room_id) + if room.creator != user and not user.root_access: + return jsonify({"msg": "You don't have a this access"}), 403 + if (create_room or root_access) and not user.root_access: + return jsonify({"msg": "You don't have a this access"}), 403 + letters = ascii_lowercase + ascii_uppercase + digits + have_user = True + user_id = None + while have_user: + user_id = ''.join([letters[randint(len(letters) - 1)] for rid in range(6)]) + if User.objects.filter(uidn=user_id).count() == 0: + have_user = False + new_user = User() + new_user.create_room_access = create_room + new_user.root_access = root_access + new_user.uidn = user_id + new_user.rooms.append(room) + if create_room or root_access: + new_user.passw = generate_password_hash(password, method='sha256') + new_user.save() + return jsonify({'msg': 'user created', 'uid': user_id}) + + +@app.route('/api/login', methods=['POST']) +def login(): + if not request.is_json: + return jsonify({"msg": "Missing JSON in request"}), 400 + + username = request.json.get('username', None) + password = request.json.get('password', None) + if not username: + return jsonify({"msg": "Missing username parameter"}), 400 + + user = User.objects.filter(uidn=username) + if user.count() == 0: + return jsonify({"msg": "Bad username or password"}), 401 + else: + user = user.get() + if not password and (user.root_access or user.create_room_access): + return jsonify({"msg": "Password need for this user"}), 401 + if password or user.passw: + if not check_password_hash(user.passw, password): + return jsonify({"msg": "Bad username or password"}), 401 + + # Identity can be any data that is json serializable + access_token = create_access_token(identity=str(user.id)) + return jsonify(access_token=access_token), 200 + + +if __name__ == '__main__': + parser = argparse.ArgumentParser(description='Albatros Backend') + parser.add_argument('--create-user', nargs='?', help='Create a root user') + args = parser.parse_args() + if args.create_user: + new_user = User() + new_user.uidn = args.create_user + new_user.root_access = True + passcheck = False + while not passcheck: + passwd = getpass('Password: ') + passwd_check = getpass('Password Check: ') + if passwd_check == passwd: + passcheck = True + new_user.passw = generate_password_hash(passwd, method='sha256') + new_user.save() + exit() + app.run(debug=True) diff --git a/forward_stream.py b/forward_stream.py new file mode 100644 index 0000000..bb6e7f3 --- /dev/null +++ b/forward_stream.py @@ -0,0 +1,206 @@ +import argparse +import asyncio +import random +import string +import time + +import aiohttp +from aiortc import RTCPeerConnection, RTCSessionDescription, VideoStreamTrack +from aiortc.contrib.media import MediaPlayer, MediaRecorder + +pcs = set() + +def transaction_id(): + return "".join(random.choice(string.ascii_letters) for x in range(12)) + + +class JanusPlugin: + def __init__(self, session, url): + self._queue = asyncio.Queue() + self._session = session + self._url = url + + async def send(self, payload): + message = {"janus": "message", "transaction": transaction_id()} + message.update(payload) + async with self._session._http.post(self._url, json=message) as response: + data = await response.json() + assert data["janus"] == "ack" + + response = await self._queue.get() + assert response["transaction"] == message["transaction"] + return response + + + + +class JanusSession: + def __init__(self, url): + self._http = None + self._poll_task = None + self._plugins = {} + self._root_url = url + self._session_url = None + + async def attach(self, plugin_name: str) -> JanusPlugin: + message = { + "janus": "attach", + "plugin": plugin_name, + "transaction": transaction_id(), + } + async with self._http.post(self._session_url, json=message) as response: + data = await response.json() + assert data["janus"] == "success" + plugin_id = data["data"]["id"] + plugin = JanusPlugin(self, self._session_url + "/" + str(plugin_id)) + self._plugins[plugin_id] = plugin + return plugin + + async def create(self): + self._http = aiohttp.ClientSession() + message = {"janus": "create", "transaction": transaction_id()} + async with self._http.post(self._root_url, json=message) as response: + data = await response.json() + assert data["janus"] == "success" + session_id = data["data"]["id"] + self._session_url = self._root_url + "/" + str(session_id) + + self._poll_task = asyncio.ensure_future(self._poll()) + + async def destroy(self): + if self._poll_task: + self._poll_task.cancel() + self._poll_task = None + + if self._session_url: + message = {"janus": "destroy", "transaction": transaction_id()} + async with self._http.post(self._session_url, json=message) as response: + data = await response.json() + assert data["janus"] == "success" + self._session_url = None + + if self._http: + await self._http.close() + self._http = None + + async def _poll(self): + while True: + params = {"maxev": 1, "rid": int(time.time() * 1000)} + async with self._http.get(self._session_url, params=params) as response: + data = await response.json() + if data["janus"] == "event": + plugin = self._plugins.get(data["sender"], None) + if plugin: + await plugin._queue.put(data) + else: + print(data) + + + +async def subscribe(session, room, feed): + print("sub connect") + pc = RTCPeerConnection() + pcs.add(pc) + print("record") + recorder = MediaRecorder("rtmp://vid.w3ic.org/show/{}".format(feed),format="flv") + @pc.on("track") + async def on_track(track): + print("Track %s received" % track.kind) + if track.kind == "video": + print('video recived') + recorder.addTrack(track) + if track.kind == "audio": + print('audio recived') + recorder.addTrack(track) + + # subscribe + plugin = await session.attach("janus.plugin.videoroom") + print({"body": {"request": "join", "ptype": "subscriber", "room": room, "feed": feed}}) + response = await plugin.send( + {"body": {"request": "join", "ptype": "subscriber", "room": room, "feed": feed}} + ) + + # apply offer + await pc.setRemoteDescription( + RTCSessionDescription( + sdp=response["jsep"]["sdp"], type=response["jsep"]["type"] + ) + ) + + # send answer + await pc.setLocalDescription(await pc.createAnswer()) + response = await plugin.send( + { + "body": {"request": "start"}, + "jsep": { + "sdp": pc.localDescription.sdp, + "trickle": False, + "type": pc.localDescription.type, + }, + } + ) + print("recorder start") + await recorder.start() + + +async def run(room, session): + await session.create() + + # join video room + plugin = await session.attach("janus.plugin.videoroom") + response = await plugin.send( + { + "body": { + "display": "aiortc", + "ptype": "publisher", + "request": "join", + "room": room, + } + } + ) + print(response["plugindata"]) + publishers = response["plugindata"]["data"]["publishers"] + for publisher in publishers: + print("id: %(id)s, display: %(display)s" % publisher) + + # receive video + if publishers: + for publisher in publishers: + await subscribe( + session=session, room=room, feed=publisher["id"] + ) + + # exchange media for 10 minutes + print("Exchanging media") + await asyncio.sleep(120000) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Janus") + parser.add_argument("url", help="Janus root URL, e.g. http://localhost:8088/janus") + parser.add_argument( + "--room", + type=int, + default=2671537157855710, + help="The video room ID to join (default: 1234).", + ), + parser.add_argument("--record-to", help="Write received media to a file."), + args = parser.parse_args() + + + # create signaling and peer connection + loop = asyncio.get_event_loop() + try: + + items =[] + session = JanusSession(args.url) + loop.run_until_complete(run(room=args.room, session=session)) + except KeyboardInterrupt: + pass + finally: + + loop.run_until_complete(session.destroy()) + + # close peer connections + coros = [pc.close() for pc in pcs] + loop.run_until_complete(asyncio.gather(*coros)) \ No newline at end of file diff --git a/mainapp/__init__.py b/mainapp/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/mainapp/janus.py b/mainapp/janus.py new file mode 100644 index 0000000..de62384 --- /dev/null +++ b/mainapp/janus.py @@ -0,0 +1,82 @@ +from random import randint +from string import digits, ascii_uppercase, ascii_lowercase +from urllib.parse import urljoin + +import requests + + +class Janus: + url: str = None + admin_key: str = None + session_id: int = None + attach_id: int = None + + def __init__(self, url: str, admin_key: str = None) -> None: + self.url = url + self.admin_key = admin_key + + def attach(self): + session_uri = urljoin(self.url, "{}/".format(self.session_id)) + request_body = {"janus": "attach", "plugin": "janus.plugin.videoroom", + "opaque_id": "room-{}".format(self.create_transaction_id()), + "transaction": self.create_transaction_id()} + response = requests.post(session_uri, json=request_body).json() + if response.get('janus') == "success": + self.attach_id = response.get('data').get('id') + return True + return False + def rtp_forward(self,room, publisher): + attach_uri = urljoin(self.url, "{}/{}".format(self.session_id, self.attach_id)) + + request_body = {"janus": "message", + "body": {"request": "rtp_forward", 'secret': 'adminpwd', "admin_key": self.admin_key, "room": room, + "publisher_id": publisher, "audio_port": 10033, "audiopt": 111, "video_port": 10038, "videopt": 100, "host": "127.0.0.1" + }, + "transaction": self.create_transaction_id()} + response = requests.post(attach_uri, json=request_body).json() + print(response) + def kick(self, room, publisher): + attach_uri = urljoin(self.url, "{}/{}".format(self.session_id, self.attach_id)) + request_body = {"janus": "message", + "body": {"request": "kick", 'secret': 'adminpwd', "admin_key": self.admin_key, "room": room, "id": publisher + }, + "transaction": self.create_transaction_id()} + response = requests.post(attach_uri, json=request_body).json() + print(response) + + def create_room(self, name: str = "", video_codec: str = "vp9", audio_codec: str = "opus", + publisher_count: int = 16, + bitrate: int = 500000, data: bool = False) -> int: + attach_uri = urljoin(self.url, "{}/{}".format(self.session_id, self.attach_id)) + request_body = {"janus": "message", + "body": {"request": "create", "admin_key": self.admin_key, "audiolevel_ext": True, + "audiolevel_event": True, + "description": name, "bitrate": bitrate, + "publishers": publisher_count, + "audiocodec": audio_codec, + "fir_freq": 10, + "videocodec": video_codec, + "bitrate_cap": True, + "data": data + }, + "transaction": self.create_transaction_id()} + print(request_body) + response = requests.post(attach_uri, json=request_body).json() + print(response) + return response.get("plugindata").get('data').get('room') + + @staticmethod + def create_transaction_id(length: int = 16) -> str: + letters = ascii_lowercase + ascii_uppercase + digits + transaction_id = ''.join([letters[randint(0, len(letters) - 1)] for rid in range(length)]) + return transaction_id + + def connect(self) -> bool: + transaction_id = self.create_transaction_id() + request_data = { + "janus": "create", + "transaction": transaction_id + } + response = requests.post(self.url, json=request_data).json() + self.session_id = response.get('data').get('id') + return self.attach() diff --git a/mainapp/models.py b/mainapp/models.py new file mode 100644 index 0000000..785d37a --- /dev/null +++ b/mainapp/models.py @@ -0,0 +1,18 @@ +from flask_mongoengine import Document +from mongoengine import StringField, IntField, BooleanField, ListField, ReferenceField + + +class User(Document): + uidn = StringField() + passw = StringField() + rooms = ListField(ReferenceField('Room')) + create_room_access = BooleanField(default=False) + root_access = BooleanField(default=False) + +class Room(Document): + ridn = StringField() + creator = ReferenceField(User) + room_name = StringField() + streamer = BooleanField(default=False) + room_id = IntField() + diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..7db5617 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,10 @@ +flask +flask-mongoengine +flask-login +flask-jwt-extended +requests +flask-cors +av +aiortc +aiohttp +sh \ No newline at end of file