2017-04-25 119 views
0

我正在使用AxonFramework实现JGroups,我指的是this链接。我在代码中做了一些更改,并在没有Docker的情况下运行该项目。以下是我的代码 -将CommandGateway的所有事件路由到单个事件处理程序

主类 -

public class ClusterRunner { 

    public static void main(String[] args) { 

     Thread t1 = new Thread(new PrimaryNode()); 
     Thread t2 = new Thread(new SecondaryNode()); 

     t1.start(); 
     t2.start(); 
    } 
} 

主节点 -

import org.axonframework.commandhandling.AggregateAnnotationCommandHandler; 
import org.axonframework.commandhandling.CommandBus; 
import org.axonframework.commandhandling.SimpleCommandBus; 
import org.axonframework.commandhandling.distributed.AnnotationRoutingStrategy; 
import org.axonframework.commandhandling.distributed.DistributedCommandBus; 
import org.axonframework.commandhandling.distributed.commandfilter.AcceptAll; 
import org.axonframework.commandhandling.gateway.CommandGateway; 
import org.axonframework.commandhandling.gateway.DefaultCommandGateway; 
import org.axonframework.commandhandling.model.Repository; 
import org.axonframework.eventsourcing.EventSourcingRepository; 
import org.axonframework.eventsourcing.eventstore.EmbeddedEventStore; 
import org.axonframework.eventsourcing.eventstore.EventStore; 
import org.axonframework.eventsourcing.eventstore.inmemory.InMemoryEventStorageEngine; 
import org.axonframework.jgroups.commandhandling.JGroupsConnector; 
import org.axonframework.serialization.xml.XStreamSerializer; 
import org.jgroups.JChannel; 

public class PrimaryNode implements Runnable { 

    private JGroupsConnector connector; 

    private CommandGateway commandGateway; 

    private EventStore eventStore; 

    private CommandBus commandBus; 

    public PrimaryNode() { 

     eventStore = new EmbeddedEventStore(new InMemoryEventStorageEngine()); 

     try { 

      commandBus = configureDistributedCommandBus(); 

     } catch (Exception e) { 

      e.printStackTrace(); 
     } 

     Repository<Item> repository = new EventSourcingRepository<>(Item.class, eventStore); 

     new AggregateAnnotationCommandHandler<>(Item.class, repository).subscribe(commandBus); 

     commandGateway = new DefaultCommandGateway(commandBus); 
    } 

    public void run() { 

     for (int a = 0; a < 5; a++) { 

      System.out.println("Primary Node Created item " + a + " id: " + System.currentTimeMillis()); 
      commandGateway.sendAndWait(new CreateItem(Long.toString(a), Long.toString(System.currentTimeMillis()))); 
     } 
    } 

    private CommandBus configureDistributedCommandBus() throws Exception { 

     CommandBus commandBus = new SimpleCommandBus(); 

     JChannel channel = new JChannel(getClass().getClassLoader().getResourceAsStream("tcp.xml")); 

     connector = new JGroupsConnector(commandBus, channel, "axon-jgroups-demo", new XStreamSerializer(), 
      new AnnotationRoutingStrategy()); 
     connector.updateMembership(100, AcceptAll.INSTANCE); 

     connector.connect(); 
     connector.awaitJoined(); 

     return new DistributedCommandBus(connector, connector); 
    } 
} 

Seconday节点 -

import org.axonframework.commandhandling.AggregateAnnotationCommandHandler; 
import org.axonframework.commandhandling.CommandBus; 
import org.axonframework.commandhandling.SimpleCommandBus; 
import org.axonframework.commandhandling.distributed.AnnotationRoutingStrategy; 
import org.axonframework.commandhandling.distributed.DistributedCommandBus; 
import org.axonframework.commandhandling.distributed.commandfilter.AcceptAll; 
import org.axonframework.commandhandling.gateway.CommandGateway; 
import org.axonframework.commandhandling.gateway.DefaultCommandGateway; 
import org.axonframework.commandhandling.model.Repository; 
import org.axonframework.eventhandling.EventListener; 
import org.axonframework.eventhandling.SimpleEventHandlerInvoker; 
import org.axonframework.eventhandling.SubscribingEventProcessor; 
import org.axonframework.eventsourcing.EventSourcingRepository; 
import org.axonframework.eventsourcing.eventstore.EmbeddedEventStore; 
import org.axonframework.eventsourcing.eventstore.EventStore; 
import org.axonframework.eventsourcing.eventstore.inmemory.InMemoryEventStorageEngine; 
import org.axonframework.jgroups.commandhandling.JGroupsConnector; 
import org.axonframework.serialization.xml.XStreamSerializer; 
import org.jgroups.JChannel; 

public class SecondaryNode implements Runnable { 

    private JGroupsConnector connector; 

    private EventStore eventStore; 

