master
Mustafa Yontar 4 years ago
parent 822c4de1ea
commit 5d0709b4f3
  1. 227
      app.py
  2. 206
      forward_stream.py
  3. 0
      mainapp/__init__.py
  4. 82
      mainapp/janus.py
  5. 18
      mainapp/models.py
  6. 10
      requirements.txt

227
app.py

@ -0,0 +1,227 @@
import argparse
import json
import os
from getpass import getpass
from random import randint
import sh
from flask import Flask, request, jsonify
from flask_cors import CORS
from flask_jwt_extended import JWTManager, create_access_token, jwt_required, jwt_optional, get_jwt_identity
from flask_mongoengine import MongoEngine
from werkzeug.security import generate_password_hash, check_password_hash
from string import ascii_lowercase, ascii_uppercase, digits
from mainapp.janus import Janus
from mainapp.models import User, Room
app = Flask(__name__)
app.config['MONGODB_SETTINGS'] = {
'host': os.environ.get('MONGODB_URL')
}
app.config['JWT_SECRET_KEY'] = os.environ.get('JWT_SECRET_KEY', 'changeme')
app.config['JANUS_URL'] = os.environ.get('JANUS_URL')
app.config['JANUS_ADMIN_KEY'] = os.environ.get('JANUS_ADMIN_KEY', '')
db = MongoEngine()
db.init_app(app)
jwt = JWTManager(app)
cors = CORS(app, resources={r"/api/*": {"origins": "*"}})
def get_current_user():
try:
current_user = get_jwt_identity()
return User.objects.get(id=current_user)
except:
return False
@app.route('/api/kick/<rid>/<publisher>', methods=['GET'])
@jwt_optional
def kick(rid, publisher):
r = Room.objects.get(ridn=rid)
user = get_current_user()
if user:
if user.root_access or r.creator == user:
jan = Janus(os.environ.get('JANUS_URL'), app.config['JANUS_ADMIN_KEY'])
jan.connect()
jan.kick(r.room_id, int(publisher))
return "ok"
sh_coms = {}
@app.route("/api/sh")
def sh_coms_list():
room_rs = []
for room_id , sh_item in sh_coms.items():
room_rs.append({"room":room_id, "status":sh_item.exit_code})
return jsonify(room_rs)
@app.route("/api/sh/kill/<rid>")
def sh_kill(rid):
if sh_coms.get(int(rid)):
print(sh_coms.get(int(rid)).kill())
return "ok"
def done(cmd, success, exit_code):
print('done')
@app.route('/api/rtp/<rid>/<publisher>', methods=['GET'])
@jwt_optional
def rtp_forward(rid, publisher):
r = Room.objects.get(ridn=rid)
user = get_current_user()
if user:
if user.root_access or r.creator == user:
if not sh_coms.get(r.room_id,None) or sh_coms.get(r.room_id).exit_code:
room_sh = sh.python(['forward_stream.py', '--room',r.room_id, "https://vid.w3ic.org/janus"],_bg=True, _done=done)
sh_coms.update({r.room_id:room_sh})
return "ok"
@app.route('/api/room/<rid>', methods=['GET'])
@jwt_optional
def room(rid):
r = Room.objects.get(ridn=rid)
user = get_current_user()
if user:
return jsonify({'rid': r.room_id, "can_modify": user.root_access or r.creator == user})
else:
return jsonify({'rid': r.room_id, "can_modify": False})
@app.route('/api/create/room', methods=['POST'])
@jwt_required
def create_room():
if not request.is_json:
return jsonify({"msg": "Missing JSON in request"}), 400
room_name = request.json.get('room_name', 'No name room')
video_codec = request.json.get('video_codec', 'vp9')
audio_codec = request.json.get('audio_codec', 'opus')
publisher_count = request.json.get('publisher_count', 16)
bitrate = request.json.get('bitrate', 128000)
data = request.json.get('data', False)
user = get_current_user()
if user.create_room_access or user.root_access:
jan = Janus(os.environ.get('JANUS_URL'), app.config['JANUS_ADMIN_KEY'])
jan.connect()
room_id = jan.create_room(name=room_name, video_codec=video_codec, audio_codec=audio_codec,
publisher_count=int(publisher_count), bitrate=bitrate, data=data)
room = Room()
room.room_name = room_name
room.creator = user
room.room_id = room_id
have_room = True
ridn = None
letters = ascii_lowercase + ascii_uppercase + digits
while have_room:
ridn = ''.join([letters[randint(0, len(letters) - 1)] for rid in range(6)])
if Room.objects.filter(ridn=ridn).count() == 0:
have_room = False
room.ridn = ridn
room.save()
return jsonify({"msg": "seccess", "ridn": ridn}), 00
else:
return jsonify({"msg": "You don't have a this access"}), 403
@app.route('/api/rooms', methods=['GET'])
@jwt_required
def rooms():
user = get_current_user()
if user.root_access:
json_data = json.dumps({
"rooms": json.loads(Room.objects.all().to_json()),
"can_create": True
})
else:
json_data = json.dumps({
"rooms": user.rooms.to_json(),
"can_create": False
})
return json_data
@app.route('/api/create/user', methods=['POST'])
@jwt_required
def create_user():
if not request.is_json:
return jsonify({"msg": "Missing JSON in request"}), 400
room_id = request.json.get('room', None)
create_room = request.json.get('create_room', False)
root_access = request.json.get('root_access', False)
password = request.json.get('password', False)
user = get_current_user()
if not user.create_room_access and not user.root_access:
return jsonify({"msg": "You don't have a this access"}), 403
room = Room.objects.get(ridn=room_id)
if room.creator != user and not user.root_access:
return jsonify({"msg": "You don't have a this access"}), 403
if (create_room or root_access) and not user.root_access:
return jsonify({"msg": "You don't have a this access"}), 403
letters = ascii_lowercase + ascii_uppercase + digits
have_user = True
user_id = None
while have_user:
user_id = ''.join([letters[randint(len(letters) - 1)] for rid in range(6)])
if User.objects.filter(uidn=user_id).count() == 0:
have_user = False
new_user = User()
new_user.create_room_access = create_room
new_user.root_access = root_access
new_user.uidn = user_id
new_user.rooms.append(room)
if create_room or root_access:
new_user.passw = generate_password_hash(password, method='sha256')
new_user.save()
return jsonify({'msg': 'user created', 'uid': user_id})
@app.route('/api/login', methods=['POST'])
def login():
if not request.is_json:
return jsonify({"msg": "Missing JSON in request"}), 400
username = request.json.get('username', None)
password = request.json.get('password', None)
if not username:
return jsonify({"msg": "Missing username parameter"}), 400
user = User.objects.filter(uidn=username)
if user.count() == 0:
return jsonify({"msg": "Bad username or password"}), 401
else:
user = user.get()
if not password and (user.root_access or user.create_room_access):
return jsonify({"msg": "Password need for this user"}), 401
if password or user.passw:
if not check_password_hash(user.passw, password):
return jsonify({"msg": "Bad username or password"}), 401
# Identity can be any data that is json serializable
access_token = create_access_token(identity=str(user.id))
return jsonify(access_token=access_token), 200
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Albatros Backend')
parser.add_argument('--create-user', nargs='?', help='Create a root user')
args = parser.parse_args()
if args.create_user:
new_user = User()
new_user.uidn = args.create_user
new_user.root_access = True
passcheck = False
while not passcheck:
passwd = getpass('Password: ')
passwd_check = getpass('Password Check: ')
if passwd_check == passwd:
passcheck = True
new_user.passw = generate_password_hash(passwd, method='sha256')
new_user.save()
exit()
app.run(debug=True)

