Skip to content

Commit e74caed

Browse files
committed
- TP has its own MessageFactory (https://issues.redhat.com/browse/JGRP-2944)
1 parent 537df4a commit e74caed

22 files changed

+200
-99
lines changed

conf/tcp-new.xml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,5 @@
2323
<UNICAST4 />
2424
<pbcast.GMS print_local_addr="true" join_timeout="1s"/>
2525
<FRAG2 frag_size="60K" />
26-
<!--RSVP resend_interval="2s" timeout="10s"/-->
2726
<pbcast.STATE_TRANSFER/>
2827
</config>

src/org/jgroups/BatchMessage.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ public void readPayload(DataInput in) throws IOException, ClassNotFoundException
152152
msgs=new Message[index]; // a bit of additional space should we add byte arrays
153153
for(int i=0; i < index; i++) {
154154
short type=in.readShort();
155-
msgs[i]=MessageFactory.create(type).setDest(dest()).setSrc(orig_src);
155+
msgs[i]=MessageFactory.get().create(type).setDest(dest()).setSrc(orig_src);
156156
msgs[i].readFrom(in);
157157
}
158158
}

src/org/jgroups/CompositeMessage.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ public void readPayload(DataInput in) throws IOException, ClassNotFoundException
152152
msgs=new Message[index]; // a bit of additional space should we add byte arrays
153153
for(int i=0; i < index; i++) {
154154
short type=in.readShort();
155-
Message msg=MessageFactory.create(type).setDest(getDest());
155+
Message msg=MessageFactory.get().create(type).setDest(getDest());
156156
if(msg.getSrc() == null)
157157
msg.setSrc(getSrc());
158158
msg.readFrom(in);

src/org/jgroups/MessageFactory.java

Lines changed: 44 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -12,18 +12,33 @@
1212
* @since 5.0
1313
*/
1414
public class MessageFactory {
15-
protected static final byte MIN_TYPE=32;
16-
protected static final Supplier<? extends Message>[] creators=new Supplier[MIN_TYPE];
17-
protected static Map<Short,Supplier<? extends Message>> map=new HashMap<>();
18-
static {
19-
creators[Message.BYTES_MSG]=BytesMessage::new;
20-
creators[Message.NIO_MSG]=NioMessage::new;
21-
creators[Message.EMPTY_MSG]=EmptyMessage::new;
22-
creators[Message.OBJ_MSG]=ObjectMessage::new;
23-
creators[Message.LONG_MSG]=LongMessage::new;
24-
creators[Message.COMPOSITE_MSG]=CompositeMessage::new;
25-
creators[Message.FRAG_MSG]=FragmentedMessage::new;
26-
creators[Message.EARLYBATCH_MSG]=BatchMessage::new;
15+
protected static final byte MIN_TYPE=32;
16+
protected final Supplier<? extends Message>[] creators=new Supplier[MIN_TYPE];
17+
protected final Map<Short,Supplier<? extends Message>> map=new HashMap<>();
18+
19+
private static volatile MessageFactory singleton=null;
20+
21+
public static byte minType() {return MIN_TYPE;}
22+
23+
public synchronized static MessageFactory get() {
24+
MessageFactory mf=singleton;
25+
if(mf != null)
26+
return mf;
27+
if(singleton == null)
28+
singleton=new MessageFactory().registerDefaultTypes();
29+
return singleton;
30+
}
31+
32+
public MessageFactory registerDefaultTypes() {
33+
registerDefaultMessage(Message.BYTES_MSG, BytesMessage::new);
34+
registerDefaultMessage(Message.NIO_MSG, NioMessage::new);
35+
registerDefaultMessage(Message.EMPTY_MSG, EmptyMessage::new);
36+
registerDefaultMessage(Message.OBJ_MSG, ObjectMessage::new);
37+
registerDefaultMessage(Message.LONG_MSG, LongMessage::new);
38+
registerDefaultMessage(Message.COMPOSITE_MSG, CompositeMessage::new);
39+
registerDefaultMessage(Message.FRAG_MSG, FragmentedMessage::new);
40+
registerDefaultMessage(Message.EARLYBATCH_MSG, BatchMessage::new);
41+
return this;
2742
}
2843

2944
/**
@@ -32,25 +47,40 @@ public class MessageFactory {
3247
* @param <T> The type of the message
3348
* @return A message
3449
*/
35-
public static <T extends Message> T create(short type) {
50+
public <T extends Message> T create(short type) {
3651
Supplier<? extends Message> creator=type < MIN_TYPE? creators[type] : map.get(type);
3752
if(creator == null)
3853
throw new IllegalArgumentException("no creator found for type " + type);
3954
return (T)creator.get();
4055
}
4156

57+
public <T extends Message> T createIfExists(short type) {
58+
Supplier<? extends Message> creator=type < MIN_TYPE? creators[type] : map.get(type);
59+
return creator == null? null : (T)creator.get();
60+
}
61+
62+
public void registerDefaultMessage(short type, Supplier<? extends Message> generator) {
63+
Objects.requireNonNull(generator, "the creator must be non-null");
64+
if(type > MIN_TYPE)
65+
throw new IllegalArgumentException(String.format("type (%d) must be <= 32", type));
66+
if(creators[type] != null)
67+
throw new IllegalArgumentException(String.format("type %d is already taken", type));
68+
creators[type]=generator;
69+
}
70+
4271
/**
4372
* Registers a new creator of messages
4473
* @param type The type associated with the new payload. Needs to be the same in all nodes of the same cluster, and
4574
* needs to be available (ie., not taken by JGroups or other applications).
4675
* @param generator The creator of the payload associated with the given type
4776
*/
48-
public static void register(short type, Supplier<? extends Message> generator) {
77+
public void register(short type, Supplier<? extends Message> generator) {
4978
Objects.requireNonNull(generator, "the creator must be non-null");
5079
if(type < MIN_TYPE)
5180
throw new IllegalArgumentException(String.format("type (%d) must be >= 32", type));
5281
if(map.containsKey(type))
5382
throw new IllegalArgumentException(String.format("type %d is already taken", type));
5483
map.put(type, generator);
5584
}
85+
5686
}

src/org/jgroups/protocols/COMPRESS.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,19 +36,19 @@ public class COMPRESS extends Protocol {
3636

3737
@Property(description="Compression level (from java.util.zip.Deflater) " +
3838
"(0=no compression, 1=best speed, 9=best compression). Default is 9")
39-
protected int compression_level=Deflater.BEST_COMPRESSION; // this is 9
39+
protected int compression_level=Deflater.BEST_COMPRESSION; // this is 9
4040

4141
@Property(description="Minimal payload size of a message (in bytes) for compression to kick in. Default is 500 bytes",
4242
type=AttributeType.BYTES)
43-
protected int min_size=500;
43+
protected int min_size=500;
4444

4545
@Property(description="Number of inflaters/deflaters for concurrent processing. Default is 2 ")
46-
protected int pool_size=2;
46+
protected int pool_size=2;
4747

4848
protected BlockingQueue<Deflater> deflater_pool;
4949
protected BlockingQueue<Inflater> inflater_pool;
5050
protected final LongAdder num_compressions=new LongAdder(), num_decompressions=new LongAdder();
51-
51+
protected TP transport;
5252

5353

5454

@@ -76,6 +76,7 @@ public void init() throws Exception {
7676
inflater_pool=new ArrayBlockingQueue<>(pool_size);
7777
for(int i=0; i < pool_size; i++)
7878
inflater_pool.add(new Inflater());
79+
transport=getTransport();
7980
}
8081

8182
public void destroy() {
@@ -219,9 +220,10 @@ protected static ByteArray messageToByteArray(Message msg) {
219220
}
220221
}
221222

222-
protected static Message messageFromByteArray(byte[] uncompressed_payload) {
223+
protected Message messageFromByteArray(byte[] uncompressed_payload) {
223224
try {
224-
return Util.messageFromBuffer(uncompressed_payload, 0, uncompressed_payload.length);
225+
return Util.messageFromBuffer(uncompressed_payload, 0, uncompressed_payload.length,
226+
transport.getMessageFactory());
225227
}
226228
catch(Exception ex) {
227229
throw new RuntimeException("failed unmarshalling message", ex);

src/org/jgroups/protocols/Encrypt.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ public abstract class Encrypt<E extends KeyStore.Entry> extends Protocol {
5555
protected int key_map_max_size=20;
5656

5757
protected volatile View view;
58+
protected TP transport;
5859

5960
// Cipher pools used for encryption and decryption. Size is cipher_pool_size
6061
protected volatile BlockingQueue<Cipher> encoding_ciphers, decoding_ciphers;
@@ -113,6 +114,7 @@ public void init() throws Exception {
113114
}
114115
key_map=new BoundedHashMap<>(key_map_max_size);
115116
initSymCiphers(sym_algorithm, secret_key);
117+
transport=getTransport();
116118
}
117119

118120

@@ -317,7 +319,7 @@ protected Message _decrypt(final Cipher cipher, Key key, Message msg, EncryptHea
317319
decrypted_msg=cipher.doFinal(msg.getArray(), msg.getOffset(), msg.getLength());
318320
}
319321
if(hdr.needsDeserialization())
320-
return Util.messageFromBuffer(decrypted_msg, 0, decrypted_msg.length);
322+
return Util.messageFromBuffer(decrypted_msg, 0, decrypted_msg.length, transport.getMessageFactory());
321323
else
322324
return msg.setArray(decrypted_msg, 0, decrypted_msg.length);
323325
}

src/org/jgroups/protocols/FRAG.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ private Message unfragment(Message msg, FragHeader hdr) {
226226
return null;
227227

228228
try {
229-
Message assembled_msg=Util.messageFromBuffer(buf, 0, buf.length);
229+
Message assembled_msg=Util.messageFromBuffer(buf, 0, buf.length, transport.getMessageFactory());
230230
assembled_msg.setSrc(sender); // needed ? YES, because fragments have a null src !!
231231
if(log.isTraceEnabled()) log.trace("assembled_msg is " + assembled_msg);
232232
num_received_msgs++;

src/org/jgroups/protocols/FRAG2.java

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@ public class FRAG2 extends Fragmentation {
4444

4545
/** Used to assign fragmentation-specific sequence IDs (monotonically increasing) */
4646
protected final AtomicLong curr_id=new AtomicLong(1);
47-
4847
protected final List<Address> members=new ArrayList<>(11);
4948

5049
protected final AverageMinMax avg_size_down=new AverageMinMax();
@@ -66,14 +65,10 @@ public void init() throws Exception {
6665
int old_frag_size=frag_size;
6766
if(frag_size <=0)
6867
throw new Exception("frag_size=" + old_frag_size + ", new frag_size=" + frag_size + ": new frag_size is invalid");
69-
70-
TP transport=getTransport();
71-
if(transport != null) {
72-
int max_bundle_size=transport.getBundler().getMaxSize();
73-
if(frag_size >= max_bundle_size)
74-
throw new IllegalArgumentException("frag_size (" + frag_size + ") has to be < TP.max_bundle_size (" +
75-
max_bundle_size + ")");
76-
}
68+
int max_bundle_size=transport.getBundler().getMaxSize();
69+
if(frag_size >= max_bundle_size)
70+
throw new IllegalArgumentException("frag_size (" + frag_size + ") has to be < TP.max_bundle_size (" +
71+
max_bundle_size + ")");
7772
Map<String,Object> info=new HashMap<>(1);
7873
info.put("frag_size", frag_size);
7974
down_prot.down(new Event(Event.CONFIG, info));
@@ -312,7 +307,7 @@ protected Message assembleMessage(Message[] fragments, boolean needs_deserializa
312307
index+=length;
313308
}
314309
if(needs_deserialization)
315-
retval=Util.messageFromBuffer(combined_buffer, 0, combined_buffer.length);
310+
retval=Util.messageFromBuffer(combined_buffer, 0, combined_buffer.length, transport.getMessageFactory());
316311
else
317312
retval.setArray(combined_buffer, 0, combined_buffer.length);
318313
return retval;

src/org/jgroups/protocols/FRAG3.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -348,7 +348,7 @@ protected boolean isComplete() {
348348
* @return the complete message in one buffer
349349
*/
350350
protected Message assembleMessage() throws Exception {
351-
return needs_deserialization? Util.messageFromBuffer(buffer, 0, buffer.length)
351+
return needs_deserialization? Util.messageFromBuffer(buffer, 0, buffer.length, transport.getMessageFactory())
352352
: msg.setArray(buffer, 0, buffer.length);
353353
}
354354

src/org/jgroups/protocols/FRAG4.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@
3939
*/
4040
public class FRAG4 extends FRAG2 {
4141

42+
@Override
43+
public void init() throws Exception {
44+
super.init();
45+
}
4246

4347
protected void fragment(Message msg) {
4448
try {
@@ -82,7 +86,8 @@ protected Message assembleMessage(Message[] fragments, boolean needs_deserializa
8286
m.getOffset(),
8387
m.getLength())));
8488
DataInput in=new DataInputStream(seq);
85-
Message retval=MessageFactory.create(hdr.getOriginalType());
89+
MessageFactory msg_factory=transport.getMessageFactory();
90+
Message retval=Util.createMessage(hdr.getOriginalType(), msg_factory);
8691
retval.readFrom(in);
8792
return retval;
8893
}

0 commit comments

Comments
 (0)