More Go concurrency using pipelines with eAPI

As a follow-on to my previous post on using Go channels for concurrency, I thought I would try using the pipeline pattern as well.  The idea is to create a series of goroutines that you can string together through channels.  This allows you to mix and match (compose) small functions to build the final result you want.  It’s like using the ‘|’ operator in Unix.  For this example I’m going to take a few different show commands I want to run, create pipelined functions out of them, then string them together to pull down the final result I want.

For this example I will go grab the show version and show running-config output from a series of Arista switches.  I’ve defined a JSON file to store the switch names and connection information.  Here is a short function to read in that file and parse the JSON data:

// readSwitches loads the switch inventory from the JSON file at filename and
// returns the decoded nodes.
//
// The file contents are decoded into a []EosNode. Any failure to open or
// decode the file panics.
func readSwitches(filename string) []EosNode {
	// Open the path the caller passed in rather than a fixed file name, so the
	// argument actually selects which inventory is read.
	file, err := os.Open(filename)
	if err != nil {
		panic(err)
	}
	// Release the file descriptor once decoding is finished.
	defer file.Close()

	var switches []EosNode
	if err := json.NewDecoder(file).Decode(&switches); err != nil {
		panic(err)
	}
	return switches
}

To store all the information I created a struct with fields for the relevant data (there are some extra fields here for future use):

// EosNode holds everything known about one switch: the details used to reach
// its eAPI endpoint and the data the pipeline stages collect from it.
type EosNode struct {
	// Identity and credentials. buildUrl uses Hostname, Username and Password.
	Hostname, MgmtIp, Username, Password string

	// Ssl selects https instead of http when the eAPI URL is built.
	Ssl bool

	// Not written by any code shown here; kept for future use.
	Reachable, ConfigCorrect bool
	Uptime                   float64

	// Version is filled in by getVersion and Config by getConfigs.
	Version, Config string

	// Interface and VLAN name lists gathered from the switch.
	IntfConnected, IpIntf, Vlans []string
}

Now I start writing my functions.  There are three types of functions that we need.  First I will write a producer that starts the whole thing off by creating a channel and feeding each switch (in this case an EosNode) into it.  Then intermediate functions will act on the EosNodes arriving on that channel, and return a new channel carrying the updated EosNodes.  Finally the consumer will read from the last channel and produce the final result.

The producer (or generator) will take a list of EosNodes, then kick off a goroutine that sends each switch into the out channel, which I return from the function:

// genSwitches is the source of the pipeline. It returns a receive-only channel
// on which every element of nodes is delivered in slice order; the channel is
// closed once the last element has been sent.
func genSwitches(nodes []EosNode) <-chan EosNode {
	feed := make(chan EosNode)
	emit := func() {
		defer close(feed)
		for i := range nodes {
			feed <- nodes[i]
		}
	}
	go emit()
	return feed
}

Next come the intermediate functions.  Each one receives EosNodes from its inbound channel, runs an eAPI call to fill in more data, then sends the updated EosNode on a new outbound channel, which it returns:

// getConfigs is a pipeline stage that records each switch's running
// configuration.
//
// For every EosNode received on in it runs "enable" followed by
// "show running-config" over eAPI in text format, stores the "output" value of
// the second result in the node's Config field and forwards the node. The
// returned channel is closed after in has been closed and drained. The type
// assertion is unchecked, so a reply whose "output" entry is missing or is not
// a string panics.
func getConfigs(in <-chan EosNode) <-chan EosNode {
	out := make(chan EosNode)
	go func() {
		for node := range in {
			reply := eapi.Call(buildUrl(node), []string{"enable", "show running-config"}, "text")
			// Index 1 lines up with the second command, "show running-config"
			// (assuming results come back in command order).
			node.Config = reply.Result[1]["output"].(string)
			out <- node
		}
		close(out)
	}()
	return out
}
 
// getVersion is a pipeline stage that records each switch's software version.
//
// For every EosNode received on in it runs "show version" over eAPI in JSON
// format, stores the "version" value of the first result in the node's Version
// field and forwards the node. The returned channel is closed after in has
// been closed and drained. The type assertion is unchecked, so a reply whose
// "version" entry is missing or is not a string panics.
func getVersion(in <-chan EosNode) <-chan EosNode {
	out := make(chan EosNode)
	go func() {
		for node := range in {
			reply := eapi.Call(buildUrl(node), []string{"show version"}, "json")
			node.Version = reply.Result[0]["version"].(string)
			out <- node
		}
		close(out)
	}()
	return out
}

Note: I had a small helper function in there called buildUrl to create the eAPI URL.

Finally the consumer (or sink) in this case is just a for loop in main() that grabs the results from the channel:


	// Consumer: receive exactly one result per switch from the final stage's
	// channel and print it.
	for i := 0; i < len(switches); i++ {
		node := <-out
		fmt.Println(node)
}

This comes after I call my functions, so the whole main() function looks like this:

