Skip to content

ClusterClient needs client.Tell to be called twice; only the second Tell succeeds  #137

@ingted

Description

@ingted
#r "nuget: Akka.Serialization.Hyperion"
#r "nuget: Akka.Cluster"
#r "nuget: Akka.Remote"
#r "nuget: Akka.Cluster.Tools"
#r "nuget: Akkling"
#r "nuget: Akkling.Cluster.Sharding"
#r "nuget: Microsoft.Extensions.Logging"
#r "nuget: Microsoft.Extensions.Logging.Abstractions"

open System
open Akka.Cluster
open Akkling
open Akkling.Cluster
open Akka.Actor


/// Builds the Akka configuration for one node of the reproduction.
///
/// port      - TCP port for the dot-netty remote transport (rendered with `string`,
///             so no type annotation is required on the parameter).
/// ifSeeding - when true, adds the seed-nodes entry inside the cluster section and
///             layers ClusterSharding's default config underneath as a fallback.
/// ifCluster - when true, emits the `cluster { ... }` section and the
///             DistributedPubSub `extensions` entry.
///
/// The HOCON text is assembled from separately named fragments. Writing the optional
/// parts inline as `if c then a else "" + rest` makes `rest` part of the else branch,
/// so the persistence section, the extensions entry and the closing braces were lost
/// whenever the condition was true. Computing each optional section first and
/// concatenating afterwards keeps every fixed fragment unconditional.
let configWithPort port ifSeeding ifCluster =
    // Serializer setup and the remote transport, up to (but excluding) the port value.
    let head = """
        akka {
            actor {
              provider = "Akka.Cluster.ClusterActorRefProvider, Akka.Cluster"
              serializers {
                hyperion = "Akka.Serialization.HyperionSerializer, Akka.Serialization.Hyperion"
                #akka-pubsub = "Akka.Cluster.Tools.PublishSubscribe.Serialization.DistributedPubSubMessageSerializer, Akka.Cluster.Tools"
                akka-cluster-client : "Akka.Cluster.Tools.Client.Serialization.ClusterClientMessageSerializer, Akka.Cluster.Tools"
              }
              serialization-bindings {
                "System.Object" = hyperion
                #"Akka.Cluster.Tools.PublishSubscribe.IDistributedPubSubMessage, Akka.Cluster.Tools" = akka-pubsub
                #"Akka.Cluster.Tools.PublishSubscribe.Internal.SendToOneSubscriber, Akka.Cluster.Tools" = akka-pubsub
                "Akka.Cluster.Tools.Client.IClusterClientMessage, Akka.Cluster.Tools" : akka-cluster-client
              }
              serialization-identifiers {
                #"Akka.Cluster.Tools.PublishSubscribe.Serialization.DistributedPubSubMessageSerializer, Akka.Cluster.Tools" = 9
                "Akka.Cluster.Tools.Client.Serialization.ClusterClientMessageSerializer, Akka.Cluster.Tools" : 15
              }
            }
          remote {
            dot-netty.tcp {
              public-hostname = "localhost"
              hostname = "localhost"
              port = """
    // Closes `dot-netty.tcp` and `remote`.
    let remoteClose = """
            }
          }"""
    let clusterOpen = """
          cluster {
            auto-down-unreachable-after = 5s"""
    let seedNodes = """
            seed-nodes = [ "akka.tcp://cluster-system@localhost:5000/" ]"""
    let clusterClose = """
          }"""
    let persistence = """
          persistence {
            journal.plugin = "akka.persistence.journal.inmem"
            snapshot-store.plugin = "akka.persistence.snapshot-store.local"
          }"""
    let pubSubExtension = """
          extensions = ["Akka.Cluster.Tools.PublishSubscribe.DistributedPubSubExtensionProvider,Akka.Cluster.Tools"]
          """
    // Closes the top-level `akka` object.
    let akkaClose = """
        }
        """

    // Optional sections: the seed-nodes entry only ever appears inside the cluster section.
    let clusterSection =
        if ifCluster then
            clusterOpen + (if ifSeeding then seedNodes else "") + clusterClose
        else
            ""
    let extensionSection = if ifCluster then pubSubExtension else ""

    let hocon = head + string port + remoteClose + clusterSection + persistence + extensionSection + akkaClose
    let config = Configuration.parse hocon

    if ifSeeding then 
        config.WithFallback(Akka.Cluster.Sharding.ClusterSharding.DefaultConfig())
    else
        config
          //.WithFallback(Akka.Cluster.Tools.Singleton.ClusterSingletonManager.DefaultConfig())
          //.WithFallback(Akka.Cluster.Tools.PublishSubscribe.DistributedPubSub.DefaultConfig())

