1
0
mirror of https://github.com/containers/youki synced 2024-11-26 06:08:07 +01:00

Merge pull request #2838 from YJDoc2/fix/dbus_call_issue

Fix/dbus call issue
This commit is contained in:
Thomas Schubart 2024-07-09 23:53:11 +02:00 committed by GitHub
commit d6c59966c0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 79 additions and 23 deletions

@ -4,6 +4,7 @@ use std::os::fd::AsRawFd;
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::atomic::{AtomicU32, Ordering};
use nix::errno::Errno;
use nix::sys::socket; use nix::sys::socket;
use super::client::SystemdClient; use super::client::SystemdClient;
@ -228,9 +229,10 @@ impl DbusConnection {
.filter(|m| m.preamble.mtype == MessageType::MethodReturn) .filter(|m| m.preamble.mtype == MessageType::MethodReturn)
.collect(); .collect();
let res = res.first().ok_or(DbusError::MethodCallErr( let res = res.first().ok_or(DbusError::AuthenticationErr(format!(
"expected method call to have reply, found no reply message".into(), "expected Hello call to have reply, found no reply message, got {:?} instead",
))?; res
)))?;
let mut ctr = 0; let mut ctr = 0;
let id = String::deserialize(&res.body, &mut ctr)?; let id = String::deserialize(&res.body, &mut ctr)?;
self.id = Some(id); self.id = Some(id);
@ -247,13 +249,18 @@ impl DbusConnection {
let mut reply: [u8; REPLY_BUF_SIZE] = [0_u8; REPLY_BUF_SIZE]; let mut reply: [u8; REPLY_BUF_SIZE] = [0_u8; REPLY_BUF_SIZE];
let mut reply_buffer = [IoSliceMut::new(&mut reply[0..])]; let mut reply_buffer = [IoSliceMut::new(&mut reply[0..])];
let reply_rcvd = socket::recvmsg::<()>( let reply_res = socket::recvmsg::<()>(
self.socket, self.socket,
&mut reply_buffer, &mut reply_buffer,
None, None,
socket::MsgFlags::empty(), socket::MsgFlags::empty(),
)?; );
let reply_rcvd = match reply_res {
Ok(msg) => msg,
Err(Errno::EAGAIN) => continue,
Err(e) => return Err(e.into()),
};
let received_byte_count = reply_rcvd.bytes; let received_byte_count = reply_rcvd.bytes;
ret.extend_from_slice(&reply[0..received_byte_count]); ret.extend_from_slice(&reply[0..received_byte_count]);
@ -296,20 +303,47 @@ impl DbusConnection {
None, None,
)?; )?;
let reply = self.receive_complete_response()?;
// note that a single received response can contain multiple
// messages, so we must deserialize it piece by piece
let mut ret = Vec::new(); let mut ret = Vec::new();
let mut buf = &reply[..];
while !buf.is_empty() { // it is possible that while receiving messages, we get some extra/previous message
let mut ctr = 0; // for method calls, we need to have an error or method return type message, so
let msg = Message::deserialize(&buf[ctr..], &mut ctr)?; // we keep looping until we get either of these. see https://github.com/containers/youki/issues/2826
// we reset the buf, because I couldn't figure out how the adjust_counter function // for more detailed analysis.
// should should be changed to work correctly with non-zero start counter, and this solved that issue loop {
buf = &buf[ctr..]; let reply = self.receive_complete_response()?;
ret.push(msg);
// note that a single received response can contain multiple
// messages, so we must deserialize it piece by piece
let mut buf = &reply[..];
while !buf.is_empty() {
let mut ctr = 0;
let msg = Message::deserialize(&buf[ctr..], &mut ctr)?;
// we reset the buf, because I couldn't figure out how the adjust_counter function
// should should be changed to work correctly with non-zero start counter, and this solved that issue
buf = &buf[ctr..];
ret.push(msg);
}
// in Youki, we only ever do method call apart from initial auth
// in case it is, we don't really have a specific message to look
// out of, so we take the buffer and break
if mtype != MessageType::MethodCall {
break;
}
// check if any of the received message is method return or error type
let return_message_count = ret
.iter()
.filter(|m| {
m.preamble.mtype == MessageType::MethodReturn
|| m.preamble.mtype == MessageType::Error
})
.count();
if return_message_count > 0 {
break;
}
} }
Ok(ret) Ok(ret)
} }

@ -54,7 +54,7 @@ impl HeaderSignature {
} }
/// Type of message /// Type of message
#[derive(Debug, PartialEq, Eq)] #[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum MessageType { pub enum MessageType {
MethodCall, MethodCall,
MethodReturn, MethodReturn,

