Skip to content

Commit 22f8096

Browse files
authored
Fix more bugs (#46)
- Fix race condition when subscribing to a database change publisher - Unregister file presenter on deinit - Return true from SQLiteError.isDatabaseClosed if the database is suspended
1 parent bc933f7 commit 22f8096

File tree

5 files changed

+163
-61
lines changed

5 files changed

+163
-61
lines changed

Sources/SQLite/CrossProcessChangeNotifier.swift

Lines changed: 24 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ final class CrossProcessChangeNotifier: NSObject, @unchecked Sendable {
88
private let changeTrackerURL: URL?
99
private let isStarted = Locked<Bool>(false)
1010

11-
private let localChange = PassthroughSubject<Void, Never>()
1211
private var localChangeSubscription: AnyCancellable?
1312

1413
private let remoteChange = PassthroughSubject<Void, Never>()
@@ -25,6 +24,7 @@ final class CrossProcessChangeNotifier: NSObject, @unchecked Sendable {
2524

2625
init(
2726
databasePath: String,
27+
databaseChangePublisher: AnyPublisher<Void, Error>,
2828
onRemoteChange: @Sendable @escaping () -> Void
2929
) {
3030
changeTrackerURL = Self.changeTrackerURL(
@@ -33,30 +33,23 @@ final class CrossProcessChangeNotifier: NSObject, @unchecked Sendable {
3333

3434
super.init()
3535

36-
localChangeSubscription = localChange.throttle(
37-
for: .seconds(1),
38-
scheduler: RunLoop.main,
39-
latest: true
40-
)
41-
.receive(on: queue)
42-
.sink { [weak self] _ in
43-
guard let self, let url = changeTrackerURL else { return }
44-
let coordinator = NSFileCoordinator(filePresenter: self)
45-
coordinator.coordinate(
46-
writingItemAt: url,
47-
options: .forReplacing,
48-
error: nil,
49-
byAccessor: touch
36+
localChangeSubscription = databaseChangePublisher
37+
.replaceError(with: ())
38+
.receive(on: queue)
39+
.sink { [weak self] in self?.notifyOtherProcesses() }
40+
41+
remoteChangeSubscription = remoteChange
42+
.throttle(
43+
for: .seconds(1),
44+
scheduler: RunLoop.main,
45+
latest: true
5046
)
51-
}
47+
.receive(on: queue)
48+
.sink { onRemoteChange() }
49+
}
5250

53-
remoteChangeSubscription = remoteChange.throttle(
54-
for: .seconds(1),
55-
scheduler: RunLoop.main,
56-
latest: true
57-
)
58-
.receive(on: queue)
59-
.sink { onRemoteChange() }
51+
deinit {
52+
removeFilePresenter()
6053
}
6154

6255
func start() {
@@ -99,7 +92,14 @@ extension CrossProcessChangeNotifier: NSFilePresenter {
9992
}
10093

10194
private func notifyOtherProcesses() {
102-
localChange.send()
95+
guard let url = changeTrackerURL else { return }
96+
let coordinator = NSFileCoordinator(filePresenter: self)
97+
coordinator.coordinate(
98+
writingItemAt: url,
99+
options: .forReplacing,
100+
error: nil,
101+
byAccessor: touch
102+
)
103103
}
104104

105105
private var touch: (URL?) -> Void {

Sources/SQLite/SQLiteDatabase.swift

Lines changed: 96 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ public final class SQLiteDatabase: DatabaseProtocol, @unchecked Sendable {
7373
self.sqliteVersion = sqliteVersion.description
7474
changeNotifier = CrossProcessChangeNotifier(
7575
databasePath: path,
76+
databaseChangePublisher: database.databaseChangePublisher(),
7677
onRemoteChange: { [weak self] in
7778
self?.triggerObservers.send()
7879
}
@@ -85,6 +86,10 @@ public final class SQLiteDatabase: DatabaseProtocol, @unchecked Sendable {
8586
changeNotifier.start()
8687
}
8788

89+
deinit {
90+
changeNotifier.stop()
91+
}
92+
8893
func resume() {
8994
NotificationCenter.default.post(
9095
name: Self.resumeNotification,
@@ -808,6 +813,14 @@ private enum Database {
808813
.mapToSQLiteError(sql: sql)
809814
.eraseToAnyPublisher()
810815
}
816+
817+
func databaseChangePublisher() -> AnyPublisher<Void, Error> {
818+
let observation = DatabaseRegionObservation(tracking: .fullDatabase)
819+
return observation
820+
.publisher(in: writer)
821+
.map { _ in }
822+
.eraseToAnyPublisher()
823+
}
811824
}
812825

813826
private final class SQLitePublisher: Publisher, @unchecked Sendable {
@@ -817,8 +830,6 @@ private final class SQLitePublisher: Publisher, @unchecked Sendable {
817830
private let sql: SQL
818831
private let arguments: SQLiteArguments
819832
private let request: SQLRequest<Row>
820-
private let trigger: AnyPublisher<Void, Never>
821-
private let queue: DispatchQueue
822833

823834
private let subject = CurrentValueSubject<Output, Failure>([])
824835
private var subscriptions = Locked<Set<AnyCancellable>>([])
@@ -836,34 +847,31 @@ private final class SQLitePublisher: Publisher, @unchecked Sendable {
836847
sql: sql,
837848
arguments: arguments.statementArguments
838849
)
839-
self.trigger = trigger
840-
self.queue = queue
841850

842-
var didFetchInitialValue = false
851+
let demands = Locked(Demands { [database, subject] in
852+
do {
853+
subject.send(try database.read(
854+
sql, arguments: arguments
855+
))
856+
} catch {
857+
subject.send(completion: .failure(error))
858+
}
859+
})
860+
843861
let observationSub = DatabaseRegionObservation(tracking: [request])
844862
.publisher(in: database.writer)
863+
.handleEvents(receiveRequest: { demand in
864+
demands.access { demands in
865+
demands.receiveObservationDemand(demand)
866+
}
867+
})
845868
.receive(on: queue)
846869
.tryMap { _ in try database.read(sql, arguments: arguments) }
847-
.handleEvents(
848-
receiveRequest: { [subject] demand in
849-
// NOTE: To match the previous version's behavior, we
850-
// fetch the initial value as soon as a downstream
851-
// publisher requests some values.
852-
guard !didFetchInitialValue,
853-
demand > .none
854-
else { return }
855-
856-
didFetchInitialValue = true
857-
858-
do {
859-
subject.send(try database.read(
860-
sql, arguments: arguments
861-
))
862-
} catch {
863-
subject.send(completion: .failure(error))
864-
}
870+
.handleEvents(receiveRequest: { demand in
871+
demands.access { demands in
872+
demands.receiveObservationDownstreamDemand(demand)
865873
}
866-
)
874+
})
867875
.sink(
868876
receiveCompletion: { [subject] completion in
869877
subject.send(completion: completion)
@@ -874,6 +882,11 @@ private final class SQLitePublisher: Publisher, @unchecked Sendable {
874882
)
875883

876884
let triggerSub = trigger
885+
.handleEvents(receiveRequest: { demand in
886+
demands.access { demands in
887+
demands.receiveTriggerDemand(demand)
888+
}
889+
})
877890
.receive(on: queue)
878891
.tryMap { _ in try database.read(sql, arguments: arguments) }
879892
.sink(
@@ -963,3 +976,62 @@ private extension GRDB.Database {
963976
}
964977
}
965978
}
979+
980+
private enum Demands {
981+
struct Source: OptionSet {
982+
let rawValue: Int
983+
984+
static let observation = Source(rawValue: 1 << 0)
985+
static let observationDownstream = Source(rawValue: 1 << 1)
986+
static let trigger = Source(rawValue: 1 << 2)
987+
988+
var isComplete: Bool {
989+
self == [.observation, .observationDownstream, .trigger]
990+
}
991+
}
992+
993+
case finished
994+
case waiting(Source, () -> Void)
995+
996+
init(_ block: @escaping () -> Void) {
997+
self = .waiting(Source(), block)
998+
}
999+
1000+
mutating func receiveObservationDemand(
1001+
_ demand: Subscribers.Demand
1002+
) {
1003+
receiveDemand(demand, source: .observation)
1004+
}
1005+
1006+
mutating func receiveObservationDownstreamDemand(
1007+
_ demand: Subscribers.Demand
1008+
) {
1009+
receiveDemand(demand, source: .observationDownstream)
1010+
}
1011+
1012+
mutating func receiveTriggerDemand(
1013+
_ demand: Subscribers.Demand
1014+
) {
1015+
receiveDemand(demand, source: .trigger)
1016+
}
1017+
1018+
private mutating func receiveDemand(
1019+
_ demand: Subscribers.Demand,
1020+
source: Source
1021+
) {
1022+
guard case .waiting(var sources, let block) = self,
1023+
demand > .none
1024+
else {
1025+
return
1026+
}
1027+
1028+
sources.insert(source)
1029+
1030+
if sources.isComplete {
1031+
self = .finished
1032+
block()
1033+
} else {
1034+
self = .waiting(sources, block)
1035+
}
1036+
}
1037+
}

Sources/SQLite/SQLiteError.swift

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,11 @@ extension SQLiteError: CustomStringConvertible {
137137

138138
public extension SQLiteError {
139139
var isBusy: Bool { self == .SQLITE_BUSY }
140-
var isDatabaseClosed: Bool { self == .SQLITE_MISUSE }
140+
var isDatabaseClosed: Bool {
141+
self == .SQLITE_MISUSE // Database is closed error
142+
|| self == .SQLITE_INTERRUPT // Database is suspended
143+
|| self == .SQLITE_ABORT // Database is suspended
144+
}
141145

142146
var isInterrupt: Bool {
143147
switch self {

Tests/SQLiteTests/CrossProcessChangeNotifierTests.swift

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,8 @@ final class CrossProcessChangeNotifierTests: XCTestCase {
5151
arguments: ["id": .integer(1), "text": .text("one")]
5252
)
5353

54-
wait(for: [ex1, ex2], timeout: 4) // Coordinated writes can be very slow
54+
// Coordinated writes can be very slow
55+
wait(for: [ex1, ex2], timeout: 4)
5556
}
5657
}
5758

Tests/SQLiteTests/SQLitePublisherTests.swift

Lines changed: 36 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -52,20 +52,45 @@ final class SQLitePublisherTests: XCTestCase {
5252

5353
try db.write(Person.createTable)
5454

55+
let expectedOutput = Locked([
56+
(0, []),
57+
(1, [_person1]),
58+
(2, [_person1, _person2]),
59+
])
5560
let ex = db
5661
.publisher(Person.self, Person.getAll)
5762
.removeDuplicates()
58-
.expectOutput(
59-
[[], [_person1], [_person1, _person2]],
60-
failsOnCompletion: true
61-
)
62-
63-
try db.write(Person.insert, arguments: _person1.asArguments)
64-
65-
db.suspend()
66-
db.resume()
67-
68-
try db.write(Person.insert, arguments: _person2.asArguments)
63+
.expectOutput({ [_person1, _person2] people in
64+
let (i, expected) = expectedOutput.access { $0.removeFirst() }
65+
XCTAssertEqual(expected, people)
66+
67+
switch i {
68+
case 0:
69+
try db.write(
70+
Person.insert,
71+
arguments: _person1.asArguments
72+
)
73+
74+
db.suspend()
75+
db.resume()
76+
77+
return .moreExpected
78+
79+
case 1:
80+
try db.write(
81+
Person.insert,
82+
arguments: _person2.asArguments
83+
)
84+
return .moreExpected
85+
86+
case 2:
87+
return .finished
88+
89+
default:
90+
XCTFail()
91+
return .finished
92+
}
93+
}, failsOnCompletion: true)
6994

7095
wait(for: [ex], timeout: 2)
7196
}

0 commit comments

Comments
 (0)