@@ -5,245 +5,18 @@ import (
55 "fmt"
66 "io"
77 "iter"
8- "net"
9- "net/http"
108 "os"
119 "os/exec"
1210 "sync"
13- "sync/atomic"
1411 "testing"
15- "time"
1612
17- "github.com/google/jsonschema-go/jsonschema"
1813 gomcp "github.com/modelcontextprotocol/go-sdk/mcp"
1914 "github.com/stretchr/testify/assert"
2015 "github.com/stretchr/testify/require"
2116
2217 "github.com/docker/docker-agent/pkg/tools"
2318)
2419
25- // listenTCPRetry retries Listen on addr until it succeeds or a short
26- // deadline passes (the port may be momentarily busy after a previous
27- // server shutdown). The returned listener should be closed by the caller.
28- func listenTCPRetry (t * testing.T , addr string ) net.Listener {
29- t .Helper ()
30- var lc net.ListenConfig
31- var ln net.Listener
32- require .Eventually (t , func () bool {
33- var err error
34- ln , err = lc .Listen (t .Context (), "tcp" , addr )
35- return err == nil
36- }, 2 * time .Second , 50 * time .Millisecond , "port %s not available in time" , addr )
37- return ln
38- }
39-
40- // startMCPServer creates a minimal MCP server on addr with the given tools
41- // and returns a function to shut it down.
42- func startMCPServer (t * testing.T , addr string , mcpTools ... * gomcp.Tool ) (shutdown func ()) {
43- t .Helper ()
44-
45- s := gomcp .NewServer (& gomcp.Implementation {Name : "test-server" , Version : "1.0.0" }, nil )
46- for _ , tool := range mcpTools {
47- s .AddTool (tool , func (_ context.Context , _ * gomcp.CallToolRequest ) (* gomcp.CallToolResult , error ) {
48- return & gomcp.CallToolResult {
49- Content : []gomcp.Content {& gomcp.TextContent {Text : "ok-" + tool .Name }},
50- }, nil
51- })
52- }
53-
54- srvLn := listenTCPRetry (t , addr )
55- srv := & http.Server {
56- Handler : gomcp .NewStreamableHTTPHandler (func (* http.Request ) * gomcp.Server { return s }, nil ),
57- }
58- go func () { _ = srv .Serve (srvLn ) }()
59-
60- return func () { _ = srv .Close () }
61- }
62-
63- // allocateAddr returns a free TCP address on localhost.
64- func allocateAddr (t * testing.T ) string {
65- t .Helper ()
66- ln := listenTCPRetry (t , "127.0.0.1:0" )
67- addr := ln .Addr ().String ()
68- ln .Close ()
69- return addr
70- }
71-
72- // TestRemoteReconnectAfterServerRestart verifies that a Toolset backed by a
73- // real remote (streamable-HTTP) MCP server transparently recovers when the
74- // server is restarted.
75- //
76- // The scenario:
77- // 1. Start a minimal MCP server with a "ping" tool.
78- // 2. Connect a Toolset, call "ping" — succeeds.
79- // 3. Shut down the server (simulates crash / restart).
80- // 4. Start a **new** server on the same address.
81- // 5. Call "ping" again — this must succeed after automatic reconnection.
82- //
83- // Without the ErrSessionMissing recovery logic the second call would fail
84- // because the new server does not know the old session ID.
85- func TestRemoteReconnectAfterServerRestart (t * testing.T ) {
86- t .Parallel ()
87-
88- addr := allocateAddr (t )
89-
90- var callCount atomic.Int32
91-
92- // startServer creates a minimal MCP server on addr with a "ping" tool
93- // and returns a function to shut it down.
94- startServer := func (t * testing.T ) (shutdown func ()) {
95- t .Helper ()
96-
97- s := gomcp .NewServer (& gomcp.Implementation {Name : "test-server" , Version : "1.0.0" }, nil )
98- s .AddTool (& gomcp.Tool {
99- Name : "ping" ,
100- InputSchema : & jsonschema.Schema {Type : "object" },
101- }, func (_ context.Context , _ * gomcp.CallToolRequest ) (* gomcp.CallToolResult , error ) {
102- n := callCount .Add (1 )
103- return & gomcp.CallToolResult {
104- Content : []gomcp.Content {& gomcp.TextContent {Text : fmt .Sprintf ("pong-%d" , n )}},
105- }, nil
106- })
107-
108- // Retry Listen until the port is available (e.g. after a server shutdown).
109- srvLn := listenTCPRetry (t , addr )
110-
111- srv := & http.Server {
112- Handler : gomcp .NewStreamableHTTPHandler (func (* http.Request ) * gomcp.Server { return s }, nil ),
113- }
114- go func () { _ = srv .Serve (srvLn ) }()
115-
116- return func () { _ = srv .Close () }
117- }
118-
119- callPing := func (t * testing.T , ts * Toolset ) string {
120- t .Helper ()
121- result , callErr := ts .callTool (t .Context (), tools.ToolCall {
122- Function : tools.FunctionCall {Name : "ping" , Arguments : "{}" },
123- })
124- require .NoError (t , callErr )
125- return result .Output
126- }
127-
128- // --- Step 1–2: Start first server, connect toolset ---
129- shutdown1 := startServer (t )
130-
131- ts := NewRemoteToolset ("test" , fmt .Sprintf ("http://%s/mcp" , addr ), "streamable-http" , nil , nil )
132- require .NoError (t , ts .Start (t .Context ()))
133-
134- toolList , err := ts .Tools (t .Context ())
135- require .NoError (t , err )
136- require .Len (t , toolList , 1 )
137- assert .Equal (t , "test_ping" , toolList [0 ].Name )
138-
139- // --- Step 3: Call succeeds on original server ---
140- assert .Equal (t , "pong-1" , callPing (t , ts ))
141-
142- // --- Step 4: Shut down the server ---
143- shutdown1 ()
144-
145- // Capture the supervisor's restart channel so we can verify reconnection.
146- restartedCh := ts .supervisor .Restarted ()
147-
148- // --- Step 5–6: Start a fresh server, call again ---
149- shutdown2 := startServer (t )
150- t .Cleanup (func () {
151- _ = ts .Stop (t .Context ())
152- shutdown2 ()
153- })
154-
155- // This call triggers ErrSessionMissing recovery and must succeed transparently.
156- assert .Equal (t , "pong-2" , callPing (t , ts ))
157-
158- // Verify that watchConnection actually restarted the connection by checking
159- // that the restarted channel was closed (signaling reconnect completion).
160- select {
161- case <- restartedCh :
162- // Success: the channel was closed, meaning reconnect happened
163- case <- time .After (100 * time .Millisecond ):
164- t .Fatal ("reconnect did not complete: restarted channel was not closed" )
165- }
166- }
167-
168- // TestRemoteReconnectRefreshesTools verifies that after a remote MCP server
169- // restarts with a different set of tools, the Toolset picks up the new tools
170- // and notifies the runtime via the toolsChangedHandler.
171- //
172- // This is the scenario from https://github.com/docker/docker-agent/issues/2244:
173- // - Server v1 exposes tools [alpha, shared].
174- // - Client connects and caches [alpha, shared].
175- // - Server v1 shuts down; server v2 starts with tools [beta, shared].
176- // - A tool call to "shared" triggers reconnection.
177- // - After reconnection, Tools() must return [beta, shared], not the stale [alpha, shared].
178- // - The toolsChangedHandler must be called so the runtime refreshes its own state.
179- func TestRemoteReconnectRefreshesTools (t * testing.T ) {
180- t .Parallel ()
181-
182- addr := allocateAddr (t )
183-
184- // "shared" exists on both servers so we can call it to trigger reconnect.
185- sharedTool := & gomcp.Tool {Name : "shared" , InputSchema : & jsonschema.Schema {Type : "object" }}
186- alphaTool := & gomcp.Tool {Name : "alpha" , InputSchema : & jsonschema.Schema {Type : "object" }}
187- betaTool := & gomcp.Tool {Name : "beta" , InputSchema : & jsonschema.Schema {Type : "object" }}
188-
189- // --- Start server v1 with tools "alpha" + "shared" ---
190- shutdown1 := startMCPServer (t , addr , alphaTool , sharedTool )
191-
192- ts := NewRemoteToolset ("ns" , fmt .Sprintf ("http://%s/mcp" , addr ), "streamable-http" , nil , nil )
193-
194- // Track toolsChangedHandler invocations.
195- toolsChangedCh := make (chan struct {}, 1 )
196- ts .SetToolsChangedHandler (func () {
197- select {
198- case toolsChangedCh <- struct {}{}:
199- default :
200- }
201- })
202-
203- require .NoError (t , ts .Start (t .Context ()))
204-
205- // Verify initial tools.
206- toolList , err := ts .Tools (t .Context ())
207- require .NoError (t , err )
208- require .Len (t , toolList , 2 )
209- toolNames := []string {toolList [0 ].Name , toolList [1 ].Name }
210- assert .Contains (t , toolNames , "ns_alpha" )
211- assert .Contains (t , toolNames , "ns_shared" )
212-
213- // --- Shut down server v1, start server v2 with tools "beta" + "shared" ---
214- shutdown1 ()
215-
216- shutdown2 := startMCPServer (t , addr , betaTool , sharedTool )
217- t .Cleanup (func () {
218- _ = ts .Stop (t .Context ())
219- shutdown2 ()
220- })
221-
222- // Call "shared" to trigger ErrSessionMissing → reconnect.
223- result , callErr := ts .callTool (t .Context (), tools.ToolCall {
224- Function : tools.FunctionCall {Name : "shared" , Arguments : "{}" },
225- })
226- require .NoError (t , callErr )
227- assert .Equal (t , "ok-shared" , result .Output )
228-
229- // Wait for the toolsChangedHandler to be called (signals reconnect + refresh).
230- select {
231- case <- toolsChangedCh :
232- // Good — the handler was called.
233- case <- time .After (30 * time .Second ):
234- t .Fatal ("timed out waiting for toolsChangedHandler after reconnect" )
235- }
236-
237- // Verify the toolset now reports the new server's tools.
238- toolList , err = ts .Tools (t .Context ())
239- require .NoError (t , err )
240- require .Len (t , toolList , 2 , "expected exactly two tools from the new server" )
241- toolNames = []string {toolList [0 ].Name , toolList [1 ].Name }
242- assert .Contains (t , toolNames , "ns_beta" , "expected the new server's tool, got stale tool" )
243- assert .Contains (t , toolNames , "ns_shared" )
244- assert .NotContains (t , toolNames , "ns_alpha" , "stale tool from old server should not be present" )
245- }
246-
24720// failingInitClient is a mock mcpClient whose Initialize method returns a
24821// configurable error for the first N calls, then succeeds.
24922type failingInitClient struct {
0 commit comments