diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 51f238e..de6afa1 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -96,3 +96,18 @@ jobs: - name: Validate Dockerfile build if: steps.filter.outputs.dockerfile == 'true' run: docker build -f Dockerfile.py -t concore-py-test . + + cpp-shm-test: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - name: Install g++ + run: sudo apt-get update && sudo apt-get install -y g++ + + - name: Parse concore.hpp standalone + # -fsyntax-only parses concore.hpp without producing an object + # file, so we don't need system SHM semaphores or shared-memory + # segments to succeed. This catches header-level regressions on + # the SHM transport (issue #195) without linking or running a binary. + run: g++ -std=c++17 -Wall -Wextra -fsyntax-only tests/test_shm_cpp_smoke.cpp diff --git a/concore.hpp b/concore.hpp index 5b8d549..339211c 100644 --- a/concore.hpp +++ b/concore.hpp @@ -17,11 +17,19 @@ #include #ifdef __linux__ #include +#include #include #include +#include +union semun { //not defined by glibc; needed for semctl SETVAL + int val; + struct semid_ds* buf; + unsigned short* array; +}; #endif #include #include +#include #include #include "concore_base.hpp" @@ -41,6 +49,12 @@ class Concore{ string outpath = "./out"; static constexpr size_t SHM_SIZE = 4096; + //SHM layout: [0..7]=uint64 seq#, [8..]=payload. odd=writing, even=ready. + //A seqlock-style protocol: writer flips even->odd, memcpy, fence, + //flips odd->even. Reader loads seq, copies payload, re-loads seq, + //accepts only if both loads match and seq is even. See issue #195. 
+ static constexpr size_t SHM_HEADER_SIZE = 8; + static constexpr size_t SHM_PAYLOAD_MAX = SHM_SIZE - SHM_HEADER_SIZE - 1; int shmId_create = -1; int shmId_get = -1; @@ -50,6 +64,11 @@ class Concore{ // File sharing:- 0, Shared Memory:- 1 int communication_iport = 0; // iport refers to input port int communication_oport = 0; // oport refers to input port +#ifdef __linux__ + //System V semaphores (semget/semop/semctl) keyed by (shm_key+1). Idempotent across processes. + int semId_create = -1; + int semId_get = -1; +#endif #ifdef CONCORE_USE_ZMQ map zmq_ports; @@ -145,6 +164,9 @@ class Concore{ if (shmId_create != -1) { shmctl(shmId_create, IPC_RMID, nullptr); } + if (semId_create != -1) { + semctl(semId_create, 0, IPC_RMID); + } #endif } @@ -166,6 +188,9 @@ class Concore{ delay(other.delay), retrycount(other.retrycount), simtime(other.simtime), maxtime(other.maxtime), iport(std::move(other.iport)), oport(std::move(other.oport)), params(std::move(other.params)) +#ifdef __linux__ + , semId_create(other.semId_create), semId_get(other.semId_get) +#endif { other.shmId_create = -1; other.shmId_get = -1; @@ -173,6 +198,10 @@ class Concore{ other.sharedData_get = nullptr; other.communication_iport = 0; other.communication_oport = 0; +#ifdef __linux__ + other.semId_create = -1; + other.semId_get = -1; +#endif } /** @@ -190,6 +219,8 @@ class Concore{ shmdt(sharedData_get); if (shmId_create != -1) shmctl(shmId_create, IPC_RMID, nullptr); + if (semId_create != -1) + semctl(semId_create, 0, IPC_RMID); #endif s = std::move(other.s); @@ -209,6 +240,10 @@ class Concore{ iport = std::move(other.iport); oport = std::move(other.oport); params = std::move(other.params); +#ifdef __linux__ + semId_create = other.semId_create; + semId_get = other.semId_get; +#endif other.shmId_create = -1; other.shmId_get = -1; @@ -216,6 +251,10 @@ class Concore{ other.sharedData_get = nullptr; other.communication_iport = 0; other.communication_oport = 0; +#ifdef __linux__ + other.semId_create = -1; + other.semId_get = -1; 
+#endif return *this; } @@ -254,6 +293,78 @@ class Concore{ return std::stoi(numberString); } +#ifdef __linux__ + static inline uint64_t shm_load_seq(const char* base) { + uint64_t v; + __atomic_load(reinterpret_cast(base), &v, __ATOMIC_ACQUIRE); + return v; + } + + static inline void shm_store_seq(char* base, uint64_t v) { + __atomic_store(reinterpret_cast(base), &v, __ATOMIC_RELEASE); + } + + // Seqlock-style snapshot read: returns the payload on success, or + // std::string() (empty) if the seq# is missing, odd (write in progress), + // or changed between the two reads. The caller can retry without + // taking the semaphore. + static std::string shm_read_payload(const char* base) { + if (base == nullptr) return std::string(); + uint64_t s1 = shm_load_seq(base); + if (s1 == 0 || (s1 & 1u)) return std::string(); + std::string out(base + SHM_HEADER_SIZE, + strnlen(base + SHM_HEADER_SIZE, SHM_PAYLOAD_MAX)); + __atomic_thread_fence(__ATOMIC_ACQUIRE); + uint64_t s2 = shm_load_seq(base); + if (s1 != s2) return std::string(); + return out; + } + + static int shm_sem_create(key_t key) { + // Try to create as the original owner. If it already exists, + // attach without resetting its value (a stale semaphore would + // reset valid in-use state otherwise). + int id = semget(key, 1, IPC_CREAT | IPC_EXCL | 0666); + if (id >= 0) { + semun arg{}; + arg.val = 1; + if (semctl(id, 0, SETVAL, arg) < 0) return -1; + return id; + } + // EEXIST means another process got here first; just open it. + id = semget(key, 1, 0666); + return id < 0 ? 
-1 : id; + } + + static void shm_sem_acquire(int id) { + if (id < 0) return; + sembuf sb{}; + sb.sem_num = 0; + sb.sem_op = -1; + sb.sem_flg = 0; + while (semop(id, &sb, 1) == -1) { + if (errno != EINTR) { + std::cerr << "semop(acquire) failed errno=" << errno << std::endl; + return; + } + } + } + + static void shm_sem_release(int id) { + if (id < 0) return; + sembuf sb{}; + sb.sem_num = 0; + sb.sem_op = 1; + sb.sem_flg = 0; + while (semop(id, &sb, 1) == -1) { + if (errno != EINTR) { + std::cerr << "semop(release) failed errno=" << errno << std::endl; + return; + } + } + } +#endif + #ifdef __linux__ /** * @brief Creates a shared memory segment with the given key. @@ -286,7 +397,17 @@ class Concore{ if (sharedData_create == reinterpret_cast(-1)) { std::cerr << "Failed to attach shared memory segment." << std::endl; sharedData_create = nullptr; + return; + } + + semId_create = shm_sem_create(key + 1); + if (semId_create < 0) { + std::cerr << "Failed to create shared memory semaphore." << std::endl; } + + //initialise header + shm_store_seq(sharedData_create, uint64_t{0}); + sharedData_create[SHM_HEADER_SIZE] = '\0'; } /** @@ -321,6 +442,14 @@ class Concore{ if (sharedData_get == reinterpret_cast(-1)) { std::cerr << "Failed to attach shared memory segment." 
<< std::endl; sharedData_get = nullptr; + return; + } + + //attach reader-side semaphore (writer owns its lifetime) + semId_get = semget(key + 1, 1, 0666); + if (semId_get < 0) { + //no semaphore: reads fall back to seq# alone + semId_get = -1; } } #endif @@ -503,49 +632,48 @@ class Concore{ this_thread::sleep_for(timespan); string ins = ""; ReadStatus status = ReadStatus::SUCCESS; - try { - if (shmId_get != -1) { - if (sharedData_get && sharedData_get[0] != '\0') { - std::string message(sharedData_get, strnlen(sharedData_get, SHM_SIZE)); - ins = message; - } - else - { - throw 505; - } - } - else - { +#ifdef __linux__ + if (shmId_get == -1 || sharedData_get == nullptr) { status = ReadStatus::FILE_NOT_FOUND; - throw 505; - } - } catch (...) { ins = initstr; - if (status == ReadStatus::SUCCESS) - status = ReadStatus::FILE_NOT_FOUND; + } else { + std::string snap = shm_read_payload(sharedData_get); + if (snap.empty()) { + ins = initstr; + if (status == ReadStatus::SUCCESS) + status = ReadStatus::FILE_NOT_FOUND; + } else { + ins = snap; + } } - +#else + ins = initstr; + status = ReadStatus::FILE_NOT_FOUND; +#endif + int retry = 0; const int MAX_RETRY = 100; +#ifdef __linux__ while ((int)ins.length()==0 && retry < MAX_RETRY){ this_thread::sleep_for(timespan); - try{ - if(shmId_get != -1) { - std::string message(sharedData_get, strnlen(sharedData_get, SHM_SIZE)); - ins = message; + if(shmId_get != -1 && sharedData_get != nullptr) { + std::string snap = shm_read_payload(sharedData_get); + if (!snap.empty()) { + ins = snap; retrycount++; } - else{ - retrycount++; - throw 505; - } } - //observed retry count in C++ from various tests is approx 80. 
- catch(...){ - std::cout << "Read error" << std::endl; + else{ + retrycount++; } retry++; } +#else + while ((int)ins.length()==0 && retry < MAX_RETRY){ + this_thread::sleep_for(timespan); + retry++; + } +#endif if ((int)ins.length()==0) status = ReadStatus::RETRIES_EXCEEDED; s += ins; @@ -680,34 +808,54 @@ class Concore{ */ void write_SM(int port, string name, vector val, int delta=0){ + std::ostringstream outfile; + val.insert(val.begin(),simtime+delta); + outfile<<'['; + for(int i=0;i= SHM_SIZE) { + throw std::runtime_error( + "concore SHM write failed: payload (" + + std::to_string(result.size()) + + " bytes) exceeds SHM_SIZE (" + + std::to_string(SHM_SIZE) + + "). Aborting. No data written. Increase SHM_SIZE in concore.hpp." + ); + } + const size_t max_payload = SHM_PAYLOAD_MAX; + if (result.size() > max_payload) { + std::cerr << "ERROR: write_SM payload (" << result.size() + << " bytes) exceeds " << max_payload + << "-byte shared memory limit. Data truncated!" << std::endl; + result.resize(max_payload); + } try { - std::ostringstream outfile; - if(shmId_create != -1){ - if (sharedData_create == nullptr) - throw 506; - val.insert(val.begin(),simtime+delta); - outfile<<'['; - for(int i=0;i= SHM_SIZE) { - throw std::runtime_error( - "concore SHM write failed: payload (" + - std::to_string(result.size()) + - " bytes) exceeds SHM_SIZE (" + - std::to_string(SHM_SIZE) + - "). Aborting. No data written. Increase SHM_SIZE in concore.hpp." - ); - } - std::strncpy(sharedData_create, result.c_str(), SHM_SIZE - 1); - sharedData_create[SHM_SIZE - 1] = '\0'; - // simtime must not be mutated here (issue #385). - } - else{ + if(shmId_create == -1){ throw 505; - } } + if (sharedData_create == nullptr) + throw 506; +#ifdef __linux__ + shm_sem_acquire(semId_create); +#endif + { + auto* seqp = reinterpret_cast(sharedData_create); + // Mark "writing" by flipping seq to odd. 
+ (void)__atomic_fetch_add(seqp, uint64_t{1}, __ATOMIC_ACQ_REL); + std::memcpy(sharedData_create + SHM_HEADER_SIZE, + result.c_str(), result.size()); + sharedData_create[SHM_HEADER_SIZE + result.size()] = '\0'; + __atomic_thread_fence(__ATOMIC_RELEASE); + // Publish by flipping seq to even. + (void)__atomic_fetch_add(seqp, uint64_t{1}, __ATOMIC_ACQ_REL); + } +#ifdef __linux__ + shm_sem_release(semId_create); +#endif + // simtime must not be mutated here (issue #385). + } catch (const std::exception &e) { std::cerr << e.what() << std::endl; @@ -737,13 +885,33 @@ class Concore{ ); } try { - if(shmId_create != -1){ - if (sharedData_create == nullptr) - throw 506; - std::strncpy(sharedData_create, val.c_str(), SHM_SIZE - 1); - sharedData_create[SHM_SIZE - 1] = '\0'; + if(shmId_create == -1){ + throw 505; } - else throw 505; + if (sharedData_create == nullptr) + throw 506; + const size_t max_payload = SHM_PAYLOAD_MAX; + if (val.size() > max_payload) { + std::cerr << "ERROR: write_SM payload (" << val.size() + << " bytes) exceeds " << max_payload + << "-byte shared memory limit. Data truncated!" 
<< std::endl; + val.resize(max_payload); + } +#ifdef __linux__ + shm_sem_acquire(semId_create); +#endif + { + auto* seqp = reinterpret_cast(sharedData_create); + (void)__atomic_fetch_add(seqp, uint64_t{1}, __ATOMIC_ACQ_REL); + std::memcpy(sharedData_create + SHM_HEADER_SIZE, + val.c_str(), val.size()); + sharedData_create[SHM_HEADER_SIZE + val.size()] = '\0'; + __atomic_thread_fence(__ATOMIC_RELEASE); + (void)__atomic_fetch_add(seqp, uint64_t{1}, __ATOMIC_ACQ_REL); + } +#ifdef __linux__ + shm_sem_release(semId_create); +#endif } catch (const std::exception &e) { std::cerr << e.what() << std::endl; diff --git a/concoredocker.hpp b/concoredocker.hpp index d9c9a1e..71fd843 100644 --- a/concoredocker.hpp +++ b/concoredocker.hpp @@ -18,8 +18,15 @@ #ifdef __linux__ #include +#include #include #include +#include +union semun { //not defined by glibc; needed for semctl SETVAL + int val; + struct semid_ds* buf; + unsigned short* array; +}; #endif #include "concore_base.hpp" @@ -27,6 +34,11 @@ class Concore { private: static constexpr size_t SHM_SIZE = 4096; + //SHM layout: [0..7]=uint64 seq#, [8..]=payload. odd=writing, even=ready. + //Seqlock-style protocol: writer flips even->odd, memcpy, fence, + //flips odd->even. Reader double-loads seq to validate stability. 
+ static constexpr size_t SHM_HEADER_SIZE = 8; + static constexpr size_t SHM_PAYLOAD_MAX = SHM_SIZE - SHM_HEADER_SIZE - 1; int shmId_create = -1; int shmId_get = -1; @@ -34,6 +46,10 @@ class Concore { char* sharedData_get = nullptr; int communication_iport = 0; // iport refers to input port int communication_oport = 0; // oport refers to output port +#ifdef __linux__ + int semId_create = -1; + int semId_get = -1; +#endif public: enum class ReadStatus { @@ -120,6 +136,8 @@ class Concore { shmdt(sharedData_get); if (shmId_create != -1) shmctl(shmId_create, IPC_RMID, nullptr); + if (semId_create != -1) + semctl(semId_create, 0, IPC_RMID); #endif } @@ -136,6 +154,9 @@ class Concore { shmId_create(other.shmId_create), shmId_get(other.shmId_get), sharedData_create(other.sharedData_create), sharedData_get(other.sharedData_get), communication_iport(other.communication_iport), communication_oport(other.communication_oport) +#ifdef __linux__ + , semId_create(other.semId_create), semId_get(other.semId_get) +#endif { #ifdef CONCORE_USE_ZMQ zmq_ports = std::move(other.zmq_ports); @@ -146,6 +167,10 @@ class Concore { other.sharedData_get = nullptr; other.communication_iport = 0; other.communication_oport = 0; +#ifdef __linux__ + other.semId_create = -1; + other.semId_get = -1; +#endif } Concore& operator=(Concore&& other) noexcept @@ -165,6 +190,8 @@ class Concore { shmdt(sharedData_get); if (shmId_create != -1) shmctl(shmId_create, IPC_RMID, nullptr); + if (semId_create != -1) + semctl(semId_create, 0, IPC_RMID); #endif iport = std::move(other.iport); @@ -184,6 +211,10 @@ class Concore { sharedData_get = other.sharedData_get; communication_iport = other.communication_iport; communication_oport = other.communication_oport; +#ifdef __linux__ + semId_create = other.semId_create; + semId_get = other.semId_get; +#endif other.shmId_create = -1; other.shmId_get = -1; @@ -191,6 +222,10 @@ class Concore { other.sharedData_get = nullptr; other.communication_iport = 0; 
other.communication_oport = 0; +#ifdef __linux__ + other.semId_create = -1; + other.semId_get = -1; +#endif return *this; } @@ -234,6 +269,49 @@ class Concore { } #ifdef __linux__ + static inline uint64_t shm_load_seq(const char* base) { + uint64_t v; + __atomic_load(reinterpret_cast(base), &v, __ATOMIC_ACQUIRE); + return v; + } + + static int shm_sem_create(key_t key) { + // Try to create as the original owner. If it already exists, + // attach without resetting its value. + int id = semget(key, 1, IPC_CREAT | IPC_EXCL | 0666); + if (id >= 0) { + semun arg{}; + arg.val = 1; + if (semctl(id, 0, SETVAL, arg) < 0) return -1; + return id; + } + // EEXIST means another process got here first; just open it. + id = semget(key, 1, 0666); + return id < 0 ? -1 : id; + } + + static void shm_sem_acquire(int id) { + if (id < 0) return; + sembuf sb{}; + sb.sem_num = 0; + sb.sem_op = -1; + sb.sem_flg = 0; + while (semop(id, &sb, 1) == -1) { + if (errno != EINTR) return; + } + } + + static void shm_sem_release(int id) { + if (id < 0) return; + sembuf sb{}; + sb.sem_num = 0; + sb.sem_op = 1; + sb.sem_flg = 0; + while (semop(id, &sb, 1) == -1) { + if (errno != EINTR) return; + } + } + void createSharedMemory(key_t key) { shmId_create = shmget(key, SHM_SIZE, IPC_CREAT | 0666); if (shmId_create == -1) { @@ -241,7 +319,6 @@ class Concore { return; } - // Verify the segment is large enough (shmget won't resize an existing segment) struct shmid_ds shm_info; if (shmctl(shmId_create, IPC_STAT, &shm_info) == 0 && shm_info.shm_segsz < SHM_SIZE) { std::cerr << "Shared memory segment too small (" << shm_info.shm_segsz @@ -258,7 +335,17 @@ class Concore { if (sharedData_create == reinterpret_cast(-1)) { std::cerr << "Failed to attach shared memory segment.\n"; sharedData_create = nullptr; + return; } + + semId_create = shm_sem_create(key + 1); + if (semId_create < 0) { + std::cerr << "Failed to create shared memory semaphore.\n"; + } + + uint64_t zero = 0; + 
__atomic_store(reinterpret_cast(sharedData_create), &zero, __ATOMIC_RELEASE); + sharedData_create[SHM_HEADER_SIZE] = '\0'; } void getSharedMemory(key_t key) { @@ -280,7 +367,10 @@ class Concore { if (sharedData_get == reinterpret_cast(-1)) { std::cerr << "Failed to attach shared memory segment.\n"; sharedData_get = nullptr; + return; } + semId_get = semget(key + 1, 1, 0666); + if (semId_get < 0) semId_get = -1; } #endif @@ -355,34 +445,51 @@ class Concore { } #ifdef __linux__ + // Seqlock-style snapshot read. Returns payload on success or empty + // string if seq# is missing, odd (write in progress), or changed + // between the two reads. + static std::string shm_read_payload(const char* base) { + if (base == nullptr) return std::string(); + uint64_t s1 = shm_load_seq(base); + if (s1 == 0 || (s1 & 1u)) return std::string(); + std::string out(base + SHM_HEADER_SIZE, + strnlen(base + SHM_HEADER_SIZE, SHM_PAYLOAD_MAX)); + __atomic_thread_fence(__ATOMIC_ACQUIRE); + uint64_t s2 = shm_load_seq(base); + if (s1 != s2) return std::string(); + return out; + } + std::vector read_SM(int port, const std::string& name, const std::string& initstr) { ReadStatus status = ReadStatus::SUCCESS; std::this_thread::sleep_for(std::chrono::seconds(delay)); std::string ins; - try { - if (shmId_get != -1 && sharedData_get && sharedData_get[0] != '\0') - ins = std::string(sharedData_get, strnlen(sharedData_get, SHM_SIZE)); - else - throw 505; - } catch (...) 
{ + if (shmId_get == -1 || sharedData_get == nullptr) { status = ReadStatus::FILE_NOT_FOUND; ins = initstr; + } else { + std::string snap = shm_read_payload(sharedData_get); + if (snap.empty()) { + ins = initstr; + if (status == ReadStatus::SUCCESS) + status = ReadStatus::FILE_NOT_FOUND; + } else { + ins = snap; + } } int retry = 0; const int MAX_RETRY = 100; while ((int)ins.length() == 0 && retry < MAX_RETRY) { std::this_thread::sleep_for(std::chrono::seconds(delay)); - try { - if (shmId_get != -1 && sharedData_get) { - ins = std::string(sharedData_get, strnlen(sharedData_get, SHM_SIZE)); + if (shmId_get != -1 && sharedData_get != nullptr) { + std::string snap = shm_read_payload(sharedData_get); + if (!snap.empty()) { + ins = snap; retrycount++; - } else { - retrycount++; - throw 505; } - } catch (...) { - std::cerr << "Read error\n"; + } else { + retrycount++; } retry++; } @@ -444,13 +551,23 @@ class Concore { outfile << val[i] << ','; outfile << val[val.size() - 1] << ']'; std::string result = outfile.str(); - if (result.size() >= SHM_SIZE) { + if (result.size() > SHM_PAYLOAD_MAX) { std::cerr << "ERROR: write_SM payload (" << result.size() - << " bytes) exceeds " << SHM_SIZE - 1 + << " bytes) exceeds " << SHM_PAYLOAD_MAX << "-byte shared memory limit. Data truncated!" << std::endl; + result.resize(SHM_PAYLOAD_MAX); + } + shm_sem_acquire(semId_create); + { + auto* seqp = reinterpret_cast(sharedData_create); + (void)__atomic_fetch_add(seqp, uint64_t{1}, __ATOMIC_ACQ_REL); // odd = writing + std::memcpy(sharedData_create + SHM_HEADER_SIZE, + result.c_str(), result.size()); + sharedData_create[SHM_HEADER_SIZE + result.size()] = '\0'; + __atomic_thread_fence(__ATOMIC_RELEASE); + (void)__atomic_fetch_add(seqp, uint64_t{1}, __ATOMIC_ACQ_REL); // even = ready } - std::strncpy(sharedData_create, result.c_str(), SHM_SIZE - 1); - sharedData_create[SHM_SIZE - 1] = '\0'; + shm_sem_release(semId_create); } catch (...) 
{ std::cerr << "skipping +" << outpath << port << "/" << name << "\n"; } diff --git a/tests/test_shm_concurrency.py b/tests/test_shm_concurrency.py new file mode 100644 index 0000000..e9a0aa3 --- /dev/null +++ b/tests/test_shm_concurrency.py @@ -0,0 +1,212 @@ +"""Tests for the SHM race condition (issue #195). + +Layout used by the C++ fix in concore.hpp / concoredocker.hpp: + bytes [0..7] : uint64_t little-endian sequence number + (odd = write in progress, even = ready, 0 = none) + bytes [8..end] : null-terminated payload +Concurrent writers serialise via a System V semaphore (one per segment); +readers double-read the seq# to confirm a stable snapshot. +""" + +from __future__ import annotations + +import multiprocessing as mp +import os +import struct +import time +from multiprocessing import shared_memory +from pathlib import Path + +import pytest + +SHM_TOTAL_SIZE = 4096 +SHM_HEADER_SIZE = 8 +SHM_PAYLOAD_SIZE = SHM_TOTAL_SIZE - SHM_HEADER_SIZE - 1 +SHM_NAME_PREFIX = "concore_shm_test_" + +_HEADER_FMT = "<Q" + + +def _encode_payload(payload: str) -> bytes: + payload_bytes = payload.encode("utf-8")[:SHM_PAYLOAD_SIZE] + return payload_bytes + b"\x00" + + +def _decode(data: bytes) -> tuple[int, str]: + seq = struct.unpack(_HEADER_FMT, data[:SHM_HEADER_SIZE])[0] + payload = ( + data[SHM_HEADER_SIZE:].split(b"\x00", 1)[0].decode("utf-8", errors="replace") + ) + return seq, payload + + +def _writer_safe(shm_name: str, lock_path: str, iterations: int) -> int: + shm = shared_memory.SharedMemory(name=shm_name) + fd = os.open(lock_path, os.O_CREAT | os.O_RDWR, 0o600) + import fcntl + + fcntl.flock(fd, fcntl.LOCK_EX) + try: + for i in range(iterations): + payload = f"safe_{i}_" + ("Y" * 200) + body = _encode_payload(payload) + shm.buf[SHM_HEADER_SIZE : SHM_HEADER_SIZE + len(body)] = body + shm.buf[:SHM_HEADER_SIZE] = struct.pack(_HEADER_FMT, (i + 1) * 2) + finally: + try: + os.close(fd) + except OSError: + pass + shm.close() + return 0 + + +def _writer_unsafe(shm_name: str, lock_path: str, iterations: int) -> int: + # 
Reference "broken" writer: no lock, no seqlock protocol. Writes + # payload first then bumps seq, leaving the seq UNCHANGED across a + # tearing window. + shm = shared_memory.SharedMemory(name=shm_name) + for i in range(iterations): + payload = f"unsafe_{i}_" + ("X" * 200) + body = _encode_payload(payload) + # Without a seqlock, a reader may catch the writer mid-update. + # We deliberately do payload-write THEN seq-bump; in C++ with + # raw strncpy the bytes get interleaved. + shm.buf[SHM_HEADER_SIZE : SHM_HEADER_SIZE + len(body)] = body + shm.buf[:SHM_HEADER_SIZE] = struct.pack(_HEADER_FMT, (i + 1) * 2) + shm.close() + return 0 + + +def _reader_verify( + shm_name: str, lock_path: str, iterations: int +) -> tuple[int, int, int]: + shm = shared_memory.SharedMemory(name=shm_name) + accepted = 0 + torn = 0 + missing = 0 + last_seq = 0 + # Writers in this test always pad payloads with a known 200-byte + # suffix. A torn read produces a payload that does not end in the + # suffix (because the writer's memcpy was caught mid-byte). 
+ valid_suffixes = ("X" * 200, "Y" * 200, "Z" * 50) + for _ in range(iterations): + deadline = time.monotonic() + 2.0 + while time.monotonic() < deadline: + snap1 = bytes(shm.buf[: SHM_HEADER_SIZE + SHM_PAYLOAD_SIZE + 1]) + snap2 = bytes(shm.buf[: SHM_HEADER_SIZE + SHM_PAYLOAD_SIZE + 1]) + if snap1 != snap2: + continue + seq, payload = _decode(snap1) + if seq == last_seq: + continue + if seq % 2 != 0: + continue + if not payload.endswith(valid_suffixes): + torn += 1 + continue + accepted += 1 + last_seq = seq + break + else: + missing += 1 + shm.close() + return accepted, torn, missing + + +@pytest.fixture +def shm_region(tmp_path): + import uuid + + name = SHM_NAME_PREFIX + uuid.uuid4().hex[:8] + shm = shared_memory.SharedMemory(name=name, create=True, size=SHM_TOTAL_SIZE) + lock_path = str(tmp_path / (name + ".lock")) + Path(lock_path).touch() + try: + yield name, lock_path + finally: + try: + shm.close() + shm.unlink() + except FileNotFoundError: + pass + try: + os.unlink(lock_path) + except FileNotFoundError: + pass + + +def test_layout_constants_match_header(): + assert SHM_TOTAL_SIZE == 4096 + assert SHM_HEADER_SIZE == 8 + + +@pytest.mark.skipif(os.name != "posix", reason="fork-based concurrency test") +def test_safe_writer_reader_roundtrip(shm_region): + name, lock_path = shm_region + iterations = 200 + ctx = mp.get_context("fork") + with ctx.Pool(2) as pool: + writer_async = pool.apply_async(_writer_safe, (name, lock_path, iterations)) + reader_async = pool.apply_async( + _reader_verify, (name, lock_path, iterations * 4) + ) + writer_async.get(timeout=30) + accepted, torn, missing = reader_async.get(timeout=30) + + assert torn == 0, f"reader observed {torn} torn reads; SHM is still racy" + assert accepted >= iterations, ( + f"reader accepted only {accepted}/{iterations} payloads" + ) + + +@pytest.mark.skipif(os.name != "posix", reason="fork-based concurrency test") +def test_unsafe_writer_produces_torn_reads(shm_region): + name, lock_path = shm_region + 
iterations = 500 + ctx = mp.get_context("fork") + with ctx.Pool(2) as pool: + writer_async = pool.apply_async(_writer_unsafe, (name, lock_path, iterations)) + reader_async = pool.apply_async( + _reader_verify, (name, lock_path, iterations * 4) + ) + writer_async.get(timeout=30) + accepted, torn, missing = reader_async.get(timeout=30) + + if torn == 0: + pytest.skip("unsafe writer did not produce torn reads on this host") + assert torn > 0 + + +def test_layout_invariants_without_concurrency(shm_region): + name, _lock = shm_region + shm = shared_memory.SharedMemory(name=name) + + last_seq = 0 + for i in range(100): + payload = f"sync_{i}_" + ("Z" * 50) + body = _encode_payload(payload) + shm.buf[SHM_HEADER_SIZE : SHM_HEADER_SIZE + len(body)] = body + shm.buf[:SHM_HEADER_SIZE] = struct.pack(_HEADER_FMT, (i + 1) * 2) + + snap = bytes(shm.buf[: SHM_HEADER_SIZE + len(body) + 1]) + seq, decoded = _decode(snap) + + assert seq == (i + 1) * 2 + assert seq > last_seq + assert decoded == payload + last_seq = seq + + shm.close() + + +def test_decode_rejects_odd_seq(shm_region): + name, _lock = shm_region + shm = shared_memory.SharedMemory(name=name) + shm.buf[:SHM_HEADER_SIZE] = struct.pack(_HEADER_FMT, 7) + shm.buf[SHM_HEADER_SIZE : SHM_HEADER_SIZE + 5] = b"hello" + snap = bytes(shm.buf[: SHM_HEADER_SIZE + 6]) + seq, _payload = _decode(snap) + assert seq == 7 + assert seq % 2 != 0, "seq# must be odd during a write" + shm.close() diff --git a/tests/test_shm_cpp_smoke.cpp b/tests/test_shm_cpp_smoke.cpp new file mode 100644 index 0000000..3caadb8 --- /dev/null +++ b/tests/test_shm_cpp_smoke.cpp @@ -0,0 +1,9 @@ +// Header smoke test for concore.hpp. Verifies the file is self-contained +// and Concore is a complete type without needing any system SHM calls. + +#include "concore.hpp" + +int main() { + static_assert(sizeof(Concore) > 0, "Concore must be a complete type"); + return 0; +}