Documentation ¶
Overview ¶
Package wmsqlitemodernc provides a SQLite Pub/Sub driver for Watermill that is compatible with the Go standard library `database/sql` package.
It works without CGO.
SQLite3 does not support `FOR UPDATE` queries, which the official Watermill SQL Pub/Sub implementations use for row locking when subscribers in the same consumer group read an event batch. The current architectural decision is to lock a consumer group offset with a `unixepoch()+lockTimeout` timestamp. While a consumed message is processing, a `time.Ticker` periodically extends the group's offset lock by `lockTimeout`. If the subscriber is unable to finish the consumer group batch, other subscribers take over the lock as soon as the grace period runs out. The time lock fulfills the role of a traditional database network timeout that terminates a transaction when its client disconnects.
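The time lock described above can be illustrated with a minimal in-memory sketch. It is not the package's implementation: `offsetLock`, `tryAcquire`, `extend`, and `release` are hypothetical names, and a plain integer field stands in for the offset row that the driver would update through SQL.

```go
package main

import "fmt"

// offsetLock is a hypothetical stand-in for a consumer group offset row.
// lockedUntil mimics a stored `unixepoch()+lockTimeout` value.
type offsetLock struct {
	lockedUntil int64 // Unix seconds; zero means unlocked
}

// tryAcquire takes the lock when it is free or its grace period has run out.
func (l *offsetLock) tryAcquire(nowUnix, timeoutSeconds int64) bool {
	if nowUnix < l.lockedUntil {
		return false // another subscriber still holds a live lock
	}
	l.lockedUntil = nowUnix + timeoutSeconds
	return true
}

// extend pushes the expiry forward, as a ticker would while a message is processing.
func (l *offsetLock) extend(nowUnix, timeoutSeconds int64) {
	l.lockedUntil = nowUnix + timeoutSeconds
}

// release resets the lock once the batch has been processed.
func (l *offsetLock) release() { l.lockedUntil = 0 }

func main() {
	lock := &offsetLock{}
	const timeout = 5 // seconds

	fmt.Println(lock.tryAcquire(100, timeout)) // first subscriber takes the lock
	fmt.Println(lock.tryAcquire(102, timeout)) // second subscriber is rejected
	lock.extend(104, timeout)                  // the holder is still working
	fmt.Println(lock.tryAcquire(108, timeout)) // rejected: the lock now expires at 109
	fmt.Println(lock.tryAcquire(110, timeout)) // the holder stalled, so the lock is taken over
}
```

The last call succeeds only because no extension arrived before the expiry, which is the same effect a dropped client connection has on a transaction-based lock.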
All the normal SQLite limitations apply to Watermill. The connections are file handles. Create new connections for concurrent processing. If you must share a connection, protect it with a mutual exclusion lock. If you are writing within a transaction, create a connection for that transaction only.
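If a connection must be shared, one way to guard it is a small wrapper that holds a mutex around every call. The sketch below is an assumption-laden illustration: `lockedConn`, `execer`, `countingConn`, and `runConcurrently` are hypothetical names, and the fake connection exists only so the example runs without a database driver.

```go
package main

import (
	"context"
	"database/sql"
	"fmt"
	"sync"
)

// execer is the subset of *sql.DB and *sql.Tx used in this sketch.
type execer interface {
	ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
}

// lockedConn serializes access to a shared connection with a mutex.
type lockedConn struct {
	mu   sync.Mutex
	conn execer
}

func (l *lockedConn) ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error) {
	l.mu.Lock()
	defer l.mu.Unlock()
	return l.conn.ExecContext(ctx, query, args...)
}

// countingConn is a fake connection that only counts calls.
type countingConn struct{ calls int }

func (c *countingConn) ExecContext(context.Context, string, ...any) (sql.Result, error) {
	c.calls++ // unsynchronized on purpose: the wrapper must provide the safety
	return nil, nil
}

// runConcurrently issues n statements from n goroutines through one wrapper
// and returns how many reached the underlying connection.
func runConcurrently(n int) int {
	fake := &countingConn{}
	shared := &lockedConn{conn: fake}

	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_, _ = shared.ExecContext(context.Background(), "SELECT 1")
		}()
	}
	wg.Wait()
	return fake.calls
}

func main() {
	fmt.Println(runConcurrently(50))
}
```

Separate connections remain the better choice for concurrent processing; the wrapper only makes sharing safe, not parallel.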
Index ¶
- Constants
- func NewExpiringKeyRepository(ctx context.Context, config ExpiringKeyRepositoryConfiguration) (_ middleware.ExpiringKeyRepository, err error)
- func NewPublisher(db SQLiteConnection, options PublisherOptions) (message.Publisher, error)
- func NewSubscriber(db SQLiteDatabase, options SubscriberOptions) (message.Subscriber, error)
- type ConsumerGroupMatcher
- type ConsumerGroupMatcherFunc
- type Error
- type ExpiringKeyRepositoryConfiguration
- type PublisherOptions
- type SQLiteConnection
- type SQLiteDatabase
- type SubscriberOptions
- type TableNameGenerator
- type TableNameGenerators
Constants ¶
const (
	// DefaultMessageBatchSize is the default number of messages
	// for [SubscriberOptions] that a subscription
	// will collect when consuming messages from the database.
	DefaultMessageBatchSize = 100
	// DefaultSubscriberLockTimeout is the default duration of the row lock
	// setting for [SubscriberOptions]. Must be in full seconds.
	DefaultSubscriberLockTimeout = 5 * time.Second
	// DefaultAckDeadline is the default duration of the message acknowledgement deadline
	// setting for [SubscriberOptions].
	DefaultAckDeadline = 30 * time.Second
	// DefaultConsumerGroupName is the default subscription
	// consumer group name.
	DefaultConsumerGroupName = "default"
)
Variables ¶
This section is empty.
Functions ¶
func NewExpiringKeyRepository ¶
func NewExpiringKeyRepository(ctx context.Context, config ExpiringKeyRepositoryConfiguration) (_ middleware.ExpiringKeyRepository, err error)
NewExpiringKeyRepository creates a repository that tracks key duplicates within a certain time frame. Starts a context-bound background routine to clean up expired keys. Use as a configuration option for middleware.Deduplicator.
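A context-bound clean-up routine of the kind described above might look like the following sketch. It keeps keys in memory instead of a SQLite table, and `keyStore`, `sweep`, and `cleanUpLoop` are hypothetical names chosen for illustration rather than the package's internals.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// keyStore is a hypothetical in-memory stand-in for the expiring keys table.
type keyStore struct {
	mu      sync.Mutex
	expires map[string]int64 // key -> expiry in Unix seconds
}

// sweep deletes every key that has expired by nowUnix and reports how many were removed.
func (s *keyStore) sweep(nowUnix int64) (removed int) {
	s.mu.Lock()
	defer s.mu.Unlock()
	for key, deadline := range s.expires {
		if deadline <= nowUnix {
			delete(s.expires, key)
			removed++
		}
	}
	return removed
}

// cleanUpLoop sweeps on every tick and returns when ctx is cancelled.
func (s *keyStore) cleanUpLoop(ctx context.Context, interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case now := <-ticker.C:
			s.sweep(now.Unix())
		}
	}
}

func main() {
	now := time.Now().Unix()
	store := &keyStore{expires: map[string]int64{
		"stale": now - 60,
		"fresh": now + 3600,
	}}

	ctx, cancel := context.WithCancel(context.Background())
	done := make(chan struct{})
	go func() {
		defer close(done)
		store.cleanUpLoop(ctx, 10*time.Millisecond)
	}()

	time.Sleep(100 * time.Millisecond) // leave room for a few sweeps
	cancel()
	<-done // the routine exits together with its context

	store.mu.Lock()
	fmt.Println("keys left:", len(store.expires))
	store.mu.Unlock()
}
```

Cancelling the context passed to the constructor is therefore the way to stop the background work.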
func NewPublisher ¶
func NewPublisher(db SQLiteConnection, options PublisherOptions) (message.Publisher, error)
NewPublisher creates a message.Publisher instance from a connection interface which could be a database handle or a transaction.
func NewSubscriber ¶
func NewSubscriber(db SQLiteDatabase, options SubscriberOptions) (message.Subscriber, error)
NewSubscriber creates a new subscriber with the given options.
Types ¶
type ConsumerGroupMatcher ¶
type ConsumerGroupMatcher interface {
// MatchTopic returns a consumer group name
// for a given topic. This name must follow the same
// naming conventions as the topic name.
MatchTopic(topic string) (consumerGroupName string, err error)
}
ConsumerGroupMatcher associates a subscriber with a consumer group based on the subscription topic name.
func NewStaticConsumerGroupMatcher ¶
func NewStaticConsumerGroupMatcher(consumerGroupName string) ConsumerGroupMatcher
NewStaticConsumerGroupMatcher creates a new ConsumerGroupMatcher that
returns the same consumer group name for any topic.
type ConsumerGroupMatcherFunc ¶
ConsumerGroupMatcherFunc is a convenience type that implements the ConsumerGroupMatcher interface.
func (ConsumerGroupMatcherFunc) MatchTopic ¶
func (f ConsumerGroupMatcherFunc) MatchTopic(topic string) (consumerGroupName string, err error)
MatchTopic satisfies the ConsumerGroupMatcher interface.
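The function adapter pattern behind ConsumerGroupMatcherFunc can be sketched with local stand-in types. The underlying function signature is not shown on this page, so `matcherFunc` below is an assumed shape, and `byPrefix` is a hypothetical rule that derives the group name from the part of the topic before the first dot.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// matcher mirrors the ConsumerGroupMatcher interface for this sketch.
type matcher interface {
	MatchTopic(topic string) (consumerGroupName string, err error)
}

// matcherFunc is an assumed shape for ConsumerGroupMatcherFunc.
type matcherFunc func(topic string) (string, error)

// MatchTopic calls the function itself, which makes any matching function a matcher.
func (f matcherFunc) MatchTopic(topic string) (string, error) { return f(topic) }

// byPrefix is a hypothetical rule: the group name is the text before the first dot.
func byPrefix(topic string) (string, error) {
	prefix, _, found := strings.Cut(topic, ".")
	if !found || prefix == "" {
		return "", errors.New("topic has no prefix: " + topic)
	}
	return prefix, nil
}

func main() {
	var m matcher = matcherFunc(byPrefix)
	group, err := m.MatchTopic("billing.invoice_paid")
	fmt.Println(group, err)
}
```

With this pattern a plain function can be passed wherever the interface is expected, without declaring a dedicated struct type.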
type Error ¶
type Error uint8
Error represents common errors that might occur during the SQLite driver configuration or operations.
const (
	// ErrUnknown indicates that an operation failed due to an unknown reason.
	ErrUnknown Error = iota
	// ErrDatabaseConnectionIsNil indicates that configuration contained a nil database connection.
	ErrDatabaseConnectionIsNil
	// ErrPublisherIsClosed indicates that the publisher is closed and does not accept any more events.
	ErrPublisherIsClosed
	// ErrSubscriberIsClosed indicates that the subscriber is closed and can no longer respond to events.
	ErrSubscriberIsClosed
	// ErrAttemptedTableInitializationWithinTransaction indicates that a database handle is a transaction
	// while trying to initialize SQLite tables. SQLite does not support table creation within transactions.
	// Attempting to create a table within a transaction can lead to data inconsistencies and errors.
	//
	// A transaction is probably used for single use event operations. Attempting to create a table
	// in such a scenario adds unnecessary overhead. Initialize the tables once when the application starts.
	ErrAttemptedTableInitializationWithinTransaction
	// ErrInvalidTopicName indicates that the topic name contains invalid characters.
	// Invalid characters match the following regular expression pattern: `[^A-Za-z0-9\-\$\:\.\_]`.
	ErrInvalidTopicName
)
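The character class quoted in the ErrInvalidTopicName comment is negated, so a match means the topic contains a character outside the permitted set. A minimal check built on that pattern could look like this; `isValidTopicName` is a hypothetical helper and not part of the package API.

```go
package main

import (
	"fmt"
	"regexp"
)

// disallowed matches any single character outside the permitted set.
// The pattern is copied from the ErrInvalidTopicName comment.
var disallowed = regexp.MustCompile(`[^A-Za-z0-9\-\$\:\.\_]`)

// isValidTopicName is a hypothetical helper: a topic is accepted
// when no disallowed character occurs anywhere in it.
func isValidTopicName(topic string) bool {
	return !disallowed.MatchString(topic)
}

func main() {
	for _, topic := range []string{"orders.created", "tenant:42$events-v1_a", "bad topic", "drop;table"} {
		fmt.Printf("%q valid: %v\n", topic, isValidTopicName(topic))
	}
}
```

Checking topic names before subscribing or publishing lets a caller surface the problem earlier than the driver would.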
type ExpiringKeyRepositoryConfiguration ¶
type ExpiringKeyRepositoryConfiguration struct {
// Database is SQLite3 database handle.
Database *sql.DB
// TableName is the name of the table used to store expiring keys.
// The
// Defaults to "watermill_expiring_keys".
TableName string
// Expiration is the duration after which a key is considered expired.
// If lower than five milliseconds, it is set to five milliseconds.
// Defaults to one minute.
Expiration time.Duration
// CleanUpInterval is the interval at which expired keys are cleaned up.
// Defaults to 15 seconds.
CleanUpInterval time.Duration
// CleanUpLogger tracks any problems that might emerge when cleaning up expired keys.
// Defaults to [watermill.NewStdLogger].
CleanUpLogger watermill.LoggerAdapter
}
ExpiringKeyRepositoryConfiguration initializes the expiring key repository in the NewExpiringKeyRepository constructor.
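The documented defaults for the duration fields can be summarized in a short sketch. `keyRepoConfig` is a local stand-in that holds only those two fields, and the order in which the rules are applied (zero means default, then the five millisecond floor) is an assumption.

```go
package main

import (
	"fmt"
	"time"
)

// keyRepoConfig is a local stand-in holding only the duration fields.
type keyRepoConfig struct {
	Expiration      time.Duration
	CleanUpInterval time.Duration
}

// withDefaults applies the documented rules to a copy of the configuration.
// Treating zero as "unset" before applying the floor is an assumption.
func (c keyRepoConfig) withDefaults() keyRepoConfig {
	switch {
	case c.Expiration == 0:
		c.Expiration = time.Minute // documented default
	case c.Expiration < 5*time.Millisecond:
		c.Expiration = 5 * time.Millisecond // documented lower bound
	}
	if c.CleanUpInterval == 0 {
		c.CleanUpInterval = 15 * time.Second // documented default
	}
	return c
}

func main() {
	fmt.Println(keyRepoConfig{}.withDefaults())
	fmt.Println(keyRepoConfig{Expiration: time.Millisecond}.withDefaults())
	fmt.Println(keyRepoConfig{Expiration: time.Hour, CleanUpInterval: time.Minute}.withDefaults())
}
```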
type PublisherOptions ¶
type PublisherOptions struct {
// TableNameGenerators is a set of functions that generate table names for topics and offsets.
// Defaults to [TableNameGenerators.WithDefaultGeneratorsInsteadOfNils].
TableNameGenerators TableNameGenerators
// InitializeSchema enables initialization of the database schema during publish.
// Schema is initialized once per topic per publisher instance.
// InitializeSchema is forbidden if using an ongoing transaction as database handle.
// It could result in an implicit commit of the transaction by a CREATE TABLE statement.
InitializeSchema bool
// Logger reports message publishing errors and traces. Default value is [watermill.NopLogger].
Logger watermill.LoggerAdapter
}
PublisherOptions configure message publishing behavior.
type SQLiteConnection ¶
type SQLiteConnection interface {
ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error)
QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row
}
SQLiteConnection is an SQLite database connection or a transaction.
type SQLiteDatabase ¶
type SQLiteDatabase interface {
SQLiteConnection
BeginTx(context.Context, *sql.TxOptions) (*sql.Tx, error)
}
SQLiteDatabase is an interface that represents an SQLite database.
type SubscriberOptions ¶
type SubscriberOptions struct {
// ConsumerGroupMatcher differentiates message consumers within the same topic.
// Messages are processed in batches.
// Therefore, another subscriber with the same consumer group name may only obtain
// messages whenever it is able to acquire the row lock.
// Default value is a static consumer group matcher that
// always returns [DefaultConsumerGroupName].
ConsumerGroupMatcher ConsumerGroupMatcher
// BatchSize is the number of messages to read in a single batch.
// Default value is [DefaultMessageBatchSize].
BatchSize int
// TableNameGenerators is a set of functions that generate table names for topics and offsets.
// Default value is [TableNameGenerators.WithDefaultGeneratorsInsteadOfNils].
TableNameGenerators TableNameGenerators
// PollInterval is the interval to wait between subsequent SELECT queries, if
// no more messages were found in the database (Prefer using the BackoffManager instead).
// Must be non-negative. Defaults to one second.
PollInterval time.Duration
// LockTimeout is the maximum duration of the row lock. If the subscription
// is unable to extend the lock before this timeout ends, the lock will expire.
// Then, another subscriber in the same consumer group may
// acquire the lock and continue processing messages.
//
// Duration must not be less than one second, because seconds are added
// to the SQLite `unixepoch` function, rounded to the nearest second.
// A zero duration would create a lock that expires immediately.
// There is no reason to set higher precision fractional duration,
// because the lock timeout will rarely ever trigger in a healthy system.
//
// Normally, the row lock is set to zero after each batch of messages is processed.
// The lock may time out if a consuming node shuts down unexpectedly,
// before it is able to complete processing a batch of messages. The timeout
// matters only in such rare cases, and it is better to set it
// to a higher value in order to avoid unnecessary batch re-processing.
// Therefore, a value lower than one second is impractical.
//
// Default value is [DefaultSubscriberLockTimeout].
LockTimeout time.Duration
// AckDeadline is the time to wait for acking a message.
// If message is not acked within this time, it will be nacked and re-delivered.
//
// When messages are read in bulk, this time is calculated for each message separately.
//
// If you want to disable the acknowledgement deadline, set it to 0.
// Warning: when acknowledgement deadline is disabled, messages may block and
// prevent the subscriber from accepting new messages.
//
// Must be non-negative. Default value is [DefaultAckDeadline].
AckDeadline *time.Duration
// InitializeSchema option enables initializing schema on making a subscription.
InitializeSchema bool
// Logger reports message consumption errors and traces.
//
// Defaults to [watermill.NopLogger].
Logger watermill.LoggerAdapter
}
SubscriberOptions defines options for creating a subscriber. Every option has a reasonable default value.
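Two of the options have rules that are easy to get wrong: LockTimeout works in whole seconds, and AckDeadline is a pointer so that an explicit zero can be told apart from an unset value. The sketch below shows one way such rules could be resolved. `lockSeconds` and `resolveAckDeadline` are hypothetical helpers, and rejecting fractional seconds rather than rounding them is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

const (
	defaultLockTimeout = 5 * time.Second  // mirrors DefaultSubscriberLockTimeout
	defaultAckDeadline = 30 * time.Second // mirrors DefaultAckDeadline
)

// lockSeconds converts a lock timeout into the whole seconds added to unixepoch().
// Zero selects the default; rejecting fractional values is an assumption.
func lockSeconds(d time.Duration) (int64, error) {
	if d == 0 {
		d = defaultLockTimeout
	}
	if d < time.Second {
		return 0, errors.New("lock timeout must not be less than one second")
	}
	if d%time.Second != 0 {
		return 0, errors.New("lock timeout must be in full seconds")
	}
	return int64(d / time.Second), nil
}

// resolveAckDeadline treats nil as "use the default" and zero as "disabled".
func resolveAckDeadline(d *time.Duration) (time.Duration, error) {
	if d == nil {
		return defaultAckDeadline, nil
	}
	if *d < 0 {
		return 0, errors.New("acknowledgement deadline must be non-negative")
	}
	return *d, nil
}

func main() {
	seconds, err := lockSeconds(0)
	fmt.Println(seconds, err)

	_, err = lockSeconds(500 * time.Millisecond)
	fmt.Println(err)

	disabled := time.Duration(0)
	deadline, _ := resolveAckDeadline(&disabled)
	fmt.Println("explicit zero:", deadline)

	deadline, _ = resolveAckDeadline(nil)
	fmt.Println("unset:", deadline)
}
```

The pointer is what makes it possible to disable the deadline: a plain `time.Duration` field could not distinguish zero from "not configured".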
type TableNameGenerator ¶
TableNameGenerator creates a table name for a given topic, for either the topic table or the offsets table.
type TableNameGenerators ¶
type TableNameGenerators struct {
Topic TableNameGenerator
Offsets TableNameGenerator
}
TableNameGenerators is a struct that holds two functions for generating topic and offsets table names. A [Publisher] and a [Subscriber] must use identical generators for topic and offsets tables in order to communicate with each other.
func (TableNameGenerators) WithDefaultGeneratorsInsteadOfNils ¶
func (t TableNameGenerators) WithDefaultGeneratorsInsteadOfNils() TableNameGenerators
WithDefaultGeneratorsInsteadOfNils returns a TableNameGenerators with default generators for topic and offsets tables if they were left nil.
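Filling in nil generators can be sketched with local stand-in types. The function signature of TableNameGenerator is not shown on this page, so `nameGenerator` is an assumed shape, and the default table name prefixes used here are hypothetical.

```go
package main

import "fmt"

// nameGenerator is an assumed shape for TableNameGenerator.
type nameGenerator func(topic string) string

// generators is a local stand-in for TableNameGenerators.
type generators struct {
	Topic   nameGenerator
	Offsets nameGenerator
}

// withDefaults returns a copy with nil fields replaced.
// The default table names below are hypothetical.
func (g generators) withDefaults() generators {
	if g.Topic == nil {
		g.Topic = func(topic string) string { return "watermill_" + topic }
	}
	if g.Offsets == nil {
		g.Offsets = func(topic string) string { return "watermill_offsets_" + topic }
	}
	return g
}

func main() {
	custom := generators{
		Topic: func(topic string) string { return "events_" + topic },
	}
	// Hand the same resolved value to both the publisher and the subscriber
	// so that they read and write the same tables.
	shared := custom.withDefaults()
	fmt.Println(shared.Topic("orders"), shared.Offsets("orders"))
}
```

Because the method has a value receiver and returns a copy, the original struct keeps its nil fields; only the returned value is fully populated.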