You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

158 lines
4.1 KiB

/*
* Copyright (c) 2014 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Allan Stockdill-Mander
*/
package mqtt
import (
"sync"
"time"
"github.com/eclipse/paho.mqtt.golang/packets"
)
//PacketAndToken is a struct that contains both a ControlPacket and a
//Token. This struct is passed via channels between the client interface
//code and the underlying code responsible for sending and receiving
//MQTT messages.
type PacketAndToken struct {
p packets.ControlPacket
t Token
}
//Token defines the interface for the tokens used to indicate when
//actions have completed.
type Token interface {
Wait() bool
WaitTimeout(time.Duration) bool
flowComplete()
Error() error
}
type baseToken struct {
m sync.RWMutex
complete chan struct{}
ready bool
err error
}
// Wait will wait indefinitely for the Token to complete, ie the Publish
// to be sent and confirmed receipt from the broker
func (b *baseToken) Wait() bool {
b.m.Lock()
defer b.m.Unlock()
if !b.ready {
<-b.complete
b.ready = true
}
return b.ready
}
// WaitTimeout takes a time in ms to wait for the flow associated with the
// Token to complete, returns true if it returned before the timeout or
// returns false if the timeout occurred. In the case of a timeout the Token
// does not have an error set in case the caller wishes to wait again
func (b *baseToken) WaitTimeout(d time.Duration) bool {
b.m.Lock()
defer b.m.Unlock()
if !b.ready {
select {
case <-b.complete:
b.ready = true
case <-time.After(d):
}
}
return b.ready
}
func (b *baseToken) flowComplete() {
close(b.complete)
}
func (b *baseToken) Error() error {
b.m.RLock()
defer b.m.RUnlock()
return b.err
}
func newToken(tType byte) Token {
switch tType {
case packets.Connect:
return &ConnectToken{baseToken: baseToken{complete: make(chan struct{})}}
case packets.Subscribe:
return &SubscribeToken{baseToken: baseToken{complete: make(chan struct{})}, subResult: make(map[string]byte)}
case packets.Publish:
return &PublishToken{baseToken: baseToken{complete: make(chan struct{})}}
case packets.Unsubscribe:
return &UnsubscribeToken{baseToken: baseToken{complete: make(chan struct{})}}
case packets.Disconnect:
return &DisconnectToken{baseToken: baseToken{complete: make(chan struct{})}}
}
return nil
}
//ConnectToken is an extension of Token containing the extra fields
//required to provide information about calls to Connect()
type ConnectToken struct {
baseToken
returnCode byte
}
//ReturnCode returns the acknowlegement code in the connack sent
//in response to a Connect()
func (c *ConnectToken) ReturnCode() byte {
c.m.RLock()
defer c.m.RUnlock()
return c.returnCode
}
//PublishToken is an extension of Token containing the extra fields
//required to provide information about calls to Publish()
type PublishToken struct {
baseToken
messageID uint16
}
//MessageID returns the MQTT message ID that was assigned to the
//Publish packet when it was sent to the broker
func (p *PublishToken) MessageID() uint16 {
return p.messageID
}
//SubscribeToken is an extension of Token containing the extra fields
//required to provide information about calls to Subscribe()
type SubscribeToken struct {
baseToken
subs []string
subResult map[string]byte
}
//Result returns a map of topics that were subscribed to along with
//the matching return code from the broker. This is either the Qos
//value of the subscription or an error code.
func (s *SubscribeToken) Result() map[string]byte {
s.m.RLock()
defer s.m.RUnlock()
return s.subResult
}
//UnsubscribeToken is an extension of Token containing the extra fields
//required to provide information about calls to Unsubscribe()
type UnsubscribeToken struct {
baseToken
}
//DisconnectToken is an extension of Token containing the extra fields
//required to provide information about calls to Disconnect()
type DisconnectToken struct {
baseToken
}