Add WebSocket transport and working CLI example + unit tests
This commit is contained in:
66
client.go
66
client.go
@@ -8,22 +8,29 @@ import (
|
||||
)
|
||||
|
||||
type ClientOptions struct {
|
||||
Url string
|
||||
Conn Conn
|
||||
Handshaker Handshaker
|
||||
EventsBuffer int
|
||||
}
|
||||
|
||||
type Client struct {
|
||||
conn Conn
|
||||
url string
|
||||
conn Conn
|
||||
handshaker Handshaker
|
||||
|
||||
events_ch chan Event
|
||||
|
||||
mu sync.Mutex
|
||||
state SessionState
|
||||
ready bool
|
||||
closed bool
|
||||
close_ch chan struct{}
|
||||
mu sync.Mutex
|
||||
state SessionState
|
||||
ready bool
|
||||
closed bool
|
||||
close_ch chan struct{}
|
||||
read_cancel context.CancelFunc
|
||||
}
|
||||
|
||||
type conn_closer interface {
|
||||
Close() error
|
||||
}
|
||||
|
||||
func NewClient(opts ClientOptions) *Client {
|
||||
@@ -35,6 +42,7 @@ func NewClient(opts ClientOptions) *Client {
|
||||
handshaker := opts.Handshaker.EnsureDefaults()
|
||||
|
||||
return &Client{
|
||||
url: opts.Url,
|
||||
conn: opts.Conn,
|
||||
handshaker: handshaker,
|
||||
events_ch: make(chan Event, events_buffer),
|
||||
@@ -47,10 +55,6 @@ func (c *Client) Events() <-chan Event {
|
||||
}
|
||||
|
||||
func (c *Client) Connect(ctx context.Context, resume_session_id string) error {
|
||||
if c.conn == nil {
|
||||
return fmt.Errorf("no conn configured (ws transport not added yet)")
|
||||
}
|
||||
|
||||
c.mu.Lock()
|
||||
if c.ready {
|
||||
c.mu.Unlock()
|
||||
@@ -58,19 +62,39 @@ func (c *Client) Connect(ctx context.Context, resume_session_id string) error {
|
||||
}
|
||||
c.mu.Unlock()
|
||||
|
||||
state, pending, err := c.handshaker.Run(ctx, c.conn, resume_session_id)
|
||||
conn := c.conn
|
||||
if conn == nil {
|
||||
if c.url == "" {
|
||||
return fmt.Errorf("no Url configured and no Conn provided")
|
||||
}
|
||||
|
||||
ws_conn, err := DialWs(ctx, c.url)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
conn = ws_conn
|
||||
|
||||
c.mu.Lock()
|
||||
c.conn = conn
|
||||
c.mu.Unlock()
|
||||
}
|
||||
|
||||
state, pending, err := c.handshaker.Run(ctx, conn, resume_session_id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
read_ctx, read_cancel := context.WithCancel(context.Background())
|
||||
|
||||
c.mu.Lock()
|
||||
c.state = state
|
||||
c.ready = true
|
||||
c.read_cancel = read_cancel
|
||||
c.mu.Unlock()
|
||||
|
||||
c.emit(Event{Kind: EventOpened})
|
||||
|
||||
go c.read_loop(pending)
|
||||
go c.read_loop(read_ctx, pending)
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -82,9 +106,20 @@ func (c *Client) Close() {
|
||||
return
|
||||
}
|
||||
c.closed = true
|
||||
|
||||
read_cancel := c.read_cancel
|
||||
conn := c.conn
|
||||
close(c.close_ch)
|
||||
c.mu.Unlock()
|
||||
|
||||
if read_cancel != nil {
|
||||
read_cancel()
|
||||
}
|
||||
|
||||
if closer, ok := conn.(conn_closer); ok {
|
||||
_ = closer.Close()
|
||||
}
|
||||
|
||||
c.emit(Event{Kind: EventClosed})
|
||||
}
|
||||
|
||||
@@ -100,6 +135,7 @@ func (c *Client) Send(ctx context.Context, content string, metadata string) erro
|
||||
|
||||
shared_key := c.state.SharedKey
|
||||
session_id_bytes := c.state.SessionIdBytes
|
||||
conn := c.conn
|
||||
c.mu.Unlock()
|
||||
|
||||
payload := outgoing_frame{
|
||||
@@ -123,10 +159,10 @@ func (c *Client) Send(ctx context.Context, content string, metadata string) erro
|
||||
return err
|
||||
}
|
||||
|
||||
return c.conn.WriteText(ctx, string(buf))
|
||||
return conn.WriteText(ctx, string(buf))
|
||||
}
|
||||
|
||||
func (c *Client) read_loop(pending []string) {
|
||||
func (c *Client) read_loop(ctx context.Context, pending []string) {
|
||||
for _, s := range pending {
|
||||
c.handle_incoming_text(s)
|
||||
}
|
||||
@@ -138,7 +174,7 @@ func (c *Client) read_loop(pending []string) {
|
||||
default:
|
||||
}
|
||||
|
||||
s, err := c.conn.ReadText(context.Background())
|
||||
s, err := c.conn.ReadText(ctx)
|
||||
if err != nil {
|
||||
c.emit(Event{Kind: EventError, Err: err})
|
||||
return
|
||||
|
||||
Reference in New Issue
Block a user