Files
qiweimanager-master/helper/auto_reply_collaboration.go

343 lines
10 KiB
Go

package main
import (
"fmt"
"strings"
"time"
)
const (
collaborationStateWaitingHuman = "waiting_human"
collaborationStateReviewing = "reviewing_human"
collaborationStateTakeover = "ai_takeover"
)
type collaborationSession struct {
Key string
ConversationKey string
State string
Msg autoReplyMessage
RawData map[string]interface{}
ClientID int32
ConversationID string
RobotID string
LastCustomerMessageAt time.Time
LastHumanReplyAt time.Time
LastUpdatedAt time.Time
Generation int64
HumanReplies []string
ReviewScheduled bool
TakeoverStartedAt time.Time
LastTakeoverActivityAt time.Time
}
func (e *AutoReplyEngine) maybeHandleCollaborationCustomer(job AutoReplyJob, msg autoReplyMessage) bool {
cfg := e.getConfig()
if job.SkipCollaboration || !cfg.Collaboration.Enabled {
return false
}
if strings.TrimSpace(msg.ConversationID) == "" || msg.isSelfMessage() || msg.SenderIdentity == senderIdentityInternal {
return false
}
key := e.collaborationKeyForMessage(msg)
now := time.Now()
e.mu.Lock()
if e.collaborations == nil {
e.collaborations = make(map[string]*collaborationSession)
}
session := e.collaborations[key]
if session != nil && session.State == collaborationStateTakeover {
session.Msg = msg
session.RawData = job.RawData
session.LastCustomerMessageAt = now
session.LastTakeoverActivityAt = now
session.LastUpdatedAt = now
e.mu.Unlock()
e.noteReason("collaboration_takeover_instant_reply")
return false
}
if session == nil {
session = &collaborationSession{
Key: key,
ConversationKey: e.humanAssistConversationKey(msg),
ClientID: msg.ClientID,
ConversationID: msg.ConversationID,
RobotID: msg.RobotID,
}
e.collaborations[key] = session
}
session.State = collaborationStateWaitingHuman
session.Msg = msg
session.RawData = job.RawData
session.ClientID = msg.ClientID
session.ConversationID = msg.ConversationID
session.RobotID = msg.RobotID
session.LastCustomerMessageAt = now
session.LastUpdatedAt = now
session.HumanReplies = nil
session.ReviewScheduled = false
session.Generation++
generation := session.Generation
e.mu.Unlock()
e.noteReason("collaboration_waiting_human")
go e.finishCollaborationWaitAfterDelay(key, generation)
return true
}
func (e *AutoReplyEngine) finishCollaborationWaitAfterDelay(key string, generation int64) {
cfg := e.getConfig()
wait := time.Duration(cfg.Collaboration.HumanWaitSeconds) * time.Second
if wait <= 0 {
wait = 180 * time.Second
}
time.Sleep(wait)
var retry AutoReplyJob
var recordMsg autoReplyMessage
shouldTakeover := false
e.mu.Lock()
session := e.collaborations[key]
if session != nil && session.Generation == generation && session.State == collaborationStateWaitingHuman && len(session.HumanReplies) == 0 {
session.State = collaborationStateTakeover
session.TakeoverStartedAt = time.Now()
session.LastTakeoverActivityAt = session.TakeoverStartedAt
session.LastUpdatedAt = session.TakeoverStartedAt
recordMsg = session.Msg
retry = AutoReplyJob{
ClientID: session.Msg.ClientID,
RawData: session.RawData,
ReceivedAt: session.LastCustomerMessageAt,
SkipHumanAssist: true,
SkipCollaboration: true,
ForceNoCooldown: true,
SupplementReason: "collaboration_takeover",
}
shouldTakeover = true
}
e.mu.Unlock()
if !shouldTakeover {
return
}
e.incStatus("collaboration_takeover")
e.noteReason("collaboration_takeover")
e.addRecord(AutoReplyRecord{
RobotID: recordMsg.RobotID,
ClientID: recordMsg.ClientID,
UserID: recordMsg.RobotID,
ConversationID: recordMsg.ConversationID,
Source: recordMsg.sourceLabel(),
FromWxID: recordMsg.FromWxID,
FromNickName: recordMsg.FromNickName,
Question: recordMsg.Content,
Action: "takeover",
Reason: "collaboration_takeover",
SenderIdentity: recordMsg.SenderIdentity,
IdentitySource: recordMsg.IdentitySource,
})
e.enqueueCollaborationRetry(retry, recordMsg, "collaboration_takeover_queue_full")
}
func (e *AutoReplyEngine) observeCollaborationHumanReply(msg autoReplyMessage) bool {
content := strings.TrimSpace(msg.Content)
if content == "" || strings.TrimSpace(msg.ConversationID) == "" {
return false
}
if e.consumeAutoSentMessage(msg) {
return true
}
cfg := e.getConfig()
if !cfg.Collaboration.Enabled {
return false
}
if len([]rune(content)) < cfg.HumanAssist.MinimumHumanReplyLengthRunes {
return false
}
conversationKey := e.humanAssistConversationKey(msg)
type reviewTarget struct {
Key string
Generation int64
}
targets := make([]reviewTarget, 0, 1)
e.mu.Lock()
for key, session := range e.collaborations {
if session == nil || session.ConversationKey != conversationKey {
continue
}
session.HumanReplies = append(session.HumanReplies, content)
session.LastHumanReplyAt = time.Now()
session.LastUpdatedAt = session.LastHumanReplyAt
if session.State == collaborationStateTakeover {
session.State = collaborationStateReviewing
} else if session.State == collaborationStateWaitingHuman {
session.State = collaborationStateReviewing
}
session.Generation++
if !session.ReviewScheduled {
session.ReviewScheduled = true
targets = append(targets, reviewTarget{Key: key, Generation: session.Generation})
}
e.status.HumanAssistObservedCount++
}
e.mu.Unlock()
for _, target := range targets {
go e.reviewCollaborationHumanReplyAfterDelay(target.Key, target.Generation)
}
return len(targets) > 0
}
func (e *AutoReplyEngine) reviewCollaborationHumanReplyAfterDelay(key string, generation int64) {
cfg := e.getConfig()
delay := time.Duration(cfg.Collaboration.AfterHumanReplyDelaySeconds) * time.Second
if delay > 0 {
time.Sleep(delay)
}
var session *collaborationSession
e.mu.Lock()
current := e.collaborations[key]
if current != nil && current.Generation == generation && current.State == collaborationStateReviewing {
copySession := *current
copySession.HumanReplies = append([]string(nil), current.HumanReplies...)
session = &copySession
}
e.mu.Unlock()
if session == nil {
return
}
assessment := e.assessHumanReply(session.Msg, session.HumanReplies)
if assessment.Decision == "sufficient" {
e.finishCollaborationSession(key)
e.incStatus("ignored")
e.noteReason("collaboration_human_sufficient")
e.addRecord(AutoReplyRecord{
RobotID: session.Msg.RobotID,
ClientID: session.Msg.ClientID,
UserID: session.Msg.RobotID,
ConversationID: session.Msg.ConversationID,
Source: session.Msg.sourceLabel(),
FromWxID: session.Msg.FromWxID,
FromNickName: session.Msg.FromNickName,
Question: session.Msg.Content,
Action: "ignored",
Reason: "collaboration_human_sufficient: " + assessment.Reason,
Answer: strings.Join(session.HumanReplies, "\n"),
SenderIdentity: session.Msg.SenderIdentity,
IdentitySource: session.Msg.IdentitySource,
})
return
}
e.noteReason("collaboration_human_need_supplement")
e.incStatus("collaboration_supplemented")
e.finishCollaborationSession(key)
retry := AutoReplyJob{
ClientID: session.Msg.ClientID,
RawData: session.RawData,
ReceivedAt: session.LastCustomerMessageAt,
SkipHumanAssist: true,
SkipCollaboration: true,
ForceNoCooldown: true,
SupplementReason: "collaboration_supplemented",
}
e.enqueueCollaborationRetry(retry, session.Msg, "collaboration_supplement_queue_full")
}
func (e *AutoReplyEngine) collaborationSweepLoop() {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for range ticker.C {
e.cleanupIdleCollaborations()
}
}
func (e *AutoReplyEngine) cleanupIdleCollaborations() {
cfg := e.getConfig()
idle := time.Duration(cfg.Collaboration.TakeoverIdleExitSeconds) * time.Second
if idle <= 0 {
idle = 300 * time.Second
}
now := time.Now()
closed := 0
e.mu.Lock()
for key, session := range e.collaborations {
if session == nil {
delete(e.collaborations, key)
continue
}
if session.State == collaborationStateTakeover && now.Sub(session.LastCustomerMessageAt) >= idle {
delete(e.collaborations, key)
closed++
}
}
e.mu.Unlock()
if closed > 0 {
e.noteReason("collaboration_takeover_idle_closed")
}
}
func (e *AutoReplyEngine) finishCollaborationSession(key string) {
e.mu.Lock()
delete(e.collaborations, key)
e.mu.Unlock()
}
func (e *AutoReplyEngine) collaborationCountsLocked() (int, int) {
waiting := 0
takeover := 0
for _, session := range e.collaborations {
if session == nil {
continue
}
switch session.State {
case collaborationStateWaitingHuman, collaborationStateReviewing:
waiting++
case collaborationStateTakeover:
takeover++
}
}
return waiting, takeover
}
func (e *AutoReplyEngine) isCollaborationTakeoverMessage(msg autoReplyMessage) bool {
cfg := e.getConfig()
if !cfg.Collaboration.Enabled {
return false
}
key := e.collaborationKeyForMessage(msg)
e.mu.Lock()
defer e.mu.Unlock()
session := e.collaborations[key]
return session != nil && session.State == collaborationStateTakeover
}
func (e *AutoReplyEngine) collaborationKeyForMessage(msg autoReplyMessage) string {
return e.contextKeyForMessage(msg)
}
func (e *AutoReplyEngine) enqueueCollaborationRetry(job AutoReplyJob, msg autoReplyMessage, failureReason string) {
select {
case e.queue <- job:
default:
e.setLastErrorWithScope(autoReplyErrorScopeRecords, failureReason)
e.addRecord(AutoReplyRecord{
RobotID: msg.RobotID,
ClientID: msg.ClientID,
UserID: msg.RobotID,
ConversationID: msg.ConversationID,
Source: msg.sourceLabel(),
FromWxID: msg.FromWxID,
FromNickName: msg.FromNickName,
Question: msg.Content,
Action: "failed",
Reason: failureReason,
SenderIdentity: msg.SenderIdentity,
IdentitySource: msg.IdentitySource,
})
}
}
func (e *AutoReplyEngine) collaborationRetryReason(job AutoReplyJob, fallback string) string {
reason := strings.TrimSpace(job.SupplementReason)
if reason == "" {
return fallback
}
return fmt.Sprintf("%s:%s", fallback, reason)
}