diff --git a/internal-packages/replication/src/client.ts b/internal-packages/replication/src/client.ts index 0dfbe1c026..1a7ddb2723 100644 --- a/internal-packages/replication/src/client.ts +++ b/internal-packages/replication/src/client.ts @@ -413,7 +413,31 @@ export class LogicalReplicationClient { return false; } - if (await this.#doesPublicationExist()) { + const publicationExists = await this.#doesPublicationExist(); + + if (publicationExists) { + // Validate the existing publication is correctly configured + const validationError = await this.#validatePublicationConfiguration(); + + if (validationError) { + this.logger.error("Publication exists but is misconfigured", { + name: this.options.name, + table: this.options.table, + slotName: this.options.slotName, + publicationName: this.options.publicationName, + error: validationError, + }); + + this.events.emit("error", new LogicalReplicationClientError(validationError)); + return false; + } + + this.logger.info("Publication exists and is correctly configured", { + name: this.options.name, + table: this.options.table, + publicationName: this.options.publicationName, + }); + return true; } @@ -459,6 +483,90 @@ export class LogicalReplicationClient { return res.rows[0].exists; } + async #validatePublicationConfiguration(): Promise { + if (!this.client) { + return "Cannot validate publication configuration: client not connected"; + } + + // Check if the publication has the correct table + const tablesRes = await this.client.query( + `SELECT schemaname, tablename + FROM pg_publication_tables + WHERE pubname = '${this.options.publicationName}';` + ); + + const tables = tablesRes.rows; + const expectedTable = this.options.table; + + // Check if the table is in the publication + const hasTable = tables.some( + (row) => row.tablename === expectedTable && row.schemaname === "public" + ); + + if (!hasTable) { + if (tables.length === 0) { + return `Publication '${this.options.publicationName}' exists but has NO TABLES configured. Expected table: "public.${expectedTable}". Run: ALTER PUBLICATION ${this.options.publicationName} ADD TABLE "${expectedTable}";`; + } else { + const tableList = tables.map((t) => `"${t.schemaname}"."${t.tablename}"`).join(", "); + return `Publication '${this.options.publicationName}' exists but does not include the required table "public.${expectedTable}". Current tables: ${tableList}. Run: ALTER PUBLICATION ${this.options.publicationName} ADD TABLE "${expectedTable}";`; + } + } + + // Check if the publication has the correct actions configured + if (this.options.publicationActions && this.options.publicationActions.length > 0) { + const actionsRes = await this.client.query( + `SELECT pubinsert, pubupdate, pubdelete, pubtruncate + FROM pg_publication + WHERE pubname = '${this.options.publicationName}';` + ); + + if (actionsRes.rows.length === 0) { + return `Publication '${this.options.publicationName}' not found when checking actions`; + } + + const actualActions = actionsRes.rows[0]; + const missingActions: string[] = []; + + for (const action of this.options.publicationActions) { + switch (action) { + case "insert": + if (!actualActions.pubinsert) missingActions.push("insert"); + break; + case "update": + if (!actualActions.pubupdate) missingActions.push("update"); + break; + case "delete": + if (!actualActions.pubdelete) missingActions.push("delete"); + break; + case "truncate": + if (!actualActions.pubtruncate) missingActions.push("truncate"); + break; + } + } + + if (missingActions.length > 0) { + const currentActions: string[] = []; + if (actualActions.pubinsert) currentActions.push("insert"); + if (actualActions.pubupdate) currentActions.push("update"); + if (actualActions.pubdelete) currentActions.push("delete"); + if (actualActions.pubtruncate) currentActions.push("truncate"); + + return `Publication '${ + this.options.publicationName + }' is missing required actions. Expected: [${this.options.publicationActions.join( + ", " + )}], Current: [${currentActions.join(", ")}], Missing: [${missingActions.join( + ", " + )}]. Run: ALTER PUBLICATION ${ + this.options.publicationName + } SET (publish = '${this.options.publicationActions.join(", ")}');`; + } + } + + // All validations passed + return null; + } + async #createSlot(): Promise { if (!this.client) { this.events.emit("error", new LogicalReplicationClientError("Cannot create slot"));