// main passes the -swfile flag value (default "switches.json") to
// readSwitches, chains the producer and the two eAPI stages together, and
// prints each EosNode that comes out of the last stage.
func main() {
	inventory := flag.String("swfile", "switches.json", "A JSON file with switches to fetch")
	flag.Parse() // must run before the flag value is read

	switches := readSwitches(*inventory)

	fmt.Println("############# Using Pipelines ###################")
	c1 := genSwitches(switches)
	c2 := getConfigs(c1)
	out := getVersion(c2)

	// Pull exactly one result per switch off the end of the pipeline.
	for remaining := len(switches); remaining > 0; remaining-- {
		fmt.Println(<-out)
	}
}

In the above I start with the producer that creates a channel c1, then getConfigs takes that and produces a new channel c2 after processing. c2 is then fed into getVersion to produce yet another channel. Finally we consume it all. If I were to add more functions, I could keep chaining those channels together to grab all kinds of data from the switches. Here’s the complete program:


package aristalabstatus
import (
	"encoding/json"
	"flag"
	"fmt"
	"io/ioutil"
	"net/http"
	"net/url"
	"os"

	"github.com/fredhsu/eapigo"
)
// EosNode holds everything known about one switch: the details used to reach
// its eAPI endpoint and the data the pipeline stages collect from it.
type EosNode struct {
	// Identity and credentials. buildUrl uses Hostname, Username and Password.
	Hostname, MgmtIp, Username, Password string

	// Ssl selects https instead of http when the eAPI URL is built.
	Ssl bool

	// Not written by any code in this file; kept for future use.
	Reachable, ConfigCorrect bool
	Uptime                   float64

	// Version is filled in by getVersion and Config by getConfigs.
	Version, Config string

	// Interface and VLAN name lists gathered from the switch.
	IntfConnected, IpIntf, Vlans []string
}
// readSwitches loads the switch inventory from the JSON file at filename and
// returns the decoded nodes.
//
// The file contents are decoded into a []EosNode. Any failure to open or
// decode the file panics.
func readSwitches(filename string) []EosNode {
	// Open the path the caller passed in rather than a fixed file name, so the
	// argument actually selects which inventory is read.
	file, err := os.Open(filename)
	if err != nil {
		panic(err)
	}
	// Release the file descriptor once decoding is finished.
	defer file.Close()

	var switches []EosNode
	if err := json.NewDecoder(file).Decode(&switches); err != nil {
		panic(err)
	}
	return switches
}
// genSwitches is the source of the pipeline. It returns a receive-only channel
// on which every element of nodes is delivered in slice order; the channel is
// closed once the last element has been sent.
func genSwitches(nodes []EosNode) <-chan EosNode {
	feed := make(chan EosNode)
	emit := func() {
		defer close(feed)
		for i := range nodes {
			feed <- nodes[i]
		}
	}
	go emit()
	return feed
}
// buildUrl returns the eAPI endpoint URL for node, in the form
// scheme://username:password@hostname/command-api, where scheme is "https"
// when node.Ssl is set and "http" otherwise.
//
// The credentials are percent-encoded, so characters such as '@', ':' or '/'
// in a username or password cannot corrupt the URL. Credentials made only of
// unreserved characters produce exactly the same string as plain
// concatenation would.
func buildUrl(node EosNode) string {
	scheme := "http"
	if node.Ssl {
		scheme = "https"
	}
	endpoint := url.URL{
		Scheme: scheme,
		User:   url.UserPassword(node.Username, node.Password),
		Host:   node.Hostname,
		Path:   "/command-api",
	}
	return endpoint.String()
}
// getConfigs is a pipeline stage that records each switch's running
// configuration.
//
// For every EosNode received on in it runs "enable" followed by
// "show running-config" over eAPI in text format, stores the "output" value of
// the second result in the node's Config field and forwards the node. The
// returned channel is closed after in has been closed and drained. The type
// assertion is unchecked, so a reply whose "output" entry is missing or is not
// a string panics.
func getConfigs(in <-chan EosNode) <-chan EosNode {
	out := make(chan EosNode)
	go func() {
		for node := range in {
			reply := eapi.Call(buildUrl(node), []string{"enable", "show running-config"}, "text")
			// Index 1 lines up with the second command, "show running-config"
			// (assuming results come back in command order).
			node.Config = reply.Result[1]["output"].(string)
			out <- node
		}
		close(out)
	}()
	return out
}
// getVersion is a pipeline stage that records each switch's software version.
//
// For every EosNode received on in it runs "show version" over eAPI in JSON
// format, stores the "version" value of the first result in the node's Version
// field and forwards the node. The returned channel is closed after in has
// been closed and drained. The type assertion is unchecked, so a reply whose
// "version" entry is missing or is not a string panics.
func getVersion(in <-chan EosNode) <-chan EosNode {
	out := make(chan EosNode)
	go func() {
		for node := range in {
			reply := eapi.Call(buildUrl(node), []string{"show version"}, "json")
			node.Version = reply.Result[0]["version"].(string)
			out <- node
		}
		close(out)
	}()
	return out
}
// getIntfConnected is a pipeline stage that records the names of each switch's
// connected interfaces.
//
// For every EosNode received on in it runs "show interfaces status connected"
// over eAPI in JSON format and appends every key of the first result's
// "interfaceStatuses" object to the node's IntfConnected field before
// forwarding the node. The keys are appended in Go map iteration order, which
// is unspecified. The returned channel is closed after in has been closed and
// drained. The type assertion is unchecked, so a reply without an
// "interfaceStatuses" object panics.
func getIntfConnected(in <-chan EosNode) <-chan EosNode {
	out := make(chan EosNode)
	go func() {
		for node := range in {
			reply := eapi.Call(buildUrl(node), []string{"show interfaces status connected"}, "json")
			byName := reply.Result[0]["interfaceStatuses"].(map[string]interface{})
			for name := range byName {
				node.IntfConnected = append(node.IntfConnected, name)
			}
			out <- node
		}
		close(out)
	}()
	return out
}
// getIpInterfaces is a pipeline stage that records the names of each switch's
// IP interfaces.
//
// For every EosNode received on in it runs "show ip interface" over eAPI in
// JSON format and appends every key of the first result's "interfaces" object
// to the node's IpIntf field before forwarding the node. The keys are appended
// in Go map iteration order, which is unspecified. The returned channel is
// closed after in has been closed and drained. The type assertion is
// unchecked, so a reply without an "interfaces" object panics.
func getIpInterfaces(in <-chan EosNode) <-chan EosNode {
	out := make(chan EosNode)
	go func() {
		for n := range in {
			cmds := []string{"show ip interface"}
			response := eapi.Call(buildUrl(n), cmds, "json")
			intfs := response.Result[0]["interfaces"].(map[string]interface{})
			for intf := range intfs {
				// Store IP interfaces in their own field; IntfConnected
				// belongs to getIntfConnected and must not be mixed with them.
				n.IpIntf = append(n.IpIntf, intf)
			}
			out <- n
		}
		close(out)
	}()
	return out
}
// switchesHandler is an HTTP handler that responds with a JSON array
// describing every switch listed in switches.json.
//
// On each request it re-reads the inventory and runs it through the
// getVersion -> getIntfConnected -> getIpInterfaces pipeline, collecting every
// node the last stage emits. Each node is also printed to standard output.
// If the result cannot be encoded, the error is printed and the client
// receives a 500 response rather than an empty 200.
func switchesHandler(w http.ResponseWriter, r *http.Request) {
	switches := readSwitches("switches.json")
	c1 := genSwitches(switches)
	c2 := getVersion(c1)
	c2 = getIntfConnected(c2)
	c2 = getIpInterfaces(c2)

	// Start from an empty, non-nil slice so an empty inventory still encodes
	// as [] rather than null.
	output := make([]EosNode, 0, len(switches))
	// The last stage closes its channel once every node has passed through,
	// so ranging over it collects exactly one entry per switch.
	for node := range c2 {
		fmt.Println(node)
		output = append(output, node)
	}

	b, err := json.Marshal(output)
	if err != nil {
		fmt.Println(err)
		http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
		return
	}

	w.Header().Set("Content-Type", "application/json")
	// Write the bytes verbatim. Passing them to Fprintf as the format string
	// would corrupt any '%' that appears in the payload.
	if _, err := w.Write(b); err != nil {
		fmt.Println(err)
	}
}
// main runs the pipeline once from the command line and then serves it over
// HTTP.
//
// It passes the -swfile flag value (default "switches.json") to readSwitches,
// pushes the switches through getConfigs, getVersion and getIntfConnected, and
// prints each hostname with its connected interfaces. It then registers
// switchesHandler under /switches/ and listens on port 8081. If the server
// cannot start or stops with an error, the error is written to standard error
// and the process exits with status 1.
func main() {
	swFilePtr := flag.String("swfile", "switches.json", "A JSON file with switches to fetch")
	flag.Parse() // command-line flag parsing
	switches := readSwitches(*swFilePtr)

	fmt.Println("############# Using Pipelines ###################")
	c1 := genSwitches(switches)
	c2 := getConfigs(c1)
	c3 := getVersion(c2)
	out := getIntfConnected(c3)
	// The last stage closes out after the final switch, ending this loop.
	for node := range out {
		fmt.Print(node.Hostname + ": ")
		fmt.Println(node.IntfConnected)
	}

	http.HandleFunc("/switches/", switchesHandler)
	// ListenAndServe always returns a non-nil error (for example when the
	// port is already in use); report it instead of exiting silently.
	if err := http.ListenAndServe(":8081", nil); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
}

One thought on “More Go concurrency using pipelines with eAPI”

Leave a comment