4

我正在尝试使用 ReactiveX (RxSwift) 将多张照片上传到服务器,收集每个请求的响应,然后发出一个最终请求以完成提交。

reduce在我尝试所有响应之前,一切似乎都运行良好。决赛subscribeNext永远不会被调用。(也许我误解了如何flatMapreduce工作?)

具体来说,这就是我尝试执行此过程的方式。

  • 准备一个 observable 来对每张照片进行编码(self.imageMgr是 的一个实例PHCachingImageManager()

    func getPhotoDataObservable(asset: PHAsset) -> Observable<NSData> {
        return create { observer in
            self.imageMgr.requestImageForAsset(asset,
                targetSize: PHImageManagerMaximumSize,
                contentMode: .AspectFit,
                options: nil,
                resultHandler: { (myImage, myInfo) -> Void in
                    let data = UIImageJPEGRepresentation(myImage!, 1.0)!
                    NSLog("Encoded photo")
                    observer.onNext(data)
                    self.converts += 1
                    if self.converts == self.userReview.photos.count {
                        NSLog("Completed encoding photos")
                        observer.onCompleted()
                    }
                })
            return NopDisposable.instance
        }
    }
    
  • 准备一个 observable 来上传每张编码后的照片(使用 Alamofire 和 RxAlamofire)

    func getPostPhotoObservable(photoData: NSData) -> Observable<ReviewPhotoObject> {
        return create { observer in
            NSLog("Uploading Photo")
    
            upload(.POST,
                urlRequest.URLString,
                headers: nil,
                multipartFormData: { mfd in
                    mfd.appendBodyPart(data: photoData, name: "image", fileName: "image", mimeType: "image/jpeg")
                },
                encodingMemoryThreshold: Manager.MultipartFormDataEncodingMemoryThreshold,
                encodingCompletion: { encodingResult in
                    switch encodingResult {
                    case .Success(let upload, _, _):
                        upload.responseJSON(completionHandler: { (myResponse) -> Void in
                            if let photoResponse = myResponse.result.value {
                                let photoObject = photoResponse.objectForKey("photo")!
                                let photo = ReviewPhotoObject()
                                photo.photoID = photoObject.objectForKey("id")! as! NSNumber
                                NSLog("Uploaded Photo")
                                observer.onNext(photo)
                            }
    
                            self.uploads += 1
                            if self.uploads == self.userReview.photos.count {
                                NSLog("Completed uploading photos")
                                observer.onCompleted()
                            }
                        })
    
                    case .Failure(let encodingError):
                        observer.onError(encodingError)
                        print(encodingError)
                    }
                })
    
            return NopDisposable.instance
        }
    }
    
  • 最后,把它们放在一起

    func postReview(review: MyReview) {
        self.userReview = review
    
        _ = review.photos.toObservable().flatMap { photos in
            return self.getPhotoDataObservable(photos)
        }.flatMap { photoData in 
            return self.getPostPhotoObservable(photoData)
        }.reduce([], { var accumulator, photo: ReviewPhotoObject) -> [Int] in
            accumulator.append(Int(photo.photoID))
            return accumulator
        }).subscribeNext({ (photoIds) -> Void in
            print(photoIds) // Never called
        })
    }
    

运行时(例如 2 张照片),输出如下:

Encoded photo
Uploading photo
Encoded photo
Uploading photo
Completed encoding photos
Uploaded photo
Uploaded photo
Completed uploading photos

subscribeNext永远不会被调用。由于关于 RxSwift 的文档仍然有点薄,我希望这里的人能告诉我我误解了什么。

4

1 回答 1

1

这里的想法是,一旦一个可观察对象完成发送它要发送的所有元素,它就应该完成。您正在为每个 PHAsset 创建一个 observable,并且该 observable 只发送一个元素,因此它应该在那之后完成。按照你的代码方式,只有最后一个会完成,所以reduce操作员只是坐在那里等待其余的完成,然后才能完成工作。

这是我编写第一个函数的方式(在 Swift 3 中而不是 2 中。)

extension PHImageManager {

    func requestMaximumSizeImage(for asset: PHAsset) -> Observable<UIImage> {
        return .create { observer in
            let request = self.requestImage(for: asset, targetSize: PHImageManagerMaximumSize, contentMode: .aspectFit, options: nil, resultHandler: { image, info in
                if let image = image {
                    observer.onNext(image)
                    observer.onCompleted()
                }
                else if let info = info, let error = info[PHImageErrorKey] as? Error {
                    observer.onError(error)
                }
            })
            return Disposables.create { self.cancelImageRequest(request) }
        }
    }
}

您会看到我会将其作为 PHImageManager 的扩展而不是免费功能,但这只是风格上的差异。功能上的区别是,如果底层请求出错,我的代码将发出错误,如果订阅者在请求完成之前全部退出,我的代码将取消请求。此外,它不进行 JPEG 转换。保持这些操作很小,并在这样的地图中进行 JPEG 转换:

    let imagesData = review.photos.toObservable().flatMap {
        self.imageMgr.requestMaximumSizeImage(for: $0)
    }.map {
        UIImageJPEGRepresentation($0, 1.0)
    }.filter { $0 != nil }.map { $0! }

上面的代码向管理器请求图像,然后将它们转换为 JPEG 数据,过滤掉任何转换失败的图像。imagesData是一种Observable<Data>类型。

getPostPhotoObservable很好,除了已完成的问题,以及它不处理一次性取消的事实。此外,您可以让您的 post 函数返回一个 Observable,而不是将结果包装在 ReviewPhotoObject 中。

其他警告:

  1. 您将它们放在一起的方式并不能确保ReviewPhotoObjects 与进入的照片的顺序相同(因为您无法保证上传完成的顺序。)如有必要,要解决此问题,您需要使用concat而不是flatMap.

  2. 如果任何上传失败,整个管道将断开连接并中止任何后续上传。您可能应该设置一些东西来捕捉错误并做一些适当的事情。要么catchErrorJustReturncatchError取决于您的要求。

于 2017-04-19T01:21:10.977 回答