Skip to content

Commit fa3ac9e

Browse files
authored
Merge pull request #2518 from simonferquel-clanker/fix/steer-queue-newline-separator
runtime: append newline to non-last steer messages on multi-drain
2 parents a3b9d61 + e2c2552 commit fa3ac9e

2 files changed

Lines changed: 244 additions & 14 deletions

File tree

‎pkg/runtime/loop.go‎

Lines changed: 62 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,65 @@ func (r *LocalRuntime) appendSteerAndEmit(sess *session.Session, sm QueuedMessag
4747
events <- UserMessage(sm.Content, sess.ID, sm.MultiContent, len(sess.Messages)-1)
4848
}
4949

50+
// drainAndEmitSteered drains all messages from the steer queue and injects
51+
// them into the session as individual user messages. When multiple messages
52+
// are drained, a "\n" is appended to the content of every non-last message.
53+
// Some chat templates concatenate consecutive user messages without a
54+
// separator before tokenisation, which would cause trailing/leading word
55+
// fragments from adjacent messages to be glued together. The "\n" prevents
56+
// this without merging the messages into one.
57+
//
58+
// It also snapshots the message count before any messages are added and
59+
// returns it alongside the drained flag so the caller can pass it to
60+
// compactIfNeeded without a separate len(sess.GetAllMessages()) call.
61+
//
62+
// NOTE: the appended \n is persisted in the session message and included in
63+
// UserMessageEvent. This is a deliberate trade-off: because the runtime passes
64+
// chat.Message slices directly to the provider, this is the only injection
65+
// point that doesn't require restructuring. TUI consumers may see a trailing
66+
// newline on non-last steered messages in multi-drain batches.
67+
//
68+
// Returns (true, messageCountBefore) if any messages were drained and emitted;
69+
// (false, 0) otherwise.
70+
func (r *LocalRuntime) drainAndEmitSteered(ctx context.Context, sess *session.Session, events chan<- Event) (bool, int) {
71+
steered := r.steerQueue.Drain(ctx)
72+
if len(steered) == 0 {
73+
return false, 0
74+
}
75+
messageCountBefore := len(sess.GetAllMessages())
76+
for i, sm := range steered {
77+
if i < len(steered)-1 {
78+
sm = appendNewlineToQueuedMessage(sm)
79+
}
80+
r.appendSteerAndEmit(sess, sm, events)
81+
}
82+
return true, messageCountBefore
83+
}
84+
85+
// appendNewlineToQueuedMessage returns sm with "\n" appended to its text
86+
// content; never mutates the caller's slice contents.
87+
// For plain-text messages Content is extended. For multi-content messages
88+
// only the last part is considered: if it is a text part, "\n" is appended
89+
// to it in a shallow copy of the slice. If the last part is not text type
90+
// (e.g. image), sm is returned unchanged — non-text parts carry their own
91+
// provider envelope that acts as a separator.
92+
func appendNewlineToQueuedMessage(sm QueuedMessage) QueuedMessage {
93+
if len(sm.MultiContent) == 0 {
94+
sm.Content += "\n"
95+
return sm
96+
}
97+
// Only act if the last part is a text part.
98+
last := len(sm.MultiContent) - 1
99+
if sm.MultiContent[last].Type != chat.MessagePartTypeText {
100+
return sm
101+
}
102+
// Shallow-copy the slice so we don't mutate the original.
103+
parts := append([]chat.MessagePart(nil), sm.MultiContent...)
104+
parts[last].Text += "\n"
105+
sm.MultiContent = parts
106+
return sm
107+
}
108+
50109
// finalizeEventChannel performs cleanup at the end of a RunStream goroutine:
51110
// restores the previous elicitation channel, emits the StreamStopped event,
52111
// fires hooks, and closes the events channel.
@@ -302,11 +361,7 @@ func (r *LocalRuntime) RunStream(ctx context.Context, sess *session.Session) <-c
302361