@ -0,0 +1,206 @@
import argparse
import asyncio
import random
import string
import time
import aiohttp
from aiortc import RTCPeerConnection, RTCSessionDescription, VideoStreamTrack
from aiortc.contrib.media import MediaPlayer, MediaRecorder
pcs = set()
def transaction_id():
return "".join(random.choice(string.ascii_letters) for x in range(12))
class JanusPlugin:
def __init__(self, session, url):
self._queue = asyncio.Queue()
self._session = session
self._url = url
async def send(self, payload):
message = {"janus": "message", "transaction": transaction_id()}
message.update(payload)
async with self._session._http.post(self._url, json=message) as response:
data = await response.json()
assert data["janus"] == "ack"
response = await self._queue.get()
assert response["transaction"] == message["transaction"]
return response
class JanusSession:
def __init__(self, url):
self._http = None
self._poll_task = None
self._plugins = {}
self._root_url = url
self._session_url = None
async def attach(self, plugin_name: str) -> JanusPlugin:
message = {
"janus": "attach",
"plugin": plugin_name,
"transaction": transaction_id(),
}
async with self._http.post(self._session_url, json=message) as response:
data = await response.json()
assert data["janus"] == "success"
plugin_id = data["data"]["id"]
plugin = JanusPlugin(self, self._session_url + "/" + str(plugin_id))
self._plugins[plugin_id] = plugin
return plugin
async def create(self):
self._http = aiohttp.ClientSession()
message = {"janus": "create", "transaction": transaction_id()}
async with self._http.post(self._root_url, json=message) as response:
data = await response.json()
assert data["janus"] == "success"
session_id = data["data"]["id"]
self._session_url = self._root_url + "/" + str(session_id)
self._poll_task = asyncio.ensure_future(self._poll())
async def destroy(self):
if self._poll_task:
self._poll_task.cancel()
self._poll_task = None
if self._session_url:
message = {"janus": "destroy", "transaction": transaction_id()}
async with self._http.post(self._session_url, json=message) as response:
data = await response.json()
assert data["janus"] == "success"
self._session_url = None
if self._http:
await self._http.close()
self._http = None
async def _poll(self):
while True:
params = {"maxev": 1, "rid": int(time.time() * 1000)}
async with self._http.get(self._session_url, params=params) as response:
data = await response.json()
if data["janus"] == "event":
plugin = self._plugins.get(data["sender"], None)
if plugin:
await plugin._queue.put(data)
else:
print(data)
async def subscribe(session, room, feed):
print("sub connect")
pc = RTCPeerConnection()
pcs.add(pc)
print("record")
recorder = MediaRecorder("rtmp://vid.w3ic.org/show/{}".format(feed),format="flv")
@pc.on("track")
async def on_track(track):
print("Track %s received" % track.kind)
if track.kind == "video":
print('video recived')
recorder.addTrack(track)
if track.kind == "audio":
print('audio recived')
recorder.addTrack(track)
# subscribe
plugin = await session.attach("janus.plugin.videoroom")
print({"body": {"request": "join", "ptype": "subscriber", "room": room, "feed": feed}})
response = await plugin.send(
{"body": {"request": "join", "ptype": "subscriber", "room": room, "feed": feed}}
)
# apply offer
await pc.setRemoteDescription(
RTCSessionDescription(
sdp=response["jsep"]["sdp"], type=response["jsep"]["type"]
)
)
# send answer
await pc.setLocalDescription(await pc.createAnswer())
response = await plugin.send(
{
"body": {"request": "start"},
"jsep": {
"sdp": pc.localDescription.sdp,
"trickle": False,
"type": pc.localDescription.type,
},
}
)
print("recorder start")
await recorder.start()
async def run(room, session):
await session.create()
# join video room
plugin = await session.attach("janus.plugin.videoroom")
response = await plugin.send(
{
"body": {
"display": "aiortc",
"ptype": "publisher",
"request": "join",
"room": room,
}
}
)
print(response["plugindata"])
publishers = response["plugindata"]["data"]["publishers"]
for publisher in publishers:
print("id: %(id)s, display: %(display)s" % publisher)
# receive video
if publishers:
for publisher in publishers:
await subscribe(
session=session, room=room, feed=publisher["id"]
)
# exchange media for 10 minutes
print("Exchanging media")
await asyncio.sleep(120000)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Janus")
parser.add_argument("url", help="Janus root URL, e.g. http://localhost:8088/janus")
parser.add_argument(
"--room",
type=int,
default=2671537157855710,
help="The video room ID to join (default: 1234).",
),
parser.add_argument("--record-to", help="Write received media to a file."),
args = parser.parse_args()
# create signaling and peer connection
loop = asyncio.get_event_loop()
try:
items =[]
session = JanusSession(args.url)
loop.run_until_complete(run(room=args.room, session=session))
except KeyboardInterrupt:
pass
finally:
loop.run_until_complete(session.destroy())
# close peer connections
coros = [pc.close() for pc in pcs]
loop.run_until_complete(asyncio.gather(*coros))

