Skip to content

Commit

Permalink
Merge pull request #71 from fieldryand/goroutine-stream-updates
Browse files Browse the repository at this point in the history
Stream one execution per event
  • Loading branch information
fieldryand committed Feb 18, 2024
2 parents 68397b5 + 0b8049e commit 65376e4
Show file tree
Hide file tree
Showing 11 changed files with 126 additions and 135 deletions.
2 changes: 0 additions & 2 deletions Readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ import "github.com/fieldryand/goflow/v2"
func main() {
options := goflow.Options{
UIPath: "ui/",
Streaming: true,
ShowExamples: true,
WithSeconds: true,
}
Expand Down Expand Up @@ -230,7 +229,6 @@ func main() {
You can pass different options to the engine. Options currently supported:
- `Store`: This is [described in more detail below.](#storage)
- `UIPath`: The path to the dashboard code. The default value is an empty string, meaning Goflow serves only the API and not the dashboard. Suggested value if you want the dashboard: `ui/`
- `Streaming`: Whether to stream updates to the dashboard. The default value is `false`, but if you use the dashboard then it's recommended to change this.
- `ShowExamples`: Whether to show the example jobs. Default value: `false`
- `WithSeconds`: Whether to include the seconds field in the cron spec. See the [cron package documentation](https://github.com/robfig/cron) for details. Default value: `false`

Expand Down
1 change: 0 additions & 1 deletion example/goflow-example.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
func main() {
options := goflow.Options{
UIPath: "ui/",
Streaming: true,
ShowExamples: true,
WithSeconds: true,
}
Expand Down
22 changes: 12 additions & 10 deletions execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@ import (

// Execution of a job.
type execution struct {
ID uuid.UUID `json:"id"`
JobName string `json:"job"`
StartedAt string `json:"submitted"`
State state `json:"state"`
TaskExecutions []taskExecution `json:"tasks"`
ID uuid.UUID `json:"id"`
JobName string `json:"job"`
StartedAt string `json:"submitted"`
ModifiedTimestamp string `json:"modifiedTimestamp"`
State state `json:"state"`
TaskExecutions []taskExecution `json:"tasks"`
}

type taskExecution struct {
Expand All @@ -28,11 +29,12 @@ func (j *Job) newExecution() *execution {
taskExecutions = append(taskExecutions, taskrun)
}
return &execution{
ID: uuid.New(),
JobName: j.Name,
StartedAt: time.Now().UTC().Format(time.RFC3339Nano),
State: none,
TaskExecutions: taskExecutions}
ID: uuid.New(),
JobName: j.Name,
StartedAt: time.Now().UTC().Format(time.RFC3339Nano),
ModifiedTimestamp: time.Now().UTC().Format(time.RFC3339Nano),
State: none,
TaskExecutions: taskExecutions}
}

// Persist a new execution.
Expand Down
2 changes: 1 addition & 1 deletion goflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func (g *Goflow) Run(port string) {
log.SetFlags(0)
log.SetOutput(new(logWriter))
g.router.Use(gin.Recovery())
g.addStreamRoute()
g.addStreamRoute(true)
g.addAPIRoutes()
if g.Options.UIPath != "" {
g.addUIRoutes()
Expand Down
10 changes: 9 additions & 1 deletion goflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,14 @@ func TestStreamRoute(t *testing.T) {
if w.Code != http.StatusOK {
t.Errorf("httpStatus is %d, expected %d", w.Code, http.StatusOK)
}

w = CreateTestResponseRecorder()
req, _ = http.NewRequest("GET", "/stream?jobname=example-complex-analytics", nil)
router.ServeHTTP(w, req)

if w.Code != http.StatusOK {
t.Errorf("httpStatus is %d, expected %d", w.Code, http.StatusOK)
}
}

// check for a race against /stream
Expand All @@ -207,7 +215,7 @@ func exampleRouter() *gin.Engine {
g.execute("example-custom-operator")
g.Use(DefaultLogger())
g.addStaticRoutes()
g.addStreamRoute()
g.addStreamRoute(false)
g.addUIRoutes()
g.addAPIRoutes()
return g.router
Expand Down
2 changes: 2 additions & 0 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"log"
"sync"
"time"

"github.com/philippgille/gokv"
)
Expand Down Expand Up @@ -194,6 +195,7 @@ func (j *Job) run(store gokv.Store, e *execution) error {

// Sync to store
e.State = j.loadState()
e.ModifiedTimestamp = time.Now().UTC().Format(time.RFC3339Nano)
syncStateToStore(store, e, write.key, write.val)

if j.allDone() {
Expand Down
4 changes: 2 additions & 2 deletions routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ func (g *Goflow) addStaticRoutes() *Goflow {
return g
}

func (g *Goflow) addStreamRoute() *Goflow {
g.router.GET("/stream", g.stream(g.Options.Streaming))
func (g *Goflow) addStreamRoute(keepOpen bool) *Goflow {
g.router.GET("/stream", g.stream(keepOpen))
return g
}

Expand Down
56 changes: 35 additions & 21 deletions stream.go
Original file line number Diff line number Diff line change
@@ -1,50 +1,64 @@
package goflow

import (
"encoding/json"
"io"
"time"

"github.com/gin-gonic/gin"
)

// Setting clientDisconnect to false makes testing possible.
func (g *Goflow) stream(clientDisconnect bool) func(*gin.Context) {
// Set keepOpen to false when testing--one event will be sent and
// then the channel is closed by the server.
func (g *Goflow) stream(keepOpen bool) func(*gin.Context) {

return func(c *gin.Context) {
chanStream := make(chan string)
job := c.Query("jobname")

history := make([]*execution, 0)

// open a channel for live executions
chanStream := make(chan *execution)

// periodically push the list of job runs into the stream
go func() {
defer close(chanStream)
// Periodically push the list of job runs into the stream
for {
for jobname := range g.Jobs {
executions, _ := readExecutions(g.Store, jobname)
marshalled, _ := marshalExecutions(jobname, executions)
chanStream <- string(marshalled)
for _, e := range executions {

// make sure it wasn't already sent
inHistory := false

for _, h := range history {
if e.ID == h.ID && e.ModifiedTimestamp == h.ModifiedTimestamp {
inHistory = true
}
}

if !inHistory {
if job != "" && job == e.JobName {
chanStream <- e
history = append(history, e)
} else if job == "" {
chanStream <- e
history = append(history, e)
}
}

}
}
time.Sleep(time.Second * 1)
}
}()

c.Stream(func(w io.Writer) bool {
if msg, ok := <-chanStream; ok {
c.SSEvent("message", msg)
return clientDisconnect
return keepOpen
}
return false
})
}

}

// Obtain locks and put the response in the structure expected
// by the streaming endpoint.
func marshalExecutions(name string, executions []*execution) ([]byte, error) {
var msg struct {
JobName string `json:"jobName"`
Executions []*execution `json:"executions"`
}
msg.JobName = name
msg.Executions = executions
result, ok := json.Marshal(msg)
return result, ok
}
2 changes: 1 addition & 1 deletion ui/html/index.html.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -44,5 +44,5 @@
</body>
</html>
<script>
updateJobStateCircles();
indexPageEventListener();
</script>
2 changes: 1 addition & 1 deletion ui/html/job.html.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,4 @@
<script src="/dist/dist.js"></script>
<script>goflowUI.graphViz({{ .jobName }})</script>
<script>updateJobActive({{ .jobName }})</script>
<script>readTaskStream({{ .jobName }})</script>
<script>jobPageEventListener({{ .jobName }})</script>
Loading

0 comments on commit 65376e4

Please sign in to comment.