wmsqlitemodernc

package module
v0.1.2
Warning

This package is not in the latest version of its module.

Published: Sep 22, 2025 License: MIT Imports: 16 Imported by: 0

Documentation

Overview

Package wmsqlitemodernc provides a SQLite Pub/Sub driver for Watermill that is compatible with the Go standard library database/sql package.

It works without CGO.

SQLite3 does not support `SELECT ... FOR UPDATE`, which the official Watermill SQL Pub/Sub implementations use for row locking when subscribers in the same consumer group read an event batch. Instead, this driver locks a consumer group offset by writing a `unixepoch()+lockTimeout` timestamp. Each consumer group processes one consumed message at a time; while it does, a `time.Ticker` periodically extends the offset lock by `lockTimeout`. If the subscriber is unable to finish the batch, other subscribers in the consumer group take over the lock as soon as the grace period runs out. The time lock fulfills the role of a traditional database network timeout, which terminates a transaction when its client disconnects.
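The time lock described above can be pictured with a small in-memory model. This is only a sketch: the type `offsetLock` and its methods are hypothetical names, the lock value stands in for a column on the offset row, and the real driver performs these steps with SQL statements rather than Go fields.

```go
package main

import "fmt"

// offsetLock models the lock value of one consumer group offset row.
// lockedUntil is a Unix timestamp in seconds; zero means unlocked.
type offsetLock struct {
	lockedUntil int64
}

// tryAcquire takes the lock if it is free or its grace period has run out.
// now and timeout are whole seconds, mirroring unixepoch()+lockTimeout.
func (l *offsetLock) tryAcquire(now, timeout int64) bool {
	if l.lockedUntil > now {
		return false // another subscriber still holds a live lock
	}
	l.lockedUntil = now + timeout
	return true
}

// extend pushes the deadline forward while a message is still processing.
// The driver is documented to do this periodically with a time.Ticker.
func (l *offsetLock) extend(now, timeout int64) {
	l.lockedUntil = now + timeout
}

// release clears the lock once the batch is done.
func (l *offsetLock) release() { l.lockedUntil = 0 }

func main() {
	lock := &offsetLock{}
	fmt.Println(lock.tryAcquire(100, 5)) // first subscriber takes the lock
	fmt.Println(lock.tryAcquire(102, 5)) // second subscriber is refused
	lock.extend(104, 5)                  // holder is still working
	fmt.Println(lock.tryAcquire(106, 5)) // still refused after the extension
	fmt.Println(lock.tryAcquire(109, 5)) // holder went silent, lock expired
}
```

If the holder crashes, nobody calls `extend`, and the next subscriber in the group succeeds as soon as the stored timestamp is no longer in the future.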

All the normal SQLite limitations apply to Watermill. The connections are file handles. Create new connections for concurrent processing. If you must share a connection, protect it with a mutual exclusion lock. If you are writing within a transaction, create a connection for that transaction only.

Constants

const (
	// DefaultMessageBatchSize is the default number of messages
	// for [SubscriberOptions] that a subscription
	// will collect when consuming messages from the database.
	DefaultMessageBatchSize = 100

	// DefaultSubscriberLockTimeout is the default duration of the row lock
	// setting for [SubscriberOptions]. Must be in full seconds.
	DefaultSubscriberLockTimeout = 5 * time.Second

	// DefaultAckDeadline is the default duration of the message acknowledgement deadline
	// setting for [SubscriberOptions].
	DefaultAckDeadline = 30 * time.Second

	// DefaultConsumerGroupName is the default subscription
	// consumer group name.
	DefaultConsumerGroupName = "default"
)

Variables

This section is empty.

Functions

func NewExpiringKeyRepository

func NewExpiringKeyRepository(ctx context.Context, config ExpiringKeyRepositoryConfiguration) (_ middleware.ExpiringKeyRepository, err error)

NewExpiringKeyRepository creates a repository that tracks key duplicates within a certain time frame. It starts a context-bound background routine that cleans up expired keys. Use it as a configuration option for middleware.Deduplicator.

func NewPublisher

func NewPublisher(db SQLiteConnection, options PublisherOptions) (message.Publisher, error)

NewPublisher creates a message.Publisher instance from a connection interface which could be a database handle or a transaction.

func NewSubscriber

func NewSubscriber(db SQLiteDatabase, options SubscriberOptions) (message.Subscriber, error)

NewSubscriber creates a new subscriber with the given options.

Types

type ConsumerGroupMatcher

type ConsumerGroupMatcher interface {
	// MatchTopic returns a consumer group name
	// for a given topic. This name must follow the same
	// naming conventions as the topic name.
	MatchTopic(topic string) (consumerGroupName string, err error)
}

ConsumerGroupMatcher associates a subscriber with a consumer group based on the subscription topic name.

func NewStaticConsumerGroupMatcher

func NewStaticConsumerGroupMatcher(consumerGroupName string) ConsumerGroupMatcher

NewStaticConsumerGroupMatcher creates a new ConsumerGroupMatcher that returns the same consumer group name for any topic.

type ConsumerGroupMatcherFunc

type ConsumerGroupMatcherFunc func(topic string) (consumerGroupName string, err error)

ConsumerGroupMatcherFunc is a convenience type that implements the ConsumerGroupMatcher interface.

func (ConsumerGroupMatcherFunc) MatchTopic

func (f ConsumerGroupMatcherFunc) MatchTopic(topic string) (consumerGroupName string, err error)

MatchTopic satisfies the ConsumerGroupMatcher interface.
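A function adapter like ConsumerGroupMatcherFunc lets a plain function serve as a matcher. The sketch below re-declares the two documented types locally so that it compiles on its own; the matcher `byPrefix` and the group names it returns are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// Local copies of the documented declarations.
type ConsumerGroupMatcher interface {
	MatchTopic(topic string) (consumerGroupName string, err error)
}

type ConsumerGroupMatcherFunc func(topic string) (consumerGroupName string, err error)

// MatchTopic calls the underlying function.
func (f ConsumerGroupMatcherFunc) MatchTopic(topic string) (string, error) {
	return f(topic)
}

// byPrefix is a hypothetical matcher: topics under "billing." share one
// consumer group, everything else falls back to "default".
var byPrefix ConsumerGroupMatcher = ConsumerGroupMatcherFunc(
	func(topic string) (string, error) {
		if topic == "" {
			return "", errors.New("empty topic")
		}
		if strings.HasPrefix(topic, "billing.") {
			return "billing_workers", nil
		}
		return "default", nil
	},
)

func main() {
	for _, topic := range []string{"billing.invoice_paid", "user.created"} {
		group, err := byPrefix.MatchTopic(topic)
		fmt.Println(topic, "->", group, err)
	}
}
```

A matcher of this shape would be assigned to the ConsumerGroupMatcher field of SubscriberOptions.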

type Error

type Error uint8

Error represents common errors that might occur during the SQLite driver configuration or operations.

const (
	// ErrUnknown indicates that operation failed due to an unknown reason.
	ErrUnknown Error = iota

	// ErrDatabaseConnectionIsNil indicates that configuration contained a nil database connection.
	ErrDatabaseConnectionIsNil

	// ErrPublisherIsClosed indicates that the publisher is closed and does not accept any more events.
	ErrPublisherIsClosed

	// ErrSubscriberIsClosed indicates that the subscriber is closed and can no longer respond to events.
	ErrSubscriberIsClosed

	// ErrAttemptedTableInitializationWithinTransaction indicates that a database handle is a transaction
	// while trying to initialize SQLite tables. This driver refuses to create tables within a transaction.
	// Attempting to create a table within a transaction can lead to data inconsistencies and errors.
	//
	// A transaction is probably used for single use event operations. Attempting to create a table
	// in such a scenario adds unnecessary overhead. Initialize the tables once when the application starts.
	ErrAttemptedTableInitializationWithinTransaction

	// ErrInvalidTopicName indicates that the topic name contains invalid characters.
	// Topic names may contain only ASCII letters, digits, and the characters `-`, `$`, `:`, `.`, and `_`.
	// Any character matching the regular expression `[^A-Za-z0-9\-\$\:\.\_]` is invalid.
	ErrInvalidTopicName
)
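The pattern quoted in the ErrInvalidTopicName comment is a negated character class, so a match means the name contains a disallowed character. A minimal sketch of such a check follows; `validTopicName` is a hypothetical helper, and whether the driver also rejects empty names is not documented, so only characters are checked here.

```go
package main

import (
	"fmt"
	"regexp"
)

// disallowed is the pattern quoted in the ErrInvalidTopicName comment.
// It matches any single character outside the permitted set.
var disallowed = regexp.MustCompile(`[^A-Za-z0-9\-\$\:\.\_]`)

// validTopicName reports whether name is free of disallowed characters.
func validTopicName(name string) bool {
	return !disallowed.MatchString(name)
}

func main() {
	names := []string{"orders.created", "tenant:42$events", "bad topic", "naïve"}
	for _, name := range names {
		fmt.Printf("%q valid=%v\n", name, validTopicName(name))
	}
}
```

Because the same rule applies to consumer group names, a check like this can be run on both before creating a subscriber.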

func (Error) Error

func (e Error) Error() string

type ExpiringKeyRepositoryConfiguration

type ExpiringKeyRepositoryConfiguration struct {
	// Database is SQLite3 database handle.
	Database *sql.DB

	// TableName is the name of the table used to store expiring keys.
	// Defaults to "watermill_expiring_keys".
	TableName string

	// Expiration is the duration after which a key is considered expired.
	// If lower than five milliseconds, it is set to five milliseconds.
	// Defaults to one minute.
	Expiration time.Duration

	// CleanUpInterval is the interval at which expired keys are cleaned up.
	// Defaults to 15 seconds.
	CleanUpInterval time.Duration

	// CleanUpLogger tracks any problems that might emerge when cleaning up expired keys.
	// Defaults to [watermill.NewStdLogger].
	CleanUpLogger watermill.LoggerAdapter
}

ExpiringKeyRepositoryConfiguration configures the expiring key repository created by the NewExpiringKeyRepository constructor.

type PublisherOptions

type PublisherOptions struct {
	// TableNameGenerators is a set of functions that generate table names for topics and offsets.
	// Defaults to [TableNameGenerators.WithDefaultGeneratorsInsteadOfNils].
	TableNameGenerators TableNameGenerators

	// InitializeSchema enables initialization of the database schema during publish.
	// Schema is initialized once per topic per publisher instance.
	// InitializeSchema is forbidden if using an ongoing transaction as database handle.
	// It could result in an implicit commit of the transaction by a CREATE TABLE statement.
	InitializeSchema bool

	// Logger reports message publishing errors and traces. Default value is [watermill.NopLogger].
	Logger watermill.LoggerAdapter
}

PublisherOptions configure message publishing behavior.

type SQLiteConnection

type SQLiteConnection interface {
	ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
	QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error)
	QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row
}

SQLiteConnection is an SQLite database connection or a transaction.

type SQLiteDatabase

type SQLiteDatabase interface {
	SQLiteConnection
	BeginTx(context.Context, *sql.TxOptions) (*sql.Tx, error)
}

SQLiteDatabase is an interface that represents an SQLite database.

type SubscriberOptions

type SubscriberOptions struct {
	// ConsumerGroupMatcher differentiates message consumers within the same topic.
	// Messages are processed in batches.
	// Therefore, another subscriber with the same consumer group name may only obtain
	// messages whenever it is able to acquire the row lock.
	// Default value is a static consumer group matcher that
	// always returns [DefaultConsumerGroupName].
	ConsumerGroupMatcher ConsumerGroupMatcher

	// BatchSize is the number of messages to read in a single batch.
	// Default value is [DefaultMessageBatchSize].
	BatchSize int

	// TableNameGenerators is a set of functions that generate table names for topics and offsets.
	// Default value is [TableNameGenerators.WithDefaultGeneratorsInsteadOfNils].
	TableNameGenerators TableNameGenerators

	// PollInterval is the interval to wait between subsequent SELECT queries, if
	// no more messages were found in the database (Prefer using the BackoffManager instead).
	// Must be non-negative. Defaults to one second.
	PollInterval time.Duration

	// LockTimeout is the maximum duration of the row lock. If the subscription
	// is unable to extend the lock before this time out ends, the lock will expire.
	// Then, another subscriber in the same consumer group may
	// acquire the lock and continue processing messages.
	//
	// Duration must not be less than one second, because seconds are added
	// to the SQLite `unixepoch` function, rounded to the nearest second.
	// A zero duration would create a lock that expires immediately.
	// There is no reason to set higher precision fractional duration,
	// because the lock timeout will rarely ever trigger in a healthy system.
	//
	// Normally, the row lock is set to zero after each batch of messages is processed.
	// LockTimeout might occur if a consuming node shuts down unexpectedly,
	// before it is able to complete processing a batch of messages. Only
	// in such rare cases the time out matters. And, it is better to set it
	// to a higher value in order to avoid unnecessary batch re-processing.
	// Therefore, a value lower than one second is impractical.
	//
	// Default value is [DefaultSubscriberLockTimeout].
	LockTimeout time.Duration

	// AckDeadline is the time to wait for acking a message.
	// If message is not acked within this time, it will be nacked and re-delivered.
	//
	// When messages are read in bulk, this time is calculated for each message separately.
	//
	// If you want to disable the acknowledgement deadline, set it to 0.
	// Warning: when acknowledgement deadline is disabled, messages may block and
	// prevent the subscriber from accepting new messages.
	//
	// Must be non-negative. Default value is [DefaultAckDeadline].
	AckDeadline *time.Duration

	// InitializeSchema option enables initializing schema on making a subscription.
	InitializeSchema bool

	// Logger reports message consumption errors and traces.
	//
	// Defaults to [watermill.NopLogger].
	Logger watermill.LoggerAdapter
}

SubscriberOptions defines options for creating a subscriber. Every option has a reasonable default value.

type TableNameGenerator

type TableNameGenerator func(topic string) string

TableNameGenerator creates a table name for a given topic either for a topic table or for offsets table.

type TableNameGenerators

type TableNameGenerators struct {
	Topic   TableNameGenerator
	Offsets TableNameGenerator
}

TableNameGenerators is a struct that holds two functions for generating topic and offsets table names. A [Publisher] and a [Subscriber] must use identical generators for topic and offsets tables in order to communicate with each other.

func (TableNameGenerators) WithDefaultGeneratorsInsteadOfNils

func (t TableNameGenerators) WithDefaultGeneratorsInsteadOfNils() TableNameGenerators

WithDefaultGeneratorsInsteadOfNils returns a TableNameGenerators with default generators for topic and offsets tables if they were left nil.
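The nil-filling behavior can be sketched as follows. The types are re-declared locally so that the example runs alone, and the method body is a stand-in: the default table name formats used here ("watermill_" and "watermill_offsets_" prefixes) are assumptions for illustration, not the package's documented defaults.

```go
package main

import "fmt"

// Local copies of the documented declarations.
type TableNameGenerator func(topic string) string

type TableNameGenerators struct {
	Topic   TableNameGenerator
	Offsets TableNameGenerator
}

// WithDefaultGeneratorsInsteadOfNils is a stand-in implementation.
// It replaces only the generators that were left nil.
func (t TableNameGenerators) WithDefaultGeneratorsInsteadOfNils() TableNameGenerators {
	if t.Topic == nil {
		// Assumed default format, for illustration only.
		t.Topic = func(topic string) string { return "watermill_" + topic }
	}
	if t.Offsets == nil {
		// Assumed default format, for illustration only.
		t.Offsets = func(topic string) string { return "watermill_offsets_" + topic }
	}
	return t
}

func main() {
	custom := TableNameGenerators{
		Topic: func(topic string) string { return "events_" + topic },
	}.WithDefaultGeneratorsInsteadOfNils()

	fmt.Println(custom.Topic("orders"))   // custom generator is kept
	fmt.Println(custom.Offsets("orders")) // nil generator was replaced
}
```

Since a publisher and a subscriber must agree on table names, it is safest to build one TableNameGenerators value and pass the same value to both.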