use crate::{
exceptions::PyTypeError,
ffi,
pyclass::boolean_struct::False,
types::{PyDict, PyString, PyTuple},
FromPyObject, PyAny, PyClass, PyErr, PyRef, PyRefMut, PyResult, Python,
};
/// A Rust type that can be produced from a Python object when it is passed
/// as a function argument.
///
/// `'py` is the lifetime of the borrowed Python object and `'a` is the
/// lifetime of the mutable borrow of the holder; `Self` must be valid for
/// `'a`.
pub trait PyFunctionArgument<'a, 'py>: Sized + 'a {
/// Caller-provided scratch storage handed to [`extract`](Self::extract).
/// The blanket impl for `FromPyObject` types in this file uses `()`;
/// `extract_pyclass_ref` in this file uses an `Option` holder to keep the
/// extracted `PyRef` alive while a reference into it is handed out.
type Holder: FunctionArgumentHolder;
/// Converts `obj` into `Self`, with mutable access to `holder` for the
/// duration of `'a`. Returns the conversion error on failure.
fn extract(obj: &'py PyAny, holder: &'a mut Self::Holder) -> PyResult<Self>;
}
/// Every `FromPyObject` type is usable as a function argument without any
/// holder storage: extraction simply defers to `PyAny::extract`.
impl<'a, 'py, T: FromPyObject<'py> + 'a> PyFunctionArgument<'a, 'py> for T {
    type Holder = ();

