Support pushing and receiving updates to poll tallies (#10209)
* Process incoming poll tallies update * Send Update on poll vote * Do not send Updates for a poll more often than once every 3 minutes * Include voters in people to notify of results update * Schedule closing poll worker on poll creation * Add new notification type for ending polls * Add front-end support for ended poll notifications * Fix UpdatePollSerializer * Fix Updates not being triggered by local votes * Fix tests failure * Fix web push notifications for closing polls * Minor cleanup * Notify voters of both remote and local polls when those close * Fix delivery of poll updates to mentioned accounts and votersmaster
parent
c11dff5049
commit
3a92885a86
@ -0,0 +1,27 @@ |
||||
# frozen_string_literal: true |
||||
|
||||
class ActivityPub::UpdatePollSerializer < ActiveModel::Serializer |
||||
attributes :id, :type, :actor, :to |
||||
|
||||
has_one :object, serializer: ActivityPub::NoteSerializer |
||||
|
||||
def id |
||||
[ActivityPub::TagManager.instance.uri_for(object), '#updates/', object.poll.updated_at.to_i].join |
||||
end |
||||
|
||||
def type |
||||
'Update' |
||||
end |
||||
|
||||
def actor |
||||
ActivityPub::TagManager.instance.uri_for(object) |
||||
end |
||||
|
||||
def to |
||||
ActivityPub::TagManager.instance.to(object) |
||||
end |
||||
|
||||
def cc |
||||
ActivityPub::TagManager.instance.cc(object) |
||||
end |
||||
end |
@ -0,0 +1,64 @@ |
||||
# frozen_string_literal: true |
||||
|
||||
class ActivityPub::ProcessPollService < BaseService |
||||
include JsonLdHelper |
||||
|
||||
def call(poll, json) |
||||
@json = json |
||||
return unless supported_context? && expected_type? |
||||
|
||||
previous_expires_at = poll.expires_at |
||||
|
||||
expires_at = begin |
||||
if @json['closed'].is_a?(String) |
||||
@json['closed'] |
||||
elsif !@json['closed'].nil? && !@json['closed'].is_a?(FalseClass) |
||||
Time.now.utc |
||||
else |
||||
@json['endTime'] |
||||
end |
||||
end |
||||
|
||||
items = begin |
||||
if @json['anyOf'].is_a?(Array) |
||||
@json['anyOf'] |
||||
else |
||||
@json['oneOf'] |
||||
end |
||||
end |
||||
|
||||
latest_options = items.map { |item| item['name'].presence || item['content'] } |
||||
|
||||
# If for some reasons the options were changed, it invalidates all previous |
||||
# votes, so we need to remove them |
||||
poll.votes.delete_all if latest_options != poll.options |
||||
|
||||
begin |
||||
poll.update!( |
||||
last_fetched_at: Time.now.utc, |
||||
expires_at: expires_at, |
||||
options: latest_options, |
||||
cached_tallies: items.map { |item| item.dig('replies', 'totalItems') || 0 } |
||||
) |
||||
rescue ActiveRecord::StaleObjectError |
||||
poll.reload |
||||
retry |
||||
end |
||||
|
||||
# If the poll had no expiration date set but now has, and people have voted, |
||||
# schedule a notification. |
||||
if previous_expires_at.nil? && poll.expires_at.present? && poll.votes.exists? |
||||
PollExpirationNotifyWorker.perform_at(poll.expires_at + 5.minutes, poll.id) |
||||
end |
||||
end |
||||
|
||||
private |
||||
|
||||
def supported_context? |
||||
super(@json) |
||||
end |
||||
|
||||
def expected_type? |
||||
equals_or_includes_any?(@json['type'], %w(Question)) |
||||
end |
||||
end |
@ -0,0 +1,62 @@ |
||||
# frozen_string_literal: true |
||||
|
||||
class ActivityPub::DistributePollUpdateWorker |
||||
include Sidekiq::Worker |
||||
|
||||
sidekiq_options queue: 'push', unique: :until_executed, retry: 0 |
||||
|
||||
def perform(status_id) |
||||
@status = Status.find(status_id) |
||||
@account = @status.account |
||||
|
||||
return unless @status.poll |
||||
|
||||
ActivityPub::DeliveryWorker.push_bulk(inboxes) do |inbox_url| |
||||
[payload, @account.id, inbox_url] |
||||
end |
||||
|
||||
relay! if relayable? |
||||
rescue ActiveRecord::RecordNotFound |
||||
true |
||||
end |
||||
|
||||
private |
||||
|
||||
def relayable? |
||||
@status.public_visibility? |
||||
end |
||||
|
||||
def inboxes |
||||
return @inboxes if defined?(@inboxes) |
||||
target_accounts = @status.mentions.map(&:account).reject(&:local?) |
||||
target_accounts += @status.reblogs.map(&:account).reject(&:local?) |
||||
target_accounts += @status.poll.votes.map(&:account).reject(&:local?) |
||||
target_accounts.uniq!(&:id) |
||||
@inboxes = target_accounts.select(&:activitypub?).pluck(&:inbox_url) |
||||
@inboxes += @account.followers.inboxes unless @status.direct_visibility? |
||||
@inboxes.uniq! |
||||
@inboxes |
||||
end |
||||
|
||||
def signed_payload |
||||
Oj.dump(ActivityPub::LinkedDataSignature.new(unsigned_payload).sign!(@account)) |
||||
end |
||||
|
||||
def unsigned_payload |
||||
ActiveModelSerializers::SerializableResource.new( |
||||
@status, |
||||
serializer: ActivityPub::UpdatePollSerializer, |
||||
adapter: ActivityPub::Adapter |
||||
).as_json |
||||
end |
||||
|
||||
def payload |
||||
@payload ||= @status.distributable? ? signed_payload : Oj.dump(unsigned_payload) |
||||
end |
||||
|
||||
def relay! |
||||
ActivityPub::DeliveryWorker.push_bulk(Relay.enabled.pluck(:inbox_url)) do |inbox_url| |
||||
[payload, @account.id, inbox_url] |
||||
end |
||||
end |
||||
end |
@ -0,0 +1,24 @@ |
||||
# frozen_string_literal: true |
||||
|
||||
class PollExpirationNotifyWorker |
||||
include Sidekiq::Worker |
||||
|
||||
sidekiq_options unique: :until_executed |
||||
|
||||
def perform(poll_id) |
||||
poll = Poll.find(poll_id) |
||||
|
||||
# Notify poll owner and remote voters |
||||
if poll.local? |
||||
ActivityPub::DistributePollUpdateWorker.perform_async(poll.status.id) |
||||
NotifyService.new.call(poll.account, poll) |
||||
end |
||||
|
||||
# Notify local voters |
||||
poll.votes.includes(:account).map(&:account).filter(&:local?).each do |account| |
||||
NotifyService.new.call(account, poll) |
||||
end |
||||
rescue ActiveRecord::RecordNotFound |
||||
true |
||||
end |
||||
end |
Loading…
Reference in new issue