diff --git a/sql/02_add_subscription_host.sql b/sql/02_add_subscription_host.sql new file mode 100644 index 0000000..17fba92 --- /dev/null +++ b/sql/02_add_subscription_host.sql @@ -0,0 +1,4 @@ +ALTER TABLE SubscriptionState ADD COLUMN host TEXT DEFAULT 'wss://bsky.social'; +ALTER TABLE SubscriptionState ALTER COLUMN host DROP DEFAULT; +ALTER TABLE SubscriptionState DROP CONSTRAINT subscriptionstate_service_key; +ALTER TABLE SubscriptionState ADD CONSTRAINT service_host_unique UNIQUE(service, host); diff --git a/src/processes/post_indexer.rs b/src/processes/post_indexer.rs index 70e2a18..7fa054c 100644 --- a/src/processes/post_indexer.rs +++ b/src/processes/post_indexer.rs @@ -51,12 +51,12 @@ impl PostIndexer { async fn process_from_last_point(&self) -> Result<()> { let cursor = self .database - .fetch_subscription_cursor(&self.config.feed_generator_did) + .fetch_subscription_cursor(Bluesky::FIREHOSE_HOST, &self.config.feed_generator_did) .await?; if cursor.is_none() { self.database - .create_subscription_state(&self.config.feed_generator_did) + .create_subscription_state(Bluesky::FIREHOSE_HOST, &self.config.feed_generator_did) .await?; } @@ -106,7 +106,7 @@ impl CommitProcessor for PostIndexer { self.config.feed_generator_did, commit.seq, commit.time ); self.database - .update_subscription_cursor(&self.config.feed_generator_did, commit.seq) + .update_subscription_cursor(Bluesky::FIREHOSE_HOST, &self.config.feed_generator_did, commit.seq) .await?; } diff --git a/src/services/bluesky/client.rs b/src/services/bluesky/client.rs index edc1294..05fced3 100644 --- a/src/services/bluesky/client.rs +++ b/src/services/bluesky/client.rs @@ -23,8 +23,8 @@ pub struct Bluesky { } impl Bluesky { - const XRPC_HOST: &str = "https://bsky.social"; - const FIREHOSE_HOST: &str = "wss://bsky.social"; + pub const XRPC_HOST: &str = "https://bsky.social"; + pub const FIREHOSE_HOST: &str = "wss://bsky.social"; pub fn unauthenticated() -> Self { Self { diff --git a/src/services/database.rs b/src/services/database.rs index 4b99e4d..d0f9c97 100644 --- a/src/services/database.rs +++ b/src/services/database.rs @@ -213,48 +213,53 @@ impl Database { .await?) } - pub async fn fetch_subscription_cursor(&self, did: &str) -> Result> { + pub async fn fetch_subscription_cursor(&self, host: &str, did: &str) -> Result> { let mut params = Parameters::new(); Ok(query( &select("cursor") .from("SubscriptionState") .where_(format!("service = {}", params.next())) + .where_(format!("host = {}", params.next())) .to_string(), ) .bind(did) + .bind(host) .map(|r: PgRow| r.get("cursor")) .fetch_optional(&self.connection_pool) .await?) } - pub async fn create_subscription_state(&self, did: &str) -> Result { + pub async fn create_subscription_state(&self, host: &str, did: &str) -> Result { let mut params = Parameters::new(); Ok(query( &insert_into("SubscriptionState") - .columns(("service", "cursor")) + .columns(("service", "cursor", "host")) .values([params.next_array()]) .to_string(), ) .bind(did) .bind(0) + .bind(host) .execute(&self.connection_pool) .await .map(|result| result.rows_affected() > 0)?) } - pub async fn update_subscription_cursor(&self, did: &str, cursor: i32) -> Result { + pub async fn update_subscription_cursor(&self, host: &str, did: &str, cursor: i32) -> Result { let mut params = Parameters::new(); Ok(query( &update("SubscriptionState") .set("cursor", params.next()) .where_(format!("service = {}", params.next())) + .where_(format!("host = {}", params.next())) .to_string(), ) .bind(cursor) .bind(did) + .bind(host) .execute(&self.connection_pool) .await .map(|result| result.rows_affected() > 0)?)