
How to call partial_fit() on a scikit-learn classifier wrapped inside a Pipeline()?

I'm trying to build an incrementally trainable text classifier using SGDClassifier:

from sklearn.linear_model import SGDClassifier
from sklearn.pipeline import Pipeline
from sklearn.feature_extraction.text import HashingVectorizer
from sklearn.feature_extraction.text import TfidfTransformer
from sklearn.multiclass import OneVsRestClassifier

classifier = Pipeline([
    ('vectorizer', HashingVectorizer(ngram_range=(1,4), non_negative=True)),
    ('tfidf', TfidfTransformer()),
    ('clf', OneVsRestClassifier(SGDClassifier())),
])

but I get an AttributeError when I try to call classifier.partial_fit(x, y).

It supports fit(), so I don't see why partial_fit() is unavailable. Would it be possible to introspect the pipeline, call the data transformers, and then call partial_fit() directly on my classifier?
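For reference, introspecting the pipeline via named_steps and driving the steps by hand is feasible. Below is a minimal sketch (toy documents and labels invented for illustration); the stateful tfidf stage is fitted once up front, and the non_negative flag from the snippet above is omitted because recent scikit-learn versions removed it:

```python
from sklearn.feature_extraction.text import HashingVectorizer, TfidfTransformer
from sklearn.linear_model import SGDClassifier
from sklearn.multiclass import OneVsRestClassifier
from sklearn.pipeline import Pipeline

classifier = Pipeline([
    ('vectorizer', HashingVectorizer(ngram_range=(1, 4))),  # stateless
    ('tfidf', TfidfTransformer()),                          # stateful
    ('clf', OneVsRestClassifier(SGDClassifier())),
])

docs = ["good post", "bad post", "nice article", "awful article"]
labels = [1, 0, 1, 0]

# one up-front pass to fit the stateful tfidf stage
vect = classifier.named_steps['vectorizer']
tfidf = classifier.named_steps['tfidf'].fit(vect.transform(docs))

# afterwards each batch can be pushed through the transformers into partial_fit
for i in range(0, len(docs), 2):
    X = tfidf.transform(vect.transform(docs[i:i + 2]))
    classifier.named_steps['clf'].partial_fit(X, labels[i:i + 2], classes=[0, 1])
```

Note that partial_fit requires the full list of classes on the first call, since later batches may not contain every label.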


4 Answers


Here is what I am doing - 'mapper' and 'clf' are the two steps in my Pipeline object:

def partial_pipe_fit(pipeline_obj, df):
    # transform the batch with the mapper step, then update the classifier step
    X = pipeline_obj.named_steps['mapper'].fit_transform(df)
    Y = df['class']
    pipeline_obj.named_steps['clf'].partial_fit(X, Y)

You will probably want to keep track of performance as you keep tweaking/updating your classifier - but that is a secondary point.
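That tracking idea can be sketched as follows - a toy example with synthetic numeric batches standing in for real data, scoring against a fixed held-out set after every incremental update:

```python
import numpy as np
from sklearn.linear_model import SGDClassifier

rng = np.random.default_rng(0)

# a fixed held-out set to score against after every update
X_holdout = rng.normal(size=(50, 5))
y_holdout = (X_holdout[:, 0] > 0).astype(int)

clf = SGDClassifier(random_state=0)
scores = []
for _ in range(5):
    # each loop simulates a fresh incoming batch
    X_batch = rng.normal(size=(100, 5))
    y_batch = (X_batch[:, 0] > 0).astype(int)
    clf.partial_fit(X_batch, y_batch, classes=[0, 1])
    scores.append(clf.score(X_holdout, y_holdout))
```

Plotting `scores` over time then shows whether the incremental updates are actually helping.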

More specifically, the original pipeline was constructed as follows:

from sklearn.feature_extraction.text import CountVectorizer, TfidfTransformer
from sklearn.linear_model import SGDClassifier
from sklearn.pipeline import Pipeline
from sklearn_pandas import DataFrameMapper

to_vect = Pipeline([('vect', CountVectorizer(min_df=2, max_df=.9, ngram_range=(1, 1), max_features=100)),
                    ('tfidf', TfidfTransformer())])
full_mapper = DataFrameMapper([
    ('norm_text', to_vect),
    ('norm_fname', to_vect), ])

full_pipe = Pipeline([('mapper', full_mapper),
                      ('clf', SGDClassifier(n_iter=15, warm_start=True,
                                            n_jobs=-1, random_state=self.random_state))])

Google DataFrameMapper to learn more about it - here it just enables a transformation step that plays nicely with pandas.

answered 2015-12-14T05:34:15.607

Pipeline does not use partial_fit, hence does not expose it. We would probably need a dedicated pipelining scheme for out-of-core computation, but that also depends on the capabilities of the previous models.

In particular in this case you will probably want several passes over your data: one to fit each stage of the pipeline, then transforming the dataset to fit the next stage, and so on - except for the first stage, which is stateless and hence does not fit any parameters from the data.

In the meantime, it is probably easier to roll your own wrapper code tailored to your needs.
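One way such wrapper code might look - a sketch under the assumption that every transformer before the final step is stateless (e.g. HashingVectorizer), so each batch can be transformed independently:

```python
from sklearn.feature_extraction.text import HashingVectorizer
from sklearn.linear_model import SGDClassifier
from sklearn.pipeline import Pipeline

def pipeline_partial_fit(pipe, X, y, classes=None):
    # push the batch through every transformer, then update the final estimator;
    # assumes all transformers are stateless
    for _, step in pipe.steps[:-1]:
        X = step.transform(X)
    pipe.steps[-1][1].partial_fit(X, y, classes=classes)
    return pipe

pipe = Pipeline([("vect", HashingVectorizer()),
                 ("clf", SGDClassifier())])
pipeline_partial_fit(pipe, ["good text", "bad text"], [1, 0], classes=[0, 1])
```

After the first batch, `pipe.predict(...)` works as usual, because prediction only needs the (stateless) transforms plus the incrementally fitted classifier.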

answered 2013-07-29T19:18:43.627

Although this question is 8 years old, it is still very relevant and has not been updated for quite some time.

There is in fact now a nice package created by Vincent Warmerdam called tokenwiser.

It is aimed at NLP use cases, mainly fitting into the sklearn infrastructure. However, its main building blocks can be used even for non-NLP tasks.

The package provides a PartialPipeline boilerplate along with documentation.

An example:


import numpy as np
from sklearn.linear_model import SGDClassifier
from sklearn.feature_extraction.text import HashingVectorizer

from tokenwiser.textprep import Cleaner, Identity, HyphenTextPrep
from tokenwiser.pipeline import PartialPipeline, PartialFeatureUnion

pipe = PartialPipeline([
    ("clean", Cleaner()),
    ("union", PartialFeatureUnion([
        ("full_text_pipe", PartialPipeline([
            ("identity", Identity()),
            ("hash1", HashingVectorizer()),
        ])),
        ("hyphen_pipe", PartialPipeline([
            ("hyphen", HyphenTextPrep()),
            ("hash2", HashingVectorizer()),
        ]))
    ])),
    ("clf", SGDClassifier())
])

X = [
    "i really like this post",
    "thanks for that comment",
    "i enjoy this friendly forum",
    "this is a bad post",
    "i dislike this article",
    "this is not well written"
]

y = np.array([1, 1, 1, 0, 0, 0])

for loop in range(3):
    pipe.partial_fit(X, y, classes=[0, 1])

I can imagine this template working even for things unrelated to NLP. Hopefully someone will find this super useful.

answered 2021-11-20T17:05:29.670

I have also come up with a basic implementation of using partial_fit inside an sklearn pipeline.

We just need to use a model that allows partial fitting (e.g. SGDRegressor, xgboost, etc.) and create our own sklearn-compatible class.

(Huge respect to Vincent Warmerdam, who started this in his TOKENWISER project.)


import pandas as pd
import xgboost as xgb
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklego.preprocessing import PatsyTransformer

class xgboost_partial_trainer(BaseEstimator, TransformerMixin):
    """
    allows for incremental training of an xgboost model within a sklearn pipeline
    """

    def __init__(self, training_params: dict = None):

        self.training_params = training_params
        self.trained_model = None
        self._first_call = True
        self.evals_result = {}
        self.iter_number = 1
        self._X_train, self._X_test, self._y_train, self._y_test = (
            None,
            None,
            None,
            None,
        )

    def partial_fit(self, X, y=None, classes=None, **fit_params):

        print(f"first run: {self._first_call}, n_iter = {self.iter_number}")
        self.iter_number += 1

        if self._first_call:

            # Select random subset of data and store within the model (for error loss over time)
            self._X_train, self._X_test, self._y_train, self._y_test = train_test_split(
                X, y, test_size=0.6, random_state=1
            )

            self._xg_train = xgb.DMatrix(self._X_train, label=self._y_train)
            self._xg_test = xgb.DMatrix(self._X_test, label=self._y_test)

            # validation set to watch performance - same test data, changeable training data
            self.watchlist = [
                (self._xg_train, "train_batch"),
                (self._xg_test, "eval_fixed"),
            ]

            # the training itself
            self.trained_model = xgb.train(
                params=self.training_params,
                dtrain=xgb.DMatrix(X, y),
                xgb_model=self.trained_model,
                evals=self.watchlist,
            )

            # switch off after the first batch
            self._first_call = False

        else:
            self._xg_train = xgb.DMatrix(X, y)
            self.watchlist = [
                (self._xg_train, "train_batch"),
                (self._xg_test, "eval_fixed"),
            ]

            self.trained_model = xgb.train(
                params=self.training_params,
                dtrain=self._xg_train,
                xgb_model=self.trained_model,
                evals=self.watchlist,
            )
        #             self._predicted_y = self.trained_model.predict(xgb.DMatrix(self._X_test))
        #             print(f"mean_squared_error = {mean_squared_error(self._y_test, self._predicted_y, squared = False)}")

        return self

    def predict(self, X, y=None, **fit_params):
        return self.trained_model.predict(xgb.DMatrix(X))

    def transform(self, X, y=None, **fit_params):
        return self.trained_model.predict(xgb.DMatrix(X))

    def fit(self, X, y=None, **fit_params):
        return self


class PartialPipeline(Pipeline):
    """
    Utility function to generate a `PartialPipeline`

    Arguments:
        steps: a collection of text-transformers
    """

    def partial_fit(self, X, y=None, classes=None, **kwargs):
        """
        Fits the components, but allow for batches.
        """

        #         print(f"there are partial steps {self.steps_partial}")

        for _, step in self.steps:
            if hasattr(step, "partial_fit"):
                step.partial_fit(X, y, **kwargs)

            elif hasattr(step, "fit_transform"):
                X = step.fit_transform(X)

            elif hasattr(step, "transform"):
                X = step.transform(X)

            elif hasattr(step, "fit"):
                step.fit(X)  # fit returns the estimator, not transformed data

        return self

Once we have these sklearn classes, we can use the Pipeline:

my_pipeline = PartialPipeline([
    ("patsy", PatsyTransformer(FORMULA2)),
    ("xgboost_model", xgboost_partial_trainer(training_params=params)),
])

df_chunked = pd.read_csv("your_data.csv", chunksize=5_000)

for df in df_chunked:
    my_pipeline.partial_fit(df, y=df["speed"])
  

Please give me feedback and code-cleanup suggestions. I am fully aware this is not perfect - but as a prototype, it is not bad!

answered 2021-11-23T09:38:11.593