Flow - FBP / pipelines / workers pool

Package flow provides support for very basic FBP / pipelines. It helps to structure multistage processing as a set of independent handlers communicating via channels. The typical use case is ETL (extract, transform, load) processing. Package flow doesn’t introduce any high-level abstraction and keeps everything in the hands of the user.

Package pool provides a simplified version of flow suitable for single-handler flows.

Details about flow package

  • Each handler represents an async stage. It consumes data from an input channel and publishes results to an output channel.
  • Each handler runs in a separate goroutine.
  • The user must implement Handler functions and add them to the Flow.
  • Each handler usually creates an output channel, reads from the input channel, processes data, sends results to the output channel and closes the output channel.
  • The processing sequence is determined by the order of those handlers.
  • Any Handler can run in multiple concurrent goroutines (workers) by using the Parallel decorator.
  • FanOut allows passing multiple handlers in broadcast mode, i.e., each handler gets every input record. Outputs from these handlers are merged into a single output channel (see the sketch after this list).
  • A processing error is detected as an error returned from the user’s handler func. Such an error interrupts all other running handlers gracefully and won’t leave any goroutine running/leaking.
  • Each Flow object can be executed only once.
  • Each handler should treat context cancellation as a termination signal.
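
The broadcast mode can be sketched as follows. Note: this is only a sketch, not taken from the package docs; it assumes FanOut is a Flow decorator used like Parallel in the examples further down, i.e. f.FanOut(h1, h2) returns a single combined handler. Verify the exact signature against the package documentation.

// broadcast sketch - every record from the seed stage is delivered to both
// handlers and their outputs are merged into one stream for the next stage.
// f.FanOut(h1, h2) returning a single Handler is an assumption, not confirmed API.

// mult returns a handler multiplying every record by k
mult := func(k int) Handler {
    return func(ctx context.Context, in chan interface{}) (chan interface{}, func() error) {
        out := make(chan interface{})
        return out, func() error {
            defer close(out)
            for e := range in {
                if err := Send(ctx, out, e.(int)*k); err != nil { // Send also checks for cancellation
                    return err
                }
            }
            return nil
        }
    }
}

f := New()
f.Add(
    // seed handler - generate a few values, non-async
    func(ctx context.Context, in chan interface{}) (out chan interface{}, runFn func() error) {
        out = make(chan interface{}, 3)
        out <- 1
        out <- 2
        out <- 3
        close(out)
        return out, nil
    },

    // broadcast: every record goes to both handlers, outputs are merged
    f.FanOut(mult(2), mult(-1)),

    // final handler - print everything from the merged stream
    func(ctx context.Context, in chan interface{}) (out chan interface{}, runFn func() error) {
        out = make(chan interface{}, 1)
        runFn = func() error {
            defer close(out)
            for e := range in {
                fmt.Println(e) // 2, 4, 6, -1, -2, -3 in some order
            }
            return nil
        }
        return out, runFn
    },
)
f.Go()
_ = f.Wait()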

Install and update

go get -u github.com/go-pkgz/flow

Example of the flow’s handler

// ReaderHandler creates flow.Handler, reading strings from any io.Reader
func ReaderHandler(reader io.Reader) flow.Handler {
    return func(ctx context.Context, ch chan interface{}) (chan interface{}, func() error) {
        metrics := flow.GetMetrics(ctx) // metrics counts how many records are read, under the "read" key

        readerCh := make(chan interface{}, 1000)
        readerFn := func() error {
            defer close(readerCh)

            scanner := bufio.NewScanner(reader)
            for scanner.Scan() {

                select {
                case readerCh <- scanner.Text():
                    metrics.Inc("read")
                case <-ctx.Done():
                    return ctx.Err()
                }
            }
            return errors.Wrap(scanner.Err(), "scanner failed")
        }
        return readerCh, readerFn
    }
}

Usage of the flow package

For a complete example see the example in the repository.

// flow illustrates the use of a Flow for a concurrent pipeline, running each handler in a separate goroutine.
func ExampleFlow_flow() {

    f := New() // create new empty Flow
    f.Add(     // add handlers. Note: handlers can be added directly in New

        // first handler, generate 100 initial values.
        func(ctx context.Context, in chan interface{}) (out chan interface{}, runFn func() error) {
            out = make(chan interface{}, 100) // example of non-async handler
            for i := 1; i <= 100; i++ {
                out <- i
            }
            close(out)      // each handler has to close out channel
            return out, nil // no runnable function for non-async handler
        },

        // second handler - picks odd numbers only and multiplies them by a random value
        func(ctx context.Context, in chan interface{}) (out chan interface{}, runFn func() error) {
            out = make(chan interface{}) // async handler makes its out channel
            runFn = func() error {
                defer close(out) // handler should close out channel
                for e := range in {
                    val := e.(int)
                    if val%2 == 0 {
                        continue
                    }
                    f.Metrics().Inc("passed") // increment user-defined metric "passed"

                    // send result to the next stage with flow.Send helper. Also checks for cancellation
                    if err := Send(ctx, out, val*rand.Int()); err != nil {
                        return err
                    }
                }
                return nil
            }
            return out, runFn
        },

        // final handler - sum all numbers
        func(ctx context.Context, in chan interface{}) (out chan interface{}, runFn func() error) {
            out = make(chan interface{}, 1)
            runFn = func() error {
                defer close(out)
                sum := 0
                for {
                    select {
                    case e, more := <-in:
                        if !more {
                            out <- sum //send result
                            return nil
                        }
                        val := e.(int)
                        sum += val

                    case <-ctx.Done():
                        return ctx.Err()
                    }
                }
            }
            return out, runFn
        },
    )

    f.Go() // activate flow

    // wait for all handlers to complete
    if err := f.Wait(); err == nil {
        fmt.Printf("all done, result=%v, passed=%d", <-f.Channel(), f.Metrics().Get("passed"))
    }
}

// parallel illustrates the use of a Flow for a concurrent pipeline running some handlers in multiple parallel workers.
func ExampleFlow_parallel() {

    f := New() // create new empty Flow

    // make a flow with a mix of single and parallel handlers
    f.Add(

        // generate 100 initial values in a single handler
        func(ctx context.Context, in chan interface{}) (out chan interface{}, runFn func() error) {
            out = make(chan interface{}, 100) // example of non-async handler
            for i := 1; i <= 100; i++ {
                out <- i
            }
            close(out)      // each handler has to close out channel
            return out, nil // no runnable function for non-async handler
        },

        // multiply all numbers in 10 parallel workers
        f.Parallel(10, func(ctx context.Context, in chan interface{}) (out chan interface{}, runFn func() error) {
            out = make(chan interface{}) // async handler makes its out channel
            runFn = func() error {
                defer close(out) // handler should close out channel
                for e := range in {
                    val := e.(int)
                    select {
                    case out <- val * rand.Int(): // send result to the next stage
                    case <-ctx.Done(): // check for cancellation
                        return ctx.Err()
                    }
                }
                return nil
            }
            return out, runFn
        }),

        // final handler - sum all numbers from the parallel stage
        func(ctx context.Context, in chan interface{}) (out chan interface{}, runFn func() error) {
            out = make(chan interface{}, 1)
            runFn = func() error {
                defer close(out) // handler should close out channel
                sum := 0
                for e := range in {
                    val := e.(int)
                    sum += val
                    select {
                    case <-ctx.Done(): // check for cancellation
                        return ctx.Err()
                    default:
                    }
                }
                out <- sum // send the accumulated result
                return nil
            }
            return out, runFn
        },
    )

    f.Go() // activate flow

    // wait for all handlers to complete
    if err := f.Wait(); err == nil {
        fmt.Printf("all done, result=%v", <-f.Channel())
    }
}

Details about pool package

The pool package provides a thin implementation of a workers pool. In addition to the default “run a func in multiple goroutines” mode, it also provides optional support for chunked workers. In this mode each key, determined by a user-provided func, is guaranteed to be processed by the same worker. Such a mode is needed for stateful flows where each set of input records has to be processed sequentially and some state should be kept. Pool also allows defining a batch size. This is a simple performance optimization: input requests are collected into a buffer and sent to the worker channel in batches (slices) instead of one per Submit call.

Options:

  • ChunkFn - the function returning a string that identifies the chunk (see the sketch after this list)
  • Batch - sets the batch size (default 1)
  • ChanResSize - sets the size of the output buffered channel (default 1)
  • ChanWorkerSize - sets the size of the workers buffered channel (default 1)
  • ContinueOnError - allows workers to continue after an error occurred
  • OnCompletion - sets a callback (for each worker) called on successful completion
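
To make the chunked mode concrete, here is a rough sketch of routing all records with the same user ID to the same worker. The order type is made up for the example, and passing ChunkFn as a functional option to pool.New is an assumption; verify the exact option constructors against the package documentation.

    // hypothetical input record
    type order struct {
        UserID string
        Amount int
    }

    worker := func(ctx context.Context, v interface{}, sendFn pool.SenderFn, ws pool.WorkerStore) error {
        o, ok := v.(order)
        if !ok {
            return errors.New("incorrect input type")
        }
        // because of ChunkFn below, every order for a given UserID is handled by
        // the same worker, so per-user sequential processing is guaranteed
        log.Printf("got order for %s, amount=%d", o.UserID, o.Amount)
        return nil
    }

    // assumption: ChunkFn is an option on pool.New taking a func that returns
    // the chunk key for each submitted value
    p := pool.New(8, worker, pool.ChunkFn(func(v interface{}) string {
        return v.(order).UserID // same key -> same worker
    }))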

worker function

The worker function is provided by the user and runs concurrently in multiple workers (goroutines). This is its signature: type workerFn func(ctx context.Context, inp interface{}, sender SenderFn, store WorkerStore) error

It takes the inp parameter, does the job and optionally sends result(s) with SenderFn to the common results channel. An error terminates all workers unless ContinueOnError is set.

Note: workerFn can be stateful, collect anything it needs and send zero or more results by calling SenderFn as many times as needed.

worker store

Each worker gets a WorkerStore that can be used as thread-safe per-worker storage for any intermediate results.

type WorkerStore interface {
    Set(key string, val interface{})
    Get(key string) (interface{}, bool)
    GetInt(key string) int
    GetFloat(key string) float64
    GetString(key string) string
    GetBool(key string) bool
    Keys() []string
    Delete(key string)
}

Alternatively, state can be kept outside of the workers as a slice of values and accessed by worker ID.
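
For illustration, here is a minimal sketch of a stateful worker keeping per-worker counters in its WorkerStore. It relies only on the workerFn signature and WorkerStore interface shown above; how the counters are consumed afterwards (e.g. via OnCompletion) is left out.

// countingWorker is a hypothetical worker func: it counts the lines and bytes
// it has seen in its own WorkerStore; the store is local to the worker, so no
// additional synchronization is needed
func countingWorker(ctx context.Context, inp interface{}, send pool.SenderFn, ws pool.WorkerStore) error {
    line, ok := inp.(string)
    if !ok {
        return errors.New("expected string input")
    }
    ws.Set("lines", ws.GetInt("lines")+1)         // per-worker running counter
    ws.Set("bytes", ws.GetInt("bytes")+len(line)) // another per-worker value
    return nil
}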

usage

    p := pool.New(8, func(ctx context.Context, v interface{}, sendFn pool.SenderFn, ws pool.WorkerStore) error {
        // worker function gets input v, processes it and uses sendFn to publish result(s)

        input, ok := v.(string) // in this case it gets string as input
        if !ok {
            return errors.New("incorrect input type")
        }
        // do something with input
        // ...
        _ = input

        _ = ws.GetInt("something") // access thread-local var

        sendFn("foo", nil) // send "foo" and nil error
        sendFn("bar", nil) // send "bar" and nil error
        pool.Metrics(ctx).Inc("counter")
        ws.Set("something", 1234) // keep thread-local things
        return nil
    })

    ctx := context.TODO()
    cursor, err := p.Go(ctx) // start all workers in 8 goroutines and get back the results cursor
    if err != nil {
        return err
    }

    // submit values (producer side)
    go func() {
        p.Submit("something")
        p.Submit("something else")
        p.Close() // indicates completion of all inputs
    }()   

    var v interface{}
    for cursor(ctx, &v) {
        log.Print(v)  // print value
    }

    if cursor.Err() != nil { // error happened
        return cursor.Err()
    } 

    // alternatively read all from the cursor (response channel)
    res, err := cursor.All(ctx)

    // metrics the same as for flow
    metrics := pool.Metrics()
    log.Print(metrics.Get("counter"))