update kcp-go package

This commit is contained in:
fatedier
2019-03-17 17:09:54 +08:00
parent 87a4de4370
commit fdcdccb0c2
122 changed files with 14490 additions and 2469 deletions

View File

@@ -4,7 +4,6 @@ import (
"crypto/rand"
"encoding/binary"
"hash/crc32"
"io"
"net"
"sync"
"sync/atomic"
@@ -12,6 +11,7 @@ import (
"github.com/pkg/errors"
"golang.org/x/net/ipv4"
"golang.org/x/net/ipv6"
)
type errTimeout struct {
@@ -23,7 +23,7 @@ func (errTimeout) Temporary() bool { return true }
func (errTimeout) Error() string { return "i/o timeout" }
const (
// 16-bytes magic number for each packet
// 16-bytes nonce for each packet
nonceSize = 16
// 4-bytes packet checksum
@@ -40,9 +40,6 @@ const (
// accept backlog
acceptBacklog = 128
// prerouting(to session) queue
qlen = 128
)
const (
@@ -51,8 +48,8 @@ const (
)
var (
// global packet buffer
// shared among sending/receiving/FEC
// a system-wide packet buffer shared among sending, receiving and FEC
// to mitigate high-frequency memory allocation for packets
xmitBuf sync.Pool
)
@@ -68,17 +65,17 @@ type (
updaterIdx int // record slice index in updater
conn net.PacketConn // the underlying packet connection
kcp *KCP // KCP ARQ protocol
l *Listener // point to the Listener if it's accepted by Listener
block BlockCrypt // block encryption
l *Listener // pointing to the Listener object if it's been accepted by a Listener
block BlockCrypt // block encryption object
// kcp receiving is based on packets
// recvbuf turns packets into stream
recvbuf []byte
bufptr []byte
// extended output buffer(with header)
// header extended output buffer, if has header
ext []byte
// FEC
// FEC codec
fecDecoder *fecDecoder
fecEncoder *fecEncoder
@@ -86,16 +83,20 @@ type (
remote net.Addr // remote peer address
rd time.Time // read deadline
wd time.Time // write deadline
headerSize int // the overall header size added before KCP frame
ackNoDelay bool // send ack immediately for each incoming packet
headerSize int // the header size additional to a KCP frame
ackNoDelay bool // send ack immediately for each incoming packet(testing purpose)
writeDelay bool // delay kcp.flush() for Write() for bulk transfer
dup int // duplicate udp packets
dup int // duplicate udp packets(testing purpose)
// notifications
die chan struct{} // notify session has Closed
die chan struct{} // notify current session has Closed
chReadEvent chan struct{} // notify Read() can be called without blocking
chWriteEvent chan struct{} // notify Write() can be called without blocking
chErrorEvent chan error // notify Read() have an error
chReadError chan error // notify PacketConn.Read() have an error
chWriteError chan error // notify PacketConn.Write() have an error
// nonce generator
nonce Entropy
isClosed bool // flag the session has Closed
mu sync.Mutex
@@ -114,16 +115,19 @@ type (
func newUDPSession(conv uint32, dataShards, parityShards int, l *Listener, conn net.PacketConn, remote net.Addr, block BlockCrypt) *UDPSession {
sess := new(UDPSession)
sess.die = make(chan struct{})
sess.nonce = new(nonceAES128)
sess.nonce.Init()
sess.chReadEvent = make(chan struct{}, 1)
sess.chWriteEvent = make(chan struct{}, 1)
sess.chErrorEvent = make(chan error, 1)
sess.chReadError = make(chan error, 1)
sess.chWriteError = make(chan error, 1)
sess.remote = remote
sess.conn = conn
sess.l = l
sess.block = block
sess.recvbuf = make([]byte, mtuLimit)
// FEC initialization
// FEC codec initialization
sess.fecDecoder = newFECDecoder(rxFECMulti*(dataShards+parityShards), dataShards, parityShards)
if sess.block != nil {
sess.fecEncoder = newFECEncoder(dataShards, parityShards, cryptHeaderSize)
@@ -131,7 +135,7 @@ func newUDPSession(conv uint32, dataShards, parityShards int, l *Listener, conn
sess.fecEncoder = newFECEncoder(dataShards, parityShards, 0)
}
// calculate header size
// calculate additional header size introduced by FEC and encryption
if sess.block != nil {
sess.headerSize += cryptHeaderSize
}
@@ -139,8 +143,7 @@ func newUDPSession(conv uint32, dataShards, parityShards int, l *Listener, conn
sess.headerSize += fecHeaderSizePlus2
}
// only allocate extended packet buffer
// when the extra header is required
// we only need to allocate extended packet buffer if we have the additional header
if sess.headerSize > 0 {
sess.ext = make([]byte, mtuLimit)
}
@@ -152,8 +155,8 @@ func newUDPSession(conv uint32, dataShards, parityShards int, l *Listener, conn
})
sess.kcp.SetMtu(IKCP_MTU_DEF - sess.headerSize)
// add current session to the global updater,
// which periodically calls sess.update()
// register current session to the global updater,
// which call sess.update() periodically.
updater.addSession(sess)
if sess.l == nil { // it's a client connection
@@ -179,6 +182,7 @@ func (s *UDPSession) Read(b []byte) (n int, err error) {
n = copy(b, s.bufptr)
s.bufptr = s.bufptr[n:]
s.mu.Unlock()
atomic.AddUint64(&DefaultSnmp.BytesReceived, uint64(n))
return n, nil
}
@@ -188,29 +192,29 @@ func (s *UDPSession) Read(b []byte) (n int, err error) {
}
if size := s.kcp.PeekSize(); size > 0 { // peek data size from kcp
atomic.AddUint64(&DefaultSnmp.BytesReceived, uint64(size))
if len(b) >= size { // direct write to b
if len(b) >= size { // receive data into 'b' directly
s.kcp.Recv(b)
s.mu.Unlock()
atomic.AddUint64(&DefaultSnmp.BytesReceived, uint64(size))
return size, nil
}
// resize kcp receive buffer
// to make sure recvbuf has enough capacity
// if necessary resize the stream buffer to guarantee a sufficent buffer space
if cap(s.recvbuf) < size {
s.recvbuf = make([]byte, size)
}
// resize recvbuf slice length
// resize the length of recvbuf to correspond to data size
s.recvbuf = s.recvbuf[:size]
s.kcp.Recv(s.recvbuf)
n = copy(b, s.recvbuf) // copy to b
s.bufptr = s.recvbuf[n:] // update pointer
n = copy(b, s.recvbuf) // copy to 'b'
s.bufptr = s.recvbuf[n:] // pointer update
s.mu.Unlock()
atomic.AddUint64(&DefaultSnmp.BytesReceived, uint64(n))
return n, nil
}
// read deadline
// deadline for current reading operation
var timeout *time.Timer
var c <-chan time.Time
if !s.rd.IsZero() {
@@ -230,7 +234,7 @@ func (s *UDPSession) Read(b []byte) (n int, err error) {
case <-s.chReadEvent:
case <-c:
case <-s.die:
case err = <-s.chErrorEvent:
case err = <-s.chReadError:
if timeout != nil {
timeout.Stop()
}
@@ -252,7 +256,8 @@ func (s *UDPSession) Write(b []byte) (n int, err error) {
return 0, errors.New(errBrokenPipe)
}
// api flow control
// controls how much data will be sent to kcp core
// to prevent the memory from exhuasting
if s.kcp.WaitSnd() < int(s.kcp.snd_wnd) {
n = len(b)
for {
@@ -265,7 +270,8 @@ func (s *UDPSession) Write(b []byte) (n int, err error) {
}
}
if !s.writeDelay {
// flush immediately if the queue is full
if s.kcp.WaitSnd() >= int(s.kcp.snd_wnd) || !s.writeDelay {
s.kcp.flush(false)
}
s.mu.Unlock()
@@ -273,7 +279,7 @@ func (s *UDPSession) Write(b []byte) (n int, err error) {
return n, nil
}
// write deadline
// deadline for current writing operation
var timeout *time.Timer
var c <-chan time.Time
if !s.wd.IsZero() {
@@ -292,6 +298,11 @@ func (s *UDPSession) Write(b []byte) (n int, err error) {
case <-s.chWriteEvent:
case <-c:
case <-s.die:
case err = <-s.chWriteError:
if timeout != nil {
timeout.Stop()
}
return n, err
}
if timeout != nil {
@@ -302,13 +313,10 @@ func (s *UDPSession) Write(b []byte) (n int, err error) {
// Close closes the connection.
func (s *UDPSession) Close() error {
// remove this session from updater & listener(if necessary)
// remove current session from updater & listener(if necessary)
updater.removeSession(s)
if s.l != nil { // notify listener
s.l.closeSession(sessionKey{
addr: s.remote.String(),
convID: s.kcp.conv,
})
s.l.closeSession(s.remote)
}
s.mu.Lock()
@@ -337,6 +345,8 @@ func (s *UDPSession) SetDeadline(t time.Time) error {
defer s.mu.Unlock()
s.rd = t
s.wd = t
s.notifyReadEvent()
s.notifyWriteEvent()
return nil
}
@@ -345,6 +355,7 @@ func (s *UDPSession) SetReadDeadline(t time.Time) error {
s.mu.Lock()
defer s.mu.Unlock()
s.rd = t
s.notifyReadEvent()
return nil
}
@@ -353,6 +364,7 @@ func (s *UDPSession) SetWriteDeadline(t time.Time) error {
s.mu.Lock()
defer s.mu.Unlock()
s.wd = t
s.notifyWriteEvent()
return nil
}
@@ -420,10 +432,11 @@ func (s *UDPSession) SetDSCP(dscp int) error {
s.mu.Lock()
defer s.mu.Unlock()
if s.l == nil {
if nc, ok := s.conn.(*connectedUDPConn); ok {
return ipv4.NewConn(nc.UDPConn).SetTOS(dscp << 2)
} else if nc, ok := s.conn.(net.Conn); ok {
return ipv4.NewConn(nc).SetTOS(dscp << 2)
if nc, ok := s.conn.(net.Conn); ok {
if err := ipv4.NewConn(nc).SetTOS(dscp << 2); err != nil {
return ipv6.NewConn(nc).SetTrafficClass(dscp)
}
return nil
}
}
return errors.New(errInvalidOperation)
@@ -453,11 +466,11 @@ func (s *UDPSession) SetWriteBuffer(bytes int) error {
return errors.New(errInvalidOperation)
}
// output pipeline entry
// steps for output data processing:
// 0. Header extends
// 1. FEC
// 2. CRC32
// post-processing for sending a packet from kcp core
// steps:
// 0. Header extending
// 1. FEC packet generation
// 2. CRC32 integrity
// 3. Encryption
// 4. WriteTo kernel
func (s *UDPSession) output(buf []byte) {
@@ -477,13 +490,13 @@ func (s *UDPSession) output(buf []byte) {
// 2&3. crc32 & encryption
if s.block != nil {
io.ReadFull(rand.Reader, ext[:nonceSize])
s.nonce.Fill(ext[:nonceSize])
checksum := crc32.ChecksumIEEE(ext[cryptHeaderSize:])
binary.LittleEndian.PutUint32(ext[nonceSize:], checksum)
s.block.Encrypt(ext, ext)
for k := range ecc {
io.ReadFull(rand.Reader, ecc[k][:nonceSize])
s.nonce.Fill(ecc[k][:nonceSize])
checksum := crc32.ChecksumIEEE(ecc[k][cryptHeaderSize:])
binary.LittleEndian.PutUint32(ecc[k][nonceSize:], checksum)
s.block.Encrypt(ecc[k], ecc[k])
@@ -497,6 +510,8 @@ func (s *UDPSession) output(buf []byte) {
if n, err := s.conn.WriteTo(ext, s.remote); err == nil {
nbytes += n
npkts++
} else {
s.notifyWriteError(err)
}
}
@@ -504,6 +519,8 @@ func (s *UDPSession) output(buf []byte) {
if n, err := s.conn.WriteTo(ecc[k], s.remote); err == nil {
nbytes += n
npkts++
} else {
s.notifyWriteError(err)
}
}
atomic.AddUint64(&DefaultSnmp.OutPkts, uint64(npkts))
@@ -513,11 +530,11 @@ func (s *UDPSession) output(buf []byte) {
// kcp update, returns interval for next calling
func (s *UDPSession) update() (interval time.Duration) {
s.mu.Lock()
s.kcp.flush(false)
if s.kcp.WaitSnd() < int(s.kcp.snd_wnd) {
waitsnd := s.kcp.WaitSnd()
interval = time.Duration(s.kcp.flush(false)) * time.Millisecond
if s.kcp.WaitSnd() < waitsnd {
s.notifyWriteEvent()
}
interval = time.Duration(s.kcp.interval) * time.Millisecond
s.mu.Unlock()
return
}
@@ -539,56 +556,77 @@ func (s *UDPSession) notifyWriteEvent() {
}
}
func (s *UDPSession) notifyWriteError(err error) {
select {
case s.chWriteError <- err:
default:
}
}
func (s *UDPSession) kcpInput(data []byte) {
var kcpInErrors, fecErrs, fecRecovered, fecParityShards uint64
if s.fecDecoder != nil {
f := s.fecDecoder.decodeBytes(data)
s.mu.Lock()
if f.flag == typeData {
if ret := s.kcp.Input(data[fecHeaderSizePlus2:], true, s.ackNoDelay); ret != 0 {
kcpInErrors++
}
}
if len(data) > fecHeaderSize { // must be larger than fec header size
f := s.fecDecoder.decodeBytes(data)
if f.flag == typeData || f.flag == typeFEC { // header check
if f.flag == typeFEC {
fecParityShards++
}
recovers := s.fecDecoder.decode(f)
if f.flag == typeData || f.flag == typeFEC {
if f.flag == typeFEC {
fecParityShards++
}
s.mu.Lock()
waitsnd := s.kcp.WaitSnd()
if f.flag == typeData {
if ret := s.kcp.Input(data[fecHeaderSizePlus2:], true, s.ackNoDelay); ret != 0 {
kcpInErrors++
}
}
recovers := s.fecDecoder.decode(f)
for _, r := range recovers {
if len(r) >= 2 { // must be larger than 2bytes
sz := binary.LittleEndian.Uint16(r)
if int(sz) <= len(r) && sz >= 2 {
if ret := s.kcp.Input(r[2:sz], false, s.ackNoDelay); ret == 0 {
fecRecovered++
for _, r := range recovers {
if len(r) >= 2 { // must be larger than 2bytes
sz := binary.LittleEndian.Uint16(r)
if int(sz) <= len(r) && sz >= 2 {
if ret := s.kcp.Input(r[2:sz], false, s.ackNoDelay); ret == 0 {
fecRecovered++
} else {
kcpInErrors++
}
} else {
kcpInErrors++
fecErrs++
}
} else {
fecErrs++
}
} else {
fecErrs++
}
}
}
// notify reader
if n := s.kcp.PeekSize(); n > 0 {
s.notifyReadEvent()
// to notify the readers to receive the data
if n := s.kcp.PeekSize(); n > 0 {
s.notifyReadEvent()
}
// to notify the writers when queue is shorter(e.g. ACKed)
if s.kcp.WaitSnd() < waitsnd {
s.notifyWriteEvent()
}
s.mu.Unlock()
} else {
atomic.AddUint64(&DefaultSnmp.InErrs, 1)
}
} else {
atomic.AddUint64(&DefaultSnmp.InErrs, 1)
}
s.mu.Unlock()
} else {
s.mu.Lock()
waitsnd := s.kcp.WaitSnd()
if ret := s.kcp.Input(data, true, s.ackNoDelay); ret != 0 {
kcpInErrors++
}
// notify reader
if n := s.kcp.PeekSize(); n > 0 {
s.notifyReadEvent()
}
if s.kcp.WaitSnd() < waitsnd {
s.notifyWriteEvent()
}
s.mu.Unlock()
}
@@ -608,65 +646,52 @@ func (s *UDPSession) kcpInput(data []byte) {
}
}
func (s *UDPSession) receiver(ch chan<- []byte) {
for {
data := xmitBuf.Get().([]byte)[:mtuLimit]
if n, _, err := s.conn.ReadFrom(data); err == nil && n >= s.headerSize+IKCP_OVERHEAD {
select {
case ch <- data[:n]:
case <-s.die:
return
}
} else if err != nil {
s.chErrorEvent <- err
return
} else {
atomic.AddUint64(&DefaultSnmp.InErrs, 1)
}
}
}
// read loop for client session
// the read loop for a client session
func (s *UDPSession) readLoop() {
chPacket := make(chan []byte, qlen)
go s.receiver(chPacket)
buf := make([]byte, mtuLimit)
var src string
for {
select {
case data := <-chPacket:
raw := data
dataValid := false
if s.block != nil {
s.block.Decrypt(data, data)
data = data[nonceSize:]
checksum := crc32.ChecksumIEEE(data[crcSize:])
if checksum == binary.LittleEndian.Uint32(data) {
data = data[crcSize:]
dataValid = true
} else {
atomic.AddUint64(&DefaultSnmp.InCsumErrors, 1)
}
} else if s.block == nil {
dataValid = true
if n, addr, err := s.conn.ReadFrom(buf); err == nil {
// make sure the packet is from the same source
if src == "" { // set source address
src = addr.String()
} else if addr.String() != src {
atomic.AddUint64(&DefaultSnmp.InErrs, 1)
continue
}
if dataValid {
s.kcpInput(data)
if n >= s.headerSize+IKCP_OVERHEAD {
data := buf[:n]
dataValid := false
if s.block != nil {
s.block.Decrypt(data, data)
data = data[nonceSize:]
checksum := crc32.ChecksumIEEE(data[crcSize:])
if checksum == binary.LittleEndian.Uint32(data) {
data = data[crcSize:]
dataValid = true
} else {
atomic.AddUint64(&DefaultSnmp.InCsumErrors, 1)
}
} else if s.block == nil {
dataValid = true
}
if dataValid {
s.kcpInput(data)
}
} else {
atomic.AddUint64(&DefaultSnmp.InErrs, 1)
}
xmitBuf.Put(raw)
case <-s.die:
} else {
s.chReadError <- err
return
}
}
}
type (
sessionKey struct {
addr string
convID uint32
}
// Listener defines a server listening for connections
// Listener defines a server which will be waiting to accept incoming connections
Listener struct {
block BlockCrypt // block encryption
dataShards int // FEC data shard
@@ -674,120 +699,93 @@ type (
fecDecoder *fecDecoder // FEC mock initialization
conn net.PacketConn // the underlying packet connection
sessions map[sessionKey]*UDPSession // all sessions accepted by this Listener
chAccepts chan *UDPSession // Listen() backlog
chSessionClosed chan sessionKey // session close queue
headerSize int // the overall header size added before KCP frame
die chan struct{} // notify the listener has closed
rd atomic.Value // read deadline for Accept()
sessions map[string]*UDPSession // all sessions accepted by this Listener
sessionLock sync.Mutex
chAccepts chan *UDPSession // Listen() backlog
chSessionClosed chan net.Addr // session close queue
headerSize int // the additional header to a KCP frame
die chan struct{} // notify the listener has closed
rd atomic.Value // read deadline for Accept()
wd atomic.Value
}
// incoming packet
inPacket struct {
from net.Addr
data []byte
}
)
// monitor incoming data for all connections of server
func (l *Listener) monitor() {
// cache last session
var lastKey sessionKey
// a cache for session object last used
var lastAddr string
var lastSession *UDPSession
chPacket := make(chan inPacket, qlen)
go l.receiver(chPacket)
buf := make([]byte, mtuLimit)
for {
select {
case p := <-chPacket:
raw := p.data
data := p.data
from := p.from
dataValid := false
if l.block != nil {
l.block.Decrypt(data, data)
data = data[nonceSize:]
checksum := crc32.ChecksumIEEE(data[crcSize:])
if checksum == binary.LittleEndian.Uint32(data) {
data = data[crcSize:]
if n, from, err := l.conn.ReadFrom(buf); err == nil {
if n >= l.headerSize+IKCP_OVERHEAD {
data := buf[:n]
dataValid := false
if l.block != nil {
l.block.Decrypt(data, data)
data = data[nonceSize:]
checksum := crc32.ChecksumIEEE(data[crcSize:])
if checksum == binary.LittleEndian.Uint32(data) {
data = data[crcSize:]
dataValid = true
} else {
atomic.AddUint64(&DefaultSnmp.InCsumErrors, 1)
}
} else if l.block == nil {
dataValid = true
} else {
atomic.AddUint64(&DefaultSnmp.InCsumErrors, 1)
}
} else if l.block == nil {
dataValid = true
}
if dataValid {
var conv uint32
convValid := false
if l.fecDecoder != nil {
isfec := binary.LittleEndian.Uint16(data[4:])
if isfec == typeData {
conv = binary.LittleEndian.Uint32(data[fecHeaderSizePlus2:])
convValid = true
}
} else {
conv = binary.LittleEndian.Uint32(data)
convValid = true
}
if convValid {
key := sessionKey{
addr: from.String(),
convID: conv,
}
if dataValid {
addr := from.String()
var s *UDPSession
var ok bool
// packets received from an address always come in batch.
// the packets received from an address always come in batch,
// cache the session for next packet, without querying map.
if key == lastKey {
if addr == lastAddr {
s, ok = lastSession, true
} else if s, ok = l.sessions[key]; ok {
lastSession = s
lastKey = key
} else {
l.sessionLock.Lock()
if s, ok = l.sessions[addr]; ok {
lastSession = s
lastAddr = addr
}
l.sessionLock.Unlock()
}
if !ok { // new session
if len(l.chAccepts) < cap(l.chAccepts) && len(l.sessions) < 4096 { // do not let new session overwhelm accept queue and connection count
s := newUDPSession(conv, l.dataShards, l.parityShards, l, l.conn, from, l.block)
s.kcpInput(data)
l.sessions[key] = s
l.chAccepts <- s
if len(l.chAccepts) < cap(l.chAccepts) { // do not let the new sessions overwhelm accept queue
var conv uint32
convValid := false
if l.fecDecoder != nil {
isfec := binary.LittleEndian.Uint16(data[4:])
if isfec == typeData {
conv = binary.LittleEndian.Uint32(data[fecHeaderSizePlus2:])
convValid = true
}
} else {
conv = binary.LittleEndian.Uint32(data)
convValid = true
}
if convValid { // creates a new session only if the 'conv' field in kcp is accessible
s := newUDPSession(conv, l.dataShards, l.parityShards, l, l.conn, from, l.block)
s.kcpInput(data)
l.sessionLock.Lock()
l.sessions[addr] = s
l.sessionLock.Unlock()
l.chAccepts <- s
}
}
} else {
s.kcpInput(data)
}
}
} else {
atomic.AddUint64(&DefaultSnmp.InErrs, 1)
}
xmitBuf.Put(raw)
case key := <-l.chSessionClosed:
if key == lastKey {
lastKey = sessionKey{}
}
delete(l.sessions, key)
case <-l.die:
return
}
}
}
func (l *Listener) receiver(ch chan<- inPacket) {
for {
data := xmitBuf.Get().([]byte)[:mtuLimit]
if n, from, err := l.conn.ReadFrom(data); err == nil && n >= l.headerSize+IKCP_OVERHEAD {
select {
case ch <- inPacket{from, data[:n]}:
case <-l.die:
return
}
} else if err != nil {
return
} else {
atomic.AddUint64(&DefaultSnmp.InErrs, 1)
return
}
}
}
@@ -811,7 +809,10 @@ func (l *Listener) SetWriteBuffer(bytes int) error {
// SetDSCP sets the 6bit DSCP field of IP header
func (l *Listener) SetDSCP(dscp int) error {
if nc, ok := l.conn.(net.Conn); ok {
return ipv4.NewConn(nc).SetTOS(dscp << 2)
if err := ipv4.NewConn(nc).SetTOS(dscp << 2); err != nil {
return ipv6.NewConn(nc).SetTrafficClass(dscp)
}
return nil
}
return errors.New(errInvalidOperation)
}
@@ -864,13 +865,14 @@ func (l *Listener) Close() error {
}
// closeSession notify the listener that a session has closed
func (l *Listener) closeSession(key sessionKey) bool {
select {
case l.chSessionClosed <- key:
func (l *Listener) closeSession(remote net.Addr) (ret bool) {
l.sessionLock.Lock()
defer l.sessionLock.Unlock()
if _, ok := l.sessions[remote.String()]; ok {
delete(l.sessions, remote.String())
return true
case <-l.die:
return false
}
return false
}
// Addr returns the listener's network address, The Addr returned is shared by all invocations of Addr, so do not modify it.
@@ -898,9 +900,9 @@ func ListenWithOptions(laddr string, block BlockCrypt, dataShards, parityShards
func ServeConn(block BlockCrypt, dataShards, parityShards int, conn net.PacketConn) (*Listener, error) {
l := new(Listener)
l.conn = conn
l.sessions = make(map[sessionKey]*UDPSession)
l.sessions = make(map[string]*UDPSession)
l.chAccepts = make(chan *UDPSession, acceptBacklog)
l.chSessionClosed = make(chan sessionKey)
l.chSessionClosed = make(chan net.Addr)
l.die = make(chan struct{})
l.dataShards = dataShards
l.parityShards = parityShards
@@ -924,17 +926,22 @@ func Dial(raddr string) (net.Conn, error) { return DialWithOptions(raddr, nil, 0
// DialWithOptions connects to the remote address "raddr" on the network "udp" with packet encryption
func DialWithOptions(raddr string, block BlockCrypt, dataShards, parityShards int) (*UDPSession, error) {
// network type detection
udpaddr, err := net.ResolveUDPAddr("udp", raddr)
if err != nil {
return nil, errors.Wrap(err, "net.ResolveUDPAddr")
}
network := "udp4"
if udpaddr.IP.To4() == nil {
network = "udp"
}
udpconn, err := net.DialUDP("udp", nil, udpaddr)
conn, err := net.ListenUDP(network, nil)
if err != nil {
return nil, errors.Wrap(err, "net.DialUDP")
}
return NewConn(raddr, block, dataShards, parityShards, &connectedUDPConn{udpconn})
return NewConn(raddr, block, dataShards, parityShards, conn)
}
// NewConn establishes a session and talks KCP protocol over a packet connection.
@@ -949,6 +956,12 @@ func NewConn(raddr string, block BlockCrypt, dataShards, parityShards int, conn
return newUDPSession(convid, dataShards, parityShards, nil, conn, udpaddr, block), nil
}
// monotonic reference time point
var refTime time.Time = time.Now()
// currentMs returns current elasped monotonic milliseconds since program startup
func currentMs() uint32 { return uint32(time.Now().Sub(refTime) / time.Millisecond) }
func NewConnEx(convid uint32, connected bool, raddr string, block BlockCrypt, dataShards, parityShards int, conn *net.UDPConn) (*UDPSession, error) {
udpaddr, err := net.ResolveUDPAddr("udp", raddr)
if err != nil {
@@ -963,9 +976,6 @@ func NewConnEx(convid uint32, connected bool, raddr string, block BlockCrypt, da
return newUDPSession(convid, dataShards, parityShards, nil, pConn, udpaddr, block), nil
}
// returns current time in milliseconds
func currentMs() uint32 { return uint32(time.Now().UnixNano() / int64(time.Millisecond)) }
// connectedUDPConn is a wrapper for net.UDPConn which converts WriteTo syscalls
// to Write syscalls that are 4 times faster on some OS'es. This should only be
// used for connections that were produced by a net.Dial* call.