Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🔥 Feature (v3): Add buffered streaming support #3131

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions ctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package fiber

import (
"bufio"
"bytes"
"context"
"crypto/tls"
Expand Down Expand Up @@ -1669,6 +1670,13 @@ func (c *DefaultCtx) SendStream(stream io.Reader, size ...int) error {
return nil
}

// SendStreamWriter sets response body stream writer
func (c *DefaultCtx) SendStreamWriter(streamWriter func(*bufio.Writer)) error {
c.fasthttp.Response.SetBodyStreamWriter(fasthttp.StreamWriter(streamWriter))

return nil
}

// Set sets the response's HTTP header field to the specified key, value.
func (c *DefaultCtx) Set(key, val string) {
c.fasthttp.Response.Header.Set(key, val)
Expand Down
3 changes: 3 additions & 0 deletions ctx_interface_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

68 changes: 68 additions & 0 deletions ctx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"path/filepath"
"strconv"
"strings"
"sync"
"testing"
"text/template"
"time"
Expand Down Expand Up @@ -4327,6 +4328,73 @@ func Test_Ctx_SendStream(t *testing.T) {
require.Equal(t, "Hello bufio", string(c.Response().Body()))
}

// go test -run Test_Ctx_SendStreamWriter
func Test_Ctx_SendStreamWriter(t *testing.T) {
t.Parallel()
app := New()
c := app.AcquireCtx(&fasthttp.RequestCtx{})

err := c.SendStreamWriter(func(w *bufio.Writer) {
w.WriteString("Don't crash please") //nolint:errcheck, revive // It is fine to ignore the error
})
require.NoError(t, err)
require.Equal(t, "Don't crash please", string(c.Response().Body()))

err = c.SendStreamWriter(func(w *bufio.Writer) {
for lineNum := 1; lineNum <= 5; lineNum++ {
fmt.Fprintf(w, "Line %d\n", lineNum) //nolint:errcheck, revive // It is fine to ignore the error
if err := w.Flush(); err != nil {
t.Errorf("unexpected error: %s", err)
return
}
}
})
require.NoError(t, err)
require.Equal(t, "Line 1\nLine 2\nLine 3\nLine 4\nLine 5\n", string(c.Response().Body()))

err = c.SendStreamWriter(func(_ *bufio.Writer) {})
require.NoError(t, err)
require.Empty(t, c.Response().Body())
}

// go test -run Test_Ctx_SendStreamWriter_Interrupted
func Test_Ctx_SendStreamWriter_Interrupted(t *testing.T) {
t.Parallel()
app := New()
c := app.AcquireCtx(&fasthttp.RequestCtx{})

var mutex sync.Mutex
startChan := make(chan bool)
interruptStreamWriter := func() {
<-startChan
time.Sleep(5 * time.Millisecond)
mutex.Lock()
c.Response().CloseBodyStream() //nolint:errcheck // It is fine to ignore the error
mutex.Unlock()
}
err := c.SendStreamWriter(func(w *bufio.Writer) {
go interruptStreamWriter()

startChan <- true
for lineNum := 1; lineNum <= 5; lineNum++ {
mutex.Lock()
fmt.Fprintf(w, "Line %d\n", lineNum) //nolint:errcheck, revive // It is fine to ignore the error
mutex.Unlock()

if err := w.Flush(); err != nil {
if lineNum < 3 {
t.Errorf("unexpected error: %s", err)
}
return
}

time.Sleep(1500 * time.Microsecond)
}
})
require.NoError(t, err)
require.Equal(t, "Line 1\nLine 2\nLine 3\n", string(c.Response().Body()))
}

// go test -run Test_Ctx_Set
func Test_Ctx_Set(t *testing.T) {
t.Parallel()
Expand Down
41 changes: 41 additions & 0 deletions docs/api/ctx.md
Original file line number Diff line number Diff line change
Expand Up @@ -1871,6 +1871,47 @@ app.Get("/", func(c fiber.Ctx) error {
})
```

## SendStreamWriter

Sets the response body stream writer.

:::note
The argument `streamWriter` represents a function that populates
the response body using a buffered stream writer.
:::

```go title="Signature"
func (c Ctx) SendStreamWriter(streamWriter func(*bufio.Writer)) error
```

```go title="Example"
app.Get("/", func (c fiber.Ctx) error {
return c.SendStreamWriter(func(w *bufio.Writer) {
fmt.Fprintf(w, "Hello, World!\n")
})
// => "Hello, World!"
})
```

:::info
To flush data before the function returns, you can call `w.Flush()`
on the provided writer. Otherwise, the buffered stream flushes after
`streamWriter` returns.
:::

```go title="Example"
app.Get("/wait", func(c fiber.Ctx) error {
return c.SendStreamWriter(func(w *bufio.Writer) {
fmt.Fprintf(w, "Waiting for 10 seconds\n")
if err := w.Flush(); err != nil {
log.Print("User quit early")
}
time.Sleep(10 * time.Second)
fmt.Fprintf(w, "Done!\n")
})
})
```

## Set

Sets the response’s HTTP header field to the specified `key`, `value`.
Expand Down
Loading