1616import java .net .SocketAddress ;
1717import java .util .Optional ;
1818import java .util .concurrent .CompletableFuture ;
19+ import java .util .concurrent .ConcurrentHashMap ;
1920import java .util .concurrent .atomic .AtomicBoolean ;
2021import lombok .extern .slf4j .Slf4j ;
2122import org .apache .pulsar .broker .PulsarService ;
@@ -41,14 +42,14 @@ public class KafkaTopicManager {
4142
4243 private final AtomicBoolean closed = new AtomicBoolean (false );
4344
44- KafkaTopicManager (KafkaRequestHandler kafkaRequestHandler ) {
45+ KafkaTopicManager (KafkaRequestHandler kafkaRequestHandler , KafkaTopicLookupService kafkaTopicLookupService ) {
4546 this .requestHandler = kafkaRequestHandler ;
4647 PulsarService pulsarService = kafkaRequestHandler .getPulsarService ();
4748 this .brokerService = pulsarService .getBrokerService ();
4849 this .internalServerCnx = new InternalServerCnx (requestHandler );
4950 this .lookupClient = kafkaRequestHandler .getLookupClient ();
50- this .kafkaTopicLookupService = new KafkaTopicLookupService ( pulsarService . getBrokerService ()) ;
51- }
51+ this .kafkaTopicLookupService = kafkaTopicLookupService ;
52+ }
5253
5354 // update Ctx information, since at internalServerCnx create time there is no ctx passed into kafkaRequestHandler.
5455 public void setRemoteAddress (SocketAddress remoteAddress ) {
@@ -72,41 +73,41 @@ public CompletableFuture<KafkaTopicConsumerManager> getTopicConsumerManager(Stri
7273 return CompletableFuture .completedFuture (null );
7374 }
7475 return requestHandler .getKafkaTopicManagerSharedState ().getKafkaTopicConsumerManagerCache ().computeIfAbsent (
75- topicName ,
76- remoteAddress ,
77- () -> {
78- final CompletableFuture <KafkaTopicConsumerManager > tcmFuture = new CompletableFuture <>();
79- getTopic (topicName ).whenComplete ((persistentTopic , throwable ) -> {
80- if (throwable == null && persistentTopic .isPresent ()) {
81- if (log .isDebugEnabled ()) {
82- log .debug ("[{}] Call getTopicConsumerManager for {}, and create TCM for {}." ,
83- requestHandler .ctx .channel (), topicName , persistentTopic );
84- }
85- tcmFuture .complete (new KafkaTopicConsumerManager (requestHandler , persistentTopic .get ()));
86- } else {
87- if (throwable != null ) {
88- log .error ("[{}] Failed to getTopicConsumerManager caused by getTopic '{}' throws {}" ,
89- requestHandler .ctx .channel (), topicName , throwable .getMessage ());
90- } else { // persistentTopic == null
91- log .error ("[{}] Failed to getTopicConsumerManager caused by getTopic '{}' returns empty" ,
92- requestHandler .ctx .channel (), topicName );
76+ topicName ,
77+ remoteAddress ,
78+ () -> {
79+ final CompletableFuture <KafkaTopicConsumerManager > tcmFuture = new CompletableFuture <>();
80+ getTopic (topicName ).whenComplete ((persistentTopic , throwable ) -> {
81+ if (throwable == null && persistentTopic .isPresent ()) {
82+ if (log .isDebugEnabled ()) {
83+ log .debug ("[{}] Call getTopicConsumerManager for {}, and create TCM for {}." ,
84+ requestHandler .ctx .channel (), topicName , persistentTopic );
85+ }
86+ tcmFuture .complete (new KafkaTopicConsumerManager (requestHandler , persistentTopic .get ()));
87+ } else {
88+ if (throwable != null ) {
89+ log .error ("[{}] Failed to getTopicConsumerManager caused by getTopic '{}' throws {}" ,
90+ requestHandler .ctx .channel (), topicName , throwable .getMessage ());
91+ } else { // persistentTopic == null
92+ log .error ("[{}] Failed to getTopicConsumerManager caused by getTopic '{}' returns empty" ,
93+ requestHandler .ctx .channel (), topicName );
94+ }
95+ tcmFuture .complete (null );
9396 }
94- tcmFuture .complete (null );
95- }
96- });
97- return tcmFuture ;
98- }
97+ });
98+ return tcmFuture ;
99+ }
99100 );
100101 }
101102
102103 private Producer registerInPersistentTopic (PersistentTopic persistentTopic ) {
103104 Producer producer = new InternalProducer (persistentTopic , internalServerCnx ,
104- lookupClient .getPulsarClient ().newRequestId (),
105- brokerService .generateUniqueProducerName ());
105+ lookupClient .getPulsarClient ().newRequestId (),
106+ brokerService .generateUniqueProducerName ());
106107
107108 if (log .isDebugEnabled ()) {
108109 log .debug ("[{}] Register Mock Producer {} into PersistentTopic {}" ,
109- requestHandler .ctx .channel (), producer , persistentTopic .getName ());
110+ requestHandler .ctx .channel (), producer , persistentTopic .getName ());
110111 }
111112
112113 // this will register and add USAGE_COUNT_UPDATER.
@@ -122,8 +123,9 @@ public Optional<Producer> registerProducerInPersistentTopic(String topicName, Pe
122123 }
123124 return Optional .empty ();
124125 }
125- return Optional .of (requestHandler .getKafkaTopicManagerSharedState ()
126- .getReferences ().computeIfAbsent (topicName , (__ ) -> registerInPersistentTopic (persistentTopic )));
126+ ConcurrentHashMap <String , Producer > references = requestHandler
127+ .getKafkaTopicManagerSharedState ().getReferences ();
128+ return Optional .of (references .computeIfAbsent (topicName , (__ ) -> registerInPersistentTopic (persistentTopic )));
127129 }
128130
129131 // when channel close, release all the topics reference in persistentTopic
@@ -141,18 +143,23 @@ public void close() {
141143 }
142144
143145 public CompletableFuture <Optional <PersistentTopic >> getTopic (String topicName ) {
144- if (closed .get ()) {
145- if (log .isDebugEnabled ()) {
146- log .debug ("[{}] Return null for getTopic({}) since channel is closing" ,
147- requestHandler .ctx .channel (), topicName );
146+ try {
147+ if (closed .get ()) {
148+ if (log .isDebugEnabled ()) {
149+ log .debug ("[{}] Return null for getTopic({}) since channel is closing" ,
150+ requestHandler .ctx .channel (), topicName );
151+ }
152+ return CompletableFuture .completedFuture (Optional .empty ());
148153 }
154+ CompletableFuture <Optional <PersistentTopic >> topicCompletableFuture =
155+ kafkaTopicLookupService .getTopic (topicName , requestHandler .ctx .channel ());
156+ // cache for removing producer
157+ requestHandler .getKafkaTopicManagerSharedState ().getTopics ().put (topicName , topicCompletableFuture );
158+ return topicCompletableFuture ;
159+ } catch (Throwable error ) {
160+ log .error ("Unhandled error here for {}" , topicName , error );
149161 return CompletableFuture .completedFuture (Optional .empty ());
150162 }
151- CompletableFuture <Optional <PersistentTopic >> topicCompletableFuture =
152- kafkaTopicLookupService .getTopic (topicName , requestHandler .ctx .channel ());
153- // cache for removing producer
154- requestHandler .getKafkaTopicManagerSharedState ().getTopics ().put (topicName , topicCompletableFuture );
155- return topicCompletableFuture ;
156163 }
157164
158165 public void invalidateCacheForFencedManagerLedgerOnTopic (String fullTopicName ) {
0 commit comments