Store Bluesky firehose host as part of subscription state

The host has changed recently and the cursors are, apparently, incompatible with each
other, so we need to migrate to the new one, and this seems like the easiest way to
do it:

1. Store the host as part of the subscription state
2. Roll out the version that uses that new column as part of all queries
3. Switch to the new host
4. Roll out the version with the new host
   - The new instance will start processing messages with a 0 cursor and so start anew
   - The old instance will die off
This commit is contained in:
Aleksei Voronov 2023-11-05 18:57:03 +01:00
parent 3b03b11d58
commit 419f72f3bb
4 changed files with 18 additions and 9 deletions

View File

@ -0,0 +1,4 @@
ALTER TABLE SubscriptionState ADD COLUMN host TEXT DEFAULT 'wss://bsky.social';
ALTER TABLE SubscriptionState ALTER COLUMN host DROP DEFAULT;
ALTER TABLE SubscriptionState DROP CONSTRAINT subscriptionstate_service_key;
ALTER TABLE SubscriptionState ADD CONSTRAINT service_host_unique UNIQUE(service, host);

View File

@ -51,12 +51,12 @@ impl PostIndexer {
async fn process_from_last_point(&self) -> Result<()> { async fn process_from_last_point(&self) -> Result<()> {
let cursor = self let cursor = self
.database .database
.fetch_subscription_cursor(&self.config.feed_generator_did) .fetch_subscription_cursor(Bluesky::FIREHOSE_HOST, &self.config.feed_generator_did)
.await?; .await?;
if cursor.is_none() { if cursor.is_none() {
self.database self.database
.create_subscription_state(&self.config.feed_generator_did) .create_subscription_state(Bluesky::FIREHOSE_HOST, &self.config.feed_generator_did)
.await?; .await?;
} }
@ -106,7 +106,7 @@ impl CommitProcessor for PostIndexer {
self.config.feed_generator_did, commit.seq, commit.time self.config.feed_generator_did, commit.seq, commit.time
); );
self.database self.database
.update_subscription_cursor(&self.config.feed_generator_did, commit.seq) .update_subscription_cursor(Bluesky::FIREHOSE_HOST, &self.config.feed_generator_did, commit.seq)
.await?; .await?;
} }

View File

@ -23,8 +23,8 @@ pub struct Bluesky {
} }
impl Bluesky { impl Bluesky {
const XRPC_HOST: &str = "https://bsky.social"; pub const XRPC_HOST: &str = "https://bsky.social";
const FIREHOSE_HOST: &str = "wss://bsky.social"; pub const FIREHOSE_HOST: &str = "wss://bsky.social";
pub fn unauthenticated() -> Self { pub fn unauthenticated() -> Self {
Self { Self {

View File

@ -213,48 +213,53 @@ impl Database {
.await?) .await?)
} }
pub async fn fetch_subscription_cursor(&self, did: &str) -> Result<Option<i32>> { pub async fn fetch_subscription_cursor(&self, host: &str, did: &str) -> Result<Option<i32>> {
let mut params = Parameters::new(); let mut params = Parameters::new();
Ok(query( Ok(query(
&select("cursor") &select("cursor")
.from("SubscriptionState") .from("SubscriptionState")
.where_(format!("service = {}", params.next())) .where_(format!("service = {}", params.next()))
.where_(format!("host = {}", params.next()))
.to_string(), .to_string(),
) )
.bind(did) .bind(did)
.bind(host)
.map(|r: PgRow| r.get("cursor")) .map(|r: PgRow| r.get("cursor"))
.fetch_optional(&self.connection_pool) .fetch_optional(&self.connection_pool)
.await?) .await?)
} }
pub async fn create_subscription_state(&self, did: &str) -> Result<bool> { pub async fn create_subscription_state(&self, host: &str, did: &str) -> Result<bool> {
let mut params = Parameters::new(); let mut params = Parameters::new();
Ok(query( Ok(query(
&insert_into("SubscriptionState") &insert_into("SubscriptionState")
.columns(("service", "cursor")) .columns(("service", "cursor", "host"))
.values([params.next_array()]) .values([params.next_array()])
.to_string(), .to_string(),
) )
.bind(did) .bind(did)
.bind(0) .bind(0)
.bind(host)
.execute(&self.connection_pool) .execute(&self.connection_pool)
.await .await
.map(|result| result.rows_affected() > 0)?) .map(|result| result.rows_affected() > 0)?)
} }
pub async fn update_subscription_cursor(&self, did: &str, cursor: i32) -> Result<bool> { pub async fn update_subscription_cursor(&self, host: &str, did: &str, cursor: i32) -> Result<bool> {
let mut params = Parameters::new(); let mut params = Parameters::new();
Ok(query( Ok(query(
&update("SubscriptionState") &update("SubscriptionState")
.set("cursor", params.next()) .set("cursor", params.next())
.where_(format!("service = {}", params.next())) .where_(format!("service = {}", params.next()))
.where_(format!("host = {}", params.next()))
.to_string(), .to_string(),
) )
.bind(cursor) .bind(cursor)
.bind(did) .bind(did)
.bind(host)
.execute(&self.connection_pool) .execute(&self.connection_pool)
.await .await
.map(|result| result.rows_affected() > 0)?) .map(|result| result.rows_affected() > 0)?)