    public SecondaryNode() { 

     eventStore = new EmbeddedEventStore(new InMemoryEventStorageEngine()); 

     CommandBus commandBus = null; 

     try { 
      commandBus = configureDistributedCommandBus(); 
     } catch (Exception e) { 
      e.printStackTrace(); 
     } 

     Repository<Item> repository = new EventSourcingRepository<>(Item.class, eventStore); 

     new AggregateAnnotationCommandHandler<>(Item.class, repository).subscribe(commandBus); 

     @SuppressWarnings("unused") 
     CommandGateway commandGateway = new DefaultCommandGateway(commandBus); 
    } 

    public void run() { 

     new SubscribingEventProcessor("processor", new SimpleEventHandlerInvoker((EventListener) event -> { 

      System.out.println("Secondary Node -- " + event.getPayload()); 
     }), eventStore).start(); 
    } 

    private CommandBus configureDistributedCommandBus() throws Exception { 

     CommandBus commandBus = new SimpleCommandBus(); 

     JChannel channel = new JChannel(getClass().getClassLoader().getResourceAsStream("tcp_test.xml")); 

     connector = new JGroupsConnector(commandBus, channel, "axon-jgroups-demo", new XStreamSerializer(), 
       new AnnotationRoutingStrategy()); 
     connector.updateMembership(100, AcceptAll.INSTANCE); 

     connector.connect(); 
     connector.awaitJoined(); 

     return new DistributedCommandBus(connector, connector); 
    } 
} 

项目 -

import org.axonframework.commandhandling.CommandHandler; 
import org.axonframework.commandhandling.TargetAggregateIdentifier; 
import org.axonframework.commandhandling.model.AggregateIdentifier; 
import org.axonframework.eventhandling.EventHandler; 

import static org.axonframework.commandhandling.model.AggregateLifecycle.apply; 

class CreateItem { 

    @TargetAggregateIdentifier 
    private final String itemId; 
    private final String name; 

    public CreateItem(String itemId, String naam) { 
     this.itemId = itemId; 
     this.name = naam; 
    } 

    public String getItemId() { 
     return itemId; 
    } 

    public String getName() { 
     return name; 
    } 
} 

class ItemCreated { 
    private final String itemId; 
    private final String name; 

    public ItemCreated(String itemId, String naam) { 
     this.itemId = itemId; 
     this.name = naam; 
    } 

    public String getItemId() { 
     return itemId; 
    } 

    public String getName() { 
     return name; 
    } 

    @Override 
    public String toString() { 

     return itemId + " " + name; 
    } 
} 

class Item { 
    @AggregateIdentifier 
    private String itemId; 
    private String name; 

    public Item() { 

    } 

    @CommandHandler 
    public Item(CreateItem createItem) { 
     apply(new ItemCreated(createItem.getItemId(), createItem.getName())); 
    } 

    @EventHandler 
    public void itemCreated(ItemCreated itemCreated) { 
     itemId = itemCreated.getItemId(); 
     name = itemCreated.getName(); 
    } 
} 

现在我的问题是,当我运行的主类,主节点产生5个事件,但辅助节点没有得到所有的事件。它可能会得到2或3或4个事件,但不是全部。我希望将所有事件传递到辅助节点。我对AxonFramework和JGroups很新。请帮我理解这里有什么问题。

回答

0

因此,在尝试了一切后,我决定尝试路由策略。我决定使用AbstractRoutingStrategy,它基本上可以帮助决策制定没有决定性目标的命令消息。以下是JGroup的主节点(发件人)中的工作代码。从PrimaryNode类作为修改configureDistributedCommandBus()方法 -

private CommandBus configureDistributedCommandBus() throws Exception { 

    CommandBus commandBus = new SimpleCommandBus(); 

    channel = new JChannel(getClass().getClassLoader().getResourceAsStream("tcp.xml")); 

    RoutingStrategy rs = new AbstractRoutingStrategy(UnresolvedRoutingKeyPolicy.STATIC_KEY) { 

     @Override 
     protected String doResolveRoutingKey(CommandMessage<?> cmdMsg) { 

      View view = channel.getView(); 

      if (view.getMembers().size() == 2) { 

       return "secondary"; 

      } else if (view.getMembers().size() == 1) { 

      } 

      return cmdMsg.getIdentifier(); 
     } 
    }; 

    connector = new JGroupsConnector(commandBus, channel, "axon-jgroups-demo", new XStreamSerializer(), rs); 
    connector.updateMembership(100, AcceptAll.INSTANCE); 

    connector.connect(); 
    connector.awaitJoined(); 

    return new DistributedCommandBus(connector, connector); 
} 

由于我使用的JGroups的,我可以得到集群的角度来看,许多节点即如何在那里。在此基础上,我将决定命令消息路由。

1

默认情况下,Axon会将每个事件处理程序订阅到事件总线(在您的情况下是EmbeddedEventStore)。这意味着当特定的本地实例发布一个事件时会调用一个处理程序。并且该事件在处理命令时发布。所以基本上,在处理该命令的节点上调用事件处理程序。

或者,您可以配置您的事件处理程序以“跟踪”模式运行。在这种情况下,他们将打开与Event Store的连接。在这种情况下,根据确切的配置,每个节点都可以获取自己的事件副本,而不管它在哪里发布。