// Three actor systems, all named "cluster-system": ports 5000 and 5001 are configured
// with ifSeeding = true and ifCluster = true, port 5002 with both flags false.
let system0 = Akkling.System.create "cluster-system" <| (configWithPort 5000 true true)
let system1 = Akkling.System.create "cluster-system" <| (configWithPort 5001 true true)
let system2 = Akkling.System.create "cluster-system" <| (configWithPort 5002 false false)
// Two actors with random "ttc<guid>" names whose behavior is Behaviors.printf "%A".
// ttc1 is spawned on system1, ttc2 on system2.
let ttc1:IActorRef<obj> = spawn system1 ("ttc" + Guid.NewGuid().ToString()) (props (Behaviors.printf "%A"))
let ttc2:IActorRef<obj> = spawn system2 ("ttc" + Guid.NewGuid().ToString()) (props (Behaviors.printf "%A"))

// Cluster-client receptionists obtained for system0 and system1.
let receptionist1 = Akkling.Cluster.ClusterClient.receptionist system0
let receptionist2 = Akkling.Cluster.ClusterClient.receptionist system1

// NOTE(review): ttc1 (spawned on system1) is registered with system0's receptionist, and
// ttc2 (spawned on system2) with system1's receptionist. The Akka.FSharp script below
// spawns ttc2 on system1 instead — confirm whether this difference is intended.
receptionist1.RegisterService (untyped ttc1)
receptionist2.RegisterService (untyped ttc2)

// Initial contact points for the client: "/system/receptionist" appended to the
// self address of system0 and system1.
let initialContacts = 
    Collections.Immutable.ImmutableHashSet.Create<ActorPath>(
        [|
            ActorPath.Parse(Cluster.Get(system0).SelfAddress.ToString() + "/system/receptionist")
            ActorPath.Parse(Cluster.Get(system1).SelfAddress.ToString() + "/system/receptionist")
        |]
        )
// Upcast to the interface type that is handed to WithInitialContacts.
let contacts = initialContacts :> Collections.Immutable.IImmutableSet<ActorPath>

// Client settings are created from system2 and pointed at the contacts above.
let settingClient = 
    Akka.Cluster.Tools.Client.ClusterClientSettings.Create(system2).WithInitialContacts(contacts)

let propsClient = Akka.Cluster.Tools.Client.ClusterClient.Props(settingClient)

// The ClusterClient actor itself runs on system2, wrapped as a typed Akkling ref.
let client : IActorRef<obj> = 
    spawn system2 ("client" + Guid.NewGuid().ToString()) (Akkling.Props.Props.From propsClient)

// ttc1's path with the root portion removed and a leading "/" added.
let path = "/" +  ttc1.Path.ToString().Replace(ttc1.Path.Root.ToString(), "")

// Send envelope carrying the boxed string "printIt" to `path`.
// NOTE(review): the third argument is presumably localAffinity — confirm against the
// ClusterClient.Send documentation.
let send = 
    new Akka.Cluster.Tools.Client.ClusterClient.Send(
        path, 
        box "printIt", 
        true)
// The same message is sent twice with Nobody as sender; the reporter observes that the
// first Tell produces no output.
client.Tell(send, Akka.Actor.ActorRefs.Nobody) //nothing printed
client.Tell(send, Akka.Actor.ActorRefs.Nobody)