@ -47,6 +47,7 @@ impl<'conn> Proxy<'conn> {
member: &str, member: &str,
body: Option<Body>, body: Option<Body>,
) -> Result<Output> { ) -> Result<Output> {
tracing::trace!("dbus call at interface {} member {}", interface, member);
let mut headers = Vec::with_capacity(4); let mut headers = Vec::with_capacity(4);
// create necessary headers // create necessary headers
@ -114,9 +115,10 @@ impl<'conn> Proxy<'conn> {
// we are only going to consider first reply, cause... so. // we are only going to consider first reply, cause... so.
// realistically there should only be at most one method return type of message // realistically there should only be at most one method return type of message
// for a method call // for a method call
let reply = reply.first().ok_or(DbusError::MethodCallErr( let reply = reply.first().ok_or(DbusError::MethodCallErr(format!(
"expected to get a reply for method call, didn't get any".into(), "expected to get a reply for method call, got {:?} instead",
))?; reply_messages
)))?;
let headers = &reply.headers; let headers = &reply.headers;
let expected_signature = Output::get_signature(); let expected_signature = Output::get_signature();

@ -159,7 +159,7 @@ impl ContainerBuilderImpl {
let (init_pid, need_to_clean_up_intel_rdt_dir) = let (init_pid, need_to_clean_up_intel_rdt_dir) =
process::container_main_process::container_main_process(&container_args).map_err( process::container_main_process::container_main_process(&container_args).map_err(
|err| { |err| {
tracing::error!(?err, "failed to run container process"); tracing::error!("failed to run container process {}", err);
LibcontainerError::MainProcess(err) LibcontainerError::MainProcess(err)
}, },
)?; )?;

@ -24,6 +24,8 @@ pub enum ChannelError {
MissingSeccompFds, MissingSeccompFds,
#[error("exec process failed with error {0}")] #[error("exec process failed with error {0}")]
ExecError(String), ExecError(String),
#[error("intermediate process error {0}")]
OtherError(String),
} }
/// Channel Design /// Channel Design
@ -83,6 +85,11 @@ impl MainSender {
Ok(()) Ok(())
} }
pub fn send_error(&mut self, err: String) -> Result<(), ChannelError> {
self.sender.send(Message::OtherError(err))?;
Ok(())
}
pub fn close(&self) -> Result<(), ChannelError> { pub fn close(&self) -> Result<(), ChannelError> {
self.sender.close()?; self.sender.close()?;
@ -110,6 +117,7 @@ impl MainReceiver {
match msg { match msg {
Message::IntermediateReady(pid) => Ok(Pid::from_raw(pid)), Message::IntermediateReady(pid) => Ok(Pid::from_raw(pid)),
Message::ExecFailed(err) => Err(ChannelError::ExecError(err)), Message::ExecFailed(err) => Err(ChannelError::ExecError(err)),
Message::OtherError(err) => Err(ChannelError::OtherError(err)),
msg => Err(ChannelError::UnexpectedMessage { msg => Err(ChannelError::UnexpectedMessage {
expected: Message::IntermediateReady(0), expected: Message::IntermediateReady(0),
received: msg, received: msg,

@ -62,7 +62,17 @@ pub fn container_main_process(container_args: &ContainerArgs) -> Result<(Pid, bo
) { ) {
Ok(_) => 0, Ok(_) => 0,
Err(err) => { Err(err) => {
tracing::error!(?err, "failed to run intermediate process"); tracing::error!("failed to run intermediate process {}", err);
match main_sender.send_error(err.to_string()) {
Ok(_) => {}
Err(e) => {
tracing::error!(
"error in sending intermediate error message {} to main: {}",
err,
e
)
}
}
-1 -1
} }
} }

@ -12,6 +12,7 @@ pub enum Message {
SeccompNotify, SeccompNotify,
SeccompNotifyDone, SeccompNotifyDone,
ExecFailed(String), ExecFailed(String),
OtherError(String),
} }
impl fmt::Display for Message { impl fmt::Display for Message {
@ -24,6 +25,7 @@ impl fmt::Display for Message {
Message::SeccompNotify => write!(f, "SeccompNotify"), Message::SeccompNotify => write!(f, "SeccompNotify"),
Message::SeccompNotifyDone => write!(f, "SeccompNotifyDone"), Message::SeccompNotifyDone => write!(f, "SeccompNotifyDone"),
Message::ExecFailed(s) => write!(f, "ExecFailed({})", s), Message::ExecFailed(s) => write!(f, "ExecFailed({})", s),
Message::OtherError(s) => write!(f, "OtherError({})", s),
} }
} }
} }