Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions cecli/coders/agent_coder.py
Original file line number Diff line number Diff line change
Expand Up @@ -1666,6 +1666,11 @@ def get_background_command_output(self):
command_str = command_info.get(command_key, {}).get("command", command_key)
output += f"\n[bg: {command_str}]\n{cmd_output}\n"

# Clean up stale (finished) background commands after reading their output
for command_key, info in command_info.items():
if not info.get("running", False):
BackgroundCommandManager.stop_background_command(command_key)

return output

def get_git_status(self):
Expand Down
14 changes: 7 additions & 7 deletions cecli/coders/base_coder.py
Original file line number Diff line number Diff line change
Expand Up @@ -876,9 +876,8 @@ def get_announcements(self):
env_items.append(f"{rel_repo_dir} ({num_files:,} files)")
if num_files > 1000:
env_items.append(
"Warning: For large repos, consider using --subtree-only and .cecli_ignore"
"Warning: For large repos, consider using --subtree-only and .cecli.ignore"
)
env_items.append(f"See: {urls.large_repos}")
else:
env_items.append("no git repo")

Expand Down Expand Up @@ -4024,7 +4023,9 @@ def compute_costs_from_tokens(
input_cost_per_token = self.get_active_model().info.get("input_cost_per_token") or 0
output_cost_per_token = self.get_active_model().info.get("output_cost_per_token") or 0
input_cost_per_token_cache_hit = (
self.get_active_model().info.get("input_cost_per_token_cache_hit") or 0
self.get_active_model().info.get("input_cost_per_token_cache_hit")
or self.get_active_model().info.get("cache_read_input_token_cost")
or 0
)

# deepseek
Expand All @@ -4036,14 +4037,13 @@ def compute_costs_from_tokens(
# == total tokens that were

if input_cost_per_token_cache_hit:
# must be deepseek
cost += input_cost_per_token_cache_hit * cache_hit_tokens
cost += (prompt_tokens - input_cost_per_token_cache_hit) * input_cost_per_token
cost += cache_hit_tokens * input_cost_per_token_cache_hit
cost += (prompt_tokens - cache_hit_tokens) * input_cost_per_token
else:
# hard code the anthropic adjustments, no-ops for other models since cache_x_tokens==0
cost += cache_write_tokens * input_cost_per_token * 1.25
cost += cache_hit_tokens * input_cost_per_token * 0.10
cost += prompt_tokens * input_cost_per_token
cost += (prompt_tokens - cache_hit_tokens) * input_cost_per_token

cost += completion_tokens * output_cost_per_token
return cost
Expand Down
171 changes: 144 additions & 27 deletions cecli/helpers/background_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
in the background and capturing their output for injection into chat streams.
"""

import codecs
import os
import platform
import subprocess
Expand Down Expand Up @@ -171,31 +170,86 @@ def _start_output_reader(self) -> None:
"""Start thread to read process output."""

def reader():
try:
# Simple approach: read lines when available
# This will block on readline(), but that's OK because
# we're in a separate thread and the buffer will capture
# output as soon as it's available
import re

# Regex to strip ANSI terminal escape sequences
_ansi_escape = re.compile(
r"\x1b\[[0-9;]*[a-zA-Z]|\x1b[\]^_]|[\x1b\x9b][][()#;?]*"
r"(?:[0-9]{1,4}(?:;[0-9]{0,4})*)?[0-9A-PRZcf-nqry=><~]"
)

def _strip_ansi(text: str) -> str:
return _ansi_escape.sub("", text)

try:
if self.master_fd is not None:
while not self._stop_event.is_set():
try:
data = os.read(self.master_fd, 4096).decode(errors="replace")
if not data:
break
self.buffer.append(data)
# Strip ANSI escape sequences from PTY output
self.buffer.append(_strip_ansi(data))
except (OSError, EOFError):
break
else:
# Read stdout
for line in iter(self.process.stdout.readline, ""):
if line:
self.buffer.append(line)
has_stdout_fileno = hasattr(self.process.stdout, "fileno")
if has_stdout_fileno:
os.set_blocking(self.process.stdout.fileno(), False)
while not self._stop_event.is_set():
try:
if has_stdout_fileno:
# Use os.read() instead of readline() to capture
# partial line output (e.g. REPL prompts without newlines)
data = os.read(self.process.stdout.fileno(), 4096).decode(
errors="replace"
)
else:
# Fallback to readline when fileno() is unavailable
# (e.g. mock objects in tests)
data = self.process.stdout.readline()
if data:
self.buffer.append(data)
else:
# Check if process died
if not self.is_alive():
if has_stdout_fileno:
# Read any remaining data
try:
remaining = os.read(
self.process.stdout.fileno(), 4096
).decode(errors="replace")
if remaining:
self.buffer.append(remaining)
except (OSError, EOFError):
pass
break
import time

time.sleep(0.05)
except (OSError, EOFError, ValueError):
if not self.is_alive():
break
import time

time.sleep(0.05)

# Read stderr
for line in iter(self.process.stderr.readline, ""):
if line:
self.buffer.append(line)
# Also capture stderr (best-effort, non-blocking)
if hasattr(self.process.stderr, "fileno"):
try:
os.set_blocking(self.process.stderr.fileno(), False)
while True:
try:
err_data = os.read(self.process.stderr.fileno(), 4096).decode(
errors="replace"
)
if not err_data:
break
self.buffer.append(err_data)
except (OSError, EOFError):
break
except Exception:
pass

except Exception as e:
self.buffer.append(f"\n[Error reading process output: {str(e)}]\n")
Expand Down Expand Up @@ -369,6 +423,7 @@ def start_background_command(
persist: bool = False,
existing_input_buffer: Optional[InputBuffer] = None,
use_pty: bool = False,
master_fd: Optional[int] = None,
) -> str:
"""
Start a command in background.
Expand All @@ -390,9 +445,16 @@ def start_background_command(
buffer = existing_buffer or CircularBuffer(max_size=max_buffer_size)

# Use existing process or start new one
master_fd = None
if use_pty and HAS_PTY and platform.system() != "Windows":
master_fd, slave_fd = pty.openpty()
# Use provided master_fd (e.g., from _execute_with_timeout) or default to None
final_master_fd = master_fd

# Only create a new PTY if no external master_fd was provided
# (prevents overwriting an externally-created PTY fd)
can_use_pty = (
use_pty and master_fd is None and HAS_PTY and platform.system() != "Windows"
)
if can_use_pty:
final_master_fd, slave_fd = pty.openpty()

# Disable echo on the slave PTY
attr = termios.tcgetattr(slave_fd)
Expand All @@ -415,8 +477,13 @@ def start_background_command(
elif existing_process:
process = existing_process
else:
# When PTY was requested but isn't available (e.g. Windows),
# wrap with stdbuf -oL to force line-buffered output at the libc level
resolved_command = (
cls._wrap_line_buffered(command) if use_pty and not HAS_PTY else command
)
process = subprocess.Popen(
command,
resolved_command,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
Expand All @@ -434,7 +501,7 @@ def start_background_command(
buffer,
persist=persist,
input_buffer=existing_input_buffer,
master_fd=master_fd,
master_fd=final_master_fd,
)

# Generate unique key and store
Expand Down Expand Up @@ -519,13 +586,8 @@ def send_command_input(cls, command_key: str, text: str) -> bool:
bg_process = cls._background_commands.get(command_key)
if not bg_process:
return False
# Decode escape sequences (like \x1b) if present in the string
try:
text = codecs.decode(text, "unicode_escape")
except Exception:
pass
bg_process.send_input(text)
return True
bg_process.send_input(text)
return True

@classmethod
def get_all_command_outputs(cls, clear: bool = False) -> Dict[str, str]:
Expand Down Expand Up @@ -625,6 +687,61 @@ def list_background_commands(cls) -> Dict[str, Dict[str, any]]:
}
return result

_line_buffered_tool = None

@staticmethod
def _detect_line_buffered_tool() -> str:
"""
Detect the best available tool for forcing line-buffered stdout.

Checks for `stdbuf` (GNU coreutils, most Linux distros) and falls
back to `unbuffer` (expect package, available on many systems).
If neither is available, returns None.

Returns:
The tool command (e.g., 'stdbuf -oL' or 'unbuffer'), or None
"""
import shutil

if shutil.which("stdbuf"):
return "stdbuf -oL"

if shutil.which("unbuffer"):
return "unbuffer"

return None

@staticmethod
def _wrap_line_buffered(command: str) -> str:
"""
Wrap a command to force line-buffered stdout when PTY is unavailable.

When stdout is connected to a pipe instead of a TTY, most programs
(Python, C, Ruby, etc.) use full buffering. The stdbuf command uses
LD_PRELOAD to override buffering at the libc level. The unbuffer
command (from expect) creates a PTY wrapper.

The detected tool is cached after first check to avoid repeated
subprocess/shell calls.

Args:
command: The shell command string

Returns:
Command wrapped with line-buffering prefix, or the original
command if no tool is available
"""
if BackgroundCommandManager._line_buffered_tool is None:
BackgroundCommandManager._line_buffered_tool = (
BackgroundCommandManager._detect_line_buffered_tool()
)

tool = BackgroundCommandManager._line_buffered_tool
if tool:
return f"{tool} {command}"

return command

@staticmethod
def save_paginated_output(
output: str,
Expand Down
14 changes: 0 additions & 14 deletions cecli/helpers/conversation/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,26 +289,12 @@ def update_file_diff(self, fname: str) -> Optional[str]:
),
}

assistant_msg = {
"role": "assistant",
"content": (
f"Thank you for sharing this {prefix_str}diff of the updates to {rel_fname}."
" I will review their contents."
),
}

ConversationService.get_manager(coder).add_message(
message_dict=diff_message,
tag=MessageTag.DIFFS,
hash_key=("file_diff_user", rel_fname, content_hash),
)

ConversationService.get_manager(coder).add_message(
message_dict=assistant_msg,
tag=MessageTag.DIFFS,
hash_key=("file_diff_assistant", rel_fname, content_hash),
)

return diff

def get_file_stub(self, fname: str) -> str:
Expand Down
Loading
Loading