I also propose a basic implementation of using partial_fit within a sklearn pipeline. We just need to use models that allow partial fitting (e.g. SGDRegressor, xgboost, etc.) and create our own sklearn-compatible classes (huge kudos to Vincent Warmerdam, who started this in his TOKENWISER project).
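For estimators that already ship with partial_fit, such as SGDRegressor, no wrapper is needed at all. A minimal sketch of the idea (the file name is illustrative; "speed" is the same target used further below, and the remaining columns are assumed to be numeric):

import pandas as pd
from sklearn.linear_model import SGDRegressor

sgd = SGDRegressor()

for chunk in pd.read_csv("your_data.csv", chunksize=5_000):
    X = chunk.drop(columns=["speed"])  # assumes all remaining columns are numeric features
    y = chunk["speed"]
    # Each call updates the same model with one more pass over the current chunk
    sgd.partial_fit(X, y)

xgboost has no such method out of the box, which is what the wrapper class below works around.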
import pandas as pd
import xgboost as xgb
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklego.preprocessing import PatsyTransformer
class xgboost_partial_trainer(BaseEstimator, TransformerMixin):
    """
    Allows for incremental training of an xgboost model within a sklearn pipeline.
    """

    def __init__(self, training_params: dict = None):
        self.training_params = training_params
        self.trained_model = None
        self._first_call = True
        self.evals_result = {}
        self.iter_number = 1
        self._X_train, self._X_test, self._y_train, self._y_test = (
            None,
            None,
            None,
            None,
        )
    def partial_fit(self, X, y=None, classes=None, **fit_params):
        print(f"first run: {self._first_call}, n_iter = {self.iter_number}")
        self.iter_number += 1

        if self._first_call:
            # Select a random subset of the data and store it on the model
            # (used to track the error/loss over time on a fixed evaluation set)
            self._X_train, self._X_test, self._y_train, self._y_test = train_test_split(
                X, y, test_size=0.6, random_state=1
            )
            self._xg_train = xgb.DMatrix(self._X_train, label=self._y_train)
            self._xg_test = xgb.DMatrix(self._X_test, label=self._y_test)

            # Validation sets to watch performance: the same fixed testing data,
            # changeable training data
            self.watchlist = [
                (self._xg_train, "train_batch"),
                (self._xg_test, "eval_fixed"),
            ]

            # The training itself
            self.trained_model = xgb.train(
                params=self.training_params,
                dtrain=xgb.DMatrix(X, y),
                xgb_model=self.trained_model,
                evals=self.watchlist,
            )

            # Switch off after the first batch
            self._first_call = False
        else:
            self._xg_train = xgb.DMatrix(X, y)
            self.watchlist = [
                (self._xg_train, "train_batch"),
                (self._xg_test, "eval_fixed"),
            ]
            self.trained_model = xgb.train(
                params=self.training_params,
                dtrain=self._xg_train,
                xgb_model=self.trained_model,
                evals=self.watchlist,
            )
            # self._predicted_y = self.trained_model.predict(xgb.DMatrix(self._X_test))
            # print(f"mean_squared_error = {mean_squared_error(self._y_test, self._predicted_y, squared=False)}")
        return self
    def predict(self, X, y=None, **fit_params):
        return self.trained_model.predict(xgb.DMatrix(X))

    def transform(self, X, y=None, **fit_params):
        return self.trained_model.predict(xgb.DMatrix(X))

    def fit(self, X, y=None, **fit_params):
        return self
class PartialPipeline(Pipeline):
    """
    Pipeline subclass that exposes `partial_fit` so it can be trained on batches.

    Arguments:
        steps: a collection of transformers followed by the partially trainable model
    """

    def partial_fit(self, X, y=None, classes=None, **kwargs):
        """
        Fits the components, but allows for batches.
        """
        for _, step in self.steps:
            if hasattr(step, "partial_fit"):
                step.partial_fit(X, y, **kwargs)
            elif hasattr(step, "fit_transform"):
                X = step.fit_transform(X)
            elif hasattr(step, "transform"):
                X = step.transform(X)
            elif hasattr(step, "fit"):
                # fit returns the estimator itself, not transformed data,
                # so do not reassign X here
                step.fit(X)
        return self
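The trick that makes xgboost_partial_trainer work is that xgb.train accepts an existing booster through its xgb_model argument and continues boosting from it. A stripped-down sketch of just that mechanism, with made-up data and illustrative parameters:

import numpy as np
import xgboost as xgb

params = {"objective": "reg:squarederror", "max_depth": 3}
booster = None

for _ in range(5):
    # Stand-in for reading the next chunk of real data
    X_chunk = np.random.rand(2_000, 5)
    y_chunk = np.random.rand(2_000)
    dtrain = xgb.DMatrix(X_chunk, label=y_chunk)
    # Passing the previous booster via xgb_model continues boosting
    # instead of training from scratch on every chunk
    booster = xgb.train(params, dtrain, num_boost_round=10, xgb_model=booster)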
Once we have these sklearn classes, we can use the Pipeline:
my_pipeline = PartialPipeline([
    ("patsy", PatsyTransformer(FORMULA2)),
    ("xgboost_model", xgboost_partial_trainer(training_params=params)),
])

# FORMULA2 is the patsy formula and params the xgboost training parameters,
# both defined elsewhere; your_data is the path to the CSV file
df_chunked = pd.read_csv(your_data, chunksize=5_000)

for df in df_chunked:
    my_pipeline.partial_fit(df, y=df["speed"])
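After all chunks have been streamed through, the fitted pipeline can be used like any other sklearn pipeline. A small sketch, assuming df_new is a hypothetical held-out chunk with the same feature columns:

# df_new stands for a new DataFrame with the same feature columns
predictions = my_pipeline.predict(df_new)
# the last step also exposes transform, so my_pipeline.transform(df_new) is equivalent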
Please send me feedback and suggestions for cleaning up the code. I'm fully aware that it isn't perfect, but as a prototype it's not bad!