diff --git a/concore.jl b/concore.jl index 1b23ff0..04a2987 100644 --- a/concore.jl +++ b/concore.jl @@ -9,12 +9,19 @@ # Uses relative paths (./in, ./out) for local execution. # For Docker containers, use concoredocker.jl instead. # -# No external dependencies -- only Julia stdlib. +# No hard external dependencies; ZMQ.jl is optional. module Concore using Mmap +const HAS_ZMQ = try + @eval import ZMQ + true +catch + false +end + # ----------------------------------------------------------------------------- # Backend selection # ----------------------------------------------------------------------------- @@ -35,9 +42,13 @@ struct MmapBackend <: AbstractBackend end end +"""ZeroMQ transport marker backend.""" +struct ZmqBackend <: AbstractBackend end + # Compatibility name for proposal/docs wording. const FileTransport = FileBackend const MmapTransport = MmapBackend +const ZmqTransport = ZmqBackend # ----------------------------------------------------------------------------- # Path configuration @@ -97,10 +108,189 @@ _backend_inpath(::FileBackend) = inpath _backend_outpath(::FileBackend) = outpath _backend_inpath(::MmapBackend) = inpath _backend_outpath(::MmapBackend) = outpath +_backend_inpath(::ZmqBackend) = "zmq://in" +_backend_outpath(::ZmqBackend) = "zmq://out" _input_dir(port::Int) = _backend_inpath(_backend) * string(port) _output_dir(port::Int) = _backend_outpath(_backend) * string(port) +mutable struct ZeroMQPort + context::Any + socket::Any + port_type::String + address::String + socket_type::String +end + +const zmq_ports = Dict{String, ZeroMQPort}() +const _zmq_context = Ref{Any}(nothing) +const _zmq_cleanup_registered = Ref(false) + +function _require_zmq() + HAS_ZMQ && return nothing + error("ZMQ.jl is not installed. Install it with: using Pkg; Pkg.add(\"ZMQ\")") +end + +function _zmq_socket_type(socket_type::AbstractString) + _require_zmq() + upper = uppercase(String(socket_type)) + for name in ("REQ", "REP", "PUB", "SUB", "PUSH", "PULL", "PAIR") + upper == name && return getfield(ZMQ, Symbol(name)) + end + error("unknown ZMQ socket type '$socket_type'") +end + +function _register_zmq_cleanup() + if !_zmq_cleanup_registered[] + atexit(terminate_zmq) + _zmq_cleanup_registered[] = true + end +end + +function _get_zmq_context() + _require_zmq() + if _zmq_context[] === nothing + _zmq_context[] = ZMQ.Context() + end + return _zmq_context[] +end + +function _configure_zmq_socket(socket) + ZMQ.set_rcvtimeo(socket, 2000) + ZMQ.set_sndtimeo(socket, 2000) + ZMQ.set_linger(socket, 0) + return nothing +end + +function _zmq_send(socket, payload::AbstractString; max_retries::Int = 5) + _require_zmq() + max_retries > 0 || throw(ArgumentError("max_retries must be positive")) + for attempt in 1:max_retries + try + ZMQ.send(socket, String(payload)) + return nothing + catch + attempt == max_retries && rethrow() + sleep(0.5) + end + end +end + +function _zmq_recv(socket; max_retries::Int = 5)::String + _require_zmq() + max_retries > 0 || throw(ArgumentError("max_retries must be positive")) + for attempt in 1:max_retries + try + return String(ZMQ.recv(socket)) + catch + attempt == max_retries && rethrow() + sleep(0.5) + end + end + error("ZMQ receive failed") +end + +function _zmq_payload(val::AbstractVector{<:Real}, delta::Real)::String + return _format_wire(Float64[simtime + Float64(delta); Float64.(val)]) +end + +function _zmq_payload(val::AbstractString, delta::Real)::String + return String(val) +end + +function _close_zmq_port(port::ZeroMQPort) + try + ZMQ.close(port.socket) + catch + end + return nothing +end + +function init_zmq_port( + port_name::AbstractString, + port_type::AbstractString, + address::AbstractString, + socket_type::AbstractString, +) + _require_zmq() + _register_zmq_cleanup() + + ptype = lowercase(String(port_type)) + (ptype == "bind" || ptype == "connect") || error("port_type must be bind or connect") + + if haskey(zmq_ports, String(port_name)) + return nothing + end + + ctx = _get_zmq_context() + socket = ZMQ.Socket(ctx, _zmq_socket_type(socket_type)) + _configure_zmq_socket(socket) + + if ptype == "bind" + ZMQ.bind(socket, String(address)) + else + ZMQ.connect(socket, String(address)) + end + + zmq_ports[String(port_name)] = ZeroMQPort( + ctx, + socket, + ptype, + String(address), + uppercase(String(socket_type)), + ) + return nothing +end + +function init_zmq_port(port_name, port_type::Symbol, address, socket_type::Symbol) + init_zmq_port(string(port_name), string(port_type), string(address), string(socket_type)) +end + +function _zmq_read(port_name::AbstractString, initstr::AbstractString)::Vector{Float64} + _require_zmq() + port = get(zmq_ports, String(port_name), nothing) + port === nothing && error("ZMQ port '$port_name' not registered") + + global simtime + try + msg = _zmq_recv(port.socket) + vals = safe_parse_list(msg) + simtime = max(simtime, vals[1]) + return vals[2:end] + catch + return initval(initstr) + end +end + +function _zmq_write( + port_name::AbstractString, + val::Union{AbstractVector{<:Real},AbstractString}, + delta::Real, +) + _require_zmq() + port = get(zmq_ports, String(port_name), nothing) + port === nothing && error("ZMQ port '$port_name' not registered") + + payload = _zmq_payload(val, delta) + _zmq_send(port.socket, payload) + return nothing +end + +function terminate_zmq() + for port in values(zmq_ports) + _close_zmq_port(port) + end + empty!(zmq_ports) + if _zmq_context[] !== nothing + try + ZMQ.close(_zmq_context[]) + catch + end + _zmq_context[] = nothing + end + return nothing +end + const _mmap_segments = Dict{String, Tuple{IOStream, Vector{UInt8}}}() const _mmap_cleanup_registered = Ref(false) @@ -498,6 +688,21 @@ function concore_read( return val[2:end] end +function concore_read( + port_identifier::AbstractString, + name::AbstractString, + initstr::AbstractString, +)::Vector{Float64} + if haskey(zmq_ports, String(port_identifier)) + return _zmq_read(port_identifier, initstr) + end + + file_port = tryparse(Int, port_identifier) + file_port !== nothing && return concore_read(file_port, name, initstr) + + error("ZMQ port '$port_identifier' not registered") +end + """ concore_write(port::Int, name::AbstractString, val::AbstractVector{<:Real}; delta::Real=0) @@ -525,6 +730,22 @@ function concore_write( return nothing end +function concore_write( + port_identifier::AbstractString, + name::AbstractString, + val::AbstractVector{<:Real}; + delta::Real = 0, +) + if haskey(zmq_ports, String(port_identifier)) + return _zmq_write(port_identifier, val, delta) + end + + file_port = tryparse(Int, port_identifier) + file_port !== nothing && return concore_write(file_port, name, val; delta=delta) + + error("ZMQ port '$port_identifier' not registered") +end + """ concore_write(port::Int, name::AbstractString, val::AbstractString; delta::Int=0) @@ -547,6 +768,22 @@ function concore_write( return nothing end +function concore_write( + port_identifier::AbstractString, + name::AbstractString, + val::AbstractString; + delta::Int = 0, +) + if haskey(zmq_ports, String(port_identifier)) + return _zmq_write(port_identifier, val, delta) + end + + file_port = tryparse(Int, port_identifier) + file_port !== nothing && return concore_write(file_port, name, val; delta=delta) + + error("ZMQ port '$port_identifier' not registered") +end + """ initval(simtime_val::AbstractString) -> Vector{Float64} @@ -617,6 +854,7 @@ export load_iport!, load_oport!, load_params!, concore_init! export load_iport, load_oport, load_params, default_maxtime, concore_init export AbstractBackend, FileBackend, FileTransport export MmapBackend, MmapTransport, mmap_cleanup +export ZmqBackend, ZmqTransport, HAS_ZMQ, init_zmq_port, terminate_zmq, zmq_ports # ----------------------------------------------------------------------------- # Auto-initialize on load diff --git a/tests/julia/runtests.jl b/tests/julia/runtests.jl index 0a56935..e85c7d3 100644 --- a/tests/julia/runtests.jl +++ b/tests/julia/runtests.jl @@ -11,4 +11,5 @@ using .Concore include("test_wire_compat.jl") include("test_interop.jl") include("test_mmap.jl") + include("test_zmq.jl") end diff --git a/tests/julia/test_zmq.jl b/tests/julia/test_zmq.jl new file mode 100644 index 0000000..ba9fe3d --- /dev/null +++ b/tests/julia/test_zmq.jl @@ -0,0 +1,108 @@ +@testset "ZMQ backend" begin + + function reset_zmq_state!() + Concore.terminate_zmq() + Concore._backend = Concore.FileBackend() + Concore.simtime = 0.0 + Concore.delay = 0.0 + Concore.s = "" + Concore.olds = "" + Concore.retrycount = 0 + end + + @testset "backend type" begin + @test Concore.ZmqBackend <: Concore.AbstractBackend + @test Concore.ZmqBackend() isa Concore.AbstractBackend + @test Concore.ZmqTransport === Concore.ZmqBackend + @test Concore._backend_inpath(Concore.ZmqBackend()) == "zmq://in" + @test Concore._backend_outpath(Concore.ZmqBackend()) == "zmq://out" + end + + @testset "soft dependency" begin + @test Concore.HAS_ZMQ isa Bool + if Concore.HAS_ZMQ + @test Concore._require_zmq() === nothing + else + @test_throws ErrorException Concore._require_zmq() + @test_throws ErrorException Concore.init_zmq_port( + "x", "bind", "tcp://127.0.0.1:5555", "REQ" + ) + end + end + + @testset "port registry cleanup" begin + reset_zmq_state!() + @test isempty(Concore.zmq_ports) + @test Concore.terminate_zmq() === nothing + @test isempty(Concore.zmq_ports) + end + + @testset "payload format" begin + reset_zmq_state!() + Concore.simtime = 5.0 + @test Concore._zmq_payload([42.0, 3.14], 0) == "[5.0, 42.0, 3.14]" + @test Concore._zmq_payload([1, 2], 1) == "[6.0, 1.0, 2.0]" + @test Concore._zmq_payload("[7.0, 8.0]", 0) == "[7.0, 8.0]" + end + + @testset "string port fallback" begin + mktempdir() do dir + reset_zmq_state!() + old_inpath = Concore.inpath + old_outpath = Concore.outpath + try + Concore.inpath = joinpath(dir, "io") + Concore.outpath = joinpath(dir, "io") + Concore.simtime = 2.0 + + Concore.concore_write("1", "signal", [9.0]) + result = Concore.concore_read("1", "signal", "[0.0, 0.0]") + + @test result == [9.0] + @test Concore.simtime == 2.0 + finally + Concore.inpath = old_inpath + Concore.outpath = old_outpath + reset_zmq_state!() + end + end + end + + @testset "unregistered string port" begin + reset_zmq_state!() + @test_throws ErrorException Concore.concore_read("missing", "signal", "[0.0]") + @test_throws ErrorException Concore.concore_write("missing", "signal", [1.0]) + end + + if Concore.HAS_ZMQ + @testset "live REQ to REP send" begin + using Sockets + + reset_zmq_state!() + server = Sockets.listen(Sockets.localhost, 0) + tcp_port = Sockets.getsockname(server)[2] + close(server) + address = "tcp://127.0.0.1:$tcp_port" + + Concore.init_zmq_port("rep", "bind", address, "REP") + Concore.init_zmq_port("req", "connect", address, "REQ") + @test Concore.zmq_ports["rep"].context === Concore.zmq_ports["req"].context + sleep(0.1) + + Concore.simtime = 4.0 + Concore.concore_write("req", "signal", [10.0, 11.0]) + result = Concore.concore_read("rep", "signal", "[0.0, 0.0, 0.0]") + + @test result == [10.0, 11.0] + @test Concore.simtime == 4.0 + reset_zmq_state!() + end + else + @testset "live ZMQ skipped" begin + @test_skip "ZMQ.jl not installed" + end + end + + reset_zmq_state!() + +end