    #[inline]
    fn extract(obj: &'py PyAny, _holder: &'a mut ()) -> PyResult<Self> {
        // The unit holder carries no state, so it is ignored.
        obj.extract::<T>()
    }
}
/// Storage type usable as a `PyFunctionArgument::Holder`.
pub trait FunctionArgumentHolder: Sized {
/// Constant initial value of the holder. The impls in this file use `()`
/// and `None`, i.e. an "empty" holder.
// NOTE(review): presumably callers create holders from `INIT` before
// invoking `extract_argument` — confirm against the generated call sites.
const INIT: Self;
}
/// The unit holder: used by arguments that need no extra storage.
impl FunctionArgumentHolder for () {
    const INIT: () = ();
}
/// An optional holder starts out empty and is filled during extraction.
impl<T> FunctionArgumentHolder for Option<T> {
    const INIT: Option<T> = Option::None;
}
/// Extracts a shared borrow of the pyclass `T` from `obj`.
///
/// The extracted `PyRef` is parked in `holder`, and the returned `&T`
/// borrows from that slot, so it stays valid for the holder borrow `'a`.
/// Any extraction error is returned before `holder` is modified.
#[inline]
pub fn extract_pyclass_ref<'a, 'py: 'a, T: PyClass>(
    obj: &'py PyAny,
    holder: &'a mut Option<PyRef<'py, T>>,
) -> PyResult<&'a T> {
    let guard: PyRef<'py, T> = obj.extract()?;

    // Without `Option::insert`: store the guard, then re-borrow it.
    #[cfg(not(option_insert))]
    {
        *holder = Some(guard);
        return Ok(holder.as_deref().unwrap());
    }

    // With `Option::insert`: store the guard and borrow it in one step.
    #[cfg(option_insert)]
    Ok(&*holder.insert(guard))
}
/// Extracts an exclusive borrow of the non-frozen pyclass `T` from `obj`.
///
/// The extracted `PyRefMut` is parked in `holder`, and the returned `&mut T`
/// borrows from that slot, so it stays valid for the holder borrow `'a`.
/// Any extraction error is returned before `holder` is modified.
#[inline]
pub fn extract_pyclass_ref_mut<'a, 'py: 'a, T: PyClass<Frozen = False>>(
    obj: &'py PyAny,
    holder: &'a mut Option<PyRefMut<'py, T>>,
) -> PyResult<&'a mut T> {
    let guard: PyRefMut<'py, T> = obj.extract()?;

    // Without `Option::insert`: store the guard, then re-borrow it.
    #[cfg(not(option_insert))]
    {
        *holder = Some(guard);
        return Ok(holder.as_deref_mut().unwrap());
    }

    // With `Option::insert`: store the guard and borrow it in one step.
    #[cfg(option_insert)]
    Ok(&mut *holder.insert(guard))
}
/// Extracts the argument `arg_name` from `obj`.
///
/// On failure the error is passed through `argument_extraction_error`, which
/// prefixes `TypeError` messages with the argument name.
#[doc(hidden)]
pub fn extract_argument<'a, 'py, T>(
    obj: &'py PyAny,
    holder: &'a mut T::Holder,
    arg_name: &str,
) -> PyResult<T>
where
    T: PyFunctionArgument<'a, 'py>,
{
    <T as PyFunctionArgument<'a, 'py>>::extract(obj, holder)
        .map_err(|err| argument_extraction_error(obj.py(), arg_name, err))
}
/// Extracts an `Option<T>` argument named `arg_name`.
///
/// * argument not supplied → the result of `default()`
/// * argument is Python `None` → `Ok(None)` (the default is NOT consulted)
/// * anything else → extracted via `extract_argument` and wrapped in `Some`
#[doc(hidden)]
pub fn extract_optional_argument<'a, 'py, T>(
    obj: Option<&'py PyAny>,
    holder: &'a mut T::Holder,
    arg_name: &str,
    default: fn() -> Option<T>,
) -> PyResult<Option<T>>
where
    T: PyFunctionArgument<'a, 'py>,
{
    match obj {
        None => Ok(default()),
        // An explicit Python `None` maps straight to Rust `None`.
        Some(value) if value.is_none() => Ok(None),
        Some(value) => extract_argument(value, holder, arg_name).map(Some),
    }
}
/// Extracts the argument `arg_name` when it was supplied, otherwise returns
/// the value produced by `default()`.
#[doc(hidden)]
pub fn extract_argument_with_default<'a, 'py, T>(
    obj: Option<&'py PyAny>,
    holder: &'a mut T::Holder,
    arg_name: &str,
    default: fn() -> T,
) -> PyResult<T>
where
    T: PyFunctionArgument<'a, 'py>,
{
    if let Some(value) = obj {
        extract_argument(value, holder, arg_name)
    } else {
        Ok(default())
    }
}
/// Runs the custom `extractor` on `obj`.
///
/// On failure the error is passed through `argument_extraction_error`, which
/// prefixes `TypeError` messages with `arg_name`.
#[doc(hidden)]
pub fn from_py_with<'py, T>(
    obj: &'py PyAny,
    arg_name: &str,
    extractor: fn(&'py PyAny) -> PyResult<T>,
) -> PyResult<T> {
    extractor(obj).map_err(|err| argument_extraction_error(obj.py(), arg_name, err))
}
/// Like `from_py_with`, but falls back to `default()` when the argument was
/// not supplied at all.
#[doc(hidden)]
pub fn from_py_with_with_default<'py, T>(
    obj: Option<&'py PyAny>,
    arg_name: &str,
    extractor: fn(&'py PyAny) -> PyResult<T>,
    default: fn() -> T,
) -> PyResult<T> {
    if let Some(value) = obj {
        from_py_with(value, arg_name, extractor)
    } else {
        Ok(default())
    }
}
#[doc(hidden)]
#[cold]
pub fn argument_extraction_error(py: Python<'_>, arg_name: &str, error: PyErr) -> PyErr {
if error.get_type(py).is(py.get_type::<PyTypeError>()) {
let remapped_error =
PyTypeError::new_err(format!("argument '{}': {}", arg_name, error.value(py)));
remapped_error.set_cause(py, error.cause(py));
remapped_error
} else {
error
}
}
/// Unwraps an argument slot that is known to have been filled.
///
/// # Safety
///
/// `argument` must be `Some`. With debug assertions enabled a `None` panics;
/// without them it reaches `std::hint::unreachable_unchecked`, which is
/// undefined behavior.
#[doc(hidden)]
#[inline]
pub unsafe fn unwrap_required_argument(argument: Option<&PyAny>) -> &PyAny {
    match argument {
        #[cfg(debug_assertions)]
        None => unreachable!("required method argument was not extracted"),
        #[cfg(not(debug_assertions))]
        None => std::hint::unreachable_unchecked(),
        Some(extracted) => extracted,
    }
}
/// Describes one keyword-only parameter of a function.
pub struct KeywordOnlyParameterDescription {
/// Name the keyword argument is matched against.
pub name: &'static str,
/// When `true`, argument extraction fails with a "missing required keyword
/// argument" `TypeError` if no value was supplied for this parameter.
pub required: bool,
}
/// Static description of a function's parameters, used to sort incoming
/// Python arguments into an output slot array and to build error messages.
pub struct FunctionDescription {
/// Owning class name, if any; error messages then read `Class.func()`.
pub cls_name: Option<&'static str>,
/// Function name used in error messages.
pub func_name: &'static str,
/// Names of the parameters that can be filled positionally, in order.
/// Parameter `i` is written to output slot `i`.
pub positional_parameter_names: &'static [&'static str],
/// The first this-many entries of `positional_parameter_names` cannot be
/// supplied by keyword.
pub positional_only_parameters: usize,
/// The first this-many entries of `positional_parameter_names` must be
/// supplied, either positionally or by keyword.
pub required_positional_parameters: usize,
/// Keyword-only parameters; their output slots follow the positional ones.
pub keyword_only_parameters: &'static [KeywordOnlyParameterDescription],
}
impl FunctionDescription {
fn full_name(&self) -> String {
if let Some(cls_name) = self.cls_name {
format!("{}.{}()", cls_name, self.func_name)
} else {
format!("{}()", self.func_name)
}
}
#[cfg(not(Py_LIMITED_API))]
pub unsafe fn extract_arguments_fastcall<'py, V, K>(
&self,
py: Python<'py>,
args: *const *mut ffi::PyObject,
nargs: ffi::Py_ssize_t,
kwnames: *mut ffi::PyObject,
output: &mut [Option<&'py PyAny>],
) -> PyResult<(V::Varargs, K::Varkeywords)>
where
V: VarargsHandler<'py>,
K: VarkeywordsHandler<'py>,
{
let num_positional_parameters = self.positional_parameter_names.len();
debug_assert!(nargs >= 0);
debug_assert!(self.positional_only_parameters <= num_positional_parameters);
debug_assert!(self.required_positional_parameters <= num_positional_parameters);
debug_assert_eq!(
output.len(),
num_positional_parameters + self.keyword_only_parameters.len()
);
let args: *const Option<&PyAny> = args.cast();
let positional_args_provided = nargs as usize;
let remaining_positional_args = if args.is_null() {
debug_assert_eq!(positional_args_provided, 0);
&[]
} else {
let positional_args_to_consume =
num_positional_parameters.min(positional_args_provided);
let (positional_parameters, remaining) =
std::slice::from_raw_parts(args, positional_args_provided)
.split_at(positional_args_to_consume);
output[..positional_args_to_consume].copy_from_slice(positional_parameters);
remaining
};
let varargs = V::handle_varargs_fastcall(py, remaining_positional_args, self)?;
let mut varkeywords = K::Varkeywords::default();
if let Some(kwnames) = py.from_borrowed_ptr_or_opt::<PyTuple>(kwnames) {
let kwargs =
::std::slice::from_raw_parts((args as *const &PyAny).offset(nargs), kwnames.len());
self.handle_kwargs::<K, _>(
kwnames.iter().zip(kwargs.iter().copied()),
&mut varkeywords,
num_positional_parameters,
output,
)?
}
self.ensure_no_missing_required_positional_arguments(output, positional_args_provided)?;
self.ensure_no_missing_required_keyword_arguments(output)?;
Ok((varargs, varkeywords))
}
pub unsafe fn extract_arguments_tuple_dict<'py, V, K>(
&self,
py: Python<'py>,
args: *mut ffi::PyObject,
kwargs: *mut ffi::PyObject,
output: &mut [Option<&'py PyAny>],
) -> PyResult<(V::Varargs, K::Varkeywords)>
where
V: VarargsHandler<'py>,
K: VarkeywordsHandler<'py>,
{
let args = py.from_borrowed_ptr::<PyTuple>(args);
let kwargs: ::std::option::Option<&PyDict> = py.from_borrowed_ptr_or_opt(kwargs);
let num_positional_parameters = self.positional_parameter_names.len();
debug_assert!(self.positional_only_parameters <= num_positional_parameters);
debug_assert!(self.required_positional_parameters <= num_positional_parameters);
debug_assert_eq!(
output.len(),
num_positional_parameters + self.keyword_only_parameters.len()
);
for (i, arg) in args.iter().take(num_positional_parameters).enumerate() {
output[i] = Some(arg);
}
let varargs = V::handle_varargs_tuple(args, self)?;
let mut varkeywords = K::Varkeywords::default();
if let Some(kwargs) = kwargs {
self.handle_kwargs::<K, _>(kwargs, &mut varkeywords, num_positional_parameters, output)?
}
self.ensure_no_missing_required_positional_arguments(output, args.len())?;
self.ensure_no_missing_required_keyword_arguments(output)?;
Ok((varargs, varkeywords))
}
#[inline]
fn handle_kwargs<'py, K, I>(
&self,
kwargs: I,
varkeywords: &mut K::Varkeywords,
num_positional_parameters: usize,
output: &mut [Option<&'py PyAny>],
) -> PyResult<()>
where
K: VarkeywordsHandler<'py>,
I: IntoIterator<Item = (&'py PyAny, &'py PyAny)>,
{
debug_assert_eq!(
num_positional_parameters,
self.positional_parameter_names.len()
);
debug_assert_eq!(
output.len(),
num_positional_parameters + self.keyword_only_parameters.len()
);
let mut positional_only_keyword_arguments = Vec::new();
for (kwarg_name_py, value) in kwargs {
if let Ok(kwarg_name) = kwarg_name_py.downcast::<PyString>()?.to_str() {
if let Some(i) = self.find_keyword_parameter_in_keyword_only(kwarg_name) {
if output[i + num_positional_parameters]
.replace(value)
.is_some()
{
return Err(self.multiple_values_for_argument(kwarg_name));
}
continue;
}
if let Some(i) = self.find_keyword_parameter_in_positional(kwarg_name) {
if i < self.positional_only_parameters {
if K::handle_varkeyword(varkeywords, kwarg_name_py, value, self).is_err() {
positional_only_keyword_arguments.push(kwarg_name);
}
} else if output[i].replace(value).is_some() {
return Err(self.multiple_values_for_argument(kwarg_name));
}
continue;
}
};
K::handle_varkeyword(varkeywords, kwarg_name_py, value, self)?
}
if !positional_only_keyword_arguments.is_empty() {
return Err(self.positional_only_keyword_arguments(&positional_only_keyword_arguments));
}
Ok(())
}
#[inline]
fn find_keyword_parameter_in_positional(&self, kwarg_name: &str) -> Option<usize> {
self.positional_parameter_names
.iter()
.position(|¶m_name| param_name == kwarg_name)
}
#[inline]
fn find_keyword_parameter_in_keyword_only(&self, kwarg_name: &str) -> Option<usize> {
self.keyword_only_parameters
.iter()
.position(|param_desc| param_desc.name == kwarg_name)
}
#[inline]
fn ensure_no_missing_required_positional_arguments(
&self,
output: &[Option<&PyAny>],
positional_args_provided: usize,
) -> PyResult<()> {
if positional_args_provided < self.required_positional_parameters {
for out in &output[positional_args_provided..self.required_positional_parameters] {
if out.is_none() {
return Err(self.missing_required_positional_arguments(output));
}
}
}
Ok(())
}
#[inline]
fn ensure_no_missing_required_keyword_arguments(
&self,
output: &[Option<&PyAny>],
) -> PyResult<()> {
let keyword_output = &output[self.positional_parameter_names.len()..];
for (param, out) in self.keyword_only_parameters.iter().zip(keyword_output) {
if param.required && out.is_none() {
return Err(self.missing_required_keyword_arguments(keyword_output));
}
}
Ok(())
}
#[cold]
fn too_many_positional_arguments(&self, args_provided: usize) -> PyErr {
let was = if args_provided == 1 { "was" } else { "were" };
let msg = if self.required_positional_parameters != self.positional_parameter_names.len() {
format!(
"{} takes from {} to {} positional arguments but {} {} given",
self.full_name(),
self.required_positional_parameters,
self.positional_parameter_names.len(),
args_provided,
was
)
} else {
format!(
"{} takes {} positional arguments but {} {} given",
self.full_name(),
self.positional_parameter_names.len(),
args_provided,
was
)
};
PyTypeError::new_err(msg)
}
#[cold]
fn multiple_values_for_argument(&self, argument: &str) -> PyErr {
PyTypeError::new_err(format!(
"{} got multiple values for argument '{}'",
self.full_name(),
argument
))
}
#[cold]
fn unexpected_keyword_argument(&self, argument: &PyAny) -> PyErr {
PyTypeError::new_err(format!(
"{} got an unexpected keyword argument '{}'",
self.full_name(),
argument
))
}
#[cold]
fn positional_only_keyword_arguments(&self, parameter_names: &[&str]) -> PyErr {
let mut msg = format!(
"{} got some positional-only arguments passed as keyword arguments: ",
self.full_name()
);
push_parameter_list(&mut msg, parameter_names);
PyTypeError::new_err(msg)
}
#[cold]
fn missing_required_arguments(&self, argument_type: &str, parameter_names: &[&str]) -> PyErr {
let arguments = if parameter_names.len() == 1 {
"argument"
} else {
"arguments"
};
let mut msg = format!(
"{} missing {} required {} {}: ",
self.full_name(),
parameter_names.len(),
argument_type,
arguments,
);
push_parameter_list(&mut msg, parameter_names);
PyTypeError::new_err(msg)
}
#[cold]
fn missing_required_keyword_arguments(&self, keyword_outputs: &[Option<&PyAny>]) -> PyErr {
debug_assert_eq!(self.keyword_only_parameters.len(), keyword_outputs.len());
let missing_keyword_only_arguments: Vec<_> = self
.keyword_only_parameters
.iter()
.zip(keyword_outputs)
.filter_map(|(keyword_desc, out)| {
if keyword_desc.required && out.is_none() {
Some(keyword_desc.name)
} else {
None
}
})
.collect();
debug_assert!(!missing_keyword_only_arguments.is_empty());
self.missing_required_arguments("keyword", &missing_keyword_only_arguments)
}
#[cold]
fn missing_required_positional_arguments(&self, output: &[Option<&PyAny>]) -> PyErr {
let missing_positional_arguments: Vec<_> = self
.positional_parameter_names
.iter()
.take(self.required_positional_parameters)
.zip(output)
.filter_map(|(param, out)| if out.is_none() { Some(*param) } else { None })
.collect();
debug_assert!(!missing_positional_arguments.is_empty());
self.missing_required_arguments("positional", &missing_positional_arguments)
}
}
/// Strategy for positional arguments beyond the declared positional
/// parameters: implementors either collect them or reject the call.
pub trait VarargsHandler<'py> {
/// Value produced for the surplus positional arguments.
type Varargs;
/// Handles surplus arguments for `extract_arguments_fastcall`.
/// `varargs` holds only the arguments left over after the declared
/// positional parameters were filled.
fn handle_varargs_fastcall(
py: Python<'py>,
varargs: &[Option<&PyAny>],
function_description: &FunctionDescription,
) -> PyResult<Self::Varargs>;
/// Handles surplus arguments for `extract_arguments_tuple_dict`.
/// `args` is the complete argument tuple, including the arguments already
/// assigned to declared positional parameters.
fn handle_varargs_tuple(
args: &'py PyTuple,
function_description: &FunctionDescription,
) -> PyResult<Self::Varargs>;
}
/// Varargs strategy for functions without `*args`: any surplus positional
/// argument is an error.
pub struct NoVarargs;

