Skip to content

Commit 6ada568

Browse files
committed
Fix stdout server blocking
The Go server is a vanilla main program, which is really convenient to start with `go run`. Before this commit, andy writes to stdout from user code (e.g. `fmt.Println("Starting server ...`) would freeze the server and eventually time out. We could tell the user to never use stdout, but do any logging to stderr, e.g., via fmt.Log but doing fmt.Println is muscle memory in most Go developers, so this situation would be bound to create some frustration. This commit fixes this by: * In `client.Start`, wait until the server has announced it's ready before it starts reading from os.Stdin. * In `server.Start`, pipe os.Stdout and copy its writes to os.Stderr (which is where new writes to fmt.Println will end up)
1 parent 4efaf01 commit 6ada568

File tree

5 files changed

+285
-63
lines changed

5 files changed

+285
-63
lines changed

‎client.go‎

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ func StartClientRaw(opts ClientRawOptions) (*ClientRaw, error) {
8585
}
8686
cmd.Env = env
8787

88-
conn, err := newConn(cmd)
88+
conn, err := newConn(cmd, opts.Timeout)
8989
if err != nil {
9090
return nil, err
9191
}
@@ -142,7 +142,7 @@ func (c *ClientRaw) Execute(body []byte) (Message, error) {
142142
select {
143143
case call = <-call.Done:
144144
case <-time.After(c.timeout):
145-
return Message{}, errors.New("timeout waiting for the server to respond; check that you're not writing anything to stdout inside the server")
145+
return Message{}, ErrTimeoutWaitingForServer
146146
}
147147

148148
if call.Error != nil {

‎client_test.go‎

Lines changed: 98 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
package execrpc_test
22

33
import (
4+
"fmt"
45
"testing"
6+
"time"
57

68
"github.com/bep/execrpc"
79
"github.com/bep/execrpc/codecs"
810
"github.com/bep/execrpc/examples/model"
911
qt "github.com/frankban/quicktest"
12+
"golang.org/x/sync/errgroup"
1013
)
1114

1215
func TestExecRaw(t *testing.T) {
@@ -32,17 +35,36 @@ func TestExecRaw(t *testing.T) {
3235

3336
}
3437

38+
func newTestClient(t testing.TB, codec codecs.Codec[model.ExampleRequest, model.ExampleResponse], env ...string) *execrpc.Client[model.ExampleRequest, model.ExampleResponse] {
39+
client, err := execrpc.StartClient(
40+
execrpc.ClientOptions[model.ExampleRequest, model.ExampleResponse]{
41+
ClientRawOptions: execrpc.ClientRawOptions{
42+
Version: 1,
43+
Cmd: "go",
44+
Args: []string{"run", "./examples/servers/typed"},
45+
Env: env,
46+
},
47+
Codec: codec,
48+
},
49+
)
50+
if err != nil {
51+
t.Fatal(err)
52+
}
53+
return client
54+
}
55+
3556
func TestExecTyped(t *testing.T) {
3657
c := qt.New(t)
3758

38-
newCient := func(t testing.TB, codecID string, codec codecs.Codec[model.ExampleRequest, model.ExampleResponse]) *execrpc.Client[model.ExampleRequest, model.ExampleResponse] {
59+
newClient := func(t testing.TB, codec codecs.Codec[model.ExampleRequest, model.ExampleResponse], env ...string) *execrpc.Client[model.ExampleRequest, model.ExampleResponse] {
3960
client, err := execrpc.StartClient(
4061
execrpc.ClientOptions[model.ExampleRequest, model.ExampleResponse]{
4162
ClientRawOptions: execrpc.ClientRawOptions{
4263
Version: 1,
4364
Cmd: "go",
4465
Args: []string{"run", "./examples/servers/typed"},
45-
Env: []string{"EXECRPC_CODEC=" + codecID},
66+
Env: env,
67+
Timeout: 4 * time.Second,
4668
},
4769
Codec: codec,
4870
},
@@ -53,26 +75,33 @@ func TestExecTyped(t *testing.T) {
5375
return client
5476
}
5577

56-
c.Run("JSON", func(c *qt.C) {
57-
client := newCient(c, "json", codecs.JSONCodec[model.ExampleRequest, model.ExampleResponse]{})
78+
runBasicTestForClient := func(c *qt.C, client *execrpc.Client[model.ExampleRequest, model.ExampleResponse]) model.ExampleResponse {
5879
result, err := client.Execute(model.ExampleRequest{Text: "world"})
5980
c.Assert(err, qt.IsNil)
6081
c.Assert(result.Err(), qt.IsNil)
6182
c.Assert(string(result.Hello), qt.Equals, "Hello world!")
6283
c.Assert(client.Close(), qt.IsNil)
84+
return result
85+
86+
}
87+
88+
c.Run("JSON", func(c *qt.C) {
89+
client := newClient(c, codecs.JSONCodec[model.ExampleRequest, model.ExampleResponse]{}, "EXECRPC_CODEC=json")
90+
runBasicTestForClient(c, client)
6391
})
6492

6593
c.Run("TOML", func(c *qt.C) {
66-
client := newCient(c, "toml", codecs.TOMLCodec[model.ExampleRequest, model.ExampleResponse]{})
67-
result, err := client.Execute(model.ExampleRequest{Text: "world"})
68-
c.Assert(err, qt.IsNil)
69-
c.Assert(result.Err(), qt.IsNil)
70-
c.Assert(string(result.Hello), qt.Equals, "Hello world!")
71-
c.Assert(client.Close(), qt.IsNil)
94+
client := newClient(c, codecs.TOMLCodec[model.ExampleRequest, model.ExampleResponse]{}, "EXECRPC_CODEC=toml")
95+
runBasicTestForClient(c, client)
7296
})
7397

7498
c.Run("Gob", func(c *qt.C) {
75-
client := newCient(c, "gob", codecs.GobCodec[model.ExampleRequest, model.ExampleResponse]{})
99+
client := newClient(c, codecs.GobCodec[model.ExampleRequest, model.ExampleResponse]{}, "EXECRPC_CODEC=gob")
100+
runBasicTestForClient(c, client)
101+
})
102+
103+
c.Run("Send log message from server", func(c *qt.C) {
104+
client := newClient(c, codecs.JSONCodec[model.ExampleRequest, model.ExampleResponse]{}, "EXECRPC_CODEC=json", "EXECRPC_SEND_LOG_MESSAGE=true")
76105
result, err := client.Execute(model.ExampleRequest{Text: "world"})
77106
c.Assert(err, qt.IsNil)
78107
c.Assert(result.Err(), qt.IsNil)
@@ -81,38 +110,74 @@ func TestExecTyped(t *testing.T) {
81110
})
82111

83112
c.Run("Error", func(c *qt.C) {
84-
client := newCient(c, "json", codecs.JSONCodec[model.ExampleRequest, model.ExampleResponse]{})
85-
result, err := client.Execute(model.ExampleRequest{Text: "fail"})
113+
client := newClient(c, codecs.JSONCodec[model.ExampleRequest, model.ExampleResponse]{}, "EXECRPC_CODEC=json", "EXECRPC_CALL_SHOULD_FAIL=true")
114+
result, err := client.Execute(model.ExampleRequest{Text: "hello"})
86115
c.Assert(err, qt.IsNil)
87116
c.Assert(result.Err(), qt.IsNotNil)
88117
c.Assert(client.Close(), qt.IsNil)
89118
})
90119

120+
// The "stdout print tests" are just to make sure that the server behaves and does not hang.
121+
122+
c.Run("Print to stdout outside server before", func(c *qt.C) {
123+
client := newClient(c, codecs.JSONCodec[model.ExampleRequest, model.ExampleResponse]{}, "EXECRPC_PRINT_OUTSIDE_SERVER_BEFORE=true")
124+
runBasicTestForClient(c, client)
125+
})
126+
127+
c.Run("Print to stdout inside server", func(c *qt.C) {
128+
client := newClient(c, codecs.JSONCodec[model.ExampleRequest, model.ExampleResponse]{}, "EXECRPC_PRINT_INSIDE_SERVER=true")
129+
runBasicTestForClient(c, client)
130+
})
131+
132+
c.Run("Print to stdout outside server before", func(c *qt.C) {
133+
client := newClient(c, codecs.JSONCodec[model.ExampleRequest, model.ExampleResponse]{}, "EXECRPC_PRINT_OUTSIDE_SERVER_BEFORE=true")
134+
runBasicTestForClient(c, client)
135+
})
136+
137+
c.Run("Print to stdout inside after", func(c *qt.C) {
138+
client := newClient(c, codecs.JSONCodec[model.ExampleRequest, model.ExampleResponse]{}, "EXECRPC_PRINT_OUTSIDE_SERVER_AFTER=true")
139+
runBasicTestForClient(c, client)
140+
})
141+
91142
}
92143

93-
func BenchmarkClient(b *testing.B) {
94-
newCient := func(t testing.TB, codecID string, codec codecs.Codec[model.ExampleRequest, model.ExampleResponse]) *execrpc.Client[model.ExampleRequest, model.ExampleResponse] {
95-
client, err := execrpc.StartClient(
96-
execrpc.ClientOptions[model.ExampleRequest, model.ExampleResponse]{
97-
ClientRawOptions: execrpc.ClientRawOptions{
98-
Version: 1,
99-
Cmd: "go",
100-
Args: []string{"run", "./examples/servers/typed"},
101-
Env: []string{"EXECRPC_CODEC=" + codecID},
102-
},
103-
Codec: codec,
104-
},
105-
)
106-
if err != nil {
107-
t.Fatal(err)
108-
}
109-
return client
144+
func TestExecTypedConcurrent(t *testing.T) {
145+
client := newTestClient(t, codecs.JSONCodec[model.ExampleRequest, model.ExampleResponse]{}, "EXECRPC_CODEC=json")
146+
var g errgroup.Group
147+
148+
for i := 0; i < 100; i++ {
149+
i := i
150+
g.Go(func() error {
151+
for j := 0; j < 10; j++ {
152+
text := fmt.Sprintf("%d-%d", i, j)
153+
result, err := client.Execute(model.ExampleRequest{Text: text})
154+
if err != nil {
155+
return err
156+
}
157+
if result.Err() != nil {
158+
return result.Err()
159+
}
160+
expect := fmt.Sprintf("Hello %s!", text)
161+
if string(result.Hello) != expect {
162+
return fmt.Errorf("unexpected result: %s", result.Hello)
163+
}
164+
}
165+
return nil
166+
})
110167
}
111168

169+
if err := g.Wait(); err != nil {
170+
t.Fatal(err)
171+
}
172+
173+
}
174+
175+
func BenchmarkClient(b *testing.B) {
176+
112177
const word = "World"
113178

114179
b.Run("JSON", func(b *testing.B) {
115-
client := newCient(b, "json", codecs.JSONCodec[model.ExampleRequest, model.ExampleResponse]{})
180+
client := newTestClient(b, codecs.JSONCodec[model.ExampleRequest, model.ExampleResponse]{}, "EXECRPC_CODEC=json")
116181
b.RunParallel(func(pb *testing.PB) {
117182
for pb.Next() {
118183
_, err := client.Execute(model.ExampleRequest{Text: word})
@@ -124,7 +189,7 @@ func BenchmarkClient(b *testing.B) {
124189
})
125190

126191
b.Run("TOML", func(b *testing.B) {
127-
client := newCient(b, "toml", codecs.TOMLCodec[model.ExampleRequest, model.ExampleResponse]{})
192+
client := newTestClient(b, codecs.TOMLCodec[model.ExampleRequest, model.ExampleResponse]{}, "EXECRPC_CODEC=toml")
128193
b.RunParallel(func(pb *testing.PB) {
129194
for pb.Next() {
130195
_, err := client.Execute(model.ExampleRequest{Text: word})
@@ -136,7 +201,7 @@ func BenchmarkClient(b *testing.B) {
136201
})
137202

138203
b.Run("Gob", func(b *testing.B) {
139-
client := newCient(b, "gob", codecs.GobCodec[model.ExampleRequest, model.ExampleResponse]{})
204+
client := newTestClient(b, codecs.GobCodec[model.ExampleRequest, model.ExampleResponse]{}, "EXECRPC_CODEC=gob")
140205
b.RunParallel(func(pb *testing.PB) {
141206
for pb.Next() {
142207
_, err := client.Execute(model.ExampleRequest{Text: word})

‎conn.go‎

Lines changed: 71 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,25 @@
11
package execrpc
22

33
import (
4+
"bufio"
45
"bytes"
6+
"context"
57
"errors"
68
"io"
79
"os"
810
"os/exec"
911
"regexp"
1012
"time"
13+
14+
"golang.org/x/sync/errgroup"
1115
)
1216

17+
// ErrTimeoutWaitingForServer is returned on timeouts starting the server.
18+
var ErrTimeoutWaitingForServer = errors.New("timed out waiting for server to start")
19+
1320
var brokenPipeRe = regexp.MustCompile("Broken pipe|pipe is being closed")
1421

15-
func newConn(cmd *exec.Cmd) (_ conn, err error) {
22+
func newConn(cmd *exec.Cmd, timeout time.Duration) (_ conn, err error) {
1623
in, err := cmd.StdinPipe()
1724
if err != nil {
1825
return conn{}, err
@@ -25,7 +32,13 @@ func newConn(cmd *exec.Cmd) (_ conn, err error) {
2532

2633
out, err := cmd.StdoutPipe()
2734
stdErr := &tailBuffer{limit: 1024}
28-
c := conn{out, in, stdErr, cmd}
35+
c := conn{
36+
ReadCloser: out,
37+
WriteCloser: in,
38+
stdErr: stdErr,
39+
cmd: cmd,
40+
timeout: timeout,
41+
}
2942
cmd.Stderr = io.MultiWriter(c.stdErr, os.Stderr)
3043

3144
return c, err
@@ -36,6 +49,8 @@ type conn struct {
3649
io.WriteCloser
3750
stdErr *tailBuffer
3851
cmd *exec.Cmd
52+
53+
timeout time.Duration
3954
}
4055

4156
// Close closes conn's WriteCloser, ReadClosers, and waits for the command to finish.
@@ -61,7 +76,60 @@ func (c conn) Start() error {
6176
if err != nil {
6277
return c.Close()
6378
}
64-
return err
79+
80+
ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
81+
defer cancel()
82+
g, ctx := errgroup.WithContext(ctx)
83+
84+
g.Go(func() error {
85+
// THe server will announce when it's ready to read from stdin
86+
// by writing a special string to stdout.
87+
for {
88+
select {
89+
case <-ctx.Done():
90+
return ErrTimeoutWaitingForServer
91+
default:
92+
done := make(chan bool)
93+
errc := make(chan error)
94+
go func() {
95+
var read []byte
96+
br := bufio.NewReader(c)
97+
for {
98+
select {
99+
case <-ctx.Done():
100+
return
101+
default:
102+
b, err := br.ReadByte()
103+
if err != nil {
104+
errc <- err
105+
break
106+
}
107+
read = append(read, b)
108+
if bytes.Contains(read, serverStarted) {
109+
remainder := bytes.Replace(read, serverStarted, nil, 1)
110+
if len(remainder) > 0 {
111+
os.Stdout.Write(remainder)
112+
}
113+
done <- true
114+
return
115+
}
116+
}
117+
}
118+
}()
119+
120+
select {
121+
case <-ctx.Done():
122+
return ErrTimeoutWaitingForServer
123+
case err := <-errc:
124+
return err
125+
case <-done:
126+
return nil
127+
}
128+
}
129+
}
130+
})
131+
132+
return g.Wait()
65133
}
66134

67135
// the server ends itself on EOF, this is just to give it some

0 commit comments

Comments
 (0)