343 lines
10 KiB
Go
343 lines
10 KiB
Go
package main
|
|
|
|
import (
|
|
"fmt"
|
|
"strings"
|
|
"time"
|
|
)
|
|
|
|
const (
|
|
collaborationStateWaitingHuman = "waiting_human"
|
|
collaborationStateReviewing = "reviewing_human"
|
|
collaborationStateTakeover = "ai_takeover"
|
|
)
|
|
|
|
type collaborationSession struct {
|
|
Key string
|
|
ConversationKey string
|
|
State string
|
|
Msg autoReplyMessage
|
|
RawData map[string]interface{}
|
|
ClientID int32
|
|
ConversationID string
|
|
RobotID string
|
|
LastCustomerMessageAt time.Time
|
|
LastHumanReplyAt time.Time
|
|
LastUpdatedAt time.Time
|
|
Generation int64
|
|
HumanReplies []string
|
|
ReviewScheduled bool
|
|
TakeoverStartedAt time.Time
|
|
LastTakeoverActivityAt time.Time
|
|
}
|
|
|
|
func (e *AutoReplyEngine) maybeHandleCollaborationCustomer(job AutoReplyJob, msg autoReplyMessage) bool {
|
|
cfg := e.getConfig()
|
|
if job.SkipCollaboration || !cfg.Collaboration.Enabled {
|
|
return false
|
|
}
|
|
if strings.TrimSpace(msg.ConversationID) == "" || msg.isSelfMessage() || msg.SenderIdentity == senderIdentityInternal {
|
|
return false
|
|
}
|
|
key := e.collaborationKeyForMessage(msg)
|
|
now := time.Now()
|
|
e.mu.Lock()
|
|
if e.collaborations == nil {
|
|
e.collaborations = make(map[string]*collaborationSession)
|
|
}
|
|
session := e.collaborations[key]
|
|
if session != nil && session.State == collaborationStateTakeover {
|
|
session.Msg = msg
|
|
session.RawData = job.RawData
|
|
session.LastCustomerMessageAt = now
|
|
session.LastTakeoverActivityAt = now
|
|
session.LastUpdatedAt = now
|
|
e.mu.Unlock()
|
|
e.noteReason("collaboration_takeover_instant_reply")
|
|
return false
|
|
}
|
|
if session == nil {
|
|
session = &collaborationSession{
|
|
Key: key,
|
|
ConversationKey: e.humanAssistConversationKey(msg),
|
|
ClientID: msg.ClientID,
|
|
ConversationID: msg.ConversationID,
|
|
RobotID: msg.RobotID,
|
|
}
|
|
e.collaborations[key] = session
|
|
}
|
|
session.State = collaborationStateWaitingHuman
|
|
session.Msg = msg
|
|
session.RawData = job.RawData
|
|
session.ClientID = msg.ClientID
|
|
session.ConversationID = msg.ConversationID
|
|
session.RobotID = msg.RobotID
|
|
session.LastCustomerMessageAt = now
|
|
session.LastUpdatedAt = now
|
|
session.HumanReplies = nil
|
|
session.ReviewScheduled = false
|
|
session.Generation++
|
|
generation := session.Generation
|
|
e.mu.Unlock()
|
|
e.noteReason("collaboration_waiting_human")
|
|
go e.finishCollaborationWaitAfterDelay(key, generation)
|
|
return true
|
|
}
|
|
|
|
func (e *AutoReplyEngine) finishCollaborationWaitAfterDelay(key string, generation int64) {
|
|
cfg := e.getConfig()
|
|
wait := time.Duration(cfg.Collaboration.HumanWaitSeconds) * time.Second
|
|
if wait <= 0 {
|
|
wait = 180 * time.Second
|
|
}
|
|
time.Sleep(wait)
|
|
var retry AutoReplyJob
|
|
var recordMsg autoReplyMessage
|
|
shouldTakeover := false
|
|
e.mu.Lock()
|
|
session := e.collaborations[key]
|
|
if session != nil && session.Generation == generation && session.State == collaborationStateWaitingHuman && len(session.HumanReplies) == 0 {
|
|
session.State = collaborationStateTakeover
|
|
session.TakeoverStartedAt = time.Now()
|
|
session.LastTakeoverActivityAt = session.TakeoverStartedAt
|
|
session.LastUpdatedAt = session.TakeoverStartedAt
|
|
recordMsg = session.Msg
|
|
retry = AutoReplyJob{
|
|
ClientID: session.Msg.ClientID,
|
|
RawData: session.RawData,
|
|
ReceivedAt: session.LastCustomerMessageAt,
|
|
SkipHumanAssist: true,
|
|
SkipCollaboration: true,
|
|
ForceNoCooldown: true,
|
|
SupplementReason: "collaboration_takeover",
|
|
}
|
|
shouldTakeover = true
|
|
}
|
|
e.mu.Unlock()
|
|
if !shouldTakeover {
|
|
return
|
|
}
|
|
e.incStatus("collaboration_takeover")
|
|
e.noteReason("collaboration_takeover")
|
|
e.addRecord(AutoReplyRecord{
|
|
RobotID: recordMsg.RobotID,
|
|
ClientID: recordMsg.ClientID,
|
|
UserID: recordMsg.RobotID,
|
|
ConversationID: recordMsg.ConversationID,
|
|
Source: recordMsg.sourceLabel(),
|
|
FromWxID: recordMsg.FromWxID,
|
|
FromNickName: recordMsg.FromNickName,
|
|
Question: recordMsg.Content,
|
|
Action: "takeover",
|
|
Reason: "collaboration_takeover",
|
|
SenderIdentity: recordMsg.SenderIdentity,
|
|
IdentitySource: recordMsg.IdentitySource,
|
|
})
|
|
e.enqueueCollaborationRetry(retry, recordMsg, "collaboration_takeover_queue_full")
|
|
}
|
|
|
|
func (e *AutoReplyEngine) observeCollaborationHumanReply(msg autoReplyMessage) bool {
|
|
content := strings.TrimSpace(msg.Content)
|
|
if content == "" || strings.TrimSpace(msg.ConversationID) == "" {
|
|
return false
|
|
}
|
|
if e.consumeAutoSentMessage(msg) {
|
|
return true
|
|
}
|
|
cfg := e.getConfig()
|
|
if !cfg.Collaboration.Enabled {
|
|
return false
|
|
}
|
|
if len([]rune(content)) < cfg.HumanAssist.MinimumHumanReplyLengthRunes {
|
|
return false
|
|
}
|
|
conversationKey := e.humanAssistConversationKey(msg)
|
|
type reviewTarget struct {
|
|
Key string
|
|
Generation int64
|
|
}
|
|
targets := make([]reviewTarget, 0, 1)
|
|
e.mu.Lock()
|
|
for key, session := range e.collaborations {
|
|
if session == nil || session.ConversationKey != conversationKey {
|
|
continue
|
|
}
|
|
session.HumanReplies = append(session.HumanReplies, content)
|
|
session.LastHumanReplyAt = time.Now()
|
|
session.LastUpdatedAt = session.LastHumanReplyAt
|
|
if session.State == collaborationStateTakeover {
|
|
session.State = collaborationStateReviewing
|
|
} else if session.State == collaborationStateWaitingHuman {
|
|
session.State = collaborationStateReviewing
|
|
}
|
|
session.Generation++
|
|
if !session.ReviewScheduled {
|
|
session.ReviewScheduled = true
|
|
targets = append(targets, reviewTarget{Key: key, Generation: session.Generation})
|
|
}
|
|
e.status.HumanAssistObservedCount++
|
|
}
|
|
e.mu.Unlock()
|
|
for _, target := range targets {
|
|
go e.reviewCollaborationHumanReplyAfterDelay(target.Key, target.Generation)
|
|
}
|
|
return len(targets) > 0
|
|
}
|
|
|
|
func (e *AutoReplyEngine) reviewCollaborationHumanReplyAfterDelay(key string, generation int64) {
|
|
cfg := e.getConfig()
|
|
delay := time.Duration(cfg.Collaboration.AfterHumanReplyDelaySeconds) * time.Second
|
|
if delay > 0 {
|
|
time.Sleep(delay)
|
|
}
|
|
var session *collaborationSession
|
|
e.mu.Lock()
|
|
current := e.collaborations[key]
|
|
if current != nil && current.Generation == generation && current.State == collaborationStateReviewing {
|
|
copySession := *current
|
|
copySession.HumanReplies = append([]string(nil), current.HumanReplies...)
|
|
session = ©Session
|
|
}
|
|
e.mu.Unlock()
|
|
if session == nil {
|
|
return
|
|
}
|
|
assessment := e.assessHumanReply(session.Msg, session.HumanReplies)
|
|
if assessment.Decision == "sufficient" {
|
|
e.finishCollaborationSession(key)
|
|
e.incStatus("ignored")
|
|
e.noteReason("collaboration_human_sufficient")
|
|
e.addRecord(AutoReplyRecord{
|
|
RobotID: session.Msg.RobotID,
|
|
ClientID: session.Msg.ClientID,
|
|
UserID: session.Msg.RobotID,
|
|
ConversationID: session.Msg.ConversationID,
|
|
Source: session.Msg.sourceLabel(),
|
|
FromWxID: session.Msg.FromWxID,
|
|
FromNickName: session.Msg.FromNickName,
|
|
Question: session.Msg.Content,
|
|
Action: "ignored",
|
|
Reason: "collaboration_human_sufficient: " + assessment.Reason,
|
|
Answer: strings.Join(session.HumanReplies, "\n"),
|
|
SenderIdentity: session.Msg.SenderIdentity,
|
|
IdentitySource: session.Msg.IdentitySource,
|
|
})
|
|
return
|
|
}
|
|
e.noteReason("collaboration_human_need_supplement")
|
|
e.incStatus("collaboration_supplemented")
|
|
e.finishCollaborationSession(key)
|
|
retry := AutoReplyJob{
|
|
ClientID: session.Msg.ClientID,
|
|
RawData: session.RawData,
|
|
ReceivedAt: session.LastCustomerMessageAt,
|
|
SkipHumanAssist: true,
|
|
SkipCollaboration: true,
|
|
ForceNoCooldown: true,
|
|
SupplementReason: "collaboration_supplemented",
|
|
}
|
|
e.enqueueCollaborationRetry(retry, session.Msg, "collaboration_supplement_queue_full")
|
|
}
|
|
|
|
func (e *AutoReplyEngine) collaborationSweepLoop() {
|
|
ticker := time.NewTicker(30 * time.Second)
|
|
defer ticker.Stop()
|
|
for range ticker.C {
|
|
e.cleanupIdleCollaborations()
|
|
}
|
|
}
|
|
|
|
func (e *AutoReplyEngine) cleanupIdleCollaborations() {
|
|
cfg := e.getConfig()
|
|
idle := time.Duration(cfg.Collaboration.TakeoverIdleExitSeconds) * time.Second
|
|
if idle <= 0 {
|
|
idle = 300 * time.Second
|
|
}
|
|
now := time.Now()
|
|
closed := 0
|
|
e.mu.Lock()
|
|
for key, session := range e.collaborations {
|
|
if session == nil {
|
|
delete(e.collaborations, key)
|
|
continue
|
|
}
|
|
if session.State == collaborationStateTakeover && now.Sub(session.LastCustomerMessageAt) >= idle {
|
|
delete(e.collaborations, key)
|
|
closed++
|
|
}
|
|
}
|
|
e.mu.Unlock()
|
|
if closed > 0 {
|
|
e.noteReason("collaboration_takeover_idle_closed")
|
|
}
|
|
}
|
|
|
|
func (e *AutoReplyEngine) finishCollaborationSession(key string) {
|
|
e.mu.Lock()
|
|
delete(e.collaborations, key)
|
|
e.mu.Unlock()
|
|
}
|
|
|
|
func (e *AutoReplyEngine) collaborationCountsLocked() (int, int) {
|
|
waiting := 0
|
|
takeover := 0
|
|
for _, session := range e.collaborations {
|
|
if session == nil {
|
|
continue
|
|
}
|
|
switch session.State {
|
|
case collaborationStateWaitingHuman, collaborationStateReviewing:
|
|
waiting++
|
|
case collaborationStateTakeover:
|
|
takeover++
|
|
}
|
|
}
|
|
return waiting, takeover
|
|
}
|
|
|
|
func (e *AutoReplyEngine) isCollaborationTakeoverMessage(msg autoReplyMessage) bool {
|
|
cfg := e.getConfig()
|
|
if !cfg.Collaboration.Enabled {
|
|
return false
|
|
}
|
|
key := e.collaborationKeyForMessage(msg)
|
|
e.mu.Lock()
|
|
defer e.mu.Unlock()
|
|
session := e.collaborations[key]
|
|
return session != nil && session.State == collaborationStateTakeover
|
|
}
|
|
|
|
func (e *AutoReplyEngine) collaborationKeyForMessage(msg autoReplyMessage) string {
|
|
return e.contextKeyForMessage(msg)
|
|
}
|
|
|
|
func (e *AutoReplyEngine) enqueueCollaborationRetry(job AutoReplyJob, msg autoReplyMessage, failureReason string) {
|
|
select {
|
|
case e.queue <- job:
|
|
default:
|
|
e.setLastErrorWithScope(autoReplyErrorScopeRecords, failureReason)
|
|
e.addRecord(AutoReplyRecord{
|
|
RobotID: msg.RobotID,
|
|
ClientID: msg.ClientID,
|
|
UserID: msg.RobotID,
|
|
ConversationID: msg.ConversationID,
|
|
Source: msg.sourceLabel(),
|
|
FromWxID: msg.FromWxID,
|
|
FromNickName: msg.FromNickName,
|
|
Question: msg.Content,
|
|
Action: "failed",
|
|
Reason: failureReason,
|
|
SenderIdentity: msg.SenderIdentity,
|
|
IdentitySource: msg.IdentitySource,
|
|
})
|
|
}
|
|
}
|
|
|
|
func (e *AutoReplyEngine) collaborationRetryReason(job AutoReplyJob, fallback string) string {
|
|
reason := strings.TrimSpace(job.SupplementReason)
|
|
if reason == "" {
|
|
return fallback
|
|
}
|
|
return fmt.Sprintf("%s:%s", fallback, reason)
|
|
}
|