impl<'py> VarargsHandler<'py> for NoVarargs {
    type Varargs = ();

    /// Succeeds only when no surplus arguments remain. `varargs` excludes the
    /// arguments already consumed, so the count reported in the error is the
    /// number of positional parameters plus the surplus.
    #[inline]
    fn handle_varargs_fastcall(
        _py: Python<'py>,
        varargs: &[Option<&PyAny>],
        function_description: &FunctionDescription,
    ) -> PyResult<Self::Varargs> {
        match varargs.len() {
            0 => Ok(()),
            surplus => {
                let declared = function_description.positional_parameter_names.len();
                Err(function_description.too_many_positional_arguments(declared + surplus))
            }
        }
    }

    /// Succeeds only when the full argument tuple is no longer than the list
    /// of positional parameters.
    #[inline]
    fn handle_varargs_tuple(
        args: &'py PyTuple,
        function_description: &FunctionDescription,
    ) -> PyResult<Self::Varargs> {
        let declared = function_description.positional_parameter_names.len();
        let provided = args.len();
        if provided > declared {
            return Err(function_description.too_many_positional_arguments(provided));
        }
        Ok(())
    }
}
/// Varargs strategy for functions with `*args`: surplus positional arguments
/// are gathered into a tuple.
pub struct TupleVarargs;

