Concurrency.swift 42 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832
  1. //
  2. // Concurrency.swift
  3. //
  4. // Copyright (c) 2021 Alamofire Software Foundation (http://alamofire.org/)
  5. //
  6. // Permission is hereby granted, free of charge, to any person obtaining a copy
  7. // of this software and associated documentation files (the "Software"), to deal
  8. // in the Software without restriction, including without limitation the rights
  9. // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  10. // copies of the Software, and to permit persons to whom the Software is
  11. // furnished to do so, subject to the following conditions:
  12. //
  13. // The above copyright notice and this permission notice shall be included in
  14. // all copies or substantial portions of the Software.
  15. //
  16. // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  17. // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  18. // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  19. // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  20. // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  21. // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  22. // THE SOFTWARE.
  23. //
  24. #if compiler(>=5.6.0) && canImport(_Concurrency)
  25. import Foundation
  26. // MARK: - Request Event Streams
  @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  extension Request {
      /// Streams the upload progress updates reported for this request.
      ///
      /// Updates are observed on the request's `underlyingQueue`, and the stream is finished from the request's
      /// `onFinish` callback.
      ///
      /// - Parameter bufferingPolicy: `BufferingPolicy` that determines the stream's buffering behavior.
      ///                              `.unbounded` by default.
      ///
      /// - Returns: The `StreamOf<Progress>`.
      public func uploadProgress(bufferingPolicy: StreamOf<Progress>.BufferingPolicy = .unbounded) -> StreamOf<Progress> {
          return stream(of: Progress.self, bufferingPolicy: bufferingPolicy) { [unowned self] sink in
              self.uploadProgress(queue: self.underlyingQueue) { update in
                  sink.yield(update)
              }
          }
      }

      /// Streams the download progress updates reported for this request.
      ///
      /// Updates are observed on the request's `underlyingQueue`, and the stream is finished from the request's
      /// `onFinish` callback.
      ///
      /// - Parameter bufferingPolicy: `BufferingPolicy` that determines the stream's buffering behavior.
      ///                              `.unbounded` by default.
      ///
      /// - Returns: The `StreamOf<Progress>`.
      public func downloadProgress(bufferingPolicy: StreamOf<Progress>.BufferingPolicy = .unbounded) -> StreamOf<Progress> {
          return stream(of: Progress.self, bufferingPolicy: bufferingPolicy) { [unowned self] sink in
              self.downloadProgress(queue: self.underlyingQueue) { update in
                  sink.yield(update)
              }
          }
      }

      /// Streams every `URLRequest` produced for this request.
      ///
      /// - Parameter bufferingPolicy: `BufferingPolicy` that determines the stream's buffering behavior.
      ///                              `.unbounded` by default.
      ///
      /// - Returns: The `StreamOf<URLRequest>`.
      public func urlRequests(bufferingPolicy: StreamOf<URLRequest>.BufferingPolicy = .unbounded) -> StreamOf<URLRequest> {
          return stream(of: URLRequest.self, bufferingPolicy: bufferingPolicy) { [unowned self] sink in
              self.onURLRequestCreation(on: self.underlyingQueue) { createdRequest in
                  sink.yield(createdRequest)
              }
          }
      }

      /// Streams every `URLSessionTask` produced for this request.
      ///
      /// - Parameter bufferingPolicy: `BufferingPolicy` that determines the stream's buffering behavior.
      ///                              `.unbounded` by default.
      ///
      /// - Returns: The `StreamOf<URLSessionTask>`.
      public func urlSessionTasks(bufferingPolicy: StreamOf<URLSessionTask>.BufferingPolicy = .unbounded) -> StreamOf<URLSessionTask> {
          return stream(of: URLSessionTask.self, bufferingPolicy: bufferingPolicy) { [unowned self] sink in
              self.onURLSessionTaskCreation(on: self.underlyingQueue) { createdTask in
                  sink.yield(createdTask)
              }
          }
      }

      /// Streams every cURL description produced for this request.
      ///
      /// - Parameter bufferingPolicy: `BufferingPolicy` that determines the stream's buffering behavior.
      ///                              `.unbounded` by default.
      ///
      /// - Returns: The `StreamOf<String>`.
      public func cURLDescriptions(bufferingPolicy: StreamOf<String>.BufferingPolicy = .unbounded) -> StreamOf<String> {
          return stream(of: String.self, bufferingPolicy: bufferingPolicy) { [unowned self] sink in
              self.cURLDescription(on: self.underlyingQueue) { command in
                  sink.yield(command)
              }
          }
      }

      /// Shared builder behind the event streams declared in this file.
      ///
      /// The stream's builder closure first hands the continuation to `yielder`, then registers an `onFinish`
      /// handler which finishes the continuation. The request is captured `unowned` by that closure.
      ///
      /// - Parameters:
      ///   - type:            Element type of the stream. Inferred from context by default.
      ///   - bufferingPolicy: `BufferingPolicy` that determines the stream's buffering behavior.
      ///                      `.unbounded` by default.
      ///   - yielder:         Closure which wires the continuation up to the event source.
      ///
      /// - Returns: The `StreamOf<T>`.
      fileprivate func stream<T>(of type: T.Type = T.self,
                                 bufferingPolicy: StreamOf<T>.BufferingPolicy = .unbounded,
                                 yielder: @escaping (StreamOf<T>.Continuation) -> Void) -> StreamOf<T> {
          return StreamOf<T>(bufferingPolicy: bufferingPolicy) { [unowned self] sink in
              yielder(sink)

              // Registered last: must come after serializers run in order to catch retry progress.
              self.onFinish {
                  sink.finish()
              }
          }
      }
  }
  101. // MARK: - DataTask
  /// Handle used to `await` the `DataResponse` of a `DataRequest`, together with its `Result` and `Value`.
  @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  public struct DataTask<Value> {
      /// `DataResponse` produced by the `DataRequest` and its response handler.
      ///
      /// When automatic cancellation is enabled, cancelling the awaiting context calls `cancel()` on this
      /// value. Otherwise the underlying `Task`'s value is simply awaited.
      public var response: DataResponse<Value, AFError> {
          get async {
              guard shouldAutomaticallyCancel else {
                  return await task.value
              }

              return await withTaskCancellationHandler {
                  await task.value
              } onCancel: {
                  cancel()
              }
          }
      }

      /// `Result` of any response serialization performed for the `response`.
      public var result: Result<Value, AFError> {
          get async {
              let completed = await response
              return completed.result
          }
      }

      /// `Value` returned by the `response`. Throws the `AFError` when the `result` is a failure.
      public var value: Value {
          get async throws {
              let outcome = await result
              return try outcome.get()
          }
      }

      private let request: DataRequest
      private let task: Task<DataResponse<Value, AFError>, Never>
      private let shouldAutomaticallyCancel: Bool

      /// Creates a `DataTask` wrapping `request` and the `Task` which awaits its response.
      fileprivate init(request: DataRequest, task: Task<DataResponse<Value, AFError>, Never>, shouldAutomaticallyCancel: Bool) {
          self.request = request
          self.task = task
          self.shouldAutomaticallyCancel = shouldAutomaticallyCancel
      }

      /// Cancel the underlying `DataRequest` and `Task`.
      ///
      /// Only the `Task` is cancelled directly here.
      public func cancel() {
          task.cancel()
      }

      /// Resume the underlying `DataRequest`.
      public func resume() {
          request.resume()
      }

      /// Suspend the underlying `DataRequest`.
      public func suspend() {
          request.suspend()
      }
  }
  @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  extension DataRequest {
      /// Streams every `HTTPURLResponse` received by this request.
      ///
      /// - Parameter bufferingPolicy: `BufferingPolicy` that determines the stream's buffering behavior.
      ///                              `.unbounded` by default.
      ///
      /// - Returns: The `StreamOf<HTTPURLResponse>`.
      public func httpResponses(bufferingPolicy: StreamOf<HTTPURLResponse>.BufferingPolicy = .unbounded) -> StreamOf<HTTPURLResponse> {
          return stream(of: HTTPURLResponse.self, bufferingPolicy: bufferingPolicy) { [unowned self] sink in
              self.onHTTPResponse(on: self.underlyingQueue) { httpResponse in
                  sink.yield(httpResponse)
              }
          }
      }

      #if swift(>=5.7)
      /// Sets an async closure returning a `Request.ResponseDisposition`, called whenever the `DataRequest`
      /// produces an `HTTPURLResponse`.
      ///
      /// The closure runs inside a new unstructured `Task`, and its return value is forwarded to the request's
      /// completion handler.
      ///
      /// - Note: Most requests will only produce a single response for each outgoing attempt (initial + retries).
      ///         However, some types of response may trigger multiple `HTTPURLResponse`s, such as multipart
      ///         streams, where responses after the first will contain the part headers.
      ///
      /// - Parameters:
      ///   - handler: Async closure executed when a new `HTTPURLResponse` is received and returning a
      ///              `ResponseDisposition` value. This value determines whether to continue the request or
      ///              cancel it as if `cancel()` had been called on the instance. Note, this closure is called
      ///              on an arbitrary thread, so any synchronous calls in it will execute in that context.
      ///
      /// - Returns: The instance.
      @_disfavoredOverload
      @discardableResult
      public func onHTTPResponse(
          perform handler: @escaping @Sendable (_ response: HTTPURLResponse) async -> ResponseDisposition
      ) -> Self {
          onHTTPResponse(on: underlyingQueue) { httpResponse, complete in
              Task {
                  complete(await handler(httpResponse))
              }
          }

          return self
      }

      /// Sets an async closure called whenever the `DataRequest` produces an `HTTPURLResponse`.
      ///
      /// The request is always allowed to continue once `handler` returns.
      ///
      /// - Note: Most requests will only produce a single response for each outgoing attempt (initial + retries).
      ///         However, some types of response may trigger multiple `HTTPURLResponse`s, such as multipart
      ///         streams, where responses after the first will contain the part headers.
      ///
      /// - Parameters:
      ///   - handler: Async closure executed when a new `HTTPURLResponse` is received. Note, this closure is
      ///              called on an arbitrary thread, so any synchronous calls in it will execute in that context.
      ///
      /// - Returns: The instance.
      @discardableResult
      public func onHTTPResponse(perform handler: @escaping @Sendable (_ response: HTTPURLResponse) async -> Void) -> Self {
          // Explicitly typed so the call below resolves to the disposition-returning overload.
          let allowing: @Sendable (HTTPURLResponse) async -> ResponseDisposition = { httpResponse in
              await handler(httpResponse)
              return .allow
          }
          onHTTPResponse(perform: allowing)

          return self
      }
      #endif

      /// Creates a `DataTask` to `await` a `Data` value.
      ///
      /// - Parameters:
      ///   - shouldAutomaticallyCancel: `Bool` determining whether or not the request should be cancelled when
      ///                                the enclosing async context is cancelled. Only applies to `DataTask`'s
      ///                                async properties. `true` by default.
      ///   - dataPreprocessor:          `DataPreprocessor` which processes the received `Data` before completion.
      ///   - emptyResponseCodes:        HTTP response codes for which empty responses are allowed. `[204, 205]`
      ///                                by default.
      ///   - emptyRequestMethods:       `HTTPMethod`s for which empty responses are always valid. `[.head]` by
      ///                                default.
      ///
      /// - Returns: The `DataTask`.
      public func serializingData(automaticallyCancelling shouldAutomaticallyCancel: Bool = true,
                                  dataPreprocessor: DataPreprocessor = DataResponseSerializer.defaultDataPreprocessor,
                                  emptyResponseCodes: Set<Int> = DataResponseSerializer.defaultEmptyResponseCodes,
                                  emptyRequestMethods: Set<HTTPMethod> = DataResponseSerializer.defaultEmptyRequestMethods) -> DataTask<Data> {
          let serializer = DataResponseSerializer(dataPreprocessor: dataPreprocessor,
                                                  emptyResponseCodes: emptyResponseCodes,
                                                  emptyRequestMethods: emptyRequestMethods)

          return serializingResponse(using: serializer, automaticallyCancelling: shouldAutomaticallyCancel)
      }

      /// Creates a `DataTask` to `await` serialization of a `Decodable` value.
      ///
      /// - Parameters:
      ///   - type:                      `Decodable` type to decode from response data.
      ///   - shouldAutomaticallyCancel: `Bool` determining whether or not the request should be cancelled when
      ///                                the enclosing async context is cancelled. Only applies to `DataTask`'s
      ///                                async properties. `true` by default.
      ///   - dataPreprocessor:          `DataPreprocessor` which processes the received `Data` before calling the
      ///                                serializer. `PassthroughPreprocessor()` by default.
      ///   - decoder:                   `DataDecoder` to use to decode the response. `JSONDecoder()` by default.
      ///   - emptyResponseCodes:        HTTP status codes for which empty responses are always valid.
      ///                                `[204, 205]` by default.
      ///   - emptyRequestMethods:       `HTTPMethod`s for which empty responses are always valid. `[.head]` by
      ///                                default.
      ///
      /// - Returns: The `DataTask`.
      public func serializingDecodable<Value: Decodable>(_ type: Value.Type = Value.self,
                                                         automaticallyCancelling shouldAutomaticallyCancel: Bool = true,
                                                         dataPreprocessor: DataPreprocessor = DecodableResponseSerializer<Value>.defaultDataPreprocessor,
                                                         decoder: DataDecoder = JSONDecoder(),
                                                         emptyResponseCodes: Set<Int> = DecodableResponseSerializer<Value>.defaultEmptyResponseCodes,
                                                         emptyRequestMethods: Set<HTTPMethod> = DecodableResponseSerializer<Value>.defaultEmptyRequestMethods) -> DataTask<Value> {
          let serializer = DecodableResponseSerializer<Value>(dataPreprocessor: dataPreprocessor,
                                                              decoder: decoder,
                                                              emptyResponseCodes: emptyResponseCodes,
                                                              emptyRequestMethods: emptyRequestMethods)

          return serializingResponse(using: serializer, automaticallyCancelling: shouldAutomaticallyCancel)
      }

      /// Creates a `DataTask` to `await` serialization of a `String` value.
      ///
      /// - Parameters:
      ///   - shouldAutomaticallyCancel: `Bool` determining whether or not the request should be cancelled when
      ///                                the enclosing async context is cancelled. Only applies to `DataTask`'s
      ///                                async properties. `true` by default.
      ///   - dataPreprocessor:          `DataPreprocessor` which processes the received `Data` before calling the
      ///                                serializer. `PassthroughPreprocessor()` by default.
      ///   - encoding:                  `String.Encoding` to use during serialization. Defaults to `nil`, in which
      ///                                case the encoding will be determined from the server response, falling
      ///                                back to the default HTTP character set, `ISO-8859-1`.
      ///   - emptyResponseCodes:        HTTP status codes for which empty responses are always valid.
      ///                                `[204, 205]` by default.
      ///   - emptyRequestMethods:       `HTTPMethod`s for which empty responses are always valid. `[.head]` by
      ///                                default.
      ///
      /// - Returns: The `DataTask`.
      public func serializingString(automaticallyCancelling shouldAutomaticallyCancel: Bool = true,
                                    dataPreprocessor: DataPreprocessor = StringResponseSerializer.defaultDataPreprocessor,
                                    encoding: String.Encoding? = nil,
                                    emptyResponseCodes: Set<Int> = StringResponseSerializer.defaultEmptyResponseCodes,
                                    emptyRequestMethods: Set<HTTPMethod> = StringResponseSerializer.defaultEmptyRequestMethods) -> DataTask<String> {
          let serializer = StringResponseSerializer(dataPreprocessor: dataPreprocessor,
                                                    encoding: encoding,
                                                    emptyResponseCodes: emptyResponseCodes,
                                                    emptyRequestMethods: emptyRequestMethods)

          return serializingResponse(using: serializer, automaticallyCancelling: shouldAutomaticallyCancel)
      }

      /// Creates a `DataTask` to `await` serialization using the provided `ResponseSerializer` instance.
      ///
      /// - Parameters:
      ///   - serializer:                `ResponseSerializer` responsible for serializing the request, response,
      ///                                and data.
      ///   - shouldAutomaticallyCancel: `Bool` determining whether or not the request should be cancelled when
      ///                                the enclosing async context is cancelled. Only applies to `DataTask`'s
      ///                                async properties. `true` by default.
      ///
      /// - Returns: The `DataTask`.
      public func serializingResponse<Serializer: ResponseSerializer>(using serializer: Serializer,
                                                                      automaticallyCancelling shouldAutomaticallyCancel: Bool = true)
          -> DataTask<Serializer.SerializedObject> {
          return dataTask(automaticallyCancelling: shouldAutomaticallyCancel) { completion in
              self.response(queue: self.underlyingQueue,
                            responseSerializer: serializer,
                            completionHandler: completion)
          }
      }

      /// Creates a `DataTask` to `await` serialization using the provided `DataResponseSerializerProtocol`
      /// instance.
      ///
      /// - Parameters:
      ///   - serializer:                `DataResponseSerializerProtocol` responsible for serializing the request,
      ///                                response, and data.
      ///   - shouldAutomaticallyCancel: `Bool` determining whether or not the request should be cancelled when
      ///                                the enclosing async context is cancelled. Only applies to `DataTask`'s
      ///                                async properties. `true` by default.
      ///
      /// - Returns: The `DataTask`.
      public func serializingResponse<Serializer: DataResponseSerializerProtocol>(using serializer: Serializer,
                                                                                  automaticallyCancelling shouldAutomaticallyCancel: Bool = true)
          -> DataTask<Serializer.SerializedObject> {
          return dataTask(automaticallyCancelling: shouldAutomaticallyCancel) { completion in
              self.response(queue: self.underlyingQueue,
                            responseSerializer: serializer,
                            completionHandler: completion)
          }
      }

      /// Bridges a completion-handler based response call into a `Task` and wraps it in a `DataTask`.
      ///
      /// The `Task` suspends on a checked continuation that is resumed with the response delivered to
      /// `onResponse`'s callback. If the `Task` is cancelled, the request itself is cancelled.
      ///
      /// - Parameters:
      ///   - shouldAutomaticallyCancel: Forwarded unchanged to the created `DataTask`.
      ///   - onResponse:                Closure which starts response handling and calls the closure it is given
      ///                                with the resulting `DataResponse`.
      ///
      /// - Returns: The `DataTask`.
      private func dataTask<Value>(automaticallyCancelling shouldAutomaticallyCancel: Bool,
                                   forResponse onResponse: @escaping (@escaping (DataResponse<Value, AFError>) -> Void) -> Void)
          -> DataTask<Value> {
          let pending = Task<DataResponse<Value, AFError>, Never> {
              await withTaskCancellationHandler {
                  await withCheckedContinuation { resumption in
                      onResponse { dataResponse in
                          resumption.resume(returning: dataResponse)
                      }
                  }
              } onCancel: {
                  self.cancel()
              }
          }

          return DataTask<Value>(request: self, task: pending, shouldAutomaticallyCancel: shouldAutomaticallyCancel)
      }
  }
  338. // MARK: - DownloadTask
  /// Handle used to `await` the `DownloadResponse` of a `DownloadRequest`, together with its `Result` and
  /// `Value`.
  @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  public struct DownloadTask<Value> {
      /// `DownloadResponse` produced by the `DownloadRequest` and its response handler.
      ///
      /// When automatic cancellation is enabled, cancelling the awaiting context calls `cancel()` on this
      /// value. Otherwise the underlying `Task`'s value is simply awaited.
      public var response: DownloadResponse<Value, AFError> {
          get async {
              guard shouldAutomaticallyCancel else {
                  return await task.value
              }

              return await withTaskCancellationHandler {
                  await task.value
              } onCancel: {
                  cancel()
              }
          }
      }

      /// `Result` of any response serialization performed for the `response`.
      public var result: Result<Value, AFError> {
          get async {
              let completed = await response
              return completed.result
          }
      }

      /// `Value` returned by the `response`. Throws the `AFError` when the `result` is a failure.
      public var value: Value {
          get async throws {
              let outcome = await result
              return try outcome.get()
          }
      }

      private let task: Task<AFDownloadResponse<Value>, Never>
      private let request: DownloadRequest
      private let shouldAutomaticallyCancel: Bool

      /// Creates a `DownloadTask` wrapping `request` and the `Task` which awaits its response.
      fileprivate init(request: DownloadRequest, task: Task<AFDownloadResponse<Value>, Never>, shouldAutomaticallyCancel: Bool) {
          self.request = request
          self.task = task
          self.shouldAutomaticallyCancel = shouldAutomaticallyCancel
      }

      /// Cancel the underlying `DownloadRequest` and `Task`.
      ///
      /// Only the `Task` is cancelled directly here.
      public func cancel() {
          task.cancel()
      }

      /// Resume the underlying `DownloadRequest`.
      public func resume() {
          request.resume()
      }

      /// Suspend the underlying `DownloadRequest`.
      public func suspend() {
          request.suspend()
      }
  }
  @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  extension DownloadRequest {
      /// Creates a `DownloadTask` to `await` a `Data` value.
      ///
      /// - Parameters:
      ///   - shouldAutomaticallyCancel: `Bool` determining whether or not the request should be cancelled when
      ///                                the enclosing async context is cancelled. Only applies to
      ///                                `DownloadTask`'s async properties. `true` by default.
      ///   - dataPreprocessor:          `DataPreprocessor` which processes the received `Data` before completion.
      ///   - emptyResponseCodes:        HTTP response codes for which empty responses are allowed. `[204, 205]`
      ///                                by default.
      ///   - emptyRequestMethods:       `HTTPMethod`s for which empty responses are always valid. `[.head]` by
      ///                                default.
      ///
      /// - Returns: The `DownloadTask`.
      public func serializingData(automaticallyCancelling shouldAutomaticallyCancel: Bool = true,
                                  dataPreprocessor: DataPreprocessor = DataResponseSerializer.defaultDataPreprocessor,
                                  emptyResponseCodes: Set<Int> = DataResponseSerializer.defaultEmptyResponseCodes,
                                  emptyRequestMethods: Set<HTTPMethod> = DataResponseSerializer.defaultEmptyRequestMethods) -> DownloadTask<Data> {
          let serializer = DataResponseSerializer(dataPreprocessor: dataPreprocessor,
                                                  emptyResponseCodes: emptyResponseCodes,
                                                  emptyRequestMethods: emptyRequestMethods)

          return serializingDownload(using: serializer, automaticallyCancelling: shouldAutomaticallyCancel)
      }

      /// Creates a `DownloadTask` to `await` serialization of a `Decodable` value.
      ///
      /// - Note: This serializer reads the entire response into memory before parsing.
      ///
      /// - Parameters:
      ///   - type:                      `Decodable` type to decode from response data.
      ///   - shouldAutomaticallyCancel: `Bool` determining whether or not the request should be cancelled when
      ///                                the enclosing async context is cancelled. Only applies to
      ///                                `DownloadTask`'s async properties. `true` by default.
      ///   - dataPreprocessor:          `DataPreprocessor` which processes the received `Data` before calling the
      ///                                serializer. `PassthroughPreprocessor()` by default.
      ///   - decoder:                   `DataDecoder` to use to decode the response. `JSONDecoder()` by default.
      ///   - emptyResponseCodes:        HTTP status codes for which empty responses are always valid.
      ///                                `[204, 205]` by default.
      ///   - emptyRequestMethods:       `HTTPMethod`s for which empty responses are always valid. `[.head]` by
      ///                                default.
      ///
      /// - Returns: The `DownloadTask`.
      public func serializingDecodable<Value: Decodable>(_ type: Value.Type = Value.self,
                                                         automaticallyCancelling shouldAutomaticallyCancel: Bool = true,
                                                         dataPreprocessor: DataPreprocessor = DecodableResponseSerializer<Value>.defaultDataPreprocessor,
                                                         decoder: DataDecoder = JSONDecoder(),
                                                         emptyResponseCodes: Set<Int> = DecodableResponseSerializer<Value>.defaultEmptyResponseCodes,
                                                         emptyRequestMethods: Set<HTTPMethod> = DecodableResponseSerializer<Value>.defaultEmptyRequestMethods) -> DownloadTask<Value> {
          let serializer = DecodableResponseSerializer<Value>(dataPreprocessor: dataPreprocessor,
                                                              decoder: decoder,
                                                              emptyResponseCodes: emptyResponseCodes,
                                                              emptyRequestMethods: emptyRequestMethods)

          return serializingDownload(using: serializer, automaticallyCancelling: shouldAutomaticallyCancel)
      }

      /// Creates a `DownloadTask` to `await` serialization of the downloaded file's `URL` on disk.
      ///
      /// - Parameters:
      ///   - shouldAutomaticallyCancel: `Bool` determining whether or not the request should be cancelled when
      ///                                the enclosing async context is cancelled. Only applies to
      ///                                `DownloadTask`'s async properties. `true` by default.
      ///
      /// - Returns: The `DownloadTask`.
      public func serializingDownloadedFileURL(automaticallyCancelling shouldAutomaticallyCancel: Bool = true) -> DownloadTask<URL> {
          let serializer = URLResponseSerializer()

          return serializingDownload(using: serializer, automaticallyCancelling: shouldAutomaticallyCancel)
      }

      /// Creates a `DownloadTask` to `await` serialization of a `String` value.
      ///
      /// - Parameters:
      ///   - shouldAutomaticallyCancel: `Bool` determining whether or not the request should be cancelled when
      ///                                the enclosing async context is cancelled. Only applies to
      ///                                `DownloadTask`'s async properties. `true` by default.
      ///   - dataPreprocessor:          `DataPreprocessor` which processes the received `Data` before calling the
      ///                                serializer. `PassthroughPreprocessor()` by default.
      ///   - encoding:                  `String.Encoding` to use during serialization. Defaults to `nil`, in which
      ///                                case the encoding will be determined from the server response, falling
      ///                                back to the default HTTP character set, `ISO-8859-1`.
      ///   - emptyResponseCodes:        HTTP status codes for which empty responses are always valid.
      ///                                `[204, 205]` by default.
      ///   - emptyRequestMethods:       `HTTPMethod`s for which empty responses are always valid. `[.head]` by
      ///                                default.
      ///
      /// - Returns: The `DownloadTask`.
      public func serializingString(automaticallyCancelling shouldAutomaticallyCancel: Bool = true,
                                    dataPreprocessor: DataPreprocessor = StringResponseSerializer.defaultDataPreprocessor,
                                    encoding: String.Encoding? = nil,
                                    emptyResponseCodes: Set<Int> = StringResponseSerializer.defaultEmptyResponseCodes,
                                    emptyRequestMethods: Set<HTTPMethod> = StringResponseSerializer.defaultEmptyRequestMethods) -> DownloadTask<String> {
          let serializer = StringResponseSerializer(dataPreprocessor: dataPreprocessor,
                                                    encoding: encoding,
                                                    emptyResponseCodes: emptyResponseCodes,
                                                    emptyRequestMethods: emptyRequestMethods)

          return serializingDownload(using: serializer, automaticallyCancelling: shouldAutomaticallyCancel)
      }

      /// Creates a `DownloadTask` to `await` serialization using the provided `ResponseSerializer` instance.
      ///
      /// - Parameters:
      ///   - serializer:                `ResponseSerializer` responsible for serializing the request, response,
      ///                                and data.
      ///   - shouldAutomaticallyCancel: `Bool` determining whether or not the request should be cancelled when
      ///                                the enclosing async context is cancelled. Only applies to
      ///                                `DownloadTask`'s async properties. `true` by default.
      ///
      /// - Returns: The `DownloadTask`.
      public func serializingDownload<Serializer: ResponseSerializer>(using serializer: Serializer,
                                                                      automaticallyCancelling shouldAutomaticallyCancel: Bool = true)
          -> DownloadTask<Serializer.SerializedObject> {
          return downloadTask(automaticallyCancelling: shouldAutomaticallyCancel) { completion in
              self.response(queue: self.underlyingQueue,
                            responseSerializer: serializer,
                            completionHandler: completion)
          }
      }

      /// Creates a `DownloadTask` to `await` serialization using the provided
      /// `DownloadResponseSerializerProtocol` instance.
      ///
      /// - Parameters:
      ///   - serializer:                `DownloadResponseSerializerProtocol` responsible for serializing the
      ///                                request, response, and data.
      ///   - shouldAutomaticallyCancel: `Bool` determining whether or not the request should be cancelled when
      ///                                the enclosing async context is cancelled. Only applies to
      ///                                `DownloadTask`'s async properties. `true` by default.
      ///
      /// - Returns: The `DownloadTask`.
      public func serializingDownload<Serializer: DownloadResponseSerializerProtocol>(using serializer: Serializer,
                                                                                      automaticallyCancelling shouldAutomaticallyCancel: Bool = true)
          -> DownloadTask<Serializer.SerializedObject> {
          return downloadTask(automaticallyCancelling: shouldAutomaticallyCancel) { completion in
              self.response(queue: self.underlyingQueue,
                            responseSerializer: serializer,
                            completionHandler: completion)
          }
      }

      /// Bridges a completion-handler based response call into a `Task` and wraps it in a `DownloadTask`.
      ///
      /// The `Task` suspends on a checked continuation that is resumed with the response delivered to
      /// `onResponse`'s callback. If the `Task` is cancelled, the request itself is cancelled.
      ///
      /// - Parameters:
      ///   - shouldAutomaticallyCancel: Forwarded unchanged to the created `DownloadTask`.
      ///   - onResponse:                Closure which starts response handling and calls the closure it is given
      ///                                with the resulting `DownloadResponse`.
      ///
      /// - Returns: The `DownloadTask`.
      private func downloadTask<Value>(automaticallyCancelling shouldAutomaticallyCancel: Bool,
                                       forResponse onResponse: @escaping (@escaping (DownloadResponse<Value, AFError>) -> Void) -> Void)
          -> DownloadTask<Value> {
          let pending = Task<DownloadResponse<Value, AFError>, Never> {
              await withTaskCancellationHandler {
                  await withCheckedContinuation { resumption in
                      onResponse { downloadResponse in
                          resumption.resume(returning: downloadResponse)
                      }
                  }
              } onCancel: {
                  self.cancel()
              }
          }

          return DownloadTask<Value>(request: self, task: pending, shouldAutomaticallyCancel: shouldAutomaticallyCancel)
      }
  }
  530. // MARK: - DataStreamTask
  531. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  532. public struct DataStreamTask {
  533. // Type of created streams.
  534. public typealias Stream<Success, Failure: Error> = StreamOf<DataStreamRequest.Stream<Success, Failure>>
  535. private let request: DataStreamRequest
  536. fileprivate init(request: DataStreamRequest) {
  537. self.request = request
  538. }
  /// Creates a `Stream` of `Data` values from the underlying `DataStreamRequest`.
  ///
  /// - Parameters:
  ///   - shouldAutomaticallyCancel: `Bool` indicating whether the underlying `DataStreamRequest` should be
  ///                                canceled when observation of the stream stops. `true` by default.
  ///   - bufferingPolicy:           `BufferingPolicy` that determines the stream's buffering behavior.
  ///                                `.unbounded` by default.
  ///
  /// - Returns: The `Stream`.
  public func streamingData(automaticallyCancelling shouldAutomaticallyCancel: Bool = true,
                            bufferingPolicy: Stream<Data, Never>.BufferingPolicy = .unbounded) -> Stream<Data, Never> {
      return createStream(automaticallyCancelling: shouldAutomaticallyCancel,
                          bufferingPolicy: bufferingPolicy) { handler in
          self.request.responseStream(on: .streamCompletionQueue(forRequestID: self.request.id),
                                      stream: handler)
      }
  }
  /// Creates a `Stream` of `UTF-8` `String`s from the underlying `DataStreamRequest`.
  ///
  /// - Parameters:
  ///   - shouldAutomaticallyCancel: `Bool` indicating whether the underlying `DataStreamRequest` should be
  ///                                canceled when observation of the stream stops. `true` by default.
  ///   - bufferingPolicy:           `BufferingPolicy` that determines the stream's buffering behavior.
  ///                                `.unbounded` by default.
  ///
  /// - Returns: The `Stream`.
  public func streamingStrings(automaticallyCancelling shouldAutomaticallyCancel: Bool = true,
                               bufferingPolicy: Stream<String, Never>.BufferingPolicy = .unbounded) -> Stream<String, Never> {
      return createStream(automaticallyCancelling: shouldAutomaticallyCancel,
                          bufferingPolicy: bufferingPolicy) { handler in
          self.request.responseStreamString(on: .streamCompletionQueue(forRequestID: self.request.id),
                                            stream: handler)
      }
  }
  564. /// Creates a `Stream` of `Decodable` values from the underlying `DataStreamRequest`.
  565. ///
  566. /// - Parameters:
  567. /// - type: `Decodable` type to be serialized from stream payloads.
  568. /// - shouldAutomaticallyCancel: `Bool` indicating whether the underlying `DataStreamRequest` should be canceled
  569. /// which observation of the stream stops. `true` by default.
  570. /// - bufferingPolicy: `BufferingPolicy` that determines the stream's buffering behavior.`.unbounded` by default.
  571. ///
  572. /// - Returns: The `Stream`.
  573. public func streamingDecodables<T>(_ type: T.Type = T.self,
  574. automaticallyCancelling shouldAutomaticallyCancel: Bool = true,
  575. bufferingPolicy: Stream<T, AFError>.BufferingPolicy = .unbounded)
  576. -> Stream<T, AFError> where T: Decodable {
  577. streamingResponses(serializedUsing: DecodableStreamSerializer<T>(),
  578. automaticallyCancelling: shouldAutomaticallyCancel,
  579. bufferingPolicy: bufferingPolicy)
  580. }
  581. /// Creates a `Stream` of values using the provided `DataStreamSerializer` from the underlying `DataStreamRequest`.
  582. ///
  583. /// - Parameters:
  584. /// - serializer: `DataStreamSerializer` to use to serialize incoming `Data`.
  585. /// - shouldAutomaticallyCancel: `Bool` indicating whether the underlying `DataStreamRequest` should be canceled
  586. /// which observation of the stream stops. `true` by default.
  587. /// - bufferingPolicy: `BufferingPolicy` that determines the stream's buffering behavior.`.unbounded` by default.
  588. ///
  589. /// - Returns: The `Stream`.
  590. public func streamingResponses<Serializer: DataStreamSerializer>(serializedUsing serializer: Serializer,
  591. automaticallyCancelling shouldAutomaticallyCancel: Bool = true,
  592. bufferingPolicy: Stream<Serializer.SerializedObject, AFError>.BufferingPolicy = .unbounded)
  593. -> Stream<Serializer.SerializedObject, AFError> {
  594. createStream(automaticallyCancelling: shouldAutomaticallyCancel, bufferingPolicy: bufferingPolicy) { onStream in
  595. request.responseStream(using: serializer,
  596. on: .streamCompletionQueue(forRequestID: request.id),
  597. stream: onStream)
  598. }
  599. }
  600. private func createStream<Success, Failure: Error>(automaticallyCancelling shouldAutomaticallyCancel: Bool = true,
  601. bufferingPolicy: Stream<Success, Failure>.BufferingPolicy = .unbounded,
  602. forResponse onResponse: @escaping (@escaping (DataStreamRequest.Stream<Success, Failure>) -> Void) -> Void)
  603. -> Stream<Success, Failure> {
  604. StreamOf(bufferingPolicy: bufferingPolicy) {
  605. guard shouldAutomaticallyCancel,
  606. request.isInitialized || request.isResumed || request.isSuspended else { return }
  607. cancel()
  608. } builder: { continuation in
  609. onResponse { stream in
  610. continuation.yield(stream)
  611. if case .complete = stream.event {
  612. continuation.finish()
  613. }
  614. }
  615. }
  616. }
  617. /// Cancel the underlying `DataStreamRequest`.
  618. public func cancel() {
  619. request.cancel()
  620. }
  621. /// Resume the underlying `DataStreamRequest`.
  622. public func resume() {
  623. request.resume()
  624. }
  625. /// Suspend the underlying `DataStreamRequest`.
  626. public func suspend() {
  627. request.suspend()
  628. }
  629. }
  630. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  631. extension DataStreamRequest {
  632. /// Creates a `StreamOf<HTTPURLResponse>` for the instance's responses.
  633. ///
  634. /// - Parameter bufferingPolicy: `BufferingPolicy` that determines the stream's buffering behavior.`.unbounded` by default.
  635. ///
  636. /// - Returns: The `StreamOf<HTTPURLResponse>`.
  637. public func httpResponses(bufferingPolicy: StreamOf<HTTPURLResponse>.BufferingPolicy = .unbounded) -> StreamOf<HTTPURLResponse> {
  638. stream(bufferingPolicy: bufferingPolicy) { [unowned self] continuation in
  639. onHTTPResponse(on: underlyingQueue) { response in
  640. continuation.yield(response)
  641. }
  642. }
  643. }
  644. #if swift(>=5.7)
  645. /// Sets an async closure returning a `Request.ResponseDisposition`, called whenever the `DataStreamRequest`
  646. /// produces an `HTTPURLResponse`.
  647. ///
  648. /// - Note: Most requests will only produce a single response for each outgoing attempt (initial + retries).
  649. /// However, some types of response may trigger multiple `HTTPURLResponse`s, such as multipart streams,
  650. /// where responses after the first will contain the part headers.
  651. ///
  652. /// - Parameters:
  653. /// - handler: Async closure executed when a new `HTTPURLResponse` is received and returning a
  654. /// `ResponseDisposition` value. This value determines whether to continue the request or cancel it as
  655. /// if `cancel()` had been called on the instance. Note, this closure is called on an arbitrary thread,
  656. /// so any synchronous calls in it will execute in that context.
  657. ///
  658. /// - Returns: The instance.
  659. @_disfavoredOverload
  660. @discardableResult
  661. public func onHTTPResponse(perform handler: @escaping @Sendable (HTTPURLResponse) async -> ResponseDisposition) -> Self {
  662. onHTTPResponse(on: underlyingQueue) { response, completionHandler in
  663. Task {
  664. let disposition = await handler(response)
  665. completionHandler(disposition)
  666. }
  667. }
  668. return self
  669. }
  670. /// Sets an async closure called whenever the `DataStreamRequest` produces an `HTTPURLResponse`.
  671. ///
  672. /// - Note: Most requests will only produce a single response for each outgoing attempt (initial + retries).
  673. /// However, some types of response may trigger multiple `HTTPURLResponse`s, such as multipart streams,
  674. /// where responses after the first will contain the part headers.
  675. ///
  676. /// - Parameters:
  677. /// - handler: Async closure executed when a new `HTTPURLResponse` is received. Note, this closure is called on an
  678. /// arbitrary thread, so any synchronous calls in it will execute in that context.
  679. ///
  680. /// - Returns: The instance.
  681. @discardableResult
  682. public func onHTTPResponse(perform handler: @escaping @Sendable (HTTPURLResponse) async -> Void) -> Self {
  683. onHTTPResponse { response in
  684. await handler(response)
  685. return .allow
  686. }
  687. return self
  688. }
  689. #endif
  690. /// Creates a `DataStreamTask` used to `await` streams of serialized values.
  691. ///
  692. /// - Returns: The `DataStreamTask`.
  693. public func streamTask() -> DataStreamTask {
  694. DataStreamTask(request: self)
  695. }
  696. }
  697. extension DispatchQueue {
  698. fileprivate static let singleEventQueue = DispatchQueue(label: "org.alamofire.concurrencySingleEventQueue",
  699. attributes: .concurrent)
  700. fileprivate static func streamCompletionQueue(forRequestID id: UUID) -> DispatchQueue {
  701. DispatchQueue(label: "org.alamofire.concurrencyStreamCompletionQueue-\(id)", target: .singleEventQueue)
  702. }
  703. }
  704. /// An asynchronous sequence generated from an underlying `AsyncStream`. Only produced by Alamofire.
  705. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
  706. public struct StreamOf<Element>: AsyncSequence {
  707. public typealias AsyncIterator = Iterator
  708. public typealias BufferingPolicy = AsyncStream<Element>.Continuation.BufferingPolicy
  709. fileprivate typealias Continuation = AsyncStream<Element>.Continuation
  710. private let bufferingPolicy: BufferingPolicy
  711. private let onTermination: (() -> Void)?
  712. private let builder: (Continuation) -> Void
  713. fileprivate init(bufferingPolicy: BufferingPolicy = .unbounded,
  714. onTermination: (() -> Void)? = nil,
  715. builder: @escaping (Continuation) -> Void) {
  716. self.bufferingPolicy = bufferingPolicy
  717. self.onTermination = onTermination
  718. self.builder = builder
  719. }
  720. public func makeAsyncIterator() -> Iterator {
  721. var continuation: AsyncStream<Element>.Continuation?
  722. let stream = AsyncStream<Element>(bufferingPolicy: bufferingPolicy) { innerContinuation in
  723. continuation = innerContinuation
  724. builder(innerContinuation)
  725. }
  726. return Iterator(iterator: stream.makeAsyncIterator()) {
  727. continuation?.finish()
  728. onTermination?()
  729. }
  730. }
  731. public struct Iterator: AsyncIteratorProtocol {
  732. private final class Token {
  733. private let onDeinit: () -> Void
  734. init(onDeinit: @escaping () -> Void) {
  735. self.onDeinit = onDeinit
  736. }
  737. deinit {
  738. onDeinit()
  739. }
  740. }
  741. private var iterator: AsyncStream<Element>.AsyncIterator
  742. private let token: Token
  743. init(iterator: AsyncStream<Element>.AsyncIterator, onCancellation: @escaping () -> Void) {
  744. self.iterator = iterator
  745. token = Token(onDeinit: onCancellation)
  746. }
  747. public mutating func next() async -> Element? {
  748. await iterator.next()
  749. }
  750. }
  751. }
  752. #endif