Skip to content

Commit 33558e7

Browse files
authored
Add xid8 (#45)
1 parent f9406f3 commit 33558e7

File tree

3 files changed

+39
-5
lines changed

3 files changed

+39
-5
lines changed

‎.github/workflows/master.yml‎

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,3 +8,4 @@ jobs:
88
uses: ThreeDotsLabs/watermill/.github/workflows/tests.yml@master
99
with:
1010
stress-tests: true
11+
runs-on: ubuntu-latest-16core

‎docker-compose.yml‎

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ services:
55
mysql:
66
image: mysql:8.0
77
restart: unless-stopped
8-
command: [ "--max_connections=5000" ]
8+
command: [ "--max_connections=50000" ]
99
ports:
1010
- 3306:3306
1111
environment:
@@ -15,7 +15,7 @@ services:
1515
postgres:
1616
image: postgres:15.3
1717
restart: unless-stopped
18-
command: postgres -c 'max_connections=5000'
18+
command: postgres -c 'max_connections=50000'
1919
ports:
2020
- 5432:5432
2121
environment:

‎pkg/sql/schema_adapter_postgresql.go‎

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,15 @@ package sql
22

33
import (
44
"database/sql"
5+
"database/sql/driver"
56
"encoding/json"
7+
"errors"
68
"fmt"
79
"hash/fnv"
10+
"strconv"
811
"strings"
912

1013
"github.com/ThreeDotsLabs/watermill/message"
11-
"github.com/jackc/pgx/v5/pgtype"
1214
)
1315

1416
// DefaultPostgreSQLSchema is a default implementation of SchemaAdapter based on PostgreSQL.
@@ -178,7 +180,7 @@ func (s DefaultPostgreSQLSchema) SelectQuery(params SelectQueryParams) (Query, e
178180
` + nextOffsetQuery.Query + `
179181
)
180182
181-
SELECT "offset", transaction_id, uuid, payload, metadata FROM ` + s.MessagesTable(params.Topic) + `
183+
SELECT "offset", transaction_id::text, uuid, payload, metadata FROM ` + s.MessagesTable(params.Topic) + `
182184
183185
WHERE
184186
(
@@ -203,7 +205,7 @@ func (s DefaultPostgreSQLSchema) SelectQuery(params SelectQueryParams) (Query, e
203205

204206
func (s DefaultPostgreSQLSchema) UnmarshalMessage(params UnmarshalMessageParams) (Row, error) {
205207
r := Row{}
206-
var transactionID pgtype.Uint64
208+
var transactionID XID8
207209

208210
err := params.Row.Scan(&r.Offset, &transactionID, &r.UUID, &r.Payload, &r.Metadata)
209211
if err != nil {
@@ -259,3 +261,34 @@ func DefaultSchemaInitializationLock(appName string) int {
259261
// PostgreSQL advisory locks use int32 internally
260262
return int(h.Sum32() & 0x7fffffff)
261263
}
264+
265+
type XID8 uint64
266+
267+
func (x *XID8) Scan(src interface{}) error {
268+
if src == nil {
269+
return errors.New("cannot scan nil value into XID8")
270+
}
271+
272+
switch v := src.(type) {
273+
case string:
274+
val, err := strconv.ParseUint(v, 10, 64)
275+
if err != nil {
276+
return err
277+
}
278+
*x = XID8(val)
279+
return nil
280+
case []byte:
281+
val, err := strconv.ParseUint(string(v), 10, 64)
282+
if err != nil {
283+
return err
284+
}
285+
*x = XID8(val)
286+
return nil
287+
default:
288+
return errors.New("unsupported Scan value type for XID8")
289+
}
290+
}
291+
292+
func (x *XID8) Value() (driver.Value, error) {
293+
return uint64(*x), nil
294+
}

0 commit comments

Comments
 (0)