impl<'py> VarargsHandler<'py> for TupleVarargs {
    type Varargs = &'py PyTuple;

    /// Builds a new tuple from the surplus arguments.
    #[inline]
    fn handle_varargs_fastcall(
        py: Python<'py>,
        varargs: &[Option<&PyAny>],
        _function_description: &FunctionDescription,
    ) -> PyResult<Self::Varargs> {
        let surplus = PyTuple::new(py, varargs);
        Ok(surplus)
    }

    /// Slices off everything after the declared positional parameters.
    #[inline]
    fn handle_varargs_tuple(
        args: &'py PyTuple,
        function_description: &FunctionDescription,
    ) -> PyResult<Self::Varargs> {
        let first_surplus = function_description.positional_parameter_names.len();
        let end = args.len();
        Ok(args.get_slice(first_surplus, end))
    }
}
/// Strategy for keyword arguments that match no declared parameter:
/// implementors either collect them or reject the call.
pub trait VarkeywordsHandler<'py> {
/// Accumulator for the collected keyword arguments; argument extraction
/// starts from its `Default` value.
type Varkeywords: Default;
/// Handles one keyword argument `name = value` that was not assigned to a
/// declared parameter, updating `varkeywords` or returning an error.
fn handle_varkeyword(
varkeywords: &mut Self::Varkeywords,
name: &'py PyAny,
value: &'py PyAny,
function_description: &FunctionDescription,
) -> PyResult<()>;
}
/// Varkeywords strategy for functions without `**kwargs`: every unmatched
/// keyword argument is an error.
pub struct NoVarkeywords;

