package main import ( "fmt" "strings" "time" ) const ( collaborationStateWaitingHuman = "waiting_human" collaborationStateReviewing = "reviewing_human" collaborationStateTakeover = "ai_takeover" ) type collaborationSession struct { Key string ConversationKey string State string Msg autoReplyMessage RawData map[string]interface{} ClientID int32 ConversationID string RobotID string LastCustomerMessageAt time.Time LastHumanReplyAt time.Time LastUpdatedAt time.Time Generation int64 HumanReplies []string ReviewScheduled bool TakeoverStartedAt time.Time LastTakeoverActivityAt time.Time } func (e *AutoReplyEngine) maybeHandleCollaborationCustomer(job AutoReplyJob, msg autoReplyMessage) bool { cfg := e.getConfig() if job.SkipCollaboration || !cfg.Collaboration.Enabled { return false } if strings.TrimSpace(msg.ConversationID) == "" || msg.isSelfMessage() || msg.SenderIdentity == senderIdentityInternal { return false } key := e.collaborationKeyForMessage(msg) now := time.Now() e.mu.Lock() if e.collaborations == nil { e.collaborations = make(map[string]*collaborationSession) } session := e.collaborations[key] if session != nil && session.State == collaborationStateTakeover { session.Msg = msg session.RawData = job.RawData session.LastCustomerMessageAt = now session.LastTakeoverActivityAt = now session.LastUpdatedAt = now e.mu.Unlock() e.noteReason("collaboration_takeover_instant_reply") return false } if session == nil { session = &collaborationSession{ Key: key, ConversationKey: e.humanAssistConversationKey(msg), ClientID: msg.ClientID, ConversationID: msg.ConversationID, RobotID: msg.RobotID, } e.collaborations[key] = session } session.State = collaborationStateWaitingHuman session.Msg = msg session.RawData = job.RawData session.ClientID = msg.ClientID session.ConversationID = msg.ConversationID session.RobotID = msg.RobotID session.LastCustomerMessageAt = now session.LastUpdatedAt = now session.HumanReplies = nil session.ReviewScheduled = false session.Generation++ generation := session.Generation e.mu.Unlock() e.noteReason("collaboration_waiting_human") go e.finishCollaborationWaitAfterDelay(key, generation) return true } func (e *AutoReplyEngine) finishCollaborationWaitAfterDelay(key string, generation int64) { cfg := e.getConfig() wait := time.Duration(cfg.Collaboration.HumanWaitSeconds) * time.Second if wait <= 0 { wait = 180 * time.Second } time.Sleep(wait) var retry AutoReplyJob var recordMsg autoReplyMessage shouldTakeover := false e.mu.Lock() session := e.collaborations[key] if session != nil && session.Generation == generation && session.State == collaborationStateWaitingHuman && len(session.HumanReplies) == 0 { session.State = collaborationStateTakeover session.TakeoverStartedAt = time.Now() session.LastTakeoverActivityAt = session.TakeoverStartedAt session.LastUpdatedAt = session.TakeoverStartedAt recordMsg = session.Msg retry = AutoReplyJob{ ClientID: session.Msg.ClientID, RawData: session.RawData, ReceivedAt: session.LastCustomerMessageAt, SkipHumanAssist: true, SkipCollaboration: true, ForceNoCooldown: true, SupplementReason: "collaboration_takeover", } shouldTakeover = true } e.mu.Unlock() if !shouldTakeover { return } e.incStatus("collaboration_takeover") e.noteReason("collaboration_takeover") e.addRecord(AutoReplyRecord{ RobotID: recordMsg.RobotID, ClientID: recordMsg.ClientID, UserID: recordMsg.RobotID, ConversationID: recordMsg.ConversationID, Source: recordMsg.sourceLabel(), FromWxID: recordMsg.FromWxID, FromNickName: recordMsg.FromNickName, Question: recordMsg.Content, Action: "takeover", Reason: "collaboration_takeover", SenderIdentity: recordMsg.SenderIdentity, IdentitySource: recordMsg.IdentitySource, }) e.enqueueCollaborationRetry(retry, recordMsg, "collaboration_takeover_queue_full") } func (e *AutoReplyEngine) observeCollaborationHumanReply(msg autoReplyMessage) bool { content := strings.TrimSpace(msg.Content) if content == "" || strings.TrimSpace(msg.ConversationID) == "" { return false } if e.consumeAutoSentMessage(msg) { return true } cfg := e.getConfig() if !cfg.Collaboration.Enabled { return false } if len([]rune(content)) < cfg.HumanAssist.MinimumHumanReplyLengthRunes { return false } conversationKey := e.humanAssistConversationKey(msg) type reviewTarget struct { Key string Generation int64 } targets := make([]reviewTarget, 0, 1) e.mu.Lock() for key, session := range e.collaborations { if session == nil || session.ConversationKey != conversationKey { continue } session.HumanReplies = append(session.HumanReplies, content) session.LastHumanReplyAt = time.Now() session.LastUpdatedAt = session.LastHumanReplyAt if session.State == collaborationStateTakeover { session.State = collaborationStateReviewing } else if session.State == collaborationStateWaitingHuman { session.State = collaborationStateReviewing } session.Generation++ if !session.ReviewScheduled { session.ReviewScheduled = true targets = append(targets, reviewTarget{Key: key, Generation: session.Generation}) } e.status.HumanAssistObservedCount++ } e.mu.Unlock() for _, target := range targets { go e.reviewCollaborationHumanReplyAfterDelay(target.Key, target.Generation) } return len(targets) > 0 } func (e *AutoReplyEngine) reviewCollaborationHumanReplyAfterDelay(key string, generation int64) { cfg := e.getConfig() delay := time.Duration(cfg.Collaboration.AfterHumanReplyDelaySeconds) * time.Second if delay > 0 { time.Sleep(delay) } var session *collaborationSession e.mu.Lock() current := e.collaborations[key] if current != nil && current.Generation == generation && current.State == collaborationStateReviewing { copySession := *current copySession.HumanReplies = append([]string(nil), current.HumanReplies...) session = ©Session } e.mu.Unlock() if session == nil { return } assessment := e.assessHumanReply(session.Msg, session.HumanReplies) if assessment.Decision == "sufficient" { e.finishCollaborationSession(key) e.incStatus("ignored") e.noteReason("collaboration_human_sufficient") e.addRecord(AutoReplyRecord{ RobotID: session.Msg.RobotID, ClientID: session.Msg.ClientID, UserID: session.Msg.RobotID, ConversationID: session.Msg.ConversationID, Source: session.Msg.sourceLabel(), FromWxID: session.Msg.FromWxID, FromNickName: session.Msg.FromNickName, Question: session.Msg.Content, Action: "ignored", Reason: "collaboration_human_sufficient: " + assessment.Reason, Answer: strings.Join(session.HumanReplies, "\n"), SenderIdentity: session.Msg.SenderIdentity, IdentitySource: session.Msg.IdentitySource, }) return } e.noteReason("collaboration_human_need_supplement") e.incStatus("collaboration_supplemented") e.finishCollaborationSession(key) retry := AutoReplyJob{ ClientID: session.Msg.ClientID, RawData: session.RawData, ReceivedAt: session.LastCustomerMessageAt, SkipHumanAssist: true, SkipCollaboration: true, ForceNoCooldown: true, SupplementReason: "collaboration_supplemented", } e.enqueueCollaborationRetry(retry, session.Msg, "collaboration_supplement_queue_full") } func (e *AutoReplyEngine) collaborationSweepLoop() { ticker := time.NewTicker(30 * time.Second) defer ticker.Stop() for range ticker.C { e.cleanupIdleCollaborations() } } func (e *AutoReplyEngine) cleanupIdleCollaborations() { cfg := e.getConfig() idle := time.Duration(cfg.Collaboration.TakeoverIdleExitSeconds) * time.Second if idle <= 0 { idle = 300 * time.Second } now := time.Now() closed := 0 e.mu.Lock() for key, session := range e.collaborations { if session == nil { delete(e.collaborations, key) continue } if session.State == collaborationStateTakeover && now.Sub(session.LastCustomerMessageAt) >= idle { delete(e.collaborations, key) closed++ } } e.mu.Unlock() if closed > 0 { e.noteReason("collaboration_takeover_idle_closed") } } func (e *AutoReplyEngine) finishCollaborationSession(key string) { e.mu.Lock() delete(e.collaborations, key) e.mu.Unlock() } func (e *AutoReplyEngine) collaborationCountsLocked() (int, int) { waiting := 0 takeover := 0 for _, session := range e.collaborations { if session == nil { continue } switch session.State { case collaborationStateWaitingHuman, collaborationStateReviewing: waiting++ case collaborationStateTakeover: takeover++ } } return waiting, takeover } func (e *AutoReplyEngine) isCollaborationTakeoverMessage(msg autoReplyMessage) bool { cfg := e.getConfig() if !cfg.Collaboration.Enabled { return false } key := e.collaborationKeyForMessage(msg) e.mu.Lock() defer e.mu.Unlock() session := e.collaborations[key] return session != nil && session.State == collaborationStateTakeover } func (e *AutoReplyEngine) collaborationKeyForMessage(msg autoReplyMessage) string { return e.contextKeyForMessage(msg) } func (e *AutoReplyEngine) enqueueCollaborationRetry(job AutoReplyJob, msg autoReplyMessage, failureReason string) { select { case e.queue <- job: default: e.setLastErrorWithScope(autoReplyErrorScopeRecords, failureReason) e.addRecord(AutoReplyRecord{ RobotID: msg.RobotID, ClientID: msg.ClientID, UserID: msg.RobotID, ConversationID: msg.ConversationID, Source: msg.sourceLabel(), FromWxID: msg.FromWxID, FromNickName: msg.FromNickName, Question: msg.Content, Action: "failed", Reason: failureReason, SenderIdentity: msg.SenderIdentity, IdentitySource: msg.IdentitySource, }) } } func (e *AutoReplyEngine) collaborationRetryReason(job AutoReplyJob, fallback string) string { reason := strings.TrimSpace(job.SupplementReason) if reason == "" { return fallback } return fmt.Sprintf("%s:%s", fallback, reason) }