"""Janus video-room client that records each publisher's media to RTMP.

The script creates a Janus session over HTTP, joins a video room, subscribes
to every publisher announced in the join response and hands the received
tracks to an aiortc ``MediaRecorder`` that writes FLV to an RTMP URL.
"""
import argparse
import asyncio
import random
import string
import time

import aiohttp
from aiortc import RTCPeerConnection, RTCSessionDescription, VideoStreamTrack
from aiortc.contrib.media import MediaPlayer, MediaRecorder

# Every peer connection opened by subscribe(); closed on shutdown.
pcs = set()

# Every recorder created by subscribe(); stopped on shutdown so the output
# containers are closed instead of being abandoned.
recorders = set()

# How long run() keeps the session alive after subscribing, in seconds
# (120000 s is roughly 33 hours).
MEDIA_EXCHANGE_SECONDS = 120000


def transaction_id() -> str:
    """Return a random 12-letter ASCII string used to tag a Janus request."""
    return "".join(random.choice(string.ascii_letters) for _ in range(12))


class JanusPlugin:
    """Handle for one plugin attached to a :class:`JanusSession`."""

    def __init__(self, session, url):
        # Events addressed to this handle are pushed here by JanusSession._poll.
        self._queue = asyncio.Queue()
        self._session = session
        self._url = url

    async def send(self, payload):
        """POST ``payload`` as a Janus "message" and return the queued event.

        The HTTP reply is expected to be an ``ack``; the actual result is the
        next event taken from this handle's queue, which must carry the same
        transaction id as the request.

        Raises:
            AssertionError: if the reply is not an ``ack`` or the next queued
                event belongs to a different transaction.
        """
        message = {"janus": "message", "transaction": transaction_id()}
        message.update(payload)
        async with self._session._http.post(self._url, json=message) as response:
            data = await response.json()
            assert data["janus"] == "ack"

        response = await self._queue.get()
        assert response["transaction"] == message["transaction"]
        return response


class JanusSession:
    """Janus session driven over HTTP, with a background event-polling task."""

    def __init__(self, url):
        self._http = None
        self._poll_task = None
        self._plugins = {}
        self._root_url = url
        self._session_url = None

    async def attach(self, plugin_name: str) -> JanusPlugin:
        """Attach ``plugin_name`` to the session and return its handle.

        Raises:
            AssertionError: if the server does not answer with ``success``.
        """
        message = {
            "janus": "attach",
            "plugin": plugin_name,
            "transaction": transaction_id(),
        }
        async with self._http.post(self._session_url, json=message) as response:
            data = await response.json()
            assert data["janus"] == "success"
            plugin_id = data["data"]["id"]
            plugin = JanusPlugin(self, self._session_url + "/" + str(plugin_id))
            # Keyed by handle id so _poll can route events by their "sender".
            self._plugins[plugin_id] = plugin
            return plugin

    async def create(self):
        """Open the HTTP client, create the session and start polling.

        Raises:
            AssertionError: if the server does not answer with ``success``.
        """
        self._http = aiohttp.ClientSession()
        message = {"janus": "create", "transaction": transaction_id()}
        async with self._http.post(self._root_url, json=message) as response:
            data = await response.json()
            assert data["janus"] == "success"
            session_id = data["data"]["id"]
            self._session_url = self._root_url + "/" + str(session_id)

        self._poll_task = asyncio.ensure_future(self._poll())

    async def destroy(self):
        """Stop polling, destroy the remote session and close the HTTP client.

        Safe to call when create() never ran or failed part-way: each step is
        skipped if the matching attribute is unset.

        Raises:
            AssertionError: if the server does not confirm the destroy request.
        """
        if self._poll_task:
            self._poll_task.cancel()
            self._poll_task = None

        try:
            if self._session_url:
                message = {"janus": "destroy", "transaction": transaction_id()}
                async with self._http.post(
                    self._session_url, json=message
                ) as response:
                    data = await response.json()
                    assert data["janus"] == "success"
                self._session_url = None
        finally:
            # Close the client even when the destroy request fails, so the
            # underlying connections are not leaked.
            if self._http:
                await self._http.close()
                self._http = None

    async def _poll(self):
        """Fetch session events forever and route them to plugin queues.

        Runs until the task is cancelled by destroy(). "event" messages whose
        sender is a known handle are queued for it; every other message is
        printed.
        """
        while True:
            params = {"maxev": 1, "rid": int(time.time() * 1000)}
            async with self._http.get(self._session_url, params=params) as response:
                data = await response.json()
                if data["janus"] == "event":
                    plugin = self._plugins.get(data["sender"], None)
                    if plugin:
                        await plugin._queue.put(data)
                    else:
                        print(data)


