Thursday, June 9, 2011

Parallel processing with expect and pexpect

Whether we are changing the configuration of a few hundred devices, monitoring services on remote devices, or just fetching data from them, expect is a tool that we could, and probably will, use. Expect is an extension of the Tcl scripting language and inherits its syntax and commands from Tcl. There are also alternative modules for other programming languages. Pexpect, an expect-like module for Python, is one of them.

Our task will be to connect to 15000 CPE devices and fetch the serial number from each of them. The final result must be a list of serial numbers with IP addresses, plus the number of devices we failed to connect to. The emphasis is on doing this in parallel. We will show how to do that in Tcl's expect and Python's pexpect, get to know the differences between the two tools, and see the pros and cons of each.

Expect implementation

When using expect, we must be aware that we can't use threads. Expect and threads just don't work together. Instead, we will fork a number of processes which will execute the task. There is one more catch: if we use expect's fork command, our Tcl interpreter must not be thread-enabled. To check whether your Tcl interpreter is thread-enabled, check for the existence of the tcl_platform(threaded) variable:
% array get tcl_platform
osVersion 2.6.32-26-generic pointerSize 4 byteOrder littleEndian threaded 1 machine i686 platform unix os Linux user nikola wordSize 4

On some platforms, like Ubuntu Linux, a Tcl installed from packages will be thread-enabled (the output above contains threaded 1). In that case, you should rebuild Tcl from source without thread support (or install such a build in another directory).
The idea is to fork a number of children and feed them with IP addresses. Each child connects to the parent, asks for an IP address, fetches the serial number from that device and returns the result to the parent process. Parent and children communicate over TCP/IP sockets, one socket for each direction. Here is the source:
#!/usr/bin/expect -f
#
#Open 2 server sockets, and wait for incoming connections.
#
set listenport1 1498
set listenport2 1499
set serversock1 [socket -server server_accept1 $listenport1]
set serversock2 [socket -server server_accept2 $listenport2]
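set maxchildren 200
set timeout 10
# Work lists: addresses not yet handed out, and addresses handed out
# but not yet answered. Initialized so they always exist.
set targets {}
set pending {}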


proc server_accept1 {sock addr port} {
    fconfigure $sock -buffering line -blocking 0
    fileevent $sock r [list parent_IPdealer $sock]
}

proc server_accept2 {sock addr port} {
    fconfigure $sock -buffering none -blocking 0
    fileevent $sock r [list parent_readresult $sock]
}

#
#Procedure which feeds children with IP addresses. When finished
#send 0 to signal client/child to terminate.
#
proc parent_IPdealer {sock} {
    global targets
    global pending
    if {[eof $sock] || [catch {gets $sock line}]} {
        puts "Socket $sock closed"
        close $sock
        return
    }
    if {[lindex $line 0] == "GET"} {
        if {$targets != ""} {
            set ip [lindex $targets 0]
            lappend pending $ip
            set targets [lrange $targets 1 end]
            puts $sock $ip
        } else {
            puts $sock 0
            catch {close $sock}
        }
    }
}

#
#Parent procedure, reads results from children.
#
proc parent_readresult {sock} {
    global targets
    global pending
    global finished
    if {[eof $sock] || [catch {gets $sock line}]} {
        close $sock
        return
    }
    if {[set ip [lindex $line 0]] != ""} {
        puts "Result = $line"
        set result [lindex $line 1]
        set position [lsearch -exact $pending $ip]
        if {$position != -1} {
            set pending [lreplace $pending $position $position]
            # Writing to finished wakes up the vwait in start_children.
            set finished($ip) $result
        }
    }
}

#
#Retrieves IP addresses on port $listenport1 and sends
#result to parent process on $listenport2.
#
proc child_body {i} {
    global listenport1 listenport2
    set sock1 [socket localhost $listenport1]
    fconfigure $sock1 -buffering line -blocking 1
    set sock2 [socket localhost $listenport2]
    fconfigure $sock2 -buffering line -blocking 1
    while {[eof $sock1] == 0} {
        puts $sock1 "GET"
        gets $sock1 ip
        if {$ip == 0 || $ip == ""} {
            # 0 means no more work; an empty read means the parent closed the socket.
            break
        }
        set result [child_run $ip]
        puts $sock2 "$ip $result"
    }
    catch {close $sock1}
    catch {close $sock2}
    exit
}

proc child_run {ip} {
    global spawn_id
    set result ""
    log_file logs/$ip
    set pid [spawn telnet $ip]
    if {![set result [login_CPE $ip]]} {
#Here you write your exe functions
        set sn [getSN]
        set result "$sn"
    } else {
        set result -1
    }
    log_file
    catch {close}
    catch {wait -nowait}
    return $result
}

#
#Login to the cpe. Returns 0 on success, or negative value if failed.
#
proc login_CPE {ip} {
    global spawn_id
    set user "admin"
    set pw "admin"
    expect {
        timeout        {return -1}
        "Username :"  {send "$user\r"}
    }
    if { [catch {
    expect {
        "Password :"  {send "$pw\r"}
    }}] } {
        return -1
    }
    expect {
        timeout {return -1}
        "=>"  {return 0}
        "Wrong username/password" { return -2 }
    }
}

#
#Get serial number. Returns serial number or -1 on failure.
#
proc getSN {} {
    send "env get var _PROD_SERIAL_NBR \r"
    expect {
        timeout {return -1}
        "=>"  {set output $expect_out(buffer) }
    }
    set sn [lindex [lindex [split $output "\n"] end-1] 0]
    return $sn
}

#
#Start children and wait that they finish.
#
proc start_children {} {
    global targets
    global pending
    global finished
    global maxchildren
    global serversock1 serversock2
    if {[set n [llength $targets]] > $maxchildren} {
        set n $maxchildren
    }
    set start [timestamp]
    for {set i 0} {$i < $n} {incr i} {
        if {[set pid [fork]] == 0} {
            close $serversock1
            close $serversock2
            disconnect
            child_body $i
        } else {
            lappend child_pids $pid
        }
    }
    while {$targets != "" || $pending != ""} {
        vwait finished
    }
    set finish [timestamp]
    catch {close $serversock1}
    catch {close $serversock2}
    set duration [expr {$finish - $start}]
    puts "Execution lasted for $duration seconds."
}


#
#Cpe's are listed in a flat file called targets.
#
set fileid [open "targets" r]
while {[gets $fileid ip] >= 0} {
    # Skip blank lines so no empty address ends up in the work list.
    if {[string trim $ip] ne ""} {
        lappend targets [string trim $ip]
    }
}

close $fileid

start_children
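
The parent's bookkeeping in the script above is independent of the socket plumbing: an address moves from targets to pending when a child asks for work, and from pending to finished when the result comes back. Below is a minimal Python sketch of just that state machine, for illustration only. The Dealer class and its method names are hypothetical and not part of the script; the addresses and serial numbers in the usage lines are made up.

```python
class Dealer(object):
    """Tracks which addresses are waiting, handed out, or answered."""

    def __init__(self, targets):
        self.targets = list(targets)   # not yet handed to any child
        self.pending = []              # handed out, result not yet received
        self.finished = {}             # ip -> result reported by a child

    def handle_get(self):
        """Answer a child's GET: next address, or '0' when none are left."""
        if not self.targets:
            return "0"
        ip = self.targets.pop(0)
        self.pending.append(ip)
        return ip

    def handle_result(self, line):
        """Record a '<ip> <result>' line sent back by a child."""
        fields = line.split()
        if len(fields) < 2:
            return
        ip, result = fields[0], fields[1]
        if ip in self.pending:
            self.pending.remove(ip)
            self.finished[ip] = result

    def done(self):
        """True once every address was handed out and answered."""
        return not self.targets and not self.pending


dealer = Dealer(["192.0.2.1", "192.0.2.2"])   # made-up addresses
first = dealer.handle_get()
second = dealer.handle_get()
stop = dealer.handle_get()             # no work left, child is told to exit
dealer.handle_result(first + " SN0001")
dealer.handle_result(second + " -1")   # -1 marks a failed login
print(dealer.finished, dealer.done())
```

The done() check corresponds to the condition of the while loop around vwait in start_children: the parent keeps waiting as long as either list is non-empty.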

Python pexpect implementation

Python doesn't have the thread limitations that Expect has, so we will use threads. With threads, we don't have to deal with interprocess communication, so we benefit from a simpler script design. As in the expect example, we will create multiple threads and assign them the task of fetching serial numbers. In Python it looks like this:
#!/usr/bin/python

import threading
import Queue
import pexpect
import time
maxthread = 50

class CpeConnect(threading.Thread):
    user = 'admin' + '\r\n'
    pw = 'admin' + '\r\n'
    def run(self):
        while True:
            try:
                ip = queue.get(True, 1)
            except Queue.Empty:
                continue
            if ip == 0:
                break

            f = self.login_cpe(ip)
            if f == 0:
                SN = self.getSN()
                print ip + ' ' + str(SN)
                return_queue.put([ip, SN])
            else:
                return_queue.put([ip, f])
                continue
            self.child.logfile = None
            self.child.close()


    def login_cpe(self, ip):
        self.child = pexpect.spawn ('telnet %s'%(ip))
        f = "logpy/" + ip
        fd = file(f, 'w')
        self.child.logfile = fd
        try:
            indx = self.child.expect('Username :')
            self.child.send (self.user)
            self.child.expect ('Password :')
            self.child.send (self.pw)
            self.child.expect("=>")
        except pexpect.TIMEOUT:
            self.child.close()
            return -1
        except pexpect.EOF:
            self.child.close()
            return -1
        return 0

    def getSN(self):
        self.child.send ("env get var _PROD_SERIAL_NBR \r\n")
        try:
            self.child.expect('=>')
            data = self.child.before
            sn = data.split('\r\n')[1]
        except pexpect.TIMEOUT:
            return -1
        return sn

targets = []
queue = Queue.Queue(0)
return_queue = Queue.Queue(0)
threadlist = []
f = open('targets', 'r')

# Strip the trailing newline and skip blank lines
for ip in f:
    ip = ip.strip()
    if ip:
        targets.append(ip)
f.close()

start = time.time()
num_of_threads = maxthread
if len(targets) < maxthread:
    num_of_threads = len(targets)

#
# Start num_of_threads threads (without arguments)
#
for x in range(num_of_threads):
    t = CpeConnect()
    threadlist.append(t)
    t.start()

#
# Supply threads with targets
#
for ip in targets:
    queue.put(ip)

#
# 0 is the signal to the thread to finish (one per thread)
#
for t in threadlist:
    queue.put(0)

#
# Wait for threads to finish
#
for t in threadlist:
    t.join()
    print "Thread " + t.name + " finished!"

print "\n\n----------------------------------------\n\n"
for x in range(len(targets)):
    ip, sn = return_queue.get()
    print "Result = %s %s "%(ip, sn)

finish = time.time()
interval = finish - start
print "Execution lasted for " + str(interval) + " seconds!"
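
The worker pattern used above (a shared work queue, a sentinel value per thread, and a result queue) can be tried without any devices. The sketch below is written for Python 3, so it uses the queue module rather than Queue. The function fake_fetch is a hypothetical stand-in for the telnet login and serial number lookup, and the addresses are made up.

```python
import queue
import threading

STOP = None  # sentinel: tells a worker to exit


def fake_fetch(ip):
    """Hypothetical stand-in for login_cpe() + getSN(); no network involved."""
    if ip.endswith(".3"):
        return -1                      # pretend this device is unreachable
    return "SN-" + ip.split(".")[-1]


def worker(work_queue, result_queue):
    while True:
        ip = work_queue.get()          # blocks until an item is available
        if ip is STOP:
            break
        result_queue.put((ip, fake_fetch(ip)))


def run(targets, max_threads=4):
    work_queue = queue.Queue()
    result_queue = queue.Queue()
    n = min(max_threads, len(targets))
    threads = [threading.Thread(target=worker, args=(work_queue, result_queue))
               for _ in range(n)]
    for t in threads:
        t.start()
    for ip in targets:
        work_queue.put(ip)
    for _ in threads:
        work_queue.put(STOP)           # one sentinel per worker
    for t in threads:
        t.join()
    # Every target produced exactly one result, so this never blocks.
    return dict(result_queue.get() for _ in targets)


results = run(["192.0.2.%d" % i for i in range(1, 6)])
failed = sorted(ip for ip, sn in results.items() if sn == -1)
print(len(results), failed)
```

Each worker consumes exactly one sentinel and then exits, so the number of sentinels has to match the number of threads and the sentinel has to be the same value the worker tests for.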


Expect vs Pexpect. The winner is:

Expect! The test was to get the serial number from exactly 14322 CPEs. The expect script did the job in 293 seconds. It retrieved serial numbers from 12718 CPEs; 1604 CPEs were unavailable.
Python's pexpect is just not in the same category. Execution lasted for more than 3 hours and still had not finished. Some readers will notice that the Python implementation does not use the same parameters as the expect implementation. The maximum number of processes in expect was 200, while the maximum number of threads in the Python implementation was 50. Furthermore, the default timeout in pexpect is 30 seconds, while in expect it is 10 seconds. The idea was to compare the tools with the same configuration parameters, but the pexpect script simply fails to log in to the CPEs with 200 threads or a lower timeout value; it throws a TIMEOUT exception while waiting for the login prompt. On my server, 50 threads was the highest value I could set with the script still working. More threads simply exhaust the resources and break the script.
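
The figures from the expect run can be turned into a throughput and a success rate with a few lines of arithmetic. The snippet uses only the numbers quoted above; the variable names are chosen for illustration.

```python
total = 14322        # CPEs in the test
ok = 12718           # serial numbers retrieved
failed = 1604        # CPEs that were unavailable
seconds = 293        # duration of the expect run

# The two outcome counts add up to the number of devices tested.
assert ok + failed == total

rate = total / float(seconds)          # devices handled per second
success = 100.0 * ok / total           # percentage of devices that answered

print("%.1f devices/s, %.1f%% reachable" % (rate, success))
```

That works out to roughly 49 devices per second, with about 89% of the devices answering.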
