Flow - FBP / pipelines / workers pool¶
Package flow provides support for very basic FBP (flow-based programming) / pipelines. It helps to structure multistage processing as a set of independent handlers communicating via channels. The typical use case is ETL (extract, transform, load) processing. Package flow doesn’t introduce any high-level abstraction and keeps everything in the hands of the user.
Package pool provides a simplified version of flow suitable for single-handler flows.
Details about flow package¶
- Each handler represents an async stage. It consumes data from an input channel and publishes results to an output channel.
- Each handler runs in a separate goroutine.
- The user must implement Handler functions and add them to the Flow.
- Each handler usually creates an output channel, reads from the input channel, processes data, sends results to the output channel and closes the output channel.
- The processing sequence is determined by the order of those handlers.
- Any Handler can run in multiple concurrent goroutines (workers) by using the Parallel decorator.
- FanOut allows passing multiple handlers in broadcast mode, i.e., each handler gets every input record. Outputs from these handlers are merged into a single output channel.
- A processing error is detected as the error value returned from the user’s handler func. Such an error interrupts all other running handlers gracefully and won’t keep any goroutine running/leaking.
- Each Flow object can be executed only once.
- Handler should handle context cancellation as a termination signal.
Install and update¶
go get -u github.com/go-pkgz/flow
Example of the flow’s handler¶
// ReaderHandler creates flow.Handler, reading strings from any io.Reader
func ReaderHandler(reader io.Reader) flow.Handler {
	return func(ctx context.Context, ch chan interface{}) (chan interface{}, func() error) {
		metrics := flow.GetMetrics(ctx) // metrics counts the records read under the "read" key
		readerCh := make(chan interface{}, 1000)
		readerFn := func() error {
			defer close(readerCh)
			scanner := bufio.NewScanner(reader)
			for scanner.Scan() {
				select {
				case readerCh <- scanner.Text():
					metrics.Inc("read")
				case <-ctx.Done():
					return ctx.Err()
				}
			}
			return errors.Wrap(scanner.Err(), "scanner failed")
		}
		return readerCh, readerFn
	}
}
Usage of the flow package¶
For a complete example, see example.
// flow illustrates the use of a Flow for a concurrent pipeline running each handler in a separate goroutine.
func ExampleFlow_flow() {
	f := New() // create new empty Flow
	f.Add(     // add handlers. Note: handlers can be added directly in New
		// first handler, generates 100 initial values.
		func(ctx context.Context, in chan interface{}) (out chan interface{}, runFn func() error) {
			out = make(chan interface{}, 100) // example of non-async handler
			for i := 1; i <= 100; i++ {
				out <- i
			}
			close(out)      // each handler has to close out channel
			return out, nil // no runnable function for non-async handler
		},
		// second handler - picks odd numbers only and multiplies them
		func(ctx context.Context, in chan interface{}) (out chan interface{}, runFn func() error) {
			out = make(chan interface{}) // async handler makes its out channel
			runFn = func() error {
				defer close(out) // handler should close out channel
				for e := range in {
					val := e.(int)
					if val%2 == 0 {
						continue
					}
					f.Metrics().Inc("passed") // increment user-defined metric "passed"
					// send result to the next stage with flow.Send helper. Also checks for cancellation
					if err := Send(ctx, out, val*rand.Int()); err != nil {
						return err
					}
				}
				return nil
			}
			return out, runFn
		},
		// final handler - sums all numbers
		func(ctx context.Context, in chan interface{}) (out chan interface{}, runFn func() error) {
			out = make(chan interface{}, 1)
			runFn = func() error {
				defer close(out)
				sum := 0
				for {
					select {
					case e, more := <-in:
						if !more {
							out <- sum // send result
							return nil
						}
						val := e.(int)
						sum += val
					case <-ctx.Done():
						return ctx.Err()
					}
				}
			}
			return out, runFn
		},
	)
	f.Go() // activate flow
	// wait for all handlers to complete
	if err := f.Wait(); err == nil {
		fmt.Printf("all done, result=%v, passed=%d", <-f.Channel(), f.Metrics().Get("passed"))
	}
}
// illustrates the use of a Flow for a concurrent pipeline running some handlers in parallel.
func ExampleFlow_parallel() {
	f := New() // create new empty Flow
	// make flow with mixed single and parallel handlers
	f.Add(
		// generate 100 initial values in single handler
		func(ctx context.Context, in chan interface{}) (out chan interface{}, runFn func() error) {
			out = make(chan interface{}, 100) // example of non-async handler
			for i := 1; i <= 100; i++ {
				out <- i
			}
			close(out)      // each handler has to close out channel
			return out, nil // no runnable function for non-async handler
		},
		// multiply all numbers in 10 parallel handlers
		f.Parallel(10, func(ctx context.Context, in chan interface{}) (out chan interface{}, runFn func() error) {
			out = make(chan interface{}) // async handler makes its out channel
			runFn = func() error {
				defer close(out) // handler should close out channel
				for e := range in {
					val := e.(int)
					select {
					case out <- val * rand.Int(): // send result to the next stage
					case <-ctx.Done(): // check for cancellation
						return ctx.Err()
					}
				}
				return nil
			}
			return out, runFn
		}),
		// sum all numbers
		func(ctx context.Context, in chan interface{}) (out chan interface{}, runFn func() error) {
			out = make(chan interface{}, 1) // without this, close(out) below would panic on a nil channel
			runFn = func() error {
				defer close(out)
				sum := 0
				for e := range in {
					val := e.(int)
					sum += val
					select {
					case <-ctx.Done():
						return ctx.Err()
					default:
					}
				}
				out <- sum // send result
				return nil
			}
			return out, runFn
		},
	)
	f.Go() // activate flow
	// wait for all handlers to complete
	if err := f.Wait(); err == nil {
		fmt.Printf("all done, result=%v", <-f.Channel())
	}
}
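The idea behind running one handler in several workers, as Parallel does in the example above, can be sketched with plain channels. This is an illustration under assumptions, not the package's code: parallel is a hypothetical helper in which n goroutines read from the same input channel and write to one shared output channel that is closed once the last worker exits.

```go
package main

import (
	"fmt"
	"sync"
)

// parallel is a hypothetical stand-in for the idea behind the Parallel decorator.
func parallel(n int, in <-chan int, fn func(int) int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for v := range in { // workers compete for records from the same channel
				out <- fn(v)
			}
		}()
	}
	go func() {
		wg.Wait()
		close(out) // closed exactly once, after the last worker exits
	}()
	return out
}

func main() {
	in := make(chan int, 100)
	for i := 1; i <= 100; i++ {
		in <- i
	}
	close(in)
	sum := 0
	for v := range parallel(10, in, func(v int) int { return v * 2 }) {
		sum += v
	}
	fmt.Println(sum) // 10100; arrival order varies between runs, the sum does not
}
```

Because records are picked up by whichever worker is free, output order is not preserved; a downstream stage that needs ordering has to restore it itself.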
Details about pool package¶
Pool package provides a thin implementation of a workers pool. In addition to the default “run a func in multiple goroutines” mode, it also provides optional support for chunked workers. In this mode each key, detected by a user-provided func, is guaranteed to be processed by the same worker. Such a mode is needed for stateful flows where each set of input records has to be processed sequentially and some state should be kept. Another thing pool allows you to define is the batch size. This is a simple performance optimization that collects input requests into a buffer and sends them to the worker channel in batches (slices) instead of one send per submit call.
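One common way to guarantee that a key always lands on the same worker is to hash the key and take the result modulo the number of workers. The sketch below shows that technique only; whether pool uses this exact scheme is not stated here, and workerIndex and the sample chunk function are hypothetical.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"strings"
)

// workerIndex is a hypothetical helper: it maps a chunk key to a worker
// by hashing the key and reducing it modulo the number of workers.
func workerIndex(key string, workers int) int {
	h := fnv.New32a()
	_, _ = h.Write([]byte(key)) // Write on an fnv hash never returns an error
	return int(h.Sum32() % uint32(workers))
}

func main() {
	// sample chunk function: the part before ':' identifies the chunk
	chunkFn := func(rec string) string { return strings.SplitN(rec, ":", 2)[0] }
	records := []string{"user-1:login", "user-2:login", "user-1:logout"}
	for _, r := range records {
		fmt.Printf("%s -> worker %d\n", r, workerIndex(chunkFn(r), 4))
	}
}
```

Both user-1 records map to the same index, so a single worker sees them in submission order and can keep per-key state without locking.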
Options:
- ChunkFn - the function returns a string identifying the chunk
- Batch - sets batch size (default 1)
- ChanResSize - sets the size of the output buffered channel (default 1)
- ChanWorkerSize - sets the size of the workers buffered channel (default 1)
- ContinueOnError - allows workers to continue after an error occurred
- OnCompletion - sets a callback (for each worker) called on successful completion
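The effect of a batch size larger than 1 can be pictured as a small buffer in front of the worker channel. The following is a sketch under assumptions, not the package's implementation; the batcher type and its methods are hypothetical.

```go
package main

import "fmt"

// batcher is a hypothetical buffer illustrating the batching idea: submitted values
// are collected and forwarded as one slice once size values have accumulated.
type batcher struct {
	size int
	buf  []interface{}
	out  chan []interface{}
}

// submit buffers v and flushes when the batch is full.
func (b *batcher) submit(v interface{}) {
	b.buf = append(b.buf, v)
	if len(b.buf) >= b.size {
		b.flush()
	}
}

// flush sends whatever is buffered, if anything.
func (b *batcher) flush() {
	if len(b.buf) == 0 {
		return
	}
	b.out <- b.buf
	b.buf = make([]interface{}, 0, b.size) // fresh slice, the sent one now belongs to the receiver
}

// close flushes the incomplete last batch and closes the channel.
func (b *batcher) close() {
	b.flush()
	close(b.out)
}

func main() {
	b := &batcher{size: 3, out: make(chan []interface{}, 10)}
	for i := 1; i <= 7; i++ {
		b.submit(i)
	}
	b.close()
	for batch := range b.out {
		fmt.Println(batch) // [1 2 3], then [4 5 6], then [7]
	}
}
```

Seven submits result in three channel sends instead of seven, which is where the saving comes from.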
worker function¶
The worker function is passed by the user and runs in multiple workers (goroutines) concurrently. This is the function:
type workerFn func(ctx context.Context, inp interface{}, sender SenderFn, store WorkerStore) error
It takes the inp parameter, does the job and optionally sends result(s) with SenderFn to the common results channel. An error will terminate all workers unless ContinueOnError is set.
Note: workerFn can be stateful, collect anything it needs and send any number of results by calling SenderFn zero or more times.
worker store¶
Each worker gets a WorkerStore, which can be used as thread-safe per-worker storage for any intermediate results.
type WorkerStore interface {
	Set(key string, val interface{})
	Get(key string) (interface{}, bool)
	GetInt(key string) int
	GetFloat(key string) float64
	GetString(key string) string
	GetBool(key string) bool
	Keys() []string
	Delete(key string)
}
Alternatively, state can be kept outside of workers as a slice of values and accessed by worker ID.
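For experiments or tests, a store with the method set listed above can be backed by a map and a mutex. This is a hedged sketch, not the store shipped with pool: memStore is a hypothetical type, and returning the zero value for a missing or mistyped key is an assumption about the typed getters.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// memStore is a hypothetical map-backed store with the WorkerStore method set.
type memStore struct {
	mu   sync.RWMutex
	data map[string]interface{}
}

func newMemStore() *memStore { return &memStore{data: map[string]interface{}{}} }

func (s *memStore) Set(key string, val interface{}) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.data[key] = val
}

func (s *memStore) Get(key string) (interface{}, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	v, ok := s.data[key]
	return v, ok
}

// Typed getters return the zero value for a missing or mistyped key (an assumption).
func (s *memStore) GetInt(key string) int {
	v, _ := s.Get(key)
	r, _ := v.(int)
	return r
}

func (s *memStore) GetFloat(key string) float64 {
	v, _ := s.Get(key)
	r, _ := v.(float64)
	return r
}

func (s *memStore) GetString(key string) string {
	v, _ := s.Get(key)
	r, _ := v.(string)
	return r
}

func (s *memStore) GetBool(key string) bool {
	v, _ := s.Get(key)
	r, _ := v.(bool)
	return r
}

// Keys returns all keys, sorted so the result is deterministic.
func (s *memStore) Keys() []string {
	s.mu.RLock()
	defer s.mu.RUnlock()
	keys := make([]string, 0, len(s.data))
	for k := range s.data {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

func (s *memStore) Delete(key string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.data, key)
}

func main() {
	st := newMemStore()
	st.Set("count", 3)
	st.Set("name", "w1")
	fmt.Println(st.GetInt("count"), st.GetString("name"), st.GetBool("missing"), st.Keys())
	// 3 w1 false [count name]
}
```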
usage¶
p := pool.New(8, func(ctx context.Context, v interface{}, sendFn pool.SenderFn, ws pool.WorkerStore) error {
	// worker function gets input v, processes it and sends result(s) with sendFn
	input, ok := v.(string) // in this case it gets string as input
	if !ok {
		return errors.New("incorrect input type")
	}
	// do something with input
	// ...

	count := ws.GetInt("something") // access thread-local var

	sendFn("foo", nil) // send "foo" and nil error
	sendFn("bar", nil) // send "bar" and nil error
	pool.Metrics(ctx).Inc("counter")
	ws.Set("something", count+1) // keep thread-local things
	return nil
})

cursor, err := p.Go(context.TODO()) // start all workers in 8 goroutines and get back result's cursor

// submit values (consumer side)
go func() {
	p.Submit("something")
	p.Submit("something else")
	p.Close() // indicates completion of all inputs
}()

var v interface{}
for cursor(ctx, &v) {
	log.Print(v) // print value
}
if cursor.Err() != nil { // error happened
	return cursor.Err()
}

// alternatively read all from the cursor (response channel)
res, err := cursor.All(ctx)

// metrics the same as for flow
metrics := pool.Metrics()
log.Print(metrics.Get("counter"))