Linear Regression: Principles and Multiple Python Implementations

August 27, 2019

This work is licensed under a Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International License.



This post gives a brief overview of several Python implementations of linear regression. As an entry-level machine learning algorithm, linear regression is exceptionally important and is often used to illustrate key machine learning concepts. Beyond that, it is the foundation of many other classic algorithms: a large number of linear and nonlinear models grow out of linear regression, so understanding how it is implemented lays the groundwork for implementing other algorithms.

The Basic Principles of Linear Regression

A linear regression model takes the form

$y = \sum_j \beta_j x_j$

where $y$ is the dependent (response) variable, $x_1, \dots, x_j$ are the independent (input) variables, and $\beta_1, \dots, \beta_j$ are the function parameters to be estimated.

Linear regression is a method for estimating the parameters of this linear function; put differently, it is the learning process of the machine defined by the linear model.

Here, "linear model" means the function is linear in its parameters. In matrix form over the data, the estimated model can be written as

$\hat Y = X \hat \beta$

where $\hat Y$ is the vector of predicted values, $X$ is the known input data matrix, and $\hat \beta$ is the estimated parameter vector.

Linear regression can be estimated (optimized) in several ways.

  1. Least squares estimation. From a statistical point of view, this is simply maximum likelihood estimation. The objective function is

    $\mathop{\operatorname{argmin}}\limits_{\hat{\beta}} (Y - \hat Y)^T (Y - \hat Y)$

    At the optimal solution, $\hat Y$ is exactly the expectation $E(Y|X)$ of the likelihood $Pr(Y|X)$.

    It is known that the optimal solution, in matrix form, is

    $\hat \beta = (X^TX)^{-1} X^TY$

    Because the objective is a quadratic convex function, gradient descent converges quickly and finds the global optimum (see the short sketch after this list).

  2. There is also the somewhat more involved Bayesian linear regression, or more precisely maximum a posteriori (MAP) estimation, which will be covered another time.
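
Since the least-squares objective is convex, a plain gradient-descent loop recovers essentially the same parameters as the closed-form solution. The following is a minimal sketch on synthetic data; the generated data, learning rate, and iteration count are illustrative assumptions rather than part of the original derivation.

# A minimal gradient-descent sketch for the least-squares objective.
# The synthetic data, learning rate and iteration count are illustrative assumptions.
import numpy as np

np.random.seed(0)
N_demo, M_demo = 200, 3
X_demo = np.random.randn(N_demo, M_demo)
beta_true = np.array([2.0, -1.0, 0.5])
y_demo = X_demo.dot(beta_true) + 0.1 * np.random.randn(N_demo)

X_b = np.insert(X_demo, 0, 1, axis=1)  # prepend a bias column, as in the implementation below
beta = np.zeros(X_b.shape[1])
learning_rate = 0.05
for _ in range(2000):
    # gradient of (1/N) * ||y - X beta||^2 with respect to beta
    gradient = 2.0 / N_demo * X_b.T.dot(X_b.dot(beta) - y_demo)
    beta -= learning_rate * gradient

# gradient descent should land on (essentially) the closed-form solution
beta_closed_form = np.linalg.inv(X_b.T.dot(X_b)).dot(X_b.T).dot(y_demo)
print(np.allclose(beta, beta_closed_form, atol=1e-4))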

A Basic Implementation of Linear Regression

Input code

import numpy as np
from sklearn import metrics

Input code

class LinearRegression:
    # Optimize B directly with the given analytical solution in previous section
    def train_in_batch(self, X, y):
        # prepend bias column
        X = np.insert(X, 0, 1, axis=1)
        X_tran = np.transpose(X) # (M + 1) x N
        X_square_inverse = np.linalg.inv(X_tran.dot(X)) # (M + 1) x (M + 1)
        # check dimension match: (M + 1) x (M + 1) x (M + 1) x N x N x 1 = (M + 1) x 1 (correct)
        self.B = X_square_inverse.dot(X_tran.dot(y))
        # training mean squared error
        y_predict = self.predict(X)
        print("Training MSE: ", metrics.mean_squared_error(y, y_predict))
        print("Training variance score: ", metrics.explained_variance_score(y, y_predict))
        print("model parameters: ", self.B)
    
    # test on new data in batch; new_X must already contain the bias column
    def predict(self, new_X):
        # check dimension match: P x (M + 1) x (M + 1) x 1 = P x 1, P: number of new samples, >= 1
        predicted_y = new_X.dot(self.B)
        return predicted_y

Basic Tests of the Model

Input code

# Load benchmark dataset
from sklearn import datasets
X, y = datasets.load_diabetes(return_X_y=True)
X.shape, y.shape

Output results

((442, 10), (442,))

Input code

# Run model
def test_basic_model():
    model = LinearRegression()
    model.train_in_batch(X, y)
test_basic_model()

Output results

Training MSE:  2859.6903987680657
Training variance score:  0.5177494254132934
model parameters:  [ 152.13348416  -10.01219782 -239.81908937  519.83978679  324.39042769
 -792.18416163  476.74583782  101.04457032  177.06417623  751.27932109
   67.62538639]
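
As a sanity check, the closed-form fit can be compared against scikit-learn's own implementation. The sketch below assumes the LinearRegression class and the diabetes data X, y from the cells above are still in scope; scikit-learn stores the bias in intercept_, whereas our B keeps it as the first element.

# Sanity-check sketch: compare our closed-form fit with scikit-learn's LinearRegression.
from sklearn.linear_model import LinearRegression as SkLinearRegression

our_model = LinearRegression()
our_model.train_in_batch(X, y)
sk_model = SkLinearRegression().fit(X, y)
# both solve ordinary least squares, so the estimates should agree
print("intercept matches:", np.isclose(our_model.B[0], sk_model.intercept_))
print("coefficients match:", np.allclose(our_model.B[1:], sk_model.coef_))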

Basic Assumptions of Linear Regression

Linear regression has a unique solution only when the input feature columns are linearly independent, in other words, when no column of the input matrix is an exact linear combination of the others. The implementation above does not account for this, so the test case below raises an error.

Input code

def test_correlated_input():
    # add a new column at column 5 that is fully correlated with the original column 8
    correlated_X = np.insert(X, 5, X[:, 8] * 2, axis=1)
    model = LinearRegression()
    model.train_in_batch(correlated_X, y)
try:
    test_correlated_input()
except np.linalg.LinAlgError as e:
    print("Error occurs because input is a", e)

Output results

Error occurs because input is a Singular matrix

When the number of samples is smaller than the number of features, the solution can also be unstable: some coefficients become very large while others become very small.

Input code

def test_rank_deficient_input():
    # make sure the number of rows is less than the number of columns; here we have 10 columns and 8 rows
    new_N = 8
    deficient_X = X[:new_N,:]
    deficient_y = y[:new_N]
    model = LinearRegression()
    model.train_in_batch(deficient_X, deficient_y)
try:
    test_rank_deficient_input()
except np.linalg.LinAlgError as e:
    print("Error occurs because input is a", e)

Output results

Training MSE:  147836.92360831407
Training variance score:  -77.08338053366681
model parameters:  [-1.6000e+01  2.8160e+03 -2.0480e+03  8.1920e+03 -1.8432e+04  8.1920e+03
 -4.0960e+03 -8.1920e+03 -2.0480e+03 -1.2288e+04  0.0000e+00]

When two features are highly but not perfectly correlated, the result can likewise be unstable, with some model coefficients becoming very large and others very small.

Input code

def test_highly_correlated_input():
    # insert a new column at position 5 that is highly (but not perfectly) correlated with the original column 8 (column 9 after insertion)
    highly_correlated_data = X[:, 8] * 2
    highly_correlated_data[-1] = highly_correlated_data[-1] + 0.3
    highly_correlated_X = np.insert(X, 5, highly_correlated_data, axis=1)
    highly_correlated_y = y
    model = LinearRegression()
    model.train_in_batch(highly_correlated_X, highly_correlated_y)
try:
    test_highly_correlated_input()
except np.linalg.LinAlgError as e:
    print("Error occurs because input is a", e)


Statistical Inference for Linear Regression

Statistical inference for linear regression uses inferential methods to validate the assumed probability model of the regression parameters, as well as statistical properties of the optimal solution such as confidence intervals. It rests on the following assumptions (these are very important):

  1. The $y_i$ are uncorrelated, and the variance of $Pr(y_i)$ is a constant $\sigma^2$. $\sigma^2$ can be estimated from the sample variance of $y_i$ around $\hat y_i$: $\hat \sigma^2 = \frac{1}{N - M - 1}\sum^{N}_{i=1}(y_i - \hat y_i)^2$
  2. $E(Y|X) + \epsilon$ is the model for $y_i$, where $E(Y|X)$ is linear in $X$ and $\epsilon \sim N(0, \sigma^2)$, so $y_i$ is normally distributed.
  3. The $x_i$ are known (fixed) and obey the central limit theorem, meaning that as $N \to \infty$ the input variables are approximately normally distributed.

Based on assumptions 1 and 2, we can make the following inferences (a short simulation check of inference 2 appears after this list):

  1. $(N - M - 1)\hat \sigma^2$ follows a chi-squared distribution: $(N - M - 1)\hat \sigma^2 \sim \sigma^2 \chi^2_{N-M-1}$
  2. $\hat \beta$ follows a multivariate normal distribution: $\hat \beta \sim N(\beta, (X^TX)^{-1}\sigma^2)$
  3. $\hat \beta$ and $\hat \sigma^2$ are statistically independent of each other.
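
The second inference can be illustrated with a small simulation: fix a design matrix and a true $\beta$, repeatedly regenerate $y$ with Gaussian noise, and compare the empirical covariance of the resulting estimates with $(X^TX)^{-1}\sigma^2$. The design matrix, noise level, and number of repetitions below are illustrative assumptions.

# Simulation sketch for inference 2: beta_hat ~ N(beta, (X^T X)^{-1} sigma^2).
np.random.seed(1)
N_sim, M_sim = 200, 3
sigma = 0.5
X_sim = np.insert(np.random.randn(N_sim, M_sim), 0, 1, axis=1)  # fixed design with a bias column
beta_true = np.array([1.0, 2.0, -1.0, 0.5])
XtX_inv = np.linalg.inv(X_sim.T.dot(X_sim))

estimates = []
for _ in range(5000):
    y_sim = X_sim.dot(beta_true) + sigma * np.random.randn(N_sim)
    estimates.append(XtX_inv.dot(X_sim.T).dot(y_sim))
estimates = np.array(estimates)

# the empirical covariance of beta_hat should be close to (X^T X)^{-1} * sigma^2
emp_cov = np.cov(estimates, rowvar=False)
print("max deviation from theory:", np.abs(emp_cov - XtX_inv * sigma ** 2).max())
print("largest theoretical entry:", np.abs(XtX_inv * sigma ** 2).max())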

Statistical Tests for Linear Regression

In practice, we usually need to run some statistical tests before or after modeling to help determine the most appropriate linear regression model. Relevant questions include:

  1. Are there highly correlated feature variables, and if so, should some of them be excluded?
  2. How do we assess how much each parameter (each feature variable) contributes to the model? (In other words, how important is each feature?)
  3. How do we compare, in a statistically meaningful way, the performance of models with different parameter choices?

Correlation Table

A correlation table examines the pairwise correlations among the input variables, as well as their correlations with the response variable. The correlation coefficient is defined as follows.

$Cor(x_i, x_j) = \frac{cov(x_i, x_j)}{\sigma_{x_i} \sigma_{x_j}}$

$cov(x_i, x_j) = \mu_{x_i x_j} - \mu_{x_i}\mu_{x_j}$

For the data matrix $X = [x_1, ..., x_j]$,

the covariance matrix is

$cov(X, X) = E(X^TX) - E(X)^T E(X)$

and the correlation matrix is

$cor(X, X) = (diag(cov(X, X))^{-\frac{1}{2}})\, cov(X, X)\, (diag(cov(X, X))^{-\frac{1}{2}})$

Input code

# Implementation of correlation coefficient matrix
def cov(x1, x2):
    return np.mean(np.multiply(x1, x2)) - np.mean(x1) * np.mean(x2)

def cor(x1, x2):
    return cov(x1, x2) / (np.std(x1) * np.std(x2))

def zscore(X):
    mean_matrix = np.tile(np.mean(X, axis=0), (X.shape[0],1))
    std_matrix = np.tile(np.std(X, axis=0), (X.shape[0],1))
    return np.divide(X - mean_matrix, std_matrix)

def cov_matrix(X1, X2):
    # columns are z-scored first, so the subtracted mean term below is (numerically) zero
    zscore_X1 = zscore(X1)
    zscore_X2 = zscore(X2)
    N = zscore_X1.shape[0]
    return np.transpose(zscore_X1).dot(zscore_X2)/N - np.outer(np.mean(zscore_X1, axis=0), np.mean(zscore_X2, axis=0))

def diagonal_matrix(X):
    return np.diag(np.diag(X))

def cor_matrix(X1, X2):
    cov_m = cov_matrix(X1, X2)
    C = np.sqrt(np.linalg.inv(diagonal_matrix(cov_m)))
    return C.dot(cov_m).dot(C)  

Input code

def test_cor_matrix_implementation():
    new_y = np.reshape(y, (y.shape[0], 1))
    Xy = np.append(X, new_y, axis=1)
    return np.allclose(cor_matrix(Xy, Xy), np.corrcoef(np.transpose(Xy)))
test_cor_matrix_implementation()

Output results

True

Removing fully correlated input variables based on the correlation coefficient table

Input code

# implementation to detect correlated input
def is_correlated_input(X):
    return np.linalg.matrix_rank(np.transpose(X).dot(X)) != X.shape[1]
highly_correlated_data = X[:, 8] * 2
highly_correlated_data[-1] = highly_correlated_data[-1] + 0.3
highly_correlated_X = np.insert(X, 5, highly_correlated_data, axis=1)
fully_correlated_X = np.insert(X, 5, X[:, 8] * 2, axis=1)
print("original input is full rank: ", not is_correlated_input(X))
print("highly correlated input is full rank: ", not is_correlated_input(highly_correlated_X))
print("fully correlated input is full rank: ", not is_correlated_input(fully_correlated_X))

Output results

original input is full rank:  True
highly correlated input is full rank:  True
fully correlated input is full rank:  False

Input code

# correlation coefficient matrix for highly correlated input
m = cor_matrix(highly_correlated_X, highly_correlated_X)
print(np.array_str(m, precision=2))
# output shows at column 5, row 9, the correlation coefficient is 0.99, yet the matrix is still non-singular

Output results

[[ 1.    0.17  0.19  0.34  0.26  0.26  0.22 -0.08  0.2   0.27  0.3 ]
 [ 0.17  1.    0.09  0.24  0.04  0.14  0.14 -0.38  0.33  0.15  0.21]
 [ 0.19  0.09  1.    0.4   0.25  0.43  0.26 -0.37  0.41  0.45  0.39]
 [ 0.34  0.24  0.4   1.    0.24  0.38  0.19 -0.18  0.26  0.39  0.39]
 [ 0.26  0.04  0.25  0.24  1.    0.52  0.9   0.05  0.54  0.52  0.33]
 [ 0.26  0.14  0.43  0.38  0.52  1.    0.32 -0.37  0.61  0.99  0.46]
 [ 0.22  0.14  0.26  0.19  0.9   0.32  1.   -0.2   0.66  0.32  0.29]
 [-0.08 -0.38 -0.37 -0.18  0.05 -0.37 -0.2   1.   -0.74 -0.4  -0.27]
 [ 0.2   0.33  0.41  0.26  0.54  0.61  0.66 -0.74  1.    0.62  0.42]
 [ 0.27  0.15  0.45  0.39  0.52  0.99  0.32 -0.4   0.62  1.    0.46]
 [ 0.3   0.21  0.39  0.39  0.33  0.46  0.29 -0.27  0.42  0.46  1.  ]]

Input code

# implementation to find fully correlated columns
def find_correlated_columns(X):
    m = cor_matrix(X, X)
    used_cols = set()
    cor_groups = []
    for i in range(0, m.shape[0]):
        if i in used_cols:
            continue
        else:
            indices = list(filter(lambda x: x != i, np.argwhere(m[i,:] == 1).flatten()))
            if len(indices) > 0:
                cor_group = (i,) + tuple(indices)
                cor_groups.append(cor_group)
                used_cols = used_cols | set(cor_group)
    rest_cols = set(range(0, m.shape[1])) - used_cols
    for j in rest_cols:
        indices = list(filter(lambda x: x != j, np.argwhere(m[:,j] == 1).flatten()))
        if len(indices) > 0:
            cor_group = (j,) + tuple(indices)
            cor_groups.append(cor_group)
            used_cols = used_cols | set(cor_group)
    return cor_groups

Input code

# implementation to find fully correlated columns and remove redundant columns
def remove_correlated_columns(X):
    cor_groups = find_correlated_columns(X)
    print('Found correlated column groups: ', cor_groups)
    remove_columns = np.array(list(map(lambda group: group[1:], cor_groups))).flatten().tolist()
    print('Removed correlated columns: ', remove_columns)
    return np.delete(X, remove_columns, 1)
    
print(remove_correlated_columns(fully_correlated_X).shape)

Output results

Found correlated column groups:  [(5, 9)]
Removed correlated columns:  [9]
(442, 10)

Coefficient Table

The coefficient table lists, for each coefficient (corresponding to each input variable), its value, its standard error, its z-score (here the z-score refers to the parameter $\beta_j$, not the z-score of the input variables discussed earlier), and the significance ($p$ value) of its effect on the model (whether removing or adding the coefficient significantly changes model performance).

The z-score of a coefficient $\beta_j$ is obtained in much the same way as the z-score of $x_j$, except that $\beta_j$ follows a different probability distribution (see the statistical inference section above).

$z_j = \frac{\hat \beta_j}{\hat \sigma \sqrt{v_j}}$

where $v_j$ is the $j$-th diagonal element of $(X^TX)^{-1}$, and the z-score is computed under the hypothesis $\beta_j = 0$ (which represents the effect on model performance when this parameter is missing).

The standardized $\beta_j$, i.e. $z_j$, follows a Student's t distribution,

$z_j \sim t_{N-M-1} \approx N(0, 1) \quad (N \gg M)$

where $M$ is the number of function parameters.

Given $N$ and $M$, the Student's t distribution function (which, before computers, people usually looked up in tables!) gives the minimum z-score required to reject the null hypothesis at the 5% level (95% confidence level), i.e. to conclude that $\beta_j$ significantly improves model performance. Conversely, given a z-score, we can compute its significance $p$ value at the 95% confidence level.

Since $N \gg M$ is fairly easy to satisfy in practice, the confidence interval for $\beta_j$,

$(\hat \beta_j - z^{(1-\alpha)} v_j^{\frac{1}{2}} \hat \sigma,\ \hat \beta_j + z^{(1-\alpha)} v_j^{\frac{1}{2}} \hat \sigma)$

at the 95% confidence level, is approximately

$(\hat \beta_j - 2\, se(\hat \beta_j),\ \hat \beta_j + 2\, se(\hat \beta_j))$
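
The coefficient table described above can be computed directly from the closed-form fit. Below is a minimal sketch on the diabetes data X, y loaded earlier; it assumes scipy is available for the Student's t tail probabilities, and the printed layout is only illustrative.

# Coefficient-table sketch: standard errors, z-scores, p-values and approximate
# 95% confidence intervals from the closed-form fit (assumes scipy is available).
from scipy import stats

Xb = np.insert(X, 0, 1, axis=1)                 # N x (M + 1), bias column first
N_obs, M_plus_1 = Xb.shape
XtX_inv = np.linalg.inv(Xb.T.dot(Xb))
beta_hat = XtX_inv.dot(Xb.T).dot(y)
residual = y - Xb.dot(beta_hat)
sigma_hat = np.sqrt(residual.dot(residual) / (N_obs - M_plus_1))  # hat sigma^2 = RSS / (N - M - 1)
v = np.diag(XtX_inv)                            # v_j, the diagonal of (X^T X)^{-1}
se = sigma_hat * np.sqrt(v)                     # standard error of each beta_j
z = beta_hat / se                               # z-score under the null hypothesis beta_j = 0
p = 2 * stats.t.sf(np.abs(z), df=N_obs - M_plus_1)  # two-sided p-value from t_{N-M-1}
for j in range(M_plus_1):
    print("beta_%2d  value %10.2f  se %8.2f  z %6.2f  p %.4f  approx 95%% CI (%.2f, %.2f)"
          % (j, beta_hat[j], se[j], z[j], p[j], beta_hat[j] - 2 * se[j], beta_hat[j] + 2 * se[j]))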

F Measurement

The coefficient table captures the effect of adding or removing a single variable on model performance. The F measurement is mainly used to measure the effect of adding or removing several variables at once.

$F = \frac{(RSS_0 - RSS_1)/(M_1 - M_0)}{RSS_1 / (N - M_1 - 1)}$

where $RSS = \sum_j (y_j - \hat y_j)^2$, the subscripts 0 and 1 denote two different models, $M_0$ and $M_1$ are the numbers of features in the two models, and we assume $M_0 < M_1$.

The value of $F$ follows an F distribution: $F \sim F_{M_1 - M_0,\, N - M_1 - 1} \approx \chi^2_{M_1 - M_0} / (M_1 - M_0)$ when $N \gg M_1$.

M1M0=1M_1 - M_0 = 1时,FF的值实际上等价于zjz_j,服从相似的student-t distribution,近似于正态分布。

A Complete Implementation of the Linear Regression Algorithm

Input code

from sklearn import metrics
import numpy as np

class LinearRegression:
    def _find_correlated_columns(self, X):
        m = cor_matrix(X, X)
        used_cols = set()
        cor_groups = []
        for i in range(0, m.shape[0]):
            if i in used_cols:
                continue
            else:
                indices = list(filter(lambda x: x != i, np.argwhere(m[i,:] == 1).flatten()))
                if len(indices) > 0:
                    cor_group = (i,) + tuple(indices)
                    cor_groups.append(cor_group)
                    used_cols = used_cols | set(cor_group)
        rest_cols = set(range(0, m.shape[1])) - used_cols
        for j in rest_cols:
            indices = list(filter(lambda x: x != j, np.argwhere(m[:,j] == 1).flatten()))
            if len(indices) > 0:
                cor_group = (j,) + tuple(indices)
                cor_groups.append(cor_group)
                used_cols = used_cols | set(cor_group)
        return cor_groups

    def _remove_correlated_columns(self, X):
        cor_groups = self._find_correlated_columns(X)
        print('Found correlated column groups: ', cor_groups)
        remove_columns = np.array(list(map(lambda group: group[1:], cor_groups))).flatten().tolist()
        print('Removed correlated columns: ', remove_columns)
        return np.delete(X, remove_columns, 1)
    
    # Optimize B directly with the given analytical solution in previous section
    def train_in_batch(self, X, y):
        X = self._remove_correlated_columns(X)
        X = np.insert(X, 0, 1, axis=1)
        X_tran = np.transpose(X) # (M + 1) x N, where M is the number of remaining features
        X_square_inverse = np.linalg.inv(X_tran.dot(X)) # (M + 1) x (M + 1)
        # check dimension match: (M + 1) x (M + 1) x (M + 1) x N x N x 1 = (M + 1) x 1 (correct)
        self.B = X_square_inverse.dot(X_tran.dot(y))
        # training mean squared error
        y_predict = self.predict(X)
        print("Training MSE: ", metrics.mean_squared_error(y, y_predict))
        print("Training variance score: ", metrics.explained_variance_score(y, y_predict))
        print("model parameters: ", self.B)
    
    # test on new data in batch; new_X must already contain the bias column
    def predict(self, new_X):
        # check dimension match: P x (M + 1) x (M + 1) x 1 = P x 1, P: number of new samples, >= 1
        predicted_y = new_X.dot(self.B)
        return predicted_y
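
Finally, a short usage sketch: trained on the fully correlated input constructed earlier, the complete implementation first detects and removes the redundant column, so the singular-matrix error seen with the basic implementation no longer occurs.

# Usage sketch: the complete model drops the redundant column before solving,
# so the fully correlated input from the earlier test no longer raises LinAlgError.
complete_model = LinearRegression()
complete_model.train_in_batch(fully_correlated_X, y)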