diff --git a/README.md b/README.md index f0835cd..53a5e56 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,7 @@ -* Subscribe to a ZMQ pub socket with `record_sub.py` -* Playback the recorded messages by publishing them to the provided port with `record_playback.py` - * Messages should be played with the same timing offsets as they were recorded. Starting after the first message. +* Subscribe to a ZMQ pub socket with `uv run record_sub.py` + * Provide port and filename as arguments. For example: `uv run record_sub.py 99174 messages-20250701.jsonl` +* Playback the recorded messages by publishing them to the provided port with `uv run record_playback.py` + * Messages should be played with the same timing offsets as they were recorded. Starting after the first message (which is send immediately, without delay). + * Again, provide port and filename as arguments: `uv run record_playback.py 99174 messages-20250701.jsonl ` Usefull for testing the functionality of a pubsub subsciber script. diff --git a/record_playback.py b/record_playback.py index e85b7d8..ba64e89 100644 --- a/record_playback.py +++ b/record_playback.py @@ -6,7 +6,7 @@ import sys import time import json -port = "99173" +port = "99174" if len(sys.argv) > 1: port = sys.argv[1] int(port) diff --git a/record_sub.py b/record_sub.py index a2bf9f0..2885086 100644 --- a/record_sub.py +++ b/record_sub.py @@ -4,6 +4,7 @@ Record messages from dataclasses import dataclass from typing import Any +import tqdm import zmq import random import sys @@ -11,11 +12,22 @@ import time import json import jsonlines +port = "99174" +if len(sys.argv) > 1: + port = sys.argv[1] + int(port) + +if len(sys.argv) > 2: + fn = sys.argv[2] +else: + fn = "messages.jsonl" context = zmq.Context() socket = context.socket(zmq.SUB) -socket.connect ("tcp://100.109.175.82:99173") +addr = f"tcp://100.109.175.82:{port}" + +socket.connect (addr) socket.setsockopt_string(zmq.SUBSCRIBE, "") #print('connected') # Subscribe to zipcode, default is NYC, 10001 @@ -24,18 +36,21 @@ socket.setsockopt_string(zmq.SUBSCRIBE, "") last = time.time() -with jsonlines.open('messages.jsonl', mode='w', flush=True) as writer: - while True: - string = socket.recv_string() - data = json.loads(string) - now = time.time() - msg = { - 'data': data, - 'offset': now - last - } - print(msg['offset']) - writer.write(msg) - last = now +print(f"Listen to {addr}, write to {fn}") + +with jsonlines.open(fn, mode='w', flush=True) as writer: + with tqdm.tqdm() as pbar: + while True: + pbar.update() + string = socket.recv_string() + data = json.loads(string) + now = time.time() + msg = { + 'data': data, + 'offset': now - last + } + writer.write(msg) + last = now #topic, messagedata = string.split() #total_value += int(messagedata)