You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
214 lines
5.3 KiB
214 lines
5.3 KiB
/*
|
|
* Licensed to the Apache Software Foundation (ASF) under one
|
|
* or more contributor license agreements. See the NOTICE file
|
|
* distributed with this work for additional information
|
|
* regarding copyright ownership. The ASF licenses this file
|
|
* to you under the Apache License, Version 2.0 (the
|
|
* "License"); you may not use this file except in compliance
|
|
* with the License. You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing,
|
|
* software distributed under the License is distributed on an
|
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
* KIND, either express or implied. See the License for the
|
|
* specific language governing permissions and limitations
|
|
* under the License.
|
|
*/
|
|
|
|
package thrift
|
|
|
|
import (
|
|
"bufio"
|
|
"context"
|
|
"io"
|
|
)
|
|
|
|
// StreamTransport is a transport built on top of an io.Reader and/or an
// io.Writer. Either side may be nil, which makes the transport write-only or
// read-only respectively.
type StreamTransport struct {
	// Reader is the input side; nil when the transport has no input or after Close.
	io.Reader
	// Writer is the output side; nil when the transport has no output.
	io.Writer
	// isReadWriter is set when Reader and Writer were created from a single
	// io.ReadWriter (see NewStreamTransportRW).
	isReadWriter bool
	// closed becomes true once Close has been called.
	closed bool
}
|
|
|
|
// StreamTransportFactory creates StreamTransport values, either from the
// streams of an existing StreamTransport or from its own Reader and Writer.
type StreamTransportFactory struct {
	// Reader is the default input stream handed to new transports; may be nil.
	Reader io.Reader
	// Writer is the default output stream handed to new transports; may be nil.
	Writer io.Writer
	// isReadWriter indicates that Reader should be used as an io.ReadWriter
	// for both directions.
	isReadWriter bool
}
|
|
|
|
func (p *StreamTransportFactory) GetTransport(trans TTransport) (TTransport, error) {
|
|
if trans != nil {
|
|
t, ok := trans.(*StreamTransport)
|
|
if ok {
|
|
if t.isReadWriter {
|
|
return NewStreamTransportRW(t.Reader.(io.ReadWriter)), nil
|
|
}
|
|
if t.Reader != nil && t.Writer != nil {
|
|
return NewStreamTransport(t.Reader, t.Writer), nil
|
|
}
|
|
if t.Reader != nil && t.Writer == nil {
|
|
return NewStreamTransportR(t.Reader), nil
|
|
}
|
|
if t.Reader == nil && t.Writer != nil {
|
|
return NewStreamTransportW(t.Writer), nil
|
|
}
|
|
return &StreamTransport{}, nil
|
|
}
|
|
}
|
|
if p.isReadWriter {
|
|
return NewStreamTransportRW(p.Reader.(io.ReadWriter)), nil
|
|
}
|
|
if p.Reader != nil && p.Writer != nil {
|
|
return NewStreamTransport(p.Reader, p.Writer), nil
|
|
}
|
|
if p.Reader != nil && p.Writer == nil {
|
|
return NewStreamTransportR(p.Reader), nil
|
|
}
|
|
if p.Reader == nil && p.Writer != nil {
|
|
return NewStreamTransportW(p.Writer), nil
|
|
}
|
|
return &StreamTransport{}, nil
|
|
}
|
|
|
|
func NewStreamTransportFactory(reader io.Reader, writer io.Writer, isReadWriter bool) *StreamTransportFactory {
|
|
return &StreamTransportFactory{Reader: reader, Writer: writer, isReadWriter: isReadWriter}
|
|
}
|
|
|
|
func NewStreamTransport(r io.Reader, w io.Writer) *StreamTransport {
|
|
return &StreamTransport{Reader: bufio.NewReader(r), Writer: bufio.NewWriter(w)}
|
|
}
|
|
|
|
func NewStreamTransportR(r io.Reader) *StreamTransport {
|
|
return &StreamTransport{Reader: bufio.NewReader(r)}
|
|
}
|
|
|
|
func NewStreamTransportW(w io.Writer) *StreamTransport {
|
|
return &StreamTransport{Writer: bufio.NewWriter(w)}
|
|
}
|
|
|
|
func NewStreamTransportRW(rw io.ReadWriter) *StreamTransport {
|
|
bufrw := bufio.NewReadWriter(bufio.NewReader(rw), bufio.NewWriter(rw))
|
|
return &StreamTransport{Reader: bufrw, Writer: bufrw, isReadWriter: true}
|
|
}
|
|
|
|
func (p *StreamTransport) IsOpen() bool {
|
|
return !p.closed
|
|
}
|
|
|
|
// implicitly opened on creation, can't be reopened once closed
|
|
func (p *StreamTransport) Open() error {
|
|
if !p.closed {
|
|
return NewTTransportException(ALREADY_OPEN, "StreamTransport already open.")
|
|
} else {
|
|
return NewTTransportException(NOT_OPEN, "cannot reopen StreamTransport.")
|
|
}
|
|
}
|
|
|
|
// Closes both the input and output streams.
|
|
func (p *StreamTransport) Close() error {
|
|
if p.closed {
|
|
return NewTTransportException(NOT_OPEN, "StreamTransport already closed.")
|
|
}
|
|
p.closed = true
|
|
closedReader := false
|
|
if p.Reader != nil {
|
|
c, ok := p.Reader.(io.Closer)
|
|
if ok {
|
|
e := c.Close()
|
|
closedReader = true
|
|
if e != nil {
|
|
return e
|
|
}
|
|
}
|
|
p.Reader = nil
|
|
}
|
|
if p.Writer != nil && (!closedReader || !p.isReadWriter) {
|
|
c, ok := p.Writer.(io.Closer)
|
|
if ok {
|
|
e := c.Close()
|
|
if e != nil {
|
|
return e
|
|
}
|
|
}
|
|
p.Writer = nil
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Flushes the underlying output stream if not null.
|
|
func (p *StreamTransport) Flush(ctx context.Context) error {
|
|
if p.Writer == nil {
|
|
return NewTTransportException(NOT_OPEN, "Cannot flush null outputStream")
|
|
}
|
|
f, ok := p.Writer.(Flusher)
|
|
if ok {
|
|
err := f.Flush()
|
|
if err != nil {
|
|
return NewTTransportExceptionFromError(err)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (p *StreamTransport) Read(c []byte) (n int, err error) {
|
|
n, err = p.Reader.Read(c)
|
|
if err != nil {
|
|
err = NewTTransportExceptionFromError(err)
|
|
}
|
|
return
|
|
}
|
|
|
|
func (p *StreamTransport) ReadByte() (c byte, err error) {
|
|
f, ok := p.Reader.(io.ByteReader)
|
|
if ok {
|
|
c, err = f.ReadByte()
|
|
} else {
|
|
c, err = readByte(p.Reader)
|
|
}
|
|
if err != nil {
|
|
err = NewTTransportExceptionFromError(err)
|
|
}
|
|
return
|
|
}
|
|
|
|
func (p *StreamTransport) Write(c []byte) (n int, err error) {
|
|
n, err = p.Writer.Write(c)
|
|
if err != nil {
|
|
err = NewTTransportExceptionFromError(err)
|
|
}
|
|
return
|
|
}
|
|
|
|
func (p *StreamTransport) WriteByte(c byte) (err error) {
|
|
f, ok := p.Writer.(io.ByteWriter)
|
|
if ok {
|
|
err = f.WriteByte(c)
|
|
} else {
|
|
err = writeByte(p.Writer, c)
|
|
}
|
|
if err != nil {
|
|
err = NewTTransportExceptionFromError(err)
|
|
}
|
|
return
|
|
}
|
|
|
|
func (p *StreamTransport) WriteString(s string) (n int, err error) {
|
|
f, ok := p.Writer.(stringWriter)
|
|
if ok {
|
|
n, err = f.WriteString(s)
|
|
} else {
|
|
n, err = p.Writer.Write([]byte(s))
|
|
}
|
|
if err != nil {
|
|
err = NewTTransportExceptionFromError(err)
|
|
}
|
|
return
|
|
}
|
|
|
|
func (p *StreamTransport) RemainingBytes() (num_bytes uint64) {
|
|
const maxSize = ^uint64(0)
|
|
return maxSize // the thruth is, we just don't know unless framed is used
|
|
}
|
|
|