问题设置和 API 设计

潜在结果表述

我们首先使用潜在结果术语来表述问题。随后,对于更熟悉结构方程表示法的读者,我们也将提供该表示法下的表述。

我们库中开发的方法解决了以下一般问题:设 \(Y(\vec{t})\) 表示随机变量,它对应于我们对样本施加治疗 \(\vec{t} \in \T\) 后的感兴趣结果的值。给定两个治疗向量 \(\vec{t}_0, \vec{t}_1 \in \T\),一个协变量向量 \(\vec{x}\) 和一个潜在结果随机向量 \(Y(\vec{t})\),我们想要估计量

\[\tau(\vec{t}_0, \vec{t}_1, \vec{x}) = \E[Y(\vec{t}_1) - Y(\vec{t}_0) | X=\vec{x}]\]

我们将后一个量称为从治疗 \(\vec{t}_0\) 到治疗 \(\vec{t}_1\)异质治疗效果,条件是可观测变量 \(\vec{x}\)。如果治疗是连续的,那么人们可能也会对治疗点周围的局部效应感兴趣。后者转化为估计治疗向量周围的局部梯度,条件是可观测变量

\[\partial\tau(\vec{t}, \vec{x}) = \E\left[\nabla_{\vec{t}} Y(\vec{t}) | X=\vec{x}\right] \tag{边际 CATE}\]

我们将后者称为异质边际效应1

我们假设数据是由某种收集策略生成的。特别是,我们假设我们有如下形式的数据:\(\{Y_i(T_i), T_i, X_i, W_i, Z_i\}\),其中 \(Y_i(T_i)\) 是所选治疗的观测结果,\(T_i\) 是治疗,\(X_i\) 是用于捕捉异质性的协变量,\(W_i\) 是我们认为影响潜在结果 \(Y_i(T_i)\) 并可能影响治疗 \(T_i\) 的其他可观测协变量;\(Z_i\) 是影响治疗 \(T_i\) 但不直接影响潜在结果的变量。我们将变量 \(W_i\) 称为控制变量,将变量 \(Z_i\) 称为工具变量。变量 \(X_i\) 也可以被视为控制变量,但它们的特殊之处在于它们是控制变量的一个子集,我们希望根据它们来衡量治疗效果的异质性。我们将它们称为特征

最后,有时我们可能不仅对效应感兴趣,还对实际的反事实预测感兴趣,即估计量

\[\mu(\vec{t}, \vec{x}) = \E\left[Y(\vec{t}) | X=\vec{x}\right] \tag{反事实预测}\]

我们的软件包不提供反事实预测的支持。然而,对于我们的大多数估计器(那些假设治疗中线性的模型),可以通过将任何基线预测模型与我们的因果效应模型相结合来轻松构建反事实预测,例如训练任何机器学习模型 \(b(\vec{t}, \vec{x})\) 来解决回归/分类问题 \(\E[Y | T=\vec{t}, X=\vec{x}]\),然后设置 \(\mu(vec{t}, \vec{x}) = \tau(\vec{t}, T, \vec{x}) + b(T, \vec{x})\),其中 \(T\) 是在该观测策略下该样本的观测治疗,或者该观测策略将分配给该样本的治疗。这些辅助机器学习模型可以使用 EconML 之外的任何机器学习软件包进行训练。

结构方程表述

我们可以等效地通过结构方程来描述数据和感兴趣的量。特别是,假设我们从某个联合分布中观测到 i.i.d.(独立同分布)样本 \(\{Y_i, T_i, X_i, W_i, Z_i\}\),并且我们假设以下世界结构方程模型

\[ \begin{align}\begin{aligned}Y =~& g(T, X, W, \epsilon)\\T =~& f(X, W, Z, \eta)\end{aligned}\end{align} \]

其中 \(\epsilon\)\(\eta\) 是独立于 \(X, Z, T, W\)噪声随机变量,但它们之间可能存在相关性。然后,我们想要估计的目标量可以表示为

\begin{align} \tau(\vec{t}_0, \vec{t}_1, \vec{x}) =& \E[g(\vec{t}_1, X, W, \epsilon) - g(\vec{t}_0, X, W, \epsilon) | X=\vec{x}] \tag{CATE} \\ \partial\tau(\vec{t}, \vec{x}) =& \E[\nabla_{\vec{t}} g(\vec{t}, X, W, \epsilon) | X=\vec{x}] \tag{边际 CATE} \\ \end{align}

在这些期望中,随机变量 \(W, \epsilon\) 取自生成数据的同一分布。换句话说,潜在结果表述与结构方程表述之间存在一对一的对应关系,即随机变量 \(Y(t)\) 等于随机变量 \(g(t, X, W, \epsilon)\),其中 \(X, W, \epsilon\) 是从生成数据集中每个样本的分布中抽取的。

