[DERCBOT-1774] Refactor proactive mechanism with Coordinator #1988
base: master
antoine-billiet-arkea-partnre
commented
Jan 26, 2026
- Add DeferredMessageCoordinator to manage deferred message lifecycle per conversation
- Add DeferredConnector interface for connectors supporting deferred mode
- Implement automatic deferred mode in IadvizeConnector.handleRequest()
- Remove AbstractProactiveAnswerHandler and ProactiveConversationStatus (over-engineering)
- Simplify RAGAnswerHandler to use standard bus.end() pattern
- Fix race condition: remove premature finally block that forced flush before async completion
- Add integration tests for deferred flow and race condition demonstration
- Add DEBUG logs for proactive message flow investigation
Pull request overview
Refactors the iAdvize proactive/deferred response mechanism to avoid race conditions by introducing a per-conversation deferred message coordinator and simplifying RAG answer handling to rely on bus.end() semantics.
Changes:
- Introduces DeferredMessageCoordinator and deferred-mode handling in IadvizeConnector to collect actions and flush on lastAnswer=true.
- Removes the proactive conversation abstraction (AbstractProactiveAnswerHandler, ProactiveConversationStatus) and updates RAGAnswerHandler to use end() directly.
- Adds extensive debug logging and new iAdvize deferred-flow/race-condition tests.
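
To make the collect-then-flush idea concrete, here is a minimal sketch of a per-conversation coordinator that buffers actions and flushes them once when an action marked lastAnswer=true arrives. All names here (SketchAction, SketchCoordinator, SketchRegistry) are hypothetical stand-ins and are not the PR's actual classes; the real DeferredMessageCoordinator may differ in locking, timeouts, and cleanup.

```kotlin
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-in for a bot action; not the PR's real action type.
data class SketchAction(val text: String, val lastAnswer: Boolean = false)

// Buffers actions for one conversation and flushes them exactly once,
// when an action with lastAnswer=true is collected.
class SketchCoordinator(private val flush: (List<SketchAction>) -> Unit) {
    private val lock = Any()
    private val pending = mutableListOf<SketchAction>()
    private var closed = false

    /** Returns true if this call triggered the flush. */
    fun collect(action: SketchAction): Boolean {
        val toFlush = synchronized(lock) {
            if (closed) return false            // late actions are ignored in this sketch
            pending += action
            if (!action.lastAnswer) return false
            closed = true
            pending.toList().also { pending.clear() }
        }
        flush(toFlush)                          // flush outside the lock
        return true
    }
}

// Keeps one coordinator per conversation id and drops it after the flush.
class SketchRegistry(private val flush: (String, List<SketchAction>) -> Unit) {
    private val coordinators = ConcurrentHashMap<String, SketchCoordinator>()

    fun onAction(conversationId: String, action: SketchAction) {
        val coordinator = coordinators.computeIfAbsent(conversationId) { id ->
            SketchCoordinator { actions -> flush(id, actions) }
        }
        if (coordinator.collect(action)) coordinators.remove(conversationId, coordinator)
    }

    fun activeCount(): Int = coordinators.size
}

fun main() {
    val sent = mutableListOf<String>()
    val registry = SketchRegistry { id, actions ->
        sent += "$id:" + actions.joinToString(",") { it.text }
    }
    registry.onAction("c1", SketchAction("Message 1"))
    registry.onAction("c1", SketchAction("Final", lastAnswer = true))
    println(sent)
}
```

Flushing outside the lock keeps a slow send from blocking other threads that are still collecting; whether the PR makes the same choice is not visible from this page.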
Reviewed changes
Copilot reviewed 12 out of 12 changed files in this pull request and generated 8 comments.
Show a summary per file
| File | Description |
|---|---|
| bot/engine/src/main/kotlin/engine/config/RAGAnswerHandler.kt | Uses bus.end() for final RAG message and handles no-answer routing; adds timeline save logic. |
| bot/engine/src/main/kotlin/engine/TockBotBus.kt | Adds debug logs around deferred message sending/closing. |
| bot/engine/src/main/kotlin/connector/ConnectorQueue.kt | Adds debug logs around queueing/sending/dequeueing actions. |
| bot/engine/src/main/kotlin/connector/Connector.kt | Updates KDoc around lastAnswer semantics and deferred-mode expectations. |
| bot/engine/src/main/kotlin/engine/config/AbstractProactiveAnswerHandler.kt | Removes proactive handler abstraction. |
| bot/engine/src/main/kotlin/engine/config/ProactiveConversationStatus.kt | Removes proactive conversation status enum. |
| bot/connector-iadvize/src/main/kotlin/IadvizeConnectorCallback.kt | Makes stored actions thread-safe; adds coordinator reference for deferred mode. |
| bot/connector-iadvize/src/main/kotlin/IadvizeConnector.kt | Implements deferred mode (ack immediately + flush via GraphQL on lastAnswer=true) and adds debug logs/error message config. |
| bot/connector-iadvize/src/main/kotlin/DeferredMessageCoordinator.kt | Adds per-conversation coordinator to safely collect/flush deferred actions. |
| bot/connector-iadvize/src/main/kotlin/DeferredConnector.kt | Adds a deferred-mode connector interface (currently implemented by iAdvize). |
| bot/connector-iadvize/src/test/kotlin/IadvizeRaceConditionDemoTest.kt | Adds race-condition “demo” tests (currently includes a non-asserting test). |
| bot/connector-iadvize/src/test/kotlin/IadvizeDeferredEndFlowTest.kt | Adds deferred flow tests for coordinator behavior and iAdvize send/flush behavior. |

    // Then
    Thread.sleep(100)
    assertEquals(3, graphQLCallCount.get())
    assertTrue(capturedMessages.contains("Message 1"))
    assertTrue(capturedMessages.contains("Message 2"))
    assertTrue(capturedMessages.contains("Final"))

Copilot
AI
Jan 28, 2026
Using Thread.sleep(...) to wait for async work is timing-dependent and can make this test flaky under CI load. Prefer a deterministic wait (e.g., CountDownLatch, Awaitility, or MockK verify(timeout=...)) based on the expected number of GraphQL calls/messages.
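
As an illustration of the CountDownLatch option, the sketch below waits until the expected number of sends has been observed instead of sleeping for a fixed time. RecordingSender is a hypothetical stand-in for the mocked GraphQL client in the tests; the real test wiring is not shown on this page.

```kotlin
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import kotlin.concurrent.thread

// Hypothetical recorder standing in for the mocked GraphQL client.
class RecordingSender(expectedCalls: Int) {
    val messages = CopyOnWriteArrayList<String>()
    private val latch = CountDownLatch(expectedCalls)

    fun send(message: String) {
        messages += message
        latch.countDown()   // one count per observed send
    }

    // Returns true as soon as all expected sends happened, false on timeout.
    fun awaitAll(timeoutMs: Long): Boolean = latch.await(timeoutMs, TimeUnit.MILLISECONDS)
}

fun main() {
    val sender = RecordingSender(expectedCalls = 3)
    // Simulated asynchronous flush on another thread.
    thread {
        for (m in listOf("Message 1", "Message 2", "Final")) sender.send(m)
    }
    check(sender.awaitAll(2_000)) { "timed out waiting for 3 sends" }
    check(sender.messages == listOf("Message 1", "Message 2", "Final"))
}
```

The timeout only bounds the failure case; a passing run returns as soon as the third send is recorded.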

    // Then - all messages flushed via GraphQL
    Thread.sleep(100)
    assertEquals(2, graphQLCallCount.get())
    assertTrue(capturedMessages.contains("Debug info"))
    assertTrue(capturedMessages.contains("RAG Response with footnotes"))
    assertNull(callback.deferredCoordinator)

Copilot
AI
Jan 28, 2026
Using Thread.sleep(...) to wait for async work is timing-dependent and can make this test flaky under CI load. Prefer a deterministic wait (e.g., CountDownLatch, Awaitility, or MockK verify(timeout=...)) rather than sleeping a fixed duration.
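
Where adding a latch to the code under test is awkward, a bounded polling wait gives a similar effect. The helper below is a stdlib-only sketch in the spirit of Awaitility; awaitUntil is a hypothetical name, not an Awaitility or MockK API.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlin.concurrent.thread

// Hypothetical helper: polls until the condition holds or the timeout elapses.
fun awaitUntil(timeoutMs: Long, pollMs: Long = 10, condition: () -> Boolean): Boolean {
    val deadline = System.nanoTime() + timeoutMs * 1_000_000
    while (deadline - System.nanoTime() > 0) {
        if (condition()) return true
        Thread.sleep(pollMs)
    }
    return condition()   // one last check at the deadline
}

fun main() {
    val graphQLCallCount = AtomicInteger(0)
    // Simulated asynchronous work that performs two calls.
    thread { repeat(2) { graphQLCallCount.incrementAndGet() } }
    check(awaitUntil(timeoutMs = 2_000) { graphQLCallCount.get() == 2 })
}
```

Polling still sleeps, but only in short steps and only until the condition is met, so the test is not tied to one fixed duration.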

    // Save the dialog
    if (connectorData.saveTimeline) {
        runBlocking {
            userTimelineDAO.save(userTimeline, botDefinition)
        }
    }

Copilot
AI
Jan 28, 2026
The dialog is already persisted by the connector controller when connectorData.saveTimeline is enabled (see bot/engine/src/main/kotlin/engine/TockConnectorController.kt:196-198). Saving again here causes an extra write for the same request. Consider removing this block or limiting it only to the specific execution mode where the controller does not persist the timeline.

    val event = WebhookActionConverter.toEvent(iadvizeRequest, applicationId)
    // Note: controller.handle() returns immediately as RAG processing is async
    // The coordinator timeout (via TockBotBus.deferMessageSending 60s) handles errors
    // DO NOT use finally block here - it would execute before async processing completes
    executor.executeBlocking {
        controller.handle(event, ConnectorData(callback, metadata = metadata))
    }

Copilot
AI
Jan 28, 2026
In deferred mode, controller.handle(...) is executed asynchronously but there is no error/timeout path that flushes collected messages if the handler throws or returns without sending a lastAnswer=true action. The comment about TockBotBus.deferMessageSending handling errors does not currently trigger forceFlushCoordinator(...). Wrap the controller.handle call with try/catch and, on exception (or if callback.deferredCoordinator is still non-null after handle completes), call forceFlushCoordinator to end the coordinator and optionally send the deferred error message.
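
A rough sketch of the exception path described above, using hypothetical stand-ins (SketchDeferredCoordinator, SketchCallback, forceFlush, handleDeferred) rather than the connector's real types. It covers only the case where the handler throws synchronously; it does not claim to match the signature of the PR's forceFlushCoordinator(...).

```kotlin
// Hypothetical stand-in for the per-conversation coordinator.
class SketchDeferredCoordinator {
    private val collected = mutableListOf<String>()
    private var ended = false

    @Synchronized
    fun collect(message: String) {
        if (!ended) collected += message
    }

    // Ends the coordinator and returns whatever was collected so far.
    @Synchronized
    fun end(): List<String> {
        ended = true
        return collected.toList()
    }
}

// Hypothetical stand-in for the connector callback holding the coordinator.
class SketchCallback {
    @Volatile
    var deferredCoordinator: SketchDeferredCoordinator? = SketchDeferredCoordinator()
}

// Flushes collected messages, then an optional error message, and detaches the coordinator.
fun forceFlush(callback: SketchCallback, errorMessage: String?, send: (String) -> Unit) {
    val coordinator = callback.deferredCoordinator ?: return
    callback.deferredCoordinator = null
    for (m in coordinator.end()) send(m)
    if (errorMessage != null) send(errorMessage)
}

// Runs the handler and force-flushes only if it throws.
fun handleDeferred(callback: SketchCallback, send: (String) -> Unit, handle: (SketchCallback) -> Unit) {
    try {
        handle(callback)
    } catch (e: Exception) {
        forceFlush(callback, "Sorry, something went wrong.", send)
    }
}

fun main() {
    val sent = mutableListOf<String>()
    val callback = SketchCallback()
    handleDeferred(callback, { sent += it }) { cb ->
        cb.deferredCoordinator?.collect("Partial answer")
        throw IllegalStateException("handler failed")
    }
    println(sent)
}
```

The sketch deliberately leaves out the "still non-null after handle completes" check: the code comment in the diff says controller.handle() returns immediately while RAG processing continues asynchronously, so such a check would presumably need to be tied to a timeout rather than run right after the call.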

    val caller = Thread.currentThread().stackTrace.getOrNull(2)?.methodName ?: "unknown"
    logger.debug { "SendSentenceWithFootnotes.sendByGraphQL(): caller=$caller, conversationId=$conversationId, chatBotId=$chatBotId, messageLength=${messageText.length}" }

Copilot
AI
Jan 28, 2026
Thread.currentThread().stackTrace is computed unconditionally to populate caller, which is relatively expensive and will run even when debug logging is disabled. Consider removing this, or only computing the stack trace when debug is enabled (e.g., guard with logger.isDebugEnabled).
Suggested change:

    - val caller = Thread.currentThread().stackTrace.getOrNull(2)?.methodName ?: "unknown"
    - logger.debug { "SendSentenceWithFootnotes.sendByGraphQL(): caller=$caller, conversationId=$conversationId, chatBotId=$chatBotId, messageLength=${messageText.length}" }
    + if (logger.isDebugEnabled) {
    +     val caller = Thread.currentThread().stackTrace.getOrNull(2)?.methodName ?: "unknown"
    +     logger.debug { "SendSentenceWithFootnotes.sendByGraphQL(): caller=$caller, conversationId=$conversationId, chatBotId=$chatBotId, messageLength=${messageText.length}" }
    + }

    val caller = Thread.currentThread().stackTrace.getOrNull(2)?.methodName ?: "unknown"
    logger.debug { "IadvizeReply.sendByGraphQL(): caller=$caller, replyType=${this::class.simpleName}, conversationId=$conversationId, chatBotId=$chatBotId" }

Copilot
AI
Jan 28, 2026
Same as above: computing Thread.currentThread().stackTrace for caller is expensive and happens even if debug logs are disabled. Guard this computation behind a debug-enabled check or remove it.

        // Then - consumer likely got FEWER messages than expected
        // because it didn't wait for producer
        // This is the race condition!
        println("Race condition demo: flushed ${flushedMessages.size} of 5 messages")
        // We can't assert exact count because it's a race condition
        // but typically it will be less than 5
    }

Copilot
AI
Jan 28, 2026
This test currently has no assertions, so it will always pass and won't detect regressions. Given the current synchronization (producer sleeps before each add), the outcome can be made deterministic: assert that the consumer flushes 0 messages and that messages remain queued after the producer completes (or mark this as @Disabled and move it to documentation/examples).
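
One way to make the demo deterministic is sketched below. It swaps the producer's sleeps for a CountDownLatch gate, so the premature flush is guaranteed to run before any message is queued; this is an assumption about how the test could be restructured, not the test's current code, and runPrematureFlushDemo is a hypothetical name.

```kotlin
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.CountDownLatch
import kotlin.concurrent.thread

// Returns (messages flushed by the premature consumer, messages left in the queue).
fun runPrematureFlushDemo(): Pair<Int, Int> {
    val queue = ConcurrentLinkedQueue<String>()
    val consumerFlushed = CountDownLatch(1)

    // Producer is gated so it cannot add anything before the consumer has flushed.
    val producer = thread {
        consumerFlushed.await()
        for (i in 1..5) queue.add("Message $i")
    }

    // Premature flush: drains whatever is queued right now (nothing yet).
    val flushed = mutableListOf<String>()
    while (true) {
        val message = queue.poll() ?: break
        flushed += message
    }
    consumerFlushed.countDown()

    producer.join()
    return flushed.size to queue.size
}

fun main() {
    val (flushedCount, remaining) = runPrematureFlushDemo()
    check(flushedCount == 0) { "premature flush should see no messages" }
    check(remaining == 5) { "all messages should remain queued" }
}
```

With the gate in place the two assertions hold on every run, which turns the demo into a regression check rather than a printout.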

    // Then - message sent via GraphQL
    Thread.sleep(100) // Allow queue to process
    assertEquals(1, graphQLCallCount.get())
    assertEquals("Final answer", capturedMessages.firstOrNull())
    assertNull(callback.deferredCoordinator) // Cleaned up

Copilot
AI
Jan 28, 2026
Using Thread.sleep(...) to wait for async work is timing-dependent and can make this test flaky under CI load. Prefer a deterministic wait (e.g., CountDownLatch, Awaitility, or MockK verify(timeout=...)) that completes as soon as the expected GraphQL calls/messages are observed.