feat: add supervisor prototype with embedded frontend
This commit is contained in:
273
internal/session/session.go
Normal file
273
internal/session/session.go
Normal file
@ -0,0 +1,273 @@
|
||||
package session
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"io"
|
||||
"os/exec"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"supervisor/internal/domain"
|
||||
"supervisor/internal/util"
|
||||
)
|
||||
|
||||
type StateChangeFn func(domain.Session)
|
||||
|
||||
type Session struct {
|
||||
mu sync.RWMutex
|
||||
meta domain.Session
|
||||
process PTYProcess
|
||||
subscribers map[string]chan domain.Event
|
||||
scrollback []byte
|
||||
scrollbackSize int
|
||||
onStateChange StateChangeFn
|
||||
}
|
||||
|
||||
func NewSession(meta domain.Session, scrollbackSize int, onStateChange StateChangeFn) *Session {
|
||||
if scrollbackSize <= 0 {
|
||||
scrollbackSize = 256 * 1024
|
||||
}
|
||||
return &Session{
|
||||
meta: meta,
|
||||
subscribers: make(map[string]chan domain.Event),
|
||||
scrollbackSize: scrollbackSize,
|
||||
onStateChange: onStateChange,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Session) Snapshot() domain.Session {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
return s.meta
|
||||
}
|
||||
|
||||
func (s *Session) Scrollback() []byte {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
cpy := make([]byte, len(s.scrollback))
|
||||
copy(cpy, s.scrollback)
|
||||
return cpy
|
||||
}
|
||||
|
||||
func (s *Session) Start(factory PTYFactory) error {
|
||||
s.mu.Lock()
|
||||
if s.meta.Status == domain.SessionStatusRunning {
|
||||
s.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
if s.meta.Command == "" {
|
||||
s.mu.Unlock()
|
||||
return errors.New("empty command")
|
||||
}
|
||||
proc, err := factory.Start(s.meta.Command)
|
||||
if err != nil {
|
||||
now := time.Now().UTC()
|
||||
s.meta.Status = domain.SessionStatusError
|
||||
s.meta.ExitedAt = &now
|
||||
s.mu.Unlock()
|
||||
s.emitStateChange()
|
||||
s.publish(domain.Event{
|
||||
Type: domain.EventError,
|
||||
SessionID: s.meta.ID,
|
||||
Payload: domain.ErrorEvent{
|
||||
Message: err.Error(),
|
||||
},
|
||||
At: time.Now().UTC(),
|
||||
})
|
||||
return err
|
||||
}
|
||||
now := time.Now().UTC()
|
||||
s.process = proc
|
||||
s.meta.Status = domain.SessionStatusRunning
|
||||
s.meta.StartedAt = &now
|
||||
s.meta.ExitedAt = nil
|
||||
s.meta.ExitCode = nil
|
||||
s.mu.Unlock()
|
||||
|
||||
s.emitStateChange()
|
||||
s.publishStatus()
|
||||
|
||||
go s.readLoop(proc)
|
||||
go s.waitLoop(proc)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Session) Stop() error {
|
||||
s.mu.Lock()
|
||||
if s.process == nil {
|
||||
s.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
proc := s.process
|
||||
if s.meta.Status == domain.SessionStatusRunning {
|
||||
s.meta.Status = domain.SessionStatusStopped
|
||||
}
|
||||
s.mu.Unlock()
|
||||
|
||||
s.emitStateChange()
|
||||
s.publishStatus()
|
||||
return proc.SignalStop()
|
||||
}
|
||||
|
||||
func (s *Session) WriteInput(input string) error {
|
||||
s.mu.RLock()
|
||||
proc := s.process
|
||||
status := s.meta.Status
|
||||
s.mu.RUnlock()
|
||||
|
||||
if proc == nil || status != domain.SessionStatusRunning {
|
||||
return errors.New("session is not running")
|
||||
}
|
||||
_, err := proc.Write([]byte(input))
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *Session) Resize(cols, rows int) error {
|
||||
if cols <= 0 || rows <= 0 {
|
||||
return errors.New("invalid terminal size")
|
||||
}
|
||||
s.mu.RLock()
|
||||
proc := s.process
|
||||
s.mu.RUnlock()
|
||||
if proc == nil {
|
||||
return errors.New("session has no process")
|
||||
}
|
||||
return proc.Resize(uint16(cols), uint16(rows))
|
||||
}
|
||||
|
||||
func (s *Session) Subscribe() (string, <-chan domain.Event, func()) {
|
||||
id := util.NewID("sub")
|
||||
ch := make(chan domain.Event, 128)
|
||||
s.mu.Lock()
|
||||
s.subscribers[id] = ch
|
||||
s.mu.Unlock()
|
||||
cancel := func() {
|
||||
s.mu.Lock()
|
||||
sub, ok := s.subscribers[id]
|
||||
if ok {
|
||||
delete(s.subscribers, id)
|
||||
close(sub)
|
||||
}
|
||||
s.mu.Unlock()
|
||||
}
|
||||
return id, ch, cancel
|
||||
}
|
||||
|
||||
func (s *Session) readLoop(proc PTYProcess) {
|
||||
buf := make([]byte, 4096)
|
||||
for {
|
||||
n, err := proc.Read(buf)
|
||||
if n > 0 {
|
||||
chunk := append([]byte(nil), buf[:n]...)
|
||||
s.appendScrollback(chunk)
|
||||
s.publish(domain.Event{
|
||||
Type: domain.EventTerminalOutput,
|
||||
SessionID: s.Snapshot().ID,
|
||||
Payload: domain.TerminalOutputEvent{
|
||||
Data: string(chunk),
|
||||
},
|
||||
At: time.Now().UTC(),
|
||||
})
|
||||
}
|
||||
if err != nil {
|
||||
if !errors.Is(err, io.EOF) {
|
||||
s.publish(domain.Event{
|
||||
Type: domain.EventError,
|
||||
SessionID: s.Snapshot().ID,
|
||||
Payload: domain.ErrorEvent{Message: err.Error()},
|
||||
At: time.Now().UTC(),
|
||||
})
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Session) waitLoop(proc PTYProcess) {
|
||||
err := proc.Wait()
|
||||
_ = proc.Close()
|
||||
|
||||
var exitCode *int
|
||||
if err != nil {
|
||||
if exitErr, ok := err.(*exec.ExitError); ok {
|
||||
code := exitErr.ExitCode()
|
||||
exitCode = &code
|
||||
}
|
||||
}
|
||||
if err == nil {
|
||||
code := 0
|
||||
exitCode = &code
|
||||
}
|
||||
|
||||
now := time.Now().UTC()
|
||||
s.mu.Lock()
|
||||
if s.meta.Status != domain.SessionStatusError {
|
||||
s.meta.Status = domain.SessionStatusExited
|
||||
}
|
||||
s.meta.ExitedAt = &now
|
||||
s.meta.ExitCode = exitCode
|
||||
s.process = nil
|
||||
s.mu.Unlock()
|
||||
|
||||
s.emitStateChange()
|
||||
s.publishStatus()
|
||||
|
||||
if err != nil {
|
||||
s.publish(domain.Event{
|
||||
Type: domain.EventError,
|
||||
SessionID: s.Snapshot().ID,
|
||||
Payload: domain.ErrorEvent{Message: err.Error()},
|
||||
At: time.Now().UTC(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Session) appendScrollback(data []byte) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
if len(data) >= s.scrollbackSize {
|
||||
s.scrollback = append([]byte(nil), data[len(data)-s.scrollbackSize:]...)
|
||||
return
|
||||
}
|
||||
s.scrollback = append(s.scrollback, data...)
|
||||
if len(s.scrollback) > s.scrollbackSize {
|
||||
extra := len(s.scrollback) - s.scrollbackSize
|
||||
s.scrollback = append([]byte(nil), s.scrollback[extra:]...)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Session) publishStatus() {
|
||||
snap := s.Snapshot()
|
||||
s.publish(domain.Event{
|
||||
Type: domain.EventSessionStatus,
|
||||
SessionID: snap.ID,
|
||||
Payload: domain.SessionStatusEvent{
|
||||
Status: snap.Status,
|
||||
ExitCode: snap.ExitCode,
|
||||
},
|
||||
At: time.Now().UTC(),
|
||||
})
|
||||
}
|
||||
|
||||
func (s *Session) publish(event domain.Event) {
|
||||
s.mu.RLock()
|
||||
subs := make([]chan domain.Event, 0, len(s.subscribers))
|
||||
for _, ch := range s.subscribers {
|
||||
subs = append(subs, ch)
|
||||
}
|
||||
s.mu.RUnlock()
|
||||
|
||||
for _, ch := range subs {
|
||||
select {
|
||||
case ch <- event:
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Session) emitStateChange() {
|
||||
if s.onStateChange == nil {
|
||||
return
|
||||
}
|
||||
s.onStateChange(s.Snapshot())
|
||||
}
|
||||
Reference in New Issue
Block a user