条件平均治疗效果包的 API

我们 API 中所有方法的基础类具有以下签名

基础 CATE 估计器类
class BaseCateEstimator

    def fit(self, Y, T, X=None, W=None, Z=None, inference=None):
        ''' Estimates the counterfactual model from data, i.e. estimates functions
        τ(·, ·, ·)}, ∂τ(·, ·) and μ(·, ·)

        Parameters:
        Y: (n × d_y) matrix of outcomes for each sample
        T: (n × d_t) matrix of treatments for each sample
        X: (n × d_x) matrix of features for each sample, optional
        W: (n × d_w) matrix of controls for each sample, optional
        Z: (n × d_z) matrix of instruments for each sample, optional
        inference: str or `Inference` instance, optional
            Method for performing inference.  All estimators support 'bootstrap'
            (or an instance of `BootstrapInference`), some support other methods as well.
        '''

    def effect(self, X=None, *, T0, T1):
        ''' Calculates the heterogeneous treatment effect τ(·, ·, ·) between two treatment
        points conditional on a vector of features on a set of m test samples {T0_i, T1_i, X_i}

        Parameters:
        T0: (m × d_t) matrix of base treatments for each sample
        T1: (m × d_t) matrix of target treatments for each sample
        X:  (m × d_x) matrix of features for each sample, optional

        Returns:
        tau: (m × d_y) matrix of heterogeneous treatment effects on each outcome
            for each sample
        '''

    def marginal_effect(self, T, X=None):
        ''' Calculates the heterogeneous marginal effect ∂τ(·, ·) around a base treatment
        point conditional on a vector of features on a set of m test samples {T_i, X_i}

        Parameters:
        T: (m × d_t) matrix of base treatments for each sample
        X:  (m × d_x) matrix of features for each sample, optional

        Returns:
        grad_tau: (m × d_y × d_t) matrix of heterogeneous marginal effects on each outcome
            for each sample
        '''

    def effect_interval(self, X=None, *, T0=0, T1=1, alpha=0.05):
        ''' Confidence intervals for the quantities τ(·, ·, ·) produced by the model.
        Available only when inference is not None, when calling the fit method.

        Parameters:
        X:  (m, d_x) matrix of features for each sample, optional
        T0: (m, d_t) matrix of base treatments for each sample, optional
        T1: (m, d_t) matrix of target treatments for each sample, optional
        alpha: float in [0, 1] of the (1-alpha) level of confidence, optional

        Returns:
        lower, upper : tuple of the lower and the upper bounds of the confidence interval
            for each quantity.
        '''

    def marginal_effect_interval(self, T, X=None, *, alpha=0.05):
        ''' Confidence intervals for the quantities effect ∂τ(·, ·) produced by the model.
        Available only when inference is not None, when calling the fit method.

        Parameters:
        T: (m, d_t) matrix of base treatments for each sample
        X: (m, d_x) matrix of features for each sample, optional
        alpha: float in [0, 1] of the (1-alpha) level of confidence, optional

        Returns:
        lower, upper : tuple of the lower and the upper bounds of the confidence interval
            for each quantity.
        '''

治疗(Treatment)中线性的 CATE 估计器

恒定边际效应

在许多情况下,我们可能希望对数据生成过程的形式做出进一步的结构性假设。一个特别普遍的假设是结果 \(y\) 在治疗向量中是线性的,因此边际效应在不同治疗中是恒定的,即

\[ \begin{align}\begin{aligned}Y =~& H(X, W) \cdot T + g(X, W, \epsilon)\\T =~& f(X, W, Z, \eta)\end{aligned}\end{align} \]

其中 \(\epsilon, \eta\) 是外生的噪声项。在这种线性响应假设下,我们观察到 CATE 和边际 CATE 呈现出特殊形式

\[ \begin{align}\begin{aligned}\tau(\vec{t}_0, \vec{t}_1, \vec{x}) =~& \E[H(X, W) | X=\vec{x}] \cdot (\vec{t}_1 - \vec{t}_0)\\\partial \tau(\vec{t}, \vec{x}) =~& \E[H(X, W) | X=\vec{x}]\end{aligned}\end{align} \]

因此,边际 CATE 独立于 \(\vec{t}\)。在这种情况下,我们将恒定边际 CATE 记为 \(\theta(\vec{x})\),即

\[\theta(\vec{x}) = \E[H(X, W) | X=\vec{x}] \tag{恒定边际 CATE}\]

给定治疗特征化(Treatment Featurization)的恒定边际效应和边际效应

此外,我们可能对结果线性依赖于治疗向量的变换(通过某个特征提取器 \(\phi\))的情况感兴趣。一些估计器支持将此类特征提取器 \(\phi\) 直接传递给估计器,在这种情况下,结果将按如下方式建模

