'''
This module implements the real-time transport protocol (RTP) and companion real-time
transport control protocol (RTCP) based on RFC 3550.
The RTP and RTCP classes define the packet format for RTP and RTCP.
The Session class defines the control behavior for an RTP session.
The Source class represents a member or source.
The Network class abstracts out the network behavior such as a pair of sockets.
'''
import struct, random, math, time, socket
from kutil import getlocaladdr
_debug = False  # module-wide flag, off by default; presumably enables diagnostic output - TODO confirm intended use
def binary(data, size=4):
    '''Return data as a list of bit strings, each covering size bytes.

    Every byte of data is rendered as eight '0'/'1' characters, most significant
    bit first, and the resulting bit stream is cut into rows of size*8 characters.
    A final partial row is right-padded with '-' so that all rows have equal width.

    data may be a str (each character is converted with ord) or a bytes-like
    sequence whose iteration yields integers. size is the number of bytes per row
    and must be positive; ValueError is raised otherwise, because a non-positive
    row width could never consume the bit stream.

    >>> print binstr('\\x01\\x02\\x03\\x04\\x05\\x06\\x07')
    00000001000000100000001100000100
    000001010000011000000111--------
    '''
    if size <= 0:
        raise ValueError('size must be a positive number of bytes')
    # Iterating a str yields characters, iterating bytes-like data may yield ints.
    octets = [x if isinstance(x, int) else ord(x) for x in data]
    bits = ''.join([('1' if (octet & (0x80 >> shift)) else '0') for octet in octets for shift in range(8)])
    width = size * 8
    rows = [bits[start:start+width] for start in range(0, len(bits), width)]
    if rows and len(rows[-1]) < width:
        rows[-1] += '-' * (width - len(rows[-1]))
    return rows
def binstr(x):
    '''Return the rows produced by binary(x) joined by newlines, ready for printing.'''
    rows = binary(x)
    return '\n'.join(rows)
