cc.udp.core

  1import socket
  2import json
  3
  4import numpy as np
  5
  6
  7class UDP:
  8    """
  9    UDP class for sending and receiving data from a UDP socket as a full duplex communcation channel.
 10    """
 11    def __init__(self, recv_addr=("127.0.0.1", 8000), send_addr=("127.0.0.1", 8001)):
 12        """
 13        Initialize UDP Tx and Rx
 14        
 15        Args:
 16            recv_addr: address to listen on, None if not receiving (tx only)
 17            send_addr: address of target host, None if not sending (rx only)
 18        """
 19        self.recv_addr = recv_addr
 20        self.send_addr = send_addr
 21        
 22        self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
 23        
 24        if self.recv_addr:
 25            self._sock.bind(self.recv_addr)
 26            print("UDP Rx is initialized:", self.recv_addr)
 27        
 28        if self.send_addr:
 29            print("UDP Tx is initialized:", self.send_addr)
 30
 31    def stop(self) -> None:
 32        """
 33        Close the socket.
 34        """
 35        self._sock.close()
 36
 37    def recv(self, bufsize=1024, timeout=None) -> bytes:
 38        """
 39        Receive data
 40
 41        timeout == None: blocking forever
 42        timeout == 0: non-blocking (the actual delay is around 0.1s)
 43        timeout > 0: blocking for timeout seconds
 44
 45        Args:
 46            bufsize: size of data buffer to receive
 47            timeout: timeout in seconds
 48        """
 49        if not self.recv_addr:
 50            raise ValueError("Cannot receive data without a receive address")
 51        self._sock.settimeout(timeout)
 52        try:
 53            buffer, addr = self._sock.recvfrom(bufsize)
 54        except (socket.timeout, BlockingIOError):
 55            return None
 56        return buffer
 57
 58    def send(self, buffer: bytes) -> None:
 59        """
 60        Send a byte buffer to the target device.
 61        
 62        Args:
 63            buffer: data to send
 64        """
 65        if not self.send_addr:
 66            raise ValueError("Cannot send data without a send address")
 67        self._sock.sendto(buffer, self.send_addr)
 68
 69    def recv_dict(self, bufsize=1024, timeout=None) -> dict:
 70        """
 71        Receive data and deserialize it into a python dictionary.
 72
 73        See `recv()` for more information on timeout.
 74
 75        Args:
 76            bufsize: size of data buffer to receive
 77            timeout: timeout in seconds
 78        """
 79        buffer = self.recv(bufsize=bufsize, timeout=timeout)
 80        if not buffer:
 81            return None
 82        serialized_data = buffer.decode("utf-8")
 83        data = json.loads(serialized_data)
 84        return data
 85    
 86    def send_dict(self, data: dict) -> None:
 87        """
 88        Serialize a python dictionary and send it.
 89        
 90        Args:
 91            data: data to send
 92        """
 93        buffer = json.dumps(data)
 94        buffer = buffer.encode()
 95        self.send(buffer)
 96
 97    def recv_numpy(self, bufsize=1024, dtype=np.float32, timeout=None) -> np.ndarray:
 98        """
 99        Receive data and deserialize it into a numpy array.
100        
101        See `recv()` for more information on timeout.
102        
103        Args:
104            bufsize: size of data buffer to receive
105            dtype: numpy data type
106            timeout: timeout in seconds
107        """
108        buffer = self.recv(bufsize=bufsize, timeout=timeout)
109        if not buffer:
110            return None
111        data = np.frombuffer(buffer, dtype=dtype)
112        return data
113
114    def send_numpy(self, data: np.ndarray) -> None:
115        """
116        Serialize a numpy array and send it.
117        
118        Args:
119            data: data to send
120        """
121        buffer = data.tobytes()
122        self.send(buffer)
class UDP:
  8class UDP:
  9    """
 10    UDP class for sending and receiving data from a UDP socket as a full duplex communcation channel.
 11    """
 12    def __init__(self, recv_addr=("127.0.0.1", 8000), send_addr=("127.0.0.1", 8001)):
 13        """
 14        Initialize UDP Tx and Rx
 15        
 16        Args:
 17            recv_addr: address to listen on, None if not receiving (tx only)
 18            send_addr: address of target host, None if not sending (rx only)
 19        """
 20        self.recv_addr = recv_addr
 21        self.send_addr = send_addr
 22        
 23        self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
 24        
 25        if self.recv_addr:
 26            self._sock.bind(self.recv_addr)
 27            print("UDP Rx is initialized:", self.recv_addr)
 28        
 29        if self.send_addr:
 30            print("UDP Tx is initialized:", self.send_addr)
 31
 32    def stop(self) -> None:
 33        """
 34        Close the socket.
 35        """
 36        self._sock.close()
 37
 38    def recv(self, bufsize=1024, timeout=None) -> bytes:
 39        """
 40        Receive data
 41
 42        timeout == None: blocking forever
 43        timeout == 0: non-blocking (the actual delay is around 0.1s)
 44        timeout > 0: blocking for timeout seconds
 45
 46        Args:
 47            bufsize: size of data buffer to receive
 48            timeout: timeout in seconds
 49        """
 50        if not self.recv_addr:
 51            raise ValueError("Cannot receive data without a receive address")
 52        self._sock.settimeout(timeout)
 53        try:
 54            buffer, addr = self._sock.recvfrom(bufsize)
 55        except (socket.timeout, BlockingIOError):
 56            return None
 57        return buffer
 58
 59    def send(self, buffer: bytes) -> None:
 60        """
 61        Send a byte buffer to the target device.
 62        
 63        Args:
 64            buffer: data to send
 65        """
 66        if not self.send_addr:
 67            raise ValueError("Cannot send data without a send address")
 68        self._sock.sendto(buffer, self.send_addr)
 69
 70    def recv_dict(self, bufsize=1024, timeout=None) -> dict:
 71        """
 72        Receive data and deserialize it into a python dictionary.
 73
 74        See `recv()` for more information on timeout.
 75
 76        Args:
 77            bufsize: size of data buffer to receive
 78            timeout: timeout in seconds
 79        """
 80        buffer = self.recv(bufsize=bufsize, timeout=timeout)
 81        if not buffer:
 82            return None
 83        serialized_data = buffer.decode("utf-8")
 84        data = json.loads(serialized_data)
 85        return data
 86    
 87    def send_dict(self, data: dict) -> None:
 88        """
 89        Serialize a python dictionary and send it.
 90        
 91        Args:
 92            data: data to send
 93        """
 94        buffer = json.dumps(data)
 95        buffer = buffer.encode()
 96        self.send(buffer)
 97
 98    def recv_numpy(self, bufsize=1024, dtype=np.float32, timeout=None) -> np.ndarray:
 99        """
100        Receive data and deserialize it into a numpy array.
101        
102        See `recv()` for more information on timeout.
103        
104        Args:
105            bufsize: size of data buffer to receive
106            dtype: numpy data type
107            timeout: timeout in seconds
108        """
109        buffer = self.recv(bufsize=bufsize, timeout=timeout)
110        if not buffer:
111            return None
112        data = np.frombuffer(buffer, dtype=dtype)
113        return data
114
115    def send_numpy(self, data: np.ndarray) -> None:
116        """
117        Serialize a numpy array and send it.
118        
119        Args:
120            data: data to send
121        """
122        buffer = data.tobytes()
123        self.send(buffer)

