From 419f72f3bb0cb14ab9e9a44b2a94375785021ac3 Mon Sep 17 00:00:00 2001 From: Aleksei Voronov Date: Sun, 5 Nov 2023 18:57:03 +0100 Subject: [PATCH] Store Bluesky firehose host as part of subscription state The host has changed recently and the cursors are, apparently, incompatible with each other, so we need to migrate to the new one, and this seems like the easiest way to do it: 1. Store the host as part of the subscription state 2. Roll out the version that uses that new column as part of all queries 3. Switch to the new host 4. Roll out the version with the new host - The new instance will start processing messages with a 0 cursor and so start anew - The old instance will die off --- sql/02_add_subscription_host.sql | 4 ++++ src/processes/post_indexer.rs | 6 +++--- src/services/bluesky/client.rs | 4 ++-- src/services/database.rs | 13 +++++++++---- 4 files changed, 18 insertions(+), 9 deletions(-) create mode 100644 sql/02_add_subscription_host.sql diff --git a/sql/02_add_subscription_host.sql b/sql/02_add_subscription_host.sql new file mode 100644 index 0000000..17fba92 --- /dev/null +++ b/sql/02_add_subscription_host.sql @@ -0,0 +1,4 @@ +ALTER TABLE SubscriptionState ADD COLUMN host TEXT DEFAULT 'wss://bsky.social'; +ALTER TABLE SubscriptionState ALTER COLUMN host DROP DEFAULT; +ALTER TABLE SubscriptionState DROP CONSTRAINT subscriptionstate_service_key; +ALTER TABLE SubscriptionState ADD CONSTRAINT service_host_unique UNIQUE(service, host); diff --git a/src/processes/post_indexer.rs b/src/processes/post_indexer.rs index 70e2a18..7fa054c 100644 --- a/src/processes/post_indexer.rs +++ b/src/processes/post_indexer.rs @@ -51,12 +51,12 @@ impl PostIndexer { async fn process_from_last_point(&self) -> Result<()> { let cursor = self .database - .fetch_subscription_cursor(&self.config.feed_generator_did) + .fetch_subscription_cursor(Bluesky::FIREHOSE_HOST, &self.config.feed_generator_did) .await?; if cursor.is_none() { self.database - 
.create_subscription_state(&self.config.feed_generator_did) + .create_subscription_state(Bluesky::FIREHOSE_HOST, &self.config.feed_generator_did) .await?; } @@ -106,7 +106,7 @@ impl CommitProcessor for PostIndexer { self.config.feed_generator_did, commit.seq, commit.time ); self.database - .update_subscription_cursor(&self.config.feed_generator_did, commit.seq) + .update_subscription_cursor(Bluesky::FIREHOSE_HOST, &self.config.feed_generator_did, commit.seq) .await?; } diff --git a/src/services/bluesky/client.rs b/src/services/bluesky/client.rs index edc1294..05fced3 100644 --- a/src/services/bluesky/client.rs +++ b/src/services/bluesky/client.rs @@ -23,8 +23,8 @@ pub struct Bluesky { } impl Bluesky { - const XRPC_HOST: &str = "https://bsky.social"; - const FIREHOSE_HOST: &str = "wss://bsky.social"; + pub const XRPC_HOST: &str = "https://bsky.social"; + pub const FIREHOSE_HOST: &str = "wss://bsky.social"; pub fn unauthenticated() -> Self { Self { diff --git a/src/services/database.rs b/src/services/database.rs index 4b99e4d..d0f9c97 100644 --- a/src/services/database.rs +++ b/src/services/database.rs @@ -213,48 +213,53 @@ impl Database { .await?) } - pub async fn fetch_subscription_cursor(&self, did: &str) -> Result<Option<i32>> { + pub async fn fetch_subscription_cursor(&self, host: &str, did: &str) -> Result<Option<i32>> { let mut params = Parameters::new(); Ok(query( &select("cursor") .from("SubscriptionState") .where_(format!("service = {}", params.next())) + .where_(format!("host = {}", params.next())) .to_string(), ) .bind(did) + .bind(host) .map(|r: PgRow| r.get("cursor")) .fetch_optional(&self.connection_pool) .await?)
} - pub async fn create_subscription_state(&self, did: &str) -> Result<bool> { + pub async fn create_subscription_state(&self, host: &str, did: &str) -> Result<bool> { let mut params = Parameters::new(); Ok(query( &insert_into("SubscriptionState") - .columns(("service", "cursor")) + .columns(("service", "cursor", "host")) .values([params.next_array()]) .to_string(), ) .bind(did) .bind(0) + .bind(host) .execute(&self.connection_pool) .await .map(|result| result.rows_affected() > 0)?) } - pub async fn update_subscription_cursor(&self, did: &str, cursor: i32) -> Result<bool> { + pub async fn update_subscription_cursor(&self, host: &str, did: &str, cursor: i32) -> Result<bool> { let mut params = Parameters::new(); Ok(query( &update("SubscriptionState") .set("cursor", params.next()) .where_(format!("service = {}", params.next())) + .where_(format!("host = {}", params.next())) .to_string(), ) .bind(cursor) .bind(did) + .bind(host) .execute(&self.connection_pool) .await .map(|result| result.rows_affected() > 0)?)