use std::io::{Error as IoError, ErrorKind as IoErrorKind, Result as IoResult}; use std::path::Path; use nix_compat::nix_daemon::types::{AddToStoreNarRequest, UnkeyedValidPathInfo}; use nix_compat::nix_daemon::worker_protocol::{ClientSettings, Operation}; use nix_compat::store_path::StorePath; use nix_compat::wire::ProtocolVersion; use nix_compat::wire::de::{NixRead, NixReader}; use nix_compat::wire::ser::{NixSerialize, NixWrite, NixWriter, NixWriterBuilder}; use num_enum::{IntoPrimitive, TryFromPrimitive}; use thiserror::Error; use tokio::io::{AsyncReadExt, AsyncWriteExt, ReadHalf, WriteHalf, split}; use tokio::net::UnixStream; use tokio::sync::Mutex; use super::Store; use crate::error::{Error, Result}; pub struct DaemonStore { runtime: tokio::runtime::Runtime, connection: NixDaemonConnection, } impl DaemonStore { pub fn connect(socket_path: &Path) -> Result { let runtime = tokio::runtime::Runtime::new() .map_err(|e| Error::internal(format!("Failed to create tokio runtime: {}", e)))?; let connection = runtime.block_on(async { NixDaemonConnection::connect(socket_path) .await .map_err(|e| { Error::internal(format!( "Failed to connect to nix-daemon at {}: {}", socket_path.display(), e )) }) })?; Ok(Self { runtime, connection, }) } fn block_on(&self, future: F) -> F::Output where F: std::future::Future, { self.runtime.block_on(future) } } impl Store for DaemonStore { fn get_store_dir(&self) -> &str { "/nix/store" } fn is_valid_path(&self, path: &str) -> Result { self.block_on(async { self.connection .is_valid_path(path) .await .map_err(|e| Error::internal(format!("Daemon error in is_valid_path: {}", e))) }) } fn ensure_path(&self, path: &str) -> Result<()> { self.block_on(async { self.connection.ensure_path(path).await.map_err(|e| { Error::eval_error( format!( "builtins.storePath: path '{}' is not valid in nix store: {}", path, e ), None, ) }) }) } fn add_to_store( &self, name: &str, content: &[u8], recursive: bool, references: Vec, ) -> Result { use std::fs; use nix_compat::nix_daemon::types::AddToStoreNarRequest; use nix_compat::nixhash::{CAHash, NixHash}; use nix_compat::store_path::{StorePath, build_ca_path}; use sha2::{Digest, Sha256}; use tempfile::NamedTempFile; let temp_file = NamedTempFile::new() .map_err(|e| Error::internal(format!("Failed to create temp file: {}", e)))?; fs::write(temp_file.path(), content) .map_err(|e| Error::internal(format!("Failed to write temp file: {}", e)))?; let nar_data = crate::nar::pack_nar(temp_file.path())?; let nar_hash_hex = { let mut hasher = Sha256::new(); hasher.update(&nar_data); hex::encode(hasher.finalize()) }; let nar_hash_bytes = hex::decode(&nar_hash_hex) .map_err(|e| Error::internal(format!("Invalid nar hash: {}", e)))?; let mut nar_hash_arr = [0u8; 32]; nar_hash_arr.copy_from_slice(&nar_hash_bytes); let ca_hash = if recursive { CAHash::Nar(NixHash::Sha256(nar_hash_arr)) } else { let mut content_hasher = Sha256::new(); content_hasher.update(content); let content_hash = content_hasher.finalize(); let mut content_hash_arr = [0u8; 32]; content_hash_arr.copy_from_slice(&content_hash); CAHash::Flat(NixHash::Sha256(content_hash_arr)) }; let ref_store_paths: std::result::Result>, _> = references .iter() .map(|r| StorePath::::from_absolute_path(r.as_bytes())) .collect(); let ref_store_paths = ref_store_paths .map_err(|e| Error::internal(format!("Invalid reference path: {}", e)))?; let store_path: StorePath = build_ca_path(name, &ca_hash, references.clone(), false) .map_err(|e| Error::internal(format!("Failed to build store path: {}", e)))?; let store_path_str = store_path.to_absolute_path(); if self.is_valid_path(&store_path_str)? { return Ok(store_path_str); } let request = AddToStoreNarRequest { path: store_path, deriver: None, nar_hash: unsafe { std::mem::transmute::<[u8; 32], nix_compat::nix_daemon::types::NarHash>( nar_hash_arr, ) }, references: ref_store_paths, registration_time: 0, nar_size: nar_data.len() as u64, ultimate: false, signatures: vec![], ca: Some(ca_hash), repair: false, dont_check_sigs: false, }; self.block_on(async { self.connection .add_to_store_nar(request, &nar_data) .await .map_err(|e| Error::internal(format!("Failed to add to store: {}", e))) })?; Ok(store_path_str) } fn add_to_store_from_path( &self, name: &str, source_path: &std::path::Path, references: Vec, ) -> Result { use nix_compat::nix_daemon::types::AddToStoreNarRequest; use nix_compat::nixhash::{CAHash, NixHash}; use nix_compat::store_path::{StorePath, build_ca_path}; use sha2::{Digest, Sha256}; let nar_data = crate::nar::pack_nar(source_path)?; let nar_hash: [u8; 32] = { let mut hasher = Sha256::new(); hasher.update(&nar_data); hasher.finalize().into() }; let ca_hash = CAHash::Nar(NixHash::Sha256(nar_hash)); let ref_store_paths: std::result::Result>, _> = references .iter() .map(|r| StorePath::::from_absolute_path(r.as_bytes())) .collect(); let ref_store_paths = ref_store_paths .map_err(|e| Error::internal(format!("Invalid reference path: {}", e)))?; let store_path: StorePath = build_ca_path(name, &ca_hash, references.clone(), false) .map_err(|e| Error::internal(format!("Failed to build store path: {}", e)))?; let store_path_str = store_path.to_absolute_path(); if self.is_valid_path(&store_path_str)? { return Ok(store_path_str); } let request = AddToStoreNarRequest { path: store_path, deriver: None, nar_hash: unsafe { std::mem::transmute::<[u8; 32], nix_compat::nix_daemon::types::NarHash>(nar_hash) }, references: ref_store_paths, registration_time: 0, nar_size: nar_data.len() as u64, ultimate: false, signatures: vec![], ca: Some(ca_hash), repair: false, dont_check_sigs: false, }; self.block_on(async { self.connection .add_to_store_nar(request, &nar_data) .await .map_err(|e| Error::internal(format!("Failed to add to store: {}", e))) })?; Ok(store_path_str) } fn add_text_to_store( &self, name: &str, content: &str, references: Vec, ) -> Result { use std::fs; use nix_compat::nix_daemon::types::AddToStoreNarRequest; use nix_compat::nixhash::CAHash; use nix_compat::store_path::{StorePath, build_text_path}; use sha2::{Digest, Sha256}; use tempfile::NamedTempFile; let temp_file = NamedTempFile::new() .map_err(|e| Error::internal(format!("Failed to create temp file: {}", e)))?; fs::write(temp_file.path(), content.as_bytes()) .map_err(|e| Error::internal(format!("Failed to write temp file: {}", e)))?; let nar_data = crate::nar::pack_nar(temp_file.path())?; let nar_hash: [u8; 32] = { let mut hasher = Sha256::new(); hasher.update(&nar_data); hasher.finalize().into() }; let content_hash = { let mut hasher = Sha256::new(); hasher.update(content.as_bytes()); hasher.finalize().into() }; let ref_store_paths: std::result::Result>, _> = references .iter() .map(|r| StorePath::::from_absolute_path(r.as_bytes())) .collect(); let ref_store_paths = ref_store_paths .map_err(|e| Error::internal(format!("Invalid reference path: {}", e)))?; let store_path: StorePath = build_text_path(name, content, references.clone()) .map_err(|e| Error::internal(format!("Failed to build text store path: {}", e)))?; let store_path_str = store_path.to_absolute_path(); if self.is_valid_path(&store_path_str)? { return Ok(store_path_str); } let request = AddToStoreNarRequest { path: store_path, deriver: None, nar_hash: unsafe { std::mem::transmute::<[u8; 32], nix_compat::nix_daemon::types::NarHash>(nar_hash) }, references: ref_store_paths, registration_time: 0, nar_size: nar_data.len() as u64, ultimate: false, signatures: vec![], ca: Some(CAHash::Text(content_hash)), repair: false, dont_check_sigs: false, }; self.block_on(async { self.connection .add_to_store_nar(request, &nar_data) .await .map_err(|e| Error::internal(format!("Failed to add text to store: {}", e))) })?; Ok(store_path_str) } } const PROTOCOL_VERSION: ProtocolVersion = ProtocolVersion::from_parts(1, 37); // Protocol magic numbers (from nix-compat worker_protocol.rs) const WORKER_MAGIC_1: u64 = 0x6e697863; // "nixc" const WORKER_MAGIC_2: u64 = 0x6478696f; // "dxio" const STDERR_LAST: u64 = 0x616c7473; // "alts" const STDERR_ERROR: u64 = 0x63787470; // "cxtp" /// Performs the client handshake with a nix-daemon server /// /// This is the client-side counterpart to `server_handshake_client`. /// It exchanges magic numbers, negotiates protocol version, and sends client settings. async fn client_handshake( conn: &mut RW, client_settings: &ClientSettings, ) -> IoResult where RW: AsyncReadExt + AsyncWriteExt + Unpin, { // 1. Send magic number 1 conn.write_u64_le(WORKER_MAGIC_1).await?; // 2. Receive magic number 2 let magic2 = conn.read_u64_le().await?; if magic2 != WORKER_MAGIC_2 { return Err(IoError::new( IoErrorKind::InvalidData, format!("Invalid magic number from server: {}", magic2), )); } // 3. Receive server protocol version let server_version_raw = conn.read_u64_le().await?; let server_version: ProtocolVersion = server_version_raw.try_into().map_err(|e| { IoError::new( IoErrorKind::InvalidData, format!("Invalid protocol version: {}", e), ) })?; // 4. Send our protocol version conn.write_u64_le(PROTOCOL_VERSION.into()).await?; // Pick the minimum version let protocol_version = std::cmp::min(PROTOCOL_VERSION, server_version); // 5. Send obsolete fields based on protocol version if protocol_version.minor() >= 14 { // CPU affinity (obsolete, send 0) conn.write_u64_le(0).await?; } if protocol_version.minor() >= 11 { // Reserve space (obsolete, send 0) conn.write_u64_le(0).await?; } if protocol_version.minor() >= 33 { // Read Nix version string let version_len = conn.read_u64_le().await? as usize; let mut version_bytes = vec![0u8; version_len]; conn.read_exact(&mut version_bytes).await?; // Padding let padding = (8 - (version_len % 8)) % 8; if padding > 0 { let mut pad = vec![0u8; padding]; conn.read_exact(&mut pad).await?; } } if protocol_version.minor() >= 35 { // Read trust level let _trust = conn.read_u64_le().await?; } // 6. Read STDERR_LAST let stderr_last = conn.read_u64_le().await?; if stderr_last != STDERR_LAST { return Err(IoError::new( IoErrorKind::InvalidData, format!("Expected STDERR_LAST, got: {}", stderr_last), )); } // 7. Send SetOptions operation with client settings conn.write_u64_le(Operation::SetOptions.into()).await?; conn.flush().await?; // Serialize client settings let mut settings_buf = Vec::new(); { let mut writer = NixWriterBuilder::default() .set_version(protocol_version) .build(&mut settings_buf); writer.write_value(client_settings).await?; writer.flush().await?; } conn.write_all(&settings_buf).await?; conn.flush().await?; // 8. Read response to SetOptions let response = conn.read_u64_le().await?; if response != STDERR_LAST { return Err(IoError::new( IoErrorKind::InvalidData, format!("Expected STDERR_LAST after SetOptions, got: {}", response), )); } Ok(protocol_version) } /// Low-level Nix Daemon client /// /// This struct manages communication with a nix-daemon using the wire protocol. /// It is NOT thread-safe and should be wrapped in a Mutex for concurrent access. pub struct NixDaemonClient { protocol_version: ProtocolVersion, reader: NixReader>, writer: NixWriter>, _marker: std::marker::PhantomData>, } impl NixDaemonClient { /// Connect to a nix-daemon at the given Unix socket path pub async fn connect(socket_path: &Path) -> IoResult { let stream = UnixStream::connect(socket_path).await?; Self::from_stream(stream).await } /// Create a client from an existing Unix stream pub async fn from_stream(mut stream: UnixStream) -> IoResult { let client_settings = ClientSettings::default(); // Perform handshake let protocol_version = client_handshake(&mut stream, &client_settings).await?; // Split stream into reader and writer let (read_half, write_half) = split(stream); let reader = NixReader::builder() .set_version(protocol_version) .build(read_half); let writer = NixWriterBuilder::default() .set_version(protocol_version) .build(write_half); Ok(Self { protocol_version, reader, writer, _marker: Default::default(), }) } /// Execute an operation with a single parameter async fn execute_with(&mut self, operation: Operation, param: &P) -> IoResult where P: NixSerialize + Send, T: nix_compat::wire::de::NixDeserialize, { // Send operation self.writer.write_value(&operation).await?; // Send parameter self.writer.write_value(param).await?; self.writer.flush().await?; self.read_response().await } /// Read a response from the daemon /// /// The daemon sends either: /// - STDERR_LAST followed by the result /// - STDERR_ERROR followed by a structured error async fn read_response(&mut self) -> IoResult where T: nix_compat::wire::de::NixDeserialize, { loop { let msg = self.reader.read_number().await?; if msg == STDERR_LAST { let result: T = self.reader.read_value().await?; return Ok(result); } else if msg == STDERR_ERROR { let error_msg = self.read_daemon_error().await?; return Err(IoError::other(error_msg)); } else { let _data: String = self.reader.read_value().await?; continue; } } } async fn read_daemon_error(&mut self) -> IoResult { let type_marker: String = self.reader.read_value().await?; assert_eq!(type_marker, "Error"); let level = NixDaemonErrorLevel::try_from_primitive( self.reader .read_number() .await? .try_into() .map_err(|_| IoError::other("invalid nix-daemon error level"))?, ) .map_err(|_| IoError::other("invalid nix-daemon error level"))?; // removed let _name: String = self.reader.read_value().await?; let msg: String = self.reader.read_value().await?; let have_pos: u64 = self.reader.read_number().await?; assert_eq!(have_pos, 0); let nr_traces: u64 = self.reader.read_number().await?; let mut traces = Vec::new(); for _ in 0..nr_traces { let _trace_pos: u64 = self.reader.read_number().await?; let trace_hint: String = self.reader.read_value().await?; traces.push(trace_hint); } Ok(NixDaemonError { level, msg, traces }) } /// Check if a path is valid in the store pub async fn is_valid_path(&mut self, path: &str) -> IoResult { let store_path = StorePath::::from_absolute_path(path.as_bytes()) .map_err(|e| IoError::new(IoErrorKind::InvalidInput, e.to_string()))?; self.execute_with(Operation::IsValidPath, &store_path).await } /// Query information about a store path #[allow(dead_code)] pub async fn query_path_info(&mut self, path: &str) -> IoResult> { let store_path = StorePath::::from_absolute_path(path.as_bytes()) .map_err(|e| IoError::new(IoErrorKind::InvalidInput, e.to_string()))?; self.writer.write_value(&Operation::QueryPathInfo).await?; self.writer.write_value(&store_path).await?; self.writer.flush().await?; loop { let msg = self.reader.read_number().await?; if msg == STDERR_LAST { let has_value: bool = self.reader.read_value().await?; if has_value { use nix_compat::narinfo::Signature; use nix_compat::nixhash::CAHash; let deriver = self.reader.read_value().await?; let nar_hash: String = self.reader.read_value().await?; let references = self.reader.read_value().await?; let registration_time = self.reader.read_value().await?; let nar_size = self.reader.read_value().await?; let ultimate = self.reader.read_value().await?; let signatures: Vec> = self.reader.read_value().await?; let ca: Option = self.reader.read_value().await?; let value = UnkeyedValidPathInfo { deriver, nar_hash, references, registration_time, nar_size, ultimate, signatures, ca, }; return Ok(Some(value)); } else { return Ok(None); } } else if msg == STDERR_ERROR { let error_msg = self.read_daemon_error().await?; return Err(IoError::other(error_msg)); } else { let _data: String = self.reader.read_value().await?; continue; } } } /// Ensure a path is available in the store pub async fn ensure_path(&mut self, path: &str) -> IoResult<()> { let store_path = StorePath::::from_absolute_path(path.as_bytes()) .map_err(|e| IoError::new(IoErrorKind::InvalidInput, e.to_string()))?; self.writer.write_value(&Operation::EnsurePath).await?; self.writer.write_value(&store_path).await?; self.writer.flush().await?; loop { let msg = self.reader.read_number().await?; if msg == STDERR_LAST { return Ok(()); } else if msg == STDERR_ERROR { let error_msg = self.read_daemon_error().await?; return Err(IoError::other(error_msg)); } else { let _data: String = self.reader.read_value().await?; continue; } } } /// Query which paths are valid #[allow(dead_code)] pub async fn query_valid_paths(&mut self, paths: Vec) -> IoResult> { let store_paths: IoResult>> = paths .iter() .map(|p| { StorePath::::from_absolute_path(p.as_bytes()) .map_err(|e| IoError::new(IoErrorKind::InvalidInput, e.to_string())) }) .collect(); let store_paths = store_paths?; // Send operation self.writer.write_value(&Operation::QueryValidPaths).await?; // Manually serialize the request since QueryValidPaths doesn't impl NixSerialize // QueryValidPaths = { paths: Vec, substitute: bool } self.writer.write_value(&store_paths).await?; // For protocol >= 1.27, send substitute flag if self.protocol_version.minor() >= 27 { self.writer.write_value(&false).await?; } self.writer.flush().await?; let result: Vec> = self.read_response().await?; Ok(result.into_iter().map(|p| p.to_absolute_path()).collect()) } /// Add a NAR to the store pub async fn add_to_store_nar( &mut self, request: AddToStoreNarRequest, nar_data: &[u8], ) -> IoResult<()> { tracing::debug!( "add_to_store_nar: path={}, nar_size={}", request.path.to_absolute_path(), request.nar_size, ); self.writer.write_value(&Operation::AddToStoreNar).await?; self.writer.write_value(&request.path).await?; self.writer.write_value(&request.deriver).await?; let nar_hash_hex = hex::encode(request.nar_hash.as_ref()); self.writer.write_value(&nar_hash_hex).await?; self.writer.write_value(&request.references).await?; self.writer.write_value(&request.registration_time).await?; self.writer.write_value(&request.nar_size).await?; self.writer.write_value(&request.ultimate).await?; self.writer.write_value(&request.signatures).await?; self.writer.write_value(&request.ca).await?; self.writer.write_value(&request.repair).await?; self.writer.write_value(&request.dont_check_sigs).await?; if self.protocol_version.minor() >= 23 { self.writer.write_number(nar_data.len() as u64).await?; self.writer.write_all(nar_data).await?; self.writer.write_number(0u64).await?; } else { self.writer.write_slice(nar_data).await?; } self.writer.flush().await?; loop { let msg = self.reader.read_number().await?; if msg == STDERR_LAST { return Ok(()); } else if msg == STDERR_ERROR { let error_msg = self.read_daemon_error().await?; return Err(IoError::other(error_msg)); } else { let _data: String = self.reader.read_value().await?; continue; } } } } /// Thread-safe wrapper around NixDaemonClient pub struct NixDaemonConnection { client: Mutex, } impl NixDaemonConnection { /// Connect to a nix-daemon at the given socket path pub async fn connect(socket_path: &Path) -> IoResult { let client = NixDaemonClient::connect(socket_path).await?; Ok(Self { client: Mutex::new(client), }) } /// Check if a path is valid in the store pub async fn is_valid_path(&self, path: &str) -> IoResult { let mut client = self.client.lock().await; client.is_valid_path(path).await } /// Query information about a store path #[allow(dead_code)] pub async fn query_path_info(&self, path: &str) -> IoResult> { let mut client = self.client.lock().await; client.query_path_info(path).await } /// Ensure a path is available in the store pub async fn ensure_path(&self, path: &str) -> IoResult<()> { let mut client = self.client.lock().await; client.ensure_path(path).await } /// Query which paths are valid #[allow(dead_code)] pub async fn query_valid_paths(&self, paths: Vec) -> IoResult> { let mut client = self.client.lock().await; client.query_valid_paths(paths).await } /// Add a NAR to the store pub async fn add_to_store_nar( &self, request: AddToStoreNarRequest, nar_data: &[u8], ) -> IoResult<()> { let mut client = self.client.lock().await; client.add_to_store_nar(request, nar_data).await } } #[derive(Debug, Clone, Copy, PartialEq, Eq, IntoPrimitive, TryFromPrimitive)] #[repr(u8)] pub enum NixDaemonErrorLevel { Error = 0, Warn, Notice, Info, Talkative, Chatty, Debug, Vomit, } #[derive(Debug, Error)] #[error("{msg}")] pub struct NixDaemonError { level: NixDaemonErrorLevel, msg: String, traces: Vec, }