impl<'py> VarkeywordsHandler<'py> for NoVarkeywords {
    type Varkeywords = ();

    /// Always fails with an "unexpected keyword argument" `TypeError`.
    #[inline]
    fn handle_varkeyword(
        _varkeywords: &mut Self::Varkeywords,
        name: &'py PyAny,
        _value: &'py PyAny,
        function_description: &FunctionDescription,
    ) -> PyResult<()> {
        let rejection = function_description.unexpected_keyword_argument(name);
        Err(rejection)
    }
}
/// Varkeywords strategy for functions with `**kwargs`: unmatched keyword
/// arguments are gathered into a dict that is created on first use.
pub struct DictVarkeywords;

impl<'py> VarkeywordsHandler<'py> for DictVarkeywords {
    type Varkeywords = Option<&'py PyDict>;

    /// Stores `name = value` in the dict, creating the dict if none exists yet.
    #[inline]
    fn handle_varkeyword(
        varkeywords: &mut Self::Varkeywords,
        name: &'py PyAny,
        value: &'py PyAny,
        _function_description: &FunctionDescription,
    ) -> PyResult<()> {
        let dict: &'py PyDict = match *varkeywords {
            Some(existing) => existing,
            None => {
                let created = PyDict::new(name.py());
                *varkeywords = Some(created);
                created
            }
        };
        dict.set_item(name, value)
    }
}
/// Appends `parameter_names` to `msg` as a quoted English list.
///
/// One name gives `'a'`, two give `'a' and 'b'`, three or more give
/// `'a', 'b', and 'c'`. An empty slice appends nothing.
fn push_parameter_list(msg: &mut String, parameter_names: &[&str]) {
    let total = parameter_names.len();
    for (index, name) in parameter_names.iter().enumerate() {
        if index > 0 {
            // Commas are only used once the list has three or more entries.
            if total > 2 {
                msg.push(',');
            }
            let separator = if index + 1 == total { " and " } else { " " };
            msg.push_str(separator);
        }
        msg.push('\'');
        msg.push_str(name);
        msg.push('\'');
    }
}
#[cfg(test)]
mod tests {
    use crate::{
        types::{IntoPyDict, PyTuple},
        AsPyPointer, PyAny, Python, ToPyObject,
    };

