Non-Serial ("Snowflake") IDs (#4801)

* Use non-serial IDs

This change makes a number of nontrivial tweaks to the data model in
Mastodon:

* All IDs are now 8 byte integers (rather than mixed 4- and 8-byte)
* IDs are now assigned as:
  * Top 6 bytes: millisecond-resolution time from epoch
  * Bottom 2 bytes: serial (within the millisecond) sequence number
  * See /lib/tasks/db.rake's `define_timestamp_id` for details, but
    note that the purpose of these changes is to make it difficult to
    determine the number of objects in a table from the ID of any
    object.
* The Redis sorted set used for the feed will have values used to look
  up toots, rather than scores. This is almost always the same as the
  existing behavior, except in the case of boosted toots. This change
  was made because Redis stores scores as double-precision floats,
  which cannot store the new ID format exactly. Note that this doesn't
  cause problems with sorting/pagination, because ZREVRANGEBYSCORE
  sorts lexicographically when scores are tied. (This will still cause
  sorting issues when the ID gains a new significant digit, but that's
  extraordinarily uncommon.)

Note a couple of tradeoffs have been made in this commit:

* lib/tasks/db.rake is used to enforce many/most column constraints,
  because this commit seems likely to take a while to bring upstream.
  Enforcing a post-migrate hook is an easier way to maintain the code
  in the interim.
* Boosted toots will appear in the timeline as many times as they have
  been boosted. This is a tradeoff due to the way the feed is saved in
  Redis at the moment, but will be handled by a future commit.

This would effectively close Mastodon's #1059, as it is a
snowflake-like system of generating IDs. However, given how involved
the changes were simply within Mastodon, it may have unexpected
interactions with some clients, if they store IDs as doubles
(or as 4-byte integers). This was a problem that Twitter ran into with
their "snowflake" transition, particularly in JavaScript clients that
treated IDs as JS integers, rather than strings. It therefore would be
useful to test these changes at least in the web interface and popular
clients before pushing them to all users.

* Fix JavaScript interface with long IDs

Somewhat predictably, the JS interface handled IDs as numbers, which in
JS are IEEE double-precision floats. This loses some precision when
working with numbers as large as those generated by the new ID scheme,
so we instead handle them here as strings. This is relatively simple,
and doesn't appear to have caused any problems, but should definitely
be tested more thoroughly than the built-in tests. Several days of use
appear to support this working properly.

BREAKING CHANGE:

The major(!) change here is that IDs are now returned as strings by the
REST endpoints, rather than as integers. In practice, relatively few
changes were required to make the existing JS UI work with this change,
but it will likely hit API clients pretty hard: it's an entirely
different type to consume. (The one API client I tested, Tusky, handles
this with no problems, however.)

Twitter ran into this issue when introducing Snowflake IDs, and decided
to instead introduce an `id_str` field in JSON responses. I have opted
to *not* do that, and instead force all IDs to 64-bit integers
represented by strings in one go. (I believe Twitter exacerbated their
problem by rolling out the changes three times: once for statuses, once
for DMs, and once for user IDs, as well as by leaving an integer ID
value in JSON. As they said, "If you’re using the `id` field with JSON
in a Javascript-related language, there is a very high likelihood that
the integers will be silently munged by Javascript interpreters. In most
cases, this will result in behavior such as being unable to load or
delete a specific direct message, because the ID you're sending to the
API is different than the actual identifier associated with the
message." [1]) However, given that this is a significant change for API
users, alternatives or a transition time may be appropriate.

1: https://blog.twitter.com/developer/en_us/a/2011/direct-messages-going-snowflake-on-sep-30-2011.html

* Restructure feed pushes/unpushes

This was necessary because the previous behavior used Redis zset scores
to identify statuses, but those are IEEE double-precision floats, so we
can't actually use them to identify all 64-bit IDs. However, it leaves
the code in a much better state for refactoring reblog handling /
coalescing.

Feed-management code has been consolidated in FeedManager, including:

* BatchedRemoveStatusService no longer directly manipulates feed zsets
* RemoveStatusService no longer directly manipulates feed zsets
* PrecomputeFeedService has moved its logic to FeedManager#populate_feed

(PrecomputeFeedService largely made lots of calls to FeedManager, but
didn't follow the normal adding-to-feed process.)

This has the effect of unifying all of the feed push/unpush logic in
FeedManager, making it much more tractable to update it in the future.

Due to some additional checks that must be made during, for example,
batch status removals, some Redis pipelining has been removed. It does
not appear that this should cause significantly increased load, but if
necessary, some optimizations are possible in batch cases. These were
omitted in the pursuit of simplicity, but a batch_push and batch_unpush
would be possible in the future.

Tests were added to verify that pushes happen under expected conditions,
and to verify reblog behavior (both on pushing and unpushing). In the
case of unpushing, this includes testing behavior that currently leads
to confusion such as Mastodon's #2817, but this codifies that the
behavior is currently expected.

* Rubocop fixes

I could swear I made these changes already, but I must have lost them
somewhere along the line.

* Address review comments

This addresses the first two comments from review of this feature:

https://github.com/tootsuite/mastodon/pull/4801#discussion_r139336735
https://github.com/tootsuite/mastodon/pull/4801#discussion_r139336931

This adds an optional argument to FeedManager#key, the subtype of feed
key to generate. It also tests to ensure that FeedManager's settings are
such that reblogs won't be tracked forever.

* Hardcode IdToBigints migration columns

This addresses a comment during review:
https://github.com/tootsuite/mastodon/pull/4801#discussion_r139337452

This means we'll need to make sure that all _id columns going forward
are bigints, but that should happen automatically in most cases.

* Additional fixes for stringified IDs in JSON

These should be the last two. These were identified using eslint to try
to identify any plain casts to JavaScript numbers. (Some such casts are
legitimate, but these were not.)

Adding the following to .eslintrc.yml will identify casts to numbers:

~~~
  no-restricted-syntax:
  - warn
  - selector: UnaryExpression[operator='+'] > :not(Literal)
    message: Avoid the use of unary +
  - selector: CallExpression[callee.name='Number']
    message: Casting with Number() may coerce string IDs to numbers
~~~

The remaining three casts appear legitimate: two casts to array indices,
one in a server to turn an environment variable into a number.

* Only implement timestamp IDs for Status IDs

Per discussion in #4801, this is only being merged in for Status IDs at
this point. We do this in a migration, as there is no longer use for
a post-migration hook. We keep the initialization of the timestamp_id
function as a Rake task, as it is also needed after db:schema:load (as
db/schema.rb doesn't store Postgres functions).

* Change internal streaming payloads to stringified IDs as well

This is equivalent to 591a9af356faf2d5c7e66e3ec715502796c875cd from
#5019, with an extra change for the addition to FeedManager#unpush.

* Ensure we have a status_id_seq sequence

Apparently this is not a given when specifying a custom ID function,
so now we ensure it gets created. This uses the generic version of this
function to more easily support adding additional tables with timestamp
IDs in the future, although it would be possible to cut this down to a
less generic version if necessary. It is only run during db:schema:load
or the relevant migration, so the overhead is extraordinarily minimal.

* Transition reblogs to new Redis format

This provides a one-way migration to transition old Redis reblog entries
into the new format, with a separate tracking entry for reblogs.

It is not invertible because doing so could (if timestamp IDs are used)
require a database query for each status in each users' feed, which is
likely to be a significant toll on major instances.

* Address review comments from @akihikodaki

No functional changes.

* Additional review changes

* Heredoc cleanup

* Run db:schema:load hooks for test in development

This matches the behavior in Rails'
ActiveRecord::Tasks::DatabaseTasks.each_current_configuration, which
would otherwise break `rake db:setup` in development.

It also moves some functionality out to a library, which will be a good
place to put additional related functionality in the near future.
master
aschmitz 7 years ago committed by Eugen Rochko
parent 2076c557c9
commit 468523f4ad
  1. 5
      app/controllers/api/v1/accounts/relationships_controller.rb
  2. 128
      app/lib/feed_manager.rb
  3. 2
      app/models/feed.rb
  4. 37
      app/services/batched_remove_status_service.rb
  5. 38
      app/services/precompute_feed_service.rb
  6. 8
      app/services/remove_status_service.rb
  7. 32
      db/migrate/20170920024819_status_ids_to_timestamp_ids.rb
  8. 63
      db/migrate/20170920032311_fix_reblogs_in_feeds.rb
  9. 2
      db/schema.rb
  10. 126
      lib/mastodon/timestamp_ids.rb
  11. 56
      lib/tasks/db.rake
  12. 109
      spec/lib/feed_manager_spec.rb
  13. 2
      spec/models/feed_spec.rb
  14. 3
      spec/services/batched_remove_status_service_spec.rb
  15. 2
      spec/services/precompute_feed_service_spec.rb

@ -7,7 +7,10 @@ class Api::V1::Accounts::RelationshipsController < Api::BaseController
respond_to :json
def index
@accounts = Account.where(id: account_ids).select('id')
accounts = Account.where(id: account_ids).select('id')
# .where doesn't guarantee that our results are in the same order
# we requested them, so return the "right" order to the requestor.
@accounts = accounts.index_by(&:id).values_at(*account_ids)
render json: @accounts, each_serializer: REST::RelationshipSerializer, relationships: relationships
end

@ -7,8 +7,13 @@ class FeedManager
MAX_ITEMS = 400
def key(type, id)
"feed:#{type}:#{id}"
# Must be <= MAX_ITEMS or the tracking sets will grow forever
REBLOG_FALLOFF = 40
def key(type, id, subtype = nil)
return "feed:#{type}:#{id}" unless subtype
"feed:#{type}:#{id}:#{subtype}"
end
def filter?(timeline_type, status, receiver_id)
@ -22,23 +27,36 @@ class FeedManager
end
def push(timeline_type, account, status)
timeline_key = key(timeline_type, account.id)
return false unless add_to_feed(timeline_type, account, status)
if status.reblog?
# If the original status is within 40 statuses from top, do not re-insert it into the feed
rank = redis.zrevrank(timeline_key, status.reblog_of_id)
return if !rank.nil? && rank < 40
redis.zadd(timeline_key, status.id, status.reblog_of_id)
else
redis.zadd(timeline_key, status.id, status.id)
trim(timeline_type, account.id)
end
trim(timeline_type, account.id)
PushUpdateWorker.perform_async(account.id, status.id) if push_update_required?(timeline_type, account.id)
true
end
def unpush(timeline_type, account, status)
return false unless remove_from_feed(timeline_type, account, status)
payload = Oj.dump(event: :delete, payload: status.id.to_s)
Redis.current.publish("timeline:#{account.id}", payload)
true
end
def trim(type, account_id)
redis.zremrangebyrank(key(type, account_id), '0', (-(FeedManager::MAX_ITEMS + 1)).to_s)
timeline_key = key(type, account_id)
reblog_key = key(type, account_id, 'reblogs')
# Remove any items past the MAX_ITEMS'th entry in our feed
redis.zremrangebyrank(timeline_key, '0', (-(FeedManager::MAX_ITEMS + 1)).to_s)
# Get the score of the REBLOG_FALLOFF'th item in our feed, and stop
# tracking anything after it for deduplication purposes.
falloff_rank = FeedManager::REBLOG_FALLOFF - 1
falloff_range = redis.zrevrange(timeline_key, falloff_rank, falloff_rank, with_scores: true)
falloff_score = falloff_range&.first&.last&.to_i || 0
redis.zremrangebyscore(reblog_key, 0, falloff_score)
end
def push_update_required?(timeline_type, account_id)
@ -54,11 +72,9 @@ class FeedManager
query = query.where('id > ?', oldest_home_score)
end
redis.pipelined do
query.each do |status|
next if status.direct_visibility? || filter?(:home, status, into_account)
redis.zadd(timeline_key, status.id, status.id)
end
query.each do |status|
next if status.direct_visibility? || filter?(:home, status, into_account)
add_to_feed(:home, into_account, status)
end
trim(:home, into_account.id)
@ -69,11 +85,8 @@ class FeedManager
oldest_home_score = redis.zrange(timeline_key, 0, 0, with_scores: true)&.first&.last&.to_i || 0
from_account.statuses.select('id').where('id > ?', oldest_home_score).reorder(nil).find_in_batches do |statuses|
redis.pipelined do
statuses.each do |status|
redis.zrem(timeline_key, status.id)
redis.zremrangebyscore(timeline_key, status.id, status.id)
end
statuses.each do |status|
unpush(:home, into_account, status)
end
end
end
@ -81,9 +94,20 @@ class FeedManager
def clear_from_timeline(account, target_account)
timeline_key = key(:home, account.id)
timeline_status_ids = redis.zrange(timeline_key, 0, -1)
target_status_ids = Status.where(id: timeline_status_ids, account: target_account).ids
target_statuses = Status.where(id: timeline_status_ids, account: target_account)
redis.zrem(timeline_key, target_status_ids) if target_status_ids.present?
target_statuses.each do |status|
unpush(:home, account, status)
end
end
def populate_feed(account)
prepopulate_limit = FeedManager::MAX_ITEMS / 4
statuses = Status.as_home_timeline(account).order(account_id: :desc).limit(prepopulate_limit)
statuses.reverse_each do |status|
next if filter_from_home?(status, account)
add_to_feed(:home, account, status)
end
end
private
@ -131,4 +155,58 @@ class FeedManager
should_filter
end
# Adds a status to an account's feed, returning true if a status was
# added, and false if it was not added to the feed. Note that this is
# an internal helper: callers must call trim or push updates if
# either action is appropriate.
def add_to_feed(timeline_type, account, status)
timeline_key = key(timeline_type, account.id)
reblog_key = key(timeline_type, account.id, 'reblogs')
if status.reblog?
# If the original status or a reblog of it is within
# REBLOG_FALLOFF statuses from the top, do not re-insert it into
# the feed
rank = redis.zrevrank(timeline_key, status.reblog_of_id)
return false if !rank.nil? && rank < FeedManager::REBLOG_FALLOFF
reblog_rank = redis.zrevrank(reblog_key, status.reblog_of_id)
return false unless reblog_rank.nil?
redis.zadd(timeline_key, status.id, status.id)
redis.zadd(reblog_key, status.id, status.reblog_of_id)
else
redis.zadd(timeline_key, status.id, status.id)
end
true
end
# Removes an individual status from a feed, correctly handling cases
# with reblogs, and returning true if a status was removed. As with
# `add_to_feed`, this does not trigger push updates, so callers must
# do so if appropriate.
def remove_from_feed(timeline_type, account, status)
timeline_key = key(timeline_type, account.id)
reblog_key = key(timeline_type, account.id, 'reblogs')
if status.reblog?
# 1. If the reblogging status is not in the feed, stop.
status_rank = redis.zrevrank(timeline_key, status.id)
return false if status_rank.nil?
# 2. Remove the reblogged status from the `:reblogs` zset.
redis.zrem(reblog_key, status.reblog_of_id)
# 3. Add the reblogged status to the feed using the reblogging
# status' ID as its score, and the reblogged status' ID as its
# value.
redis.zadd(timeline_key, status.id, status.reblog_of_id)
# 4. Remove the reblogging status from the feed (as normal)
end
redis.zrem(timeline_key, status.id)
end
end

@ -19,7 +19,7 @@ class Feed
def from_redis(limit, max_id, since_id)
max_id = '+inf' if max_id.blank?
since_id = '-inf' if since_id.blank?
unhydrated = redis.zrevrangebyscore(key, "(#{max_id}", "(#{since_id}", limit: [0, limit], with_scores: true).map(&:last).map(&:to_i)
unhydrated = redis.zrevrangebyscore(key, "(#{max_id}", "(#{since_id}", limit: [0, limit], with_scores: true).map(&:first).map(&:to_i)
Status.where(id: unhydrated).cache_ids
end

@ -29,7 +29,7 @@ class BatchedRemoveStatusService < BaseService
statuses.group_by(&:account_id).each do |_, account_statuses|
account = account_statuses.first.account
unpush_from_home_timelines(account_statuses)
unpush_from_home_timelines(account, account_statuses)
if account.local?
batch_stream_entries(account, account_statuses)
@ -72,14 +72,15 @@ class BatchedRemoveStatusService < BaseService
end
end
def unpush_from_home_timelines(statuses)
account = statuses.first.account
recipients = account.followers.local.pluck(:id)
def unpush_from_home_timelines(account, statuses)
recipients = account.followers.local.to_a
recipients << account.id if account.local?
recipients << account if account.local?
recipients.each do |follower_id|
unpush(follower_id, statuses)
recipients.each do |follower|
statuses.each do |status|
FeedManager.instance.unpush(:home, follower, status)
end
end
end
@ -109,28 +110,6 @@ class BatchedRemoveStatusService < BaseService
end
end
def unpush(follower_id, statuses)
key = FeedManager.instance.key(:home, follower_id)
originals = statuses.reject(&:reblog?)
reblogs = statuses.select(&:reblog?)
# Quickly remove all originals
redis.pipelined do
originals.each do |status|
redis.zremrangebyscore(key, status.id, status.id)
redis.publish("timeline:#{follower_id}", @json_payloads[status.id])
end
end
# For reblogs, re-add original status to feed, unless the reblog
# was not in the feed in the first place
reblogs.each do |status|
redis.zadd(key, status.reblog_of_id, status.reblog_of_id) unless redis.zscore(key, status.reblog_of_id).nil?
redis.publish("timeline:#{follower_id}", @json_payloads[status.id])
end
end
def redis
Redis.current
end

@ -1,43 +1,7 @@
# frozen_string_literal: true
class PrecomputeFeedService < BaseService
LIMIT = FeedManager::MAX_ITEMS / 4
def call(account)
@account = account
populate_feed
end
private
attr_reader :account
def populate_feed
pairs = statuses.reverse_each.lazy.reject(&method(:status_filtered?)).map(&method(:process_status)).to_a
redis.pipelined do
redis.zadd(account_home_key, pairs) if pairs.any?
redis.del("account:#{@account.id}:regeneration")
end
end
def process_status(status)
[status.id, status.reblog? ? status.reblog_of_id : status.id]
end
def status_filtered?(status)
FeedManager.instance.filter?(:home, status, account.id)
end
def account_home_key
FeedManager.instance.key(:home, account.id)
end
def statuses
Status.as_home_timeline(account).order(account_id: :desc).limit(LIMIT)
end
def redis
Redis.current
FeedManager.instance.populate_feed(account)
end
end

@ -102,13 +102,7 @@ class RemoveStatusService < BaseService
end
def unpush(type, receiver, status)
if status.reblog? && !redis.zscore(FeedManager.instance.key(type, receiver.id), status.reblog_of_id).nil?
redis.zadd(FeedManager.instance.key(type, receiver.id), status.reblog_of_id, status.reblog_of_id)
else
redis.zremrangebyscore(FeedManager.instance.key(type, receiver.id), status.id, status.id)
end
Redis.current.publish("timeline:#{receiver.id}", @payload)
FeedManager.instance.unpush(type, receiver, status)
end
def remove_from_hashtags

@ -0,0 +1,32 @@
class StatusIdsToTimestampIds < ActiveRecord::Migration[5.1]
def up
# Prepare the function we will use to generate IDs.
Rake::Task['db:define_timestamp_id'].execute
# Set up the statuses.id column to use our timestamp-based IDs.
ActiveRecord::Base.connection.execute(<<~SQL)
ALTER TABLE statuses
ALTER COLUMN id
SET DEFAULT timestamp_id('statuses')
SQL
# Make sure we have a sequence to use.
Rake::Task['db:ensure_id_sequences_exist'].execute
end
def down
# Revert the column to the old method of just using the sequence
# value for new IDs. Set the current ID sequence to the maximum
# existing ID, such that the next sequence will be one higher.
# We lock the table during this so that the ID won't get clobbered,
# but ID is indexed, so this should be a fast operation.
ActiveRecord::Base.connection.execute(<<~SQL)
LOCK statuses;
SELECT setval('statuses_id_seq', (SELECT MAX(id) FROM statuses));
ALTER TABLE statuses
ALTER COLUMN id
SET DEFAULT nextval('statuses_id_seq');"
SQL
end
end

@ -0,0 +1,63 @@
class FixReblogsInFeeds < ActiveRecord::Migration[5.1]
def up
redis = Redis.current
fm = FeedManager.instance
# find_each is batched on the database side.
User.includes(:account).find_each do |user|
account = user.account
# Old scheme:
# Each user's feed zset had a series of score:value entries,
# where "regular" statuses had the same score and value (their
# ID). Reblogs had a score of the reblogging status' ID, and a
# value of the reblogged status' ID.
# New scheme:
# The feed contains only entries with the same score and value.
# Reblogs result in the reblogging status being added to the
# feed, with an entry in a reblog tracking zset (where the score
# is once again set to the reblogging status' ID, and the value
# is set to the reblogged status' ID). This is safe for Redis'
# float coersion because in this reblog tracking zset, we only
# need the rebloggging status' ID to be able to stop tracking
# entries after they have gotten too far down the feed, which
# does not require an exact value.
# So, first, we iterate over the user's feed to find any reblogs.
timeline_key = fm.key(:home, account.id)
reblog_key = fm.key(:home, account.id, 'reblogs')
redis.zrange(timeline_key, 0, -1, with_scores: true).each do |entry|
next if entry[0] == entry[1]
# The score and value don't match, so this is a reblog.
# (note that we're transitioning from IDs < 53 bits so we
# don't have to worry about the loss of precision)
reblogged_id, reblogging_id = entry
# Remove the old entry
redis.zrem(timeline_key, reblogged_id)
# Add a new one for the reblogging status
redis.zadd(timeline_key, reblogging_id, reblogging_id)
# Track the fact that this was a reblog
redis.zadd(reblog_key, reblogging_id, reblogged_id)
end
end
end
def down
# We *deliberately* do nothing here. This means that reverting
# this and the associated changes to the FeedManager code could
# allow one superfluous reblog of any given status, but in the case
# where things have gone wrong and a revert is necessary, this
# appears preferable to requiring a database hit for every status
# in every users' feed simply to revert.
# Note that this is operating under the assumption that entries
# with >53-bit IDs have already been entered. Otherwise, we could
# just use the data in Redis to reverse this transition.
end
end

@ -321,7 +321,7 @@ ActiveRecord::Schema.define(version: 20170927215609) do
t.index ["account_id", "status_id"], name: "index_status_pins_on_account_id_and_status_id", unique: true
end
create_table "statuses", force: :cascade do |t|
create_table "statuses", id: :bigint, default: -> { "timestamp_id('statuses'::text)" }, force: :cascade do |t|
t.string "uri"
t.text "text", default: "", null: false
t.datetime "created_at", null: false

@ -0,0 +1,126 @@
# frozen_string_literal: true
module Mastodon
module TimestampIds
def self.define_timestamp_id
conn = ActiveRecord::Base.connection
# Make sure we don't already have a `timestamp_id` function.
unless conn.execute(<<~SQL).values.first.first
SELECT EXISTS(
SELECT * FROM pg_proc WHERE proname = 'timestamp_id'
);
SQL
# The function doesn't exist, so we'll define it.
conn.execute(<<~SQL)
CREATE OR REPLACE FUNCTION timestamp_id(table_name text)
RETURNS bigint AS
$$
DECLARE
time_part bigint;
sequence_base bigint;
tail bigint;
BEGIN
-- Our ID will be composed of the following:
-- 6 bytes (48 bits) of millisecond-level timestamp
-- 2 bytes (16 bits) of sequence data
-- The 'sequence data' is intended to be unique within a
-- given millisecond, yet obscure the 'serial number' of
-- this row.
-- To do this, we hash the following data:
-- * Table name (if provided, skipped if not)
-- * Secret salt (should not be guessable)
-- * Timestamp (again, millisecond-level granularity)
-- We then take the first two bytes of that value, and add
-- the lowest two bytes of the table ID sequence number
-- (`table_name`_id_seq). This means that even if we insert
-- two rows at the same millisecond, they will have
-- distinct 'sequence data' portions.
-- If this happens, and an attacker can see both such IDs,
-- they can determine which of the two entries was inserted
-- first, but not the total number of entries in the table
-- (even mod 2**16).
-- The table name is included in the hash to ensure that
-- different tables derive separate sequence bases so rows
-- inserted in the same millisecond in different tables do
-- not reveal the table ID sequence number for one another.
-- The secret salt is included in the hash to ensure that
-- external users cannot derive the sequence base given the
-- timestamp and table name, which would allow them to
-- compute the table ID sequence number.
time_part := (
-- Get the time in milliseconds
((date_part('epoch', now()) * 1000))::bigint
-- And shift it over two bytes
<< 16);
sequence_base := (
'x' ||
-- Take the first two bytes (four hex characters)
substr(
-- Of the MD5 hash of the data we documented
md5(table_name ||
'#{SecureRandom.hex(16)}' ||
time_part::text
),
1, 4
)
-- And turn it into a bigint
)::bit(16)::bigint;
-- Finally, add our sequence number to our base, and chop
-- it to the last two bytes
tail := (
(sequence_base + nextval(table_name || '_id_seq'))
& 65535);
-- Return the time part and the sequence part. OR appears
-- faster here than addition, but they're equivalent:
-- time_part has no trailing two bytes, and tail is only
-- the last two bytes.
RETURN time_part | tail;
END
$$ LANGUAGE plpgsql VOLATILE;
SQL
end
end
def self.ensure_id_sequences_exist
conn = ActiveRecord::Base.connection
# Find tables using timestamp IDs.
default_regex = /timestamp_id\('(?<seq_prefix>\w+)'/
conn.tables.each do |table|
# We're only concerned with "id" columns.
next unless (id_col = conn.columns(table).find { |col| col.name == 'id' })
# And only those that are using timestamp_id.
next unless (data = default_regex.match(id_col.default_function))
seq_name = data[:seq_prefix] + '_id_seq'
# If we were on Postgres 9.5+, we could do CREATE SEQUENCE IF
# NOT EXISTS, but we can't depend on that. Instead, catch the
# possible exception and ignore it.
# Note that seq_name isn't a column name, but it's a
# relation, like a column, and follows the same quoting rules
# in Postgres.
conn.execute(<<~SQL)
DO $$
BEGIN
CREATE SEQUENCE #{conn.quote_column_name(seq_name)};
EXCEPTION WHEN duplicate_table THEN
-- Do nothing, we have the sequence already.
END
$$ LANGUAGE plpgsql;
SQL
end
end
end
end

@ -1,5 +1,36 @@
# frozen_string_literal: true
require Rails.root.join('lib', 'mastodon', 'timestamp_ids')
def each_schema_load_environment
# If we're in development, also run this for the test environment.
# This is a somewhat hacky way to do this, so here's why:
# 1. We have to define this before we load the schema, or we won't
# have a timestamp_id function when we get to it in the schema.
# 2. db:setup calls db:schema:load_if_ruby, which calls
# db:schema:load, which we define above as having a prerequisite
# of this task.
# 3. db:schema:load ends up running
# ActiveRecord::Tasks::DatabaseTasks.load_schema_current, which
# calls a private method `each_current_configuration`, which
# explicitly also does the loading for the `test` environment
# if the current environment is `development`, so we end up
# needing to do the same, and we can't even use the same method
# to do it.
if Rails.env == 'development'
test_conf = ActiveRecord::Base.configurations['test']
if test_conf['database']&.present?
ActiveRecord::Base.establish_connection(:test)
yield
ActiveRecord::Base.establish_connection(Rails.env.to_sym)
end
end
yield
end
namespace :db do
namespace :migrate do
desc 'Setup the db or migrate depending on state of db'
@ -16,4 +47,29 @@ namespace :db do
end
end
end
# Before we load the schema, define the timestamp_id function.
# Idiomatically, we might do this in a migration, but then it
# wouldn't end up in schema.rb, so we'd need to figure out a way to
# get it in before doing db:setup as well. This is simpler, and
# ensures it's always in place.
Rake::Task['db:schema:load'].enhance ['db:define_timestamp_id']
# After we load the schema, make sure we have sequences for each
# table using timestamp IDs.
Rake::Task['db:schema:load'].enhance do
Rake::Task['db:ensure_id_sequences_exist'].invoke
end
task :define_timestamp_id do
each_schema_load_environment do
Mastodon::TimestampIds.define_timestamp_id
end
end
task :ensure_id_sequences_exist do
each_schema_load_environment do
Mastodon::TimestampIds.ensure_id_sequences_exist
end
end
end

@ -1,6 +1,10 @@
require 'rails_helper'
RSpec.describe FeedManager do
it 'tracks at least as many statuses as reblogs' do
expect(FeedManager::REBLOG_FALLOFF).to be <= FeedManager::MAX_ITEMS
end
describe '#key' do
subject { FeedManager.instance.key(:home, 1) }
@ -150,5 +154,110 @@ RSpec.describe FeedManager do
expect(Redis.current.zcard("feed:type:#{account.id}")).to eq FeedManager::MAX_ITEMS
end
it 'sends push updates for non-home timelines' do
account = Fabricate(:account)
status = Fabricate(:status)
allow(Redis.current).to receive_messages(publish: nil)
FeedManager.instance.push('type', account, status)
expect(Redis.current).to have_received(:publish).with("timeline:#{account.id}", any_args).at_least(:once)
end
context 'reblogs' do
it 'saves reblogs of unseen statuses' do
account = Fabricate(:account)
reblogged = Fabricate(:status)
reblog = Fabricate(:status, reblog: reblogged)
expect(FeedManager.instance.push('type', account, reblog)).to be true
end
it 'does not save a new reblog of a recent status' do
account = Fabricate(:account)
reblogged = Fabricate(:status)
reblog = Fabricate(:status, reblog: reblogged)
FeedManager.instance.push('type', account, reblogged)
expect(FeedManager.instance.push('type', account, reblog)).to be false
end
it 'saves a new reblog of an old status' do
account = Fabricate(:account)
reblogged = Fabricate(:status)
reblog = Fabricate(:status, reblog: reblogged)
FeedManager.instance.push('type', account, reblogged)
# Fill the feed with intervening statuses
FeedManager::REBLOG_FALLOFF.times do
FeedManager.instance.push('type', account, Fabricate(:status))
end
expect(FeedManager.instance.push('type', account, reblog)).to be true
end
it 'does not save a new reblog of a recently-reblogged status' do
account = Fabricate(:account)
reblogged = Fabricate(:status)
reblogs = 2.times.map { Fabricate(:status, reblog: reblogged) }
# The first reblog will be accepted
FeedManager.instance.push('type', account, reblogs.first)
# The second reblog should be ignored
expect(FeedManager.instance.push('type', account, reblogs.last)).to be false
end
it 'saves a new reblog of a long-ago-reblogged status' do
account = Fabricate(:account)
reblogged = Fabricate(:status)
reblogs = 2.times.map { Fabricate(:status, reblog: reblogged) }
# The first reblog will be accepted
FeedManager.instance.push('type', account, reblogs.first)
# Fill the feed with intervening statuses
FeedManager::REBLOG_FALLOFF.times do
FeedManager.instance.push('type', account, Fabricate(:status))
end
# The second reblog should also be accepted
expect(FeedManager.instance.push('type', account, reblogs.last)).to be true
end
end
end
describe '#unpush' do
it 'leaves a reblogged status when deleting the reblog' do
account = Fabricate(:account)
reblogged = Fabricate(:status)
status = Fabricate(:status, reblog: reblogged)
FeedManager.instance.push('type', account, status)
# The reblogging status should show up under normal conditions.
expect(Redis.current.zrange("feed:type:#{account.id}", 0, -1)).to eq [status.id.to_s]
FeedManager.instance.unpush('type', account, status)
# Because we couldn't tell if the status showed up any other way,
# we had to stick the reblogged status in by itself.
expect(Redis.current.zrange("feed:type:#{account.id}", 0, -1)).to eq [reblogged.id.to_s]
end
it 'sends push updates' do
account = Fabricate(:account)
status = Fabricate(:status)
FeedManager.instance.push('type', account, status)
allow(Redis.current).to receive_messages(publish: nil)
FeedManager.instance.unpush('type', account, status)
deletion = Oj.dump(event: :delete, payload: status.id.to_s)
expect(Redis.current).to have_received(:publish).with("timeline:#{account.id}", deletion)
end
end
end

@ -9,7 +9,7 @@ RSpec.describe Feed, type: :model do
Fabricate(:status, account: account, id: 3)
Fabricate(:status, account: account, id: 10)
Redis.current.zadd(FeedManager.instance.key(:home, account.id),
[[4, 'deleted'], [3, 'val3'], [2, 'val2'], [1, 'val1']])
[[4, 4], [3, 3], [2, 2], [1, 1]])
feed = Feed.new(:home, account)
results = feed.get(3)

@ -5,7 +5,7 @@ RSpec.describe BatchedRemoveStatusService do
let!(:alice) { Fabricate(:account) }
let!(:bob) { Fabricate(:account, username: 'bob', domain: 'example.com', salmon_url: 'http://example.com/salmon') }
let!(:jeff) { Fabricate(:account) }
let!(:jeff) { Fabricate(:user).account }
let!(:hank) { Fabricate(:account, username: 'hank', protocol: :activitypub, domain: 'example.com', inbox_url: 'http://example.com/inbox') }
let(:status1) { PostStatusService.new.call(alice, 'Hello @bob@example.com') }
@ -19,6 +19,7 @@ RSpec.describe BatchedRemoveStatusService do
stub_request(:post, 'http://example.com/inbox').to_return(status: 200)
Fabricate(:subscription, account: alice, callback_url: 'http://example.com/push', confirmed: true, expires_at: 30.days.from_now)
jeff.user.update(current_sign_in_at: Time.now)
jeff.follow!(alice)
hank.follow!(alice)

@ -16,7 +16,7 @@ RSpec.describe PrecomputeFeedService do
subject.call(account)
expect(Redis.current.zscore(FeedManager.instance.key(:home, account.id), reblog.id)).to eq status.id
expect(Redis.current.zscore(FeedManager.instance.key(:home, account.id), reblog.id)).to eq status.id.to_f
end
it 'does not raise an error even if it could not find any status' do

Loading…
Cancel
Save