@@ -24,9 +24,15 @@ type DefaultPostgreSQLSchema struct {
2424}
2525
2626func (s DefaultPostgreSQLSchema ) SchemaInitializingQueries (topic string ) []Query {
27+ // theoretically this primary key allows duplicate offsets for the same transaction_id
28+ // but in practice it should not happen with BIGSERIAL, so we can keep one index and make it more
29+ // storage efficient
30+ //
31+ // it's intended that transaction_id is first in the index, because we are using it alone
32+ // in the WHERE clause
2733 createMessagesTable := `
2834 CREATE TABLE IF NOT EXISTS ` + s .MessagesTable (topic ) + ` (
29- "offset" SERIAL ,
35+ "offset" BIGSERIAL ,
3036 "uuid" VARCHAR(36) NOT NULL,
3137 "created_at" TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
3238 "payload" JSON DEFAULT NULL,
@@ -78,7 +84,60 @@ func (s DefaultPostgreSQLSchema) SelectQuery(topic string, consumerGroup string,
7884 // Query inspired by https://event-driven.io/en/ordering_in_postgres_outbox/
7985
8086 nextOffsetQuery := offsetsAdapter .NextOffsetQuery (topic , consumerGroup )
87+
88+ // We are using a subquery to avoid problems with the query planner mis-estimating
89+ // row counts and performing expensive index scans. Read more:
90+ // - https://pganalyze.com/blog/5mins-postgres-planner-order-by-limit
91+ // - https://pganalyze.com/docs/explain/insights/mis-estimate
92+ //
93+ // Example slow query plan:
94+ //
95+ // Limit (cost=8.65..8.83 rows=1 width=32) (actual time=184.350..184.353 rows=1 loops=1)
96+ // CTE last_processed
97+ // -> LockRows (cost=0.14..8.17 rows=1 width=22) (actual time=1.572..1.574 rows=1 loops=1)
98+ // -> Index Scan using <table name>_offsets_pkey on <table name>_offsets (cost=0.14..8.16 rows=1 width=22) (actual time=1.418..1.419 rows=1 loops=1)
99+ // Index Cond: ((consumer_group)::text = ''::text)
100+ // InitPlan 2 (returns $2)
101+ // -> CTE Scan on last_processed (cost=0.00..0.02 rows=1 width=8) (actual time=1.583..1.585 rows=1 loops=1)
102+ // InitPlan 3 (returns $3)
103+ // -> CTE Scan on last_processed last_processed_1 (cost=0.00..0.02 rows=1 width=8) (actual time=0.006..0.007 rows=1 loops=1)
104+ // InitPlan 4 (returns $4)
105+ // -> CTE Scan on last_processed last_processed_2 (cost=0.00..0.02 rows=1 width=8) (actual time=0.000..0.001 rows=1 loops=1)
106+ // -> Index Scan using <index name> on <table name> (cost=0.42..14462.65 rows=80423 width=32) (actual time=184.348..184.348 rows=1 loops=1)
107+ // Index Cond: (transaction_id < pg_snapshot_xmin(pg_current_snapshot()))
108+ // Filter: (((transaction_id = $2) AND ("offset" > $3)) OR (transaction_id > $4))
109+ // Rows Removed by Filter: 241157
110+ // Planning Time: 8.242 ms
111+ // Execution Time: 185.214 ms
112+ //
113+ // Example performant query plan:
114+ // Limit (cost=3138.06..3138.06 rows=1 width=32) (actual time=0.579..0.580 rows=1 loops=1)
115+ // -> Sort (cost=3138.06..3339.11 rows=80423 width=32) (actual time=0.577..0.579 rows=1 loops=1)
116+ // Sort Key: <table name>.transaction_id, <table name>."offset"
117+ // Sort Method: top-N heapsort Memory: 25kB
118+ // -> Bitmap Heap Scan on <table name> (cost=85.36..1931.71 rows=80423 width=32) (actual time=0.231..0.530 rows=112 loops=1)
119+ // Recheck Cond: (((transaction_id = $2) AND (transaction_id < pg_snapshot_xmin(pg_current_snapshot())) AND ("offset" > $3)) OR ((transaction_id > $4) AND (transaction_id < pg_snapshot_xmin(pg_current_snapshot()))))
120+ // Heap Blocks: exact=26
121+ // CTE last_processed
122+ // -> LockRows (cost=0.14..8.17 rows=1 width=22) (actual time=0.186..0.188 rows=1 loops=1)
123+ // -> Index Scan using <table name>_offsets_pkey on <table name>_offsets (cost=0.14..8.16 rows=1 width=22) (actual time=0.033..0.035 rows=1 loops=1)
124+ // Index Cond: ((consumer_group)::text = ''::text)
125+ // InitPlan 2 (returns $2)
126+ // -> CTE Scan on last_processed (cost=0.00..0.02 rows=1 width=8) (actual time=0.189..0.191 rows=1 loops=1)
127+ // InitPlan 3 (returns $3)
128+ // -> CTE Scan on last_processed last_processed_1 (cost=0.00..0.02 rows=1 width=8) (actual time=0.000..0.000 rows=1 loops=1)
129+ // InitPlan 4 (returns $4)
130+ // -> CTE Scan on last_processed last_processed_2 (cost=0.00..0.02 rows=1 width=8) (actual time=0.000..0.000 rows=1 loops=1)
131+ // -> BitmapOr (cost=77.12..77.12 rows=1207 width=0) (actual time=0.219..0.219 rows=0 loops=1)
132+ // -> Bitmap Index Scan on <index name> (cost=0.00..4.43 rows=1 width=0) (actual time=0.208..0.208 rows=0 loops=1)
133+ // Index Cond: ((transaction_id = $2) AND (transaction_id < pg_snapshot_xmin(pg_current_snapshot())) AND ("offset" > $3))
134+ // -> Bitmap Index Scan on <index name> (cost=0.00..32.48 rows=1206 width=0) (actual time=0.010..0.011 rows=112 loops=1)
135+ // Index Cond: ((transaction_id > $4) AND (transaction_id < pg_snapshot_xmin(pg_current_snapshot())))
136+ //Planning Time: 1.365 ms
137+ //Execution Time: 0.786 ms
138+
81139 selectQuery := `
140+ SELECT * FROM (
82141 WITH last_processed AS (
83142 ` + nextOffsetQuery .Query + `
84143 )
@@ -97,10 +156,11 @@ func (s DefaultPostgreSQLSchema) SelectQuery(topic string, consumerGroup string,
97156 )
98157 AND
99158 transaction_id < pg_snapshot_xmin(pg_current_snapshot())
100- ORDER BY
101- transaction_id ASC,
102- "offset" ASC
103- LIMIT ` + fmt .Sprintf ("%d" , s .batchSize ())
159+ ) AS messages
160+ ORDER BY
161+ transaction_id ASC,
162+ "offset" ASC
163+ LIMIT ` + fmt .Sprintf ("%d" , s .batchSize ())
104164
105165 return Query {selectQuery , nextOffsetQuery .Args }
106166}
0 commit comments