    use super::{push_parameter_list, FunctionDescription, NoVarargs, NoVarkeywords};

    /// Describes a free function `example()` that declares no parameters.
    fn parameterless_example() -> FunctionDescription {
        FunctionDescription {
            cls_name: None,
            func_name: "example",
            positional_parameter_names: &[],
            positional_only_parameters: 0,
            required_positional_parameters: 0,
            keyword_only_parameters: &[],
        }
    }

    /// Runs `push_parameter_list` on a fresh string and returns the result.
    fn rendered(names: &[&str]) -> String {
        let mut text = String::new();
        push_parameter_list(&mut text, names);
        text
    }

    /// A keyword that matches no parameter is rejected by `NoVarkeywords`.
    #[test]
    fn unexpected_keyword_argument() {
        let description = parameterless_example();

        Python::with_gil(|py| {
            let args = PyTuple::new(py, Vec::<&PyAny>::new());
            let kwargs = [("foo".to_object(py).into_ref(py), 0u8)].into_py_dict(py);
            let result = unsafe {
                description.extract_arguments_tuple_dict::<NoVarargs, NoVarkeywords>(
                    py,
                    args.as_ptr(),
                    kwargs.as_ptr(),
                    &mut [],
                )
            };
            let err = result.unwrap_err();
            assert_eq!(
                err.to_string(),
                "TypeError: example() got an unexpected keyword argument 'foo'"
            );
        })
    }

