I am following the Spark Streaming guide. Instead of using nc -lk 9999, I wrote my own simple Python server, shown below. As the code shows, it randomly generates letters from a to z.

import socketserver
import time
from random import choice

class AlphaTCPHandler(socketserver.BaseRequestHandler):
    def handle(self):
        print('AlphaTCPHandler')
        alphabets = list('abcdefghijklmnopqrstuvwxyz')

        try:
            while True:
                s = f'{choice(alphabets)}'
                b = bytes(s, 'utf-8')
                self.request.sendall(b)
                time.sleep(1)
        except BrokenPipeError:
            print('broken pipe detected')

if __name__ == '__main__':
    host = '0.0.0.0'
    port = 301

    server = socketserver.TCPServer((host, port), AlphaTCPHandler)
    print(f'server starting {host}:{port}')
    server.serve_forever()

I tested this server with the client code below.

import socket
import sys
import time

HOST, PORT = 'localhost', 301
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

try:
    sock.connect((HOST, PORT))
    print('socket opened')

    while True:    
        received = str(sock.recv(1024), 'utf-8')
        if len(received.strip()) > 0:
            print(f'{received}')
        time.sleep(1)
finally:
    sock.close()
    print('socket closed')

However, my Spark Streaming code does not seem to receive any data, or at least it does not print anything. The code is below.

from pyspark.streaming import StreamingContext
from time import sleep

ssc = StreamingContext(sc, 1)
ssc.checkpoint('/tmp')

lines = ssc.socketTextStream('0.0.0.0', 301)
words = lines.flatMap(lambda s: s.split(' '))
pairs = words.map(lambda word: (word, 1))
counts = pairs.reduceByKey(lambda a, b: a + b)

counts.pprint()

ssc.start()
sleep(5)
ssc.stop(stopSparkContext=False, stopGraceFully=True)

All I see in the output is the repeating pattern below.

------------------------------------------
Time: 2019-10-31 08:38:22
------------------------------------------

------------------------------------------
Time: 2019-10-31 08:38:23
------------------------------------------

------------------------------------------
Time: 2019-10-31 08:38:24
------------------------------------------

Any ideas about what I am doing wrong?

1 Answer

Your streaming code is working properly. The problem is your server: it sends no line separator after each letter, so Spark sees one constantly growing line and keeps waiting for that line to end, which never happens. Your test client prints letters anyway because it reads raw bytes with recv() and does not care about line boundaries. Modify your server to send a newline after each letter:

while True:
    s = f'{choice(alphabets)}\n'  # <-- inserted \n in here
    b = bytes(s, 'utf-8')
    self.request.sendall(b)
    time.sleep(1)

And the result:

-------------------------------------------
Time: 2019-10-31 12:09:26
-------------------------------------------
('t', 1)

-------------------------------------------
Time: 2019-10-31 12:09:27
-------------------------------------------
('t', 1)

-------------------------------------------
Time: 2019-10-31 12:09:28
-------------------------------------------
('x', 1)
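
To make the difference concrete, here is a rough sketch of line-based reading. It is not Spark's actual receiver code, only an illustration under the assumption that a text socket source hands a record downstream once it has seen a line terminator. The helper names `split_complete_lines` and `feed` are hypothetical and exist only for this example.

```python
def split_complete_lines(buffer, chunk):
    """Append chunk to buffer and return (complete_lines, remaining_buffer).

    Only text followed by a newline counts as a complete line; whatever
    comes after the last newline stays in the buffer.
    """
    buffer += chunk
    *lines, rest = buffer.split('\n')
    return lines, rest


def feed(chunks):
    """Simulate a line-based reader receiving chunks one at a time."""
    buffer, emitted = '', []
    for chunk in chunks:
        lines, buffer = split_complete_lines(buffer, chunk)
        emitted.extend(lines)
    return emitted, buffer


# Without separators nothing is emitted; the pending buffer just grows.
print(feed(['t', 't', 'x']))        # ([], 'ttx')

# With a newline after each letter, every letter becomes its own record.
print(feed(['t\n', 't\n', 'x\n']))  # (['t', 't', 'x'], '')
```

The first call mirrors the original server (empty batches, as in the question's output), and the second mirrors the server after the fix.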
answered 2019-10-31T12:15:48.107