OpenMAXBellagio  0.9.3
omx_base_sink.c
Go to the documentation of this file.
1 
28 #include <omxcore.h>
29 #include <omx_base_sink.h>
30 
33  omx_base_sink_PrivateType* omx_base_sink_Private;
34 
35  if (openmaxStandComp->pComponentPrivate) {
36  omx_base_sink_Private = (omx_base_sink_PrivateType*)openmaxStandComp->pComponentPrivate;
37  } else {
38  omx_base_sink_Private = calloc(1,sizeof(omx_base_sink_PrivateType));
39  if (!omx_base_sink_Private) {
41  }
42  }
43 
44  // we could create our own port structures here
45  // fixme maybe the base class could use a "port factory" function pointer?
46  err = omx_base_component_Constructor(openmaxStandComp,cComponentName);
47 
48  /* here we can override whatever defaults the base_component constructor set
49  * e.g. we can override the function pointers in the private struct */
50  omx_base_sink_Private = openmaxStandComp->pComponentPrivate;
51 
53 
54  return err;
55 }
56 
58 {
59  return omx_base_component_Destructor(openmaxStandComp);
60 }
61 
67 void* omx_base_sink_BufferMgmtFunction (void* param) {
68  OMX_COMPONENTTYPE* openmaxStandComp = (OMX_COMPONENTTYPE*)param;
69  omx_base_component_PrivateType* omx_base_component_Private = (omx_base_component_PrivateType*)openmaxStandComp->pComponentPrivate;
70  omx_base_sink_PrivateType* omx_base_sink_Private = (omx_base_sink_PrivateType*)omx_base_component_Private;
71  omx_base_PortType *pInPort = (omx_base_PortType *)omx_base_sink_Private->ports[OMX_BASE_SINK_INPUTPORT_INDEX];
72  tsem_t* pInputSem = pInPort->pBufferSem;
73  queue_t* pInputQueue = pInPort->pBufferQueue;
74  OMX_BUFFERHEADERTYPE* pInputBuffer = NULL;
75  OMX_COMPONENTTYPE* target_component;
76  OMX_BOOL isInputBufferNeeded = OMX_TRUE;
77  int inBufExchanged = 0;
78 
79  omx_base_sink_Private->bellagioThreads->nThreadBufferMngtID = (long int)syscall(__NR_gettid);
80  DEBUG(DEB_LEV_SIMPLE_SEQ, "In %s the thread ID is %i\n", __func__, (int)omx_base_sink_Private->bellagioThreads->nThreadBufferMngtID);
81 
82  DEBUG(DEB_LEV_FUNCTION_NAME, "In %s \n", __func__);
83  while(omx_base_component_Private->state == OMX_StateIdle || omx_base_component_Private->state == OMX_StateExecuting || omx_base_component_Private->state == OMX_StatePause ||
84  omx_base_component_Private->transientState == OMX_TransStateLoadedToIdle){
85 
86  /*Wait till the ports are being flushed*/
87  pthread_mutex_lock(&omx_base_sink_Private->flush_mutex);
88  while( PORT_IS_BEING_FLUSHED(pInPort)) {
89  pthread_mutex_unlock(&omx_base_sink_Private->flush_mutex);
90 
91  if(isInputBufferNeeded==OMX_FALSE) {
92  pInPort->ReturnBufferFunction(pInPort,pInputBuffer);
93  inBufExchanged--;
94  pInputBuffer=NULL;
95  isInputBufferNeeded=OMX_TRUE;
96  DEBUG(DEB_LEV_FULL_SEQ, "Ports are flushing,so returning input buffer\n");
97  }
98  DEBUG(DEB_LEV_FULL_SEQ, "In %s signalling flush all condition \n", __func__);
99 
100  tsem_up(omx_base_sink_Private->flush_all_condition);
101  tsem_down(omx_base_sink_Private->flush_condition);
102  pthread_mutex_lock(&omx_base_sink_Private->flush_mutex);
103  }
104  pthread_mutex_unlock(&omx_base_sink_Private->flush_mutex);
105 
106  /*No buffer to process. So wait here*/
107  if((pInputSem->semval==0 && isInputBufferNeeded==OMX_TRUE ) &&
108  (omx_base_sink_Private->state != OMX_StateLoaded && omx_base_sink_Private->state != OMX_StateInvalid)) {
109  DEBUG(DEB_LEV_SIMPLE_SEQ, "Waiting for input buffer \n");
110  tsem_down(omx_base_sink_Private->bMgmtSem);
111  }
112 
113  if(omx_base_sink_Private->state == OMX_StateLoaded || omx_base_sink_Private->state == OMX_StateInvalid) {
114  DEBUG(DEB_LEV_FULL_SEQ, "In %s Buffer Management Thread is exiting\n",__func__);
115  break;
116  }
117 
118  DEBUG(DEB_LEV_FULL_SEQ, "Waiting for input buffer semval=%d in %s\n",pInputSem->semval, __func__);
119  if(pInputSem->semval>0 && isInputBufferNeeded==OMX_TRUE ) {
120  tsem_down(pInputSem);
121  if(pInputQueue->nelem>0){
122  inBufExchanged++;
123  isInputBufferNeeded=OMX_FALSE;
124  pInputBuffer = dequeue(pInputQueue);
125  if(pInputBuffer == NULL){
126  DEBUG(DEB_LEV_ERR, "Had NULL input buffer!!\n");
127  break;
128  }
129  }
130  }
131 
132  if(isInputBufferNeeded==OMX_FALSE) {
133  if((pInputBuffer->nFlags & OMX_BUFFERFLAG_EOS) ==OMX_BUFFERFLAG_EOS) {
134  DEBUG(DEB_LEV_SIMPLE_SEQ, "Detected EOS flags in input buffer\n");
135 
136  (*(omx_base_component_Private->callbacks->EventHandler))
137  (openmaxStandComp,
138  omx_base_component_Private->callbackData,
139  OMX_EventBufferFlag, /* The command was completed */
140  0, /* The commands was a OMX_CommandStateSet */
141  pInputBuffer->nFlags, /* The state has been changed in message->messageParam2 */
142  NULL);
143  pInputBuffer->nFlags=0;
144  }
145 
146  target_component=(OMX_COMPONENTTYPE*)pInputBuffer->hMarkTargetComponent;
147  if(target_component==(OMX_COMPONENTTYPE *)openmaxStandComp) {
148  /*Clear the mark and generate an event*/
149  (*(omx_base_component_Private->callbacks->EventHandler))
150  (openmaxStandComp,
151  omx_base_component_Private->callbackData,
152  OMX_EventMark, /* The command was completed */
153  1, /* The commands was a OMX_CommandStateSet */
154  0, /* The state has been changed in message->messageParam2 */
155  pInputBuffer->pMarkData);
156  } else if(pInputBuffer->hMarkTargetComponent!=NULL){
157  /*If this is not the target component then pass the mark*/
158  DEBUG(DEB_LEV_FULL_SEQ, "Can't Pass Mark. This is a Sink!!\n");
159  }
160 
161  if((omx_base_sink_Private->state == OMX_StateExecuting) || (omx_base_sink_Private->state == OMX_StateIdle)) {
162  if ((omx_base_sink_Private->BufferMgmtCallback && pInputBuffer->nFilledLen > 0)
163  || (pInputBuffer->nFlags)){
164  (*(omx_base_sink_Private->BufferMgmtCallback))(openmaxStandComp, pInputBuffer);
165  }
166  else {
167  /*If no buffer management call back the explicitly consume input buffer*/
168  pInputBuffer->nFilledLen = 0;
169  }
170  } else {
171  DEBUG(DEB_LEV_ERR, "In %s Received Buffer in non-Executing State(%s) TrState (%s)\n",
172  __func__, stateName(omx_base_sink_Private->state),
173  transientStateName(omx_base_component_Private->transientState));
174  if(OMX_TransStateExecutingToIdle == omx_base_component_Private->transientState ||
175  OMX_TransStatePauseToIdle == omx_base_component_Private->transientState) {
176  pInputBuffer->nFilledLen = 0;
177  }
178  }
179  /*Input Buffer has been completely consumed. So, get new input buffer*/
180 
181  if(omx_base_sink_Private->state==OMX_StatePause && !PORT_IS_BEING_FLUSHED(pInPort)) {
182  /*Waiting at paused state*/
183  tsem_wait(omx_base_sink_Private->bStateSem);
184  }
185 
186  /*Input Buffer has been completely consumed. So, return input buffer*/
187  if(pInputBuffer->nFilledLen==0) {
188  pInPort->ReturnBufferFunction(pInPort,pInputBuffer);
189  inBufExchanged--;
190  pInputBuffer=NULL;
191  isInputBufferNeeded = OMX_TRUE;
192  }
193 
194  }
195  }
196  DEBUG(DEB_LEV_SIMPLE_SEQ,"Exiting Buffer Management Thread\n");
197  return NULL;
198 }
199 
207  OMX_COMPONENTTYPE* openmaxStandComp = (OMX_COMPONENTTYPE*)param;
208  omx_base_component_PrivateType* omx_base_component_Private=(omx_base_component_PrivateType*)openmaxStandComp->pComponentPrivate;
209  omx_base_sink_PrivateType* omx_base_sink_Private = (omx_base_sink_PrivateType*)omx_base_component_Private;
210  omx_base_PortType *pInPort[2];
211  tsem_t* pInputSem[2];
212  queue_t* pInputQueue[2];
213  OMX_BUFFERHEADERTYPE* pInputBuffer[2];
214  OMX_COMPONENTTYPE* target_component;
215  OMX_BOOL isInputBufferNeeded[2];
216  int i,outBufExchanged[2];
217 
218  pInPort[0]=(omx_base_PortType *)omx_base_sink_Private->ports[OMX_BASE_SINK_INPUTPORT_INDEX];
219  pInPort[1]=(omx_base_PortType *)omx_base_sink_Private->ports[OMX_BASE_SINK_INPUTPORT_INDEX_1];
220  pInputSem[0] = pInPort[0]->pBufferSem;
221  pInputSem[1] = pInPort[1]->pBufferSem;
222  pInputQueue[0] = pInPort[0]->pBufferQueue;
223  pInputQueue[1] = pInPort[1]->pBufferQueue;
224  pInputBuffer[1]= pInputBuffer[0]=NULL;
225  isInputBufferNeeded[0]=isInputBufferNeeded[1]=OMX_TRUE;
226  outBufExchanged[0]=outBufExchanged[1]=0;
227 
228  DEBUG(DEB_LEV_FUNCTION_NAME, "In %s\n", __func__);
229  while(omx_base_sink_Private->state == OMX_StateIdle || omx_base_sink_Private->state == OMX_StateExecuting || omx_base_sink_Private->state == OMX_StatePause ||
230  omx_base_sink_Private->transientState == OMX_TransStateLoadedToIdle){
231 
232  /*Wait till the ports are being flushed*/
233  pthread_mutex_lock(&omx_base_sink_Private->flush_mutex);
234  while( PORT_IS_BEING_FLUSHED(pInPort[0]) ||
235  PORT_IS_BEING_FLUSHED(pInPort[1])) {
236  pthread_mutex_unlock(&omx_base_sink_Private->flush_mutex);
237 
238  DEBUG(DEB_LEV_FULL_SEQ, "In %s 1 signalling flush all cond iE=%d,iF=%d,oE=%d,oF=%d iSemVal=%d,oSemval=%d\n",
239  __func__,outBufExchanged[0],isInputBufferNeeded[0],outBufExchanged[1],isInputBufferNeeded[1],pInputSem[0]->semval,pInputSem[1]->semval);
240 
241  if(isInputBufferNeeded[1]==OMX_FALSE && PORT_IS_BEING_FLUSHED(pInPort[1])) {
242  pInPort[1]->ReturnBufferFunction(pInPort[1],pInputBuffer[1]);
243  outBufExchanged[1]--;
244  pInputBuffer[1]=NULL;
245  isInputBufferNeeded[1]=OMX_TRUE;
246  DEBUG(DEB_LEV_FULL_SEQ, "Ports are flushing,so returning Input 1 buffer\n");
247  }
248 
249  if(isInputBufferNeeded[0]==OMX_FALSE && PORT_IS_BEING_FLUSHED(pInPort[0])) {
250  pInPort[0]->ReturnBufferFunction(pInPort[0],pInputBuffer[0]);
251  outBufExchanged[0]--;
252  pInputBuffer[0]=NULL;
253  isInputBufferNeeded[0]=OMX_TRUE;
254  DEBUG(DEB_LEV_FULL_SEQ, "Ports are flushing,so returning Input 0 buffer\n");
255  }
256 
257  DEBUG(DEB_LEV_FULL_SEQ, "In %s 2 signalling flush all cond iE=%d,iF=%d,oE=%d,oF=%d iSemVal=%d,oSemval=%d\n",
258  __func__,outBufExchanged[0],isInputBufferNeeded[0],outBufExchanged[1],isInputBufferNeeded[1],pInputSem[0]->semval,pInputSem[1]->semval);
259 
260  tsem_up(omx_base_sink_Private->flush_all_condition);
261  tsem_down(omx_base_sink_Private->flush_condition);
262  pthread_mutex_lock(&omx_base_sink_Private->flush_mutex);
263  }
264  pthread_mutex_unlock(&omx_base_sink_Private->flush_mutex);
265 
266  /*No buffer to process. So wait here*/
267  if((isInputBufferNeeded[0]==OMX_TRUE && pInputSem[0]->semval==0) &&
268  (omx_base_sink_Private->state != OMX_StateLoaded && omx_base_sink_Private->state != OMX_StateInvalid)) {
269  //Signalled from EmptyThisBuffer or FillThisBuffer or some thing else
270  DEBUG(DEB_LEV_FULL_SEQ, "Waiting for next Input buffer 0\n");
271  tsem_down(omx_base_sink_Private->bMgmtSem);
272 
273  }
274  if(omx_base_sink_Private->state == OMX_StateLoaded || omx_base_sink_Private->state == OMX_StateInvalid) {
275  DEBUG(DEB_LEV_SIMPLE_SEQ, "In %s Buffer Management Thread is exiting\n",__func__);
276  break;
277  }
278  if((isInputBufferNeeded[1]==OMX_TRUE && pInputSem[1]->semval==0) &&
279  (omx_base_sink_Private->state != OMX_StateLoaded && omx_base_sink_Private->state != OMX_StateInvalid) &&
280  !(PORT_IS_BEING_FLUSHED(pInPort[0]) || PORT_IS_BEING_FLUSHED(pInPort[1]))) {
281  //Signalled from EmptyThisBuffer or FillThisBuffer or some thing else
282  DEBUG(DEB_LEV_FULL_SEQ, "Waiting for next Input buffer 1\n");
283  tsem_down(omx_base_sink_Private->bMgmtSem);
284 
285  }
286  if(omx_base_sink_Private->state == OMX_StateLoaded || omx_base_sink_Private->state == OMX_StateInvalid) {
287  DEBUG(DEB_LEV_SIMPLE_SEQ, "In %s Buffer Management Thread is exiting\n",__func__);
288  break;
289  }
290 
291  DEBUG(DEB_LEV_SIMPLE_SEQ, "Waiting for Input buffer 0 semval=%d \n",pInputSem[0]->semval);
292  if(pInputSem[0]->semval>0 && isInputBufferNeeded[0]==OMX_TRUE ) {
293  tsem_down(pInputSem[0]);
294  if(pInputQueue[0]->nelem>0){
295  outBufExchanged[0]++;
296  isInputBufferNeeded[0]=OMX_FALSE;
297  pInputBuffer[0] = dequeue(pInputQueue[0]);
298  if(pInputBuffer[0] == NULL){
299  DEBUG(DEB_LEV_ERR, "Had NULL Input buffer!!\n");
300  break;
301  }
302  }
303  }
304  /*When we have input buffer to process then get one Input buffer*/
305  if(pInputSem[1]->semval>0 && isInputBufferNeeded[1]==OMX_TRUE) {
306  tsem_down(pInputSem[1]);
307  DEBUG(DEB_LEV_FULL_SEQ, "Wait over for Input buffer 1 semval=%d \n",pInputSem[1]->semval);
308  if(pInputQueue[1]->nelem>0){
309  outBufExchanged[1]++;
310  isInputBufferNeeded[1]=OMX_FALSE;
311  pInputBuffer[1] = dequeue(pInputQueue[1]);
312  if(pInputBuffer[1] == NULL){
313  DEBUG(DEB_LEV_ERR, "Had NULL Input buffer!! op is=%d,iq=%d\n",pInputSem[1]->semval,pInputQueue[1]->nelem);
314  break;
315  }
316  }
317  }
318 
319  for(i=0;i < (omx_base_component_Private->sPortTypesParam[OMX_PortDomainAudio].nPorts +
320  omx_base_component_Private->sPortTypesParam[OMX_PortDomainVideo].nPorts +
321  omx_base_component_Private->sPortTypesParam[OMX_PortDomainImage].nPorts +
322  omx_base_component_Private->sPortTypesParam[OMX_PortDomainOther].nPorts);i++) {
323 
324  if(omx_base_sink_Private->ports[i]->sPortParam.eDomain != OMX_PortDomainOther){ /* clock ports are not to be processed */
325  /*Process Input buffer of Port i */
326  if(isInputBufferNeeded[i]==OMX_FALSE) {
327 
328  /*Pass the Mark to all outgoing buffers*/
329  if(omx_base_sink_Private->pMark.hMarkTargetComponent != NULL){
330  pInputBuffer[i]->hMarkTargetComponent = omx_base_sink_Private->pMark.hMarkTargetComponent;
331  pInputBuffer[i]->pMarkData = omx_base_sink_Private->pMark.pMarkData;
332  }
333 
334  target_component=(OMX_COMPONENTTYPE*)pInputBuffer[i]->hMarkTargetComponent;
335  if(target_component==(OMX_COMPONENTTYPE *)openmaxStandComp) {
336  /*Clear the mark and generate an event*/
337  (*(omx_base_sink_Private->callbacks->EventHandler))
338  (openmaxStandComp,
339  omx_base_sink_Private->callbackData,
340  OMX_EventMark, /* The command was completed */
341  1, /* The commands was a OMX_CommandStateSet */
342  i, /* The state has been changed in message->messageParam2 */
343  pInputBuffer[i]->pMarkData);
344  } else if(pInputBuffer[i]->hMarkTargetComponent!=NULL){
345  /*If this is not the target component then pass the mark*/
346  //pInputBuffer[i]->pMarkData=NULL;
347  DEBUG(DEB_LEV_FULL_SEQ, "Pass Mark. This is a Source!!\n");
348  }
349 
350  if(omx_base_sink_Private->state == OMX_StateExecuting) {
351  if (omx_base_sink_Private->BufferMgmtCallback && pInputBuffer[i]->nFilledLen > 0) {
352  (*(omx_base_sink_Private->BufferMgmtCallback))(openmaxStandComp, pInputBuffer[i]);
353  } else {
354  /*If no buffer management call back then don't produce any Input buffer*/
355  pInputBuffer[i]->nFilledLen = 0;
356  }
357  } else {
358  DEBUG(DEB_LEV_ERR, "In %s Received Buffer in non-Executing State(%x)\n", __func__, (int)omx_base_sink_Private->state);
359 
360  if(OMX_TransStateExecutingToIdle == omx_base_component_Private->transientState ||
361  OMX_TransStatePauseToIdle == omx_base_component_Private->transientState) {
362  pInputBuffer[i]->nFilledLen = 0;
363  }
364  }
365 
366  if((pInputBuffer[i]->nFlags & OMX_BUFFERFLAG_EOS) == OMX_BUFFERFLAG_EOS && pInputBuffer[i]->nFilledLen==0) {
367  DEBUG(DEB_LEV_FULL_SEQ, "Detected EOS flags in input buffer filled len=%d\n", (int)pInputBuffer[i]->nFilledLen);
368  (*(omx_base_sink_Private->callbacks->EventHandler))
369  (openmaxStandComp,
370  omx_base_sink_Private->callbackData,
371  OMX_EventBufferFlag, /* The command was completed */
372  i, /* The commands was a OMX_CommandStateSet */
373  pInputBuffer[i]->nFlags, /* The state has been changed in message->messageParam2 */
374  NULL);
375  }
376  if(omx_base_sink_Private->state==OMX_StatePause && !(PORT_IS_BEING_FLUSHED(pInPort[0]) || PORT_IS_BEING_FLUSHED(pInPort[1]))) {
377  /*Waiting at paused state*/
378  tsem_wait(omx_base_component_Private->bStateSem);
379  }
380 
381  /*Input Buffer has been produced or EOS. So, return Input buffer and get new buffer*/
382  if(pInputBuffer[i]->nFilledLen ==0 || ((pInputBuffer[i]->nFlags & OMX_BUFFERFLAG_EOS) == OMX_BUFFERFLAG_EOS)){
383  pInPort[i]->ReturnBufferFunction(pInPort[i],pInputBuffer[i]);
384  outBufExchanged[i]--;
385  pInputBuffer[i]=NULL;
386  isInputBufferNeeded[i]=OMX_TRUE;
387  }
388  }
389  }
390  }
391 
392  /*Clear the Mark*/
393  if(omx_base_sink_Private->pMark.hMarkTargetComponent != NULL){
394  omx_base_sink_Private->pMark.hMarkTargetComponent = NULL;
395  omx_base_sink_Private->pMark.pMarkData = NULL;
396  }
397  }
398  DEBUG(DEB_LEV_SIMPLE_SEQ,"Exiting Buffer Management Thread\n");
399  return NULL;
400 }
#define OMX_BASE_SINK_INPUTPORT_INDEX
Definition: omx_base_sink.h:36
OMX_ERRORTYPE omx_base_sink_Destructor(OMX_COMPONENTTYPE *openmaxStandComp)
Definition: omx_base_sink.c:57
void tsem_wait(tsem_t *tsem)
Definition: tsemaphore.c:131
#define OMX_BASE_SINK_INPUTPORT_INDEX_1
Definition: omx_base_sink.h:38
OMX_ERRORTYPE omx_base_component_Constructor(OMX_COMPONENTTYPE *openmaxStandComp, OMX_STRING cComponentName)
The base constructor for the OpenMAX ST components.
#define DEB_LEV_SIMPLE_SEQ
#define PORT_IS_BEING_FLUSHED(pPort)
Definition: omx_base_port.h:39
char * stateName(OMX_STATETYPE state)
Definition: utils.c:29
void * omx_base_sink_BufferMgmtFunction(void *param)
Definition: omx_base_sink.c:67
#define DEBUG(n, fmt, args...)
queue_t * pBufferQueue
char * OMX_STRING
Definition: OMX_Types.h:206
OMX_BOOL
Definition: OMX_Types.h:189
#define DEB_LEV_ERR
void tsem_up(tsem_t *tsem)
Definition: tsemaphore.c:110
void tsem_down(tsem_t *tsem)
Definition: tsemaphore.c:97
void *(* BufferMgmtFunction)(void *param)
Definition: omx_base_sink.h:50
Definition: queue.h:43
#define OMX_BUFFERFLAG_EOS
Definition: OMX_Core.h:299
OMX_ERRORTYPE(* ReturnBufferFunction)(omx_base_PortType *openmaxStandPort, OMX_BUFFERHEADERTYPE *pBuffer)
OMX_ERRORTYPE omx_base_component_Destructor(OMX_COMPONENTTYPE *openmaxStandComp)
The base destructor for ST OpenMAX components.
OMX_ERRORTYPE err
#define DEB_LEV_FULL_SEQ
OMX_HANDLETYPE hMarkTargetComponent
Definition: OMX_Core.h:417
OMX_PTR pComponentPrivate
#define DEB_LEV_FUNCTION_NAME
void * omx_base_sink_twoport_BufferMgmtFunction(void *param)
char * transientStateName(int state)
Definition: utils.c:55
OMX_ERRORTYPE omx_base_sink_Constructor(OMX_COMPONENTTYPE *openmaxStandComp, OMX_STRING cComponentName)
Definition: omx_base_sink.c:31
void * dequeue(queue_t *queue)
Definition: queue.c:122
OMX_PORT_PARAM_TYPE sPortTypesParam[4]
OMX_ERRORTYPE(* EventHandler)(OMX_IN OMX_HANDLETYPE hComponent, OMX_IN OMX_PTR pAppData, OMX_IN OMX_EVENTTYPE eEvent, OMX_IN OMX_U32 nData1, OMX_IN OMX_U32 nData2, OMX_IN OMX_PTR pEventData)
Definition: OMX_Core.h:530
OMX_ERRORTYPE
Definition: OMX_Core.h:126

Generated for OpenMAX Bellagio rel. 0.9.3 by  doxygen 1.5.1
SourceForge.net Logo