4

目前我的数据如下所示:

3-150
2-151
4-152
5-154
7-154
1-155
9-155
6-156

这只是人工的“tick”数据,第一个代表tick的值,第二个代表“午夜后的秒数”

因此,对于股票数据,我需要将这些数据分类为“条形”。那就是我需要在给定的时间内将所有的酒吧组合在一起。

一个例子是 4 秒小节。午夜后 0-3 秒的刻度将是 1 个小节,午夜后 4-7 秒将是另一个小节。

我有看起来像这样的导管/水槽,它将计算 1 个条形尺寸:

{-# LANGUAGE OverloadedStrings #-}

import Data.Maybe (isJust, fromJust)
import qualified Data.ByteString.Char8 as C
import Control.Applicative ((<$>), (<*>))

import Data.Conduit -- the core library
import qualified Data.Conduit.List as CL -- some list-like functions
import qualified Data.Conduit.Binary as CB -- bytes
import qualified Data.Conduit.Text as CT

data MyData = MyData Int Int
    deriving (Show)

binaryToData :: C.ByteString -> Maybe MyData
binaryToData bn = do
    let parts = C.split '-' bn
    case parts of
        (a:b:[]) -> MyData <$> (fst <$> (C.readInt a)) <*> (fst <$> (C.readInt b))
        _ -> Nothing

streamGenerator =
    CB.sourceFile "sample.txt" =$=
    CB.lines =$=
    CL.map binaryToData =$=
    CL.filter isJust =$=
    CL.map fromJust =$=
    CL.groupBy (\(MyData _ x) (MyData _ y) -> (x `quot` 4) == (y `quot` 4))

main :: IO ()
main = do
    mlines <- runResourceT $ streamGenerator $$ CL.consume
    print mlines

但是,我需要同时关闭流中的多个酒吧信息。例如,对于每 2 秒的小节,我需要一个 4 秒的小节。如果被调用的 2 秒条位于 4 秒条的中间,我想输出前 4 秒条。

这就是我的意思:

标准柱(数字表示柱应包含的按午夜后的秒数计的刻度):

2 second bar : 0-1, 2-3, 4-5, etc...
4 second bar : 0-3, 4-7, 8-11, etc...
combo: (0-1, null), (2-3, 0-3), (4-5, 0-3),  (6-7, 4-7), etc... 

因此,而不是我当前的 2 秒和 4 秒小节分组管道:

4 second bar : [[MyData 3 150,MyData 2 151],[MyData 4 152,MyData 5 154,MyData 7 154,MyData 1 155,MyData 9 155],[MyData 6 156]]
2 second bar : [[MyData 3 150,MyData 2 151],[MyData 4 152],[MyData 5 154,MyData 7 154,MyData 1 155,MyData 9 155],[MyData 6 156]]

我想要这个管道流:

[([MyData 3 150,MyData 2 151], [MyData 3 150,MyData 2 151])
,([MyData 4 152], [MyData 3 150,MyData 2 151])
,([MyData 5 154,MyData 7 154,MyData 1 155,MyData 9 155], [MyData 4 152,MyData 5 154,MyData 7 154,MyData 1 155,MyData 9 155])
,([MyData 6 156],[MyData 4 152,MyData 5 154,MyData 7 154,MyData 1 155,MyData 9 155])]

但是如果不做一些丑陋的事情,我似乎无法做到这一点。

4

1 回答 1

3

如果您不介意,我可以使用我的库来回答您的问题pipes,因为这就是我所接受的。conduit如果您愿意,可以将此解决方案翻译为。

解决此问题的干净解决方案需要推回,但pipes还没有推回,所以我继续实施它(我将在不久的将来作为扩展库包含在内):

import Control.Monad
import Control.Proxy
import Control.Proxy.Trans.State

-- Pushback primitives, soon to be in a `pipes` library

require :: (Monad m, Proxy p) => a' -> StateP [a] p a' a b' b m a
require a' = StateP $ \s -> runIdentityP $ do
    case s of
        [] -> do
            a <- request a'
            return (a, s)
        a:as -> do
            return (a, as)

pushback :: (Monad m, Proxy p) => a -> StateP [a] p a' a b' b m ()
pushback a = StateP $ \as -> runIdentityP $ return ((), a:as)

evalPushback = evalStateK []

有了这些,解决方案很简单:

data MyData = MyData Int Int deriving (Eq, Show)

-- Consumes ticks up until the deadline or the end of input
-- Returns the list of all ticks before the deadline
ticksUntil
 :: (Monad m, Proxy p)
 => Int -> () -> Consumer (StateP [Maybe MyData] p) (Maybe MyData) m [MyData]
ticksUntil deadline () = go where
    go = do
        x <- require ()
        case x of
            Just m@(MyData _ time) ->
                if (time < deadline)
                then do
                    ms <- go
                    return (m:ms)
                else do
                    pushback x
                    return []
            Nothing -> return []

bars
 :: (Monad m, Proxy p)
 => () -> Pipe (StateP [Maybe MyData] p) (Maybe MyData) ([MyData], [MyData]) m r
bars () = loop1 2 [] where
    -- First half of a 4-second window
    loop1 deadline b4 = do
        b2 <- (ticksUntil deadline >-> unitU) ()
        respond (b2, b4)
        loop2 (deadline + 2) b2 b4

    -- Second half of a 4-second window
    loop2 deadline b2 b4 = do
        b2' <- (ticksUntil deadline >-> unitU) ()
        let b4' = b2 ++ b2'
        respond (b2', b4')
        loop1 (deadline + 2) b4'

sample :: [MyData]
sample = [
    MyData 3 150,
    MyData 2 151,
    MyData 4 152,
    MyData 5 154,
    MyData 7 154,
    MyData 1 155,
    MyData 9 155,
    MyData 6 156]

-- Use the same trick as conduit: Nothing signals termination
source :: (Monad m, Proxy p) => () -> Producer p (Maybe MyData) m ()
source () = runIdentityP $ do
    (fromListS sample >-> mapD Just) ()
    respond Nothing

main = runProxy $
     source                 -- feed sample data
 >-> evalPushback bars      -- group the data into bars
 >-> filterD (/= ([], []))  -- Ignore empty bars
 >-> printD                 -- print outgoing bars

神奇在于bars功能。它只是在两种状态之间切换。 loop1是第一个状态,它期望在一组 4 个值中的第一个 2 个值,并且loop2是第二个状态,它期望第二个值 2 个值。

实现这一点最困难的部分不是编写代码,而是理解您的规范。不过幸运的是,我想我明白了你的意思,因为我的代码产生了与你的测试示例完全相同的行为:

>>> main
([MyData 3 150,MyData 2 151],[MyData 3 150,MyData 2 151])
([MyData 4 152],[MyData 3 150,MyData 2 151])
([MyData 5 154,MyData 7 154,MyData 1 155,MyData 9 155],[MyData 4 152,MyData 5 154,MyData 7 154,MyData 1 155,MyData 9 155])
([MyData 6 156],[MyData 4 152,MyData 5 154,MyData 7 154,MyData 1 155,MyData 9 155])

如果您对此感兴趣pipes,那么我建议您查看该pipes库,尤其是Control.Proxy.Tutorial中的教程,其中解释了我在代码中使用的许多习语。

于 2012-12-23T22:46:55.740 回答