feat: add RecvRaw (#896)
This commit is contained in:
@@ -32,17 +32,28 @@ type streamReader[T streamable] struct {
|
||||
}
|
||||
|
||||
func (stream *streamReader[T]) Recv() (response T, err error) {
|
||||
if stream.isFinished {
|
||||
err = io.EOF
|
||||
rawLine, err := stream.RecvRaw()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
response, err = stream.processLines()
|
||||
return
|
||||
err = stream.unmarshaler.Unmarshal(rawLine, &response)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
return response, nil
|
||||
}
|
||||
|
||||
func (stream *streamReader[T]) RecvRaw() ([]byte, error) {
|
||||
if stream.isFinished {
|
||||
return nil, io.EOF
|
||||
}
|
||||
|
||||
return stream.processLines()
|
||||
}
|
||||
|
||||
//nolint:gocognit
|
||||
func (stream *streamReader[T]) processLines() (T, error) {
|
||||
func (stream *streamReader[T]) processLines() ([]byte, error) {
|
||||
var (
|
||||
emptyMessagesCount uint
|
||||
hasErrorPrefix bool
|
||||
@@ -53,9 +64,9 @@ func (stream *streamReader[T]) processLines() (T, error) {
|
||||
if readErr != nil || hasErrorPrefix {
|
||||
respErr := stream.unmarshalError()
|
||||
if respErr != nil {
|
||||
return *new(T), fmt.Errorf("error, %w", respErr.Error)
|
||||
return nil, fmt.Errorf("error, %w", respErr.Error)
|
||||
}
|
||||
return *new(T), readErr
|
||||
return nil, readErr
|
||||
}
|
||||
|
||||
noSpaceLine := bytes.TrimSpace(rawLine)
|
||||
@@ -68,11 +79,11 @@ func (stream *streamReader[T]) processLines() (T, error) {
|
||||
}
|
||||
writeErr := stream.errAccumulator.Write(noSpaceLine)
|
||||
if writeErr != nil {
|
||||
return *new(T), writeErr
|
||||
return nil, writeErr
|
||||
}
|
||||
emptyMessagesCount++
|
||||
if emptyMessagesCount > stream.emptyMessagesLimit {
|
||||
return *new(T), ErrTooManyEmptyStreamMessages
|
||||
return nil, ErrTooManyEmptyStreamMessages
|
||||
}
|
||||
|
||||
continue
|
||||
@@ -81,16 +92,10 @@ func (stream *streamReader[T]) processLines() (T, error) {
|
||||
noPrefixLine := bytes.TrimPrefix(noSpaceLine, headerData)
|
||||
if string(noPrefixLine) == "[DONE]" {
|
||||
stream.isFinished = true
|
||||
return *new(T), io.EOF
|
||||
return nil, io.EOF
|
||||
}
|
||||
|
||||
var response T
|
||||
unmarshalErr := stream.unmarshaler.Unmarshal(noPrefixLine, &response)
|
||||
if unmarshalErr != nil {
|
||||
return *new(T), unmarshalErr
|
||||
}
|
||||
|
||||
return response, nil
|
||||
return noPrefixLine, nil
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user