@ -97,6 +97,8 @@ const startWorker = (workerId) => {
} ;
} ;
const app = express ( ) ;
const app = express ( ) ;
app . set ( 'trusted proxy' , process . env . TRUSTED _PROXY _IP || 'loopback,uniquelocal' ) ;
const pgPool = new pg . Pool ( Object . assign ( pgConfigs [ env ] , dbUrlToConfig ( process . env . DATABASE _URL ) ) ) ;
const pgPool = new pg . Pool ( Object . assign ( pgConfigs [ env ] , dbUrlToConfig ( process . env . DATABASE _URL ) ) ) ;
const server = http . createServer ( app ) ;
const server = http . createServer ( app ) ;
const redisNamespace = process . env . REDIS _NAMESPACE || null ;
const redisNamespace = process . env . REDIS _NAMESPACE || null ;
@ -177,6 +179,12 @@ const startWorker = (workerId) => {
next ( ) ;
next ( ) ;
} ;
} ;
const setRemoteAddress = ( req , res , next ) => {
req . remoteAddress = req . connection . remoteAddress ;
next ( ) ;
} ;
const accountFromToken = ( token , req , next ) => {
const accountFromToken = ( token , req , next ) => {
pgPool . connect ( ( err , client , done ) => {
pgPool . connect ( ( err , client , done ) => {
if ( err ) {
if ( err ) {
@ -208,17 +216,22 @@ const startWorker = (workerId) => {
} ) ;
} ) ;
} ;
} ;
const accountFromRequest = ( req , next ) => {
const accountFromRequest = ( req , next , required = true ) => {
const authorization = req . headers . authorization ;
const authorization = req . headers . authorization ;
const location = url . parse ( req . url , true ) ;
const location = url . parse ( req . url , true ) ;
const accessToken = location . query . access _token ;
const accessToken = location . query . access _token ;
if ( ! authorization && ! accessToken ) {
if ( ! authorization && ! accessToken ) {
if ( required ) {
const err = new Error ( 'Missing access token' ) ;
const err = new Error ( 'Missing access token' ) ;
err . statusCode = 401 ;
err . statusCode = 401 ;
next ( err ) ;
next ( err ) ;
return ;
return ;
} else {
next ( ) ;
return ;
}
}
}
const token = authorization ? authorization . replace ( /^Bearer / , '' ) : accessToken ;
const token = authorization ? authorization . replace ( /^Bearer / , '' ) : accessToken ;
@ -226,7 +239,17 @@ const startWorker = (workerId) => {
accountFromToken ( token , req , next ) ;
accountFromToken ( token , req , next ) ;
} ;
} ;
const PUBLIC _STREAMS = [
'public' ,
'public:local' ,
'hashtag' ,
'hashtag:local' ,
] ;
const wsVerifyClient = ( info , cb ) => {
const wsVerifyClient = ( info , cb ) => {
const location = url . parse ( info . req . url , true ) ;
const authRequired = ! PUBLIC _STREAMS . some ( stream => stream === location . query . stream ) ;
accountFromRequest ( info . req , err => {
accountFromRequest ( info . req , err => {
if ( ! err ) {
if ( ! err ) {
cb ( true , undefined , undefined ) ;
cb ( true , undefined , undefined ) ;
@ -234,16 +257,24 @@ const startWorker = (workerId) => {
log . error ( info . req . requestId , err . toString ( ) ) ;
log . error ( info . req . requestId , err . toString ( ) ) ;
cb ( false , 401 , 'Unauthorized' ) ;
cb ( false , 401 , 'Unauthorized' ) ;
}
}
} ) ;
} , authRequired ) ;
} ;
} ;
const PUBLIC _ENDPOINTS = [
'/api/v1/streaming/public' ,
'/api/v1/streaming/public/local' ,
'/api/v1/streaming/hashtag' ,
'/api/v1/streaming/hashtag/local' ,
] ;
const authenticationMiddleware = ( req , res , next ) => {
const authenticationMiddleware = ( req , res , next ) => {
if ( req . method === 'OPTIONS' ) {
if ( req . method === 'OPTIONS' ) {
next ( ) ;
next ( ) ;
return ;
return ;
}
}
accountFromRequest ( req , next ) ;
const authRequired = ! PUBLIC _ENDPOINTS . some ( endpoint => endpoint === req . path ) ;
accountFromRequest ( req , next , authRequired ) ;
} ;
} ;
const errorMiddleware = ( err , req , res , { } ) => {
const errorMiddleware = ( err , req , res , { } ) => {
@ -275,8 +306,10 @@ const startWorker = (workerId) => {
} ;
} ;
const streamFrom = ( id , req , output , attachCloseHandler , needsFiltering = false , notificationOnly = false ) => {
const streamFrom = ( id , req , output , attachCloseHandler , needsFiltering = false , notificationOnly = false ) => {
const accountId = req . accountId || req . remoteAddress ;
const streamType = notificationOnly ? ' (notification)' : '' ;
const streamType = notificationOnly ? ' (notification)' : '' ;
log . verbose ( req . requestId , ` Starting stream from ${ id } for ${ req . accountId } ${ streamType } ` ) ;
log . verbose ( req . requestId , ` Starting stream from ${ id } for ${ accountId } ${ streamType } ` ) ;
const listener = message => {
const listener = message => {
const { event , payload , queued _at } = JSON . parse ( message ) ;
const { event , payload , queued _at } = JSON . parse ( message ) ;
@ -286,7 +319,7 @@ const startWorker = (workerId) => {
const delta = now - queued _at ;
const delta = now - queued _at ;
const encodedPayload = typeof payload === 'object' ? JSON . stringify ( payload ) : payload ;
const encodedPayload = typeof payload === 'object' ? JSON . stringify ( payload ) : payload ;
log . silly ( req . requestId , ` Transmitting for ${ req . accountId } : ${ event } ${ encodedPayload } Delay: ${ delta } ms ` ) ;
log . silly ( req . requestId , ` Transmitting for ${ accountId } : ${ event } ${ encodedPayload } Delay: ${ delta } ms ` ) ;
output ( event , encodedPayload ) ;
output ( event , encodedPayload ) ;
} ;
} ;
@ -313,6 +346,7 @@ const startWorker = (workerId) => {
return ;
return ;
}
}
if ( ! req . accountId ) {
const queries = [
const queries = [
client . query ( ` SELECT 1 FROM blocks WHERE (account_id = $ 1 AND target_account_id IN ( ${ placeholders ( targetAccountIds , 2 ) } )) OR (account_id = $ 2 AND target_account_id = $ 1) UNION SELECT 1 FROM mutes WHERE account_id = $ 1 AND target_account_id IN ( ${ placeholders ( targetAccountIds , 2 ) } ) ` , [ req . accountId , unpackedPayload . account . id ] . concat ( targetAccountIds ) ) ,
client . query ( ` SELECT 1 FROM blocks WHERE (account_id = $ 1 AND target_account_id IN ( ${ placeholders ( targetAccountIds , 2 ) } )) OR (account_id = $ 2 AND target_account_id = $ 1) UNION SELECT 1 FROM mutes WHERE account_id = $ 1 AND target_account_id IN ( ${ placeholders ( targetAccountIds , 2 ) } ) ` , [ req . accountId , unpackedPayload . account . id ] . concat ( targetAccountIds ) ) ,
] ;
] ;
@ -333,6 +367,9 @@ const startWorker = (workerId) => {
done ( ) ;
done ( ) ;
log . error ( err ) ;
log . error ( err ) ;
} ) ;
} ) ;
} else {
transmit ( ) ;
}
} ) ;
} ) ;
} else {
} else {
transmit ( ) ;
transmit ( ) ;
@ -345,13 +382,15 @@ const startWorker = (workerId) => {
// Setup stream output to HTTP
// Setup stream output to HTTP
const streamToHttp = ( req , res ) => {
const streamToHttp = ( req , res ) => {
const accountId = req . accountId || req . remoteAddress ;
res . setHeader ( 'Content-Type' , 'text/event-stream' ) ;
res . setHeader ( 'Content-Type' , 'text/event-stream' ) ;
res . setHeader ( 'Transfer-Encoding' , 'chunked' ) ;
res . setHeader ( 'Transfer-Encoding' , 'chunked' ) ;
const heartbeat = setInterval ( ( ) => res . write ( ':thump\n' ) , 15000 ) ;
const heartbeat = setInterval ( ( ) => res . write ( ':thump\n' ) , 15000 ) ;
req . on ( 'close' , ( ) => {
req . on ( 'close' , ( ) => {
log . verbose ( req . requestId , ` Ending stream for ${ req . accountId } ` ) ;
log . verbose ( req . requestId , ` Ending stream for ${ accountId } ` ) ;
clearInterval ( heartbeat ) ;
clearInterval ( heartbeat ) ;
} ) ;
} ) ;
@ -383,8 +422,10 @@ const startWorker = (workerId) => {
// Setup stream end for WebSockets
// Setup stream end for WebSockets
const streamWsEnd = ( req , ws , closeHandler = false ) => ( id , listener ) => {
const streamWsEnd = ( req , ws , closeHandler = false ) => ( id , listener ) => {
const accountId = req . accountId || req . remoteAddress ;
ws . on ( 'close' , ( ) => {
ws . on ( 'close' , ( ) => {
log . verbose ( req . requestId , ` Ending stream for ${ req . accountId } ` ) ;
log . verbose ( req . requestId , ` Ending stream for ${ accountId } ` ) ;
unsubscribe ( id , listener ) ;
unsubscribe ( id , listener ) ;
if ( closeHandler ) {
if ( closeHandler ) {
closeHandler ( ) ;
closeHandler ( ) ;
@ -392,7 +433,7 @@ const startWorker = (workerId) => {
} ) ;
} ) ;
ws . on ( 'error' , ( ) => {
ws . on ( 'error' , ( ) => {
log . verbose ( req . requestId , ` Ending stream for ${ req . accountId } ` ) ;
log . verbose ( req . requestId , ` Ending stream for ${ accountId } ` ) ;
unsubscribe ( id , listener ) ;
unsubscribe ( id , listener ) ;
if ( closeHandler ) {
if ( closeHandler ) {
closeHandler ( ) ;
closeHandler ( ) ;
@ -401,6 +442,7 @@ const startWorker = (workerId) => {
} ;
} ;
app . use ( setRequestId ) ;
app . use ( setRequestId ) ;
app . use ( setRemoteAddress ) ;
app . use ( allowCrossDomain ) ;
app . use ( allowCrossDomain ) ;
app . use ( authenticationMiddleware ) ;
app . use ( authenticationMiddleware ) ;
app . use ( errorMiddleware ) ;
app . use ( errorMiddleware ) ;
@ -451,6 +493,7 @@ const startWorker = (workerId) => {
const req = ws . upgradeReq ;
const req = ws . upgradeReq ;
const location = url . parse ( req . url , true ) ;
const location = url . parse ( req . url , true ) ;
req . requestId = uuid . v4 ( ) ;
req . requestId = uuid . v4 ( ) ;
req . remoteAddress = ws . _socket . remoteAddress ;
ws . isAlive = true ;
ws . isAlive = true ;