    /// A non-string keyword name surfaces the downcast error.
    #[test]
    fn keyword_not_string() {
        let description = parameterless_example();

        Python::with_gil(|py| {
            let args = PyTuple::new(py, Vec::<&PyAny>::new());
            let kwargs = [(1u8.to_object(py).into_ref(py), 1u8)].into_py_dict(py);
            let result = unsafe {
                description.extract_arguments_tuple_dict::<NoVarargs, NoVarkeywords>(
                    py,
                    args.as_ptr(),
                    kwargs.as_ptr(),
                    &mut [],
                )
            };
            let err = result.unwrap_err();
            assert_eq!(
                err.to_string(),
                "TypeError: 'int' object cannot be converted to 'PyString'"
            );
        })
    }

    /// Calling with no arguments reports every required positional parameter.
    #[test]
    fn missing_required_arguments() {
        let description = FunctionDescription {
            cls_name: None,
            func_name: "example",
            positional_parameter_names: &["foo", "bar"],
            positional_only_parameters: 0,
            required_positional_parameters: 2,
            keyword_only_parameters: &[],
        };

        Python::with_gil(|py| {
            let args = PyTuple::new(py, Vec::<&PyAny>::new());
            let mut output = [None, None];
            let result = unsafe {
                description.extract_arguments_tuple_dict::<NoVarargs, NoVarkeywords>(
                    py,
                    args.as_ptr(),
                    std::ptr::null_mut(),
                    &mut output,
                )
            };
            let err = result.unwrap_err();
            assert_eq!(
                err.to_string(),
                "TypeError: example() missing 2 required positional arguments: 'foo' and 'bar'"
            );
        })
    }

    #[test]
    fn push_parameter_list_empty() {
        assert_eq!(rendered(&[]), "");
    }

    #[test]
    fn push_parameter_list_one() {
        assert_eq!(rendered(&["a"]), "'a'");
    }

    #[test]
    fn push_parameter_list_two() {
        assert_eq!(rendered(&["a", "b"]), "'a' and 'b'");
    }

    #[test]
    fn push_parameter_list_three() {
        assert_eq!(rendered(&["a", "b", "c"]), "'a', 'b', and 'c'");
    }

    #[test]
    fn push_parameter_list_four() {
        assert_eq!(rendered(&["a", "b", "c", "d"]), "'a', 'b', 'c', and 'd'");
    }
}