Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions buildSrc/src/main/java/Dependencies.kt
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ object Versions {
const val okhttp = "4.3.1"
const val protobuf = "3.11.0"
const val syftProto = "0.0.8"
const val retrofit = "2.7.1"
const val kotlinConverter = "0.4.0"

// release management
const val netflixPublishing = "14.0.0"
Expand All @@ -45,15 +47,19 @@ object ProjectDependencies {
const val androidGradlePlugin = "com.android.tools.build:gradle:${Versions.gradle}"
const val kotlinGradlePlugin = "org.jetbrains.kotlin:kotlin-gradle-plugin:${Versions.kotlin}"
const val kotlinSerialization = "org.jetbrains.kotlin:kotlin-serialization:${Versions.kotlin}"
const val netflixPublishingPlugin = "com.netflix.nebula:nebula-publishing-plugin:${Versions.netflixPublishing}"
const val netflixReleasePlugin = "com.netflix.nebula:nebula-release-plugin:${Versions.netflixRelease}"
const val netflixBintrayPlugin = "com.netflix.nebula:nebula-bintray-plugin:${Versions.netflixBintray}"
const val netflixPublishingPlugin = "com.netflix.nebula:nebula-publishing-plugin:" +
Versions.netflixPublishing
const val netflixReleasePlugin = "com.netflix.nebula:nebula-release-plugin:" +
Versions.netflixRelease
const val netflixBintrayPlugin = "com.netflix.nebula:nebula-bintray-plugin:" +
Versions.netflixBintray
}

object CommonDependencies {
const val appCompat = "androidx.appcompat:appcompat:${Versions.appCompat}"
const val coreKtx = "androidx.core:core-ktx:${Versions.coreKtx}"
const val kotlinSerialization = "org.jetbrains.kotlinx:kotlinx-serialization-runtime:${Versions.kotlinSerialization}"
const val kotlinSerializationFactory = "com.jakewharton.retrofit:retrofit2-kotlinx-serialization-converter:${Versions.kotlinConverter}"
const val rxJava = "io.reactivex.rxjava2:rxjava:${Versions.rxJava}"
const val rxAndroid = "io.reactivex.rxjava2:rxandroid:${Versions.rxAndroid}"
const val espresso = "androidx.test.espresso:espresso-core:${Versions.espresso}"
Expand All @@ -74,4 +80,6 @@ object SyftlibDependencies {
const val syftProto = "org.openmined.kotlinsyft:syft-proto-jvm:${Versions.syftProto}"
const val protobuf = "com.google.protobuf:protobuf-java:${Versions.protobuf}"
const val junitJupiter = "org.junit.jupiter:junit-jupiter:${Versions.junitJupiter}"
const val retrofit = "com.squareup.retrofit2:retrofit:${Versions.retrofit}"
const val retrofitAdapter = "com.squareup.retrofit2:adapter-rxjava2:${Versions.retrofit}"
}
23 changes: 16 additions & 7 deletions demo-app/src/main/java/org/openmined/syft/demo/StandaloneDemo.kt
Original file line number Diff line number Diff line change
@@ -1,23 +1,32 @@
package org.openmined.syft.demo

import io.reactivex.Scheduler
import io.reactivex.android.schedulers.AndroidSchedulers
import io.reactivex.schedulers.Schedulers
import org.openmined.syft.Syft
import org.openmined.syft.networking.clients.SignallingClient
import org.openmined.syft.networking.requests.Protocol
import org.openmined.syft.networking.clients.HttpClient
import org.openmined.syft.networking.clients.SocketClient
import org.openmined.syft.threading.ProcessSchedulers

@ExperimentalUnsignedTypes
fun main() {
val syft = Syft.getInstance(SignallingClient(
Protocol.WSS,
"echo.websocket.org",
2000u
), object : ProcessSchedulers {
val networkingSchedulers = object : ProcessSchedulers {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need two different sets of schedulers? If using just one, wouldn't it work as expected?

Copy link
Copy Markdown
Member Author

@vkkhare vkkhare Mar 7, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The issue is we need different type of thread scheduling for computation and networking. We can spawn multiple threads for networking however compute usually would be restricted to single thread. Then many would require the subscriber to run on mainThread example for network response. While a single compute thread might be busy. This makes the distinction clear for the user and provides larger flexibility

override val computeThreadScheduler: Scheduler
get() = Schedulers.io()
override val calleeThreadScheduler: Scheduler
get() = AndroidSchedulers.mainThread()
}
val computeSchedulers = object : ProcessSchedulers {
override val computeThreadScheduler: Scheduler
get() = Schedulers.computation()
override val calleeThreadScheduler: Scheduler
get() = Schedulers.single()
}
val syft = Syft.getInstance(
SocketClient(
"echo.websocket.org",
2000u
, computeSchedulers
), HttpClient("echo.websocket.org"), computeSchedulers, networkingSchedulers
)
}
4 changes: 2 additions & 2 deletions gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#Sat Jan 11 12:13:19 GMT 2020
#Fri Feb 28 13:29:55 CET 2020
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-5.4.1-all.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-5.6.4-all.zip
4 changes: 3 additions & 1 deletion syftlib/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,13 @@ dependencies {
implementation CommonDependencies.appCompat
implementation CommonDependencies.coreKtx
implementation CommonDependencies.kotlinSerialization

implementation SyftlibDependencies.webrtc
implementation CommonDependencies.rxJava
implementation CommonDependencies.rxAndroid
implementation SyftlibDependencies.okhttp
implementation CommonDependencies.kotlinSerializationFactory
implementation SyftlibDependencies.retrofitAdapter
implementation SyftlibDependencies.retrofit

implementation SyftlibDependencies.syftProto
implementation SyftlibDependencies.protobuf
Expand Down
182 changes: 126 additions & 56 deletions syftlib/src/main/java/org/openmined/syft/Syft.kt
Original file line number Diff line number Diff line change
@@ -1,85 +1,155 @@
package org.openmined.syft

import android.util.Log
import io.reactivex.Completable
import io.reactivex.disposables.CompositeDisposable
import org.openmined.syft.networking.clients.NetworkMessage
import org.openmined.syft.networking.clients.SignallingClient
import org.openmined.syft.networking.datamodels.AuthenticationSuccess
import org.openmined.syft.networking.datamodels.CycleResponseData
import org.openmined.syft.networking.datamodels.SocketResponse
import org.openmined.syft.networking.requests.CommunicationDataFactory
import org.openmined.syft.networking.requests.REQUESTS
import org.openmined.syft.networking.clients.HttpClient
import org.openmined.syft.networking.clients.SocketClient
import org.openmined.syft.networking.datamodels.syft.AuthenticationSuccess
import org.openmined.syft.networking.datamodels.syft.CycleRequest
import org.openmined.syft.networking.datamodels.syft.CycleResponseData
import org.openmined.syft.networking.requests.CommunicationAPI
import org.openmined.syft.networking.requests.HttpAPI
import org.openmined.syft.processes.JobStatusSubscriber
import org.openmined.syft.processes.SyftJob
import org.openmined.syft.threading.ProcessSchedulers
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.TimeUnit

private const val TAG = "Syft"

@ExperimentalUnsignedTypes
class Syft private constructor(
private val signallingClient: SignallingClient,
private val schedulers: ProcessSchedulers
private val socketClient: SocketClient,
private val httpClient: HttpClient,
//todo this will be removed by syft configuration class
private val computeSchedulers: ProcessSchedulers,
//todo change this to read from syft configuration
private val networkingSchedulers: ProcessSchedulers

) {
companion object {
@Volatile
private var INSTANCE: Syft? = null

fun getInstance(signallingClient: SignallingClient, schedulers: ProcessSchedulers): Syft =
fun getInstance(
socketClient: SocketClient,
httpClient: HttpClient,
networkingSchedulers: ProcessSchedulers,
//todo this will be removed by syft configuration class
computeSchedulers: ProcessSchedulers
): Syft =
INSTANCE ?: synchronized(this) {
INSTANCE ?: Syft(
signallingClient,
schedulers
socketClient,
httpClient,
networkingSchedulers,
computeSchedulers
).also { INSTANCE = it }
}
}

private lateinit var workerId: String
private val workerJobs = ConcurrentHashMap<SyftJob.JobID, SyftJob>()
private val compositeDisposable = CompositeDisposable()

private val workerJobs = mutableListOf<SyftJob>()
private val compositeDisposable = CompositeDisposable().add(
signallingClient.start()
.map {
when (it) {
is NetworkMessage.SocketOpen ->
signallingClient.send(REQUESTS.AUTHENTICATION)
//todo decide if this can be changed by pygrid or will remain same irrespective of the requests we make
Comment thread
mccorby marked this conversation as resolved.
@Volatile
lateinit var workerId: String

is NetworkMessage.SocketClosed -> Log.d(
TAG,
"Socket was closed successfully"
)
is NetworkMessage.SocketError -> Log.e(TAG, "socket error", it.throwable)
is NetworkMessage.MessageReceived -> handleResponse(
CommunicationDataFactory.deserializeSocket(
it.message
)
fun newJob(
model: String,
version: String? = null
): SyftJob {
val job = SyftJob(this, computeSchedulers, networkingSchedulers, model, version)
val jobId = SyftJob.JobID(model, version)
workerJobs[jobId] = job
job.subscribe(object : JobStatusSubscriber() {
override fun onComplete() {
workerJobs.remove(jobId)
}

override fun onError(throwable: Throwable) {
workerJobs.remove(jobId)
}
}, networkingSchedulers)

return job
}

fun requestCycle(job: SyftJob) {
if (this::workerId.isInitialized)
Comment thread
mccorby marked this conversation as resolved.
socketClient.getCycle(
CycleRequest(
workerId,
job.modelName,
job.version,
getPing(),
getDownloadSpeed(),
getUploadSpeed()
)
is NetworkMessage.MessageSent -> println("Message sent successfully")
).compose(networkingSchedulers.applySingleSchedulers())
.subscribe { response: CycleResponseData ->
when (response) {
is CycleResponseData.CycleAccept -> handleCycleAccept(response)
is CycleResponseData.CycleReject -> handleCycleReject(response)
}
}
}
.subscribeOn(schedulers.computeThreadScheduler)
.observeOn(schedulers.calleeThreadScheduler)
.subscribe()
)

fun newJob(modelName: String, version: String): SyftJob {
val job = SyftJob(modelName, version)
signallingClient.send(
REQUESTS.CYCLE_REQUEST,
CommunicationDataFactory.requestCycle(workerId, job, "", "", "")
)
workerJobs.add(job)
return job
else {
compositeDisposable.add(socketClient.authenticate()
.compose(networkingSchedulers.applySingleSchedulers())
.subscribe { t: AuthenticationSuccess ->
if (!this::workerId.isInitialized)
setSyftWorkerId(t.workerId)
requestCycle(job)
}
)
}
}

private fun handleResponse(response: SocketResponse) {
when (response.data) {
is AuthenticationSuccess ->
this.workerId = response.data.workerId
is CycleResponseData -> {
when (response.data) {
is CycleResponseData.CycleAccept -> "accept here"
is CycleResponseData.CycleReject -> "set timeout for job"
}
}
fun getDownloader(): HttpAPI = httpClient.apiClient

}
//todo decide this based on configuration
fun getSignallingClient(): CommunicationAPI = socketClient

fun getWebRTCSignallingClient(): SocketClient = socketClient

@Synchronized
private fun setSyftWorkerId(workerId: String) {
if (!this::workerId.isInitialized)
this.workerId = workerId
else if (workerJobs.isEmpty())
this.workerId = workerId
}

private fun getPing() = ""
private fun getDownloadSpeed() = ""
private fun getUploadSpeed() = ""

private fun handleCycleReject(responseData: CycleResponseData.CycleReject) {
var jobId = SyftJob.JobID(responseData.modelName, responseData.version)
val job = workerJobs.getOrElse(jobId, {
jobId = SyftJob.JobID(responseData.modelName)
workerJobs.getValue(jobId)
})
job.cycleStatus.set(SyftJob.CycleStatus.REJECT)
compositeDisposable.add(
Completable
.timer(responseData.timeout.toLong(), TimeUnit.MILLISECONDS)
.compose(networkingSchedulers.applyCompletableSchedulers())
.subscribe {
job.cycleStatus.set(SyftJob.CycleStatus.APPLY)
job.start()
}
)
}

private fun handleCycleAccept(responseData: CycleResponseData.CycleAccept) {
val jobId = SyftJob.JobID(responseData.modelName, responseData.version)
val job = workerJobs.getOrElse(jobId, {
Comment thread
vkkhare marked this conversation as resolved.
workerJobs.getValue(SyftJob.JobID(responseData.modelName))
})
job.setRequestKey(responseData)
job.downloadData()
}


}
38 changes: 0 additions & 38 deletions syftlib/src/main/java/org/openmined/syft/SyftJob.kt

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package org.openmined.syft.networking.clients

import com.jakewharton.retrofit2.converter.kotlinx.serialization.asConverterFactory
import kotlinx.serialization.json.Json
import okhttp3.MediaType.Companion.toMediaType
import org.openmined.syft.networking.requests.HttpAPI
import retrofit2.Retrofit
import retrofit2.adapter.rxjava2.RxJava2CallAdapterFactory

class HttpClient(baseUrl: String) {
val apiClient: HttpAPI = Retrofit.Builder()
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.addConverterFactory(Json.asConverterFactory("application/json".toMediaType()))
.baseUrl(baseUrl)
.build().create(HttpAPI::class.java)

}
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package org.openmined.syft.networking.clients

sealed class NetworkMessage() {
object SocketClosed : NetworkMessage()
sealed class NetworkMessage {
object SocketOpen : NetworkMessage()
data class SocketError(val throwable: Throwable) : NetworkMessage()
object MessageSent : NetworkMessage()
data class MessageReceived(val message: String) : NetworkMessage()
}
Loading