RIO (中文)
An AIO network library based on IOURING, without using CGO, and following the design pattern of the standard library.
Supported protocols: TCP, UDP, UNIX, UNIXGRAM (IP is the proxy standard library).
RIO is a library that follows the usage pattern of the standard library and can be put into use very conveniently. Therefore, it is not a toy and can replace NET at a very low cost.
- Linux kernel version must be
>= 6.13. - Scenarios that only use
DialrequirePINandUNPINto pin the kernel thread ofIOURING. NetworkingMode=mirroredcannot be enabled inWSL2.- Since
DIRECT FDdoes not supportCLOEXEC, it is necessary to close allFDwhen the program exits (close all links when both net.Http and fasthttp implement closure).
- Based on the implementation of
net.Listener,net.Connandnet.PacketConn. - Use
BATCHto reduce the overhead ofSYSTEM CALL. - Support
TLS. - Support
MULTISHOT_ACCEPTMULTISHOT_RECVandMULTISHOT_RECV_FROM. - Support
SEND_ZCandSENDMSG_ZC. - Support
NAPI. - Support
PERSIONALITY. - Supports
CURVEto dynamically adjust the timeout ofWAIT CQEto fit different scenarios.
TCP
HTTP
| Endpoint | Platform | IP | OS | SKU |
|---|---|---|---|---|
| Client | WSL2 | 192.168.100.1 | Ubuntu22.04 (6.13.6-microsoft-standard-WSL2) | 4C 16G |
| Server | Hyper-V | 192.168.100.120 | Ubuntu24.10 (6.13.12-061312-generic) | 4C 0.5G |
Syscall
| Lib | Proportion | Desc |
|---|---|---|
| RIO | 33% (3%) | 33% is the single publisher mode, and 3% is the SQ-POLL mode. |
| NET | 74% | Reading, writing, Epoll, etc. account for a total of 74%. |
go get -u github.com/brickingsoft/rioFor basic use, replace net with github.com/brickingsoft/rio.
// replace net.Listen() with rio.Listen()
ln, lnErr := rio.Listen("tcp", ":9000")
// replace net.Dial() with rio.Dial()
conn, dialErr := rio.Dial("tcp", "127.0.0.1:9000")Use rio.Conn to set or get socketopt.
rc := conn.(rio.Conn)
setErr := rc.SetSocketOptInt(level, name, value)
value, getErr := rc.GetSocketOptInt(level, name)Use the built-in security approach.
// server("github.com/brickingsoft/rio/security")
ln, _ = security.Listen("tcp", ":9000", config)
// client("github.com/brickingsoft/rio/security")
conn, _ = security.Dial("tcp", "127.0.0.1:9000", config)For server, use http.Server.Serve() replace Listener.
rio.Preset(
aio.WithNAPIBusyPollTimeout(time.Microsecond * 50),
)
ln, lnErr := rio.Listen("tcp", ":9000")
if lnErr != nil {
panic(lnErr)
return
}
srv := &http.Server{
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/html; charset=utf-8")
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte("hello world"))
}),
}
done := make(chan struct{}, 1)
go func(ln net.Listener, srv *http.Server, done chan<- struct{}) {
if srvErr := srv.Serve(ln); srvErr != nil {
if errors.Is(srvErr, http.ErrServerClosed) {
close(done)
return
}
panic(srvErr)
return
}
close(done)
}(ln, srv, done)
signalCh := make(chan os.Signal, 1)
signal.Notify(signalCh, syscall.SIGINT, syscall.SIGKILL, syscall.SIGQUIT, syscall.SIGABRT, syscall.SIGTERM)
<-signalCh
if shutdownErr := srv.Shutdown(context.Background()); shutdownErr != nil {
panic(shutdownErr)
}
<-doneFor client, reset http.DefaultTransport.
http.DefaultTransport = &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
dialer := rio.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}
return dialer.DialContext(ctx, network, addr)
},
ForceAttemptHTTP2: true,
MaxIdleConns: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
}
resp, getErr := http.Get("http://127.0.0.1:9000/")tcpConn, ok := conn.(*rio.TCPConn)
udpConn, ok := conn.(*rio.UDPConn)
unixConn, ok := conn.(*rio.UnixConn)
rioConn, ok := conn.(rio.Conn)rio.ListenConfig is similar to net.ListenConfig and listens by configuration.
config := rio.ListenConfig{
Control: nil,
KeepAlive: 0,
KeepAliveConfig: net.KeepAliveConfig{},
MultipathTCP: false,
ReusePort: false,
}
ln, lnErr := config.Listen(context.Background(), "tcp", ":9000")rio.Dialer is similar to net.Dialer and dials by configuration.
dialer := rio.Dialer{
Timeout: 0,
Deadline: time.Time{},
KeepAlive: 0,
KeepAliveConfig: net.KeepAliveConfig{},
LocalAddr: nil,
FallbackDelay: 0,
MultipathTCP: false,
Control: nil,
ControlContext: nil,
}
conn, dialErr := dialer.DialContext(context.Background(), "tcp", "127.0.0.1:9000")Because IOURING has a resource handling step in its setup and shutdown process, and its lifecycle is tied to the user's maximum lifecycle.
To prevent an instance from shutting down when it shouldn't, its lifecycle can be manually controlled via PIN and UNPIN, generally for scenarios where there is only DIAL or where there is more than one LISTEN.
// Calling before presetting and launching a link
rio.Pin()
// Called after all links are closed
rio.Unpin()For postgres example, create a Dialer and use the Connector to open database.
type RIOPQDialer struct{}
func (d *RIOPQDialer) Dial(network, address string) (net.Conn, error) {
return rio.Dial(network, address)
}
func (d *RIOPQDialer) DialTimeout(network, address string, timeout time.Duration) (net.Conn, error) {
return rio.DialTimeout(network, address, timeout)
}connector, connectorErr := pq.NewConnector("{dsn}")
if connectorErr != nil {
panic(connectorErr)
}
connector.Dialer(&RIOPQDialer{})
db := sql.OpenDB(connector)Predefined aio.NCurve aio.SCurve and aio.LCurve.
| Name | Desc | Scenes |
|---|---|---|
| NCurve | Nil | For non-single publishers only |
| SCurve | Short | For short links under single publisher |
| LCurve | Long | For long links under multiple publishers |
Customize IOURING with presets.
rio.Peset(
// Set the size of the IOURING, default is 16384, maximum is 32768.
aio.WithEntries(liburing.DefaultEntries),
// Set the Flags of the IOURING.
// Optimized for single threading by default, how you need to turn on SQPOLL can be set.
aio.WithFlags(liburing.IORING_SETUP_SINGLE_ISSUER),
// Whether to enable SEND ZERO COPY.
// Not turned on by default. Note: Some pressure testing tools cannot detect the return value.
aio.WithSendZCEnabled(false),
// Whether to disable multishot mode.
// Not disabled by default.
// Multishots can significantly reduce SQE casts, but will require additional resources such as registering and deregistering BufferAndRing.
// Disable multishot mode is typically used in conjunction with enabling SQPOLL to significantly reduce the overhead of SYSCALL.
aio.WithMultishotDisabled(false),
// Set BufferAndRing config.
// Effective in non-prohibited multishot mode.
// A BufferAndRing serves only one Fd.
// The parameter size is the size of the buffer, which is recommended to be a page size.
// The parameter count is the number of buffer nodes in the ring.
// The parameter idle timeout is the amount of time to idle before logging out when it is no longer in use.
aio.WithBufferAndRingConfig(4096, 32, 5*time.Second),
// Set the CQE wait timeout curve.
aio.WithWaitCQETimeoutCurve(aio.SCurve),
// Set the NAPI.
// The minimum unit of timeout time is microsecond, which is not turned on by default.
aio.WithNAPIBusyPollTimeout(50*time.Microsecond),
)




