@antoine-billiet-arkea-partnre
  • Add DeferredMessageCoordinator to manage deferred message lifecycle per conversation
  • Add DeferredConnector interface for connectors supporting deferred mode
  • Implement automatic deferred mode in IadvizeConnector.handleRequest()
  • Remove AbstractProactiveAnswerHandler and ProactiveConversationStatus (over-engineering)
  • Simplify RAGAnswerHandler to use standard bus.end() pattern
  • Fix race condition: remove premature finally block that forced flush before async completion
  • Add integration tests for deferred flow and race condition demonstration
  • Add DEBUG logs for proactive message flow investigation

Copilot AI left a comment

Pull request overview

Refactors the iAdvize proactive/deferred response mechanism to avoid race conditions by introducing a per-conversation deferred message coordinator and simplifying RAG answer handling to rely on bus.end() semantics.

Changes:

  • Introduces DeferredMessageCoordinator and deferred-mode handling in IadvizeConnector to collect actions and flush on lastAnswer=true.
  • Removes the proactive conversation abstraction (AbstractProactiveAnswerHandler, ProactiveConversationStatus) and updates RAGAnswerHandler to use end() directly.
  • Adds extensive debug logging and new iAdvize deferred-flow/race-condition tests.
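
The collect-then-flush behavior described above can be illustrated with a minimal sketch. It is not the PR's implementation: `DeferredAction`, `SketchCoordinator`, and `SketchRegistry` are hypothetical names, and the assumption is simply that actions are buffered per conversation and flushed exactly once when an action carries lastAnswer=true.

```kotlin
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-in for a connector action; not the project's real type.
data class DeferredAction(val text: String, val lastAnswer: Boolean)

// Buffers the actions of one conversation and flushes them exactly once.
class SketchCoordinator(private val flush: (List<String>) -> Unit) {
    private val lock = Any()
    private val pending = mutableListOf<String>()
    private var closed = false

    /** Returns true when this call triggered the flush. */
    fun collect(action: DeferredAction): Boolean {
        val batch = synchronized(lock) {
            if (closed) return false             // late actions are ignored in this sketch
            pending.add(action.text)
            if (!action.lastAnswer) return false // keep buffering
            closed = true
            pending.toList()
        }
        flush(batch)                             // flush outside the lock
        return true
    }
}

// One coordinator per conversation id, removed once it has flushed.
class SketchRegistry(private val flush: (String, List<String>) -> Unit) {
    private val coordinators = ConcurrentHashMap<String, SketchCoordinator>()

    fun send(conversationId: String, action: DeferredAction) {
        val coordinator = coordinators.computeIfAbsent(conversationId) { id ->
            SketchCoordinator { batch -> flush(id, batch) }
        }
        if (coordinator.collect(action)) {
            coordinators.remove(conversationId, coordinator)
        }
    }

    fun isActive(conversationId: String): Boolean = coordinators.containsKey(conversationId)
}
```

Taking the snapshot under the lock and flushing outside it keeps a slow flush from blocking concurrent `collect` calls on the same conversation.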

Reviewed changes

Copilot reviewed 12 out of 12 changed files in this pull request and generated 8 comments.

| File | Description |
| --- | --- |
| bot/engine/src/main/kotlin/engine/config/RAGAnswerHandler.kt | Uses bus.end() for final RAG message and handles no-answer routing; adds timeline save logic. |
| bot/engine/src/main/kotlin/engine/TockBotBus.kt | Adds debug logs around deferred message sending/closing. |
| bot/engine/src/main/kotlin/connector/ConnectorQueue.kt | Adds debug logs around queueing/sending/dequeueing actions. |
| bot/engine/src/main/kotlin/connector/Connector.kt | Updates KDoc around lastAnswer semantics and deferred-mode expectations. |
| bot/engine/src/main/kotlin/engine/config/AbstractProactiveAnswerHandler.kt | Removes proactive handler abstraction. |
| bot/engine/src/main/kotlin/engine/config/ProactiveConversationStatus.kt | Removes proactive conversation status enum. |
| bot/connector-iadvize/src/main/kotlin/IadvizeConnectorCallback.kt | Makes stored actions thread-safe; adds coordinator reference for deferred mode. |
| bot/connector-iadvize/src/main/kotlin/IadvizeConnector.kt | Implements deferred mode (ack immediately + flush via GraphQL on lastAnswer=true) and adds debug logs/error message config. |
| bot/connector-iadvize/src/main/kotlin/DeferredMessageCoordinator.kt | Adds per-conversation coordinator to safely collect/flush deferred actions. |
| bot/connector-iadvize/src/main/kotlin/DeferredConnector.kt | Adds a deferred-mode connector interface (currently implemented by iAdvize). |
| bot/connector-iadvize/src/test/kotlin/IadvizeRaceConditionDemoTest.kt | Adds race-condition “demo” tests (currently includes a non-asserting test). |
| bot/connector-iadvize/src/test/kotlin/IadvizeDeferredEndFlowTest.kt | Adds deferred flow tests for coordinator behavior and iAdvize send/flush behavior. |

Comment on lines +338 to +343
// Then
Thread.sleep(100)
assertEquals(3, graphQLCallCount.get())
assertTrue(capturedMessages.contains("Message 1"))
assertTrue(capturedMessages.contains("Message 2"))
assertTrue(capturedMessages.contains("Final"))

Copilot AI Jan 28, 2026

Using Thread.sleep(...) to wait for async work is timing-dependent and can make this test flaky under CI load. Prefer a deterministic wait (e.g., CountDownLatch, Awaitility, or MockK verify(timeout=...)) based on the expected number of GraphQL calls/messages.
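
A minimal sketch of the CountDownLatch option, using only the JDK. `RecordingSender` and `sendAsync` are hypothetical stand-ins for the mocked GraphQL sender and the asynchronous queue in the test; the point is that the test blocks until the expected number of calls has been observed, with the timeout acting only as an upper bound.

```kotlin
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

// Hypothetical stand-in for the mocked GraphQL sender used by the test.
class RecordingSender(expectedCalls: Int) {
    val messages = CopyOnWriteArrayList<String>()
    private val latch = CountDownLatch(expectedCalls)

    fun send(message: String) {
        messages.add(message)
        latch.countDown()   // one count per observed call
    }

    // Returns true as soon as all expected calls arrived, false on timeout.
    fun awaitAll(timeoutMs: Long): Boolean = latch.await(timeoutMs, TimeUnit.MILLISECONDS)
}

// Hypothetical stand-in for the asynchronous sending path (single worker keeps order).
fun sendAsync(sender: RecordingSender, messages: List<String>) {
    val executor = Executors.newSingleThreadExecutor()
    messages.forEach { message -> executor.execute { sender.send(message) } }
    executor.shutdown()
}
```

In the test, `Thread.sleep(100)` would become `assertTrue(sender.awaitAll(5_000))`, followed by the existing assertions on the captured messages.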

Comment on lines +399 to +404
// Then - all messages flushed via GraphQL
Thread.sleep(100)
assertEquals(2, graphQLCallCount.get())
assertTrue(capturedMessages.contains("Debug info"))
assertTrue(capturedMessages.contains("RAG Response with footnotes"))
assertNull(callback.deferredCoordinator)

Copilot AI Jan 28, 2026

Using Thread.sleep(...) to wait for async work is timing-dependent and can make this test flaky under CI load. Prefer a deterministic wait (e.g., CountDownLatch, Awaitility, or MockK verify(timeout=...)) rather than sleeping a fixed duration.
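
Another JDK-only way to avoid the fixed sleep, sketched under the assumption that the test can capture sent messages into a `LinkedBlockingQueue` instead of a plain list. `takeMessages` is a hypothetical helper, not part of the PR.

```kotlin
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit

// Waits for up to `count` messages, returning early once they have all arrived.
// Returns fewer messages if the overall timeout elapses first.
fun takeMessages(queue: LinkedBlockingQueue<String>, count: Int, timeoutMs: Long): List<String> {
    val deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs)
    val received = mutableListOf<String>()
    while (received.size < count) {
        val remaining = deadline - System.nanoTime()
        // poll() returns null when nothing arrives within the remaining time
        val next = queue.poll(remaining, TimeUnit.NANOSECONDS) ?: break
        received.add(next)
    }
    return received
}
```

The assertions then run against the returned list, so a slow CI machine only makes the test slower, not red.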

Comment on lines +131 to +136
// Save the dialog
if (connectorData.saveTimeline) {
    runBlocking {
        userTimelineDAO.save(userTimeline, botDefinition)
    }
}

Copilot AI Jan 28, 2026

The dialog is already persisted by the connector controller when connectorData.saveTimeline is enabled (see bot/engine/src/main/kotlin/engine/TockConnectorController.kt:196-198). Saving again here causes an extra write for the same request. Consider removing this block or limiting it only to the specific execution mode where the controller does not persist the timeline.

Comment on lines +526 to +532
val event = WebhookActionConverter.toEvent(iadvizeRequest, applicationId)
// Note: controller.handle() returns immediately as RAG processing is async
// The coordinator timeout (via TockBotBus.deferMessageSending 60s) handles errors
// DO NOT use finally block here - it would execute before async processing completes
executor.executeBlocking {
    controller.handle(event, ConnectorData(callback, metadata = metadata))
}

Copilot AI Jan 28, 2026

In deferred mode, controller.handle(...) is executed asynchronously, but there is no error or timeout path that flushes the collected messages if the handler throws or returns without sending a lastAnswer=true action. The code comment says TockBotBus.deferMessageSending handles errors, yet nothing on that path currently triggers forceFlushCoordinator(...). Wrap the controller.handle call in try/catch and, on exception (or if callback.deferredCoordinator is still non-null after handle completes), call forceFlushCoordinator to end the coordinator and optionally send the deferred error message.
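
A rough sketch of that error path, with heavy assumptions: `PendingConversation`, `SketchCallback`, `forceFlush`, and `handleDeferred` are hypothetical, simplified stand-ins and do not mirror the real classes or the real forceFlushCoordinator signature. The sketch also assumes the handler has really finished when it returns. If processing continues asynchronously after the call returns, as the code comment in the diff states, the post-return check would have to hook into the async completion instead, otherwise it reintroduces the premature flush.

```kotlin
import java.util.concurrent.atomic.AtomicReference

// Hypothetical buffer of messages collected for one conversation.
class PendingConversation {
    private val messages = mutableListOf<String>()
    @Synchronized fun add(message: String) { messages.add(message) }
    @Synchronized fun drain(): List<String> = messages.toList().also { messages.clear() }
}

// Hypothetical callback holding the coordinator until it has been flushed.
class SketchCallback {
    val coordinator = AtomicReference<PendingConversation?>(PendingConversation())
}

// Flushes at most once: getAndSet(null) lets only the first caller proceed.
fun forceFlush(callback: SketchCallback, errorMessage: String?, send: (List<String>) -> Unit) {
    val pending = callback.coordinator.getAndSet(null) ?: return
    send(pending.drain() + listOfNotNull(errorMessage))
}

fun handleDeferred(callback: SketchCallback, send: (List<String>) -> Unit, handler: () -> Unit) {
    try {
        handler()
    } catch (e: Exception) {
        // Handler failed: deliver what was collected plus an error message.
        forceFlush(callback, "Sorry, something went wrong.", send)
        return
    }
    // Handler completed without a final answer: nothing else would flush.
    // No-op when the normal lastAnswer=true path already cleared the coordinator.
    forceFlush(callback, null, send)
}
```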

Comment on lines +415 to +417
val caller = Thread.currentThread().stackTrace.getOrNull(2)?.methodName ?: "unknown"

logger.debug { "SendSentenceWithFootnotes.sendByGraphQL(): caller=$caller, conversationId=$conversationId, chatBotId=$chatBotId, messageLength=${messageText.length}" }

Copilot AI Jan 28, 2026

Thread.currentThread().stackTrace is computed unconditionally to populate caller, which is relatively expensive and will run even when debug logging is disabled. Consider removing this, or only computing the stack trace when debug is enabled (e.g., guard with logger.isDebugEnabled).

Suggested change
- val caller = Thread.currentThread().stackTrace.getOrNull(2)?.methodName ?: "unknown"
- logger.debug { "SendSentenceWithFootnotes.sendByGraphQL(): caller=$caller, conversationId=$conversationId, chatBotId=$chatBotId, messageLength=${messageText.length}" }
+ if (logger.isDebugEnabled) {
+     val caller = Thread.currentThread().stackTrace.getOrNull(2)?.methodName ?: "unknown"
+     logger.debug { "SendSentenceWithFootnotes.sendByGraphQL(): caller=$caller, conversationId=$conversationId, chatBotId=$chatBotId, messageLength=${messageText.length}" }
+ }

Comment on lines +449 to +451
val caller = Thread.currentThread().stackTrace.getOrNull(2)?.methodName ?: "unknown"

logger.debug { "IadvizeReply.sendByGraphQL(): caller=$caller, replyType=${this::class.simpleName}, conversationId=$conversationId, chatBotId=$chatBotId" }

Copilot AI Jan 28, 2026

Same as above: computing Thread.currentThread().stackTrace for caller is expensive and happens even if debug logs are disabled. Guard this computation behind a debug-enabled check or remove it.
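
Because the logger call already takes a lambda, one possible alternative to an explicit guard is to compute the caller inside that lambda, so the stack trace is only captured when the message is actually built. The sketch below uses a hypothetical `SketchLogger` with a lazy message lambda as a stand-in for the project's logging facade; `CallerProbe` exists only to count lookups. Note that the frame index for the caller shifts when the lookup moves into a lambda, so the index used here is illustrative.

```kotlin
// Hypothetical minimal logger with a lazily evaluated message; not a real library API.
class SketchLogger(var debugEnabled: Boolean) {
    val lines = mutableListOf<String>()

    fun debug(message: () -> String) {
        if (debugEnabled) lines.add(message())   // lambda runs only when debug is on
    }
}

// Counts how often the (relatively expensive) stack trace lookup happens.
class CallerProbe {
    var lookups = 0
        private set

    fun callerName(): String {
        lookups++
        // Frame index is illustrative; it depends on where this is called from.
        return Thread.currentThread().stackTrace.getOrNull(2)?.methodName ?: "unknown"
    }
}

fun sendByGraphQLSketch(logger: SketchLogger, probe: CallerProbe, conversationId: String) {
    // The caller lookup lives inside the lambda, so it is skipped when debug is disabled.
    logger.debug { "sendByGraphQL(): caller=${probe.callerName()}, conversationId=$conversationId" }
}
```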

Comment on lines +133 to +139
// Then - consumer likely got FEWER messages than expected
// because it didn't wait for producer
// This is the race condition!
println("Race condition demo: flushed ${flushedMessages.size} of 5 messages")
// We can't assert exact count because it's a race condition
// but typically it will be less than 5
}

Copilot AI Jan 28, 2026

This test currently has no assertions, so it will always pass and won't detect regressions. Given the current synchronization (producer sleeps before each add), the outcome can be made deterministic: assert that the consumer flushes 0 messages and that messages remain queued after the producer completes (or mark this as @Disabled and move it to documentation/examples).
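
One way to make the demo assertable, sketched with hypothetical names (`RaceOutcome`, `runPrematureFlushDemo`). Instead of relying on the producer's sleeps, a latch holds the producer back until the premature flush has happened, so the outcome is fixed: the early flush sees nothing and every message is left in the queue.

```kotlin
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.CountDownLatch

data class RaceOutcome(val flushedEarly: List<String>, val leftQueued: List<String>)

fun runPrematureFlushDemo(messageCount: Int): RaceOutcome {
    val queue = ConcurrentLinkedQueue<String>()
    val flushDone = CountDownLatch(1)

    val producer = Thread {
        // The producer is held back until the premature flush has happened.
        flushDone.await()
        repeat(messageCount) { i -> queue.add("Message ${i + 1}") }
    }
    producer.start()

    // Premature flush: drains whatever is queued without waiting for the producer.
    val flushedEarly = generateSequence { queue.poll() }.toList()
    flushDone.countDown()

    producer.join(5_000)
    return RaceOutcome(flushedEarly, queue.toList())
}
```

The test can then assert `flushedEarly.isEmpty()` and `leftQueued.size == 5`, which documents the race and fails if the ordering guarantee changes.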

Comment on lines +319 to +323
// Then - message sent via GraphQL
Thread.sleep(100) // Allow queue to process
assertEquals(1, graphQLCallCount.get())
assertEquals("Final answer", capturedMessages.firstOrNull())
assertNull(callback.deferredCoordinator) // Cleaned up

Copilot AI Jan 28, 2026

Using Thread.sleep(...) to wait for async work is timing-dependent and can make this test flaky under CI load. Prefer a deterministic wait (e.g., CountDownLatch, Awaitility, or MockK verify(timeout=...)) that completes as soon as the expected GraphQL calls/messages are observed.
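
If adding a dependency is not desired, a hand-rolled polling helper gives the "complete as soon as observed" behavior. `awaitUntil` below is a hypothetical helper written for illustration, not Awaitility's API; it checks a condition repeatedly and returns as soon as it holds or the timeout expires.

```kotlin
import java.util.concurrent.atomic.AtomicInteger

// Polls `condition` until it is true (returns true) or `timeoutMs` elapses (returns false).
fun awaitUntil(timeoutMs: Long, pollMs: Long = 10, condition: () -> Boolean): Boolean {
    val deadline = System.nanoTime() + timeoutMs * 1_000_000
    while (true) {
        if (condition()) return true
        if (System.nanoTime() >= deadline) return false
        Thread.sleep(pollMs)   // short pause between checks, bounded by the deadline above
    }
}

// Example condition: a call counter reaching the expected value.
fun callsReached(counter: AtomicInteger, expected: Int): Boolean = counter.get() == expected
```

In the test above, the sleep would become `assertTrue(awaitUntil(5_000) { graphQLCallCount.get() == 1 })`, with the remaining assertions unchanged.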
