Provide port and filename as arguments
This commit is contained in:
parent
9d8d7cbff4
commit
1937557b21
3 changed files with 34 additions and 17 deletions
|
@ -1,5 +1,7 @@
|
||||||
* Subscribe to a ZMQ pub socket with `record_sub.py`
|
* Subscribe to a ZMQ pub socket with `uv run record_sub.py`
|
||||||
* Playback the recorded messages by publishing them to the provided port with `record_playback.py`
|
* Provide port and filename as arguments. For example: `uv run record_sub.py 99174 messages-20250701.jsonl`
|
||||||
* Messages should be played with the same timing offsets as they were recorded. Starting after the first message.
|
* Playback the recorded messages by publishing them to the provided port with `uv run record_playback.py`
|
||||||
|
* Messages should be played with the same timing offsets as they were recorded. Starting after the first message (which is send immediately, without delay).
|
||||||
|
* Again, provide port and filename as arguments: `uv run record_playback.py 99174 messages-20250701.jsonl `
|
||||||
|
|
||||||
Usefull for testing the functionality of a pubsub subsciber script.
|
Usefull for testing the functionality of a pubsub subsciber script.
|
||||||
|
|
|
@ -6,7 +6,7 @@ import sys
|
||||||
import time
|
import time
|
||||||
import json
|
import json
|
||||||
|
|
||||||
port = "99173"
|
port = "99174"
|
||||||
if len(sys.argv) > 1:
|
if len(sys.argv) > 1:
|
||||||
port = sys.argv[1]
|
port = sys.argv[1]
|
||||||
int(port)
|
int(port)
|
||||||
|
|
|
@ -4,6 +4,7 @@ Record messages
|
||||||
|
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
import tqdm
|
||||||
import zmq
|
import zmq
|
||||||
import random
|
import random
|
||||||
import sys
|
import sys
|
||||||
|
@ -11,11 +12,22 @@ import time
|
||||||
import json
|
import json
|
||||||
import jsonlines
|
import jsonlines
|
||||||
|
|
||||||
|
port = "99174"
|
||||||
|
if len(sys.argv) > 1:
|
||||||
|
port = sys.argv[1]
|
||||||
|
int(port)
|
||||||
|
|
||||||
|
if len(sys.argv) > 2:
|
||||||
|
fn = sys.argv[2]
|
||||||
|
else:
|
||||||
|
fn = "messages.jsonl"
|
||||||
|
|
||||||
context = zmq.Context()
|
context = zmq.Context()
|
||||||
socket = context.socket(zmq.SUB)
|
socket = context.socket(zmq.SUB)
|
||||||
|
|
||||||
socket.connect ("tcp://100.109.175.82:99173")
|
addr = f"tcp://100.109.175.82:{port}"
|
||||||
|
|
||||||
|
socket.connect (addr)
|
||||||
socket.setsockopt_string(zmq.SUBSCRIBE, "")
|
socket.setsockopt_string(zmq.SUBSCRIBE, "")
|
||||||
#print('connected')
|
#print('connected')
|
||||||
# Subscribe to zipcode, default is NYC, 10001
|
# Subscribe to zipcode, default is NYC, 10001
|
||||||
|
@ -24,18 +36,21 @@ socket.setsockopt_string(zmq.SUBSCRIBE, "")
|
||||||
|
|
||||||
last = time.time()
|
last = time.time()
|
||||||
|
|
||||||
with jsonlines.open('messages.jsonl', mode='w', flush=True) as writer:
|
print(f"Listen to {addr}, write to {fn}")
|
||||||
while True:
|
|
||||||
string = socket.recv_string()
|
with jsonlines.open(fn, mode='w', flush=True) as writer:
|
||||||
data = json.loads(string)
|
with tqdm.tqdm() as pbar:
|
||||||
now = time.time()
|
while True:
|
||||||
msg = {
|
pbar.update()
|
||||||
'data': data,
|
string = socket.recv_string()
|
||||||
'offset': now - last
|
data = json.loads(string)
|
||||||
}
|
now = time.time()
|
||||||
print(msg['offset'])
|
msg = {
|
||||||
writer.write(msg)
|
'data': data,
|
||||||
last = now
|
'offset': now - last
|
||||||
|
}
|
||||||
|
writer.write(msg)
|
||||||
|
last = now
|
||||||
|
|
||||||
#topic, messagedata = string.split()
|
#topic, messagedata = string.split()
|
||||||
#total_value += int(messagedata)
|
#total_value += int(messagedata)
|
||||||
|
|
Loading…
Reference in a new issue