cc.udp.core
1import socket 2import json 3 4import numpy as np 5 6 7class UDP: 8 """ 9 UDP class for sending and receiving data from a UDP socket as a full duplex communcation channel. 10 """ 11 def __init__(self, recv_addr=("127.0.0.1", 8000), send_addr=("127.0.0.1", 8001)): 12 """ 13 Initialize UDP Tx and Rx 14 15 Args: 16 recv_addr: address to listen on, None if not receiving (tx only) 17 send_addr: address of target host, None if not sending (rx only) 18 """ 19 self.recv_addr = recv_addr 20 self.send_addr = send_addr 21 22 self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 23 24 if self.recv_addr: 25 self._sock.bind(self.recv_addr) 26 print("UDP Rx is initialized:", self.recv_addr) 27 28 if self.send_addr: 29 print("UDP Tx is initialized:", self.send_addr) 30 31 def stop(self) -> None: 32 """ 33 Close the socket. 34 """ 35 self._sock.close() 36 37 def recv(self, bufsize=1024, timeout=None) -> bytes: 38 """ 39 Receive data 40 41 timeout == None: blocking forever 42 timeout == 0: non-blocking (the actual delay is around 0.1s) 43 timeout > 0: blocking for timeout seconds 44 45 Args: 46 bufsize: size of data buffer to receive 47 timeout: timeout in seconds 48 """ 49 if not self.recv_addr: 50 raise ValueError("Cannot receive data without a receive address") 51 self._sock.settimeout(timeout) 52 try: 53 buffer, addr = self._sock.recvfrom(bufsize) 54 except (socket.timeout, BlockingIOError): 55 return None 56 return buffer 57 58 def send(self, buffer: bytes) -> None: 59 """ 60 Send a byte buffer to the target device. 61 62 Args: 63 buffer: data to send 64 """ 65 if not self.send_addr: 66 raise ValueError("Cannot send data without a send address") 67 self._sock.sendto(buffer, self.send_addr) 68 69 def recv_dict(self, bufsize=1024, timeout=None) -> dict: 70 """ 71 Receive data and deserialize it into a python dictionary. 72 73 See `recv()` for more information on timeout. 74 75 Args: 76 bufsize: size of data buffer to receive 77 timeout: timeout in seconds 78 """ 79 buffer = self.recv(bufsize=bufsize, timeout=timeout) 80 if not buffer: 81 return None 82 serialized_data = buffer.decode("utf-8") 83 data = json.loads(serialized_data) 84 return data 85 86 def send_dict(self, data: dict) -> None: 87 """ 88 Serialize a python dictionary and send it. 89 90 Args: 91 data: data to send 92 """ 93 buffer = json.dumps(data) 94 buffer = buffer.encode() 95 self.send(buffer) 96 97 def recv_numpy(self, bufsize=1024, dtype=np.float32, timeout=None) -> np.ndarray: 98 """ 99 Receive data and deserialize it into a numpy array. 100 101 See `recv()` for more information on timeout. 102 103 Args: 104 bufsize: size of data buffer to receive 105 dtype: numpy data type 106 timeout: timeout in seconds 107 """ 108 buffer = self.recv(bufsize=bufsize, timeout=timeout) 109 if not buffer: 110 return None 111 data = np.frombuffer(buffer, dtype=dtype) 112 return data 113 114 def send_numpy(self, data: np.ndarray) -> None: 115 """ 116 Serialize a numpy array and send it. 117 118 Args: 119 data: data to send 120 """ 121 buffer = data.tobytes() 122 self.send(buffer)
class
UDP:
8class UDP: 9 """ 10 UDP class for sending and receiving data from a UDP socket as a full duplex communcation channel. 11 """ 12 def __init__(self, recv_addr=("127.0.0.1", 8000), send_addr=("127.0.0.1", 8001)): 13 """ 14 Initialize UDP Tx and Rx 15 16 Args: 17 recv_addr: address to listen on, None if not receiving (tx only) 18 send_addr: address of target host, None if not sending (rx only) 19 """ 20 self.recv_addr = recv_addr 21 self.send_addr = send_addr 22 23 self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 24 25 if self.recv_addr: 26 self._sock.bind(self.recv_addr) 27 print("UDP Rx is initialized:", self.recv_addr) 28 29 if self.send_addr: 30 print("UDP Tx is initialized:", self.send_addr) 31 32 def stop(self) -> None: 33 """ 34 Close the socket. 35 """ 36 self._sock.close() 37 38 def recv(self, bufsize=1024, timeout=None) -> bytes: 39 """ 40 Receive data 41 42 timeout == None: blocking forever 43 timeout == 0: non-blocking (the actual delay is around 0.1s) 44 timeout > 0: blocking for timeout seconds 45 46 Args: 47 bufsize: size of data buffer to receive 48 timeout: timeout in seconds 49 """ 50 if not self.recv_addr: 51 raise ValueError("Cannot receive data without a receive address") 52 self._sock.settimeout(timeout) 53 try: 54 buffer, addr = self._sock.recvfrom(bufsize) 55 except (socket.timeout, BlockingIOError): 56 return None 57 return buffer 58 59 def send(self, buffer: bytes) -> None: 60 """ 61 Send a byte buffer to the target device. 62 63 Args: 64 buffer: data to send 65 """ 66 if not self.send_addr: 67 raise ValueError("Cannot send data without a send address") 68 self._sock.sendto(buffer, self.send_addr) 69 70 def recv_dict(self, bufsize=1024, timeout=None) -> dict: 71 """ 72 Receive data and deserialize it into a python dictionary. 73 74 See `recv()` for more information on timeout. 75 76 Args: 77 bufsize: size of data buffer to receive 78 timeout: timeout in seconds 79 """ 80 buffer = self.recv(bufsize=bufsize, timeout=timeout) 81 if not buffer: 82 return None 83 serialized_data = buffer.decode("utf-8") 84 data = json.loads(serialized_data) 85 return data 86 87 def send_dict(self, data: dict) -> None: 88 """ 89 Serialize a python dictionary and send it. 90 91 Args: 92 data: data to send 93 """ 94 buffer = json.dumps(data) 95 buffer = buffer.encode() 96 self.send(buffer) 97 98 def recv_numpy(self, bufsize=1024, dtype=np.float32, timeout=None) -> np.ndarray: 99 """ 100 Receive data and deserialize it into a numpy array. 101 102 See `recv()` for more information on timeout. 103 104 Args: 105 bufsize: size of data buffer to receive 106 dtype: numpy data type 107 timeout: timeout in seconds 108 """ 109 buffer = self.recv(bufsize=bufsize, timeout=timeout) 110 if not buffer: 111 return None 112 data = np.frombuffer(buffer, dtype=dtype) 113 return data 114 115 def send_numpy(self, data: np.ndarray) -> None: 116 """ 117 Serialize a numpy array and send it. 118 119 Args: 120 data: data to send 121 """ 122 buffer = data.tobytes() 123 self.send(buffer)
UDP class for sending and receiving data from a UDP socket as a full duplex communcation channel.
UDP(recv_addr=('127.0.0.1', 8000), send_addr=('127.0.0.1', 8001))
12 def __init__(self, recv_addr=("127.0.0.1", 8000), send_addr=("127.0.0.1", 8001)): 13 """ 14 Initialize UDP Tx and Rx 15 16 Args: 17 recv_addr: address to listen on, None if not receiving (tx only) 18 send_addr: address of target host, None if not sending (rx only) 19 """ 20 self.recv_addr = recv_addr 21 self.send_addr = send_addr 22 23 self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 24 25 if self.recv_addr: 26 self._sock.bind(self.recv_addr) 27 print("UDP Rx is initialized:", self.recv_addr) 28 29 if self.send_addr: 30 print("UDP Tx is initialized:", self.send_addr)
Initialize UDP Tx and Rx
Arguments:
- recv_addr: address to listen on, None if not receiving (tx only)
- send_addr: address of target host, None if not sending (rx only)
def
recv(self, bufsize=1024, timeout=None) -> bytes:
38 def recv(self, bufsize=1024, timeout=None) -> bytes: 39 """ 40 Receive data 41 42 timeout == None: blocking forever 43 timeout == 0: non-blocking (the actual delay is around 0.1s) 44 timeout > 0: blocking for timeout seconds 45 46 Args: 47 bufsize: size of data buffer to receive 48 timeout: timeout in seconds 49 """ 50 if not self.recv_addr: 51 raise ValueError("Cannot receive data without a receive address") 52 self._sock.settimeout(timeout) 53 try: 54 buffer, addr = self._sock.recvfrom(bufsize) 55 except (socket.timeout, BlockingIOError): 56 return None 57 return buffer
Receive data
timeout == None: blocking forever timeout == 0: non-blocking (the actual delay is around 0.1s) timeout > 0: blocking for timeout seconds
Arguments:
- bufsize: size of data buffer to receive
- timeout: timeout in seconds
def
send(self, buffer: bytes) -> None:
59 def send(self, buffer: bytes) -> None: 60 """ 61 Send a byte buffer to the target device. 62 63 Args: 64 buffer: data to send 65 """ 66 if not self.send_addr: 67 raise ValueError("Cannot send data without a send address") 68 self._sock.sendto(buffer, self.send_addr)
Send a byte buffer to the target device.
Arguments:
- buffer: data to send
def
recv_dict(self, bufsize=1024, timeout=None) -> dict:
70 def recv_dict(self, bufsize=1024, timeout=None) -> dict: 71 """ 72 Receive data and deserialize it into a python dictionary. 73 74 See `recv()` for more information on timeout. 75 76 Args: 77 bufsize: size of data buffer to receive 78 timeout: timeout in seconds 79 """ 80 buffer = self.recv(bufsize=bufsize, timeout=timeout) 81 if not buffer: 82 return None 83 serialized_data = buffer.decode("utf-8") 84 data = json.loads(serialized_data) 85 return data
Receive data and deserialize it into a python dictionary.
See recv()
for more information on timeout.
Arguments:
- bufsize: size of data buffer to receive
- timeout: timeout in seconds
def
send_dict(self, data: dict) -> None:
87 def send_dict(self, data: dict) -> None: 88 """ 89 Serialize a python dictionary and send it. 90 91 Args: 92 data: data to send 93 """ 94 buffer = json.dumps(data) 95 buffer = buffer.encode() 96 self.send(buffer)
Serialize a python dictionary and send it.
Arguments:
- data: data to send
def
recv_numpy( self, bufsize=1024, dtype=<class 'numpy.float32'>, timeout=None) -> numpy.ndarray:
98 def recv_numpy(self, bufsize=1024, dtype=np.float32, timeout=None) -> np.ndarray: 99 """ 100 Receive data and deserialize it into a numpy array. 101 102 See `recv()` for more information on timeout. 103 104 Args: 105 bufsize: size of data buffer to receive 106 dtype: numpy data type 107 timeout: timeout in seconds 108 """ 109 buffer = self.recv(bufsize=bufsize, timeout=timeout) 110 if not buffer: 111 return None 112 data = np.frombuffer(buffer, dtype=dtype) 113 return data
Receive data and deserialize it into a numpy array.
See recv()
for more information on timeout.
Arguments:
- bufsize: size of data buffer to receive
- dtype: numpy data type
- timeout: timeout in seconds
def
send_numpy(self, data: numpy.ndarray) -> None:
115 def send_numpy(self, data: np.ndarray) -> None: 116 """ 117 Serialize a numpy array and send it. 118 119 Args: 120 data: data to send 121 """ 122 buffer = data.tobytes() 123 self.send(buffer)
Serialize a numpy array and send it.
Arguments:
- data: data to send