laserspace/src/trap/zmqplugin.rs
2025-04-25 22:08:34 +02:00

175 lines
5.6 KiB
Rust

use zmq::Socket;
use serde_json::Result;
use bevy::{ecs::system::SystemState, prelude::*, render::Render};
use std::num::NonZero;
use super::{laser::{LaserApi, LaserTimer}, tracks::{Frame, LaserPoints, RenderableLines, Track, TrackBundle}};
// use trap::{Frame, Track, TrackBundle};
// use tracks::Frame;
// Because of world.insert_non_send_resource, this is an exclusive system
// see: https://bevy-cheatbook.github.io/programming/exclusive.html
fn setup_zmq(world: &mut World, params: &mut SystemState<Res<ZmqSettings>>) {
let settings = params.get(world);
let url = &settings.url;
let context = zmq::Context::new();
let subscriber = context.socket(zmq::SUB).unwrap();
assert!(subscriber.connect(url).is_ok());
// let filter = "10001";
let filter = &settings.filter; //"msgs";
assert!(subscriber.set_subscribe(filter.as_bytes()).is_ok());
world.insert_non_send_resource(subscriber);
// world.query::<>()
}
// fn receive_msg<T>(subscriber: Socket) -> Result<T> {
// let mut items = [
// subscriber.as_poll_item(zmq::POLLIN)
// ];
// let _nr = zmq::poll(&mut items, 0).unwrap();
// if items[0].is_readable() {
// let json = subscriber.recv_string(0).unwrap().unwrap();
// // dbg!(&json[4..]);
// // let msg: Frame = serde_json::from_str(&json[4..]).expect("No valid json?");
// let res: Result<T> = serde_json::from_str(&json);
// res
// } else {
// Err()
// }
// }
fn receive_zmq_lines(
subscriber: NonSend<Socket>,
mut lasers: Query<(&mut LaserApi, &mut LaserTimer)>,
time: Res<Time>,
mut exit: EventWriter<AppExit>
) {
let mut items = [
subscriber.as_poll_item(zmq::POLLIN)
];
let _nr = zmq::poll(&mut items, 0).unwrap();
if items[0].is_readable() {
let json = subscriber.recv_string(0).unwrap().unwrap();
// dbg!(&json[4..]);
// let msg: Frame = serde_json::from_str(&json[4..]).expect("No valid json?");
let res: Result<RenderableLines> = serde_json::from_str(&json);
let lines: RenderableLines = match res {
Ok(lines) => lines, // if Ok(255), set x to 255
Err(_e) => {
println!("No valid json? {json}");
println!("{}", _e);
exit.send(AppExit::Error(NonZero::<u8>::new(10).unwrap()));
return
}, // if Err("some message"), panic with error message "some message"
};
println!("receive {}", lines.lines.len());
for (laser_api, mut laser_timer) in lasers.iter_mut() {
laser_timer.timer.tick(time.delta());
// let lines = get_laser_lines(version);
let lines: RenderableLines = lines.clone();
laser_api.laser_stream.send(|laser| {
let laser_lines: RenderableLines = lines;
laser.current_lines = laser_lines;
}).unwrap();
}
}
}
fn receive_zmq_tracks(mut commands: Commands, subscriber: NonSend<Socket>, mut tracks_q: Query<&mut Track>, mut exit: EventWriter<AppExit>) {
let mut items = [
subscriber.as_poll_item(zmq::POLLIN)
];
let _nr = zmq::poll(&mut items, 0).unwrap();
if items[0].is_readable() {
let json = subscriber.recv_string(0).unwrap().unwrap();
// dbg!(&json[4..]);
// let msg: Frame = serde_json::from_str(&json[4..]).expect("No valid json?");
let res: Result<Frame> = serde_json::from_str(&json);
let msg = match res {
Ok(msg) => msg, // if Ok(255), set x to 255
Err(_e) => {
println!("No valid json? {json}");
exit.send(AppExit::Error(NonZero::<u8>::new(10).unwrap()));
return
}, // if Err("some message"), panic with error message "some message"
};
for (_track_id, new_track) in msg.tracks.into_iter() {
let mut done = false;
for mut track in tracks_q.iter_mut() {
if track.track_id == new_track.track_id {
// track.nr += 1;
track.history = new_track.history.clone();
track.predictor_history = new_track.predictor_history.clone();
track.predictions = new_track.predictions.clone();
// TODO match lenghts of points and drawn_points
done = true;
// dbg!(&track);
break
}
}
if !done {
// CREATE
// let track = Track{
// id: json.clone(),
// nr: 0,
// points: Vec::new()
// };
dbg!(&new_track.predictor_history);
commands.spawn(TrackBundle::from(new_track));
// commands.spawn(new_track);
}
}
}
}
#[derive(Resource)]
struct ZmqSettings {
url: String,
filter: String
}
pub enum ZmqReceiveTarget {
LINES,
TRACKS,
}
pub struct ZmqPlugin {
pub url: String,
pub filter: String,
pub target: ZmqReceiveTarget
}
impl Plugin for ZmqPlugin {
fn build(&self, app: &mut App) {
let settings = ZmqSettings{
url: self.url.clone(),
filter: self.filter.clone()
};
app
.insert_resource(settings)
.add_systems(Startup, setup_zmq)
;
match self.target {
ZmqReceiveTarget::TRACKS => app.add_systems(Update, receive_zmq_tracks),
ZmqReceiveTarget::LINES => app.add_systems(Update, receive_zmq_lines),
};
}
}