- use socket.makefile for the socket, it's so much easier
- don't put a file object in the queue when receiving, just the list of lines read - fix parsing of message in ManagerMsg: - don't ignore blank lines - put everything after an invalid header into data - in _receive_data: - read whole lines -- socket.makefile makes it possible - if an invalid header is encountered read until END COMMAND marker - put list of lines into the queue, not an artificial file object git-svn-id: https://pyst.svn.sourceforge.net/svnroot/pyst/pyst/trunk@5 01a3061f-1c3a-49da-a2a0-fa5697faa6a0develop
parent
f34ff5f31f
commit
df35521c5b
|
@ -81,33 +81,21 @@ class ManagerMsg(object):
|
||||||
|
|
||||||
def parse(self, response):
|
def parse(self, response):
|
||||||
"""Parse a manager message"""
|
"""Parse a manager message"""
|
||||||
response.seek(0)
|
|
||||||
|
|
||||||
data = []
|
data = []
|
||||||
|
for n, line in enumerate (response):
|
||||||
# read the response line by line
|
# all valid header lines end in \r\n
|
||||||
for line in response.readlines():
|
if not line.endswith ('\r\n'):
|
||||||
line = line.rstrip() # strip trailing whitespace
|
data.extend(response[n:])
|
||||||
|
break
|
||||||
if not line: continue # don't process if this is not a message
|
try:
|
||||||
|
k, v = (x.strip() for x in line.split(':',1))
|
||||||
# locate the ':' in our message, if there is one
|
self.headers[k] = v
|
||||||
if line.find(':') > -1:
|
except ValueError:
|
||||||
item = [x.strip() for x in line.split(':',1)]
|
# invalid header, start of multi-line data response
|
||||||
|
data.extend(response[n:])
|
||||||
# if this is a header
|
break
|
||||||
if len(item) == 2:
|
self.data = ''.join(data)
|
||||||
# store the header
|
|
||||||
self.headers[item[0]] = item[1]
|
|
||||||
# otherwise it is just plain data
|
|
||||||
else:
|
|
||||||
data.append(line)
|
|
||||||
# if there was no ':', then we have data
|
|
||||||
else:
|
|
||||||
data.append(line)
|
|
||||||
|
|
||||||
# store the data
|
|
||||||
self.data = '%s\n' % '\n'.join(data)
|
|
||||||
|
|
||||||
def has_header(self, hname):
|
def has_header(self, hname):
|
||||||
"""Check for a header"""
|
"""Check for a header"""
|
||||||
|
@ -248,9 +236,10 @@ class Manager(object):
|
||||||
clist.append(EOL)
|
clist.append(EOL)
|
||||||
command = EOL.join(clist)
|
command = EOL.join(clist)
|
||||||
|
|
||||||
# lock the soket and send our command
|
# lock the socket and send our command
|
||||||
try:
|
try:
|
||||||
self._sock.sendall(command)
|
self._sock.write(command)
|
||||||
|
self._sock.flush()
|
||||||
except socket.error, (errno, reason):
|
except socket.error, (errno, reason):
|
||||||
raise ManagerSocketException(errno, reason)
|
raise ManagerSocketException(errno, reason)
|
||||||
|
|
||||||
|
@ -268,65 +257,52 @@ class Manager(object):
|
||||||
Read the response from a command.
|
Read the response from a command.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
multiline = False
|
||||||
# loop while we are sill running and connected
|
# loop while we are sill running and connected
|
||||||
while self._running.isSet() and self._connected.isSet():
|
while self._running.isSet() and self._connected.isSet():
|
||||||
|
|
||||||
lines = []
|
|
||||||
try:
|
try:
|
||||||
try:
|
lines = []
|
||||||
# if there is data to be read
|
for line in self._sock :
|
||||||
# read a message
|
# check to see if this is the greeting line
|
||||||
while self._connected.isSet():
|
if not self.title and '/' in line and not ':' in line:
|
||||||
line = []
|
# store the title of the manager we are connecting to:
|
||||||
|
self.title = line.split('/')[0].strip()
|
||||||
# read a line, one char at a time
|
# store the version of the manager we are connecting to:
|
||||||
while self._connected.isSet():
|
self.version = line.split('/')[1].strip()
|
||||||
c = self._sock.recv(1)
|
# fake message header
|
||||||
|
lines.append ('Response: Generated Header\r')
|
||||||
if not c: # the other end closed the connection
|
lines.append (line)
|
||||||
self._sock.close()
|
break
|
||||||
self._connected.clear()
|
# if the line is EOL marker we have a complete message
|
||||||
break
|
if line == EOL and not multiline:
|
||||||
|
if lines or not self._connected.isSet():
|
||||||
line.append(c) # append the character to our line
|
|
||||||
|
|
||||||
# is this the end of a line?
|
|
||||||
if c == '\n':
|
|
||||||
line = ''.join(line)
|
|
||||||
break
|
|
||||||
|
|
||||||
# if we are no longer connected we probably did not
|
|
||||||
# recieve a full message, don't try to handle it
|
|
||||||
if not self._connected.isSet(): break
|
|
||||||
|
|
||||||
# make sure our line is a string
|
|
||||||
assert type(line) in StringTypes
|
|
||||||
|
|
||||||
lines.append(line) # add the line to our message
|
|
||||||
|
|
||||||
# if the line is our EOL marker we have a complete message
|
|
||||||
if line == EOL:
|
|
||||||
break
|
break
|
||||||
|
# ignore empty lines at start
|
||||||
# check to see if this is the greeting line
|
continue
|
||||||
if not self.title and line.find('/') >= 0 and line.find(':') < 0:
|
lines.append(line)
|
||||||
self.title = line.split('/')[0].strip() # store the title of the manager we are connecting to
|
# line not ending in \r\n or without ':' isn't a
|
||||||
self.version = line.split('/')[1].strip() # store the version of the manager we are connecting to
|
# valid header and starts multiline response
|
||||||
break
|
if not line.endswith('\r\n') or ':' not in line:
|
||||||
|
multiline = True
|
||||||
#sleep(.001) # waste some time before reading another line
|
# same when seeing end of multiline response
|
||||||
|
if multiline and line.startswith('--END COMMAND--'):
|
||||||
except socket.error:
|
multiline = False
|
||||||
|
if not self._connected.isSet():
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
# EOF during reading
|
||||||
self._sock.close()
|
self._sock.close()
|
||||||
self._connected.clear()
|
self._connected.clear()
|
||||||
break
|
|
||||||
|
|
||||||
finally:
|
|
||||||
# if we have a message append it to our queue
|
# if we have a message append it to our queue
|
||||||
if lines and self._connected.isSet():
|
if lines and self._connected.isSet():
|
||||||
self._message_queue.put(StringIO(''.join(lines)))
|
self._message_queue.put(lines)
|
||||||
else:
|
else:
|
||||||
self._message_queue.put(None)
|
self._message_queue.put(None)
|
||||||
|
except socket.error:
|
||||||
|
self._sock.close()
|
||||||
|
self._connected.clear()
|
||||||
|
self._message_queue.put(None)
|
||||||
|
|
||||||
|
|
||||||
def register_event(self, event, function):
|
def register_event(self, event, function):
|
||||||
"""
|
"""
|
||||||
|
@ -393,7 +369,7 @@ class Manager(object):
|
||||||
|
|
||||||
|
|
||||||
def event_dispatch(self):
|
def event_dispatch(self):
|
||||||
"""This thread is responsible fore dispatching events"""
|
"""This thread is responsible for dispatching events"""
|
||||||
|
|
||||||
# loop dispatching events
|
# loop dispatching events
|
||||||
while self._running.isSet():
|
while self._running.isSet():
|
||||||
|
@ -428,8 +404,10 @@ class Manager(object):
|
||||||
|
|
||||||
# create our socket and connect
|
# create our socket and connect
|
||||||
try:
|
try:
|
||||||
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||||
self._sock.connect((host,port))
|
_sock.connect((host,port))
|
||||||
|
self._sock = _sock.makefile ()
|
||||||
|
_sock.close ()
|
||||||
except socket.error, (errno, reason):
|
except socket.error, (errno, reason):
|
||||||
raise ManagerSocketException(errno, reason)
|
raise ManagerSocketException(errno, reason)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue