You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
209 lines
6.3 KiB
209 lines
6.3 KiB
import argparse
|
|
import asyncio
|
|
import random
|
|
import string
|
|
import time
|
|
|
|
import aiohttp
|
|
from aiortc import RTCPeerConnection, RTCSessionDescription, VideoStreamTrack
|
|
from aiortc.contrib.media import MediaPlayer, MediaRecorder
|
|
|
|
pcs = set()
|
|
|
|
def transaction_id():
|
|
return "".join(random.choice(string.ascii_letters) for x in range(12))
|
|
|
|
|
|
class JanusPlugin:
|
|
def __init__(self, session, url):
|
|
self._queue = asyncio.Queue()
|
|
self._session = session
|
|
self._url = url
|
|
|
|
async def send(self, payload):
|
|
message = {"janus": "message", "transaction": transaction_id()}
|
|
message.update(payload)
|
|
async with self._session._http.post(self._url, json=message) as response:
|
|
data = await response.json()
|
|
assert data["janus"] == "ack"
|
|
|
|
response = await self._queue.get()
|
|
assert response["transaction"] == message["transaction"]
|
|
return response
|
|
|
|
|
|
|
|
|
|
class JanusSession:
|
|
def __init__(self, url):
|
|
self._http = None
|
|
self._poll_task = None
|
|
self._plugins = {}
|
|
self._root_url = url
|
|
self._session_url = None
|
|
|
|
async def attach(self, plugin_name: str) -> JanusPlugin:
|
|
message = {
|
|
"janus": "attach",
|
|
"plugin": plugin_name,
|
|
"transaction": transaction_id(),
|
|
}
|
|
async with self._http.post(self._session_url, json=message) as response:
|
|
data = await response.json()
|
|
assert data["janus"] == "success"
|
|
plugin_id = data["data"]["id"]
|
|
plugin = JanusPlugin(self, self._session_url + "/" + str(plugin_id))
|
|
self._plugins[plugin_id] = plugin
|
|
return plugin
|
|
|
|
async def create(self):
|
|
self._http = aiohttp.ClientSession()
|
|
message = {"janus": "create", "transaction": transaction_id()}
|
|
async with self._http.post(self._root_url, json=message) as response:
|
|
data = await response.json()
|
|
assert data["janus"] == "success"
|
|
session_id = data["data"]["id"]
|
|
self._session_url = self._root_url + "/" + str(session_id)
|
|
|
|
self._poll_task = asyncio.ensure_future(self._poll())
|
|
|
|
async def destroy(self):
|
|
if self._poll_task:
|
|
self._poll_task.cancel()
|
|
self._poll_task = None
|
|
|
|
if self._session_url:
|
|
message = {"janus": "destroy", "transaction": transaction_id()}
|
|
async with self._http.post(self._session_url, json=message) as response:
|
|
data = await response.json()
|
|
assert data["janus"] == "success"
|
|
self._session_url = None
|
|
|
|
if self._http:
|
|
await self._http.close()
|
|
self._http = None
|
|
|
|
async def _poll(self):
|
|
while True:
|
|
params = {"maxev": 1, "rid": int(time.time() * 1000)}
|
|
async with self._http.get(self._session_url, params=params) as response:
|
|
data = await response.json()
|
|
if data["janus"] == "event":
|
|
plugin = self._plugins.get(data["sender"], None)
|
|
if plugin:
|
|
await plugin._queue.put(data)
|
|
else:
|
|
print(data)
|
|
|
|
|
|
|
|
async def subscribe(session, room, feed):
|
|
print("sub connect")
|
|
pc = RTCPeerConnection()
|
|
pcs.add(pc)
|
|
print("record")
|
|
recorder = MediaRecorder("rtmp://vid.w3ic.org/show/{}".format(feed),format="flv")
|
|
@pc.on("track")
|
|
async def on_track(track):
|
|
print("Track %s received" % track.kind)
|
|
if track.kind == "video":
|
|
print('video recived')
|
|
recorder.addTrack(track)
|
|
if track.kind == "audio":
|
|
print('audio recived')
|
|
recorder.addTrack(track)
|
|
|
|
# subscribe
|
|
plugin = await session.attach("janus.plugin.videoroom")
|
|
print({"body": {"request": "join", "ptype": "subscriber", "room": room, "feed": feed}})
|
|
response = await plugin.send(
|
|
{"body": {"request": "join", "ptype": "subscriber", "room": room, "feed": feed}}
|
|
)
|
|
|
|
# apply offer
|
|
print(response["jsep"]["sdp"])
|
|
print(response["jsep"]["type"])
|
|
await pc.setRemoteDescription(
|
|
RTCSessionDescription(
|
|
sdp=response["jsep"]["sdp"], type=response["jsep"]["type"]
|
|
)
|
|
)
|
|
|
|
# send answer
|
|
await pc.setLocalDescription(await pc.createAnswer())
|
|
response = await plugin.send(
|
|
{
|
|
"body": {"request": "start"},
|
|
"jsep": {
|
|
"sdp": pc.localDescription.sdp,
|
|
"trickle": False,
|
|
"type": pc.localDescription.type,
|
|
},
|
|
}
|
|
)
|
|
print(pc.localDescription.sdp)
|
|
print("recorder start")
|
|
await recorder.start()
|
|
|
|
|
|
async def run(room, session):
|
|
await session.create()
|
|
|
|
# join video room
|
|
plugin = await session.attach("janus.plugin.videoroom")
|
|
response = await plugin.send(
|
|
{
|
|
"body": {
|
|
"display": "aiortc",
|
|
"ptype": "publisher",
|
|
"request": "join",
|
|
"room": room,
|
|
}
|
|
}
|
|
)
|
|
print(response["plugindata"])
|
|
publishers = response["plugindata"]["data"]["publishers"]
|
|
for publisher in publishers:
|
|
print("id: %(id)s, display: %(display)s" % publisher)
|
|
|
|
# receive video
|
|
if publishers:
|
|
for publisher in publishers:
|
|
await subscribe(
|
|
session=session, room=room, feed=publisher["id"]
|
|
)
|
|
|
|
# exchange media for 10 minutes
|
|
print("Exchanging media")
|
|
await asyncio.sleep(120000)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
parser = argparse.ArgumentParser(description="Janus")
|
|
parser.add_argument("url", help="Janus root URL, e.g. http://localhost:8088/janus")
|
|
parser.add_argument(
|
|
"--room",
|
|
type=int,
|
|
default=2671537157855710,
|
|
help="The video room ID to join (default: 1234).",
|
|
),
|
|
parser.add_argument("--record-to", help="Write received media to a file."),
|
|
args = parser.parse_args()
|
|
|
|
|
|
# create signaling and peer connection
|
|
loop = asyncio.get_event_loop()
|
|
try:
|
|
|
|
items =[]
|
|
session = JanusSession(args.url)
|
|
loop.run_until_complete(run(room=args.room, session=session))
|
|
except KeyboardInterrupt:
|
|
pass
|
|
finally:
|
|
|
|
loop.run_until_complete(session.destroy())
|
|
|
|
# close peer connections
|
|
coros = [pc.close() for pc in pcs]
|
|
loop.run_until_complete(asyncio.gather(*coros)) |