With Akkling the first tell doesn't work.

==================================================

#r "nuget: Akka.Serialization.Hyperion"
#r "nuget: Hyperion"
#r "nuget: Akka.Cluster"
#r "nuget: Akka.Remote"
#r "nuget: Akka.Cluster.Tools"
#r "nuget: Akka.Cluster.Sharding"
#r "nuget: Akka.FSharp"
#r "nuget: Microsoft.Extensions.Logging"
#r "nuget: Microsoft.Extensions.Logging.Abstractions"
#r "nuget: System.Collections.Immutable"
#r "nuget: System.Reflection.TypeExtensions"

open System
open Akka.Cluster
open Akka.FSharp
open Akka.Cluster
open Akka.Actor


/// Builds the Akka configuration for one node of the reproduction.
///
/// port      - TCP port for the dot-netty remote transport (rendered with `string`,
///             so no type annotation is required on the parameter).
/// ifSeeding - when true, adds the seed-nodes entry inside the cluster section and
///             layers ClusterSharding's default config underneath as a fallback.
/// ifCluster - when true, emits the `cluster { ... }` section and the
///             DistributedPubSub `extensions` entry.
///
/// The HOCON text is assembled from separately named fragments. Writing the optional
/// parts inline as `if c then a else "" + rest` makes `rest` part of the else branch,
/// so the persistence section, the extensions entry and the closing braces were lost
/// whenever the condition was true. Computing each optional section first and
/// concatenating afterwards keeps every fixed fragment unconditional.
let configWithPort port ifSeeding ifCluster =
    // Serializer setup and the remote transport, up to (but excluding) the port value.
    let head = """
        akka {
            actor {
              provider = "Akka.Cluster.ClusterActorRefProvider, Akka.Cluster"
              serializers {
                hyperion = "Akka.Serialization.HyperionSerializer, Akka.Serialization.Hyperion"
                #akka-pubsub = "Akka.Cluster.Tools.PublishSubscribe.Serialization.DistributedPubSubMessageSerializer, Akka.Cluster.Tools"
                akka-cluster-client : "Akka.Cluster.Tools.Client.Serialization.ClusterClientMessageSerializer, Akka.Cluster.Tools"
              }
              serialization-bindings {
                "System.Object" = hyperion
                #"Akka.Cluster.Tools.PublishSubscribe.IDistributedPubSubMessage, Akka.Cluster.Tools" = akka-pubsub
                #"Akka.Cluster.Tools.PublishSubscribe.Internal.SendToOneSubscriber, Akka.Cluster.Tools" = akka-pubsub
                "Akka.Cluster.Tools.Client.IClusterClientMessage, Akka.Cluster.Tools" : akka-cluster-client
              }
              serialization-identifiers {
                #"Akka.Cluster.Tools.PublishSubscribe.Serialization.DistributedPubSubMessageSerializer, Akka.Cluster.Tools" = 9
                "Akka.Cluster.Tools.Client.Serialization.ClusterClientMessageSerializer, Akka.Cluster.Tools" : 15
              }
            }
          remote {
            dot-netty.tcp {
              public-hostname = "localhost"
              hostname = "localhost"
              port = """
    // Closes `dot-netty.tcp` and `remote`.
    let remoteClose = """
            }
          }"""
    let clusterOpen = """
          cluster {
            auto-down-unreachable-after = 5s"""
    let seedNodes = """
            seed-nodes = [ "akka.tcp://cluster-system@localhost:5000/" ]"""
    let clusterClose = """
          }"""
    let persistence = """
          persistence {
            journal.plugin = "akka.persistence.journal.inmem"
            snapshot-store.plugin = "akka.persistence.snapshot-store.local"
          }"""
    let pubSubExtension = """
          extensions = ["Akka.Cluster.Tools.PublishSubscribe.DistributedPubSubExtensionProvider,Akka.Cluster.Tools"]
          """
    // Closes the top-level `akka` object.
    let akkaClose = """
        }
        """

    // Optional sections: the seed-nodes entry only ever appears inside the cluster section.
    let clusterSection =
        if ifCluster then
            clusterOpen + (if ifSeeding then seedNodes else "") + clusterClose
        else
            ""
    let extensionSection = if ifCluster then pubSubExtension else ""

    let hocon = head + string port + remoteClose + clusterSection + persistence + extensionSection + akkaClose
    let config = Configuration.parse hocon

    if ifSeeding then 
        config.WithFallback(Akka.Cluster.Sharding.ClusterSharding.DefaultConfig())
    else
        config
          //.WithFallback(Akka.Cluster.Tools.Singleton.ClusterSingletonManager.DefaultConfig())
          //.WithFallback(Akka.Cluster.Tools.PublishSubscribe.DistributedPubSub.DefaultConfig())