UDP class for sending and receiving data from a UDP socket as a full duplex communcation channel.

UDP(recv_addr=('127.0.0.1', 8000), send_addr=('127.0.0.1', 8001))
12    def __init__(self, recv_addr=("127.0.0.1", 8000), send_addr=("127.0.0.1", 8001)):
13        """
14        Initialize UDP Tx and Rx
15        
16        Args:
17            recv_addr: address to listen on, None if not receiving (tx only)
18            send_addr: address of target host, None if not sending (rx only)
19        """
20        self.recv_addr = recv_addr
21        self.send_addr = send_addr
22        
23        self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
24        
25        if self.recv_addr:
26            self._sock.bind(self.recv_addr)
27            print("UDP Rx is initialized:", self.recv_addr)
28        
29        if self.send_addr:
30            print("UDP Tx is initialized:", self.send_addr)

Initialize UDP Tx and Rx

Arguments:
  • recv_addr: address to listen on, None if not receiving (tx only)
  • send_addr: address of target host, None if not sending (rx only)
recv_addr
send_addr
def stop(self) -> None:
32    def stop(self) -> None:
33        """
34        Close the socket.
35        """
36        self._sock.close()

Close the socket.

def recv(self, bufsize=1024, timeout=None) -> bytes:
38    def recv(self, bufsize=1024, timeout=None) -> bytes:
39        """
40        Receive data
41
42        timeout == None: blocking forever
43        timeout == 0: non-blocking (the actual delay is around 0.1s)
44        timeout > 0: blocking for timeout seconds
45
46        Args:
47            bufsize: size of data buffer to receive
48            timeout: timeout in seconds
49        """
50        if not self.recv_addr:
51            raise ValueError("Cannot receive data without a receive address")
52        self._sock.settimeout(timeout)
53        try:
54            buffer, addr = self._sock.recvfrom(bufsize)
55        except (socket.timeout, BlockingIOError):
56            return None
57        return buffer