\[Y = H(X, W) \cdot \phi(T) + g(X, W, \epsilon)\]

然后我们可以在特征化治疗空间中获得恒定边际效应

\[ \begin{align}\begin{aligned}\tau(\phi(\vec{t_0}), \phi(\vec{t_1}), \vec{x}) =~& \E[H(X, W) | X=\vec{x}] \cdot (\phi(\vec{t_1}) - \phi(\vec{t_0}))\\\partial \tau(\phi(\vec{t}), \vec{x}) =~& \E[H(X, W) | X=\vec{x}]\\\theta(\vec{x}) =~& \E[H(X, W) | X=\vec{x}]\end{aligned}\end{align} \]

最后,我们可以通过将恒定边际效应(位于特征化治疗空间中)与治疗特征提取器在 \(\vec{t}\) 处的雅可比矩阵相乘,来恢复原始治疗空间中的边际效应。

\[\partial \tau(\vec{t}, \vec{x}) = \theta(\vec{x}) \nabla \phi(\vec{t}) \tag{边际 CATE}\]

其中 \(\nabla \phi(\vec{t})\)\(d_{ft} \times d_{t}\) 雅可比矩阵,\(d_{ft}\)\(d_{t}\) 分别是特征化治疗和原始治疗的维度。

治疗(Treatment)中线性的 CATE 估计器的 API

考虑到线性治疗效果假设的普遍性,我们将创建一个通用的 LinearCateEstimator,它将支持一个方法,该方法可以在任何目标特征向量 \(\vec{x}\) 处返回恒定边际 CATE 和恒定边际 CATE 区间,并在提供治疗特征提取器时计算原始治疗空间中的边际效应。

线性 CATE 估计器类
class LinearCateEstimator(BaseCateEstimator):
    self.treatment_featurizer = None

    def const_marginal_effect(self, X=None):
        ''' Calculates the constant marginal CATE θ(·) conditional on a vector of
        features on a set of m test samples {X_i}

        Parameters:
        X: (m × d_x) matrix of features for each sample, optional

        Returns:
        theta: (m × d_y × d_f_t) matrix of constant marginal CATE of each treatment on each outcome
        for each sample, where d_f_t is the dimension of the featurized treatment.
        If treatment_featurizer is None, d_f_t = d_t
        '''

    def const_marginal_effect_interval(self, X=None, *, alpha=0.05):
        ''' Confidence intervals for the quantities θ(·) produced by the model.
        Available only when inference is not None, when calling the fit method.

        Parameters:
        X: (m, d_x) matrix of features for each sample, optional
        alpha: float in [0, 1] of the (1-alpha) level of confidence, optional

        Returns:
        lower, upper : tuple of the lower and the upper bounds of the confidence interval
            for each quantity.
        '''

    def effect(self,  X=None, *, T0, T1,):
        if self.treatment_featurizer:
            return const_marginal_effect(X) * (T1 - T0)
        else:
            dt = self.treatment_featurizer.transform(T1) - self.treatment_featurizer.transform(T0)
            return const_marginal_effect(X) * dt

    def marginal_effect(self, T, X=None)
        if self.treatment_featurizer is None:
            return const_marginal_effect(X)
        else:
            # for every observation X_i, T_i,
            # calculate jacobian at T_i and multiply with const_marginal_effect at X_i

    def marginal_effect_interval(self, T, X=None, *, alpha=0.05):
        if self.treatment_featurizer is None:
            return const_marginal_effect_interval(X, alpha=alpha)
        else:
            # perform separate treatment featurization inference logic

API 使用示例

让我们来看一个简单的示例,即使不考虑实际使用的估计方法,通过后一个 API 也能实现什么。

让我们考虑一个由以下方程控制的假设数据生成过程(DGP)

\[\begin{split}\begin{align} Y(t) =~& \gamma t^2 + \delta X t + \langle \zeta, W \rangle + \epsilon\\ T =~& \langle \alpha, W \rangle + \langle \beta, Z \rangle + \eta\\ X, Z, \epsilon, \eta \sim~& N(0, 1), ~~ W \sim N(0, I_{d}) \end{align}\end{split}\]

假设我们有来自这个 DGP 的 \(n\) 个样本。例如,我们可以用以下代码创建这些样本

结构方程生成示例数据
import numpy as np

# Instance parameters
n_controls = 100
n_instruments = 1
n_features = 1
n_treatments = 1
alpha = np.random.normal(size=(n_controls, 1))
beta = np.random.normal(size=(n_instruments, 1))
gamma = np.random.normal(size=(n_treatments, 1))
delta = np.random.normal(size=(n_treatments, 1))
zeta = np.random.normal(size=(n_controls, 1))

