-
-
Notifications
You must be signed in to change notification settings - Fork 1.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
🔥 Feature (v3): Add buffered streaming support #3131
base: main
Are you sure you want to change the base?
🔥 Feature (v3): Add buffered streaming support #3131
Conversation
Create a new `*DefaultCtx` method called `SendStreamWriter()` that maps to fasthttp's `Response.SetBodyStreamWriter()`
- Adds Test_Ctx_SendStreamWriter to ctx_test.go
- Adds Test_Ctx_SendStreamWriter_Interrupted to ctx_test.go - (Work-In-Progress) This test verifies that some data is still sent before a client disconnects when using the method `c.SendStreamWriter()`. **Note:** Running this test reports a race condition when using the `-race` flag or running `make test`. The test uses a channel and mutex to prevent race conditions, but still triggers a warning.
WalkthroughThe pull request introduces a new method, Changes
Assessment against linked issues
Poem
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
@grivera64 Try this: func Test_Ctx_SendStreamWriter_Interrupted(t *testing.T) {
t.Parallel()
app := New()
c := app.AcquireCtx(&fasthttp.RequestCtx{})
var mutex sync.Mutex
var wg sync.WaitGroup // WaitGroup to synchronize goroutines
startChan := make(chan bool)
interruptStreamWriter := func() {
defer wg.Done() // Notify WaitGroup when done
<-startChan
time.Sleep(5 * time.Millisecond)
mutex.Lock()
c.Response().CloseBodyStream() //nolint:errcheck // It is fine to ignore the error
mutex.Unlock()
}
wg.Add(1) // Increment WaitGroup counter before starting goroutine
err := c.SendStreamWriter(func(w *bufio.Writer) {
go interruptStreamWriter()
startChan <- true
for lineNum := 1; lineNum <= 5; lineNum++ {
mutex.Lock()
fmt.Fprintf(w, "Line %d\n", lineNum) //nolint:errcheck, revive // It is fine to ignore the error
mutex.Unlock()
if err := w.Flush(); err != nil {
if lineNum < 3 {
t.Errorf("unexpected error: %s", err)
}
return
}
time.Sleep(1500 * time.Microsecond)
}
})
require.NoError(t, err)
wg.Wait() // Wait for the interruptStreamWriter to finish
// Protect access to the response body with the mutex
mutex.Lock()
defer mutex.Unlock()
require.Equal(t, "Line 1\nLine 2\nLine 3\n", string(c.Response().Body()))
} |
@gaby Thanks for the recommendation! Adding the Wait Group does remove the race error, but now I am getting an empty response body. I think this may be due to one of the following:
// go test -run Test_Ctx_SendStreamWriter_Interrupted
func Test_Ctx_SendStreamWriter_Interrupted(t *testing.T) {
t.Parallel()
app := New()
c := app.AcquireCtx(&fasthttp.RequestCtx{})
var mutex sync.Mutex
var wg sync.WaitGroup
startChan := make(chan bool)
interruptStreamWriter := func() {
wg.Add(1)
defer wg.Done()
<-startChan
time.Sleep(5 * time.Millisecond)
mutex.Lock()
c.Response().CloseBodyStream() //nolint:errcheck // It is fine to ignore the error
mutex.Unlock()
}
wg.Add(1)
err := c.SendStreamWriter(func(w *bufio.Writer) {
go interruptStreamWriter()
defer wg.Done()
startChan <- true
for lineNum := 1; lineNum <= 5; lineNum++ {
mutex.Lock()
fmt.Fprintf(w, "Line %d\n", lineNum) //nolint:errcheck, revive // It is fine to ignore the error
mutex.Unlock()
if err := w.Flush(); err != nil {
if lineNum < 3 {
t.Errorf("unexpected error: %s", err)
}
return
}
time.Sleep(1500 * time.Microsecond)
}
})
require.NoError(t, err)
// Wait for StreamWriter and the goroutine to finish
wg.Wait()
mutex.Lock()
require.Equal(t, "Line 1\nLine 2\nLine 3\n", string(c.Response().Body()))
mutex.Unlock()
} I will file an issue on valyala/fasthttp to ask how to mock client disconnections for this test. If it's not possible, we could remove this test case, as most of the other tests do not test for client disconnection issues. |
Hey all, after reading a bit more on Fiber and Fasthttp documentation, I believe that the race condition is due to I tried using a modified version of app's This is the new race condition warning: WARNING: DATA RACE
Read at 0x00c00016c130 by goroutine 9:
github.com/gofiber/fiber/v3.(*testConn).Write()
/fiber/helpers.go:628 +0x6b
bufio.(*Writer).Flush()
/usr/local/go/src/bufio/bufio.go:639 +0xee
github.com/valyala/fasthttp.writeChunk()
/go/pkg/mod/github.com/valyala/fasthttp@v1.55.0/http.go:2250 +0x10b
github.com/valyala/fasthttp.writeBodyChunked()
/go/pkg/mod/github.com/valyala/fasthttp@v1.55.0/http.go:2170 +0xce
github.com/valyala/fasthttp.(*Response).writeBodyStream()
/go/pkg/mod/github.com/valyala/fasthttp@v1.55.0/http.go:2066 +0x338
github.com/valyala/fasthttp.(*Response).Write()
/go/pkg/mod/github.com/valyala/fasthttp@v1.55.0/http.go:1967 +0x2c4
github.com/valyala/fasthttp.writeResponse()
/go/pkg/mod/github.com/valyala/fasthttp@v1.55.0/server.go:2589 +0xb8
github.com/valyala/fasthttp.(*Server).serveConn()
/go/pkg/mod/github.com/valyala/fasthttp@v1.55.0/server.go:2432 +0x1ead
github.com/valyala/fasthttp.(*Server).ServeConn()
/go/pkg/mod/github.com/valyala/fasthttp@v1.55.0/server.go:2042 +0x154
github.com/gofiber/fiber/v3.(*App).TestWithInterrupt.func1()
/fiber/app.go:975 +0xde
... This was the old warning when directly using github.com/valyala/fasthttp.(*Response).SetBodyStream()
/go/pkg/mod/github.com/valyala/fasthttp@v1.55.0/http.go:249 +0x4f
github.com/valyala/fasthttp.(*Response).SetBodyStreamWriter()
/go/pkg/mod/github.com/valyala/fasthttp@v1.55.0/http.go:292 +0x64
... I believe fixing this race warning in the cleanest way possible would require an upstream PR to fasthttp (most likely adding a mutex for Based on Fiber's current codebase, there doesn't seem to be other interrupt tests (while other tests ignore output when the response times out). If this isn't something we should be testing for, we could just remove the interrupt test and keep the remaining tests. What are your thoughts on this? Please let me know if you want to see the modified Edit: I will still try to work with the modified |
After editing // TestWithInterrupt is used for internal debugging by passing a *http.Request with an interruptAfter duration.
func (app *App) TestWithInterrupt(req *http.Request, interruptAfter time.Duration) (*http.Response, error) With this change, I was able to use that in the test case as written below as a fix: func Test_Ctx_SendStreamWriter_Interrupted_New(t *testing.T) {
t.Parallel()
app := New(Config{StreamRequestBody: true})
app.Get("/", func(c Ctx) error {
return c.SendStreamWriter(func(w *bufio.Writer) {
for lineNum := 1; lineNum <= 5; lineNum++ {
time.Sleep(time.Duration(lineNum) * time.Millisecond)
fmt.Fprintf(w, "Line %d\n", lineNum) //nolint:errcheck, revive // It is fine to ignore the error
if err := w.Flush(); err != nil {
if lineNum <= 3 {
t.Errorf("unexpected error: %s", err)
}
return
}
}
})
})
resp, err := app.TestWithInterrupt(httptest.NewRequest(MethodGet, "/", nil), 8*time.Millisecond)
require.NoError(t, err, "app.TestWithInterrupt(req)")
body, err := io.ReadAll(resp.Body)
require.NotNil(t, err)
require.Equal(t, "Line 1\nLine 2\nLine 3\n", string(body))
} Would you all like me to write an issue/PR for adding |
Description
This feature adds buffered streaming support to Fiber v3 through the new Ctx method
SendStreamWriter
:c.SendStreamWriter()
essentially is a wrapper for calling fasthttp'sSetBodyStreamWriter
method. This feature wraps this method in the same way thatc.SendStream()
wraps fasthttp'sSetBodyStream()
.With this feature, Fiber users can send shorter segments of content over persistent HTTP connections. This functionality is important for several web-based applications such as:
For example, a simple self-contained SSE example using this new feature can be setup as the following:
Fixes #3127
Type of change
Please delete options that are not relevant.
CURRENT STATUS
Features
Ctx.SendStreamWriter()
to ctx.goUnit Tests
Add
Test_Ctx_SendStream_Writer
to ctx_test.goAdd
Test_Ctx_SendStreamWriter_Interrupted
to ctx_test.goDocumentation
Ctx.SendStreamWriter()
docs to docs/api/ctx.mdBenchmarks
Checklist
Before you submit your pull request, please make sure you meet these requirements:
/docs/
directory for Fiber's documentation.