3

I am trying to run a Python script that simulates traffic sensors sending data in real time to Pub/Sub, from my Google Cloud Shell. I get this error:

Traceback (most recent call last):
  File "./send_sensor_data.py", line 87, in <module>
    psclient = pubsub.Client()
AttributeError: 'module' object has no attribute 'Client'

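I checked google.cloud.pubsub.__file__ and there are no duplicates. I have searched everywhere, and the general consensus is to install the pubsub package into a virtualenv, which I tried to no avail. So far I have tried:

  • Setting the VM to a clean state
  • Uninstalling and reinstalling all gcloud components
  • Updating all gcloud components to the latest version
  • Uninstalling and reinstalling the Python pubsub library
  • Installing pubsub inside a virtualenv
  • Trying from a different project
  • Trying from a different GCP account

Here is my script: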

import time
import gzip
import logging
import argparse
import datetime
from google.cloud import pubsub

TIME_FORMAT = '%Y-%m-%d %H:%M:%S'
TOPIC = 'sandiego'
INPUT = 'sensor_obs2008.csv.gz'

def publish(topic, events):
   numobs = len(events)
   if numobs > 0:
      with topic.batch() as batch:
         logging.info('Publishing {} events from {}'.
                    format(numobs, get_timestamp(events[0])))
         for event_data in events:
              batch.publish(event_data)

def get_timestamp(line):
   # look at first field of row
   timestamp = line.split(',')[0]
   return datetime.datetime.strptime(timestamp, TIME_FORMAT)

def simulate(topic, ifp, firstObsTime, programStart, speedFactor):
   # sleep computation
   def compute_sleep_secs(obs_time):
        time_elapsed = (datetime.datetime.utcnow() - programStart).seconds
        sim_time_elapsed = (obs_time - firstObsTime).seconds / speedFactor
        to_sleep_secs = sim_time_elapsed - time_elapsed
        return to_sleep_secs

   topublish = list() 

   for line in ifp:
       event_data = line   # entire line of input CSV is the message
       obs_time = get_timestamp(line) # from first column

       # how much time should we sleep?
       if compute_sleep_secs(obs_time) > 1:
          # notify the accumulated topublish
          publish(topic, topublish) # notify accumulated messages
          topublish = list() # empty out list

          # recompute sleep, since notification takes a while
          to_sleep_secs = compute_sleep_secs(obs_time)
          if to_sleep_secs > 0:
             logging.info('Sleeping {} seconds'.format(to_sleep_secs))
             time.sleep(to_sleep_secs)
       topublish.append(event_data)

   # left-over records; notify again
   publish(topic, topublish)

def peek_timestamp(ifp):
   # peek ahead to next line, get timestamp and go back
   pos = ifp.tell()
   line = ifp.readline()
   ifp.seek(pos)
   return get_timestamp(line)


if __name__ == '__main__':
   parser = argparse.ArgumentParser(description='Send sensor data to Cloud Pub/Sub in small groups, simulating real-time behavior')
   parser.add_argument('--speedFactor', help='Example: 60 implies 1 hour of data sent to Cloud Pub/Sub in 1 minute', required=True, type=float)
   args = parser.parse_args()

   # create Pub/Sub notification topic
   logging.basicConfig(format='%(levelname)s: %(message)s', level=logging.INFO)
   psclient = pubsub.Client()
   topic = psclient.topic(TOPIC)
   if not topic.exists():
      logging.info('Creating pub/sub topic {}'.format(TOPIC))
      topic.create()
   else:
      logging.info('Reusing pub/sub topic {}'.format(TOPIC))

   # notify about each line in the input file
   programStartTime = datetime.datetime.utcnow() 
   with gzip.open(INPUT, 'rb') as ifp:
      header = ifp.readline()  # skip header
      firstObsTime = peek_timestamp(ifp)
      logging.info('Sending sensor data from {}'.format(firstObsTime))
      simulate(topic, ifp, firstObsTime, programStartTime, args.speedFactor)

3 Answers

4

The pubsub.Client class existed up to version 0.27.0 of the pubsub Python package. So I just created a virtualenv and installed version 0.27.0 of pubsub in it. Here are the commands:

virtualenv venv
source venv/bin/activate
pip install google-cloud-pubsub==0.27.0
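
To confirm which version actually ended up in the active environment, a check along these lines may help. This is only a sketch: it assumes Python 3.8 or later (for importlib.metadata), and the helper name installed_version is made up for illustration.

```python
from importlib import metadata


def installed_version(package="google-cloud-pubsub"):
    """Return the installed version string of `package`, or None if absent."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        # The package is not installed in the interpreter running this code.
        return None


# Run this with the interpreter from the activated virtualenv.
print(installed_version())
```

If this prints None inside the virtualenv, the script is probably being run by a different interpreter than the one pip installed into.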
answered 2017-11-21T09:56:32.123
1

The solution from Google Cloud Platform is:

  1. Modify the send_sensor_data.py file as follows:

    a. Comment out the original pubsub import statement and use the _v1 version instead:

      #from google.cloud import pubsub
      from google.cloud import pubsub_v1

    b. Find this code and replace it as follows:

      #publisher = pubsub.PublisherClient()
      publisher = pubsub_v1.PublisherClient()

  2. Then run your send_sensor_data.py as follows:

      ./send_sensor_data.py --speedFactor=60 --project=YOUR-PROJECT-NAME
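
For context, publishing with the pubsub_v1 client looks roughly like the sketch below. It is not the script from the question or from the training repository: publish_events and main are hypothetical names, the topic is assumed to exist already, and the publisher is passed in so the helper can be tried with a stand-in object.

```python
def publish_events(publisher, topic_path, events):
    """Publish each event to topic_path and return the resulting message IDs.

    `publisher` is expected to behave like pubsub_v1.PublisherClient:
    publish(topic, data) takes bytes and returns a future.
    """
    futures = []
    for event_data in events:
        if isinstance(event_data, str):
            # Pub/Sub message payloads must be bytes.
            event_data = event_data.encode("utf-8")
        futures.append(publisher.publish(topic_path, event_data))
    # Block until every message has been accepted by the service.
    return [future.result() for future in futures]


def main(project_id, topic_name="sandiego"):
    # Imported here so the helper above can be used without the library.
    from google.cloud import pubsub_v1  # requires google-cloud-pubsub

    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(project_id, topic_name)
    # Placeholder payload; the real script would pass lines from the CSV.
    return publish_events(publisher, topic_path, [b"placeholder row"])
```

Unlike the older topic.batch() context manager, the newer client batches messages on its own, so the helper simply collects the futures and waits on them.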

answered 2019-07-14T06:24:49.423
0

There is no pubsub.Client class. You need to choose either a PublisherClient or a SubscriberClient.

https://github.com/GoogleCloudPlatform/google-cloud-python/blob/master/pubsub/google/cloud/pubsub.py
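
A quick way to see which of these classes an installed module exposes is an attribute check like the one below. It is a small sketch: available_clients is a made-up helper, and the demo uses a stand-in object rather than the real library so it runs anywhere.

```python
import types


def available_clients(module):
    """List which of the known Pub/Sub client class names `module` exposes."""
    names = ("Client", "PublisherClient", "SubscriberClient")
    return [name for name in names if hasattr(module, name)]


# Stand-in for a module that only has the newer classes (not the real library).
fake_pubsub = types.SimpleNamespace(PublisherClient=object, SubscriberClient=object)
print(available_clients(fake_pubsub))  # ['PublisherClient', 'SubscriberClient']

# With the real package installed, the same check would be:
#   from google.cloud import pubsub
#   print(available_clients(pubsub))
```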

answered 2017-11-18T07:09:10.160