Receive data

timeout == None: blocking forever timeout == 0: non-blocking (the actual delay is around 0.1s) timeout > 0: blocking for timeout seconds

Arguments:
  • bufsize: size of data buffer to receive
  • timeout: timeout in seconds
def send(self, buffer: bytes) -> None:
59    def send(self, buffer: bytes) -> None:
60        """
61        Send a byte buffer to the target device.
62        
63        Args:
64            buffer: data to send
65        """
66        if not self.send_addr:
67            raise ValueError("Cannot send data without a send address")
68        self._sock.sendto(buffer, self.send_addr)

Send a byte buffer to the target device.

Arguments:
  • buffer: data to send
def recv_dict(self, bufsize=1024, timeout=None) -> dict:
70    def recv_dict(self, bufsize=1024, timeout=None) -> dict:
71        """
72        Receive data and deserialize it into a python dictionary.
73
74        See `recv()` for more information on timeout.
75
76        Args:
77            bufsize: size of data buffer to receive
78            timeout: timeout in seconds
79        """
80        buffer = self.recv(bufsize=bufsize, timeout=timeout)
81        if not buffer:
82            return None
83        serialized_data = buffer.decode("utf-8")
84        data = json.loads(serialized_data)
85        return data

Receive data and deserialize it into a python dictionary.

See recv() for more information on timeout.

Arguments:
  • bufsize: size of data buffer to receive
  • timeout: timeout in seconds
def send_dict(self, data: dict) -> None:
87    def send_dict(self, data: dict) -> None:
88        """
89        Serialize a python dictionary and send it.
90        
91        Args:
92            data: data to send
93        """
94        buffer = json.dumps(data)
95        buffer = buffer.encode()
96        self.send(buffer)

Serialize a python dictionary and send it.

Arguments:
  • data: data to send
def recv_numpy( self, bufsize=1024, dtype=<class 'numpy.float32'>, timeout=None) -> numpy.ndarray:
 98    def recv_numpy(self, bufsize=1024, dtype=np.float32, timeout=None) -> np.ndarray:
 99        """
100        Receive data and deserialize it into a numpy array.
101        
102        See `recv()` for more information on timeout.
103        
104        Args:
105            bufsize: size of data buffer to receive
106            dtype: numpy data type
107            timeout: timeout in seconds
108        """
109        buffer = self.recv(bufsize=bufsize, timeout=timeout)
110        if not buffer:
111            return None
112        data = np.frombuffer(buffer, dtype=dtype)
113        return data

Receive data and deserialize it into a numpy array.

See recv() for more information on timeout.

Arguments:
  • bufsize: size of data buffer to receive
  • dtype: numpy data type
  • timeout: timeout in seconds
def send_numpy(self, data: numpy.ndarray) -> None:
115    def send_numpy(self, data: np.ndarray) -> None:
116        """
117        Serialize a numpy array and send it.
118        
119        Args:
120            data: data to send
121        """
122        buffer = data.tobytes()
123        self.send(buffer)

Serialize a numpy array and send it.

Arguments:
  • data: data to send