// Three actor systems, all named "cluster-system": ports 5000 and 5001 are configured
// with ifSeeding = true and ifCluster = true, port 5002 with both flags false.
let system0 = System.create "cluster-system" <| (configWithPort 5000 true true)
let system1 = System.create "cluster-system" <| (configWithPort 5001 true true)
let system2 = System.create "cluster-system" <| (configWithPort 5002 false false)
// Two actors with random "ttc-<guid>" names that print each message with printfn "%A".
// Both are spawned on system1.
let ttc1 = spawn system1 ("ttc-" + Guid.NewGuid().ToString()) (actorOf (fun m -> printfn "%A" m))
let ttc2 = spawn system1 ("ttc-" + Guid.NewGuid().ToString()) (actorOf (fun m -> printfn "%A" m))

// ttc1 is registered with system0's receptionist, ttc2 with system1's.
// NOTE(review): ttc1 lives on system1 but is registered on system0 — confirm this is intended.
Akka.Cluster.Tools.Client.ClusterClientReceptionist.Get(system0).RegisterService(ttc1)
Akka.Cluster.Tools.Client.ClusterClientReceptionist.Get(system1).RegisterService(ttc2)

// Initial contact points for the client: "/system/receptionist" appended to the
// self address of system0 and system1.
let initialContacts = 
    Collections.Immutable.ImmutableHashSet.Create<ActorPath>(
        [|
            ActorPath.Parse(Cluster.Get(system0).SelfAddress.ToString() + "/system/receptionist")
            ActorPath.Parse(Cluster.Get(system1).SelfAddress.ToString() + "/system/receptionist")
        |]
        )
// Upcast to the interface type that is handed to WithInitialContacts.
let contacts = initialContacts :> Collections.Immutable.IImmutableSet<ActorPath>

// Client settings are created from system2 and pointed at the contacts above.
let settingClient = 
    Akka.Cluster.Tools.Client.ClusterClientSettings.Create(system2).WithInitialContacts(contacts)

let propsClient = Akka.Cluster.Tools.Client.ClusterClient.Props(settingClient)

// The ClusterClient actor is created directly through system2.ActorOf as an untyped IActorRef.
let client : IActorRef = 
    system2.ActorOf(propsClient, ("client" + Guid.NewGuid().ToString()))

// ttc1's path with the root portion removed and a leading "/" added.
let path = "/" +  ttc1.Path.ToString().Replace(ttc1.Path.Root.ToString(), "")

// Send envelope carrying the boxed string "printIt" to `path`.
// NOTE(review): the third argument is presumably localAffinity — confirm against the
// ClusterClient.Send documentation.
let send = 
    new Akka.Cluster.Tools.Client.ClusterClient.Send(
        path, 
        box "printIt", 
        true)
// A single Tell with Nobody as sender; the reporter observes that this one is delivered.
client.Tell(send, Akka.Actor.ActorRefs.Nobody)

With Akka.FSharp, the first tell immediately works.

Not sure if there is anything I missed... but this is a little weird...

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions