|
|
@ -6,6 +6,7 @@ import pg from 'pg' |
|
|
|
import log from 'npmlog' |
|
|
|
import log from 'npmlog' |
|
|
|
import url from 'url' |
|
|
|
import url from 'url' |
|
|
|
import WebSocket from 'ws' |
|
|
|
import WebSocket from 'ws' |
|
|
|
|
|
|
|
import uuid from 'uuid' |
|
|
|
|
|
|
|
|
|
|
|
const env = process.env.NODE_ENV || 'development' |
|
|
|
const env = process.env.NODE_ENV || 'development' |
|
|
|
|
|
|
|
|
|
|
@ -43,6 +44,13 @@ const allowCrossDomain = (req, res, next) => { |
|
|
|
next() |
|
|
|
next() |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
const setRequestId = (req, res, next) => { |
|
|
|
|
|
|
|
req.requestId = uuid.v4() |
|
|
|
|
|
|
|
res.header('X-Request-Id', req.requestId) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
next() |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
const accountFromToken = (token, req, next) => { |
|
|
|
const accountFromToken = (token, req, next) => { |
|
|
|
pgPool.connect((err, client, done) => { |
|
|
|
pgPool.connect((err, client, done) => { |
|
|
|
if (err) { |
|
|
|
if (err) { |
|
|
@ -90,7 +98,7 @@ const authenticationMiddleware = (req, res, next) => { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
const errorMiddleware = (err, req, res, next) => { |
|
|
|
const errorMiddleware = (err, req, res, next) => { |
|
|
|
log.error(err) |
|
|
|
log.error(req.requestId, err) |
|
|
|
res.writeHead(err.statusCode || 500, { 'Content-Type': 'application/json' }) |
|
|
|
res.writeHead(err.statusCode || 500, { 'Content-Type': 'application/json' }) |
|
|
|
res.end(JSON.stringify({ error: err.statusCode ? `${err}` : 'An unexpected error occurred' })) |
|
|
|
res.end(JSON.stringify({ error: err.statusCode ? `${err}` : 'An unexpected error occurred' })) |
|
|
|
} |
|
|
|
} |
|
|
@ -98,7 +106,7 @@ const errorMiddleware = (err, req, res, next) => { |
|
|
|
const placeholders = (arr, shift = 0) => arr.map((_, i) => `$${i + 1 + shift}`).join(', '); |
|
|
|
const placeholders = (arr, shift = 0) => arr.map((_, i) => `$${i + 1 + shift}`).join(', '); |
|
|
|
|
|
|
|
|
|
|
|
const streamFrom = (redisClient, id, req, output, needsFiltering = false) => { |
|
|
|
const streamFrom = (redisClient, id, req, output, needsFiltering = false) => { |
|
|
|
log.verbose(`Starting stream from ${id} for ${req.accountId}`) |
|
|
|
log.verbose(req.requestId, `Starting stream from ${id} for ${req.accountId}`) |
|
|
|
|
|
|
|
|
|
|
|
redisClient.on('message', (channel, message) => { |
|
|
|
redisClient.on('message', (channel, message) => { |
|
|
|
const { event, payload, queued_at } = JSON.parse(message) |
|
|
|
const { event, payload, queued_at } = JSON.parse(message) |
|
|
@ -107,7 +115,7 @@ const streamFrom = (redisClient, id, req, output, needsFiltering = false) => { |
|
|
|
const now = new Date().getTime() |
|
|
|
const now = new Date().getTime() |
|
|
|
const delta = now - queued_at; |
|
|
|
const delta = now - queued_at; |
|
|
|
|
|
|
|
|
|
|
|
log.silly(`Transmitting for ${req.accountId}: ${event} ${payload} Delay: ${delta}ms`) |
|
|
|
log.silly(req.requestId, `Transmitting for ${req.accountId}: ${event} ${payload} Delay: ${delta}ms`) |
|
|
|
output(event, payload) |
|
|
|
output(event, payload) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
@ -154,7 +162,7 @@ const streamToHttp = (req, res, redisClient) => { |
|
|
|
const heartbeat = setInterval(() => res.write(':thump\n'), 15000) |
|
|
|
const heartbeat = setInterval(() => res.write(':thump\n'), 15000) |
|
|
|
|
|
|
|
|
|
|
|
req.on('close', () => { |
|
|
|
req.on('close', () => { |
|
|
|
log.verbose(`Ending stream for ${req.accountId}`) |
|
|
|
log.verbose(req.requestId, `Ending stream for ${req.accountId}`) |
|
|
|
clearInterval(heartbeat) |
|
|
|
clearInterval(heartbeat) |
|
|
|
redisClient.quit() |
|
|
|
redisClient.quit() |
|
|
|
}) |
|
|
|
}) |
|
|
@ -168,11 +176,16 @@ const streamToHttp = (req, res, redisClient) => { |
|
|
|
// Setup stream output to WebSockets
|
|
|
|
// Setup stream output to WebSockets
|
|
|
|
const streamToWs = (req, ws, redisClient) => { |
|
|
|
const streamToWs = (req, ws, redisClient) => { |
|
|
|
ws.on('close', () => { |
|
|
|
ws.on('close', () => { |
|
|
|
log.verbose(`Ending stream for ${req.accountId}`) |
|
|
|
log.verbose(req.requestId, `Ending stream for ${req.accountId}`) |
|
|
|
redisClient.quit() |
|
|
|
redisClient.quit() |
|
|
|
}) |
|
|
|
}) |
|
|
|
|
|
|
|
|
|
|
|
return (event, payload) => { |
|
|
|
return (event, payload) => { |
|
|
|
|
|
|
|
if (ws.readyState !== ws.OPEN) { |
|
|
|
|
|
|
|
log.error(req.requestId, 'Tried writing to closed socket') |
|
|
|
|
|
|
|
return |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
ws.send(JSON.stringify({ event, payload })) |
|
|
|
ws.send(JSON.stringify({ event, payload })) |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
@ -184,6 +197,7 @@ const getRedisClient = () => redis.createClient({ |
|
|
|
password: process.env.REDIS_PASSWORD |
|
|
|
password: process.env.REDIS_PASSWORD |
|
|
|
}) |
|
|
|
}) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
app.use(setRequestId) |
|
|
|
app.use(allowCrossDomain) |
|
|
|
app.use(allowCrossDomain) |
|
|
|
app.use(authenticationMiddleware) |
|
|
|
app.use(authenticationMiddleware) |
|
|
|
app.use(errorMiddleware) |
|
|
|
app.use(errorMiddleware) |
|
|
@ -206,11 +220,11 @@ app.get('/api/v1/streaming/hashtag', (req, res) => { |
|
|
|
wss.on('connection', ws => { |
|
|
|
wss.on('connection', ws => { |
|
|
|
const location = url.parse(ws.upgradeReq.url, true) |
|
|
|
const location = url.parse(ws.upgradeReq.url, true) |
|
|
|
const token = location.query.access_token |
|
|
|
const token = location.query.access_token |
|
|
|
const req = {} |
|
|
|
const req = { requestId: uuid.v4() } |
|
|
|
|
|
|
|
|
|
|
|
accountFromToken(token, req, err => { |
|
|
|
accountFromToken(token, req, err => { |
|
|
|
if (err) { |
|
|
|
if (err) { |
|
|
|
log.error(err) |
|
|
|
log.error(req.requestId, err) |
|
|
|
ws.close() |
|
|
|
ws.close() |
|
|
|
return |
|
|
|
return |
|
|
|
} |
|
|
|
} |
|
|
|