async def subscribe(session, room, feed):
    """Subscribe to ``feed`` in ``room`` and record its tracks to RTMP.

    Creates a peer connection (registered in ``pcs``) and a recorder
    (registered in ``recorders``), answers the offer returned by the plugin
    and starts the recorder.

    Args:
        session: a created JanusSession.
        room: video room id.
        feed: id of the publisher to subscribe to; also used as the last
            path component of the RTMP URL.
    """
    print("sub connect")
    pc = RTCPeerConnection()
    pcs.add(pc)

    print("record")
    recorder = MediaRecorder(
        "rtmp://vid.w3ic.org/show/{}".format(feed), format="flv"
    )
    # Register immediately so the recorder is stopped on shutdown even if the
    # negotiation below fails.
    recorders.add(recorder)

    @pc.on("track")
    async def on_track(track):
        print("Track %s received" % track.kind)
        if track.kind == "video":
            print('video recived')
            recorder.addTrack(track)
        elif track.kind == "audio":
            print('audio recived')
            recorder.addTrack(track)

    # subscribe
    plugin = await session.attach("janus.plugin.videoroom")
    join_request = {
        "body": {"request": "join", "ptype": "subscriber", "room": room, "feed": feed}
    }
    print(join_request)
    response = await plugin.send(join_request)

    # apply offer
    offer = response["jsep"]
    print(offer["sdp"])
    print(offer["type"])
    await pc.setRemoteDescription(
        RTCSessionDescription(sdp=offer["sdp"], type=offer["type"])
    )

    # send answer
    await pc.setLocalDescription(await pc.createAnswer())
    await plugin.send(
        {
            "body": {"request": "start"},
            "jsep": {
                "sdp": pc.localDescription.sdp,
                "trickle": False,
                "type": pc.localDescription.type,
            },
        }
    )
    print(pc.localDescription.sdp)

    print("recorder start")
    await recorder.start()


async def run(room, session):
    """Join ``room``, subscribe to every listed publisher, then stay alive.

    Args:
        room: video room id.
        session: a JanusSession that has not been created yet; run() calls
            ``session.create()`` itself.
    """
    await session.create()

    # join video room
    plugin = await session.attach("janus.plugin.videoroom")
    response = await plugin.send(
        {
            "body": {
                "display": "aiortc",
                "ptype": "publisher",
                "request": "join",
                "room": room,
            }
        }
    )
    print(response["plugindata"])
    publishers = response["plugindata"]["data"]["publishers"]
    for publisher in publishers:
        print("id: %(id)s, display: %(display)s" % publisher)

    # receive media from every publisher present at join time
    for publisher in publishers:
        await subscribe(session=session, room=room, feed=publisher["id"])

    # keep the session open while media is being recorded
    print("Exchanging media")
    await asyncio.sleep(MEDIA_EXCHANGE_SECONDS)


async def _shutdown(session):
    """Stop all recorders, destroy the session and close peer connections."""
    await asyncio.gather(*(recorder.stop() for recorder in recorders))
    await session.destroy()
    await asyncio.gather(*(pc.close() for pc in pcs))


def main():
    """Parse the command line and run the client until done or interrupted."""
    parser = argparse.ArgumentParser(description="Janus")
    parser.add_argument("url", help="Janus root URL, e.g. http://localhost:8088/janus")
    parser.add_argument(
        "--room",
        type=int,
        default=2671537157855710,
        help="The video room ID to join (default: %(default)s).",
    )
    # NOTE: --record-to is accepted but nothing in this script reads it;
    # media always goes to the RTMP URL built in subscribe().
    parser.add_argument("--record-to", help="Write received media to a file.")
    args = parser.parse_args()

    # create signaling session before the try block so the cleanup in
    # `finally` can always refer to it
    session = JanusSession(args.url)
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(run(room=args.room, session=session))
    except KeyboardInterrupt:
        pass
    finally:
        loop.run_until_complete(_shutdown(session))


if __name__ == "__main__":
    main()