@ -0,0 +1,82 @@
from random import randint
from string import digits, ascii_uppercase, ascii_lowercase
from urllib.parse import urljoin
import requests
class Janus:
url: str = None
admin_key: str = None
session_id: int = None
attach_id: int = None
def __init__(self, url: str, admin_key: str = None) -> None:
self.url = url
self.admin_key = admin_key
def attach(self):
session_uri = urljoin(self.url, "{}/".format(self.session_id))
request_body = {"janus": "attach", "plugin": "janus.plugin.videoroom",
"opaque_id": "room-{}".format(self.create_transaction_id()),
"transaction": self.create_transaction_id()}
response = requests.post(session_uri, json=request_body).json()
if response.get('janus') == "success":
self.attach_id = response.get('data').get('id')
return True
return False
def rtp_forward(self,room, publisher):
attach_uri = urljoin(self.url, "{}/{}".format(self.session_id, self.attach_id))
request_body = {"janus": "message",
"body": {"request": "rtp_forward", 'secret': 'adminpwd', "admin_key": self.admin_key, "room": room,
"publisher_id": publisher, "audio_port": 10033, "audiopt": 111, "video_port": 10038, "videopt": 100, "host": "127.0.0.1"
},
"transaction": self.create_transaction_id()}
response = requests.post(attach_uri, json=request_body).json()
print(response)
def kick(self, room, publisher):
attach_uri = urljoin(self.url, "{}/{}".format(self.session_id, self.attach_id))
request_body = {"janus": "message",
"body": {"request": "kick", 'secret': 'adminpwd', "admin_key": self.admin_key, "room": room, "id": publisher
},
"transaction": self.create_transaction_id()}
response = requests.post(attach_uri, json=request_body).json()
print(response)
def create_room(self, name: str = "", video_codec: str = "vp9", audio_codec: str = "opus",
publisher_count: int = 16,
bitrate: int = 500000, data: bool = False) -> int:
attach_uri = urljoin(self.url, "{}/{}".format(self.session_id, self.attach_id))
request_body = {"janus": "message",
"body": {"request": "create", "admin_key": self.admin_key, "audiolevel_ext": True,
"audiolevel_event": True,
"description": name, "bitrate": bitrate,
"publishers": publisher_count,
"audiocodec": audio_codec,
"fir_freq": 10,
"videocodec": video_codec,
"bitrate_cap": True,
"data": data
},
"transaction": self.create_transaction_id()}
print(request_body)
response = requests.post(attach_uri, json=request_body).json()
print(response)
return response.get("plugindata").get('data').get('room')
@staticmethod
def create_transaction_id(length: int = 16) -> str:
letters = ascii_lowercase + ascii_uppercase + digits
transaction_id = ''.join([letters[randint(0, len(letters) - 1)] for rid in range(length)])
return transaction_id
def connect(self) -> bool:
transaction_id = self.create_transaction_id()
request_data = {
"janus": "create",
"transaction": transaction_id
}
response = requests.post(self.url, json=request_data).json()
self.session_id = response.get('data').get('id')
return self.attach()

@ -0,0 +1,18 @@
from flask_mongoengine import Document
from mongoengine import StringField, IntField, BooleanField, ListField, ReferenceField
class User(Document):
uidn = StringField()
passw = StringField()
rooms = ListField(ReferenceField('Room'))
create_room_access = BooleanField(default=False)
root_access = BooleanField(default=False)
class Room(Document):
ridn = StringField()
creator = ReferenceField(User)
room_name = StringField()
streamer = BooleanField(default=False)
room_id = IntField()

@ -0,0 +1,10 @@
flask
flask-mongoengine
flask-login
flask-jwt-extended
requests
flask-cors
av
aiortc
aiohttp
sh
Loading…
Cancel
Save