class RTP(object):
    '''A RTP packet: either built from individual fields or parsed from wire data.

    >>> p1 = RTP(pt=8, seq=12, ts=13, ssrc=14, csrcs=[15, 16], marker=True, extn=(17, '\\x18\\x19\\x1a\\x1b'), payload='\\x1c\\x1d\\x1e')
    >>> print ''.join(['%02x'%ord(x) for x in str(p1)])
    b288000c0000000d0000000e0000000f000000100011000118191a1b1c1d1e01
    >>> p2 = RTP(value=str(p1))
    >>> print p2.pt, p2.seq, p2.ts, p2.ssrc, p2.csrcs, p2.marker, p2.extn, repr(p2.payload)
    8 12 13 14 [15, 16] True (17, '\\x18\\x19\\x1a\\x1b') '\\x1c\\x1d\\x1e'
    >>> print '\\n'.join(binary(str(p2)))
    10110010100010000000000000001100
    00000000000000000000000000001101
    00000000000000000000000000001110
    00000000000000000000000000001111
    00000000000000000000000000010000
    00000000000100010000000000000001
    00011000000110010001101000011011
    00011100000111010001111000000001
    '''
    def __init__(self, value=None, pt=0, seq=0, ts=0, ssrc=0, csrcs=None, marker=False, extn=None, payload=''):
        '''Construct a RTP packet from individual components: pt a payload type [0, 128),
        seq a 16 bit unsigned sequence number, ts a 32 bit unsigned timestamp, ssrc a
        32 bit source identifier, csrcs a list of 32-bit contributing source identifiers
        with max size of 15 (default: a new empty list), marker a boolean, extn a tuple of
        (type, value) for the header extension and payload is the RTP payload data.
        Alternatively, if value is specified (and not empty), then construct the RTP packet
        by parsing the value; the other arguments are ignored in that case.

        Raises ValueError if value is shorter than its header fields announce, does not
        carry RTP version 2, or has an inconsistent padding count.'''
        if not value:
            self.pt, self.seq, self.ts, self.ssrc = pt, seq, ts, ssrc
            # A fresh list per instance: a shared default list would leak CSRCs between packets.
            self.csrcs = [] if csrcs is None else csrcs
            self.marker, self.extn, self.payload = marker, extn, payload
            return
        if len(value) < 12: raise ValueError('RTP packet must be at least 12 bytes')
        px, mpt, self.seq, self.ts, self.ssrc = struct.unpack('!BBHII', value[:12])
        if px & 0xC0 != 0x80: raise ValueError('RTP version must be 2')
        self.marker, self.pt = bool(mpt & 0x80), (mpt & 0x7f)
        cc, rest = px & 0x0f, value[12:]
        if len(rest) < cc*4: raise ValueError('RTP packet is too short for %d CSRC identifiers'%(cc))
        self.csrcs, rest = list(struct.unpack('!' + 'I'*cc, rest[:cc*4])), rest[cc*4:]
        if px & 0x10:
            if len(rest) < 4: raise ValueError('RTP packet is too short for the extension header')
            xtype, xlen = struct.unpack('!HH', rest[:4])
            if len(rest) < 4 + xlen*4: raise ValueError('RTP packet is too short for the extension data')
            self.extn, rest = (xtype, rest[4:4+xlen*4]), rest[4+xlen*4:]
        else:
            self.extn = None
        if px & 0x20:
            # The last octet holds the number of padding octets, itself included.
            if not rest: raise ValueError('RTP padding flag set on an empty payload')
            padcount, = struct.unpack('!B', rest[-1:])
            if padcount > len(rest): raise ValueError('RTP padding count exceeds the payload size')
            rest = rest[:len(rest)-padcount]
        self.payload = rest

    def __repr__(self):
        '''Return the wire format of this packet (this is what str(packet) yields).

        At most 15 CSRCs are written, matching the 4-bit CC field. Header fields are
        masked to their wire width, so a timestamp that ran past 32 bits wraps around.
        Extension data is zero-padded to a multiple of 4 bytes because its length is
        expressed in 32-bit words. A payload whose length is not a multiple of 4 is
        padded with the padding flag set.'''
        zero = struct.pack('!B', 0)
        cc = min(len(self.csrcs), 15)
        padcount = (4 - len(self.payload) % 4) % 4
        first = 0x80 | (0x20 if padcount else 0x00) | (0x10 if self.extn else 0x00) | cc
        second = (self.pt & 0x7f) | (0x80 if self.marker else 0x00)
        packet = struct.pack('!BBHII', first, second, self.seq & 0xffff, self.ts & 0xffffffff, self.ssrc & 0xffffffff)
        packet += struct.pack('!' + 'I'*cc, *[x & 0xffffffff for x in self.csrcs[:cc]])
        if self.extn:
            xtype, xdata = self.extn
            if len(xdata) % 4:
                xdata = xdata + zero*(4 - len(xdata) % 4)
            packet += struct.pack('!HH', xtype & 0xffff, len(xdata)//4)
            if xdata: packet += xdata
        if self.payload: packet += self.payload
        if padcount: packet += zero*(padcount-1) + struct.pack('!B', padcount)
        return packet

    # Lets bytes(packet) return the wire format where str and bytes are distinct types.
    __bytes__ = __repr__
class RTCP(list):
    '''A compound RTCP packet is a list of individual RTCP packets. A individual RTCP
    packet is of type RTCP.packet with attributes or items defined depending on the type
    of the packet.

    >>> sr = RTCP.packet(pt=RTCP.SR, ssrc=1, ntp=2, ts=3, pktcount=4, octcount=5, reports=[], extn=None)
    >>> r1 = RTCP.packet(ssrc=1, flost=2, clost=3, hseq=4, jitter=5, lsr=6, dlsr=7)
    >>> r2 = RTCP.packet(ssrc=8, flost=9, clost=10, hseq=11, jitter=12, lsr=13, dlsr=14)
    >>> rr = RTCP.packet(pt=RTCP.RR, ssrc=1, reports=[r1, r2])
    >>> item1 = (1, [(RTCP.CNAME, 'kundan@example.net'), (RTCP.NAME, 'Kundan Singh'), (RTCP.EMAIL, 'kundan@example.net'), (RTCP.PHONE, '9176216392')])
    >>> item2 = (2, [(RTCP.CNAME, 'sanjayc77@example.net'), ])
    >>> sdes = RTCP.packet(pt=RTCP.SDES, items=[item1, item2])
    >>> bye = RTCP.packet(pt=RTCP.BYE, ssrcs=[1,2,3], reason='disconnecting')
    >>> p1 = RTCP([sr, rr, sdes, bye])
    >>> p2 = RTCP(str(p1))
    >>> sr, rr, sdes, bye = tuple(p2)
    >>> print sr.pt, sr.ssrc, sr.ntp, sr.ts, sr.pktcount, sr.octcount
    200 1 2.0 3 4 5
    >>> print rr.pt, rr.ssrc, [(x.ssrc, x.flost, x.clost, x.hseq, x.jitter, x.lsr, x.dlsr) for x in rr.reports]
    201 1 [(1, 2, 3, 4, 5, 6, 7), (8, 9, 10, 11, 12, 13, 14)]
    >>> print sdes.pt
    202
    >>> for item in sdes.items:
    ...     print 'ssrc=', item[0]
    ...     for n,v in item[1]: print '',n,'=',v
    ssrc= 1
     1 = kundan@example.net
     2 = Kundan Singh
     3 = kundan@example.net
     4 = 9176216392
    ssrc= 2
     1 = sanjayc77@example.net
    >>> print bye.pt, bye.ssrcs, bye.reason
    203 [1, 2, 3] disconnecting
    '''
    SR, RR, SDES, BYE, APP = range(200, 205)                      # RTCP packet types
    CNAME, NAME, EMAIL, PHONE, LOC, TOOL, NOTE, PRIV = range(1, 9)  # SDES item types

    _EMPTY = struct.pack('')      # empty byte string of the same type struct.pack returns
    _ZERO = struct.pack('!B', 0)  # a single null octet

    def __init__(self, value=None):
        '''Build a compound packet from a list of RTCP.packet objects, or parse value
        as the wire format of one or more concatenated RTCP packets. With no value the
        compound packet is empty.

        Raises ValueError when the data is not version 2, carries a packet type outside
        [200, 205), or is shorter than its length and count fields announce. Packets
        parsed before the offending one are already appended at that point.'''
        list.__init__(self)
        if isinstance(value, list):
            self.extend(value)
            return
        while value:
            head, value = RTCP._take(value, 4, 'header')
            px, pt, plen = struct.unpack('!BBH', head)
            if px & 0xC0 != 0x80: raise ValueError('RTP version must be 2')
            if pt < 200 or pt >= 205: raise ValueError('Not an RTCP packet type %d'%(pt))
            data, value = RTCP._take(value, plen*4, 'packet')
            if px & 0x20:
                # The last octet holds the number of padding octets, itself included.
                if not data: raise ValueError('RTCP padding flag set on an empty packet')
                padcount, = struct.unpack('!B', data[-1:])
                if padcount > len(data): raise ValueError('RTCP padding count exceeds the packet size')
                data = data[:len(data)-padcount]
            p = RTCP.packet()
            p.pt = pt
            count = px & 0x1f  # report/chunk/SSRC count, or the subtype for APP
            if pt == RTCP.SR or pt == RTCP.RR:
                RTCP._parsereports(p, count, data)
            elif pt == RTCP.SDES:
                RTCP._parsesdes(p, count, data)
            elif pt == RTCP.BYE:
                RTCP._parsebye(p, count, data)
            else:  # RTCP.APP: the range check above leaves no other type
                head, data = RTCP._take(data, 8, 'APP header')
                p.subtype = count
                p.ssrc, p.name = struct.unpack('!I4s', head)
                p.data = data if data else None
            self.append(p)

    @staticmethod
    def _take(data, size, what):
        '''Split data into (first size bytes, remainder); ValueError if data is too short.'''
        if len(data) < size:
            raise ValueError('RTCP %s is truncated'%(what))
        return data[:size], data[size:]

    @staticmethod
    def _u32(number):
        '''Wrap number into the unsigned 32-bit range used by the wire fields.'''
        return number & 0xffffffff

    @staticmethod
    def _parsereports(p, count, data):
        '''Fill p from the body of a SR or RR packet holding count report blocks.'''
        if p.pt == RTCP.SR:
            head, data = RTCP._take(data, 24, 'sender info')
            p.ssrc, ntp1, ntp2, p.ts, p.pktcount, p.octcount = struct.unpack('!IIIIII', head)
            p.ntp = ntp2time((ntp1, ntp2))
        else:
            head, data = RTCP._take(data, 4, 'SSRC')
            p.ssrc, = struct.unpack('!I', head)
        p.reports = []
        for i in range(count):
            head, data = RTCP._take(data, 24, 'report block')
            r = RTCP.packet()
            r.ssrc, lost, r.hseq, r.jitter, r.lsr, r.dlsr = struct.unpack('!IIIIII', head)
            # fraction lost is the top octet, cumulative lost the low 24 bits
            r.flost, r.clost = (lost >> 24) & 0x0ff, (lost & 0x0ffffff)
            p.reports.append(r)
        p.extn = data if data else None  # whatever follows the reports is a profile extension

    @staticmethod
    def _parsesdes(p, count, data):
        '''Fill p.items with count (ssrc, [(type, value), ...]) chunks of a SDES packet.'''
        p.items = []
        for i in range(count):
            head, data = RTCP._take(data, 4, 'SDES chunk')
            ssrc, = struct.unpack('!I', head)
            items, used = [], 0  # used: item octets consumed after the SSRC
            while data:
                itype, = struct.unpack('!B', data[:1])
                if itype == 0:
                    # A single null octet ends the item list and has no length octet;
                    # further nulls only pad the chunk to the next 32-bit boundary.
                    data = data[4 - used % 4:]
                    break
                if len(data) < 2: raise ValueError('RTCP SDES item is truncated')
                ilen, = struct.unpack('!B', data[1:2])
                body, data = RTCP._take(data[2:], ilen, 'SDES item')
                items.append((itype, body))
                used += 2 + ilen
            p.items.append((ssrc, items))

    @staticmethod
    def _parsebye(p, count, data):
        '''Fill p.ssrcs and the optional p.reason from the body of a BYE packet.'''
        p.ssrcs, p.reason = [], None
        for i in range(count):
            head, data = RTCP._take(data, 4, 'BYE SSRC')
            ssrc, = struct.unpack('!I', head)
            p.ssrcs.append(ssrc)
        if data:
            rlen, = struct.unpack('!B', data[:1])
            p.reason = data[1:1+rlen]

    @staticmethod
    def _encode(p):
        '''Return (count, body) for one packet: the 5-bit header count and the octets
        that follow the 4-byte common header, before any padding.'''
        u32, value = RTCP._u32, RTCP._EMPTY
        if p.pt == RTCP.SR or p.pt == RTCP.RR:
            if p.pt == RTCP.SR:
                ntp1, ntp2 = time2ntp(p.ntp)
                value = struct.pack('!IIIIII', u32(p.ssrc), u32(ntp1), u32(ntp2), u32(p.ts), u32(p.pktcount), u32(p.octcount))
            else:
                value = struct.pack('!I', u32(p.ssrc))
            reports = p.reports or []
            for r in reports:
                lost = ((r.flost & 0x0ff) << 24) | (r.clost & 0x0ffffff)
                value += struct.pack('!IIIIII', u32(r.ssrc), lost, u32(r.hseq), u32(r.jitter), u32(r.lsr), u32(r.dlsr))
            if p.extn: value += p.extn
            return len(reports), value
        if p.pt == RTCP.SDES:
            chunks = p.items or []
            for ssrc, items in chunks:
                chunk = struct.pack('!I', u32(ssrc))
                for n, v in items:
                    v = v[:255]  # the length octet cannot describe more than 255 bytes
                    chunk += struct.pack('!BB', n, len(v)) + v
                chunk += RTCP._ZERO  # null item type ends the list
                if len(chunk) % 4 != 0: chunk += RTCP._ZERO*(4 - len(chunk) % 4)
                value += chunk
            return len(chunks), value
        if p.pt == RTCP.BYE:
            ssrcs = p.ssrcs or []
            for ssrc in ssrcs: value += struct.pack('!I', u32(ssrc))
            if p.reason:
                reason = p.reason[:255]
                value += struct.pack('!B', len(reason)) + reason
            return len(ssrcs), value
        if p.pt == RTCP.APP:
            value = struct.pack('!I4s', u32(p.ssrc), p.name)
            if p.data: value += p.data
            return p.subtype or 0, value
        # Unknown type built by the caller: emit its raw data as is.
        if p.data: value += p.data
        return p.subtype or 0, value

    def __str__(self):
        '''Return the wire format of all the contained packets, concatenated.

        A packet body that is not a multiple of 4 bytes is padded, with the padding
        flag set and the pad count stored in the last octet.'''
        result = RTCP._EMPTY
        for p in self:
            count, value = RTCP._encode(p)
            extra = len(value) % 4
            length = len(value)//4 + (1 if extra else 0)
            result += struct.pack('!BBH', 0x80 | (0x20 if extra else 0x00) | (count & 0x1f), p.pt, length) + value
            if extra:
                result += RTCP._ZERO*(4 - extra - 1) + struct.pack('!B', 4 - extra)
        return result

    # Lets bytes(packet) return the wire format where str and bytes are distinct types.
    __bytes__ = __str__

    class packet(object):
        '''A generic class for individual packet or report. It exposes both container and
        attribute interface. Reading an attribute or item that was never set yields None
        instead of raising.'''
        def __init__(self, **kwargs):
            for n,v in kwargs.items(): self[n] = v
        # __getattr__ is only consulted for names that are not regular attributes
        def __getattr__(self, name): return self.__getitem__(name)
        def __getitem__(self, name): return self.__dict__.get(name, None)
        def __setitem__(self, name, value): self.__dict__[name] = value
        def __contains__(self, name): return name in self.__dict__
# Parameters of the sequence number validation done in Source.updateseq.
RTP_SEQ_MOD = (1<<16)  # sequence numbers are 16 bit and wrap at this value
MAX_DROPOUT = 3000     # largest forward jump still accepted as in-order
MAX_MISORDER = 100     # window behind maxseq treated as reordered/duplicate
MIN_SEQUENTIAL = 2     # consecutive packets required before a new source is valid

class Source(object):
    '''A source in a RTP-based Session. This is used to represent both the local member
    as well as the remote members. The SSRC and SDES's CNAME must be unique in a session.
    '''
    def __init__(self, ssrc, items=None, address=None):
        '''Create a new member for the given SSRC. items is the list of SDES (type, value)
        tuples of the member (default: a new empty list) and address its transport address.
        >>> m = Source(1, [(RTCP.CNAME, 'kundan@example.net'), (RTCP.NAME, 'Kundan Singh')], ('127.0.0.1', 8000))
        >>> print m
        <Source ssrc=1 items=[(1, 'kundan@example.net'), (2, 'Kundan Singh')] address=('127.0.0.1', 8000) lost=0 fraction=0 pktcount=0 octcount=0 maxseq=0 badseq=0 cycles=0 baseseq=0 probation=0 received=0 expectedprior=0 receivedprior=0 transit=0 jitter=0 lastts=None lastntp=None rtcpdelay=None>
        '''
        # A fresh list per instance: a shared default list would mix the items of all members.
        self.ssrc, self.items, self.address = ssrc, ([] if items is None else items), address
        self.lost = self.fraction = self.pktcount = self.octcount = self.timeout = 0
        self.maxseq = self.badseq = self.cycles = self.baseseq = self.probation = self.received = self.expectedprior = self.receivedprior = self.transit = self.jitter = 0
        self.lastts = self.lastntp = self.rtcpdelay = None

    def __repr__(self):
        '''Return a one-line dump of the member state (timeout is not included).'''
        props = ('ssrc', 'items', 'address', 'lost', 'fraction', 'pktcount', 'octcount', \
                 'maxseq', 'badseq', 'cycles', 'baseseq', 'probation', 'received', \
                 'expectedprior', 'receivedprior', 'transit', 'jitter', 'lastts', \
                 'lastntp', 'rtcpdelay')
        return '<Source ' + ' '.join(['%s=%r'%(p, getattr(self, p)) for p in props]) + '>'

    def initseq(self, seq):
        '''Initialize the seq using the newly received seq of RTP packet. Returns self.
        >>> print Source(ssrc=1).initseq(10)
        <Source ssrc=1 items=[] address=None lost=0 fraction=0 pktcount=0 octcount=0 maxseq=10 badseq=9 cycles=0 baseseq=10 probation=0 received=0 expectedprior=0 receivedprior=0 transit=0 jitter=0 lastts=None lastntp=None rtcpdelay=None>
        '''
        self.baseseq = self.maxseq = seq
        self.badseq = seq - 1
        self.cycles = self.received = self.receivedprior = self.expectedprior = 0
        return self

    def newfound(self, seq):
        '''Indicate that this source is newly found and added to members table. The source
        stays on probation until MIN_SEQUENTIAL packets arrive in sequence. Returns self.
        >>> print Source(ssrc=1).newfound(10)
        <Source ssrc=1 items=[] address=None lost=0 fraction=0 pktcount=0 octcount=0 maxseq=9 badseq=9 cycles=0 baseseq=10 probation=2 received=0 expectedprior=0 receivedprior=0 transit=0 jitter=0 lastts=None lastntp=None rtcpdelay=None>
        '''
        self.initseq(seq)
        self.maxseq, self.probation = seq-1, MIN_SEQUENTIAL
        return self

    def updateseq(self, seq):
        '''Update the source properties based on received RTP packet's seq. Returns self.
        >>> print Source(1).newfound(10).updateseq(12).updateseq(13) # simulate loss of 11
        <Source ssrc=1 items=[] address=None lost=0 fraction=0 pktcount=0 octcount=0 maxseq=13 badseq=12 cycles=0 baseseq=13 probation=0 received=1 expectedprior=0 receivedprior=0 transit=0 jitter=0 lastts=None lastntp=None rtcpdelay=None>
        '''
        # Distance ahead of maxseq in 16-bit modular arithmetic. Without the modulo a
        # slightly late (reordered) packet looks like a small step and is miscounted
        # as a sequence number wrap.
        udelta = (seq - self.maxseq) % RTP_SEQ_MOD
        if self.probation > 0:
            if seq == (self.maxseq + 1) % RTP_SEQ_MOD:
                self.probation, self.maxseq = self.probation - 1, seq
                if self.probation == 0:
                    # enough packets in sequence: the source is now valid
                    self.initseq(seq)
                    self.received = self.received + 1
            else:
                # out of sequence: restart the probation from this packet
                self.probation, self.maxseq = MIN_SEQUENTIAL-1, seq
            return self
        elif udelta < MAX_DROPOUT:
            # in order, with permissible gap
            if seq < self.maxseq: self.cycles += RTP_SEQ_MOD  # sequence number wrapped
            self.maxseq = seq
        elif udelta <= RTP_SEQ_MOD - MAX_MISORDER:
            # the sequence number made a very large jump
            if seq == self.badseq:
                # two sequential packets: assume the other side restarted
                self.initseq(seq)
            else:
                self.badseq = (seq + 1) & (RTP_SEQ_MOD-1)
                return self
        # else: duplicate or reordered packet, counted as received
        self.received = self.received + 1
        return self

    def updatejitter(self, ts, arrival):
        '''Update the jitter based on ts and arrival (in ts units). Returns self.
        >>> s = Source(1).newfound(10).updatejitter(1000, 0).updatejitter(1160, 160).updatejitter(1330, 320)
        >>> print s.transit, int(s.jitter)
        -1010 55
        '''
        transit = int(arrival - ts)
        d, self.transit = abs(transit - self.transit), transit
        # running estimate with gain 1/16
        self.jitter += (1/16.) * (d-self.jitter)
        return self

    def updatelostandexpected(self):
        '''Update the number of packets expected and lost. fraction is the loss over the
        interval since the previous call, scaled to [0, 255]. Returns self.
        >>> s = Source(1).newfound(10).updateseq(11).updateseq(12).updateseq(14).updatelostandexpected() # similar loss of 13
        >>> print s.lost, s.fraction, s.expectedprior, s.receivedprior
        1 85 3 2
        '''
        extendedmax = self.cycles + self.maxseq
        expected = extendedmax - self.baseseq + 1
        self.lost = expected - self.received
        expectedinterval = expected - self.expectedprior
        self.expectedprior = expected
        receivedinterval = self.received - self.receivedprior
        self.receivedprior = self.received
        lostinterval = expectedinterval - receivedinterval
        if expectedinterval == 0 or lostinterval <= 0:
            self.fraction = 0
        else:
            # Total loss would give 256, which does not fit the 8-bit report field.
            self.fraction = min((lostinterval << 8) // expectedinterval, 255)
        return self

    def storereport(self, fraction, lost, jitter, delay):
        '''Store the values of a reception report about this member. Returns self.'''
        self.fraction, self.lost, self.jitter, self.rtcpdelay = fraction, lost, jitter, delay
        return self
def time2ntp(value):
    '''Convert a time.time() style value to a NTP (sec, frac) tuple, where sec counts
    from the NTP epoch and frac is the sub-second part in units of 1/2**32 second.
    >>> print time2ntp(0.5)
    (2208988800L, 2147483648L)
    '''
    shifted = value + 2208988800  # move from the Unix epoch to the NTP epoch
    seconds = int(shifted)
    fraction = int((shifted - seconds) * 4294967296.)
    return (seconds, fraction)
def ntp2time(value):
    '''Convert a NTP (sec, frac) tuple back to a float comparable to time.time() output.
    >>> print ntp2time(time2ntp(10.5))
    10.5
    '''
    seconds = value[0]
    subsecond = value[1] / 4294967296.  # frac is in units of 1/2**32 second
    return (seconds + subsecond) - 2208988800
class Session(object):
    '''A RTP session. It keeps the table of members and senders, paces RTCP using a
    timer obtained from app.createTimer(session), and hands packets to app.sendRTP and
    app.sendRTCP. The app is also told about app.starting(session), app.stopping(session)
    and app.received(member, packet).

    NOTE(review): receivedRTP increments member.received itself although
    Source.updateseq already counts valid packets, and sendRTCP resets the counter to
    zero after each report. The counter therefore doubles as a "heard since the last
    report" flag, which skews the cumulative loss figures; confirm the intent before
    relying on them.'''

    def __init__(self, app, **kwargs):
        '''Start an RTP session for the given network with additional optional keyword
        arguments such as pt, rate, bandwidth, fraction, member, ssrc, cname, seq0, ts0.
        @param pt: the optional payload type, default 96.
        @param rate: the optional sampling rate, default 8000.
        @param bandwidth: the optional total session bandwidth, default 64000.
        @param fraction: the optional fraction to use for RTCP, default 0.05.
        @param member: the optional Source object for this member, default constructs a new.
        @param ssrc: if member is absent, then optional SSRC for Source, default a random number.
        @param cname: if member is absent, then optional CNAME for Source, default is ssrc@hostname.
        @param seq0: the optional initial sequence number, default a random number.
        @param ts0: the optional initial timestamp, default a random number.
        '''
        self.app = app
        self.pt, self.rate = kwargs.get('pt', 96), kwargs.get('rate', 8000)
        self.bandwidth, self.fraction = kwargs.get('bandwidth', 64000), kwargs.get('fraction', 0.05)
        self.member = kwargs.get('member', None)
        if not self.member:
            if 'ssrc' in kwargs: ssrc = kwargs['ssrc']
            else: ssrc = self.randint(0, 2**32)
            # the local address is looked up only when no cname is supplied
            if 'cname' in kwargs: cname = kwargs['cname']
            else: cname = '%d@%s'%(ssrc, getlocaladdr())
            self.member = Source(ssrc=ssrc, items=[(RTCP.CNAME, cname)])
        self.seq0 = kwargs['seq0'] if 'seq0' in kwargs else self.randint(0, 2**16)
        self.ts0 = kwargs['ts0'] if 'ts0' in kwargs else self.randint(0, 2**32)
        self.seq = self.ts = self.ts1 = 0  # packets sent, last and first sent timestamp
        self.ntp = self.ntp1 = self.tc      # wallclock of last send, and of creation
        self.rtpsent = self.rtcpsent = self.byesent = self.running = False
        self.tp = self.tn = 0               # previous and next RTCP transmission time
        self.members, self.senders = dict(), dict()
        self.pmembers = 0                   # member count when tn was last computed
        self.rtcpbw = self.bandwidth*self.fraction
        self.wesent, self.initial, self.avgrtcpsize = False, True, 200
        self.timer = None                   # created by start()

    def randint(self, low=0, high=0x100000000):
        '''Return a random number between [low, high). An empty range yields low.'''
        if high <= low: return low
        return random.randrange(low, high)

    @property
    def tc(self):
        '''The current time property in double.'''
        return time.time()

    @property
    def tsnow(self):
        '''The current RTP timestamp in ts unit based on current time. The clock rate is
        estimated from what was sent so far; when there is no elapsed wallclock reference
        yet (nothing sent) the configured rate is used instead.'''
        elapsed = self.ntp - self.ntp1
        if elapsed: rate = (self.ts - self.ts1) / elapsed
        else: rate = self.rate
        return int(self.ts + (self.tc - self.ntp)*rate) & 0xffffffff

    def start(self):
        '''Start the session, starts sending RTCP and RTP, as well as receiving them.'''
        if self.running: return
        self.senders.clear(); self.members.clear()
        self.pmembers = 1
        self.members[self.member.ssrc] = self.member
        self.wesent = self.rtcpsent = False
        delay = self.rtcpinterval(True)
        self.tp, self.tn = self.tc, self.tc + delay
        self.timer = timer = self.app.createTimer(self)
        timer.start(delay*1000)
        self.running = True
        self.app.starting(self)

    def stop(self, reason=''):
        '''Stop or close the session, hence stops sending or receiving packets.'''
        if not self.running: return
        self.sendBYE(reason=reason)
        self.members.clear()
        self.senders.clear()
        self.pmembers = 0
        if self.timer:
            self.timer.stop()
            self.timer = None
        self.running = False
        self.app.stopping(self)

    def send(self, payload='', ts=0, marker=False, pt=None):
        '''Send a RTP packet using the given payload, timestamp and marker. ts is relative
        to the session; the random offsets seq0 and ts0 are added here. pt overrides the
        payload type of the session for this packet.'''
        member = self.member
        member.pktcount = member.pktcount + 1
        member.octcount = member.octcount + len(payload)
        self.ts, self.ntp = ts, self.tc
        if self.ts1 == 0: self.ts1 = ts
        self.rtpsent = self.wesent = True
        if pt is None: pt = self.pt
        # both offsets are random, so wrap the sums into their field widths
        pkt = RTP(pt=pt, marker=marker, seq=(self.seq0+self.seq) & 0xffff, ts=(self.ts0+ts) & 0xffffffff, ssrc=member.ssrc, payload=payload)
        self.app.sendRTP(pkt)
        self.seq = self.seq + 1

    def receivedRTP(self, data, src, dest):
        '''Received an RTP packet on the network. Process it and invoke app.received() callback.
        While the session is not running the member is None in the callback.'''
        p = RTP(data)
        member = None
        if self.running:
            if p.ssrc not in self.members:
                self.members[p.ssrc] = Source(ssrc=p.ssrc).newfound(p.seq)
            member = self.members[p.ssrc]
            if p.ssrc not in self.senders:
                self.senders[p.ssrc] = member
        if member:
            member.received = member.received + 1  # see the NOTE(review) on the class
            member.timeout = 0
            member.address = src
            member.updateseq(p.seq)
            member.updatejitter(p.ts, self.tsnow)
        self.app.received(member, p)

    def receivedRTCP(self, data, src, dest):
        '''Received an RTCP packet on network. Process it locally.'''
        for p in RTCP(data):
            if p.pt == RTCP.SR or p.pt == RTCP.RR:
                if p.ssrc not in self.members and self.running:
                    self.members[p.ssrc] = Source(ssrc=p.ssrc)
                member = self.members.get(p.ssrc)
                if member is None: continue  # unknown source while the session is stopped
                if p.pt == RTCP.SR:
                    member.lastts = p.ts
                    member.lastntp = p.ntp
                    member.timeout = 0
                for r in p.reports:
                    if r.ssrc == self.member.ssrc:
                        # dlsr is in units of 1/65536 second
                        self.member.storereport(r.flost, r.clost, r.jitter, r.dlsr/65536.)
                        break
            elif p.pt == RTCP.SDES:
                for ssrc,items in p.items:
                    if ssrc not in self.members:
                        member = self.members[ssrc] = Source(ssrc=ssrc)
                    else:
                        member = self.members[ssrc]
                    member.items = items
            elif p.pt == RTCP.BYE:
                for ssrc in p.ssrcs:
                    self.members.pop(ssrc, None)
                    self.senders.pop(ssrc, None)
                members = len(self.members)
                if self.running and self.timer and 0 < self.pmembers and members < self.pmembers:
                    # Reverse reconsideration: the group shrank, so bring the next report
                    # forward in proportion. The ratio must be fractional.
                    ratio = members / float(self.pmembers)
                    now = self.tc
                    self.timer.stop()
                    self.tn = now + ratio * (self.tn - now)
                    self.tp = now - ratio * (now - self.tp)
                    self.timer.start((self.tn - now) * 1000)
                    self.pmembers = members
        self.avgrtcpsize = (1/16.)*len(data) + (15/16.)*self.avgrtcpsize

    def rtcpinterval(self, initial=False):
        '''Return the randomized interval in seconds until the next RTCP packet. The
        interval is at least 5 seconds (2.5 when initial is true) before randomization,
        and grows with the group size and the average RTCP packet size.'''
        if len(self.senders) < 0.25*len(self.members):
            # few senders: they share a quarter of the RTCP bandwidth, receivers the rest
            if self.wesent: C, n = self.avgrtcpsize / (0.25*self.rtcpbw), len(self.senders)
            else: C, n = self.avgrtcpsize / (0.75*self.rtcpbw), len(self.members) - len(self.senders)
        else:
            C, n = self.avgrtcpsize / self.rtcpbw, len(self.members)
        tmin = 2.5 if initial else 5.0
        # random factor in [0.5, 1.5), then the compensation for the reconsideration bias
        return max(tmin, n*C) * (random.random() + 0.5) / 1.21828

    def timedout(self, timer):
        '''Timeout invoked to send out an RTCP.'''
        delay = self.rtcpinterval()
        self.tn = self.tp + delay
        if not self.running:
            if self.tn <= self.tc:
                self.sendBYE()
            elif self.timer:
                self.timer.start((self.tn - self.tc) * 1000)
            return
        if self.tn <= self.tc:
            size = self.sendRTCP()
            self.avgrtcpsize = (1/16.)*size + (15/16.)*self.avgrtcpsize
            self.tp = self.tc
            delay = self.rtcpinterval()
            self.initial = False
        else:
            # the group grew since scheduling: wait for the recomputed time
            delay = self.tn - self.tc
        self.pmembers = len(self.members)
        self.timer.start(delay*1000)

    def sendBYE(self, reason=''):
        '''Send a BYE, carrying the optional reason, if both RTP and RTCP were sent before.'''
        if self.rtpsent and self.rtcpsent:
            self.sendRTCP(True, reason)

    def sendRTCP(self, sendbye=False, reason=None):
        '''Send a RTCP packet with SR or RR and SDES, and optionally BYE if sendbye is True
        (with the optional reason). It returns the size of the packet sent.'''
        reports, toremove = [], []
        for member in self.members.values():
            if member is self.member: continue  # no report about, or timeout of, ourselves
            if member.received > 0:
                if member.lastntp is None:
                    lsr = dlsr = 0  # no sender report received from this source yet
                else:
                    ntp1, ntp2 = time2ntp(member.lastntp)
                    # middle 32 bits of the 64-bit NTP timestamp
                    lsr = ((ntp1 & 0x0ffff) << 16) | ((ntp2 >> 16) & 0x0ffff)
                    # NOTE(review): lastntp is the timestamp carried in the SR, not its local arrival time
                    dlsr = int((self.tc - member.lastntp)*65536)
                member.updatelostandexpected()
                report = RTCP.packet(ssrc=member.ssrc, flost=member.fraction, clost=member.lost, hseq=member.cycles+member.maxseq, jitter=int(member.jitter), lsr=lsr, dlsr=dlsr)
                reports.append(report)
                member.received = 0
            if member.timeout == 5:
                toremove.append(member.ssrc)
            else:
                member.timeout = member.timeout + 1
        for ssrc in toremove:
            del self.members[ssrc]
            self.senders.pop(ssrc, None)
        ssrc = self.member.ssrc
        packet = RTCP()
        # the report count field is 5 bits wide, so one packet carries at most 31 reports
        first, reports = reports[:31], reports[31:]
        if self.wesent:
            p = RTCP.packet(pt=RTCP.SR, ssrc=ssrc, ntp=self.tc, ts=(self.tsnow+self.ts0) & 0xffffffff, pktcount=self.member.pktcount, octcount=self.member.octcount, reports=first)
            self.wesent = False
        else:
            p = RTCP.packet(pt=RTCP.RR, ssrc=ssrc, reports=first)
        packet.append(p)
        while reports:
            p, reports = RTCP.packet(pt=RTCP.RR, ssrc=ssrc, reports=reports[:31]), reports[31:]
            packet.append(p)
        # SDES items are grouped in chunks of (ssrc, [(type, value), ...])
        p = RTCP.packet(pt=RTCP.SDES, items=[(ssrc, self.member.items)])
        packet.append(p)
        if sendbye:
            p = RTCP.packet(pt=RTCP.BYE, ssrcs=[ssrc], reason=(reason or None))
            packet.append(p)
        data = str(packet)
        self.app.sendRTCP(data)
        self.rtcpsent = True
        return len(data)
class Network(object):
    '''A network interface that can be implemented by the application for the session,
    in case of a simple consecutive (even,odd) UDP ports of RTP and RTCP. The useful properties
    are src and dest, which are tuple ('ip', port) representing source and destination
    addresses. There are also srcRTCP and destRTCP properties that explicitly allow
    setting RTCP ports different from RTP. Once created the src property can not be changed.

    NOTE(review): this class relies on a module named multitask (add, recvfrom, sendto)
    which is not imported anywhere in the file as shown; it has to be made available at
    module level for the class to work.'''

    def __init__(self, app, **kwargs):
        '''Initialize the network: bind one UDP socket for RTP and one for RTCP and start
        the two receive tasks.

        Optional keyword arguments: src (default ('0.0.0.0', 0)), dest, srcRTCP (default
        the src host with the src port plus one), destRTCP and maxsize (receive buffer
        size, default 1500). When the src port is 0 a random port pair is tried up to
        retry (default 20) times within [low, high] (default 10000-65535), using an even
        RTP port unless even is false.

        Raises ValueError if the sockets cannot be bound.'''
        self.rtp = self.rtcp = None  # defined first so that __del__ works after a failure
        self.app = app
        self.src = kwargs.get('src', ('0.0.0.0', 0))
        self.dest = kwargs.get('dest', None)
        self.srcRTCP = kwargs.get('srcRTCP', (self.src[0], self.src[1] and self.src[1]+1 or 0))
        self.destRTCP = kwargs.get('destRTCP', None)
        self.maxsize = kwargs.get('maxsize', 1500)
        if self.src[1] != 0:
            s1, s2 = Network._bindpair(self.src, self.srcRTCP)
        else:
            retry = kwargs.get('retry', 20)
            low = kwargs.get('low', 10000)
            high = kwargs.get('high', 65535)
            even = kwargs.get('even', True)
            s1 = s2 = None
            while retry>0:
                port = random.randint(low, high)
                port = (port & 0x0fffe) if even else (port | 0x00001)
                s1, s2 = Network._bindpair((self.src[0], port), (self.src[0], port+1))
                if s1 is not None:
                    self.src, self.srcRTCP = s1.getsockname(), s2.getsockname()
                    break  # keep this pair instead of binding yet another one
                retry = retry - 1
        if s1 is None or s2 is None:
            raise ValueError('cannot allocate sockets')
        self.rtp, self.rtcp = s1, s2
        multitask.add(self.receiveRTP(s1))
        multitask.add(self.receiveRTCP(s2))

    @staticmethod
    def _bindpair(rtpaddr, rtcpaddr):
        '''Return two UDP sockets bound to the given addresses, or (None, None) on failure.'''
        s1 = s2 = None
        try:
            s1 = socket.socket(type=socket.SOCK_DGRAM)
            s2 = socket.socket(type=socket.SOCK_DGRAM)
            s1.bind(rtpaddr)
            s2.bind(rtcpaddr)
            return s1, s2
        except (socket.error, OverflowError):
            # OverflowError: a port beyond 65535, e.g. the companion of port 65535
            for s in (s1, s2):
                if s is not None: s.close()
            return None, None

    def __del__(self):
        '''Close both sockets and drop the reference to the application.'''
        rtp, rtcp = getattr(self, 'rtp', None), getattr(self, 'rtcp', None)
        if rtp is not None: rtp.close()
        if rtcp is not None: rtcp.close()
        self.rtp = self.rtcp = self.app = None

    def receiveRTP(self, sock):
        '''Task that receives on the RTP socket and calls app.receivedRTP(data, remote, src).'''
        try:
            while True:
                data, remote = yield multitask.recvfrom(sock, self.maxsize)
                if self.app:
                    try:
                        self.app.receivedRTP(data, remote, self.src)
                    except Exception:
                        # one bad packet or failing handler must not end the receive loop
                        if _debug: print('ignoring failure while handling a RTP packet')
        except Exception:
            pass  # best effort: a failing receive (e.g. closed socket) ends the task quietly

    def receiveRTCP(self, sock):
        '''Task that receives on the RTCP socket and calls app.receivedRTCP(data, remote, src).'''
        try:
            while True:
                data, remote = yield multitask.recvfrom(sock, self.maxsize)
                if self.app:
                    try:
                        self.app.receivedRTCP(data, remote, self.src)
                    except Exception:
                        # one bad packet or failing handler must not end the receive loop
                        if _debug: print('ignoring failure while handling a RTCP packet')
        except Exception:
            pass  # best effort: a failing receive (e.g. closed socket) ends the task quietly

    def sendRTP(self, data, dest=None):
        '''Generator task that sends data on the RTP socket to dest, by default self.dest.'''
        if self.rtp:
            yield multitask.sendto(self.rtp, data, dest or self.dest)

    def sendRTCP(self, data, dest=None):
        '''Generator task that sends data on the RTCP socket to dest, by default
        self.destRTCP, falling back to self.dest when no RTCP destination is set.'''
        if self.rtcp:
            yield multitask.sendto(self.rtcp, data, dest or self.destRTCP or self.dest)
if __name__ == '__main__':
    # Executed as a script: run the doctests embedded in this module's docstrings.
    from doctest import testmod
    testmod()