You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
backend/forward_stream.py

209 lines
6.3 KiB

import argparse
import asyncio
import random
import string
import time
import aiohttp
from aiortc import RTCPeerConnection, RTCSessionDescription, VideoStreamTrack
from aiortc.contrib.media import MediaPlayer, MediaRecorder
# Registry of every RTCPeerConnection opened by subscribe(); the script's
# shutdown code iterates it to close them all.
pcs = set()
def transaction_id():
    """Return a random 12-character string of ASCII letters.

    The value is used as the ``transaction`` field of Janus requests.
    """
    alphabet = string.ascii_letters
    letters = [random.choice(alphabet) for _ in range(12)]
    return "".join(letters)
class JanusPlugin:
    """Handle for one plugin attached to a Janus session.

    Requests are POSTed to the handle URL using the owning session's HTTP
    client; asynchronous events for this handle are delivered through
    ``_queue``, which the session's polling loop fills.
    """

    def __init__(self, session, url):
        """Store the owning session and the handle URL, and create the event queue."""
        self._session = session
        self._url = url
        self._queue = asyncio.Queue()

    async def send(self, payload):
        """Send a ``message`` request and return the event that answers it.

        ``payload`` is merged on top of the base message, so its keys win.
        An AssertionError is raised when Janus does not acknowledge the
        request, or when the next queued event carries a different
        transaction id than the one that was sent.
        """
        request = {"janus": "message", "transaction": transaction_id(), **payload}

        http = self._session._http
        async with http.post(self._url, json=request) as reply:
            ack = await reply.json()
            assert ack["janus"] == "ack"

        # The real answer arrives asynchronously through the polling loop.
        event = await self._queue.get()
        assert event["transaction"] == request["transaction"]
        return event
class JanusSession:
    """Client for a single Janus session driven over HTTP.

    Call create() first, then attach() for each plugin handle that is
    needed, and destroy() when finished. Events are fetched by a background
    polling task and routed to the matching JanusPlugin queue.
    """

    def __init__(self, url):
        """Remember the Janus root URL; nothing is sent until create()."""
        self._http = None  # aiohttp.ClientSession, opened by create()
        self._poll_task = None  # background task running _poll()
        self._plugins = {}  # plugin handle id -> JanusPlugin
        self._root_url = url
        self._session_url = None  # root URL + "/" + session id, set by create()

    async def attach(self, plugin_name: str) -> JanusPlugin:
        """Attach the named plugin to this session and return its handle.

        Raises AssertionError when Janus does not report success.
        """
        message = {
            "janus": "attach",
            "plugin": plugin_name,
            "transaction": transaction_id(),
        }
        async with self._http.post(self._session_url, json=message) as response:
            data = await response.json()
            assert data["janus"] == "success"
            plugin_id = data["data"]["id"]
            plugin = JanusPlugin(self, self._session_url + "/" + str(plugin_id))
            # Registered by id so _poll() can route events via their "sender".
            self._plugins[plugin_id] = plugin
            return plugin

    async def create(self):
        """Open the HTTP client, create the Janus session and start polling.

        Raises AssertionError when Janus does not report success.
        """
        self._http = aiohttp.ClientSession()
        message = {"janus": "create", "transaction": transaction_id()}
        async with self._http.post(self._root_url, json=message) as response:
            data = await response.json()
            assert data["janus"] == "success"
            session_id = data["data"]["id"]
            self._session_url = self._root_url + "/" + str(session_id)

        self._poll_task = asyncio.ensure_future(self._poll())

    async def destroy(self):
        """Stop polling, destroy the Janus session and close the HTTP client.

        Safe to call when create() was never called or did not finish. If
        the destroy request fails, the error still propagates, but the HTTP
        client is closed and the session URL cleared first so nothing leaks
        and a repeated call does not reuse a dead client.
        """
        if self._poll_task:
            self._poll_task.cancel()
            self._poll_task = None

        try:
            if self._session_url:
                message = {"janus": "destroy", "transaction": transaction_id()}
                async with self._http.post(self._session_url, json=message) as response:
                    data = await response.json()
                    assert data["janus"] == "success"
        finally:
            self._session_url = None
            if self._http:
                await self._http.close()
                self._http = None

    async def _poll(self):
        """Fetch session events forever and hand them to their plugin's queue.

        Runs until the task is cancelled by destroy(). Events whose sender
        is not a known plugin handle are printed instead of queued.
        """
        while True:
            # One event per request; "rid" is a millisecond timestamp.
            params = {"maxev": 1, "rid": int(time.time() * 1000)}
            async with self._http.get(self._session_url, params=params) as response:
                data = await response.json()
                if data["janus"] == "event":
                    plugin = self._plugins.get(data["sender"], None)
                    if plugin:
                        await plugin._queue.put(data)
                    else:
                        print(data)
async def subscribe(session, room, feed):
    """Subscribe to one VideoRoom feed and record its media to an RTMP URL.

    Opens a new peer connection (registered in the module-level ``pcs``
    set), attaches a fresh videoroom handle on ``session``, answers the
    offer returned for the join request, and finally starts the recorder.
    The RTMP target is derived from ``feed``.
    """
    print("sub connect")
    peer = RTCPeerConnection()
    pcs.add(peer)

    print("record")
    target = "rtmp://vid.w3ic.org/show/{}".format(feed)
    recorder = MediaRecorder(target, format="flv")

    # Log line per track kind; tracks of any other kind are not recorded.
    notices = {"video": "video recived", "audio": "audio recived"}

    @peer.on("track")
    async def on_track(track):
        print("Track %s received" % track.kind)
        notice = notices.get(track.kind)
        if notice is not None:
            print(notice)
            recorder.addTrack(track)

    # subscribe
    handle = await session.attach("janus.plugin.videoroom")
    join_request = {
        "body": {"request": "join", "ptype": "subscriber", "room": room, "feed": feed}
    }
    print(join_request)
    joined = await handle.send(join_request)

    # apply offer
    offer = joined["jsep"]
    print(offer["sdp"])
    print(offer["type"])
    remote = RTCSessionDescription(sdp=offer["sdp"], type=offer["type"])
    await peer.setRemoteDescription(remote)

    # send answer
    answer = await peer.createAnswer()
    await peer.setLocalDescription(answer)
    await handle.send(
        {
            "body": {"request": "start"},
            "jsep": {
                "sdp": peer.localDescription.sdp,
                "trickle": False,
                "type": peer.localDescription.type,
            },
        }
    )
    print(peer.localDescription.sdp)

    print("recorder start")
    await recorder.start()
async def run(room, session, duration=120000):
    """Join a video room, subscribe to every current publisher, then idle.

    Args:
        room: ID of the Janus VideoRoom to join.
        session: A JanusSession that has not been created yet; this
            coroutine calls ``session.create()`` itself.
        duration: Seconds to keep running after the subscriptions are set
            up, so media keeps flowing. Defaults to 120000 seconds
            (roughly 33 hours), the value that used to be hard-coded.

    Only publishers present at join time are subscribed to.
    """
    await session.create()

    # join video room
    plugin = await session.attach("janus.plugin.videoroom")
    response = await plugin.send(
        {
            "body": {
                "display": "aiortc",
                "ptype": "publisher",
                "request": "join",
                "room": room,
            }
        }
    )
    print(response["plugindata"])
    publishers = response["plugindata"]["data"]["publishers"]
    for publisher in publishers:
        # "display" appears to be optional in Janus' publisher list, so read
        # it with .get() rather than failing with KeyError on unnamed feeds.
        print("id: %s, display: %s" % (publisher["id"], publisher.get("display")))

    # receive video (iterating an empty list is a no-op, no guard needed)
    for publisher in publishers:
        await subscribe(session=session, room=room, feed=publisher["id"])

    # keep the coroutine alive so media keeps flowing for `duration` seconds
    print("Exchanging media")
    await asyncio.sleep(duration)
if __name__ == "__main__":
    # Command-line entry point: join the room, forward every feed, and tear
    # everything down on exit or Ctrl-C.
    parser = argparse.ArgumentParser(description="Janus")
    parser.add_argument("url", help="Janus root URL, e.g. http://localhost:8088/janus")
    parser.add_argument(
        "--room",
        type=int,
        default=2671537157855710,
        # Let argparse print the real default instead of a stale literal.
        help="The video room ID to join (default: %(default)s).",
    )
    # NOTE(review): --record-to is parsed but nothing in this script reads it;
    # the recorder target is the RTMP URL built in subscribe().
    parser.add_argument("--record-to", help="Write received media to a file.")
    args = parser.parse_args()

    # Create the loop explicitly: asyncio.get_event_loop() is deprecated when
    # no loop is running.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    # create signaling and peer connection
    session = JanusSession(args.url)
    try:
        loop.run_until_complete(run(room=args.room, session=session))
    except KeyboardInterrupt:
        pass
    finally:
        try:
            loop.run_until_complete(session.destroy())
        finally:
            # close peer connections even if destroying the session failed
            coros = [pc.close() for pc in pcs]
            loop.run_until_complete(asyncio.gather(*coros))