n_samples = 1000
W = np.random.normal(size=(n_samples, n_controls))
Z = np.random.normal(size=(n_samples, n_instruments))
X = np.random.normal(size=(n_samples, n_features))
eta = np.random.normal(size=(n_samples, n_treatments))
epsilon = np.random.normal(size=(n_samples, 1))
T = np.dot(W, alpha) + np.dot(Z, beta) + eta
y = np.dot(T**2, gamma) + np.dot(np.multiply(T, X), delta) + np.dot(W, zeta) + epsilon

然后我们可以将反事实模型拟合到数据。为了学习 CATE 的置信区间,我们可以将一个额外的推断参数传递给 fit 方法,所有估计器都支持 bootstrap 区间。我们可以运行以下代码

因果模型拟合示例
# Fit counterfactual model
cfest = BaseCateEstimator()
cfest.fit(y, T, X=X, W=W, Z=Z, inference='bootstrap')

现在假设我们想估计训练数据中每个点 \(X_i\) 在治疗 1 和治疗 0 之间的条件平均治疗效果。这应该是量 \(\gamma + \delta X_i\) 的估计。我们还可以获得 CATE 的置信区间。我们可以运行以下代码

估计从治疗 0 到 1 的所有训练特征的 cate
X_test = X
# Estimate heterogeneous treatment effects from going from treatment 0 to treatment 1
T0_test = np.zeros((X_test.shape[0], n_treatments))
T1_test = np.ones((X_test.shape[0], n_treatments))
hetero_te = cfest.effect(X_test, T0=T0_test, T1=T1_test)
hetero_te_interval =  cfest.effect_interval(X_test, T0=T0_test, T1=T1_test, alpha=0.1)

现在假设我们想估计在治疗 0 时,每个点 \(X_i\) 的条件边际效应。这应该是量 \(\delta X_i\) 的估计。我们还可以获得 CATE 的置信区间。我们可以运行以下代码

估计在治疗 0 时所有训练特征的边际 cate
# Estimate heterogeneous marginal effects around treatment 0
T_test = np.zeros((X_test.shape[0], n_treatments))
hetero_marginal_te = cfest.marginal_effect(T_test, X_test)
hetero_marginal_te_interval = cfest.marginal_effect_interval(T_test, X_test, alpha=0.1)

假设我们想在子人群上创建这些估计量的投影,即平均治疗效果,或者在 \(X_i\geq 1/2\) 人群上的平均治疗效果。我们可以简单地按如下方式实现

在子人群上投影
# Estimate average treatment effects over a population of z's
T0_test = np.zeros((X_test.shape[0], n_treatments))
T1_test = np.ones((X_test.shape[0], n_treatments))

# average treatment effect
ate = np.mean(cfest.effect(X_test, T0=T0_test, T1=T1_test)) # returns estimate of γ + δ 𝔼[x]

# average treatment effect of population with x>1/2
# returns estimate of γ + δ 𝔼[x | x>1/2]
cate = np.mean(cfest.effect(X_test[X_test>1/2], T0=T0_test[X_test>1/2], T1=T1_test[X_test>1/2]))

更重要的是,假设我们想了解如果我们遵循某种治疗策略(例如,对所有具有 \(X_i\geq 0\) 特征的人进行治疗)时,整体预期响应变化会是什么。这也可以轻松地按如下方式完成

估计某种治疗策略的预期提升
# Estimate expected lift of treatment policy: π(z) = 𝟙{x > 0} over existing policy
Pi0_test = T
Pi1_test = (X_test > 0) * 1.
# returns estimate of γ/2 + δ/√(2π)
policy_effect = np.mean(cfest.effect(X_test, T0=Pi0_test, T1=Pi1_test))

# Estimate expected lift of treatment policy: π(x) = 𝟙{x > 0} over baseline of no treatment
Pi0_test = np.zeros((X_test.shape[0], n_treatments))
Pi1_test = (X_test > 0) * 1.
# returns estimate of γ/2 + δ/√(2π)
policy_effect = np.mean(cfest.effect(X_test, T0=Pi0_test, T1=Pi1_test))

脚注

1

总是可以用前者近似后者,反之亦然,即对于足够小的 \(\delta\),有 \(\partial_i \tau(\vec{t},\vec{x}) \approx \tau(\vec{t}, \vec{t} + \delta \vec{e}_i, \vec{x})/\delta\),类似地,有 \(\tau(\vec{t_0}, \vec{t_1}, \vec{x}) = \int_{0}^{1} \partial\tau(\vec{t}_0 + q (\vec{t}_1 - \vec{t}_0), \vec{x}) (\vec{t}_1 - \vec{t_0})dq\)。然而,在许多情况下,利用结构的更直接的方法可能会简化这些通用变换。