package ownwire_sdk import ( "context" "encoding/json" "fmt" "sync" ) type ClientOptions struct { Url string Conn Conn Handshaker Handshaker EventsBuffer int } type Client struct { url string conn Conn handshaker Handshaker events_ch chan Event mu sync.Mutex state SessionState ready bool closed bool close_ch chan struct{} read_cancel context.CancelFunc } type conn_closer interface { Close() error } func NewClient(opts ClientOptions) *Client { events_buffer := opts.EventsBuffer if events_buffer == 0 { events_buffer = 64 } handshaker := opts.Handshaker.EnsureDefaults() return &Client{ url: opts.Url, conn: opts.Conn, handshaker: handshaker, events_ch: make(chan Event, events_buffer), close_ch: make(chan struct{}), } } func (c *Client) Events() <-chan Event { return c.events_ch } func (c *Client) Connect(ctx context.Context, resume_session_id string) error { c.mu.Lock() if c.ready { c.mu.Unlock() return nil } c.mu.Unlock() conn := c.conn if conn == nil { if c.url == "" { return fmt.Errorf("no Url configured and no Conn provided") } ws_conn, err := DialWs(ctx, c.url) if err != nil { return err } conn = ws_conn c.mu.Lock() c.conn = conn c.mu.Unlock() } state, pending, err := c.handshaker.Run(ctx, conn, resume_session_id) if err != nil { return err } read_ctx, read_cancel := context.WithCancel(context.Background()) c.mu.Lock() c.state = state c.ready = true c.read_cancel = read_cancel c.mu.Unlock() c.emit(Event{Kind: EventOpened}) go c.read_loop(read_ctx, pending) return nil } func (c *Client) Close() { c.mu.Lock() if c.closed { c.mu.Unlock() return } c.closed = true read_cancel := c.read_cancel conn := c.conn close(c.close_ch) c.mu.Unlock() if read_cancel != nil { read_cancel() } if closer, ok := conn.(conn_closer); ok { _ = closer.Close() } c.emit(Event{Kind: EventClosed}) } func (c *Client) Send(ctx context.Context, content string, metadata string) error { c.mu.Lock() if !c.ready { c.mu.Unlock() return fmt.Errorf("client not connected") } c.state.SeqOut++ seq_num := c.state.SeqOut shared_key := c.state.SharedKey session_id_bytes := c.state.SessionIdBytes conn := c.conn c.mu.Unlock() payload := outgoing_frame{ Content: content, Metadata: metadata, SeqNum: seq_num, IsEncrypted: true, Salt: "", } enc, err := EncryptAESGCM(shared_key, session_id_bytes, []byte(content), seq_num, false) if err != nil { return err } payload.Content = enc.ContentB64 payload.Salt = enc.SaltHex buf, err := json.Marshal(payload) if err != nil { return err } return conn.WriteText(ctx, string(buf)) } func (c *Client) read_loop(ctx context.Context, pending []string) { for _, s := range pending { c.handle_incoming_text(s) } for { select { case <-c.close_ch: return default: } s, err := c.conn.ReadText(ctx) if err != nil { c.emit(Event{Kind: EventError, Err: err}) return } c.handle_incoming_text(s) } } func (c *Client) handle_incoming_text(s string) { if len(s) > 0 && s[0] == '/' { // Ignore unknown commands after handshake for now. return } var in incoming_frame if err := json.Unmarshal([]byte(s), &in); err != nil { return } c.mu.Lock() if !c.ready { c.mu.Unlock() return } shared_key := c.state.SharedKey session_id_bytes := c.state.SessionIdBytes c.mu.Unlock() content := in.Content if in.IsEncrypted { plain, err := DecryptAESGCM(shared_key, session_id_bytes, in.Content, in.Salt, in.SeqNum, in.IsResponse) if err != nil { c.emit(Event{Kind: EventError, Err: err}) return } content = string(plain) } c.mu.Lock() if in.SeqNum > c.state.SeqInMax { c.state.SeqInMax = in.SeqNum } c.mu.Unlock() c.emit(Event{ Kind: EventMessage, Message: Message{ Content: content, Metadata: in.Metadata, SeqNum: in.SeqNum, IsResponse: in.IsResponse, CreatedAt: in.CreatedAt, }, }) } func (c *Client) emit(ev Event) { select { case c.events_ch <- ev: default: // Drop if user isn't consuming. } } type outgoing_frame struct { Content string `json:"content"` Metadata string `json:"metadata"` SeqNum uint64 `json:"seq_num"` IsEncrypted bool `json:"is_encrypted"` Salt string `json:"salt"` } type incoming_frame struct { Content string `json:"content"` Metadata string `json:"metadata"` SeqNum uint64 `json:"seq_num"` IsEncrypted bool `json:"is_encrypted"` IsResponse bool `json:"is_response"` Salt string `json:"salt"` CreatedAt int64 `json:"created_at"` }