Skip to content

Commit 4e58ba6

Browse files
committed
Allow the fake data to be streamed with a small delay
Signed-off-by: David Gageot <david.gageot@docker.com>
1 parent 2d733d2 commit 4e58ba6

6 files changed

Lines changed: 294 additions & 12 deletions

File tree

‎cmd/root/api.go‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ func (f *apiFlags) runAPICommand(cmd *cobra.Command, args []string) error {
9898
}
9999

100100
// Start fake proxy if --fake is specified
101-
cleanup, err := setupFakeProxy(f.fakeResponses, &f.runConfig)
101+
cleanup, err := setupFakeProxy(f.fakeResponses, 0, &f.runConfig)
102102
if err != nil {
103103
return err
104104
}

‎cmd/root/record.go‎

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,25 @@ import (
1111
)
1212

1313
// setupFakeProxy starts a fake proxy if fakeResponses is non-empty.
14+
// streamDelayMs controls simulated streaming: 0 = disabled, >0 = delay in milliseconds between chunks.
1415
// It returns a cleanup function that must be called when done (typically via defer).
15-
func setupFakeProxy(fakeResponses string, runConfig *config.RuntimeConfig) (cleanup func() error, err error) {
16+
func setupFakeProxy(fakeResponses string, streamDelayMs int, runConfig *config.RuntimeConfig) (cleanup func() error, err error) {
1617
if fakeResponses == "" {
1718
return func() error { return nil }, nil
1819
}
1920

2021
// Normalize path by stripping .yaml suffix (go-vcr adds it automatically)
2122
fakeResponses = strings.TrimSuffix(fakeResponses, ".yaml")
2223

23-
proxyURL, cleanupFn, err := fake.StartProxy(fakeResponses)
24+
var opts []fake.ProxyOption
25+
if streamDelayMs > 0 {
26+
opts = append(opts,
27+
fake.WithSimulateStream(true),
28+
fake.WithStreamChunkDelay(time.Duration(streamDelayMs)*time.Millisecond),
29+
)
30+
}
31+
32+
proxyURL, cleanupFn, err := fake.StartProxy(fakeResponses, opts...)
2433
if err != nil {
2534
return nil, fmt.Errorf("failed to start fake proxy: %w", err)
2635
}

‎cmd/root/run.go‎

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
"os"
99
"path/filepath"
1010
"runtime/pprof"
11-
"strings"
1211

1312
"github.com/mattn/go-isatty"
1413
"github.com/spf13/cobra"
@@ -37,6 +36,7 @@ type runExecFlags struct {
3736
sessionID string
3837
recordPath string
3938
fakeResponses string
39+
fakeStreamDelay int
4040
exitAfterResponse bool
4141
cpuProfile string
4242
forceTUI bool
@@ -86,6 +86,8 @@ func addRunOrExecFlags(cmd *cobra.Command, flags *runExecFlags) {
8686
cmd.PersistentFlags().StringVarP(&flags.sessionDB, "session-db", "s", filepath.Join(paths.GetHomeDir(), ".cagent", "session.db"), "Path to the session database")
8787
cmd.PersistentFlags().StringVar(&flags.sessionID, "session", "", "Continue from a previous session by ID")
8888
cmd.PersistentFlags().StringVar(&flags.fakeResponses, "fake", "", "Replay AI responses from cassette file (for testing)")
89+
cmd.PersistentFlags().IntVar(&flags.fakeStreamDelay, "fake-stream", 0, "Simulate streaming with delay in ms between chunks (default 15ms if no value given)")
90+
cmd.Flag("fake-stream").NoOptDefVal = "15" // --fake-stream without value uses 15ms
8991
cmd.PersistentFlags().StringVar(&flags.recordPath, "record", "", "Record AI API interactions to cassette file (auto-generates filename if empty)")
9092
cmd.PersistentFlags().Lookup("record").NoOptDefVal = "true"
9193
cmd.PersistentFlags().BoolVar(&flags.exitAfterResponse, "exit-after-response", false, "Exit TUI after first assistant response completes")
@@ -142,7 +144,7 @@ func (f *runExecFlags) runOrExec(ctx context.Context, out *cli.Printer, args []s
142144
}
143145

144146
// Start fake proxy if --fake is specified
145-
fakeCleanup, err := setupFakeProxy(f.fakeResponses, &f.runConfig)
147+
fakeCleanup, err := setupFakeProxy(f.fakeResponses, f.fakeStreamDelay, &f.runConfig)
146148
if err != nil {
147149
return err
148150
}

‎e2e/proxy_test.go‎

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ func startRecordingAIProxy(t *testing.T) (*httptest.Server, *config.RuntimeConfi
2929
recorder.ModeRecordOnce,
3030
matcher,
3131
fake.APIKeyHeaderUpdater,
32+
nil,
3233
)
3334
require.NoError(t, err)
3435

‎pkg/fake/proxy.go‎

Lines changed: 132 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
package fake
44

55
import (
6+
"bufio"
67
"bytes"
78
"context"
89
"fmt"
@@ -21,10 +22,42 @@ import (
2122
"gopkg.in/dnaeon/go-vcr.v4/pkg/recorder"
2223
)
2324

25+
// ProxyOptions configures the fake proxy behavior.
26+
type ProxyOptions struct {
27+
// SimulateStream adds delays between SSE chunks to simulate real streaming.
28+
SimulateStream bool
29+
// StreamChunkDelay is the delay between SSE chunks when SimulateStream is true.
30+
// Defaults to 15ms if not set.
31+
StreamChunkDelay time.Duration
32+
}
33+
34+
// ProxyOption is a function that configures ProxyOptions.
35+
type ProxyOption func(*ProxyOptions)
36+
37+
// WithSimulateStream enables simulated streaming with delays between chunks.
38+
func WithSimulateStream(enabled bool) ProxyOption {
39+
return func(o *ProxyOptions) {
40+
o.SimulateStream = enabled
41+
}
42+
}
43+
44+
// WithStreamChunkDelay sets the delay between SSE chunks.
45+
func WithStreamChunkDelay(d time.Duration) ProxyOption {
46+
return func(o *ProxyOptions) {
47+
o.StreamChunkDelay = d
48+
}
49+
}
50+
2451
// StartProxy starts an internal HTTP proxy that replays cassette responses.
2552
// It returns the proxy URL and a cleanup function that should be called when done.
26-
func StartProxy(cassettePath string) (string, func() error, error) {
27-
return StartProxyWithOptions(cassettePath, recorder.ModeReplayOnly, nil, nil)
53+
func StartProxy(cassettePath string, opts ...ProxyOption) (string, func() error, error) {
54+
options := &ProxyOptions{
55+
StreamChunkDelay: 15 * time.Millisecond,
56+
}
57+
for _, opt := range opts {
58+
opt(options)
59+
}
60+
return StartProxyWithOptions(cassettePath, recorder.ModeReplayOnly, nil, nil, options)
2861
}
2962

3063
// StartRecordingProxy starts a proxy that records AI API interactions to a cassette file.
@@ -51,7 +84,7 @@ func StartStreamingRecordingProxy(
5184
e := echo.New()
5285
e.HideBanner = true
5386
e.HidePort = true
54-
e.Any("/*", Handle(streamRec, headerUpdater))
87+
e.Any("/*", Handle(streamRec, headerUpdater, nil))
5588

5689
httpServer := httptest.NewServer(e)
5790

@@ -102,17 +135,23 @@ func APIKeyHeaderUpdater(host string, req *http.Request) {
102135
// - mode: recorder mode (ModeReplayOnly, ModeRecordOnce, etc.)
103136
// - matcher: custom matcher function (nil uses DefaultMatcher)
104137
// - headerUpdater: optional function to update request headers (for recording with real API keys)
138+
// - options: proxy options for stream simulation, etc.
105139
func StartProxyWithOptions(
106140
cassettePath string,
107141
mode recorder.Mode,
108142
matcher recorder.MatcherFunc,
109143
headerUpdater func(host string, req *http.Request),
144+
options *ProxyOptions,
110145
) (string, func() error, error) {
111146
hasMatcher := matcher != nil
112147
if !hasMatcher {
113148
matcher = DefaultMatcher(nil)
114149
}
115150

151+
if options == nil {
152+
options = &ProxyOptions{}
153+
}
154+
116155
transport, err := recorder.New(cassettePath,
117156
recorder.WithMode(mode),
118157
recorder.WithMatcher(matcher),
@@ -127,7 +166,7 @@ func StartProxyWithOptions(
127166
e := echo.New()
128167
e.HideBanner = true
129168
e.HidePort = true
130-
e.Any("/*", Handle(transport, headerUpdater))
169+
e.Any("/*", Handle(transport, headerUpdater, options))
131170

132171
httpServer := httptest.NewServer(e)
133172

@@ -227,7 +266,11 @@ func TargetURLForHost(host string) func(req *http.Request) string {
227266

228267
// Handle creates an echo handler that proxies requests through the VCR transport.
229268
// The headerUpdater is called with the host and request to update headers (e.g., for adding API keys).
230-
func Handle(transport http.RoundTripper, headerUpdater func(host string, req *http.Request)) echo.HandlerFunc {
269+
// The options parameter controls streaming simulation behavior.
270+
func Handle(transport http.RoundTripper, headerUpdater func(host string, req *http.Request), options *ProxyOptions) echo.HandlerFunc {
271+
if options == nil {
272+
options = &ProxyOptions{}
273+
}
231274
return func(c echo.Context) error {
232275
ctx := c.Request().Context()
233276

@@ -269,6 +312,9 @@ func Handle(transport http.RoundTripper, headerUpdater func(host string, req *ht
269312
c.Response().WriteHeader(resp.StatusCode)
270313

271314
if IsStreamResponse(resp) {
315+
if options.SimulateStream {
316+
return SimulatedStreamCopy(c, resp, options.StreamChunkDelay)
317+
}
272318
return StreamCopy(c, resp)
273319
}
274320

@@ -277,6 +323,44 @@ func Handle(transport http.RoundTripper, headerUpdater func(host string, req *ht
277323
}
278324
}
279325

326+
// SimulatedStreamCopy copies a streaming SSE response to the client with artificial delays
327+
// between events to simulate real-time streaming behavior.
328+
func SimulatedStreamCopy(c echo.Context, resp *http.Response, chunkDelay time.Duration) error {
329+
ctx := c.Request().Context()
330+
writer := c.Response().Writer
331+
332+
scanner := bufio.NewScanner(resp.Body)
333+
// SSE events can be large, increase buffer size
334+
scanner.Buffer(make([]byte, 64*1024), 1024*1024)
335+
336+
for scanner.Scan() {
337+
select {
338+
case <-ctx.Done():
339+
slog.WarnContext(ctx, "client disconnected, stop streaming")
340+
return nil
341+
default:
342+
}
343+
344+
line := scanner.Text()
345+
// Write the line with newline
346+
if _, err := writer.Write([]byte(line + "\n")); err != nil {
347+
return err
348+
}
349+
c.Response().Flush()
350+
351+
// Add delay after data lines (SSE events start with "data:")
352+
if strings.HasPrefix(line, "data:") {
353+
select {
354+
case <-ctx.Done():
355+
return nil
356+
case <-time.After(chunkDelay):
357+
}
358+
}
359+
}
360+
361+
return scanner.Err()
362+
}
363+
280364
// streamReadResult holds the result of a streaming read operation.
281365
type streamReadResult struct {
282366
n int64
@@ -330,6 +414,8 @@ func StreamCopy(c echo.Context, resp *http.Response) error {
330414
}
331415

332416
// IsStreamResponse checks if the response should be streamed.
417+
// It checks Content-Type headers first, then falls back to peeking at the body
418+
// for SSE format (useful when headers are stripped in recorded cassettes).
333419
func IsStreamResponse(resp *http.Response) bool {
334420
ct := strings.ToLower(resp.Header.Get("Content-Type"))
335421
if strings.Contains(ct, "text/event-stream") {
@@ -341,7 +427,46 @@ func IsStreamResponse(resp *http.Response) bool {
341427
return true
342428
}
343429

344-
return strings.Contains(ct, "application/octet-stream") ||
430+
if strings.Contains(ct, "application/octet-stream") ||
345431
strings.Contains(ct, "application/x-ndjson") ||
346-
strings.Contains(ct, "application/stream+json")
432+
strings.Contains(ct, "application/stream+json") {
433+
return true
434+
}
435+
436+
// If no streaming headers detected, peek at the body to check for SSE format.
437+
// This handles cassettes where headers were stripped during recording.
438+
if resp.Body != nil {
439+
// Read enough to detect SSE prefixes ("data:" or "event:")
440+
peek := make([]byte, 6)
441+
n, err := resp.Body.Read(peek)
442+
if err == nil || n > 0 {
443+
// Reconstruct the body with the peeked bytes prepended
444+
resp.Body = &peekReader{peeked: peek[:n], rest: resp.Body}
445+
// Check for SSE format markers
446+
if bytes.HasPrefix(peek[:n], []byte("data:")) || bytes.HasPrefix(peek[:n], []byte("event:")) {
447+
return true
448+
}
449+
}
450+
}
451+
452+
return false
453+
}
454+
455+
// peekReader wraps a reader with already-peeked bytes.
456+
type peekReader struct {
457+
peeked []byte
458+
rest io.ReadCloser
459+
}
460+
461+
func (p *peekReader) Read(b []byte) (int, error) {
462+
if len(p.peeked) > 0 {
463+
n := copy(b, p.peeked)
464+
p.peeked = p.peeked[n:]
465+
return n, nil
466+
}
467+
return p.rest.Read(b)
468+
}
469+
470+
func (p *peekReader) Close() error {
471+
return p.rest.Close()
347472
}

0 commit comments

Comments
 (0)