@ -16,274 +16,14 @@ class ProcessFeedService < BaseService
end
end
def process_entries ( xml , account )
def process_entries ( xml , account )
xml . xpath ( '//xmlns:entry' , xmlns : TagManager :: XMLNS ) . reverse_each . map { | entry | ProcessEntry . new . call ( entry , account ) } . compact
xml . xpath ( '//xmlns:entry' , xmlns : TagManager :: XMLNS ) . reverse_each . map { | entry | process_entry ( entry , account ) } . compact
end
end
class ProcessEntry
def process_entry ( xml , account )
def call ( xml , account )
activity = Ostatus :: Activity :: General . new ( xml , account )
@account = account
activity . specialize & . perform if activity . status?
@xml = xml
rescue ActiveRecord :: RecordInvalid = > e
Rails . logger . debug " Nothing was saved for #{ id } because: #{ e } "
return if skip_unsupported_type?
nil
case verb
when :post , :share
return create_status
when :delete
return delete_status
end
rescue ActiveRecord :: RecordInvalid = > e
Rails . logger . debug " Nothing was saved for #{ id } because: #{ e } "
nil
end
private
def create_status
if redis . exists ( " delete_upon_arrival: #{ @account . id } : #{ id } " )
Rails . logger . debug " Delete for status #{ id } was queued, ignoring "
return
end
status , just_created = nil
Rails . logger . debug " Creating remote status #{ id } "
if verb == :share
original_status = shared_status_from_xml ( @xml . at_xpath ( './/activity:object' , activity : TagManager :: AS_XMLNS ) )
return nil if original_status . nil?
end
ApplicationRecord . transaction do
status , just_created = status_from_xml ( @xml )
return if status . nil?
return status unless just_created
if verb == :share
status . reblog = original_status . reblog? ? original_status . reblog : original_status
end
status . save!
end
if thread? ( @xml ) && status . thread . nil?
Rails . logger . debug " Trying to attach #{ status . id } ( #{ id ( @xml ) } ) to #{ thread ( @xml ) . first } "
ThreadResolveWorker . perform_async ( status . id , thread ( @xml ) . second )
end
notify_about_mentions! ( status ) unless status . reblog?
notify_about_reblog! ( status ) if status . reblog? && status . reblog . account . local?
Rails . logger . debug " Queuing remote status #{ status . id } ( #{ id } ) for distribution "
LinkCrawlWorker . perform_async ( status . id ) unless status . spoiler_text?
DistributionWorker . perform_async ( status . id )
status
end
def notify_about_mentions! ( status )
status . mentions . includes ( :account ) . each do | mention |
mentioned_account = mention . account
next unless mentioned_account . local?
NotifyService . new . call ( mentioned_account , mention )
end
end
def notify_about_reblog! ( status )
NotifyService . new . call ( status . reblog . account , status )
end
def delete_status
Rails . logger . debug " Deleting remote status #{ id } "
status = Status . find_by ( uri : id , account : @account )
if status . nil?
redis . setex ( " delete_upon_arrival: #{ @account . id } : #{ id } " , 6 * 3_600 , id )
else
RemoveStatusService . new . call ( status )
end
end
def skip_unsupported_type?
! ( [ :post , :share , :delete ] . include? ( verb ) && [ :activity , :note , :comment ] . include? ( type ) )
end
def shared_status_from_xml ( entry )
status = find_status ( id ( entry ) )
return status unless status . nil?
FetchRemoteStatusService . new . call ( url ( entry ) )
end
def status_from_xml ( entry )
# Return early if status already exists in db
status = find_status ( id ( entry ) )
return [ status , false ] unless status . nil?
account = @account
return [ nil , false ] if account . suspended?
status = Status . create! (
uri : id ( entry ) ,
url : url ( entry ) ,
account : account ,
text : content ( entry ) ,
spoiler_text : content_warning ( entry ) ,
created_at : published ( entry ) ,
reply : thread? ( entry ) ,
language : content_language ( entry ) ,
visibility : visibility_scope ( entry ) ,
conversation : find_or_create_conversation ( entry ) ,
thread : thread? ( entry ) ? find_status ( thread ( entry ) . first ) : nil
)
mentions_from_xml ( status , entry )
hashtags_from_xml ( status , entry )
media_from_xml ( status , entry )
[ status , true ]
end
def find_or_create_conversation ( xml )
uri = xml . at_xpath ( './ostatus:conversation' , ostatus : TagManager :: OS_XMLNS ) & . attribute ( 'ref' ) & . content
return if uri . nil?
if TagManager . instance . local_id? ( uri )
local_id = TagManager . instance . unique_tag_to_local_id ( uri , 'Conversation' )
return Conversation . find_by ( id : local_id )
end
Conversation . find_by ( uri : uri ) || Conversation . create! ( uri : uri )
end
def find_status ( uri )
if TagManager . instance . local_id? ( uri )
local_id = TagManager . instance . unique_tag_to_local_id ( uri , 'Status' )
return Status . find_by ( id : local_id )
end
Status . find_by ( uri : uri )
end
def mentions_from_xml ( parent , xml )
processed_account_ids = [ ]
xml . xpath ( './xmlns:link[@rel="mentioned"]' , xmlns : TagManager :: XMLNS ) . each do | link |
next if [ TagManager :: TYPES [ :group ] , TagManager :: TYPES [ :collection ] ] . include? link [ 'ostatus:object-type' ]
mentioned_account = account_from_href ( link [ 'href' ] )
next if mentioned_account . nil? || processed_account_ids . include? ( mentioned_account . id )
mentioned_account . mentions . where ( status : parent ) . first_or_create ( status : parent )
# So we can skip duplicate mentions
processed_account_ids << mentioned_account . id
end
end
def account_from_href ( href )
url = Addressable :: URI . parse ( href ) . normalize
if TagManager . instance . web_domain? ( url . host )
Account . find_local ( url . path . gsub ( '/users/' , '' ) )
else
Account . where ( uri : href ) . or ( Account . where ( url : href ) ) . first || FetchRemoteAccountService . new . call ( href )
end
end
def hashtags_from_xml ( parent , xml )
tags = xml . xpath ( './xmlns:category' , xmlns : TagManager :: XMLNS ) . map { | category | category [ 'term' ] } . select ( & :present? )
ProcessHashtagsService . new . call ( parent , tags )
end
def media_from_xml ( parent , xml )
do_not_download = DomainBlock . find_by ( domain : parent . account . domain ) & . reject_media?
xml . xpath ( './xmlns:link[@rel="enclosure"]' , xmlns : TagManager :: XMLNS ) . each do | link |
next unless link [ 'href' ]
media = MediaAttachment . where ( status : parent , remote_url : link [ 'href' ] ) . first_or_initialize ( account : parent . account , status : parent , remote_url : link [ 'href' ] )
parsed_url = Addressable :: URI . parse ( link [ 'href' ] ) . normalize
next if ! %w( http https ) . include? ( parsed_url . scheme ) || parsed_url . host . empty?
media . save
next if do_not_download
begin
media . file_remote_url = link [ 'href' ]
media . save!
rescue ActiveRecord :: RecordInvalid
next
end
end
end
def id ( xml = @xml )
xml . at_xpath ( './xmlns:id' , xmlns : TagManager :: XMLNS ) . content
end
def verb ( xml = @xml )
raw = xml . at_xpath ( './activity:verb' , activity : TagManager :: AS_XMLNS ) . content
TagManager :: VERBS . key ( raw )
rescue
:post
end
def type ( xml = @xml )
raw = xml . at_xpath ( './activity:object-type' , activity : TagManager :: AS_XMLNS ) . content
TagManager :: TYPES . key ( raw )
rescue
:activity
end
def url ( xml = @xml )
link = xml . at_xpath ( './xmlns:link[@rel="alternate"]' , xmlns : TagManager :: XMLNS )
link . nil? ? nil : link [ 'href' ]
end
def content ( xml = @xml )
xml . at_xpath ( './xmlns:content' , xmlns : TagManager :: XMLNS ) . content
end
def content_language ( xml = @xml )
xml . at_xpath ( './xmlns:content' , xmlns : TagManager :: XMLNS ) [ 'xml:lang' ] & . presence || 'en'
end
def content_warning ( xml = @xml )
xml . at_xpath ( './xmlns:summary' , xmlns : TagManager :: XMLNS ) & . content || ''
end
def visibility_scope ( xml = @xml )
xml . at_xpath ( './mastodon:scope' , mastodon : TagManager :: MTDN_XMLNS ) & . content & . to_sym || :public
end
def published ( xml = @xml )
xml . at_xpath ( './xmlns:published' , xmlns : TagManager :: XMLNS ) . content
end
def thread? ( xml = @xml )
! xml . at_xpath ( './thr:in-reply-to' , thr : TagManager :: THR_XMLNS ) . nil?
end
def thread ( xml = @xml )
thr = xml . at_xpath ( './thr:in-reply-to' , thr : TagManager :: THR_XMLNS )
[ thr [ 'ref' ] , thr [ 'href' ] ]
end
def account? ( xml = @xml )
! xml . at_xpath ( './xmlns:author' , xmlns : TagManager :: XMLNS ) . nil?
end
def redis
Redis . current
end
end
end
end
end