274 lines
5.5 KiB
Go
274 lines
5.5 KiB
Go
package session
|
|
|
|
import (
|
|
"errors"
|
|
"io"
|
|
"os/exec"
|
|
"sync"
|
|
"time"
|
|
|
|
"supervisor/internal/domain"
|
|
"supervisor/internal/util"
|
|
)
|
|
|
|
type StateChangeFn func(domain.Session)
|
|
|
|
type Session struct {
|
|
mu sync.RWMutex
|
|
meta domain.Session
|
|
process PTYProcess
|
|
subscribers map[string]chan domain.Event
|
|
scrollback []byte
|
|
scrollbackSize int
|
|
onStateChange StateChangeFn
|
|
}
|
|
|
|
func NewSession(meta domain.Session, scrollbackSize int, onStateChange StateChangeFn) *Session {
|
|
if scrollbackSize <= 0 {
|
|
scrollbackSize = 256 * 1024
|
|
}
|
|
return &Session{
|
|
meta: meta,
|
|
subscribers: make(map[string]chan domain.Event),
|
|
scrollbackSize: scrollbackSize,
|
|
onStateChange: onStateChange,
|
|
}
|
|
}
|
|
|
|
func (s *Session) Snapshot() domain.Session {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
return s.meta
|
|
}
|
|
|
|
func (s *Session) Scrollback() []byte {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
cpy := make([]byte, len(s.scrollback))
|
|
copy(cpy, s.scrollback)
|
|
return cpy
|
|
}
|
|
|
|
func (s *Session) Start(factory PTYFactory) error {
|
|
s.mu.Lock()
|
|
if s.meta.Status == domain.SessionStatusRunning {
|
|
s.mu.Unlock()
|
|
return nil
|
|
}
|
|
if s.meta.Command == "" {
|
|
s.mu.Unlock()
|
|
return errors.New("empty command")
|
|
}
|
|
proc, err := factory.Start(s.meta.Command)
|
|
if err != nil {
|
|
now := time.Now().UTC()
|
|
s.meta.Status = domain.SessionStatusError
|
|
s.meta.ExitedAt = &now
|
|
s.mu.Unlock()
|
|
s.emitStateChange()
|
|
s.publish(domain.Event{
|
|
Type: domain.EventError,
|
|
SessionID: s.meta.ID,
|
|
Payload: domain.ErrorEvent{
|
|
Message: err.Error(),
|
|
},
|
|
At: time.Now().UTC(),
|
|
})
|
|
return err
|
|
}
|
|
now := time.Now().UTC()
|
|
s.process = proc
|
|
s.meta.Status = domain.SessionStatusRunning
|
|
s.meta.StartedAt = &now
|
|
s.meta.ExitedAt = nil
|
|
s.meta.ExitCode = nil
|
|
s.mu.Unlock()
|
|
|
|
s.emitStateChange()
|
|
s.publishStatus()
|
|
|
|
go s.readLoop(proc)
|
|
go s.waitLoop(proc)
|
|
return nil
|
|
}
|
|
|
|
func (s *Session) Stop() error {
|
|
s.mu.Lock()
|
|
if s.process == nil {
|
|
s.mu.Unlock()
|
|
return nil
|
|
}
|
|
proc := s.process
|
|
if s.meta.Status == domain.SessionStatusRunning {
|
|
s.meta.Status = domain.SessionStatusStopped
|
|
}
|
|
s.mu.Unlock()
|
|
|
|
s.emitStateChange()
|
|
s.publishStatus()
|
|
return proc.SignalStop()
|
|
}
|
|
|
|
func (s *Session) WriteInput(input string) error {
|
|
s.mu.RLock()
|
|
proc := s.process
|
|
status := s.meta.Status
|
|
s.mu.RUnlock()
|
|
|
|
if proc == nil || status != domain.SessionStatusRunning {
|
|
return errors.New("session is not running")
|
|
}
|
|
_, err := proc.Write([]byte(input))
|
|
return err
|
|
}
|
|
|
|
func (s *Session) Resize(cols, rows int) error {
|
|
if cols <= 0 || rows <= 0 {
|
|
return errors.New("invalid terminal size")
|
|
}
|
|
s.mu.RLock()
|
|
proc := s.process
|
|
s.mu.RUnlock()
|
|
if proc == nil {
|
|
return errors.New("session has no process")
|
|
}
|
|
return proc.Resize(uint16(cols), uint16(rows))
|
|
}
|
|
|
|
func (s *Session) Subscribe() (string, <-chan domain.Event, func()) {
|
|
id := util.NewID("sub")
|
|
ch := make(chan domain.Event, 128)
|
|
s.mu.Lock()
|
|
s.subscribers[id] = ch
|
|
s.mu.Unlock()
|
|
cancel := func() {
|
|
s.mu.Lock()
|
|
sub, ok := s.subscribers[id]
|
|
if ok {
|
|
delete(s.subscribers, id)
|
|
close(sub)
|
|
}
|
|
s.mu.Unlock()
|
|
}
|
|
return id, ch, cancel
|
|
}
|
|
|
|
func (s *Session) readLoop(proc PTYProcess) {
|
|
buf := make([]byte, 4096)
|
|
for {
|
|
n, err := proc.Read(buf)
|
|
if n > 0 {
|
|
chunk := append([]byte(nil), buf[:n]...)
|
|
s.appendScrollback(chunk)
|
|
s.publish(domain.Event{
|
|
Type: domain.EventTerminalOutput,
|
|
SessionID: s.Snapshot().ID,
|
|
Payload: domain.TerminalOutputEvent{
|
|
Data: string(chunk),
|
|
},
|
|
At: time.Now().UTC(),
|
|
})
|
|
}
|
|
if err != nil {
|
|
if !errors.Is(err, io.EOF) {
|
|
s.publish(domain.Event{
|
|
Type: domain.EventError,
|
|
SessionID: s.Snapshot().ID,
|
|
Payload: domain.ErrorEvent{Message: err.Error()},
|
|
At: time.Now().UTC(),
|
|
})
|
|
}
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *Session) waitLoop(proc PTYProcess) {
|
|
err := proc.Wait()
|
|
_ = proc.Close()
|
|
|
|
var exitCode *int
|
|
if err != nil {
|
|
if exitErr, ok := err.(*exec.ExitError); ok {
|
|
code := exitErr.ExitCode()
|
|
exitCode = &code
|
|
}
|
|
}
|
|
if err == nil {
|
|
code := 0
|
|
exitCode = &code
|
|
}
|
|
|
|
now := time.Now().UTC()
|
|
s.mu.Lock()
|
|
if s.meta.Status != domain.SessionStatusError {
|
|
s.meta.Status = domain.SessionStatusExited
|
|
}
|
|
s.meta.ExitedAt = &now
|
|
s.meta.ExitCode = exitCode
|
|
s.process = nil
|
|
s.mu.Unlock()
|
|
|
|
s.emitStateChange()
|
|
s.publishStatus()
|
|
|
|
if err != nil {
|
|
s.publish(domain.Event{
|
|
Type: domain.EventError,
|
|
SessionID: s.Snapshot().ID,
|
|
Payload: domain.ErrorEvent{Message: err.Error()},
|
|
At: time.Now().UTC(),
|
|
})
|
|
}
|
|
}
|
|
|
|
func (s *Session) appendScrollback(data []byte) {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
if len(data) >= s.scrollbackSize {
|
|
s.scrollback = append([]byte(nil), data[len(data)-s.scrollbackSize:]...)
|
|
return
|
|
}
|
|
s.scrollback = append(s.scrollback, data...)
|
|
if len(s.scrollback) > s.scrollbackSize {
|
|
extra := len(s.scrollback) - s.scrollbackSize
|
|
s.scrollback = append([]byte(nil), s.scrollback[extra:]...)
|
|
}
|
|
}
|
|
|
|
func (s *Session) publishStatus() {
|
|
snap := s.Snapshot()
|
|
s.publish(domain.Event{
|
|
Type: domain.EventSessionStatus,
|
|
SessionID: snap.ID,
|
|
Payload: domain.SessionStatusEvent{
|
|
Status: snap.Status,
|
|
ExitCode: snap.ExitCode,
|
|
},
|
|
At: time.Now().UTC(),
|
|
})
|
|
}
|
|
|
|
func (s *Session) publish(event domain.Event) {
|
|
s.mu.RLock()
|
|
subs := make([]chan domain.Event, 0, len(s.subscribers))
|
|
for _, ch := range s.subscribers {
|
|
subs = append(subs, ch)
|
|
}
|
|
s.mu.RUnlock()
|
|
|
|
for _, ch := range subs {
|
|
select {
|
|
case ch <- event:
|
|
default:
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *Session) emitStateChange() {
|
|
if s.onStateChange == nil {
|
|
return
|
|
}
|
|
s.onStateChange(s.Snapshot())
|
|
}
|