Skip to content

Commit

Permalink
refactor: remove ELF based query interface in favour of a/a
Browse files Browse the repository at this point in the history
  • Loading branch information
lovetodream committed Sep 6, 2023
1 parent fe19618 commit 5de85da
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 200 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,10 @@ struct RowStreamStateMachine {
}

case .waitingForRead, .waitingForDemand, .waitingForReadOrDemand:
preconditionFailure("""
How can we receive a body part, after a channelReadComplete, \
but no read has been forwarded yet. Invalid state: \(self.state)
""")
// This case might occur, if we triggered a fetch request to get
// more rows from `ExtendedQueryStateMachine`. We'll just return
// nil indicating that we have to wait for now.
return nil

case .failed:
// Once the row stream state machine is marked as failed, no further
Expand Down
85 changes: 0 additions & 85 deletions Sources/OracleNIO/Connection/OracleConnection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -313,91 +313,6 @@ extension OracleConnection {
}
}

// MARK: EventLoopFuture Interface

extension OracleConnection {

/// Run a query on the Oracle server the connection is connected to and collect all rows.
///
/// - Parameters:
/// - query: The ``OracleQuery`` to run.
/// - options: A bunch of parameters to optimize the query in different ways. Normally this can
/// be ignored, but feel free to experiment based on your needs. Every option and
/// its impact is documented.
/// - logger: The `Logger` to log into for the query.
/// - file: The file, the query was started in. Used for better error reporting.
/// - line: The line, the query was started in. Used for better error reporting.
/// - Returns: An `EventLoopFuture`, that allows access to the future
/// ``OracleQueryResult``.
public func query(
_ query: OracleQuery,
options: QueryOptions = .init(),
logger: Logger,
file: String = #fileID,
line: Int = #line
) -> EventLoopFuture<OracleQueryResult> {
self.queryStream(
query, options: options, logger: logger
).flatMap { rowStream in
rowStream.all().flatMapThrowing { rows in
let metadata = OracleQueryMetadata()
return OracleQueryResult(metadata: metadata, rows: rows)
}
}.enrichOracleError(query: query, file: file, line: line)
}

/// Run a query on the Oracle server the connection is connected to and iterate the rows in a callback.
///
/// - Note: This API does not support back-pressure. If you need back-pressure please use the
/// query API, that supports structured concurrency.
/// - Parameters:
/// - query: The ``OracleQuery`` to run.
/// - options: A bunch of parameters to optimize the query in different ways. Normally this can
/// be ignored, but feel free to experiment based on your needs. Every option and
/// its impact is documented.
/// - logger: The `Logger` to log into for the query.
/// - file: The file, the query was started in. Used for better error reporting.
/// - line: The line, the query was started in. Used for better error reporting.
/// - onRow: A closure that is invoked on every row.
/// - Returns: An EventLoopFuture, that allows access to the future
/// ``OracleQueryMetadata``.
public func query(
_ query: OracleQuery,
options: QueryOptions = .init(),
logger: Logger,
file: String = #fileID,
line: Int = #line,
_ onRow: @escaping (OracleRow) throws -> Void
) -> EventLoopFuture<OracleQueryMetadata> {
self.queryStream(
query, options: options, logger: logger
).flatMap { rowStream in
rowStream.onRow(onRow).flatMapThrowing { _ in
let metadata = OracleQueryMetadata()
return metadata
}
}.enrichOracleError(query: query, file: file, line: line)
}

}

extension EventLoopFuture {
func enrichOracleError(
query: OracleQuery, file: String, line: Int
) -> EventLoopFuture<Value> {
return self.flatMapErrorThrowing { error in
if var error = error as? OracleSQLError {
error.file = file
error.line = line
error.query = query
throw error
} else {
throw error
}
}
}
}

