package session import ( "errors" "io" "os/exec" "sync" "time" "supervisor/internal/domain" "supervisor/internal/util" ) type StateChangeFn func(domain.Session) type Session struct { mu sync.RWMutex meta domain.Session process PTYProcess subscribers map[string]chan domain.Event scrollback []byte scrollbackSize int onStateChange StateChangeFn } func NewSession(meta domain.Session, scrollbackSize int, onStateChange StateChangeFn) *Session { if scrollbackSize <= 0 { scrollbackSize = 256 * 1024 } return &Session{ meta: meta, subscribers: make(map[string]chan domain.Event), scrollbackSize: scrollbackSize, onStateChange: onStateChange, } } func (s *Session) Snapshot() domain.Session { s.mu.RLock() defer s.mu.RUnlock() return s.meta } func (s *Session) Scrollback() []byte { s.mu.RLock() defer s.mu.RUnlock() cpy := make([]byte, len(s.scrollback)) copy(cpy, s.scrollback) return cpy } func (s *Session) Start(factory PTYFactory) error { s.mu.Lock() if s.meta.Status == domain.SessionStatusRunning { s.mu.Unlock() return nil } if s.meta.Command == "" { s.mu.Unlock() return errors.New("empty command") } proc, err := factory.Start(s.meta.Command) if err != nil { now := time.Now().UTC() s.meta.Status = domain.SessionStatusError s.meta.ExitedAt = &now s.mu.Unlock() s.emitStateChange() s.publish(domain.Event{ Type: domain.EventError, SessionID: s.meta.ID, Payload: domain.ErrorEvent{ Message: err.Error(), }, At: time.Now().UTC(), }) return err } now := time.Now().UTC() s.process = proc s.meta.Status = domain.SessionStatusRunning s.meta.StartedAt = &now s.meta.ExitedAt = nil s.meta.ExitCode = nil s.mu.Unlock() s.emitStateChange() s.publishStatus() go s.readLoop(proc) go s.waitLoop(proc) return nil } func (s *Session) Stop() error { s.mu.Lock() if s.process == nil { s.mu.Unlock() return nil } proc := s.process if s.meta.Status == domain.SessionStatusRunning { s.meta.Status = domain.SessionStatusStopped } s.mu.Unlock() s.emitStateChange() s.publishStatus() return proc.SignalStop() } func (s *Session) WriteInput(input string) error { s.mu.RLock() proc := s.process status := s.meta.Status s.mu.RUnlock() if proc == nil || status != domain.SessionStatusRunning { return errors.New("session is not running") } _, err := proc.Write([]byte(input)) return err } func (s *Session) Resize(cols, rows int) error { if cols <= 0 || rows <= 0 { return errors.New("invalid terminal size") } s.mu.RLock() proc := s.process s.mu.RUnlock() if proc == nil { return errors.New("session has no process") } return proc.Resize(uint16(cols), uint16(rows)) } func (s *Session) Subscribe() (string, <-chan domain.Event, func()) { id := util.NewID("sub") ch := make(chan domain.Event, 128) s.mu.Lock() s.subscribers[id] = ch s.mu.Unlock() cancel := func() { s.mu.Lock() sub, ok := s.subscribers[id] if ok { delete(s.subscribers, id) close(sub) } s.mu.Unlock() } return id, ch, cancel } func (s *Session) readLoop(proc PTYProcess) { buf := make([]byte, 4096) for { n, err := proc.Read(buf) if n > 0 { chunk := append([]byte(nil), buf[:n]...) s.appendScrollback(chunk) s.publish(domain.Event{ Type: domain.EventTerminalOutput, SessionID: s.Snapshot().ID, Payload: domain.TerminalOutputEvent{ Data: string(chunk), }, At: time.Now().UTC(), }) } if err != nil { if !errors.Is(err, io.EOF) { s.publish(domain.Event{ Type: domain.EventError, SessionID: s.Snapshot().ID, Payload: domain.ErrorEvent{Message: err.Error()}, At: time.Now().UTC(), }) } return } } } func (s *Session) waitLoop(proc PTYProcess) { err := proc.Wait() _ = proc.Close() var exitCode *int if err != nil { if exitErr, ok := err.(*exec.ExitError); ok { code := exitErr.ExitCode() exitCode = &code } } if err == nil { code := 0 exitCode = &code } now := time.Now().UTC() s.mu.Lock() if s.meta.Status != domain.SessionStatusError { s.meta.Status = domain.SessionStatusExited } s.meta.ExitedAt = &now s.meta.ExitCode = exitCode s.process = nil s.mu.Unlock() s.emitStateChange() s.publishStatus() if err != nil { s.publish(domain.Event{ Type: domain.EventError, SessionID: s.Snapshot().ID, Payload: domain.ErrorEvent{Message: err.Error()}, At: time.Now().UTC(), }) } } func (s *Session) appendScrollback(data []byte) { s.mu.Lock() defer s.mu.Unlock() if len(data) >= s.scrollbackSize { s.scrollback = append([]byte(nil), data[len(data)-s.scrollbackSize:]...) return } s.scrollback = append(s.scrollback, data...) if len(s.scrollback) > s.scrollbackSize { extra := len(s.scrollback) - s.scrollbackSize s.scrollback = append([]byte(nil), s.scrollback[extra:]...) } } func (s *Session) publishStatus() { snap := s.Snapshot() s.publish(domain.Event{ Type: domain.EventSessionStatus, SessionID: snap.ID, Payload: domain.SessionStatusEvent{ Status: snap.Status, ExitCode: snap.ExitCode, }, At: time.Now().UTC(), }) } func (s *Session) publish(event domain.Event) { s.mu.RLock() subs := make([]chan domain.Event, 0, len(s.subscribers)) for _, ch := range s.subscribers { subs = append(subs, ch) } s.mu.RUnlock() for _, ch := range subs { select { case ch <- event: default: } } } func (s *Session) emitStateChange() { if s.onStateChange == nil { return } s.onStateChange(s.Snapshot()) }