303362
// Drain steer messages queued while idle or before the first model call
304363
// (covers idle-window and first-turn-miss races).
305-
if steered := r.steerQueue.Drain(ctx); len(steered) > 0 {
306-
messageCountBeforeSteer := len(sess.GetAllMessages())
307-
for _, sm := range steered {
308-
r.appendSteerAndEmit(sess, sm, events)
309-
}
364+
if drained, messageCountBeforeSteer := r.drainAndEmitSteered(ctx, sess, events); drained {
310365
r.compactIfNeeded(ctx, sess, a, m, contextLimit, messageCountBeforeSteer, events)
311366
}
312367

@@ -435,11 +490,7 @@ func (r *LocalRuntime) RunStream(ctx context.Context, sess *session.Session) <-c
435490
toolModelOverride = resolveToolCallModelOverride(res.Calls, agentTools)
436491

437492
// Drain steer messages that arrived during tool calls.
438-
if steered := r.steerQueue.Drain(ctx); len(steered) > 0 {
439-
for _, sm := range steered {
440-
r.appendSteerAndEmit(sess, sm, events)
441-
}
442-
493+
if drained, _ := r.drainAndEmitSteered(ctx, sess, events); drained {
443494
r.compactIfNeeded(ctx, sess, a, m, contextLimit, messageCountBeforeTools, events)
444495
continue
445496
}
@@ -449,10 +500,7 @@ func (r *LocalRuntime) RunStream(ctx context.Context, sess *session.Session) <-c
449500
r.executeStopHooks(ctx, sess, a, res.Content, events)
450501

451502
// Re-check steer queue: closes the race between the mid-loop drain and this stop.
452-
if steered := r.steerQueue.Drain(ctx); len(steered) > 0 {
453-
for _, sm := range steered {
454-
r.appendSteerAndEmit(sess, sm, events)
455-
}
503+
if drained, _ := r.drainAndEmitSteered(ctx, sess, events); drained {
456504
r.compactIfNeeded(ctx, sess, a, m, contextLimit, messageCountBeforeTools, events)
457505
continue
458506
}

‎pkg/runtime/runtime_test.go‎

Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2588,3 +2588,185 @@ func TestSteer_EndOfIterationRaceIsConsumedInCurrentRunStream(t *testing.T) {
25882588
assert.NotContains(t, steerSessionMsg.Message.Content, "<system-reminder>",
25892589
"end-of-iteration steer must NOT use the system-reminder envelope")
25902590
}
2591+
2592+
func TestAppendNewlineToQueuedMessage(t *testing.T) {
2593+
t.Parallel()
2594+
2595+
t.Run("plain-text message gets newline appended to Content", func(t *testing.T) {
2596+
sm := QueuedMessage{Content: "hello"}
2597+
got := appendNewlineToQueuedMessage(sm)
2598+
assert.Equal(t, "hello\n", got.Content)
2599+
assert.Nil(t, got.MultiContent)
2600+
})
2601+
2602+
t.Run("multi-content message with last part text gets newline on that part", func(t *testing.T) {
2603+
sm := QueuedMessage{
2604+
MultiContent: []chat.MessagePart{
2605+
{Type: chat.MessagePartTypeImageURL, ImageURL: &chat.MessageImageURL{URL: "https://example.com/img.png"}},
2606+
{Type: chat.MessagePartTypeText, Text: "and this"},
2607+
},
2608+
}
2609+
got := appendNewlineToQueuedMessage(sm)
2610+
// Last part is text — \n appended to it.
2611+
assert.Equal(t, "and this\n", got.MultiContent[1].Text)
2612+
// Image part unchanged.
2613+
assert.Equal(t, chat.MessagePartTypeImageURL, got.MultiContent[0].Type)
2614+
})
2615+
2616+
t.Run("multi-content message with last part non-text is returned unchanged", func(t *testing.T) {
2617+
sm := QueuedMessage{
2618+
MultiContent: []chat.MessagePart{
2619+
{Type: chat.MessagePartTypeText, Text: "look at this"},
2620+
{Type: chat.MessagePartTypeImageURL, ImageURL: &chat.MessageImageURL{URL: "https://example.com/img.png"}},
2621+
},
2622+
}
2623+
got := appendNewlineToQueuedMessage(sm)
2624+
// Last part is image — non-text parts have their own envelope separator;
2625+
// return unchanged.
2626+
assert.Equal(t, "look at this", got.MultiContent[0].Text)
2627+
assert.Equal(t, chat.MessagePartTypeImageURL, got.MultiContent[1].Type)
2628+
})
2629+
2630+
t.Run("multi-content message with no text part is returned unchanged", func(t *testing.T) {
2631+
sm := QueuedMessage{
2632+
MultiContent: []chat.MessagePart{
2633+
{Type: chat.MessagePartTypeImageURL, ImageURL: &chat.MessageImageURL{URL: "https://example.com/img.png"}},
2634+
},
2635+
}
2636+
got := appendNewlineToQueuedMessage(sm)
2637+
// Image-only messages have no text part to append \n to; they are immune to
2638+
// the run-on tokenisation problem because non-text parts carry their own
2639+
// envelope that acts as a separator. Return unchanged.
2640+
require.Len(t, got.MultiContent, 1)
2641+
assert.Equal(t, chat.MessagePartTypeImageURL, got.MultiContent[0].Type)
2642+
})
2643+
2644+
t.Run("original QueuedMessage is not mutated", func(t *testing.T) {
2645+
parts := []chat.MessagePart{
2646+
{Type: chat.MessagePartTypeText, Text: "original"},
2647+
}
2648+
sm := QueuedMessage{MultiContent: parts}
2649+
_ = appendNewlineToQueuedMessage(sm)
2650+
assert.Equal(t, "original", parts[0].Text, "original slice must not be mutated")
2651+
})
2652+
2653+
t.Run("plain-text original not mutated", func(t *testing.T) {
2654+
sm := QueuedMessage{Content: "x"}
2655+
_ = appendNewlineToQueuedMessage(sm)
2656+
assert.Equal(t, "x", sm.Content)
2657+
})
2658+
}
2659+
2660+
// TestDrainAndEmitSteered_MultipleMessages verifies that when multiple messages
2661+
// are drained from the steer queue, each is emitted as a separate session
2662+
// message and non-last messages have "\n" appended to their content, preventing
2663+
// the LLM from tokenising adjacent words across message boundaries as a run-on
2664+
// string.
2665+
func TestDrainAndEmitSteered_MultipleMessages(t *testing.T) {
2666+
t.Parallel()
2667+
2668+
// Use a stream that never gets called — we only exercise drainAndEmitSteered directly.
2669+
prov := &mockProvider{id: "test/mock-model", stream: &mockStream{}}
2670+
root := agent.New("root", "You are a test agent", agent.WithModel(prov))
2671+
tm := team.New(team.WithAgents(root))
2672+
2673+
rt, err := NewLocalRuntime(tm, WithSessionCompaction(false), WithModelStore(mockModelStore{}))
2674+
require.NoError(t, err)
2675+
2676+
// Enqueue three plain-text steer messages before draining.
2677+
require.NoError(t, rt.Steer(QueuedMessage{Content: "first"}))
2678+
require.NoError(t, rt.Steer(QueuedMessage{Content: "second"}))
2679+
require.NoError(t, rt.Steer(QueuedMessage{Content: "third"}))
2680+
2681+
sess := session.New()
2682+
events := make(chan Event, 16)
2683+
2684+
drained, _ := rt.drainAndEmitSteered(t.Context(), sess, events)
2685+
close(events)
2686+
2687+
assert.True(t, drained, "should report messages were drained")
2688+
2689+
// Three separate session messages must have been added.
2690+
var userMsgs []string
2691+
for _, item := range sess.Messages {
2692+
if item.IsMessage() && item.Message.Message.Role == chat.MessageRoleUser {
2693+
userMsgs = append(userMsgs, item.Message.Message.Content)
2694+
}
2695+
}
2696+
require.Len(t, userMsgs, 3, "expected 3 independent user messages")
2697+
2698+
// Non-last messages must have "\n" appended; the last must not.
2699+
assert.Equal(t, "first\n", userMsgs[0])
2700+
assert.Equal(t, "second\n", userMsgs[1])
2701+
assert.Equal(t, "third", userMsgs[2])
2702+
2703+
// The UserMessageEvent contents must mirror the session messages.
2704+
var eventMsgs []string
2705+
for ev := range events {
2706+
if ue, ok := ev.(*UserMessageEvent); ok {
2707+
eventMsgs = append(eventMsgs, ue.Message)
2708+
}
2709+
}
2710+
require.Len(t, eventMsgs, 3)
2711+
assert.Equal(t, "first\n", eventMsgs[0])
2712+
assert.Equal(t, "second\n", eventMsgs[1])
2713+
assert.Equal(t, "third", eventMsgs[2])
2714+
}
2715+
2716+
// TestDrainAndEmitSteered_MultiContent verifies that the "\n" separator is
2717+
// correctly appended to multi-content messages: specifically to the last text
2718+
// part rather than the Content field.
2719+
func TestDrainAndEmitSteered_MultiContent(t *testing.T) {
2720+
t.Parallel()
2721+
2722+
prov := &mockProvider{id: "test/mock-model", stream: &mockStream{}}
2723+
root := agent.New("root", "You are a test agent", agent.WithModel(prov))
2724+
tm := team.New(team.WithAgents(root))
2725+
2726+
rt, err := NewLocalRuntime(tm, WithSessionCompaction(false), WithModelStore(mockModelStore{}))
2727+
require.NoError(t, err)
2728+
2729+
// Two multi-content messages.
2730+
require.NoError(t, rt.Steer(QueuedMessage{
2731+
Content: "first",
2732+
MultiContent: []chat.MessagePart{
2733+
{Type: chat.MessagePartTypeText, Text: "first"},
2734+
{Type: chat.MessagePartTypeImageURL, ImageURL: &chat.MessageImageURL{URL: "https://example.com/a.png"}},
2735+
{Type: chat.MessagePartTypeText, Text: "first-text-after-img"},
2736+
},
2737+
}))
2738+
require.NoError(t, rt.Steer(QueuedMessage{
2739+
Content: "second",
2740+
MultiContent: []chat.MessagePart{
2741+
{Type: chat.MessagePartTypeText, Text: "second"},
2742+
},
2743+
}))
2744+
2745+
sess := session.New()
2746+
events := make(chan Event, 16)
2747+
2748+
drained, _ := rt.drainAndEmitSteered(t.Context(), sess, events)
2749+
close(events)
2750+
2751+
assert.True(t, drained)
2752+
2753+
// Two session messages.
2754+
var items []session.Item
2755+
for _, item := range sess.Messages {
2756+
if item.IsMessage() && item.Message.Message.Role == chat.MessageRoleUser {
2757+
items = append(items, item)
2758+
}
2759+
}
2760+
require.Len(t, items, 2)
2761+
2762+
// First message: last text part must have "\n" appended.
2763+
firstParts := items[0].Message.Message.MultiContent
2764+
require.Len(t, firstParts, 3)
2765+
assert.Equal(t, "first-text-after-img\n", firstParts[2].Text, "last text part of non-last message should have \\n")
2766+
assert.Equal(t, "first", firstParts[0].Text, "other text parts must be unchanged")
2767+
2768+
// Second (last) message: no modification.
2769+
secondParts := items[1].Message.Message.MultiContent
2770+
require.Len(t, secondParts, 1)
2771+
assert.Equal(t, "second", secondParts[0].Text, "last message must not be modified")
2772+
}

0 commit comments

Comments
 (0)