extension OracleConnection {
/// Returns the default `EventLoopGroup` singleton, automatically selecting the best for the
/// platform.
Expand Down
24 changes: 0 additions & 24 deletions Sources/OracleNIO/OracleQueryResult.swift

This file was deleted.

125 changes: 53 additions & 72 deletions Tests/OracleNIOTests/OracleNIOTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,9 @@ final class OracleNIOTests: XCTestCase {

// MARK: Tests

func testConnectionAndClose() {
var conn: OracleConnection?
XCTAssertNoThrow(conn = try OracleConnection.test(on: eventLoop).wait())
XCTAssertNoThrow(try conn?.close().wait())
func testConnectionAndClose() async throws {
let conn = try await OracleConnection.test(on: eventLoop)
XCTAssertNoThrow(try conn.close().wait())
}

func testAuthenticationFailure() throws {
Expand All @@ -49,96 +48,78 @@ final class OracleNIOTests: XCTestCase {
XCTAssertNoThrow(try conn?.close().wait())
}

func testSimpleQuery() {
var conn: OracleConnection?
XCTAssertNoThrow(conn = try OracleConnection.test(on: eventLoop).wait())
defer { XCTAssertNoThrow(try conn?.close().wait()) }
var rows: [OracleRow]?
XCTAssertNoThrow(
rows = try conn?.query(
"SELECT 'test' FROM dual", logger: .oracleTest
).wait().rows
)
XCTAssertEqual(rows?.count, 1)
XCTAssertEqual(try rows?.first?.decode(String.self), "test")
func testSimpleQuery() async throws {
let conn = try await OracleConnection.test(on: eventLoop)
defer { XCTAssertNoThrow(try conn.close().wait()) }
let rows = try await conn.query(
"SELECT 'test' FROM dual", logger: .oracleTest
).collect()
XCTAssertEqual(rows.count, 1)
XCTAssertEqual(try rows.first?.decode(String.self), "test")
}

func testSimpleDateQuery() {
var conn: OracleConnection?
XCTAssertNoThrow(conn = try OracleConnection.test(on: eventLoop).wait())
defer { XCTAssertNoThrow(try conn?.close().wait()) }
var rows: [OracleRow]?
XCTAssertNoThrow(
rows = try conn?.query(
func testSimpleDateQuery() async throws {
let conn = try await OracleConnection.test(on: eventLoop)
defer { XCTAssertNoThrow(try conn.close().wait()) }
let rows = try await conn.query(
"SELECT systimestamp FROM dual", logger: .oracleTest
).wait().rows
)
XCTAssertEqual(rows?.count, 1)
).collect()
XCTAssertEqual(rows.count, 1)
var value: Date?
XCTAssertNoThrow(value = try rows?.first?.decode(Date.self))
XCTAssertNoThrow(value = try rows.first?.decode(Date.self))
XCTAssertNoThrow(try XCTUnwrap(value))
}

func testSimpleOptionalBinds() {
var conn: OracleConnection?
XCTAssertNoThrow(conn = try OracleConnection.test(on: eventLoop).wait())
defer { XCTAssertNoThrow(try conn?.close().wait()) }
var rows: [OracleRow]?
XCTAssertNoThrow(
rows = try conn?.query(
"SELECT \(Optional("test")) FROM dual", logger: .oracleTest
).wait().rows
)
XCTAssertEqual(rows?.count, 1)
XCTAssertEqual(try rows?.first?.decode(String?.self), "test")
XCTAssertNoThrow(
rows = try conn?.query(
"SELECT \(String?.none) FROM dual", logger: .oracleTest
).wait().rows
)
XCTAssertEqual(rows?.count, 1)
XCTAssertEqual(try rows?.first?.decode(String?.self), nil)
func testSimpleOptionalBinds() async throws {
let conn = try await OracleConnection.test(on: eventLoop)
defer { XCTAssertNoThrow(try conn.close().wait()) }
var rows = try await conn.query(
"SELECT \(Optional("test")) FROM dual", logger: .oracleTest
).collect()
XCTAssertEqual(rows.count, 1)
XCTAssertEqual(try rows.first?.decode(String?.self), "test")
rows = try await conn.query(
"SELECT \(String?.none) FROM dual", logger: .oracleTest
).collect()
XCTAssertEqual(rows.count, 1)
XCTAssertEqual(try rows.first?.decode(String?.self), nil)
}

func testQuery10kItems() {
var conn: OracleConnection?
XCTAssertNoThrow(conn = try OracleConnection.test(on: eventLoop).wait())
defer { XCTAssertNoThrow(try conn?.close().wait()) }
func testQuery10kItems() async throws {
let conn = try await OracleConnection.test(on: eventLoop)
defer { XCTAssertNoThrow(try conn.close().wait()) }

var received: Int64 = 0
XCTAssertNoThrow(_ = try conn?.query(
let rows = try await conn.query(
"SELECT to_number(column_value) AS id FROM xmltable ('1 to 10000')",
options: .init(arraySize: 1000),
logger: .oracleTest
) { row in
func workaround() {
var number: Int64?
XCTAssertNoThrow(
number = try row.decode(Int64.self, context: .default)
)
received += 1
XCTAssertEqual(number, received)
}

workaround()
}.wait())
)
var received: Int64 = 0
for try await row in rows {
var number: Int64?
XCTAssertNoThrow(
number = try row.decode(Int64.self, context: .default)
)
received += 1
XCTAssertEqual(number, received)
}

XCTAssertEqual(received, 10_000)
}

func testFloatingPointNumbers() {
var conn: OracleConnection?
XCTAssertNoThrow(conn = try OracleConnection.test(on: eventLoop).wait())
defer { XCTAssertNoThrow(try conn?.close().wait()) }
func testFloatingPointNumbers() async throws {
let conn = try await OracleConnection.test(on: eventLoop)
defer { XCTAssertNoThrow(try conn.close().wait()) }

var received: Int64 = 0
XCTAssertNoThrow(_ = try conn?.query(
let rows = try await conn.query(
"""
SELECT to_number(column_value) / 100 AS id
FROM xmltable ('1 to 100')
""",
logger: .oracleTest
) { row in
)
for try await row in rows {
func workaround() {
var number: Float?
XCTAssertNoThrow(number = try row.decode(
Expand All @@ -149,13 +130,13 @@ final class OracleNIOTests: XCTestCase {
}

workaround()
}.wait())
}

XCTAssertEqual(received, 100)
}

func testDuplicateColumn() async throws {
let connection = try await OracleConnection.test(on: eventLoop).get()
let connection = try await OracleConnection.test(on: eventLoop)
defer { XCTAssertNoThrow(try connection.close().wait()) }
do {
try await connection.query(
Expand Down
26 changes: 11 additions & 15 deletions Tests/OracleNIOTests/Utility.swift
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,20 @@ extension OracleConnection {
static func test(
on eventLoop: EventLoop,
logLevel: Logger.Level = Logger.getLogLevel()
) -> EventLoopFuture<OracleConnection> {
) async throws -> OracleConnection {
var logger = Logger(label: "oracle.connection.test")
logger.logLevel = logLevel

do {
let config = OracleConnection.Configuration(
address: try Self.address(),
serviceName: env("ORA_SERVICE_NAME") ?? "XEPDB1",
username: env("ORA_USERNAME") ?? "my_user",
password: env("ORA_PASSWORD") ?? "my_passwor"
)

return OracleConnection.connect(
on: eventLoop, configuration: config, id: 0, logger: logger
)
} catch {
return eventLoop.makeFailedFuture(error)
}
let config = OracleConnection.Configuration(
address: try Self.address(),
serviceName: env("ORA_SERVICE_NAME") ?? "XEPDB1",
username: env("ORA_USERNAME") ?? "my_user",
password: env("ORA_PASSWORD") ?? "my_passwor"
)

return try await OracleConnection.connect(
on: eventLoop, configuration: config, id: 0, logger: logger
)
}
}

Expand Down

0 comments on commit 5de85da

Please sign in to comment.