Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
240 changes: 239 additions & 1 deletion concore.jl
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,19 @@
# Uses relative paths (./in, ./out) for local execution.
# For Docker containers, use concoredocker.jl instead.
#
# No external dependencies -- only Julia stdlib.
# No hard external dependencies; ZMQ.jl is optional.

module Concore

using Mmap

const HAS_ZMQ = try
@eval import ZMQ
true
catch
false
end

# -----------------------------------------------------------------------------
# Backend selection
# -----------------------------------------------------------------------------
Expand All @@ -35,9 +42,13 @@ struct MmapBackend <: AbstractBackend
end
end

"""ZeroMQ transport marker backend."""
struct ZmqBackend <: AbstractBackend end

# Compatibility name for proposal/docs wording.
const FileTransport = FileBackend
const MmapTransport = MmapBackend
const ZmqTransport = ZmqBackend

# -----------------------------------------------------------------------------
# Path configuration
Expand Down Expand Up @@ -97,10 +108,189 @@ _backend_inpath(::FileBackend) = inpath
_backend_outpath(::FileBackend) = outpath
_backend_inpath(::MmapBackend) = inpath
_backend_outpath(::MmapBackend) = outpath
_backend_inpath(::ZmqBackend) = "zmq://in"
_backend_outpath(::ZmqBackend) = "zmq://out"

_input_dir(port::Int) = _backend_inpath(_backend) * string(port)
_output_dir(port::Int) = _backend_outpath(_backend) * string(port)

mutable struct ZeroMQPort
context::Any
socket::Any
port_type::String
address::String
socket_type::String
end

const zmq_ports = Dict{String, ZeroMQPort}()
const _zmq_context = Ref{Any}(nothing)
const _zmq_cleanup_registered = Ref(false)

function _require_zmq()
HAS_ZMQ && return nothing
error("ZMQ.jl is not installed. Install it with: using Pkg; Pkg.add(\"ZMQ\")")
end

function _zmq_socket_type(socket_type::AbstractString)
_require_zmq()
upper = uppercase(String(socket_type))
for name in ("REQ", "REP", "PUB", "SUB", "PUSH", "PULL", "PAIR")
upper == name && return getfield(ZMQ, Symbol(name))
end
error("unknown ZMQ socket type '$socket_type'")
end

function _register_zmq_cleanup()
if !_zmq_cleanup_registered[]
atexit(terminate_zmq)
_zmq_cleanup_registered[] = true
end
end

function _get_zmq_context()
_require_zmq()
if _zmq_context[] === nothing
_zmq_context[] = ZMQ.Context()
end
return _zmq_context[]
end

function _configure_zmq_socket(socket)
ZMQ.set_rcvtimeo(socket, 2000)
ZMQ.set_sndtimeo(socket, 2000)
ZMQ.set_linger(socket, 0)
return nothing
end

function _zmq_send(socket, payload::AbstractString; max_retries::Int = 5)
_require_zmq()
max_retries > 0 || throw(ArgumentError("max_retries must be positive"))
for attempt in 1:max_retries
try
ZMQ.send(socket, String(payload))
return nothing
catch
attempt == max_retries && rethrow()
sleep(0.5)
end
end
end

function _zmq_recv(socket; max_retries::Int = 5)::String
_require_zmq()
max_retries > 0 || throw(ArgumentError("max_retries must be positive"))
for attempt in 1:max_retries
try
return String(ZMQ.recv(socket))
catch
attempt == max_retries && rethrow()
sleep(0.5)
end
end
error("ZMQ receive failed")
end

function _zmq_payload(val::AbstractVector{<:Real}, delta::Real)::String
return _format_wire(Float64[simtime + Float64(delta); Float64.(val)])
end

function _zmq_payload(val::AbstractString, delta::Real)::String
return String(val)
end

function _close_zmq_port(port::ZeroMQPort)
try
ZMQ.close(port.socket)
catch
end
return nothing
end

function init_zmq_port(
port_name::AbstractString,
port_type::AbstractString,
address::AbstractString,
socket_type::AbstractString,
)
_require_zmq()
_register_zmq_cleanup()

ptype = lowercase(String(port_type))
(ptype == "bind" || ptype == "connect") || error("port_type must be bind or connect")

if haskey(zmq_ports, String(port_name))
return nothing
end

ctx = _get_zmq_context()
socket = ZMQ.Socket(ctx, _zmq_socket_type(socket_type))
_configure_zmq_socket(socket)

if ptype == "bind"
ZMQ.bind(socket, String(address))
else
ZMQ.connect(socket, String(address))
end

zmq_ports[String(port_name)] = ZeroMQPort(
ctx,
socket,
ptype,
String(address),
uppercase(String(socket_type)),
)
return nothing
end

function init_zmq_port(port_name, port_type::Symbol, address, socket_type::Symbol)
init_zmq_port(string(port_name), string(port_type), string(address), string(socket_type))
end

function _zmq_read(port_name::AbstractString, initstr::AbstractString)::Vector{Float64}
_require_zmq()
port = get(zmq_ports, String(port_name), nothing)
port === nothing && error("ZMQ port '$port_name' not registered")

global simtime
try
msg = _zmq_recv(port.socket)
vals = safe_parse_list(msg)
simtime = max(simtime, vals[1])
return vals[2:end]
catch
return initval(initstr)
end
end

function _zmq_write(
port_name::AbstractString,
val::Union{AbstractVector{<:Real},AbstractString},
delta::Real,
)
_require_zmq()
port = get(zmq_ports, String(port_name), nothing)
port === nothing && error("ZMQ port '$port_name' not registered")

payload = _zmq_payload(val, delta)
_zmq_send(port.socket, payload)
return nothing
end

function terminate_zmq()
for port in values(zmq_ports)
_close_zmq_port(port)
end
empty!(zmq_ports)
if _zmq_context[] !== nothing
try
ZMQ.close(_zmq_context[])
catch
end
_zmq_context[] = nothing
end
return nothing
end

const _mmap_segments = Dict{String, Tuple{IOStream, Vector{UInt8}}}()
const _mmap_cleanup_registered = Ref(false)

Expand Down Expand Up @@ -498,6 +688,21 @@ function concore_read(
return val[2:end]
end

function concore_read(
port_identifier::AbstractString,
name::AbstractString,
initstr::AbstractString,
)::Vector{Float64}
if haskey(zmq_ports, String(port_identifier))
return _zmq_read(port_identifier, initstr)
end

file_port = tryparse(Int, port_identifier)
file_port !== nothing && return concore_read(file_port, name, initstr)

error("ZMQ port '$port_identifier' not registered")
end

"""
concore_write(port::Int, name::AbstractString, val::AbstractVector{<:Real}; delta::Real=0)

Expand Down Expand Up @@ -525,6 +730,22 @@ function concore_write(
return nothing
end

function concore_write(
port_identifier::AbstractString,
name::AbstractString,
val::AbstractVector{<:Real};
delta::Real = 0,
)
if haskey(zmq_ports, String(port_identifier))
return _zmq_write(port_identifier, val, delta)
end

file_port = tryparse(Int, port_identifier)
file_port !== nothing && return concore_write(file_port, name, val; delta=delta)

error("ZMQ port '$port_identifier' not registered")
end

"""
concore_write(port::Int, name::AbstractString, val::AbstractString; delta::Int=0)

Expand All @@ -547,6 +768,22 @@ function concore_write(
return nothing
end

function concore_write(
port_identifier::AbstractString,
name::AbstractString,
val::AbstractString;
delta::Int = 0,
)
if haskey(zmq_ports, String(port_identifier))
return _zmq_write(port_identifier, val, delta)
end

file_port = tryparse(Int, port_identifier)
file_port !== nothing && return concore_write(file_port, name, val; delta=delta)

error("ZMQ port '$port_identifier' not registered")
end

"""
initval(simtime_val::AbstractString) -> Vector{Float64}

Expand Down Expand Up @@ -617,6 +854,7 @@ export load_iport!, load_oport!, load_params!, concore_init!
export load_iport, load_oport, load_params, default_maxtime, concore_init
export AbstractBackend, FileBackend, FileTransport
export MmapBackend, MmapTransport, mmap_cleanup
export ZmqBackend, ZmqTransport, HAS_ZMQ, init_zmq_port, terminate_zmq, zmq_ports

# -----------------------------------------------------------------------------
# Auto-initialize on load
Expand Down
1 change: 1 addition & 0 deletions tests/julia/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ using .Concore
include("test_wire_compat.jl")
include("test_interop.jl")
include("test_mmap.jl")
include("test_zmq.jl")
end
Loading
Loading