@ -195,14 +195,14 @@ const startWorker = (workerId) => {
next ( ) ;
next ( ) ;
} ;
} ;
const accountFromToken = ( token , req , next ) => {
const accountFromToken = ( token , allowedScopes , req , next ) => {
pgPool . connect ( ( err , client , done ) => {
pgPool . connect ( ( err , client , done ) => {
if ( err ) {
if ( err ) {
next ( err ) ;
next ( err ) ;
return ;
return ;
}
}
client . query ( 'SELECT oauth_access_tokens.resource_owner_id, users.account_id, users.chosen_languages FROM oauth_access_tokens INNER JOIN users ON oauth_access_tokens.resource_owner_id = users.id WHERE oauth_access_tokens.token = $1 AND oauth_access_tokens.revoked_at IS NULL LIMIT 1' , [ token ] , ( err , result ) => {
client . query ( 'SELECT oauth_access_tokens.resource_owner_id, users.account_id, users.chosen_languages, oauth_access_tokens.scopes FROM oauth_access_tokens INNER JOIN users ON oauth_access_tokens.resource_owner_id = users.id WHERE oauth_access_tokens.token = $1 AND oauth_access_tokens.revoked_at IS NULL LIMIT 1' , [ token ] , ( err , result ) => {
done ( ) ;
done ( ) ;
if ( err ) {
if ( err ) {
@ -218,18 +218,29 @@ const startWorker = (workerId) => {
return ;
return ;
}
}
const scopes = result . rows [ 0 ] . scopes . split ( ' ' ) ;
if ( allowedScopes . size > 0 && ! scopes . some ( scope => allowedScopes . includes ( scope ) ) ) {
err = new Error ( 'Access token does not cover required scopes' ) ;
err . statusCode = 401 ;
next ( err ) ;
return ;
}
req . accountId = result . rows [ 0 ] . account _id ;
req . accountId = result . rows [ 0 ] . account _id ;
req . chosenLanguages = result . rows [ 0 ] . chosen _languages ;
req . chosenLanguages = result . rows [ 0 ] . chosen _languages ;
req . allowNotifications = scopes . some ( scope => [ 'read' , 'read:notifications' ] . includes ( scope ) ) ;
next ( ) ;
next ( ) ;
} ) ;
} ) ;
} ) ;
} ) ;
} ;
} ;
const accountFromRequest = ( req , next , required = true ) => {
const accountFromRequest = ( req , next , required = true , allowedScopes = [ 'read' ] ) => {
const authorization = req . headers . authorization ;
const authorization = req . headers . authorization ;
const location = url . parse ( req . url , true ) ;
const location = url . parse ( req . url , true ) ;
const accessToken = location . query . access _token ;
const accessToken = location . query . access _token || req . headers [ 'sec-websocket-protocol' ] ;
if ( ! authorization && ! accessToken ) {
if ( ! authorization && ! accessToken ) {
if ( required ) {
if ( required ) {
@ -246,7 +257,7 @@ const startWorker = (workerId) => {
const token = authorization ? authorization . replace ( /^Bearer / , '' ) : accessToken ;
const token = authorization ? authorization . replace ( /^Bearer / , '' ) : accessToken ;
accountFromToken ( token , req , next ) ;
accountFromToken ( token , allowedScopes , req , next ) ;
} ;
} ;
const PUBLIC _STREAMS = [
const PUBLIC _STREAMS = [
@ -261,6 +272,16 @@ const startWorker = (workerId) => {
const wsVerifyClient = ( info , cb ) => {
const wsVerifyClient = ( info , cb ) => {
const location = url . parse ( info . req . url , true ) ;
const location = url . parse ( info . req . url , true ) ;
const authRequired = ! PUBLIC _STREAMS . some ( stream => stream === location . query . stream ) ;
const authRequired = ! PUBLIC _STREAMS . some ( stream => stream === location . query . stream ) ;
const allowedScopes = [ ] ;
if ( authRequired ) {
allowedScopes . push ( 'read' ) ;
if ( location . query . stream === 'user:notification' ) {
allowedScopes . push ( 'read:notifications' ) ;
} else {
allowedScopes . push ( 'read:statuses' ) ;
}
}
accountFromRequest ( info . req , err => {
accountFromRequest ( info . req , err => {
if ( ! err ) {
if ( ! err ) {
@ -269,7 +290,7 @@ const startWorker = (workerId) => {
log . error ( info . req . requestId , err . toString ( ) ) ;
log . error ( info . req . requestId , err . toString ( ) ) ;
cb ( false , 401 , 'Unauthorized' ) ;
cb ( false , 401 , 'Unauthorized' ) ;
}
}
} , authRequired ) ;
} , authRequired , allowedScopes ) ;
} ;
} ;
const PUBLIC _ENDPOINTS = [
const PUBLIC _ENDPOINTS = [
@ -286,7 +307,18 @@ const startWorker = (workerId) => {
}
}
const authRequired = ! PUBLIC _ENDPOINTS . some ( endpoint => endpoint === req . path ) ;
const authRequired = ! PUBLIC _ENDPOINTS . some ( endpoint => endpoint === req . path ) ;
accountFromRequest ( req , next , authRequired ) ;
const allowedScopes = [ ] ;
if ( authRequired ) {
allowedScopes . push ( 'read' ) ;
if ( req . path === '/api/v1/streaming/user/notification' ) {
allowedScopes . push ( 'read:notifications' ) ;
} else {
allowedScopes . push ( 'read:statuses' ) ;
}
}
accountFromRequest ( req , next , authRequired , allowedScopes ) ;
} ;
} ;
const errorMiddleware = ( err , req , res , { } ) => {
const errorMiddleware = ( err , req , res , { } ) => {
@ -339,6 +371,10 @@ const startWorker = (workerId) => {
return ;
return ;
}
}
if ( event === 'notification' && ! req . allowNotifications ) {
return ;
}
// Only messages that may require filtering are statuses, since notifications
// Only messages that may require filtering are statuses, since notifications
// are already personalized and deletes do not matter
// are already personalized and deletes do not matter
if ( ! needsFiltering || event !== 'update' ) {
if ( ! needsFiltering || event !== 'update' ) {