Files
ownwire-go-sdk/client.go

263 lines
4.6 KiB
Go

package ownwire_sdk
import (
"context"
"encoding/json"
"fmt"
"sync"
)
// ClientOptions carries the settings NewClient uses to build a Client.
type ClientOptions struct {
	// Url is the endpoint Connect dials (through DialWs) when Conn is nil.
	Url string

	// Conn is an optional, already-established connection. When it is set,
	// Connect uses it and never dials Url.
	Conn Conn

	// Handshaker performs the session handshake during Connect. NewClient
	// stores the result of calling EnsureDefaults on it.
	Handshaker Handshaker

	// EventsBuffer is the capacity of the channel returned by Client.Events.
	// Zero selects the built-in default of 64.
	EventsBuffer int
}
// Client is a single-session client: Connect runs the handshake and starts a
// background reader, Send encrypts and writes outgoing frames, and lifecycle
// changes, errors and decoded messages are published on the Events channel.
//
// A Client contains a mutex and must not be copied; use the pointer returned
// by NewClient.
type Client struct {
	// Populated by NewClient.
	url        string     // endpoint dialed by Connect when conn is nil
	conn       Conn       // transport; supplied via ClientOptions or dialed by Connect
	handshaker Handshaker // runs the session handshake
	events_ch  chan Event // buffered; emit drops events when it is full

	// mu is held by the methods while they read or update the session
	// bookkeeping declared below.
	mu    sync.Mutex
	state SessionState // session keys and sequence counters from the handshake

	// ready is set by Connect after a successful handshake; Send and
	// incoming-frame handling refuse to run without it. closed is set by
	// the first call to Close.
	ready, closed bool

	close_ch    chan struct{}      // closed by Close to stop the read loop
	read_cancel context.CancelFunc // cancels the read loop's context
}
// conn_closer is the optional capability Close probes for on the connection:
// a connection that implements it is closed when the client shuts down.
type conn_closer interface{ Close() error }
// NewClient builds a Client from opts. It does not touch the network; call
// Connect to dial (if needed) and run the handshake.
//
// The event channel is created with capacity opts.EventsBuffer. Any
// non-positive value selects the default of 64: zero means "unset", and a
// negative capacity would otherwise make the channel allocation panic.
func NewClient(opts ClientOptions) *Client {
	const default_events_buffer = 64

	events_buffer := opts.EventsBuffer
	if events_buffer <= 0 {
		events_buffer = default_events_buffer
	}

	return &Client{
		url:        opts.Url,
		conn:       opts.Conn,
		handshaker: opts.Handshaker.EnsureDefaults(),
		events_ch:  make(chan Event, events_buffer),
		close_ch:   make(chan struct{}),
	}
}
// Events exposes the client's event stream as a receive-only channel. The
// same channel is returned on every call. Delivery is best-effort: emit drops
// an event when the channel's buffer is full.
func (c *Client) Events() <-chan Event {
	var stream <-chan Event = c.events_ch
	return stream
}
// Connect establishes the session and starts the background read loop.
//
// If no connection was supplied through ClientOptions.Conn, Connect dials
// c.url with DialWs. It then runs the handshake, passing resume_session_id
// through to the handshaker. On success it stores the session state, marks
// the client ready, emits EventOpened and launches read_loop with any frames
// the handshake left pending.
//
// ctx bounds the dial and the handshake only; the read loop runs on its own
// context so that it outlives ctx and is stopped by Close.
//
// Connect returns nil without doing anything when the client is already
// connected. It returns an error when the client has been closed, when
// neither a Url nor a Conn is configured, or when dialing or the handshake
// fails. A connection dialed by Connect is closed and forgotten if the
// handshake fails, so a later Connect dials afresh; a caller-supplied Conn is
// left untouched because the caller owns it.
func (c *Client) Connect(ctx context.Context, resume_session_id string) error {
	c.mu.Lock()
	if c.closed {
		c.mu.Unlock()
		return fmt.Errorf("client closed")
	}
	if c.ready {
		c.mu.Unlock()
		return nil
	}
	conn := c.conn
	c.mu.Unlock()

	dialed := false
	if conn == nil {
		if c.url == "" {
			return fmt.Errorf("no Url configured and no Conn provided")
		}
		ws_conn, err := DialWs(ctx, c.url)
		if err != nil {
			return err
		}
		conn = ws_conn
		dialed = true

		// Publish the connection before the handshake so that a concurrent
		// Close can close it and thereby abort the handshake.
		c.mu.Lock()
		c.conn = conn
		c.mu.Unlock()
	}

	state, pending, err := c.handshaker.Run(ctx, conn, resume_session_id)
	if err != nil {
		if dialed {
			// Best-effort cleanup of a connection nobody else can reach; the
			// handshake error is the one worth reporting.
			if closer, ok := conn.(conn_closer); ok {
				_ = closer.Close()
			}
			c.mu.Lock()
			c.conn = nil
			c.mu.Unlock()
		}
		return err
	}

	read_ctx, read_cancel := context.WithCancel(context.Background())

	c.mu.Lock()
	if c.closed {
		// Close ran while the handshake was in flight and has already dealt
		// with the connection; do not resurrect the client.
		c.mu.Unlock()
		read_cancel()
		return fmt.Errorf("client closed")
	}
	c.state = state
	c.ready = true
	c.read_cancel = read_cancel
	c.mu.Unlock()

	c.emit(Event{Kind: EventOpened})
	go c.read_loop(read_ctx, pending)
	return nil
}
// Close shuts the client down. It is safe to call more than once; only the
// first call has any effect.
//
// Close marks the client closed and no longer ready (so Send reports
// "client not connected" and late incoming frames are ignored), signals the
// read loop through close_ch, cancels the read loop's context, closes the
// connection if it implements conn_closer, and finally emits EventClosed.
func (c *Client) Close() {
	c.mu.Lock()
	if c.closed {
		c.mu.Unlock()
		return
	}
	c.closed = true
	// The session is over: without this, Send would keep consuming sequence
	// numbers and writing to a connection that is about to be closed.
	c.ready = false
	read_cancel := c.read_cancel
	conn := c.conn
	close(c.close_ch)
	c.mu.Unlock()

	// Cancel and close outside the lock so a blocked read can unwind without
	// contending with us.
	if read_cancel != nil {
		read_cancel()
	}
	if closer, ok := conn.(conn_closer); ok {
		// Best-effort: Close has no error return, and there is nothing useful
		// to do with a failure while tearing down.
		_ = closer.Close()
	}
	c.emit(Event{Kind: EventClosed})
}
// Send encrypts content and writes it to the connection as one JSON text
// frame. metadata is placed in the frame unchanged (it is not encrypted
// here).
//
// Each call takes the next outgoing sequence number under the client mutex.
// The number is consumed even if encryption, marshalling or the write later
// fails. The encryption, marshalling and write happen outside the lock.
//
// Send returns "client not connected" when the client is not ready, and
// otherwise passes through any error from EncryptAESGCM, json.Marshal or the
// connection's WriteText.
func (c *Client) Send(ctx context.Context, content string, metadata string) error {
	c.mu.Lock()
	if !c.ready {
		c.mu.Unlock()
		return fmt.Errorf("client not connected")
	}
	c.state.SeqOut++
	seq := c.state.SeqOut
	key, sid := c.state.SharedKey, c.state.SessionIdBytes
	out := c.conn
	c.mu.Unlock()

	// NOTE(review): the trailing false presumably marks this as a
	// non-response frame (DecryptAESGCM receives IsResponse in the same
	// position); confirm against EncryptAESGCM.
	sealed, err := EncryptAESGCM(key, sid, []byte(content), seq, false)
	if err != nil {
		return err
	}

	// Build the frame only after encryption so it carries ciphertext alone.
	wire, err := json.Marshal(outgoing_frame{
		Content:     sealed.ContentB64,
		Metadata:    metadata,
		SeqNum:      seq,
		IsEncrypted: true,
		Salt:        sealed.SaltHex,
	})
	if err != nil {
		return err
	}
	return out.WriteText(ctx, string(wire))
}
// read_loop is the background reader started by Connect. It first processes
// the frames in pending (text received during the handshake), then reads text
// frames from the connection and hands each to handle_incoming_text until the
// client is closed or a read fails.
//
// A read failure is reported as EventError and ends the loop. The exception
// is a failure caused by shutdown: once close_ch is closed or ctx is
// cancelled, a read error is the expected result of Close tearing the
// connection down, so the loop exits without emitting an error event.
func (c *Client) read_loop(ctx context.Context, pending []string) {
	// stopping reports whether Close has been called or ctx cancelled.
	stopping := func() bool {
		select {
		case <-c.close_ch:
			return true
		case <-ctx.Done():
			return true
		default:
			return false
		}
	}

	// Snapshot the connection under the lock rather than reading the shared
	// field on every iteration.
	c.mu.Lock()
	conn := c.conn
	c.mu.Unlock()

	for _, s := range pending {
		c.handle_incoming_text(s)
	}

	for {
		if stopping() {
			return
		}
		s, err := conn.ReadText(ctx)
		if err != nil {
			if !stopping() {
				c.emit(Event{Kind: EventError, Err: err})
			}
			return
		}
		c.handle_incoming_text(s)
	}
}
// handle_incoming_text processes one text frame received from the connection.
//
// Text starting with '/' is treated as a command and ignored. Anything else
// is decoded as a JSON incoming_frame, decrypted with the session keys when
// the frame is flagged as encrypted, and published as an EventMessage. The
// highest sequence number seen so far is recorded in state.SeqInMax.
//
// Frames are ignored while the client is not ready. A decryption failure is
// reported as EventError and the frame is discarded.
func (c *Client) handle_incoming_text(s string) {
	// Unknown commands after the handshake are ignored for now.
	if s != "" && s[0] == '/' {
		return
	}

	frame := incoming_frame{}
	// NOTE(review): text that is not valid JSON is dropped silently, with no
	// error event; confirm that is intended.
	if json.Unmarshal([]byte(s), &frame) != nil {
		return
	}

	// Copy the key material out under the lock; decryption runs unlocked.
	c.mu.Lock()
	if !c.ready {
		c.mu.Unlock()
		return
	}
	key, sid := c.state.SharedKey, c.state.SessionIdBytes
	c.mu.Unlock()

	body := frame.Content
	if frame.IsEncrypted {
		plain, err := DecryptAESGCM(key, sid, frame.Content, frame.Salt, frame.SeqNum, frame.IsResponse)
		if err != nil {
			c.emit(Event{Kind: EventError, Err: err})
			return
		}
		body = string(plain)
	}

	// Track the high-water mark of received sequence numbers.
	c.mu.Lock()
	if c.state.SeqInMax < frame.SeqNum {
		c.state.SeqInMax = frame.SeqNum
	}
	c.mu.Unlock()

	msg := Message{
		Content:    body,
		Metadata:   frame.Metadata,
		SeqNum:     frame.SeqNum,
		IsResponse: frame.IsResponse,
		CreatedAt:  frame.CreatedAt,
	}
	c.emit(Event{Kind: EventMessage, Message: msg})
}
// emit offers ev to the event channel without ever blocking. If the channel's
// buffer is full because the consumer is not keeping up, the event is
// discarded.
func (c *Client) emit(ev Event) {
	select {
	default:
		// Buffer full: drop the event instead of stalling the caller.
	case c.events_ch <- ev:
	}
}
// outgoing_frame is the JSON object Send writes to the connection for each
// message.
type outgoing_frame struct {
	// Content is the message body. Send stores the ContentB64 value returned
	// by EncryptAESGCM here.
	Content string `json:"content"`

	// Metadata is passed through from the caller of Send unchanged.
	Metadata string `json:"metadata"`

	// SeqNum is the outgoing sequence number assigned to this frame.
	SeqNum uint64 `json:"seq_num"`

	// IsEncrypted tells the receiver that Content is ciphertext; Send always
	// sets it to true.
	IsEncrypted bool `json:"is_encrypted"`

	// Salt is the SaltHex value returned by EncryptAESGCM for this frame.
	Salt string `json:"salt"`
}
// incoming_frame is the JSON object handle_incoming_text decodes from each
// non-command text frame received on the connection.
type incoming_frame struct {
	// Content is the message body; it is ciphertext when IsEncrypted is set.
	Content string `json:"content"`

	// Metadata is copied into the emitted Message as received.
	Metadata string `json:"metadata"`

	// SeqNum is the sender's sequence number. It is passed to the decryption
	// routine and tracked in the session's SeqInMax.
	SeqNum uint64 `json:"seq_num"`

	// IsEncrypted selects whether Content is decrypted before delivery.
	IsEncrypted bool `json:"is_encrypted"`

	// IsResponse is passed to the decryption routine and copied into the
	// emitted Message.
	IsResponse bool `json:"is_response"`

	// Salt is passed to the decryption routine together with Content.
	Salt string `json:"salt"`

	// CreatedAt is a timestamp copied into the emitted Message.
	// NOTE(review): the unit (seconds vs. milliseconds) is not visible here.
	CreatedAt int64 `json:"created_at"`
}